risotto_lib/
handler.rs

1use anyhow::Result;
2use bgpkit_parser::bmp::messages::{PerPeerFlags, RouteMonitoring};
3use bgpkit_parser::models::Peer;
4use bgpkit_parser::parse_bmp_msg;
5use bgpkit_parser::parser::bmp::messages::{BmpMessage, BmpMessageBody};
6use bytes::Bytes;
7use core::net::IpAddr;
8use std::sync::mpsc::Sender;
9use tracing::{error, info, trace};
10
11use crate::state::AsyncState;
12use crate::state::{peer_up_withdraws_handler, synthesize_withdraw_update};
13use crate::update::{decode_updates, format_update, UpdateHeader};
14
15pub fn decode_bmp_message(bytes: &mut Bytes) -> Result<BmpMessage> {
16    let message = match parse_bmp_msg(bytes) {
17        Ok(message) => message,
18        Err(_) => return Err(anyhow::anyhow!("failed to parse BMP message")),
19    };
20
21    Ok(message)
22}
23
24pub async fn process_peer_up_notification(
25    state: Option<AsyncState>,
26    tx: Sender<String>,
27    router_addr: IpAddr,
28    router_port: u16,
29    peer: Peer,
30) {
31    if let Some(spawn_state) = state {
32        let spawn_state = spawn_state.clone();
33        tokio::spawn(async move {
34            peer_up_withdraws_handler(spawn_state, router_addr, router_port, peer, tx).await;
35        });
36    }
37}
38
39pub async fn process_route_monitoring(
40    state: Option<AsyncState>,
41    tx: Sender<String>,
42    router_addr: IpAddr,
43    router_port: u16,
44    peer: Peer,
45    header: UpdateHeader,
46    body: RouteMonitoring,
47) {
48    let potential_updates = decode_updates(body, header).unwrap_or_default();
49
50    let mut legitimate_updates = Vec::new();
51    if let Some(state) = &state {
52        let mut state_lock = state.lock().unwrap();
53        for update in potential_updates {
54            let is_updated = state_lock.update(&router_addr, &peer, &update).unwrap();
55            if is_updated {
56                legitimate_updates.push(update);
57            }
58        }
59    } else {
60        legitimate_updates = potential_updates;
61    }
62
63    for mut update in legitimate_updates {
64        let update = format_update(router_addr, router_port, &peer, &mut update);
65        trace!("{:?}", update);
66
67        // Sent to the event pipeline
68        tx.send(update).unwrap();
69    }
70}
71
72pub async fn process_peer_down_notification(
73    state: Option<AsyncState>,
74    tx: Sender<String>,
75    router_addr: IpAddr,
76    router_port: u16,
77    peer: Peer,
78) {
79    if let Some(state) = state {
80        let mut state_lock = state.lock().unwrap();
81
82        // Remove the peer and the associated updates from the state
83        // We start by emiting synthetic withdraw updates
84        let mut synthetic_updates = Vec::new();
85        let updates = state_lock.get_updates_by_peer(&router_addr, &peer).unwrap();
86        for prefix in updates {
87            synthetic_updates.push(synthesize_withdraw_update(prefix.clone()));
88        }
89
90        // Then update the state
91        state_lock.remove_updates(&router_addr, &peer).unwrap();
92
93        for mut update in synthetic_updates {
94            let update = format_update(router_addr, router_port, &peer, &mut update);
95            trace!("{:?}", update);
96
97            // Send the synthetic updates to the event pipeline
98            tx.send(update).unwrap();
99        }
100    }
101}
102
103pub async fn process_bmp_message(
104    state: Option<AsyncState>,
105    tx: Sender<String>,
106    router_addr: IpAddr,
107    router_port: u16,
108    bytes: &mut Bytes,
109) {
110    // Parse the BMP message
111    let message = match decode_bmp_message(bytes) {
112        Ok(message) => message,
113        Err(e) => {
114            error!("failed to decode BMP message: {}", e);
115            return;
116        }
117    };
118
119    // Get peer information
120    let Some(pph) = message.per_peer_header else {
121        return;
122    };
123    let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
124
125    // Get header information
126    let timestamp = (pph.timestamp * 1000.0) as i64;
127
128    let is_post_policy = match pph.peer_flags {
129        PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
130        PerPeerFlags::LocalRibPeerFlags(_) => false,
131    };
132
133    let is_adj_rib_out = match pph.peer_flags {
134        PerPeerFlags::PeerFlags(flags) => flags.is_adj_rib_out(),
135        PerPeerFlags::LocalRibPeerFlags(_) => false,
136    };
137
138    let header = UpdateHeader {
139        timestamp,
140        is_post_policy,
141        is_adj_rib_out,
142    };
143
144    match message.message_body {
145        BmpMessageBody::PeerUpNotification(body) => {
146            trace!("{:?}", body);
147            info!(
148                "PeerUpNotification: {} - {}",
149                router_addr, peer.peer_address
150            );
151
152            process_peer_up_notification(state, tx, router_addr, router_port, peer).await;
153        }
154        BmpMessageBody::RouteMonitoring(body) => {
155            trace!("{:?}", body);
156
157            process_route_monitoring(state, tx, router_addr, router_port, peer, header, body).await;
158        }
159        BmpMessageBody::PeerDownNotification(body) => {
160            trace!("{:?}", body);
161            info!(
162                "PeerDownNotification: {} - {}",
163                router_addr, peer.peer_address
164            );
165
166            process_peer_down_notification(state, tx, router_addr, router_port, peer).await;
167        }
168        _ => (),
169    }
170}