bgpkit_broker/
lib.rs

1/*!
2# Overview
3
[bgpkit-broker][crate] is a package for accessing the BGPKIT Broker API and searching for BGP archive
files using a variety of search parameters.
6
7# Examples
8
9## Basic Usage with Iterator
10
The recommended way to collect [BrokerItem]s is to use the built-in iterator. The
[BrokerItemIterator] makes API queries on demand so that it can continuously stream new items until
it reaches the end of the results. This is useful for simply getting **all** matching items without
needing to worry about pagination.
15
16```no_run
17use bgpkit_broker::{BgpkitBroker, BrokerItem};
18
19let broker = BgpkitBroker::new()
20    .ts_start("2022-01-01")
21    .ts_end("2022-01-02")
22    .collector_id("route-views2");
23
24// Iterate by reference (reusable broker)
25for item in &broker {
26    println!("BGP file: {} from {} ({})",
27             item.url, item.collector_id, item.data_type);
28}
29
30// Or collect into vector
31let items: Vec<BrokerItem> = broker.into_iter().collect();
32println!("Found {} BGP archive files", items.len());
33```
34
35## Practical BGP Data Analysis with Shortcuts
36
37The SDK provides convenient shortcuts for common BGP data analysis patterns:
38
39### Daily RIB Analysis Across Diverse Collectors
40
41```no_run
42use bgpkit_broker::BgpkitBroker;
43
44// Find the most diverse collectors for comprehensive analysis
45let broker = BgpkitBroker::new()
46    .ts_start("2024-01-01")
47    .ts_end("2024-01-31");
48
49let diverse_collectors = broker.most_diverse_collectors(5, None).unwrap();
50println!("Selected {} diverse collectors: {:?}",
51         diverse_collectors.len(), diverse_collectors);
52
53// Get daily RIB snapshots from these collectors
54let daily_ribs = broker
55    .clone()
56    .collector_id(&diverse_collectors.join(","))
57    .daily_ribs().unwrap();
58
59println!("Found {} daily RIB snapshots for analysis", daily_ribs.len());
60for rib in daily_ribs.iter().take(3) {
    println!("Daily snapshot: {} from {} at {}",
             rib.url,
             rib.collector_id,
             rib.ts_start.format("%Y-%m-%d"));
65}
66```
67
68### Recent BGP Updates Monitoring
69
70```no_run
71use bgpkit_broker::BgpkitBroker;
72
73// Monitor recent BGP updates from multiple collectors
74let recent_updates = BgpkitBroker::new()
75    .collector_id("route-views2,rrc00,route-views6")
76    .recent_updates(6).unwrap(); // last 6 hours
77
78println!("Found {} recent BGP update files", recent_updates.len());
79for update in recent_updates.iter().take(5) {
    println!("Update: {} from {} at {}",
             update.url,
             update.collector_id,
             update.ts_start.format("%Y-%m-%d %H:%M:%S"));
84}
85```
86
87### Project-specific Analysis
88
89```no_run
90use bgpkit_broker::BgpkitBroker;
91
92// Compare RouteViews vs RIPE RIS daily snapshots
93let routeviews_ribs = BgpkitBroker::new()
94    .ts_start("2024-01-01")
95    .ts_end("2024-01-07")
96    .project("routeviews")
97    .daily_ribs().unwrap();
98
99let ripe_ribs = BgpkitBroker::new()
100    .ts_start("2024-01-01")
101    .ts_end("2024-01-07")
102    .project("riperis")
103    .daily_ribs().unwrap();
104
105println!("RouteViews daily RIBs: {}", routeviews_ribs.len());
106println!("RIPE RIS daily RIBs: {}", ripe_ribs.len());
107```
108
109### Advanced Collector Selection
110
111```no_run
112use bgpkit_broker::BgpkitBroker;
113
114let broker = BgpkitBroker::new();
115
116// Get diverse RouteViews collectors for focused analysis
117let rv_collectors = broker.most_diverse_collectors(3, Some("routeviews")).unwrap();
118println!("Diverse RouteViews collectors: {:?}", rv_collectors);
119
120// Use them to get comprehensive recent updates
121let comprehensive_updates = broker
122    .clone()
123    .collector_id(&rv_collectors.join(","))
124    .recent_updates(12).unwrap(); // last 12 hours
125
126println!("Got {} updates from {} collectors",
127         comprehensive_updates.len(), rv_collectors.len());
128```
129
130### Routing Table Snapshot Reconstruction
131
132```no_run
133use bgpkit_broker::BgpkitBroker;
134
135// Get the MRT files needed to construct a routing table snapshot
136let broker = BgpkitBroker::new();
137let snapshots = broker.get_snapshot_files(
138    &["route-views2", "rrc00"],
139    "2024-01-01T12:00:00Z"
140).unwrap();
141
142for snapshot in snapshots {
143    println!("Collector: {}", snapshot.collector_id);
144    println!("RIB dump: {}", snapshot.rib_url);
145    println!("Updates to apply: {}", snapshot.updates_urls.len());
146
147    // Use with bgpkit-parser to reconstruct routing table:
148    // 1. Parse RIB dump for initial state
149    // 2. Apply updates in order to reach target timestamp
150}
151```
152
153## Manual Page Queries
154
155For fine-grained control over pagination or custom iteration patterns:
156
157```rust,no_run
158use bgpkit_broker::BgpkitBroker;
159
160let mut broker = BgpkitBroker::new()
161    .ts_start("2022-01-01")
162    .ts_end("2022-01-02")
163    .page(1)
164    .page_size(50);
165
166// Query specific page
167let page1_items = broker.query_single_page().unwrap();
168println!("Page 1: {} items", page1_items.len());
169
170// Move to next page
171broker.turn_page(2);
172let page2_items = broker.query_single_page().unwrap();
173println!("Page 2: {} items", page2_items.len());
174```
175
176## Getting Latest Files and Peer Information
177
178Access the most recent data and peer information:
179
180```rust,no_run
181use bgpkit_broker::BgpkitBroker;
182
183// Get latest files from all collectors
184let broker = BgpkitBroker::new();
185let latest_files = broker.latest().unwrap();
186println!("Latest files from {} collectors", latest_files.len());
187
188// Get full-feed peers from specific collector
189let peers = BgpkitBroker::new()
190    .collector_id("route-views2")
191    .peers_only_full_feed(true)
192    .get_peers().unwrap();
193
194println!("Found {} full-feed peers", peers.len());
195for peer in peers.iter().take(3) {
196    println!("Peer: AS{} ({}) - v4: {}, v6: {}",
197             peer.asn, peer.ip, peer.num_v4_pfxs, peer.num_v6_pfxs);
198}
199```
200*/
201
202#![doc(
203    html_logo_url = "https://raw.githubusercontent.com/bgpkit/assets/main/logos/icon-transparent.png",
204    html_favicon_url = "https://raw.githubusercontent.com/bgpkit/assets/main/logos/favicon.ico"
205)]
206#![allow(unknown_lints)]
207
208mod collector;
209#[cfg(feature = "cli")]
210pub mod config;
211#[cfg(feature = "cli")]
212mod crawler;
213#[cfg(feature = "backend")]
214pub mod db;
215mod error;
216mod item;
217#[cfg(feature = "nats")]
218pub mod notifier;
219mod peer;
220mod query;
221mod shortcuts;
222
223use crate::collector::DEFAULT_COLLECTORS_CONFIG;
224use crate::peer::BrokerPeersResult;
225use crate::query::{BrokerQueryResult, CollectorLatestResult};
226use chrono::{DateTime, NaiveDate, TimeZone, Utc};
227pub use collector::{load_collectors, Collector};
228
229#[cfg(feature = "cli")]
230pub use config::BrokerConfig;
231#[cfg(feature = "cli")]
232pub use crawler::crawl_collector;
233#[cfg(feature = "backend")]
234pub use db::{LocalBrokerDb, UpdatesMeta, DEFAULT_PAGE_SIZE};
235pub use error::BrokerError;
236pub use item::BrokerItem;
237pub use peer::BrokerPeer;
238pub use query::{QueryParams, SortOrder};
239pub use shortcuts::SnapshotFiles;
240use std::collections::{HashMap, HashSet};
241use std::fmt::Display;
242use std::net::IpAddr;
243
244/// BgpkitBroker struct maintains the broker's URL and handles making API queries.
245///
246/// See [module doc][crate#examples] for usage examples.
247#[derive(Clone)]
248pub struct BgpkitBroker {
249    pub broker_url: String,
250    pub query_params: QueryParams,
251    client: reqwest::blocking::Client,
252    collector_project_map: HashMap<String, String>,
253}
254
255impl Default for BgpkitBroker {
256    fn default() -> Self {
257        dotenvy::dotenv().ok();
258        let url = match std::env::var("BGPKIT_BROKER_URL") {
259            Ok(url) => url.trim_end_matches('/').to_string(),
260            Err(_) => "https://api.bgpkit.com/v3/broker".to_string(),
261        };
262
263        let collector_project_map = DEFAULT_COLLECTORS_CONFIG.clone().to_project_map();
264
265        let accept_invalid_certs = match std::env::var("ONEIO_ACCEPT_INVALID_CERTS") {
266            Ok(t) => {
267                let l = t.to_lowercase();
268                l.starts_with("true") || l.starts_with("y")
269            }
270            Err(_) => false,
271        };
272
273        let client = match reqwest::blocking::ClientBuilder::new()
274            .danger_accept_invalid_certs(accept_invalid_certs)
275            .user_agent(concat!("bgpkit-broker/", env!("CARGO_PKG_VERSION")))
276            .build()
277        {
278            Ok(c) => c,
279            Err(e) => {
280                panic!("Failed to build HTTP client for broker requests: {}", e);
281            }
282        };
283
284        Self {
285            broker_url: url,
286            query_params: Default::default(),
287            client,
288            collector_project_map,
289        }
290    }
291}
292
293impl BgpkitBroker {
294    /// Construct a new BgpkitBroker object.
295    ///
296    /// The URL and query parameters can be adjusted with other functions.
297    ///
298    /// Users can opt in to accept invalid SSL certificates by setting the environment variable
299    /// `ONEIO_ACCEPT_INVALID_CERTS` to `true`.
300    ///
301    /// # Examples
302    /// ```
303    /// use bgpkit_broker::BgpkitBroker;
304    /// let broker = BgpkitBroker::new();
305    /// ```
306    pub fn new() -> Self {
307        Self::default()
308    }
309
310    /// Configure broker URL.
311    ///
312    /// You can change the default broker URL to point to your own broker instance.
313    /// You can also change the URL by setting the environment variable `BGPKIT_BROKER_URL`.
314    ///
315    /// # Examples
316    /// ```
317    /// let broker = bgpkit_broker::BgpkitBroker::new()
318    ///     .broker_url("api.broker.example.com/v3");
319    /// ```
320    pub fn broker_url<S: Display>(self, url: S) -> Self {
321        let broker_url = url.to_string().trim_end_matches('/').to_string();
322        Self {
323            broker_url,
324            query_params: self.query_params,
325            client: self.client,
326            collector_project_map: self.collector_project_map,
327        }
328    }
329
330    /// DANGER: Accept invalid SSL certificates.
331    pub fn accept_invalid_certs(self) -> Self {
332        #[allow(clippy::unwrap_used)]
333        Self {
334            broker_url: self.broker_url,
335            query_params: self.query_params,
336            client: reqwest::blocking::ClientBuilder::new()
337                .danger_accept_invalid_certs(true)
338                .build()
339                .unwrap(),
340            collector_project_map: self.collector_project_map,
341        }
342    }
343
344    /// Disable SSL certificate check.
345    #[deprecated(since = "0.7.1", note = "Please use `accept_invalid_certs` instead.")]
346    pub fn disable_ssl_check(self) -> Self {
347        Self::accept_invalid_certs(self)
348    }
349
350    /// Parse and validate timestamp string with support for multiple formats.
351    ///
352    /// Supported formats:
353    /// - Unix timestamp: "1640995200"
354    /// - RFC3339/ISO8601: "2022-01-01T00:00:00Z", "2022-01-01T12:30:45Z"
355    /// - RFC3339 without Z: "2022-01-01T00:00:00", "2022-01-01T12:30:45"
356    /// - Date with time: "2022-01-01 00:00:00", "2022-01-01 12:30:45"
357    /// - Pure date (start of day): "2022-01-01", "2022/01/01"
358    /// - Pure date with dots: "2022.01.01"
359    /// - Compact date: "20220101"
360    ///
361    /// For pure date formats, the time component defaults to 00:00:00 (start of day).
362    /// Returns a `DateTime<Utc>` for consistent handling and formatting.
363    fn parse_timestamp(timestamp: &str) -> Result<DateTime<Utc>, BrokerError> {
364        let ts_str = timestamp.trim();
365
366        // Try parsing as RFC3339 with timezone (including +00:00, -05:00, Z, etc.)
367        if let Ok(dt_with_tz) = DateTime::parse_from_rfc3339(ts_str) {
368            return Ok(dt_with_tz.with_timezone(&Utc));
369        }
370
371        // Try parsing as RFC3339/ISO8601 with Z
372        if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H:%M:%SZ") {
373            return Ok(Utc.from_utc_datetime(&naive_dt));
374        }
375
376        // Try parsing as RFC3339 without Z (assume UTC)
377        if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H:%M:%S") {
378            return Ok(Utc.from_utc_datetime(&naive_dt));
379        }
380
381        // Try parsing as "YYYY-MM-DD HH:MM:SS" (assume UTC)
382        if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%d %H:%M:%S") {
383            return Ok(Utc.from_utc_datetime(&naive_dt));
384        }
385
386        // Try parsing pure date formats and convert to start of day
387        let date_formats = [
388            "%Y-%m-%d", // 2022-01-01
389            "%Y/%m/%d", // 2022/01/01
390            "%Y.%m.%d", // 2022.01.01
391            "%Y%m%d",   // 20220101 - must be exactly 8 digits
392        ];
393
394        for format in &date_formats {
395            if let Ok(date) = NaiveDate::parse_from_str(ts_str, format) {
396                // Additional validation for compact format to ensure it's actually a date
397                if format == &"%Y%m%d" && ts_str.len() != 8 {
398                    continue;
399                }
400                // Convert to start of day in UTC
401                if let Some(naive_datetime) = date.and_hms_opt(0, 0, 0) {
402                    return Ok(Utc.from_utc_datetime(&naive_datetime));
403                }
404            }
405        }
406
407        // Finally, try parsing as Unix timestamp (only if it's reasonable length and all digits)
408        if ts_str.len() >= 9 && ts_str.len() <= 13 && ts_str.chars().all(|c| c.is_ascii_digit()) {
409            if let Ok(timestamp) = ts_str.parse::<i64>() {
410                if let Some(dt) = Utc.timestamp_opt(timestamp, 0).single() {
411                    return Ok(dt);
412                }
413            }
414        }
415
416        Err(BrokerError::ConfigurationError(format!(
417            "Invalid timestamp format '{ts_str}'. Supported formats:\n\
418                - Unix timestamp: '1640995200'\n\
419                - RFC3339 with timezone: '2022-01-01T00:00:00+00:00', '2022-01-01T00:00:00Z', '2022-01-01T05:00:00-05:00'\n\
420                - RFC3339 without timezone: '2022-01-01T00:00:00' (assumes UTC)\n\
421                - Date with time: '2022-01-01 00:00:00'\n\
422                - Pure date: '2022-01-01', '2022/01/01', '2022.01.01', '20220101'"
423        )))
424    }
425
426    /// Validate all configuration parameters before making API calls.
427    ///
428    /// This performs the same validation that was previously done at configuration time,
429    /// but now happens just before queries are executed. Returns normalized query parameters.
430    fn validate_configuration(&self) -> Result<QueryParams, BrokerError> {
431        // Validate timestamps and normalize them
432        let mut normalized_params = self.query_params.clone();
433
434        if let Some(ts) = &self.query_params.ts_start {
435            let parsed_datetime = Self::parse_timestamp(ts)?;
436            normalized_params.ts_start =
437                Some(parsed_datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string());
438        }
439
440        if let Some(ts) = &self.query_params.ts_end {
441            let parsed_datetime = Self::parse_timestamp(ts)?;
442            normalized_params.ts_end =
443                Some(parsed_datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string());
444        }
445
446        // Permissive collector validation: normalize only, no network I/O
447        if let Some(collector_str) = &self.query_params.collector_id {
448            let collectors: Vec<String> = collector_str
449                .split(',')
450                .map(|s| s.trim())
451                .filter(|s| !s.is_empty())
452                .map(|s| s.to_string())
453                .collect();
454
455            if collectors.is_empty() {
456                return Err(BrokerError::ConfigurationError(
457                    "Collector ID cannot be empty".to_string(),
458                ));
459            }
460
461            // Deduplicate while preserving order
462            let mut seen = HashSet::new();
463            let mut deduped = Vec::with_capacity(collectors.len());
464            for c in collectors {
465                if seen.insert(c.clone()) {
466                    deduped.push(c);
467                }
468            }
469
470            normalized_params.collector_id = Some(deduped.join(","));
471        }
472
473        // Validate project
474        if let Some(project_str) = &self.query_params.project {
475            let project_lower = project_str.to_lowercase();
476            match project_lower.as_str() {
477                "rrc" | "riperis" | "ripe_ris" | "routeviews" | "route_views" | "rv" => {
478                    // Valid project
479                }
480                _ => {
481                    return Err(BrokerError::ConfigurationError(format!(
482                        "Invalid project '{project_str}'. Valid projects are: 'riperis' (aliases: 'rrc', 'ripe_ris') or 'routeviews' (aliases: 'route_views', 'rv')"
483                    )));
484                }
485            }
486        }
487
488        // Validate data type
489        if let Some(data_type_str) = &self.query_params.data_type {
490            let data_type_lower = data_type_str.to_lowercase();
491            match data_type_lower.as_str() {
492                "rib" | "ribs" | "r" | "update" | "updates" => {
493                    // Valid data type
494                }
495                _ => {
496                    return Err(BrokerError::ConfigurationError(format!(
497                        "Invalid data type '{data_type_str}'. Valid data types are: 'rib' (aliases: 'ribs', 'r') or 'updates' (alias: 'update')"
498                    )));
499                }
500            }
501        }
502
503        // Validate page number
504        if self.query_params.page < 1 {
505            return Err(BrokerError::ConfigurationError(format!(
506                "Invalid page number {}. Page number must be >= 1",
507                self.query_params.page
508            )));
509        }
510
511        // Validate page size
512        if !(1..=100000).contains(&self.query_params.page_size) {
513            return Err(BrokerError::ConfigurationError(format!(
514                "Invalid page size {}. Page size must be between 1 and 100000",
515                self.query_params.page_size
516            )));
517        }
518
519        Ok(normalized_params)
520    }
521
522    /// Add a filter of starting timestamp.
523    ///
524    /// Supports multiple timestamp formats including Unix timestamps, RFC3339 dates, and pure dates.
525    /// Validation occurs at query time.
526    ///
527    /// # Examples
528    ///
529    /// Specify a Unix timestamp:
530    /// ```
531    /// let broker = bgpkit_broker::BgpkitBroker::new()
532    ///     .ts_start("1640995200");
533    /// ```
534    ///
535    /// Specify a RFC3339-formatted time string:
536    /// ```
537    /// let broker = bgpkit_broker::BgpkitBroker::new()
538    ///     .ts_start("2022-01-01T00:00:00Z");
539    /// ```
540    ///
541    /// Specify a pure date (defaults to start of day):
542    /// ```
543    /// let broker = bgpkit_broker::BgpkitBroker::new()
544    ///     .ts_start("2022-01-01");
545    /// ```
546    ///
547    /// Other supported formats:
548    /// ```
549    /// let broker = bgpkit_broker::BgpkitBroker::new()
550    ///     .ts_start("2022/01/01")  // slash format
551    ///     .ts_start("2022.01.01")  // dot format
552    ///     .ts_start("20220101");   // compact format
553    /// ```
554    pub fn ts_start<S: Display>(self, ts_start: S) -> Self {
555        let mut query_params = self.query_params;
556        query_params.ts_start = Some(ts_start.to_string());
557        Self {
558            broker_url: self.broker_url,
559            query_params,
560            client: self.client,
561            collector_project_map: self.collector_project_map,
562        }
563    }
564
565    /// Add a filter of ending timestamp.
566    ///
567    /// Supports the same multiple timestamp formats as `ts_start`.
568    /// Validation occurs at query time.
569    ///
570    /// # Examples
571    ///
572    /// Specify a Unix timestamp:
573    /// ```
574    /// let broker = bgpkit_broker::BgpkitBroker::new()
575    ///     .ts_end("1640995200");
576    /// ```
577    ///
578    /// Specify a RFC3339-formatted time string:
579    /// ```
580    /// let broker = bgpkit_broker::BgpkitBroker::new()
581    ///     .ts_end("2022-01-01T00:00:00Z");
582    /// ```
583    ///
584    /// Specify a pure date (defaults to start of day):
585    /// ```
586    /// let broker = bgpkit_broker::BgpkitBroker::new()
587    ///     .ts_end("2022-01-01");
588    /// ```
589    pub fn ts_end<S: Display>(self, ts_end: S) -> Self {
590        let mut query_params = self.query_params;
591        query_params.ts_end = Some(ts_end.to_string());
592        Self {
593            broker_url: self.broker_url,
594            client: self.client,
595            query_params,
596            collector_project_map: self.collector_project_map,
597        }
598    }
599
600    /// Add a filter of collector ID (e.g. `rrc00` or `route-views2`).
601    ///
602    /// See the full list of collectors [here](https://github.com/bgpkit/bgpkit-broker-backend/blob/main/deployment/full-config.json).
603    /// Validation occurs at query time.
604    ///
605    /// # Examples
606    ///
607    /// filter by single collector
608    /// ```
609    /// let broker = bgpkit_broker::BgpkitBroker::new()
610    ///     .collector_id("rrc00");
611    /// ```
612    ///
613    /// filter by multiple collector
614    /// ```
615    /// let broker = bgpkit_broker::BgpkitBroker::new()
616    ///     .collector_id("route-views2,route-views6");
617    /// ```
618    pub fn collector_id<S: Display>(self, collector_id: S) -> Self {
619        let mut query_params = self.query_params;
620        query_params.collector_id = Some(collector_id.to_string());
621        Self {
622            client: self.client,
623            broker_url: self.broker_url,
624            query_params,
625            collector_project_map: self.collector_project_map,
626        }
627    }
628
629    /// Add a filter of project name with validation, i.e. `riperis` or `routeviews`.
630    ///
631    /// # Examples
632    ///
633    /// ```
634    /// let broker = bgpkit_broker::BgpkitBroker::new()
635    ///     .project("riperis");
636    /// ```
637    ///
638    /// ```
639    /// let broker = bgpkit_broker::BgpkitBroker::new()
640    ///     .project("routeviews");
641    /// ```
642    pub fn project<S: Display>(self, project: S) -> Self {
643        let mut query_params = self.query_params;
644        query_params.project = Some(project.to_string());
645        Self {
646            client: self.client,
647            broker_url: self.broker_url,
648            query_params,
649            collector_project_map: self.collector_project_map,
650        }
651    }
652
653    /// Add filter of data type, i.e. `rib` or `updates`.
654    ///
655    /// Validation occurs at query time.
656    ///
657    /// # Examples
658    ///
659    /// ```
660    /// let broker = bgpkit_broker::BgpkitBroker::new()
661    ///     .data_type("rib");
662    /// ```
663    ///
664    /// ```
665    /// let broker = bgpkit_broker::BgpkitBroker::new()
666    ///     .data_type("updates");
667    /// ```
668    pub fn data_type<S: Display>(self, data_type: S) -> Self {
669        let mut query_params = self.query_params;
670        query_params.data_type = Some(data_type.to_string());
671        Self {
672            broker_url: self.broker_url,
673            client: self.client,
674            query_params,
675            collector_project_map: self.collector_project_map,
676        }
677    }
678
679    /// Change the current page number, starting from 1.
680    ///
681    /// Validation occurs at query time.
682    ///
683    /// # Examples
684    ///
685    /// Start iterating with page 2.
686    /// ```
687    /// let broker = bgpkit_broker::BgpkitBroker::new()
688    ///     .page(2);
689    /// ```
690    pub fn page(self, page: i64) -> Self {
691        let mut query_params = self.query_params;
692        query_params.page = page;
693        Self {
694            broker_url: self.broker_url,
695            client: self.client,
696            query_params,
697            collector_project_map: self.collector_project_map,
698        }
699    }
700
701    /// Change current page size, default 100.
702    ///
703    /// Validation occurs at query time.
704    ///
705    /// # Examples
706    ///
707    /// Set page size to 20.
708    /// ```
709    /// let broker = bgpkit_broker::BgpkitBroker::new()
710    ///     .page_size(10);
711    /// ```
712    pub fn page_size(self, page_size: i64) -> Self {
713        let mut query_params = self.query_params;
714        query_params.page_size = page_size;
715        Self {
716            broker_url: self.broker_url,
717            client: self.client,
718            query_params,
719            collector_project_map: self.collector_project_map,
720        }
721    }
722
723    /// Add a filter of peer IP address when listing peers.
724    ///
725    /// # Examples
726    ///
727    /// ```
728    /// let broker = bgpkit_broker::BgpkitBroker::new()
729    ///    .peers_ip("192.168.1.1".parse().unwrap());
730    /// ```
731    pub fn peers_ip(self, peer_ip: IpAddr) -> Self {
732        let mut query_params = self.query_params;
733        query_params.peers_ip = Some(peer_ip);
734        Self {
735            broker_url: self.broker_url,
736            client: self.client,
737            query_params,
738            collector_project_map: self.collector_project_map,
739        }
740    }
741
742    /// Add a filter of peer ASN when listing peers.
743    ///
744    /// # Examples
745    ///
746    /// ```
747    /// let broker = bgpkit_broker::BgpkitBroker::new()
748    ///    .peers_asn(64496);
749    /// ```
750    pub fn peers_asn(self, peer_asn: u32) -> Self {
751        let mut query_params = self.query_params;
752        query_params.peers_asn = Some(peer_asn);
753        Self {
754            broker_url: self.broker_url,
755            client: self.client,
756            query_params,
757            collector_project_map: self.collector_project_map,
758        }
759    }
760
761    /// Add a filter of peer full feed status when listing peers.
762    ///
763    /// # Examples
764    ///
765    /// ```
766    /// let broker = bgpkit_broker::BgpkitBroker::new()
767    ///   .peers_only_full_feed(true);
768    /// ```
769    pub fn peers_only_full_feed(self, peer_full_feed: bool) -> Self {
770        let mut query_params = self.query_params;
771        query_params.peers_only_full_feed = peer_full_feed;
772        Self {
773            broker_url: self.broker_url,
774            client: self.client,
775            query_params,
776            collector_project_map: self.collector_project_map,
777        }
778    }
779
780    /// Turn to specified page, page starting from 1.
781    ///
782    /// This works with [Self::query_single_page] function to manually paginate.
783    ///
784    /// # Examples
785    ///
786    /// Manually get the first two pages of items.
787    /// ```no_run
788    /// let mut broker = bgpkit_broker::BgpkitBroker::new();
789    /// let mut items = vec![];
790    /// items.extend(broker.query_single_page().unwrap());
791    /// broker.turn_page(2);
792    /// items.extend(broker.query_single_page().unwrap());
793    /// ```
794    pub fn turn_page(&mut self, page: i64) {
795        self.query_params.page = page;
796    }
797
798    /// Send API for a single page of items.
799    ///
800    /// # Examples
801    ///
802    /// Manually get the first page of items.
803    /// ```no_run
804    /// let broker = bgpkit_broker::BgpkitBroker::new();
805    /// let items = broker.query_single_page().unwrap();
806    /// ```
807    pub fn query_single_page(&self) -> Result<Vec<BrokerItem>, BrokerError> {
808        let validated_params = self.validate_configuration()?;
809        let url = format!("{}/search{}", &self.broker_url, &validated_params);
810        log::info!("sending broker query to {}", &url);
811        match self.run_files_query(url.as_str()) {
812            Ok(res) => Ok(res.data),
813            Err(e) => Err(e),
814        }
815    }
816
817    /// Query the total count of items matching the current search criteria without fetching the items.
818    ///
819    /// This method is useful when you need to know how many items match your search criteria
820    /// without downloading all the items. It performs the same validation as a regular query
821    /// but only returns the count.
822    ///
823    /// # Returns
824    /// - `Ok(i64)`: The total number of matching items
825    /// - `Err(BrokerError)`: If the query fails or the count is missing from the response
826    ///
827    /// # Examples
828    ///
829    /// ```no_run
830    /// use bgpkit_broker::BgpkitBroker;
831    ///
832    /// let broker = BgpkitBroker::new()
833    ///     .ts_start("2024-01-01")
834    ///     .ts_end("2024-01-02")
835    ///     .collector_id("route-views2");
836    ///
837    /// let count = broker.query_total_count().unwrap();
838    /// println!("Found {} matching items", count);
839    /// ```
840    pub fn query_total_count(&self) -> Result<i64, BrokerError> {
841        let validated_params = self.validate_configuration()?;
842        let url = format!("{}/search{}", &self.broker_url, &validated_params);
843        match self.run_files_query(url.as_str()) {
844            Ok(res) => res.total.ok_or(BrokerError::BrokerError(
845                "count not found in response".to_string(),
846            )),
847            Err(e) => Err(e),
848        }
849    }
850
851    /// Check if the broker instance is healthy.
852    ///
853    /// # Examples
854    ///
855    /// ```no_run
856    /// let broker = bgpkit_broker::BgpkitBroker::new();
857    /// assert!(broker.health_check().is_ok())
858    /// ```
859    pub fn health_check(&self) -> Result<(), BrokerError> {
860        let url = format!("{}/health", &self.broker_url.trim_end_matches('/'));
861        match self.client.get(url.as_str()).send() {
862            Ok(response) => {
863                if response.status() == reqwest::StatusCode::OK {
864                    Ok(())
865                } else {
866                    Err(BrokerError::BrokerError(format!(
867                        "endpoint unhealthy {}",
868                        self.broker_url
869                    )))
870                }
871            }
872            Err(_e) => Err(BrokerError::BrokerError(format!(
873                "endpoint unhealthy {}",
874                self.broker_url
875            ))),
876        }
877    }
878
879    /// Send a query to get **all** data times returned.
880    ///
881    /// This usually is what one needs.
882    ///
883    /// # Examples
884    ///
885    /// Get all RIB files on 2022-01-01 from route-views2.
886    /// ```no_run
887    /// let broker = bgpkit_broker::BgpkitBroker::new()
888    ///     .ts_start("2022-01-01T00:00:00Z")
889    ///     .ts_end("2022-01-01T23:59:00Z")
890    ///     .data_type("rib")
891    ///     .collector_id("route-views2");
892    /// let items = broker.query().unwrap();
893    ///
894    /// // 1 RIB dump very 2 hours, total of 12 files for 1 day
895    /// assert_eq!(items.len(), 12);
896    /// ```
897    pub fn query(&self) -> Result<Vec<BrokerItem>, BrokerError> {
898        let mut p = self.validate_configuration()?;
899
900        let mut items = vec![];
901        loop {
902            let url = format!("{}/search{}", &self.broker_url, &p);
903
904            let res_items = self.run_files_query(url.as_str())?.data;
905
906            let items_count = res_items.len() as i64;
907
908            if items_count == 0 {
909                // reaches the end
910                break;
911            }
912
913            items.extend(res_items);
914            let cur_page = p.page;
915            p = p.page(cur_page + 1);
916
917            if items_count < p.page_size {
918                // reaches the end
919                break;
920            }
921        }
922        Ok(items)
923    }
924
925    /// Send a query to get the **latest** data for each collector.
926    ///
927    /// The returning result is structured as a vector of [CollectorLatestItem] objects.
928    ///
929    /// # Examples
930    ///
931    /// ```no_run
932    /// let broker = bgpkit_broker::BgpkitBroker::new();
933    /// let latest_items = broker.latest().unwrap();
934    /// for item in &latest_items {
935    ///     println!("{}", item);
936    /// }
937    /// ```
938    pub fn latest(&self) -> Result<Vec<BrokerItem>, BrokerError> {
939        let latest_query_url = format!("{}/latest", self.broker_url);
940        let mut items = match self.client.get(latest_query_url.as_str()).send() {
941            Ok(response) => match response.json::<CollectorLatestResult>() {
942                Ok(result) => result.data,
943                Err(_) => {
944                    return Err(BrokerError::BrokerError(
945                        "Error parsing response".to_string(),
946                    ));
947                }
948            },
949            Err(e) => {
950                return Err(BrokerError::BrokerError(format!(
951                    "Unable to connect to the URL ({latest_query_url}): {e}"
952                )));
953            }
954        };
955
956        items.retain(|item| {
957            let mut matches = true;
958            if let Some(project) = &self.query_params.project {
959                match project.to_lowercase().as_str() {
960                    "rrc" | "riperis" | "ripe_ris" => {
961                        matches = self
962                            .collector_project_map
963                            .get(&item.collector_id)
964                            .cloned()
965                            .unwrap_or_default()
966                            .as_str()
967                            == "riperis";
968                    }
969                    "routeviews" | "route_views" | "rv" => {
970                        matches = self
971                            .collector_project_map
972                            .get(&item.collector_id)
973                            .cloned()
974                            .unwrap_or_default()
975                            .as_str()
976                            == "routeviews";
977                    }
978                    _ => {}
979                }
980            }
981
982            if let Some(data_type) = &self.query_params.data_type {
983                match data_type.to_lowercase().as_str() {
984                    "rib" | "ribs" | "r" => {
985                        if !item.is_rib() {
986                            // if not RIB file, not match
987                            matches = false
988                        }
989                    }
990                    "update" | "updates" => {
991                        if item.is_rib() {
992                            // if is RIB file, not match
993                            matches = false
994                        }
995                    }
996                    _ => {}
997                }
998            }
999
1000            if let Some(collector_id) = &self.query_params.collector_id {
1001                let wanted: HashSet<&str> = collector_id
1002                    .split(',')
1003                    .map(|s| s.trim())
1004                    .filter(|s| !s.is_empty())
1005                    .collect();
1006
1007                if !wanted.contains(item.collector_id.as_str()) {
1008                    return false;
1009                }
1010            }
1011
1012            matches
1013        });
1014
1015        Ok(items)
1016    }
1017
1018    /// Get the most recent information for collector peers.
1019    ///
1020    /// The returning result is structured as a vector of [BrokerPeer] objects.
1021    ///
1022    /// # Examples
1023    ///
1024    /// ## Get all peers
1025    ///
1026    /// ```no_run
1027    /// let broker = bgpkit_broker::BgpkitBroker::new();
1028    /// let peers = broker.get_peers().unwrap();
1029    /// for peer in &peers {
1030    ///     println!("{:?}", peer);
1031    /// }
1032    /// ```
1033    ///
1034    /// ## Get peers from a specific collector
1035    ///
1036    /// ```no_run
1037    /// let broker = bgpkit_broker::BgpkitBroker::new()
1038    ///    .collector_id("route-views2");
1039    /// let peers = broker.get_peers().unwrap();
1040    /// for peer in &peers {
1041    ///    println!("{:?}", peer);
1042    /// }
1043    /// ```
1044    ///
1045    /// ## Get peers from a specific ASN
1046    ///
1047    /// ```no_run
1048    /// let broker = bgpkit_broker::BgpkitBroker::new()
1049    ///   .peers_asn(64496);
1050    /// let peers = broker.get_peers().unwrap();
1051    /// for peer in &peers {
1052    ///    println!("{:?}", peer);
1053    /// }
1054    /// ```
1055    ///
1056    /// ## Get peers from a specific IP address
1057    ///
1058    /// ```no_run
1059    /// let broker = bgpkit_broker::BgpkitBroker::new()
1060    ///   .peers_ip("192.168.1.1".parse().unwrap());
1061    /// let peers = broker.get_peers().unwrap();
1062    /// for peer in &peers {
1063    ///   println!("{:?}", peer);
1064    /// }
1065    /// ```
1066    ///
1067    /// ## Get peers with full feed
1068    ///
1069    /// ```no_run
1070    /// let broker = bgpkit_broker::BgpkitBroker::new()
1071    ///  .peers_only_full_feed(true);
1072    /// let peers = broker.get_peers().unwrap();
1073    /// for peer in &peers {
1074    ///     println!("{:?}", peer);
1075    /// }
1076    /// ```
1077    ///
1078    /// ## Get peers from a specific collector with full feed
1079    ///
1080    /// ```no_run
1081    /// let broker = bgpkit_broker::BgpkitBroker::new()
1082    ///  .collector_id("route-views2")
1083    /// .peers_only_full_feed(true);
1084    /// let peers = broker.get_peers().unwrap();
1085    /// for peer in &peers {
1086    ///    println!("{:?}", peer);
1087    /// }
1088    /// ```
1089    pub fn get_peers(&self) -> Result<Vec<BrokerPeer>, BrokerError> {
1090        let mut url = format!("{}/peers", self.broker_url);
1091        let mut param_strings = vec![];
1092        if let Some(ip) = &self.query_params.peers_ip {
1093            param_strings.push(format!("ip={ip}"));
1094        }
1095        if let Some(asn) = &self.query_params.peers_asn {
1096            param_strings.push(format!("asn={asn}"));
1097        }
1098        if self.query_params.peers_only_full_feed {
1099            param_strings.push("full_feed=true".to_string());
1100        }
1101        if let Some(collector_id) = &self.query_params.collector_id {
1102            param_strings.push(format!("collector={collector_id}"));
1103        }
1104        if !param_strings.is_empty() {
1105            let param_string = param_strings.join("&");
1106            url = format!("{url}?{param_string}");
1107        }
1108
1109        let peers = match self.client.get(url.as_str()).send() {
1110            Ok(response) => match response.json::<BrokerPeersResult>() {
1111                Ok(result) => result.data,
1112                Err(_) => {
1113                    return Err(BrokerError::BrokerError(
1114                        "Error parsing response".to_string(),
1115                    ));
1116                }
1117            },
1118            Err(e) => {
1119                return Err(BrokerError::BrokerError(format!(
1120                    "Unable to connect to the URL ({url}): {e}"
1121                )));
1122            }
1123        };
1124        Ok(peers)
1125    }
1126
1127    fn run_files_query(&self, url: &str) -> Result<BrokerQueryResult, BrokerError> {
1128        log::info!("sending broker query to {}", &url);
1129        match self.client.get(url).send() {
1130            Ok(res) => match res.json::<BrokerQueryResult>() {
1131                Ok(res) => {
1132                    if let Some(e) = res.error {
1133                        Err(BrokerError::BrokerError(e))
1134                    } else {
1135                        Ok(res)
1136                    }
1137                }
1138                Err(e) => {
1139                    // json decoding error. most likely the service returns an error message without
1140                    // `data` field.
1141                    Err(BrokerError::BrokerError(e.to_string()))
1142                }
1143            },
1144            Err(e) => Err(BrokerError::from(e)),
1145        }
1146    }
1147}
1148
1149/// Iterator for BGPKIT Broker that iterates through one [BrokerItem] at a time.
1150///
1151/// The [IntoIterator] trait is implemented for both the struct and the reference, so that you can
1152/// either iterate through items by taking the ownership of the broker, or use the reference to broker
1153/// to iterate.
1154///
1155/// ```no_run
1156/// use bgpkit_broker::{BgpkitBroker, BrokerItem};
1157///
1158/// let mut broker = BgpkitBroker::new()
1159///     .ts_start("1634693400")
1160///     .ts_end("1634693400")
1161///     .page_size(10)
1162///     .page(2);
1163///
1164/// // create iterator from reference (so that you can reuse the broker object)
1165/// // same as `&broker.into_intr()`
1166/// for item in &broker {
1167///     println!("{}", item);
1168/// }
1169///
1170/// // create iterator from the broker object (taking ownership)
1171/// let items = broker.into_iter().collect::<Vec<BrokerItem>>();
1172///
1173/// assert_eq!(items.len(), 43);
1174/// ```
1175pub struct BrokerItemIterator {
1176    broker: BgpkitBroker,
1177    cached_items: Vec<BrokerItem>,
1178    first_run: bool,
1179}
1180
1181impl BrokerItemIterator {
1182    pub fn new(broker: BgpkitBroker) -> BrokerItemIterator {
1183        BrokerItemIterator {
1184            broker,
1185            cached_items: vec![],
1186            first_run: true,
1187        }
1188    }
1189}
1190
1191impl Iterator for BrokerItemIterator {
1192    type Item = BrokerItem;
1193
1194    fn next(&mut self) -> Option<Self::Item> {
1195        // if we have cached items, simply pop and return
1196        if let Some(item) = self.cached_items.pop() {
1197            return Some(item);
1198        }
1199
1200        // no more cached items, refill cache by one more broker query
1201        if self.first_run {
1202            // if it's the first time running, do not change page, and switch the flag.
1203            self.first_run = false;
1204        } else {
1205            // if it's not the first time running, add page number by one.
1206            self.broker.query_params.page += 1;
1207        }
1208
1209        // query the current page
1210        let items = match self.broker.query_single_page() {
1211            Ok(i) => i,
1212            Err(_) => return None,
1213        };
1214
1215        if items.is_empty() {
1216            // break out the iteration
1217            return None;
1218        } else {
1219            // fill the cache
1220            self.cached_items = items;
1221            self.cached_items.reverse();
1222        }
1223
1224        #[allow(clippy::unwrap_used)]
1225        Some(self.cached_items.pop().unwrap())
1226    }
1227}
1228
1229impl IntoIterator for BgpkitBroker {
1230    type Item = BrokerItem;
1231    type IntoIter = BrokerItemIterator;
1232
1233    fn into_iter(self) -> Self::IntoIter {
1234        BrokerItemIterator::new(self)
1235    }
1236}
1237
1238impl IntoIterator for &BgpkitBroker {
1239    type Item = BrokerItem;
1240    type IntoIter = BrokerItemIterator;
1241
1242    fn into_iter(self) -> Self::IntoIter {
1243        BrokerItemIterator::new(self.clone())
1244    }
1245}
1246
/// Unit tests for [BgpkitBroker].
///
/// Many of these tests send real requests through `BgpkitBroker::new()` (the default broker
/// endpoint). Their exact counts depend on live broker data, so they need network access.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_query() {
        let broker = BgpkitBroker::new()
            .ts_start("1634693400")
            .ts_end("1634693400");
        let res = broker.query();
        assert!(res.is_ok());
        let data = res.unwrap();
        assert!(!data.is_empty());
    }

    #[test]
    fn test_network_error() {
        let broker = BgpkitBroker::new().broker_url("https://api.broker.example.com/v2");
        let res = broker.query();
        // when testing a must-fail query, you could use `matches!` macro to do so
        assert!(res.is_err());
        assert!(matches!(res.err(), Some(BrokerError::NetworkError(_))));
    }

    #[test]
    fn test_broker_error() {
        let broker = BgpkitBroker::new().page(-1);
        let result = broker.query();
        assert!(result.is_err());
        assert!(matches!(
            result.err(),
            Some(BrokerError::ConfigurationError(_))
        ));
    }

    #[test]
    fn test_query_all() {
        let broker = BgpkitBroker::new()
            .ts_start("1634693400")
            .ts_end("1634693400")
            .page_size(100);
        let res = broker.query();
        assert!(res.is_ok());
        assert!(res.unwrap().len() >= 54);
    }

    #[test]
    fn test_iterator() {
        let broker = BgpkitBroker::new()
            .ts_start("1634693400")
            .ts_end("1634693400");
        assert!(broker.into_iter().count() >= 54);
    }

    #[test]
    fn test_filters() {
        let broker = BgpkitBroker::new()
            .ts_start("1634693400")
            .ts_end("1634693400");
        let items = broker.query().unwrap();
        assert!(items.len() >= 54);

        let broker = BgpkitBroker::new()
            .ts_start("1634693400")
            .ts_end("1634693400")
            .collector_id("rrc00");
        let items = broker.query().unwrap();
        assert_eq!(items.len(), 1);

        let broker = BgpkitBroker::new()
            .ts_start("1634693400")
            .ts_end("1634693400")
            .project("riperis");
        let items = broker.query().unwrap();
        assert_eq!(items.len(), 23);
    }

    #[test]
    fn test_latest() {
        let broker = BgpkitBroker::new();
        let items = broker.latest().unwrap();
        assert!(items.len() >= 125);

        let broker = BgpkitBroker::new().project("routeviews".to_string());
        let items = broker.latest().unwrap();
        assert!(!items.is_empty());
        assert!(items
            .iter()
            .all(|item| !item.collector_id.starts_with("rrc")));

        let broker = BgpkitBroker::new().project("riperis".to_string());
        let items = broker.latest().unwrap();
        assert!(!items.is_empty());
        assert!(items
            .iter()
            .all(|item| item.collector_id.starts_with("rrc")));

        let broker = BgpkitBroker::new().data_type("rib".to_string());
        let items = broker.latest().unwrap();
        assert!(!items.is_empty());
        assert!(items.iter().all(|item| item.is_rib()));

        let broker = BgpkitBroker::new().data_type("update".to_string());
        let items = broker.latest().unwrap();
        assert!(!items.is_empty());
        assert!(items.iter().all(|item| !item.is_rib()));

        let broker = BgpkitBroker::new().collector_id("rrc00".to_string());
        let items = broker.latest().unwrap();
        assert!(!items.is_empty());
        assert!(items
            .iter()
            .all(|item| item.collector_id.as_str() == "rrc00"));
        assert_eq!(items.len(), 2);
    }

    #[test]
    fn test_latest_no_ssl() {
        let broker = BgpkitBroker::new().accept_invalid_certs();
        let items = broker.latest().unwrap();
        assert!(items.len() >= 125);
    }

    #[test]
    fn test_health_check() {
        let broker = BgpkitBroker::new();
        let res = broker.health_check();
        assert!(res.is_ok());
    }

    #[test]
    fn test_peers() {
        let broker = BgpkitBroker::new();
        let all_peers = broker.get_peers().unwrap();
        assert!(!all_peers.is_empty());
        let first_peer = all_peers.first().unwrap();
        let first_ip = first_peer.ip;
        let first_asn = first_peer.asn;

        let broker = BgpkitBroker::new().peers_ip(first_ip);
        let peers = broker.get_peers().unwrap();
        assert!(!peers.is_empty());

        let broker = BgpkitBroker::new().peers_asn(first_asn);
        let peers = broker.get_peers().unwrap();
        assert!(!peers.is_empty());

        let broker = BgpkitBroker::new().peers_only_full_feed(true);
        let full_feed_peers = broker.get_peers().unwrap();
        assert!(!full_feed_peers.is_empty());
        assert!(full_feed_peers.len() < all_peers.len());

        let broker = BgpkitBroker::new().collector_id("rrc00");
        let rrc_peers = broker.get_peers().unwrap();
        assert!(!rrc_peers.is_empty());
        assert!(rrc_peers.iter().all(|peer| peer.collector == "rrc00"));

        let broker = BgpkitBroker::new().collector_id("rrc00,route-views2");
        let rrc_rv_peers = broker.get_peers().unwrap();
        assert!(!rrc_rv_peers.is_empty());
        // Every returned peer must belong to one of the requested collectors.
        assert!(rrc_rv_peers
            .iter()
            .all(|peer| peer.collector == "rrc00" || peer.collector == "route-views2"));

        assert!(rrc_rv_peers.len() > rrc_peers.len());
    }

    #[test]
    fn test_timestamp_parsing_unix() {
        let broker = BgpkitBroker::new();

        // Valid Unix timestamps - configuration succeeds, normalization happens at query time
        let result = broker.clone().ts_start("1640995200");
        // Raw input is stored during configuration
        assert_eq!(result.query_params.ts_start, Some("1640995200".to_string()));

        let result = broker.clone().ts_end("1640995200");
        assert_eq!(result.query_params.ts_end, Some("1640995200".to_string()));
    }

    #[test]
    fn test_timestamp_parsing_rfc3339() {
        let broker = BgpkitBroker::new();

        // RFC3339 with Z - raw input stored during configuration
        let result = broker.clone().ts_start("2022-01-01T00:00:00Z");
        assert_eq!(
            result.query_params.ts_start,
            Some("2022-01-01T00:00:00Z".to_string())
        );

        // RFC3339 without Z - raw input stored during configuration
        let result = broker.clone().ts_start("2022-01-01T12:30:45");
        assert_eq!(
            result.query_params.ts_start,
            Some("2022-01-01T12:30:45".to_string())
        );

        // Date with time format - raw input stored during configuration
        let result = broker.clone().ts_end("2022-01-01 12:30:45");
        assert_eq!(
            result.query_params.ts_end,
            Some("2022-01-01 12:30:45".to_string())
        );
    }

    #[test]
    fn test_timestamp_parsing_pure_dates() {
        let broker = BgpkitBroker::new();

        // Standard date format - raw input stored during configuration
        let result = broker.clone().ts_start("2022-01-01");
        assert_eq!(result.query_params.ts_start, Some("2022-01-01".to_string()));

        // Slash format
        let result = broker.clone().ts_start("2022/01/01");
        assert_eq!(result.query_params.ts_start, Some("2022/01/01".to_string()));

        // Dot format
        let result = broker.clone().ts_end("2022.01.01");
        assert_eq!(result.query_params.ts_end, Some("2022.01.01".to_string()));

        // Compact format
        let result = broker.clone().ts_end("20220101");
        assert_eq!(result.query_params.ts_end, Some("20220101".to_string()));
    }

    #[test]
    fn test_timestamp_parsing_whitespace() {
        let broker = BgpkitBroker::new();

        // Test that raw input with whitespace is stored during configuration
        let result = broker.clone().ts_start("  2022-01-01  ");
        assert_eq!(
            result.query_params.ts_start,
            Some("  2022-01-01  ".to_string())
        );

        let result = broker.clone().ts_end("\t1640995200\n");
        assert_eq!(
            result.query_params.ts_end,
            Some("\t1640995200\n".to_string())
        );
    }

    #[test]
    fn test_timestamp_parsing_errors() {
        let broker = BgpkitBroker::new();

        // Invalid format - error occurs at query time
        let broker_with_invalid = broker.clone().ts_start("invalid-timestamp");
        let result = broker_with_invalid.query();
        assert!(result.is_err());
        assert!(matches!(
            result.err(),
            Some(BrokerError::ConfigurationError(_))
        ));

        // Invalid date - error occurs at query time
        let broker_with_invalid = broker.clone().ts_end("2022-13-01");
        let result = broker_with_invalid.query();
        assert!(result.is_err());
        assert!(matches!(
            result.err(),
            Some(BrokerError::ConfigurationError(_))
        ));

        // Invalid compact date - error occurs at query time
        let broker_with_invalid = broker.clone().ts_start("20221301");
        let result = broker_with_invalid.query();
        assert!(result.is_err());
        assert!(matches!(
            result.err(),
            Some(BrokerError::ConfigurationError(_))
        ));

        // Partially valid format - error occurs at query time
        let broker_with_invalid = broker.clone().ts_start("2022-01");
        let result = broker_with_invalid.query();
        assert!(result.is_err());
        assert!(matches!(
            result.err(),
            Some(BrokerError::ConfigurationError(_))
        ));
    }

    #[test]
    fn test_parse_timestamp_direct() {
        use chrono::{NaiveDate, NaiveDateTime};

        // Test the parse_timestamp function directly - it now returns DateTime<Utc>

        // Unix timestamp
        let expected_unix = Utc.timestamp_opt(1640995200, 0).single().unwrap();
        assert_eq!(
            BgpkitBroker::parse_timestamp("1640995200").unwrap(),
            expected_unix
        );

        // RFC3339 formats
        let expected_rfc3339_z = Utc.from_utc_datetime(
            &NaiveDateTime::parse_from_str("2022-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S").unwrap(),
        );
        assert_eq!(
            BgpkitBroker::parse_timestamp("2022-01-01T00:00:00Z").unwrap(),
            expected_rfc3339_z
        );

        let expected_rfc3339_no_z = Utc.from_utc_datetime(
            &NaiveDateTime::parse_from_str("2022-01-01T12:30:45", "%Y-%m-%dT%H:%M:%S").unwrap(),
        );
        assert_eq!(
            BgpkitBroker::parse_timestamp("2022-01-01T12:30:45").unwrap(),
            expected_rfc3339_no_z
        );

        let expected_space_format = Utc.from_utc_datetime(
            &NaiveDateTime::parse_from_str("2022-01-01 12:30:45", "%Y-%m-%d %H:%M:%S").unwrap(),
        );
        assert_eq!(
            BgpkitBroker::parse_timestamp("2022-01-01 12:30:45").unwrap(),
            expected_space_format
        );

        // Pure date formats (all convert to start of day in UTC)
        let expected_date = Utc.from_utc_datetime(
            &NaiveDate::from_ymd_opt(2022, 1, 1)
                .unwrap()
                .and_hms_opt(0, 0, 0)
                .unwrap(),
        );
        assert_eq!(
            BgpkitBroker::parse_timestamp("2022-01-01").unwrap(),
            expected_date
        );
        assert_eq!(
            BgpkitBroker::parse_timestamp("2022/01/01").unwrap(),
            expected_date
        );
        assert_eq!(
            BgpkitBroker::parse_timestamp("2022.01.01").unwrap(),
            expected_date
        );
        assert_eq!(
            BgpkitBroker::parse_timestamp("20220101").unwrap(),
            expected_date
        );

        // Explicit UTC offset
        let result_plus_tz = BgpkitBroker::parse_timestamp("2022-01-01T00:00:00+00:00").unwrap();
        assert_eq!(result_plus_tz, expected_date);

        // Offsets are converted to UTC: 2022-01-01T05:00:00-05:00 = 2022-01-01T10:00:00Z
        let result_minus_tz = BgpkitBroker::parse_timestamp("2022-01-01T05:00:00-05:00").unwrap();
        let expected_10am = Utc.with_ymd_and_hms(2022, 1, 1, 10, 0, 0).unwrap();
        assert_eq!(result_minus_tz, expected_10am);

        // Error cases
        assert!(BgpkitBroker::parse_timestamp("invalid").is_err());
        assert!(BgpkitBroker::parse_timestamp("2022-13-01").is_err());
        assert!(BgpkitBroker::parse_timestamp("2022-01").is_err());
    }

    #[test]
    fn test_collector_id_validation() {
        let broker = BgpkitBroker::new();

        // Valid single collector - no error at validation time
        let broker_valid = broker.clone().collector_id("rrc00");
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        // Valid multiple collectors - no error at validation time
        let broker_valid = broker.clone().collector_id("rrc00,route-views2");
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        // Unknown collector should be allowed (permissive behavior)
        let broker_unknown = broker.clone().collector_id("brand-new-collector");
        let result = broker_unknown.validate_configuration();
        assert!(result.is_ok());

        // Mixed known and unknown collectors should be allowed
        let broker_mixed = broker.clone().collector_id("rrc00,brand-new-collector");
        let result = broker_mixed.validate_configuration();
        assert!(result.is_ok());

        // Empty/whitespace-only should error
        let broker_empty = broker.clone().collector_id(", ,  ,");
        let result = broker_empty.validate_configuration();
        assert!(result.is_err());
        assert!(matches!(
            result.err(),
            Some(BrokerError::ConfigurationError(_))
        ));
    }

    #[test]
    fn test_project_validation() {
        let broker = BgpkitBroker::new();

        // Valid projects - no error at configuration time
        let broker_valid = broker.clone().project("riperis");
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        let broker_valid = broker.clone().project("routeviews");
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        // Valid aliases - no error at configuration time
        let broker_valid = broker.clone().project("rrc");
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        let broker_valid = broker.clone().project("rv");
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        // Invalid project - error occurs at validation
        let broker_invalid = broker.clone().project("invalid-project");
        let result = broker_invalid.validate_configuration();
        assert!(result.is_err());
        assert!(matches!(
            result.err(),
            Some(BrokerError::ConfigurationError(_))
        ));
    }

    #[test]
    fn test_data_type_validation() {
        let broker = BgpkitBroker::new();

        // Valid data types - no error at configuration time
        let broker_valid = broker.clone().data_type("rib");
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        let broker_valid = broker.clone().data_type("updates");
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        // Valid aliases - no error at configuration time
        let broker_valid = broker.clone().data_type("ribs");
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        let broker_valid = broker.clone().data_type("update");
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        // Invalid data type - error occurs at validation
        let broker_invalid = broker.clone().data_type("invalid-type");
        let result = broker_invalid.validate_configuration();
        assert!(result.is_err());
        assert!(matches!(
            result.err(),
            Some(BrokerError::ConfigurationError(_))
        ));
    }

    #[test]
    fn test_page_validation() {
        let broker = BgpkitBroker::new();

        // Valid page number - no error at configuration time
        let broker_valid = broker.clone().page(1);
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        let broker_valid = broker.clone().page(100);
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        // Invalid page number - error occurs at validation
        let broker_invalid = broker.clone().page(0);
        let result = broker_invalid.validate_configuration();
        assert!(result.is_err());
        assert!(matches!(
            result.err(),
            Some(BrokerError::ConfigurationError(_))
        ));
    }

    #[test]
    fn test_page_size_validation() {
        let broker = BgpkitBroker::new();

        // Valid page sizes - no error at configuration time
        let broker_valid = broker.clone().page_size(1);
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        let broker_valid = broker.clone().page_size(100);
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        let broker_valid = broker.clone().page_size(100000);
        let result = broker_valid.validate_configuration();
        assert!(result.is_ok());

        // Invalid page sizes - error occurs at validation
        let broker_invalid = broker.clone().page_size(0);
        let result = broker_invalid.validate_configuration();
        assert!(result.is_err());
        assert!(matches!(
            result.err(),
            Some(BrokerError::ConfigurationError(_))
        ));

        let broker_invalid = broker.clone().page_size(100001);
        let result = broker_invalid.validate_configuration();
        assert!(result.is_err());
        assert!(matches!(
            result.err(),
            Some(BrokerError::ConfigurationError(_))
        ));
    }

    #[test]
    fn test_method_chaining() {
        let broker = BgpkitBroker::new()
            .ts_start("1634693400")
            .ts_end("1634693400")
            .collector_id("rrc00")
            .project("riperis")
            .data_type("rib")
            .page(1)
            .page_size(10);

        // Raw input is stored during configuration
        assert_eq!(broker.query_params.ts_start, Some("1634693400".to_string()));
        assert_eq!(broker.query_params.ts_end, Some("1634693400".to_string()));
        assert_eq!(broker.query_params.collector_id, Some("rrc00".to_string()));
        assert_eq!(broker.query_params.project, Some("riperis".to_string()));
        assert_eq!(broker.query_params.data_type, Some("rib".to_string()));
        assert_eq!(broker.query_params.page, 1);
        assert_eq!(broker.query_params.page_size, 10);
    }
}