risotto_lib/
lib.rs

1pub mod processor;
2pub mod state;
3pub mod state_store;
4pub mod statistics;
5pub mod update;
6
7use bgpkit_parser::parser::bmp::messages::BmpMessageBody;
8use bytes::Bytes;
9use core::net::IpAddr;
10use statistics::AsyncStatistics;
11use std::sync::mpsc::Sender;
12use tracing::{debug, error, info, trace};
13
14use crate::processor::{
15    decode_bmp_message, peer_down_notification, peer_up_notification, route_monitoring,
16};
17use crate::state::AsyncState;
18use crate::state_store::store::StateStore;
19use crate::update::{new_metadata, Update};
20
21pub async fn process_bmp_message<T: StateStore>(
22    state: Option<AsyncState<T>>,
23    statistics: AsyncStatistics,
24    tx: Sender<Update>,
25    router_addr: IpAddr,
26    router_port: u16,
27    bytes: &mut Bytes,
28) {
29    // Parse the BMP message
30    let message = match decode_bmp_message(bytes) {
31        Ok(message) => message,
32        Err(e) => {
33            error!("failed to decode BMP message: {}", e);
34            return;
35        }
36    };
37
38    let mut statistics_lock = statistics.lock().await;
39
40    trace!("[{}]:{} - {:?}", router_addr, router_port, message);
41    statistics_lock.rx_bmp_messages += 1;
42
43    // Extract header and peer information
44    let metadata = new_metadata(router_addr, router_port, &message);
45
46    match message.message_body {
47        BmpMessageBody::InitiationMessage(body) => {
48            trace!("{:?}", body);
49            let tlvs_info = body
50                .tlvs
51                .iter()
52                .map(|tlv| tlv.info.clone())
53                .collect::<Vec<_>>();
54            debug!(
55                "[{}]:{} - InitiationMessage: {:?}",
56                router_addr, router_port, tlvs_info
57            );
58            statistics_lock.rx_bmp_initiation += 1;
59            // No-Op
60        }
61        BmpMessageBody::PeerUpNotification(body) => {
62            trace!("{:?}", body);
63            if metadata.is_none() {
64                error!(
65                    "[{}]:{} - PeerUpNotification - no per-peer header",
66                    router_addr, router_port
67                );
68                return;
69            }
70            let metadata = metadata.unwrap();
71            debug!(
72                "[{}]:{} - PeerUpNotification - {}",
73                metadata.router_addr, metadata.router_port, metadata.peer_addr
74            );
75            statistics_lock.rx_bmp_peer_up += 1;
76            peer_up_notification(state, tx, metadata, body).await;
77        }
78        BmpMessageBody::RouteMonitoring(body) => {
79            debug!("{:?}", body);
80            if metadata.is_none() {
81                error!(
82                    "[{}]:{} - RouteMonitoring - no per-peer header",
83                    router_addr, router_port
84                );
85                return;
86            }
87            let metadata = metadata.unwrap();
88            statistics_lock.rx_bmp_route_monitoring += 1;
89            route_monitoring(state, tx, metadata, body).await;
90        }
91        BmpMessageBody::RouteMirroring(body) => {
92            trace!("{:?}", body);
93            debug!("[{}]:{} - RouteMirroring", router_addr, router_port);
94            statistics_lock.rx_bmp_route_mirroring += 1;
95            // No-Op
96        }
97        BmpMessageBody::PeerDownNotification(body) => {
98            trace!("{:?}", body);
99            if metadata.is_none() {
100                error!(
101                    "[{}]:{} - RouteMonitoring - no per-peer header",
102                    router_addr, router_port
103                );
104                return;
105            }
106            let metadata = metadata.unwrap();
107            debug!(
108                "[{}]:{} - PeerDownNotification: - {}",
109                metadata.router_addr, metadata.router_port, metadata.peer_addr
110            );
111            statistics_lock.rx_bmp_peer_down += 1;
112            peer_down_notification(state, tx, metadata, body).await;
113        }
114
115        BmpMessageBody::TerminationMessage(body) => {
116            trace!("{:?}", body);
117            info!("[{}]:{} - TerminationMessage", router_addr, router_port);
118            statistics_lock.rx_bmp_termination += 1;
119            // No-Op
120        }
121        BmpMessageBody::StatsReport(body) => {
122            trace!("{:?}", body);
123            info!("[{}]:{} - StatsReport", router_addr, router_port);
124            statistics_lock.rx_bmp_stats_report += 1;
125            // No-Op
126        }
127    };
128}