use crate::lens::time::TimeLens;
use anyhow::anyhow;
use anyhow::Result;
use bgpkit_parser::BgpElem;
use bgpkit_parser::BgpkitParser;
use ipnet::IpNet;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::io::Read;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Instant;
#[cfg(feature = "cli")]
use clap::{Args, ValueEnum};
/// Number of parsed elements between two consecutive [`ParseProgress::Update`] events.
pub const PARSE_PROGRESS_INTERVAL: u64 = 10_000;
/// Progress events reported by `ParseLens::parse_with_progress` and
/// `ParseLens::parse_with_handler` through a [`ParseProgressCallback`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ParseProgress {
    /// Sent once, after the parser for `file_path` has been created and before any
    /// element is read.
    Started {
        /// Path of the file being parsed, as given by the caller.
        file_path: String,
    },
    /// Sent every [`PARSE_PROGRESS_INTERVAL`] elements.
    Update {
        /// Number of elements processed so far.
        messages_processed: u64,
        /// Elements per second since parsing started; `None` when no time has elapsed.
        /// Omitted from serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        rate: Option<f64>,
        /// Seconds elapsed since parsing started.
        elapsed_secs: f64,
    },
    /// Sent once, after the last element has been processed.
    Completed {
        /// Total number of elements processed.
        total_messages: u64,
        /// Total parsing time in seconds.
        duration_secs: f64,
        /// Overall elements per second; `None` when no time has elapsed.
        /// Omitted from serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        rate: Option<f64>,
    },
}
/// Shareable (`Arc`), thread-safe (`Send + Sync`) callback that receives [`ParseProgress`]
/// events.
pub type ParseProgressCallback = Arc<dyn Fn(ParseProgress) + Send + Sync>;
// Kind of BGP element to keep when filtering; rendered via `Display` as
// "announcement" / "withdrawal" and passed to the parser's `type` filter.
// (Plain `//` comments are used because clap's `ValueEnum` derive turns `///` on
// variants into possible-value help text.)
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "cli", derive(ValueEnum))]
pub enum ParseElemType {
    // Announcement.
    A,
    // Withdrawal.
    W,
}
impl Display for ParseElemType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
ParseElemType::A => "announcement",
ParseElemType::W => "withdrawal",
})
}
}
// Filters applied by `ParseFilters::to_parser` when building a `BgpkitParser`.
//
// The multi-valued string fields (ASNs, prefixes, communities) accept values
// prefixed with `!` to negate the filter; `validate` rejects mixing negated and
// plain values within one field.
// (Plain `//` comments here: with clap's derive, a `///` comment on this struct
// could be picked up as command help text. Field `///` docs below are the CLI help.)
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "cli", derive(Args))]
pub struct ParseFilters {
    /// Filter by origin AS number(s), comma-separated; prefix with ! to negate
    #[cfg_attr(feature = "cli", clap(short = 'o', long, value_delimiter = ','))]
    #[serde(default)]
    pub origin_asn: Vec<String>,

    /// Filter by prefix(es) in CIDR notation, comma-separated; prefix with ! to negate
    #[cfg_attr(feature = "cli", clap(short = 'p', long, value_delimiter = ','))]
    #[serde(default)]
    pub prefix: Vec<String>,

    /// Also match super-prefixes of the given prefixes
    #[cfg_attr(feature = "cli", clap(short = 's', long))]
    #[serde(default)]
    pub include_super: bool,

    /// Also match sub-prefixes of the given prefixes
    #[cfg_attr(feature = "cli", clap(short = 'S', long))]
    #[serde(default)]
    pub include_sub: bool,

    /// Filter by peer IP address(es), comma-separated or repeated
    // `value_delimiter` added for consistency with the other list-valued filters;
    // repeating the flag keeps working.
    #[cfg_attr(feature = "cli", clap(short = 'j', long, value_delimiter = ','))]
    #[serde(default)]
    pub peer_ip: Vec<IpAddr>,

    /// Filter by peer AS number(s), comma-separated; prefix with ! to negate
    #[cfg_attr(feature = "cli", clap(short = 'J', long, value_delimiter = ','))]
    #[serde(default)]
    pub peer_asn: Vec<String>,

