1pub mod processor;
2pub mod state;
3pub mod state_store;
4pub mod update;
5
6use anyhow::Result;
7use bgpkit_parser::parser::bmp::messages::BmpMessageBody;
8use bytes::Bytes;
9use metrics::counter;
10use std::net::SocketAddr;
11use tokio::sync::mpsc::Sender;
12use tracing::{debug, trace};
13
14use crate::processor::{
15 decode_bmp_message, peer_down_notification, peer_up_notification, route_monitoring,
16};
17use crate::state::AsyncState;
18use crate::state_store::store::StateStore;
19use crate::update::{new_metadata, Update};
20
21pub async fn process_bmp_message<T: StateStore>(
22 state: Option<AsyncState<T>>,
23 tx: Sender<Update>,
24 socket: SocketAddr,
25 bytes: &mut Bytes,
26) -> Result<()> {
27 let message = match decode_bmp_message(bytes) {
29 Ok(message) => message,
30 Err(e) => {
31 anyhow::bail!("failed to decode BMP message: {}", e);
32 }
33 };
34
35 trace!("{} - {:?}", socket, message);
36
37 let metadata = new_metadata(socket.clone(), &message);
39
40 let metric_name = "risotto_bmp_messages_total";
41 match message.message_body {
42 BmpMessageBody::InitiationMessage(body) => {
43 trace!("{}: {:?}", socket.to_string(), body);
44 let tlvs_info = body
45 .tlvs
46 .iter()
47 .map(|tlv| tlv.info.clone())
48 .collect::<Vec<_>>();
49 debug!("{}: InitiationMessage: {:?}", socket.to_string(), tlvs_info);
50 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "initiation")
51 .increment(1);
52 }
54 BmpMessageBody::PeerUpNotification(body) => {
55 trace!("{}: {:?}", socket.to_string(), body);
56 if metadata.is_none() {
57 anyhow::bail!(
58 "{}: PeerUpNotification: no per-peer header",
59 socket.to_string()
60 );
61 }
62 let metadata = metadata.unwrap();
63 debug!(
64 "{}: PeerUpNotification: {}",
65 socket.to_string(),
66 metadata.peer_addr
67 );
68 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "peer_up_notification")
69 .increment(1);
70 peer_up_notification(state, tx, metadata, body).await?;
71 }
72 BmpMessageBody::RouteMonitoring(body) => {
73 trace!("{}: {:?}", socket.to_string(), body);
74 if metadata.is_none() {
75 anyhow::bail!(
76 "{}: RouteMonitoring - no per-peer header",
77 socket.to_string()
78 );
79 }
80 let metadata = metadata.unwrap();
81 if !metadata.peer_addr.is_unspecified() {
84 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "route_monitoring")
85 .increment(1);
86 route_monitoring(state, tx, metadata, body).await?;
87 }
88 }
89 BmpMessageBody::RouteMirroring(body) => {
90 trace!("{}: {:?}", socket.to_string(), body);
91 debug!("{}: RouteMirroring", socket.to_string());
92 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "route_mirroring")
93 .increment(1);
94 }
96 BmpMessageBody::PeerDownNotification(body) => {
97 trace!("{}: {:?}", socket.to_string(), body);
98 if metadata.is_none() {
99 anyhow::bail!(
100 "{}: PeerDownNotification: no per-peer header",
101 socket.to_string()
102 );
103 }
104 let metadata = metadata.unwrap();
105 debug!(
106 "{}: PeerDownNotification: {}. Reason: {:?}",
107 socket.to_string(),
108 metadata.peer_addr,
109 body.reason
110 );
111 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "peer_down_notification")
112 .increment(1);
113 peer_down_notification(state, tx, metadata, body).await?;
114 }
115
116 BmpMessageBody::TerminationMessage(body) => {
117 trace!("{}: {:?}", socket.to_string(), body);
118 debug!("{}: TerminationMessage", socket.to_string());
119 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "termination")
120 .increment(1);
121 }
123 BmpMessageBody::StatsReport(body) => {
124 trace!("{}: {:?}", socket.to_string(), body);
125 debug!("{}: StatsReport", socket.to_string());
126 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "stats_report")
127 .increment(1);
128 }
130 };
131
132 Ok(())
133}