1#![allow(unexpected_cfgs)]
16
17use anyhow::Result;
18use async_trait::async_trait;
19use base64::Engine;
20use drasi_core::models::{
21 Element, ElementMetadata, ElementPropertyMap, ElementReference, ElementValue, SourceChange,
22};
23use drasi_lib::bootstrap::{
24 BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
25};
26use drasi_lib::channels::BootstrapEventSender;
27use log::{info, warn};
28use ordered_float::OrderedFloat;
29use rusqlite::types::ValueRef;
30use rusqlite::{Connection, OpenFlags};
31use std::sync::Arc;
32
33pub mod descriptor;
34
35#[derive(Debug, Clone, PartialEq, Eq)]
37pub struct TableKeyConfig {
38 pub table: String,
39 pub key_columns: Vec<String>,
40}
41
42#[derive(Clone)]
44pub struct SqliteBootstrapProvider {
45 path: Option<String>,
46 tables: Option<Vec<String>>,
47 table_keys: Vec<TableKeyConfig>,
48}
49
50impl SqliteBootstrapProvider {
51 pub fn builder() -> SqliteBootstrapBuilder {
52 SqliteBootstrapBuilder::new()
53 }
54
55 fn key_columns_for_table(&self, conn: &Connection, table: &str) -> Result<Vec<String>> {
56 if let Some(cfg) = self.table_keys.iter().find(|item| item.table == table) {
57 return Ok(cfg.key_columns.clone());
58 }
59 detect_primary_key(conn, table)
60 }
61}
62
63pub struct SqliteBootstrapBuilder {
65 path: Option<String>,
66 tables: Option<Vec<String>>,
67 table_keys: Vec<TableKeyConfig>,
68}
69
70impl SqliteBootstrapBuilder {
71 fn new() -> Self {
72 Self {
73 path: None,
74 tables: None,
75 table_keys: Vec::new(),
76 }
77 }
78
79 pub fn with_path(mut self, path: impl Into<String>) -> Self {
80 self.path = Some(path.into());
81 self
82 }
83
84 pub fn in_memory(mut self) -> Self {
85 self.path = None;
86 self
87 }
88
89 pub fn with_tables(mut self, tables: Vec<String>) -> Self {
90 self.tables = Some(tables);
91 self
92 }
93
94 pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
95 self.table_keys = table_keys;
96 self
97 }
98
99 pub fn build(self) -> SqliteBootstrapProvider {
100 SqliteBootstrapProvider {
101 path: self.path,
102 tables: self.tables,
103 table_keys: self.table_keys,
104 }
105 }
106}
107
108#[async_trait]
109impl BootstrapProvider for SqliteBootstrapProvider {
110 async fn bootstrap(
111 &self,
112 request: BootstrapRequest,
113 context: &BootstrapContext,
114 event_tx: BootstrapEventSender,
115 _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
116 ) -> Result<BootstrapResult> {
117 info!("Starting SQLite bootstrap for query '{}'", request.query_id);
118
119 let changes = {
120 let Some(path) = &self.path else {
121 warn!("SQLite bootstrap skipped for in-memory database");
122 return Ok(BootstrapResult::default());
123 };
124
125 let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
126 let tables = resolve_tables(&conn, self.tables.as_ref())?;
127 let mut changes = Vec::new();
128
129 for table in tables {
130 if !request.node_labels.is_empty() && !request.node_labels.contains(&table) {
131 continue;
132 }
133
134 let key_columns = self.key_columns_for_table(&conn, &table)?;
135 let rows = read_table_rows(&conn, &table)?;
136
137 for (row, rowid) in rows {
138 let element_id = generate_element_id(&table, &row, &key_columns, Some(rowid));
139 let mut properties = ElementPropertyMap::new();
140 for (name, value) in row {
141 properties.insert(&name, value);
142 }
143
144 let labels: Arc<[Arc<str>]> = vec![Arc::<str>::from(table.as_str())].into();
145 let element = Element::Node {
146 metadata: ElementMetadata {
147 reference: ElementReference::new(&context.source_id, &element_id),
148 labels,
149 effective_from: chrono::Utc::now().timestamp_millis() as u64,
150 },
151 properties,
152 };
153
154 changes.push(SourceChange::Insert { element });
155 }
156 }
157
158 changes
159 };
160
161 let mut count = 0usize;
162 for change in changes {
163 let event = drasi_lib::channels::BootstrapEvent {
164 source_id: context.source_id.clone(),
165 change,
166 timestamp: chrono::Utc::now(),
167 sequence: context.next_sequence(),
168 };
169 event_tx.send(event).await?;
170 count += 1;
171 }
172
173 info!(
174 "SQLite bootstrap completed for query '{}': {} rows",
175 request.query_id, count
176 );
177 Ok(BootstrapResult {
178 event_count: count,
179 ..Default::default()
180 })
181 }
182}
183
184fn resolve_tables(
185 conn: &Connection,
186 configured_tables: Option<&Vec<String>>,
187) -> Result<Vec<String>> {
188 if let Some(tables) = configured_tables {
189 return Ok(tables.clone());
190 }
191
192 let mut stmt = conn.prepare(
193 "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'",
194 )?;
195 let tables = stmt
196 .query_map([], |row| row.get::<_, String>(0))?
197 .collect::<std::result::Result<Vec<_>, _>>()?;
198 Ok(tables)
199}
200
201fn read_table_rows(
202 conn: &Connection,
203 table: &str,
204) -> Result<Vec<(Vec<(String, ElementValue)>, i64)>> {
205 let query = format!("SELECT rowid, * FROM {}", quote_ident(table));
206 let mut stmt = conn.prepare(&query)?;
207 let column_count = stmt.column_count();
208 let column_names: Vec<String> = (1..column_count)
210 .map(|index| stmt.column_name(index).unwrap_or("").to_string())
211 .collect();
212
213 let mut rows = stmt.query([])?;
214 let mut result = Vec::new();
215 while let Some(row) = rows.next()? {
216 let rowid: i64 = row.get(0)?;
217 let mut values = Vec::with_capacity(column_names.len());
218 for (i, name) in column_names.iter().enumerate() {
219 let value_ref = row.get_ref(i + 1)?;
220 values.push((name.clone(), value_ref_to_element_value(value_ref)));
221 }
222 result.push((values, rowid));
223 }
224 Ok(result)
225}
226
227fn detect_primary_key(conn: &Connection, table: &str) -> Result<Vec<String>> {
228 let sql = format!("PRAGMA table_info({})", quote_ident(table));
229 let mut stmt = conn.prepare(&sql)?;
230 let mut key_pairs = stmt
231 .query_map([], |row| {
232 let name: String = row.get(1)?;
233 let pk: i64 = row.get(5)?;
234 Ok((pk, name))
235 })?
236 .collect::<std::result::Result<Vec<_>, _>>()?;
237
238 key_pairs.retain(|(pk, _)| *pk > 0);
239 key_pairs.sort_by_key(|(pk, _)| *pk);
240 Ok(key_pairs.into_iter().map(|(_, name)| name).collect())
241}
242
243fn value_ref_to_element_value(value_ref: ValueRef<'_>) -> ElementValue {
244 match value_ref {
245 ValueRef::Null => ElementValue::Null,
246 ValueRef::Integer(i) => ElementValue::Integer(i),
247 ValueRef::Real(f) => ElementValue::Float(OrderedFloat(f)),
248 ValueRef::Text(t) => ElementValue::String(Arc::from(String::from_utf8_lossy(t).as_ref())),
249 ValueRef::Blob(b) => ElementValue::String(Arc::from(
250 base64::engine::general_purpose::STANDARD.encode(b),
251 )),
252 }
253}
254
255fn generate_element_id(
256 table: &str,
257 values: &[(String, ElementValue)],
258 key_columns: &[String],
259 rowid: Option<i64>,
260) -> String {
261 if !key_columns.is_empty() {
262 let key_parts = key_columns
263 .iter()
264 .filter_map(|column| {
265 values
266 .iter()
267 .find(|(name, _)| name == column)
268 .map(|(_, v)| v)
269 })
270 .map(value_to_id_fragment)
271 .collect::<Vec<_>>();
272
273 if !key_parts.is_empty() {
274 return format!("{table}:{}", key_parts.join(":"));
275 }
276 }
277
278 if let Some(id) = rowid {
280 return format!("{table}:{id}");
281 }
282
283 format!("{table}:unknown")
284}
285
286fn value_to_id_fragment(value: &ElementValue) -> String {
287 match value {
288 ElementValue::Null => "null".to_string(),
289 ElementValue::Bool(v) => v.to_string(),
290 ElementValue::Float(v) => v.to_string(),
291 ElementValue::Integer(v) => v.to_string(),
292 ElementValue::String(v) => v.to_string(),
293 ElementValue::LocalDateTime(v) => v.to_string(),
294 ElementValue::ZonedDateTime(v) => v.to_rfc3339(),
295 ElementValue::List(v) => format!("{v:?}").replace(':', "%3A"),
296 ElementValue::Object(v) => format!("{v:?}").replace(':', "%3A"),
297 }
298}
299
300fn quote_ident(identifier: &str) -> String {
301 format!("\"{}\"", identifier.replace('"', "\"\""))
302}
303
304#[cfg(test)]
305mod tests {
306 use super::*;
307
308 #[test]
309 fn generate_element_id_uses_key_columns_when_provided() {
310 let values = vec![
311 ("id".to_string(), ElementValue::Integer(42)),
312 ("name".to_string(), ElementValue::String(Arc::from("test"))),
313 ];
314 let keys = vec!["id".to_string()];
315 assert_eq!(
316 generate_element_id("sensors", &values, &keys, Some(1)),
317 "sensors:42"
318 );
319 }
320
321 #[test]
322 fn generate_element_id_uses_composite_keys() {
323 let values = vec![
324 ("tenant".to_string(), ElementValue::String(Arc::from("t1"))),
325 (
326 "event_id".to_string(),
327 ElementValue::String(Arc::from("e1")),
328 ),
329 ];
330 let keys = vec!["tenant".to_string(), "event_id".to_string()];
331 assert_eq!(
332 generate_element_id("events", &values, &keys, Some(99)),
333 "events:t1:e1"
334 );
335 }
336
337 #[test]
338 fn generate_element_id_falls_back_to_rowid_when_no_keys() {
339 let values = vec![
340 ("name".to_string(), ElementValue::String(Arc::from("test"))),
341 ("value".to_string(), ElementValue::Integer(100)),
342 ];
343 let keys: Vec<String> = vec![];
344 assert_eq!(
345 generate_element_id("data", &values, &keys, Some(7)),
346 "data:7"
347 );
348 }
349
350 #[test]
351 fn generate_element_id_returns_unknown_without_keys_or_rowid() {
352 let values = vec![("x".to_string(), ElementValue::Integer(1))];
353 let keys: Vec<String> = vec![];
354 assert_eq!(
355 generate_element_id("data", &values, &keys, None),
356 "data:unknown"
357 );
358 }
359
360 #[test]
361 fn read_table_rows_returns_rows_with_rowid() {
362 let conn = Connection::open_in_memory().expect("open");
363 conn.execute_batch("CREATE TABLE items (name TEXT, value INTEGER); INSERT INTO items VALUES ('a', 1); INSERT INTO items VALUES ('b', 2);")
364 .expect("setup");
365
366 let rows = read_table_rows(&conn, "items").expect("read");
367 assert_eq!(rows.len(), 2);
368
369 let (first_row, first_rowid) = &rows[0];
370 assert_eq!(*first_rowid, 1);
371 assert_eq!(first_row.len(), 2);
372 assert_eq!(first_row[0].0, "name");
373
374 let (second_row, second_rowid) = &rows[1];
375 assert_eq!(*second_rowid, 2);
376 assert_eq!(second_row.len(), 2);
377 let _ = second_row;
378 }
379
380 #[test]
381 fn detect_primary_key_finds_pk_columns() {
382 let conn = Connection::open_in_memory().expect("open");
383 conn.execute_batch("CREATE TABLE sensors (id INTEGER PRIMARY KEY, name TEXT)")
384 .expect("setup");
385
386 let pks = detect_primary_key(&conn, "sensors").expect("detect");
387 assert_eq!(pks, vec!["id".to_string()]);
388 }
389
390 #[test]
391 fn detect_primary_key_returns_empty_for_no_pk() {
392 let conn = Connection::open_in_memory().expect("open");
393 conn.execute_batch("CREATE TABLE data (x TEXT, y TEXT)")
394 .expect("setup");
395
396 let pks = detect_primary_key(&conn, "data").expect("detect");
397 assert!(pks.is_empty());
398 }
399
400 #[test]
401 fn element_id_matches_between_bootstrap_and_source_with_pk() {
402 let conn = Connection::open_in_memory().expect("open");
405 conn.execute_batch("CREATE TABLE sensors (id INTEGER PRIMARY KEY, name TEXT); INSERT INTO sensors VALUES (42, 'test');")
406 .expect("setup");
407
408 let pks = detect_primary_key(&conn, "sensors").expect("detect pk");
409 let rows = read_table_rows(&conn, "sensors").expect("read");
410 let (row, rowid) = &rows[0];
411
412 let bootstrap_id = generate_element_id("sensors", row, &pks, Some(*rowid));
413 assert_eq!(bootstrap_id, "sensors:42");
415 }
416
417 #[test]
418 fn element_id_matches_between_bootstrap_and_source_without_pk() {
419 let conn = Connection::open_in_memory().expect("open");
422 conn.execute_batch(
423 "CREATE TABLE data (x TEXT, y TEXT); INSERT INTO data VALUES ('a', 'b');",
424 )
425 .expect("setup");
426
427 let pks = detect_primary_key(&conn, "data").expect("detect pk");
428 assert!(pks.is_empty());
429
430 let rows = read_table_rows(&conn, "data").expect("read");
431 let (row, rowid) = &rows[0];
432
433 let bootstrap_id = generate_element_id("data", row, &pks, Some(*rowid));
434 assert_eq!(bootstrap_id, "data:1");
436 }
437}
438
439#[cfg(feature = "dynamic-plugin")]
441drasi_plugin_sdk::export_plugin!(
442 plugin_id = "sqlite-bootstrap",
443 core_version = env!("CARGO_PKG_VERSION"),
444 lib_version = env!("CARGO_PKG_VERSION"),
445 plugin_version = env!("CARGO_PKG_VERSION"),
446 source_descriptors = [],
447 reaction_descriptors = [],
448 bootstrap_descriptors = [descriptor::SqliteBootstrapDescriptor],
449);