1use anyhow::Result;
2use bgpkit_parser::bmp::messages::{PeerDownNotification, PeerUpNotification, RouteMonitoring};
3use bgpkit_parser::parse_bmp_msg;
4use bgpkit_parser::parser::bmp::messages::BmpMessage;
5use bytes::Bytes;
6use rand::Rng;
7use std::sync::mpsc::Sender;
8use tracing::trace;
9
10use crate::state::AsyncState;
11use crate::state::{peer_up_withdraws_handler, synthesize_withdraw_update};
12use crate::state_store::store::StateStore;
13use crate::update::{decode_updates, Update, UpdateMetadata};
14
15pub fn decode_bmp_message(bytes: &mut Bytes) -> Result<BmpMessage> {
16 let message = match parse_bmp_msg(bytes) {
17 Ok(message) => message,
18 Err(_) => return Err(anyhow::anyhow!("failed to parse BMP message")),
19 };
20
21 Ok(message)
22}
23
24pub async fn peer_up_notification<T: StateStore>(
25 state: Option<AsyncState<T>>,
26 tx: Sender<Update>,
27 metadata: UpdateMetadata,
28 _: PeerUpNotification,
29) {
30 if let Some(spawn_state) = state {
31 let spawn_state = spawn_state.clone();
32 let random = {
33 let mut rng = rand::rng();
34 rng.random_range(-60.0..60.0) as u64
35 };
36 let sleep_time = 300 + random; tokio::spawn(async move {
38 peer_up_withdraws_handler(spawn_state, tx, metadata, sleep_time).await;
39 });
40 }
41}
42
43pub async fn route_monitoring<T: StateStore>(
44 state: Option<AsyncState<T>>,
45 tx: Sender<Update>,
46 metadata: UpdateMetadata,
47 body: RouteMonitoring,
48) {
49 let potential_updates = decode_updates(body, metadata.clone()).unwrap_or_default();
50
51 let mut legitimate_updates = Vec::new();
52 if let Some(state) = &state {
53 let mut state_lock = state.lock().await;
54 for update in potential_updates {
55 let is_updated = state_lock
56 .update(&metadata.router_addr.clone(), &metadata.peer_addr, &update)
57 .unwrap();
58 if is_updated {
59 legitimate_updates.push(update);
60 }
61 }
62 } else {
63 legitimate_updates = potential_updates;
64 }
65
66 for update in legitimate_updates {
67 trace!("{:?}", update);
68
69 tx.send(update).unwrap();
71 }
72}
73
74pub async fn peer_down_notification<T: StateStore>(
75 state: Option<AsyncState<T>>,
76 tx: Sender<Update>,
77 metadata: UpdateMetadata,
78 _: PeerDownNotification,
79) {
80 if let Some(state) = state {
81 let mut state_lock = state.lock().await;
84
85 let mut synthetic_updates = Vec::new();
86 let updates = state_lock
87 .get_updates_by_peer(&metadata.router_addr, &metadata.peer_addr)
88 .unwrap();
89 for prefix in updates {
90 synthetic_updates.push(synthesize_withdraw_update(prefix.clone(), metadata.clone()));
91 }
92
93 state_lock
95 .remove_updates(&metadata.router_addr, &metadata.peer_addr)
96 .unwrap();
97
98 for update in synthetic_updates {
99 trace!("{:?}", update);
100
101 tx.send(update).unwrap();
103 }
104 }
105}