risotto_lib/
processor.rs

1use anyhow::Result;
2use bgpkit_parser::bmp::messages::{PeerDownNotification, PeerUpNotification, RouteMonitoring};
3use bgpkit_parser::parse_bmp_msg;
4use bgpkit_parser::parser::bmp::messages::BmpMessage;
5use bytes::Bytes;
6use rand::Rng;
7use std::sync::mpsc::Sender;
8use tracing::trace;
9
10use crate::state::AsyncState;
11use crate::state::{peer_up_withdraws_handler, synthesize_withdraw_update};
12use crate::state_store::store::StateStore;
13use crate::update::{decode_updates, Update, UpdateMetadata};
14
15pub fn decode_bmp_message(bytes: &mut Bytes) -> Result<BmpMessage> {
16    let message = match parse_bmp_msg(bytes) {
17        Ok(message) => message,
18        Err(_) => return Err(anyhow::anyhow!("failed to parse BMP message")),
19    };
20
21    Ok(message)
22}
23
24pub async fn peer_up_notification<T: StateStore>(
25    state: Option<AsyncState<T>>,
26    tx: Sender<Update>,
27    metadata: UpdateMetadata,
28    _: PeerUpNotification,
29) {
30    if let Some(spawn_state) = state {
31        let spawn_state = spawn_state.clone();
32        let random = {
33            let mut rng = rand::rng();
34            rng.random_range(-60.0..60.0) as u64
35        };
36        let sleep_time = 300 + random; // 5 minutes +/- 1 minute
37        tokio::spawn(async move {
38            peer_up_withdraws_handler(spawn_state, tx, metadata, sleep_time).await;
39        });
40    }
41}
42
43pub async fn route_monitoring<T: StateStore>(
44    state: Option<AsyncState<T>>,
45    tx: Sender<Update>,
46    metadata: UpdateMetadata,
47    body: RouteMonitoring,
48) {
49    let potential_updates = decode_updates(body, metadata.clone()).unwrap_or_default();
50
51    let mut legitimate_updates = Vec::new();
52    if let Some(state) = &state {
53        let mut state_lock = state.lock().await;
54        for update in potential_updates {
55            let is_updated = state_lock
56                .update(&metadata.router_addr.clone(), &metadata.peer_addr, &update)
57                .unwrap();
58            if is_updated {
59                legitimate_updates.push(update);
60            }
61        }
62    } else {
63        legitimate_updates = potential_updates;
64    }
65
66    for update in legitimate_updates {
67        trace!("{:?}", update);
68
69        // Sent to the event pipeline
70        tx.send(update).unwrap();
71    }
72}
73
74pub async fn peer_down_notification<T: StateStore>(
75    state: Option<AsyncState<T>>,
76    tx: Sender<Update>,
77    metadata: UpdateMetadata,
78    _: PeerDownNotification,
79) {
80    if let Some(state) = state {
81        // Remove the peer and the associated updates from the state
82        // We start by emiting synthetic withdraw updates
83        let mut state_lock = state.lock().await;
84
85        let mut synthetic_updates = Vec::new();
86        let updates = state_lock
87            .get_updates_by_peer(&metadata.router_addr, &metadata.peer_addr)
88            .unwrap();
89        for prefix in updates {
90            synthetic_updates.push(synthesize_withdraw_update(prefix.clone(), metadata.clone()));
91        }
92
93        // Then update the state
94        state_lock
95            .remove_updates(&metadata.router_addr, &metadata.peer_addr)
96            .unwrap();
97
98        for update in synthetic_updates {
99            trace!("{:?}", update);
100
101            // Send the synthetic updates to the event pipeline
102            tx.send(update).unwrap();
103        }
104    }
105}