risotto_lib/
processor.rs

1use anyhow::Result;
2use bgpkit_parser::bmp::messages::{PeerDownNotification, PeerUpNotification, RouteMonitoring};
3use bgpkit_parser::parse_bmp_msg;
4use bgpkit_parser::parser::bmp::messages::BmpMessage;
5use bytes::Bytes;
6use metrics::{counter, gauge};
7use rand::Rng;
8use tokio::sync::mpsc::Sender;
9use tracing::{error, trace};
10
11use crate::state::AsyncState;
12use crate::state::{peer_up_withdraws_handler, process_updates, synthesize_withdraw_update};
13use crate::state_store::store::StateStore;
14use crate::update::{decode_updates, Update, UpdateMetadata};
15
16pub fn decode_bmp_message(bytes: &mut Bytes) -> Result<BmpMessage> {
17    let message = match parse_bmp_msg(bytes) {
18        Ok(message) => message,
19        Err(_) => return Err(anyhow::anyhow!("failed to parse BMP message")),
20    };
21
22    Ok(message)
23}
24
25pub async fn peer_up_notification<T: StateStore>(
26    state: Option<AsyncState<T>>,
27    tx: Sender<Update>,
28    metadata: UpdateMetadata,
29    _: PeerUpNotification,
30) -> Result<()> {
31    if let Some(state) = state {
32        let mut state_lock = state.lock().await;
33
34        // Add the peer to the state
35        state_lock
36            .add_peer(&metadata.router_socket.ip(), &metadata.peer_addr)
37            .unwrap();
38
39        gauge!(
40            "risotto_peer_established",
41            "router" => metadata.router_socket.ip().to_string(),
42            "peer" => metadata.peer_addr.to_string()
43        )
44        .set(1);
45
46        let spawn_state = state.clone();
47        let random = {
48            let mut rng = rand::rng();
49            rng.random_range(-60.0..60.0) as u64
50        };
51        let sleep_time = 300 + random; // 5 minutes +/- 1 minute
52        tokio::spawn(async move {
53            if let Err(e) = peer_up_withdraws_handler(spawn_state, tx, metadata, sleep_time).await {
54                error!("Error in peer_up_withdraws_handler: {}", e);
55            }
56        });
57    }
58
59    Ok(())
60}
61
62pub async fn route_monitoring<T: StateStore>(
63    state: Option<AsyncState<T>>,
64    tx: Sender<Update>,
65    metadata: UpdateMetadata,
66    body: RouteMonitoring,
67) -> Result<()> {
68    // Decode BMP RouteMonitoring message into Update structs
69    let updates = decode_updates(body, metadata.clone()).unwrap_or_default();
70
71    counter!(
72        "risotto_rx_updates_total",
73        "router" => metadata.router_socket.ip().to_string(),
74        "peer" => metadata.peer_addr.to_string(),
75    )
76    .increment(updates.len() as u64);
77
78    // Process updates through state machine
79    process_updates(state, tx, updates).await?;
80
81    Ok(())
82}
83
84pub async fn peer_down_notification<T: StateStore>(
85    state: Option<AsyncState<T>>,
86    tx: Sender<Update>,
87    metadata: UpdateMetadata,
88    _: PeerDownNotification,
89) -> Result<()> {
90    if let Some(state) = state {
91        // Remove the peer and the associated updates from the state
92        // We start by emiting synthetic withdraw updates
93        let mut state_lock = state.lock().await;
94
95        let mut synthetic_updates = Vec::new();
96        let updates = state_lock
97            .get_updates_by_peer(&metadata.router_socket.ip(), &metadata.peer_addr)
98            .unwrap();
99        for prefix in updates {
100            synthetic_updates.push(synthesize_withdraw_update(prefix.clone(), metadata.clone()));
101        }
102
103        // Remove the peer from the state
104        state_lock
105            .remove_peer(&metadata.router_socket.ip(), &metadata.peer_addr)
106            .unwrap();
107
108        gauge!(
109            "risotto_peer_established",
110            "router" => metadata.router_socket.ip().to_string(),
111            "peer" => metadata.peer_addr.to_string()
112        )
113        .set(0);
114
115        counter!(
116            "risotto_tx_updates_total",
117            "router" => metadata.router_socket.ip().to_string(),
118            "peer" => metadata.peer_addr.to_string(),
119        )
120        .increment(synthetic_updates.len() as u64);
121
122        for update in synthetic_updates {
123            trace!("{:?}", update);
124
125            // Send the synthetic updates to the event pipeline
126            tx.send(update).await?;
127        }
128    }
129
130    Ok(())
131}