risotto_lib/
update.rs

1use bgpkit_parser::bmp::messages::{BmpMessage, PerPeerFlags, RouteMonitoring};
2use bgpkit_parser::models::*;
3use chrono::{DateTime, MappedLocalTime, TimeZone, Utc};
4use core::net::IpAddr;
5use std::net::Ipv4Addr;
6use tracing::error;
7
/// Per-message context shared by every [`Update`] decoded from one BMP message.
///
/// Built by `new_metadata` from the router endpoint supplied by the caller and
/// the BMP per-peer header.
///
/// `Eq` is derived alongside `PartialEq` because every field has total equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UpdateMetadata {
    /// Timestamp taken from the BMP per-peer header.
    ///
    /// NOTE: despite the `_ns` suffix, `new_metadata` stores the header
    /// timestamp multiplied by 1000 and `decode_updates` reads it back with
    /// `timestamp_millis_opt`, i.e. the value is handled as milliseconds.
    pub time_bmp_header_ns: i64,
    /// Address of the router, as passed in by the caller of `new_metadata`.
    pub router_addr: IpAddr,
    /// Port of the router, as passed in by the caller of `new_metadata`.
    pub router_port: u16,
    /// Peer address from the per-peer header (IPv4 is stored as IPv4-mapped IPv6).
    pub peer_addr: IpAddr,
    /// BGP identifier of the peer from the per-peer header.
    pub peer_bgp_id: Ipv4Addr,
    /// AS number of the peer, widened to 32 bits.
    pub peer_asn: u32,
    /// Post-policy flag from the peer flags; `false` for Local-RIB peer flags.
    pub is_post_policy: bool,
    /// Adj-RIB-Out flag from the peer flags; `false` for Local-RIB peer flags.
    pub is_adj_rib_out: bool,
}
19
20#[derive(Debug, Clone, PartialEq)]
21pub struct Update {
22    pub time_received_ns: DateTime<Utc>,
23    pub time_bmp_header_ns: DateTime<Utc>,
24    pub router_addr: IpAddr,
25    pub router_port: u16,
26    pub peer_addr: IpAddr,
27    pub peer_bgp_id: Ipv4Addr,
28    pub peer_asn: u32,
29    pub prefix_addr: IpAddr,
30    pub prefix_len: u8,
31    pub is_post_policy: bool,
32    pub is_adj_rib_out: bool,
33    pub announced: bool,
34    pub next_hop: Option<IpAddr>,
35    pub origin: String,
36    pub path: Vec<u32>,
37    pub local_preference: Option<u32>,
38    pub med: Option<u32>,
39    pub communities: Vec<(u32, u16)>,
40    pub synthetic: bool,
41}
42
43pub fn decode_updates(message: RouteMonitoring, metadata: UpdateMetadata) -> Option<Vec<Update>> {
44    let mut updates = Vec::new();
45
46    match message.bgp_message {
47        bgpkit_parser::models::BgpMessage::Update(bgp_update) => {
48            // https://datatracker.ietf.org/doc/html/rfc4271
49            let mut prefixes_to_update = Vec::new();
50            for prefix in bgp_update.announced_prefixes {
51                prefixes_to_update.push((prefix, true));
52            }
53            for prefix in bgp_update.withdrawn_prefixes {
54                prefixes_to_update.push((prefix, false));
55            }
56
57            // https://datatracker.ietf.org/doc/html/rfc4760
58            let attributes = bgp_update.attributes;
59            if let Some(nlri) = attributes.get_reachable_nlri() {
60                for prefix in &nlri.prefixes {
61                    prefixes_to_update.push((*prefix, true));
62                }
63            }
64            if let Some(nlri) = attributes.get_unreachable_nlri() {
65                for prefix in &nlri.prefixes {
66                    prefixes_to_update.push((*prefix, false));
67                }
68            }
69
70            // Get the other attributes
71            let next_hop = attributes.next_hop();
72            let origin = attributes.origin();
73            let path = match attributes.as_path() {
74                Some(path) => Some(path.clone()),
75                None => None,
76            };
77            let local_preference = attributes.local_preference();
78            let med = attributes.multi_exit_discriminator();
79            let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();
80
81            let time_bmp_header_ns = match Utc.timestamp_millis_opt(metadata.time_bmp_header_ns) {
82                MappedLocalTime::Single(dt) => dt,
83                _ => {
84                    error!(
85                        "failed to parse timestamp: {}, using Utc::now()",
86                        metadata.time_bmp_header_ns
87                    );
88                    Utc::now()
89                }
90            };
91
92            for (prefix, announced) in prefixes_to_update {
93                updates.push(Update {
94                    time_received_ns: Utc::now(),
95                    time_bmp_header_ns,
96                    router_addr: metadata.router_addr,
97                    router_port: metadata.router_port,
98                    peer_addr: metadata.peer_addr,
99                    peer_bgp_id: metadata.peer_bgp_id,
100                    peer_asn: metadata.peer_asn,
101                    prefix_addr: map_to_ipv6(prefix.prefix.addr()),
102                    prefix_len: prefix.prefix.prefix_len(),
103                    is_post_policy: metadata.is_post_policy,
104                    is_adj_rib_out: metadata.is_adj_rib_out,
105                    announced,
106                    next_hop: next_hop.map(|ip| map_to_ipv6(ip)),
107                    origin: origin.to_string(),
108                    path: new_path(path.clone()),
109                    local_preference,
110                    med,
111                    communities: new_communities(&communities.clone()),
112                    synthetic: false,
113                });
114            }
115
116            Some(updates)
117        }
118        _ => None,
119    }
120}
121
122pub fn new_metadata(
123    router_addr: IpAddr,
124    router_port: u16,
125    message: &BmpMessage,
126) -> Option<UpdateMetadata> {
127    // Get peer information
128    let Some(pph) = message.per_peer_header else {
129        return None;
130    };
131    let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
132
133    // Get header information
134    let time_bmp_header_ns = (pph.timestamp * 1000.0) as i64;
135
136    let is_post_policy = match pph.peer_flags {
137        PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
138        PerPeerFlags::LocalRibPeerFlags(_) => false,
139    };
140
141    let is_adj_rib_out = match pph.peer_flags {
142        PerPeerFlags::PeerFlags(flags) => flags.is_adj_rib_out(),
143        PerPeerFlags::LocalRibPeerFlags(_) => false,
144    };
145
146    Some(UpdateMetadata {
147        time_bmp_header_ns,
148        router_addr,
149        router_port,
150        peer_addr: map_to_ipv6(peer.peer_address),
151        peer_bgp_id: peer.peer_bgp_id,
152        peer_asn: peer.peer_asn.to_u32(),
153        is_post_policy,
154        is_adj_rib_out,
155    })
156}
157
158pub fn new_peer_from_metadata(metadata: UpdateMetadata) -> Peer {
159    Peer::new(
160        metadata.peer_bgp_id,
161        metadata.peer_addr,
162        Asn::new_32bit(metadata.peer_asn),
163    )
164}
165
166pub fn new_path(path: Option<AsPath>) -> Vec<u32> {
167    match path {
168        Some(mut path) => {
169            let mut constructed_path: Vec<u32> = Vec::new();
170            path.coalesce();
171            for segment in path.into_segments_iter() {
172                if let AsPathSegment::AsSequence(dedup_asns) = segment {
173                    for asn in dedup_asns {
174                        constructed_path.push(asn.to_u32());
175                    }
176                }
177            }
178            constructed_path
179        }
180        None => Vec::new(),
181    }
182}
183
184pub fn new_communities(communities: &[MetaCommunity]) -> Vec<(u32, u16)> {
185    let mut constructed_communities = Vec::new();
186    for community in communities {
187        match community {
188            MetaCommunity::Plain(community) => match community {
189                bgpkit_parser::models::Community::Custom(asn, value) => {
190                    constructed_communities.push((asn.to_u32(), *value));
191                }
192                _ => (), // TODO
193            },
194            _ => (), // TODO
195        }
196    }
197    constructed_communities
198}
199
/// Normalizes an address to IPv6.
///
/// IPv4 addresses are converted to their IPv4-mapped IPv6 form
/// (`::ffff:a.b.c.d`); IPv6 addresses are returned unchanged.
fn map_to_ipv6(ip: IpAddr) -> IpAddr {
    match ip {
        // Direct conversion: no string formatting/parsing and no panic path.
        IpAddr::V4(v4) => IpAddr::V6(v4.to_ipv6_mapped()),
        v6 @ IpAddr::V6(_) => v6,
    }
}
206}