1use bgpkit_parser::bmp::messages::{BmpMessage, PerPeerFlags, RouteMonitoring};
2use bgpkit_parser::models::*;
3use chrono::{DateTime, TimeZone, Utc};
4use core::net::IpAddr;
5use std::net::{Ipv4Addr, SocketAddr};
6use tracing::warn;
7
8#[derive(Debug, Clone, PartialEq)]
9pub struct UpdateMetadata {
10 pub time_bmp_header_ns: i64,
11 pub router_socket: SocketAddr,
12 pub peer_addr: IpAddr,
13 pub peer_bgp_id: Ipv4Addr,
14 pub peer_asn: u32,
15 pub is_post_policy: bool,
16 pub is_adj_rib_out: bool,
17}
18
19#[derive(Debug, Clone, PartialEq)]
20pub struct Update {
21 pub time_received_ns: DateTime<Utc>,
22 pub time_bmp_header_ns: DateTime<Utc>,
23 pub router_addr: IpAddr,
24 pub router_port: u16,
25 pub peer_addr: IpAddr,
26 pub peer_bgp_id: Ipv4Addr,
27 pub peer_asn: u32,
28 pub prefix_addr: IpAddr,
29 pub prefix_len: u8,
30 pub is_post_policy: bool,
31 pub is_adj_rib_out: bool,
32 pub announced: bool,
33 pub synthetic: bool,
34
35 pub origin: String,
37 pub as_path: Vec<u32>,
38 pub next_hop: Option<IpAddr>,
39 pub multi_exit_discriminator: Option<u32>,
40 pub local_preference: Option<u32>,
41 pub only_to_customer: Option<u32>,
42 pub atomic_aggregate: bool,
43 pub aggregator_asn: Option<u32>,
44 pub aggregator_bgp_id: Option<u32>,
45 pub communities: Vec<(u32, u16)>,
46 pub extended_communities: Vec<(u8, u8, Vec<u8>)>, pub large_communities: Vec<(u32, u32, u32)>, pub originator_id: Option<u32>,
49 pub cluster_list: Vec<u32>,
50 pub mp_reach_afi: Option<u16>,
51 pub mp_reach_safi: Option<u8>,
52 pub mp_unreach_afi: Option<u16>,
53 pub mp_unreach_safi: Option<u8>,
54}
55
56impl Update {
57 pub fn partition_key(&self) -> String {
61 format!("{}:{}", self.router_addr, self.peer_addr)
62 }
63}
64
65pub fn decode_updates(message: RouteMonitoring, metadata: UpdateMetadata) -> Option<Vec<Update>> {
66 let bgp_update = match message.bgp_message {
67 bgpkit_parser::models::BgpMessage::Update(update) => update,
68 _ => return None,
69 };
70
71 let prefixes_to_update: Vec<_> = bgp_update
73 .announced_prefixes
74 .into_iter()
75 .map(|p| (p, true))
76 .chain(
77 bgp_update
78 .withdrawn_prefixes
79 .into_iter()
80 .map(|p| (p, false)),
81 )
82 .chain(
83 bgp_update
84 .attributes
85 .get_reachable_nlri()
86 .map(|nlri| nlri.prefixes.iter().map(|&p| (p, true)))
87 .into_iter()
88 .flatten(),
89 )
90 .chain(
91 bgp_update
92 .attributes
93 .get_unreachable_nlri()
94 .map(|nlri| nlri.prefixes.iter().map(|&p| (p, false)))
95 .into_iter()
96 .flatten(),
97 )
98 .collect();
99
100 if prefixes_to_update.is_empty() {
101 return None;
102 }
103
104 let attributes = &bgp_update.attributes;
106 let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();
107
108 let time_bmp_header_ns = Utc
110 .timestamp_millis_opt(metadata.time_bmp_header_ns)
111 .single()
112 .unwrap_or_else(|| {
113 warn!(
114 "failed to parse timestamp: {}, using Utc::now()",
115 metadata.time_bmp_header_ns
116 );
117 Utc::now()
118 });
119
120 let base_update = Update {
122 time_received_ns: Utc::now(),
123 time_bmp_header_ns,
124 router_addr: map_to_ipv6(metadata.router_socket.ip()),
125 router_port: metadata.router_socket.port(),
126 peer_addr: map_to_ipv6(metadata.peer_addr),
127 peer_bgp_id: metadata.peer_bgp_id,
128 peer_asn: metadata.peer_asn,
129 is_post_policy: metadata.is_post_policy,
130 is_adj_rib_out: metadata.is_adj_rib_out,
131 synthetic: false,
132
133 origin: attributes.origin().to_string(),
135 as_path: new_path(attributes.as_path().cloned()),
136 next_hop: attributes.next_hop().map(map_to_ipv6),
137 multi_exit_discriminator: attributes.multi_exit_discriminator(),
138 local_preference: attributes.local_preference(),
139 only_to_customer: attributes.only_to_customer().map(|asn| asn.to_u32()),
140 atomic_aggregate: attributes.atomic_aggregate(),
141 aggregator_asn: attributes.aggregator().map(|(asn, _)| asn.to_u32()),
142 aggregator_bgp_id: attributes.aggregator().map(|(_, id)| u32::from(id)),
143 communities: new_communities(&communities),
144 extended_communities: extract_extended_communities(&communities),
145 large_communities: extract_large_communities(&communities),
146 originator_id: attributes.origin_id().map(|id| u32::from(id)),
147 cluster_list: attributes.clusters().map_or_else(Vec::new, |c| c.to_vec()),
148 mp_reach_afi: attributes.get_reachable_nlri().map(|nlri| match nlri.afi {
149 bgpkit_parser::models::Afi::Ipv4 => 1u16,
150 bgpkit_parser::models::Afi::Ipv6 => 2u16,
151 }),
152 mp_reach_safi: attributes.get_reachable_nlri().map(|nlri| match nlri.safi {
153 bgpkit_parser::models::Safi::Unicast => 1u8,
154 bgpkit_parser::models::Safi::Multicast => 2u8,
155 _ => 0u8,
156 }),
157 mp_unreach_afi: attributes
158 .get_unreachable_nlri()
159 .map(|nlri| match nlri.afi {
160 bgpkit_parser::models::Afi::Ipv4 => 1u16,
161 bgpkit_parser::models::Afi::Ipv6 => 2u16,
162 }),
163 mp_unreach_safi: attributes
164 .get_unreachable_nlri()
165 .map(|nlri| match nlri.safi {
166 bgpkit_parser::models::Safi::Unicast => 1u8,
167 bgpkit_parser::models::Safi::Multicast => 2u8,
168 _ => 0u8,
169 }),
170
171 prefix_addr: std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
173 prefix_len: 0,
174 announced: false,
175 };
176
177 let updates = prefixes_to_update
179 .into_iter()
180 .map(|(prefix, announced)| Update {
181 prefix_addr: map_to_ipv6(prefix.prefix.addr()),
182 prefix_len: prefix.prefix.prefix_len(),
183 announced,
184 ..base_update.clone()
185 })
186 .collect();
187
188 Some(updates)
189}
190
191pub fn new_metadata(socket: SocketAddr, message: &BmpMessage) -> Option<UpdateMetadata> {
192 let Some(pph) = message.per_peer_header else {
194 return None;
195 };
196 let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
197
198 let time_bmp_header_ns = (pph.timestamp * 1000.0) as i64;
200
201 let is_post_policy = match pph.peer_flags {
202 PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
203 PerPeerFlags::LocalRibPeerFlags(_) => false,
204 };
205
206 let is_adj_rib_out = match pph.peer_flags {
207 PerPeerFlags::PeerFlags(flags) => flags.is_adj_rib_out(),
208 PerPeerFlags::LocalRibPeerFlags(_) => false,
209 };
210
211 Some(UpdateMetadata {
212 time_bmp_header_ns,
213 router_socket: socket,
214 peer_addr: peer.peer_address,
215 peer_bgp_id: peer.peer_bgp_id,
216 peer_asn: peer.peer_asn.to_u32(),
217 is_post_policy,
218 is_adj_rib_out,
219 })
220}
221
222pub fn new_peer_from_metadata(metadata: UpdateMetadata) -> Peer {
223 Peer::new(
224 metadata.peer_bgp_id,
225 metadata.peer_addr,
226 Asn::new_32bit(metadata.peer_asn),
227 )
228}
229
230pub fn new_path(path: Option<AsPath>) -> Vec<u32> {
231 match path {
232 Some(mut path) => {
233 let mut constructed_path: Vec<u32> = Vec::new();
234 path.coalesce();
235 for segment in path.into_segments_iter() {
236 if let AsPathSegment::AsSequence(dedup_asns) = segment {
237 for asn in dedup_asns {
238 constructed_path.push(asn.to_u32());
239 }
240 }
241 }
242 constructed_path
243 }
244 None => Vec::new(),
245 }
246}
247
248pub fn new_communities(communities: &[MetaCommunity]) -> Vec<(u32, u16)> {
249 communities
250 .iter()
251 .filter_map(|community| match community {
252 MetaCommunity::Plain(community) => match community {
253 bgpkit_parser::models::Community::Custom(asn, value) => {
254 Some((asn.to_u32(), *value))
255 }
256 bgpkit_parser::models::Community::NoExport => Some((0xFFFF, 0xFF01)),
258 bgpkit_parser::models::Community::NoAdvertise => Some((0xFFFF, 0xFF02)),
259 bgpkit_parser::models::Community::NoExportSubConfed => Some((0xFFFF, 0xFF03)),
260 },
261 MetaCommunity::Extended(_)
265 | MetaCommunity::Ipv6Extended(_)
266 | MetaCommunity::Large(_) => None,
267 })
268 .collect()
269}
270
271pub fn map_to_ipv6(ip: IpAddr) -> IpAddr {
272 if ip.is_ipv4() {
273 format!("::ffff:{}", ip).parse().unwrap()
274 } else {
275 ip
276 }
277}
278
279fn extract_extended_communities(communities: &[MetaCommunity]) -> Vec<(u8, u8, Vec<u8>)> {
281 communities
282 .iter()
283 .filter_map(|community| match community {
284 MetaCommunity::Extended(ext_comm) => {
285 let type_byte = u8::from(ext_comm.community_type());
286 match ext_comm {
287 bgpkit_parser::models::ExtendedCommunity::TransitiveTwoOctetAs(ec)
288 | bgpkit_parser::models::ExtendedCommunity::NonTransitiveTwoOctetAs(ec) => {
289 Some((
290 type_byte,
291 ec.subtype,
292 [
293 &ec.global_admin.to_u32().to_be_bytes()[2..],
294 &ec.local_admin,
295 ]
296 .concat(),
297 ))
298 }
299 bgpkit_parser::models::ExtendedCommunity::TransitiveIpv4Addr(ec)
300 | bgpkit_parser::models::ExtendedCommunity::NonTransitiveIpv4Addr(ec) => {
301 Some((
302 type_byte,
303 ec.subtype,
304 [
305 ec.global_admin.octets().as_slice(),
306 ec.local_admin.as_slice(),
307 ]
308 .concat(),
309 ))
310 }
311 bgpkit_parser::models::ExtendedCommunity::TransitiveFourOctetAs(ec)
312 | bgpkit_parser::models::ExtendedCommunity::NonTransitiveFourOctetAs(ec) => {
313 Some((
314 type_byte,
315 ec.subtype,
316 [
317 ec.global_admin.to_u32().to_be_bytes().as_slice(),
318 ec.local_admin.as_slice(),
319 ]
320 .concat(),
321 ))
322 }
323 bgpkit_parser::models::ExtendedCommunity::TransitiveOpaque(ec)
324 | bgpkit_parser::models::ExtendedCommunity::NonTransitiveOpaque(ec) => {
325 Some((type_byte, ec.subtype, ec.value.to_vec()))
326 }
327 bgpkit_parser::models::ExtendedCommunity::Raw(raw) => {
328 Some((raw[0], raw[1], raw[2..].to_vec()))
329 }
330 }
331 }
332 MetaCommunity::Ipv6Extended(ipv6_ext_comm) => {
333 let type_byte = u8::from(ipv6_ext_comm.community_type);
334 Some((
335 type_byte,
336 ipv6_ext_comm.subtype,
337 [
338 ipv6_ext_comm.global_admin.octets().as_slice(),
339 &ipv6_ext_comm.local_admin,
340 ]
341 .concat(),
342 ))
343 }
344 _ => None,
345 })
346 .collect()
347}
348
349fn extract_large_communities(communities: &[MetaCommunity]) -> Vec<(u32, u32, u32)> {
350 communities
351 .iter()
352 .filter_map(|community| match community {
353 MetaCommunity::Large(large_comm) => Some((
354 large_comm.global_admin,
355 large_comm.local_data[0],
356 large_comm.local_data[1],
357 )),
358 _ => None,
359 })
360 .collect()
361}