use crate::{BgpkitBroker, BrokerError, BrokerItem};
use chrono::{DateTime, Timelike, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Display;
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SnapshotFiles {
pub collector_id: String,
pub rib_url: String,
pub updates_urls: Vec<String>,
}
impl Display for SnapshotFiles {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"SnapshotFiles {{ collector_id: {}, rib_url: {}, updates_count: {} }}",
self.collector_id,
self.rib_url,
self.updates_urls.len()
)
}
}
impl BgpkitBroker {
pub fn daily_ribs(&self) -> Result<Vec<BrokerItem>, BrokerError> {
let rib_broker = self.clone().data_type("rib");
let all_ribs = rib_broker.query()?;
let daily_ribs = all_ribs
.into_iter()
.filter(|item| {
item.ts_start.hour() == 0 && item.ts_start.minute() == 0
})
.collect();
Ok(daily_ribs)
}
pub fn recent_updates(&self, hours: u32) -> Result<Vec<BrokerItem>, BrokerError> {
let now = Utc::now();
let hours_ago = now - chrono::Duration::hours(hours as i64);
let updates_broker = self
.clone()
.data_type("updates")
.ts_start(hours_ago.format("%Y-%m-%dT%H:%M:%SZ").to_string())
.ts_end(now.format("%Y-%m-%dT%H:%M:%SZ").to_string());
updates_broker.query()
}
pub fn most_diverse_collectors(
&self,
n: usize,
project: Option<&str>,
) -> Result<Vec<String>, BrokerError> {
let mut full_feed_broker = self.clone().peers_only_full_feed(true);
if let Some(proj) = project {
full_feed_broker = full_feed_broker.project(proj);
}
let peers = full_feed_broker.get_peers()?;
let mut collector_asn_sets: HashMap<String, std::collections::HashSet<u32>> =
HashMap::new();
for peer in peers {
collector_asn_sets
.entry(peer.collector.clone())
.or_default()
.insert(peer.asn);
}
if collector_asn_sets.is_empty() || n == 0 {
return Ok(Vec::new());
}
let mut selected_collectors = Vec::new();
let mut covered_asns = std::collections::HashSet::new();
let mut remaining_collectors = collector_asn_sets;
for _ in 0..n {
if remaining_collectors.is_empty() {
break;
}
let best_collector = remaining_collectors
.iter()
.max_by_key(|(_, asns)| asns.difference(&covered_asns).count())
.map(|(collector, _)| collector.clone());
if let Some(collector) = best_collector {
if let Some(asns) = remaining_collectors.remove(&collector) {
covered_asns.extend(&asns);
selected_collectors.push(collector);
}
} else {
break;
}
}
Ok(selected_collectors)
}
pub fn get_snapshot_files<S: AsRef<str>, T: Display>(
&self,
collector_ids: &[S],
timestamp: T,
) -> Result<Vec<SnapshotFiles>, BrokerError> {
let target_ts = Self::parse_timestamp(×tamp.to_string())?;
let search_start = target_ts - chrono::Duration::hours(24);
let mut results = Vec::new();
for collector_id in collector_ids {
let collector_id_str = collector_id.as_ref();
let rib_items = self
.clone()
.collector_id(collector_id_str)
.data_type("rib")
.ts_start(search_start.format("%Y-%m-%dT%H:%M:%SZ").to_string())
.ts_end(target_ts.format("%Y-%m-%dT%H:%M:%SZ").to_string())
.query()?;
let closest_rib = rib_items
.into_iter()
.filter(|item| {
let item_ts = DateTime::<Utc>::from_naive_utc_and_offset(item.ts_start, Utc);
item_ts <= target_ts
})
.max_by_key(|item| item.ts_start);
let Some(rib_item) = closest_rib else {
continue;
};
let rib_ts = DateTime::<Utc>::from_naive_utc_and_offset(rib_item.ts_start, Utc);
let updates_items = self
.clone()
.collector_id(collector_id_str)
.data_type("updates")
.ts_start(rib_ts.format("%Y-%m-%dT%H:%M:%SZ").to_string())
.ts_end(target_ts.format("%Y-%m-%dT%H:%M:%SZ").to_string())
.query()?;
let mut filtered_updates: Vec<BrokerItem> = updates_items
.into_iter()
.filter(|item| {
let item_start = DateTime::<Utc>::from_naive_utc_and_offset(item.ts_start, Utc);
let item_end = DateTime::<Utc>::from_naive_utc_and_offset(item.ts_end, Utc);
item_start >= rib_ts && item_end <= target_ts
})
.collect();
filtered_updates.sort_by_key(|item| item.ts_start);
let updates_urls: Vec<String> =
filtered_updates.into_iter().map(|item| item.url).collect();
results.push(SnapshotFiles {
collector_id: collector_id_str.to_string(),
rib_url: rib_item.url,
updates_urls,
});
}
Ok(results)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_daily_ribs() {
let broker = BgpkitBroker::new()
.ts_start("1634693400") .ts_end("1634693400");
let result = broker.daily_ribs();
assert!(result.is_ok());
let daily_ribs = result.unwrap();
for item in &daily_ribs {
assert!(item.is_rib());
assert_eq!(item.ts_start.hour(), 0);
assert_eq!(item.ts_start.minute(), 0);
}
}
#[test]
fn test_recent_updates() {
use chrono::{Duration, Utc};
let broker = BgpkitBroker::new();
let now = Utc::now();
let hours_ago = now - Duration::hours(24);
let updates_broker = broker
.clone()
.data_type("updates")
.ts_start(hours_ago.format("%Y-%m-%dT%H:%M:%SZ").to_string())
.ts_end(now.format("%Y-%m-%dT%H:%M:%SZ").to_string());
assert_eq!(
updates_broker.query_params.data_type,
Some("updates".to_string())
);
assert!(updates_broker.query_params.ts_start.is_some());
assert!(updates_broker.query_params.ts_end.is_some());
let validation_result = updates_broker.validate_configuration();
assert!(validation_result.is_ok());
}
#[test]
fn test_most_diverse_collectors() {
let broker = BgpkitBroker::new();
let result = broker.most_diverse_collectors(5, None);
assert!(result.is_ok());
let collectors = result.unwrap();
assert!(!collectors.is_empty());
assert!(collectors.len() <= 5);
let unique_collectors: std::collections::HashSet<_> = collectors.iter().collect();
assert_eq!(unique_collectors.len(), collectors.len());
}
#[test]
fn test_most_diverse_collectors_zero() {
let broker = BgpkitBroker::new();
let result = broker.most_diverse_collectors(0, None);
assert!(result.is_ok());
assert_eq!(result.unwrap().len(), 0);
}
#[test]
fn test_most_diverse_collectors_project_filter() {
let broker = BgpkitBroker::new();
let rv_result = broker.most_diverse_collectors(3, Some("routeviews"));
assert!(rv_result.is_ok());
let ripe_result = broker.most_diverse_collectors(3, Some("riperis"));
assert!(ripe_result.is_ok());
let rv_collectors = rv_result.unwrap();
let ripe_collectors = ripe_result.unwrap();
if !rv_collectors.is_empty() {
let unique_rv: std::collections::HashSet<_> = rv_collectors.iter().collect();
assert_eq!(unique_rv.len(), rv_collectors.len());
}
if !ripe_collectors.is_empty() {
let unique_ripe: std::collections::HashSet<_> = ripe_collectors.iter().collect();
assert_eq!(unique_ripe.len(), ripe_collectors.len());
}
}
#[test]
fn test_get_snapshot_files() {
let broker = BgpkitBroker::new();
let result = broker.get_snapshot_files(&["route-views2"], "2021-10-20T04:00:00Z");
assert!(result.is_ok());
let snapshots = result.unwrap();
if !snapshots.is_empty() {
let snapshot = &snapshots[0];
assert_eq!(snapshot.collector_id, "route-views2");
assert!(!snapshot.rib_url.is_empty());
assert!(snapshot.updates_urls.iter().all(|url| !url.is_empty()));
}
}
#[test]
fn test_get_snapshot_files_multiple_collectors() {
let broker = BgpkitBroker::new();
let result = broker.get_snapshot_files(&["route-views2", "rrc00"], "2021-10-20T04:00:00Z");
assert!(result.is_ok());
let snapshots = result.unwrap();
let collector_ids: std::collections::HashSet<_> =
snapshots.iter().map(|s| &s.collector_id).collect();
assert_eq!(collector_ids.len(), snapshots.len());
}
#[test]
fn test_get_snapshot_files_invalid_timestamp() {
let broker = BgpkitBroker::new();
let result = broker.get_snapshot_files(&["route-views2"], "invalid-timestamp");
assert!(result.is_err());
assert!(matches!(
result.err(),
Some(BrokerError::ConfigurationError(_))
));
}
#[test]
fn test_get_snapshot_files_empty_collectors() {
let broker = BgpkitBroker::new();
let empty: &[&str] = &[];
let result = broker.get_snapshot_files(empty, "2021-10-20T04:00:00Z");
assert!(result.is_ok());
assert!(result.unwrap().is_empty());
}
#[test]
fn test_snapshot_files_display() {
let snapshot = SnapshotFiles {
collector_id: "route-views2".to_string(),
rib_url: "http://example.com/rib.bz2".to_string(),
updates_urls: vec![
"http://example.com/updates1.bz2".to_string(),
"http://example.com/updates2.bz2".to_string(),
],
};
let display = format!("{}", snapshot);
assert!(display.contains("route-views2"));
assert!(display.contains("http://example.com/rib.bz2"));
assert!(display.contains("updates_count: 2"));
}
}