risotto_lib/
processor.rs

1use anyhow::Result;
2use bgpkit_parser::bmp::messages::{PeerDownNotification, PeerUpNotification, RouteMonitoring};
3use bgpkit_parser::parse_bmp_msg;
4use bgpkit_parser::parser::bmp::messages::BmpMessage;
5use bytes::Bytes;
6use metrics::{counter, gauge};
7use rand::Rng;
8use tokio::sync::mpsc::Sender;
9use tracing::{debug, error, trace};
10
11use crate::state::AsyncState;
12use crate::state::{peer_up_withdraws_handler, synthesize_withdraw_update};
13use crate::state_store::store::StateStore;
14use crate::update::{decode_updates, Update, UpdateMetadata};
15
16pub fn decode_bmp_message(bytes: &mut Bytes) -> Result<BmpMessage> {
17    let message = match parse_bmp_msg(bytes) {
18        Ok(message) => message,
19        Err(_) => return Err(anyhow::anyhow!("failed to parse BMP message")),
20    };
21
22    Ok(message)
23}
24
25pub async fn peer_up_notification<T: StateStore>(
26    state: Option<AsyncState<T>>,
27    tx: Sender<Update>,
28    metadata: UpdateMetadata,
29    _: PeerUpNotification,
30) -> Result<()> {
31    if let Some(state) = state {
32        let mut state_lock = state.lock().await;
33
34        // Add the peer to the state
35        state_lock
36            .add_peer(&metadata.router_socket.ip(), &metadata.peer_addr)
37            .unwrap();
38
39        gauge!(
40            "risotto_peer_established",
41            "router" => metadata.router_socket.ip().to_string(),
42            "peer" => metadata.peer_addr.to_string()
43        )
44        .set(1);
45
46        let spawn_state = state.clone();
47        let random = {
48            let mut rng = rand::rng();
49            rng.random_range(-60.0..60.0) as u64
50        };
51        let sleep_time = 300 + random; // 5 minutes +/- 1 minute
52        tokio::spawn(async move {
53            if let Err(e) = peer_up_withdraws_handler(spawn_state, tx, metadata, sleep_time).await {
54                error!("Error in peer_up_withdraws_handler: {}", e);
55            }
56        });
57    }
58
59    Ok(())
60}
61
62pub async fn route_monitoring<T: StateStore>(
63    state: Option<AsyncState<T>>,
64    tx: Sender<Update>,
65    metadata: UpdateMetadata,
66    body: RouteMonitoring,
67) -> Result<()> {
68    let potential_updates = decode_updates(body, metadata.clone()).unwrap_or_default();
69    counter!(
70        "risotto_rx_updates_total",
71        "router" => metadata.router_socket.ip().to_string(),
72        "peer" => metadata.peer_addr.to_string(),
73    )
74    .increment(potential_updates.len() as u64);
75
76    let mut legitimate_updates = Vec::new();
77    if let Some(state) = &state {
78        let mut state_lock = state.lock().await;
79        for update in potential_updates {
80            debug!("{}: {:?}", metadata.router_socket, update);
81            let is_updated = state_lock
82                .update(&metadata.router_socket.ip(), &metadata.peer_addr, &update)
83                .unwrap();
84            if is_updated {
85                legitimate_updates.push(update);
86            }
87        }
88    } else {
89        legitimate_updates = potential_updates;
90    }
91
92    counter!(
93        "risotto_tx_updates_total",
94        "router" => metadata.router_socket.ip().to_string(),
95        "peer" => metadata.peer_addr.to_string(),
96    )
97    .increment(legitimate_updates.len() as u64);
98
99    for update in legitimate_updates {
100        trace!("{:?}", update);
101
102        // Sent to the event pipeline
103        tx.send(update).await?;
104    }
105
106    Ok(())
107}
108
109pub async fn peer_down_notification<T: StateStore>(
110    state: Option<AsyncState<T>>,
111    tx: Sender<Update>,
112    metadata: UpdateMetadata,
113    _: PeerDownNotification,
114) -> Result<()> {
115    if let Some(state) = state {
116        // Remove the peer and the associated updates from the state
117        // We start by emiting synthetic withdraw updates
118        let mut state_lock = state.lock().await;
119
120        let mut synthetic_updates = Vec::new();
121        let updates = state_lock
122            .get_updates_by_peer(&metadata.router_socket.ip(), &metadata.peer_addr)
123            .unwrap();
124        for prefix in updates {
125            synthetic_updates.push(synthesize_withdraw_update(prefix.clone(), metadata.clone()));
126        }
127
128        // Remove the peer from the state
129        state_lock
130            .remove_peer(&metadata.router_socket.ip(), &metadata.peer_addr)
131            .unwrap();
132
133        gauge!(
134            "risotto_peer_established",
135            "router" => metadata.router_socket.ip().to_string(),
136            "peer" => metadata.peer_addr.to_string()
137        )
138        .set(0);
139
140        counter!(
141            "risotto_tx_updates_total",
142            "router" => metadata.router_socket.ip().to_string(),
143            "peer" => metadata.peer_addr.to_string(),
144        )
145        .increment(synthetic_updates.len() as u64);
146
147        for update in synthetic_updates {
148            trace!("{:?}", update);
149
150            // Send the synthetic updates to the event pipeline
151            tx.send(update).await?;
152        }
153    }
154
155    Ok(())
156}