1use anyhow::Result;
2use bgpkit_parser::bmp::messages::{PerPeerFlags, RouteMonitoring};
3use bgpkit_parser::models::Peer;
4use bgpkit_parser::parse_bmp_msg;
5use bgpkit_parser::parser::bmp::messages::{BmpMessage, BmpMessageBody};
6use bytes::Bytes;
7use core::net::IpAddr;
8use std::sync::mpsc::Sender;
9use tracing::{error, info, trace};
10
11use crate::state::AsyncState;
12use crate::state::{peer_up_withdraws_handler, synthesize_withdraw_update};
13use crate::update::{decode_updates, format_update, UpdateHeader};
14
15pub fn decode_bmp_message(bytes: &mut Bytes) -> Result<BmpMessage> {
16 let message = match parse_bmp_msg(bytes) {
17 Ok(message) => message,
18 Err(_) => return Err(anyhow::anyhow!("failed to parse BMP message")),
19 };
20
21 Ok(message)
22}
23
24pub async fn process_peer_up_notification(
25 state: Option<AsyncState>,
26 tx: Sender<String>,
27 router_addr: IpAddr,
28 router_port: u16,
29 peer: Peer,
30) {
31 if let Some(spawn_state) = state {
32 let spawn_state = spawn_state.clone();
33 tokio::spawn(async move {
34 peer_up_withdraws_handler(spawn_state, router_addr, router_port, peer, tx).await;
35 });
36 }
37}
38
39pub async fn process_route_monitoring(
40 state: Option<AsyncState>,
41 tx: Sender<String>,
42 router_addr: IpAddr,
43 router_port: u16,
44 peer: Peer,
45 header: UpdateHeader,
46 body: RouteMonitoring,
47) {
48 let potential_updates = decode_updates(body, header).unwrap_or_default();
49
50 let mut legitimate_updates = Vec::new();
51 if let Some(state) = &state {
52 let mut state_lock = state.lock().unwrap();
53 for update in potential_updates {
54 let is_updated = state_lock.update(&router_addr, &peer, &update).unwrap();
55 if is_updated {
56 legitimate_updates.push(update);
57 }
58 }
59 } else {
60 legitimate_updates = potential_updates;
61 }
62
63 for mut update in legitimate_updates {
64 let update = format_update(router_addr, router_port, &peer, &mut update);
65 trace!("{:?}", update);
66
67 tx.send(update).unwrap();
69 }
70}
71
72pub async fn process_peer_down_notification(
73 state: Option<AsyncState>,
74 tx: Sender<String>,
75 router_addr: IpAddr,
76 router_port: u16,
77 peer: Peer,
78) {
79 if let Some(state) = state {
80 let mut state_lock = state.lock().unwrap();
81
82 let mut synthetic_updates = Vec::new();
85 let updates = state_lock.get_updates_by_peer(&router_addr, &peer).unwrap();
86 for prefix in updates {
87 synthetic_updates.push(synthesize_withdraw_update(prefix.clone()));
88 }
89
90 state_lock.remove_updates(&router_addr, &peer).unwrap();
92
93 for mut update in synthetic_updates {
94 let update = format_update(router_addr, router_port, &peer, &mut update);
95 trace!("{:?}", update);
96
97 tx.send(update).unwrap();
99 }
100 }
101}
102
103pub async fn process_bmp_message(
104 state: Option<AsyncState>,
105 tx: Sender<String>,
106 router_addr: IpAddr,
107 router_port: u16,
108 bytes: &mut Bytes,
109) {
110 let message = match decode_bmp_message(bytes) {
112 Ok(message) => message,
113 Err(e) => {
114 error!("failed to decode BMP message: {}", e);
115 return;
116 }
117 };
118
119 let Some(pph) = message.per_peer_header else {
121 return;
122 };
123 let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
124
125 let timestamp = (pph.timestamp * 1000.0) as i64;
127
128 let is_post_policy = match pph.peer_flags {
129 PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
130 PerPeerFlags::LocalRibPeerFlags(_) => false,
131 };
132
133 let is_adj_rib_out = match pph.peer_flags {
134 PerPeerFlags::PeerFlags(flags) => flags.is_adj_rib_out(),
135 PerPeerFlags::LocalRibPeerFlags(_) => false,
136 };
137
138 let header = UpdateHeader {
139 timestamp,
140 is_post_policy,
141 is_adj_rib_out,
142 };
143
144 match message.message_body {
145 BmpMessageBody::PeerUpNotification(body) => {
146 trace!("{:?}", body);
147 info!(
148 "PeerUpNotification: {} - {}",
149 router_addr, peer.peer_address
150 );
151
152 process_peer_up_notification(state, tx, router_addr, router_port, peer).await;
153 }
154 BmpMessageBody::RouteMonitoring(body) => {
155 trace!("{:?}", body);
156
157 process_route_monitoring(state, tx, router_addr, router_port, peer, header, body).await;
158 }
159 BmpMessageBody::PeerDownNotification(body) => {
160 trace!("{:?}", body);
161 info!(
162 "PeerDownNotification: {} - {}",
163 router_addr, peer.peer_address
164 );
165
166 process_peer_down_notification(state, tx, router_addr, router_port, peer).await;
167 }
168 _ => (),
169 }
170}