1use anyhow::Result;
2use bgpkit_parser::bmp::messages::{PeerDownNotification, PeerUpNotification, RouteMonitoring};
3use bgpkit_parser::parse_bmp_msg;
4use bgpkit_parser::parser::bmp::messages::BmpMessage;
5use bytes::Bytes;
6use metrics::{counter, gauge};
7use rand::Rng;
8use tokio::sync::mpsc::Sender;
9use tracing::{error, trace};
10
11use crate::state::AsyncState;
12use crate::state::{peer_up_withdraws_handler, process_updates, synthesize_withdraw_update};
13use crate::state_store::store::StateStore;
14use crate::update::{decode_updates, Update, UpdateMetadata};
15
16pub fn decode_bmp_message(bytes: &mut Bytes) -> Result<BmpMessage> {
17 let message = match parse_bmp_msg(bytes) {
18 Ok(message) => message,
19 Err(_) => return Err(anyhow::anyhow!("failed to parse BMP message")),
20 };
21
22 Ok(message)
23}
24
25pub async fn peer_up_notification<T: StateStore>(
26 state: Option<AsyncState<T>>,
27 tx: Sender<Update>,
28 metadata: UpdateMetadata,
29 _: PeerUpNotification,
30) -> Result<()> {
31 if let Some(state) = state {
32 let mut state_lock = state.lock().await;
33
34 state_lock
36 .add_peer(&metadata.router_socket.ip(), &metadata.peer_addr)
37 .unwrap();
38
39 gauge!(
40 "risotto_peer_established",
41 "router" => metadata.router_socket.ip().to_string(),
42 "peer" => metadata.peer_addr.to_string()
43 )
44 .set(1);
45
46 let spawn_state = state.clone();
47 let random = {
48 let mut rng = rand::rng();
49 rng.random_range(-60.0..60.0) as u64
50 };
51 let sleep_time = 300 + random; tokio::spawn(async move {
53 if let Err(e) = peer_up_withdraws_handler(spawn_state, tx, metadata, sleep_time).await {
54 error!("Error in peer_up_withdraws_handler: {}", e);
55 }
56 });
57 }
58
59 Ok(())
60}
61
62pub async fn route_monitoring<T: StateStore>(
63 state: Option<AsyncState<T>>,
64 tx: Sender<Update>,
65 metadata: UpdateMetadata,
66 body: RouteMonitoring,
67) -> Result<()> {
68 let updates = decode_updates(body, metadata.clone()).unwrap_or_default();
70
71 counter!(
72 "risotto_rx_updates_total",
73 "router" => metadata.router_socket.ip().to_string(),
74 "peer" => metadata.peer_addr.to_string(),
75 )
76 .increment(updates.len() as u64);
77
78 process_updates(state, tx, updates).await?;
80
81 Ok(())
82}
83
84pub async fn peer_down_notification<T: StateStore>(
85 state: Option<AsyncState<T>>,
86 tx: Sender<Update>,
87 metadata: UpdateMetadata,
88 _: PeerDownNotification,
89) -> Result<()> {
90 if let Some(state) = state {
91 let mut state_lock = state.lock().await;
94
95 let mut synthetic_updates = Vec::new();
96 let updates = state_lock
97 .get_updates_by_peer(&metadata.router_socket.ip(), &metadata.peer_addr)
98 .unwrap();
99 for prefix in updates {
100 synthetic_updates.push(synthesize_withdraw_update(prefix.clone(), metadata.clone()));
101 }
102
103 state_lock
105 .remove_peer(&metadata.router_socket.ip(), &metadata.peer_addr)
106 .unwrap();
107
108 gauge!(
109 "risotto_peer_established",
110 "router" => metadata.router_socket.ip().to_string(),
111 "peer" => metadata.peer_addr.to_string()
112 )
113 .set(0);
114
115 counter!(
116 "risotto_tx_updates_total",
117 "router" => metadata.router_socket.ip().to_string(),
118 "peer" => metadata.peer_addr.to_string(),
119 )
120 .increment(synthetic_updates.len() as u64);
121
122 for update in synthetic_updates {
123 trace!("{:?}", update);
124
125 tx.send(update).await?;
127 }
128 }
129
130 Ok(())
131}