    /// Filter by community (A:B or A:B:C, * as wildcard), comma-separated; prefix with ! to negate
    #[cfg_attr(
        feature = "cli",
        clap(
            short = 'C',
            long = "community",
            visible_alias = "communities",
            value_delimiter = ','
        )
    )]
    #[serde(default)]
    pub communities: Vec<String>,

    /// Filter by element type (a = announcement, w = withdrawal)
    #[cfg_attr(feature = "cli", clap(short = 'm', long, value_enum))]
    pub elem_type: Option<ParseElemType>,

    /// Filter by start time
    #[cfg_attr(feature = "cli", clap(short = 't', long, visible_alias = "ts-start"))]
    pub start_ts: Option<String>,

    /// Filter by end time
    #[cfg_attr(feature = "cli", clap(short = 'T', long, visible_alias = "ts-end"))]
    pub end_ts: Option<String>,

    /// Time window length (e.g. 1h), used together with either the start or the end time
    #[cfg_attr(feature = "cli", clap(short = 'd', long))]
    pub duration: Option<String>,

    /// Filter by AS path regular expression
    #[cfg_attr(feature = "cli", clap(short = 'a', long))]
    pub as_path: Option<String>,
}
impl ParseFilters {
    /// Resolve `start_ts`, `end_ts` and `duration` into a closed `(start, end)` window of
    /// Unix timestamps (seconds).
    ///
    /// Exactly two of the three fields must be set:
    /// - `start_ts` + `end_ts`: both parsed as given;
    /// - `start_ts` + `duration`: `end = start + duration`;
    /// - `end_ts` + `duration`: `start = end - duration`.
    ///
    /// # Errors
    ///
    /// Returns an error if `start_ts`/`end_ts` is not a valid time string, if `duration`
    /// is not a valid `humantime` duration, or if the fields are not set in one of the
    /// combinations above.
    pub fn parse_start_end_strings(&self) -> Result<(i64, i64)> {
        match self.resolve_time_bounds(false)? {
            (Some(start), Some(end)) => Ok((start, end)),
            // `resolve_time_bounds(false)` only succeeds with both bounds present.
            _ => Err(anyhow!("unexpected time-string parsing result")),
        }
    }

    /// Shared time-window resolution used by [`Self::parse_start_end_strings`],
    /// [`Self::validate`] and [`Self::to_parser`].
    ///
    /// With `allow_open_ended`, a lone `start_ts` or `end_ts` (or no time field at all) is
    /// accepted as long as `duration` is unset; the missing bound is returned as `None`.
    /// Every other combination follows the rules of `parse_start_end_strings`, so an
    /// unparsable duration, a duration without an anchor, or all three fields together
    /// are always errors rather than being silently ignored.
    fn resolve_time_bounds(&self, allow_open_ended: bool) -> Result<(Option<i64>, Option<i64>)> {
        let time_lens = TimeLens::new();
        // Time strings are parsed (and rejected) before the field combination is checked.
        let start = match &self.start_ts {
            Some(ts) => Some(
                time_lens
                    .parse_time_string(ts.as_str())
                    .map_err(|_| anyhow!("start-ts is not a valid time string: {}", ts))?,
            ),
            None => None,
        };
        let end = match &self.end_ts {
            Some(ts) => Some(
                time_lens
                    .parse_time_string(ts.as_str())
                    .map_err(|_| anyhow!("end-ts is not a valid time string: {}", ts))?,
            ),
            None => None,
        };

        match (start, end, &self.duration) {
            (Some(_), Some(_), Some(_)) => Err(anyhow!(
                "cannot specify start_ts, end_ts, and duration all at the same time"
            )),
            (Some(start), Some(end), None) => {
                Ok((Some(start.timestamp()), Some(end.timestamp())))
            }
            (Some(start), None, Some(duration)) => {
                let duration = Self::parse_duration_string(duration)?;
                Ok((Some(start.timestamp()), Some((start + duration).timestamp())))
            }
            (None, Some(end), Some(duration)) => {
                let duration = Self::parse_duration_string(duration)?;
                Ok((Some((end - duration).timestamp()), Some(end.timestamp())))
            }
            (start, end, None) if allow_open_ended => {
                Ok((start.map(|t| t.timestamp()), end.map(|t| t.timestamp())))
            }
            _ => Err(anyhow!("must specify two from: start_ts, end_ts and duration")),
        }
    }

