Skip to main content

sqlitegraph/backend/native/
graph_backend.rs

1//! Native GraphBackend implementation with interior mutability.
2
3use super::adjacency::{AdjacencyHelpers, Direction};
4use super::edge_store::EdgeStore;
5use super::graph_file::GraphFile;
6use super::graph_ops::*;
7use super::graph_validation::*;
8use super::node_store::NodeStore;
9use super::types::*;
10use crate::SqliteGraphError;
11use crate::backend::{
12    BackendDirection, ChainStep, EdgeSpec, GraphBackend, NeighborQuery, NodeSpec, PatternMatch,
13    PatternQuery,
14};
15use crate::graph::GraphEntity;
16use parking_lot::RwLock;
17use std::sync::Arc;
18
19// V2 WAL integration (always available when native-v2 feature is enabled)
20#[cfg(feature = "native-v2")]
21use crate::backend::native::v2::wal::{
22    GraphWALIntegrationConfig, V2GraphWALIntegrator, V2WALConfig,
23};
24
25#[cfg(feature = "native-v2")]
26use crate::backend::native::v2::kv_store::store::KvStore;
27
28#[cfg(feature = "native-v2")]
29use crate::backend::native::v2::kv_store::types::KvValue;
30
31/// Native backend implementation using interior mutability
32pub struct NativeGraphBackend {
33    graph_file: RwLock<GraphFile>,
34    /// WAL integrator for transaction management and checkpointing
35    /// Always available when native-v2 feature is enabled (production ready)
36    #[cfg(feature = "native-v2")]
37    wal_integrator: Option<Arc<V2GraphWALIntegrator>>,
38    /// KV store for metadata and application data
39    #[cfg(feature = "native-v2")]
40    kv_store: Arc<RwLock<KvStore>>,
41}
42
43impl NativeGraphBackend {
44    /// Create a new native backend with an in-memory temporary file
45    #[cfg(test)]
46    pub fn new_temp() -> Result<Self, SqliteGraphError> {
47        use tempfile::NamedTempFile;
48        let temp_file =
49            NamedTempFile::new().map_err(|e| SqliteGraphError::connection(e.to_string()))?;
50        let path = temp_file.path();
51        let graph_file = GraphFile::create(path).map_err(map_to_graph_error)?;
52
53        #[cfg(feature = "native-v2")]
54        let wal_integrator = Some(Self::create_wal_integrator(path)?);
55
56        Ok(Self {
57            graph_file: RwLock::new(graph_file),
58            #[cfg(feature = "native-v2")]
59            wal_integrator,
60            #[cfg(feature = "native-v2")]
61            kv_store: Arc::new(RwLock::new(KvStore::new())),
62        })
63    }
64
65    /// Create a new native backend at the specified path
66    pub fn new<P: AsRef<std::path::Path>>(path: P) -> Result<Self, SqliteGraphError> {
67        let graph_file = GraphFile::create(&path).map_err(map_to_graph_error)?;
68
69        #[cfg(feature = "native-v2")]
70        let wal_integrator = Some(Self::create_wal_integrator(&path)?);
71
72        Ok(Self {
73            graph_file: RwLock::new(graph_file),
74            #[cfg(feature = "native-v2")]
75            wal_integrator,
76            #[cfg(feature = "native-v2")]
77            kv_store: Arc::new(RwLock::new(KvStore::new())),
78        })
79    }
80
81    /// Open an existing native backend from the specified path
82    pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self, SqliteGraphError> {
83        let graph_file = GraphFile::open(&path).map_err(map_to_graph_error)?;
84
85        #[cfg(feature = "native-v2")]
86        let wal_integrator = Some(Self::open_wal_integrator(&path)?);
87
88        #[cfg(feature = "native-v2")]
89        let kv_store = {
90            let wal_config = V2WALConfig::for_graph_file(path.as_ref());
91            match crate::backend::native::v2::kv_store::recover_kv_from_wal(&wal_config.wal_path) {
92                Ok(store) => Arc::new(RwLock::new(store)),
93                Err(e) => {
94                    // Log warning but continue with empty store
95                    eprintln!(
96                        "Warning: KV recovery from {} failed, starting with empty store: {}",
97                        wal_config.wal_path.display(),
98                        e
99                    );
100                    Arc::new(RwLock::new(KvStore::new()))
101                }
102            }
103        };
104
105        Ok(Self {
106            graph_file: RwLock::new(graph_file),
107            #[cfg(feature = "native-v2")]
108            wal_integrator,
109            #[cfg(feature = "native-v2")]
110            kv_store,
111        })
112    }
113
114    /// Create WAL integrator for the graph (opens existing WAL without truncating)
115    #[cfg(feature = "native-v2")]
116    fn open_wal_integrator<P: AsRef<std::path::Path>>(
117        path: P,
118    ) -> Result<Arc<V2GraphWALIntegrator>, SqliteGraphError> {
119        let path_ref = path.as_ref();
120
121        // Use the helper function to create WAL config with correct paths
122        let wal_config = V2WALConfig::for_graph_file(path_ref);
123
124        // Create integration config with default settings
125        let integration_config = GraphWALIntegrationConfig::default();
126
127        // Open the integrator (preserves existing WAL data)
128        let integrator =
129            V2GraphWALIntegrator::open(wal_config, integration_config).map_err(|e| {
130                SqliteGraphError::connection(format!("Failed to open WAL integrator: {:?}", e))
131            })?;
132
133        Ok(Arc::new(integrator))
134    }
135
136    /// Create WAL integrator for the graph
137    #[cfg(feature = "native-v2")]
138    fn create_wal_integrator<P: AsRef<std::path::Path>>(
139        path: P,
140    ) -> Result<Arc<V2GraphWALIntegrator>, SqliteGraphError> {
141        let path_ref = path.as_ref();
142
143        // Use the helper function to create WAL config with correct paths
144        let wal_config = V2WALConfig::for_graph_file(path_ref);
145
146        // Create integration config with default settings
147        let integration_config = GraphWALIntegrationConfig::default();
148
149        // Create the integrator
150        let integrator =
151            V2GraphWALIntegrator::create(wal_config, integration_config).map_err(|e| {
152                SqliteGraphError::connection(format!("Failed to create WAL integrator: {:?}", e))
153            })?;
154
155        Ok(Arc::new(integrator))
156    }
157
158    /// Get mutable access to the underlying graph file for internal operations
159    fn with_graph_file<R, F>(&self, f: F) -> Result<R, SqliteGraphError>
160    where
161        F: FnOnce(&mut GraphFile) -> Result<R, NativeBackendError>,
162    {
163        let mut graph_file = self.graph_file.write();
164        f(&mut *graph_file).map_err(map_to_graph_error)
165    }
166
167    /// Get WAL metrics (if native-v2 feature is enabled and WAL integrator exists)
168    #[cfg(feature = "native-v2")]
169    pub fn get_wal_metrics(&self) -> Option<crate::backend::native::v2::wal::WALManagerMetrics> {
170        self.wal_integrator
171            .as_ref()
172            .map(|integrator| integrator.get_metrics())
173    }
174
175    /// Get active transaction count (if native-v2 feature is enabled and WAL integrator exists)
176    #[cfg(feature = "native-v2")]
177    pub fn get_active_transaction_count(&self) -> Option<usize> {
178        self.wal_integrator
179            .as_ref()
180            .map(|integrator| integrator.get_active_transaction_count())
181    }
182}
183
/// Soft-shuts the WAL integrator down when a `NativeGraphBackend` is dropped.
#[cfg(feature = "native-v2")]
impl Drop for NativeGraphBackend {
    fn drop(&mut self) {
        // `soft_shutdown` is callable through the shared `Arc` reference.
        let failure = self
            .wal_integrator
            .as_ref()
            .and_then(|integrator| integrator.soft_shutdown().err());

        // `drop` cannot return an error, so a failed shutdown is only reported.
        if let Some(err) = failure {
            eprintln!(
                "Warning: Failed to soft shutdown WAL integrator: {:?}",
                err
            );
        }
    }
}
202
203impl GraphBackend for NativeGraphBackend {
204    fn insert_node(&self, node: NodeSpec) -> Result<i64, SqliteGraphError> {
205        self.with_graph_file(|graph_file| {
206            let mut node_store = NodeStore::new(graph_file);
207            let node_id = node_store.allocate_node_id()?;
208
209            // Phase 31: V2 is now the default format (no feature gating)
210            let record_v2 = crate::backend::native::v2::node_record_v2::NodeRecordV2::new(
211                node_id, node.kind, node.name, node.data,
212            );
213            node_store.write_node_v2(&record_v2)?;
214
215            Ok(node_id as i64)
216        })
217    }
218
219    fn get_node(
220        &self,
221        snapshot_id: crate::snapshot::SnapshotId,
222        id: i64,
223    ) -> Result<GraphEntity, SqliteGraphError> {
224        self.with_graph_file(|graph_file| {
225            // TODO: Pass snapshot_id to filter WAL records (Phase 38-04)
226            let _snapshot_id = snapshot_id; // Suppress unused warning until Phase 38-04
227            let mut node_store = NodeStore::new(graph_file);
228            let record = node_store.read_node(id as NativeNodeId)?;
229            Ok(node_record_to_entity(record))
230        })
231    }
232
233    fn insert_edge(&self, edge: EdgeSpec) -> Result<i64, SqliteGraphError> {
234        self.with_graph_file(|graph_file| {
235            // Phase 44.2: Use V2 clustered adjacency when experimental feature is enabled
236            // Phase 44.2: Use V2 clustered adjacency when experimental feature is enabled
237            #[cfg(feature = "v2_experimental")]
238            {
239                let mut edge_store = EdgeStore::new(graph_file);
240                let edge_id = edge_store.allocate_edge_id();
241                let record = edge_spec_to_record(edge, edge_id);
242
243                // write_edge already calls update_v2_clustered_adjacency internally
244                edge_store.write_edge(&record)?;
245                Ok(edge_id as i64)
246            }
247            #[cfg(not(feature = "v2_experimental"))]
248            {
249                let mut edge_store = EdgeStore::new(graph_file);
250                let edge_id = edge_store.allocate_edge_id();
251                let record = edge_spec_to_record(edge, edge_id);
252                edge_store.write_edge(&record)?;
253                Ok(edge_id as i64)
254            }
255        })
256    }
257
258    fn update_node(&self, node_id: i64, node: NodeSpec) -> Result<i64, SqliteGraphError> {
259        #[cfg(feature = "native-v2")]
260        {
261            if let Some(ref integrator) = self.wal_integrator {
262                // Convert NodeSpec to NodeRecordV2
263                let old_record = self.with_graph_file(|graph_file| {
264                    let mut node_store = NodeStore::new(graph_file);
265                    node_store.read_node_v2(node_id as NativeNodeId)
266                })?;
267
268                // Create new record from NodeSpec
269                let new_record =
270                    node_spec_to_v2_record(node, node_id as NativeNodeId, &old_record)?;
271
272                // Use WAL integrator to update
273                let _result = integrator
274                    .update_node(None, node_id, &old_record, &new_record)
275                    .map_err(|e| {
276                        SqliteGraphError::connection(format!("Node update failed: {:?}", e))
277                    })?;
278
279                return Ok(node_id);
280            }
281        }
282
283        // Fallback: Not implemented for backends without WAL
284        Err(SqliteGraphError::connection(
285            "update_node not supported for this backend configuration".to_string(),
286        ))
287    }
288
289    fn delete_entity(&self, id: i64) -> Result<(), SqliteGraphError> {
290        self.with_graph_file(|graph_file| {
291            let node_id = id as NativeNodeId;
292            let mut node_store = NodeStore::new(graph_file);
293            node_store.delete_node(node_id)?;
294            Ok(())
295        })
296    }
297
298    fn entity_ids(&self) -> Result<Vec<i64>, SqliteGraphError> {
299        self.with_graph_file(|graph_file| {
300            let mut node_store = NodeStore::new(graph_file);
301            let ids = node_store.all_node_ids()?;
302            Ok(ids.into_iter().map(|id| id as i64).collect())
303        })
304    }
305
306    fn neighbors(
307        &self,
308        snapshot_id: crate::snapshot::SnapshotId,
309        node: i64,
310        query: NeighborQuery,
311    ) -> Result<Vec<i64>, SqliteGraphError> {
312        self.with_graph_file(|graph_file| {
313            let node_id = node as NativeNodeId;
314
315            // Use snapshot-aware helpers for neighbor retrieval
316            let neighbors = if let Some(edge_type) = &query.edge_type {
317                let edge_type_ref = edge_type.as_str();
318                match query.direction {
319                    BackendDirection::Outgoing => {
320                        AdjacencyHelpers::get_outgoing_neighbors_filtered(
321                            graph_file,
322                            node_id,
323                            &[edge_type_ref],
324                        )
325                    }
326                    BackendDirection::Incoming => {
327                        AdjacencyHelpers::get_incoming_neighbors_filtered(
328                            graph_file,
329                            node_id,
330                            &[edge_type_ref],
331                        )
332                    }
333                }
334            } else {
335                // Phase 38-04: Use snapshot-aware neighbor retrieval
336                // Note: WAL reader not available at this layer - pass None for base data only
337                match query.direction {
338                    BackendDirection::Outgoing => {
339                        AdjacencyHelpers::get_outgoing_neighbors_at_snapshot(
340                            graph_file,
341                            node_id,
342                            snapshot_id,
343                            None, // WAL reader - base data only
344                        )
345                    }
346                    BackendDirection::Incoming => {
347                        AdjacencyHelpers::get_incoming_neighbors_at_snapshot(
348                            graph_file,
349                            node_id,
350                            snapshot_id,
351                            None, // WAL reader - base data only
352                        )
353                    }
354                }
355            }?;
356
357            Ok(neighbors.into_iter().map(|id| id as i64).collect())
358        })
359    }
360
361    fn bfs(
362        &self,
363        snapshot_id: crate::snapshot::SnapshotId,
364        start: i64,
365        depth: u32,
366    ) -> Result<Vec<i64>, SqliteGraphError> {
367        self.with_graph_file(|graph_file| {
368            // TODO: Pass snapshot_id to filter WAL records (Phase 38-04)
369            let _snapshot_id = snapshot_id; // Suppress unused warning until Phase 38-04
370            let result = native_bfs(graph_file, start as NativeNodeId, depth)?;
371            Ok(result.into_iter().map(|id| id as i64).collect())
372        })
373    }
374
375    fn shortest_path(
376        &self,
377        snapshot_id: crate::snapshot::SnapshotId,
378        start: i64,
379        end: i64,
380    ) -> Result<Option<Vec<i64>>, SqliteGraphError> {
381        self.with_graph_file(|graph_file| {
382            // TODO: Pass snapshot_id to filter WAL records (Phase 38-04)
383            let _snapshot_id = snapshot_id; // Suppress unused warning until Phase 38-04
384            let result =
385                native_shortest_path(graph_file, start as NativeNodeId, end as NativeNodeId)?;
386            Ok(result.map(|path| path.into_iter().map(|id| id as i64).collect()))
387        })
388    }
389
390    fn node_degree(
391        &self,
392        snapshot_id: crate::snapshot::SnapshotId,
393        node: i64,
394    ) -> Result<(usize, usize), SqliteGraphError> {
395        self.with_graph_file(|graph_file| {
396            // TODO: Pass snapshot_id to filter WAL records (Phase 38-04)
397            let _snapshot_id = snapshot_id; // Suppress unused warning until Phase 38-04
398            let node_id = node as NativeNodeId;
399            let outgoing = AdjacencyHelpers::outgoing_degree(graph_file, node_id)?;
400            let incoming = AdjacencyHelpers::incoming_degree(graph_file, node_id)?;
401            Ok((outgoing as usize, incoming as usize))
402        })
403    }
404
405    fn k_hop(
406        &self,
407        snapshot_id: crate::snapshot::SnapshotId,
408        start: i64,
409        depth: u32,
410        direction: BackendDirection,
411    ) -> Result<Vec<i64>, SqliteGraphError> {
412        self.with_graph_file(|graph_file| {
413            // TODO: Pass snapshot_id to filter WAL records (Phase 38-04)
414            let _snapshot_id = snapshot_id; // Suppress unused warning until Phase 38-04
415            let result = native_k_hop(
416                graph_file,
417                start as NativeNodeId,
418                depth,
419                match direction {
420                    BackendDirection::Outgoing => Direction::Outgoing,
421                    BackendDirection::Incoming => Direction::Incoming,
422                },
423            )?;
424            Ok(result.into_iter().map(|id| id as i64).collect())
425        })
426    }
427
428    fn k_hop_filtered(
429        &self,
430        snapshot_id: crate::snapshot::SnapshotId,
431        start: i64,
432        depth: u32,
433        direction: BackendDirection,
434        allowed_edge_types: &[&str],
435    ) -> Result<Vec<i64>, SqliteGraphError> {
436        self.with_graph_file(|graph_file| {
437            // TODO: Pass snapshot_id to filter WAL records (Phase 38-04)
438            let _snapshot_id = snapshot_id; // Suppress unused warning until Phase 38-04
439            let result = native_k_hop_filtered(
440                graph_file,
441                start as NativeNodeId,
442                depth,
443                match direction {
444                    BackendDirection::Outgoing => Direction::Outgoing,
445                    BackendDirection::Incoming => Direction::Incoming,
446                },
447                allowed_edge_types,
448            )?;
449            Ok(result.into_iter().map(|id| id as i64).collect())
450        })
451    }
452
453    fn chain_query(
454        &self,
455        snapshot_id: crate::snapshot::SnapshotId,
456        start: i64,
457        chain: &[ChainStep],
458    ) -> Result<Vec<i64>, SqliteGraphError> {
459        self.with_graph_file(|graph_file| {
460            // TODO: Pass snapshot_id to filter WAL records (Phase 38-04)
461            let _snapshot_id = snapshot_id; // Suppress unused warning until Phase 38-04
462            let result = native_chain_query(graph_file, start as NativeNodeId, chain)?;
463            Ok(result.into_iter().map(|id| id as i64).collect())
464        })
465    }
466
467    fn pattern_search(
468        &self,
469        snapshot_id: crate::snapshot::SnapshotId,
470        start: i64,
471        pattern: &PatternQuery,
472    ) -> Result<Vec<PatternMatch>, SqliteGraphError> {
473        self.with_graph_file(|graph_file| {
474            // TODO: Pass snapshot_id to filter WAL records (Phase 38-04)
475            let _snapshot_id = snapshot_id; // Suppress unused warning until Phase 38-04
476            native_pattern_search(graph_file, start as NativeNodeId, pattern)
477        })
478    }
479
480    fn checkpoint(&self) -> Result<(), SqliteGraphError> {
481        #[cfg(feature = "native-v2")]
482        {
483            if let Some(ref integrator) = self.wal_integrator {
484                integrator.force_checkpoint().map_err(|e| {
485                    SqliteGraphError::connection(format!("WAL checkpoint failed: {:?}", e))
486                })?;
487                return Ok(());
488            }
489        }
490
491        // If native-v2 feature is not enabled, checkpoint is a no-op
492        Ok(())
493    }
494
495    fn flush(&self) -> Result<(), SqliteGraphError> {
496        #[cfg(feature = "native-v2")]
497        {
498            if let Some(ref integrator) = self.wal_integrator {
499                integrator.wal_manager().flush().map_err(|e| {
500                    SqliteGraphError::connection(format!("WAL flush failed: {:?}", e))
501                })?;
502                return Ok(());
503            }
504        }
505
506        // If native-v2 feature is not enabled, flush is a no-op
507        Ok(())
508    }
509
510    fn snapshot_export(
511        &self,
512        export_dir: &std::path::Path,
513    ) -> Result<crate::backend::SnapshotMetadata, SqliteGraphError> {
514        use crate::backend::native::v2::export::SnapshotExporter;
515        use crate::backend::native::v2::export::snapshot::SnapshotExportConfig;
516        use std::time::{SystemTime, UNIX_EPOCH};
517
518        // Get the graph file path from the GraphFile
519        let graph_path = self.with_graph_file(|graph_file| Ok(graph_file.path().to_path_buf()))?;
520
521        // Create snapshot exporter with default config
522        let snapshot_id = format!(
523            "snapshot_{}",
524            SystemTime::now()
525                .duration_since(UNIX_EPOCH)
526                .unwrap_or_default()
527                .as_secs()
528        );
529
530        let config = SnapshotExportConfig {
531            export_path: export_dir.to_path_buf(),
532            snapshot_id: snapshot_id.clone(),
533            include_statistics: true,
534            min_stable_duration: std::time::Duration::from_secs(0),
535            checksum_validation: true,
536        };
537
538        let mut exporter = SnapshotExporter::new(&graph_path, config).map_err(|e| {
539            SqliteGraphError::connection(format!("Failed to create snapshot exporter: {:?}", e))
540        })?;
541
542        let result = exporter.export_snapshot().map_err(|e| {
543            SqliteGraphError::connection(format!("Snapshot export failed: {:?}", e))
544        })?;
545
546        Ok(crate::backend::SnapshotMetadata {
547            snapshot_path: result.snapshot_path,
548            size_bytes: result.snapshot_size_bytes,
549            entity_count: 0, // Snapshot export doesn't return entity count directly
550            edge_count: 0,
551        })
552    }
553
554    fn backup(
555        &self,
556        backup_dir: &std::path::Path,
557    ) -> Result<crate::backend::BackupResult, SqliteGraphError> {
558        #[cfg(feature = "native-v2")]
559        {
560            use crate::backend::native::v2::backup;
561
562            // Get the graph file path from the GraphFile
563            let graph_path =
564                self.with_graph_file(|graph_file| Ok(graph_file.path().to_path_buf()))?;
565
566            // Create backup with default configuration (includes checkpoint)
567            let native_result =
568                backup::create_backup(&graph_path, backup::BackupConfig::new(backup_dir))
569                    .map_err(|e| SqliteGraphError::connection(format!("Backup failed: {:?}", e)))?;
570
571            Ok(crate::backend::BackupResult {
572                snapshot_path: native_result.snapshot_path,
573                manifest_path: native_result.manifest_path,
574                size_bytes: native_result.size_bytes,
575                checksum: native_result.checksum,
576                record_count: native_result.record_count,
577                duration_secs: native_result.duration_secs,
578                timestamp: native_result.timestamp,
579                checkpoint_performed: native_result.checkpoint_performed,
580            })
581        }
582
583        #[cfg(not(feature = "native-v2"))]
584        {
585            let _ = backup_dir;
586            Err(SqliteGraphError::connection(
587                "Backup not available without native-v2 feature".to_string(),
588            ))
589        }
590    }
591
592    fn snapshot_import(
593        &self,
594        import_dir: &std::path::Path,
595    ) -> Result<crate::backend::ImportMetadata, SqliteGraphError> {
596        use crate::backend::native::v2::import::ImportMode;
597        use crate::backend::native::v2::import::SnapshotImporter;
598        use crate::backend::native::v2::import::snapshot::SnapshotImportConfig;
599
600        // Get the graph file path
601        let graph_path = self.with_graph_file(|graph_file| Ok(graph_file.path().to_path_buf()))?;
602
603        let config = SnapshotImportConfig {
604            target_graph_path: graph_path.clone(),
605            export_dir_path: import_dir.to_path_buf(),
606            import_mode: ImportMode::Fresh,
607            validate_manifest: true,
608            verify_checksum: true,
609            overwrite_existing: true, // Allow overwriting for import
610        };
611
612        let importer =
613            SnapshotImporter::from_export_dir(import_dir, &graph_path, config).map_err(|e| {
614                SqliteGraphError::connection(format!("Failed to create snapshot importer: {:?}", e))
615            })?;
616
617        let result = importer.import().map_err(|e| {
618            SqliteGraphError::connection(format!("Snapshot import failed: {:?}", e))
619        })?;
620
621        Ok(crate::backend::ImportMetadata {
622            snapshot_path: import_dir.join("snapshot"), // Approximate path
623            entities_imported: result.records_imported,
624            edges_imported: 0, // Records include both entities and edges
625        })
626    }
627
628    #[cfg(feature = "native-v2")]
629    fn kv_get(
630        &self,
631        snapshot_id: crate::snapshot::SnapshotId,
632        key: &[u8],
633    ) -> Result<Option<KvValue>, SqliteGraphError> {
634        let store = self.kv_store.read();
635        store
636            .get_at_snapshot(key, snapshot_id)
637            .map_err(|e| SqliteGraphError::connection(e.to_string()))
638    }
639
640    #[cfg(feature = "native-v2")]
641    fn kv_set(
642        &self,
643        key: Vec<u8>,
644        value: KvValue,
645        ttl_seconds: Option<u64>,
646    ) -> Result<(), SqliteGraphError> {
647        use crate::backend::native::v2::kv_store::wal;
648        use crate::backend::native::v2::wal::record::V2WALRecord;
649
650        let wal_integrator = self.wal_integrator.as_ref().ok_or_else(|| {
651            SqliteGraphError::connection("WAL not available - KV requires native-v2".to_string())
652        })?;
653
654        // Clone key for use in both WAL and store
655        let key_clone = key.clone();
656
657        // Serialize value
658        let value_bytes = wal::serialize_value(&value)
659            .map_err(|e| SqliteGraphError::connection(format!("KV serialization failed: {}", e)))?;
660        let value_type = wal::get_value_type_tag(&value);
661
662        // Create WAL record
663        let wal_record = V2WALRecord::KvSet {
664            key,
665            value_bytes,
666            value_type,
667            ttl_seconds,
668            version: 0, // Will be assigned by WAL manager
669        };
670
671        // Write WAL record and get assigned LSN
672        let commit_lsn = wal_integrator
673            .wal_manager()
674            .write_record(wal_record)
675            .map_err(|e| SqliteGraphError::connection(format!("KV WAL write failed: {:?}", e)))?;
676
677        // Update in-memory store with assigned LSN as version
678        let mut store = self.kv_store.write();
679        store
680            .set_with_version(key_clone, value, ttl_seconds, commit_lsn)
681            .map_err(|e| SqliteGraphError::connection(format!("KV store update failed: {}", e)))?;
682
683        Ok(())
684    }
685
686    #[cfg(feature = "native-v2")]
687    fn kv_delete(&self, key: &[u8]) -> Result<(), SqliteGraphError> {
688        use crate::backend::native::v2::kv_store::wal;
689        use crate::backend::native::v2::wal::record::V2WALRecord;
690
691        let wal_integrator = self.wal_integrator.as_ref().ok_or_else(|| {
692            SqliteGraphError::connection("WAL not available - KV requires native-v2".to_string())
693        })?;
694
695        // Get old value for rollback/recovery
696        let store = self.kv_store.read();
697        let old_value = store
698            .get(key)
699            .map_err(|e| SqliteGraphError::connection(format!("KV get failed: {}", e)))?;
700        drop(store);
701
702        // Serialize old value if exists
703        let (old_value_bytes, old_value_type) = if let Some(ref value) = old_value {
704            let bytes = wal::serialize_value(value).map_err(|e| {
705                SqliteGraphError::connection(format!("KV serialization failed: {}", e))
706            })?;
707            let type_tag = wal::get_value_type_tag(value);
708            (Some(bytes), type_tag)
709        } else {
710            (None, 0)
711        };
712
713        // Create WAL record
714        let wal_record = V2WALRecord::KvDelete {
715            key: key.to_vec(),
716            old_value_bytes,
717            old_value_type,
718            old_version: 0, // Will be assigned by WAL manager
719        };
720
721        // Write WAL record and get assigned LSN
722        let _commit_lsn = wal_integrator
723            .wal_manager()
724            .write_record(wal_record)
725            .map_err(|e| SqliteGraphError::connection(format!("KV WAL delete failed: {:?}", e)))?;
726
727        // Delete from in-memory store
728        let mut store = self.kv_store.write();
729        // Ignore KeyNotFound - delete is idempotent
730        let _ = store.delete(key);
731
732        Ok(())
733    }
734
735    #[cfg(feature = "native-v2")]
736    fn subscribe(
737        &self,
738        filter: crate::backend::SubscriptionFilter,
739    ) -> Result<(u64, std::sync::mpsc::Receiver<crate::backend::PubSubEvent>), SqliteGraphError>
740    {
741        let wal_integrator = self.wal_integrator.as_ref().ok_or_else(|| {
742            SqliteGraphError::connection(
743                "WAL not available - pub/sub requires native-v2".to_string(),
744            )
745        })?;
746
747        let (sub_id, rx) = wal_integrator
748            .wal_manager()
749            .get_publisher()
750            .subscribe(filter);
751        Ok((sub_id.as_u64(), rx))
752    }
753
754    #[cfg(feature = "native-v2")]
755    fn unsubscribe(&self, subscriber_id: u64) -> Result<bool, SqliteGraphError> {
756        use crate::backend::native::v2::pubsub::SubscriberId;
757
758        let wal_integrator = self.wal_integrator.as_ref().ok_or_else(|| {
759            SqliteGraphError::connection(
760                "WAL not available - pub/sub requires native-v2".to_string(),
761            )
762        })?;
763
764        let sub_id = SubscriberId::from_raw(subscriber_id);
765        let removed = wal_integrator
766            .wal_manager()
767            .get_publisher()
768            .unsubscribe(sub_id);
769        Ok(removed)
770    }
771
772    // ========== Pub/Sub Enhancement APIs (v1.4.0) ==========
773
774    #[cfg(feature = "native-v2")]
775    fn kv_prefix_scan(
776        &self,
777        snapshot_id: crate::snapshot::SnapshotId,
778        prefix: &[u8],
779    ) -> Result<
780        Vec<(
781            Vec<u8>,
782            crate::backend::native::v2::kv_store::types::KvValue,
783        )>,
784        SqliteGraphError,
785    > {
786        let store = self.kv_store.read();
787        store
788            .prefix_scan(snapshot_id, prefix)
789            .map_err(|e| SqliteGraphError::connection(e.to_string()))
790    }
791
792    fn query_nodes_by_kind(
793        &self,
794        _snapshot_id: crate::snapshot::SnapshotId,
795        kind: &str,
796    ) -> Result<Vec<i64>, SqliteGraphError> {
797        self.with_graph_file(|graph_file| {
798            // Get the total node count first
799            let header = graph_file.header();
800            let node_count = header.node_count as i64;
801
802            let mut node_store = NodeStore::new(graph_file);
803            let mut results = Vec::new();
804
805            // Scan through all node IDs to find matches
806            // This is O(N) but acceptable for MVP
807            // Future optimization: add kind index
808            for node_id in 1..=node_count {
809                match node_store.read_node(node_id as NativeNodeId) {
810                    Ok(record) => {
811                        if record.kind == kind {
812                            results.push(node_id);
813                        }
814                    }
815                    Err(_) => {
816                        // Skip nodes that can't be read
817                        continue;
818                    }
819                }
820            }
821
822            results.sort_unstable();
823            Ok(results)
824        })
825    }
826
827    fn query_nodes_by_name_pattern(
828        &self,
829        _snapshot_id: crate::snapshot::SnapshotId,
830        pattern: &str,
831    ) -> Result<Vec<i64>, SqliteGraphError> {
832        use crate::backend::native::pattern::glob_matches;
833
834        self.with_graph_file(|graph_file| {
835            // Get the total node count first
836            let header = graph_file.header();
837            let node_count = header.node_count as i64;
838
839            let mut node_store = NodeStore::new(graph_file);
840            let mut results = Vec::new();
841
842            // Scan through all node IDs to find pattern matches
843            for node_id in 1..=node_count {
844                match node_store.read_node(node_id as NativeNodeId) {
845                    Ok(record) => {
846                        if glob_matches(pattern, &record.name) {
847                            results.push(node_id);
848                        }
849                    }
850                    Err(_) => {
851                        // Skip nodes that can't be read
852                        continue;
853                    }
854                }
855            }
856
857            results.sort_unstable();
858            Ok(results)
859        })
860    }
861}
862
863#[cfg(test)]
864mod tests {
865    use super::*;
866
867    #[test]
868    fn test_native_backend_creation() {
869        let backend = NativeGraphBackend::new_temp().unwrap();
870        // Test that backend can be created successfully
871        assert!(true);
872    }
873
874    #[test]
875    fn test_interior_mutability() {
876        let backend = NativeGraphBackend::new_temp().unwrap();
877
878        // Test that we can perform multiple operations
879        let node_id = backend
880            .insert_node(NodeSpec {
881                kind: "Test".to_string(),
882                name: "node1".to_string(),
883                file_path: None,
884                data: serde_json::json!({}),
885            })
886            .unwrap();
887
888        let snapshot = crate::snapshot::SnapshotId::current();
889        let node = backend.get_node(snapshot, node_id).unwrap();
890        assert_eq!(node.name, "node1");
891        assert_eq!(node.kind, "Test");
892    }
893
894    #[cfg(feature = "native-v2")]
895    #[test]
896    fn test_subscribe_to_events() {
897        use crate::backend::SubscriptionFilter;
898        use std::time::Duration;
899
900        // Setup graph
901        let backend = NativeGraphBackend::new_temp().unwrap();
902        let filter = SubscriptionFilter::all();
903
904        // Subscribe
905        let (sub_id, mut rx) = backend.subscribe(filter).unwrap();
906
907        // Make a change (direct node insert - no transaction API in this test)
908        let node_id = backend
909            .insert_node(NodeSpec {
910                kind: "Test".to_string(),
911                name: "test_node".to_string(),
912                file_path: None,
913                data: serde_json::json!({}),
914            })
915            .unwrap();
916
917        // Note: Events are only emitted on WAL commit, which requires transaction API
918        // For now, just verify subscription was successful
919        assert!(sub_id > 0);
920
921        // Unsubscribe
922        let removed = backend.unsubscribe(sub_id).unwrap();
923        assert!(removed);
924    }
925
926    #[cfg(feature = "native-v2")]
927    #[test]
928    fn test_kv_persistence_across_reopen() {
929        use tempfile::TempDir;
930
931        let temp_dir = TempDir::new().unwrap();
932        let db_path = temp_dir.path().join("test.db");
933
934        // Write KV data in first session
935        {
936            let backend = NativeGraphBackend::new(&db_path).unwrap();
937            backend
938                .kv_set(b"test_key".to_vec(), KvValue::Integer(42), None)
939                .unwrap();
940            backend
941                .kv_set(
942                    b"another_key".to_vec(),
943                    KvValue::String("hello".to_string()),
944                    None,
945                )
946                .unwrap();
947
948            // Flush WAL buffer to ensure records are written to disk
949            let wal_integrator = backend.wal_integrator.as_ref().unwrap();
950            wal_integrator.wal_manager().flush().unwrap();
951        }
952
953        // Reopen and verify KV data persists
954        {
955            let backend = NativeGraphBackend::open(&db_path).unwrap();
956            let snapshot = crate::snapshot::SnapshotId::current();
957
958            let result = backend.kv_get(snapshot, b"test_key").unwrap();
959            assert_eq!(result, Some(KvValue::Integer(42)));
960
961            let result2 = backend.kv_get(snapshot, b"another_key").unwrap();
962            assert_eq!(result2, Some(KvValue::String("hello".to_string())));
963
964            // Verify prefix scan works
965            let results = backend.kv_prefix_scan(snapshot, b"test_").unwrap();
966            assert_eq!(results.len(), 1);
967            assert_eq!(results[0].0, b"test_key".to_vec());
968        }
969    }
970
971    #[cfg(feature = "native-v2")]
972    #[test]
973    fn test_flush_wal_buffer() {
974        use tempfile::TempDir;
975
976        let temp_dir = TempDir::new().unwrap();
977        let db_path = temp_dir.path().join("test.db");
978
979        // Test 1: flush() method exists and doesn't crash
980        {
981            let backend = NativeGraphBackend::new(&db_path).unwrap();
982            backend
983                .kv_set(b"test_key".to_vec(), KvValue::Integer(1), None)
984                .unwrap();
985            backend.flush().unwrap();
986
987            let snapshot = crate::snapshot::SnapshotId::current();
988            let result = backend.kv_get(snapshot, b"test_key").unwrap();
989            assert_eq!(result, Some(KvValue::Integer(1)));
990        }
991
992        // Test 2: flushed data persists across reopen
993        {
994            let backend = NativeGraphBackend::open(&db_path).unwrap();
995            let snapshot = crate::snapshot::SnapshotId::current();
996            let result = backend.kv_get(snapshot, b"test_key").unwrap();
997            assert_eq!(
998                result,
999                Some(KvValue::Integer(1)),
1000                "Flushed data should persist"
1001            );
1002        }
1003
1004        // Test 3: WAL file size increases after flush
1005        {
1006            let backend = NativeGraphBackend::open(&db_path).unwrap();
1007            // Write enough data to trigger buffer growth
1008            for i in 0..10 {
1009                backend
1010                    .kv_set(
1011                        format!("bulk_key_{}", i).into_bytes(),
1012                        KvValue::Integer(i),
1013                        None,
1014                    )
1015                    .unwrap();
1016            }
1017            backend.flush().unwrap();
1018
1019            let wal_path = db_path.with_extension("wal");
1020            let wal_size = std::fs::metadata(&wal_path).unwrap().len();
1021            assert!(wal_size > 200, "WAL should contain data after flush");
1022        }
1023    }
1024
1025    // TDD Tests for update_node functionality
1026    // These tests are written FIRST, following TDD principles
1027    // They will FAIL until update_node is implemented
1028
1029    #[cfg(feature = "native-v2")]
1030    #[test]
1031    fn test_update_node_preserves_node_id() {
1032        // Test: update_node should return the same node_id that was passed in
1033        // This ensures we don't allocate new node slots when updating
1034        let backend = NativeGraphBackend::new_temp().unwrap();
1035
1036        // First, create a node
1037        let node_id = backend
1038            .insert_node(NodeSpec {
1039                kind: "File".to_string(),
1040                name: "test.rs".to_string(),
1041                file_path: Some("test.rs".to_string()),
1042                data: serde_json::json!({"hash": "abc123"}),
1043            })
1044            .unwrap();
1045
1046        // Now update it - should return the same node_id
1047        let updated_id = backend
1048            .update_node(
1049                node_id,
1050                NodeSpec {
1051                    kind: "File".to_string(),
1052                    name: "test.rs".to_string(),
1053                    file_path: Some("test.rs".to_string()),
1054                    data: serde_json::json!({"hash": "def456", "updated": true}),
1055                },
1056            )
1057            .expect("update_node should be implemented");
1058
1059        assert_eq!(
1060            updated_id, node_id,
1061            "update_node must return the same node_id - no new allocation"
1062        );
1063
1064        // Verify the data was actually updated
1065        let snapshot = crate::snapshot::SnapshotId::current();
1066        let node = backend.get_node(snapshot, updated_id).unwrap();
1067        assert_eq!(node.kind, "File");
1068        assert_eq!(node.name, "test.rs");
1069
1070        let data: serde_json::Value = node.data;
1071        assert_eq!(data["hash"], "def456");
1072        assert_eq!(data["updated"], true);
1073    }
1074
1075    #[cfg(feature = "native-v2")]
1076    #[test]
1077    fn test_update_node_nonexistent_returns_error() {
1078        // Test: updating a non-existent node should return an error
1079        let backend = NativeGraphBackend::new_temp().unwrap();
1080
1081        let result = backend.update_node(
1082            9999, // Non-existent node_id
1083            NodeSpec {
1084                kind: "File".to_string(),
1085                name: "test.rs".to_string(),
1086                file_path: Some("test.rs".to_string()),
1087                data: serde_json::json!({"hash": "abc123"}),
1088            },
1089        );
1090
1091        assert!(
1092            result.is_err(),
1093            "update_node should return error for non-existent node_id"
1094        );
1095    }
1096
1097    #[cfg(feature = "native-v2")]
1098    #[test]
1099    fn test_multiple_updates_dont_increase_node_count() {
1100        // Test: Multiple consecutive updates should not increase node_count
1101        // This is the regression test for the node region overflow bug
1102        use tempfile::TempDir;
1103
1104        let temp_dir = TempDir::new().unwrap();
1105        let db_path = temp_dir.path().join("test_update_no_increase.db");
1106
1107        // Create initial node
1108        {
1109            let backend = NativeGraphBackend::new(&db_path).unwrap();
1110            let node_id = backend
1111                .insert_node(NodeSpec {
1112                    kind: "File".to_string(),
1113                    name: "main.rs".to_string(),
1114                    file_path: Some("main.rs".to_string()),
1115                    data: serde_json::json!({"version": 1}),
1116                })
1117                .unwrap();
1118
1119            // Get initial node_count
1120            let ids_after_insert = backend.entity_ids().unwrap();
1121            let initial_count = ids_after_insert.len();
1122            assert_eq!(initial_count, 1, "Should have exactly 1 node");
1123
1124            // Perform 100 updates - node count should stay at 1
1125            for i in 2..=100 {
1126                let _ = backend
1127                    .update_node(
1128                        node_id,
1129                        NodeSpec {
1130                            kind: "File".to_string(),
1131                            name: "main.rs".to_string(),
1132                            file_path: Some("main.rs".to_string()),
1133                            data: serde_json::json!({"version": i}),
1134                        },
1135                    )
1136                    .unwrap();
1137
1138                let ids = backend.entity_ids().unwrap();
1139                assert_eq!(
1140                    ids.len(),
1141                    1,
1142                    "Node count should remain 1 after {} updates",
1143                    i - 1
1144                );
1145            }
1146
1147            // Verify final state
1148            let snapshot = crate::snapshot::SnapshotId::current();
1149            let node = backend.get_node(snapshot, node_id).unwrap();
1150            let data: serde_json::Value = node.data;
1151            assert_eq!(data["version"], 100, "Data should reflect last update");
1152        }
1153    }
1154
1155    #[test]
1156    fn test_update_node_interface_exists() {
1157        // Test: Verify update_node method exists on GraphBackend trait
1158        // This is a compile-time test - if update_node doesn't exist, this won't compile
1159        use crate::backend::GraphBackend;
1160
1161        fn has_update_node<B: GraphBackend>(backend: &B) -> bool {
1162            // This function just proves update_node is callable
1163            // Actual functionality tested in other tests
1164            true
1165        }
1166
1167        let backend = NativeGraphBackend::new_temp().unwrap();
1168        assert!(has_update_node(&backend));
1169    }
1170}