#![doc(
html_logo_url = "https://raw.githubusercontent.com/bgpkit/assets/main/logos/icon-transparent.png",
html_favicon_url = "https://raw.githubusercontent.com/bgpkit/assets/main/logos/favicon.ico"
)]
#![allow(unknown_lints)]
mod collector;
#[cfg(feature = "cli")]
pub mod config;
#[cfg(feature = "cli")]
mod crawler;
#[cfg(feature = "backend")]
pub mod db;
mod error;
mod item;
mod peer;
mod query;
mod shortcuts;
#[cfg(feature = "sse")]
mod sse;
use crate::collector::DEFAULT_COLLECTORS_CONFIG;
use crate::peer::BrokerPeersResult;
use crate::query::{BrokerQueryResult, CollectorLatestResult};
use chrono::{DateTime, NaiveDate, TimeZone, Utc};
pub use collector::{load_collectors, Collector};
#[cfg(feature = "cli")]
pub use config::BrokerConfig;
#[cfg(feature = "cli")]
pub use crawler::crawl_collector;
#[cfg(feature = "backend")]
pub use db::{LocalBrokerDb, UpdatesMeta, DEFAULT_PAGE_SIZE};
pub use error::BrokerError;
pub use item::BrokerItem;
pub use peer::BrokerPeer;
pub use query::{QueryParams, SortOrder};
pub use shortcuts::SnapshotFiles;
#[cfg(feature = "sse")]
pub use sse::{BrokerItemSubscription, SseSubscriptionOptions};
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::net::IpAddr;
use std::path::PathBuf;
/// User-agent string attached to the HTTP clients built in this file, in the
/// form `bgpkit-broker/<crate version>` (version taken from `CARGO_PKG_VERSION`
/// at compile time).
const SDK_USER_AGENT: &str = concat!("bgpkit-broker/", env!("CARGO_PKG_VERSION"));
/// Client for a BGPKIT Broker endpoint.
///
/// Instances are configured through the chained builder-style methods defined
/// on this type and then executed with `query`, `query_single_page`, `latest`,
/// `get_peers`, or by iterating over the broker.
#[derive(Clone)]
pub struct BgpkitBroker {
/// Base URL of the broker API. The constructors in this file strip trailing `/`.
pub broker_url: String,
/// Search filters and pagination state used to build requests.
pub query_params: QueryParams,
/// Blocking HTTP client used for all requests issued by this type.
client: reqwest::blocking::Client,
/// Lookup table keyed by collector id; `latest` compares the values against
/// the project names `"riperis"` and `"routeviews"`.
collector_project_map: HashMap<String, String>,
/// Records whether the HTTP client was built with certificate validation disabled.
accept_invalid_certs: bool,
/// Directory used by `query_single_page` to cache results; `None` disables caching.
cache_dir: Option<PathBuf>,
}
impl Default for BgpkitBroker {
fn default() -> Self {
dotenvy::dotenv().ok();
let url = match std::env::var("BGPKIT_BROKER_URL") {
Ok(url) => url.trim_end_matches('/').to_string(),
Err(_) => "https://api.bgpkit.com/v3/broker".to_string(),
};
let collector_project_map = DEFAULT_COLLECTORS_CONFIG.clone().to_project_map();
let accept_invalid_certs = read_accept_invalid_certs_from_env();
let client = build_blocking_client(accept_invalid_certs);
Self {
broker_url: url,
query_params: Default::default(),
client,
collector_project_map,
accept_invalid_certs,
cache_dir: None,
}
}
}
/// Reads the `ONEIO_ACCEPT_INVALID_CERTS` environment variable.
///
/// Returns `true` when the value, compared case-insensitively, starts with
/// `true` or `y`; returns `false` otherwise or when the variable cannot be read.
fn read_accept_invalid_certs_from_env() -> bool {
    std::env::var("ONEIO_ACCEPT_INVALID_CERTS")
        .map(|raw| {
            let lowered = raw.to_lowercase();
            ["true", "y"]
                .iter()
                .any(|prefix| lowered.starts_with(prefix))
        })
        .unwrap_or(false)
}
/// Builds the blocking HTTP client used for broker requests.
///
/// The client carries the SDK user agent and, when `accept_invalid_certs` is
/// `true`, is configured to accept invalid TLS certificates.
///
/// # Panics
///
/// Panics if the underlying client builder fails.
fn build_blocking_client(accept_invalid_certs: bool) -> reqwest::blocking::Client {
    let builder = reqwest::blocking::ClientBuilder::new()
        .danger_accept_invalid_certs(accept_invalid_certs)
        .user_agent(SDK_USER_AGENT);
    builder
        .build()
        .unwrap_or_else(|e| panic!("Failed to build HTTP client for broker requests: {}", e))
}
/// Builds the asynchronous HTTP client (only compiled with the `sse` feature).
///
/// Mirrors the blocking client's settings: SDK user agent plus the requested
/// certificate-validation behavior.
///
/// # Errors
///
/// Returns [`BrokerError::NetworkError`] when the client builder fails.
#[cfg(feature = "sse")]
pub(crate) fn build_async_client(
    accept_invalid_certs: bool,
) -> Result<reqwest::Client, BrokerError> {
    let builder = reqwest::ClientBuilder::new()
        .danger_accept_invalid_certs(accept_invalid_certs)
        .user_agent(SDK_USER_AGENT);
    match builder.build() {
        Ok(client) => Ok(client),
        Err(err) => Err(BrokerError::NetworkError(err)),
    }
}
impl BgpkitBroker {
pub fn new() -> Self {
Self::default()
}
/// Sets the broker endpoint URL, removing any trailing `/` characters.
pub fn broker_url<S: Display>(mut self, url: S) -> Self {
    let rendered = url.to_string();
    self.broker_url = rendered.trim_end_matches('/').to_string();
    self
}
/// Replaces the HTTP client with one that accepts invalid TLS certificates
/// and records that choice on the broker.
///
/// # Panics
///
/// Panics if the replacement HTTP client cannot be built.
pub fn accept_invalid_certs(mut self) -> Self {
    self.client = build_blocking_client(true);
    self.accept_invalid_certs = true;
    self
}
#[deprecated(since = "0.7.1", note = "Please use `accept_invalid_certs` instead.")]
pub fn disable_ssl_check(self) -> Self {
Self::accept_invalid_certs(self)
}
pub fn cache_dir<P: Into<PathBuf>>(mut self, path: P) -> Self {
let path = path.into();
if !path.exists() {
std::fs::create_dir_all(&path).expect("Failed to create cache directory");
}
self.cache_dir = Some(path);
self
}
/// Computes the cache file stem for the current query.
///
/// The key is the lowercase hex SHA-256 of the broker URL and the search
/// parameters (time range, collector, project, data type, page, page size)
/// joined with `:`. Unset optional parameters contribute an empty string.
fn cache_key(&self) -> String {
    use sha2::{Digest, Sha256};

    let params = &self.query_params;
    let page = params.page.to_string();
    let page_size = params.page_size.to_string();
    let fingerprint = [
        self.broker_url.as_str(),
        params.ts_start.as_deref().unwrap_or(""),
        params.ts_end.as_deref().unwrap_or(""),
        params.collector_id.as_deref().unwrap_or(""),
        params.project.as_deref().unwrap_or(""),
        params.data_type.as_deref().unwrap_or(""),
        page.as_str(),
        page_size.as_str(),
    ]
    .join(":");

    let mut hasher = Sha256::new();
    hasher.update(fingerprint.as_bytes());
    let digest = hasher.finalize();
    format!("{:x}", digest)
}
/// Attempts to load cached items for the current query.
///
/// Returns `None` when caching is disabled, when no cache file exists for the
/// current key, or when the file cannot be read or deserialized (the latter
/// two are logged as warnings).
fn load_cache(&self) -> Option<Vec<BrokerItem>> {
    let dir = self.cache_dir.as_ref()?;
    let path = dir.join(self.cache_key()).with_extension("json");
    if !path.exists() {
        return None;
    }

    let raw = std::fs::read_to_string(&path)
        .map_err(|e| log::warn!("Failed to read cache file: {}", e))
        .ok()?;
    let items = serde_json::from_str::<Vec<BrokerItem>>(&raw)
        .map_err(|e| log::warn!("Failed to deserialize cache file: {}", e))
        .ok()?;

    log::info!("Loaded {} items from cache", items.len());
    Some(items)
}
/// Writes `items` as JSON to the cache file for the current query.
///
/// Does nothing when caching is disabled. Serialization and write failures
/// are logged as warnings and otherwise ignored (best-effort caching).
fn save_cache(&self, items: &[BrokerItem]) {
    let cache_dir = match self.cache_dir.as_ref() {
        Some(dir) => dir,
        None => return,
    };
    let target = cache_dir.join(self.cache_key()).with_extension("json");

    let json = match serde_json::to_string(items) {
        Ok(json) => json,
        Err(e) => {
            log::warn!("Failed to serialize items for cache: {}", e);
            return;
        }
    };

    match std::fs::write(&target, json) {
        Ok(()) => log::info!("Saved {} items to cache", items.len()),
        Err(e) => log::warn!("Failed to write cache file: {}", e),
    }
}
/// Parses a user-supplied timestamp string into a UTC datetime.
///
/// Surrounding whitespace is trimmed. Formats are tried in this order:
/// 1. RFC3339 with an explicit offset (converted to UTC);
/// 2. naive date-times `%Y-%m-%dT%H:%M:%SZ`, `%Y-%m-%dT%H:%M:%S`,
///    `%Y-%m-%d %H:%M:%S`, interpreted as UTC;
/// 3. pure dates `%Y-%m-%d`, `%Y/%m/%d`, `%Y.%m.%d`, `%Y%m%d`, at midnight UTC
///    (the compact form is only accepted for exactly 8 characters);
/// 4. an all-digit string of 9 to 13 characters, interpreted as Unix seconds.
///
/// # Errors
///
/// Returns [`BrokerError::ConfigurationError`] listing the supported formats
/// when none of them match.
fn parse_timestamp(timestamp: &str) -> Result<DateTime<Utc>, BrokerError> {
    const DATETIME_FORMATS: [&str; 3] = [
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
    ];
    const COMPACT_DATE_FORMAT: &str = "%Y%m%d";
    const DATE_FORMATS: [&str; 4] = ["%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d", COMPACT_DATE_FORMAT];

    let ts_str = timestamp.trim();

    if let Ok(with_offset) = DateTime::parse_from_rfc3339(ts_str) {
        return Ok(with_offset.with_timezone(&Utc));
    }

    let naive_match = DATETIME_FORMATS
        .iter()
        .find_map(|fmt| chrono::NaiveDateTime::parse_from_str(ts_str, fmt).ok());
    if let Some(naive) = naive_match {
        return Ok(Utc.from_utc_datetime(&naive));
    }

    for fmt in DATE_FORMATS.iter() {
        // The compact form must be exactly 8 characters so that longer digit
        // strings fall through to the Unix-timestamp branch below.
        if *fmt == COMPACT_DATE_FORMAT && ts_str.len() != 8 {
            continue;
        }
        let midnight = NaiveDate::parse_from_str(ts_str, fmt)
            .ok()
            .and_then(|date| date.and_hms_opt(0, 0, 0));
        if let Some(naive) = midnight {
            return Ok(Utc.from_utc_datetime(&naive));
        }
    }

    let looks_like_unix =
        (9..=13).contains(&ts_str.len()) && ts_str.bytes().all(|b| b.is_ascii_digit());
    if looks_like_unix {
        let parsed = ts_str
            .parse::<i64>()
            .ok()
            .and_then(|secs| Utc.timestamp_opt(secs, 0).single());
        if let Some(dt) = parsed {
            return Ok(dt);
        }
    }

    Err(BrokerError::ConfigurationError(format!(
        "Invalid timestamp format '{ts_str}'. Supported formats:\n\
- Unix timestamp: '1640995200'\n\
- RFC3339 with timezone: '2022-01-01T00:00:00+00:00', '2022-01-01T00:00:00Z', '2022-01-01T05:00:00-05:00'\n\
- RFC3339 without timezone: '2022-01-01T00:00:00' (assumes UTC)\n\
- Date with time: '2022-01-01 00:00:00'\n\
- Pure date: '2022-01-01', '2022/01/01', '2022.01.01', '20220101'"
    )))
}
/// Validates the current query parameters and returns a normalized copy.
///
/// Normalization performed on the copy:
/// * `ts_start` / `ts_end` are parsed and re-rendered as `%Y-%m-%dT%H:%M:%SZ`;
/// * `collector_id` is split on `,`, trimmed, stripped of empty entries and
///   de-duplicated (first occurrence wins), then re-joined with `,`.
///
/// `project`, `data_type`, `page` and `page_size` are checked but not rewritten.
///
/// # Errors
///
/// Returns [`BrokerError::ConfigurationError`] for an unparsable timestamp, an
/// empty collector list, an unknown project or data type, a page below 1, or a
/// page size outside `1..=100000`.
fn validate_configuration(&self) -> Result<QueryParams, BrokerError> {
    const WIRE_TS_FORMAT: &str = "%Y-%m-%dT%H:%M:%SZ";

    let raw = &self.query_params;
    let mut normalized = raw.clone();

    if let Some(ts) = raw.ts_start.as_deref() {
        let parsed = Self::parse_timestamp(ts)?;
        normalized.ts_start = Some(parsed.format(WIRE_TS_FORMAT).to_string());
    }
    if let Some(ts) = raw.ts_end.as_deref() {
        let parsed = Self::parse_timestamp(ts)?;
        normalized.ts_end = Some(parsed.format(WIRE_TS_FORMAT).to_string());
    }

    if let Some(collector_str) = raw.collector_id.as_deref() {
        // Order-preserving de-duplication of the non-empty, trimmed ids.
        let mut seen: HashSet<&str> = HashSet::new();
        let unique: Vec<&str> = collector_str
            .split(',')
            .map(str::trim)
            .filter(|id| !id.is_empty() && seen.insert(*id))
            .collect();
        if unique.is_empty() {
            return Err(BrokerError::ConfigurationError(
                "Collector ID cannot be empty".to_string(),
            ));
        }
        normalized.collector_id = Some(unique.join(","));
    }

    if let Some(project_str) = &raw.project {
        let known = matches!(
            project_str.to_lowercase().as_str(),
            "rrc" | "riperis" | "ripe_ris" | "routeviews" | "route_views" | "rv"
        );
        if !known {
            return Err(BrokerError::ConfigurationError(format!(
                "Invalid project '{project_str}'. Valid projects are: 'riperis' (aliases: 'rrc', 'ripe_ris') or 'routeviews' (aliases: 'route_views', 'rv')"
            )));
        }
    }

    if let Some(data_type_str) = &raw.data_type {
        let known = matches!(
            data_type_str.to_lowercase().as_str(),
            "rib" | "ribs" | "r" | "update" | "updates"
        );
        if !known {
            return Err(BrokerError::ConfigurationError(format!(
                "Invalid data type '{data_type_str}'. Valid data types are: 'rib' (aliases: 'ribs', 'r') or 'updates' (alias: 'update')"
            )));
        }
    }

    let page = raw.page;
    if page < 1 {
        return Err(BrokerError::ConfigurationError(format!(
            "Invalid page number {}. Page number must be >= 1",
            page
        )));
    }

    let page_size = raw.page_size;
    if !(1..=100000).contains(&page_size) {
        return Err(BrokerError::ConfigurationError(format!(
            "Invalid page size {}. Page size must be between 1 and 100000",
            page_size
        )));
    }

    Ok(normalized)
}
/// Sets the start of the time range.
///
/// The value is stored verbatim; it is parsed and validated only when a
/// query runs through `validate_configuration`.
pub fn ts_start<S: Display>(mut self, ts_start: S) -> Self {
    self.query_params.ts_start = Some(ts_start.to_string());
    self
}
/// Sets the end of the time range.
///
/// The value is stored verbatim; it is parsed and validated only when a
/// query runs through `validate_configuration`.
pub fn ts_end<S: Display>(mut self, ts_end: S) -> Self {
    self.query_params.ts_end = Some(ts_end.to_string());
    self
}
/// Sets the collector filter; several ids may be given separated by `,`.
///
/// The value is stored verbatim and normalized at query time.
pub fn collector_id<S: Display>(mut self, collector_id: S) -> Self {
    self.query_params.collector_id = Some(collector_id.to_string());
    self
}
/// Sets the project filter.
///
/// Accepted values (checked at query time, case-insensitive) are `riperis`,
/// `rrc`, `ripe_ris`, `routeviews`, `route_views` and `rv`.
pub fn project<S: Display>(mut self, project: S) -> Self {
    self.query_params.project = Some(project.to_string());
    self
}
/// Sets the data-type filter.
///
/// Accepted values (checked at query time, case-insensitive) are `rib`,
/// `ribs`, `r`, `update` and `updates`.
pub fn data_type<S: Display>(mut self, data_type: S) -> Self {
    self.query_params.data_type = Some(data_type.to_string());
    self
}
/// Sets the page number to request. Values below 1 are rejected at query time.
pub fn page(mut self, page: i64) -> Self {
    self.query_params.page = page;
    self
}
/// Sets the number of items per page. Values outside `1..=100000` are
/// rejected at query time.
pub fn page_size(mut self, page_size: i64) -> Self {
    self.query_params.page_size = page_size;
    self
}
/// Restricts `get_peers` to the peer with the given IP address.
pub fn peers_ip(mut self, peer_ip: IpAddr) -> Self {
    self.query_params.peers_ip = Some(peer_ip);
    self
}
/// Restricts `get_peers` to peers with the given AS number.
pub fn peers_asn(mut self, peer_asn: u32) -> Self {
    self.query_params.peers_asn = Some(peer_asn);
    self
}
/// When `true`, `get_peers` adds `full_feed=true` to its request.
pub fn peers_only_full_feed(mut self, peer_full_feed: bool) -> Self {
    self.query_params.peers_only_full_feed = peer_full_feed;
    self
}
/// Changes the page number in place, without consuming the broker.
pub fn turn_page(&mut self, page: i64) {
    let params = &mut self.query_params;
    params.page = page;
}
pub fn query_single_page(&self) -> Result<Vec<BrokerItem>, BrokerError> {
if let Some(cached_items) = self.load_cache() {
return Ok(cached_items);
}
let validated_params = self.validate_configuration()?;
let url = format!("{}/search{}", &self.broker_url, &validated_params);
log::info!("sending broker query to {}", &url);
match self.run_files_query(url.as_str()) {
Ok(res) => {
self.save_cache(&res.data);
Ok(res.data)
}
Err(e) => Err(e),
}
}
/// Returns the total item count reported by the broker for the current search.
///
/// # Errors
///
/// Returns a configuration or request error, or
/// [`BrokerError::BrokerError`] when the response carries no `total`.
pub fn query_total_count(&self) -> Result<i64, BrokerError> {
    let params = self.validate_configuration()?;
    let url = format!("{}/search{}", &self.broker_url, &params);
    let response = self.run_files_query(url.as_str())?;
    match response.total {
        Some(total) => Ok(total),
        None => Err(BrokerError::BrokerError(
            "count not found in response".to_string(),
        )),
    }
}
/// Checks the broker's `/health` endpoint.
///
/// # Errors
///
/// Returns [`BrokerError::BrokerError`] when the request fails or the
/// response status is anything other than `200 OK`.
pub fn health_check(&self) -> Result<(), BrokerError> {
    let url = format!("{}/health", &self.broker_url.trim_end_matches('/'));
    let healthy = self
        .client
        .get(url.as_str())
        .send()
        .map(|response| response.status() == reqwest::StatusCode::OK)
        .unwrap_or(false);

    if healthy {
        Ok(())
    } else {
        Err(BrokerError::BrokerError(format!(
            "endpoint unhealthy {}",
            self.broker_url
        )))
    }
}
/// Fetches every page of search results, starting at the configured page.
///
/// Pages are requested one after another until a page comes back empty or
/// contains fewer items than the page size.
///
/// # Errors
///
/// Returns a configuration error before any request is made, or the first
/// request error encountered.
pub fn query(&self) -> Result<Vec<BrokerItem>, BrokerError> {
    let mut params = self.validate_configuration()?;
    let mut collected = Vec::new();

    loop {
        let url = format!("{}/search{}", &self.broker_url, &params);
        let batch = self.run_files_query(url.as_str())?.data;
        let batch_len = batch.len() as i64;
        if batch_len == 0 {
            break;
        }
        collected.extend(batch);

        let next_page = params.page + 1;
        params = params.page(next_page);

        // A short page means there is nothing left to fetch.
        if batch_len < params.page_size {
            break;
        }
    }

    Ok(collected)
}
/// Fetches the broker's `/latest` listing and filters it locally.
///
/// The `project`, `data_type` and `collector_id` query parameters are applied
/// client-side:
/// * project aliases resolve to `riperis` or `routeviews` and are matched
///   against the collector-to-project map (collectors missing from the map
///   never match);
/// * data-type aliases select RIB or non-RIB items;
/// * `collector_id` is a comma-separated allow-list (trimmed, empty entries ignored).
///
/// Unrecognized project or data-type values apply no filtering.
///
/// # Errors
///
/// Returns [`BrokerError::BrokerError`] when the request fails or the
/// response body cannot be parsed.
pub fn latest(&self) -> Result<Vec<BrokerItem>, BrokerError> {
    let latest_query_url = format!("{}/latest", self.broker_url);
    let mut items = match self.client.get(latest_query_url.as_str()).send() {
        Ok(response) => match response.json::<CollectorLatestResult>() {
            Ok(result) => result.data,
            Err(_) => {
                return Err(BrokerError::BrokerError(
                    "Error parsing response".to_string(),
                ));
            }
        },
        Err(e) => {
            return Err(BrokerError::BrokerError(format!(
                "Unable to connect to the URL ({latest_query_url}): {e}"
            )));
        }
    };

    // Resolve every filter once, up front, instead of once per item.
    let wanted_project: Option<&str> = self.query_params.project.as_deref().and_then(|project| {
        let lowered = project.to_lowercase();
        match lowered.as_str() {
            "rrc" | "riperis" | "ripe_ris" => Some("riperis"),
            "routeviews" | "route_views" | "rv" => Some("routeviews"),
            _ => None,
        }
    });
    let wanted_rib: Option<bool> = self.query_params.data_type.as_deref().and_then(|data_type| {
        let lowered = data_type.to_lowercase();
        match lowered.as_str() {
            "rib" | "ribs" | "r" => Some(true),
            "update" | "updates" => Some(false),
            _ => None,
        }
    });
    let wanted_collectors: Option<HashSet<&str>> =
        self.query_params.collector_id.as_deref().map(|ids| {
            ids.split(',')
                .map(str::trim)
                .filter(|id| !id.is_empty())
                .collect()
        });

    items.retain(|item| {
        let project_ok = wanted_project.map_or(true, |project| {
            self.collector_project_map
                .get(&item.collector_id)
                .map(String::as_str)
                == Some(project)
        });
        let data_type_ok = wanted_rib.map_or(true, |rib| item.is_rib() == rib);
        let collector_ok = wanted_collectors
            .as_ref()
            .map_or(true, |wanted| wanted.contains(item.collector_id.as_str()));
        project_ok && data_type_ok && collector_ok
    });

    Ok(items)
}
pub fn get_peers(&self) -> Result<Vec<BrokerPeer>, BrokerError> {
let mut url = format!("{}/peers", self.broker_url);
let mut param_strings = vec![];
if let Some(ip) = &self.query_params.peers_ip {
param_strings.push(format!("ip={ip}"));
}
if let Some(asn) = &self.query_params.peers_asn {
param_strings.push(format!("asn={asn}"));
}
if self.query_params.peers_only_full_feed {
param_strings.push("full_feed=true".to_string());
}
if let Some(collector_id) = &self.query_params.collector_id {
param_strings.push(format!("collector={collector_id}"));
}
if !param_strings.is_empty() {
let param_string = param_strings.join("&");
url = format!("{url}?{param_string}");
}
let peers = match self.client.get(url.as_str()).send() {
Ok(response) => match response.json::<BrokerPeersResult>() {
Ok(result) => result.data,
Err(_) => {
return Err(BrokerError::BrokerError(
"Error parsing response".to_string(),
));
}
},
Err(e) => {
return Err(BrokerError::BrokerError(format!(
"Unable to connect to the URL ({url}): {e}"
)));
}
};
Ok(peers)
}
/// Sends a GET request to `url` and decodes the body as a search result.
///
/// # Errors
///
/// * transport failures are converted through `BrokerError::from`;
/// * a body that cannot be decoded, or a decoded result whose `error` field
///   is set, becomes [`BrokerError::BrokerError`].
fn run_files_query(&self, url: &str) -> Result<BrokerQueryResult, BrokerError> {
    log::info!("sending broker query to {}", &url);

    let response = self.client.get(url).send()?;
    let parsed = response
        .json::<BrokerQueryResult>()
        .map_err(|e| BrokerError::BrokerError(e.to_string()))?;

    match parsed.error {
        Some(message) => Err(BrokerError::BrokerError(message)),
        None => Ok(parsed),
    }
}
}
/// Iterator that walks search results page by page, yielding one
/// [`BrokerItem`] at a time.
pub struct BrokerItemIterator {
/// Broker whose page number is advanced as pages are consumed.
broker: BgpkitBroker,
/// Items of the current page, stored reversed so `pop` yields them in order.
cached_items: Vec<BrokerItem>,
/// `true` until the first page has been requested.
first_run: bool,
}
impl BrokerItemIterator {
pub fn new(broker: BgpkitBroker) -> BrokerItemIterator {
BrokerItemIterator {
broker,
cached_items: vec![],
first_run: true,
}
}
}
impl Iterator for BrokerItemIterator {
type Item = BrokerItem;
fn next(&mut self) -> Option<Self::Item> {
if let Some(item) = self.cached_items.pop() {
return Some(item);
}
if self.first_run {
self.first_run = false;
} else {
self.broker.query_params.page += 1;
}
let items = match self.broker.query_single_page() {
Ok(i) => i,
Err(_) => return None,
};
if items.is_empty() {
return None;
} else {
self.cached_items = items;
self.cached_items.reverse();
}
#[allow(clippy::unwrap_used)]
Some(self.cached_items.pop().unwrap())
}
}
impl IntoIterator for BgpkitBroker {
type Item = BrokerItem;
type IntoIter = BrokerItemIterator;
fn into_iter(self) -> Self::IntoIter {
BrokerItemIterator::new(self)
}
}
impl IntoIterator for &BgpkitBroker {
    type Item = BrokerItem;
    type IntoIter = BrokerItemIterator;

