1use bgpkit_parser::bmp::messages::RouteMonitoring;
2use bgpkit_parser::models::*;
3use chrono::{DateTime, MappedLocalTime, TimeZone, Utc};
4use core::net::IpAddr;
5use tracing::error;
6
7pub struct UpdateHeader {
8 pub timestamp: i64,
9 pub is_post_policy: bool,
10 pub is_adj_rib_out: bool,
11}
12
13#[derive(Debug, Clone, PartialEq)]
14pub struct Update {
15 pub prefix: NetworkPrefix,
16 pub announced: bool,
17 pub origin: Origin,
18 pub path: Option<AsPath>,
19 pub communities: Vec<MetaCommunity>,
20 pub is_post_policy: bool,
21 pub is_adj_rib_out: bool,
22 pub timestamp: DateTime<Utc>,
23 pub synthetic: bool,
24}
25
26pub fn decode_updates(message: RouteMonitoring, header: UpdateHeader) -> Option<Vec<Update>> {
27 let mut updates = Vec::new();
28
29 match message.bgp_message {
30 bgpkit_parser::models::BgpMessage::Update(bgp_update) => {
31 let mut prefixes_to_update = Vec::new();
33 for prefix in bgp_update.announced_prefixes {
34 prefixes_to_update.push((prefix, true));
35 }
36 for prefix in bgp_update.withdrawn_prefixes {
37 prefixes_to_update.push((prefix, false));
38 }
39
40 let attributes = bgp_update.attributes;
42 if let Some(nlri) = attributes.get_reachable_nlri() {
43 for prefix in &nlri.prefixes {
44 prefixes_to_update.push((*prefix, true));
45 }
46 }
47 if let Some(nlri) = attributes.get_unreachable_nlri() {
48 for prefix in &nlri.prefixes {
49 prefixes_to_update.push((*prefix, false));
50 }
51 }
52
53 let origin = attributes.origin();
55 let path = match attributes.as_path() {
56 Some(path) => Some(path.clone()),
57 None => None,
58 };
59 let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();
60
61 let timestamp = match Utc.timestamp_millis_opt(header.timestamp) {
62 MappedLocalTime::Single(dt) => dt,
63 _ => {
64 error!(
65 "failed to parse timestamp: {}, using Utc::now()",
66 header.timestamp
67 );
68 Utc::now()
69 }
70 };
71
72 for (prefix, announced) in prefixes_to_update {
73 updates.push(Update {
74 prefix,
75 announced,
76 origin,
77 path: path.clone(),
78 communities: communities.clone(),
79 is_post_policy: header.is_post_policy,
80 is_adj_rib_out: header.is_adj_rib_out,
81 timestamp,
82 synthetic: false,
83 });
84 }
85
86 Some(updates)
87 }
88 _ => None,
89 }
90}
91
92pub fn construct_as_path(path: Option<AsPath>) -> Vec<u32> {
93 match path {
94 Some(mut path) => {
95 let mut contructed_path: Vec<u32> = Vec::new();
96 path.coalesce();
97 for segment in path.into_segments_iter() {
98 if let AsPathSegment::AsSequence(dedup_asns) = segment {
99 for asn in dedup_asns {
100 contructed_path.push(asn.to_u32());
101 }
102 }
103 }
104 contructed_path
105 }
106 None => Vec::new(),
107 }
108}
109
110pub fn construct_communities(communities: &[MetaCommunity]) -> Vec<(u32, u16)> {
111 let mut constructed_communities = Vec::new();
112 for community in communities {
113 match community {
114 MetaCommunity::Plain(community) => match community {
115 bgpkit_parser::models::Community::Custom(asn, value) => {
116 constructed_communities.push((asn.to_u32(), *value));
117 }
118 _ => (), },
120 _ => (), }
122 }
123 constructed_communities
124}
125
126fn map_to_ipv6(ip: IpAddr) -> IpAddr {
127 if ip.is_ipv4() {
128 format!("::ffff:{}", ip).parse().unwrap()
129 } else {
130 ip
131 }
132}
133
134pub fn format_update(
137 router_addr: IpAddr,
138 router_port: u16,
139 peer: &Peer,
140 update: &mut Update,
141) -> String {
142 let as_path_str = construct_as_path(update.path.clone())
143 .iter()
144 .map(|x| x.to_string())
145 .collect::<Vec<String>>()
146 .join(",");
147 let as_path_str = format!("\"[{}]\"", as_path_str);
148
149 let communities_str = construct_communities(update.communities.as_ref())
150 .iter()
151 .map(|x| format!("({},{})", x.0, x.1))
152 .collect::<Vec<String>>()
153 .join(",");
154 let communities_str = format!("\"[{}]\"", communities_str);
155
156 let mut row: Vec<String> = Vec::new();
157 row.push(format!("{}", update.timestamp.timestamp_millis()));
158 row.push(format!("{}", map_to_ipv6(router_addr)));
159 row.push(format!("{}", router_port));
160 row.push(format!("{}", map_to_ipv6(peer.peer_address)));
161 row.push(format!("{}", peer.peer_bgp_id));
162 row.push(format!("{}", peer.peer_asn));
163 row.push(format!("{}", map_to_ipv6(update.prefix.prefix.addr())));
164 row.push(format!("{}", update.prefix.prefix.prefix_len()));
165 row.push(format!("{}", update.is_post_policy));
166 row.push(format!("{}", update.is_adj_rib_out));
167 row.push(format!("{}", update.announced));
168 row.push(format!("{}", update.origin));
169 row.push(format!("{}", as_path_str));
170 row.push(format!("{}", communities_str));
171 row.push(format!("{}", update.synthetic));
172
173 row.join(",")
174}