monocle/database/session/
msg_store.rs1use anyhow::{anyhow, Result};
7use bgpkit_parser::models::ElemType;
8use bgpkit_parser::BgpElem;
9use itertools::Itertools;
10
11use crate::database::core::DatabaseConn;
12
13pub struct MsgStore {
19 db: DatabaseConn,
20}
21
22impl MsgStore {
23 pub fn new(db_path: Option<&str>, reset: bool) -> Result<Self> {
29 let db = DatabaseConn::open(db_path)?;
30 let store = MsgStore { db };
31 store.initialize(reset)?;
32 Ok(store)
33 }
34
35 pub fn new_from_option(db_path: &Option<String>, reset: bool) -> Result<Self> {
40 Self::new(db_path.as_deref(), reset)
41 }
42
43 fn initialize(&self, reset: bool) -> Result<()> {
45 if reset {
46 self.db
47 .conn
48 .execute("DROP TABLE IF EXISTS elems", [])
49 .map_err(|e| anyhow!("Failed to drop elems table: {}", e))?;
50 }
51
52 self.db
53 .conn
54 .execute(
55 r#"
56 CREATE TABLE IF NOT EXISTS elems (
57 timestamp INTEGER,
58 elem_type TEXT,
59 collector TEXT,
60 peer_ip TEXT,
61 peer_asn INTEGER,
62 prefix TEXT,
63 next_hop TEXT,
64 as_path TEXT,
65 origin_asns TEXT,
66 origin TEXT,
67 local_pref INTEGER,
68 med INTEGER,
69 communities TEXT,
70 atomic TEXT,
71 aggr_asn INTEGER,
72 aggr_ip TEXT
73 );
74 "#,
75 [],
76 )
77 .map_err(|e| anyhow!("Failed to create elems table: {}", e))?;
78
79 self.db.conn.execute(
81 "CREATE INDEX IF NOT EXISTS idx_timestamp ON elems(timestamp)",
82 [],
83 )?;
84 self.db.conn.execute(
85 "CREATE INDEX IF NOT EXISTS idx_peer_asn ON elems(peer_asn)",
86 [],
87 )?;
88 self.db
89 .conn
90 .execute("CREATE INDEX IF NOT EXISTS idx_prefix ON elems(prefix)", [])?;
91 self.db.conn.execute(
92 "CREATE INDEX IF NOT EXISTS idx_collector ON elems(collector)",
93 [],
94 )?;
95 self.db.conn.execute(
96 "CREATE INDEX IF NOT EXISTS idx_elem_type ON elems(elem_type)",
97 [],
98 )?;
99
100 Ok(())
101 }
102
103 pub fn insert_elems(&self, elems: &[(BgpElem, String)]) -> Result<()> {
108 if elems.is_empty() {
109 return Ok(());
110 }
111
112 let tx = self
114 .db
115 .conn
116 .unchecked_transaction()
117 .map_err(|e| anyhow!("Failed to begin transaction: {}", e))?;
118
119 {
120 let mut stmt = tx
122 .prepare_cached(
123 "INSERT INTO elems (timestamp, elem_type, collector, peer_ip, peer_asn,
124 prefix, next_hop, as_path, origin_asns, origin, local_pref, med,
125 communities, atomic, aggr_asn, aggr_ip)
126 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
127 )
128 .map_err(|e| anyhow!("Failed to prepare statement: {}", e))?;
129
130 for (elem, collector) in elems {
131 let elem_type = match elem.elem_type {
132 ElemType::ANNOUNCE => "A",
133 ElemType::WITHDRAW => "W",
134 };
135 let origin_string = elem
136 .origin_asns
137 .as_ref()
138 .and_then(|asns| asns.first())
139 .map(|asn| asn.to_string());
140 let communities_str = elem.communities.as_ref().map(|v| v.iter().join(" "));
141
142 stmt.execute(rusqlite::params![
143 elem.timestamp as u32,
144 elem_type,
145 collector,
146 elem.peer_ip.to_string(),
147 elem.peer_asn.to_u32(),
148 elem.prefix.to_string(),
149 elem.next_hop.as_ref().map(|v| v.to_string()),
150 elem.as_path.as_ref().map(|v| v.to_string()),
151 origin_string,
152 elem.origin.as_ref().map(|v| v.to_string()),
153 elem.local_pref,
154 elem.med,
155 communities_str,
156 if elem.atomic { "AG" } else { "NAG" },
157 elem.aggr_asn.map(|asn| asn.to_u32()),
158 elem.aggr_ip.as_ref().map(|v| v.to_string()),
159 ])
160 .map_err(|e| anyhow!("Failed to insert element: {}", e))?;
161 }
162 }
163
164 tx.commit()
165 .map_err(|e| anyhow!("Failed to commit transaction: {}", e))?;
166 Ok(())
167 }
168
169 pub fn count(&self) -> Result<u64> {
171 self.db.table_count("elems")
172 }
173
174 pub fn connection(&self) -> &rusqlite::Connection {
176 &self.db.conn
177 }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use bgpkit_parser::models::{AsPath, AsPathSegment, NetworkPrefix, Origin};
    use std::net::{IpAddr, Ipv4Addr};
    use std::str::FromStr;

    /// Builds the IPv4 announcement used as the fixture by every test.
    fn create_test_elem() -> BgpElem {
        // The peer address doubles as the next hop.
        let peer_addr = IpAddr::V4(Ipv4Addr::new(192, 0, 2, 1));
        let path = AsPath {
            segments: vec![AsPathSegment::AsSequence(vec![65000.into(), 65001.into()])],
        };

        BgpElem {
            timestamp: 1234567890.0,
            elem_type: ElemType::ANNOUNCE,
            peer_ip: peer_addr,
            peer_asn: 65000.into(),
            prefix: NetworkPrefix::from_str("10.0.0.0/8").unwrap(),
            next_hop: Some(peer_addr),
            as_path: Some(path),
            origin_asns: Some(vec![65001.into()]),
            origin: Some(Origin::IGP),
            local_pref: Some(100),
            med: Some(0),
            communities: None,
            atomic: false,
            aggr_asn: None,
            aggr_ip: None,
            only_to_customer: None,
            unknown: None,
            deprecated: None,
        }
    }

    /// A one-element batch tagged with the `test_collector` collector name.
    fn single_elem_batch() -> Vec<(BgpElem, String)> {
        vec![(create_test_elem(), "test_collector".to_string())]
    }

    /// Constructing a store without a path succeeds.
    #[test]
    fn test_create_msg_store() {
        assert!(MsgStore::new(None, false).is_ok());
    }

    /// Inserting one element is reflected in the row count.
    #[test]
    fn test_insert_and_count() {
        let store = MsgStore::new(None, false).unwrap();

        store.insert_elems(&single_elem_batch()).unwrap();

        assert_eq!(store.count().unwrap(), 1);
    }

    /// A store opened with `reset = true` starts out empty.
    #[test]
    fn test_reset() {
        let populated = MsgStore::new(None, false).unwrap();
        populated.insert_elems(&single_elem_batch()).unwrap();
        assert_eq!(populated.count().unwrap(), 1);

        // NOTE(review): both stores are opened with `None`; if that yields an
        // independent database per call (presumably in-memory — confirm against
        // `DatabaseConn::open`), the second store is empty regardless of `reset`.
        let fresh = MsgStore::new(None, true).unwrap();
        assert_eq!(fresh.count().unwrap(), 0);
    }
}