1use bgpkit_parser::bmp::messages::{BmpMessage, PerPeerFlags, RouteMonitoring};
2use bgpkit_parser::models::*;
3use chrono::{DateTime, MappedLocalTime, TimeZone, Utc};
4use core::net::IpAddr;
5use std::net::{Ipv4Addr, SocketAddr};
6use tracing::warn;
7
/// Per-message context that accompanies a BGP update carried over BMP.
///
/// Within this file it is built by `new_metadata` from a BMP per-peer header
/// and consumed by `decode_updates`, which copies most fields onto every
/// [`Update`] it emits.
#[derive(Clone, Debug, PartialEq)]
pub struct UpdateMetadata {
    /// Timestamp taken from the BMP per-peer header.
    ///
    /// NOTE(review): despite the `_ns` suffix, `new_metadata` stores
    /// `timestamp * 1000.0` here and `decode_updates` reads it back with
    /// `timestamp_millis_opt`, i.e. the value is treated as milliseconds.
    pub time_bmp_header_ns: i64,
    /// Socket address associated with the router; `decode_updates` splits it
    /// into `router_addr` / `router_port`.
    pub router_socket: SocketAddr,
    /// Address of the BGP peer the update relates to.
    pub peer_addr: IpAddr,
    /// BGP identifier of the peer.
    pub peer_bgp_id: Ipv4Addr,
    /// AS number of the peer, widened to 32 bits.
    pub peer_asn: u32,
    /// Post-policy flag from the per-peer header (`false` for Local-RIB peer
    /// flags, see `new_metadata`).
    pub is_post_policy: bool,
    /// Adj-RIB-Out flag from the per-peer header (`false` for Local-RIB peer
    /// flags, see `new_metadata`).
    pub is_adj_rib_out: bool,
}
18
19#[derive(Debug, Clone, PartialEq)]
20pub struct Update {
21 pub time_received_ns: DateTime<Utc>,
22 pub time_bmp_header_ns: DateTime<Utc>,
23 pub router_addr: IpAddr,
24 pub router_port: u16,
25 pub peer_addr: IpAddr,
26 pub peer_bgp_id: Ipv4Addr,
27 pub peer_asn: u32,
28 pub prefix_addr: IpAddr,
29 pub prefix_len: u8,
30 pub is_post_policy: bool,
31 pub is_adj_rib_out: bool,
32 pub announced: bool,
33 pub next_hop: Option<IpAddr>,
34 pub origin: String,
35 pub path: Vec<u32>,
36 pub local_preference: Option<u32>,
37 pub med: Option<u32>,
38 pub communities: Vec<(u32, u16)>,
39 pub synthetic: bool,
40}
41
42pub fn decode_updates(message: RouteMonitoring, metadata: UpdateMetadata) -> Option<Vec<Update>> {
43 let mut updates = Vec::new();
44
45 match message.bgp_message {
46 bgpkit_parser::models::BgpMessage::Update(bgp_update) => {
47 let mut prefixes_to_update = Vec::new();
49 for prefix in bgp_update.announced_prefixes {
50 prefixes_to_update.push((prefix, true));
51 }
52 for prefix in bgp_update.withdrawn_prefixes {
53 prefixes_to_update.push((prefix, false));
54 }
55
56 let attributes = bgp_update.attributes;
58 if let Some(nlri) = attributes.get_reachable_nlri() {
59 for prefix in &nlri.prefixes {
60 prefixes_to_update.push((*prefix, true));
61 }
62 }
63 if let Some(nlri) = attributes.get_unreachable_nlri() {
64 for prefix in &nlri.prefixes {
65 prefixes_to_update.push((*prefix, false));
66 }
67 }
68
69 let next_hop = attributes.next_hop();
71 let origin = attributes.origin();
72 let path = match attributes.as_path() {
73 Some(path) => Some(path.clone()),
74 None => None,
75 };
76 let local_preference = attributes.local_preference();
77 let med = attributes.multi_exit_discriminator();
78 let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();
79
80 let time_bmp_header_ns = match Utc.timestamp_millis_opt(metadata.time_bmp_header_ns) {
81 MappedLocalTime::Single(dt) => dt,
82 _ => {
83 warn!(
84 "failed to parse timestamp: {}, using Utc::now()",
85 metadata.time_bmp_header_ns
86 );
87 Utc::now()
88 }
89 };
90
91 for (prefix, announced) in prefixes_to_update {
92 updates.push(Update {
93 time_received_ns: Utc::now(),
94 time_bmp_header_ns,
95 router_addr: map_to_ipv6(metadata.router_socket.ip()),
96 router_port: metadata.router_socket.port(),
97 peer_addr: map_to_ipv6(metadata.peer_addr),
98 peer_bgp_id: metadata.peer_bgp_id,
99 peer_asn: metadata.peer_asn,
100 prefix_addr: map_to_ipv6(prefix.prefix.addr()),
101 prefix_len: prefix.prefix.prefix_len(),
102 is_post_policy: metadata.is_post_policy,
103 is_adj_rib_out: metadata.is_adj_rib_out,
104 announced,
105 next_hop: next_hop.map(|ip| map_to_ipv6(ip)),
106 origin: origin.to_string(),
107 path: new_path(path.clone()),
108 local_preference,
109 med,
110 communities: new_communities(&communities.clone()),
111 synthetic: false,
112 });
113 }
114
115 Some(updates)
116 }
117 _ => None,
118 }
119}
120
121pub fn new_metadata(socket: SocketAddr, message: &BmpMessage) -> Option<UpdateMetadata> {
122 let Some(pph) = message.per_peer_header else {
124 return None;
125 };
126 let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
127
128 let time_bmp_header_ns = (pph.timestamp * 1000.0) as i64;
130
131 let is_post_policy = match pph.peer_flags {
132 PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
133 PerPeerFlags::LocalRibPeerFlags(_) => false,
134 };
135
136 let is_adj_rib_out = match pph.peer_flags {
137 PerPeerFlags::PeerFlags(flags) => flags.is_adj_rib_out(),
138 PerPeerFlags::LocalRibPeerFlags(_) => false,
139 };
140
141 Some(UpdateMetadata {
142 time_bmp_header_ns,
143 router_socket: socket,
144 peer_addr: peer.peer_address,
145 peer_bgp_id: peer.peer_bgp_id,
146 peer_asn: peer.peer_asn.to_u32(),
147 is_post_policy,
148 is_adj_rib_out,
149 })
150}
151
152pub fn new_peer_from_metadata(metadata: UpdateMetadata) -> Peer {
153 Peer::new(
154 metadata.peer_bgp_id,
155 metadata.peer_addr,
156 Asn::new_32bit(metadata.peer_asn),
157 )
158}
159
160pub fn new_path(path: Option<AsPath>) -> Vec<u32> {
161 match path {
162 Some(mut path) => {
163 let mut constructed_path: Vec<u32> = Vec::new();
164 path.coalesce();
165 for segment in path.into_segments_iter() {
166 if let AsPathSegment::AsSequence(dedup_asns) = segment {
167 for asn in dedup_asns {
168 constructed_path.push(asn.to_u32());
169 }
170 }
171 }
172 constructed_path
173 }
174 None => Vec::new(),
175 }
176}
177
178pub fn new_communities(communities: &[MetaCommunity]) -> Vec<(u32, u16)> {
179 let mut constructed_communities = Vec::new();
180 for community in communities {
181 match community {
182 MetaCommunity::Plain(community) => match community {
183 bgpkit_parser::models::Community::Custom(asn, value) => {
184 constructed_communities.push((asn.to_u32(), *value));
185 }
186 _ => (), },
188 _ => (), }
190 }
191 constructed_communities
192}
193
/// Normalizes an address to IPv6.
///
/// IPv4 addresses are converted to their IPv4-mapped IPv6 form
/// (`::ffff:a.b.c.d`); IPv6 addresses are returned unchanged.
///
/// The conversion is done directly on the address value, so it neither
/// allocates nor can it panic.
pub fn map_to_ipv6(ip: IpAddr) -> IpAddr {
    match ip {
        IpAddr::V4(v4) => IpAddr::V6(v4.to_ipv6_mapped()),
        IpAddr::V6(_) => ip,
    }
}