1#![doc(
203 html_logo_url = "https://raw.githubusercontent.com/bgpkit/assets/main/logos/icon-transparent.png",
204 html_favicon_url = "https://raw.githubusercontent.com/bgpkit/assets/main/logos/favicon.ico"
205)]
206#![allow(unknown_lints)]
207
208mod collector;
209#[cfg(feature = "cli")]
210pub mod config;
211#[cfg(feature = "cli")]
212mod crawler;
213#[cfg(feature = "backend")]
214pub mod db;
215mod error;
216mod item;
217#[cfg(feature = "nats")]
218pub mod notifier;
219mod peer;
220mod query;
221mod shortcuts;
222
223use crate::collector::DEFAULT_COLLECTORS_CONFIG;
224use crate::peer::BrokerPeersResult;
225use crate::query::{BrokerQueryResult, CollectorLatestResult};
226use chrono::{DateTime, NaiveDate, TimeZone, Utc};
227pub use collector::{load_collectors, Collector};
228
229#[cfg(feature = "cli")]
230pub use config::BrokerConfig;
231#[cfg(feature = "cli")]
232pub use crawler::crawl_collector;
233#[cfg(feature = "backend")]
234pub use db::{LocalBrokerDb, UpdatesMeta, DEFAULT_PAGE_SIZE};
235pub use error::BrokerError;
236pub use item::BrokerItem;
237pub use peer::BrokerPeer;
238pub use query::{QueryParams, SortOrder};
239pub use shortcuts::SnapshotFiles;
240use std::collections::{HashMap, HashSet};
241use std::fmt::Display;
242use std::net::IpAddr;
243
244#[derive(Clone)]
248pub struct BgpkitBroker {
249 pub broker_url: String,
250 pub query_params: QueryParams,
251 client: reqwest::blocking::Client,
252 collector_project_map: HashMap<String, String>,
253}
254
255impl Default for BgpkitBroker {
256 fn default() -> Self {
257 dotenvy::dotenv().ok();
258 let url = match std::env::var("BGPKIT_BROKER_URL") {
259 Ok(url) => url.trim_end_matches('/').to_string(),
260 Err(_) => "https://api.bgpkit.com/v3/broker".to_string(),
261 };
262
263 let collector_project_map = DEFAULT_COLLECTORS_CONFIG.clone().to_project_map();
264
265 let accept_invalid_certs = match std::env::var("ONEIO_ACCEPT_INVALID_CERTS") {
266 Ok(t) => {
267 let l = t.to_lowercase();
268 l.starts_with("true") || l.starts_with("y")
269 }
270 Err(_) => false,
271 };
272
273 let client = match reqwest::blocking::ClientBuilder::new()
274 .danger_accept_invalid_certs(accept_invalid_certs)
275 .user_agent(concat!("bgpkit-broker/", env!("CARGO_PKG_VERSION")))
276 .build()
277 {
278 Ok(c) => c,
279 Err(e) => {
280 panic!("Failed to build HTTP client for broker requests: {}", e);
281 }
282 };
283
284 Self {
285 broker_url: url,
286 query_params: Default::default(),
287 client,
288 collector_project_map,
289 }
290 }
291}
292
293impl BgpkitBroker {
294 pub fn new() -> Self {
307 Self::default()
308 }
309
310 pub fn broker_url<S: Display>(self, url: S) -> Self {
321 let broker_url = url.to_string().trim_end_matches('/').to_string();
322 Self {
323 broker_url,
324 query_params: self.query_params,
325 client: self.client,
326 collector_project_map: self.collector_project_map,
327 }
328 }
329
330 pub fn accept_invalid_certs(self) -> Self {
332 #[allow(clippy::unwrap_used)]
333 Self {
334 broker_url: self.broker_url,
335 query_params: self.query_params,
336 client: reqwest::blocking::ClientBuilder::new()
337 .danger_accept_invalid_certs(true)
338 .build()
339 .unwrap(),
340 collector_project_map: self.collector_project_map,
341 }
342 }
343
344 #[deprecated(since = "0.7.1", note = "Please use `accept_invalid_certs` instead.")]
346 pub fn disable_ssl_check(self) -> Self {
347 Self::accept_invalid_certs(self)
348 }
349
350 fn parse_timestamp(timestamp: &str) -> Result<DateTime<Utc>, BrokerError> {
364 let ts_str = timestamp.trim();
365
366 if let Ok(dt_with_tz) = DateTime::parse_from_rfc3339(ts_str) {
368 return Ok(dt_with_tz.with_timezone(&Utc));
369 }
370
371 if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H:%M:%SZ") {
373 return Ok(Utc.from_utc_datetime(&naive_dt));
374 }
375
376 if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H:%M:%S") {
378 return Ok(Utc.from_utc_datetime(&naive_dt));
379 }
380
381 if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%d %H:%M:%S") {
383 return Ok(Utc.from_utc_datetime(&naive_dt));
384 }
385
386 let date_formats = [
388 "%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d", "%Y%m%d", ];
393
394 for format in &date_formats {
395 if let Ok(date) = NaiveDate::parse_from_str(ts_str, format) {
396 if format == &"%Y%m%d" && ts_str.len() != 8 {
398 continue;
399 }
400 if let Some(naive_datetime) = date.and_hms_opt(0, 0, 0) {
402 return Ok(Utc.from_utc_datetime(&naive_datetime));
403 }
404 }
405 }
406
407 if ts_str.len() >= 9 && ts_str.len() <= 13 && ts_str.chars().all(|c| c.is_ascii_digit()) {
409 if let Ok(timestamp) = ts_str.parse::<i64>() {
410 if let Some(dt) = Utc.timestamp_opt(timestamp, 0).single() {
411 return Ok(dt);
412 }
413 }
414 }
415
416 Err(BrokerError::ConfigurationError(format!(
417 "Invalid timestamp format '{ts_str}'. Supported formats:\n\
418 - Unix timestamp: '1640995200'\n\
419 - RFC3339 with timezone: '2022-01-01T00:00:00+00:00', '2022-01-01T00:00:00Z', '2022-01-01T05:00:00-05:00'\n\
420 - RFC3339 without timezone: '2022-01-01T00:00:00' (assumes UTC)\n\
421 - Date with time: '2022-01-01 00:00:00'\n\
422 - Pure date: '2022-01-01', '2022/01/01', '2022.01.01', '20220101'"
423 )))
424 }
425
426 fn validate_configuration(&self) -> Result<QueryParams, BrokerError> {
431 let mut normalized_params = self.query_params.clone();
433
434 if let Some(ts) = &self.query_params.ts_start {
435 let parsed_datetime = Self::parse_timestamp(ts)?;
436 normalized_params.ts_start =
437 Some(parsed_datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string());
438 }
439
440 if let Some(ts) = &self.query_params.ts_end {
441 let parsed_datetime = Self::parse_timestamp(ts)?;
442 normalized_params.ts_end =
443 Some(parsed_datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string());
444 }
445
446 if let Some(collector_str) = &self.query_params.collector_id {
448 let collectors: Vec<String> = collector_str
449 .split(',')
450 .map(|s| s.trim())
451 .filter(|s| !s.is_empty())
452 .map(|s| s.to_string())
453 .collect();
454
455 if collectors.is_empty() {
456 return Err(BrokerError::ConfigurationError(
457 "Collector ID cannot be empty".to_string(),
458 ));
459 }
460
461 let mut seen = HashSet::new();
463 let mut deduped = Vec::with_capacity(collectors.len());
464 for c in collectors {
465 if seen.insert(c.clone()) {
466 deduped.push(c);
467 }
468 }
469
470 normalized_params.collector_id = Some(deduped.join(","));
471 }
472
473 if let Some(project_str) = &self.query_params.project {
475 let project_lower = project_str.to_lowercase();
476 match project_lower.as_str() {
477 "rrc" | "riperis" | "ripe_ris" | "routeviews" | "route_views" | "rv" => {
478 }
480 _ => {
481 return Err(BrokerError::ConfigurationError(format!(
482 "Invalid project '{project_str}'. Valid projects are: 'riperis' (aliases: 'rrc', 'ripe_ris') or 'routeviews' (aliases: 'route_views', 'rv')"
483 )));
484 }
485 }
486 }
487
488 if let Some(data_type_str) = &self.query_params.data_type {
490 let data_type_lower = data_type_str.to_lowercase();
491 match data_type_lower.as_str() {
492 "rib" | "ribs" | "r" | "update" | "updates" => {
493 }
495 _ => {
496 return Err(BrokerError::ConfigurationError(format!(
497 "Invalid data type '{data_type_str}'. Valid data types are: 'rib' (aliases: 'ribs', 'r') or 'updates' (alias: 'update')"
498 )));
499 }
500 }
501 }
502
503 if self.query_params.page < 1 {
505 return Err(BrokerError::ConfigurationError(format!(
506 "Invalid page number {}. Page number must be >= 1",
507 self.query_params.page
508 )));
509 }
510
511 if !(1..=100000).contains(&self.query_params.page_size) {
513 return Err(BrokerError::ConfigurationError(format!(
514 "Invalid page size {}. Page size must be between 1 and 100000",
515 self.query_params.page_size
516 )));
517 }
518
519 Ok(normalized_params)
520 }
521
522 pub fn ts_start<S: Display>(self, ts_start: S) -> Self {
555 let mut query_params = self.query_params;
556 query_params.ts_start = Some(ts_start.to_string());
557 Self {
558 broker_url: self.broker_url,
559 query_params,
560 client: self.client,
561 collector_project_map: self.collector_project_map,
562 }
563 }
564
565 pub fn ts_end<S: Display>(self, ts_end: S) -> Self {
590 let mut query_params = self.query_params;
591 query_params.ts_end = Some(ts_end.to_string());
592 Self {
593 broker_url: self.broker_url,
594 client: self.client,
595 query_params,
596 collector_project_map: self.collector_project_map,
597 }
598 }
599
600 pub fn collector_id<S: Display>(self, collector_id: S) -> Self {
619 let mut query_params = self.query_params;
620 query_params.collector_id = Some(collector_id.to_string());
621 Self {
622 client: self.client,
623 broker_url: self.broker_url,
624 query_params,
625 collector_project_map: self.collector_project_map,
626 }
627 }
628
629 pub fn project<S: Display>(self, project: S) -> Self {
643 let mut query_params = self.query_params;
644 query_params.project = Some(project.to_string());
645 Self {
646 client: self.client,
647 broker_url: self.broker_url,
648 query_params,
649 collector_project_map: self.collector_project_map,
650 }
651 }
652
653 pub fn data_type<S: Display>(self, data_type: S) -> Self {
669 let mut query_params = self.query_params;
670 query_params.data_type = Some(data_type.to_string());
671 Self {
672 broker_url: self.broker_url,
673 client: self.client,
674 query_params,
675 collector_project_map: self.collector_project_map,
676 }
677 }
678
679 pub fn page(self, page: i64) -> Self {
691 let mut query_params = self.query_params;
692 query_params.page = page;
693 Self {
694 broker_url: self.broker_url,
695 client: self.client,
696 query_params,
697 collector_project_map: self.collector_project_map,
698 }
699 }
700
701 pub fn page_size(self, page_size: i64) -> Self {
713 let mut query_params = self.query_params;
714 query_params.page_size = page_size;
715 Self {
716 broker_url: self.broker_url,
717 client: self.client,
718 query_params,
719 collector_project_map: self.collector_project_map,
720 }
721 }
722
723 pub fn peers_ip(self, peer_ip: IpAddr) -> Self {
732 let mut query_params = self.query_params;
733 query_params.peers_ip = Some(peer_ip);
734 Self {
735 broker_url: self.broker_url,
736 client: self.client,
737 query_params,
738 collector_project_map: self.collector_project_map,
739 }
740 }
741
742 pub fn peers_asn(self, peer_asn: u32) -> Self {
751 let mut query_params = self.query_params;
752 query_params.peers_asn = Some(peer_asn);
753 Self {
754 broker_url: self.broker_url,
755 client: self.client,
756 query_params,
757 collector_project_map: self.collector_project_map,
758 }
759 }
760
761 pub fn peers_only_full_feed(self, peer_full_feed: bool) -> Self {
770 let mut query_params = self.query_params;
771 query_params.peers_only_full_feed = peer_full_feed;
772 Self {
773 broker_url: self.broker_url,
774 client: self.client,
775 query_params,
776 collector_project_map: self.collector_project_map,
777 }
778 }
779
780 pub fn turn_page(&mut self, page: i64) {
795 self.query_params.page = page;
796 }
797
798 pub fn query_single_page(&self) -> Result<Vec<BrokerItem>, BrokerError> {
808 let validated_params = self.validate_configuration()?;
809 let url = format!("{}/search{}", &self.broker_url, &validated_params);
810 log::info!("sending broker query to {}", &url);
811 match self.run_files_query(url.as_str()) {
812 Ok(res) => Ok(res.data),
813 Err(e) => Err(e),
814 }
815 }
816
817 pub fn query_total_count(&self) -> Result<i64, BrokerError> {
841 let validated_params = self.validate_configuration()?;
842 let url = format!("{}/search{}", &self.broker_url, &validated_params);
843 match self.run_files_query(url.as_str()) {
844 Ok(res) => res.total.ok_or(BrokerError::BrokerError(
845 "count not found in response".to_string(),
846 )),
847 Err(e) => Err(e),
848 }
849 }
850
851 pub fn health_check(&self) -> Result<(), BrokerError> {
860 let url = format!("{}/health", &self.broker_url.trim_end_matches('/'));
861 match self.client.get(url.as_str()).send() {
862 Ok(response) => {
863 if response.status() == reqwest::StatusCode::OK {
864 Ok(())
865 } else {
866 Err(BrokerError::BrokerError(format!(
867 "endpoint unhealthy {}",
868 self.broker_url
869 )))
870 }
871 }
872 Err(_e) => Err(BrokerError::BrokerError(format!(
873 "endpoint unhealthy {}",
874 self.broker_url
875 ))),
876 }
877 }
878
879 pub fn query(&self) -> Result<Vec<BrokerItem>, BrokerError> {
898 let mut p = self.validate_configuration()?;
899
900 let mut items = vec![];
901 loop {
902 let url = format!("{}/search{}", &self.broker_url, &p);
903
904 let res_items = self.run_files_query(url.as_str())?.data;
905
906 let items_count = res_items.len() as i64;
907
908 if items_count == 0 {
909 break;
911 }
912
913 items.extend(res_items);
914 let cur_page = p.page;
915 p = p.page(cur_page + 1);
916
917 if items_count < p.page_size {
918 break;
920 }
921 }
922 Ok(items)
923 }
924
925 pub fn latest(&self) -> Result<Vec<BrokerItem>, BrokerError> {
939 let latest_query_url = format!("{}/latest", self.broker_url);
940 let mut items = match self.client.get(latest_query_url.as_str()).send() {
941 Ok(response) => match response.json::<CollectorLatestResult>() {
942 Ok(result) => result.data,
943 Err(_) => {
944 return Err(BrokerError::BrokerError(
945 "Error parsing response".to_string(),
946 ));
947 }
948 },
949 Err(e) => {
950 return Err(BrokerError::BrokerError(format!(
951 "Unable to connect to the URL ({latest_query_url}): {e}"
952 )));
953 }
954 };
955
956 items.retain(|item| {
957 let mut matches = true;
958 if let Some(project) = &self.query_params.project {
959 match project.to_lowercase().as_str() {
960 "rrc" | "riperis" | "ripe_ris" => {
961 matches = self
962 .collector_project_map
963 .get(&item.collector_id)
964 .cloned()
965 .unwrap_or_default()
966 .as_str()
967 == "riperis";
968 }
969 "routeviews" | "route_views" | "rv" => {
970 matches = self
971 .collector_project_map
972 .get(&item.collector_id)
973 .cloned()
974 .unwrap_or_default()
975 .as_str()
976 == "routeviews";
977 }
978 _ => {}
979 }
980 }
981
982 if let Some(data_type) = &self.query_params.data_type {
983 match data_type.to_lowercase().as_str() {
984 "rib" | "ribs" | "r" => {
985 if !item.is_rib() {
986 matches = false
988 }
989 }
990 "update" | "updates" => {
991 if item.is_rib() {
992 matches = false
994 }
995 }
996 _ => {}
997 }
998 }
999
1000 if let Some(collector_id) = &self.query_params.collector_id {
1001 let wanted: HashSet<&str> = collector_id
1002 .split(',')
1003 .map(|s| s.trim())
1004 .filter(|s| !s.is_empty())
1005 .collect();
1006
1007 if !wanted.contains(item.collector_id.as_str()) {
1008 return false;
1009 }
1010 }
1011
1012 matches
1013 });
1014
1015 Ok(items)
1016 }
1017
1018 pub fn get_peers(&self) -> Result<Vec<BrokerPeer>, BrokerError> {
1090 let mut url = format!("{}/peers", self.broker_url);
1091 let mut param_strings = vec![];
1092 if let Some(ip) = &self.query_params.peers_ip {
1093 param_strings.push(format!("ip={ip}"));
1094 }
1095 if let Some(asn) = &self.query_params.peers_asn {
1096 param_strings.push(format!("asn={asn}"));
1097 }
1098 if self.query_params.peers_only_full_feed {
1099 param_strings.push("full_feed=true".to_string());
1100 }
1101 if let Some(collector_id) = &self.query_params.collector_id {
1102 param_strings.push(format!("collector={collector_id}"));
1103 }
1104 if !param_strings.is_empty() {
1105 let param_string = param_strings.join("&");
1106 url = format!("{url}?{param_string}");
1107 }
1108
1109 let peers = match self.client.get(url.as_str()).send() {
1110 Ok(response) => match response.json::<BrokerPeersResult>() {
1111 Ok(result) => result.data,
1112 Err(_) => {
1113 return Err(BrokerError::BrokerError(
1114 "Error parsing response".to_string(),
1115 ));
1116 }
1117 },
1118 Err(e) => {
1119 return Err(BrokerError::BrokerError(format!(
1120 "Unable to connect to the URL ({url}): {e}"
1121 )));
1122 }
1123 };
1124 Ok(peers)
1125 }
1126
1127 fn run_files_query(&self, url: &str) -> Result<BrokerQueryResult, BrokerError> {
1128 log::info!("sending broker query to {}", &url);
1129 match self.client.get(url).send() {
1130 Ok(res) => match res.json::<BrokerQueryResult>() {
1131 Ok(res) => {
1132 if let Some(e) = res.error {
1133 Err(BrokerError::BrokerError(e))
1134 } else {
1135 Ok(res)
1136 }
1137 }
1138 Err(e) => {
1139 Err(BrokerError::BrokerError(e.to_string()))
1142 }
1143 },
1144 Err(e) => Err(BrokerError::from(e)),
1145 }
1146 }
1147}
1148
1149pub struct BrokerItemIterator {
1176 broker: BgpkitBroker,
1177 cached_items: Vec<BrokerItem>,
1178 first_run: bool,
1179}
1180
1181impl BrokerItemIterator {
1182 pub fn new(broker: BgpkitBroker) -> BrokerItemIterator {
1183 BrokerItemIterator {
1184 broker,
1185 cached_items: vec![],
1186 first_run: true,
1187 }
1188 }
1189}
1190
1191impl Iterator for BrokerItemIterator {
1192 type Item = BrokerItem;
1193
1194 fn next(&mut self) -> Option<Self::Item> {
1195 if let Some(item) = self.cached_items.pop() {
1197 return Some(item);
1198 }
1199
1200 if self.first_run {
1202 self.first_run = false;
1204 } else {
1205 self.broker.query_params.page += 1;
1207 }
1208
1209 let items = match self.broker.query_single_page() {
1211 Ok(i) => i,
1212 Err(_) => return None,
1213 };
1214
1215 if items.is_empty() {
1216 return None;
1218 } else {
1219 self.cached_items = items;
1221 self.cached_items.reverse();
1222 }
1223
1224 #[allow(clippy::unwrap_used)]
1225 Some(self.cached_items.pop().unwrap())
1226 }
1227}
1228
1229impl IntoIterator for BgpkitBroker {
1230 type Item = BrokerItem;
1231 type IntoIter = BrokerItemIterator;
1232
1233 fn into_iter(self) -> Self::IntoIter {
1234 BrokerItemIterator::new(self)
1235 }
1236}
1237
1238impl IntoIterator for &BgpkitBroker {
1239 type Item = BrokerItem;
1240 type IntoIter = BrokerItemIterator;
1241
1242 fn into_iter(self) -> Self::IntoIter {
1243 BrokerItemIterator::new(self.clone())
1244 }
1245}
1246
1247#[cfg(test)]
1248mod tests {
1249 use super::*;
1250
1251 #[test]
1252 fn test_query() {
1253 let broker = BgpkitBroker::new()
1254 .ts_start("1634693400")
1255 .ts_end("1634693400");
1256 let res = broker.query();
1257 assert!(&res.is_ok());
1258 let data = res.unwrap();
1259 assert!(!data.is_empty());
1260 }
1261
1262 #[test]
1263 fn test_network_error() {
1264 let broker = BgpkitBroker::new().broker_url("https://api.broker.example.com/v2");
1265 let res = broker.query();
1266 assert!(res.is_err());
1268 assert!(matches!(res.err(), Some(BrokerError::NetworkError(_))));
1269 }
1270
1271 #[test]
1272 fn test_broker_error() {
1273 let broker = BgpkitBroker::new().page(-1);
1274 let result = broker.query();
1275 assert!(result.is_err());
1276 assert!(matches!(
1277 result.err(),
1278 Some(BrokerError::ConfigurationError(_))
1279 ));
1280 }
1281
1282 #[test]
1283 fn test_query_all() {
1284 let broker = BgpkitBroker::new()
1285 .ts_start("1634693400")
1286 .ts_end("1634693400")
1287 .page_size(100);
1288 let res = broker.query();
1289 assert!(res.is_ok());
1290 assert!(res.ok().unwrap().len() >= 54);
1291 }
1292
1293 #[test]
1294 fn test_iterator() {
1295 let broker = BgpkitBroker::new()
1296 .ts_start("1634693400")
1297 .ts_end("1634693400");
1298 assert!(broker.into_iter().count() >= 54);
1299 }
1300
1301 #[test]
1302 fn test_filters() {
1303 let broker = BgpkitBroker::new()
1304 .ts_start("1634693400")
1305 .ts_end("1634693400");
1306 let items = broker.query().unwrap();
1307 assert!(items.len() >= 54);
1308
1309 let broker = BgpkitBroker::new()
1310 .ts_start("1634693400")
1311 .ts_end("1634693400")
1312 .collector_id("rrc00");
1313 let items = broker.query().unwrap();
1314 assert_eq!(items.len(), 1);
1315
1316 let broker = BgpkitBroker::new()
1317 .ts_start("1634693400")
1318 .ts_end("1634693400")
1319 .project("riperis");
1320 let items = broker.query().unwrap();
1321 assert_eq!(items.len(), 23);
1322 }
1323
1324 #[test]
1325 fn test_latest() {
1326 let broker = BgpkitBroker::new();
1327 let items = broker.latest().unwrap();
1328 assert!(items.len() >= 125);
1329
1330 let broker = BgpkitBroker::new().project("routeviews".to_string());
1331 let items = broker.latest().unwrap();
1332 assert!(!items.is_empty());
1333 assert!(items
1334 .iter()
1335 .all(|item| !item.collector_id.starts_with("rrc")));
1336
1337 let broker = BgpkitBroker::new().project("riperis".to_string());
1338 let items = broker.latest().unwrap();
1339 assert!(!items.is_empty());
1340 assert!(items
1341 .iter()
1342 .all(|item| item.collector_id.starts_with("rrc")));
1343
1344 let broker = BgpkitBroker::new().data_type("rib".to_string());
1345 let items = broker.latest().unwrap();
1346 assert!(!items.is_empty());
1347 assert!(items.iter().all(|item| item.is_rib()));
1348
1349 let broker = BgpkitBroker::new().data_type("update".to_string());
1350 let items = broker.latest().unwrap();
1351 assert!(!items.is_empty());
1352 assert!(items.iter().all(|item| !item.is_rib()));
1353
1354 let broker = BgpkitBroker::new().collector_id("rrc00".to_string());
1355 let items = broker.latest().unwrap();
1356 assert!(!items.is_empty());
1357 assert!(items
1358 .iter()
1359 .all(|item| item.collector_id.as_str() == "rrc00"));
1360 assert_eq!(items.len(), 2);
1361 }
1362
1363 #[test]
1364 fn test_latest_no_ssl() {
1365 let broker = BgpkitBroker::new().accept_invalid_certs();
1366 let items = broker.latest().unwrap();
1367 assert!(items.len() >= 125);
1368 }
1369
1370 #[test]
1371 fn test_health_check() {
1372 let broker = BgpkitBroker::new();
1373 let res = broker.health_check();
1374 assert!(res.is_ok());
1375 }
1376
1377 #[test]
1378 fn test_peers() {
1379 let broker = BgpkitBroker::new();
1380 let all_peers = broker.get_peers().unwrap();
1381 assert!(!all_peers.is_empty());
1382 let first_peer = all_peers.first().unwrap();
1383 let first_ip = first_peer.ip;
1384 let first_asn = first_peer.asn;
1385
1386 let broker = BgpkitBroker::new().peers_ip(first_ip);
1387 let peers = broker.get_peers().unwrap();
1388 assert!(!peers.is_empty());
1389
1390 let broker = BgpkitBroker::new().peers_asn(first_asn);
1391 let peers = broker.get_peers().unwrap();
1392 assert!(!peers.is_empty());
1393
1394 let broker = BgpkitBroker::new().peers_only_full_feed(true);
1395 let full_feed_peers = broker.get_peers().unwrap();
1396 assert!(!full_feed_peers.is_empty());
1397 assert!(full_feed_peers.len() < all_peers.len());
1398
1399 let broker = BgpkitBroker::new().collector_id("rrc00");
1400 let rrc_peers = broker.get_peers().unwrap();
1401 assert!(!rrc_peers.is_empty());
1402 assert!(rrc_peers.iter().all(|peer| peer.collector == "rrc00"));
1403
1404 let broker = BgpkitBroker::new().collector_id("rrc00,route-views2");
1405 let rrc_rv_peers = broker.get_peers().unwrap();
1406 assert!(!rrc_rv_peers.is_empty());
1407 assert!(rrc_rv_peers
1408 .iter()
1409 .any(|peer| peer.collector == "rrc00" || peer.collector == "route-views2"));
1410
1411 assert!(rrc_rv_peers.len() > rrc_peers.len());
1412 }
1413
1414 #[test]
1415 fn test_timestamp_parsing_unix() {
1416 let broker = BgpkitBroker::new();
1417
1418 let result = broker.clone().ts_start("1640995200");
1420 assert_eq!(result.query_params.ts_start, Some("1640995200".to_string()));
1422
1423 let result = broker.clone().ts_end("1640995200");
1424 assert_eq!(result.query_params.ts_end, Some("1640995200".to_string()));
1425 }
1426
1427 #[test]
1428 fn test_timestamp_parsing_rfc3339() {
1429 let broker = BgpkitBroker::new();
1430
1431 let result = broker.clone().ts_start("2022-01-01T00:00:00Z");
1433 assert_eq!(
1434 result.query_params.ts_start,
1435 Some("2022-01-01T00:00:00Z".to_string())
1436 );
1437
1438 let result = broker.clone().ts_start("2022-01-01T12:30:45");
1440 assert_eq!(
1441 result.query_params.ts_start,
1442 Some("2022-01-01T12:30:45".to_string())
1443 );
1444
1445 let result = broker.clone().ts_end("2022-01-01 12:30:45");
1447 assert_eq!(
1448 result.query_params.ts_end,
1449 Some("2022-01-01 12:30:45".to_string())
1450 );
1451 }
1452
1453 #[test]
1454 fn test_timestamp_parsing_pure_dates() {
1455 let broker = BgpkitBroker::new();
1456
1457 let result = broker.clone().ts_start("2022-01-01");
1459 assert_eq!(result.query_params.ts_start, Some("2022-01-01".to_string()));
1460
1461 let result = broker.clone().ts_start("2022/01/01");
1463 assert_eq!(result.query_params.ts_start, Some("2022/01/01".to_string()));
1464
1465 let result = broker.clone().ts_end("2022.01.01");
1467 assert_eq!(result.query_params.ts_end, Some("2022.01.01".to_string()));
1468
1469 let result = broker.clone().ts_end("20220101");
1471 assert_eq!(result.query_params.ts_end, Some("20220101".to_string()));
1472 }
1473
1474 #[test]
1475 fn test_timestamp_parsing_whitespace() {
1476 let broker = BgpkitBroker::new();
1477
1478 let result = broker.clone().ts_start(" 2022-01-01 ");
1480 assert_eq!(
1481 result.query_params.ts_start,
1482 Some(" 2022-01-01 ".to_string())
1483 );
1484
1485 let result = broker.clone().ts_end("\t1640995200\n");
1486 assert_eq!(
1487 result.query_params.ts_end,
1488 Some("\t1640995200\n".to_string())
1489 );
1490 }
1491
1492 #[test]
1493 fn test_timestamp_parsing_errors() {
1494 let broker = BgpkitBroker::new();
1495
1496 let broker_with_invalid = broker.clone().ts_start("invalid-timestamp");
1498 let result = broker_with_invalid.query();
1499 assert!(result.is_err());
1500 assert!(matches!(
1501 result.err(),
1502 Some(BrokerError::ConfigurationError(_))
1503 ));
1504
1505 let broker_with_invalid = broker.clone().ts_end("2022-13-01");
1507 let result = broker_with_invalid.query();
1508 assert!(result.is_err());
1509 assert!(matches!(
1510 result.err(),
1511 Some(BrokerError::ConfigurationError(_))
1512 ));
1513
1514 let broker_with_invalid = broker.clone().ts_start("20221301");
1516 let result = broker_with_invalid.query();
1517 assert!(result.is_err());
1518 assert!(matches!(
1519 result.err(),
1520 Some(BrokerError::ConfigurationError(_))
1521 ));
1522
1523 let broker_with_invalid = broker.clone().ts_start("2022-01");
1525 let result = broker_with_invalid.query();
1526 assert!(result.is_err());
1527 assert!(matches!(
1528 result.err(),
1529 Some(BrokerError::ConfigurationError(_))
1530 ));
1531 }
1532
1533 #[test]
1534 fn test_parse_timestamp_direct() {
1535 use chrono::{NaiveDate, NaiveDateTime};
1536
1537 let expected_unix = Utc.timestamp_opt(1640995200, 0).single().unwrap();
1541 assert_eq!(
1542 BgpkitBroker::parse_timestamp("1640995200").unwrap(),
1543 expected_unix
1544 );
1545
1546 let expected_rfc3339_z = Utc.from_utc_datetime(
1548 &NaiveDateTime::parse_from_str("2022-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S").unwrap(),
1549 );
1550 assert_eq!(
1551 BgpkitBroker::parse_timestamp("2022-01-01T00:00:00Z").unwrap(),
1552 expected_rfc3339_z
1553 );
1554
1555 let expected_rfc3339_no_z = Utc.from_utc_datetime(
1556 &NaiveDateTime::parse_from_str("2022-01-01T12:30:45", "%Y-%m-%dT%H:%M:%S").unwrap(),
1557 );
1558 assert_eq!(
1559 BgpkitBroker::parse_timestamp("2022-01-01T12:30:45").unwrap(),
1560 expected_rfc3339_no_z
1561 );
1562
1563 let expected_space_format = Utc.from_utc_datetime(
1564 &NaiveDateTime::parse_from_str("2022-01-01 12:30:45", "%Y-%m-%d %H:%M:%S").unwrap(),
1565 );
1566 assert_eq!(
1567 BgpkitBroker::parse_timestamp("2022-01-01 12:30:45").unwrap(),
1568 expected_space_format
1569 );
1570
1571 let expected_date = Utc.from_utc_datetime(
1573 &NaiveDate::from_ymd_opt(2022, 1, 1)
1574 .unwrap()
1575 .and_hms_opt(0, 0, 0)
1576 .unwrap(),
1577 );
1578 assert_eq!(
1579 BgpkitBroker::parse_timestamp("2022-01-01").unwrap(),
1580 expected_date
1581 );
1582 assert_eq!(
1583 BgpkitBroker::parse_timestamp("2022/01/01").unwrap(),
1584 expected_date
1585 );
1586 assert_eq!(
1587 BgpkitBroker::parse_timestamp("2022.01.01").unwrap(),
1588 expected_date
1589 );
1590 assert_eq!(
1591 BgpkitBroker::parse_timestamp("20220101").unwrap(),
1592 expected_date
1593 );
1594
1595 let result_plus_tz = BgpkitBroker::parse_timestamp("2022-01-01T00:00:00+00:00").unwrap();
1597 assert_eq!(result_plus_tz, expected_date);
1598 println!("✓ +00:00 timezone format works");
1599
1600 let result_minus_tz = BgpkitBroker::parse_timestamp("2022-01-01T05:00:00-05:00").unwrap();
1602 let expected_10am = Utc.with_ymd_and_hms(2022, 1, 1, 10, 0, 0).unwrap();
1603 assert_eq!(result_minus_tz, expected_10am);
1604 println!("✓ -05:00 timezone format works (05:00-05:00 = 10:00Z)");
1605
1606 assert!(BgpkitBroker::parse_timestamp("invalid").is_err());
1608 assert!(BgpkitBroker::parse_timestamp("2022-13-01").is_err());
1609 assert!(BgpkitBroker::parse_timestamp("2022-01").is_err());
1610 }
1611
1612 #[test]
1613 fn test_collector_id_validation() {
1614 let broker = BgpkitBroker::new();
1615
1616 let broker_valid = broker.clone().collector_id("rrc00");
1618 let result = broker_valid.validate_configuration();
1619 assert!(result.is_ok());
1620
1621 let broker_valid = broker.clone().collector_id("rrc00,route-views2");
1623 let result = broker_valid.validate_configuration();
1624 assert!(result.is_ok());
1625
1626 let broker_unknown = broker.clone().collector_id("brand-new-collector");
1628 let result = broker_unknown.validate_configuration();
1629 assert!(result.is_ok());
1630
1631 let broker_mixed = broker.clone().collector_id("rrc00,brand-new-collector");
1633 let result = broker_mixed.validate_configuration();
1634 assert!(result.is_ok());
1635
1636 let broker_empty = broker.clone().collector_id(", , ,");
1638 let result = broker_empty.validate_configuration();
1639 assert!(result.is_err());
1640 assert!(matches!(
1641 result.err(),
1642 Some(BrokerError::ConfigurationError(_))
1643 ));
1644 }
1645
1646 #[test]
1647 fn test_project_validation() {
1648 let broker = BgpkitBroker::new();
1649
1650 let broker_valid = broker.clone().project("riperis");
1652 let result = broker_valid.validate_configuration();
1653 assert!(result.is_ok());
1654
1655 let broker_valid = broker.clone().project("routeviews");
1656 let result = broker_valid.validate_configuration();
1657 assert!(result.is_ok());
1658
1659 let broker_valid = broker.clone().project("rrc");
1661 let result = broker_valid.validate_configuration();
1662 assert!(result.is_ok());
1663
1664 let broker_valid = broker.clone().project("rv");
1665 let result = broker_valid.validate_configuration();
1666 assert!(result.is_ok());
1667
1668 let broker_invalid = broker.clone().project("invalid-project");
1670 let result = broker_invalid.validate_configuration();
1671 assert!(result.is_err());
1672 assert!(matches!(
1673 result.err(),
1674 Some(BrokerError::ConfigurationError(_))
1675 ));
1676 }
1677
1678 #[test]
1679 fn test_data_type_validation() {
1680 let broker = BgpkitBroker::new();
1681
1682 let broker_valid = broker.clone().data_type("rib");
1684 let result = broker_valid.validate_configuration();
1685 assert!(result.is_ok());
1686
1687 let broker_valid = broker.clone().data_type("updates");
1688 let result = broker_valid.validate_configuration();
1689 assert!(result.is_ok());
1690
1691 let broker_valid = broker.clone().data_type("ribs");
1693 let result = broker_valid.validate_configuration();
1694 assert!(result.is_ok());
1695
1696 let broker_valid = broker.clone().data_type("update");
1697 let result = broker_valid.validate_configuration();
1698 assert!(result.is_ok());
1699
1700 let broker_invalid = broker.clone().data_type("invalid-type");
1702 let result = broker_invalid.validate_configuration();
1703 assert!(result.is_err());
1704 assert!(matches!(
1705 result.err(),
1706 Some(BrokerError::ConfigurationError(_))
1707 ));
1708 }
1709
1710 #[test]
1711 fn test_page_validation() {
1712 let broker = BgpkitBroker::new();
1713
1714 let broker_valid = broker.clone().page(1);
1716 let result = broker_valid.validate_configuration();
1717 assert!(result.is_ok());
1718
1719 let broker_valid = broker.clone().page(100);
1720 let result = broker_valid.validate_configuration();
1721 assert!(result.is_ok());
1722
1723 let broker_invalid = broker.clone().page(0);
1725 let result = broker_invalid.validate_configuration();
1726 assert!(result.is_err());
1727 assert!(matches!(
1728 result.err(),
1729 Some(BrokerError::ConfigurationError(_))
1730 ));
1731 }
1732
1733 #[test]
1734 fn test_page_size_validation() {
1735 let broker = BgpkitBroker::new();
1736
1737 let broker_valid = broker.clone().page_size(1);
1739 let result = broker_valid.validate_configuration();
1740 assert!(result.is_ok());
1741
1742 let broker_valid = broker.clone().page_size(100);
1743 let result = broker_valid.validate_configuration();
1744 assert!(result.is_ok());
1745
1746 let broker_valid = broker.clone().page_size(100000);
1747 let result = broker_valid.validate_configuration();
1748 assert!(result.is_ok());
1749
1750 let broker_invalid = broker.clone().page_size(0);
1752 let result = broker_invalid.validate_configuration();
1753 assert!(result.is_err());
1754 assert!(matches!(
1755 result.err(),
1756 Some(BrokerError::ConfigurationError(_))
1757 ));
1758
1759 let broker_invalid = broker.clone().page_size(100001);
1760 let result = broker_invalid.validate_configuration();
1761 assert!(result.is_err());
1762 assert!(matches!(
1763 result.err(),
1764 Some(BrokerError::ConfigurationError(_))
1765 ));
1766 }
1767
1768 #[test]
1769 fn test_method_chaining() {
1770 let broker = BgpkitBroker::new()
1771 .ts_start("1634693400")
1772 .ts_end("1634693400")
1773 .collector_id("rrc00")
1774 .project("riperis")
1775 .data_type("rib")
1776 .page(1)
1777 .page_size(10);
1778
1779 assert_eq!(broker.query_params.ts_start, Some("1634693400".to_string()));
1781 assert_eq!(broker.query_params.ts_end, Some("1634693400".to_string()));
1782 assert_eq!(broker.query_params.collector_id, Some("rrc00".to_string()));
1783 assert_eq!(broker.query_params.project, Some("riperis".to_string()));
1784 assert_eq!(broker.query_params.data_type, Some("rib".to_string()));
1785 assert_eq!(broker.query_params.page, 1);
1786 assert_eq!(broker.query_params.page_size, 10);
1787 }
1788}