ribeye/
lib.rs

1//! # RibEye
2//!
3//! RibEye is a framework library for processing RIB dumps using BGPKIT Parser.
4//!
5//! The key concept of ribeye is the [MessageProcessor] trait, which defines the
6//! interface for processing RIB data.
7
8#![doc(
9    html_logo_url = "https://raw.githubusercontent.com/bgpkit/assets/main/logos/icon-transparent.png",
10    html_favicon_url = "https://raw.githubusercontent.com/bgpkit/assets/main/logos/favicon.ico"
11)]
12
13pub use crate::processors::{MessageProcessor, RibMeta};
14use anyhow::Result;
15use tracing::info;
16
17#[cfg(feature = "processors")]
18pub mod processors;
19
/// A pipeline of [MessageProcessor]s.
///
/// The derived `Default` yields a pipeline with an empty processor list.
#[derive(Default)]
pub struct RibEye {
    /// Registered processors, boxed as trait objects and stored in the order
    /// they were pushed.
    processors: Vec<Box<dyn MessageProcessor>>,
}
24
25impl RibEye {
26    pub fn new() -> Self {
27        Self::default()
28    }
29
30    /// Add default processors to the pipeline
31    ///
32    /// The default processors are:
33    /// - PeerStatsProcessor
34    /// - Prefix2AsProcessor
35    /// - As2relProcessor
36    pub fn with_default_processors(mut self, output_dir: &str) -> Self {
37        self.add_processor(Box::new(processors::PeerStatsProcessor::new(output_dir)));
38        self.add_processor(Box::new(processors::Prefix2AsProcessor::new(output_dir)));
39        self.add_processor(Box::new(processors::As2relProcessor::new(output_dir)));
40        self
41    }
42
43    pub fn with_rib_meta(mut self, rib_meta: &RibMeta) -> Self {
44        for processor in &mut self.processors {
45            processor.reset_processor(rib_meta);
46        }
47        self
48    }
49
50    /// Add a processor to the pipeline
51    pub fn add_processor(&mut self, processor: Box<dyn MessageProcessor>) {
52        self.processors.push(processor);
53    }
54
55    pub fn initialize_processors(&mut self, rib_meta: &RibMeta) -> Result<()> {
56        for processor in &mut self.processors {
57            processor.reset_processor(rib_meta);
58        }
59        Ok(())
60    }
61
62    /// Process each entry in
63    pub fn process_mrt_file(&mut self, file_path: &str) -> Result<()> {
64        if self.processors.is_empty() {
65            info!("no processors added, skip processing: {}", file_path);
66            return Ok(());
67        }
68
69        info!("processing RIB file: {}", file_path);
70
71        let parser = bgpkit_parser::BgpkitParser::new(file_path)?;
72        for msg in parser {
73            for processor in &mut self.processors {
74                processor.process_entry(&msg)?;
75            }
76        }
77
78        for processor in &mut self.processors {
79            processor.output()?;
80        }
81        Ok(())
82    }
83
84    pub fn summarize_latest_files(&mut self, rib_metas: &[RibMeta]) -> Result<()> {
85        for processor in &mut self.processors {
86            info!(
87                "summarizing latest files for processor: {}",
88                processor.name()
89            );
90            if let Err(e) = processor.summarize_latest(rib_metas, true) {
91                info!("failed to summarize latest files: {}", e);
92            }
93        }
94        Ok(())
95    }
96}