1#![doc(
203 html_logo_url = "https://raw.githubusercontent.com/bgpkit/assets/main/logos/icon-transparent.png",
204 html_favicon_url = "https://raw.githubusercontent.com/bgpkit/assets/main/logos/favicon.ico"
205)]
206#![allow(unknown_lints)]
207
208mod collector;
209#[cfg(feature = "cli")]
210pub mod config;
211#[cfg(feature = "cli")]
212mod crawler;
213#[cfg(feature = "backend")]
214pub mod db;
215mod error;
216mod item;
217mod peer;
218mod query;
219mod shortcuts;
220#[cfg(feature = "sse")]
221mod sse;
222
223use crate::collector::DEFAULT_COLLECTORS_CONFIG;
224use crate::peer::BrokerPeersResult;
225use crate::query::{BrokerQueryResult, CollectorLatestResult};
226use chrono::{DateTime, NaiveDate, TimeZone, Utc};
227pub use collector::{load_collectors, Collector};
228
229#[cfg(feature = "cli")]
230pub use config::BrokerConfig;
231#[cfg(feature = "cli")]
232pub use crawler::crawl_collector;
233#[cfg(feature = "backend")]
234pub use db::{LocalBrokerDb, UpdatesMeta, DEFAULT_PAGE_SIZE};
235pub use error::BrokerError;
236pub use item::BrokerItem;
237pub use peer::BrokerPeer;
238pub use query::{QueryParams, SortOrder};
239pub use shortcuts::SnapshotFiles;
240#[cfg(feature = "sse")]
241pub use sse::{BrokerItemSubscription, SseSubscriptionOptions};
242use std::collections::{HashMap, HashSet};
243use std::fmt::Display;
244use std::net::IpAddr;
245use std::path::PathBuf;
246
/// `User-Agent` value attached to the HTTP clients built in this file:
/// the literal `bgpkit-broker/` followed by the crate version taken from
/// `CARGO_PKG_VERSION` at compile time.
const SDK_USER_AGENT: &str = concat!("bgpkit-broker/", env!("CARGO_PKG_VERSION"));
248
/// Client for a BGPKIT broker HTTP API.
///
/// Configuration is done with the consuming builder-style methods on this
/// type (`ts_start`, `collector_id`, `page_size`, ...); the query methods
/// (`query`, `query_single_page`, `latest`, `get_peers`, ...) then issue
/// blocking HTTP requests.
#[derive(Clone)]
pub struct BgpkitBroker {
    /// Base URL of the broker API. `Default` and the `broker_url` builder
    /// strip trailing `/` characters before storing it.
    pub broker_url: String,
    /// Search/peer filters and paging state sent with each request.
    pub query_params: QueryParams,
    /// Blocking HTTP client used for every request.
    client: reqwest::blocking::Client,
    /// Collector ID -> project name lookup, used by `latest` to filter by
    /// project.
    collector_project_map: HashMap<String, String>,
    /// Whether the HTTP client was built to accept invalid TLS certificates.
    accept_invalid_certs: bool,
    /// Directory for cached single-page results; `None` disables caching.
    cache_dir: Option<PathBuf>,
}
261
262impl Default for BgpkitBroker {
263 fn default() -> Self {
264 dotenvy::dotenv().ok();
265 let url = match std::env::var("BGPKIT_BROKER_URL") {
266 Ok(url) => url.trim_end_matches('/').to_string(),
267 Err(_) => "https://api.bgpkit.com/v3/broker".to_string(),
268 };
269
270 let collector_project_map = DEFAULT_COLLECTORS_CONFIG.clone().to_project_map();
271
272 let accept_invalid_certs = read_accept_invalid_certs_from_env();
273 let client = build_blocking_client(accept_invalid_certs);
274
275 Self {
276 broker_url: url,
277 query_params: Default::default(),
278 client,
279 collector_project_map,
280 accept_invalid_certs,
281 cache_dir: None,
282 }
283 }
284}
285
/// Reads the `ONEIO_ACCEPT_INVALID_CERTS` environment variable.
///
/// Returns `true` when the variable is set and its lower-cased value starts
/// with `true` or `y`; returns `false` when it is unset, unreadable, or has
/// any other value.
fn read_accept_invalid_certs_from_env() -> bool {
    std::env::var("ONEIO_ACCEPT_INVALID_CERTS")
        .map(|raw| {
            let lowered = raw.to_lowercase();
            lowered.starts_with("true") || lowered.starts_with('y')
        })
        .unwrap_or(false)
}
295
296fn build_blocking_client(accept_invalid_certs: bool) -> reqwest::blocking::Client {
297 match reqwest::blocking::ClientBuilder::new()
298 .danger_accept_invalid_certs(accept_invalid_certs)
299 .user_agent(SDK_USER_AGENT)
300 .build()
301 {
302 Ok(c) => c,
303 Err(e) => {
304 panic!("Failed to build HTTP client for broker requests: {}", e);
305 }
306 }
307}
308
/// Builds the async HTTP client (only compiled with the `sse` feature).
///
/// Configured like the blocking client: `SDK_USER_AGENT` as user agent and
/// optional acceptance of invalid TLS certificates.
///
/// # Errors
///
/// Returns `BrokerError::NetworkError` if the client builder fails.
#[cfg(feature = "sse")]
pub(crate) fn build_async_client(
    accept_invalid_certs: bool,
) -> Result<reqwest::Client, BrokerError> {
    let builder = reqwest::ClientBuilder::new()
        .danger_accept_invalid_certs(accept_invalid_certs)
        .user_agent(SDK_USER_AGENT);

    match builder.build() {
        Ok(client) => Ok(client),
        Err(e) => Err(BrokerError::NetworkError(e)),
    }
}
319
320impl BgpkitBroker {
321 pub fn new() -> Self {
334 Self::default()
335 }
336
337 pub fn broker_url<S: Display>(self, url: S) -> Self {
348 let broker_url = url.to_string().trim_end_matches('/').to_string();
349 Self {
350 broker_url,
351 query_params: self.query_params,
352 client: self.client,
353 collector_project_map: self.collector_project_map,
354 accept_invalid_certs: self.accept_invalid_certs,
355 cache_dir: self.cache_dir,
356 }
357 }
358
359 pub fn accept_invalid_certs(self) -> Self {
361 Self {
362 broker_url: self.broker_url,
363 query_params: self.query_params,
364 client: build_blocking_client(true),
365 collector_project_map: self.collector_project_map,
366 accept_invalid_certs: true,
367 cache_dir: self.cache_dir,
368 }
369 }
370
371 #[deprecated(since = "0.7.1", note = "Please use `accept_invalid_certs` instead.")]
373 pub fn disable_ssl_check(self) -> Self {
374 Self::accept_invalid_certs(self)
375 }
376
377 pub fn cache_dir<P: Into<PathBuf>>(mut self, path: P) -> Self {
392 let path = path.into();
393 if !path.exists() {
394 std::fs::create_dir_all(&path).expect("Failed to create cache directory");
395 }
396 self.cache_dir = Some(path);
397 self
398 }
399
400 fn cache_key(&self) -> String {
402 use sha2::{Digest, Sha256};
403
404 let params_str = format!(
405 "{}:{}:{}:{}:{}:{}:{}:{}",
406 self.broker_url,
407 self.query_params.ts_start.as_deref().unwrap_or(""),
408 self.query_params.ts_end.as_deref().unwrap_or(""),
409 self.query_params.collector_id.as_deref().unwrap_or(""),
410 self.query_params.project.as_deref().unwrap_or(""),
411 self.query_params.data_type.as_deref().unwrap_or(""),
412 self.query_params.page,
413 self.query_params.page_size
414 );
415
416 let mut hasher = Sha256::new();
417 hasher.update(params_str.as_bytes());
418 format!("{:x}", hasher.finalize())
419 }
420
421 fn load_cache(&self) -> Option<Vec<BrokerItem>> {
423 let cache_dir = self.cache_dir.as_ref()?;
424 let cache_file = cache_dir.join(self.cache_key()).with_extension("json");
425
426 if !cache_file.exists() {
427 return None;
428 }
429
430 match std::fs::read_to_string(&cache_file) {
431 Ok(contents) => {
432 match serde_json::from_str::<Vec<BrokerItem>>(&contents) {
433 Ok(items) => {
434 log::info!("Loaded {} items from cache", items.len());
435 Some(items)
436 }
437 Err(e) => {
438 log::warn!("Failed to deserialize cache file: {}", e);
439 None
440 }
441 }
442 }
443 Err(e) => {
444 log::warn!("Failed to read cache file: {}", e);
445 None
446 }
447 }
448 }
449
450 fn save_cache(&self, items: &[BrokerItem]) {
452 let Some(cache_dir) = self.cache_dir.as_ref() else {
453 return;
454 };
455
456 let cache_file = cache_dir.join(self.cache_key()).with_extension("json");
457
458 match serde_json::to_string(items) {
459 Ok(json) => {
460 if let Err(e) = std::fs::write(&cache_file, json) {
461 log::warn!("Failed to write cache file: {}", e);
462 } else {
463 log::info!("Saved {} items to cache", items.len());
464 }
465 }
466 Err(e) => {
467 log::warn!("Failed to serialize items for cache: {}", e);
468 }
469 }
470 }
471
472 fn parse_timestamp(timestamp: &str) -> Result<DateTime<Utc>, BrokerError> {
486 let ts_str = timestamp.trim();
487
488 if let Ok(dt_with_tz) = DateTime::parse_from_rfc3339(ts_str) {
490 return Ok(dt_with_tz.with_timezone(&Utc));
491 }
492
493 if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H:%M:%SZ") {
495 return Ok(Utc.from_utc_datetime(&naive_dt));
496 }
497
498 if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H:%M:%S") {
500 return Ok(Utc.from_utc_datetime(&naive_dt));
501 }
502
503 if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%d %H:%M:%S") {
505 return Ok(Utc.from_utc_datetime(&naive_dt));
506 }
507
508 let date_formats = [
510 "%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d", "%Y%m%d", ];
515
516 for format in &date_formats {
517 if let Ok(date) = NaiveDate::parse_from_str(ts_str, format) {
518 if format == &"%Y%m%d" && ts_str.len() != 8 {
520 continue;
521 }
522 if let Some(naive_datetime) = date.and_hms_opt(0, 0, 0) {
524 return Ok(Utc.from_utc_datetime(&naive_datetime));
525 }
526 }
527 }
528
529 if ts_str.len() >= 9 && ts_str.len() <= 13 && ts_str.chars().all(|c| c.is_ascii_digit()) {
531 if let Ok(timestamp) = ts_str.parse::<i64>() {
532 if let Some(dt) = Utc.timestamp_opt(timestamp, 0).single() {
533 return Ok(dt);
534 }
535 }
536 }
537
538 Err(BrokerError::ConfigurationError(format!(
539 "Invalid timestamp format '{ts_str}'. Supported formats:\n\
540 - Unix timestamp: '1640995200'\n\
541 - RFC3339 with timezone: '2022-01-01T00:00:00+00:00', '2022-01-01T00:00:00Z', '2022-01-01T05:00:00-05:00'\n\
542 - RFC3339 without timezone: '2022-01-01T00:00:00' (assumes UTC)\n\
543 - Date with time: '2022-01-01 00:00:00'\n\
544 - Pure date: '2022-01-01', '2022/01/01', '2022.01.01', '20220101'"
545 )))
546 }
547
548 fn validate_configuration(&self) -> Result<QueryParams, BrokerError> {
553 let mut normalized_params = self.query_params.clone();
555
556 if let Some(ts) = &self.query_params.ts_start {
557 let parsed_datetime = Self::parse_timestamp(ts)?;
558 normalized_params.ts_start =
559 Some(parsed_datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string());
560 }
561
562 if let Some(ts) = &self.query_params.ts_end {
563 let parsed_datetime = Self::parse_timestamp(ts)?;
564 normalized_params.ts_end =
565 Some(parsed_datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string());
566 }
567
568 if let Some(collector_str) = &self.query_params.collector_id {
570 let collectors: Vec<String> = collector_str
571 .split(',')
572 .map(|s| s.trim())
573 .filter(|s| !s.is_empty())
574 .map(|s| s.to_string())
575 .collect();
576
577 if collectors.is_empty() {
578 return Err(BrokerError::ConfigurationError(
579 "Collector ID cannot be empty".to_string(),
580 ));
581 }
582
583 let mut seen = HashSet::new();
585 let mut deduped = Vec::with_capacity(collectors.len());
586 for c in collectors {
587 if seen.insert(c.clone()) {
588 deduped.push(c);
589 }
590 }
591
592 normalized_params.collector_id = Some(deduped.join(","));
593 }
594
595 if let Some(project_str) = &self.query_params.project {
597 let project_lower = project_str.to_lowercase();
598 match project_lower.as_str() {
599 "rrc" | "riperis" | "ripe_ris" | "routeviews" | "route_views" | "rv" => {
600 }
602 _ => {
603 return Err(BrokerError::ConfigurationError(format!(
604 "Invalid project '{project_str}'. Valid projects are: 'riperis' (aliases: 'rrc', 'ripe_ris') or 'routeviews' (aliases: 'route_views', 'rv')"
605 )));
606 }
607 }
608 }
609
610 if let Some(data_type_str) = &self.query_params.data_type {
612 let data_type_lower = data_type_str.to_lowercase();
613 match data_type_lower.as_str() {
614 "rib" | "ribs" | "r" | "update" | "updates" => {
615 }
617 _ => {
618 return Err(BrokerError::ConfigurationError(format!(
619 "Invalid data type '{data_type_str}'. Valid data types are: 'rib' (aliases: 'ribs', 'r') or 'updates' (alias: 'update')"
620 )));
621 }
622 }
623 }
624
625 if self.query_params.page < 1 {
627 return Err(BrokerError::ConfigurationError(format!(
628 "Invalid page number {}. Page number must be >= 1",
629 self.query_params.page
630 )));
631 }
632
633 if !(1..=100000).contains(&self.query_params.page_size) {
635 return Err(BrokerError::ConfigurationError(format!(
636 "Invalid page size {}. Page size must be between 1 and 100000",
637 self.query_params.page_size
638 )));
639 }
640
641 Ok(normalized_params)
642 }
643
644 pub fn ts_start<S: Display>(self, ts_start: S) -> Self {
677 let mut query_params = self.query_params;
678 query_params.ts_start = Some(ts_start.to_string());
679 Self {
680 broker_url: self.broker_url,
681 query_params,
682 client: self.client,
683 collector_project_map: self.collector_project_map,
684 accept_invalid_certs: self.accept_invalid_certs,
685 cache_dir: self.cache_dir,
686 }
687 }
688
689 pub fn ts_end<S: Display>(self, ts_end: S) -> Self {
714 let mut query_params = self.query_params;
715 query_params.ts_end = Some(ts_end.to_string());
716 Self {
717 broker_url: self.broker_url,
718 client: self.client,
719 query_params,
720 collector_project_map: self.collector_project_map,
721 accept_invalid_certs: self.accept_invalid_certs,
722 cache_dir: self.cache_dir,
723 }
724 }
725
726 pub fn collector_id<S: Display>(self, collector_id: S) -> Self {
745 let mut query_params = self.query_params;
746 query_params.collector_id = Some(collector_id.to_string());
747 Self {
748 client: self.client,
749 broker_url: self.broker_url,
750 query_params,
751 collector_project_map: self.collector_project_map,
752 accept_invalid_certs: self.accept_invalid_certs,
753 cache_dir: self.cache_dir,
754 }
755 }
756
757 pub fn project<S: Display>(self, project: S) -> Self {
771 let mut query_params = self.query_params;
772 query_params.project = Some(project.to_string());
773 Self {
774 client: self.client,
775 broker_url: self.broker_url,
776 query_params,
777 collector_project_map: self.collector_project_map,
778 accept_invalid_certs: self.accept_invalid_certs,
779 cache_dir: self.cache_dir,
780 }
781 }
782
783 pub fn data_type<S: Display>(self, data_type: S) -> Self {
799 let mut query_params = self.query_params;
800 query_params.data_type = Some(data_type.to_string());
801 Self {
802 broker_url: self.broker_url,
803 client: self.client,
804 query_params,
805 collector_project_map: self.collector_project_map,
806 accept_invalid_certs: self.accept_invalid_certs,
807 cache_dir: self.cache_dir,
808 }
809 }
810
811 pub fn page(self, page: i64) -> Self {
823 let mut query_params = self.query_params;
824 query_params.page = page;
825 Self {
826 broker_url: self.broker_url,
827 client: self.client,
828 query_params,
829 collector_project_map: self.collector_project_map,
830 accept_invalid_certs: self.accept_invalid_certs,
831 cache_dir: self.cache_dir,
832 }
833 }
834
835 pub fn page_size(self, page_size: i64) -> Self {
847 let mut query_params = self.query_params;
848 query_params.page_size = page_size;
849 Self {
850 broker_url: self.broker_url,
851 client: self.client,
852 query_params,
853 collector_project_map: self.collector_project_map,
854 accept_invalid_certs: self.accept_invalid_certs,
855 cache_dir: self.cache_dir,
856 }
857 }
858
859 pub fn peers_ip(self, peer_ip: IpAddr) -> Self {
868 let mut query_params = self.query_params;
869 query_params.peers_ip = Some(peer_ip);
870 Self {
871 broker_url: self.broker_url,
872 client: self.client,
873 query_params,
874 collector_project_map: self.collector_project_map,
875 accept_invalid_certs: self.accept_invalid_certs,
876 cache_dir: self.cache_dir,
877 }
878 }
879
880 pub fn peers_asn(self, peer_asn: u32) -> Self {
889 let mut query_params = self.query_params;
890 query_params.peers_asn = Some(peer_asn);
891 Self {
892 broker_url: self.broker_url,
893 client: self.client,
894 query_params,
895 collector_project_map: self.collector_project_map,
896 accept_invalid_certs: self.accept_invalid_certs,
897 cache_dir: self.cache_dir,
898 }
899 }
900
901 pub fn peers_only_full_feed(self, peer_full_feed: bool) -> Self {
910 let mut query_params = self.query_params;
911 query_params.peers_only_full_feed = peer_full_feed;
912 Self {
913 broker_url: self.broker_url,
914 client: self.client,
915 query_params,
916 collector_project_map: self.collector_project_map,
917 accept_invalid_certs: self.accept_invalid_certs,
918 cache_dir: self.cache_dir,
919 }
920 }
921
922 pub fn turn_page(&mut self, page: i64) {
937 self.query_params.page = page;
938 }
939
940 pub fn query_single_page(&self) -> Result<Vec<BrokerItem>, BrokerError> {
950 if let Some(cached_items) = self.load_cache() {
952 return Ok(cached_items);
953 }
954
955 let validated_params = self.validate_configuration()?;
956 let url = format!("{}/search{}", &self.broker_url, &validated_params);
957 log::info!("sending broker query to {}", &url);
958 match self.run_files_query(url.as_str()) {
959 Ok(res) => {
960 self.save_cache(&res.data);
962 Ok(res.data)
963 }
964 Err(e) => Err(e),
965 }
966 }
967
968 pub fn query_total_count(&self) -> Result<i64, BrokerError> {
992 let validated_params = self.validate_configuration()?;
993 let url = format!("{}/search{}", &self.broker_url, &validated_params);
994 match self.run_files_query(url.as_str()) {
995 Ok(res) => res.total.ok_or(BrokerError::BrokerError(
996 "count not found in response".to_string(),
997 )),
998 Err(e) => Err(e),
999 }
1000 }
1001
1002 pub fn health_check(&self) -> Result<(), BrokerError> {
1011 let url = format!("{}/health", &self.broker_url.trim_end_matches('/'));
1012 match self.client.get(url.as_str()).send() {
1013 Ok(response) => {
1014 if response.status() == reqwest::StatusCode::OK {
1015 Ok(())
1016 } else {
1017 Err(BrokerError::BrokerError(format!(
1018 "endpoint unhealthy {}",
1019 self.broker_url
1020 )))
1021 }
1022 }
1023 Err(_e) => Err(BrokerError::BrokerError(format!(
1024 "endpoint unhealthy {}",
1025 self.broker_url
1026 ))),
1027 }
1028 }
1029
1030 pub fn query(&self) -> Result<Vec<BrokerItem>, BrokerError> {
1049 let mut p = self.validate_configuration()?;
1050
1051 let mut items = vec![];
1052 loop {
1053 let url = format!("{}/search{}", &self.broker_url, &p);
1054
1055 let res_items = self.run_files_query(url.as_str())?.data;
1056
1057 let items_count = res_items.len() as i64;
1058
1059 if items_count == 0 {
1060 break;
1062 }
1063
1064 items.extend(res_items);
1065 let cur_page = p.page;
1066 p = p.page(cur_page + 1);
1067
1068 if items_count < p.page_size {
1069 break;
1071 }
1072 }
1073 Ok(items)
1074 }
1075
1076 pub fn latest(&self) -> Result<Vec<BrokerItem>, BrokerError> {
1090 let latest_query_url = format!("{}/latest", self.broker_url);
1091 let mut items = match self.client.get(latest_query_url.as_str()).send() {
1092 Ok(response) => match response.json::<CollectorLatestResult>() {
1093 Ok(result) => result.data,
1094 Err(_) => {
1095 return Err(BrokerError::BrokerError(
1096 "Error parsing response".to_string(),
1097 ));
1098 }
1099 },
1100 Err(e) => {
1101 return Err(BrokerError::BrokerError(format!(
1102 "Unable to connect to the URL ({latest_query_url}): {e}"
1103 )));
1104 }
1105 };
1106
1107 items.retain(|item| {
1108 let mut matches = true;
1109 if let Some(project) = &self.query_params.project {
1110 match project.to_lowercase().as_str() {
1111 "rrc" | "riperis" | "ripe_ris" => {
1112 matches = self
1113 .collector_project_map
1114 .get(&item.collector_id)
1115 .cloned()
1116 .unwrap_or_default()
1117 .as_str()
1118 == "riperis";
1119 }
1120 "routeviews" | "route_views" | "rv" => {
1121 matches = self
1122 .collector_project_map
1123 .get(&item.collector_id)
1124 .cloned()
1125 .unwrap_or_default()
1126 .as_str()
1127 == "routeviews";
1128 }
1129 _ => {}
1130 }
1131 }
1132
1133 if let Some(data_type) = &self.query_params.data_type {
1134 match data_type.to_lowercase().as_str() {
1135 "rib" | "ribs" | "r" => {
1136 if !item.is_rib() {
1137 matches = false
1139 }
1140 }
1141 "update" | "updates" => {
1142 if item.is_rib() {
1143 matches = false
1145 }
1146 }
1147 _ => {}
1148 }
1149 }
1150
1151 if let Some(collector_id) = &self.query_params.collector_id {
1152 let wanted: HashSet<&str> = collector_id
1153 .split(',')
1154 .map(|s| s.trim())
1155 .filter(|s| !s.is_empty())
1156 .collect();
1157
1158 if !wanted.contains(item.collector_id.as_str()) {
1159 return false;
1160 }
1161 }
1162
1163 matches
1164 });
1165
1166 Ok(items)
1167 }
1168
1169 pub fn get_peers(&self) -> Result<Vec<BrokerPeer>, BrokerError> {
1241 let mut url = format!("{}/peers", self.broker_url);
1242 let mut param_strings = vec![];
1243 if let Some(ip) = &self.query_params.peers_ip {
1244 param_strings.push(format!("ip={ip}"));
1245 }
1246 if let Some(asn) = &self.query_params.peers_asn {
1247 param_strings.push(format!("asn={asn}"));
1248 }
1249 if self.query_params.peers_only_full_feed {
1250 param_strings.push("full_feed=true".to_string());
1251 }
1252 if let Some(collector_id) = &self.query_params.collector_id {
1253 param_strings.push(format!("collector={collector_id}"));
1254 }
1255 if !param_strings.is_empty() {
1256 let param_string = param_strings.join("&");
1257 url = format!("{url}?{param_string}");
1258 }
1259
1260 let peers = match self.client.get(url.as_str()).send() {
1261 Ok(response) => match response.json::<BrokerPeersResult>() {
1262 Ok(result) => result.data,
1263 Err(_) => {
1264 return Err(BrokerError::BrokerError(
1265 "Error parsing response".to_string(),
1266 ));
1267 }
1268 },
1269 Err(e) => {
1270 return Err(BrokerError::BrokerError(format!(
1271 "Unable to connect to the URL ({url}): {e}"
1272 )));
1273 }
1274 };
1275 Ok(peers)
1276 }
1277
1278 fn run_files_query(&self, url: &str) -> Result<BrokerQueryResult, BrokerError> {
1279 log::info!("sending broker query to {}", &url);
1280 match self.client.get(url).send() {
1281 Ok(res) => match res.json::<BrokerQueryResult>() {
1282 Ok(res) => {
1283 if let Some(e) = res.error {
1284 Err(BrokerError::BrokerError(e))
1285 } else {
1286 Ok(res)
1287 }
1288 }
1289 Err(e) => {
1290 Err(BrokerError::BrokerError(e.to_string()))
1293 }
1294 },
1295 Err(e) => Err(BrokerError::from(e)),
1296 }
1297 }
1298}
1299
/// Iterator over the items of a broker search, fetching result pages on
/// demand via `query_single_page`.
pub struct BrokerItemIterator {
    /// Broker whose page number is advanced as pages are consumed.
    broker: BgpkitBroker,
    /// Items of the current page, stored in reverse so `pop` yields them in
    /// response order.
    cached_items: Vec<BrokerItem>,
    /// `true` until the first page has been requested.
    first_run: bool,
}
1331
1332impl BrokerItemIterator {
1333 pub fn new(broker: BgpkitBroker) -> BrokerItemIterator {
1334 BrokerItemIterator {
1335 broker,
1336 cached_items: vec![],
1337 first_run: true,
1338 }
1339 }
1340}
1341
1342impl Iterator for BrokerItemIterator {
1343 type Item = BrokerItem;
1344
1345 fn next(&mut self) -> Option<Self::Item> {
1346 if let Some(item) = self.cached_items.pop() {
1348 return Some(item);
1349 }
1350
1351 if self.first_run {
1353 self.first_run = false;
1355 } else {
1356 self.broker.query_params.page += 1;
1358 }
1359
1360 let items = match self.broker.query_single_page() {
1362 Ok(i) => i,
1363 Err(_) => return None,
1364 };
1365
1366 if items.is_empty() {
1367 return None;
1369 } else {
1370 self.cached_items = items;
1372 self.cached_items.reverse();
1373 }
1374
1375 #[allow(clippy::unwrap_used)]
1376 Some(self.cached_items.pop().unwrap())
1377 }
1378}
1379
1380impl IntoIterator for BgpkitBroker {
1381 type Item = BrokerItem;
1382 type IntoIter = BrokerItemIterator;
1383
1384 fn into_iter(self) -> Self::IntoIter {
1385 BrokerItemIterator::new(self)
1386 }
1387}
1388
1389impl IntoIterator for &BgpkitBroker {
1390 type Item = BrokerItem;
1391 type IntoIter = BrokerItemIterator;
1392
1393 fn into_iter(self) -> Self::IntoIter {
1394 BrokerItemIterator::new(self.clone())
1395 }
1396}
1397
1398#[cfg(test)]
1399mod tests {
1400 use super::*;
1401
1402 #[test]
1403 fn test_query() {
1404 let broker = BgpkitBroker::new()
1405 .ts_start("1634693400")
1406 .ts_end("1634693400");
1407 let res = broker.query();
1408 assert!(&res.is_ok());
1409 let data = res.unwrap();
1410 assert!(!data.is_empty());
1411 }
1412
1413 #[test]
1414 fn test_network_error() {
1415 let broker = BgpkitBroker::new().broker_url("https://api.broker.example.com/v2");
1416 let res = broker.query();
1417 assert!(res.is_err());
1419 assert!(matches!(res.err(), Some(BrokerError::NetworkError(_))));
1420 }
1421
1422 #[test]
1423 fn test_broker_error() {
1424 let broker = BgpkitBroker::new().page(-1);
1425 let result = broker.query();
1426 assert!(result.is_err());
1427 assert!(matches!(
1428 result.err(),
1429 Some(BrokerError::ConfigurationError(_))
1430 ));
1431 }
1432
1433 #[test]
1434 fn test_query_all() {
1435 let broker = BgpkitBroker::new()
1436 .ts_start("1634693400")
1437 .ts_end("1634693400")
1438 .page_size(100);
1439 let res = broker.query();
1440 assert!(res.is_ok());
1441 assert!(res.ok().unwrap().len() >= 54);
1442 }
1443
1444 #[test]
1445 fn test_iterator() {
1446 let broker = BgpkitBroker::new()
1447 .ts_start("1634693400")
1448 .ts_end("1634693400");
1449 assert!(broker.into_iter().count() >= 54);
1450 }
1451
1452 #[test]
1453 fn test_filters() {
1454 let broker = BgpkitBroker::new()
1455 .ts_start("1634693400")
1456 .ts_end("1634693400");
1457 let items = broker.query().unwrap();
1458 assert!(items.len() >= 54);
1459
1460 let broker = BgpkitBroker::new()
1461 .ts_start("1634693400")
1462 .ts_end("1634693400")
1463 .collector_id("rrc00");
1464 let items = broker.query().unwrap();
1465 assert_eq!(items.len(), 1);
1466
1467 let broker = BgpkitBroker::new()
1468 .ts_start("1634693400")
1469 .ts_end("1634693400")
1470 .project("riperis");
1471 let items = broker.query().unwrap();
1472 assert_eq!(items.len(), 23);
1473 }
1474
1475 #[test]
1476 fn test_latest() {
1477 let broker = BgpkitBroker::new();
1478 let items = broker.latest().unwrap();
1479 assert!(items.len() >= 125);
1480
1481 let broker = BgpkitBroker::new().project("routeviews".to_string());
1482 let items = broker.latest().unwrap();
1483 assert!(!items.is_empty());
1484 assert!(items
1485 .iter()
1486 .all(|item| !item.collector_id.starts_with("rrc")));
1487
1488 let broker = BgpkitBroker::new().project("riperis".to_string());
1489 let items = broker.latest().unwrap();
1490 assert!(!items.is_empty());
1491 assert!(items
1492 .iter()
1493 .all(|item| item.collector_id.starts_with("rrc")));
1494
1495 let broker = BgpkitBroker::new().data_type("rib".to_string());
1496 let items = broker.latest().unwrap();
1497 assert!(!items.is_empty());
1498 assert!(items.iter().all(|item| item.is_rib()));
1499
1500 let broker = BgpkitBroker::new().data_type("update".to_string());
1501 let items = broker.latest().unwrap();
1502 assert!(!items.is_empty());
1503 assert!(items.iter().all(|item| !item.is_rib()));
1504
1505 let broker = BgpkitBroker::new().collector_id("rrc00".to_string());
1506 let items = broker.latest().unwrap();
1507 assert!(!items.is_empty());
1508 assert!(items
1509 .iter()
1510 .all(|item| item.collector_id.as_str() == "rrc00"));
1511 assert_eq!(items.len(), 2);
1512 }
1513
1514 #[test]
1515 fn test_latest_no_ssl() {
1516 let broker = BgpkitBroker::new().accept_invalid_certs();
1517 let items = broker.latest().unwrap();
1518 assert!(items.len() >= 125);
1519 }
1520
1521 #[test]
1522 fn test_health_check() {
1523 let broker = BgpkitBroker::new();
1524 let res = broker.health_check();
1525 assert!(res.is_ok());
1526 }
1527
1528 #[test]
1529 fn test_peers() {
1530 let broker = BgpkitBroker::new();
1531 let all_peers = broker.get_peers().unwrap();
1532 assert!(!all_peers.is_empty());
1533 let first_peer = all_peers.first().unwrap();
1534 let first_ip = first_peer.ip;
1535 let first_asn = first_peer.asn;
1536
1537 let broker = BgpkitBroker::new().peers_ip(first_ip);
1538 let peers = broker.get_peers().unwrap();
1539 assert!(!peers.is_empty());
1540
1541 let broker = BgpkitBroker::new().peers_asn(first_asn);
1542 let peers = broker.get_peers().unwrap();
1543 assert!(!peers.is_empty());
1544
1545 let broker = BgpkitBroker::new().peers_only_full_feed(true);
1546 let full_feed_peers = broker.get_peers().unwrap();
1547 assert!(!full_feed_peers.is_empty());
1548 assert!(full_feed_peers.len() < all_peers.len());
1549
1550 let broker = BgpkitBroker::new().collector_id("rrc00");
1551 let rrc_peers = broker.get_peers().unwrap();
1552 assert!(!rrc_peers.is_empty());
1553 assert!(rrc_peers.iter().all(|peer| peer.collector == "rrc00"));
1554
1555 let broker = BgpkitBroker::new().collector_id("rrc00,route-views2");
1556 let rrc_rv_peers = broker.get_peers().unwrap();
1557 assert!(!rrc_rv_peers.is_empty());
1558 assert!(rrc_rv_peers
1559 .iter()
1560 .any(|peer| peer.collector == "rrc00" || peer.collector == "route-views2"));
1561
1562 assert!(rrc_rv_peers.len() > rrc_peers.len());
1563 }
1564
1565 #[test]
1566 fn test_timestamp_parsing_unix() {
1567 let broker = BgpkitBroker::new();
1568
1569 let result = broker.clone().ts_start("1640995200");
1571 assert_eq!(result.query_params.ts_start, Some("1640995200".to_string()));
1573
1574 let result = broker.clone().ts_end("1640995200");
1575 assert_eq!(result.query_params.ts_end, Some("1640995200".to_string()));
1576 }
1577
1578 #[test]
1579 fn test_timestamp_parsing_rfc3339() {
1580 let broker = BgpkitBroker::new();
1581
1582 let result = broker.clone().ts_start("2022-01-01T00:00:00Z");
1584 assert_eq!(
1585 result.query_params.ts_start,
1586 Some("2022-01-01T00:00:00Z".to_string())
1587 );
1588
1589 let result = broker.clone().ts_start("2022-01-01T12:30:45");
1591 assert_eq!(
1592 result.query_params.ts_start,
1593 Some("2022-01-01T12:30:45".to_string())
1594 );
1595
1596 let result = broker.clone().ts_end("2022-01-01 12:30:45");
1598 assert_eq!(
1599 result.query_params.ts_end,
1600 Some("2022-01-01 12:30:45".to_string())
1601 );
1602 }
1603
1604 #[test]
1605 fn test_timestamp_parsing_pure_dates() {
1606 let broker = BgpkitBroker::new();
1607
1608 let result = broker.clone().ts_start("2022-01-01");
1610 assert_eq!(result.query_params.ts_start, Some("2022-01-01".to_string()));
1611
1612 let result = broker.clone().ts_start("2022/01/01");
1614 assert_eq!(result.query_params.ts_start, Some("2022/01/01".to_string()));
1615
1616 let result = broker.clone().ts_end("2022.01.01");
1618 assert_eq!(result.query_params.ts_end, Some("2022.01.01".to_string()));
1619
1620 let result = broker.clone().ts_end("20220101");
1622 assert_eq!(result.query_params.ts_end, Some("20220101".to_string()));
1623 }
1624
1625 #[test]
1626 fn test_timestamp_parsing_whitespace() {
1627 let broker = BgpkitBroker::new();
1628
1629 let result = broker.clone().ts_start(" 2022-01-01 ");
1631 assert_eq!(
1632 result.query_params.ts_start,
1633 Some(" 2022-01-01 ".to_string())
1634 );
1635
1636 let result = broker.clone().ts_end("\t1640995200\n");
1637 assert_eq!(
1638 result.query_params.ts_end,
1639 Some("\t1640995200\n".to_string())
1640 );
1641 }
1642
1643 #[test]
1644 fn test_timestamp_parsing_errors() {
1645 let broker = BgpkitBroker::new();
1646
1647 let broker_with_invalid = broker.clone().ts_start("invalid-timestamp");
1649 let result = broker_with_invalid.query();
1650 assert!(result.is_err());
1651 assert!(matches!(
1652 result.err(),
1653 Some(BrokerError::ConfigurationError(_))
1654 ));
1655
1656 let broker_with_invalid = broker.clone().ts_end("2022-13-01");
1658 let result = broker_with_invalid.query();
1659 assert!(result.is_err());
1660 assert!(matches!(
1661 result.err(),
1662 Some(BrokerError::ConfigurationError(_))
1663 ));
1664
1665 let broker_with_invalid = broker.clone().ts_start("20221301");
1667 let result = broker_with_invalid.query();
1668 assert!(result.is_err());
1669 assert!(matches!(
1670 result.err(),
1671 Some(BrokerError::ConfigurationError(_))
1672 ));
1673
1674 let broker_with_invalid = broker.clone().ts_start("2022-01");
1676 let result = broker_with_invalid.query();
1677 assert!(result.is_err());
1678 assert!(matches!(
1679 result.err(),
1680 Some(BrokerError::ConfigurationError(_))
1681 ));
1682 }
1683
/// Calls `BgpkitBroker::parse_timestamp` directly with each input spelling the
/// test expects it to accept, then with inputs it is expected to reject.
///
/// Expected instants are constructed with chrono, independently of the parser
/// under test.
#[test]
fn test_parse_timestamp_direct() {
    // Reference instants used on the right-hand side of the comparisons below.
    let midnight = Utc.with_ymd_and_hms(2022, 1, 1, 0, 0, 0).unwrap();
    let half_past_noon = Utc.with_ymd_and_hms(2022, 1, 1, 12, 30, 45).unwrap();
    let ten_oclock = Utc.with_ymd_and_hms(2022, 1, 1, 10, 0, 0).unwrap();
    let from_epoch_seconds = Utc.timestamp_opt(1640995200, 0).single().unwrap();

    // (input text, instant the parser is expected to return)
    let accepted = [
        // Unix timestamp in seconds.
        ("1640995200", from_epoch_seconds),
        // Date-time with a trailing `Z`.
        ("2022-01-01T00:00:00Z", midnight),
        // Date-time without a zone designator: expected to be read as UTC.
        ("2022-01-01T12:30:45", half_past_noon),
        // Same fields, separated by a space instead of `T`.
        ("2022-01-01 12:30:45", half_past_noon),
        // Date-only spellings: expected to resolve to 00:00:00 UTC of that day.
        ("2022-01-01", midnight),
        ("2022/01/01", midnight),
        ("2022.01.01", midnight),
        ("20220101", midnight),
    ];
    for (input, expected) in accepted {
        assert_eq!(
            BgpkitBroker::parse_timestamp(input).unwrap(),
            expected,
            "input: {}",
            input
        );
    }

    // Explicit zero offset: same instant as midnight UTC.
    let with_zero_offset = BgpkitBroker::parse_timestamp("2022-01-01T00:00:00+00:00").unwrap();
    assert_eq!(with_zero_offset, midnight);
    println!("✓ +00:00 timezone format works");

    // Negative offset: 05:00 at -05:00 must be converted to 10:00 UTC.
    let with_negative_offset =
        BgpkitBroker::parse_timestamp("2022-01-01T05:00:00-05:00").unwrap();
    assert_eq!(with_negative_offset, ten_oclock);
    println!("✓ -05:00 timezone format works (05:00-05:00 = 10:00Z)");

    // Free text, an out-of-range month, and a year-month without a day must all fail.
    for rejected in ["invalid", "2022-13-01", "2022-01"] {
        assert!(
            BgpkitBroker::parse_timestamp(rejected).is_err(),
            "input: {}",
            rejected
        );
    }
}
1762
/// Checks which `collector_id` values pass `validate_configuration`.
///
/// Single IDs and comma-separated lists are expected to validate, including
/// names such as `brand-new-collector`; a list made only of separators and
/// blanks is expected to fail with `BrokerError::ConfigurationError`.
#[test]
fn test_collector_id_validation() {
    let base = BgpkitBroker::new();

    let accepted = [
        "rrc00",
        "rrc00,route-views2",
        "brand-new-collector",
        "rrc00,brand-new-collector",
    ];
    for ids in accepted {
        let outcome = base.clone().collector_id(ids).validate_configuration();
        assert!(outcome.is_ok(), "collector_id: {}", ids);
    }

    // Nothing but commas and spaces: no usable collector ID remains.
    let outcome = base.clone().collector_id(", , ,").validate_configuration();
    assert!(matches!(
        outcome,
        Err(BrokerError::ConfigurationError(_))
    ));
}
1796
/// Checks which `project` values pass `validate_configuration`.
///
/// `riperis`, `routeviews`, `rrc` and `rv` are expected to validate; any other
/// name tried here must fail with `BrokerError::ConfigurationError`.
#[test]
fn test_project_validation() {
    let base = BgpkitBroker::new();

    for name in ["riperis", "routeviews", "rrc", "rv"] {
        let outcome = base.clone().project(name).validate_configuration();
        assert!(outcome.is_ok(), "project: {}", name);
    }

    let outcome = base
        .clone()
        .project("invalid-project")
        .validate_configuration();
    assert!(matches!(
        outcome,
        Err(BrokerError::ConfigurationError(_))
    ));
}
1828
/// Checks which `data_type` values pass `validate_configuration`.
///
/// `rib`, `updates`, `ribs` and `update` are expected to validate; any other
/// value tried here must fail with `BrokerError::ConfigurationError`.
#[test]
fn test_data_type_validation() {
    let base = BgpkitBroker::new();

    for kind in ["rib", "updates", "ribs", "update"] {
        let outcome = base.clone().data_type(kind).validate_configuration();
        assert!(outcome.is_ok(), "data_type: {}", kind);
    }

    let outcome = base
        .clone()
        .data_type("invalid-type")
        .validate_configuration();
    assert!(matches!(
        outcome,
        Err(BrokerError::ConfigurationError(_))
    ));
}
1860
/// Checks which `page` values pass `validate_configuration`: pages 1 and 100
/// are expected to validate, page 0 must fail with
/// `BrokerError::ConfigurationError`.
#[test]
fn test_page_validation() {
    let base = BgpkitBroker::new();

    for number in [1, 100] {
        let outcome = base.clone().page(number).validate_configuration();
        assert!(outcome.is_ok(), "page: {}", number);
    }

    let outcome = base.clone().page(0).validate_configuration();
    assert!(matches!(
        outcome,
        Err(BrokerError::ConfigurationError(_))
    ));
}
1883
/// Checks which `page_size` values pass `validate_configuration`.
///
/// 1, 100 and 100000 are expected to validate; 0 and 100001 must fail with
/// `BrokerError::ConfigurationError`.
#[test]
fn test_page_size_validation() {
    let base = BgpkitBroker::new();

    // Sizes the test expects to be accepted, up to and including 100000.
    for size in [1, 100, 100000] {
        let outcome = base.clone().page_size(size).validate_configuration();
        assert!(outcome.is_ok(), "page_size: {}", size);
    }

    // Zero, and one past the largest accepted size above.
    for size in [0, 100001] {
        let outcome = base.clone().page_size(size).validate_configuration();
        assert!(
            matches!(outcome, Err(BrokerError::ConfigurationError(_))),
            "page_size: {}",
            size
        );
    }
}
1918
/// Chains every builder setter used here in one expression and checks that
/// each value ends up in the corresponding `query_params` field.
#[test]
fn test_method_chaining() {
    let configured = BgpkitBroker::new()
        .ts_start("1634693400")
        .ts_end("1634693400")
        .collector_id("rrc00")
        .project("riperis")
        .data_type("rib")
        .page(1)
        .page_size(10);

    let params = &configured.query_params;

    // Optional string fields: compare as `Option<&str>` to avoid allocating expected values.
    assert_eq!(params.ts_start.as_deref(), Some("1634693400"));
    assert_eq!(params.ts_end.as_deref(), Some("1634693400"));
    assert_eq!(params.collector_id.as_deref(), Some("rrc00"));
    assert_eq!(params.project.as_deref(), Some("riperis"));
    assert_eq!(params.data_type.as_deref(), Some("rib"));

    // Paging fields are stored as plain numbers.
    assert_eq!(params.page, 1);
    assert_eq!(params.page_size, 10);
}
1939}