risotto_lib/
update.rs

1use bgpkit_parser::bmp::messages::RouteMonitoring;
2use bgpkit_parser::models::*;
3use chrono::{DateTime, MappedLocalTime, TimeZone, Utc};
4use core::net::IpAddr;
5use tracing::error;
6
/// Per-message metadata that `decode_updates` copies onto every `Update` it produces.
pub struct UpdateHeader {
    /// Message timestamp; `decode_updates` interprets it as milliseconds since the Unix epoch.
    pub timestamp: i64,
    /// Copied verbatim into `Update::is_post_policy`.
    /// NOTE(review): presumably the BMP post-policy flag of the peer header; confirm at the caller.
    pub is_post_policy: bool,
    /// Copied verbatim into `Update::is_adj_rib_out`.
    /// NOTE(review): presumably marks an Adj-RIB-Out (vs. Adj-RIB-In) view; confirm at the caller.
    pub is_adj_rib_out: bool,
}
12
/// A single per-prefix routing change, as produced by `decode_updates` and serialized by
/// `format_update`.
#[derive(Debug, Clone, PartialEq)]
pub struct Update {
    /// Prefix this update applies to.
    pub prefix: NetworkPrefix,
    /// `true` when the prefix is announced, `false` when it is withdrawn.
    pub announced: bool,
    /// Origin attribute of the BGP message the update was decoded from.
    pub origin: Origin,
    /// AS path attribute of the BGP message, if it carried one.
    pub path: Option<AsPath>,
    /// Communities attached to the BGP message.
    pub communities: Vec<MetaCommunity>,
    /// Copied from `UpdateHeader::is_post_policy`.
    pub is_post_policy: bool,
    /// Copied from `UpdateHeader::is_adj_rib_out`.
    pub is_adj_rib_out: bool,
    /// Time associated with the update (serialized as epoch milliseconds by `format_update`).
    pub timestamp: DateTime<Utc>,
    /// Always `false` for updates built by `decode_updates`.
    /// NOTE(review): presumably set to `true` by code that fabricates updates; confirm elsewhere.
    pub synthetic: bool,
}
25
26pub fn decode_updates(message: RouteMonitoring, header: UpdateHeader) -> Option<Vec<Update>> {
27    let mut updates = Vec::new();
28
29    match message.bgp_message {
30        bgpkit_parser::models::BgpMessage::Update(bgp_update) => {
31            // https://datatracker.ietf.org/doc/html/rfc4271
32            let mut prefixes_to_update = Vec::new();
33            for prefix in bgp_update.announced_prefixes {
34                prefixes_to_update.push((prefix, true));
35            }
36            for prefix in bgp_update.withdrawn_prefixes {
37                prefixes_to_update.push((prefix, false));
38            }
39
40            // https://datatracker.ietf.org/doc/html/rfc4760
41            let attributes = bgp_update.attributes;
42            if let Some(nlri) = attributes.get_reachable_nlri() {
43                for prefix in &nlri.prefixes {
44                    prefixes_to_update.push((*prefix, true));
45                }
46            }
47            if let Some(nlri) = attributes.get_unreachable_nlri() {
48                for prefix in &nlri.prefixes {
49                    prefixes_to_update.push((*prefix, false));
50                }
51            }
52
53            // Get the other attributes
54            let origin = attributes.origin();
55            let path = match attributes.as_path() {
56                Some(path) => Some(path.clone()),
57                None => None,
58            };
59            let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();
60
61            let timestamp = match Utc.timestamp_millis_opt(header.timestamp) {
62                MappedLocalTime::Single(dt) => dt,
63                _ => {
64                    error!(
65                        "failed to parse timestamp: {}, using Utc::now()",
66                        header.timestamp
67                    );
68                    Utc::now()
69                }
70            };
71
72            for (prefix, announced) in prefixes_to_update {
73                updates.push(Update {
74                    prefix,
75                    announced,
76                    origin,
77                    path: path.clone(),
78                    communities: communities.clone(),
79                    is_post_policy: header.is_post_policy,
80                    is_adj_rib_out: header.is_adj_rib_out,
81                    timestamp,
82                    synthetic: false,
83                });
84            }
85
86            Some(updates)
87        }
88        _ => None,
89    }
90}
91
92pub fn construct_as_path(path: Option<AsPath>) -> Vec<u32> {
93    match path {
94        Some(mut path) => {
95            let mut contructed_path: Vec<u32> = Vec::new();
96            path.coalesce();
97            for segment in path.into_segments_iter() {
98                if let AsPathSegment::AsSequence(dedup_asns) = segment {
99                    for asn in dedup_asns {
100                        contructed_path.push(asn.to_u32());
101                    }
102                }
103            }
104            contructed_path
105        }
106        None => Vec::new(),
107    }
108}
109
110pub fn construct_communities(communities: &[MetaCommunity]) -> Vec<(u32, u16)> {
111    let mut constructed_communities = Vec::new();
112    for community in communities {
113        match community {
114            MetaCommunity::Plain(community) => match community {
115                bgpkit_parser::models::Community::Custom(asn, value) => {
116                    constructed_communities.push((asn.to_u32(), *value));
117                }
118                _ => (), // TODO
119            },
120            _ => (), // TODO
121        }
122    }
123    constructed_communities
124}
125
/// Returns `ip` as an IPv6 address: IPv4 addresses become IPv4-mapped IPv6 addresses
/// (`::ffff:a.b.c.d`), IPv6 addresses are returned unchanged.
fn map_to_ipv6(ip: IpAddr) -> IpAddr {
    match ip {
        // Direct conversion: no string formatting, no parsing, no panic path.
        IpAddr::V4(v4) => IpAddr::V6(v4.to_ipv6_mapped()),
        IpAddr::V6(_) => ip,
    }
}
133
134// Returns a CSV line corresponding to this schema
135// timestamp,router_addr,router_port,peer_addr,peer_bgp_id,peer_asn,prefix_addr,prefix_len,announced,is_post_policy,is_adj_rib_out,origin,path,communities,synthetic
136pub fn format_update(
137    router_addr: IpAddr,
138    router_port: u16,
139    peer: &Peer,
140    update: &mut Update,
141) -> String {
142    let as_path_str = construct_as_path(update.path.clone())
143        .iter()
144        .map(|x| x.to_string())
145        .collect::<Vec<String>>()
146        .join(",");
147    let as_path_str = format!("\"[{}]\"", as_path_str);
148
149    let communities_str = construct_communities(update.communities.as_ref())
150        .iter()
151        .map(|x| format!("({},{})", x.0, x.1))
152        .collect::<Vec<String>>()
153        .join(",");
154    let communities_str = format!("\"[{}]\"", communities_str);
155
156    let mut row: Vec<String> = Vec::new();
157    row.push(format!("{}", update.timestamp.timestamp_millis()));
158    row.push(format!("{}", map_to_ipv6(router_addr)));
159    row.push(format!("{}", router_port));
160    row.push(format!("{}", map_to_ipv6(peer.peer_address)));
161    row.push(format!("{}", peer.peer_bgp_id));
162    row.push(format!("{}", peer.peer_asn));
163    row.push(format!("{}", map_to_ipv6(update.prefix.prefix.addr())));
164    row.push(format!("{}", update.prefix.prefix.prefix_len()));
165    row.push(format!("{}", update.is_post_policy));
166    row.push(format!("{}", update.is_adj_rib_out));
167    row.push(format!("{}", update.announced));
168    row.push(format!("{}", update.origin));
169    row.push(format!("{}", as_path_str));
170    row.push(format!("{}", communities_str));
171    row.push(format!("{}", update.synthetic));
172
173    row.join(",")
174}