risotto_lib/
update.rs

1use bgpkit_parser::bmp::messages::{BmpMessage, PerPeerFlags, RouteMonitoring};
2use bgpkit_parser::models::*;
3use chrono::{DateTime, MappedLocalTime, TimeZone, Utc};
4use core::net::IpAddr;
5use std::net::Ipv4Addr;
6use tracing::error;
7
8#[derive(Debug, Clone, PartialEq)]
9pub struct UpdateMetadata {
10    pub timestamp: i64,
11    pub router_addr: IpAddr,
12    pub router_port: u16,
13    pub peer_addr: IpAddr,
14    pub peer_bgp_id: Ipv4Addr,
15    pub peer_asn: u32,
16    pub is_post_policy: bool,
17    pub is_adj_rib_out: bool,
18}
19
20#[derive(Debug, Clone, PartialEq)]
21pub struct Update {
22    pub timestamp: DateTime<Utc>,
23    pub router_addr: IpAddr,
24    pub router_port: u16,
25    pub peer_addr: IpAddr,
26    pub peer_bgp_id: Ipv4Addr,
27    pub peer_asn: u32,
28    pub prefix_addr: IpAddr,
29    pub prefix_len: u8,
30    pub announced: bool,
31    pub is_post_policy: bool,
32    pub is_adj_rib_out: bool,
33    pub origin: String,
34    pub path: Vec<u32>,
35    pub communities: Vec<(u32, u16)>,
36    pub synthetic: bool,
37}
38
39pub fn decode_updates(message: RouteMonitoring, metadata: UpdateMetadata) -> Option<Vec<Update>> {
40    let mut updates = Vec::new();
41
42    match message.bgp_message {
43        bgpkit_parser::models::BgpMessage::Update(bgp_update) => {
44            // https://datatracker.ietf.org/doc/html/rfc4271
45            let mut prefixes_to_update = Vec::new();
46            for prefix in bgp_update.announced_prefixes {
47                prefixes_to_update.push((prefix, true));
48            }
49            for prefix in bgp_update.withdrawn_prefixes {
50                prefixes_to_update.push((prefix, false));
51            }
52
53            // https://datatracker.ietf.org/doc/html/rfc4760
54            let attributes = bgp_update.attributes;
55            if let Some(nlri) = attributes.get_reachable_nlri() {
56                for prefix in &nlri.prefixes {
57                    prefixes_to_update.push((*prefix, true));
58                }
59            }
60            if let Some(nlri) = attributes.get_unreachable_nlri() {
61                for prefix in &nlri.prefixes {
62                    prefixes_to_update.push((*prefix, false));
63                }
64            }
65
66            // Get the other attributes
67            let origin = attributes.origin();
68            let path = match attributes.as_path() {
69                Some(path) => Some(path.clone()),
70                None => None,
71            };
72            let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();
73
74            let timestamp = match Utc.timestamp_millis_opt(metadata.timestamp) {
75                MappedLocalTime::Single(dt) => dt,
76                _ => {
77                    error!(
78                        "failed to parse timestamp: {}, using Utc::now()",
79                        metadata.timestamp
80                    );
81                    Utc::now()
82                }
83            };
84
85            for (prefix, announced) in prefixes_to_update {
86                updates.push(Update {
87                    timestamp,
88                    router_addr: metadata.router_addr,
89                    router_port: metadata.router_port,
90                    peer_addr: metadata.peer_addr,
91                    peer_bgp_id: metadata.peer_bgp_id,
92                    peer_asn: metadata.peer_asn,
93                    prefix_addr: map_to_ipv6(prefix.prefix.addr()),
94                    prefix_len: prefix.prefix.prefix_len(),
95                    announced,
96                    is_post_policy: metadata.is_post_policy,
97                    is_adj_rib_out: metadata.is_adj_rib_out,
98                    origin: origin.to_string(),
99                    path: new_path(path.clone()),
100                    communities: new_communities(&communities.clone()),
101                    synthetic: false,
102                });
103            }
104
105            Some(updates)
106        }
107        _ => None,
108    }
109}
110
111pub fn new_metadata(
112    router_addr: IpAddr,
113    router_port: u16,
114    message: BmpMessage,
115) -> Option<UpdateMetadata> {
116    // Get peer information
117    let Some(pph) = message.per_peer_header else {
118        return None;
119    };
120    let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
121
122    // Get header information
123    let timestamp = (pph.timestamp * 1000.0) as i64;
124
125    let is_post_policy = match pph.peer_flags {
126        PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
127        PerPeerFlags::LocalRibPeerFlags(_) => false,
128    };
129
130    let is_adj_rib_out = match pph.peer_flags {
131        PerPeerFlags::PeerFlags(flags) => flags.is_adj_rib_out(),
132        PerPeerFlags::LocalRibPeerFlags(_) => false,
133    };
134
135    Some(UpdateMetadata {
136        timestamp,
137        router_addr,
138        router_port,
139        peer_addr: map_to_ipv6(peer.peer_address),
140        peer_bgp_id: peer.peer_bgp_id,
141        peer_asn: peer.peer_asn.to_u32(),
142        is_post_policy,
143        is_adj_rib_out,
144    })
145}
146
147pub fn new_peer_from_metadata(metadata: UpdateMetadata) -> Peer {
148    Peer::new(
149        metadata.peer_bgp_id,
150        metadata.peer_addr,
151        Asn::new_32bit(metadata.peer_asn),
152    )
153}
154
155pub fn new_path(path: Option<AsPath>) -> Vec<u32> {
156    match path {
157        Some(mut path) => {
158            let mut constructed_path: Vec<u32> = Vec::new();
159            path.coalesce();
160            for segment in path.into_segments_iter() {
161                if let AsPathSegment::AsSequence(dedup_asns) = segment {
162                    for asn in dedup_asns {
163                        constructed_path.push(asn.to_u32());
164                    }
165                }
166            }
167            constructed_path
168        }
169        None => Vec::new(),
170    }
171}
172
173pub fn new_communities(communities: &[MetaCommunity]) -> Vec<(u32, u16)> {
174    let mut constructed_communities = Vec::new();
175    for community in communities {
176        match community {
177            MetaCommunity::Plain(community) => match community {
178                bgpkit_parser::models::Community::Custom(asn, value) => {
179                    constructed_communities.push((asn.to_u32(), *value));
180                }
181                _ => (), // TODO
182            },
183            _ => (), // TODO
184        }
185    }
186    constructed_communities
187}
188
189fn map_to_ipv6(ip: IpAddr) -> IpAddr {
190    if ip.is_ipv4() {
191        format!("::ffff:{}", ip).parse().unwrap()
192    } else {
193        ip
194    }
195}