    /// Parse a `humantime` duration string (e.g. `"1h"`).
    fn parse_duration_string(duration: &str) -> Result<std::time::Duration> {
        humantime::parse_duration(duration)
            .map_err(|_| anyhow!("duration is not a valid time duration string: {}", duration))
    }

    /// Check every filter value without opening a file.
    ///
    /// # Errors
    ///
    /// Returns the first problem found: an invalid time string, an invalid duration, an
    /// unsupported `start_ts`/`end_ts`/`duration` combination (same rules as
    /// [`Self::to_parser`]), a malformed ASN, prefix or community, or a field that mixes
    /// `!`-negated and plain values.
    pub fn validate(&self) -> Result<()> {
        self.resolve_time_bounds(true)?;
        for asn in &self.origin_asn {
            Self::validate_asn(asn)?;
        }
        for asn in &self.peer_asn {
            Self::validate_asn(asn)?;
        }
        for prefix in &self.prefix {
            Self::validate_prefix(prefix)?;
        }
        for community in &self.communities {
            Self::validate_community(community)?;
        }
        Self::check_negation_consistency(&self.origin_asn, "origin-asn")?;
        Self::check_negation_consistency(&self.peer_asn, "peer-asn")?;
        Self::check_negation_consistency(&self.prefix, "prefix")?;
        Self::check_negation_consistency(&self.communities, "community")?;
        Ok(())
    }

    /// Accept a 32-bit unsigned ASN, optionally prefixed with `!`.
    fn validate_asn(value: &str) -> Result<()> {
        let asn_str = value.strip_prefix('!').unwrap_or(value);
        asn_str.parse::<u32>().map_err(|_| {
            anyhow!(
                "Invalid ASN '{}': must be a valid 32-bit unsigned integer",
                value
            )
        })?;
        Ok(())
    }

    /// Accept an IPv4/IPv6 prefix in CIDR notation, optionally prefixed with `!`.
    fn validate_prefix(value: &str) -> Result<()> {
        let prefix_str = value.strip_prefix('!').unwrap_or(value);
        prefix_str.parse::<IpNet>().map_err(|_| {
            anyhow!(
                "Invalid prefix '{}': must be valid CIDR notation (e.g., 1.1.1.0/24)",
                value
            )
        })?;
        Ok(())
    }

    /// Accept a community pattern `A:B` (16-bit parts) or `A:B:C` (32-bit parts), where
    /// any part may be the wildcard `*`; the whole pattern may be prefixed with `!`.
    fn validate_community(value: &str) -> Result<()> {
        let community_str = value.strip_prefix('!').unwrap_or(value);
        let parts: Vec<&str> = community_str.split(':').collect();
        match parts.len() {
            2 => {
                let first_valid = parts[0] == "*" || parts[0].parse::<u16>().is_ok();
                let second_valid = parts[1] == "*" || parts[1].parse::<u16>().is_ok();
                if !first_valid || !second_valid {
                    return Err(anyhow!(
                        "Invalid community '{}': A:B parts must each be 0-65535 or '*'",
                        value
                    ));
                }
            }
            3 => {
                let all_valid = parts.iter().all(|p| *p == "*" || p.parse::<u32>().is_ok());
                if !all_valid {
                    return Err(anyhow!(
                        "Invalid community '{}': A:B:C parts must each be 0-4294967295 or '*'",
                        value
                    ));
                }
            }
            _ => {
                return Err(anyhow!(
                    "Invalid community '{}': must be A:B or A:B:C (e.g., 13335:100, *:100, 57866:104:31)",
                    value
                ));
            }
        }
        Ok(())
    }

    /// Convert one validated community pattern (without anchors) into a regex body,
    /// mapping `*` to `\d+`.
    ///
    /// Numeric parts are re-rendered from their parsed value: integer parsing accepts a
    /// leading `+` and leading zeros, and copying such text verbatim would either inject
    /// regex syntax (`+100` -> `(?:+100…)`, an invalid regex) or never match the canonical
    /// form (`0100` vs `100`).
    fn community_pattern_to_regex_body(pattern: &str) -> Result<String> {
        Self::validate_community(pattern)?;
        let value = pattern.strip_prefix('!').unwrap_or(pattern);
        let regex_parts = value
            .split(':')
            .map(|part| -> Result<String> {
                if part == "*" {
                    Ok("\\d+".to_string())
                } else {
                    Ok(part.parse::<u32>()?.to_string())
                }
            })
            .collect::<Result<Vec<String>>>()?;
        Ok(regex_parts.join(":"))
    }

