1use anyhow::Result;
2use bgpkit_parser::bmp::messages::{PeerDownNotification, PeerUpNotification, RouteMonitoring};
3use bgpkit_parser::parse_bmp_msg;
4use bgpkit_parser::parser::bmp::messages::BmpMessage;
5use bytes::Bytes;
6use std::sync::mpsc::Sender;
7use tracing::trace;
8
9use crate::state::AsyncState;
10use crate::state::{peer_up_withdraws_handler, synthesize_withdraw_update};
11use crate::update::{decode_updates, new_peer_from_metadata, Update, UpdateMetadata};
12
13pub fn decode_bmp_message(bytes: &mut Bytes) -> Result<BmpMessage> {
14 let message = match parse_bmp_msg(bytes) {
15 Ok(message) => message,
16 Err(_) => return Err(anyhow::anyhow!("failed to parse BMP message")),
17 };
18
19 Ok(message)
20}
21
22pub async fn peer_up_notification(
23 state: Option<AsyncState>,
24 tx: Sender<Update>,
25 metadata: UpdateMetadata,
26 _: PeerUpNotification,
27) {
28 if let Some(spawn_state) = state {
29 let spawn_state = spawn_state.clone();
30 tokio::spawn(async move {
31 peer_up_withdraws_handler(spawn_state, tx, metadata).await;
32 });
33 }
34}
35
36pub async fn route_monitoring(
37 state: Option<AsyncState>,
38 tx: Sender<Update>,
39 metadata: UpdateMetadata,
40 body: RouteMonitoring,
41) {
42 let potential_updates = decode_updates(body, metadata.clone()).unwrap_or_default();
43
44 let mut legitimate_updates = Vec::new();
45 if let Some(state) = &state {
46 let peer = new_peer_from_metadata(metadata.clone());
47 let mut state_lock = state.lock().unwrap();
48 for update in potential_updates {
49 let is_updated = state_lock
50 .update(&metadata.router_addr.clone(), &peer, &update)
51 .unwrap();
52 if is_updated {
53 legitimate_updates.push(update);
54 }
55 }
56 } else {
57 legitimate_updates = potential_updates;
58 }
59
60 for update in legitimate_updates {
61 trace!("{:?}", update);
62
63 tx.send(update).unwrap();
65 }
66}
67
68pub async fn peer_down_notification(
69 state: Option<AsyncState>,
70 tx: Sender<Update>,
71 metadata: UpdateMetadata,
72 _: PeerDownNotification,
73) {
74 if let Some(state) = state {
75 let peer = new_peer_from_metadata(metadata.clone());
79 let mut state_lock = state.lock().unwrap();
80
81 let mut synthetic_updates = Vec::new();
82 let updates = state_lock
83 .get_updates_by_peer(&metadata.router_addr, &peer)
84 .unwrap();
85 for prefix in updates {
86 synthetic_updates.push(synthesize_withdraw_update(prefix.clone(), metadata.clone()));
87 }
88
89 state_lock
91 .remove_updates(&metadata.router_addr, &peer)
92 .unwrap();
93
94 for update in synthetic_updates {
95 trace!("{:?}", update);
96
97 tx.send(update).unwrap();
99 }
100 }
101}