1use parking_lot::RwLock;
7use rusqlite::params;
8
9use crate::{
10 SqliteGraphError,
11 backend::sqlite::types::{BackendDirection, EdgeSpec, NeighborQuery, NodeSpec},
12 backend::{PubSubEvent, SubscriptionFilter},
13 bfs::{bfs_neighbors, shortest_path},
14 graph::{GraphEdge, GraphEntity, SqliteGraph},
15 multi_hop,
16 pattern::{self, PatternMatch, PatternQuery},
17};
18
/// In-process event fan-out used for the backend's pub/sub support.
///
/// Each subscriber owns the receiving half of a `std::sync::mpsc` channel;
/// the publisher keeps the sending half together with the subscriber's filter.
struct Publisher {
    /// Registered subscribers as `(subscriber id, channel sender, filter)` triples.
    subscribers: RwLock<
        Vec<(
            u64,
            std::sync::mpsc::Sender<PubSubEvent>,
            SubscriptionFilter,
        )>,
    >,
    /// Counter from which subscriber ids are allocated.
    next_id: std::sync::atomic::AtomicU64,
}
32
33impl Publisher {
34 fn new() -> Self {
35 Self {
36 subscribers: RwLock::new(Vec::new()),
37 next_id: std::sync::atomic::AtomicU64::new(1),
38 }
39 }
40
41 fn subscribe(
42 &self,
43 filter: SubscriptionFilter,
44 ) -> (u64, std::sync::mpsc::Receiver<PubSubEvent>) {
45 let id = self
46 .next_id
47 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
48 let (tx, rx) = std::sync::mpsc::channel();
49
50 self.subscribers.write().push((id, tx, filter));
51 (id, rx)
52 }
53
54 fn unsubscribe(&self, subscriber_id: u64) -> bool {
55 let mut subs = self.subscribers.write();
56 if let Some(pos) = subs.iter().position(|(id, _, _)| *id == subscriber_id) {
57 subs.remove(pos);
58 true
59 } else {
60 false
61 }
62 }
63
64 fn emit(&self, event: PubSubEvent) {
65 let subs = self.subscribers.read();
66 for (_, sender, filter) in subs.iter() {
67 if Self::should_send(&event, filter) {
68 let _ = sender.send(event.clone());
70 }
71 }
72 }
73
74 fn should_send(event: &PubSubEvent, filter: &SubscriptionFilter) -> bool {
75 match event {
76 PubSubEvent::NodeChanged { .. } => filter.node_changes,
77 PubSubEvent::EdgeChanged { .. } => filter.edge_changes,
78 PubSubEvent::KvChanged { .. } => filter.kv_changes,
79 PubSubEvent::SnapshotCommitted { .. } => filter.snapshot_commits,
80 }
81 }
82}
83
84fn validate_snapshot_for_sqlite(
101 snapshot_id: crate::snapshot::SnapshotId,
102) -> Result<(), SqliteGraphError> {
103 if snapshot_id.as_lsn() == 0 {
104 return Ok(());
105 }
106 Err(SqliteGraphError::query(format!(
107 "SQLite backend does not support historical snapshots (requested: {}). \
108 Only SnapshotId::current() (which returns SnapshotId(0)) is supported. \
109 Historical snapshot isolation requires AS OF queries or MVCC which are not implemented.",
110 snapshot_id.as_lsn()
111 )))
112}
113
/// Graph backend that delegates storage and traversal to a [`SqliteGraph`].
pub struct SqliteGraphBackend {
    /// The underlying SQLite-backed graph.
    graph: SqliteGraph,
    /// Pub/sub publisher; starts as `None` and is created on the first `subscribe` call.
    publisher: RwLock<Option<Publisher>>,
}
123
124impl SqliteGraphBackend {
125 pub fn in_memory() -> Result<Self, SqliteGraphError> {
127 Ok(Self {
128 graph: SqliteGraph::open_in_memory()?,
129 publisher: RwLock::new(None),
130 })
131 }
132
133 pub fn from_graph(graph: SqliteGraph) -> Self {
135 Self {
136 graph,
137 publisher: RwLock::new(None),
138 }
139 }
140
141 pub fn graph(&self) -> &SqliteGraph {
143 &self.graph
144 }
145
146 pub fn create_hnsw_storage(
163 &self,
164 _index_name: impl Into<String>,
165 ) -> Option<Box<dyn crate::hnsw::storage::VectorStorage>> {
166 None
170 }
171
172 pub fn entity_ids(&self) -> Result<Vec<i64>, SqliteGraphError> {
174 self.graph.all_entity_ids()
175 }
176
177 fn ensure_kv_table(&self) -> Result<(), SqliteGraphError> {
179 let conn = self.graph.connection();
180
181 conn.execute(
182 "CREATE TABLE IF NOT EXISTS kv_store (
183 key TEXT PRIMARY KEY,
184 value_json TEXT NOT NULL,
185 ttl_seconds INTEGER,
186 version INTEGER NOT NULL DEFAULT 1,
187 created_at INTEGER NOT NULL,
188 updated_at INTEGER NOT NULL
189 )",
190 [],
191 )
192 .map_err(|e| {
193 SqliteGraphError::connection(format!("Failed to create kv_store table: {}", e))
194 })?;
195
196 Ok(())
197 }
198
199 fn query_neighbors(
201 &self,
202 node: i64,
203 direction: BackendDirection,
204 edge_type: &Option<String>,
205 ) -> Result<Vec<i64>, SqliteGraphError> {
206 match (direction, edge_type) {
207 (BackendDirection::Outgoing, None) => self.graph.fetch_outgoing(node),
208 (BackendDirection::Incoming, None) => self.graph.fetch_incoming(node),
209 (BackendDirection::Outgoing, Some(edge_type)) => {
210 let conn = self.graph.connection();
211 let mut stmt = conn
212 .prepare_cached(
213 "SELECT to_id FROM graph_edges WHERE from_id=?1 AND edge_type=?2 ORDER BY to_id, id",
214 )
215 .map_err(|e| SqliteGraphError::query(e.to_string()))?;
216 let rows = stmt
217 .query_map(params![node, edge_type], |row| row.get(0))
218 .map_err(|e| SqliteGraphError::query(e.to_string()))?;
219 let mut values = Vec::new();
220 for value in rows {
221 values.push(value.map_err(|e| SqliteGraphError::query(e.to_string()))?);
222 }
223 Ok(values)
224 }
225 (BackendDirection::Incoming, Some(edge_type)) => {
226 let conn = self.graph.connection();
227 let mut stmt = conn
228 .prepare_cached(
229 "SELECT from_id FROM graph_edges WHERE to_id=?1 AND edge_type=?2 ORDER BY from_id, id",
230 )
231 .map_err(|e| SqliteGraphError::query(e.to_string()))?;
232 let rows = stmt
233 .query_map(params![node, edge_type], |row| row.get(0))
234 .map_err(|e| SqliteGraphError::query(e.to_string()))?;
235 let mut values = Vec::new();
236 for value in rows {
237 values.push(value.map_err(|e| SqliteGraphError::query(e.to_string()))?);
238 }
239 Ok(values)
240 }
241 }
242 }
243}
244
245impl crate::backend::GraphBackend for SqliteGraphBackend {
246 fn insert_node(&self, node: NodeSpec) -> Result<i64, SqliteGraphError> {
247 let id = self.graph.insert_entity(&GraphEntity {
248 id: 0,
249 kind: node.kind,
250 name: node.name,
251 file_path: node.file_path,
252 data: node.data,
253 })?;
254
255 let pub_guard = self.publisher.read();
257 if let Some(ref publisher) = *pub_guard {
258 publisher.emit(PubSubEvent::NodeChanged {
259 node_id: id,
260 snapshot_id: 0, });
262 }
263
264 Ok(id)
265 }
266
267 fn get_node(
268 &self,
269 snapshot_id: crate::snapshot::SnapshotId,
270 id: i64,
271 ) -> Result<GraphEntity, SqliteGraphError> {
272 validate_snapshot_for_sqlite(snapshot_id)?;
273 self.graph.get_entity(id)
274 }
275
276 fn insert_edge(&self, edge: EdgeSpec) -> Result<i64, SqliteGraphError> {
277 let id = self.graph.insert_edge(&GraphEdge {
278 id: 0,
279 from_id: edge.from,
280 to_id: edge.to,
281 edge_type: edge.edge_type,
282 data: edge.data,
283 })?;
284
285 let pub_guard = self.publisher.read();
287 if let Some(ref publisher) = *pub_guard {
288 publisher.emit(PubSubEvent::EdgeChanged {
289 from_node: edge.from,
290 to_node: edge.to,
291 edge_id: id,
292 snapshot_id: 0, });
294 }
295
296 Ok(id)
297 }
298
    /// Deletes the entity with the given id by delegating to the graph.
    ///
    /// NOTE(review): unlike `insert_node`, no pub/sub event is emitted here —
    /// confirm whether that is intentional.
    fn delete_entity(&self, id: i64) -> Result<(), SqliteGraphError> {
        self.graph.delete_entity(id)
    }
302
303 fn update_node(&self, node_id: i64, node: NodeSpec) -> Result<i64, SqliteGraphError> {
304 self.graph.update_entity(&GraphEntity {
306 id: node_id,
307 kind: node.kind,
308 name: node.name,
309 file_path: node.file_path,
310 data: node.data,
311 })?;
312 Ok(node_id)
313 }
314
    /// Returns the ids reported by the graph's `all_entity_ids`.
    fn entity_ids(&self) -> Result<Vec<i64>, SqliteGraphError> {
        self.graph.all_entity_ids()
    }
318
319 fn neighbors(
320 &self,
321 snapshot_id: crate::snapshot::SnapshotId,
322 node: i64,
323 query: NeighborQuery,
324 ) -> Result<Vec<i64>, SqliteGraphError> {
325 validate_snapshot_for_sqlite(snapshot_id)?;
326 self.query_neighbors(node, query.direction, &query.edge_type)
327 }
328
329 fn bfs(
330 &self,
331 snapshot_id: crate::snapshot::SnapshotId,
332 start: i64,
333 depth: u32,
334 ) -> Result<Vec<i64>, SqliteGraphError> {
335 validate_snapshot_for_sqlite(snapshot_id)?;
336 if let Some(cached_result) = self.graph.query_cache.get_bfs(start, depth) {
338 return Ok(cached_result);
339 }
340
341 let result = bfs_neighbors(&self.graph, start, depth)?;
343 self.graph.query_cache.put_bfs(start, depth, result.clone());
344 Ok(result)
345 }
346
347 fn shortest_path(
348 &self,
349 snapshot_id: crate::snapshot::SnapshotId,
350 start: i64,
351 end: i64,
352 ) -> Result<Option<Vec<i64>>, SqliteGraphError> {
353 validate_snapshot_for_sqlite(snapshot_id)?;
354 if let Some(cached_result) = self.graph.query_cache.get_shortest_path(start, end) {
356 return Ok(cached_result);
357 }
358
359 let result = shortest_path(&self.graph, start, end)?;
361 self.graph
362 .query_cache
363 .put_shortest_path(start, end, result.clone());
364 Ok(result)
365 }
366
367 fn node_degree(
368 &self,
369 snapshot_id: crate::snapshot::SnapshotId,
370 node: i64,
371 ) -> Result<(usize, usize), SqliteGraphError> {
372 validate_snapshot_for_sqlite(snapshot_id)?;
373 let out = self.graph.fetch_outgoing(node)?.len();
374 let incoming = self.graph.fetch_incoming(node)?.len();
375 Ok((out, incoming))
376 }
377
378 fn k_hop(
379 &self,
380 snapshot_id: crate::snapshot::SnapshotId,
381 start: i64,
382 depth: u32,
383 direction: BackendDirection,
384 ) -> Result<Vec<i64>, SqliteGraphError> {
385 validate_snapshot_for_sqlite(snapshot_id)?;
386 if let Some(cached_result) = self.graph.query_cache.get_k_hop(start, depth, direction) {
388 return Ok(cached_result);
389 }
390
391 let result = multi_hop::k_hop(&self.graph, start, depth, direction)?;
393 self.graph
394 .query_cache
395 .put_k_hop(start, depth, direction, result.clone());
396 Ok(result)
397 }
398
399 fn k_hop_filtered(
400 &self,
401 snapshot_id: crate::snapshot::SnapshotId,
402 start: i64,
403 depth: u32,
404 direction: BackendDirection,
405 allowed_edge_types: &[&str],
406 ) -> Result<Vec<i64>, SqliteGraphError> {
407 validate_snapshot_for_sqlite(snapshot_id)?;
408 if let Some(cached_result) =
410 self.graph
411 .query_cache
412 .get_k_hop_filtered(start, depth, direction, allowed_edge_types)
413 {
414 return Ok(cached_result);
415 }
416
417 let result =
419 multi_hop::k_hop_filtered(&self.graph, start, depth, direction, allowed_edge_types)?;
420 self.graph.query_cache.put_k_hop_filtered(
421 start,
422 depth,
423 direction,
424 allowed_edge_types,
425 result.clone(),
426 );
427 Ok(result)
428 }
429
430 fn chain_query(
431 &self,
432 snapshot_id: crate::snapshot::SnapshotId,
433 start: i64,
434 chain: &[crate::multi_hop::ChainStep],
435 ) -> Result<Vec<i64>, SqliteGraphError> {
436 validate_snapshot_for_sqlite(snapshot_id)?;
437 multi_hop::chain_query(&self.graph, start, chain)
438 }
439
440 fn pattern_search(
441 &self,
442 snapshot_id: crate::snapshot::SnapshotId,
443 start: i64,
444 pattern: &PatternQuery,
445 ) -> Result<Vec<PatternMatch>, SqliteGraphError> {
446 validate_snapshot_for_sqlite(snapshot_id)?;
447 pattern::execute_pattern(&self.graph, start, pattern)
448 }
449
450 fn checkpoint(&self) -> Result<(), SqliteGraphError> {
451 let conn = self.graph.connection();
453 conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
454 let _busy: i32 = row.get(0)?;
457 let _log: i32 = row.get(1)?;
458 let _checkpointed: i32 = row.get(2)?;
459 Ok(())
460 })
461 .map_err(|e| SqliteGraphError::connection(format!("WAL checkpoint failed: {}", e)))?;
462 Ok(())
463 }
464
    /// No-op for this backend: always returns `Ok(())`.
    fn flush(&self) -> Result<(), SqliteGraphError> {
        Ok(())
    }
