risotto_lib/
lib.rs

1pub mod processor;
2pub mod state;
3pub mod state_store;
4pub mod update;
5
6use anyhow::Result;
7use bgpkit_parser::parser::bmp::messages::BmpMessageBody;
8use bytes::Bytes;
9use metrics::counter;
10use std::net::SocketAddr;
11use tokio::sync::mpsc::Sender;
12use tracing::{debug, trace};
13
14use crate::processor::{
15    decode_bmp_message, peer_down_notification, peer_up_notification, route_monitoring,
16};
17use crate::state::AsyncState;
18use crate::state_store::store::StateStore;
19use crate::update::{new_metadata, Update};
20
21pub async fn process_bmp_message<T: StateStore>(
22    state: Option<AsyncState<T>>,
23    tx: Sender<Update>,
24    socket: SocketAddr,
25    bytes: &mut Bytes,
26) -> Result<()> {
27    // Parse the BMP message
28    let message = match decode_bmp_message(bytes) {
29        Ok(message) => message,
30        Err(e) => {
31            anyhow::bail!("failed to decode BMP message: {}", e);
32        }
33    };
34
35    trace!("{} - {:?}", socket, message);
36
37    // Extract header and peer information
38    let metadata = new_metadata(socket.clone(), &message);
39
40    let metric_name = "risotto_bmp_messages_total";
41    match message.message_body {
42        BmpMessageBody::InitiationMessage(body) => {
43            trace!("{}: {:?}", socket.to_string(), body);
44            let tlvs_info = body
45                .tlvs
46                .iter()
47                .map(|tlv| tlv.info.clone())
48                .collect::<Vec<_>>();
49            debug!("{}: InitiationMessage: {:?}", socket.to_string(), tlvs_info);
50            counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "initiation")
51                .increment(1);
52            // No-Op
53        }
54        BmpMessageBody::PeerUpNotification(body) => {
55            trace!("{}: {:?}", socket.to_string(), body);
56            if metadata.is_none() {
57                anyhow::bail!(
58                    "{}: PeerUpNotification: no per-peer header",
59                    socket.to_string()
60                );
61            }
62            let metadata = metadata.unwrap();
63            debug!(
64                "{}: PeerUpNotification: {}",
65                socket.to_string(),
66                metadata.peer_addr
67            );
68            counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "peer_up_notification")
69                .increment(1);
70            peer_up_notification(state, tx, metadata, body).await?;
71        }
72        BmpMessageBody::RouteMonitoring(body) => {
73            trace!("{}: {:?}", socket.to_string(), body);
74            if metadata.is_none() {
75                anyhow::bail!(
76                    "{}: RouteMonitoring - no per-peer header",
77                    socket.to_string()
78                );
79            }
80            let metadata = metadata.unwrap();
81            // We do not process the message if the peer address is unspecified
82            // Most likely a local RIB update
83            if !metadata.peer_addr.is_unspecified() {
84                counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "route_monitoring")
85                .increment(1);
86                route_monitoring(state, tx, metadata, body).await?;
87            }
88        }
89        BmpMessageBody::RouteMirroring(body) => {
90            trace!("{}: {:?}", socket.to_string(), body);
91            debug!("{}: RouteMirroring", socket.to_string());
92            counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "route_mirroring")
93                .increment(1);
94            // No-Op
95        }
96        BmpMessageBody::PeerDownNotification(body) => {
97            trace!("{}: {:?}", socket.to_string(), body);
98            if metadata.is_none() {
99                anyhow::bail!(
100                    "{}: PeerDownNotification: no per-peer header",
101                    socket.to_string()
102                );
103            }
104            let metadata = metadata.unwrap();
105            debug!(
106                "{}: PeerDownNotification: {}. Reason: {:?}",
107                socket.to_string(),
108                metadata.peer_addr,
109                body.reason
110            );
111            counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "peer_down_notification")
112                .increment(1);
113            peer_down_notification(state, tx, metadata, body).await?;
114        }
115
116        BmpMessageBody::TerminationMessage(body) => {
117            trace!("{}: {:?}", socket.to_string(), body);
118            debug!("{}: TerminationMessage", socket.to_string());
119            counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "termination")
120                .increment(1);
121            // No-Op
122        }
123        BmpMessageBody::StatsReport(body) => {
124            trace!("{}: {:?}", socket.to_string(), body);
125            debug!("{}: StatsReport", socket.to_string());
126            counter!(metric_name, "router" =>  socket.ip().to_string(), "type" => "stats_report")
127                .increment(1);
128            // No-Op
129        }
130    };
131
132    Ok(())
133}