1use anyhow::Result;
2use bgpkit_parser::bmp::messages::{PeerDownNotification, PeerUpNotification, RouteMonitoring};
3use bgpkit_parser::parse_bmp_msg;
4use bgpkit_parser::parser::bmp::messages::BmpMessage;
5use bytes::Bytes;
6use metrics::{counter, gauge};
7use rand::Rng;
8use tokio::sync::mpsc::Sender;
9use tracing::{debug, error, trace};
10
11use crate::state::AsyncState;
12use crate::state::{peer_up_withdraws_handler, synthesize_withdraw_update};
13use crate::state_store::store::StateStore;
14use crate::update::{decode_updates, Update, UpdateMetadata};
15
16pub fn decode_bmp_message(bytes: &mut Bytes) -> Result<BmpMessage> {
17 let message = match parse_bmp_msg(bytes) {
18 Ok(message) => message,
19 Err(_) => return Err(anyhow::anyhow!("failed to parse BMP message")),
20 };
21
22 Ok(message)
23}
24
25pub async fn peer_up_notification<T: StateStore>(
26 state: Option<AsyncState<T>>,
27 tx: Sender<Update>,
28 metadata: UpdateMetadata,
29 _: PeerUpNotification,
30) -> Result<()> {
31 if let Some(state) = state {
32 let mut state_lock = state.lock().await;
33
34 state_lock
36 .add_peer(&metadata.router_socket.ip(), &metadata.peer_addr)
37 .unwrap();
38
39 gauge!(
40 "risotto_peer_established",
41 "router" => metadata.router_socket.ip().to_string(),
42 "peer" => metadata.peer_addr.to_string()
43 )
44 .set(1);
45
46 let spawn_state = state.clone();
47 let random = {
48 let mut rng = rand::rng();
49 rng.random_range(-60.0..60.0) as u64
50 };
51 let sleep_time = 300 + random; tokio::spawn(async move {
53 if let Err(e) = peer_up_withdraws_handler(spawn_state, tx, metadata, sleep_time).await {
54 error!("Error in peer_up_withdraws_handler: {}", e);
55 }
56 });
57 }
58
59 Ok(())
60}
61
62pub async fn route_monitoring<T: StateStore>(
63 state: Option<AsyncState<T>>,
64 tx: Sender<Update>,
65 metadata: UpdateMetadata,
66 body: RouteMonitoring,
67) -> Result<()> {
68 let potential_updates = decode_updates(body, metadata.clone()).unwrap_or_default();
69 counter!(
70 "risotto_rx_updates_total",
71 "router" => metadata.router_socket.ip().to_string(),
72 "peer" => metadata.peer_addr.to_string(),
73 )
74 .increment(potential_updates.len() as u64);
75
76 let mut legitimate_updates = Vec::new();
77 if let Some(state) = &state {
78 let mut state_lock = state.lock().await;
79 for update in potential_updates {
80 debug!("{}: {:?}", metadata.router_socket, update);
81 let is_updated = state_lock
82 .update(&metadata.router_socket.ip(), &metadata.peer_addr, &update)
83 .unwrap();
84 if is_updated {
85 legitimate_updates.push(update);
86 }
87 }
88 } else {
89 legitimate_updates = potential_updates;
90 }
91
92 counter!(
93 "risotto_tx_updates_total",
94 "router" => metadata.router_socket.ip().to_string(),
95 "peer" => metadata.peer_addr.to_string(),
96 )
97 .increment(legitimate_updates.len() as u64);
98
99 for update in legitimate_updates {
100 trace!("{:?}", update);
101
102 tx.send(update).await?;
104 }
105
106 Ok(())
107}
108
109pub async fn peer_down_notification<T: StateStore>(
110 state: Option<AsyncState<T>>,
111 tx: Sender<Update>,
112 metadata: UpdateMetadata,
113 _: PeerDownNotification,
114) -> Result<()> {
115 if let Some(state) = state {
116 let mut state_lock = state.lock().await;
119
120 let mut synthetic_updates = Vec::new();
121 let updates = state_lock
122 .get_updates_by_peer(&metadata.router_socket.ip(), &metadata.peer_addr)
123 .unwrap();
124 for prefix in updates {
125 synthetic_updates.push(synthesize_withdraw_update(prefix.clone(), metadata.clone()));
126 }
127
128 state_lock
130 .remove_peer(&metadata.router_socket.ip(), &metadata.peer_addr)
131 .unwrap();
132
133 gauge!(
134 "risotto_peer_established",
135 "router" => metadata.router_socket.ip().to_string(),
136 "peer" => metadata.peer_addr.to_string()
137 )
138 .set(0);
139
140 counter!(
141 "risotto_tx_updates_total",
142 "router" => metadata.router_socket.ip().to_string(),
143 "peer" => metadata.peer_addr.to_string(),
144 )
145 .increment(synthetic_updates.len() as u64);
146
147 for update in synthetic_updates {
148 trace!("{:?}", update);
149
150 tx.send(update).await?;
152 }
153 }
154
155 Ok(())
156}