    /// Build the value for the parser's `community` filter: an anchored alternation of
    /// all patterns, prefixed with `!` when the patterns are negated.
    ///
    /// Returns `Ok(None)` when no community filter is configured.
    fn build_community_filter_value(&self) -> Result<Option<String>> {
        if self.communities.is_empty() {
            return Ok(None);
        }
        Self::check_negation_consistency(&self.communities, "community")?;
        // After the consistency check, the first value's `!` decides for the whole list.
        let is_negated = self
            .communities
            .first()
            .is_some_and(|v| v.starts_with('!'));
        let pattern_bodies = self
            .communities
            .iter()
            .map(String::as_str)
            .map(Self::community_pattern_to_regex_body)
            .collect::<Result<Vec<String>>>()?;
        let regex = format!("^(?:{})$", pattern_bodies.join("|"));
        Ok(Some(if is_negated {
            format!("!{}", regex)
        } else {
            regex
        }))
    }

    /// Reject a list that mixes `!`-negated and plain values.
    fn check_negation_consistency(values: &[String], field_name: &str) -> Result<()> {
        if values.len() > 1 {
            let negated_count = values.iter().filter(|v| v.starts_with('!')).count();
            if negated_count > 0 && negated_count < values.len() {
                return Err(anyhow!(
                    "Invalid {}: cannot mix positive and negative values (all must be prefixed with ! or none)",
                    field_name
                ));
            }
        }
        Ok(())
    }

    /// Open `file_path` with `BgpkitParser` and attach every configured filter.
    ///
    /// Time handling: a full window (two of `start_ts`/`end_ts`/`duration`) sets both
    /// bounds; a lone `start_ts` or `end_ts` sets just that bound.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened, if any filter is rejected by the
    /// parser, or if the time fields are invalid or inconsistent. Previously an invalid
    /// or unanchored `duration` (and `duration` combined with both bounds) was silently
    /// ignored here.
    pub fn to_parser(&self, file_path: &str) -> Result<BgpkitParser<Box<dyn Read + Send>>> {
        let mut parser = BgpkitParser::new(file_path)?.disable_warnings();
        if let Some(as_path) = &self.as_path {
            parser = parser.add_filter("as_path", as_path)?;
        }
        if !self.origin_asn.is_empty() {
            parser = parser.add_filter("origin_asns", &self.origin_asn.join(","))?;
        }
        if !self.prefix.is_empty() {
            let filter_key = match (self.include_super, self.include_sub) {
                (false, false) => "prefixes",
                (true, false) => "prefixes_super",
                (false, true) => "prefixes_sub",
                (true, true) => "prefixes_super_sub",
            };
            parser = parser.add_filter(filter_key, &self.prefix.join(","))?;
        }
        if !self.peer_ip.is_empty() {
            parser = parser.add_filter("peer_ips", &self.peer_ip.iter().join(","))?;
        }
        if !self.peer_asn.is_empty() {
            parser = parser.add_filter("peer_asns", &self.peer_asn.join(","))?;
        }
        if let Some(value) = self.build_community_filter_value()? {
            parser = parser.add_filter("community", &value)?;
        }
        if let Some(elem_type) = &self.elem_type {
            parser = parser.add_filter("type", &elem_type.to_string())?;
        }
        let (start_ts, end_ts) = self.resolve_time_bounds(true)?;
        if let Some(ts) = start_ts {
            parser = parser.add_filter("start_ts", &ts.to_string())?;
        }
        if let Some(ts) = end_ts {
            parser = parser.add_filter("end_ts", &ts.to_string())?;
        }
        Ok(parser)
    }
}
/// Stateless entry point for parsing BGP data files with [`ParseFilters`].
#[derive(Debug, Clone, Copy)]
pub struct ParseLens;
impl ParseLens {
    /// Create a new parse lens; it holds no state.
    pub fn new() -> Self {
        Self
    }

