Skip to main content

grafeo_engine/
database.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::atomic::AtomicUsize;
8
9use parking_lot::RwLock;
10
11#[cfg(feature = "wal")]
12use grafeo_adapters::storage::wal::{
13    DurabilityMode as WalDurabilityMode, WalConfig, WalManager, WalRecord, WalRecovery,
14};
15use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
16use grafeo_common::types::{EdgeId, NodeId, Value};
17use grafeo_common::utils::error::{Error, Result};
18use grafeo_core::graph::lpg::LpgStore;
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21
22use crate::config::Config;
23use crate::query::cache::QueryCache;
24use crate::session::Session;
25use crate::transaction::TransactionManager;
26
/// Your handle to a Grafeo database.
///
/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
/// quick experiments, or [`open()`](Self::open) for persistent storage.
/// Then grab a [`session()`](Self::session) to start querying.
///
/// The heavyweight components (graph store, transaction manager, query
/// cache, commit counter) are held behind `Arc`, and every session created
/// by [`session()`](Self::session) receives its own clone of those handles.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// // Quick in-memory database
/// let db = GrafeoDB::new_in_memory();
///
/// // Add some data
/// db.create_node(&["Person"]);
///
/// // Query it
/// let session = db.session();
/// let result = session.execute("MATCH (p:Person) RETURN p")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
pub struct GrafeoDB {
    /// Database configuration, as passed to [`with_config()`](Self::with_config).
    config: Config,
    /// The underlying labeled property graph (LPG) store, shared with sessions.
    store: Arc<LpgStore>,
    /// RDF triple store (if RDF feature is enabled); SPARQL queries run against it.
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager, shared with every session.
    tx_manager: Arc<TransactionManager>,
    /// Unified buffer manager. Its budget is `config.memory_limit`, or 75% of
    /// detected system memory when no limit is configured.
    buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager; `None` unless WAL is enabled in the config
    /// AND a database path is set.
    #[cfg(feature = "wal")]
    wal: Option<Arc<WalManager>>,
    /// Query cache for parsed and optimized plans, shared across sessions.
    query_cache: Arc<QueryCache>,
    /// Shared commit counter for auto-GC across sessions.
    commit_counter: Arc<AtomicUsize>,
    /// Whether the database is open; cleared by [`close()`](Self::close).
    is_open: RwLock<bool>,
    /// Change data capture log for tracking mutations; handed to each session.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Registered embedding models for text-to-vector conversion, keyed by name.
    #[cfg(feature = "embed")]
    embedding_models: RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
}
77
78impl GrafeoDB {
79    /// Creates an in-memory database - fast to create, gone when dropped.
80    ///
81    /// Use this for tests, experiments, or when you don't need persistence.
82    /// For data that survives restarts, use [`open()`](Self::open) instead.
83    ///
84    /// # Examples
85    ///
86    /// ```
87    /// use grafeo_engine::GrafeoDB;
88    ///
89    /// let db = GrafeoDB::new_in_memory();
90    /// let session = db.session();
91    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
92    /// # Ok::<(), grafeo_common::utils::error::Error>(())
93    /// ```
94    #[must_use]
95    pub fn new_in_memory() -> Self {
96        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
97    }
98
/// Opens the persistent database rooted at `path`, creating it on first use.
///
/// If the directory already holds a write-ahead log from an earlier run,
/// its records are replayed so the previous contents come back. A brand-new
/// path simply yields an empty database.
///
/// # Errors
///
/// Fails if the directory can't be created/written or WAL recovery fails.
///
/// # Examples
///
/// ```no_run
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::open("./my_social_network")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[cfg(feature = "wal")]
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    let root: &Path = path.as_ref();
    let persistent = Config::persistent(root);
    Self::with_config(persistent)
}
121
122    /// Creates a database with custom configuration.
123    ///
124    /// Use this when you need fine-grained control over memory limits,
125    /// thread counts, or persistence settings. For most cases,
126    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
127    /// are simpler.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if the database can't be created or recovery fails.
132    ///
133    /// # Examples
134    ///
135    /// ```
136    /// use grafeo_engine::{GrafeoDB, Config};
137    ///
138    /// // In-memory with a 512MB limit
139    /// let config = Config::in_memory()
140    ///     .with_memory_limit(512 * 1024 * 1024);
141    ///
142    /// let db = GrafeoDB::with_config(config)?;
143    /// # Ok::<(), grafeo_common::utils::error::Error>(())
144    /// ```
145    pub fn with_config(config: Config) -> Result<Self> {
146        // Validate configuration before proceeding
147        config
148            .validate()
149            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
150
151        let store = Arc::new(LpgStore::new());
152        #[cfg(feature = "rdf")]
153        let rdf_store = Arc::new(RdfStore::new());
154        let tx_manager = Arc::new(TransactionManager::new());
155
156        // Create buffer manager with configured limits
157        let buffer_config = BufferManagerConfig {
158            budget: config.memory_limit.unwrap_or_else(|| {
159                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
160            }),
161            spill_path: config
162                .spill_path
163                .clone()
164                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
165            ..BufferManagerConfig::default()
166        };
167        let buffer_manager = BufferManager::new(buffer_config);
168
169        // Initialize WAL if persistence is enabled
170        #[cfg(feature = "wal")]
171        let wal = if config.wal_enabled {
172            if let Some(ref db_path) = config.path {
173                // Create database directory if it doesn't exist
174                std::fs::create_dir_all(db_path)?;
175
176                let wal_path = db_path.join("wal");
177
178                // Check if WAL exists and recover if needed
179                if wal_path.exists() {
180                    let recovery = WalRecovery::new(&wal_path);
181                    let records = recovery.recover()?;
182                    Self::apply_wal_records(&store, &records)?;
183                }
184
185                // Open/create WAL manager with configured durability
186                let wal_durability = match config.wal_durability {
187                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
188                    crate::config::DurabilityMode::Batch {
189                        max_delay_ms,
190                        max_records,
191                    } => WalDurabilityMode::Batch {
192                        max_delay_ms,
193                        max_records,
194                    },
195                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
196                        WalDurabilityMode::Adaptive { target_interval_ms }
197                    }
198                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
199                };
200                let wal_config = WalConfig {
201                    durability: wal_durability,
202                    ..WalConfig::default()
203                };
204                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
205                Some(Arc::new(wal_manager))
206            } else {
207                None
208            }
209        } else {
210            None
211        };
212
213        // Create query cache with default capacity (1000 queries)
214        let query_cache = Arc::new(QueryCache::default());
215
216        Ok(Self {
217            config,
218            store,
219            #[cfg(feature = "rdf")]
220            rdf_store,
221            tx_manager,
222            buffer_manager,
223            #[cfg(feature = "wal")]
224            wal,
225            query_cache,
226            commit_counter: Arc::new(AtomicUsize::new(0)),
227            is_open: RwLock::new(true),
228            #[cfg(feature = "cdc")]
229            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
230            #[cfg(feature = "embed")]
231            embedding_models: RwLock::new(hashbrown::HashMap::new()),
232        })
233    }
234
/// Replays recovered WAL records against `store`, in log order.
///
/// Data records re-execute the corresponding store mutation. Transaction
/// control records (`TxCommit`, `TxAbort`, `Checkpoint`) are skipped,
/// because recovery has already filtered the log down to committed work.
///
/// # Errors
///
/// Currently always returns `Ok(())`; the `Result` keeps the signature open
/// for fallible replay.
#[cfg(feature = "wal")]
fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
    for record in records.iter() {
        match *record {
            WalRecord::CreateNode { id, ref labels } => {
                let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
                store.create_node_with_id(id, &label_refs);
            }
            WalRecord::DeleteNode { id } => {
                store.delete_node(id);
            }
            WalRecord::CreateEdge {
                id,
                src,
                dst,
                ref edge_type,
            } => {
                store.create_edge_with_id(id, src, dst, edge_type);
            }
            WalRecord::DeleteEdge { id } => {
                store.delete_edge(id);
            }
            WalRecord::SetNodeProperty {
                id,
                ref key,
                ref value,
            } => {
                store.set_node_property(id, key, value.clone());
            }
            WalRecord::SetEdgeProperty {
                id,
                ref key,
                ref value,
            } => {
                store.set_edge_property(id, key, value.clone());
            }
            WalRecord::AddNodeLabel { id, ref label } => {
                store.add_label(id, label);
            }
            WalRecord::RemoveNodeLabel { id, ref label } => {
                store.remove_label(id, label);
            }
            // Control records carry no data to replay.
            WalRecord::TxCommit { .. }
            | WalRecord::TxAbort { .. }
            | WalRecord::Checkpoint { .. } => {}
        }
    }
    Ok(())
}
280
281    /// Opens a new session for running queries.
282    ///
283    /// Sessions are cheap to create - spin up as many as you need. Each
284    /// gets its own transaction context, so concurrent sessions won't
285    /// block each other on reads.
286    ///
287    /// # Examples
288    ///
289    /// ```
290    /// use grafeo_engine::GrafeoDB;
291    ///
292    /// let db = GrafeoDB::new_in_memory();
293    /// let session = db.session();
294    ///
295    /// // Run queries through the session
296    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
297    /// # Ok::<(), grafeo_common::utils::error::Error>(())
298    /// ```
299    #[must_use]
300    pub fn session(&self) -> Session {
301        #[cfg(feature = "rdf")]
302        let mut session = Session::with_rdf_store_and_adaptive(
303            Arc::clone(&self.store),
304            Arc::clone(&self.rdf_store),
305            Arc::clone(&self.tx_manager),
306            Arc::clone(&self.query_cache),
307            self.config.adaptive.clone(),
308            self.config.factorized_execution,
309            self.config.graph_model,
310            self.config.query_timeout,
311            Arc::clone(&self.commit_counter),
312            self.config.gc_interval,
313        );
314        #[cfg(not(feature = "rdf"))]
315        let mut session = Session::with_adaptive(
316            Arc::clone(&self.store),
317            Arc::clone(&self.tx_manager),
318            Arc::clone(&self.query_cache),
319            self.config.adaptive.clone(),
320            self.config.factorized_execution,
321            self.config.graph_model,
322            self.config.query_timeout,
323            Arc::clone(&self.commit_counter),
324            self.config.gc_interval,
325        );
326
327        #[cfg(feature = "cdc")]
328        session.set_cdc_log(Arc::clone(&self.cdc_log));
329
330        // Suppress unused_mut when cdc is disabled
331        let _ = &mut session;
332
333        session
334    }
335
336    /// Returns the adaptive execution configuration.
337    #[must_use]
338    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
339        &self.config.adaptive
340    }
341
342    /// Runs a query directly on the database.
343    ///
344    /// A convenience method that creates a temporary session behind the
345    /// scenes. If you're running multiple queries, grab a
346    /// [`session()`](Self::session) instead to avoid the overhead.
347    ///
348    /// # Errors
349    ///
350    /// Returns an error if parsing or execution fails.
351    pub fn execute(&self, query: &str) -> Result<QueryResult> {
352        let session = self.session();
353        session.execute(query)
354    }
355
356    /// Executes a query with parameters and returns the result.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the query fails.
361    pub fn execute_with_params(
362        &self,
363        query: &str,
364        params: std::collections::HashMap<String, grafeo_common::types::Value>,
365    ) -> Result<QueryResult> {
366        let session = self.session();
367        session.execute_with_params(query, params)
368    }
369
/// Runs a Cypher `query` through a short-lived session.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_cypher(query)
}
380
/// Runs a parameterized Cypher `query` directly against the LPG store.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "cypher")]
pub fn execute_cypher_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    // NOTE(review): unlike `execute_cypher`, this builds a bare processor over
    // the LPG store rather than going through a `Session`, so it does not use
    // the shared transaction manager, query cache or CDC log. Confirm this is
    // intentional.
    QueryProcessor::for_lpg(Arc::clone(&self.store)).process(
        query,
        QueryLanguage::Cypher,
        Some(&params),
    )
}
398
/// Runs a Gremlin `query` through a short-lived session.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_gremlin(query)
}
409
/// Runs a parameterized Gremlin `query` through a short-lived session.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    self.session().execute_gremlin_with_params(query, params)
}
424
/// Runs a GraphQL `query` through a short-lived session.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_graphql(query)
}
435
/// Runs a parameterized GraphQL `query` through a short-lived session.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    self.session().execute_graphql_with_params(query, params)
}
450
/// Runs a SQL/PGQ (SQL:2023 `GRAPH_TABLE`) `query` through a short-lived session.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_sql(query)
}
461
/// Runs a parameterized SQL/PGQ `query` directly against the LPG store.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    // NOTE(review): unlike `execute_sql`, this bypasses `Session` (no shared
    // transaction manager, query cache or CDC log). Confirm this is intentional.
    QueryProcessor::for_lpg(Arc::clone(&self.store)).process(
        query,
        QueryLanguage::SqlPgq,
        Some(&params),
    )
}
479
/// Runs a SPARQL `query` against the RDF triple store.
///
/// The pipeline is: translate SPARQL to a logical plan, optimize it, lower
/// it with the RDF planner, and execute the resulting physical operator.
///
/// # Errors
///
/// Returns an error if translation, optimization, planning or execution fails.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// let result = db.execute_sparql("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?;
/// # Ok(())
/// # }
/// ```
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
    };

    let logical = sparql_translator::translate(query)?;

    // NOTE(review): the optimizer is seeded from the LPG store, not the RDF
    // store; verify its statistics are meaningful for RDF plans.
    let optimizer = Optimizer::from_store(&self.store);
    let optimized = optimizer.optimize(logical)?;

    let rdf_planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
    let mut physical = rdf_planner.plan(&optimized)?;

    Executor::with_columns(physical.columns.clone()).execute(physical.operator.as_mut())
}
520
/// Borrows the shared RDF triple store for direct triple operations.
#[cfg(feature = "rdf")]
#[must_use]
pub fn rdf_store(&self) -> &Arc<RdfStore> {
    let triples = &self.rdf_store;
    triples
}
529
530    /// Executes a query and returns a single scalar value.
531    ///
532    /// # Errors
533    ///
534    /// Returns an error if the query fails or doesn't return exactly one row.
535    pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
536        let result = self.execute(query)?;
537        result.scalar()
538    }
539
540    /// Returns the configuration.
541    #[must_use]
542    pub fn config(&self) -> &Config {
543        &self.config
544    }
545
546    /// Returns the graph data model of this database.
547    #[must_use]
548    pub fn graph_model(&self) -> crate::config::GraphModel {
549        self.config.graph_model
550    }
551
552    /// Returns the configured memory limit in bytes, if any.
553    #[must_use]
554    pub fn memory_limit(&self) -> Option<usize> {
555        self.config.memory_limit
556    }
557
558    /// Returns the underlying store.
559    ///
560    /// This provides direct access to the LPG store for algorithm implementations.
561    #[must_use]
562    pub fn store(&self) -> &Arc<LpgStore> {
563        &self.store
564    }
565
566    /// Garbage collects old MVCC versions that are no longer visible.
567    ///
568    /// Determines the minimum epoch required by active transactions and prunes
569    /// version chains older than that threshold. Also cleans up completed
570    /// transaction metadata in the transaction manager.
571    pub fn gc(&self) {
572        let min_epoch = self.tx_manager.min_active_epoch();
573        self.store.gc_versions(min_epoch);
574        self.tx_manager.gc();
575    }
576
577    /// Returns the buffer manager for memory-aware operations.
578    #[must_use]
579    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
580        &self.buffer_manager
581    }
582
583    /// Closes the database, flushing all pending writes.
584    ///
585    /// For persistent databases, this ensures everything is safely on disk.
586    /// Called automatically when the database is dropped, but you can call
587    /// it explicitly if you need to guarantee durability at a specific point.
588    ///
589    /// # Errors
590    ///
591    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
592    pub fn close(&self) -> Result<()> {
593        let mut is_open = self.is_open.write();
594        if !*is_open {
595            return Ok(());
596        }
597
598        // Commit and checkpoint WAL
599        #[cfg(feature = "wal")]
600        if let Some(ref wal) = self.wal {
601            let epoch = self.store.current_epoch();
602
603            // Use the last assigned transaction ID, or create a checkpoint-only tx
604            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
605                // No transactions have been started; begin one for checkpoint
606                self.tx_manager.begin()
607            });
608
609            // Log a TxCommit to mark all pending records as committed
610            wal.log(&WalRecord::TxCommit {
611                tx_id: checkpoint_tx,
612            })?;
613
614            // Then checkpoint
615            wal.checkpoint(checkpoint_tx, epoch)?;
616            wal.sync()?;
617        }
618
619        *is_open = false;
620        Ok(())
621    }
622
/// Borrows the write-ahead log manager; `None` when no WAL was opened.
#[cfg(feature = "wal")]
#[must_use]
pub fn wal(&self) -> Option<&Arc<WalManager>> {
    Option::as_ref(&self.wal)
}
629
/// Appends `record` to the write-ahead log; a no-op when no WAL is configured.
///
/// # Errors
///
/// Propagates any error returned by the WAL manager's `log` call.
#[cfg(feature = "wal")]
fn log_wal(&self, record: &WalRecord) -> Result<()> {
    match &self.wal {
        Some(wal) => {
            wal.log(record)?;
            Ok(())
        }
        None => Ok(()),
    }
}
638
639    /// Returns the number of nodes in the database.
640    #[must_use]
641    pub fn node_count(&self) -> usize {
642        self.store.node_count()
643    }
644
645    /// Returns the number of edges in the database.
646    #[must_use]
647    pub fn edge_count(&self) -> usize {
648        self.store.edge_count()
649    }
650
651    /// Returns the number of distinct labels in the database.
652    #[must_use]
653    pub fn label_count(&self) -> usize {
654        self.store.label_count()
655    }
656
657    /// Returns the number of distinct property keys in the database.
658    #[must_use]
659    pub fn property_key_count(&self) -> usize {
660        self.store.property_key_count()
661    }
662
663    /// Returns the number of distinct edge types in the database.
664    #[must_use]
665    pub fn edge_type_count(&self) -> usize {
666        self.store.edge_type_count()
667    }
668
669    // === Node Operations ===
670
671    /// Creates a node with the given labels and returns its ID.
672    ///
673    /// Labels categorize nodes - think of them like tags. A node can have
674    /// multiple labels (e.g., `["Person", "Employee"]`).
675    ///
676    /// # Examples
677    ///
678    /// ```
679    /// use grafeo_engine::GrafeoDB;
680    ///
681    /// let db = GrafeoDB::new_in_memory();
682    /// let alice = db.create_node(&["Person"]);
683    /// let company = db.create_node(&["Company", "Startup"]);
684    /// ```
685    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
686        let id = self.store.create_node(labels);
687
688        // Log to WAL if enabled
689        #[cfg(feature = "wal")]
690        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
691            id,
692            labels: labels.iter().map(|s| (*s).to_string()).collect(),
693        }) {
694            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
695        }
696
697        #[cfg(feature = "cdc")]
698        self.cdc_log
699            .record_create_node(id, self.store.current_epoch(), None);
700
701        id
702    }
703
704    /// Creates a new node with labels and properties.
705    ///
706    /// If WAL is enabled, the operation is logged for durability.
707    pub fn create_node_with_props(
708        &self,
709        labels: &[&str],
710        properties: impl IntoIterator<
711            Item = (
712                impl Into<grafeo_common::types::PropertyKey>,
713                impl Into<grafeo_common::types::Value>,
714            ),
715        >,
716    ) -> grafeo_common::types::NodeId {
717        // Collect properties first so we can log them to WAL
718        let props: Vec<(
719            grafeo_common::types::PropertyKey,
720            grafeo_common::types::Value,
721        )> = properties
722            .into_iter()
723            .map(|(k, v)| (k.into(), v.into()))
724            .collect();
725
726        let id = self
727            .store
728            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
729
730        // Build CDC snapshot before WAL consumes props
731        #[cfg(feature = "cdc")]
732        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
733            .iter()
734            .map(|(k, v)| (k.to_string(), v.clone()))
735            .collect();
736
737        // Log node creation to WAL
738        #[cfg(feature = "wal")]
739        {
740            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
741                id,
742                labels: labels.iter().map(|s| (*s).to_string()).collect(),
743            }) {
744                tracing::warn!("Failed to log CreateNode to WAL: {}", e);
745            }
746
747            // Log each property to WAL for full durability
748            for (key, value) in props {
749                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
750                    id,
751                    key: key.to_string(),
752                    value,
753                }) {
754                    tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
755                }
756            }
757        }
758
759        #[cfg(feature = "cdc")]
760        self.cdc_log.record_create_node(
761            id,
762            self.store.current_epoch(),
763            if cdc_props.is_empty() {
764                None
765            } else {
766                Some(cdc_props)
767            },
768        );
769
770        // Auto-insert into matching text indexes for the new node
771        #[cfg(feature = "text-index")]
772        if let Some(node) = self.store.get_node(id) {
773            for label in &node.labels {
774                for (prop_key, prop_val) in &node.properties {
775                    if let grafeo_common::types::Value::String(text) = prop_val
776                        && let Some(index) =
777                            self.store.get_text_index(label.as_str(), prop_key.as_ref())
778                    {
779                        index.write().insert(id, text);
780                    }
781                }
782            }
783        }
784
785        id
786    }
787
788    /// Gets a node by ID.
789    #[must_use]
790    pub fn get_node(
791        &self,
792        id: grafeo_common::types::NodeId,
793    ) -> Option<grafeo_core::graph::lpg::Node> {
794        self.store.get_node(id)
795    }
796
797    /// Deletes a node and all its edges.
798    ///
799    /// If WAL is enabled, the operation is logged for durability.
800    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
801        // Capture properties for CDC before deletion
802        #[cfg(feature = "cdc")]
803        let cdc_props = self.store.get_node(id).map(|node| {
804            node.properties
805                .iter()
806                .map(|(k, v)| (k.to_string(), v.clone()))
807                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
808        });
809
810        // Collect matching vector indexes BEFORE deletion removes labels
811        #[cfg(feature = "vector-index")]
812        let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
813            .store
814            .get_node(id)
815            .map(|node| {
816                let mut indexes = Vec::new();
817                for label in &node.labels {
818                    let prefix = format!("{}:", label.as_str());
819                    for (key, index) in self.store.vector_index_entries() {
820                        if key.starts_with(&prefix) {
821                            indexes.push(index);
822                        }
823                    }
824                }
825                indexes
826            })
827            .unwrap_or_default();
828
829        // Collect matching text indexes BEFORE deletion removes labels
830        #[cfg(feature = "text-index")]
831        let text_indexes_to_clean: Vec<
832            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
833        > = self
834            .store
835            .get_node(id)
836            .map(|node| {
837                let mut indexes = Vec::new();
838                for label in &node.labels {
839                    let prefix = format!("{}:", label.as_str());
840                    for (key, index) in self.store.text_index_entries() {
841                        if key.starts_with(&prefix) {
842                            indexes.push(index);
843                        }
844                    }
845                }
846                indexes
847            })
848            .unwrap_or_default();
849
850        let result = self.store.delete_node(id);
851
852        // Remove from vector indexes after successful deletion
853        #[cfg(feature = "vector-index")]
854        if result {
855            for index in indexes_to_clean {
856                index.remove(id);
857            }
858        }
859
860        // Remove from text indexes after successful deletion
861        #[cfg(feature = "text-index")]
862        if result {
863            for index in text_indexes_to_clean {
864                index.write().remove(id);
865            }
866        }
867
868        #[cfg(feature = "wal")]
869        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
870            tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
871        }
872
873        #[cfg(feature = "cdc")]
874        if result {
875            self.cdc_log.record_delete(
876                crate::cdc::EntityId::Node(id),
877                self.store.current_epoch(),
878                cdc_props,
879            );
880        }
881
882        result
883    }
884
885    /// Sets a property on a node.
886    ///
887    /// If WAL is enabled, the operation is logged for durability.
888    pub fn set_node_property(
889        &self,
890        id: grafeo_common::types::NodeId,
891        key: &str,
892        value: grafeo_common::types::Value,
893    ) {
894        // Extract vector data before the value is moved into the store
895        #[cfg(feature = "vector-index")]
896        let vector_data = match &value {
897            grafeo_common::types::Value::Vector(v) => Some(v.clone()),
898            _ => None,
899        };
900
901        // Log to WAL first
902        #[cfg(feature = "wal")]
903        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
904            id,
905            key: key.to_string(),
906            value: value.clone(),
907        }) {
908            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
909        }
910
911        // Capture old value for CDC before the store write
912        #[cfg(feature = "cdc")]
913        let cdc_old_value = self
914            .store
915            .get_node_property(id, &grafeo_common::types::PropertyKey::new(key));
916        #[cfg(feature = "cdc")]
917        let cdc_new_value = value.clone();
918
919        self.store.set_node_property(id, key, value);
920
921        #[cfg(feature = "cdc")]
922        self.cdc_log.record_update(
923            crate::cdc::EntityId::Node(id),
924            self.store.current_epoch(),
925            key,
926            cdc_old_value,
927            cdc_new_value,
928        );
929
930        // Auto-insert into matching vector indexes
931        #[cfg(feature = "vector-index")]
932        if let Some(vec) = vector_data
933            && let Some(node) = self.store.get_node(id)
934        {
935            for label in &node.labels {
936                if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
937                    let accessor =
938                        grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, key);
939                    index.insert(id, &vec, &accessor);
940                }
941            }
942        }
943
944        // Auto-update matching text indexes
945        #[cfg(feature = "text-index")]
946        if let Some(node) = self.store.get_node(id) {
947            let text_val = node
948                .properties
949                .get(&grafeo_common::types::PropertyKey::new(key))
950                .and_then(|v| match v {
951                    grafeo_common::types::Value::String(s) => Some(s.to_string()),
952                    _ => None,
953                });
954            for label in &node.labels {
955                if let Some(index) = self.store.get_text_index(label.as_str(), key) {
956                    let mut idx = index.write();
957                    if let Some(ref text) = text_val {
958                        idx.insert(id, text);
959                    } else {
960                        idx.remove(id);
961                    }
962                }
963            }
964        }
965    }
966
967    /// Adds a label to an existing node.
968    ///
969    /// Returns `true` if the label was added, `false` if the node doesn't exist
970    /// or already has the label.
971    ///
972    /// # Examples
973    ///
974    /// ```
975    /// use grafeo_engine::GrafeoDB;
976    ///
977    /// let db = GrafeoDB::new_in_memory();
978    /// let alice = db.create_node(&["Person"]);
979    ///
980    /// // Promote Alice to Employee
981    /// let added = db.add_node_label(alice, "Employee");
982    /// assert!(added);
983    /// ```
984    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
985        let result = self.store.add_label(id, label);
986
987        #[cfg(feature = "wal")]
988        if result {
989            // Log to WAL if enabled
990            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
991                id,
992                label: label.to_string(),
993            }) {
994                tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
995            }
996        }
997
998        // Auto-insert into vector indexes for the newly-added label
999        #[cfg(feature = "vector-index")]
1000        if result {
1001            let prefix = format!("{label}:");
1002            for (key, index) in self.store.vector_index_entries() {
1003                if let Some(property) = key.strip_prefix(&prefix)
1004                    && let Some(node) = self.store.get_node(id)
1005                {
1006                    let prop_key = grafeo_common::types::PropertyKey::new(property);
1007                    if let Some(grafeo_common::types::Value::Vector(v)) =
1008                        node.properties.get(&prop_key)
1009                    {
1010                        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
1011                            &self.store,
1012                            property,
1013                        );
1014                        index.insert(id, v, &accessor);
1015                    }
1016                }
1017            }
1018        }
1019
1020        // Auto-insert into text indexes for the newly-added label
1021        #[cfg(feature = "text-index")]
1022        if result && let Some(node) = self.store.get_node(id) {
1023            for (prop_key, prop_val) in &node.properties {
1024                if let grafeo_common::types::Value::String(text) = prop_val
1025                    && let Some(index) = self.store.get_text_index(label, prop_key.as_ref())
1026                {
1027                    index.write().insert(id, text);
1028                }
1029            }
1030        }
1031
1032        result
1033    }
1034
1035    /// Removes a label from a node.
1036    ///
1037    /// Returns `true` if the label was removed, `false` if the node doesn't exist
1038    /// or doesn't have the label.
1039    ///
1040    /// # Examples
1041    ///
1042    /// ```
1043    /// use grafeo_engine::GrafeoDB;
1044    ///
1045    /// let db = GrafeoDB::new_in_memory();
1046    /// let alice = db.create_node(&["Person", "Employee"]);
1047    ///
1048    /// // Remove Employee status
1049    /// let removed = db.remove_node_label(alice, "Employee");
1050    /// assert!(removed);
1051    /// ```
1052    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
1053        // Collect text indexes to clean BEFORE removing the label
1054        #[cfg(feature = "text-index")]
1055        let text_indexes_to_clean: Vec<
1056            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
1057        > = {
1058            let prefix = format!("{label}:");
1059            self.store
1060                .text_index_entries()
1061                .into_iter()
1062                .filter(|(key, _)| key.starts_with(&prefix))
1063                .map(|(_, index)| index)
1064                .collect()
1065        };
1066
1067        let result = self.store.remove_label(id, label);
1068
1069        #[cfg(feature = "wal")]
1070        if result {
1071            // Log to WAL if enabled
1072            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
1073                id,
1074                label: label.to_string(),
1075            }) {
1076                tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
1077            }
1078        }
1079
1080        // Remove from text indexes for the removed label
1081        #[cfg(feature = "text-index")]
1082        if result {
1083            for index in text_indexes_to_clean {
1084                index.write().remove(id);
1085            }
1086        }
1087
1088        result
1089    }
1090
1091    /// Gets all labels for a node.
1092    ///
1093    /// Returns `None` if the node doesn't exist.
1094    ///
1095    /// # Examples
1096    ///
1097    /// ```
1098    /// use grafeo_engine::GrafeoDB;
1099    ///
1100    /// let db = GrafeoDB::new_in_memory();
1101    /// let alice = db.create_node(&["Person", "Employee"]);
1102    ///
1103    /// let labels = db.get_node_labels(alice).unwrap();
1104    /// assert!(labels.contains(&"Person".to_string()));
1105    /// assert!(labels.contains(&"Employee".to_string()));
1106    /// ```
1107    #[must_use]
1108    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
1109        self.store
1110            .get_node(id)
1111            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
1112    }
1113
1114    // === Edge Operations ===
1115
1116    /// Creates an edge (relationship) between two nodes.
1117    ///
1118    /// Edges connect nodes and have a type that describes the relationship.
1119    /// They're directed - the order of `src` and `dst` matters.
1120    ///
1121    /// # Examples
1122    ///
1123    /// ```
1124    /// use grafeo_engine::GrafeoDB;
1125    ///
1126    /// let db = GrafeoDB::new_in_memory();
1127    /// let alice = db.create_node(&["Person"]);
1128    /// let bob = db.create_node(&["Person"]);
1129    ///
1130    /// // Alice knows Bob (directed: Alice -> Bob)
1131    /// let edge = db.create_edge(alice, bob, "KNOWS");
1132    /// ```
1133    pub fn create_edge(
1134        &self,
1135        src: grafeo_common::types::NodeId,
1136        dst: grafeo_common::types::NodeId,
1137        edge_type: &str,
1138    ) -> grafeo_common::types::EdgeId {
1139        let id = self.store.create_edge(src, dst, edge_type);
1140
1141        // Log to WAL if enabled
1142        #[cfg(feature = "wal")]
1143        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
1144            id,
1145            src,
1146            dst,
1147            edge_type: edge_type.to_string(),
1148        }) {
1149            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
1150        }
1151
1152        #[cfg(feature = "cdc")]
1153        self.cdc_log
1154            .record_create_edge(id, self.store.current_epoch(), None);
1155
1156        id
1157    }
1158
1159    /// Creates a new edge with properties.
1160    ///
1161    /// If WAL is enabled, the operation is logged for durability.
1162    pub fn create_edge_with_props(
1163        &self,
1164        src: grafeo_common::types::NodeId,
1165        dst: grafeo_common::types::NodeId,
1166        edge_type: &str,
1167        properties: impl IntoIterator<
1168            Item = (
1169                impl Into<grafeo_common::types::PropertyKey>,
1170                impl Into<grafeo_common::types::Value>,
1171            ),
1172        >,
1173    ) -> grafeo_common::types::EdgeId {
1174        // Collect properties first so we can log them to WAL
1175        let props: Vec<(
1176            grafeo_common::types::PropertyKey,
1177            grafeo_common::types::Value,
1178        )> = properties
1179            .into_iter()
1180            .map(|(k, v)| (k.into(), v.into()))
1181            .collect();
1182
1183        let id = self.store.create_edge_with_props(
1184            src,
1185            dst,
1186            edge_type,
1187            props.iter().map(|(k, v)| (k.clone(), v.clone())),
1188        );
1189
1190        // Build CDC snapshot before WAL consumes props
1191        #[cfg(feature = "cdc")]
1192        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
1193            .iter()
1194            .map(|(k, v)| (k.to_string(), v.clone()))
1195            .collect();
1196
1197        // Log edge creation to WAL
1198        #[cfg(feature = "wal")]
1199        {
1200            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
1201                id,
1202                src,
1203                dst,
1204                edge_type: edge_type.to_string(),
1205            }) {
1206                tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
1207            }
1208
1209            // Log each property to WAL for full durability
1210            for (key, value) in props {
1211                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1212                    id,
1213                    key: key.to_string(),
1214                    value,
1215                }) {
1216                    tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1217                }
1218            }
1219        }
1220
1221        #[cfg(feature = "cdc")]
1222        self.cdc_log.record_create_edge(
1223            id,
1224            self.store.current_epoch(),
1225            if cdc_props.is_empty() {
1226                None
1227            } else {
1228                Some(cdc_props)
1229            },
1230        );
1231
1232        id
1233    }
1234
1235    /// Gets an edge by ID.
1236    #[must_use]
1237    pub fn get_edge(
1238        &self,
1239        id: grafeo_common::types::EdgeId,
1240    ) -> Option<grafeo_core::graph::lpg::Edge> {
1241        self.store.get_edge(id)
1242    }
1243
1244    /// Deletes an edge.
1245    ///
1246    /// If WAL is enabled, the operation is logged for durability.
1247    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
1248        // Capture properties for CDC before deletion
1249        #[cfg(feature = "cdc")]
1250        let cdc_props = self.store.get_edge(id).map(|edge| {
1251            edge.properties
1252                .iter()
1253                .map(|(k, v)| (k.to_string(), v.clone()))
1254                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
1255        });
1256
1257        let result = self.store.delete_edge(id);
1258
1259        #[cfg(feature = "wal")]
1260        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
1261            tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
1262        }
1263
1264        #[cfg(feature = "cdc")]
1265        if result {
1266            self.cdc_log.record_delete(
1267                crate::cdc::EntityId::Edge(id),
1268                self.store.current_epoch(),
1269                cdc_props,
1270            );
1271        }
1272
1273        result
1274    }
1275
1276    /// Sets a property on an edge.
1277    ///
1278    /// If WAL is enabled, the operation is logged for durability.
1279    pub fn set_edge_property(
1280        &self,
1281        id: grafeo_common::types::EdgeId,
1282        key: &str,
1283        value: grafeo_common::types::Value,
1284    ) {
1285        // Log to WAL first
1286        #[cfg(feature = "wal")]
1287        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1288            id,
1289            key: key.to_string(),
1290            value: value.clone(),
1291        }) {
1292            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1293        }
1294
1295        // Capture old value for CDC before the store write
1296        #[cfg(feature = "cdc")]
1297        let cdc_old_value = self
1298            .store
1299            .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key));
1300        #[cfg(feature = "cdc")]
1301        let cdc_new_value = value.clone();
1302
1303        self.store.set_edge_property(id, key, value);
1304
1305        #[cfg(feature = "cdc")]
1306        self.cdc_log.record_update(
1307            crate::cdc::EntityId::Edge(id),
1308            self.store.current_epoch(),
1309            key,
1310            cdc_old_value,
1311            cdc_new_value,
1312        );
1313    }
1314
1315    /// Removes a property from a node.
1316    ///
1317    /// Returns true if the property existed and was removed, false otherwise.
1318    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
1319        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
1320        let removed = self.store.remove_node_property(id, key).is_some();
1321
1322        // Remove from matching text indexes
1323        #[cfg(feature = "text-index")]
1324        if removed && let Some(node) = self.store.get_node(id) {
1325            for label in &node.labels {
1326                if let Some(index) = self.store.get_text_index(label.as_str(), key) {
1327                    index.write().remove(id);
1328                }
1329            }
1330        }
1331
1332        removed
1333    }
1334
1335    /// Removes a property from an edge.
1336    ///
1337    /// Returns true if the property existed and was removed, false otherwise.
1338    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
1339        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
1340        self.store.remove_edge_property(id, key).is_some()
1341    }
1342
1343    // =========================================================================
1344    // PROPERTY INDEX API
1345    // =========================================================================
1346
1347    /// Creates an index on a node property for O(1) lookups by value.
1348    ///
1349    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1350    /// O(1) instead of O(n) for this property. The index is automatically
1351    /// maintained when properties are set or removed.
1352    ///
1353    /// # Example
1354    ///
1355    /// ```no_run
1356    /// # use grafeo_engine::GrafeoDB;
1357    /// # use grafeo_common::types::Value;
1358    /// # let db = GrafeoDB::new_in_memory();
1359    /// // Create an index on the 'email' property
1360    /// db.create_property_index("email");
1361    ///
1362    /// // Now lookups by email are O(1)
1363    /// let nodes = db.find_nodes_by_property("email", &Value::from("alice@example.com"));
1364    /// ```
1365    pub fn create_property_index(&self, property: &str) {
1366        self.store.create_property_index(property);
1367    }
1368
1369    /// Creates a vector similarity index on a node property.
1370    ///
1371    /// This enables efficient approximate nearest-neighbor search on vector
1372    /// properties. Currently validates the index parameters and scans existing
1373    /// nodes to verify the property contains vectors of the expected dimensions.
1374    ///
1375    /// # Arguments
1376    ///
1377    /// * `label` - Node label to index (e.g., `"Doc"`)
1378    /// * `property` - Property containing vector embeddings (e.g., `"embedding"`)
1379    /// * `dimensions` - Expected vector dimensions (inferred from data if `None`)
1380    /// * `metric` - Distance metric: `"cosine"` (default), `"euclidean"`, `"dot_product"`, `"manhattan"`
1381    /// * `m` - HNSW links per node (default: 16). Higher = better recall, more memory.
1382    /// * `ef_construction` - Construction beam width (default: 128). Higher = better index quality, slower build.
1383    ///
1384    /// # Errors
1385    ///
1386    /// Returns an error if the metric is invalid, no vectors are found, or
1387    /// dimensions don't match.
1388    pub fn create_vector_index(
1389        &self,
1390        label: &str,
1391        property: &str,
1392        dimensions: Option<usize>,
1393        metric: Option<&str>,
1394        m: Option<usize>,
1395        ef_construction: Option<usize>,
1396    ) -> Result<()> {
1397        use grafeo_common::types::{PropertyKey, Value};
1398        use grafeo_core::index::vector::DistanceMetric;
1399
1400        let metric = match metric {
1401            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1402                grafeo_common::utils::error::Error::Internal(format!(
1403                    "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
1404                    m
1405                ))
1406            })?,
1407            None => DistanceMetric::Cosine,
1408        };
1409
1410        // Scan nodes to validate vectors exist and check dimensions
1411        let prop_key = PropertyKey::new(property);
1412        let mut found_dims: Option<usize> = dimensions;
1413        let mut vector_count = 0usize;
1414
1415        #[cfg(feature = "vector-index")]
1416        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1417
1418        for node in self.store.nodes_with_label(label) {
1419            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1420                if let Some(expected) = found_dims {
1421                    if v.len() != expected {
1422                        return Err(grafeo_common::utils::error::Error::Internal(format!(
1423                            "Vector dimension mismatch: expected {}, found {} on node {}",
1424                            expected,
1425                            v.len(),
1426                            node.id.0
1427                        )));
1428                    }
1429                } else {
1430                    found_dims = Some(v.len());
1431                }
1432                vector_count += 1;
1433                #[cfg(feature = "vector-index")]
1434                vectors.push((node.id, v.to_vec()));
1435            }
1436        }
1437
1438        let Some(dims) = found_dims else {
1439            // No vectors found yet — caller must have supplied explicit dimensions
1440            // so we can create an empty index that auto-populates via set_node_property.
1441            return if let Some(d) = dimensions {
1442                #[cfg(feature = "vector-index")]
1443                {
1444                    use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1445
1446                    let mut config = HnswConfig::new(d, metric);
1447                    if let Some(m_val) = m {
1448                        config = config.with_m(m_val);
1449                    }
1450                    if let Some(ef_c) = ef_construction {
1451                        config = config.with_ef_construction(ef_c);
1452                    }
1453
1454                    let index = HnswIndex::new(config);
1455                    self.store
1456                        .add_vector_index(label, property, Arc::new(index));
1457                }
1458
1459                let _ = (m, ef_construction);
1460                tracing::info!(
1461                    "Empty vector index created: :{label}({property}) - 0 vectors, {d} dimensions, metric={metric_name}",
1462                    metric_name = metric.name()
1463                );
1464                Ok(())
1465            } else {
1466                Err(grafeo_common::utils::error::Error::Internal(format!(
1467                    "No vector properties found on :{label}({property}) and no dimensions specified"
1468                )))
1469            };
1470        };
1471
1472        // Build and populate the HNSW index
1473        #[cfg(feature = "vector-index")]
1474        {
1475            use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1476
1477            let mut config = HnswConfig::new(dims, metric);
1478            if let Some(m_val) = m {
1479                config = config.with_m(m_val);
1480            }
1481            if let Some(ef_c) = ef_construction {
1482                config = config.with_ef_construction(ef_c);
1483            }
1484
1485            let index = HnswIndex::with_capacity(config, vectors.len());
1486            let accessor =
1487                grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1488            for (node_id, vec) in &vectors {
1489                index.insert(*node_id, vec, &accessor);
1490            }
1491
1492            self.store
1493                .add_vector_index(label, property, Arc::new(index));
1494        }
1495
1496        // Suppress unused variable warnings when vector-index is off
1497        let _ = (m, ef_construction);
1498
1499        tracing::info!(
1500            "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
1501            metric_name = metric.name()
1502        );
1503
1504        Ok(())
1505    }
1506
1507    /// Drops a vector index for the given label and property.
1508    ///
1509    /// Returns `true` if the index existed and was removed, `false` if no
1510    /// index was found.
1511    ///
1512    /// After dropping, [`vector_search`](Self::vector_search) for this
1513    /// label+property pair will return an error.
1514    #[cfg(feature = "vector-index")]
1515    pub fn drop_vector_index(&self, label: &str, property: &str) -> bool {
1516        let removed = self.store.remove_vector_index(label, property);
1517        if removed {
1518            tracing::info!("Vector index dropped: :{label}({property})");
1519        }
1520        removed
1521    }
1522
1523    /// Drops and recreates a vector index, rescanning all matching nodes.
1524    ///
1525    /// This is useful after bulk inserts or when the index may be out of sync.
1526    /// The previous index configuration (dimensions, metric, M, ef\_construction)
1527    /// is preserved.
1528    ///
1529    /// # Errors
1530    ///
1531    /// Returns an error if no vector index exists for this label+property pair,
1532    /// or if the rebuild fails (e.g., no matching vectors found).
1533    #[cfg(feature = "vector-index")]
1534    pub fn rebuild_vector_index(&self, label: &str, property: &str) -> Result<()> {
1535        let config = self
1536            .store
1537            .get_vector_index(label, property)
1538            .map(|idx| idx.config().clone())
1539            .ok_or_else(|| {
1540                grafeo_common::utils::error::Error::Internal(format!(
1541                    "No vector index found for :{label}({property}). Cannot rebuild."
1542                ))
1543            })?;
1544
1545        self.store.remove_vector_index(label, property);
1546
1547        self.create_vector_index(
1548            label,
1549            property,
1550            Some(config.dimensions),
1551            Some(config.metric.name()),
1552            Some(config.m),
1553            Some(config.ef_construction),
1554        )
1555    }
1556
1557    /// Computes a node allowlist from property filters.
1558    ///
1559    /// Supports equality filters (scalar values) and operator filters (Map values
1560    /// with `$`-prefixed keys like `$gt`, `$lt`, `$in`, `$contains`).
1561    ///
1562    /// Returns `None` if filters is `None` or empty (meaning no filtering),
1563    /// or `Some(set)` with the intersection (possibly empty).
1564    #[cfg(feature = "vector-index")]
1565    fn compute_filter_allowlist(
1566        &self,
1567        label: &str,
1568        filters: Option<&std::collections::HashMap<String, Value>>,
1569    ) -> Option<std::collections::HashSet<NodeId>> {
1570        let filters = filters.filter(|f| !f.is_empty())?;
1571
1572        // Start with all nodes for this label
1573        let label_nodes: std::collections::HashSet<NodeId> =
1574            self.store.nodes_by_label(label).into_iter().collect();
1575
1576        let mut allowlist = label_nodes;
1577
1578        for (key, filter_value) in filters {
1579            // Check if this is an operator filter (Map with $-prefixed keys)
1580            let is_operator_filter = matches!(filter_value, Value::Map(ops) if ops.keys().any(|k| k.as_str().starts_with('$')));
1581
1582            let matching: std::collections::HashSet<NodeId> = if is_operator_filter {
1583                // Operator filter: must scan nodes and check each
1584                self.store
1585                    .find_nodes_matching_filter(key, filter_value)
1586                    .into_iter()
1587                    .collect()
1588            } else {
1589                // Equality filter: use indexed lookup when available
1590                self.store
1591                    .find_nodes_by_property(key, filter_value)
1592                    .into_iter()
1593                    .collect()
1594            };
1595            allowlist = allowlist.intersection(&matching).copied().collect();
1596
1597            // Short-circuit: empty intersection means no results possible
1598            if allowlist.is_empty() {
1599                return Some(allowlist);
1600            }
1601        }
1602
1603        Some(allowlist)
1604    }
1605
1606    /// Searches for the k nearest neighbors of a query vector.
1607    ///
1608    /// Uses the HNSW index created by [`create_vector_index`](Self::create_vector_index).
1609    ///
1610    /// # Arguments
1611    ///
1612    /// * `label` - Node label that was indexed
1613    /// * `property` - Property that was indexed
1614    /// * `query` - Query vector (slice of floats)
1615    /// * `k` - Number of nearest neighbors to return
1616    /// * `ef` - Search beam width (higher = better recall, slower). Uses index default if `None`.
1617    /// * `filters` - Optional property equality filters. Only nodes matching all
1618    ///   `(key, value)` pairs will appear in results.
1619    ///
1620    /// # Returns
1621    ///
1622    /// Vector of `(NodeId, distance)` pairs sorted by distance ascending.
1623    #[cfg(feature = "vector-index")]
1624    pub fn vector_search(
1625        &self,
1626        label: &str,
1627        property: &str,
1628        query: &[f32],
1629        k: usize,
1630        ef: Option<usize>,
1631        filters: Option<&std::collections::HashMap<String, Value>>,
1632    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1633        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1634            grafeo_common::utils::error::Error::Internal(format!(
1635                "No vector index found for :{label}({property}). Call create_vector_index() first."
1636            ))
1637        })?;
1638
1639        let accessor =
1640            grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1641
1642        let results = match self.compute_filter_allowlist(label, filters) {
1643            Some(allowlist) => match ef {
1644                Some(ef_val) => {
1645                    index.search_with_ef_and_filter(query, k, ef_val, &allowlist, &accessor)
1646                }
1647                None => index.search_with_filter(query, k, &allowlist, &accessor),
1648            },
1649            None => match ef {
1650                Some(ef_val) => index.search_with_ef(query, k, ef_val, &accessor),
1651                None => index.search(query, k, &accessor),
1652            },
1653        };
1654
1655        Ok(results)
1656    }
1657
1658    /// Creates multiple nodes in bulk, each with a single vector property.
1659    ///
1660    /// Much faster than individual `create_node_with_props` calls because it
1661    /// acquires internal locks once and loops in Rust rather than crossing
1662    /// the FFI boundary per vector.
1663    ///
1664    /// # Arguments
1665    ///
1666    /// * `label` - Label applied to all created nodes
1667    /// * `property` - Property name for the vector data
1668    /// * `vectors` - Vector data for each node
1669    ///
1670    /// # Returns
1671    ///
1672    /// Vector of created `NodeId`s in the same order as the input vectors.
1673    pub fn batch_create_nodes(
1674        &self,
1675        label: &str,
1676        property: &str,
1677        vectors: Vec<Vec<f32>>,
1678    ) -> Vec<grafeo_common::types::NodeId> {
1679        use grafeo_common::types::{PropertyKey, Value};
1680
1681        let prop_key = PropertyKey::new(property);
1682        let labels: &[&str] = &[label];
1683
1684        let ids: Vec<grafeo_common::types::NodeId> = vectors
1685            .into_iter()
1686            .map(|vec| {
1687                let value = Value::Vector(vec.into());
1688                let id = self.store.create_node_with_props(
1689                    labels,
1690                    std::iter::once((prop_key.clone(), value.clone())),
1691                );
1692
1693                // Log to WAL
1694                #[cfg(feature = "wal")]
1695                {
1696                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1697                        id,
1698                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
1699                    }) {
1700                        tracing::warn!("Failed to log CreateNode to WAL: {}", e);
1701                    }
1702                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1703                        id,
1704                        key: property.to_string(),
1705                        value,
1706                    }) {
1707                        tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
1708                    }
1709                }
1710
1711                id
1712            })
1713            .collect();
1714
1715        // Auto-insert into matching vector index if one exists
1716        #[cfg(feature = "vector-index")]
1717        if let Some(index) = self.store.get_vector_index(label, property) {
1718            let accessor =
1719                grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1720            for &id in &ids {
1721                if let Some(node) = self.store.get_node(id) {
1722                    let pk = grafeo_common::types::PropertyKey::new(property);
1723                    if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
1724                        index.insert(id, v, &accessor);
1725                    }
1726                }
1727            }
1728        }
1729
1730        ids
1731    }
1732
1733    /// Searches for nearest neighbors for multiple query vectors in parallel.
1734    ///
1735    /// Uses rayon parallel iteration under the hood for multi-core throughput.
1736    ///
1737    /// # Arguments
1738    ///
1739    /// * `label` - Node label that was indexed
1740    /// * `property` - Property that was indexed
1741    /// * `queries` - Batch of query vectors
1742    /// * `k` - Number of nearest neighbors per query
1743    /// * `ef` - Search beam width (uses index default if `None`)
1744    /// * `filters` - Optional property equality filters
1745    #[cfg(feature = "vector-index")]
1746    pub fn batch_vector_search(
1747        &self,
1748        label: &str,
1749        property: &str,
1750        queries: &[Vec<f32>],
1751        k: usize,
1752        ef: Option<usize>,
1753        filters: Option<&std::collections::HashMap<String, Value>>,
1754    ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1755        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1756            grafeo_common::utils::error::Error::Internal(format!(
1757                "No vector index found for :{label}({property}). Call create_vector_index() first."
1758            ))
1759        })?;
1760
1761        let accessor =
1762            grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1763
1764        let results = match self.compute_filter_allowlist(label, filters) {
1765            Some(allowlist) => match ef {
1766                Some(ef_val) => {
1767                    index.batch_search_with_ef_and_filter(queries, k, ef_val, &allowlist, &accessor)
1768                }
1769                None => index.batch_search_with_filter(queries, k, &allowlist, &accessor),
1770            },
1771            None => match ef {
1772                Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val, &accessor),
1773                None => index.batch_search(queries, k, &accessor),
1774            },
1775        };
1776
1777        Ok(results)
1778    }
1779
1780    /// Searches for diverse nearest neighbors using Maximal Marginal Relevance (MMR).
1781    ///
1782    /// MMR balances relevance (similarity to query) with diversity (dissimilarity
1783    /// among selected results). This is the algorithm used by LangChain's
1784    /// `mmr_traversal_search()` for RAG applications.
1785    ///
1786    /// # Arguments
1787    ///
1788    /// * `label` - Node label that was indexed
1789    /// * `property` - Property that was indexed
1790    /// * `query` - Query vector
1791    /// * `k` - Number of diverse results to return
1792    /// * `fetch_k` - Number of initial candidates from HNSW (default: `4 * k`)
1793    /// * `lambda` - Relevance vs. diversity in \[0, 1\] (default: 0.5).
1794    ///   1.0 = pure relevance, 0.0 = pure diversity.
1795    /// * `ef` - HNSW search beam width (uses index default if `None`)
1796    /// * `filters` - Optional property equality filters
1797    ///
1798    /// # Returns
1799    ///
1800    /// `(NodeId, distance)` pairs in MMR selection order. The f32 is the original
1801    /// distance from the query, matching [`vector_search`](Self::vector_search).
1802    #[cfg(feature = "vector-index")]
1803    #[allow(clippy::too_many_arguments)]
1804    pub fn mmr_search(
1805        &self,
1806        label: &str,
1807        property: &str,
1808        query: &[f32],
1809        k: usize,
1810        fetch_k: Option<usize>,
1811        lambda: Option<f32>,
1812        ef: Option<usize>,
1813        filters: Option<&std::collections::HashMap<String, Value>>,
1814    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1815        use grafeo_core::index::vector::mmr_select;
1816
1817        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1818            grafeo_common::utils::error::Error::Internal(format!(
1819                "No vector index found for :{label}({property}). Call create_vector_index() first."
1820            ))
1821        })?;
1822
1823        let accessor =
1824            grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1825
1826        let fetch_k = fetch_k.unwrap_or(k.saturating_mul(4).max(k));
1827        let lambda = lambda.unwrap_or(0.5);
1828
1829        // Step 1: Fetch candidates from HNSW (with optional filter)
1830        let initial_results = match self.compute_filter_allowlist(label, filters) {
1831            Some(allowlist) => match ef {
1832                Some(ef_val) => {
1833                    index.search_with_ef_and_filter(query, fetch_k, ef_val, &allowlist, &accessor)
1834                }
1835                None => index.search_with_filter(query, fetch_k, &allowlist, &accessor),
1836            },
1837            None => match ef {
1838                Some(ef_val) => index.search_with_ef(query, fetch_k, ef_val, &accessor),
1839                None => index.search(query, fetch_k, &accessor),
1840            },
1841        };
1842
1843        if initial_results.is_empty() {
1844            return Ok(Vec::new());
1845        }
1846
1847        // Step 2: Retrieve stored vectors for MMR pairwise comparison
1848        use grafeo_core::index::vector::VectorAccessor;
1849        let candidates: Vec<(grafeo_common::types::NodeId, f32, std::sync::Arc<[f32]>)> =
1850            initial_results
1851                .into_iter()
1852                .filter_map(|(id, dist)| accessor.get_vector(id).map(|vec| (id, dist, vec)))
1853                .collect();
1854
1855        // Step 3: Build slice-based candidates for mmr_select
1856        let candidate_refs: Vec<(grafeo_common::types::NodeId, f32, &[f32])> = candidates
1857            .iter()
1858            .map(|(id, dist, vec)| (*id, *dist, vec.as_ref()))
1859            .collect();
1860
1861        // Step 4: Run MMR selection
1862        let metric = index.config().metric;
1863        Ok(mmr_select(query, &candidate_refs, k, lambda, metric))
1864    }
1865
1866    // ── Text Search ──────────────────────────────────────────────────────
1867
1868    /// Creates a BM25 text index on a node property for full-text search.
1869    ///
1870    /// Indexes all existing nodes with the given label and property.
1871    /// The index stays in sync automatically as nodes are created, updated,
1872    /// or deleted. Use [`rebuild_text_index`](Self::rebuild_text_index) only
1873    /// if the index was created before existing data was loaded.
1874    ///
1875    /// # Errors
1876    ///
1877    /// Returns an error if the label has no nodes or the property contains no text values.
1878    #[cfg(feature = "text-index")]
1879    pub fn create_text_index(&self, label: &str, property: &str) -> Result<()> {
1880        use grafeo_common::types::PropertyKey;
1881        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1882
1883        let mut index = InvertedIndex::new(BM25Config::default());
1884        let prop_key = PropertyKey::new(property);
1885
1886        // Index all existing nodes with this label + property
1887        let nodes = self.store.nodes_by_label(label);
1888        for node_id in nodes {
1889            if let Some(Value::String(text)) = self.store.get_node_property(node_id, &prop_key) {
1890                index.insert(node_id, text.as_str());
1891            }
1892        }
1893
1894        self.store
1895            .add_text_index(label, property, Arc::new(RwLock::new(index)));
1896        Ok(())
1897    }
1898
1899    /// Drops a text index on a label+property pair.
1900    ///
1901    /// Returns `true` if the index existed and was removed.
1902    #[cfg(feature = "text-index")]
1903    pub fn drop_text_index(&self, label: &str, property: &str) -> bool {
1904        self.store.remove_text_index(label, property)
1905    }
1906
1907    /// Rebuilds a text index by re-scanning all matching nodes.
1908    ///
1909    /// Use after bulk property updates to keep the index current.
1910    ///
1911    /// # Errors
1912    ///
1913    /// Returns an error if no text index exists for this label+property.
1914    #[cfg(feature = "text-index")]
1915    pub fn rebuild_text_index(&self, label: &str, property: &str) -> Result<()> {
1916        self.store.remove_text_index(label, property);
1917        self.create_text_index(label, property)
1918    }
1919
1920    /// Searches a text index using BM25 scoring.
1921    ///
1922    /// Returns up to `k` results as `(NodeId, score)` pairs sorted by
1923    /// descending relevance score.
1924    ///
1925    /// # Errors
1926    ///
1927    /// Returns an error if no text index exists for this label+property.
1928    #[cfg(feature = "text-index")]
1929    pub fn text_search(
1930        &self,
1931        label: &str,
1932        property: &str,
1933        query: &str,
1934        k: usize,
1935    ) -> Result<Vec<(NodeId, f64)>> {
1936        let index = self.store.get_text_index(label, property).ok_or_else(|| {
1937            Error::Internal(format!(
1938                "No text index found for :{label}({property}). Call create_text_index() first."
1939            ))
1940        })?;
1941
1942        Ok(index.read().search(query, k))
1943    }
1944
1945    /// Performs hybrid search combining text (BM25) and vector similarity.
1946    ///
1947    /// Runs both text search and vector search, then fuses results using
1948    /// the specified method (default: Reciprocal Rank Fusion).
1949    ///
1950    /// # Arguments
1951    ///
1952    /// * `label` - Node label to search within
1953    /// * `text_property` - Property indexed for text search
1954    /// * `vector_property` - Property indexed for vector search
1955    /// * `query_text` - Text query for BM25 search
1956    /// * `query_vector` - Vector query for similarity search (optional)
1957    /// * `k` - Number of results to return
1958    /// * `fusion` - Score fusion method (default: RRF with k=60)
1959    ///
1960    /// # Errors
1961    ///
1962    /// Returns an error if the required indexes don't exist.
1963    #[cfg(feature = "hybrid-search")]
1964    #[allow(clippy::too_many_arguments)]
1965    pub fn hybrid_search(
1966        &self,
1967        label: &str,
1968        text_property: &str,
1969        vector_property: &str,
1970        query_text: &str,
1971        query_vector: Option<&[f32]>,
1972        k: usize,
1973        fusion: Option<grafeo_core::index::text::FusionMethod>,
1974    ) -> Result<Vec<(NodeId, f64)>> {
1975        use grafeo_core::index::text::fuse_results;
1976
1977        let fusion_method = fusion.unwrap_or_default();
1978        let mut sources: Vec<Vec<(NodeId, f64)>> = Vec::new();
1979
1980        // Text search
1981        if let Some(text_index) = self.store.get_text_index(label, text_property) {
1982            let text_results = text_index.read().search(query_text, k * 2);
1983            if !text_results.is_empty() {
1984                sources.push(text_results);
1985            }
1986        }
1987
1988        // Vector search (if query vector provided)
1989        if let Some(query_vec) = query_vector
1990            && let Some(vector_index) = self.store.get_vector_index(label, vector_property)
1991        {
1992            let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
1993                &self.store,
1994                vector_property,
1995            );
1996            let vector_results = vector_index.search(query_vec, k * 2, &accessor);
1997            if !vector_results.is_empty() {
1998                sources.push(
1999                    vector_results
2000                        .into_iter()
2001                        .map(|(id, dist)| (id, f64::from(dist)))
2002                        .collect(),
2003                );
2004            }
2005        }
2006
2007        if sources.is_empty() {
2008            return Ok(Vec::new());
2009        }
2010
2011        Ok(fuse_results(&sources, &fusion_method, k))
2012    }
2013
2014    // ── Embedding ────────────────────────────────────────────────────────
2015
2016    /// Registers an embedding model for text-to-vector conversion.
2017    ///
2018    /// Once registered, you can use [`embed_text()`](Self::embed_text) and
2019    /// [`vector_search_text()`](Self::vector_search_text) with the model name.
2020    #[cfg(feature = "embed")]
2021    pub fn register_embedding_model(
2022        &self,
2023        name: &str,
2024        model: Arc<dyn crate::embedding::EmbeddingModel>,
2025    ) {
2026        self.embedding_models
2027            .write()
2028            .insert(name.to_string(), model);
2029    }
2030
2031    /// Generates embeddings for a batch of texts using a registered model.
2032    ///
2033    /// # Errors
2034    ///
2035    /// Returns an error if the model is not registered or embedding fails.
2036    #[cfg(feature = "embed")]
2037    pub fn embed_text(&self, model_name: &str, texts: &[&str]) -> Result<Vec<Vec<f32>>> {
2038        let models = self.embedding_models.read();
2039        let model = models.get(model_name).ok_or_else(|| {
2040            grafeo_common::utils::error::Error::Internal(format!(
2041                "Embedding model '{}' not registered",
2042                model_name
2043            ))
2044        })?;
2045        model.embed(texts)
2046    }
2047
2048    /// Searches a vector index using a text query, generating the embedding on-the-fly.
2049    ///
2050    /// This combines [`embed_text()`](Self::embed_text) with
2051    /// [`vector_search()`](Self::vector_search) in a single call.
2052    ///
2053    /// # Errors
2054    ///
2055    /// Returns an error if the model is not registered, embedding fails,
2056    /// or the vector index doesn't exist.
2057    #[cfg(all(feature = "embed", feature = "vector-index"))]
2058    pub fn vector_search_text(
2059        &self,
2060        label: &str,
2061        property: &str,
2062        model_name: &str,
2063        query_text: &str,
2064        k: usize,
2065        ef: Option<usize>,
2066    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
2067        let vectors = self.embed_text(model_name, &[query_text])?;
2068        let query_vec = vectors.into_iter().next().ok_or_else(|| {
2069            grafeo_common::utils::error::Error::Internal(
2070                "Embedding model returned no vectors".to_string(),
2071            )
2072        })?;
2073        self.vector_search(label, property, &query_vec, k, ef, None)
2074    }
2075
2076    // ── Change Data Capture ─────────────────────────────────────────────
2077
2078    /// Returns the full change history for an entity (node or edge).
2079    ///
2080    /// Events are ordered chronologically by epoch.
2081    ///
2082    /// # Errors
2083    ///
2084    /// Returns an error if the CDC feature is not enabled.
2085    #[cfg(feature = "cdc")]
2086    pub fn history(
2087        &self,
2088        entity_id: impl Into<crate::cdc::EntityId>,
2089    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2090        Ok(self.cdc_log.history(entity_id.into()))
2091    }
2092
2093    /// Returns change events for an entity since the given epoch.
2094    #[cfg(feature = "cdc")]
2095    pub fn history_since(
2096        &self,
2097        entity_id: impl Into<crate::cdc::EntityId>,
2098        since_epoch: grafeo_common::types::EpochId,
2099    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2100        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
2101    }
2102
2103    /// Returns all change events across all entities in an epoch range.
2104    #[cfg(feature = "cdc")]
2105    pub fn changes_between(
2106        &self,
2107        start_epoch: grafeo_common::types::EpochId,
2108        end_epoch: grafeo_common::types::EpochId,
2109    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2110        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
2111    }
2112
2113    // ── Property Indexes ────────────────────────────────────────────────
2114
2115    /// Drops an index on a node property.
2116    ///
2117    /// Returns `true` if the index existed and was removed.
2118    pub fn drop_property_index(&self, property: &str) -> bool {
2119        self.store.drop_property_index(property)
2120    }
2121
2122    /// Returns `true` if the property has an index.
2123    #[must_use]
2124    pub fn has_property_index(&self, property: &str) -> bool {
2125        self.store.has_property_index(property)
2126    }
2127
2128    /// Finds all nodes that have a specific property value.
2129    ///
2130    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
2131    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
2132    ///
2133    /// # Example
2134    ///
2135    /// ```no_run
2136    /// # use grafeo_engine::GrafeoDB;
2137    /// # use grafeo_common::types::Value;
2138    /// # let db = GrafeoDB::new_in_memory();
2139    /// // Create index for fast lookups (optional but recommended)
2140    /// db.create_property_index("city");
2141    ///
2142    /// // Find all nodes where city = "NYC"
2143    /// let nyc_nodes = db.find_nodes_by_property("city", &Value::from("NYC"));
2144    /// ```
2145    #[must_use]
2146    pub fn find_nodes_by_property(
2147        &self,
2148        property: &str,
2149        value: &grafeo_common::types::Value,
2150    ) -> Vec<grafeo_common::types::NodeId> {
2151        self.store.find_nodes_by_property(property, value)
2152    }
2153
2154    // =========================================================================
2155    // ADMIN API: Introspection
2156    // =========================================================================
2157
2158    /// Returns true if this database is backed by a file (persistent).
2159    ///
2160    /// In-memory databases return false.
2161    #[must_use]
2162    pub fn is_persistent(&self) -> bool {
2163        self.config.path.is_some()
2164    }
2165
2166    /// Returns the database file path, if persistent.
2167    ///
2168    /// In-memory databases return None.
2169    #[must_use]
2170    pub fn path(&self) -> Option<&Path> {
2171        self.config.path.as_deref()
2172    }
2173
2174    /// Returns high-level database information.
2175    ///
2176    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
2177    #[must_use]
2178    pub fn info(&self) -> crate::admin::DatabaseInfo {
2179        crate::admin::DatabaseInfo {
2180            mode: crate::admin::DatabaseMode::Lpg,
2181            node_count: self.store.node_count(),
2182            edge_count: self.store.edge_count(),
2183            is_persistent: self.is_persistent(),
2184            path: self.config.path.clone(),
2185            wal_enabled: self.config.wal_enabled,
2186            version: env!("CARGO_PKG_VERSION").to_string(),
2187        }
2188    }
2189
2190    /// Returns detailed database statistics.
2191    ///
2192    /// Includes counts, memory usage, and index information.
2193    #[must_use]
2194    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2195        #[cfg(feature = "wal")]
2196        let disk_bytes = self.config.path.as_ref().and_then(|p| {
2197            if p.exists() {
2198                Self::calculate_disk_usage(p).ok()
2199            } else {
2200                None
2201            }
2202        });
2203        #[cfg(not(feature = "wal"))]
2204        let disk_bytes: Option<usize> = None;
2205
2206        crate::admin::DatabaseStats {
2207            node_count: self.store.node_count(),
2208            edge_count: self.store.edge_count(),
2209            label_count: self.store.label_count(),
2210            edge_type_count: self.store.edge_type_count(),
2211            property_key_count: self.store.property_key_count(),
2212            index_count: 0, // TODO: implement index tracking
2213            memory_bytes: self.buffer_manager.allocated(),
2214            disk_bytes,
2215        }
2216    }
2217
2218    /// Calculates total disk usage for the database directory.
2219    #[cfg(feature = "wal")]
2220    fn calculate_disk_usage(path: &Path) -> Result<usize> {
2221        let mut total = 0usize;
2222        if path.is_dir() {
2223            for entry in std::fs::read_dir(path)? {
2224                let entry = entry?;
2225                let metadata = entry.metadata()?;
2226                if metadata.is_file() {
2227                    total += metadata.len() as usize;
2228                } else if metadata.is_dir() {
2229                    total += Self::calculate_disk_usage(&entry.path())?;
2230                }
2231            }
2232        }
2233        Ok(total)
2234    }
2235
2236    /// Returns schema information (labels, edge types, property keys).
2237    ///
2238    /// For LPG mode, returns label and edge type information.
2239    /// For RDF mode, returns predicate and named graph information.
2240    #[must_use]
2241    pub fn schema(&self) -> crate::admin::SchemaInfo {
2242        let labels = self
2243            .store
2244            .all_labels()
2245            .into_iter()
2246            .map(|name| crate::admin::LabelInfo {
2247                name: name.clone(),
2248                count: self.store.nodes_with_label(&name).count(),
2249            })
2250            .collect();
2251
2252        let edge_types = self
2253            .store
2254            .all_edge_types()
2255            .into_iter()
2256            .map(|name| crate::admin::EdgeTypeInfo {
2257                name: name.clone(),
2258                count: self.store.edges_with_type(&name).count(),
2259            })
2260            .collect();
2261
2262        let property_keys = self.store.all_property_keys();
2263
2264        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
2265            labels,
2266            edge_types,
2267            property_keys,
2268        })
2269    }
2270
2271    /// Returns RDF schema information.
2272    ///
2273    /// Only available when the RDF feature is enabled.
2274    #[cfg(feature = "rdf")]
2275    #[must_use]
2276    pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
2277        let stats = self.rdf_store.stats();
2278
2279        let predicates = self
2280            .rdf_store
2281            .predicates()
2282            .into_iter()
2283            .map(|predicate| {
2284                let count = self.rdf_store.triples_with_predicate(&predicate).len();
2285                crate::admin::PredicateInfo {
2286                    iri: predicate.to_string(),
2287                    count,
2288                }
2289            })
2290            .collect();
2291
2292        crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
2293            predicates,
2294            named_graphs: Vec::new(), // Named graphs not yet implemented in RdfStore
2295            subject_count: stats.subject_count,
2296            object_count: stats.object_count,
2297        })
2298    }
2299
2300    /// Validates database integrity.
2301    ///
2302    /// Checks for:
2303    /// - Dangling edge references (edges pointing to non-existent nodes)
2304    /// - Internal index consistency
2305    ///
2306    /// Returns a list of errors and warnings. Empty errors = valid.
2307    #[must_use]
2308    pub fn validate(&self) -> crate::admin::ValidationResult {
2309        let mut result = crate::admin::ValidationResult::default();
2310
2311        // Check for dangling edge references
2312        for edge in self.store.all_edges() {
2313            if self.store.get_node(edge.src).is_none() {
2314                result.errors.push(crate::admin::ValidationError {
2315                    code: "DANGLING_SRC".to_string(),
2316                    message: format!(
2317                        "Edge {} references non-existent source node {}",
2318                        edge.id.0, edge.src.0
2319                    ),
2320                    context: Some(format!("edge:{}", edge.id.0)),
2321                });
2322            }
2323            if self.store.get_node(edge.dst).is_none() {
2324                result.errors.push(crate::admin::ValidationError {
2325                    code: "DANGLING_DST".to_string(),
2326                    message: format!(
2327                        "Edge {} references non-existent destination node {}",
2328                        edge.id.0, edge.dst.0
2329                    ),
2330                    context: Some(format!("edge:{}", edge.id.0)),
2331                });
2332            }
2333        }
2334
2335        // Add warnings for potential issues
2336        if self.store.node_count() > 0 && self.store.edge_count() == 0 {
2337            result.warnings.push(crate::admin::ValidationWarning {
2338                code: "NO_EDGES".to_string(),
2339                message: "Database has nodes but no edges".to_string(),
2340                context: None,
2341            });
2342        }
2343
2344        result
2345    }
2346
2347    /// Returns WAL (Write-Ahead Log) status.
2348    ///
2349    /// Returns None if WAL is not enabled.
2350    #[must_use]
2351    pub fn wal_status(&self) -> crate::admin::WalStatus {
2352        #[cfg(feature = "wal")]
2353        if let Some(ref wal) = self.wal {
2354            return crate::admin::WalStatus {
2355                enabled: true,
2356                path: self.config.path.as_ref().map(|p| p.join("wal")),
2357                size_bytes: wal.size_bytes(),
2358                record_count: wal.record_count() as usize,
2359                last_checkpoint: wal.last_checkpoint_timestamp(),
2360                current_epoch: self.store.current_epoch().as_u64(),
2361            };
2362        }
2363
2364        crate::admin::WalStatus {
2365            enabled: false,
2366            path: None,
2367            size_bytes: 0,
2368            record_count: 0,
2369            last_checkpoint: None,
2370            current_epoch: self.store.current_epoch().as_u64(),
2371        }
2372    }
2373
2374    /// Forces a WAL checkpoint.
2375    ///
2376    /// Flushes all pending WAL records to the main storage.
2377    ///
2378    /// # Errors
2379    ///
2380    /// Returns an error if the checkpoint fails.
2381    pub fn wal_checkpoint(&self) -> Result<()> {
2382        #[cfg(feature = "wal")]
2383        if let Some(ref wal) = self.wal {
2384            let epoch = self.store.current_epoch();
2385            let tx_id = self
2386                .tx_manager
2387                .last_assigned_tx_id()
2388                .unwrap_or_else(|| self.tx_manager.begin());
2389            wal.checkpoint(tx_id, epoch)?;
2390            wal.sync()?;
2391        }
2392        Ok(())
2393    }
2394
2395    // =========================================================================
2396    // ADMIN API: Persistence Control
2397    // =========================================================================
2398
2399    /// Saves the database to a file path.
2400    ///
2401    /// - If in-memory: creates a new persistent database at path
2402    /// - If file-backed: creates a copy at the new path
2403    ///
2404    /// The original database remains unchanged.
2405    ///
2406    /// # Errors
2407    ///
2408    /// Returns an error if the save operation fails.
2409    ///
2410    /// Requires the `wal` feature for persistence support.
2411    #[cfg(feature = "wal")]
2412    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
2413        let path = path.as_ref();
2414
2415        // Create target database with WAL enabled
2416        let target_config = Config::persistent(path);
2417        let target = Self::with_config(target_config)?;
2418
2419        // Copy all nodes using WAL-enabled methods
2420        for node in self.store.all_nodes() {
2421            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
2422            target.store.create_node_with_id(node.id, &label_refs);
2423
2424            // Log to WAL
2425            target.log_wal(&WalRecord::CreateNode {
2426                id: node.id,
2427                labels: node.labels.iter().map(|s| s.to_string()).collect(),
2428            })?;
2429
2430            // Copy properties
2431            for (key, value) in node.properties {
2432                target
2433                    .store
2434                    .set_node_property(node.id, key.as_str(), value.clone());
2435                target.log_wal(&WalRecord::SetNodeProperty {
2436                    id: node.id,
2437                    key: key.to_string(),
2438                    value,
2439                })?;
2440            }
2441        }
2442
2443        // Copy all edges using WAL-enabled methods
2444        for edge in self.store.all_edges() {
2445            target
2446                .store
2447                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2448
2449            // Log to WAL
2450            target.log_wal(&WalRecord::CreateEdge {
2451                id: edge.id,
2452                src: edge.src,
2453                dst: edge.dst,
2454                edge_type: edge.edge_type.to_string(),
2455            })?;
2456
2457            // Copy properties
2458            for (key, value) in edge.properties {
2459                target
2460                    .store
2461                    .set_edge_property(edge.id, key.as_str(), value.clone());
2462                target.log_wal(&WalRecord::SetEdgeProperty {
2463                    id: edge.id,
2464                    key: key.to_string(),
2465                    value,
2466                })?;
2467            }
2468        }
2469
2470        // Checkpoint and close the target database
2471        target.close()?;
2472
2473        Ok(())
2474    }
2475
2476    /// Creates an in-memory copy of this database.
2477    ///
2478    /// Returns a new database that is completely independent.
2479    /// Useful for:
2480    /// - Testing modifications without affecting the original
2481    /// - Faster operations when persistence isn't needed
2482    ///
2483    /// # Errors
2484    ///
2485    /// Returns an error if the copy operation fails.
2486    pub fn to_memory(&self) -> Result<Self> {
2487        let config = Config::in_memory();
2488        let target = Self::with_config(config)?;
2489
2490        // Copy all nodes
2491        for node in self.store.all_nodes() {
2492            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
2493            target.store.create_node_with_id(node.id, &label_refs);
2494
2495            // Copy properties
2496            for (key, value) in node.properties {
2497                target.store.set_node_property(node.id, key.as_str(), value);
2498            }
2499        }
2500
2501        // Copy all edges
2502        for edge in self.store.all_edges() {
2503            target
2504                .store
2505                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2506
2507            // Copy properties
2508            for (key, value) in edge.properties {
2509                target.store.set_edge_property(edge.id, key.as_str(), value);
2510            }
2511        }
2512
2513        Ok(target)
2514    }
2515
2516    /// Opens a database file and loads it entirely into memory.
2517    ///
2518    /// The returned database has no connection to the original file.
2519    /// Changes will NOT be written back to the file.
2520    ///
2521    /// # Errors
2522    ///
2523    /// Returns an error if the file can't be opened or loaded.
2524    #[cfg(feature = "wal")]
2525    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
2526        // Open the source database (triggers WAL recovery)
2527        let source = Self::open(path)?;
2528
2529        // Create in-memory copy
2530        let target = source.to_memory()?;
2531
2532        // Close the source (releases file handles)
2533        source.close()?;
2534
2535        Ok(target)
2536    }
2537
2538    // =========================================================================
2539    // ADMIN API: Snapshot Export/Import
2540    // =========================================================================
2541
2542    /// Exports the entire database to a binary snapshot.
2543    ///
2544    /// The returned bytes can be stored (e.g. in IndexedDB) and later
2545    /// restored with [`import_snapshot()`](Self::import_snapshot).
2546    ///
2547    /// # Errors
2548    ///
2549    /// Returns an error if serialization fails.
2550    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
2551        let nodes: Vec<SnapshotNode> = self
2552            .store
2553            .all_nodes()
2554            .map(|n| SnapshotNode {
2555                id: n.id,
2556                labels: n.labels.iter().map(|l| l.to_string()).collect(),
2557                properties: n
2558                    .properties
2559                    .into_iter()
2560                    .map(|(k, v)| (k.to_string(), v))
2561                    .collect(),
2562            })
2563            .collect();
2564
2565        let edges: Vec<SnapshotEdge> = self
2566            .store
2567            .all_edges()
2568            .map(|e| SnapshotEdge {
2569                id: e.id,
2570                src: e.src,
2571                dst: e.dst,
2572                edge_type: e.edge_type.to_string(),
2573                properties: e
2574                    .properties
2575                    .into_iter()
2576                    .map(|(k, v)| (k.to_string(), v))
2577                    .collect(),
2578            })
2579            .collect();
2580
2581        let snapshot = Snapshot {
2582            version: 1,
2583            nodes,
2584            edges,
2585        };
2586
2587        let config = bincode::config::standard();
2588        bincode::serde::encode_to_vec(&snapshot, config)
2589            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
2590    }
2591
2592    /// Creates a new in-memory database from a binary snapshot.
2593    ///
2594    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
2595    ///
2596    /// # Errors
2597    ///
2598    /// Returns an error if the snapshot is invalid or deserialization fails.
2599    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
2600        let config = bincode::config::standard();
2601        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
2602            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
2603
2604        if snapshot.version != 1 {
2605            return Err(Error::Internal(format!(
2606                "unsupported snapshot version: {}",
2607                snapshot.version
2608            )));
2609        }
2610
2611        let db = Self::new_in_memory();
2612
2613        for node in snapshot.nodes {
2614            let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
2615            db.store.create_node_with_id(node.id, &label_refs);
2616            for (key, value) in node.properties {
2617                db.store.set_node_property(node.id, &key, value);
2618            }
2619        }
2620
2621        for edge in snapshot.edges {
2622            db.store
2623                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2624            for (key, value) in edge.properties {
2625                db.store.set_edge_property(edge.id, &key, value);
2626            }
2627        }
2628
2629        Ok(db)
2630    }
2631
2632    // =========================================================================
2633    // ADMIN API: Iteration
2634    // =========================================================================
2635
2636    /// Returns an iterator over all nodes in the database.
2637    ///
2638    /// Useful for dump/export operations.
2639    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
2640        self.store.all_nodes()
2641    }
2642
2643    /// Returns an iterator over all edges in the database.
2644    ///
2645    /// Useful for dump/export operations.
2646    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
2647        self.store.all_edges()
2648    }
2649}
2650
/// Binary snapshot format for database export/import.
///
/// Produced by `GrafeoDB::export_snapshot` and consumed by `GrafeoDB::import_snapshot`,
/// which encode and decode it through serde with bincode's standard configuration.
/// bincode is not self-describing: fields are written positionally in declaration order.
/// Reordering fields or changing their types therefore changes the wire format and
/// should come with a new `version` value.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Format version; `import_snapshot` currently accepts only `1`.
    version: u8,
    /// All nodes; restored before `edges` on import.
    nodes: Vec<SnapshotNode>,
    /// All edges; restored after `nodes` on import.
    edges: Vec<SnapshotEdge>,
}
2658
/// A single node inside a [`Snapshot`]. Field order is part of the bincode wire format.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// Node ID; reused as-is when the node is re-created on import.
    id: NodeId,
    /// Label names, in the order the store yielded them on export.
    labels: Vec<String>,
    /// Property key/value pairs, in the order the store yielded them on export.
    properties: Vec<(String, Value)>,
}
2665
/// A single edge inside a [`Snapshot`]. Field order is part of the bincode wire format.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// Edge ID; reused as-is when the edge is re-created on import.
    id: EdgeId,
    /// ID of the source node.
    src: NodeId,
    /// ID of the destination node.
    dst: NodeId,
    /// Edge type name.
    edge_type: String,
    /// Property key/value pairs, in the order the store yielded them on export.
    properties: Vec<(String, Value)>,
}
2674
2675impl Drop for GrafeoDB {
2676    fn drop(&mut self) {
2677        if let Err(e) = self.close() {
2678            tracing::error!("Error closing database: {}", e);
2679        }
2680    }
2681}
2682
2683impl crate::admin::AdminService for GrafeoDB {
2684    fn info(&self) -> crate::admin::DatabaseInfo {
2685        self.info()
2686    }
2687
2688    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2689        self.detailed_stats()
2690    }
2691
2692    fn schema(&self) -> crate::admin::SchemaInfo {
2693        self.schema()
2694    }
2695
2696    fn validate(&self) -> crate::admin::ValidationResult {
2697        self.validate()
2698    }
2699
2700    fn wal_status(&self) -> crate::admin::WalStatus {
2701        self.wal_status()
2702    }
2703
2704    fn wal_checkpoint(&self) -> Result<()> {
2705        self.wal_checkpoint()
2706    }
2707}
2708
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// The fields are public, so the shape is not enforced: each row is expected to hold one
/// value per entry in `columns`, but nothing here checks that.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    /// Results built with `QueryResult::new` report `LogicalType::Any` for every column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
}
2747
2748impl QueryResult {
2749    /// Creates a new empty query result.
2750    #[must_use]
2751    pub fn new(columns: Vec<String>) -> Self {
2752        let len = columns.len();
2753        Self {
2754            columns,
2755            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2756            rows: Vec::new(),
2757            execution_time_ms: None,
2758            rows_scanned: None,
2759        }
2760    }
2761
2762    /// Creates a new empty query result with column types.
2763    #[must_use]
2764    pub fn with_types(
2765        columns: Vec<String>,
2766        column_types: Vec<grafeo_common::types::LogicalType>,
2767    ) -> Self {
2768        Self {
2769            columns,
2770            column_types,
2771            rows: Vec::new(),
2772            execution_time_ms: None,
2773            rows_scanned: None,
2774        }
2775    }
2776
2777    /// Sets the execution metrics on this result.
2778    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2779        self.execution_time_ms = Some(execution_time_ms);
2780        self.rows_scanned = Some(rows_scanned);
2781        self
2782    }
2783
2784    /// Returns the execution time in milliseconds, if available.
2785    #[must_use]
2786    pub fn execution_time_ms(&self) -> Option<f64> {
2787        self.execution_time_ms
2788    }
2789
2790    /// Returns the number of rows scanned, if available.
2791    #[must_use]
2792    pub fn rows_scanned(&self) -> Option<u64> {
2793        self.rows_scanned
2794    }
2795
2796    /// Returns the number of rows.
2797    #[must_use]
2798    pub fn row_count(&self) -> usize {
2799        self.rows.len()
2800    }
2801
2802    /// Returns the number of columns.
2803    #[must_use]
2804    pub fn column_count(&self) -> usize {
2805        self.columns.len()
2806    }
2807
2808    /// Returns true if the result is empty.
2809    #[must_use]
2810    pub fn is_empty(&self) -> bool {
2811        self.rows.is_empty()
2812    }
2813
2814    /// Extracts a single value from the result.
2815    ///
2816    /// Use this when your query returns exactly one row with one column,
2817    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
2818    ///
2819    /// # Errors
2820    ///
2821    /// Returns an error if the result has multiple rows or columns.
2822    pub fn scalar<T: FromValue>(&self) -> Result<T> {
2823        if self.rows.len() != 1 || self.columns.len() != 1 {
2824            return Err(grafeo_common::utils::error::Error::InvalidValue(
2825                "Expected single value".to_string(),
2826            ));
2827        }
2828        T::from_value(&self.rows[0][0])
2829    }
2830
2831    /// Returns an iterator over the rows.
2832    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2833        self.rows.iter()
2834    }
2835}
2836
/// Converts a [`Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// Each built-in implementation delegates to the matching `Value` accessor (`as_int64`,
/// `as_float64`, `as_str`, `as_bool`). It returns `Error::TypeMismatch` when that accessor
/// yields `None`.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2845
2846impl FromValue for i64 {
2847    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2848        value
2849            .as_int64()
2850            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2851                expected: "INT64".to_string(),
2852                found: value.type_name().to_string(),
2853            })
2854    }
2855}
2856
2857impl FromValue for f64 {
2858    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2859        value
2860            .as_float64()
2861            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2862                expected: "FLOAT64".to_string(),
2863                found: value.type_name().to_string(),
2864            })
2865    }
2866}
2867
2868impl FromValue for String {
2869    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2870        value.as_str().map(String::from).ok_or_else(|| {
2871            grafeo_common::utils::error::Error::TypeMismatch {
2872                expected: "STRING".to_string(),
2873                found: value.type_name().to_string(),
2874            }
2875        })
2876    }
2877}
2878
2879impl FromValue for bool {
2880    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2881        value
2882            .as_bool()
2883            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2884                expected: "BOOL".to_string(),
2885                found: value.type_name().to_string(),
2886            })
2887    }
2888}
2889
// Unit tests for GrafeoDB construction, WAL-backed persistence and recovery, query
// result metrics, and (with the `cdc` feature) per-entity change history.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
        // Passes as long as creating a session does not panic.
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // Create database and add some data
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();
        }

        // Reopen and verify data was recovered
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // Verify nodes exist (IDs 0 and 1 assume sequential allocation from 0)
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // Create some data
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // WAL should have records. This is only checked when a WAL is attached; if
        // `db.wal()` returns None the assertion is skipped.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        // Tests that WAL recovery works correctly across multiple open/close cycles
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: Create initial data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
        }

        // Session 2: Add more data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1); // Previous data recovered
            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
        }

        // Session 3: Verify all data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            // Verify labels were recovered correctly (property values are not checked here)
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        // Tests that database remains consistent after a series of create/delete operations
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Create nodes
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            // Create edges
            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Delete edge a->b (e1), then the middle node b. Edge b->c (_e2) is not
            // deleted explicitly; what happens to it is left to `delete_node`.
            db.delete_edge(e1);
            db.delete_node(b);

            // Set properties on remaining nodes
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // Reopen and verify consistency
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Should have 2 nodes (a and c), b was deleted
            // Note: node_count includes deleted nodes in some implementations
            // What matters is that the non-deleted nodes are accessible
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // Middle node should be deleted
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        // Calling close() multiple times should not cause errors
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close should succeed
        assert!(db.close().is_ok());

        // Second close should also succeed (idempotent)
        assert!(db.close().is_ok());
    }

    #[test]
    fn test_query_result_has_metrics() {
        // Verifies that query results include execution metrics
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            // Metrics should be populated
            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    #[test]
    fn test_empty_query_result_metrics() {
        // Verifies metrics are correct for queries returning no results
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // Query that matches nothing
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        #[test]
        fn test_node_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            // Create
            let id = db.create_node(&["Person"]);
            // Update
            db.set_node_property(id, "name", "Alice".into());
            db.set_node_property(id, "name", "Bob".into());
            // Delete
            db.delete_node(id);

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 4); // create + 2 updates + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert!(history[1].before.is_none()); // first set_node_property has no prior value
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            assert!(history[2].before.is_some()); // second update has prior "Alice"
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_edge_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let alice = db.create_node(&["Person"]);
            let bob = db.create_node(&["Person"]);
            let edge = db.create_edge(alice, bob, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            assert_eq!(history.len(), 3); // create + update + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_create_node_with_props_cdc() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alice")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            // Props should be captured
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        #[test]
        fn test_changes_between() {
            let db = GrafeoDB::new_in_memory();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            // The range [0, u64::MAX] spans every epoch, so all recorded events are
            // expected back regardless of which epoch each one was stamped with.
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert_eq!(changes.len(), 3); // 2 creates + 1 update
        }
    }
}