1use super::adjacency::{AdjacencyHelpers, Direction};
4use super::edge_store::EdgeStore;
5use super::graph_file::GraphFile;
6use super::graph_ops::*;
7use super::graph_validation::*;
8use super::node_store::NodeStore;
9use super::types::*;
10use crate::SqliteGraphError;
11use crate::backend::{
12 BackendDirection, ChainStep, EdgeSpec, GraphBackend, NeighborQuery, NodeSpec, PatternMatch,
13 PatternQuery,
14};
15use crate::graph::GraphEntity;
16use parking_lot::RwLock;
17use std::sync::Arc;
18
19#[cfg(feature = "native-v2")]
21use crate::backend::native::v2::wal::{
22 GraphWALIntegrationConfig, V2GraphWALIntegrator, V2WALConfig,
23};
24
25#[cfg(feature = "native-v2")]
26use crate::backend::native::v2::kv_store::store::KvStore;
27
28#[cfg(feature = "native-v2")]
29use crate::backend::native::v2::kv_store::types::KvValue;
30
31pub struct NativeGraphBackend {
33 graph_file: RwLock<GraphFile>,
34 #[cfg(feature = "native-v2")]
37 wal_integrator: Option<Arc<V2GraphWALIntegrator>>,
38 #[cfg(feature = "native-v2")]
40 kv_store: Arc<RwLock<KvStore>>,
41}
42
43impl NativeGraphBackend {
44 #[cfg(test)]
46 pub fn new_temp() -> Result<Self, SqliteGraphError> {
47 use tempfile::NamedTempFile;
48 let temp_file =
49 NamedTempFile::new().map_err(|e| SqliteGraphError::connection(e.to_string()))?;
50 let path = temp_file.path();
51 let graph_file = GraphFile::create(path).map_err(map_to_graph_error)?;
52
53 #[cfg(feature = "native-v2")]
54 let wal_integrator = Some(Self::create_wal_integrator(path)?);
55
56 Ok(Self {
57 graph_file: RwLock::new(graph_file),
58 #[cfg(feature = "native-v2")]
59 wal_integrator,
60 #[cfg(feature = "native-v2")]
61 kv_store: Arc::new(RwLock::new(KvStore::new())),
62 })
63 }
64
65 pub fn new<P: AsRef<std::path::Path>>(path: P) -> Result<Self, SqliteGraphError> {
67 let graph_file = GraphFile::create(&path).map_err(map_to_graph_error)?;
68
69 #[cfg(feature = "native-v2")]
70 let wal_integrator = Some(Self::create_wal_integrator(&path)?);
71
72 Ok(Self {
73 graph_file: RwLock::new(graph_file),
74 #[cfg(feature = "native-v2")]
75 wal_integrator,
76 #[cfg(feature = "native-v2")]
77 kv_store: Arc::new(RwLock::new(KvStore::new())),
78 })
79 }
80
81 pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self, SqliteGraphError> {
83 let graph_file = GraphFile::open(&path).map_err(map_to_graph_error)?;
84
85 #[cfg(feature = "native-v2")]
86 let wal_integrator = Some(Self::open_wal_integrator(&path)?);
87
88 #[cfg(feature = "native-v2")]
89 let kv_store = {
90 let wal_config = V2WALConfig::for_graph_file(path.as_ref());
91 match crate::backend::native::v2::kv_store::recover_kv_from_wal(&wal_config.wal_path) {
92 Ok(store) => Arc::new(RwLock::new(store)),
93 Err(e) => {
94 eprintln!(
96 "Warning: KV recovery from {} failed, starting with empty store: {}",
97 wal_config.wal_path.display(),
98 e
99 );
100 Arc::new(RwLock::new(KvStore::new()))
101 }
102 }
103 };
104
105 Ok(Self {
106 graph_file: RwLock::new(graph_file),
107 #[cfg(feature = "native-v2")]
108 wal_integrator,
109 #[cfg(feature = "native-v2")]
110 kv_store,
111 })
112 }
113
114 #[cfg(feature = "native-v2")]
116 fn open_wal_integrator<P: AsRef<std::path::Path>>(
117 path: P,
118 ) -> Result<Arc<V2GraphWALIntegrator>, SqliteGraphError> {
119 let path_ref = path.as_ref();
120
121 let wal_config = V2WALConfig::for_graph_file(path_ref);
123
124 let integration_config = GraphWALIntegrationConfig::default();
126
127 let integrator =
129 V2GraphWALIntegrator::open(wal_config, integration_config).map_err(|e| {
130 SqliteGraphError::connection(format!("Failed to open WAL integrator: {:?}", e))
131 })?;
132
133 Ok(Arc::new(integrator))
134 }
135
136 #[cfg(feature = "native-v2")]
138 fn create_wal_integrator<P: AsRef<std::path::Path>>(
139 path: P,
140 ) -> Result<Arc<V2GraphWALIntegrator>, SqliteGraphError> {
141 let path_ref = path.as_ref();
142
143 let wal_config = V2WALConfig::for_graph_file(path_ref);
145
146 let integration_config = GraphWALIntegrationConfig::default();
148
149 let integrator =
151 V2GraphWALIntegrator::create(wal_config, integration_config).map_err(|e| {
152 SqliteGraphError::connection(format!("Failed to create WAL integrator: {:?}", e))
153 })?;
154
155 Ok(Arc::new(integrator))
156 }
157
158 fn with_graph_file<R, F>(&self, f: F) -> Result<R, SqliteGraphError>
160 where
161 F: FnOnce(&mut GraphFile) -> Result<R, NativeBackendError>,
162 {
163 let mut graph_file = self.graph_file.write();
164 f(&mut *graph_file).map_err(map_to_graph_error)
165 }
166
167 #[cfg(feature = "native-v2")]
169 pub fn get_wal_metrics(&self) -> Option<crate::backend::native::v2::wal::WALManagerMetrics> {
170 self.wal_integrator
171 .as_ref()
172 .map(|integrator| integrator.get_metrics())
173 }
174
175 #[cfg(feature = "native-v2")]
177 pub fn get_active_transaction_count(&self) -> Option<usize> {
178 self.wal_integrator
179 .as_ref()
180 .map(|integrator| integrator.get_active_transaction_count())
181 }
182}
183
/// Soft-shuts the WAL integrator down when the backend is dropped.
///
/// A failed shutdown is only reported on stderr: `drop` cannot return an error.
#[cfg(feature = "native-v2")]
impl Drop for NativeGraphBackend {
    fn drop(&mut self) {
        let shutdown = self
            .wal_integrator
            .as_ref()
            .map(|integrator| integrator.soft_shutdown());

        if let Some(Err(e)) = shutdown {
            eprintln!(
                "Warning: Failed to soft shutdown WAL integrator: {:?}",
                e
            );
        }
    }
}
202
203impl GraphBackend for NativeGraphBackend {
204 fn insert_node(&self, node: NodeSpec) -> Result<i64, SqliteGraphError> {
205 self.with_graph_file(|graph_file| {
206 let mut node_store = NodeStore::new(graph_file);
207 let node_id = node_store.allocate_node_id()?;
208
209 let record_v2 = crate::backend::native::v2::node_record_v2::NodeRecordV2::new(
211 node_id, node.kind, node.name, node.data,
212 );
213 node_store.write_node_v2(&record_v2)?;
214
215 Ok(node_id as i64)
216 })
217 }
218
219 fn get_node(
220 &self,
221 snapshot_id: crate::snapshot::SnapshotId,
222 id: i64,
223 ) -> Result<GraphEntity, SqliteGraphError> {
224 self.with_graph_file(|graph_file| {
225 let _snapshot_id = snapshot_id; let mut node_store = NodeStore::new(graph_file);
228 let record = node_store.read_node(id as NativeNodeId)?;
229 Ok(node_record_to_entity(record))
230 })
231 }
232
233 fn insert_edge(&self, edge: EdgeSpec) -> Result<i64, SqliteGraphError> {
234 self.with_graph_file(|graph_file| {
235 #[cfg(feature = "v2_experimental")]
238 {
239 let mut edge_store = EdgeStore::new(graph_file);
240 let edge_id = edge_store.allocate_edge_id();
241 let record = edge_spec_to_record(edge, edge_id);
242
243 edge_store.write_edge(&record)?;
245 Ok(edge_id as i64)
246 }
247 #[cfg(not(feature = "v2_experimental"))]
248 {
249 let mut edge_store = EdgeStore::new(graph_file);
250 let edge_id = edge_store.allocate_edge_id();
251 let record = edge_spec_to_record(edge, edge_id);
252 edge_store.write_edge(&record)?;
253 Ok(edge_id as i64)
254 }
255 })
256 }
257
258 fn update_node(&self, node_id: i64, node: NodeSpec) -> Result<i64, SqliteGraphError> {
259 #[cfg(feature = "native-v2")]
260 {
261 if let Some(ref integrator) = self.wal_integrator {
262 let old_record = self.with_graph_file(|graph_file| {
264 let mut node_store = NodeStore::new(graph_file);
265 node_store.read_node_v2(node_id as NativeNodeId)
266 })?;
267
268 let new_record =
270 node_spec_to_v2_record(node, node_id as NativeNodeId, &old_record)?;
271
272 let _result = integrator
274 .update_node(None, node_id, &old_record, &new_record)
275 .map_err(|e| {
276 SqliteGraphError::connection(format!("Node update failed: {:?}", e))
277 })?;
278
279 return Ok(node_id);
280 }
281 }
282
283 Err(SqliteGraphError::connection(
285 "update_node not supported for this backend configuration".to_string(),
286 ))
287 }
288
289 fn delete_entity(&self, id: i64) -> Result<(), SqliteGraphError> {
290 self.with_graph_file(|graph_file| {
291 let node_id = id as NativeNodeId;
292 let mut node_store = NodeStore::new(graph_file);
293 node_store.delete_node(node_id)?;
294 Ok(())
295 })
296 }
297
298 fn entity_ids(&self) -> Result<Vec<i64>, SqliteGraphError> {
299 self.with_graph_file(|graph_file| {
300 let mut node_store = NodeStore::new(graph_file);
301 let ids = node_store.all_node_ids()?;
302 Ok(ids.into_iter().map(|id| id as i64).collect())
303 })
304 }
305
306 fn neighbors(
307 &self,
308 snapshot_id: crate::snapshot::SnapshotId,
309 node: i64,
310 query: NeighborQuery,
311 ) -> Result<Vec<i64>, SqliteGraphError> {
312 self.with_graph_file(|graph_file| {
313 let node_id = node as NativeNodeId;
314
315 let neighbors = if let Some(edge_type) = &query.edge_type {
317 let edge_type_ref = edge_type.as_str();
318 match query.direction {
319 BackendDirection::Outgoing => {
320 AdjacencyHelpers::get_outgoing_neighbors_filtered(
321 graph_file,
322 node_id,
323 &[edge_type_ref],
324 )
325 }
326 BackendDirection::Incoming => {
327 AdjacencyHelpers::get_incoming_neighbors_filtered(
328 graph_file,
329 node_id,
330 &[edge_type_ref],
331 )
332 }
333 }
334 } else {
335 match query.direction {
338 BackendDirection::Outgoing => {
339 AdjacencyHelpers::get_outgoing_neighbors_at_snapshot(
340 graph_file,
341 node_id,
342 snapshot_id,
343 None, )
345 }
346 BackendDirection::Incoming => {
347 AdjacencyHelpers::get_incoming_neighbors_at_snapshot(
348 graph_file,
349 node_id,
350 snapshot_id,
351 None, )
353 }
354 }
355 }?;
356
357 Ok(neighbors.into_iter().map(|id| id as i64).collect())
358 })
359 }
360
361 fn bfs(
362 &self,
363 snapshot_id: crate::snapshot::SnapshotId,
364 start: i64,
365 depth: u32,
366 ) -> Result<Vec<i64>, SqliteGraphError> {
367 self.with_graph_file(|graph_file| {
368 let _snapshot_id = snapshot_id; let result = native_bfs(graph_file, start as NativeNodeId, depth)?;
371 Ok(result.into_iter().map(|id| id as i64).collect())
372 })
373 }
374
375 fn shortest_path(
376 &self,
377 snapshot_id: crate::snapshot::SnapshotId,
378 start: i64,
379 end: i64,
380 ) -> Result<Option<Vec<i64>>, SqliteGraphError> {
381 self.with_graph_file(|graph_file| {
382 let _snapshot_id = snapshot_id; let result =
385 native_shortest_path(graph_file, start as NativeNodeId, end as NativeNodeId)?;
386 Ok(result.map(|path| path.into_iter().map(|id| id as i64).collect()))
387 })
388 }
389
390 fn node_degree(
391 &self,
392 snapshot_id: crate::snapshot::SnapshotId,
393 node: i64,
394 ) -> Result<(usize, usize), SqliteGraphError> {
395 self.with_graph_file(|graph_file| {
396 let _snapshot_id = snapshot_id; let node_id = node as NativeNodeId;
399 let outgoing = AdjacencyHelpers::outgoing_degree(graph_file, node_id)?;
400 let incoming = AdjacencyHelpers::incoming_degree(graph_file, node_id)?;
401 Ok((outgoing as usize, incoming as usize))
402 })
403 }
404
405 fn k_hop(
406 &self,
407 snapshot_id: crate::snapshot::SnapshotId,
408 start: i64,
409 depth: u32,
410 direction: BackendDirection,
411 ) -> Result<Vec<i64>, SqliteGraphError> {
412 self.with_graph_file(|graph_file| {
413 let _snapshot_id = snapshot_id; let result = native_k_hop(
416 graph_file,
417 start as NativeNodeId,
418 depth,
419 match direction {
420 BackendDirection::Outgoing => Direction::Outgoing,
421 BackendDirection::Incoming => Direction::Incoming,
422 },
423 )?;
424 Ok(result.into_iter().map(|id| id as i64).collect())
425 })
426 }
427
428 fn k_hop_filtered(
429 &self,
430 snapshot_id: crate::snapshot::SnapshotId,
431 start: i64,
432 depth: u32,
433 direction: BackendDirection,
434 allowed_edge_types: &[&str],
435 ) -> Result<Vec<i64>, SqliteGraphError> {
436 self.with_graph_file(|graph_file| {
437 let _snapshot_id = snapshot_id; let result = native_k_hop_filtered(
440 graph_file,
441 start as NativeNodeId,
442 depth,
443 match direction {
444 BackendDirection::Outgoing => Direction::Outgoing,
445 BackendDirection::Incoming => Direction::Incoming,
446 },
447 allowed_edge_types,
448 )?;
449 Ok(result.into_iter().map(|id| id as i64).collect())
450 })
451 }
452
453 fn chain_query(
454 &self,
455 snapshot_id: crate::snapshot::SnapshotId,
456 start: i64,
457 chain: &[ChainStep],
458 ) -> Result<Vec<i64>, SqliteGraphError> {
459 self.with_graph_file(|graph_file| {
460 let _snapshot_id = snapshot_id; let result = native_chain_query(graph_file, start as NativeNodeId, chain)?;
463 Ok(result.into_iter().map(|id| id as i64).collect())
464 })
465 }
466
467 fn pattern_search(
468 &self,
469 snapshot_id: crate::snapshot::SnapshotId,
470 start: i64,
471 pattern: &PatternQuery,
472 ) -> Result<Vec<PatternMatch>, SqliteGraphError> {
473 self.with_graph_file(|graph_file| {
474 let _snapshot_id = snapshot_id; native_pattern_search(graph_file, start as NativeNodeId, pattern)
477 })
478 }
479
480 fn checkpoint(&self) -> Result<(), SqliteGraphError> {
481 #[cfg(feature = "native-v2")]
482 {
483 if let Some(ref integrator) = self.wal_integrator {
484 integrator.force_checkpoint().map_err(|e| {
485 SqliteGraphError::connection(format!("WAL checkpoint failed: {:?}", e))
486 })?;
487 return Ok(());
488 }
489 }
490
491 Ok(())
493 }
494
495 fn flush(&self) -> Result<(), SqliteGraphError> {
496 #[cfg(feature = "native-v2")]
497 {
498 if let Some(ref integrator) = self.wal_integrator {
499 integrator.wal_manager().flush().map_err(|e| {
500 SqliteGraphError::connection(format!("WAL flush failed: {:?}", e))
501 })?;
502 return Ok(());
503 }
504 }
505
506 Ok(())
508 }
509
510 fn snapshot_export(
511 &self,
512 export_dir: &std::path::Path,
513 ) -> Result<crate::backend::SnapshotMetadata, SqliteGraphError> {
514 use crate::backend::native::v2::export::SnapshotExporter;
515 use crate::backend::native::v2::export::snapshot::SnapshotExportConfig;
516 use std::time::{SystemTime, UNIX_EPOCH};
517
518 let graph_path = self.with_graph_file(|graph_file| Ok(graph_file.path().to_path_buf()))?;
520
521 let snapshot_id = format!(
523 "snapshot_{}",
524 SystemTime::now()
525 .duration_since(UNIX_EPOCH)
526 .unwrap_or_default()
527 .as_secs()
528 );
529
530 let config = SnapshotExportConfig {
531 export_path: export_dir.to_path_buf(),
532 snapshot_id: snapshot_id.clone(),
533 include_statistics: true,
534 min_stable_duration: std::time::Duration::from_secs(0),
535 checksum_validation: true,
536 };
537
538 let mut exporter = SnapshotExporter::new(&graph_path, config).map_err(|e| {
539 SqliteGraphError::connection(format!("Failed to create snapshot exporter: {:?}", e))
540 })?;
541
542 let result = exporter.export_snapshot().map_err(|e| {
543 SqliteGraphError::connection(format!("Snapshot export failed: {:?}", e))
544 })?;
545
546 Ok(crate::backend::SnapshotMetadata {
547 snapshot_path: result.snapshot_path,
548 size_bytes: result.snapshot_size_bytes,
549 entity_count: 0, edge_count: 0,
551 })
552 }
553
554 fn backup(
555 &self,
556 backup_dir: &std::path::Path,
557 ) -> Result<crate::backend::BackupResult, SqliteGraphError> {
558 #[cfg(feature = "native-v2")]
559 {
560 use crate::backend::native::v2::backup;
561
562 let graph_path =
564 self.with_graph_file(|graph_file| Ok(graph_file.path().to_path_buf()))?;
565
566 let native_result =
568 backup::create_backup(&graph_path, backup::BackupConfig::new(backup_dir))
569 .map_err(|e| SqliteGraphError::connection(format!("Backup failed: {:?}", e)))?;
570
571 Ok(crate::backend::BackupResult {
572 snapshot_path: native_result.snapshot_path,
573 manifest_path: native_result.manifest_path,
574 size_bytes: native_result.size_bytes,
575 checksum: native_result.checksum,
576 record_count: native_result.record_count,
577 duration_secs: native_result.duration_secs,
578 timestamp: native_result.timestamp,
579 checkpoint_performed: native_result.checkpoint_performed,
580 })
581 }
582
583 #[cfg(not(feature = "native-v2"))]
584 {
585 let _ = backup_dir;
586 Err(SqliteGraphError::connection(
587 "Backup not available without native-v2 feature".to_string(),
588 ))
589 }
590 }
591
592 fn snapshot_import(
593 &self,
594 import_dir: &std::path::Path,
595 ) -> Result<crate::backend::ImportMetadata, SqliteGraphError> {
596 use crate::backend::native::v2::import::ImportMode;
597 use crate::backend::native::v2::import::SnapshotImporter;
598 use crate::backend::native::v2::import::snapshot::SnapshotImportConfig;
599
600 let graph_path = self.with_graph_file(|graph_file| Ok(graph_file.path().to_path_buf()))?;
602
603 let config = SnapshotImportConfig {
604 target_graph_path: graph_path.clone(),
605 export_dir_path: import_dir.to_path_buf(),
606 import_mode: ImportMode::Fresh,
607 validate_manifest: true,
608 verify_checksum: true,
609 overwrite_existing: true, };
611
612 let importer =
613 SnapshotImporter::from_export_dir(import_dir, &graph_path, config).map_err(|e| {
614 SqliteGraphError::connection(format!("Failed to create snapshot importer: {:?}", e))
615 })?;
616
617 let result = importer.import().map_err(|e| {
618 SqliteGraphError::connection(format!("Snapshot import failed: {:?}", e))
619 })?;
620
621 Ok(crate::backend::ImportMetadata {
622 snapshot_path: import_dir.join("snapshot"), entities_imported: result.records_imported,
624 edges_imported: 0, })
626 }
627
628 #[cfg(feature = "native-v2")]
629 fn kv_get(
630 &self,
631 snapshot_id: crate::snapshot::SnapshotId,
632 key: &[u8],
633 ) -> Result<Option<KvValue>, SqliteGraphError> {
634 let store = self.kv_store.read();
635 store
636 .get_at_snapshot(key, snapshot_id)
637 .map_err(|e| SqliteGraphError::connection(e.to_string()))
638 }
639
640 #[cfg(feature = "native-v2")]
641 fn kv_set(
642 &self,
643 key: Vec<u8>,
644 value: KvValue,
645 ttl_seconds: Option<u64>,
646 ) -> Result<(), SqliteGraphError> {
647 use crate::backend::native::v2::kv_store::wal;
648 use crate::backend::native::v2::wal::record::V2WALRecord;
649
650 let wal_integrator = self.wal_integrator.as_ref().ok_or_else(|| {
651 SqliteGraphError::connection("WAL not available - KV requires native-v2".to_string())
652 })?;
653
654 let key_clone = key.clone();
656
657 let value_bytes = wal::serialize_value(&value)
659 .map_err(|e| SqliteGraphError::connection(format!("KV serialization failed: {}", e)))?;
660 let value_type = wal::get_value_type_tag(&value);
661
662 let wal_record = V2WALRecord::KvSet {
664 key,
665 value_bytes,
666 value_type,
667 ttl_seconds,
668 version: 0, };
670
671 let commit_lsn = wal_integrator
673 .wal_manager()
674 .write_record(wal_record)
675 .map_err(|e| SqliteGraphError::connection(format!("KV WAL write failed: {:?}", e)))?;
676
677 let mut store = self.kv_store.write();
679 store
680 .set_with_version(key_clone, value, ttl_seconds, commit_lsn)
681 .map_err(|e| SqliteGraphError::connection(format!("KV store update failed: {}", e)))?;
682
683 Ok(())
684 }
685
686 #[cfg(feature = "native-v2")]
687 fn kv_delete(&self, key: &[u8]) -> Result<(), SqliteGraphError> {
688 use crate::backend::native::v2::kv_store::wal;
689 use crate::backend::native::v2::wal::record::V2WALRecord;
690
691 let wal_integrator = self.wal_integrator.as_ref().ok_or_else(|| {
692 SqliteGraphError::connection("WAL not available - KV requires native-v2".to_string())
693 })?;
694
695 let store = self.kv_store.read();
697 let old_value = store
698 .get(key)
699 .map_err(|e| SqliteGraphError::connection(format!("KV get failed: {}", e)))?;
700 drop(store);
701
702 let (old_value_bytes, old_value_type) = if let Some(ref value) = old_value {
704 let bytes = wal::serialize_value(value).map_err(|e| {
705 SqliteGraphError::connection(format!("KV serialization failed: {}", e))
706 })?;
707 let type_tag = wal::get_value_type_tag(value);
708 (Some(bytes), type_tag)
709 } else {
710 (None, 0)
711 };
712
713 let wal_record = V2WALRecord::KvDelete {
715 key: key.to_vec(),
716 old_value_bytes,
717 old_value_type,
718 old_version: 0, };
720
721 let _commit_lsn = wal_integrator
723 .wal_manager()
724 .write_record(wal_record)
725 .map_err(|e| SqliteGraphError::connection(format!("KV WAL delete failed: {:?}", e)))?;
726
727 let mut store = self.kv_store.write();
729 let _ = store.delete(key);
731
732 Ok(())
733 }
734
735 #[cfg(feature = "native-v2")]
736 fn subscribe(
737 &self,
738 filter: crate::backend::SubscriptionFilter,
739 ) -> Result<(u64, std::sync::mpsc::Receiver<crate::backend::PubSubEvent>), SqliteGraphError>
740 {
741 let wal_integrator = self.wal_integrator.as_ref().ok_or_else(|| {
742 SqliteGraphError::connection(
743 "WAL not available - pub/sub requires native-v2".to_string(),
744 )
745 })?;
746
747 let (sub_id, rx) = wal_integrator
748 .wal_manager()
749 .get_publisher()
750 .subscribe(filter);
751 Ok((sub_id.as_u64(), rx))
752 }
753
754 #[cfg(feature = "native-v2")]
755 fn unsubscribe(&self, subscriber_id: u64) -> Result<bool, SqliteGraphError> {
756 use crate::backend::native::v2::pubsub::SubscriberId;
757
758 let wal_integrator = self.wal_integrator.as_ref().ok_or_else(|| {
759 SqliteGraphError::connection(
760 "WAL not available - pub/sub requires native-v2".to_string(),
761 )
762 })?;
763
764 let sub_id = SubscriberId::from_raw(subscriber_id);
765 let removed = wal_integrator
766 .wal_manager()
767 .get_publisher()
768 .unsubscribe(sub_id);
769 Ok(removed)
770 }
771
772 #[cfg(feature = "native-v2")]
775 fn kv_prefix_scan(
776 &self,
777 snapshot_id: crate::snapshot::SnapshotId,
778 prefix: &[u8],
779 ) -> Result<
780 Vec<(
781 Vec<u8>,
782 crate::backend::native::v2::kv_store::types::KvValue,
783 )>,
784 SqliteGraphError,
785 > {
786 let store = self.kv_store.read();
787 store
788 .prefix_scan(snapshot_id, prefix)
789 .map_err(|e| SqliteGraphError::connection(e.to_string()))
790 }
791
792 fn query_nodes_by_kind(
793 &self,
794 _snapshot_id: crate::snapshot::SnapshotId,
795 kind: &str,
796 ) -> Result<Vec<i64>, SqliteGraphError> {
797 self.with_graph_file(|graph_file| {
798 let header = graph_file.header();
800 let node_count = header.node_count as i64;
801
802 let mut node_store = NodeStore::new(graph_file);
803 let mut results = Vec::new();
804
805 for node_id in 1..=node_count {
809 match node_store.read_node(node_id as NativeNodeId) {
810 Ok(record) => {
811 if record.kind == kind {
812 results.push(node_id);
813 }
814 }
815 Err(_) => {
816 continue;
818 }
819 }
820 }
821
822 results.sort_unstable();
823 Ok(results)
824 })
825 }
826
827 fn query_nodes_by_name_pattern(
828 &self,
829 _snapshot_id: crate::snapshot::SnapshotId,
830 pattern: &str,
831 ) -> Result<Vec<i64>, SqliteGraphError> {
832 use crate::backend::native::pattern::glob_matches;
833
834 self.with_graph_file(|graph_file| {
835 let header = graph_file.header();
837 let node_count = header.node_count as i64;
838
839 let mut node_store = NodeStore::new(graph_file);
840 let mut results = Vec::new();
841
842 for node_id in 1..=node_count {
844 match node_store.read_node(node_id as NativeNodeId) {
845 Ok(record) => {
846 if glob_matches(pattern, &record.name) {
847 results.push(node_id);
848 }
849 }
850 Err(_) => {
851 continue;
853 }
854 }
855 }
856
857 results.sort_unstable();
858 Ok(results)
859 })
860 }
861}
862
863#[cfg(test)]
864mod tests {
865 use super::*;
866
867 #[test]
868 fn test_native_backend_creation() {
869 let backend = NativeGraphBackend::new_temp().unwrap();
870 assert!(true);
872 }
873
874 #[test]
875 fn test_interior_mutability() {
876 let backend = NativeGraphBackend::new_temp().unwrap();
877
878 let node_id = backend
880 .insert_node(NodeSpec {
881 kind: "Test".to_string(),
882 name: "node1".to_string(),
883 file_path: None,
884 data: serde_json::json!({}),
885 })
886 .unwrap();
887
888 let snapshot = crate::snapshot::SnapshotId::current();
889 let node = backend.get_node(snapshot, node_id).unwrap();
890 assert_eq!(node.name, "node1");
891 assert_eq!(node.kind, "Test");
892 }
893
894 #[cfg(feature = "native-v2")]
895 #[test]
896 fn test_subscribe_to_events() {
897 use crate::backend::SubscriptionFilter;
898 use std::time::Duration;
899
900 let backend = NativeGraphBackend::new_temp().unwrap();
902 let filter = SubscriptionFilter::all();
903
904 let (sub_id, mut rx) = backend.subscribe(filter).unwrap();
906
907 let node_id = backend
909 .insert_node(NodeSpec {
910 kind: "Test".to_string(),
911 name: "test_node".to_string(),
912 file_path: None,
913 data: serde_json::json!({}),
914 })
915 .unwrap();
916
917 assert!(sub_id > 0);
920
921 let removed = backend.unsubscribe(sub_id).unwrap();
923 assert!(removed);
924 }
925
926 #[cfg(feature = "native-v2")]
927 #[test]
928 fn test_kv_persistence_across_reopen() {
929 use tempfile::TempDir;
930
931 let temp_dir = TempDir::new().unwrap();
932 let db_path = temp_dir.path().join("test.db");
933
934 {
936 let backend = NativeGraphBackend::new(&db_path).unwrap();
937 backend
938 .kv_set(b"test_key".to_vec(), KvValue::Integer(42), None)
939 .unwrap();
940 backend
941 .kv_set(
942 b"another_key".to_vec(),
943 KvValue::String("hello".to_string()),
944 None,
945 )
946 .unwrap();
947
948 let wal_integrator = backend.wal_integrator.as_ref().unwrap();
950 wal_integrator.wal_manager().flush().unwrap();
951 }
952
953 {
955 let backend = NativeGraphBackend::open(&db_path).unwrap();
956 let snapshot = crate::snapshot::SnapshotId::current();
957
958 let result = backend.kv_get(snapshot, b"test_key").unwrap();
959 assert_eq!(result, Some(KvValue::Integer(42)));
960
961 let result2 = backend.kv_get(snapshot, b"another_key").unwrap();
962 assert_eq!(result2, Some(KvValue::String("hello".to_string())));
963
964 let results = backend.kv_prefix_scan(snapshot, b"test_").unwrap();
966 assert_eq!(results.len(), 1);
967 assert_eq!(results[0].0, b"test_key".to_vec());
968 }
969 }
970
971 #[cfg(feature = "native-v2")]
972 #[test]
973 fn test_flush_wal_buffer() {
974 use tempfile::TempDir;
975
976 let temp_dir = TempDir::new().unwrap();
977 let db_path = temp_dir.path().join("test.db");
978
979 {
981 let backend = NativeGraphBackend::new(&db_path).unwrap();
982 backend
983 .kv_set(b"test_key".to_vec(), KvValue::Integer(1), None)
984 .unwrap();
985 backend.flush().unwrap();
986
987 let snapshot = crate::snapshot::SnapshotId::current();
988 let result = backend.kv_get(snapshot, b"test_key").unwrap();
989 assert_eq!(result, Some(KvValue::Integer(1)));
990 }
991
992 {
994 let backend = NativeGraphBackend::open(&db_path).unwrap();
995 let snapshot = crate::snapshot::SnapshotId::current();
996 let result = backend.kv_get(snapshot, b"test_key").unwrap();
997 assert_eq!(
998 result,
999 Some(KvValue::Integer(1)),
1000 "Flushed data should persist"
1001 );
1002 }
1003
1004 {
1006 let backend = NativeGraphBackend::open(&db_path).unwrap();
1007 for i in 0..10 {
1009 backend
1010 .kv_set(
1011 format!("bulk_key_{}", i).into_bytes(),
1012 KvValue::Integer(i),
1013 None,
1014 )
1015 .unwrap();
1016 }
1017 backend.flush().unwrap();
1018
1019 let wal_path = db_path.with_extension("wal");
1020 let wal_size = std::fs::metadata(&wal_path).unwrap().len();
1021 assert!(wal_size > 200, "WAL should contain data after flush");
1022 }
1023 }
1024
1025 #[cfg(feature = "native-v2")]
1030 #[test]
1031 fn test_update_node_preserves_node_id() {
1032 let backend = NativeGraphBackend::new_temp().unwrap();
1035
1036 let node_id = backend
1038 .insert_node(NodeSpec {
1039 kind: "File".to_string(),
1040 name: "test.rs".to_string(),
1041 file_path: Some("test.rs".to_string()),
1042 data: serde_json::json!({"hash": "abc123"}),
1043 })
1044 .unwrap();
1045
1046 let updated_id = backend
1048 .update_node(
1049 node_id,
1050 NodeSpec {
1051 kind: "File".to_string(),
1052 name: "test.rs".to_string(),
1053 file_path: Some("test.rs".to_string()),
1054 data: serde_json::json!({"hash": "def456", "updated": true}),
1055 },
1056 )
1057 .expect("update_node should be implemented");
1058
1059 assert_eq!(
1060 updated_id, node_id,
1061 "update_node must return the same node_id - no new allocation"
1062 );
1063
1064 let snapshot = crate::snapshot::SnapshotId::current();
1066 let node = backend.get_node(snapshot, updated_id).unwrap();
1067 assert_eq!(node.kind, "File");
1068 assert_eq!(node.name, "test.rs");
1069
1070 let data: serde_json::Value = node.data;
1071 assert_eq!(data["hash"], "def456");
1072 assert_eq!(data["updated"], true);
1073 }
1074
1075 #[cfg(feature = "native-v2")]
1076 #[test]
1077 fn test_update_node_nonexistent_returns_error() {
1078 let backend = NativeGraphBackend::new_temp().unwrap();
1080
1081 let result = backend.update_node(
1082 9999, NodeSpec {
1084 kind: "File".to_string(),
1085 name: "test.rs".to_string(),
1086 file_path: Some("test.rs".to_string()),
1087 data: serde_json::json!({"hash": "abc123"}),
1088 },
1089 );
1090
1091 assert!(
1092 result.is_err(),
1093 "update_node should return error for non-existent node_id"
1094 );
1095 }
1096
1097 #[cfg(feature = "native-v2")]
1098 #[test]
1099 fn test_multiple_updates_dont_increase_node_count() {
1100 use tempfile::TempDir;
1103
1104 let temp_dir = TempDir::new().unwrap();
1105 let db_path = temp_dir.path().join("test_update_no_increase.db");
1106
1107 {
1109 let backend = NativeGraphBackend::new(&db_path).unwrap();
1110 let node_id = backend
1111 .insert_node(NodeSpec {
1112 kind: "File".to_string(),
1113 name: "main.rs".to_string(),
1114 file_path: Some("main.rs".to_string()),
1115 data: serde_json::json!({"version": 1}),
1116 })
1117 .unwrap();
1118
1119 let ids_after_insert = backend.entity_ids().unwrap();
1121 let initial_count = ids_after_insert.len();
1122 assert_eq!(initial_count, 1, "Should have exactly 1 node");
1123
1124 for i in 2..=100 {
1126 let _ = backend
1127 .update_node(
1128 node_id,
1129 NodeSpec {
1130 kind: "File".to_string(),
1131 name: "main.rs".to_string(),
1132 file_path: Some("main.rs".to_string()),
1133 data: serde_json::json!({"version": i}),
1134 },
1135 )
1136 .unwrap();
1137
1138 let ids = backend.entity_ids().unwrap();
1139 assert_eq!(
1140 ids.len(),
1141 1,
1142 "Node count should remain 1 after {} updates",
1143 i - 1
1144 );
1145 }
1146
1147 let snapshot = crate::snapshot::SnapshotId::current();
1149 let node = backend.get_node(snapshot, node_id).unwrap();
1150 let data: serde_json::Value = node.data;
1151 assert_eq!(data["version"], 100, "Data should reflect last update");
1152 }
1153 }
1154
1155 #[test]
1156 fn test_update_node_interface_exists() {
1157 use crate::backend::GraphBackend;
1160
1161 fn has_update_node<B: GraphBackend>(backend: &B) -> bool {
1162 true
1165 }
1166
1167 let backend = NativeGraphBackend::new_temp().unwrap();
1168 assert!(has_update_node(&backend));
1169 }
1170}