    /// Build a filtered `BgpkitParser` for `file_path`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`ParseFilters::to_parser`].
    pub fn create_parser(
        &self,
        filters: &ParseFilters,
        file_path: &str,
    ) -> Result<BgpkitParser<Box<dyn Read + Send>>> {
        filters.to_parser(file_path)
    }

    /// Validate `filters` without opening a file; see [`ParseFilters::validate`].
    pub fn validate_filters(&self, filters: &ParseFilters) -> Result<()> {
        filters.validate()
    }

    /// Parse `file_path` and collect every element that passes `filters`.
    ///
    /// Progress events are reported exactly as in [`Self::parse_with_handler`].
    ///
    /// # Errors
    ///
    /// Returns an error if the parser cannot be created.
    pub fn parse_with_progress(
        &self,
        filters: &ParseFilters,
        file_path: &str,
        callback: Option<ParseProgressCallback>,
    ) -> Result<Vec<BgpElem>> {
        let mut elements = Vec::new();
        self.parse_with_handler(filters, file_path, callback, |elem| elements.push(elem))?;
        Ok(elements)
    }

    /// Parse `file_path`, passing each element that passes `filters` to `element_handler`.
    ///
    /// When `progress_callback` is set it receives [`ParseProgress::Started`] once the
    /// parser has been created, [`ParseProgress::Update`] every
    /// [`PARSE_PROGRESS_INTERVAL`] elements, and [`ParseProgress::Completed`] after the
    /// last element. Rates are elements per second, `None` when no time has elapsed.
    /// (The counter is named `messages_processed` but counts yielded elements.)
    ///
    /// Returns the number of elements handled.
    ///
    /// # Errors
    ///
    /// Only parser creation can fail; iteration itself does not return errors.
    pub fn parse_with_handler<F>(
        &self,
        filters: &ParseFilters,
        file_path: &str,
        progress_callback: Option<ParseProgressCallback>,
        mut element_handler: F,
    ) -> Result<u64>
    where
        F: FnMut(BgpElem),
    {
        let parser = self.create_parser(filters, file_path)?;
        if let Some(cb) = &progress_callback {
            cb(ParseProgress::Started {
                file_path: file_path.to_string(),
            });
        }
        let start_time = Instant::now();
        let mut messages_processed: u64 = 0;
        for elem in parser {
            element_handler(elem);
            messages_processed += 1;
            if messages_processed.is_multiple_of(PARSE_PROGRESS_INTERVAL) {
                if let Some(cb) = &progress_callback {
                    let elapsed_secs = start_time.elapsed().as_secs_f64();
                    cb(ParseProgress::Update {
                        messages_processed,
                        rate: Self::per_second(messages_processed, elapsed_secs),
                        elapsed_secs,
                    });
                }
            }
        }
        if let Some(cb) = &progress_callback {
            let duration_secs = start_time.elapsed().as_secs_f64();
            cb(ParseProgress::Completed {
                total_messages: messages_processed,
                duration_secs,
                rate: Self::per_second(messages_processed, duration_secs),
            });
        }
        Ok(messages_processed)
    }

    /// Throughput in elements per second, or `None` when no time has elapsed
    /// (avoids reporting an infinite/NaN rate).
    fn per_second(count: u64, elapsed_secs: f64) -> Option<f64> {
        if elapsed_secs > 0.0 {
            Some(count as f64 / elapsed_secs)
        } else {
            None
        }
    }
}
impl Default for ParseLens {
fn default() -> Self {
Self::new()
}
}
// Unit tests for progress serialization and the pure validation / regex-building
// helpers of `ParseFilters` (no MRT files are opened here).
#[cfg(test)]
mod tests {
    use super::*;

    // Each `ParseProgress` variant serializes to JSON containing its payload values
    // and field names.
    #[test]
    fn test_parse_progress_serialization() {
        let progress = ParseProgress::Started {
            file_path: "test.mrt".to_string(),
        };
        let json = serde_json::to_string(&progress).expect("Failed to serialize");
        assert!(json.contains("test.mrt"));
        let progress = ParseProgress::Update {
            messages_processed: 10000,
            rate: Some(5000.0),
            elapsed_secs: 2.0,
        };
        let json = serde_json::to_string(&progress).expect("Failed to serialize");
        assert!(json.contains("10000"));
        assert!(json.contains("messages_processed"));
        let progress = ParseProgress::Completed {
            total_messages: 50000,
            duration_secs: 10.0,
            rate: Some(5000.0),
        };
        let json = serde_json::to_string(&progress).expect("Failed to serialize");
        assert!(json.contains("50000"));
        assert!(json.contains("duration_secs"));
    }

