use std::fmt::Display;
use bgpkit_broker::{load_collectors, BrokerError};
use bgpkit_parser::Filter;
use chrono::{DateTime, NaiveDate, TimeZone, Utc};
/// Selects which kind of data a [`BgpStreamConfig`] requests.
///
/// The enum is a plain `Copy` tag; this file only stores it in
/// `BgpStreamConfig::data_type` and never branches on it.
// NOTE(review): the variant meanings below are inferred from their names
// (BGP update files vs. RIB dumps) — confirm against the code that consumes
// `BgpStreamConfig::data_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataType {
/// Presumably: request BGP update data only.
Update,
/// Presumably: request RIB (routing table dump) data only.
Rib,
/// Presumably: request both update and RIB data.
Both,
}
/// Description of a BGP data stream: a time window, a set of collectors, the
/// requested data type and optional parser filters.
///
/// Prefer building values through [`BgpStreamConfig::new`], which normalizes
/// the timestamps and validates the collector names. All fields are public,
/// so values assembled by hand bypass that normalization.
#[derive(Debug, Clone)]
pub struct BgpStreamConfig {
/// Start of the time window. `new` stores it as a UTC string formatted with
/// `%Y-%m-%dT%H:%M:%SZ` (e.g. `2010-09-01T00:00:00Z`).
pub ts_start: String,
/// End of the time window, stored by `new` in the same format as `ts_start`.
pub ts_end: String,
/// Collector ids. `new` keeps only the names accepted by
/// `validate_collectors`, in the order they were supplied.
pub collectors: Vec<String>,
/// Kind of data requested; stored as given.
pub data_type: DataType,
/// Optional `bgpkit_parser` filters. `new` sets this to `None`;
/// `with_filters` and `add_filter` populate it.
pub filters: Option<Vec<Filter>>,
}
/// Parses a user-supplied timestamp string into a UTC [`DateTime`].
///
/// Leading and trailing whitespace is trimmed, then the interpretations below
/// are tried in order and the first one that succeeds is returned:
///
/// 1. RFC3339 with an explicit offset or `Z`; the result is converted to UTC.
/// 2. Naive date-times `%Y-%m-%dT%H:%M:%SZ`, `%Y-%m-%dT%H:%M:%S` and
///    `%Y-%m-%d %H:%M:%S`, taken to be UTC.
/// 3. Bare dates `%Y-%m-%d`, `%Y/%m/%d`, `%Y.%m.%d` and `%Y%m%d`, mapped to
///    midnight UTC. The compact `%Y%m%d` form is only accepted when the input
///    is exactly 8 characters long.
/// 4. A string of 9 to 13 ASCII digits, read as Unix **seconds**. Note that
///    11–13 digit values are still seconds, not milliseconds.
///
/// # Errors
///
/// Returns `BrokerError::ConfigurationError` with a message listing the
/// supported formats when no interpretation matches.
pub fn parse_timestamp(timestamp: &str) -> Result<DateTime<Utc>, BrokerError> {
    // Date-time layouts without offset information, tried in this order.
    const NAIVE_DATETIME_FORMATS: [&str; 3] = [
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
    ];
    // Date-only layouts, tried in this order.
    const DATE_ONLY_FORMATS: [&str; 4] = ["%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d", "%Y%m%d"];
    // The separator-less layout needs an extra length guard (see below).
    const COMPACT_DATE_FORMAT: &str = "%Y%m%d";

    let ts_str = timestamp.trim();

    // 1. Offset-aware RFC3339 keeps the caller's timezone information.
    if let Ok(with_offset) = DateTime::parse_from_rfc3339(ts_str) {
        return Ok(with_offset.with_timezone(&Utc));
    }

    // 2. Offset-less date-times are assumed to already be in UTC.
    let naive_datetime = NAIVE_DATETIME_FORMATS
        .iter()
        .copied()
        .find_map(|layout| chrono::NaiveDateTime::parse_from_str(ts_str, layout).ok());
    if let Some(naive) = naive_datetime {
        return Ok(Utc.from_utc_datetime(&naive));
    }

    // 3. Bare dates resolve to midnight UTC. The compact layout is skipped
    //    unless the input is exactly eight characters, so digit strings of
    //    other lengths fall through to the Unix-timestamp branch.
    let midnight = DATE_ONLY_FORMATS
        .iter()
        .copied()
        .filter(|layout| *layout != COMPACT_DATE_FORMAT || ts_str.len() == 8)
        .filter_map(|layout| NaiveDate::parse_from_str(ts_str, layout).ok())
        .find_map(|day| day.and_hms_opt(0, 0, 0));
    if let Some(naive) = midnight {
        return Ok(Utc.from_utc_datetime(&naive));
    }

    // 4. Digit-only strings of plausible length are Unix seconds.
    let looks_like_epoch =
        (9..=13).contains(&ts_str.len()) && ts_str.bytes().all(|b| b.is_ascii_digit());
    if looks_like_epoch {
        let from_epoch = ts_str
            .parse::<i64>()
            .ok()
            .and_then(|secs| Utc.timestamp_opt(secs, 0).single());
        if let Some(dt) = from_epoch {
            return Ok(dt);
        }
    }

    Err(BrokerError::ConfigurationError(format!(
        "Invalid timestamp format '{ts_str}'. Supported formats:\n\
        - Unix timestamp: '1640995200'\n\
        - RFC3339 with timezone: '2022-01-01T00:00:00+00:00', '2022-01-01T00:00:00Z', '2022-01-01T05:00:00-05:00'\n\
        - RFC3339 without timezone: '2022-01-01T00:00:00' (assumes UTC)\n\
        - Date with time: '2022-01-01 00:00:00'\n\
        - Pure date: '2022-01-01', '2022/01/01', '2022.01.01', '20220101'"
    )))
}
impl BgpStreamConfig {
    /// Builds a configuration from loosely formatted timestamps and collector
    /// names.
    ///
    /// Both timestamps are rendered to strings via `Display`, parsed with
    /// [`parse_timestamp`] and stored in the canonical UTC form
    /// `%Y-%m-%dT%H:%M:%SZ`. Collector names go through
    /// [`validate_collectors`]; the returned config has no filters.
    ///
    /// # Errors
    ///
    /// Propagates the `BrokerError` from parsing `ts_start`, then `ts_end`,
    /// then from collector validation — in that order.
    pub fn new<S: Display>(
        ts_start: S,
        ts_end: S,
        collectors: &[&str],
        data_type: DataType,
    ) -> Result<BgpStreamConfig, BrokerError> {
        // Canonical representation used for both stored bounds.
        const CANONICAL_FORMAT: &str = "%Y-%m-%dT%H:%M:%SZ";

        // Parse both bounds up front so timestamp errors surface before the
        // collector list is validated.
        let window_start = parse_timestamp(&ts_start.to_string())?;
        let window_end = parse_timestamp(&ts_end.to_string())?;

        Ok(Self {
            ts_start: window_start.format(CANONICAL_FORMAT).to_string(),
            ts_end: window_end.format(CANONICAL_FORMAT).to_string(),
            collectors: validate_collectors(collectors)?,
            data_type,
            filters: None,
        })
    }

