1use bgpkit_parser::bmp::messages::{BmpMessage, PerPeerFlags, RouteMonitoring};
2use bgpkit_parser::models::*;
3use chrono::{DateTime, MappedLocalTime, TimeZone, Utc};
4use core::net::IpAddr;
5use std::net::Ipv4Addr;
6use tracing::error;
7
/// Per-message context taken from the BMP per-peer header plus the address of
/// the router that sent it. `new_metadata` builds it and `decode_updates`
/// copies it into every [`Update`] produced from the same message.
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMetadata {
    /// Timestamp from the BMP per-peer header.
    ///
    /// NOTE(review): despite the `_ns` suffix, `new_metadata` stores
    /// `timestamp * 1000.0` here and `decode_updates` reads it back with
    /// `timestamp_millis_opt`, i.e. the value is handled as milliseconds —
    /// confirm the intended unit before relying on the name.
    pub time_bmp_header_ns: i64,
    /// Address of the router, as passed to `new_metadata`.
    pub router_addr: IpAddr,
    /// Port of the router, as passed to `new_metadata`.
    pub router_port: u16,
    /// Peer address; `new_metadata` stores IPv4 addresses in IPv4-mapped IPv6 form.
    pub peer_addr: IpAddr,
    /// BGP identifier of the peer.
    pub peer_bgp_id: Ipv4Addr,
    /// AS number of the peer, widened to 32 bits.
    pub peer_asn: u32,
    /// Post-policy flag from the per-peer header; `new_metadata` sets it to
    /// `false` for local-RIB peer flags.
    pub is_post_policy: bool,
    /// Adj-RIB-Out flag from the per-peer header; `new_metadata` sets it to
    /// `false` for local-RIB peer flags.
    pub is_adj_rib_out: bool,
}
19
/// A single per-prefix routing event (announcement or withdrawal) together
/// with the peer/router context and the path attributes of the BGP UPDATE it
/// was decoded from.
#[derive(Debug, Clone, PartialEq)]
pub struct Update {
    /// Wall-clock time at which the record was built (`decode_updates` uses `Utc::now()`).
    pub time_received_ns: DateTime<Utc>,
    /// Timestamp of the BMP per-peer header; `decode_updates` falls back to
    /// `Utc::now()` when the raw value cannot be converted.
    pub time_bmp_header_ns: DateTime<Utc>,
    /// Address of the router that sent the BMP message.
    pub router_addr: IpAddr,
    /// Port of the router that sent the BMP message.
    pub router_port: u16,
    /// Address of the BGP peer.
    pub peer_addr: IpAddr,
    /// BGP identifier of the peer.
    pub peer_bgp_id: Ipv4Addr,
    /// AS number of the peer.
    pub peer_asn: u32,
    /// Network address of the prefix; `decode_updates` stores IPv4 addresses
    /// in IPv4-mapped IPv6 form.
    pub prefix_addr: IpAddr,
    /// Prefix length, copied unchanged from the decoded prefix (it is not
    /// adjusted when the address is mapped to IPv6).
    pub prefix_len: u8,
    /// Copied from [`UpdateMetadata::is_post_policy`].
    pub is_post_policy: bool,
    /// Copied from [`UpdateMetadata::is_adj_rib_out`].
    pub is_adj_rib_out: bool,
    /// `true` for an announced prefix, `false` for a withdrawn one.
    pub announced: bool,
    /// Next hop attribute, if present; IPv4 values are stored in IPv4-mapped IPv6 form.
    pub next_hop: Option<IpAddr>,
    /// Textual form of the ORIGIN attribute.
    pub origin: String,
    /// AS path flattened to a list of AS numbers (see `new_path`).
    pub path: Vec<u32>,
    /// LOCAL_PREF attribute, if present.
    pub local_preference: Option<u32>,
    /// MULTI_EXIT_DISC attribute, if present.
    pub med: Option<u32>,
    /// `(asn, value)` pairs extracted by `new_communities`.
    pub communities: Vec<(u32, u16)>,
    /// Always `false` for records produced by `decode_updates`.
    /// NOTE(review): presumably `true` marks records generated locally rather
    /// than decoded from the wire — confirm against the other producers.
    pub synthetic: bool,
}
42
43pub fn decode_updates(message: RouteMonitoring, metadata: UpdateMetadata) -> Option<Vec<Update>> {
44 let mut updates = Vec::new();
45
46 match message.bgp_message {
47 bgpkit_parser::models::BgpMessage::Update(bgp_update) => {
48 let mut prefixes_to_update = Vec::new();
50 for prefix in bgp_update.announced_prefixes {
51 prefixes_to_update.push((prefix, true));
52 }
53 for prefix in bgp_update.withdrawn_prefixes {
54 prefixes_to_update.push((prefix, false));
55 }
56
57 let attributes = bgp_update.attributes;
59 if let Some(nlri) = attributes.get_reachable_nlri() {
60 for prefix in &nlri.prefixes {
61 prefixes_to_update.push((*prefix, true));
62 }
63 }
64 if let Some(nlri) = attributes.get_unreachable_nlri() {
65 for prefix in &nlri.prefixes {
66 prefixes_to_update.push((*prefix, false));
67 }
68 }
69
70 let next_hop = attributes.next_hop();
72 let origin = attributes.origin();
73 let path = match attributes.as_path() {
74 Some(path) => Some(path.clone()),
75 None => None,
76 };
77 let local_preference = attributes.local_preference();
78 let med = attributes.multi_exit_discriminator();
79 let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();
80
81 let time_bmp_header_ns = match Utc.timestamp_millis_opt(metadata.time_bmp_header_ns) {
82 MappedLocalTime::Single(dt) => dt,
83 _ => {
84 error!(
85 "failed to parse timestamp: {}, using Utc::now()",
86 metadata.time_bmp_header_ns
87 );
88 Utc::now()
89 }
90 };
91
92 for (prefix, announced) in prefixes_to_update {
93 updates.push(Update {
94 time_received_ns: Utc::now(),
95 time_bmp_header_ns,
96 router_addr: metadata.router_addr,
97 router_port: metadata.router_port,
98 peer_addr: metadata.peer_addr,
99 peer_bgp_id: metadata.peer_bgp_id,
100 peer_asn: metadata.peer_asn,
101 prefix_addr: map_to_ipv6(prefix.prefix.addr()),
102 prefix_len: prefix.prefix.prefix_len(),
103 is_post_policy: metadata.is_post_policy,
104 is_adj_rib_out: metadata.is_adj_rib_out,
105 announced,
106 next_hop: next_hop.map(|ip| map_to_ipv6(ip)),
107 origin: origin.to_string(),
108 path: new_path(path.clone()),
109 local_preference,
110 med,
111 communities: new_communities(&communities.clone()),
112 synthetic: false,
113 });
114 }
115
116 Some(updates)
117 }
118 _ => None,
119 }
120}
121
122pub fn new_metadata(
123 router_addr: IpAddr,
124 router_port: u16,
125 message: &BmpMessage,
126) -> Option<UpdateMetadata> {
127 let Some(pph) = message.per_peer_header else {
129 return None;
130 };
131 let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
132
133 let time_bmp_header_ns = (pph.timestamp * 1000.0) as i64;
135
136 let is_post_policy = match pph.peer_flags {
137 PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
138 PerPeerFlags::LocalRibPeerFlags(_) => false,
139 };
140
141 let is_adj_rib_out = match pph.peer_flags {
142 PerPeerFlags::PeerFlags(flags) => flags.is_adj_rib_out(),
143 PerPeerFlags::LocalRibPeerFlags(_) => false,
144 };
145
146 Some(UpdateMetadata {
147 time_bmp_header_ns,
148 router_addr,
149 router_port,
150 peer_addr: map_to_ipv6(peer.peer_address),
151 peer_bgp_id: peer.peer_bgp_id,
152 peer_asn: peer.peer_asn.to_u32(),
153 is_post_policy,
154 is_adj_rib_out,
155 })
156}
157
158pub fn new_peer_from_metadata(metadata: UpdateMetadata) -> Peer {
159 Peer::new(
160 metadata.peer_bgp_id,
161 metadata.peer_addr,
162 Asn::new_32bit(metadata.peer_asn),
163 )
164}
165
166pub fn new_path(path: Option<AsPath>) -> Vec<u32> {
167 match path {
168 Some(mut path) => {
169 let mut constructed_path: Vec<u32> = Vec::new();
170 path.coalesce();
171 for segment in path.into_segments_iter() {
172 if let AsPathSegment::AsSequence(dedup_asns) = segment {
173 for asn in dedup_asns {
174 constructed_path.push(asn.to_u32());
175 }
176 }
177 }
178 constructed_path
179 }
180 None => Vec::new(),
181 }
182}
183
184pub fn new_communities(communities: &[MetaCommunity]) -> Vec<(u32, u16)> {
185 let mut constructed_communities = Vec::new();
186 for community in communities {
187 match community {
188 MetaCommunity::Plain(community) => match community {
189 bgpkit_parser::models::Community::Custom(asn, value) => {
190 constructed_communities.push((asn.to_u32(), *value));
191 }
192 _ => (), },
194 _ => (), }
196 }
197 constructed_communities
198}
199
/// Normalizes an address to IPv6: an IPv4 address becomes its IPv4-mapped
/// IPv6 form (`::ffff:a.b.c.d`); an IPv6 address is returned unchanged.
///
/// The conversion is infallible and does not go through a string round trip.
fn map_to_ipv6(ip: IpAddr) -> IpAddr {
    match ip {
        IpAddr::V4(v4) => IpAddr::V6(v4.to_ipv6_mapped()),
        IpAddr::V6(_) => ip,
    }
}