Skip to main content

sqlitegraph/backend/sqlite/
impl_.rs

1//! SQLite backend implementation for GraphBackend trait.
2//!
3//! This module contains the SqliteGraphBackend implementation that wraps
4//! SqliteGraph and provides the concrete implementation of all GraphBackend methods.
5
6use parking_lot::RwLock;
7use rusqlite::params;
8
9use crate::{
10    SqliteGraphError,
11    backend::sqlite::types::{BackendDirection, EdgeSpec, NeighborQuery, NodeSpec},
12    backend::{PubSubEvent, SubscriptionFilter},
13    bfs::{bfs_neighbors, shortest_path},
14    graph::{GraphEdge, GraphEntity, SqliteGraph},
15    multi_hop,
16    pattern::{self, PatternMatch, PatternQuery},
17};
18
19/// Simple in-memory publisher for SQLite backend pub/sub
20///
21/// Similar to V3's Publisher but simplified for SQLite backend use
22struct Publisher {
23    subscribers: RwLock<
24        Vec<(
25            u64,
26            std::sync::mpsc::Sender<PubSubEvent>,
27            SubscriptionFilter,
28        )>,
29    >,
30    next_id: std::sync::atomic::AtomicU64,
31}
32
33impl Publisher {
34    fn new() -> Self {
35        Self {
36            subscribers: RwLock::new(Vec::new()),
37            next_id: std::sync::atomic::AtomicU64::new(1),
38        }
39    }
40
41    fn subscribe(
42        &self,
43        filter: SubscriptionFilter,
44    ) -> (u64, std::sync::mpsc::Receiver<PubSubEvent>) {
45        let id = self
46            .next_id
47            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
48        let (tx, rx) = std::sync::mpsc::channel();
49
50        self.subscribers.write().push((id, tx, filter));
51        (id, rx)
52    }
53
54    fn unsubscribe(&self, subscriber_id: u64) -> bool {
55        let mut subs = self.subscribers.write();
56        if let Some(pos) = subs.iter().position(|(id, _, _)| *id == subscriber_id) {
57            subs.remove(pos);
58            true
59        } else {
60            false
61        }
62    }
63
64    fn emit(&self, event: PubSubEvent) {
65        let subs = self.subscribers.read();
66        for (_, sender, filter) in subs.iter() {
67            if Self::should_send(&event, filter) {
68                // Best-effort delivery - ignore send failures
69                let _ = sender.send(event.clone());
70            }
71        }
72    }
73
74    fn should_send(event: &PubSubEvent, filter: &SubscriptionFilter) -> bool {
75        match event {
76            PubSubEvent::NodeChanged { .. } => filter.node_changes,
77            PubSubEvent::EdgeChanged { .. } => filter.edge_changes,
78            PubSubEvent::KvChanged { .. } => filter.kv_changes,
79            PubSubEvent::SnapshotCommitted { .. } => filter.snapshot_commits,
80        }
81    }
82}
83
84/// Validate snapshot_id for SQLite backend
85///
86/// SQLite backend does not support historical snapshot isolation.
87/// Only SnapshotId(0) is supported, which is returned by SnapshotId::current().
88///
89/// # Snapshot ID Contract
90///
91/// - `SnapshotId::current()` returns SnapshotId(0) - the "current" sentinel
92/// - Any non-zero snapshot ID represents a historical snapshot (not supported)
93///
94/// Historical snapshot isolation would require:
95/// - WAL-based versioning with timestamp/LSN indexing
96/// - AS OF queries or point-in-time recovery mechanisms
97/// - Multi-version concurrency control (MVCC) extensions
98///
99/// These are not implemented in the current SQLite backend.
100fn validate_snapshot_for_sqlite(
101    snapshot_id: crate::snapshot::SnapshotId,
102) -> Result<(), SqliteGraphError> {
103    if snapshot_id.as_lsn() == 0 {
104        return Ok(());
105    }
106    Err(SqliteGraphError::query(format!(
107        "SQLite backend does not support historical snapshots (requested: {}). \
108        Only SnapshotId::current() (which returns SnapshotId(0)) is supported. \
109        Historical snapshot isolation requires AS OF queries or MVCC which are not implemented.",
110        snapshot_id.as_lsn()
111    )))
112}
113
114/// SQLite-backed implementation of the GraphBackend trait.
115///
116/// This struct wraps a SqliteGraph instance and implements all GraphBackend methods
117/// by delegating to the underlying SQLite-based graph operations.
118pub struct SqliteGraphBackend {
119    graph: SqliteGraph,
120    /// In-memory publisher for pub/sub support (lazy initialized)
121    publisher: RwLock<Option<Publisher>>,
122}
123
124impl SqliteGraphBackend {
125    /// Create a new SQLite backend with an in-memory database.
126    pub fn in_memory() -> Result<Self, SqliteGraphError> {
127        Ok(Self {
128            graph: SqliteGraph::open_in_memory()?,
129            publisher: RwLock::new(None),
130        })
131    }
132
133    /// Create a new SQLite backend from an existing SqliteGraph instance.
134    pub fn from_graph(graph: SqliteGraph) -> Self {
135        Self {
136            graph,
137            publisher: RwLock::new(None),
138        }
139    }
140
141    /// Get a reference to the underlying SqliteGraph instance.
142    pub fn graph(&self) -> &SqliteGraph {
143        &self.graph
144    }
145
146    /// Create a new HNSW vector storage using this SQLite backend
147    ///
148    /// # Arguments
149    ///
150    /// * `index_name` - Name for the HNSW index (used for table naming)
151    ///
152    /// # Returns
153    ///
154    /// `Some(Box<dyn VectorStorage>)` containing a storage backed by SQLite
155    ///
156    /// # Example
157    ///
158    /// ```ignore
159    /// let backend = SqliteGraphBackend::in_memory().unwrap();
160    /// let storage = backend.create_hnsw_storage("my_index").unwrap();
161    /// ```
162    pub fn create_hnsw_storage(
163        &self,
164        _index_name: impl Into<String>,
165    ) -> Option<Box<dyn crate::hnsw::storage::VectorStorage>> {
166        // SQLiteVectorStorage requires an owned Connection, but we only have a reference
167        // This is a limitation - we can't easily create a storage from &self
168        // The caller should use SQLiteVectorStorage::new() directly with a connection
169        None
170    }
171
172    /// Get all entity IDs from the graph.
173    pub fn entity_ids(&self) -> Result<Vec<i64>, SqliteGraphError> {
174        self.graph.all_entity_ids()
175    }
176
177    /// Ensure the kv_store table exists
178    fn ensure_kv_table(&self) -> Result<(), SqliteGraphError> {
179        let conn = self.graph.connection();
180
181        conn.execute(
182            "CREATE TABLE IF NOT EXISTS kv_store (
183                key TEXT PRIMARY KEY,
184                value_json TEXT NOT NULL,
185                ttl_seconds INTEGER,
186                version INTEGER NOT NULL DEFAULT 1,
187                created_at INTEGER NOT NULL,
188                updated_at INTEGER NOT NULL
189            )",
190            [],
191        )
192        .map_err(|e| {
193            SqliteGraphError::connection(format!("Failed to create kv_store table: {}", e))
194        })?;
195
196        Ok(())
197    }
198
199    /// Execute optimized neighbor queries based on direction and edge type filtering.
200    fn query_neighbors(
201        &self,
202        node: i64,
203        direction: BackendDirection,
204        edge_type: &Option<String>,
205    ) -> Result<Vec<i64>, SqliteGraphError> {
206        match (direction, edge_type) {
207            (BackendDirection::Outgoing, None) => self.graph.fetch_outgoing(node),
208            (BackendDirection::Incoming, None) => self.graph.fetch_incoming(node),
209            (BackendDirection::Outgoing, Some(edge_type)) => {
210                let conn = self.graph.connection();
211                let mut stmt = conn
212                    .prepare_cached(
213                        "SELECT to_id FROM graph_edges WHERE from_id=?1 AND edge_type=?2 ORDER BY to_id, id",
214                    )
215                    .map_err(|e| SqliteGraphError::query(e.to_string()))?;
216                let rows = stmt
217                    .query_map(params![node, edge_type], |row| row.get(0))
218                    .map_err(|e| SqliteGraphError::query(e.to_string()))?;
219                let mut values = Vec::new();
220                for value in rows {
221                    values.push(value.map_err(|e| SqliteGraphError::query(e.to_string()))?);
222                }
223                Ok(values)
224            }
225            (BackendDirection::Incoming, Some(edge_type)) => {
226                let conn = self.graph.connection();
227                let mut stmt = conn
228                    .prepare_cached(
229                        "SELECT from_id FROM graph_edges WHERE to_id=?1 AND edge_type=?2 ORDER BY from_id, id",
230                    )
231                    .map_err(|e| SqliteGraphError::query(e.to_string()))?;
232                let rows = stmt
233                    .query_map(params![node, edge_type], |row| row.get(0))
234                    .map_err(|e| SqliteGraphError::query(e.to_string()))?;
235                let mut values = Vec::new();
236                for value in rows {
237                    values.push(value.map_err(|e| SqliteGraphError::query(e.to_string()))?);
238                }
239                Ok(values)
240            }
241        }
242    }
243}
244
245impl crate::backend::GraphBackend for SqliteGraphBackend {
246    fn insert_node(&self, node: NodeSpec) -> Result<i64, SqliteGraphError> {
247        let id = self.graph.insert_entity(&GraphEntity {
248            id: 0,
249            kind: node.kind,
250            name: node.name,
251            file_path: node.file_path,
252            data: node.data,
253        })?;
254
255        // Emit event if publisher is initialized
256        let pub_guard = self.publisher.read();
257        if let Some(ref publisher) = *pub_guard {
258            publisher.emit(PubSubEvent::NodeChanged {
259                node_id: id,
260                snapshot_id: 0, // SQLite doesn't use snapshot IDs
261            });
262        }
263
264        Ok(id)
265    }
266
267    fn get_node(
268        &self,
269        snapshot_id: crate::snapshot::SnapshotId,
270        id: i64,
271    ) -> Result<GraphEntity, SqliteGraphError> {
272        validate_snapshot_for_sqlite(snapshot_id)?;
273        self.graph.get_entity(id)
274    }
275
276    fn insert_edge(&self, edge: EdgeSpec) -> Result<i64, SqliteGraphError> {
277        let id = self.graph.insert_edge(&GraphEdge {
278            id: 0,
279            from_id: edge.from,
280            to_id: edge.to,
281            edge_type: edge.edge_type,
282            data: edge.data,
283        })?;
284
285        // Emit event if publisher is initialized
286        let pub_guard = self.publisher.read();
287        if let Some(ref publisher) = *pub_guard {
288            publisher.emit(PubSubEvent::EdgeChanged {
289                from_node: edge.from,
290                to_node: edge.to,
291                edge_id: id,
292                snapshot_id: 0, // SQLite doesn't use snapshot IDs
293            });
294        }
295
296        Ok(id)
297    }
298
299    fn delete_entity(&self, id: i64) -> Result<(), SqliteGraphError> {
300        self.graph.delete_entity(id)
301    }
302
303    fn update_node(&self, node_id: i64, node: NodeSpec) -> Result<i64, SqliteGraphError> {
304        // SQLite backend: Use UPDATE SQL query
305        self.graph.update_entity(&GraphEntity {
306            id: node_id,
307            kind: node.kind,
308            name: node.name,
309            file_path: node.file_path,
310            data: node.data,
311        })?;
312        Ok(node_id)
313    }
314
315    fn entity_ids(&self) -> Result<Vec<i64>, SqliteGraphError> {
316        self.graph.all_entity_ids()
317    }
318
319    fn neighbors(
320        &self,
321        snapshot_id: crate::snapshot::SnapshotId,
322        node: i64,
323        query: NeighborQuery,
324    ) -> Result<Vec<i64>, SqliteGraphError> {
325        validate_snapshot_for_sqlite(snapshot_id)?;
326        self.query_neighbors(node, query.direction, &query.edge_type)
327    }
328
329    fn bfs(
330        &self,
331        snapshot_id: crate::snapshot::SnapshotId,
332        start: i64,
333        depth: u32,
334    ) -> Result<Vec<i64>, SqliteGraphError> {
335        validate_snapshot_for_sqlite(snapshot_id)?;
336        // Check query cache first
337        if let Some(cached_result) = self.graph.query_cache.get_bfs(start, depth) {
338            return Ok(cached_result);
339        }
340
341        // Cache miss - compute and cache the result
342        let result = bfs_neighbors(&self.graph, start, depth)?;
343        self.graph.query_cache.put_bfs(start, depth, result.clone());
344        Ok(result)
345    }
346
347    fn shortest_path(
348        &self,
349        snapshot_id: crate::snapshot::SnapshotId,
350        start: i64,
351        end: i64,
352    ) -> Result<Option<Vec<i64>>, SqliteGraphError> {
353        validate_snapshot_for_sqlite(snapshot_id)?;
354        // Check query cache first
355        if let Some(cached_result) = self.graph.query_cache.get_shortest_path(start, end) {
356            return Ok(cached_result);
357        }
358
359        // Cache miss - compute and cache the result
360        let result = shortest_path(&self.graph, start, end)?;
361        self.graph
362            .query_cache
363            .put_shortest_path(start, end, result.clone());
364        Ok(result)
365    }
366
367    fn node_degree(
368        &self,
369        snapshot_id: crate::snapshot::SnapshotId,
370        node: i64,
371    ) -> Result<(usize, usize), SqliteGraphError> {
372        validate_snapshot_for_sqlite(snapshot_id)?;
373        let out = self.graph.fetch_outgoing(node)?.len();
374        let incoming = self.graph.fetch_incoming(node)?.len();
375        Ok((out, incoming))
376    }
377
378    fn k_hop(
379        &self,
380        snapshot_id: crate::snapshot::SnapshotId,
381        start: i64,
382        depth: u32,
383        direction: BackendDirection,
384    ) -> Result<Vec<i64>, SqliteGraphError> {
385        validate_snapshot_for_sqlite(snapshot_id)?;
386        // Check query cache first
387        if let Some(cached_result) = self.graph.query_cache.get_k_hop(start, depth, direction) {
388            return Ok(cached_result);
389        }
390
391        // Cache miss - compute and cache the result
392        let result = multi_hop::k_hop(&self.graph, start, depth, direction)?;
393        self.graph
394            .query_cache
395            .put_k_hop(start, depth, direction, result.clone());
396        Ok(result)
397    }
398
399    fn k_hop_filtered(
400        &self,
401        snapshot_id: crate::snapshot::SnapshotId,
402        start: i64,
403        depth: u32,
404        direction: BackendDirection,
405        allowed_edge_types: &[&str],
406    ) -> Result<Vec<i64>, SqliteGraphError> {
407        validate_snapshot_for_sqlite(snapshot_id)?;
408        // Check query cache first
409        if let Some(cached_result) =
410            self.graph
411                .query_cache
412                .get_k_hop_filtered(start, depth, direction, allowed_edge_types)
413        {
414            return Ok(cached_result);
415        }
416
417        // Cache miss - compute and cache the result
418        let result =
419            multi_hop::k_hop_filtered(&self.graph, start, depth, direction, allowed_edge_types)?;
420        self.graph.query_cache.put_k_hop_filtered(
421            start,
422            depth,
423            direction,
424            allowed_edge_types,
425            result.clone(),
426        );
427        Ok(result)
428    }
429
430    fn chain_query(
431        &self,
432        snapshot_id: crate::snapshot::SnapshotId,
433        start: i64,
434        chain: &[crate::multi_hop::ChainStep],
435    ) -> Result<Vec<i64>, SqliteGraphError> {
436        validate_snapshot_for_sqlite(snapshot_id)?;
437        multi_hop::chain_query(&self.graph, start, chain)
438    }
439
440    fn pattern_search(
441        &self,
442        snapshot_id: crate::snapshot::SnapshotId,
443        start: i64,
444        pattern: &PatternQuery,
445    ) -> Result<Vec<PatternMatch>, SqliteGraphError> {
446        validate_snapshot_for_sqlite(snapshot_id)?;
447        pattern::execute_pattern(&self.graph, start, pattern)
448    }
449
450    fn checkpoint(&self) -> Result<(), SqliteGraphError> {
451        // Execute SQLite WAL checkpoint
452        let conn = self.graph.connection();
453        conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
454            // wal_checkpoint returns a row with 3 integers: (busy, log, checkpointed)
455            // We don't need to use them, just execute the checkpoint
456            let _busy: i32 = row.get(0)?;
457            let _log: i32 = row.get(1)?;
458            let _checkpointed: i32 = row.get(2)?;
459            Ok(())
460        })
461        .map_err(|e| SqliteGraphError::connection(format!("WAL checkpoint failed: {}", e)))?;
462        Ok(())
463    }
464
465    fn flush(&self) -> Result<(), SqliteGraphError> {
466        // SQLite handles sync automatically; this is a no-op
467        Ok(())
468    }
469
470    fn backup(
471        &self,
472        backup_dir: &std::path::Path,
473    ) -> Result<crate::backend::BackupResult, SqliteGraphError> {
474        use std::fs;
475
476        // Ensure backup directory exists
477        fs::create_dir_all(backup_dir).map_err(|e| {
478            SqliteGraphError::connection(format!("Failed to create backup directory: {}", e))
479        })?;
480
481        // Generate backup filename with timestamp
482        let timestamp = std::time::SystemTime::now()
483            .duration_since(std::time::UNIX_EPOCH)
484            .unwrap_or_default()
485            .as_secs();
486        let backup_path = backup_dir.join(format!("backup_{}.db", timestamp));
487        let manifest_path = backup_dir.join(format!("backup_{}.json", timestamp));
488
489        // Use SQLite's backup API via VACUUM INTO for a clean backup
490        let conn = self.graph.connection();
491        conn.execute(&format!("VACUUM INTO '{}'", backup_path.display()), [])
492            .map_err(|e| SqliteGraphError::connection(format!("SQLite backup failed: {}", e)))?;
493
494        // Get backup metadata
495        let metadata = fs::metadata(&backup_path).map_err(|e| {
496            SqliteGraphError::connection(format!("Failed to read backup metadata: {}", e))
497        })?;
498
499        // Get entity count
500        let entity_ids = self
501            .graph
502            .all_entity_ids()
503            .map_err(|e| SqliteGraphError::query(format!("Failed to get entity count: {}", e)))?;
504
505        // Create a simple manifest
506        let manifest = serde_json::json!({
507            "timestamp": timestamp,
508            "backup_file": backup_path.display().to_string(),
509            "size_bytes": metadata.len(),
510            "entity_count": entity_ids.len(),
511        });
512        fs::write(&manifest_path, manifest.to_string()).map_err(|e| {
513            SqliteGraphError::connection(format!("Failed to write manifest: {}", e))
514        })?;
515
516        Ok(crate::backend::BackupResult {
517            snapshot_path: backup_path,
518            manifest_path,
519            size_bytes: metadata.len() as u64,
520            checksum: 0, // SQLite doesn't provide checksum
521            record_count: entity_ids.len() as u64,
522            duration_secs: 0.0, // Not tracked for SQLite backup
523            timestamp,
524            checkpoint_performed: false, // VACUUM INTO doesn't require explicit checkpoint
525        })
526    }
527
528    fn snapshot_export(
529        &self,
530        export_dir: &std::path::Path,
531    ) -> Result<crate::backend::SnapshotMetadata, SqliteGraphError> {
532        use std::fs;
533
534        // Ensure export directory exists
535        fs::create_dir_all(export_dir).map_err(|e| {
536            SqliteGraphError::connection(format!("Failed to create export directory: {}", e))
537        })?;
538
539        let snapshot_file = export_dir.join("snapshot.json");
540
541        // Use existing dump_graph_to_path function
542        crate::recovery::dump_graph_to_path(&self.graph, &snapshot_file)?;
543
544        // Get metadata
545        let metadata = fs::metadata(&snapshot_file).map_err(|e| {
546            SqliteGraphError::connection(format!("Failed to read snapshot metadata: {}", e))
547        })?;
548
549        let entity_ids = self
550            .graph
551            .all_entity_ids()
552            .map_err(|e| SqliteGraphError::query(format!("Failed to get entity count: {}", e)))?;
553
554        Ok(crate::backend::SnapshotMetadata {
555            snapshot_path: snapshot_file,
556            size_bytes: metadata.len(),
557            entity_count: entity_ids.len() as u64,
558            edge_count: 0, // SQLite dump doesn't separate edge count easily
559        })
560    }
561
562    fn snapshot_import(
563        &self,
564        import_dir: &std::path::Path,
565    ) -> Result<crate::backend::ImportMetadata, SqliteGraphError> {
566        let snapshot_file = import_dir.join("snapshot.json");
567
568        if !snapshot_file.exists() {
569            return Err(SqliteGraphError::connection(format!(
570                "Snapshot file not found: {}",
571                snapshot_file.display()
572            )));
573        }
574
575        // Get entity count before import
576        let before_count = self
577            .graph
578            .all_entity_ids()
579            .map_err(|e| SqliteGraphError::query(format!("Failed to get entity count: {}", e)))?
580            .len();
581
582        // Use existing load_graph_from_path function
583        crate::recovery::load_graph_from_path(&self.graph, &snapshot_file)?;
584
585        // Get entity count after import
586        let after_count = self
587            .graph
588            .all_entity_ids()
589            .map_err(|e| SqliteGraphError::query(format!("Failed to get entity count: {}", e)))?
590            .len();
591
592        Ok(crate::backend::ImportMetadata {
593            snapshot_path: snapshot_file,
594            entities_imported: (after_count - before_count) as u64,
595            edges_imported: 0, // SQLite load doesn't separate edge count easily
596        })
597    }
598
599    fn kv_get(
600        &self,
601        snapshot_id: crate::snapshot::SnapshotId,
602        key: &[u8],
603    ) -> Result<Option<crate::backend::native::types::KvValue>, crate::SqliteGraphError> {
604        validate_snapshot_for_sqlite(snapshot_id)?;
605        use std::time::SystemTime;
606
607        // Initialize KV table if needed
608        self.ensure_kv_table()?;
609
610        // Convert key to string for storage (comma-separated bytes)
611        let key_str = bytes_to_string(key);
612
613        let conn = self.graph.connection();
614
615        // Query the kv_store table
616        let result = conn.query_row(
617            "SELECT value_json, ttl_seconds, created_at FROM kv_store WHERE key = ?1",
618            params![key_str],
619            |row| {
620                let value_json: String = row.get(0)?;
621                let ttl_seconds: Option<u64> = row.get(1)?;
622                let created_at: u64 = row.get(2)?;
623
624                Ok((value_json, ttl_seconds, created_at))
625            },
626        );
627
628        match result {
629            Ok((value_json, ttl_seconds, created_at)) => {
630                // Check TTL expiration
631                if let Some(ttl) = ttl_seconds {
632                    let now = SystemTime::now()
633                        .duration_since(SystemTime::UNIX_EPOCH)
634                        .map(|d| d.as_secs())
635                        .unwrap_or(0);
636
637                    if now.saturating_sub(created_at) > ttl {
638                        // Entry expired
639                        return Ok(None);
640                    }
641                }
642
643                // Parse JSON value back to KvValue
644                let json_value: serde_json::Value =
645                    serde_json::from_str(&value_json).map_err(|e| {
646                        SqliteGraphError::connection(format!(
647                            "Failed to parse KV value JSON: {}",
648                            e
649                        ))
650                    })?;
651
652                let kv_value = json_to_kv_value(json_value)?;
653                Ok(Some(kv_value))
654            }
655            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
656            Err(e) => Err(SqliteGraphError::query(format!(
657                "Failed to query KV store: {}",
658                e
659            ))),
660        }
661    }
662
663    fn kv_set(
664        &self,
665        key: Vec<u8>,
666        value: crate::backend::native::types::KvValue,
667        ttl_seconds: Option<u64>,
668    ) -> Result<(), crate::SqliteGraphError> {
669        use std::time::SystemTime;
670
671        // Initialize KV table if needed
672        self.ensure_kv_table()?;
673
674        // Convert key to string for storage
675        let key_str = bytes_to_string(&key);
676
677        // Serialize KvValue to JSON
678        let json_value = kv_value_to_json(&value);
679        let value_json = serde_json::to_string(&json_value).map_err(|e| {
680            SqliteGraphError::connection(format!("Failed to serialize KV value: {}", e))
681        })?;
682
683        let now = SystemTime::now()
684            .duration_since(SystemTime::UNIX_EPOCH)
685            .map(|d| d.as_secs())
686            .unwrap_or(0);
687
688        let conn = self.graph.connection();
689
690        // Check if key exists
691        let count: i64 = conn
692            .query_row(
693                "SELECT COUNT(*) FROM kv_store WHERE key = ?1",
694                params![key_str],
695                |row| row.get(0),
696            )
697            .unwrap_or(0);
698
699        if count > 0 {
700            // Update existing entry
701            conn.execute(
702                "UPDATE kv_store SET value_json = ?1, ttl_seconds = ?2, updated_at = ?3, version = version + 1 WHERE key = ?4",
703                params![value_json, ttl_seconds, now, key_str],
704            )
705                .map_err(|e| SqliteGraphError::query(format!("Failed to update KV entry: {}", e)))?;
706        } else {
707            // Insert new entry
708            conn.execute(
709                "INSERT INTO kv_store (key, value_json, ttl_seconds, created_at, updated_at, version) VALUES (?1, ?2, ?3, ?4, ?4, 1)",
710                params![key_str, value_json, ttl_seconds, now],
711            )
712                .map_err(|e| SqliteGraphError::query(format!("Failed to insert KV entry: {}", e)))?;
713        }
714
715        Ok(())
716    }
717
718    fn kv_delete(&self, key: &[u8]) -> Result<(), crate::SqliteGraphError> {
719        // Initialize KV table if needed
720        self.ensure_kv_table()?;
721
722        // Convert key to string for storage
723        let key_str = bytes_to_string(key);
724
725        let conn = self.graph.connection();
726
727        // Delete the entry (ignore if not found - idempotent)
728        conn.execute("DELETE FROM kv_store WHERE key = ?1", params![key_str])
729            .map_err(|e| SqliteGraphError::query(format!("Failed to delete KV entry: {}", e)))?;
730
731        Ok(())
732    }
733
734    fn subscribe(
735        &self,
736        filter: crate::backend::SubscriptionFilter,
737    ) -> Result<
738        (u64, std::sync::mpsc::Receiver<crate::backend::PubSubEvent>),
739        crate::SqliteGraphError,
740    > {
741        // Lazy initialize publisher
742        let mut pub_guard = self.publisher.write();
743        if pub_guard.is_none() {
744            *pub_guard = Some(Publisher::new());
745        }
746        let (id, rx) = pub_guard.as_ref().unwrap().subscribe(filter);
747        Ok((id, rx))
748    }
749
750    fn unsubscribe(&self, subscriber_id: u64) -> Result<bool, crate::SqliteGraphError> {
751        let pub_guard = self.publisher.read();
752        if let Some(ref publisher) = *pub_guard {
753            Ok(publisher.unsubscribe(subscriber_id))
754        } else {
755            Ok(false) // Publisher not initialized, nothing to unsubscribe
756        }
757    }
758
759    // ========== Pub/Sub Enhancement APIs (v1.4.0) ==========
760
761    fn kv_prefix_scan(
762        &self,
763        snapshot_id: crate::snapshot::SnapshotId,
764        prefix: &[u8],
765    ) -> Result<Vec<(Vec<u8>, crate::backend::native::types::KvValue)>, crate::SqliteGraphError>
766    {
767        validate_snapshot_for_sqlite(snapshot_id)?;
768        self.ensure_kv_table()?;
769        let conn = self.graph.connection();
770
771        // Convert prefix to string pattern for LIKE query
772        // Escape special LIKE characters: % and _
773        let prefix_str = String::from_utf8_lossy(prefix);
774        let pattern = prefix_str.replace('%', "\\%").replace('_', "\\_") + "%";
775
776        let mut stmt = conn
777            .prepare_cached("SELECT key, value_json FROM kv_store WHERE key LIKE ?1 ESCAPE '\\'")
778            .map_err(|e| SqliteGraphError::query(e.to_string()))?;
779
780        let mut results = Vec::new();
781        let query_result = stmt.query_map([&pattern], |row| {
782            let key: String = row.get(0)?;
783            let value_json: String = row.get(1)?;
784            Ok((key, value_json))
785        });
786
787        for row in query_result.map_err(|e| SqliteGraphError::query(e.to_string()))? {
788            let (key, value_json) = row.map_err(|e| SqliteGraphError::query(e.to_string()))?;
789            let json_value: serde_json::Value = serde_json::from_str(&value_json)
790                .map_err(|e| SqliteGraphError::query(format!("Failed to parse JSON: {}", e)))?;
791            let kv_value = json_to_kv_value(json_value)?;
792            results.push((key.into_bytes(), kv_value));
793        }
794
795        // Sort by key for deterministic output
796        results.sort_by(|a, b| a.0.cmp(&b.0));
797        Ok(results)
798    }
799
800    fn query_nodes_by_kind(
801        &self,
802        snapshot_id: crate::snapshot::SnapshotId,
803        kind: &str,
804    ) -> Result<Vec<i64>, crate::SqliteGraphError> {
805        validate_snapshot_for_sqlite(snapshot_id)?;
806        let conn = self.graph.connection();
807        let mut stmt = conn
808            .prepare_cached("SELECT id FROM graph_entities WHERE kind = ?1")
809            .map_err(|e| SqliteGraphError::query(e.to_string()))?;
810
811        let node_ids: Vec<i64> = stmt
812            .query_map([kind], |row| row.get(0))
813            .map_err(|e| SqliteGraphError::query(e.to_string()))?
814            .collect::<Result<_, _>>()
815            .map_err(|e| SqliteGraphError::query(e.to_string()))?;
816
817        Ok(node_ids)
818    }
819
820    fn query_nodes_by_name_pattern(
821        &self,
822        snapshot_id: crate::snapshot::SnapshotId,
823        pattern: &str,
824    ) -> Result<Vec<i64>, crate::SqliteGraphError> {
825        validate_snapshot_for_sqlite(snapshot_id)?;
826        let conn = self.graph.connection();
827        let mut stmt = conn
828            .prepare_cached("SELECT id FROM graph_entities WHERE name GLOB ?1")
829            .map_err(|e| SqliteGraphError::query(e.to_string()))?;
830
831        let node_ids: Vec<i64> = stmt
832            .query_map([pattern], |row| row.get(0))
833            .map_err(|e| SqliteGraphError::query(e.to_string()))?
834            .collect::<Result<_, _>>()
835            .map_err(|e| SqliteGraphError::query(e.to_string()))?;
836
837        Ok(node_ids)
838    }
839}
840
841/// Convert KvValue to serde_json::Value for serialization
842fn kv_value_to_json(value: &crate::backend::native::types::KvValue) -> serde_json::Value {
843    use crate::backend::native::types::KvValue;
844
845    match value {
846        KvValue::Null => serde_json::json!({"type": "null"}),
847        KvValue::Bytes(bytes) => {
848            serde_json::json!({
849                "type": "bytes",
850                "data": bytes_to_string(bytes),
851            })
852        }
853        KvValue::String(s) => {
854            serde_json::json!({
855                "type": "string",
856                "data": s,
857            })
858        }
859        KvValue::Integer(n) => {
860            serde_json::json!({
861                "type": "integer",
862                "data": n,
863            })
864        }
865        KvValue::Float(f) => {
866            serde_json::json!({
867                "type": "float",
868                "data": f,
869            })
870        }
871        KvValue::Boolean(b) => {
872            serde_json::json!({
873                "type": "boolean",
874                "data": b,
875            })
876        }
877        KvValue::Json(j) => {
878            serde_json::json!({
879                "type": "json",
880                "data": j,
881            })
882        }
883    }
884}
885
/// Render a byte slice as decimal values separated by commas, for storage.
///
/// Each byte is written in base 10 with no padding and no spaces, e.g.
/// `[1, 2, 255]` becomes `"1,2,255"`. An empty slice yields an empty string.
fn bytes_to_string(bytes: &[u8]) -> String {
    let decimals: Vec<String> = bytes.iter().map(|byte| byte.to_string()).collect();
    decimals.join(",")
}
898
899/// Convert comma-separated string back to bytes
900fn string_to_bytes(s: &str) -> Result<Vec<u8>, SqliteGraphError> {
901    s.split(',')
902        .map(|part| {
903            part.trim()
904                .parse::<u8>()
905                .map_err(|_| SqliteGraphError::connection(format!("Invalid byte string: {}", s)))
906        })
907        .collect()
908}
909
910/// Convert serde_json::Value back to KvValue after deserialization
911fn json_to_kv_value(
912    json_value: serde_json::Value,
913) -> Result<crate::backend::native::types::KvValue, SqliteGraphError> {
914    use crate::backend::native::types::KvValue;
915
916    let type_str = json_value
917        .get("type")
918        .and_then(|v| v.as_str())
919        .ok_or_else(|| {
920            SqliteGraphError::connection("Missing type field in KV value JSON".to_string())
921        })?;
922
923    let data = json_value.get("data").ok_or_else(|| {
924        SqliteGraphError::connection("Missing data field in KV value JSON".to_string())
925    })?;
926
927    match type_str {
928        "bytes" => {
929            let bytes_str = data.as_str().ok_or_else(|| {
930                SqliteGraphError::connection("Invalid bytes data in KV value".to_string())
931            })?;
932            let bytes = string_to_bytes(bytes_str)?;
933            Ok(KvValue::Bytes(bytes))
934        }
935        "string" => {
936            let s = data.as_str().ok_or_else(|| {
937                SqliteGraphError::connection("Invalid string data in KV value".to_string())
938            })?;
939            Ok(KvValue::String(s.to_string()))
940        }
941        "integer" => {
942            let n = data.as_i64().ok_or_else(|| {
943                SqliteGraphError::connection("Invalid integer data in KV value".to_string())
944            })?;
945            Ok(KvValue::Integer(n))
946        }
947        "float" => {
948            let f = data.as_f64().ok_or_else(|| {
949                SqliteGraphError::connection("Invalid float data in KV value".to_string())
950            })?;
951            Ok(KvValue::Float(f))
952        }
953        "boolean" => {
954            let b = data.as_bool().ok_or_else(|| {
955                SqliteGraphError::connection("Invalid boolean data in KV value".to_string())
956            })?;
957            Ok(KvValue::Boolean(b))
958        }
959        "json" => Ok(KvValue::Json(data.clone())),
960        _ => Err(SqliteGraphError::connection(format!(
961            "Unknown KV value type: {}",
962            type_str
963        ))),
964    }
965}