Skip to main content

bgpkit_broker/
lib.rs

1/*!
2# Overview
3
[bgpkit-broker][crate] is a package for accessing the BGPKIT Broker API and searching for BGP archive
files using a variety of search parameters.
6
7# Examples
8
9## Basic Usage with Iterator
10
11The recommended usage to collect [BrokerItem]s is to use the built-in iterator. The
12[BrokerItemIterator] handles making API queries so that it can continuously stream new items until
it reaches the end of items. This is useful for simply getting **all** matching items without needing
to worry about pagination.
15
16```no_run
17use bgpkit_broker::{BgpkitBroker, BrokerItem};
18
19let broker = BgpkitBroker::new()
20    .ts_start("2022-01-01")
21    .ts_end("2022-01-02")
22    .collector_id("route-views2");
23
24// Iterate by reference (reusable broker)
25for item in &broker {
26    println!("BGP file: {} from {} ({})",
27             item.url, item.collector_id, item.data_type);
28}
29
30// Or collect into vector
31let items: Vec<BrokerItem> = broker.into_iter().collect();
32println!("Found {} BGP archive files", items.len());
33```
34
35## Practical BGP Data Analysis with Shortcuts
36
37The SDK provides convenient shortcuts for common BGP data analysis patterns:
38
39### Daily RIB Analysis Across Diverse Collectors
40
41```no_run
42use bgpkit_broker::BgpkitBroker;
43
44// Find the most diverse collectors for comprehensive analysis
45let broker = BgpkitBroker::new()
46    .ts_start("2024-01-01")
47    .ts_end("2024-01-31");
48
49let diverse_collectors = broker.most_diverse_collectors(5, None).unwrap();
50println!("Selected {} diverse collectors: {:?}",
51         diverse_collectors.len(), diverse_collectors);
52
53// Get daily RIB snapshots from these collectors
54let daily_ribs = broker
55    .clone()
56    .collector_id(&diverse_collectors.join(","))
57    .daily_ribs().unwrap();
58
59println!("Found {} daily RIB snapshots for analysis", daily_ribs.len());
60for rib in daily_ribs.iter().take(3) {
61    println!("Daily snapshot: {} from {} at {}",
62             rib.collector_id,
63             rib.ts_start.format("%Y-%m-%d"),
64             rib.url);
65}
66```
67
68### Recent BGP Updates Monitoring
69
70```no_run
71use bgpkit_broker::BgpkitBroker;
72
73// Monitor recent BGP updates from multiple collectors
74let recent_updates = BgpkitBroker::new()
75    .collector_id("route-views2,rrc00,route-views6")
76    .recent_updates(6).unwrap(); // last 6 hours
77
78println!("Found {} recent BGP update files", recent_updates.len());
79for update in recent_updates.iter().take(5) {
80    println!("Update: {} from {} at {}",
81             update.collector_id,
82             update.ts_start.format("%Y-%m-%d %H:%M:%S"),
83             update.url);
84}
85```
86
87### Project-specific Analysis
88
89```no_run
90use bgpkit_broker::BgpkitBroker;
91
92// Compare RouteViews vs RIPE RIS daily snapshots
93let routeviews_ribs = BgpkitBroker::new()
94    .ts_start("2024-01-01")
95    .ts_end("2024-01-07")
96    .project("routeviews")
97    .daily_ribs().unwrap();
98
99let ripe_ribs = BgpkitBroker::new()
100    .ts_start("2024-01-01")
101    .ts_end("2024-01-07")
102    .project("riperis")
103    .daily_ribs().unwrap();
104
105println!("RouteViews daily RIBs: {}", routeviews_ribs.len());
106println!("RIPE RIS daily RIBs: {}", ripe_ribs.len());
107```
108
109### Advanced Collector Selection
110
111```no_run
112use bgpkit_broker::BgpkitBroker;
113
114let broker = BgpkitBroker::new();
115
116// Get diverse RouteViews collectors for focused analysis
117let rv_collectors = broker.most_diverse_collectors(3, Some("routeviews")).unwrap();
118println!("Diverse RouteViews collectors: {:?}", rv_collectors);
119
120// Use them to get comprehensive recent updates
121let comprehensive_updates = broker
122    .clone()
123    .collector_id(&rv_collectors.join(","))
124    .recent_updates(12).unwrap(); // last 12 hours
125
126println!("Got {} updates from {} collectors",
127         comprehensive_updates.len(), rv_collectors.len());
128```
129
130### Routing Table Snapshot Reconstruction
131
132```no_run
133use bgpkit_broker::BgpkitBroker;
134
135// Get the MRT files needed to construct a routing table snapshot
136let broker = BgpkitBroker::new();
137let snapshots = broker.get_snapshot_files(
138    &["route-views2", "rrc00"],
139    "2024-01-01T12:00:00Z"
140).unwrap();
141
142for snapshot in snapshots {
143    println!("Collector: {}", snapshot.collector_id);
144    println!("RIB dump: {}", snapshot.rib_url);
145    println!("Updates to apply: {}", snapshot.updates_urls.len());
146
147    // Use with bgpkit-parser to reconstruct routing table:
148    // 1. Parse RIB dump for initial state
149    // 2. Apply updates in order to reach target timestamp
150}
151```
152
153## Manual Page Queries
154
155For fine-grained control over pagination or custom iteration patterns:
156
157```rust,no_run
158use bgpkit_broker::BgpkitBroker;
159
160let mut broker = BgpkitBroker::new()
161    .ts_start("2022-01-01")
162    .ts_end("2022-01-02")
163    .page(1)
164    .page_size(50);
165
166// Query specific page
167let page1_items = broker.query_single_page().unwrap();
168println!("Page 1: {} items", page1_items.len());
169
170// Move to next page
171broker.turn_page(2);
172let page2_items = broker.query_single_page().unwrap();
173println!("Page 2: {} items", page2_items.len());
174```
175
176## Getting Latest Files and Peer Information
177
178Access the most recent data and peer information:
179
180```rust,no_run
181use bgpkit_broker::BgpkitBroker;
182
183// Get latest files from all collectors
184let broker = BgpkitBroker::new();
185let latest_files = broker.latest().unwrap();
186println!("Latest files from {} collectors", latest_files.len());
187
188// Get full-feed peers from specific collector
189let peers = BgpkitBroker::new()
190    .collector_id("route-views2")
191    .peers_only_full_feed(true)
192    .get_peers().unwrap();
193
194println!("Found {} full-feed peers", peers.len());
195for peer in peers.iter().take(3) {
196    println!("Peer: AS{} ({}) - v4: {}, v6: {}",
197             peer.asn, peer.ip, peer.num_v4_pfxs, peer.num_v6_pfxs);
198}
199```
200*/
201
202#![doc(
203    html_logo_url = "https://raw.githubusercontent.com/bgpkit/assets/main/logos/icon-transparent.png",
204    html_favicon_url = "https://raw.githubusercontent.com/bgpkit/assets/main/logos/favicon.ico"
205)]
206#![allow(unknown_lints)]
207
208mod collector;
209#[cfg(feature = "cli")]
210pub mod config;
211#[cfg(feature = "cli")]
212mod crawler;
213#[cfg(feature = "backend")]
214pub mod db;
215mod error;
216mod item;
217mod peer;
218mod query;
219mod shortcuts;
220#[cfg(feature = "sse")]
221mod sse;
222
223use crate::collector::DEFAULT_COLLECTORS_CONFIG;
224use crate::peer::BrokerPeersResult;
225use crate::query::{BrokerQueryResult, CollectorLatestResult};
226use chrono::{DateTime, NaiveDate, TimeZone, Utc};
227pub use collector::{load_collectors, Collector};
228
229#[cfg(feature = "cli")]
230pub use config::BrokerConfig;
231#[cfg(feature = "cli")]
232pub use crawler::crawl_collector;
233#[cfg(feature = "backend")]
234pub use db::{LocalBrokerDb, UpdatesMeta, DEFAULT_PAGE_SIZE};
235pub use error::BrokerError;
236pub use item::BrokerItem;
237pub use peer::BrokerPeer;
238pub use query::{QueryParams, SortOrder};
239pub use shortcuts::SnapshotFiles;
240#[cfg(feature = "sse")]
241pub use sse::{BrokerItemSubscription, SseSubscriptionOptions};
242use std::collections::{HashMap, HashSet};
243use std::fmt::Display;
244use std::net::IpAddr;
245use std::path::PathBuf;
246
247const SDK_USER_AGENT: &str = concat!("bgpkit-broker/", env!("CARGO_PKG_VERSION"));
248
/// BgpkitBroker struct maintains the broker's URL and handles making API queries.
///
/// Configuration is applied through the chainable builder methods on this type
/// (for example `ts_start`, `collector_id` and `page_size`).
///
/// See [module doc][crate#examples] for usage examples.
#[derive(Clone)]
pub struct BgpkitBroker {
    /// Base URL of the broker API; `Default` and `broker_url()` strip trailing slashes.
    pub broker_url: String,
    /// Query filters and pagination settings configured through the builder methods.
    pub query_params: QueryParams,
    /// Blocking HTTP client used for broker requests.
    client: reqwest::blocking::Client,
    /// Built from the default collectors config via `to_project_map()`;
    /// presumably maps collector IDs to project names — TODO confirm.
    collector_project_map: HashMap<String, String>,
    /// Whether the HTTP client was built to accept invalid TLS certificates.
    accept_invalid_certs: bool,
    /// Directory for on-disk cached query results; `None` disables caching.
    cache_dir: Option<PathBuf>,
}
261
262impl Default for BgpkitBroker {
263    fn default() -> Self {
264        dotenvy::dotenv().ok();
265        let url = match std::env::var("BGPKIT_BROKER_URL") {
266            Ok(url) => url.trim_end_matches('/').to_string(),
267            Err(_) => "https://api.bgpkit.com/v3/broker".to_string(),
268        };
269
270        let collector_project_map = DEFAULT_COLLECTORS_CONFIG.clone().to_project_map();
271
272        let accept_invalid_certs = read_accept_invalid_certs_from_env();
273        let client = build_blocking_client(accept_invalid_certs);
274
275        Self {
276            broker_url: url,
277            query_params: Default::default(),
278            client,
279            collector_project_map,
280            accept_invalid_certs,
281            cache_dir: None,
282        }
283    }
284}
285
/// Report whether `ONEIO_ACCEPT_INVALID_CERTS` opts in to accepting invalid certificates.
///
/// The value is compared case-insensitively and counts as an opt-in when it
/// starts with `true` or `y`. An unset (or unreadable) variable yields `false`.
fn read_accept_invalid_certs_from_env() -> bool {
    std::env::var("ONEIO_ACCEPT_INVALID_CERTS")
        .map(|raw| {
            let lowered = raw.to_lowercase();
            ["true", "y"]
                .iter()
                .any(|prefix| lowered.starts_with(*prefix))
        })
        .unwrap_or(false)
}
295
296fn build_blocking_client(accept_invalid_certs: bool) -> reqwest::blocking::Client {
297    match reqwest::blocking::ClientBuilder::new()
298        .danger_accept_invalid_certs(accept_invalid_certs)
299        .user_agent(SDK_USER_AGENT)
300        .build()
301    {
302        Ok(c) => c,
303        Err(e) => {
304            panic!("Failed to build HTTP client for broker requests: {}", e);
305        }
306    }
307}
308
/// Build the async HTTP client (only compiled with the `sse` feature).
///
/// Uses the same user agent and certificate policy as the blocking client, but
/// reports a builder failure as [`BrokerError::NetworkError`] instead of panicking.
#[cfg(feature = "sse")]
pub(crate) fn build_async_client(
    accept_invalid_certs: bool,
) -> Result<reqwest::Client, BrokerError> {
    let builder = reqwest::ClientBuilder::new()
        .danger_accept_invalid_certs(accept_invalid_certs)
        .user_agent(SDK_USER_AGENT);

    match builder.build() {
        Ok(client) => Ok(client),
        Err(e) => Err(BrokerError::NetworkError(e)),
    }
}
319
320impl BgpkitBroker {
321    /// Construct a new BgpkitBroker object.
322    ///
323    /// The URL and query parameters can be adjusted with other functions.
324    ///
325    /// Users can opt in to accept invalid SSL certificates by setting the environment variable
326    /// `ONEIO_ACCEPT_INVALID_CERTS` to `true`.
327    ///
328    /// # Examples
329    /// ```
330    /// use bgpkit_broker::BgpkitBroker;
331    /// let broker = BgpkitBroker::new();
332    /// ```
333    pub fn new() -> Self {
334        Self::default()
335    }
336
337    /// Configure broker URL.
338    ///
339    /// You can change the default broker URL to point to your own broker instance.
340    /// You can also change the URL by setting the environment variable `BGPKIT_BROKER_URL`.
341    ///
342    /// # Examples
343    /// ```
344    /// let broker = bgpkit_broker::BgpkitBroker::new()
345    ///     .broker_url("api.broker.example.com/v3");
346    /// ```
347    pub fn broker_url<S: Display>(self, url: S) -> Self {
348        let broker_url = url.to_string().trim_end_matches('/').to_string();
349        Self {
350            broker_url,
351            query_params: self.query_params,
352            client: self.client,
353            collector_project_map: self.collector_project_map,
354            accept_invalid_certs: self.accept_invalid_certs,
355            cache_dir: self.cache_dir,
356        }
357    }
358
359    /// DANGER: Accept invalid SSL certificates.
360    pub fn accept_invalid_certs(self) -> Self {
361        Self {
362            broker_url: self.broker_url,
363            query_params: self.query_params,
364            client: build_blocking_client(true),
365            collector_project_map: self.collector_project_map,
366            accept_invalid_certs: true,
367            cache_dir: self.cache_dir,
368        }
369    }
370
371    /// Disable SSL certificate check.
372    #[deprecated(since = "0.7.1", note = "Please use `accept_invalid_certs` instead.")]
373    pub fn disable_ssl_check(self) -> Self {
374        Self::accept_invalid_certs(self)
375    }
376
377    /// Set the cache directory for storing query results.
378    ///
379    /// When a cache directory is specified, query results will be cached to disk
380    /// and loaded from cache on subsequent queries with the same parameters.
381    /// This is useful for development and offline usage.
382    ///
383    /// The directory will be created if it doesn't exist. Panics if unable to create.
384    ///
385    /// # Examples
386    ///
387    /// ```no_run
388    /// let broker = bgpkit_broker::BgpkitBroker::new()
389    ///     .cache_dir("/tmp/bgpkit-cache");
390    /// ```
391    pub fn cache_dir<P: Into<PathBuf>>(mut self, path: P) -> Self {
392        let path = path.into();
393        if !path.exists() {
394            std::fs::create_dir_all(&path).expect("Failed to create cache directory");
395        }
396        self.cache_dir = Some(path);
397        self
398    }
399
400    /// Generate cache key from current query parameters.
401    fn cache_key(&self) -> String {
402        use sha2::{Digest, Sha256};
403        
404        let params_str = format!(
405            "{}:{}:{}:{}:{}:{}:{}:{}",
406            self.broker_url,
407            self.query_params.ts_start.as_deref().unwrap_or(""),
408            self.query_params.ts_end.as_deref().unwrap_or(""),
409            self.query_params.collector_id.as_deref().unwrap_or(""),
410            self.query_params.project.as_deref().unwrap_or(""),
411            self.query_params.data_type.as_deref().unwrap_or(""),
412            self.query_params.page,
413            self.query_params.page_size
414        );
415        
416        let mut hasher = Sha256::new();
417        hasher.update(params_str.as_bytes());
418        format!("{:x}", hasher.finalize())
419    }
420
421    /// Try to load cached results for current query parameters.
422    fn load_cache(&self) -> Option<Vec<BrokerItem>> {
423        let cache_dir = self.cache_dir.as_ref()?;
424        let cache_file = cache_dir.join(self.cache_key()).with_extension("json");
425        
426        if !cache_file.exists() {
427            return None;
428        }
429        
430        match std::fs::read_to_string(&cache_file) {
431            Ok(contents) => {
432                match serde_json::from_str::<Vec<BrokerItem>>(&contents) {
433                    Ok(items) => {
434                        log::info!("Loaded {} items from cache", items.len());
435                        Some(items)
436                    }
437                    Err(e) => {
438                        log::warn!("Failed to deserialize cache file: {}", e);
439                        None
440                    }
441                }
442            }
443            Err(e) => {
444                log::warn!("Failed to read cache file: {}", e);
445                None
446            }
447        }
448    }
449
450    /// Save results to cache for current query parameters.
451    fn save_cache(&self, items: &[BrokerItem]) {
452        let Some(cache_dir) = self.cache_dir.as_ref() else {
453            return;
454        };
455        
456        let cache_file = cache_dir.join(self.cache_key()).with_extension("json");
457        
458        match serde_json::to_string(items) {
459            Ok(json) => {
460                if let Err(e) = std::fs::write(&cache_file, json) {
461                    log::warn!("Failed to write cache file: {}", e);
462                } else {
463                    log::info!("Saved {} items to cache", items.len());
464                }
465            }
466            Err(e) => {
467                log::warn!("Failed to serialize items for cache: {}", e);
468            }
469        }
470    }
471
472    /// Parse and validate timestamp string with support for multiple formats.
473    ///
474    /// Supported formats:
475    /// - Unix timestamp: "1640995200"
476    /// - RFC3339/ISO8601: "2022-01-01T00:00:00Z", "2022-01-01T12:30:45Z"
477    /// - RFC3339 without Z: "2022-01-01T00:00:00", "2022-01-01T12:30:45"
478    /// - Date with time: "2022-01-01 00:00:00", "2022-01-01 12:30:45"
479    /// - Pure date (start of day): "2022-01-01", "2022/01/01"
480    /// - Pure date with dots: "2022.01.01"
481    /// - Compact date: "20220101"
482    ///
483    /// For pure date formats, the time component defaults to 00:00:00 (start of day).
484    /// Returns a `DateTime<Utc>` for consistent handling and formatting.
485    fn parse_timestamp(timestamp: &str) -> Result<DateTime<Utc>, BrokerError> {
486        let ts_str = timestamp.trim();
487
488        // Try parsing as RFC3339 with timezone (including +00:00, -05:00, Z, etc.)
489        if let Ok(dt_with_tz) = DateTime::parse_from_rfc3339(ts_str) {
490            return Ok(dt_with_tz.with_timezone(&Utc));
491        }
492
493        // Try parsing as RFC3339/ISO8601 with Z
494        if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H:%M:%SZ") {
495            return Ok(Utc.from_utc_datetime(&naive_dt));
496        }
497
498        // Try parsing as RFC3339 without Z (assume UTC)
499        if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H:%M:%S") {
500            return Ok(Utc.from_utc_datetime(&naive_dt));
501        }
502
503        // Try parsing as "YYYY-MM-DD HH:MM:SS" (assume UTC)
504        if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%d %H:%M:%S") {
505            return Ok(Utc.from_utc_datetime(&naive_dt));
506        }
507
508        // Try parsing pure date formats and convert to start of day
509        let date_formats = [
510            "%Y-%m-%d", // 2022-01-01
511            "%Y/%m/%d", // 2022/01/01
512            "%Y.%m.%d", // 2022.01.01
513            "%Y%m%d",   // 20220101 - must be exactly 8 digits
514        ];
515
516        for format in &date_formats {
517            if let Ok(date) = NaiveDate::parse_from_str(ts_str, format) {
518                // Additional validation for compact format to ensure it's actually a date
519                if format == &"%Y%m%d" && ts_str.len() != 8 {
520                    continue;
521                }
522                // Convert to start of day in UTC
523                if let Some(naive_datetime) = date.and_hms_opt(0, 0, 0) {
524                    return Ok(Utc.from_utc_datetime(&naive_datetime));
525                }
526            }
527        }
528
529        // Finally, try parsing as Unix timestamp (only if it's reasonable length and all digits)
530        if ts_str.len() >= 9 && ts_str.len() <= 13 && ts_str.chars().all(|c| c.is_ascii_digit()) {
531            if let Ok(timestamp) = ts_str.parse::<i64>() {
532                if let Some(dt) = Utc.timestamp_opt(timestamp, 0).single() {
533                    return Ok(dt);
534                }
535            }
536        }
537
538        Err(BrokerError::ConfigurationError(format!(
539            "Invalid timestamp format '{ts_str}'. Supported formats:\n\
540                - Unix timestamp: '1640995200'\n\
541                - RFC3339 with timezone: '2022-01-01T00:00:00+00:00', '2022-01-01T00:00:00Z', '2022-01-01T05:00:00-05:00'\n\
542                - RFC3339 without timezone: '2022-01-01T00:00:00' (assumes UTC)\n\
543                - Date with time: '2022-01-01 00:00:00'\n\
544                - Pure date: '2022-01-01', '2022/01/01', '2022.01.01', '20220101'"
545        )))
546    }
547
548    /// Validate all configuration parameters before making API calls.
549    ///
550    /// This performs the same validation that was previously done at configuration time,
551    /// but now happens just before queries are executed. Returns normalized query parameters.
552    fn validate_configuration(&self) -> Result<QueryParams, BrokerError> {
553        // Validate timestamps and normalize them
554        let mut normalized_params = self.query_params.clone();
555
556        if let Some(ts) = &self.query_params.ts_start {
557            let parsed_datetime = Self::parse_timestamp(ts)?;
558            normalized_params.ts_start =
559                Some(parsed_datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string());
560        }
561
562        if let Some(ts) = &self.query_params.ts_end {
563            let parsed_datetime = Self::parse_timestamp(ts)?;
564            normalized_params.ts_end =
565                Some(parsed_datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string());
566        }
567
568        // Permissive collector validation: normalize only, no network I/O
569        if let Some(collector_str) = &self.query_params.collector_id {
570            let collectors: Vec<String> = collector_str
571                .split(',')
572                .map(|s| s.trim())
573                .filter(|s| !s.is_empty())
574                .map(|s| s.to_string())
575                .collect();
576
577            if collectors.is_empty() {
578                return Err(BrokerError::ConfigurationError(
579                    "Collector ID cannot be empty".to_string(),
580                ));
581            }
582
583            // Deduplicate while preserving order
584            let mut seen = HashSet::new();
585            let mut deduped = Vec::with_capacity(collectors.len());
586            for c in collectors {
587                if seen.insert(c.clone()) {
588                    deduped.push(c);
589                }
590            }
591
592            normalized_params.collector_id = Some(deduped.join(","));
593        }
594
595        // Validate project
596        if let Some(project_str) = &self.query_params.project {
597            let project_lower = project_str.to_lowercase();
598            match project_lower.as_str() {
599                "rrc" | "riperis" | "ripe_ris" | "routeviews" | "route_views" | "rv" => {
600                    // Valid project
601                }
602                _ => {
603                    return Err(BrokerError::ConfigurationError(format!(
604                        "Invalid project '{project_str}'. Valid projects are: 'riperis' (aliases: 'rrc', 'ripe_ris') or 'routeviews' (aliases: 'route_views', 'rv')"
605                    )));
606                }
607            }
608        }
609
610        // Validate data type
611        if let Some(data_type_str) = &self.query_params.data_type {
612            let data_type_lower = data_type_str.to_lowercase();
613            match data_type_lower.as_str() {
614                "rib" | "ribs" | "r" | "update" | "updates" => {
615                    // Valid data type
616                }
617                _ => {
618                    return Err(BrokerError::ConfigurationError(format!(
619                        "Invalid data type '{data_type_str}'. Valid data types are: 'rib' (aliases: 'ribs', 'r') or 'updates' (alias: 'update')"
620                    )));
621                }
622            }
623        }
624
625        // Validate page number
626        if self.query_params.page < 1 {
627            return Err(BrokerError::ConfigurationError(format!(
628                "Invalid page number {}. Page number must be >= 1",
629                self.query_params.page
630            )));
631        }
632
633        // Validate page size
634        if !(1..=100000).contains(&self.query_params.page_size) {
635            return Err(BrokerError::ConfigurationError(format!(
636                "Invalid page size {}. Page size must be between 1 and 100000",
637                self.query_params.page_size
638            )));
639        }
640
641        Ok(normalized_params)
642    }
643
644    /// Add a filter of starting timestamp.
645    ///
646    /// Supports multiple timestamp formats including Unix timestamps, RFC3339 dates, and pure dates.
647    /// Validation occurs at query time.
648    ///
649    /// # Examples
650    ///
651    /// Specify a Unix timestamp:
652    /// ```
653    /// let broker = bgpkit_broker::BgpkitBroker::new()
654    ///     .ts_start("1640995200");
655    /// ```
656    ///
657    /// Specify a RFC3339-formatted time string:
658    /// ```
659    /// let broker = bgpkit_broker::BgpkitBroker::new()
660    ///     .ts_start("2022-01-01T00:00:00Z");
661    /// ```
662    ///
663    /// Specify a pure date (defaults to start of day):
664    /// ```
665    /// let broker = bgpkit_broker::BgpkitBroker::new()
666    ///     .ts_start("2022-01-01");
667    /// ```
668    ///
669    /// Other supported formats:
670    /// ```
671    /// let broker = bgpkit_broker::BgpkitBroker::new()
672    ///     .ts_start("2022/01/01")  // slash format
673    ///     .ts_start("2022.01.01")  // dot format
674    ///     .ts_start("20220101");   // compact format
675    /// ```
676    pub fn ts_start<S: Display>(self, ts_start: S) -> Self {
677        let mut query_params = self.query_params;
678        query_params.ts_start = Some(ts_start.to_string());
679        Self {
680            broker_url: self.broker_url,
681            query_params,
682            client: self.client,
683            collector_project_map: self.collector_project_map,
684            accept_invalid_certs: self.accept_invalid_certs,
685            cache_dir: self.cache_dir,
686        }
687    }
688
689    /// Add a filter of ending timestamp.
690    ///
691    /// Supports the same multiple timestamp formats as `ts_start`.
692    /// Validation occurs at query time.
693    ///
694    /// # Examples
695    ///
696    /// Specify a Unix timestamp:
697    /// ```
698    /// let broker = bgpkit_broker::BgpkitBroker::new()
699    ///     .ts_end("1640995200");
700    /// ```
701    ///
702    /// Specify a RFC3339-formatted time string:
703    /// ```
704    /// let broker = bgpkit_broker::BgpkitBroker::new()
705    ///     .ts_end("2022-01-01T00:00:00Z");
706    /// ```
707    ///
708    /// Specify a pure date (defaults to start of day):
709    /// ```
710    /// let broker = bgpkit_broker::BgpkitBroker::new()
711    ///     .ts_end("2022-01-01");
712    /// ```
713    pub fn ts_end<S: Display>(self, ts_end: S) -> Self {
714        let mut query_params = self.query_params;
715        query_params.ts_end = Some(ts_end.to_string());
716        Self {
717            broker_url: self.broker_url,
718            client: self.client,
719            query_params,
720            collector_project_map: self.collector_project_map,
721            accept_invalid_certs: self.accept_invalid_certs,
722            cache_dir: self.cache_dir,
723        }
724    }
725
726    /// Add a filter of collector ID (e.g. `rrc00` or `route-views2`).
727    ///
728    /// See the full list of collectors [here](https://github.com/bgpkit/bgpkit-broker-backend/blob/main/deployment/full-config.json).
729    /// Validation occurs at query time.
730    ///
731    /// # Examples
732    ///
733    /// filter by single collector
734    /// ```
735    /// let broker = bgpkit_broker::BgpkitBroker::new()
736    ///     .collector_id("rrc00");
737    /// ```
738    ///
739    /// filter by multiple collector
740    /// ```
741    /// let broker = bgpkit_broker::BgpkitBroker::new()
742    ///     .collector_id("route-views2,route-views6");
743    /// ```
744    pub fn collector_id<S: Display>(self, collector_id: S) -> Self {
745        let mut query_params = self.query_params;
746        query_params.collector_id = Some(collector_id.to_string());
747        Self {
748            client: self.client,
749            broker_url: self.broker_url,
750            query_params,
751            collector_project_map: self.collector_project_map,
752            accept_invalid_certs: self.accept_invalid_certs,
753            cache_dir: self.cache_dir,
754        }
755    }
756
757    /// Add a filter of project name with validation, i.e. `riperis` or `routeviews`.
758    ///
759    /// # Examples
760    ///
761    /// ```
762    /// let broker = bgpkit_broker::BgpkitBroker::new()
763    ///     .project("riperis");
764    /// ```
765    ///
766    /// ```
767    /// let broker = bgpkit_broker::BgpkitBroker::new()
768    ///     .project("routeviews");
769    /// ```
770    pub fn project<S: Display>(self, project: S) -> Self {
771        let mut query_params = self.query_params;
772        query_params.project = Some(project.to_string());
773        Self {
774            client: self.client,
775            broker_url: self.broker_url,
776            query_params,
777            collector_project_map: self.collector_project_map,
778            accept_invalid_certs: self.accept_invalid_certs,
779            cache_dir: self.cache_dir,
780        }
781    }
782
783    /// Add filter of data type, i.e. `rib` or `updates`.
784    ///
785    /// Validation occurs at query time.
786    ///
787    /// # Examples
788    ///
789    /// ```
790    /// let broker = bgpkit_broker::BgpkitBroker::new()
791    ///     .data_type("rib");
792    /// ```
793    ///
794    /// ```
795    /// let broker = bgpkit_broker::BgpkitBroker::new()
796    ///     .data_type("updates");
797    /// ```
798    pub fn data_type<S: Display>(self, data_type: S) -> Self {
799        let mut query_params = self.query_params;
800        query_params.data_type = Some(data_type.to_string());
801        Self {
802            broker_url: self.broker_url,
803            client: self.client,
804            query_params,
805            collector_project_map: self.collector_project_map,
806            accept_invalid_certs: self.accept_invalid_certs,
807            cache_dir: self.cache_dir,
808        }
809    }
810
811    /// Change the current page number, starting from 1.
812    ///
813    /// Validation occurs at query time.
814    ///
815    /// # Examples
816    ///
817    /// Start iterating with page 2.
818    /// ```
819    /// let broker = bgpkit_broker::BgpkitBroker::new()
820    ///     .page(2);
821    /// ```
822    pub fn page(self, page: i64) -> Self {
823        let mut query_params = self.query_params;
824        query_params.page = page;
825        Self {
826            broker_url: self.broker_url,
827            client: self.client,
828            query_params,
829            collector_project_map: self.collector_project_map,
830            accept_invalid_certs: self.accept_invalid_certs,
831            cache_dir: self.cache_dir,
832        }
833    }
834
835    /// Change current page size, default 100.
836    ///
837    /// Validation occurs at query time.
838    ///
839    /// # Examples
840    ///
841    /// Set page size to 20.
842    /// ```
843    /// let broker = bgpkit_broker::BgpkitBroker::new()
844    ///     .page_size(10);
845    /// ```
846    pub fn page_size(self, page_size: i64) -> Self {
847        let mut query_params = self.query_params;
848        query_params.page_size = page_size;
849        Self {
850            broker_url: self.broker_url,
851            client: self.client,
852            query_params,
853            collector_project_map: self.collector_project_map,
854            accept_invalid_certs: self.accept_invalid_certs,
855            cache_dir: self.cache_dir,
856        }
857    }
858
859    /// Add a filter of peer IP address when listing peers.
860    ///
861    /// # Examples
862    ///
863    /// ```
864    /// let broker = bgpkit_broker::BgpkitBroker::new()
865    ///    .peers_ip("192.168.1.1".parse().unwrap());
866    /// ```
867    pub fn peers_ip(self, peer_ip: IpAddr) -> Self {
868        let mut query_params = self.query_params;
869        query_params.peers_ip = Some(peer_ip);
870        Self {
871            broker_url: self.broker_url,
872            client: self.client,
873            query_params,
874            collector_project_map: self.collector_project_map,
875            accept_invalid_certs: self.accept_invalid_certs,
876            cache_dir: self.cache_dir,
877        }
878    }
879
880    /// Add a filter of peer ASN when listing peers.
881    ///
882    /// # Examples
883    ///
884    /// ```
885    /// let broker = bgpkit_broker::BgpkitBroker::new()
886    ///    .peers_asn(64496);
887    /// ```
888    pub fn peers_asn(self, peer_asn: u32) -> Self {
889        let mut query_params = self.query_params;
890        query_params.peers_asn = Some(peer_asn);
891        Self {
892            broker_url: self.broker_url,
893            client: self.client,
894            query_params,
895            collector_project_map: self.collector_project_map,
896            accept_invalid_certs: self.accept_invalid_certs,
897            cache_dir: self.cache_dir,
898        }
899    }
900
901    /// Add a filter of peer full feed status when listing peers.
902    ///
903    /// # Examples
904    ///
905    /// ```
906    /// let broker = bgpkit_broker::BgpkitBroker::new()
907    ///   .peers_only_full_feed(true);
908    /// ```
909    pub fn peers_only_full_feed(self, peer_full_feed: bool) -> Self {
910        let mut query_params = self.query_params;
911        query_params.peers_only_full_feed = peer_full_feed;
912        Self {
913            broker_url: self.broker_url,
914            client: self.client,
915            query_params,
916            collector_project_map: self.collector_project_map,
917            accept_invalid_certs: self.accept_invalid_certs,
918            cache_dir: self.cache_dir,
919        }
920    }
921
922    /// Turn to specified page, page starting from 1.
923    ///
924    /// This works with [Self::query_single_page] function to manually paginate.
925    ///
926    /// # Examples
927    ///
928    /// Manually get the first two pages of items.
929    /// ```no_run
930    /// let mut broker = bgpkit_broker::BgpkitBroker::new();
931    /// let mut items = vec![];
932    /// items.extend(broker.query_single_page().unwrap());
933    /// broker.turn_page(2);
934    /// items.extend(broker.query_single_page().unwrap());
935    /// ```
936    pub fn turn_page(&mut self, page: i64) {
937        self.query_params.page = page;
938    }
939
940    /// Send API for a single page of items.
941    ///
942    /// # Examples
943    ///
944    /// Manually get the first page of items.
945    /// ```no_run
946    /// let broker = bgpkit_broker::BgpkitBroker::new();
947    /// let items = broker.query_single_page().unwrap();
948    /// ```
949    pub fn query_single_page(&self) -> Result<Vec<BrokerItem>, BrokerError> {
950        // Try to load from cache first
951        if let Some(cached_items) = self.load_cache() {
952            return Ok(cached_items);
953        }
954        
955        let validated_params = self.validate_configuration()?;
956        let url = format!("{}/search{}", &self.broker_url, &validated_params);
957        log::info!("sending broker query to {}", &url);
958        match self.run_files_query(url.as_str()) {
959            Ok(res) => {
960                // Save to cache if cache_dir is set
961                self.save_cache(&res.data);
962                Ok(res.data)
963            }
964            Err(e) => Err(e),
965        }
966    }
967
968    /// Query the total count of items matching the current search criteria without fetching the items.
969    ///
970    /// This method is useful when you need to know how many items match your search criteria
971    /// without downloading all the items. It performs the same validation as a regular query
972    /// but only returns the count.
973    ///
974    /// # Returns
975    /// - `Ok(i64)`: The total number of matching items
976    /// - `Err(BrokerError)`: If the query fails or the count is missing from the response
977    ///
978    /// # Examples
979    ///
980    /// ```no_run
981    /// use bgpkit_broker::BgpkitBroker;
982    ///
983    /// let broker = BgpkitBroker::new()
984    ///     .ts_start("2024-01-01")
985    ///     .ts_end("2024-01-02")
986    ///     .collector_id("route-views2");
987    ///
988    /// let count = broker.query_total_count().unwrap();
989    /// println!("Found {} matching items", count);
990    /// ```
991    pub fn query_total_count(&self) -> Result<i64, BrokerError> {
992        let validated_params = self.validate_configuration()?;
993        let url = format!("{}/search{}", &self.broker_url, &validated_params);
994        match self.run_files_query(url.as_str()) {
995            Ok(res) => res.total.ok_or(BrokerError::BrokerError(
996                "count not found in response".to_string(),
997            )),
998            Err(e) => Err(e),
999        }
1000    }
1001
1002    /// Check if the broker instance is healthy.
1003    ///
1004    /// # Examples
1005    ///
1006    /// ```no_run
1007    /// let broker = bgpkit_broker::BgpkitBroker::new();
1008    /// assert!(broker.health_check().is_ok())
1009    /// ```
1010    pub fn health_check(&self) -> Result<(), BrokerError> {
1011        let url = format!("{}/health", &self.broker_url.trim_end_matches('/'));
1012        match self.client.get(url.as_str()).send() {
1013            Ok(response) => {
1014                if response.status() == reqwest::StatusCode::OK {
1015                    Ok(())
1016                } else {
1017                    Err(BrokerError::BrokerError(format!(
1018                        "endpoint unhealthy {}",
1019                        self.broker_url
1020                    )))
1021                }
1022            }
1023            Err(_e) => Err(BrokerError::BrokerError(format!(
1024                "endpoint unhealthy {}",
1025                self.broker_url
1026            ))),
1027        }
1028    }
1029
1030    /// Send a query to get **all** data times returned.
1031    ///
1032    /// This usually is what one needs.
1033    ///
1034    /// # Examples
1035    ///
1036    /// Get all RIB files on 2022-01-01 from route-views2.
1037    /// ```no_run
1038    /// let broker = bgpkit_broker::BgpkitBroker::new()
1039    ///     .ts_start("2022-01-01T00:00:00Z")
1040    ///     .ts_end("2022-01-01T23:59:00Z")
1041    ///     .data_type("rib")
1042    ///     .collector_id("route-views2");
1043    /// let items = broker.query().unwrap();
1044    ///
1045    /// // 1 RIB dump very 2 hours, total of 12 files for 1 day
1046    /// assert_eq!(items.len(), 12);
1047    /// ```
1048    pub fn query(&self) -> Result<Vec<BrokerItem>, BrokerError> {
1049        let mut p = self.validate_configuration()?;
1050
1051        let mut items = vec![];
1052        loop {
1053            let url = format!("{}/search{}", &self.broker_url, &p);
1054
1055            let res_items = self.run_files_query(url.as_str())?.data;
1056
1057            let items_count = res_items.len() as i64;
1058
1059            if items_count == 0 {
1060                // reaches the end
1061                break;
1062            }
1063
1064            items.extend(res_items);
1065            let cur_page = p.page;
1066            p = p.page(cur_page + 1);
1067
1068            if items_count < p.page_size {
1069                // reaches the end
1070                break;
1071            }
1072        }
1073        Ok(items)
1074    }
1075
1076    /// Send a query to get the **latest** data for each collector.
1077    ///
1078    /// The returning result is structured as a vector of [CollectorLatestItem] objects.
1079    ///
1080    /// # Examples
1081    ///
1082    /// ```no_run
1083    /// let broker = bgpkit_broker::BgpkitBroker::new();
1084    /// let latest_items = broker.latest().unwrap();
1085    /// for item in &latest_items {
1086    ///     println!("{}", item);
1087    /// }
1088    /// ```
1089    pub fn latest(&self) -> Result<Vec<BrokerItem>, BrokerError> {
1090        let latest_query_url = format!("{}/latest", self.broker_url);
1091        let mut items = match self.client.get(latest_query_url.as_str()).send() {
1092            Ok(response) => match response.json::<CollectorLatestResult>() {
1093                Ok(result) => result.data,
1094                Err(_) => {
1095                    return Err(BrokerError::BrokerError(
1096                        "Error parsing response".to_string(),
1097                    ));
1098                }
1099            },
1100            Err(e) => {
1101                return Err(BrokerError::BrokerError(format!(
1102                    "Unable to connect to the URL ({latest_query_url}): {e}"
1103                )));
1104            }
1105        };
1106
1107        items.retain(|item| {
1108            let mut matches = true;
1109            if let Some(project) = &self.query_params.project {
1110                match project.to_lowercase().as_str() {
1111                    "rrc" | "riperis" | "ripe_ris" => {
1112                        matches = self
1113                            .collector_project_map
1114                            .get(&item.collector_id)
1115                            .cloned()
1116                            .unwrap_or_default()
1117                            .as_str()
1118                            == "riperis";
1119                    }
1120                    "routeviews" | "route_views" | "rv" => {
1121                        matches = self
1122                            .collector_project_map
1123                            .get(&item.collector_id)
1124                            .cloned()
1125                            .unwrap_or_default()
1126                            .as_str()
1127                            == "routeviews";
1128                    }
1129                    _ => {}
1130                }
1131            }
1132
1133            if let Some(data_type) = &self.query_params.data_type {
1134                match data_type.to_lowercase().as_str() {
1135                    "rib" | "ribs" | "r" => {
1136                        if !item.is_rib() {
1137                            // if not RIB file, not match
1138                            matches = false
1139                        }
1140                    }
1141                    "update" | "updates" => {
1142                        if item.is_rib() {
1143                            // if is RIB file, not match
1144                            matches = false
1145                        }
1146                    }
1147                    _ => {}
1148                }
1149            }
1150
1151            if let Some(collector_id) = &self.query_params.collector_id {
1152                let wanted: HashSet<&str> = collector_id
1153                    .split(',')
1154                    .map(|s| s.trim())
1155                    .filter(|s| !s.is_empty())
1156                    .collect();
1157
1158                if !wanted.contains(item.collector_id.as_str()) {
1159                    return false;
1160                }
1161            }
1162
1163            matches
1164        });
1165
1166        Ok(items)
1167    }
1168
1169    /// Get the most recent information for collector peers.
1170    ///
1171    /// The returning result is structured as a vector of [BrokerPeer] objects.
1172    ///
1173    /// # Examples
1174    ///
1175    /// ## Get all peers
1176    ///
1177    /// ```no_run
1178    /// let broker = bgpkit_broker::BgpkitBroker::new();
1179    /// let peers = broker.get_peers().unwrap();
1180    /// for peer in &peers {
1181    ///     println!("{:?}", peer);
1182    /// }
1183    /// ```
1184    ///
1185    /// ## Get peers from a specific collector
1186    ///
1187    /// ```no_run
1188    /// let broker = bgpkit_broker::BgpkitBroker::new()
1189    ///    .collector_id("route-views2");
1190    /// let peers = broker.get_peers().unwrap();
1191    /// for peer in &peers {
1192    ///    println!("{:?}", peer);
1193    /// }
1194    /// ```
1195    ///
1196    /// ## Get peers from a specific ASN
1197    ///
1198    /// ```no_run
1199    /// let broker = bgpkit_broker::BgpkitBroker::new()
1200    ///   .peers_asn(64496);
1201    /// let peers = broker.get_peers().unwrap();
1202    /// for peer in &peers {
1203    ///    println!("{:?}", peer);
1204    /// }
1205    /// ```
1206    ///
1207    /// ## Get peers from a specific IP address
1208    ///
1209    /// ```no_run
1210    /// let broker = bgpkit_broker::BgpkitBroker::new()
1211    ///   .peers_ip("192.168.1.1".parse().unwrap());
1212    /// let peers = broker.get_peers().unwrap();
1213    /// for peer in &peers {
1214    ///   println!("{:?}", peer);
1215    /// }
1216    /// ```
1217    ///
1218    /// ## Get peers with full feed
1219    ///
1220    /// ```no_run
1221    /// let broker = bgpkit_broker::BgpkitBroker::new()
1222    ///  .peers_only_full_feed(true);
1223    /// let peers = broker.get_peers().unwrap();
1224    /// for peer in &peers {
1225    ///     println!("{:?}", peer);
1226    /// }
1227    /// ```
1228    ///
1229    /// ## Get peers from a specific collector with full feed
1230    ///
1231    /// ```no_run
1232    /// let broker = bgpkit_broker::BgpkitBroker::new()
1233    ///  .collector_id("route-views2")
1234    /// .peers_only_full_feed(true);
1235    /// let peers = broker.get_peers().unwrap();
1236    /// for peer in &peers {
1237    ///    println!("{:?}", peer);
1238    /// }
1239    /// ```
1240    pub fn get_peers(&self) -> Result<Vec<BrokerPeer>, BrokerError> {
1241        let mut url = format!("{}/peers", self.broker_url);
1242        let mut param_strings = vec![];
1243        if let Some(ip) = &self.query_params.peers_ip {
1244            param_strings.push(format!("ip={ip}"));
1245        }
1246        if let Some(asn) = &self.query_params.peers_asn {
1247            param_strings.push(format!("asn={asn}"));
1248        }
1249        if self.query_params.peers_only_full_feed {
1250            param_strings.push("full_feed=true".to_string());
1251        }
1252        if let Some(collector_id) = &self.query_params.collector_id {
1253            param_strings.push(format!("collector={collector_id}"));
1254        }
1255        if !param_strings.is_empty() {
1256            let param_string = param_strings.join("&");
1257            url = format!("{url}?{param_string}");
1258        }
1259
1260        let peers = match self.client.get(url.as_str()).send() {
1261            Ok(response) => match response.json::<BrokerPeersResult>() {
1262                Ok(result) => result.data,
1263                Err(_) => {
1264                    return Err(BrokerError::BrokerError(
1265                        "Error parsing response".to_string(),
1266                    ));
1267                }
1268            },
1269            Err(e) => {
1270                return Err(BrokerError::BrokerError(format!(
1271                    "Unable to connect to the URL ({url}): {e}"
1272                )));
1273            }
1274        };
1275        Ok(peers)
1276    }
1277
1278    fn run_files_query(&self, url: &str) -> Result<BrokerQueryResult, BrokerError> {
1279        log::info!("sending broker query to {}", &url);
1280        match self.client.get(url).send() {
1281            Ok(res) => match res.json::<BrokerQueryResult>() {
1282                Ok(res) => {
1283                    if let Some(e) = res.error {
1284                        Err(BrokerError::BrokerError(e))
1285                    } else {
1286                        Ok(res)
1287                    }
1288                }
1289                Err(e) => {
1290                    // json decoding error. most likely the service returns an error message without
1291                    // `data` field.
1292                    Err(BrokerError::BrokerError(e.to_string()))
1293                }
1294            },
1295            Err(e) => Err(BrokerError::from(e)),
1296        }
1297    }
1298}
1299
1300/// Iterator for BGPKIT Broker that iterates through one [BrokerItem] at a time.
1301///
1302/// The [IntoIterator] trait is implemented for both the struct and the reference, so that you can
1303/// either iterate through items by taking the ownership of the broker, or use the reference to broker
1304/// to iterate.
1305///
1306/// ```no_run
1307/// use bgpkit_broker::{BgpkitBroker, BrokerItem};
1308///
1309/// let mut broker = BgpkitBroker::new()
1310///     .ts_start("1634693400")
1311///     .ts_end("1634693400")
1312///     .page_size(10)
1313///     .page(2);
1314///
1315/// // create iterator from reference (so that you can reuse the broker object)
1316/// // same as `&broker.into_intr()`
1317/// for item in &broker {
1318///     println!("{}", item);
1319/// }
1320///
1321/// // create iterator from the broker object (taking ownership)
1322/// let items = broker.into_iter().collect::<Vec<BrokerItem>>();
1323///
1324/// assert_eq!(items.len(), 43);
1325/// ```
1326pub struct BrokerItemIterator {
1327    broker: BgpkitBroker,
1328    cached_items: Vec<BrokerItem>,
1329    first_run: bool,
1330}
1331
1332impl BrokerItemIterator {
1333    pub fn new(broker: BgpkitBroker) -> BrokerItemIterator {
1334        BrokerItemIterator {
1335            broker,
1336            cached_items: vec![],
1337            first_run: true,
1338        }
1339    }
1340}
1341
1342impl Iterator for BrokerItemIterator {
1343    type Item = BrokerItem;
1344
1345    fn next(&mut self) -> Option<Self::Item> {
1346        // if we have cached items, simply pop and return
1347        if let Some(item) = self.cached_items.pop() {
1348            return Some(item);
1349        }
1350
1351        // no more cached items, refill cache by one more broker query
1352        if self.first_run {
1353            // if it's the first time running, do not change page, and switch the flag.
1354            self.first_run = false;
1355        } else {
1356            // if it's not the first time running, add page number by one.
1357            self.broker.query_params.page += 1;
1358        }
1359
1360        // query the current page
1361        let items = match self.broker.query_single_page() {
1362            Ok(i) => i,
1363            Err(_) => return None,
1364        };
1365
1366        if items.is_empty() {
1367            // break out the iteration
1368            return None;
1369        } else {
1370            // fill the cache
1371            self.cached_items = items;
1372            self.cached_items.reverse();
1373        }
1374
1375        #[allow(clippy::unwrap_used)]
1376        Some(self.cached_items.pop().unwrap())
1377    }
1378}
1379
1380impl IntoIterator for BgpkitBroker {
1381    type Item = BrokerItem;
1382    type IntoIter = BrokerItemIterator;
1383
1384    fn into_iter(self) -> Self::IntoIter {
1385        BrokerItemIterator::new(self)
1386    }
1387}
1388
1389impl IntoIterator for &BgpkitBroker {
1390    type Item = BrokerItem;
1391    type IntoIter = BrokerItemIterator;
1392
1393    fn into_iter(self) -> Self::IntoIter {
1394        BrokerItemIterator::new(self.clone())
1395    }
1396}
1397
1398#[cfg(test)]
1399mod tests {
1400    use super::*;
1401
1402    #[test]
1403    fn test_query() {
1404        let broker = BgpkitBroker::new()
1405            .ts_start("1634693400")
1406            .ts_end("1634693400");
1407        let res = broker.query();
1408        assert!(&res.is_ok());
1409        let data = res.unwrap();
1410        assert!(!data.is_empty());
1411    }
1412
1413    #[test]
1414    fn test_network_error() {
1415        let broker = BgpkitBroker::new().broker_url("https://api.broker.example.com/v2");
1416        let res = broker.query();
1417        // when testing a must-fail query, you could use `matches!` macro to do so
1418        assert!(res.is_err());
1419        assert!(matches!(res.err(), Some(BrokerError::NetworkError(_))));
1420    }
1421
1422    #[test]
1423    fn test_broker_error() {
1424        let broker = BgpkitBroker::new().page(-1);
1425        let result = broker.query();
1426        assert!(result.is_err());
1427        assert!(matches!(
1428            result.err(),
1429            Some(BrokerError::ConfigurationError(_))
1430        ));
1431    }
1432
1433    #[test]
1434    fn test_query_all() {
1435        let broker = BgpkitBroker::new()
1436            .ts_start("1634693400")
1437            .ts_end("1634693400")
1438            .page_size(100);
1439        let res = broker.query();
1440        assert!(res.is_ok());
1441        assert!(res.ok().unwrap().len() >= 54);
1442    }
1443
1444    #[test]
1445    fn test_iterator() {
1446        let broker = BgpkitBroker::new()
1447            .ts_start("1634693400")
1448            .ts_end("1634693400");
1449        assert!(broker.into_iter().count() >= 54);
1450    }
1451
1452    #[test]
1453    fn test_filters() {
1454        let broker = BgpkitBroker::new()
1455            .ts_start("1634693400")
1456            .ts_end("1634693400");
1457        let items = broker.query().unwrap();
1458        assert!(items.len() >= 54);
1459
1460        let broker = BgpkitBroker::new()
1461            .ts_start("1634693400")
1462            .ts_end("1634693400")
1463            .collector_id("rrc00");
1464        let items = broker.query().unwrap();
1465        assert_eq!(items.len(), 1);
1466
1467        let broker = BgpkitBroker::new()
1468            .ts_start("1634693400")
1469            .ts_end("1634693400")
1470            .project("riperis");
1471        let items = broker.query().unwrap();
1472        assert_eq!(items.len(), 23);
1473    }
1474
1475    #[test]
1476    fn test_latest() {
1477        let broker = BgpkitBroker::new();
1478        let items = broker.latest().unwrap();
1479        assert!(items.len() >= 125);
1480
1481        let broker = BgpkitBroker::new().project("routeviews".to_string());
1482        let items = broker.latest().unwrap();
1483        assert!(!items.is_empty());
1484        assert!(items
1485            .iter()
1486            .all(|item| !item.collector_id.starts_with("rrc")));
1487
1488        let broker = BgpkitBroker::new().project("riperis".to_string());
1489        let items = broker.latest().unwrap();
1490        assert!(!items.is_empty());
1491        assert!(items
1492            .iter()
1493            .all(|item| item.collector_id.starts_with("rrc")));
1494
1495        let broker = BgpkitBroker::new().data_type("rib".to_string());
1496        let items = broker.latest().unwrap();
1497        assert!(!items.is_empty());
1498        assert!(items.iter().all(|item| item.is_rib()));
1499
1500        let broker = BgpkitBroker::new().data_type("update".to_string());
1501        let items = broker.latest().unwrap();
1502        assert!(!items.is_empty());
1503        assert!(items.iter().all(|item| !item.is_rib()));
1504
1505        let broker = BgpkitBroker::new().collector_id("rrc00".to_string());
1506        let items = broker.latest().unwrap();
1507        assert!(!items.is_empty());
1508        assert!(items
1509            .iter()
1510            .all(|item| item.collector_id.as_str() == "rrc00"));
1511        assert_eq!(items.len(), 2);
1512    }
1513
1514    #[test]
1515    fn test_latest_no_ssl() {
1516        let broker = BgpkitBroker::new().accept_invalid_certs();
1517        let items = broker.latest().unwrap();
1518        assert!(items.len() >= 125);
1519    }
1520
1521    #[test]
1522    fn test_health_check() {
1523        let broker = BgpkitBroker::new();
1524        let res = broker.health_check();
1525        assert!(res.is_ok());
1526    }
1527
1528    #[test]
1529    fn test_peers() {
1530        let broker = BgpkitBroker::new();
1531        let all_peers = broker.get_peers().unwrap();
1532        assert!(!all_peers.is_empty());
1533        let first_peer = all_peers.first().unwrap();
1534        let first_ip = first_peer.ip;
1535        let first_asn = first_peer.asn;
1536
1537        let broker = BgpkitBroker::new().peers_ip(first_ip);
1538        let peers = broker.get_peers().unwrap();
1539        assert!(!peers.is_empty());
1540
1541        let broker = BgpkitBroker::new().peers_asn(first_asn);
1542        let peers = broker.get_peers().unwrap();
1543        assert!(!peers.is_empty());
1544
1545        let broker = BgpkitBroker::new().peers_only_full_feed(true);
1546        let full_feed_peers = broker.get_peers().unwrap();
1547        assert!(!full_feed_peers.is_empty());
1548        assert!(full_feed_peers.len() < all_peers.len());
1549
1550        let broker = BgpkitBroker::new().collector_id("rrc00");
1551        let rrc_peers = broker.get_peers().unwrap();
1552        assert!(!rrc_peers.is_empty());
1553        assert!(rrc_peers.iter().all(|peer| peer.collector == "rrc00"));
1554
1555        let broker = BgpkitBroker::new().collector_id("rrc00,route-views2");
1556        let rrc_rv_peers = broker.get_peers().unwrap();
1557        assert!(!rrc_rv_peers.is_empty());
1558        assert!(rrc_rv_peers
1559            .iter()
1560            .any(|peer| peer.collector == "rrc00" || peer.collector == "route-views2"));
1561
1562        assert!(rrc_rv_peers.len() > rrc_peers.len());
1563    }
1564
1565    #[test]
1566    fn test_timestamp_parsing_unix() {
1567        let broker = BgpkitBroker::new();
1568
1569        // Valid Unix timestamps - configuration succeeds, normalization happens at query time
1570        let result = broker.clone().ts_start("1640995200");
1571        // Raw input is stored during configuration
1572        assert_eq!(result.query_params.ts_start, Some("1640995200".to_string()));
1573
1574        let result = broker.clone().ts_end("1640995200");
1575        assert_eq!(result.query_params.ts_end, Some("1640995200".to_string()));
1576    }
1577
1578    #[test]
1579    fn test_timestamp_parsing_rfc3339() {
1580        let broker = BgpkitBroker::new();
1581
1582        // RFC3339 with Z - raw input stored during configuration
1583        let result = broker.clone().ts_start("2022-01-01T00:00:00Z");
1584        assert_eq!(
1585            result.query_params.ts_start,
1586            Some("2022-01-01T00:00:00Z".to_string())
1587        );
1588
1589        // RFC3339 without Z - raw input stored during configuration
1590        let result = broker.clone().ts_start("2022-01-01T12:30:45");
1591        assert_eq!(
1592            result.query_params.ts_start,
1593            Some("2022-01-01T12:30:45".to_string())
1594        );
1595
1596        // Date with time format - raw input stored during configuration
1597        let result = broker.clone().ts_end("2022-01-01 12:30:45");
1598        assert_eq!(
1599            result.query_params.ts_end,
1600            Some("2022-01-01 12:30:45".to_string())
1601        );
1602    }
1603
1604    #[test]
1605    fn test_timestamp_parsing_pure_dates() {
1606        let broker = BgpkitBroker::new();
1607
1608        // Standard date format - raw input stored during configuration
1609        let result = broker.clone().ts_start("2022-01-01");
1610        assert_eq!(result.query_params.ts_start, Some("2022-01-01".to_string()));
1611
1612        // Slash format
1613        let result = broker.clone().ts_start("2022/01/01");
1614        assert_eq!(result.query_params.ts_start, Some("2022/01/01".to_string()));
1615
1616        // Dot format
1617        let result = broker.clone().ts_end("2022.01.01");
1618        assert_eq!(result.query_params.ts_end, Some("2022.01.01".to_string()));
1619
1620        // Compact format
1621        let result = broker.clone().ts_end("20220101");
1622        assert_eq!(result.query_params.ts_end, Some("20220101".to_string()));
1623    }
1624
1625    #[test]
1626    fn test_timestamp_parsing_whitespace() {
1627        let broker = BgpkitBroker::new();
1628
1629        // Test that raw input with whitespace is stored during configuration
1630        let result = broker.clone().ts_start("  2022-01-01  ");
1631        assert_eq!(
1632            result.query_params.ts_start,
1633            Some("  2022-01-01  ".to_string())
1634        );
1635
1636        let result = broker.clone().ts_end("\t1640995200\n");
1637        assert_eq!(
1638            result.query_params.ts_end,
1639            Some("\t1640995200\n".to_string())
1640        );
1641    }
1642
1643    #[test]
1644    fn test_timestamp_parsing_errors() {
1645        let broker = BgpkitBroker::new();
1646
1647        // Invalid format - error occurs at query time
1648        let broker_with_invalid = broker.clone().ts_start("invalid-timestamp");
1649        let result = broker_with_invalid.query();
1650        assert!(result.is_err());
1651        assert!(matches!(
1652            result.err(),
1653            Some(BrokerError::ConfigurationError(_))
1654        ));
1655
1656        // Invalid date - error occurs at query time
1657        let broker_with_invalid = broker.clone().ts_end("2022-13-01");
1658        let result = broker_with_invalid.query();
1659        assert!(result.is_err());
1660        assert!(matches!(
1661            result.err(),
1662            Some(BrokerError::ConfigurationError(_))
1663        ));
1664
1665        // Invalid compact date - error occurs at query time
1666        let broker_with_invalid = broker.clone().ts_start("20221301");
1667        let result = broker_with_invalid.query();
1668        assert!(result.is_err());
1669        assert!(matches!(
1670            result.err(),
1671            Some(BrokerError::ConfigurationError(_))
1672        ));
1673
1674        // Partially valid format - error occurs at query time
1675        let broker_with_invalid = broker.clone().ts_start("2022-01");
1676        let result = broker_with_invalid.query();
1677        assert!(result.is_err());
1678        assert!(matches!(
1679            result.err(),
1680            Some(BrokerError::ConfigurationError(_))
1681        ));
1682    }
1683
1684    #[test]
1685    fn test_parse_timestamp_direct() {
1686        use chrono::{NaiveDate, NaiveDateTime};
1687
1688        // Test the parse_timestamp function directly - it now returns DateTime<Utc>
1689
1690        // Unix timestamp
1691        let expected_unix = Utc.timestamp_opt(1640995200, 0).single().unwrap();
1692        assert_eq!(
1693            BgpkitBroker::parse_timestamp("1640995200").unwrap(),
1694            expected_unix
1695        );
1696
1697        // RFC3339 formats
1698        let expected_rfc3339_z = Utc.from_utc_datetime(
1699            &NaiveDateTime::parse_from_str("2022-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S").unwrap(),
1700        );
1701        assert_eq!(
1702            BgpkitBroker::parse_timestamp("2022-01-01T00:00:00Z").unwrap(),
1703            expected_rfc3339_z
1704        );
1705
1706        let expected_rfc3339_no_z = Utc.from_utc_datetime(
1707            &NaiveDateTime::parse_from_str("2022-01-01T12:30:45", "%Y-%m-%dT%H:%M:%S").unwrap(),
1708        );
1709        assert_eq!(
1710            BgpkitBroker::parse_timestamp("2022-01-01T12:30:45").unwrap(),
1711            expected_rfc3339_no_z
1712        );
1713
1714        let expected_space_format = Utc.from_utc_datetime(
1715            &NaiveDateTime::parse_from_str("2022-01-01 12:30:45", "%Y-%m-%d %H:%M:%S").unwrap(),
1716        );
1717        assert_eq!(
1718            BgpkitBroker::parse_timestamp("2022-01-01 12:30:45").unwrap(),
1719            expected_space_format
1720        );
1721
1722        // Pure date formats (all convert to start of day in UTC)
1723        let expected_date = Utc.from_utc_datetime(
1724            &NaiveDate::from_ymd_opt(2022, 1, 1)
1725                .unwrap()
1726                .and_hms_opt(0, 0, 0)
1727                .unwrap(),
1728        );
1729        assert_eq!(
1730            BgpkitBroker::parse_timestamp("2022-01-01").unwrap(),
1731            expected_date
1732        );
1733        assert_eq!(
1734            BgpkitBroker::parse_timestamp("2022/01/01").unwrap(),
1735            expected_date
1736        );
1737        assert_eq!(
1738            BgpkitBroker::parse_timestamp("2022.01.01").unwrap(),
1739            expected_date
1740        );
1741        assert_eq!(
1742            BgpkitBroker::parse_timestamp("20220101").unwrap(),
1743            expected_date
1744        );
1745
1746        // Test timezone formats - these should now work
1747        let result_plus_tz = BgpkitBroker::parse_timestamp("2022-01-01T00:00:00+00:00").unwrap();
1748        assert_eq!(result_plus_tz, expected_date);
1749        println!("✓ +00:00 timezone format works");
1750
1751        // Test timezone conversion: 2022-01-01T05:00:00-05:00 = 2022-01-01T10:00:00Z
1752        let result_minus_tz = BgpkitBroker::parse_timestamp("2022-01-01T05:00:00-05:00").unwrap();
1753        let expected_10am = Utc.with_ymd_and_hms(2022, 1, 1, 10, 0, 0).unwrap();
1754        assert_eq!(result_minus_tz, expected_10am);
1755        println!("✓ -05:00 timezone format works (05:00-05:00 = 10:00Z)");
1756
1757        // Error cases
1758        assert!(BgpkitBroker::parse_timestamp("invalid").is_err());
1759        assert!(BgpkitBroker::parse_timestamp("2022-13-01").is_err());
1760        assert!(BgpkitBroker::parse_timestamp("2022-01").is_err());
1761    }
1762
1763    #[test]
1764    fn test_collector_id_validation() {
1765        let broker = BgpkitBroker::new();
1766
1767        // Valid single collector - no error at validation time
1768        let broker_valid = broker.clone().collector_id("rrc00");
1769        let result = broker_valid.validate_configuration();
1770        assert!(result.is_ok());
1771
1772        // Valid multiple collectors - no error at validation time
1773        let broker_valid = broker.clone().collector_id("rrc00,route-views2");
1774        let result = broker_valid.validate_configuration();
1775        assert!(result.is_ok());
1776
1777        // Unknown collector should be allowed (permissive behavior)
1778        let broker_unknown = broker.clone().collector_id("brand-new-collector");
1779        let result = broker_unknown.validate_configuration();
1780        assert!(result.is_ok());
1781
1782        // Mixed known and unknown collectors should be allowed
1783        let broker_mixed = broker.clone().collector_id("rrc00,brand-new-collector");
1784        let result = broker_mixed.validate_configuration();
1785        assert!(result.is_ok());
1786
1787        // Empty/whitespace-only should error
1788        let broker_empty = broker.clone().collector_id(", ,  ,");
1789        let result = broker_empty.validate_configuration();
1790        assert!(result.is_err());
1791        assert!(matches!(
1792            result.err(),
1793            Some(BrokerError::ConfigurationError(_))
1794        ));
1795    }
1796
1797    #[test]
1798    fn test_project_validation() {
1799        let broker = BgpkitBroker::new();
1800
1801        // Valid projects - no error at configuration time
1802        let broker_valid = broker.clone().project("riperis");
1803        let result = broker_valid.validate_configuration();
1804        assert!(result.is_ok());
1805
1806        let broker_valid = broker.clone().project("routeviews");
1807        let result = broker_valid.validate_configuration();
1808        assert!(result.is_ok());
1809
1810        // Valid aliases - no error at configuration time
1811        let broker_valid = broker.clone().project("rrc");
1812        let result = broker_valid.validate_configuration();
1813        assert!(result.is_ok());
1814
1815        let broker_valid = broker.clone().project("rv");
1816        let result = broker_valid.validate_configuration();
1817        assert!(result.is_ok());
1818
1819        // Invalid project - error occurs at validation
1820        let broker_invalid = broker.clone().project("invalid-project");
1821        let result = broker_invalid.validate_configuration();
1822        assert!(result.is_err());
1823        assert!(matches!(
1824            result.err(),
1825            Some(BrokerError::ConfigurationError(_))
1826        ));
1827    }
1828
1829    #[test]
1830    fn test_data_type_validation() {
1831        let broker = BgpkitBroker::new();
1832
1833        // Valid data types - no error at configuration time
1834        let broker_valid = broker.clone().data_type("rib");
1835        let result = broker_valid.validate_configuration();
1836        assert!(result.is_ok());
1837
1838        let broker_valid = broker.clone().data_type("updates");
1839        let result = broker_valid.validate_configuration();
1840        assert!(result.is_ok());
1841
1842        // Valid aliases - no error at configuration time
1843        let broker_valid = broker.clone().data_type("ribs");
1844        let result = broker_valid.validate_configuration();
1845        assert!(result.is_ok());
1846
1847        let broker_valid = broker.clone().data_type("update");
1848        let result = broker_valid.validate_configuration();
1849        assert!(result.is_ok());
1850
1851        // Invalid data type - error occurs at validation
1852        let broker_invalid = broker.clone().data_type("invalid-type");
1853        let result = broker_invalid.validate_configuration();
1854        assert!(result.is_err());
1855        assert!(matches!(
1856            result.err(),
1857            Some(BrokerError::ConfigurationError(_))
1858        ));
1859    }
1860
1861    #[test]
1862    fn test_page_validation() {
1863        let broker = BgpkitBroker::new();
1864
1865        // Valid page number - no error at configuration time
1866        let broker_valid = broker.clone().page(1);
1867        let result = broker_valid.validate_configuration();
1868        assert!(result.is_ok());
1869
1870        let broker_valid = broker.clone().page(100);
1871        let result = broker_valid.validate_configuration();
1872        assert!(result.is_ok());
1873
1874        // Invalid page number - error occurs at validation
1875        let broker_invalid = broker.clone().page(0);
1876        let result = broker_invalid.validate_configuration();
1877        assert!(result.is_err());
1878        assert!(matches!(
1879            result.err(),
1880            Some(BrokerError::ConfigurationError(_))
1881        ));
1882    }
1883
1884    #[test]
1885    fn test_page_size_validation() {
1886        let broker = BgpkitBroker::new();
1887
1888        // Valid page sizes - no error at configuration time
1889        let broker_valid = broker.clone().page_size(1);
1890        let result = broker_valid.validate_configuration();
1891        assert!(result.is_ok());
1892
1893        let broker_valid = broker.clone().page_size(100);
1894        let result = broker_valid.validate_configuration();
1895        assert!(result.is_ok());
1896
1897        let broker_valid = broker.clone().page_size(100000);
1898        let result = broker_valid.validate_configuration();
1899        assert!(result.is_ok());
1900
1901        // Invalid page sizes - error occurs at validation
1902        let broker_invalid = broker.clone().page_size(0);
1903        let result = broker_invalid.validate_configuration();
1904        assert!(result.is_err());
1905        assert!(matches!(
1906            result.err(),
1907            Some(BrokerError::ConfigurationError(_))
1908        ));
1909
1910        let broker_invalid = broker.clone().page_size(100001);
1911        let result = broker_invalid.validate_configuration();
1912        assert!(result.is_err());
1913        assert!(matches!(
1914            result.err(),
1915            Some(BrokerError::ConfigurationError(_))
1916        ));
1917    }
1918
1919    #[test]
1920    fn test_method_chaining() {
1921        let broker = BgpkitBroker::new()
1922            .ts_start("1634693400")
1923            .ts_end("1634693400")
1924            .collector_id("rrc00")
1925            .project("riperis")
1926            .data_type("rib")
1927            .page(1)
1928            .page_size(10);
1929
1930        // Raw input is stored during configuration
1931        assert_eq!(broker.query_params.ts_start, Some("1634693400".to_string()));
1932        assert_eq!(broker.query_params.ts_end, Some("1634693400".to_string()));
1933        assert_eq!(broker.query_params.collector_id, Some("rrc00".to_string()));
1934        assert_eq!(broker.query_params.project, Some("riperis".to_string()));
1935        assert_eq!(broker.query_params.data_type, Some("rib".to_string()));
1936        assert_eq!(broker.query_params.page, 1);
1937        assert_eq!(broker.query_params.page_size, 10);
1938    }
1939}