1pub mod processor;
2pub mod state;
3pub mod state_store;
4pub mod statistics;
5pub mod update;
6
7use bgpkit_parser::parser::bmp::messages::BmpMessageBody;
8use bytes::Bytes;
9use core::net::IpAddr;
10use statistics::AsyncStatistics;
11use std::sync::mpsc::Sender;
12use tracing::{debug, error, info, trace};
13
14use crate::processor::{
15 decode_bmp_message, peer_down_notification, peer_up_notification, route_monitoring,
16};
17use crate::state::AsyncState;
18use crate::state_store::store::StateStore;
19use crate::update::{new_metadata, Update};
20
21pub async fn process_bmp_message<T: StateStore>(
22 state: Option<AsyncState<T>>,
23 statistics: AsyncStatistics,
24 tx: Sender<Update>,
25 router_addr: IpAddr,
26 router_port: u16,
27 bytes: &mut Bytes,
28) {
29 let message = match decode_bmp_message(bytes) {
31 Ok(message) => message,
32 Err(e) => {
33 error!("failed to decode BMP message: {}", e);
34 return;
35 }
36 };
37
38 let mut statistics_lock = statistics.lock().await;
39
40 trace!("[{}]:{} - {:?}", router_addr, router_port, message);
41 statistics_lock.rx_bmp_messages += 1;
42
43 let metadata = new_metadata(router_addr, router_port, &message);
45
46 match message.message_body {
47 BmpMessageBody::InitiationMessage(body) => {
48 trace!("{:?}", body);
49 let tlvs_info = body
50 .tlvs
51 .iter()
52 .map(|tlv| tlv.info.clone())
53 .collect::<Vec<_>>();
54 debug!(
55 "[{}]:{} - InitiationMessage: {:?}",
56 router_addr, router_port, tlvs_info
57 );
58 statistics_lock.rx_bmp_initiation += 1;
59 }
61 BmpMessageBody::PeerUpNotification(body) => {
62 trace!("{:?}", body);
63 if metadata.is_none() {
64 error!(
65 "[{}]:{} - PeerUpNotification - no per-peer header",
66 router_addr, router_port
67 );
68 return;
69 }
70 let metadata = metadata.unwrap();
71 debug!(
72 "[{}]:{} - PeerUpNotification - {}",
73 metadata.router_addr, metadata.router_port, metadata.peer_addr
74 );
75 statistics_lock.rx_bmp_peer_up += 1;
76 peer_up_notification(state, tx, metadata, body).await;
77 }
78 BmpMessageBody::RouteMonitoring(body) => {
79 debug!("{:?}", body);
80 if metadata.is_none() {
81 error!(
82 "[{}]:{} - RouteMonitoring - no per-peer header",
83 router_addr, router_port
84 );
85 return;
86 }
87 let metadata = metadata.unwrap();
88 statistics_lock.rx_bmp_route_monitoring += 1;
89 route_monitoring(state, tx, metadata, body).await;
90 }
91 BmpMessageBody::RouteMirroring(body) => {
92 trace!("{:?}", body);
93 debug!("[{}]:{} - RouteMirroring", router_addr, router_port);
94 statistics_lock.rx_bmp_route_mirroring += 1;
95 }
97 BmpMessageBody::PeerDownNotification(body) => {
98 trace!("{:?}", body);
99 if metadata.is_none() {
100 error!(
101 "[{}]:{} - RouteMonitoring - no per-peer header",
102 router_addr, router_port
103 );
104 return;
105 }
106 let metadata = metadata.unwrap();
107 debug!(
108 "[{}]:{} - PeerDownNotification: - {}",
109 metadata.router_addr, metadata.router_port, metadata.peer_addr
110 );
111 statistics_lock.rx_bmp_peer_down += 1;
112 peer_down_notification(state, tx, metadata, body).await;
113 }
114
115 BmpMessageBody::TerminationMessage(body) => {
116 trace!("{:?}", body);
117 info!("[{}]:{} - TerminationMessage", router_addr, router_port);
118 statistics_lock.rx_bmp_termination += 1;
119 }
121 BmpMessageBody::StatsReport(body) => {
122 trace!("{:?}", body);
123 info!("[{}]:{} - StatsReport", router_addr, router_port);
124 statistics_lock.rx_bmp_stats_report += 1;
125 }
127 };
128}