1pub mod processor;
2pub mod state;
3pub mod state_store;
4pub mod update;
5
6use anyhow::Result;
7use bgpkit_parser::parser::bmp::messages::BmpMessageBody;
8use bytes::Bytes;
9use metrics::counter;
10use std::net::SocketAddr;
11use tokio::sync::mpsc::Sender;
12use tracing::{debug, trace};
13
14use crate::processor::{
15 decode_bmp_message, peer_down_notification, peer_up_notification, route_monitoring,
16};
17use crate::state::AsyncState;
18use crate::state_store::store::StateStore;
19use crate::update::{new_metadata, Update};
20
21pub async fn process_bmp_message<T: StateStore>(
24 state: Option<AsyncState<T>>,
25 tx: Sender<Update>,
26 socket: SocketAddr,
27 bytes: &mut Bytes,
28) -> Result<()> {
29 let message = match decode_bmp_message(bytes) {
31 Ok(message) => message,
32 Err(e) => {
33 anyhow::bail!("failed to decode BMP message: {}", e);
34 }
35 };
36
37 trace!("{} - {:?}", socket, message);
38
39 let metadata = new_metadata(socket.clone(), &message);
41
42 let metric_name = "risotto_bmp_messages_total";
43 match message.message_body {
44 BmpMessageBody::InitiationMessage(body) => {
45 trace!("{}: {:?}", socket.to_string(), body);
46 let tlvs_info = body
47 .tlvs
48 .iter()
49 .map(|tlv| tlv.info.clone())
50 .collect::<Vec<_>>();
51 debug!("{}: InitiationMessage: {:?}", socket.to_string(), tlvs_info);
52 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "initiation")
53 .increment(1);
54 }
56 BmpMessageBody::PeerUpNotification(body) => {
57 trace!("{}: {:?}", socket.to_string(), body);
58 if metadata.is_none() {
59 anyhow::bail!(
60 "{}: PeerUpNotification: no per-peer header",
61 socket.to_string()
62 );
63 }
64 let metadata = metadata.unwrap();
65 debug!(
66 "{}: PeerUpNotification: {}",
67 socket.to_string(),
68 metadata.peer_addr
69 );
70 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "peer_up_notification")
71 .increment(1);
72 peer_up_notification(state, tx, metadata, body).await?;
73 }
74 BmpMessageBody::RouteMonitoring(body) => {
75 trace!("{}: {:?}", socket.to_string(), body);
76 if metadata.is_none() {
77 anyhow::bail!(
78 "{}: RouteMonitoring - no per-peer header",
79 socket.to_string()
80 );
81 }
82 let metadata = metadata.unwrap();
83 if !metadata.peer_addr.is_unspecified() {
86 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "route_monitoring")
87 .increment(1);
88 route_monitoring(state, tx, metadata, body).await?;
89 }
90 }
91 BmpMessageBody::RouteMirroring(body) => {
92 trace!("{}: {:?}", socket.to_string(), body);
93 debug!("{}: RouteMirroring", socket.to_string());
94 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "route_mirroring")
95 .increment(1);
96 }
98 BmpMessageBody::PeerDownNotification(body) => {
99 trace!("{}: {:?}", socket.to_string(), body);
100 if metadata.is_none() {
101 anyhow::bail!(
102 "{}: PeerDownNotification: no per-peer header",
103 socket.to_string()
104 );
105 }
106 let metadata = metadata.unwrap();
107 debug!(
108 "{}: PeerDownNotification: {}. Reason: {:?}",
109 socket.to_string(),
110 metadata.peer_addr,
111 body.reason
112 );
113 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "peer_down_notification")
114 .increment(1);
115 peer_down_notification(state, tx, metadata, body).await?;
116 }
117
118 BmpMessageBody::TerminationMessage(body) => {
119 trace!("{}: {:?}", socket.to_string(), body);
120 debug!("{}: TerminationMessage", socket.to_string());
121 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "termination")
122 .increment(1);
123 }
125 BmpMessageBody::StatsReport(body) => {
126 trace!("{}: {:?}", socket.to_string(), body);
127 debug!("{}: StatsReport", socket.to_string());
128 counter!(metric_name, "router" => socket.ip().to_string(), "type" => "stats_report")
129 .increment(1);
130 }
132 };
133
134 Ok(())
135}