1pub mod processor;
2pub mod state;
3pub mod state_store;
4pub mod update;
5
6use bgpkit_parser::parser::bmp::messages::BmpMessageBody;
7use bytes::Bytes;
8use core::net::IpAddr;
9use std::sync::mpsc::Sender;
10use tracing::{debug, error, info, trace};
11
12use crate::processor::{
13 decode_bmp_message, peer_down_notification, peer_up_notification, route_monitoring,
14};
15use crate::state::AsyncState;
16use crate::state_store::store::StateStore;
17use crate::update::{new_metadata, Update};
18
19pub async fn process_bmp_message<T: StateStore>(
20 state: Option<AsyncState<T>>,
21 tx: Sender<Update>,
22 router_addr: IpAddr,
23 router_port: u16,
24 bytes: &mut Bytes,
25) {
26 let message = match decode_bmp_message(bytes) {
28 Ok(message) => message,
29 Err(e) => {
30 error!("failed to decode BMP message: {}", e);
31 return;
32 }
33 };
34
35 trace!("[{}]:{} - {:?}", router_addr, router_port, message);
36
37 let metadata = new_metadata(router_addr, router_port, &message);
39
40 match message.message_body {
41 BmpMessageBody::InitiationMessage(body) => {
42 let tlvs_info = body
43 .tlvs
44 .iter()
45 .map(|tlv| tlv.info.clone())
46 .collect::<Vec<_>>();
47 info!(
48 "[{}]:{} - InitiationMessage: {:?}",
49 router_addr, router_port, tlvs_info
50 );
51 }
53 BmpMessageBody::PeerUpNotification(body) => {
54 trace!("{:?}", body);
55 if metadata.is_none() {
56 error!(
57 "[{}]:{} - PeerUpNotification - no per-peer header",
58 router_addr, router_port
59 );
60 return;
61 }
62 let metadata = metadata.unwrap();
63 info!(
64 "[{}]:{} - PeerUpNotification - {}",
65 metadata.router_addr, metadata.router_port, metadata.peer_addr
66 );
67 peer_up_notification(state, tx, metadata, body).await;
68 }
69 BmpMessageBody::RouteMonitoring(body) => {
70 debug!("{:?}", body);
71 if metadata.is_none() {
72 error!(
73 "[{}]:{} - RouteMonitoring - no per-peer header",
74 router_addr, router_port
75 );
76 return;
77 }
78 let metadata = metadata.unwrap();
79 route_monitoring(state, tx, metadata, body).await;
80 }
81 BmpMessageBody::RouteMirroring(_) => {
82 info!("[{}]:{} - RouteMirroring", router_addr, router_port)
83 }
85 BmpMessageBody::PeerDownNotification(body) => {
86 trace!("{:?}", body);
87 if metadata.is_none() {
88 error!(
89 "[{}]:{} - RouteMonitoring - no per-peer header",
90 router_addr, router_port
91 );
92 return;
93 }
94 let metadata = metadata.unwrap();
95 info!(
96 "[{}]:{} - PeerDownNotification: - {}",
97 metadata.router_addr, metadata.router_port, metadata.peer_addr
98 );
99 peer_down_notification(state, tx, metadata, body).await;
100 }
101
102 BmpMessageBody::TerminationMessage(_) => {
103 info!("[{}]:{} - TerminationMessage", router_addr, router_port)
104 }
106 BmpMessageBody::StatsReport(_) => {
107 info!("[{}]:{} - StatsReport", router_addr, router_port)
108 }
110 }
111}