risotto_lib/
lib.rs

1pub mod processor;
2pub mod state;
3pub mod state_store;
4pub mod update;
5
6use bgpkit_parser::parser::bmp::messages::BmpMessageBody;
7use bytes::Bytes;
8use core::net::IpAddr;
9use std::sync::mpsc::Sender;
10use tracing::{debug, error, info, trace};
11
12use crate::processor::{
13    decode_bmp_message, peer_down_notification, peer_up_notification, route_monitoring,
14};
15use crate::state::AsyncState;
16use crate::state_store::store::StateStore;
17use crate::update::{new_metadata, Update};
18
19pub async fn process_bmp_message<T: StateStore>(
20    state: Option<AsyncState<T>>,
21    tx: Sender<Update>,
22    router_addr: IpAddr,
23    router_port: u16,
24    bytes: &mut Bytes,
25) {
26    // Parse the BMP message
27    let message = match decode_bmp_message(bytes) {
28        Ok(message) => message,
29        Err(e) => {
30            error!("failed to decode BMP message: {}", e);
31            return;
32        }
33    };
34
35    trace!("[{}]:{} - {:?}", router_addr, router_port, message);
36
37    // Extract header and peer information
38    let metadata = new_metadata(router_addr, router_port, &message);
39
40    match message.message_body {
41        BmpMessageBody::InitiationMessage(body) => {
42            let tlvs_info = body
43                .tlvs
44                .iter()
45                .map(|tlv| tlv.info.clone())
46                .collect::<Vec<_>>();
47            info!(
48                "[{}]:{} - InitiationMessage: {:?}",
49                router_addr, router_port, tlvs_info
50            );
51            // No-Op
52        }
53        BmpMessageBody::PeerUpNotification(body) => {
54            trace!("{:?}", body);
55            if metadata.is_none() {
56                error!(
57                    "[{}]:{} - PeerUpNotification - no per-peer header",
58                    router_addr, router_port
59                );
60                return;
61            }
62            let metadata = metadata.unwrap();
63            info!(
64                "[{}]:{} - PeerUpNotification - {}",
65                metadata.router_addr, metadata.router_port, metadata.peer_addr
66            );
67            peer_up_notification(state, tx, metadata, body).await;
68        }
69        BmpMessageBody::RouteMonitoring(body) => {
70            debug!("{:?}", body);
71            if metadata.is_none() {
72                error!(
73                    "[{}]:{} - RouteMonitoring - no per-peer header",
74                    router_addr, router_port
75                );
76                return;
77            }
78            let metadata = metadata.unwrap();
79            route_monitoring(state, tx, metadata, body).await;
80        }
81        BmpMessageBody::RouteMirroring(_) => {
82            info!("[{}]:{} - RouteMirroring", router_addr, router_port)
83            // No-Op
84        }
85        BmpMessageBody::PeerDownNotification(body) => {
86            trace!("{:?}", body);
87            if metadata.is_none() {
88                error!(
89                    "[{}]:{} - RouteMonitoring - no per-peer header",
90                    router_addr, router_port
91                );
92                return;
93            }
94            let metadata = metadata.unwrap();
95            info!(
96                "[{}]:{} - PeerDownNotification: - {}",
97                metadata.router_addr, metadata.router_port, metadata.peer_addr
98            );
99            peer_down_notification(state, tx, metadata, body).await;
100        }
101
102        BmpMessageBody::TerminationMessage(_) => {
103            info!("[{}]:{} - TerminationMessage", router_addr, router_port)
104            // No-Op
105        }
106        BmpMessageBody::StatsReport(_) => {
107            info!("[{}]:{} - StatsReport", router_addr, router_port)
108            // No-Op
109        }
110    }
111}