    /// Iterates over search results using a clone of the broker, leaving the
    /// original untouched.
    fn into_iter(self) -> Self::IntoIter {
        let owned: BgpkitBroker = self.clone();
        BrokerItemIterator::new(owned)
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// A query for a known timestamp succeeds and returns at least one item.
#[test]
fn test_query() {
    let outcome = BgpkitBroker::new()
        .ts_start("1634693400")
        .ts_end("1634693400")
        .query();
    assert!(outcome.is_ok());
    let items = outcome.unwrap();
    assert!(!items.is_empty());
}
/// Querying an unreachable endpoint surfaces a `NetworkError`.
#[test]
fn test_network_error() {
    let outcome = BgpkitBroker::new()
        .broker_url("https://api.broker.example.com/v2")
        .query();
    assert!(outcome.is_err());
    assert!(matches!(outcome, Err(BrokerError::NetworkError(_))));
}
/// A negative page number is rejected as a configuration error.
#[test]
fn test_broker_error() {
    let outcome = BgpkitBroker::new().page(-1).query();
    assert!(outcome.is_err());
    assert!(matches!(outcome, Err(BrokerError::ConfigurationError(_))));
}
/// A multi-page query with a page size of 100 collects at least 54 items.
#[test]
fn test_query_all() {
    let outcome = BgpkitBroker::new()
        .ts_start("1634693400")
        .ts_end("1634693400")
        .page_size(100)
        .query();
    assert!(outcome.is_ok());
    let total = outcome.unwrap().len();
    assert!(total >= 54);
}
/// Iterating over the broker yields at least 54 items for the known timestamp.
#[test]
fn test_iterator() {
    let total = BgpkitBroker::new()
        .ts_start("1634693400")
        .ts_end("1634693400")
        .into_iter()
        .count();
    assert!(total >= 54);
}
/// Collector and project filters narrow the result set as expected.
#[test]
fn test_filters() {
    // Every case shares the same single-timestamp window.
    let base = || {
        BgpkitBroker::new()
            .ts_start("1634693400")
            .ts_end("1634693400")
    };

    let unfiltered = base().query().unwrap();
    assert!(unfiltered.len() >= 54);

    let by_collector = base().collector_id("rrc00").query().unwrap();
    assert_eq!(by_collector.len(), 1);

    let by_project = base().project("riperis").query().unwrap();
    assert_eq!(by_project.len(), 23);
}
/// `latest` honors the project, data-type and collector filters.
#[test]
fn test_latest() {
    let everything = BgpkitBroker::new().latest().unwrap();
    assert!(everything.len() >= 125);

    let routeviews = BgpkitBroker::new()
        .project("routeviews".to_string())
        .latest()
        .unwrap();
    assert!(!routeviews.is_empty());
    assert!(!routeviews
        .iter()
        .any(|item| item.collector_id.starts_with("rrc")));

    let riperis = BgpkitBroker::new()
        .project("riperis".to_string())
        .latest()
        .unwrap();
    assert!(!riperis.is_empty());
    assert!(riperis
        .iter()
        .all(|item| item.collector_id.starts_with("rrc")));

    let ribs = BgpkitBroker::new()
        .data_type("rib".to_string())
        .latest()
        .unwrap();
    assert!(!ribs.is_empty());
    assert!(ribs.iter().all(|item| item.is_rib()));

    let updates = BgpkitBroker::new()
        .data_type("update".to_string())
        .latest()
        .unwrap();
    assert!(!updates.is_empty());
    assert!(!updates.iter().any(|item| item.is_rib()));

    let rrc00 = BgpkitBroker::new()
        .collector_id("rrc00".to_string())
        .latest()
        .unwrap();
    assert!(!rrc00.is_empty());
    assert!(rrc00
        .iter()
        .all(|item| item.collector_id.as_str() == "rrc00"));
    assert_eq!(rrc00.len(), 2);
}
/// `latest` still works with certificate validation disabled.
#[test]
fn test_latest_no_ssl() {
    let items = BgpkitBroker::new()
        .accept_invalid_certs()
        .latest()
        .unwrap();
    assert!(items.len() >= 125);
}
/// The default endpoint reports healthy.
#[test]
fn test_health_check() {
    let status = BgpkitBroker::new().health_check();
    assert!(status.is_ok());
}
/// `get_peers` honors the IP, ASN, full-feed and collector filters.
#[test]
fn test_peers() {
    let all_peers = BgpkitBroker::new().get_peers().unwrap();
    assert!(!all_peers.is_empty());

    // Use the first returned peer as a known-good filter value.
    let sample = all_peers.first().unwrap();
    let (sample_ip, sample_asn) = (sample.ip, sample.asn);

    let by_ip = BgpkitBroker::new().peers_ip(sample_ip).get_peers().unwrap();
    assert!(!by_ip.is_empty());

    let by_asn = BgpkitBroker::new()
        .peers_asn(sample_asn)
        .get_peers()
        .unwrap();
    assert!(!by_asn.is_empty());

    let full_feed_peers = BgpkitBroker::new()
        .peers_only_full_feed(true)
        .get_peers()
        .unwrap();
    assert!(!full_feed_peers.is_empty());
    assert!(full_feed_peers.len() < all_peers.len());

    let rrc_peers = BgpkitBroker::new()
        .collector_id("rrc00")
        .get_peers()
        .unwrap();
    assert!(!rrc_peers.is_empty());
    assert!(rrc_peers.iter().all(|peer| peer.collector == "rrc00"));

    let rrc_rv_peers = BgpkitBroker::new()
        .collector_id("rrc00,route-views2")
        .get_peers()
        .unwrap();
    assert!(!rrc_rv_peers.is_empty());
    assert!(rrc_rv_peers
        .iter()
        .any(|peer| peer.collector == "rrc00" || peer.collector == "route-views2"));
    assert!(rrc_rv_peers.len() > rrc_peers.len());
}
/// Unix timestamps are stored verbatim by the setters.
#[test]
fn test_timestamp_parsing_unix() {
    let broker = BgpkitBroker::new();

    let with_start = broker.clone().ts_start("1640995200");
    assert_eq!(
        with_start.query_params.ts_start.as_deref(),
        Some("1640995200")
    );

    let with_end = broker.clone().ts_end("1640995200");
    assert_eq!(with_end.query_params.ts_end.as_deref(), Some("1640995200"));
}
/// RFC3339-like inputs are stored verbatim by the setters.
#[test]
fn test_timestamp_parsing_rfc3339() {
    let broker = BgpkitBroker::new();

    for raw in ["2022-01-01T00:00:00Z", "2022-01-01T12:30:45"] {
        let configured = broker.clone().ts_start(raw);
        assert_eq!(configured.query_params.ts_start, Some(raw.to_string()));
    }

    let configured = broker.clone().ts_end("2022-01-01 12:30:45");
    assert_eq!(
        configured.query_params.ts_end,
        Some("2022-01-01 12:30:45".to_string())
    );
}
/// Pure-date inputs are stored verbatim by the setters.
#[test]
fn test_timestamp_parsing_pure_dates() {
    let broker = BgpkitBroker::new();

    for raw in ["2022-01-01", "2022/01/01"] {
        let configured = broker.clone().ts_start(raw);
        assert_eq!(configured.query_params.ts_start, Some(raw.to_string()));
    }

    for raw in ["2022.01.01", "20220101"] {
        let configured = broker.clone().ts_end(raw);
        assert_eq!(configured.query_params.ts_end, Some(raw.to_string()));
    }
}
/// Surrounding whitespace is preserved by the setters (trimming happens at parse time).
#[test]
fn test_timestamp_parsing_whitespace() {
    let broker = BgpkitBroker::new();

    let padded_start = " 2022-01-01 ";
    let configured = broker.clone().ts_start(padded_start);
    assert_eq!(
        configured.query_params.ts_start.as_deref(),
        Some(padded_start)
    );

    let padded_end = "\t1640995200\n";
    let configured = broker.clone().ts_end(padded_end);
    assert_eq!(configured.query_params.ts_end.as_deref(), Some(padded_end));
}
/// Unparsable timestamps make `query` fail with a configuration error.
#[test]
fn test_timestamp_parsing_errors() {
    let broker = BgpkitBroker::new();

    let assert_rejected = |candidate: BgpkitBroker| {
        let outcome = candidate.query();
        assert!(outcome.is_err());
        assert!(matches!(
            outcome.err(),
            Some(BrokerError::ConfigurationError(_))
        ));
    };

    assert_rejected(broker.clone().ts_start("invalid-timestamp"));
    assert_rejected(broker.clone().ts_end("2022-13-01"));
    assert_rejected(broker.clone().ts_start("20221301"));
    assert_rejected(broker.clone().ts_start("2022-01"));
}
/// Exercises `parse_timestamp` directly across every supported format.
#[test]
fn test_parse_timestamp_direct() {
    use chrono::{NaiveDate, NaiveDateTime};

    let parse = |raw: &str| BgpkitBroker::parse_timestamp(raw);
    let utc_from_naive = |raw: &str, fmt: &str| {
        Utc.from_utc_datetime(&NaiveDateTime::parse_from_str(raw, fmt).unwrap())
    };

    // Unix seconds.
    assert_eq!(
        parse("1640995200").unwrap(),
        Utc.timestamp_opt(1640995200, 0).single().unwrap()
    );

    // Date-time forms, with and without the trailing `Z`.
    assert_eq!(
        parse("2022-01-01T00:00:00Z").unwrap(),
        utc_from_naive("2022-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S")
    );
    assert_eq!(
        parse("2022-01-01T12:30:45").unwrap(),
        utc_from_naive("2022-01-01T12:30:45", "%Y-%m-%dT%H:%M:%S")
    );
    assert_eq!(
        parse("2022-01-01 12:30:45").unwrap(),
        utc_from_naive("2022-01-01 12:30:45", "%Y-%m-%d %H:%M:%S")
    );

    // Pure dates resolve to midnight UTC.
    let expected_date = Utc.from_utc_datetime(
        &NaiveDate::from_ymd_opt(2022, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap(),
    );
    for raw in ["2022-01-01", "2022/01/01", "2022.01.01", "20220101"] {
        assert_eq!(parse(raw).unwrap(), expected_date);
    }

    // Explicit offsets are converted to UTC.
    let result_plus_tz = parse("2022-01-01T00:00:00+00:00").unwrap();
    assert_eq!(result_plus_tz, expected_date);
    println!("✓ +00:00 timezone format works");

    let result_minus_tz = parse("2022-01-01T05:00:00-05:00").unwrap();
    let expected_10am = Utc.with_ymd_and_hms(2022, 1, 1, 10, 0, 0).unwrap();
    assert_eq!(result_minus_tz, expected_10am);
    println!("✓ -05:00 timezone format works (05:00-05:00 = 10:00Z)");

    // Rejected inputs.
    for raw in ["invalid", "2022-13-01", "2022-01"] {
        assert!(parse(raw).is_err());
    }
}
/// Any non-empty collector list validates; a list of only separators does not.
#[test]
fn test_collector_id_validation() {
    let broker = BgpkitBroker::new();

    for accepted in [
        "rrc00",
        "rrc00,route-views2",
        "brand-new-collector",
        "rrc00,brand-new-collector",
    ] {
        let outcome = broker.clone().collector_id(accepted).validate_configuration();
        assert!(outcome.is_ok());
    }

    let outcome = broker.clone().collector_id(", , ,").validate_configuration();
    assert!(outcome.is_err());
    assert!(matches!(
        outcome.err(),
        Some(BrokerError::ConfigurationError(_))
    ));
}
/// Known project names and aliases validate; unknown names are rejected.
#[test]
fn test_project_validation() {
    let broker = BgpkitBroker::new();

    for accepted in ["riperis", "routeviews", "rrc", "rv"] {
        let outcome = broker.clone().project(accepted).validate_configuration();
        assert!(outcome.is_ok());
    }

    let outcome = broker
        .clone()
        .project("invalid-project")
        .validate_configuration();
    assert!(outcome.is_err());
    assert!(matches!(
        outcome.err(),
        Some(BrokerError::ConfigurationError(_))
    ));
}
/// Known data types and aliases validate; unknown types are rejected.
#[test]
fn test_data_type_validation() {
    let broker = BgpkitBroker::new();

    for accepted in ["rib", "updates", "ribs", "update"] {
        let outcome = broker.clone().data_type(accepted).validate_configuration();
        assert!(outcome.is_ok());
    }

    let outcome = broker
        .clone()
        .data_type("invalid-type")
        .validate_configuration();
    assert!(outcome.is_err());
    assert!(matches!(
        outcome.err(),
        Some(BrokerError::ConfigurationError(_))
    ));
}
/// Page numbers of 1 or more validate; page 0 is rejected.
#[test]
fn test_page_validation() {
    let broker = BgpkitBroker::new();

    for accepted in [1_i64, 100] {
        let outcome = broker.clone().page(accepted).validate_configuration();
        assert!(outcome.is_ok());
    }

    let outcome = broker.clone().page(0).validate_configuration();
    assert!(outcome.is_err());
    assert!(matches!(
        outcome.err(),
        Some(BrokerError::ConfigurationError(_))
    ));
}
/// Page sizes within `1..=100000` validate; 0 and 100001 are rejected.
#[test]
fn test_page_size_validation() {
    let broker = BgpkitBroker::new();

    for accepted in [1_i64, 100, 100000] {
        let outcome = broker.clone().page_size(accepted).validate_configuration();
        assert!(outcome.is_ok());
    }

    for rejected in [0_i64, 100001] {
        let outcome = broker.clone().page_size(rejected).validate_configuration();
        assert!(outcome.is_err());
        assert!(matches!(
            outcome.err(),
            Some(BrokerError::ConfigurationError(_))
        ));
    }
}
/// Every chained setter lands in the corresponding query parameter.
#[test]
fn test_method_chaining() {
    let broker = BgpkitBroker::new()
        .ts_start("1634693400")
        .ts_end("1634693400")
        .collector_id("rrc00")
        .project("riperis")
        .data_type("rib")
        .page(1)
        .page_size(10);

    let params = &broker.query_params;
    assert_eq!(params.ts_start.as_deref(), Some("1634693400"));
    assert_eq!(params.ts_end.as_deref(), Some("1634693400"));
    assert_eq!(params.collector_id.as_deref(), Some("rrc00"));
    assert_eq!(params.project.as_deref(), Some("riperis"));
    assert_eq!(params.data_type.as_deref(), Some("rib"));
    assert_eq!(params.page, 1);
    assert_eq!(params.page_size, 10);
}
}