risotto_lib/
update.rs

1use bgpkit_parser::bmp::messages::{BmpMessage, PerPeerFlags, RouteMonitoring};
2use bgpkit_parser::models::*;
3use chrono::{DateTime, TimeZone, Utc};
4use core::net::IpAddr;
5use std::net::{Ipv4Addr, SocketAddr};
6use tracing::warn;
7
8#[derive(Debug, Clone, PartialEq)]
9pub struct UpdateMetadata {
10    pub time_bmp_header_ns: i64,
11    pub router_socket: SocketAddr,
12    pub peer_addr: IpAddr,
13    pub peer_bgp_id: Ipv4Addr,
14    pub peer_asn: u32,
15    pub is_post_policy: bool,
16    pub is_adj_rib_out: bool,
17}
18
19#[derive(Debug, Clone, PartialEq)]
20pub struct Update {
21    pub time_received_ns: DateTime<Utc>,
22    pub time_bmp_header_ns: DateTime<Utc>,
23    pub router_addr: IpAddr,
24    pub router_port: u16,
25    pub peer_addr: IpAddr,
26    pub peer_bgp_id: Ipv4Addr,
27    pub peer_asn: u32,
28    pub prefix_addr: IpAddr,
29    pub prefix_len: u8,
30    pub is_post_policy: bool,
31    pub is_adj_rib_out: bool,
32    pub announced: bool,
33    pub synthetic: bool,
34
35    // BGP Attributes
36    pub origin: String,
37    pub as_path: Vec<u32>,
38    pub next_hop: Option<IpAddr>,
39    pub multi_exit_discriminator: Option<u32>,
40    pub local_preference: Option<u32>,
41    pub only_to_customer: Option<u32>,
42    pub atomic_aggregate: bool,
43    pub aggregator_asn: Option<u32>,
44    pub aggregator_bgp_id: Option<u32>,
45    pub communities: Vec<(u32, u16)>,
46    pub extended_communities: Vec<(u8, u8, Vec<u8>)>, // (type_high, type_low, value)
47    pub large_communities: Vec<(u32, u32, u32)>,      // (global_admin, local_data1, local_data2)
48    pub originator_id: Option<u32>,
49    pub cluster_list: Vec<u32>,
50    pub mp_reach_afi: Option<u16>,
51    pub mp_reach_safi: Option<u8>,
52    pub mp_unreach_afi: Option<u16>,
53    pub mp_unreach_safi: Option<u8>,
54}
55
56pub fn decode_updates(message: RouteMonitoring, metadata: UpdateMetadata) -> Option<Vec<Update>> {
57    let bgp_update = match message.bgp_message {
58        bgpkit_parser::models::BgpMessage::Update(update) => update,
59        _ => return None,
60    };
61
62    // Collect all prefixes with their announcement status in one go
63    let prefixes_to_update: Vec<_> = bgp_update
64        .announced_prefixes
65        .into_iter()
66        .map(|p| (p, true))
67        .chain(
68            bgp_update
69                .withdrawn_prefixes
70                .into_iter()
71                .map(|p| (p, false)),
72        )
73        .chain(
74            bgp_update
75                .attributes
76                .get_reachable_nlri()
77                .map(|nlri| nlri.prefixes.iter().map(|&p| (p, true)))
78                .into_iter()
79                .flatten(),
80        )
81        .chain(
82            bgp_update
83                .attributes
84                .get_unreachable_nlri()
85                .map(|nlri| nlri.prefixes.iter().map(|&p| (p, false)))
86                .into_iter()
87                .flatten(),
88        )
89        .collect();
90
91    if prefixes_to_update.is_empty() {
92        return None;
93    }
94
95    // Extract all attributes once
96    let attributes = &bgp_update.attributes;
97    let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();
98
99    // Parse timestamp once
100    let time_bmp_header_ns = Utc
101        .timestamp_millis_opt(metadata.time_bmp_header_ns)
102        .single()
103        .unwrap_or_else(|| {
104            warn!(
105                "failed to parse timestamp: {}, using Utc::now()",
106                metadata.time_bmp_header_ns
107            );
108            Utc::now()
109        });
110
111    // Create base update template to avoid repetition
112    let base_update = Update {
113        time_received_ns: Utc::now(),
114        time_bmp_header_ns,
115        router_addr: map_to_ipv6(metadata.router_socket.ip()),
116        router_port: metadata.router_socket.port(),
117        peer_addr: map_to_ipv6(metadata.peer_addr),
118        peer_bgp_id: metadata.peer_bgp_id,
119        peer_asn: metadata.peer_asn,
120        is_post_policy: metadata.is_post_policy,
121        is_adj_rib_out: metadata.is_adj_rib_out,
122        synthetic: false,
123
124        // BGP Attributes - simple types for easy serialization
125        origin: attributes.origin().to_string(),
126        as_path: new_path(attributes.as_path().cloned()),
127        next_hop: attributes.next_hop().map(map_to_ipv6),
128        multi_exit_discriminator: attributes.multi_exit_discriminator(),
129        local_preference: attributes.local_preference(),
130        only_to_customer: attributes.only_to_customer().map(|asn| asn.to_u32()),
131        atomic_aggregate: attributes.atomic_aggregate(),
132        aggregator_asn: attributes.aggregator().map(|(asn, _)| asn.to_u32()),
133        aggregator_bgp_id: attributes.aggregator().map(|(_, id)| u32::from(id)),
134        communities: new_communities(&communities),
135        extended_communities: extract_extended_communities(&communities),
136        large_communities: extract_large_communities(&communities),
137        originator_id: attributes.origin_id().map(|id| u32::from(id)),
138        cluster_list: attributes.clusters().map_or_else(Vec::new, |c| c.to_vec()),
139        mp_reach_afi: attributes.get_reachable_nlri().map(|nlri| match nlri.afi {
140            bgpkit_parser::models::Afi::Ipv4 => 1u16,
141            bgpkit_parser::models::Afi::Ipv6 => 2u16,
142        }),
143        mp_reach_safi: attributes.get_reachable_nlri().map(|nlri| match nlri.safi {
144            bgpkit_parser::models::Safi::Unicast => 1u8,
145            bgpkit_parser::models::Safi::Multicast => 2u8,
146            _ => 0u8,
147        }),
148        mp_unreach_afi: attributes
149            .get_unreachable_nlri()
150            .map(|nlri| match nlri.afi {
151                bgpkit_parser::models::Afi::Ipv4 => 1u16,
152                bgpkit_parser::models::Afi::Ipv6 => 2u16,
153            }),
154        mp_unreach_safi: attributes
155            .get_unreachable_nlri()
156            .map(|nlri| match nlri.safi {
157                bgpkit_parser::models::Safi::Unicast => 1u8,
158                bgpkit_parser::models::Safi::Multicast => 2u8,
159                _ => 0u8,
160            }),
161
162        // These will be overridden per prefix
163        prefix_addr: std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
164        prefix_len: 0,
165        announced: false,
166    };
167
168    // Generate updates for each prefix
169    let updates = prefixes_to_update
170        .into_iter()
171        .map(|(prefix, announced)| Update {
172            prefix_addr: map_to_ipv6(prefix.prefix.addr()),
173            prefix_len: prefix.prefix.prefix_len(),
174            announced,
175            ..base_update.clone()
176        })
177        .collect();
178
179    Some(updates)
180}
181
182pub fn new_metadata(socket: SocketAddr, message: &BmpMessage) -> Option<UpdateMetadata> {
183    // Get peer information
184    let Some(pph) = message.per_peer_header else {
185        return None;
186    };
187    let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
188
189    // Get header information
190    let time_bmp_header_ns = (pph.timestamp * 1000.0) as i64;
191
192    let is_post_policy = match pph.peer_flags {
193        PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
194        PerPeerFlags::LocalRibPeerFlags(_) => false,
195    };
196
197    let is_adj_rib_out = match pph.peer_flags {
198        PerPeerFlags::PeerFlags(flags) => flags.is_adj_rib_out(),
199        PerPeerFlags::LocalRibPeerFlags(_) => false,
200    };
201
202    Some(UpdateMetadata {
203        time_bmp_header_ns,
204        router_socket: socket,
205        peer_addr: peer.peer_address,
206        peer_bgp_id: peer.peer_bgp_id,
207        peer_asn: peer.peer_asn.to_u32(),
208        is_post_policy,
209        is_adj_rib_out,
210    })
211}
212
213pub fn new_peer_from_metadata(metadata: UpdateMetadata) -> Peer {
214    Peer::new(
215        metadata.peer_bgp_id,
216        metadata.peer_addr,
217        Asn::new_32bit(metadata.peer_asn),
218    )
219}
220
221pub fn new_path(path: Option<AsPath>) -> Vec<u32> {
222    match path {
223        Some(mut path) => {
224            let mut constructed_path: Vec<u32> = Vec::new();
225            path.coalesce();
226            for segment in path.into_segments_iter() {
227                if let AsPathSegment::AsSequence(dedup_asns) = segment {
228                    for asn in dedup_asns {
229                        constructed_path.push(asn.to_u32());
230                    }
231                }
232            }
233            constructed_path
234        }
235        None => Vec::new(),
236    }
237}
238
239pub fn new_communities(communities: &[MetaCommunity]) -> Vec<(u32, u16)> {
240    communities
241        .iter()
242        .filter_map(|community| match community {
243            MetaCommunity::Plain(community) => match community {
244                bgpkit_parser::models::Community::Custom(asn, value) => {
245                    Some((asn.to_u32(), *value))
246                }
247                // Well-known communities use reserved ASN 65535 (0xFFFF)
248                bgpkit_parser::models::Community::NoExport => Some((0xFFFF, 0xFF01)),
249                bgpkit_parser::models::Community::NoAdvertise => Some((0xFFFF, 0xFF02)),
250                bgpkit_parser::models::Community::NoExportSubConfed => Some((0xFFFF, 0xFF03)),
251            },
252            // For extended and large communities, we could extract the first 32 bits
253            // but since the return type is (u32, u16), we'll skip them for now
254            // as they don't fit the standard community format
255            MetaCommunity::Extended(_)
256            | MetaCommunity::Ipv6Extended(_)
257            | MetaCommunity::Large(_) => None,
258        })
259        .collect()
260}
261
262pub fn map_to_ipv6(ip: IpAddr) -> IpAddr {
263    if ip.is_ipv4() {
264        format!("::ffff:{}", ip).parse().unwrap()
265    } else {
266        ip
267    }
268}
269
270// Helper functions for extracting community data into simple types
271fn extract_extended_communities(communities: &[MetaCommunity]) -> Vec<(u8, u8, Vec<u8>)> {
272    communities
273        .iter()
274        .filter_map(|community| match community {
275            MetaCommunity::Extended(ext_comm) => {
276                let type_byte = u8::from(ext_comm.community_type());
277                match ext_comm {
278                    bgpkit_parser::models::ExtendedCommunity::TransitiveTwoOctetAs(ec)
279                    | bgpkit_parser::models::ExtendedCommunity::NonTransitiveTwoOctetAs(ec) => {
280                        Some((
281                            type_byte,
282                            ec.subtype,
283                            [
284                                &ec.global_admin.to_u32().to_be_bytes()[2..],
285                                &ec.local_admin,
286                            ]
287                            .concat(),
288                        ))
289                    }
290                    bgpkit_parser::models::ExtendedCommunity::TransitiveIpv4Addr(ec)
291                    | bgpkit_parser::models::ExtendedCommunity::NonTransitiveIpv4Addr(ec) => {
292                        Some((
293                            type_byte,
294                            ec.subtype,
295                            [
296                                ec.global_admin.octets().as_slice(),
297                                ec.local_admin.as_slice(),
298                            ]
299                            .concat(),
300                        ))
301                    }
302                    bgpkit_parser::models::ExtendedCommunity::TransitiveFourOctetAs(ec)
303                    | bgpkit_parser::models::ExtendedCommunity::NonTransitiveFourOctetAs(ec) => {
304                        Some((
305                            type_byte,
306                            ec.subtype,
307                            [
308                                ec.global_admin.to_u32().to_be_bytes().as_slice(),
309                                ec.local_admin.as_slice(),
310                            ]
311                            .concat(),
312                        ))
313                    }
314                    bgpkit_parser::models::ExtendedCommunity::TransitiveOpaque(ec)
315                    | bgpkit_parser::models::ExtendedCommunity::NonTransitiveOpaque(ec) => {
316                        Some((type_byte, ec.subtype, ec.value.to_vec()))
317                    }
318                    bgpkit_parser::models::ExtendedCommunity::Raw(raw) => {
319                        Some((raw[0], raw[1], raw[2..].to_vec()))
320                    }
321                }
322            }
323            MetaCommunity::Ipv6Extended(ipv6_ext_comm) => {
324                let type_byte = u8::from(ipv6_ext_comm.community_type);
325                Some((
326                    type_byte,
327                    ipv6_ext_comm.subtype,
328                    [
329                        ipv6_ext_comm.global_admin.octets().as_slice(),
330                        &ipv6_ext_comm.local_admin,
331                    ]
332                    .concat(),
333                ))
334            }
335            _ => None,
336        })
337        .collect()
338}
339
340fn extract_large_communities(communities: &[MetaCommunity]) -> Vec<(u32, u32, u32)> {
341    communities
342        .iter()
343        .filter_map(|community| match community {
344            MetaCommunity::Large(large_comm) => Some((
345                large_comm.global_admin,
346                large_comm.local_data[0],
347                large_comm.local_data[1],
348            )),
349            _ => None,
350        })
351        .collect()
352}