risotto_lib/
processor.rs

1use anyhow::Result;
2use bgpkit_parser::bmp::messages::{PeerDownNotification, PeerUpNotification, RouteMonitoring};
3use bgpkit_parser::parse_bmp_msg;
4use bgpkit_parser::parser::bmp::messages::BmpMessage;
5use bytes::Bytes;
6use std::sync::mpsc::Sender;
7use tracing::trace;
8
9use crate::state::AsyncState;
10use crate::state::{peer_up_withdraws_handler, synthesize_withdraw_update};
11use crate::update::{decode_updates, new_peer_from_metadata, Update, UpdateMetadata};
12
13pub fn decode_bmp_message(bytes: &mut Bytes) -> Result<BmpMessage> {
14    let message = match parse_bmp_msg(bytes) {
15        Ok(message) => message,
16        Err(_) => return Err(anyhow::anyhow!("failed to parse BMP message")),
17    };
18
19    Ok(message)
20}
21
22pub async fn peer_up_notification(
23    state: Option<AsyncState>,
24    tx: Sender<Update>,
25    metadata: UpdateMetadata,
26    _: PeerUpNotification,
27) {
28    if let Some(spawn_state) = state {
29        let spawn_state = spawn_state.clone();
30        tokio::spawn(async move {
31            peer_up_withdraws_handler(spawn_state, tx, metadata).await;
32        });
33    }
34}
35
36pub async fn route_monitoring(
37    state: Option<AsyncState>,
38    tx: Sender<Update>,
39    metadata: UpdateMetadata,
40    body: RouteMonitoring,
41) {
42    let potential_updates = decode_updates(body, metadata.clone()).unwrap_or_default();
43
44    let mut legitimate_updates = Vec::new();
45    if let Some(state) = &state {
46        let peer = new_peer_from_metadata(metadata.clone());
47        let mut state_lock = state.lock().unwrap();
48        for update in potential_updates {
49            let is_updated = state_lock
50                .update(&metadata.router_addr.clone(), &peer, &update)
51                .unwrap();
52            if is_updated {
53                legitimate_updates.push(update);
54            }
55        }
56    } else {
57        legitimate_updates = potential_updates;
58    }
59
60    for update in legitimate_updates {
61        trace!("{:?}", update);
62
63        // Sent to the event pipeline
64        tx.send(update).unwrap();
65    }
66}
67
68pub async fn peer_down_notification(
69    state: Option<AsyncState>,
70    tx: Sender<Update>,
71    metadata: UpdateMetadata,
72    _: PeerDownNotification,
73) {
74    if let Some(state) = state {
75        // Remove the peer and the associated updates from the state
76        // We start by emiting synthetic withdraw updates
77
78        let peer = new_peer_from_metadata(metadata.clone());
79        let mut state_lock = state.lock().unwrap();
80
81        let mut synthetic_updates = Vec::new();
82        let updates = state_lock
83            .get_updates_by_peer(&metadata.router_addr, &peer)
84            .unwrap();
85        for prefix in updates {
86            synthetic_updates.push(synthesize_withdraw_update(prefix.clone(), metadata.clone()));
87        }
88
89        // Then update the state
90        state_lock
91            .remove_updates(&metadata.router_addr, &peer)
92            .unwrap();
93
94        for update in synthetic_updates {
95            trace!("{:?}", update);
96
97            // Send the synthetic updates to the event pipeline
98            tx.send(update).unwrap();
99        }
100    }
101}