    // Pins the progress-reporting interval so a change is deliberate.
    #[test]
    fn test_parse_progress_interval() {
        assert_eq!(PARSE_PROGRESS_INTERVAL, 10_000);
    }

    // ASNs: plain or `!`-negated values in the full u32 range are accepted.
    #[test]
    fn test_validate_asn_valid() {
        assert!(ParseFilters::validate_asn("13335").is_ok());
        assert!(ParseFilters::validate_asn("!13335").is_ok());
        assert!(ParseFilters::validate_asn("0").is_ok());
        // Upper bound: u32::MAX.
        assert!(ParseFilters::validate_asn("4294967295").is_ok());
    }

    // ASNs: non-numeric, negative and out-of-range (u32::MAX + 1) values are rejected.
    #[test]
    fn test_validate_asn_invalid() {
        assert!(ParseFilters::validate_asn("invalid").is_err());
        assert!(ParseFilters::validate_asn("!invalid").is_err());
        assert!(ParseFilters::validate_asn("-1").is_err());
        assert!(ParseFilters::validate_asn("4294967296").is_err());
    }

    // Prefixes: IPv4 and IPv6 CIDR, optionally `!`-negated.
    #[test]
    fn test_validate_prefix_valid() {
        assert!(ParseFilters::validate_prefix("1.1.1.0/24").is_ok());
        assert!(ParseFilters::validate_prefix("!1.1.1.0/24").is_ok());
        assert!(ParseFilters::validate_prefix("2001:db8::/32").is_ok());
        assert!(ParseFilters::validate_prefix("!2001:db8::/32").is_ok());
    }

    // Prefixes: garbage, a bare address without a length, and an IPv4 length > 32 are
    // rejected.
    #[test]
    fn test_validate_prefix_invalid() {
        assert!(ParseFilters::validate_prefix("invalid").is_err());
        assert!(ParseFilters::validate_prefix("1.1.1.1").is_err());
        assert!(ParseFilters::validate_prefix("1.1.1.0/33").is_err());
    }

    // Communities: A:B with 16-bit parts, A:B:C with 32-bit parts, `*` wildcards and
    // `!` negation.
    #[test]
    fn test_validate_community_valid() {
        assert!(ParseFilters::validate_community("13335:100").is_ok());
        assert!(ParseFilters::validate_community("!13335:100").is_ok());
        assert!(ParseFilters::validate_community("0:0").is_ok());
        assert!(ParseFilters::validate_community("65535:65535").is_ok());
        assert!(ParseFilters::validate_community("*:100").is_ok());
        assert!(ParseFilters::validate_community("13335:*").is_ok());
        assert!(ParseFilters::validate_community("*:*").is_ok());
        assert!(ParseFilters::validate_community("!*:100").is_ok());
        assert!(ParseFilters::validate_community("57866:104:31").is_ok());
        assert!(ParseFilters::validate_community("!57866:104:31").is_ok());
        assert!(ParseFilters::validate_community("*:104:31").is_ok());
        assert!(ParseFilters::validate_community("57866:*:*").is_ok());
        assert!(ParseFilters::validate_community("4294967295:0:1").is_ok());
    }

    // Communities: wrong arity, empty parts, out-of-range parts and partial wildcards
    // (`13*`, `*6`) are rejected.
    #[test]
    fn test_validate_community_invalid() {
        assert!(ParseFilters::validate_community("13335").is_err());
        assert!(ParseFilters::validate_community("13335:").is_err());
        assert!(ParseFilters::validate_community(":100").is_err());
        assert!(ParseFilters::validate_community("65536:1").is_err());
        assert!(ParseFilters::validate_community("1:65536").is_err());
        assert!(ParseFilters::validate_community("abc:100").is_err());
        assert!(ParseFilters::validate_community("13*:100").is_err());
        assert!(ParseFilters::validate_community("*6:100").is_err());
        assert!(ParseFilters::validate_community("1:2:3:4").is_err());
        assert!(ParseFilters::validate_community("4294967296:1:1").is_err());
        assert!(ParseFilters::validate_community("1::3").is_err());
    }

