Skip to main content

drasi_bootstrap_sqlite/
lib.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(unexpected_cfgs)]
16
17use anyhow::Result;
18use async_trait::async_trait;
19use base64::Engine;
20use drasi_core::models::{
21    Element, ElementMetadata, ElementPropertyMap, ElementReference, ElementValue, SourceChange,
22};
23use drasi_lib::bootstrap::{
24    BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
25};
26use drasi_lib::channels::BootstrapEventSender;
27use log::{info, warn};
28use ordered_float::OrderedFloat;
29use rusqlite::types::ValueRef;
30use rusqlite::{Connection, OpenFlags};
31use std::sync::Arc;
32
33pub mod descriptor;
34
/// Explicit element-ID key columns for one SQLite table.
///
/// When an entry exists for a table, its columns are used (in the listed order) instead of the
/// table's declared primary key, so that element IDs stay stable.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TableKeyConfig {
    /// Name of the table this entry applies to (compared for exact equality).
    pub table: String,
    /// Columns whose values are joined with `:` to form the element ID.
    pub key_columns: Vec<String>,
}
41
42/// SQLite bootstrap provider.
43#[derive(Clone)]
44pub struct SqliteBootstrapProvider {
45    path: Option<String>,
46    tables: Option<Vec<String>>,
47    table_keys: Vec<TableKeyConfig>,
48}
49
50impl SqliteBootstrapProvider {
51    pub fn builder() -> SqliteBootstrapBuilder {
52        SqliteBootstrapBuilder::new()
53    }
54
55    fn key_columns_for_table(&self, conn: &Connection, table: &str) -> Result<Vec<String>> {
56        if let Some(cfg) = self.table_keys.iter().find(|item| item.table == table) {
57            return Ok(cfg.key_columns.clone());
58        }
59        detect_primary_key(conn, table)
60    }
61}
62
63/// Builder for [`SqliteBootstrapProvider`].
64pub struct SqliteBootstrapBuilder {
65    path: Option<String>,
66    tables: Option<Vec<String>>,
67    table_keys: Vec<TableKeyConfig>,
68}
69
70impl SqliteBootstrapBuilder {
71    fn new() -> Self {
72        Self {
73            path: None,
74            tables: None,
75            table_keys: Vec::new(),
76        }
77    }
78
79    pub fn with_path(mut self, path: impl Into<String>) -> Self {
80        self.path = Some(path.into());
81        self
82    }
83
84    pub fn in_memory(mut self) -> Self {
85        self.path = None;
86        self
87    }
88
89    pub fn with_tables(mut self, tables: Vec<String>) -> Self {
90        self.tables = Some(tables);
91        self
92    }
93
94    pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
95        self.table_keys = table_keys;
96        self
97    }
98
99    pub fn build(self) -> SqliteBootstrapProvider {
100        SqliteBootstrapProvider {
101            path: self.path,
102            tables: self.tables,
103            table_keys: self.table_keys,
104        }
105    }
106}
107
108#[async_trait]
109impl BootstrapProvider for SqliteBootstrapProvider {
110    async fn bootstrap(
111        &self,
112        request: BootstrapRequest,
113        context: &BootstrapContext,
114        event_tx: BootstrapEventSender,
115        _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
116    ) -> Result<BootstrapResult> {
117        info!("Starting SQLite bootstrap for query '{}'", request.query_id);
118
119        let changes = {
120            let Some(path) = &self.path else {
121                warn!("SQLite bootstrap skipped for in-memory database");
122                return Ok(BootstrapResult::default());
123            };
124
125            let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
126            let tables = resolve_tables(&conn, self.tables.as_ref())?;
127            let mut changes = Vec::new();
128
129            for table in tables {
130                if !request.node_labels.is_empty() && !request.node_labels.contains(&table) {
131                    continue;
132                }
133
134                let key_columns = self.key_columns_for_table(&conn, &table)?;
135                let rows = read_table_rows(&conn, &table)?;
136
137                for (row, rowid) in rows {
138                    let element_id = generate_element_id(&table, &row, &key_columns, Some(rowid));
139                    let mut properties = ElementPropertyMap::new();
140                    for (name, value) in row {
141                        properties.insert(&name, value);
142                    }
143
144                    let labels: Arc<[Arc<str>]> = vec![Arc::<str>::from(table.as_str())].into();
145                    let element = Element::Node {
146                        metadata: ElementMetadata {
147                            reference: ElementReference::new(&context.source_id, &element_id),
148                            labels,
149                            effective_from: chrono::Utc::now().timestamp_millis() as u64,
150                        },
151                        properties,
152                    };
153
154                    changes.push(SourceChange::Insert { element });
155                }
156            }
157
158            changes
159        };
160
161        let mut count = 0usize;
162        for change in changes {
163            let event = drasi_lib::channels::BootstrapEvent {
164                source_id: context.source_id.clone(),
165                change,
166                timestamp: chrono::Utc::now(),
167                sequence: context.next_sequence(),
168            };
169            event_tx.send(event).await?;
170            count += 1;
171        }
172
173        info!(
174            "SQLite bootstrap completed for query '{}': {} rows",
175            request.query_id, count
176        );
177        Ok(BootstrapResult {
178            event_count: count,
179            ..Default::default()
180        })
181    }
182}
183
184fn resolve_tables(
185    conn: &Connection,
186    configured_tables: Option<&Vec<String>>,
187) -> Result<Vec<String>> {
188    if let Some(tables) = configured_tables {
189        return Ok(tables.clone());
190    }
191
192    let mut stmt = conn.prepare(
193        "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'",
194    )?;
195    let tables = stmt
196        .query_map([], |row| row.get::<_, String>(0))?
197        .collect::<std::result::Result<Vec<_>, _>>()?;
198    Ok(tables)
199}
200
201fn read_table_rows(
202    conn: &Connection,
203    table: &str,
204) -> Result<Vec<(Vec<(String, ElementValue)>, i64)>> {
205    let query = format!("SELECT rowid, * FROM {}", quote_ident(table));
206    let mut stmt = conn.prepare(&query)?;
207    let column_count = stmt.column_count();
208    // First column is rowid, remaining are user columns
209    let column_names: Vec<String> = (1..column_count)
210        .map(|index| stmt.column_name(index).unwrap_or("").to_string())
211        .collect();
212
213    let mut rows = stmt.query([])?;
214    let mut result = Vec::new();
215    while let Some(row) = rows.next()? {
216        let rowid: i64 = row.get(0)?;
217        let mut values = Vec::with_capacity(column_names.len());
218        for (i, name) in column_names.iter().enumerate() {
219            let value_ref = row.get_ref(i + 1)?;
220            values.push((name.clone(), value_ref_to_element_value(value_ref)));
221        }
222        result.push((values, rowid));
223    }
224    Ok(result)
225}
226
227fn detect_primary_key(conn: &Connection, table: &str) -> Result<Vec<String>> {
228    let sql = format!("PRAGMA table_info({})", quote_ident(table));
229    let mut stmt = conn.prepare(&sql)?;
230    let mut key_pairs = stmt
231        .query_map([], |row| {
232            let name: String = row.get(1)?;
233            let pk: i64 = row.get(5)?;
234            Ok((pk, name))
235        })?
236        .collect::<std::result::Result<Vec<_>, _>>()?;
237
238    key_pairs.retain(|(pk, _)| *pk > 0);
239    key_pairs.sort_by_key(|(pk, _)| *pk);
240    Ok(key_pairs.into_iter().map(|(_, name)| name).collect())
241}
242
243fn value_ref_to_element_value(value_ref: ValueRef<'_>) -> ElementValue {
244    match value_ref {
245        ValueRef::Null => ElementValue::Null,
246        ValueRef::Integer(i) => ElementValue::Integer(i),
247        ValueRef::Real(f) => ElementValue::Float(OrderedFloat(f)),
248        ValueRef::Text(t) => ElementValue::String(Arc::from(String::from_utf8_lossy(t).as_ref())),
249        ValueRef::Blob(b) => ElementValue::String(Arc::from(
250            base64::engine::general_purpose::STANDARD.encode(b),
251        )),
252    }
253}
254
255fn generate_element_id(
256    table: &str,
257    values: &[(String, ElementValue)],
258    key_columns: &[String],
259    rowid: Option<i64>,
260) -> String {
261    if !key_columns.is_empty() {
262        let key_parts = key_columns
263            .iter()
264            .filter_map(|column| {
265                values
266                    .iter()
267                    .find(|(name, _)| name == column)
268                    .map(|(_, v)| v)
269            })
270            .map(value_to_id_fragment)
271            .collect::<Vec<_>>();
272
273        if !key_parts.is_empty() {
274            return format!("{table}:{}", key_parts.join(":"));
275        }
276    }
277
278    // Fall back to rowid, matching the source's CDC behavior
279    if let Some(id) = rowid {
280        return format!("{table}:{id}");
281    }
282
283    format!("{table}:unknown")
284}
285
286fn value_to_id_fragment(value: &ElementValue) -> String {
287    match value {
288        ElementValue::Null => "null".to_string(),
289        ElementValue::Bool(v) => v.to_string(),
290        ElementValue::Float(v) => v.to_string(),
291        ElementValue::Integer(v) => v.to_string(),
292        ElementValue::String(v) => v.to_string(),
293        ElementValue::LocalDateTime(v) => v.to_string(),
294        ElementValue::ZonedDateTime(v) => v.to_rfc3339(),
295        ElementValue::List(v) => format!("{v:?}").replace(':', "%3A"),
296        ElementValue::Object(v) => format!("{v:?}").replace(':', "%3A"),
297    }
298}
299
/// Wraps `identifier` in double quotes and doubles any embedded `"`.
///
/// This lets the name be embedded in SQL text as a single quoted identifier.
fn quote_ident(identifier: &str) -> String {
    let mut quoted = String::with_capacity(identifier.len() + 2);
    quoted.push('"');
    for ch in identifier.chars() {
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
303
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens an in-memory database and runs `setup_sql` against it.
    fn db_with(setup_sql: &str) -> Connection {
        let conn = Connection::open_in_memory().expect("open");
        conn.execute_batch(setup_sql).expect("setup");
        conn
    }

    #[test]
    fn generate_element_id_uses_key_columns_when_provided() {
        let values = vec![
            ("id".to_string(), ElementValue::Integer(42)),
            ("name".to_string(), ElementValue::String(Arc::from("test"))),
        ];
        let keys = vec!["id".to_string()];
        assert_eq!(
            generate_element_id("sensors", &values, &keys, Some(1)),
            "sensors:42"
        );
    }

    #[test]
    fn generate_element_id_uses_composite_keys() {
        let values = vec![
            ("tenant".to_string(), ElementValue::String(Arc::from("t1"))),
            (
                "event_id".to_string(),
                ElementValue::String(Arc::from("e1")),
            ),
        ];
        let keys = vec!["tenant".to_string(), "event_id".to_string()];
        assert_eq!(
            generate_element_id("events", &values, &keys, Some(99)),
            "events:t1:e1"
        );
    }

    #[test]
    fn generate_element_id_falls_back_to_rowid_when_no_keys() {
        let values = vec![
            ("name".to_string(), ElementValue::String(Arc::from("test"))),
            ("value".to_string(), ElementValue::Integer(100)),
        ];
        let keys: Vec<String> = vec![];
        assert_eq!(
            generate_element_id("data", &values, &keys, Some(7)),
            "data:7"
        );
    }

    #[test]
    fn generate_element_id_returns_unknown_without_keys_or_rowid() {
        let values = vec![("x".to_string(), ElementValue::Integer(1))];
        let keys: Vec<String> = vec![];
        assert_eq!(
            generate_element_id("data", &values, &keys, None),
            "data:unknown"
        );
    }

    #[test]
    fn read_table_rows_returns_rows_with_rowid() {
        let conn = db_with(
            "CREATE TABLE items (name TEXT, value INTEGER); \
             INSERT INTO items VALUES ('a', 1); \
             INSERT INTO items VALUES ('b', 2);",
        );

        let rows = read_table_rows(&conn, "items").expect("read");
        assert_eq!(rows.len(), 2);

        let (first_row, first_rowid) = &rows[0];
        assert_eq!(*first_rowid, 1);
        assert_eq!(first_row.len(), 2);
        assert_eq!(first_row[0].0, "name");
        assert_eq!(first_row[1].0, "value");

        let (second_row, second_rowid) = &rows[1];
        assert_eq!(*second_rowid, 2);
        assert_eq!(second_row.len(), 2);
        assert!(matches!(&second_row[0].1, ElementValue::String(s) if &**s == "b"));
        assert!(matches!(second_row[1].1, ElementValue::Integer(2)));
    }

    #[test]
    fn read_table_rows_quotes_table_names() {
        // Table name `we"ird` exercises identifier quoting end-to-end.
        let conn = db_with(
            r#"CREATE TABLE "we""ird" (v INTEGER); INSERT INTO "we""ird" VALUES (7);"#,
        );

        let rows = read_table_rows(&conn, "we\"ird").expect("read");
        assert_eq!(rows.len(), 1);
        let (row, rowid) = &rows[0];
        assert_eq!(*rowid, 1);
        assert_eq!(row[0].0, "v");
        assert!(matches!(row[0].1, ElementValue::Integer(7)));
    }

    #[test]
    fn detect_primary_key_finds_pk_columns() {
        let conn = db_with("CREATE TABLE sensors (id INTEGER PRIMARY KEY, name TEXT)");

        let pks = detect_primary_key(&conn, "sensors").expect("detect");
        assert_eq!(pks, vec!["id".to_string()]);
    }

    #[test]
    fn detect_primary_key_orders_composite_keys_by_key_position() {
        // Declaration order differs from key order on purpose.
        let conn = db_with(
            "CREATE TABLE events (event_id TEXT, tenant TEXT, body TEXT, \
             PRIMARY KEY (tenant, event_id))",
        );

        let pks = detect_primary_key(&conn, "events").expect("detect");
        assert_eq!(pks, vec!["tenant".to_string(), "event_id".to_string()]);
    }

    #[test]
    fn detect_primary_key_returns_empty_for_no_pk() {
        let conn = db_with("CREATE TABLE data (x TEXT, y TEXT)");

        let pks = detect_primary_key(&conn, "data").expect("detect");
        assert!(pks.is_empty());
    }

    #[test]
    fn key_columns_for_table_prefers_configured_keys() {
        let conn = db_with(
            "CREATE TABLE sensors (id INTEGER PRIMARY KEY, name TEXT); \
             CREATE TABLE other (k TEXT PRIMARY KEY)",
        );
        let provider = SqliteBootstrapProvider::builder()
            .with_table_keys(vec![TableKeyConfig {
                table: "sensors".to_string(),
                key_columns: vec!["name".to_string()],
            }])
            .build();

        assert_eq!(
            provider
                .key_columns_for_table(&conn, "sensors")
                .expect("sensors"),
            vec!["name".to_string()]
        );
        // Tables without an override fall back to their declared primary key.
        assert_eq!(
            provider.key_columns_for_table(&conn, "other").expect("other"),
            vec!["k".to_string()]
        );
    }

    #[test]
    fn value_ref_to_element_value_maps_sqlite_types() {
        assert!(matches!(
            value_ref_to_element_value(ValueRef::Null),
            ElementValue::Null
        ));
        assert!(matches!(
            value_ref_to_element_value(ValueRef::Integer(5)),
            ElementValue::Integer(5)
        ));
        assert!(matches!(
            value_ref_to_element_value(ValueRef::Text(b"hi")),
            ElementValue::String(s) if &*s == "hi"
        ));
        // Blobs are base64-encoded: [1, 2, 3] -> "AQID".
        assert!(matches!(
            value_ref_to_element_value(ValueRef::Blob(&[1, 2, 3])),
            ElementValue::String(s) if &*s == "AQID"
        ));
    }

    #[test]
    fn quote_ident_doubles_embedded_quotes() {
        assert_eq!(quote_ident("plain"), "\"plain\"");
        assert_eq!(quote_ident("we\"ird"), "\"we\"\"ird\"");
    }

    #[test]
    fn element_id_matches_between_bootstrap_and_source_with_pk() {
        // Verifies that bootstrap and source produce the same element ID
        // when a table has a declared PK.
        let conn = db_with(
            "CREATE TABLE sensors (id INTEGER PRIMARY KEY, name TEXT); \
             INSERT INTO sensors VALUES (42, 'test');",
        );

        let pks = detect_primary_key(&conn, "sensors").expect("detect pk");
        let rows = read_table_rows(&conn, "sensors").expect("read");
        let (row, rowid) = &rows[0];

        let bootstrap_id = generate_element_id("sensors", row, &pks, Some(*rowid));
        // Source would produce "sensors:42" via PK column "id"
        assert_eq!(bootstrap_id, "sensors:42");
    }

    #[test]
    fn element_id_matches_between_bootstrap_and_source_without_pk() {
        // Verifies that bootstrap falls back to rowid just like source does
        // when no PK is declared.
        let conn = db_with(
            "CREATE TABLE data (x TEXT, y TEXT); INSERT INTO data VALUES ('a', 'b');",
        );

        let pks = detect_primary_key(&conn, "data").expect("detect pk");
        assert!(pks.is_empty());

        let rows = read_table_rows(&conn, "data").expect("read");
        let (row, rowid) = &rows[0];

        let bootstrap_id = generate_element_id("data", row, &pks, Some(*rowid));
        // Source would produce "data:1" via rowid fallback
        assert_eq!(bootstrap_id, "data:1");
    }
}
438
// Exports this crate as a dynamically loadable Drasi plugin when the `dynamic-plugin` feature
// is enabled. Only a bootstrap descriptor (`descriptor::SqliteBootstrapDescriptor`) is
// registered; the source and reaction descriptor lists are empty.
// NOTE(review): `core_version`, `lib_version` and `plugin_version` are all set to this crate's
// own `CARGO_PKG_VERSION`. Confirm that core/lib should not instead report the drasi-core /
// drasi-lib versions this plugin was built against.
439/// Dynamic plugin entry point.
440#[cfg(feature = "dynamic-plugin")]
441drasi_plugin_sdk::export_plugin!(
442    plugin_id = "sqlite-bootstrap",
443    core_version = env!("CARGO_PKG_VERSION"),
444    lib_version = env!("CARGO_PKG_VERSION"),
445    plugin_version = env!("CARGO_PKG_VERSION"),
446    source_descriptors = [],
447    reaction_descriptors = [],
448    bootstrap_descriptors = [descriptor::SqliteBootstrapDescriptor],
449);
445    plugin_version = env!("CARGO_PKG_VERSION"),
446    source_descriptors = [],
447    reaction_descriptors = [],
448    bootstrap_descriptors = [descriptor::SqliteBootstrapDescriptor],
449);