1use bgpkit_parser::bmp::messages::{BmpMessage, PerPeerFlags, RouteMonitoring};
2use bgpkit_parser::models::*;
3use chrono::{DateTime, MappedLocalTime, TimeZone, Utc};
4use core::net::IpAddr;
5use std::net::Ipv4Addr;
6use tracing::error;
7
8#[derive(Debug, Clone, PartialEq)]
9pub struct UpdateMetadata {
10 pub timestamp: i64,
11 pub router_addr: IpAddr,
12 pub router_port: u16,
13 pub peer_addr: IpAddr,
14 pub peer_bgp_id: Ipv4Addr,
15 pub peer_asn: u32,
16 pub is_post_policy: bool,
17 pub is_adj_rib_out: bool,
18}
19
20#[derive(Debug, Clone, PartialEq)]
21pub struct Update {
22 pub timestamp: DateTime<Utc>,
23 pub router_addr: IpAddr,
24 pub router_port: u16,
25 pub peer_addr: IpAddr,
26 pub peer_bgp_id: Ipv4Addr,
27 pub peer_asn: u32,
28 pub prefix_addr: IpAddr,
29 pub prefix_len: u8,
30 pub announced: bool,
31 pub is_post_policy: bool,
32 pub is_adj_rib_out: bool,
33 pub origin: String,
34 pub path: Vec<u32>,
35 pub communities: Vec<(u32, u16)>,
36 pub synthetic: bool,
37}
38
39pub fn decode_updates(message: RouteMonitoring, metadata: UpdateMetadata) -> Option<Vec<Update>> {
40 let mut updates = Vec::new();
41
42 match message.bgp_message {
43 bgpkit_parser::models::BgpMessage::Update(bgp_update) => {
44 let mut prefixes_to_update = Vec::new();
46 for prefix in bgp_update.announced_prefixes {
47 prefixes_to_update.push((prefix, true));
48 }
49 for prefix in bgp_update.withdrawn_prefixes {
50 prefixes_to_update.push((prefix, false));
51 }
52
53 let attributes = bgp_update.attributes;
55 if let Some(nlri) = attributes.get_reachable_nlri() {
56 for prefix in &nlri.prefixes {
57 prefixes_to_update.push((*prefix, true));
58 }
59 }
60 if let Some(nlri) = attributes.get_unreachable_nlri() {
61 for prefix in &nlri.prefixes {
62 prefixes_to_update.push((*prefix, false));
63 }
64 }
65
66 let origin = attributes.origin();
68 let path = match attributes.as_path() {
69 Some(path) => Some(path.clone()),
70 None => None,
71 };
72 let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();
73
74 let timestamp = match Utc.timestamp_millis_opt(metadata.timestamp) {
75 MappedLocalTime::Single(dt) => dt,
76 _ => {
77 error!(
78 "failed to parse timestamp: {}, using Utc::now()",
79 metadata.timestamp
80 );
81 Utc::now()
82 }
83 };
84
85 for (prefix, announced) in prefixes_to_update {
86 updates.push(Update {
87 timestamp,
88 router_addr: metadata.router_addr,
89 router_port: metadata.router_port,
90 peer_addr: metadata.peer_addr,
91 peer_bgp_id: metadata.peer_bgp_id,
92 peer_asn: metadata.peer_asn,
93 prefix_addr: map_to_ipv6(prefix.prefix.addr()),
94 prefix_len: prefix.prefix.prefix_len(),
95 announced,
96 is_post_policy: metadata.is_post_policy,
97 is_adj_rib_out: metadata.is_adj_rib_out,
98 origin: origin.to_string(),
99 path: new_path(path.clone()),
100 communities: new_communities(&communities.clone()),
101 synthetic: false,
102 });
103 }
104
105 Some(updates)
106 }
107 _ => None,
108 }
109}
110
111pub fn new_metadata(
112 router_addr: IpAddr,
113 router_port: u16,
114 message: BmpMessage,
115) -> Option<UpdateMetadata> {
116 let Some(pph) = message.per_peer_header else {
118 return None;
119 };
120 let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
121
122 let timestamp = (pph.timestamp * 1000.0) as i64;
124
125 let is_post_policy = match pph.peer_flags {
126 PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
127 PerPeerFlags::LocalRibPeerFlags(_) => false,
128 };
129
130 let is_adj_rib_out = match pph.peer_flags {
131 PerPeerFlags::PeerFlags(flags) => flags.is_adj_rib_out(),
132 PerPeerFlags::LocalRibPeerFlags(_) => false,
133 };
134
135 Some(UpdateMetadata {
136 timestamp,
137 router_addr,
138 router_port,
139 peer_addr: map_to_ipv6(peer.peer_address),
140 peer_bgp_id: peer.peer_bgp_id,
141 peer_asn: peer.peer_asn.to_u32(),
142 is_post_policy,
143 is_adj_rib_out,
144 })
145}
146
147pub fn new_peer_from_metadata(metadata: UpdateMetadata) -> Peer {
148 Peer::new(
149 metadata.peer_bgp_id,
150 metadata.peer_addr,
151 Asn::new_32bit(metadata.peer_asn),
152 )
153}
154
155pub fn new_path(path: Option<AsPath>) -> Vec<u32> {
156 match path {
157 Some(mut path) => {
158 let mut constructed_path: Vec<u32> = Vec::new();
159 path.coalesce();
160 for segment in path.into_segments_iter() {
161 if let AsPathSegment::AsSequence(dedup_asns) = segment {
162 for asn in dedup_asns {
163 constructed_path.push(asn.to_u32());
164 }
165 }
166 }
167 constructed_path
168 }
169 None => Vec::new(),
170 }
171}
172
173pub fn new_communities(communities: &[MetaCommunity]) -> Vec<(u32, u16)> {
174 let mut constructed_communities = Vec::new();
175 for community in communities {
176 match community {
177 MetaCommunity::Plain(community) => match community {
178 bgpkit_parser::models::Community::Custom(asn, value) => {
179 constructed_communities.push((asn.to_u32(), *value));
180 }
181 _ => (), },
183 _ => (), }
185 }
186 constructed_communities
187}
188
189fn map_to_ipv6(ip: IpAddr) -> IpAddr {
190 if ip.is_ipv4() {
191 format!("::ffff:{}", ip).parse().unwrap()
192 } else {
193 ip
194 }
195}