    /// Returns the config with its filter list replaced by `filters`,
    /// discarding any filters set previously.
    pub fn with_filters(self, filters: Vec<Filter>) -> Self {
        Self {
            filters: Some(filters),
            ..self
        }
    }

    /// Returns the config with `filter` appended to its filter list, creating
    /// the list first when none has been set yet.
    pub fn add_filter(mut self, filter: Filter) -> Self {
        self.filters.get_or_insert_with(Vec::new).push(filter);
        self
    }
}
/// Filters `collectors` down to the names known to the broker.
///
/// The list of known collectors comes from `load_collectors()`; a requested
/// name is kept when it equals one of the returned collector ids. Accepted
/// names are returned in their original order. Each rejected name is reported
/// on stderr and otherwise ignored, so an unknown collector is not an error.
///
/// # Errors
///
/// Propagates the `BrokerError` returned by `load_collectors()`.
pub fn validate_collectors(collectors: &[&str]) -> Result<Vec<String>, BrokerError> {
    // Hash the known ids once so each lookup is O(1) rather than a linear
    // scan of the whole collector list per requested name.
    let known_ids: std::collections::HashSet<String> =
        load_collectors()?.into_iter().map(|c| c.id).collect();

    // Partition on the borrowed names; only accepted ones get allocated below.
    let (accepted, rejected): (Vec<&str>, Vec<&str>) = collectors
        .iter()
        .copied()
        .partition(|name| known_ids.contains(*name));

    for name in rejected {
        eprintln!("Dropped collector {} (not valid)", name);
    }

    Ok(accepted.into_iter().map(String::from).collect())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config from Unix-second timestamps and a collector list that
    /// contains one bogus entry, then checks that the bogus entry is dropped
    /// and both timestamps are normalized to the canonical UTC string form.
    #[test]
    fn basic_config() -> Result<(), BrokerError> {
        let requested = ["route-views.wide", "route-views.sydney", "cute cat"];
        let config = BgpStreamConfig::new("1283299200", "1283306400", &requested, DataType::Rib)?;

        // The test expects "cute cat" to be rejected by collector validation.
        let expected_collectors = vec![
            "route-views.wide".to_string(),
            "route-views.sydney".to_string(),
        ];
        assert_eq!(expected_collectors, config.collectors);

        // 1283299200 and 1283306400 are two hours apart on 2010-09-01 UTC.
        assert_eq!("2010-09-01T00:00:00Z", config.ts_start);
        assert_eq!("2010-09-01T02:00:00Z", config.ts_end);

        Ok(())
    }
}