risotto_lib/
lib.rs

1pub mod processor;
2pub mod state;
3pub mod state_store;
4pub mod update;
5
6use anyhow::Result;
7use bgpkit_parser::parser::bmp::messages::BmpMessageBody;
8use bytes::Bytes;
9use metrics::counter;
10use std::net::SocketAddr;
11use tokio::sync::mpsc::Sender;
12use tracing::{debug, trace};
13
14use crate::processor::{
15    decode_bmp_message, peer_down_notification, peer_up_notification, route_monitoring,
16};
17use crate::state::AsyncState;
18use crate::state_store::store::StateStore;
19use crate::update::{new_metadata, Update};
20
21/// Process a BMP message from raw bytes
22/// This is a convenience wrapper for BMP collectors
23pub async fn process_bmp_message<T: StateStore>(
24    state: Option<AsyncState<T>>,
25    tx: Sender<Update>,
26    socket: SocketAddr,
27    bytes: &mut Bytes,
28) -> Result<()> {
29    // Parse the BMP message
30    let message = match decode_bmp_message(bytes) {
31        Ok(message) => message,
32        Err(e) => {
33            anyhow::bail!("failed to decode BMP message: {}", e);
34        }
35    };
36
37    trace!("{} - {:?}", socket, message);
38
39    // Extract header and peer information
40    let metadata = new_metadata(socket.clone(), &message);
41
42    let metric_name = "risotto_bmp_messages_total";
43    match message.message_body {
44        BmpMessageBody::InitiationMessage(body) => {
45            trace!("{}: {:?}", socket.to_string(), body);
46            let tlvs_info = body
47                .tlvs
48                .iter()
49                .map(|tlv| tlv.info.clone())
50                .collect::<Vec<_>>();
51            debug!("{}: InitiationMessage: {:?}", socket.to_string(), tlvs_info);
52            counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "initiation")
53                .increment(1);
54            // No-Op
55        }
56        BmpMessageBody::PeerUpNotification(body) => {
57            trace!("{}: {:?}", socket.to_string(), body);
58            if metadata.is_none() {
59                anyhow::bail!(
60                    "{}: PeerUpNotification: no per-peer header",
61                    socket.to_string()
62                );
63            }
64            let metadata = metadata.unwrap();
65            debug!(
66                "{}: PeerUpNotification: {}",
67                socket.to_string(),
68                metadata.peer_addr
69            );
70            counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "peer_up_notification")
71                .increment(1);
72            peer_up_notification(state, tx, metadata, body).await?;
73        }
74        BmpMessageBody::RouteMonitoring(body) => {
75            trace!("{}: {:?}", socket.to_string(), body);
76            if metadata.is_none() {
77                anyhow::bail!(
78                    "{}: RouteMonitoring - no per-peer header",
79                    socket.to_string()
80                );
81            }
82            let metadata = metadata.unwrap();
83            // We do not process the message if the peer address is unspecified
84            // Most likely a local RIB update
85            if !metadata.peer_addr.is_unspecified() {
86                counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "route_monitoring")
87                .increment(1);
88                route_monitoring(state, tx, metadata, body).await?;
89            }
90        }
91        BmpMessageBody::RouteMirroring(body) => {
92            trace!("{}: {:?}", socket.to_string(), body);
93            debug!("{}: RouteMirroring", socket.to_string());
94            counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "route_mirroring")
95                .increment(1);
96            // No-Op
97        }
98        BmpMessageBody::PeerDownNotification(body) => {
99            trace!("{}: {:?}", socket.to_string(), body);
100            if metadata.is_none() {
101                anyhow::bail!(
102                    "{}: PeerDownNotification: no per-peer header",
103                    socket.to_string()
104                );
105            }
106            let metadata = metadata.unwrap();
107            debug!(
108                "{}: PeerDownNotification: {}. Reason: {:?}",
109                socket.to_string(),
110                metadata.peer_addr,
111                body.reason
112            );
113            counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "peer_down_notification")
114                .increment(1);
115            peer_down_notification(state, tx, metadata, body).await?;
116        }
117
118        BmpMessageBody::TerminationMessage(body) => {
119            trace!("{}: {:?}", socket.to_string(), body);
120            debug!("{}: TerminationMessage", socket.to_string());
121            counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "termination")
122                .increment(1);
123            // No-Op
124        }
125        BmpMessageBody::StatsReport(body) => {
126            trace!("{}: {:?}", socket.to_string(), body);
127            debug!("{}: StatsReport", socket.to_string());
128            counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "stats_report")
129                .increment(1);
130            // No-Op
131        }
132    };
133
134    Ok(())
135}