    // `*` becomes `\d+`; numeric parts are kept and joined with `:`.
    #[test]
    fn test_community_pattern_to_regex_body() {
        assert_eq!(
            ParseFilters::community_pattern_to_regex_body("1299:*").unwrap(),
            "1299:\\d+"
        );
        assert_eq!(
            ParseFilters::community_pattern_to_regex_body("*:100").unwrap(),
            "\\d+:100"
        );
        assert_eq!(
            ParseFilters::community_pattern_to_regex_body("57866:104:31").unwrap(),
            "57866:104:31"
        );
        assert_eq!(
            ParseFilters::community_pattern_to_regex_body("*:*:*").unwrap(),
            "\\d+:\\d+:\\d+"
        );
    }

    // Multiple patterns become one anchored alternation; a negated list gets a single
    // leading `!` in front of the whole regex.
    #[test]
    fn test_build_community_filter_value() {
        let filters = ParseFilters {
            communities: vec!["1299:*".to_string(), "*:100".to_string()],
            ..Default::default()
        };
        assert_eq!(
            filters.build_community_filter_value().unwrap(),
            Some("^(?:1299:\\d+|\\d+:100)$".to_string())
        );
        let filters = ParseFilters {
            communities: vec!["!1299:*".to_string(), "!*:100".to_string()],
            ..Default::default()
        };
        assert_eq!(
            filters.build_community_filter_value().unwrap(),
            Some("!^(?:1299:\\d+|\\d+:100)$".to_string())
        );
    }

    // All-plain, all-negated, single-value and empty lists are consistent.
    #[test]
    fn test_negation_consistency_valid() {
        let values = vec!["13335".to_string(), "15169".to_string()];
        assert!(ParseFilters::check_negation_consistency(&values, "test").is_ok());
        let values = vec!["!13335".to_string(), "!15169".to_string()];
        assert!(ParseFilters::check_negation_consistency(&values, "test").is_ok());
        let values = vec!["13335".to_string()];
        assert!(ParseFilters::check_negation_consistency(&values, "test").is_ok());
        let values = vec!["!13335".to_string()];
        assert!(ParseFilters::check_negation_consistency(&values, "test").is_ok());
        let values: Vec<String> = vec![];
        assert!(ParseFilters::check_negation_consistency(&values, "test").is_ok());
    }

    // Mixing plain and negated values is rejected regardless of order.
    #[test]
    fn test_negation_consistency_invalid() {
        let values = vec!["13335".to_string(), "!15169".to_string()];
        assert!(ParseFilters::check_negation_consistency(&values, "test").is_err());
        let values = vec!["!13335".to_string(), "15169".to_string()];
        assert!(ParseFilters::check_negation_consistency(&values, "test").is_err());
    }

    // End-to-end `validate`: one fully valid filter set, then one failure per field kind
    // and per mixed-negation case.
    #[test]
    fn test_parse_filters_validate() {
        let filters = ParseFilters {
            origin_asn: vec!["13335".to_string(), "15169".to_string()],
            prefix: vec!["1.1.1.0/24".to_string()],
            peer_asn: vec!["!174".to_string()],
            communities: vec!["*:100".to_string()],
            ..Default::default()
        };
        assert!(filters.validate().is_ok());
        let filters = ParseFilters {
            origin_asn: vec!["invalid".to_string()],
            ..Default::default()
        };
        assert!(filters.validate().is_err());
        let filters = ParseFilters {
            prefix: vec!["not-a-prefix".to_string()],
            ..Default::default()
        };
        assert!(filters.validate().is_err());
        let filters = ParseFilters {
            communities: vec!["not-a-community".to_string()],
            ..Default::default()
        };
        assert!(filters.validate().is_err());
        let filters = ParseFilters {
            origin_asn: vec!["13335".to_string(), "!15169".to_string()],
            ..Default::default()
        };
        assert!(filters.validate().is_err());
        let filters = ParseFilters {
            communities: vec!["13335:100".to_string(), "!15169:100".to_string()],
            ..Default::default()
        };
        assert!(filters.validate().is_err());
    }
}