1use bgpkit_parser::bmp::messages::{BmpMessage, PerPeerFlags, RouteMonitoring};
2use bgpkit_parser::models::*;
3use chrono::{DateTime, TimeZone, Utc};
4use core::net::IpAddr;
5use std::net::{Ipv4Addr, SocketAddr};
6use tracing::warn;
7
8#[derive(Debug, Clone, PartialEq)]
9pub struct UpdateMetadata {
10 pub time_bmp_header_ns: i64,
11 pub router_socket: SocketAddr,
12 pub peer_addr: IpAddr,
13 pub peer_bgp_id: Ipv4Addr,
14 pub peer_asn: u32,
15 pub is_post_policy: bool,
16 pub is_adj_rib_out: bool,
17}
18
19#[derive(Debug, Clone, PartialEq)]
20pub struct Update {
21 pub time_received_ns: DateTime<Utc>,
22 pub time_bmp_header_ns: DateTime<Utc>,
23 pub router_addr: IpAddr,
24 pub router_port: u16,
25 pub peer_addr: IpAddr,
26 pub peer_bgp_id: Ipv4Addr,
27 pub peer_asn: u32,
28 pub prefix_addr: IpAddr,
29 pub prefix_len: u8,
30 pub is_post_policy: bool,
31 pub is_adj_rib_out: bool,
32 pub announced: bool,
33 pub synthetic: bool,
34
35 pub origin: String,
37 pub as_path: Vec<u32>,
38 pub next_hop: Option<IpAddr>,
39 pub multi_exit_discriminator: Option<u32>,
40 pub local_preference: Option<u32>,
41 pub only_to_customer: Option<u32>,
42 pub atomic_aggregate: bool,
43 pub aggregator_asn: Option<u32>,
44 pub aggregator_bgp_id: Option<u32>,
45 pub communities: Vec<(u32, u16)>,
46 pub extended_communities: Vec<(u8, u8, Vec<u8>)>, pub large_communities: Vec<(u32, u32, u32)>, pub originator_id: Option<u32>,
49 pub cluster_list: Vec<u32>,
50 pub mp_reach_afi: Option<u16>,
51 pub mp_reach_safi: Option<u8>,
52 pub mp_unreach_afi: Option<u16>,
53 pub mp_unreach_safi: Option<u8>,
54}
55
56pub fn decode_updates(message: RouteMonitoring, metadata: UpdateMetadata) -> Option<Vec<Update>> {
57 let bgp_update = match message.bgp_message {
58 bgpkit_parser::models::BgpMessage::Update(update) => update,
59 _ => return None,
60 };
61
62 let prefixes_to_update: Vec<_> = bgp_update
64 .announced_prefixes
65 .into_iter()
66 .map(|p| (p, true))
67 .chain(
68 bgp_update
69 .withdrawn_prefixes
70 .into_iter()
71 .map(|p| (p, false)),
72 )
73 .chain(
74 bgp_update
75 .attributes
76 .get_reachable_nlri()
77 .map(|nlri| nlri.prefixes.iter().map(|&p| (p, true)))
78 .into_iter()
79 .flatten(),
80 )
81 .chain(
82 bgp_update
83 .attributes
84 .get_unreachable_nlri()
85 .map(|nlri| nlri.prefixes.iter().map(|&p| (p, false)))
86 .into_iter()
87 .flatten(),
88 )
89 .collect();
90
91 if prefixes_to_update.is_empty() {
92 return None;
93 }
94
95 let attributes = &bgp_update.attributes;
97 let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();
98
99 let time_bmp_header_ns = Utc
101 .timestamp_millis_opt(metadata.time_bmp_header_ns)
102 .single()
103 .unwrap_or_else(|| {
104 warn!(
105 "failed to parse timestamp: {}, using Utc::now()",
106 metadata.time_bmp_header_ns
107 );
108 Utc::now()
109 });
110
111 let base_update = Update {
113 time_received_ns: Utc::now(),
114 time_bmp_header_ns,
115 router_addr: map_to_ipv6(metadata.router_socket.ip()),
116 router_port: metadata.router_socket.port(),
117 peer_addr: map_to_ipv6(metadata.peer_addr),
118 peer_bgp_id: metadata.peer_bgp_id,
119 peer_asn: metadata.peer_asn,
120 is_post_policy: metadata.is_post_policy,
121 is_adj_rib_out: metadata.is_adj_rib_out,
122 synthetic: false,
123
124 origin: attributes.origin().to_string(),
126 as_path: new_path(attributes.as_path().cloned()),
127 next_hop: attributes.next_hop().map(map_to_ipv6),
128 multi_exit_discriminator: attributes.multi_exit_discriminator(),
129 local_preference: attributes.local_preference(),
130 only_to_customer: attributes.only_to_customer().map(|asn| asn.to_u32()),
131 atomic_aggregate: attributes.atomic_aggregate(),
132 aggregator_asn: attributes.aggregator().map(|(asn, _)| asn.to_u32()),
133 aggregator_bgp_id: attributes.aggregator().map(|(_, id)| u32::from(id)),
134 communities: new_communities(&communities),
135 extended_communities: extract_extended_communities(&communities),
136 large_communities: extract_large_communities(&communities),
137 originator_id: attributes.origin_id().map(|id| u32::from(id)),
138 cluster_list: attributes.clusters().map_or_else(Vec::new, |c| c.to_vec()),
139 mp_reach_afi: attributes.get_reachable_nlri().map(|nlri| match nlri.afi {
140 bgpkit_parser::models::Afi::Ipv4 => 1u16,
141 bgpkit_parser::models::Afi::Ipv6 => 2u16,
142 }),
143 mp_reach_safi: attributes.get_reachable_nlri().map(|nlri| match nlri.safi {
144 bgpkit_parser::models::Safi::Unicast => 1u8,
145 bgpkit_parser::models::Safi::Multicast => 2u8,
146 _ => 0u8,
147 }),
148 mp_unreach_afi: attributes
149 .get_unreachable_nlri()
150 .map(|nlri| match nlri.afi {
151 bgpkit_parser::models::Afi::Ipv4 => 1u16,
152 bgpkit_parser::models::Afi::Ipv6 => 2u16,
153 }),
154 mp_unreach_safi: attributes
155 .get_unreachable_nlri()
156 .map(|nlri| match nlri.safi {
157 bgpkit_parser::models::Safi::Unicast => 1u8,
158 bgpkit_parser::models::Safi::Multicast => 2u8,
159 _ => 0u8,
160 }),
161
162 prefix_addr: std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
164 prefix_len: 0,
165 announced: false,
166 };
167
168 let updates = prefixes_to_update
170 .into_iter()
171 .map(|(prefix, announced)| Update {
172 prefix_addr: map_to_ipv6(prefix.prefix.addr()),
173 prefix_len: prefix.prefix.prefix_len(),
174 announced,
175 ..base_update.clone()
176 })
177 .collect();
178
179 Some(updates)
180}
181
182pub fn new_metadata(socket: SocketAddr, message: &BmpMessage) -> Option<UpdateMetadata> {
183 let Some(pph) = message.per_peer_header else {
185 return None;
186 };
187 let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
188
189 let time_bmp_header_ns = (pph.timestamp * 1000.0) as i64;
191
192 let is_post_policy = match pph.peer_flags {
193 PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
194 PerPeerFlags::LocalRibPeerFlags(_) => false,
195 };
196
197 let is_adj_rib_out = match pph.peer_flags {
198 PerPeerFlags::PeerFlags(flags) => flags.is_adj_rib_out(),
199 PerPeerFlags::LocalRibPeerFlags(_) => false,
200 };
201
202 Some(UpdateMetadata {
203 time_bmp_header_ns,
204 router_socket: socket,
205 peer_addr: peer.peer_address,
206 peer_bgp_id: peer.peer_bgp_id,
207 peer_asn: peer.peer_asn.to_u32(),
208 is_post_policy,
209 is_adj_rib_out,
210 })
211}
212
213pub fn new_peer_from_metadata(metadata: UpdateMetadata) -> Peer {
214 Peer::new(
215 metadata.peer_bgp_id,
216 metadata.peer_addr,
217 Asn::new_32bit(metadata.peer_asn),
218 )
219}
220
221pub fn new_path(path: Option<AsPath>) -> Vec<u32> {
222 match path {
223 Some(mut path) => {
224 let mut constructed_path: Vec<u32> = Vec::new();
225 path.coalesce();
226 for segment in path.into_segments_iter() {
227 if let AsPathSegment::AsSequence(dedup_asns) = segment {
228 for asn in dedup_asns {
229 constructed_path.push(asn.to_u32());
230 }
231 }
232 }
233 constructed_path
234 }
235 None => Vec::new(),
236 }
237}
238
239pub fn new_communities(communities: &[MetaCommunity]) -> Vec<(u32, u16)> {
240 communities
241 .iter()
242 .filter_map(|community| match community {
243 MetaCommunity::Plain(community) => match community {
244 bgpkit_parser::models::Community::Custom(asn, value) => {
245 Some((asn.to_u32(), *value))
246 }
247 bgpkit_parser::models::Community::NoExport => Some((0xFFFF, 0xFF01)),
249 bgpkit_parser::models::Community::NoAdvertise => Some((0xFFFF, 0xFF02)),
250 bgpkit_parser::models::Community::NoExportSubConfed => Some((0xFFFF, 0xFF03)),
251 },
252 MetaCommunity::Extended(_)
256 | MetaCommunity::Ipv6Extended(_)
257 | MetaCommunity::Large(_) => None,
258 })
259 .collect()
260}
261
262pub fn map_to_ipv6(ip: IpAddr) -> IpAddr {
263 if ip.is_ipv4() {
264 format!("::ffff:{}", ip).parse().unwrap()
265 } else {
266 ip
267 }
268}
269
270fn extract_extended_communities(communities: &[MetaCommunity]) -> Vec<(u8, u8, Vec<u8>)> {
272 communities
273 .iter()
274 .filter_map(|community| match community {
275 MetaCommunity::Extended(ext_comm) => {
276 let type_byte = u8::from(ext_comm.community_type());
277 match ext_comm {
278 bgpkit_parser::models::ExtendedCommunity::TransitiveTwoOctetAs(ec)
279 | bgpkit_parser::models::ExtendedCommunity::NonTransitiveTwoOctetAs(ec) => {
280 Some((
281 type_byte,
282 ec.subtype,
283 [
284 &ec.global_admin.to_u32().to_be_bytes()[2..],
285 &ec.local_admin,
286 ]
287 .concat(),
288 ))
289 }
290 bgpkit_parser::models::ExtendedCommunity::TransitiveIpv4Addr(ec)
291 | bgpkit_parser::models::ExtendedCommunity::NonTransitiveIpv4Addr(ec) => {
292 Some((
293 type_byte,
294 ec.subtype,
295 [
296 ec.global_admin.octets().as_slice(),
297 ec.local_admin.as_slice(),
298 ]
299 .concat(),
300 ))
301 }
302 bgpkit_parser::models::ExtendedCommunity::TransitiveFourOctetAs(ec)
303 | bgpkit_parser::models::ExtendedCommunity::NonTransitiveFourOctetAs(ec) => {
304 Some((
305 type_byte,
306 ec.subtype,
307 [
308 ec.global_admin.to_u32().to_be_bytes().as_slice(),
309 ec.local_admin.as_slice(),
310 ]
311 .concat(),
312 ))
313 }
314 bgpkit_parser::models::ExtendedCommunity::TransitiveOpaque(ec)
315 | bgpkit_parser::models::ExtendedCommunity::NonTransitiveOpaque(ec) => {
316 Some((type_byte, ec.subtype, ec.value.to_vec()))
317 }
318 bgpkit_parser::models::ExtendedCommunity::Raw(raw) => {
319 Some((raw[0], raw[1], raw[2..].to_vec()))
320 }
321 }
322 }
323 MetaCommunity::Ipv6Extended(ipv6_ext_comm) => {
324 let type_byte = u8::from(ipv6_ext_comm.community_type);
325 Some((
326 type_byte,
327 ipv6_ext_comm.subtype,
328 [
329 ipv6_ext_comm.global_admin.octets().as_slice(),
330 &ipv6_ext_comm.local_admin,
331 ]
332 .concat(),
333 ))
334 }
335 _ => None,
336 })
337 .collect()
338}
339
340fn extract_large_communities(communities: &[MetaCommunity]) -> Vec<(u32, u32, u32)> {
341 communities
342 .iter()
343 .filter_map(|community| match community {
344 MetaCommunity::Large(large_comm) => Some((
345 large_comm.global_admin,
346 large_comm.local_data[0],
347 large_comm.local_data[1],
348 )),
349 _ => None,
350 })
351 .collect()
352}