use bgpkit_parser::bmp::messages::RouteMonitoring;
use bgpkit_parser::models::*;
use chrono::{DateTime, MappedLocalTime, TimeZone, Utc};
use core::net::IpAddr;
use tracing::error;
pub struct UpdateHeader {
pub timestamp: i64,
pub is_post_policy: bool,
pub is_adj_rib_out: bool,
}
#[derive(Debug, Clone, PartialEq)]
pub struct Update {
pub prefix: NetworkPrefix,
pub announced: bool,
pub origin: Origin,
pub path: Option<AsPath>,
pub communities: Vec<MetaCommunity>,
pub is_post_policy: bool,
pub is_adj_rib_out: bool,
pub timestamp: DateTime<Utc>,
pub synthetic: bool,
}
pub fn decode_updates(message: RouteMonitoring, header: UpdateHeader) -> Option<Vec<Update>> {
let mut updates = Vec::new();
match message.bgp_message {
bgpkit_parser::models::BgpMessage::Update(bgp_update) => {
let mut prefixes_to_update = Vec::new();
for prefix in bgp_update.announced_prefixes {
prefixes_to_update.push((prefix, true));
}
for prefix in bgp_update.withdrawn_prefixes {
prefixes_to_update.push((prefix, false));
}
let attributes = bgp_update.attributes;
if let Some(nlri) = attributes.get_reachable_nlri() {
for prefix in &nlri.prefixes {
prefixes_to_update.push((*prefix, true));
}
}
if let Some(nlri) = attributes.get_unreachable_nlri() {
for prefix in &nlri.prefixes {
prefixes_to_update.push((*prefix, false));
}
}
let origin = attributes.origin();
let path = match attributes.as_path() {
Some(path) => Some(path.clone()),
None => None,
};
let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();
let timestamp = match Utc.timestamp_millis_opt(header.timestamp) {
MappedLocalTime::Single(dt) => dt,
_ => {
error!(
"failed to parse timestamp: {}, using Utc::now()",
header.timestamp
);
Utc::now()
}
};
for (prefix, announced) in prefixes_to_update {
updates.push(Update {
prefix,
announced,
origin,
path: path.clone(),
communities: communities.clone(),
is_post_policy: header.is_post_policy,
is_adj_rib_out: header.is_adj_rib_out,
timestamp,
synthetic: false,
});
}
Some(updates)
}
_ => None,
}
}
pub fn construct_as_path(path: Option<AsPath>) -> Vec<u32> {
match path {
Some(mut path) => {
let mut contructed_path: Vec<u32> = Vec::new();
path.coalesce();
for segment in path.into_segments_iter() {
if let AsPathSegment::AsSequence(dedup_asns) = segment {
for asn in dedup_asns {
contructed_path.push(asn.to_u32());
}
}
}
contructed_path
}
None => Vec::new(),
}
}
pub fn construct_communities(communities: &[MetaCommunity]) -> Vec<(u32, u16)> {
let mut constructed_communities = Vec::new();
for community in communities {
match community {
MetaCommunity::Plain(community) => match community {
bgpkit_parser::models::Community::Custom(asn, value) => {
constructed_communities.push((asn.to_u32(), *value));
}
_ => (), },
_ => (), }
}
constructed_communities
}
fn map_to_ipv6(ip: IpAddr) -> IpAddr {
if ip.is_ipv4() {
format!("::ffff:{}", ip).parse().unwrap()
} else {
ip
}
}
pub fn format_update(
router_addr: IpAddr,
router_port: u16,
peer: &Peer,
update: &mut Update,
) -> String {
let as_path_str = construct_as_path(update.path.clone())
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>()
.join(",");
let as_path_str = format!("\"[{}]\"", as_path_str);
let communities_str = construct_communities(update.communities.as_ref())
.iter()
.map(|x| format!("({},{})", x.0, x.1))
.collect::<Vec<String>>()
.join(",");
let communities_str = format!("\"[{}]\"", communities_str);
let mut row: Vec<String> = Vec::new();
row.push(format!("{}", update.timestamp.timestamp_millis()));
row.push(format!("{}", map_to_ipv6(router_addr)));
row.push(format!("{}", router_port));
row.push(format!("{}", map_to_ipv6(peer.peer_address)));
row.push(format!("{}", peer.peer_bgp_id));
row.push(format!("{}", peer.peer_asn));
row.push(format!("{}", map_to_ipv6(update.prefix.prefix.addr())));
row.push(format!("{}", update.prefix.prefix.prefix_len()));
row.push(format!("{}", update.is_post_policy));
row.push(format!("{}", update.is_adj_rib_out));
row.push(format!("{}", update.announced));
row.push(format!("{}", update.origin));
row.push(format!("{}", as_path_str));
row.push(format!("{}", communities_str));
row.push(format!("{}", update.synthetic));
row.join(",")
}