monocle/database/session/
msg_store.rs

1//! Session-based message store for BGP search results
2//!
3//! This module provides storage for BGP messages during search operations.
4//! Unlike the shared database, this is intended for one-time use per search session.
5
6use anyhow::{anyhow, Result};
7use bgpkit_parser::models::ElemType;
8use bgpkit_parser::BgpElem;
9use itertools::Itertools;
10
11use crate::database::core::DatabaseConn;
12
/// Message store for BGP search results
///
/// `MsgStore` provides a session-based SQLite database for storing
/// BGP elements during search operations. Each search session typically
/// creates its own database file.
///
/// Elements are kept in a single `elems` table, one row per inserted BGP element.
pub struct MsgStore {
    /// Connection wrapper; every statement issued by the store goes through `db.conn`.
    db: DatabaseConn,
}
21
22impl MsgStore {
23    /// Create a new message store
24    ///
25    /// # Arguments
26    /// * `db_path` - Optional path to the database file. If `None`, uses in-memory storage.
27    /// * `reset` - If `true`, drops existing data before initializing.
28    pub fn new(db_path: Option<&str>, reset: bool) -> Result<Self> {
29        let db = DatabaseConn::open(db_path)?;
30        let store = MsgStore { db };
31        store.initialize(reset)?;
32        Ok(store)
33    }
34
35    /// Create a new message store (backward-compatible signature)
36    ///
37    /// This method accepts `&Option<String>` for compatibility with existing code.
38    /// Prefer using `new()` with `Option<&str>` for new code.
39    pub fn new_from_option(db_path: &Option<String>, reset: bool) -> Result<Self> {
40        Self::new(db_path.as_deref(), reset)
41    }
42
43    /// Initialize the message store schema
44    fn initialize(&self, reset: bool) -> Result<()> {
45        if reset {
46            self.db
47                .conn
48                .execute("DROP TABLE IF EXISTS elems", [])
49                .map_err(|e| anyhow!("Failed to drop elems table: {}", e))?;
50        }
51
52        self.db
53            .conn
54            .execute(
55                r#"
56                CREATE TABLE IF NOT EXISTS elems (
57                    timestamp INTEGER,
58                    elem_type TEXT,
59                    collector TEXT,
60                    peer_ip TEXT,
61                    peer_asn INTEGER,
62                    prefix TEXT,
63                    next_hop TEXT,
64                    as_path TEXT,
65                    origin_asns TEXT,
66                    origin TEXT,
67                    local_pref INTEGER,
68                    med INTEGER,
69                    communities TEXT,
70                    atomic TEXT,
71                    aggr_asn INTEGER,
72                    aggr_ip TEXT
73                );
74                "#,
75                [],
76            )
77            .map_err(|e| anyhow!("Failed to create elems table: {}", e))?;
78
79        // Add indexes for common query patterns
80        self.db.conn.execute(
81            "CREATE INDEX IF NOT EXISTS idx_timestamp ON elems(timestamp)",
82            [],
83        )?;
84        self.db.conn.execute(
85            "CREATE INDEX IF NOT EXISTS idx_peer_asn ON elems(peer_asn)",
86            [],
87        )?;
88        self.db
89            .conn
90            .execute("CREATE INDEX IF NOT EXISTS idx_prefix ON elems(prefix)", [])?;
91        self.db.conn.execute(
92            "CREATE INDEX IF NOT EXISTS idx_collector ON elems(collector)",
93            [],
94        )?;
95        self.db.conn.execute(
96            "CREATE INDEX IF NOT EXISTS idx_elem_type ON elems(elem_type)",
97            [],
98        )?;
99
100        Ok(())
101    }
102
103    /// Insert BGP elements into the store
104    ///
105    /// # Arguments
106    /// * `elems` - Slice of (BgpElem, collector_name) tuples
107    pub fn insert_elems(&self, elems: &[(BgpElem, String)]) -> Result<()> {
108        if elems.is_empty() {
109            return Ok(());
110        }
111
112        // Use a transaction for the batch
113        let tx = self
114            .db
115            .conn
116            .unchecked_transaction()
117            .map_err(|e| anyhow!("Failed to begin transaction: {}", e))?;
118
119        {
120            // Use prepared statement for better performance
121            let mut stmt = tx
122                .prepare_cached(
123                    "INSERT INTO elems (timestamp, elem_type, collector, peer_ip, peer_asn,
124                     prefix, next_hop, as_path, origin_asns, origin, local_pref, med,
125                     communities, atomic, aggr_asn, aggr_ip)
126                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
127                )
128                .map_err(|e| anyhow!("Failed to prepare statement: {}", e))?;
129
130            for (elem, collector) in elems {
131                let elem_type = match elem.elem_type {
132                    ElemType::ANNOUNCE => "A",
133                    ElemType::WITHDRAW => "W",
134                };
135                let origin_string = elem
136                    .origin_asns
137                    .as_ref()
138                    .and_then(|asns| asns.first())
139                    .map(|asn| asn.to_string());
140                let communities_str = elem.communities.as_ref().map(|v| v.iter().join(" "));
141
142                stmt.execute(rusqlite::params![
143                    elem.timestamp as u32,
144                    elem_type,
145                    collector,
146                    elem.peer_ip.to_string(),
147                    elem.peer_asn.to_u32(),
148                    elem.prefix.to_string(),
149                    elem.next_hop.as_ref().map(|v| v.to_string()),
150                    elem.as_path.as_ref().map(|v| v.to_string()),
151                    origin_string,
152                    elem.origin.as_ref().map(|v| v.to_string()),
153                    elem.local_pref,
154                    elem.med,
155                    communities_str,
156                    if elem.atomic { "AG" } else { "NAG" },
157                    elem.aggr_asn.map(|asn| asn.to_u32()),
158                    elem.aggr_ip.as_ref().map(|v| v.to_string()),
159                ])
160                .map_err(|e| anyhow!("Failed to insert element: {}", e))?;
161            }
162        }
163
164        tx.commit()
165            .map_err(|e| anyhow!("Failed to commit transaction: {}", e))?;
166        Ok(())
167    }
168
169    /// Get the count of stored elements
170    pub fn count(&self) -> Result<u64> {
171        self.db.table_count("elems")
172    }
173
174    /// Get access to the underlying database connection for custom queries
175    pub fn connection(&self) -> &rusqlite::Connection {
176        &self.db.conn
177    }
178}
179
180#[cfg(test)]
181mod tests {
182    use super::*;
183    use bgpkit_parser::models::{AsPath, AsPathSegment, NetworkPrefix, Origin};
184    use std::net::{IpAddr, Ipv4Addr};
185    use std::str::FromStr;
186
187    fn create_test_elem() -> BgpElem {
188        BgpElem {
189            timestamp: 1234567890.0,
190            elem_type: ElemType::ANNOUNCE,
191            peer_ip: IpAddr::V4(Ipv4Addr::new(192, 0, 2, 1)),
192            peer_asn: 65000.into(),
193            prefix: NetworkPrefix::from_str("10.0.0.0/8").unwrap(),
194            next_hop: Some(IpAddr::V4(Ipv4Addr::new(192, 0, 2, 1))),
195            as_path: Some(AsPath {
196                segments: vec![AsPathSegment::AsSequence(vec![65000.into(), 65001.into()])],
197            }),
198            origin_asns: Some(vec![65001.into()]),
199            origin: Some(Origin::IGP),
200            local_pref: Some(100),
201            med: Some(0),
202            communities: None,
203            atomic: false,
204            aggr_asn: None,
205            aggr_ip: None,
206            only_to_customer: None,
207            unknown: None,
208            deprecated: None,
209        }
210    }
211
212    #[test]
213    fn test_create_msg_store() {
214        let store = MsgStore::new(None, false);
215        assert!(store.is_ok());
216    }
217
218    #[test]
219    fn test_insert_and_count() {
220        let store = MsgStore::new(None, false).unwrap();
221
222        let elem = create_test_elem();
223        let elems = vec![(elem, "test_collector".to_string())];
224
225        store.insert_elems(&elems).unwrap();
226
227        assert_eq!(store.count().unwrap(), 1);
228    }
229
230    #[test]
231    fn test_reset() {
232        let store = MsgStore::new(None, false).unwrap();
233
234        let elem = create_test_elem();
235        let elems = vec![(elem, "test_collector".to_string())];
236
237        store.insert_elems(&elems).unwrap();
238        assert_eq!(store.count().unwrap(), 1);
239
240        // Create new store with reset
241        let store2 = MsgStore::new(None, true).unwrap();
242        assert_eq!(store2.count().unwrap(), 0);
243    }
244}