469
470 fn backup(
471 &self,
472 backup_dir: &std::path::Path,
473 ) -> Result<crate::backend::BackupResult, SqliteGraphError> {
474 use std::fs;
475
476 fs::create_dir_all(backup_dir).map_err(|e| {
478 SqliteGraphError::connection(format!("Failed to create backup directory: {}", e))
479 })?;
480
481 let timestamp = std::time::SystemTime::now()
483 .duration_since(std::time::UNIX_EPOCH)
484 .unwrap_or_default()
485 .as_secs();
486 let backup_path = backup_dir.join(format!("backup_{}.db", timestamp));
487 let manifest_path = backup_dir.join(format!("backup_{}.json", timestamp));
488
489 let conn = self.graph.connection();
491 conn.execute(&format!("VACUUM INTO '{}'", backup_path.display()), [])
492 .map_err(|e| SqliteGraphError::connection(format!("SQLite backup failed: {}", e)))?;
493
494 let metadata = fs::metadata(&backup_path).map_err(|e| {
496 SqliteGraphError::connection(format!("Failed to read backup metadata: {}", e))
497 })?;
498
499 let entity_ids = self
501 .graph
502 .all_entity_ids()
503 .map_err(|e| SqliteGraphError::query(format!("Failed to get entity count: {}", e)))?;
504
505 let manifest = serde_json::json!({
507 "timestamp": timestamp,
508 "backup_file": backup_path.display().to_string(),
509 "size_bytes": metadata.len(),
510 "entity_count": entity_ids.len(),
511 });
512 fs::write(&manifest_path, manifest.to_string()).map_err(|e| {
513 SqliteGraphError::connection(format!("Failed to write manifest: {}", e))
514 })?;
515
516 Ok(crate::backend::BackupResult {
517 snapshot_path: backup_path,
518 manifest_path,
519 size_bytes: metadata.len() as u64,
520 checksum: 0, record_count: entity_ids.len() as u64,
522 duration_secs: 0.0, timestamp,
524 checkpoint_performed: false, })
526 }
527
528 fn snapshot_export(
529 &self,
530 export_dir: &std::path::Path,
531 ) -> Result<crate::backend::SnapshotMetadata, SqliteGraphError> {
532 use std::fs;
533
534 fs::create_dir_all(export_dir).map_err(|e| {
536 SqliteGraphError::connection(format!("Failed to create export directory: {}", e))
537 })?;
538
539 let snapshot_file = export_dir.join("snapshot.json");
540
541 crate::recovery::dump_graph_to_path(&self.graph, &snapshot_file)?;
543
544 let metadata = fs::metadata(&snapshot_file).map_err(|e| {
546 SqliteGraphError::connection(format!("Failed to read snapshot metadata: {}", e))
547 })?;
548
549 let entity_ids = self
550 .graph
551 .all_entity_ids()
552 .map_err(|e| SqliteGraphError::query(format!("Failed to get entity count: {}", e)))?;
553
554 Ok(crate::backend::SnapshotMetadata {
555 snapshot_path: snapshot_file,
556 size_bytes: metadata.len(),
557 entity_count: entity_ids.len() as u64,
558 edge_count: 0, })
560 }
561
562 fn snapshot_import(
563 &self,
564 import_dir: &std::path::Path,
565 ) -> Result<crate::backend::ImportMetadata, SqliteGraphError> {
566 let snapshot_file = import_dir.join("snapshot.json");
567
568 if !snapshot_file.exists() {
569 return Err(SqliteGraphError::connection(format!(
570 "Snapshot file not found: {}",
571 snapshot_file.display()
572 )));
573 }
574
575 let before_count = self
577 .graph
578 .all_entity_ids()
579 .map_err(|e| SqliteGraphError::query(format!("Failed to get entity count: {}", e)))?
580 .len();
581
582 crate::recovery::load_graph_from_path(&self.graph, &snapshot_file)?;
584
585 let after_count = self
587 .graph
588 .all_entity_ids()
589 .map_err(|e| SqliteGraphError::query(format!("Failed to get entity count: {}", e)))?
590 .len();
591
592 Ok(crate::backend::ImportMetadata {
593 snapshot_path: snapshot_file,
594 entities_imported: (after_count - before_count) as u64,
595 edges_imported: 0, })
597 }
598
599 fn kv_get(
600 &self,
601 snapshot_id: crate::snapshot::SnapshotId,
602 key: &[u8],
603 ) -> Result<Option<crate::backend::native::types::KvValue>, crate::SqliteGraphError> {
604 validate_snapshot_for_sqlite(snapshot_id)?;
605 use std::time::SystemTime;
606
607 self.ensure_kv_table()?;
609
610 let key_str = bytes_to_string(key);
612
613 let conn = self.graph.connection();
614
615 let result = conn.query_row(
617 "SELECT value_json, ttl_seconds, created_at FROM kv_store WHERE key = ?1",
618 params![key_str],
619 |row| {
620 let value_json: String = row.get(0)?;
621 let ttl_seconds: Option<u64> = row.get(1)?;
622 let created_at: u64 = row.get(2)?;
623
624 Ok((value_json, ttl_seconds, created_at))
625 },
626 );
627
628 match result {
629 Ok((value_json, ttl_seconds, created_at)) => {
630 if let Some(ttl) = ttl_seconds {
632 let now = SystemTime::now()
633 .duration_since(SystemTime::UNIX_EPOCH)
634 .map(|d| d.as_secs())
635 .unwrap_or(0);
636
637 if now.saturating_sub(created_at) > ttl {
638 return Ok(None);
640 }
641 }
642
643 let json_value: serde_json::Value =
645 serde_json::from_str(&value_json).map_err(|e| {
646 SqliteGraphError::connection(format!(
647 "Failed to parse KV value JSON: {}",
648 e
649 ))
650 })?;
651
652 let kv_value = json_to_kv_value(json_value)?;
653 Ok(Some(kv_value))
654 }
655 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
656 Err(e) => Err(SqliteGraphError::query(format!(
657 "Failed to query KV store: {}",
658 e
659 ))),
660 }
661 }
662
663 fn kv_set(
664 &self,
665 key: Vec<u8>,
666 value: crate::backend::native::types::KvValue,
667 ttl_seconds: Option<u64>,
668 ) -> Result<(), crate::SqliteGraphError> {
669 use std::time::SystemTime;
670
671 self.ensure_kv_table()?;
673
674 let key_str = bytes_to_string(&key);
676
677 let json_value = kv_value_to_json(&value);
679 let value_json = serde_json::to_string(&json_value).map_err(|e| {
680 SqliteGraphError::connection(format!("Failed to serialize KV value: {}", e))
681 })?;
682
683 let now = SystemTime::now()
684 .duration_since(SystemTime::UNIX_EPOCH)
685 .map(|d| d.as_secs())
686 .unwrap_or(0);
687
688 let conn = self.graph.connection();
689
690 let count: i64 = conn
692 .query_row(
693 "SELECT COUNT(*) FROM kv_store WHERE key = ?1",
694 params![key_str],
695 |row| row.get(0),
696 )
697 .unwrap_or(0);
698
699 if count > 0 {
700 conn.execute(
702 "UPDATE kv_store SET value_json = ?1, ttl_seconds = ?2, updated_at = ?3, version = version + 1 WHERE key = ?4",
703 params![value_json, ttl_seconds, now, key_str],
704 )
705 .map_err(|e| SqliteGraphError::query(format!("Failed to update KV entry: {}", e)))?;
706 } else {
707 conn.execute(
709 "INSERT INTO kv_store (key, value_json, ttl_seconds, created_at, updated_at, version) VALUES (?1, ?2, ?3, ?4, ?4, 1)",
710 params![key_str, value_json, ttl_seconds, now],
711 )
712 .map_err(|e| SqliteGraphError::query(format!("Failed to insert KV entry: {}", e)))?;
713 }
714
715 Ok(())
716 }
717
718 fn kv_delete(&self, key: &[u8]) -> Result<(), crate::SqliteGraphError> {
719 self.ensure_kv_table()?;
721
722 let key_str = bytes_to_string(key);
724
725 let conn = self.graph.connection();
726
727 conn.execute("DELETE FROM kv_store WHERE key = ?1", params![key_str])
729 .map_err(|e| SqliteGraphError::query(format!("Failed to delete KV entry: {}", e)))?;
730
731 Ok(())
732 }
733
734 fn subscribe(
735 &self,
736 filter: crate::backend::SubscriptionFilter,
737 ) -> Result<
738 (u64, std::sync::mpsc::Receiver<crate::backend::PubSubEvent>),
739 crate::SqliteGraphError,
740 > {
741 let mut pub_guard = self.publisher.write();
743 if pub_guard.is_none() {
744 *pub_guard = Some(Publisher::new());
745 }
746 let (id, rx) = pub_guard.as_ref().unwrap().subscribe(filter);
747 Ok((id, rx))
748 }
749
750 fn unsubscribe(&self, subscriber_id: u64) -> Result<bool, crate::SqliteGraphError> {
751 let pub_guard = self.publisher.read();
752 if let Some(ref publisher) = *pub_guard {
753 Ok(publisher.unsubscribe(subscriber_id))
754 } else {
755 Ok(false) }
757 }
758
759 fn kv_prefix_scan(
762 &self,
763 snapshot_id: crate::snapshot::SnapshotId,
764 prefix: &[u8],
765 ) -> Result<Vec<(Vec<u8>, crate::backend::native::types::KvValue)>, crate::SqliteGraphError>
766 {
767 validate_snapshot_for_sqlite(snapshot_id)?;
768 self.ensure_kv_table()?;
769 let conn = self.graph.connection();
770
771 let prefix_str = String::from_utf8_lossy(prefix);
774 let pattern = prefix_str.replace('%', "\\%").replace('_', "\\_") + "%";
775
776 let mut stmt = conn
777 .prepare_cached("SELECT key, value_json FROM kv_store WHERE key LIKE ?1 ESCAPE '\\'")
778 .map_err(|e| SqliteGraphError::query(e.to_string()))?;
779
780 let mut results = Vec::new();
781 let query_result = stmt.query_map([&pattern], |row| {
782 let key: String = row.get(0)?;
783 let value_json: String = row.get(1)?;
784 Ok((key, value_json))
785 });
786
787 for row in query_result.map_err(|e| SqliteGraphError::query(e.to_string()))? {
788 let (key, value_json) = row.map_err(|e| SqliteGraphError::query(e.to_string()))?;
789 let json_value: serde_json::Value = serde_json::from_str(&value_json)
790 .map_err(|e| SqliteGraphError::query(format!("Failed to parse JSON: {}", e)))?;
791 let kv_value = json_to_kv_value(json_value)?;
792 results.push((key.into_bytes(), kv_value));
793 }
794
795 results.sort_by(|a, b| a.0.cmp(&b.0));
797 Ok(results)
798 }
799
800 fn query_nodes_by_kind(
801 &self,
802 snapshot_id: crate::snapshot::SnapshotId,
803 kind: &str,
804 ) -> Result<Vec<i64>, crate::SqliteGraphError> {
805 validate_snapshot_for_sqlite(snapshot_id)?;
806 let conn = self.graph.connection();
807 let mut stmt = conn
808 .prepare_cached("SELECT id FROM graph_entities WHERE kind = ?1")
809 .map_err(|e| SqliteGraphError::query(e.to_string()))?;
810
811 let node_ids: Vec<i64> = stmt
812 .query_map([kind], |row| row.get(0))
813 .map_err(|e| SqliteGraphError::query(e.to_string()))?
814 .collect::<Result<_, _>>()
815 .map_err(|e| SqliteGraphError::query(e.to_string()))?;
816
817 Ok(node_ids)
818 }
819
820 fn query_nodes_by_name_pattern(
821 &self,
822 snapshot_id: crate::snapshot::SnapshotId,
823 pattern: &str,
824 ) -> Result<Vec<i64>, crate::SqliteGraphError> {
825 validate_snapshot_for_sqlite(snapshot_id)?;
826 let conn = self.graph.connection();
827 let mut stmt = conn
828 .prepare_cached("SELECT id FROM graph_entities WHERE name GLOB ?1")
829 .map_err(|e| SqliteGraphError::query(e.to_string()))?;
830
831 let node_ids: Vec<i64> = stmt
832 .query_map([pattern], |row| row.get(0))
833 .map_err(|e| SqliteGraphError::query(e.to_string()))?
834 .collect::<Result<_, _>>()
835 .map_err(|e| SqliteGraphError::query(e.to_string()))?;
836
837 Ok(node_ids)
838 }
839}
840
841fn kv_value_to_json(value: &crate::backend::native::types::KvValue) -> serde_json::Value {
843 use crate::backend::native::types::KvValue;
844
845 match value {
846 KvValue::Null => serde_json::json!({"type": "null"}),
847 KvValue::Bytes(bytes) => {
848 serde_json::json!({
849 "type": "bytes",
850 "data": bytes_to_string(bytes),
851 })
852 }
853 KvValue::String(s) => {
854 serde_json::json!({
855 "type": "string",
856 "data": s,
857 })
858 }
859 KvValue::Integer(n) => {
860 serde_json::json!({
861 "type": "integer",
862 "data": n,
863 })
864 }
865 KvValue::Float(f) => {
866 serde_json::json!({
867 "type": "float",
868 "data": f,
869 })
870 }
871 KvValue::Boolean(b) => {
872 serde_json::json!({
873 "type": "boolean",
874 "data": b,
875 })
876 }
877 KvValue::Json(j) => {
878 serde_json::json!({
879 "type": "json",
880 "data": j,
881 })
882 }
883 }
884}
885
/// Renders bytes as comma-separated decimal numbers, e.g. `[104, 105]` becomes `"104,105"`.
///
/// An empty slice yields the empty string.
fn bytes_to_string(bytes: &[u8]) -> String {
    let decimals: Vec<String> = bytes.iter().map(|byte| byte.to_string()).collect();
    decimals.join(",")
}
898
899fn string_to_bytes(s: &str) -> Result<Vec<u8>, SqliteGraphError> {
901 s.split(',')
902 .map(|part| {
903 part.trim()
904 .parse::<u8>()
905 .map_err(|_| SqliteGraphError::connection(format!("Invalid byte string: {}", s)))
906 })
907 .collect()
908}
909
910fn json_to_kv_value(
912 json_value: serde_json::Value,
913) -> Result<crate::backend::native::types::KvValue, SqliteGraphError> {
914 use crate::backend::native::types::KvValue;
915
916 let type_str = json_value
917 .get("type")
918 .and_then(|v| v.as_str())
919 .ok_or_else(|| {
920 SqliteGraphError::connection("Missing type field in KV value JSON".to_string())
921 })?;
922
923 let data = json_value.get("data").ok_or_else(|| {
924 SqliteGraphError::connection("Missing data field in KV value JSON".to_string())
925 })?;
926
927 match type_str {
928 "bytes" => {
929 let bytes_str = data.as_str().ok_or_else(|| {
930 SqliteGraphError::connection("Invalid bytes data in KV value".to_string())
931 })?;
932 let bytes = string_to_bytes(bytes_str)?;
933 Ok(KvValue::Bytes(bytes))
934 }
935 "string" => {
936 let s = data.as_str().ok_or_else(|| {
937 SqliteGraphError::connection("Invalid string data in KV value".to_string())
938 })?;
939 Ok(KvValue::String(s.to_string()))
940 }
941 "integer" => {
942 let n = data.as_i64().ok_or_else(|| {
943 SqliteGraphError::connection("Invalid integer data in KV value".to_string())
944 })?;
945 Ok(KvValue::Integer(n))
946 }
947 "float" => {
948 let f = data.as_f64().ok_or_else(|| {
949 SqliteGraphError::connection("Invalid float data in KV value".to_string())
950 })?;
951 Ok(KvValue::Float(f))
952 }
953 "boolean" => {
954 let b = data.as_bool().ok_or_else(|| {
955 SqliteGraphError::connection("Invalid boolean data in KV value".to_string())
956 })?;
957 Ok(KvValue::Boolean(b))
958 }
959 "json" => Ok(KvValue::Json(data.clone())),
960 _ => Err(SqliteGraphError::connection(format!(
961 "Unknown KV value type: {}",
962 type_str
963 ))),
964 }
965}