risotto_lib/
update.rs

1use bgpkit_parser::bmp::messages::{BmpMessage, PerPeerFlags, RouteMonitoring};
2use bgpkit_parser::models::*;
3use chrono::{DateTime, MappedLocalTime, TimeZone, Utc};
4use core::net::IpAddr;
5use std::net::{Ipv4Addr, SocketAddr};
6use tracing::warn;
7
/// Per-peer context taken from a BMP message and shared by every [`Update`]
/// decoded from that message.
///
/// Built by `new_metadata` from the router socket and the BMP per-peer header.
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMetadata {
    /// Timestamp from the BMP per-peer header.
    ///
    /// NOTE(review): despite the `_ns` suffix, `new_metadata` stores
    /// `timestamp * 1000.0` here and `decode_updates` reads it back with
    /// `timestamp_millis_opt`, so the value is handled as milliseconds.
    pub time_bmp_header_ns: i64,
    /// Socket address (IP and port) of the router the BMP message came from.
    pub router_socket: SocketAddr,
    /// Address of the BGP peer named in the per-peer header.
    pub peer_addr: IpAddr,
    /// BGP identifier of the peer.
    pub peer_bgp_id: Ipv4Addr,
    /// AS number of the peer, as a 32-bit value.
    pub peer_asn: u32,
    /// Post-policy flag from the peer flags; `false` for Local-RIB peer flags.
    pub is_post_policy: bool,
    /// Adj-RIB-Out flag from the peer flags; `false` for Local-RIB peer flags.
    pub is_adj_rib_out: bool,
}
18
/// A single per-prefix routing event (announcement or withdrawal).
///
/// `decode_updates` emits one `Update` for every prefix carried by a BGP
/// UPDATE message; the path attributes of that message are copied into each.
#[derive(Debug, Clone, PartialEq)]
pub struct Update {
    /// Wall-clock time at which this record was built (`Utc::now()`).
    pub time_received_ns: DateTime<Utc>,
    /// Timestamp of the BMP per-peer header; `decode_updates` falls back to
    /// `Utc::now()` when the raw value cannot be converted.
    pub time_bmp_header_ns: DateTime<Utc>,
    /// Router address; IPv4 addresses are stored IPv4-mapped (`::ffff:a.b.c.d`).
    pub router_addr: IpAddr,
    /// Source port of the router's connection.
    pub router_port: u16,
    /// Peer address; IPv4 addresses are stored IPv4-mapped.
    pub peer_addr: IpAddr,
    /// BGP identifier of the peer.
    pub peer_bgp_id: Ipv4Addr,
    /// AS number of the peer.
    pub peer_asn: u32,
    /// Network address of the prefix; IPv4 addresses are stored IPv4-mapped.
    pub prefix_addr: IpAddr,
    /// Prefix length as reported by the parsed prefix (the address is mapped
    /// to IPv6 but the length is not adjusted).
    pub prefix_len: u8,
    /// Copied from [`UpdateMetadata::is_post_policy`].
    pub is_post_policy: bool,
    /// Copied from [`UpdateMetadata::is_adj_rib_out`].
    pub is_adj_rib_out: bool,
    /// `true` for announced / reachable prefixes, `false` for withdrawn /
    /// unreachable ones.
    pub announced: bool,
    /// NEXT_HOP attribute, if present; IPv4 addresses are stored IPv4-mapped.
    pub next_hop: Option<IpAddr>,
    /// String form of the ORIGIN attribute.
    pub origin: String,
    /// ASNs of the AS path's sequence segments, flattened (see `new_path`).
    pub path: Vec<u32>,
    /// LOCAL_PREF attribute, if present.
    pub local_preference: Option<u32>,
    /// MULTI_EXIT_DISC attribute, if present.
    pub med: Option<u32>,
    /// `(asn, value)` pairs of the plain custom communities (see
    /// `new_communities`).
    pub communities: Vec<(u32, u16)>,
    /// Always `false` for updates produced by `decode_updates`.
    /// NOTE(review): presumably set to `true` for records generated elsewhere
    /// rather than decoded from the wire — confirm against callers.
    pub synthetic: bool,
}
41
42pub fn decode_updates(message: RouteMonitoring, metadata: UpdateMetadata) -> Option<Vec<Update>> {
43    let mut updates = Vec::new();
44
45    match message.bgp_message {
46        bgpkit_parser::models::BgpMessage::Update(bgp_update) => {
47            // https://datatracker.ietf.org/doc/html/rfc4271
48            let mut prefixes_to_update = Vec::new();
49            for prefix in bgp_update.announced_prefixes {
50                prefixes_to_update.push((prefix, true));
51            }
52            for prefix in bgp_update.withdrawn_prefixes {
53                prefixes_to_update.push((prefix, false));
54            }
55
56            // https://datatracker.ietf.org/doc/html/rfc4760
57            let attributes = bgp_update.attributes;
58            if let Some(nlri) = attributes.get_reachable_nlri() {
59                for prefix in &nlri.prefixes {
60                    prefixes_to_update.push((*prefix, true));
61                }
62            }
63            if let Some(nlri) = attributes.get_unreachable_nlri() {
64                for prefix in &nlri.prefixes {
65                    prefixes_to_update.push((*prefix, false));
66                }
67            }
68
69            // Get the other attributes
70            let next_hop = attributes.next_hop();
71            let origin = attributes.origin();
72            let path = match attributes.as_path() {
73                Some(path) => Some(path.clone()),
74                None => None,
75            };
76            let local_preference = attributes.local_preference();
77            let med = attributes.multi_exit_discriminator();
78            let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();
79
80            let time_bmp_header_ns = match Utc.timestamp_millis_opt(metadata.time_bmp_header_ns) {
81                MappedLocalTime::Single(dt) => dt,
82                _ => {
83                    warn!(
84                        "failed to parse timestamp: {}, using Utc::now()",
85                        metadata.time_bmp_header_ns
86                    );
87                    Utc::now()
88                }
89            };
90
91            for (prefix, announced) in prefixes_to_update {
92                updates.push(Update {
93                    time_received_ns: Utc::now(),
94                    time_bmp_header_ns,
95                    router_addr: map_to_ipv6(metadata.router_socket.ip()),
96                    router_port: metadata.router_socket.port(),
97                    peer_addr: map_to_ipv6(metadata.peer_addr),
98                    peer_bgp_id: metadata.peer_bgp_id,
99                    peer_asn: metadata.peer_asn,
100                    prefix_addr: map_to_ipv6(prefix.prefix.addr()),
101                    prefix_len: prefix.prefix.prefix_len(),
102                    is_post_policy: metadata.is_post_policy,
103                    is_adj_rib_out: metadata.is_adj_rib_out,
104                    announced,
105                    next_hop: next_hop.map(|ip| map_to_ipv6(ip)),
106                    origin: origin.to_string(),
107                    path: new_path(path.clone()),
108                    local_preference,
109                    med,
110                    communities: new_communities(&communities.clone()),
111                    synthetic: false,
112                });
113            }
114
115            Some(updates)
116        }
117        _ => None,
118    }
119}
120
121pub fn new_metadata(socket: SocketAddr, message: &BmpMessage) -> Option<UpdateMetadata> {
122    // Get peer information
123    let Some(pph) = message.per_peer_header else {
124        return None;
125    };
126    let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
127
128    // Get header information
129    let time_bmp_header_ns = (pph.timestamp * 1000.0) as i64;
130
131    let is_post_policy = match pph.peer_flags {
132        PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
133        PerPeerFlags::LocalRibPeerFlags(_) => false,
134    };
135
136    let is_adj_rib_out = match pph.peer_flags {
137        PerPeerFlags::PeerFlags(flags) => flags.is_adj_rib_out(),
138        PerPeerFlags::LocalRibPeerFlags(_) => false,
139    };
140
141    Some(UpdateMetadata {
142        time_bmp_header_ns,
143        router_socket: socket,
144        peer_addr: peer.peer_address,
145        peer_bgp_id: peer.peer_bgp_id,
146        peer_asn: peer.peer_asn.to_u32(),
147        is_post_policy,
148        is_adj_rib_out,
149    })
150}
151
152pub fn new_peer_from_metadata(metadata: UpdateMetadata) -> Peer {
153    Peer::new(
154        metadata.peer_bgp_id,
155        metadata.peer_addr,
156        Asn::new_32bit(metadata.peer_asn),
157    )
158}
159
160pub fn new_path(path: Option<AsPath>) -> Vec<u32> {
161    match path {
162        Some(mut path) => {
163            let mut constructed_path: Vec<u32> = Vec::new();
164            path.coalesce();
165            for segment in path.into_segments_iter() {
166                if let AsPathSegment::AsSequence(dedup_asns) = segment {
167                    for asn in dedup_asns {
168                        constructed_path.push(asn.to_u32());
169                    }
170                }
171            }
172            constructed_path
173        }
174        None => Vec::new(),
175    }
176}
177
178pub fn new_communities(communities: &[MetaCommunity]) -> Vec<(u32, u16)> {
179    let mut constructed_communities = Vec::new();
180    for community in communities {
181        match community {
182            MetaCommunity::Plain(community) => match community {
183                bgpkit_parser::models::Community::Custom(asn, value) => {
184                    constructed_communities.push((asn.to_u32(), *value));
185                }
186                _ => (), // TODO
187            },
188            _ => (), // TODO
189        }
190    }
191    constructed_communities
192}
193
/// Normalizes an address to IPv6.
///
/// IPv4 addresses are converted to their IPv4-mapped IPv6 form
/// (`::ffff:a.b.c.d`); IPv6 addresses are returned unchanged.
///
/// The conversion is done on the address value itself rather than by
/// formatting and re-parsing a string, so it neither allocates nor can panic.
pub fn map_to_ipv6(ip: IpAddr) -> IpAddr {
    match ip {
        IpAddr::V4(v4) => IpAddr::V6(v4.to_ipv6_mapped()),
        IpAddr::V6(_) => ip,
    }
}