Skip to main content

grafeo_engine/database/
mod.rs

//! The main database struct and operations.
//!
//! Start here with [`GrafeoDB`], your handle to everything else.
//!
//! Operations are split across focused submodules:
//! - `query` - Query execution (`execute`, `execute_cypher`, etc.)
//! - `crud` - Node/edge CRUD operations
//! - `index` - Property, vector, and text index management
//! - `search` - Vector, text, and hybrid search
//! - `embed` - Embedding model management (only with the `embed` feature)
//! - `persistence` - Save, load, snapshots, iteration
//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22
23use std::path::Path;
24use std::sync::Arc;
25use std::sync::atomic::AtomicUsize;
26
27use parking_lot::RwLock;
28
29#[cfg(feature = "wal")]
30use grafeo_adapters::storage::wal::{
31    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
32};
33use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
34use grafeo_common::utils::error::Result;
35use grafeo_core::graph::GraphStoreMut;
36use grafeo_core::graph::lpg::LpgStore;
37#[cfg(feature = "rdf")]
38use grafeo_core::graph::rdf::RdfStore;
39
40use crate::config::Config;
41use crate::query::cache::QueryCache;
42use crate::session::Session;
43use crate::transaction::TransactionManager;
44
45/// Your handle to a Grafeo database.
46///
47/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
48/// quick experiments, or [`open()`](Self::open) for persistent storage.
49/// Then grab a [`session()`](Self::session) to start querying.
50///
51/// # Examples
52///
53/// ```
54/// use grafeo_engine::GrafeoDB;
55///
56/// // Quick in-memory database
57/// let db = GrafeoDB::new_in_memory();
58///
59/// // Add some data
60/// db.create_node(&["Person"]);
61///
62/// // Query it
63/// let session = db.session();
64/// let result = session.execute("MATCH (p:Person) RETURN p")?;
65/// # Ok::<(), grafeo_common::utils::error::Error>(())
66/// ```
67pub struct GrafeoDB {
68    /// Database configuration.
69    pub(super) config: Config,
70    /// The underlying graph store.
71    pub(super) store: Arc<LpgStore>,
72    /// RDF triple store (if RDF feature is enabled).
73    #[cfg(feature = "rdf")]
74    pub(super) rdf_store: Arc<RdfStore>,
75    /// Transaction manager.
76    pub(super) tx_manager: Arc<TransactionManager>,
77    /// Unified buffer manager.
78    pub(super) buffer_manager: Arc<BufferManager>,
79    /// Write-ahead log manager (if durability is enabled).
80    #[cfg(feature = "wal")]
81    pub(super) wal: Option<Arc<LpgWal>>,
82    /// Query cache for parsed and optimized plans.
83    pub(super) query_cache: Arc<QueryCache>,
84    /// Shared commit counter for auto-GC across sessions.
85    pub(super) commit_counter: Arc<AtomicUsize>,
86    /// Whether the database is open.
87    pub(super) is_open: RwLock<bool>,
88    /// Change data capture log for tracking mutations.
89    #[cfg(feature = "cdc")]
90    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
91    /// Registered embedding models for text-to-vector conversion.
92    #[cfg(feature = "embed")]
93    pub(super) embedding_models:
94        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
95    /// External graph store (when using with_store()).
96    /// When set, sessions route queries through this store instead of the built-in LpgStore.
97    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
98}
99
100impl GrafeoDB {
101    /// Creates an in-memory database - fast to create, gone when dropped.
102    ///
103    /// Use this for tests, experiments, or when you don't need persistence.
104    /// For data that survives restarts, use [`open()`](Self::open) instead.
105    ///
106    /// # Examples
107    ///
108    /// ```
109    /// use grafeo_engine::GrafeoDB;
110    ///
111    /// let db = GrafeoDB::new_in_memory();
112    /// let session = db.session();
113    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
114    /// # Ok::<(), grafeo_common::utils::error::Error>(())
115    /// ```
116    #[must_use]
117    pub fn new_in_memory() -> Self {
118        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
119    }
120
121    /// Opens a database at the given path, creating it if it doesn't exist.
122    ///
123    /// If you've used this path before, Grafeo recovers your data from the
124    /// write-ahead log automatically. First open on a new path creates an
125    /// empty database.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if the path isn't writable or recovery fails.
130    ///
131    /// # Examples
132    ///
133    /// ```no_run
134    /// use grafeo_engine::GrafeoDB;
135    ///
136    /// let db = GrafeoDB::open("./my_social_network")?;
137    /// # Ok::<(), grafeo_common::utils::error::Error>(())
138    /// ```
139    #[cfg(feature = "wal")]
140    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
141        Self::with_config(Config::persistent(path.as_ref()))
142    }
143
144    /// Creates a database with custom configuration.
145    ///
146    /// Use this when you need fine-grained control over memory limits,
147    /// thread counts, or persistence settings. For most cases,
148    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
149    /// are simpler.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if the database can't be created or recovery fails.
154    ///
155    /// # Examples
156    ///
157    /// ```
158    /// use grafeo_engine::{GrafeoDB, Config};
159    ///
160    /// // In-memory with a 512MB limit
161    /// let config = Config::in_memory()
162    ///     .with_memory_limit(512 * 1024 * 1024);
163    ///
164    /// let db = GrafeoDB::with_config(config)?;
165    /// # Ok::<(), grafeo_common::utils::error::Error>(())
166    /// ```
167    pub fn with_config(config: Config) -> Result<Self> {
168        // Validate configuration before proceeding
169        config
170            .validate()
171            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
172
173        let store = Arc::new(LpgStore::new());
174        #[cfg(feature = "rdf")]
175        let rdf_store = Arc::new(RdfStore::new());
176        let tx_manager = Arc::new(TransactionManager::new());
177
178        // Create buffer manager with configured limits
179        let buffer_config = BufferManagerConfig {
180            budget: config.memory_limit.unwrap_or_else(|| {
181                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
182            }),
183            spill_path: config
184                .spill_path
185                .clone()
186                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
187            ..BufferManagerConfig::default()
188        };
189        let buffer_manager = BufferManager::new(buffer_config);
190
191        // Initialize WAL if persistence is enabled
192        #[cfg(feature = "wal")]
193        let wal = if config.wal_enabled {
194            if let Some(ref db_path) = config.path {
195                // Create database directory if it doesn't exist
196                std::fs::create_dir_all(db_path)?;
197
198                let wal_path = db_path.join("wal");
199
200                // Check if WAL exists and recover if needed
201                if wal_path.exists() {
202                    let recovery = WalRecovery::new(&wal_path);
203                    let records = recovery.recover()?;
204                    Self::apply_wal_records(&store, &records)?;
205                }
206
207                // Open/create WAL manager with configured durability
208                let wal_durability = match config.wal_durability {
209                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
210                    crate::config::DurabilityMode::Batch {
211                        max_delay_ms,
212                        max_records,
213                    } => WalDurabilityMode::Batch {
214                        max_delay_ms,
215                        max_records,
216                    },
217                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
218                        WalDurabilityMode::Adaptive { target_interval_ms }
219                    }
220                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
221                };
222                let wal_config = WalConfig {
223                    durability: wal_durability,
224                    ..WalConfig::default()
225                };
226                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
227                Some(Arc::new(wal_manager))
228            } else {
229                None
230            }
231        } else {
232            None
233        };
234
235        // Create query cache with default capacity (1000 queries)
236        let query_cache = Arc::new(QueryCache::default());
237
238        Ok(Self {
239            config,
240            store,
241            #[cfg(feature = "rdf")]
242            rdf_store,
243            tx_manager,
244            buffer_manager,
245            #[cfg(feature = "wal")]
246            wal,
247            query_cache,
248            commit_counter: Arc::new(AtomicUsize::new(0)),
249            is_open: RwLock::new(true),
250            #[cfg(feature = "cdc")]
251            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
252            #[cfg(feature = "embed")]
253            embedding_models: RwLock::new(hashbrown::HashMap::new()),
254            external_store: None,
255        })
256    }
257
258    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
259    ///
260    /// The external store handles all data persistence. WAL, CDC, and index
261    /// management are the responsibility of the store implementation.
262    ///
263    /// Query execution (all 6 languages, optimizer, planner) works through the
264    /// provided store. Admin operations (schema introspection, persistence,
265    /// vector/text indexes) are not available on external stores.
266    ///
267    /// # Examples
268    ///
269    /// ```no_run
270    /// use std::sync::Arc;
271    /// use grafeo_engine::{GrafeoDB, Config};
272    /// use grafeo_core::graph::GraphStoreMut;
273    ///
274    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
275    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
276    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
277    ///     Ok(())
278    /// }
279    /// ```
280    ///
281    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
282    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
283        config
284            .validate()
285            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
286
287        let dummy_store = Arc::new(LpgStore::new());
288        let tx_manager = Arc::new(TransactionManager::new());
289
290        let buffer_config = BufferManagerConfig {
291            budget: config.memory_limit.unwrap_or_else(|| {
292                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
293            }),
294            spill_path: None,
295            ..BufferManagerConfig::default()
296        };
297        let buffer_manager = BufferManager::new(buffer_config);
298
299        let query_cache = Arc::new(QueryCache::default());
300
301        Ok(Self {
302            config,
303            store: dummy_store,
304            #[cfg(feature = "rdf")]
305            rdf_store: Arc::new(RdfStore::new()),
306            tx_manager,
307            buffer_manager,
308            #[cfg(feature = "wal")]
309            wal: None,
310            query_cache,
311            commit_counter: Arc::new(AtomicUsize::new(0)),
312            is_open: RwLock::new(true),
313            #[cfg(feature = "cdc")]
314            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
315            #[cfg(feature = "embed")]
316            embedding_models: RwLock::new(hashbrown::HashMap::new()),
317            external_store: Some(store),
318        })
319    }
320
321    /// Applies WAL records to restore the database state.
322    #[cfg(feature = "wal")]
323    fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
324        for record in records {
325            match record {
326                WalRecord::CreateNode { id, labels } => {
327                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
328                    store.create_node_with_id(*id, &label_refs);
329                }
330                WalRecord::DeleteNode { id } => {
331                    store.delete_node(*id);
332                }
333                WalRecord::CreateEdge {
334                    id,
335                    src,
336                    dst,
337                    edge_type,
338                } => {
339                    store.create_edge_with_id(*id, *src, *dst, edge_type);
340                }
341                WalRecord::DeleteEdge { id } => {
342                    store.delete_edge(*id);
343                }
344                WalRecord::SetNodeProperty { id, key, value } => {
345                    store.set_node_property(*id, key, value.clone());
346                }
347                WalRecord::SetEdgeProperty { id, key, value } => {
348                    store.set_edge_property(*id, key, value.clone());
349                }
350                WalRecord::AddNodeLabel { id, label } => {
351                    store.add_label(*id, label);
352                }
353                WalRecord::RemoveNodeLabel { id, label } => {
354                    store.remove_label(*id, label);
355                }
356                WalRecord::TxCommit { .. }
357                | WalRecord::TxAbort { .. }
358                | WalRecord::Checkpoint { .. } => {
359                    // Transaction control records don't need replay action
360                    // (recovery already filtered to only committed transactions)
361                }
362            }
363        }
364        Ok(())
365    }
366
367    // =========================================================================
368    // Session & Configuration
369    // =========================================================================
370
371    /// Opens a new session for running queries.
372    ///
373    /// Sessions are cheap to create - spin up as many as you need. Each
374    /// gets its own transaction context, so concurrent sessions won't
375    /// block each other on reads.
376    ///
377    /// # Examples
378    ///
379    /// ```
380    /// use grafeo_engine::GrafeoDB;
381    ///
382    /// let db = GrafeoDB::new_in_memory();
383    /// let session = db.session();
384    ///
385    /// // Run queries through the session
386    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
387    /// # Ok::<(), grafeo_common::utils::error::Error>(())
388    /// ```
389    #[must_use]
390    pub fn session(&self) -> Session {
391        if let Some(ref ext_store) = self.external_store {
392            return Session::with_external_store(
393                Arc::clone(ext_store),
394                Arc::clone(&self.tx_manager),
395                Arc::clone(&self.query_cache),
396                self.config.adaptive.clone(),
397                self.config.factorized_execution,
398                self.config.graph_model,
399                self.config.query_timeout,
400                Arc::clone(&self.commit_counter),
401                self.config.gc_interval,
402            );
403        }
404
405        #[cfg(feature = "rdf")]
406        let mut session = Session::with_rdf_store_and_adaptive(
407            Arc::clone(&self.store),
408            Arc::clone(&self.rdf_store),
409            Arc::clone(&self.tx_manager),
410            Arc::clone(&self.query_cache),
411            self.config.adaptive.clone(),
412            self.config.factorized_execution,
413            self.config.graph_model,
414            self.config.query_timeout,
415            Arc::clone(&self.commit_counter),
416            self.config.gc_interval,
417        );
418        #[cfg(not(feature = "rdf"))]
419        let mut session = Session::with_adaptive(
420            Arc::clone(&self.store),
421            Arc::clone(&self.tx_manager),
422            Arc::clone(&self.query_cache),
423            self.config.adaptive.clone(),
424            self.config.factorized_execution,
425            self.config.graph_model,
426            self.config.query_timeout,
427            Arc::clone(&self.commit_counter),
428            self.config.gc_interval,
429        );
430
431        #[cfg(feature = "cdc")]
432        session.set_cdc_log(Arc::clone(&self.cdc_log));
433
434        // Suppress unused_mut when cdc is disabled
435        let _ = &mut session;
436
437        session
438    }
439
440    /// Returns the adaptive execution configuration.
441    #[must_use]
442    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
443        &self.config.adaptive
444    }
445
446    /// Returns the configuration.
447    #[must_use]
448    pub fn config(&self) -> &Config {
449        &self.config
450    }
451
452    /// Returns the graph data model of this database.
453    #[must_use]
454    pub fn graph_model(&self) -> crate::config::GraphModel {
455        self.config.graph_model
456    }
457
458    /// Returns the configured memory limit in bytes, if any.
459    #[must_use]
460    pub fn memory_limit(&self) -> Option<usize> {
461        self.config.memory_limit
462    }
463
464    /// Returns the underlying store.
465    ///
466    /// This provides direct access to the LPG store for algorithm implementations
467    /// and admin operations (index management, schema introspection, MVCC internals).
468    ///
469    /// For code that only needs read/write graph operations, prefer
470    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
471    #[must_use]
472    pub fn store(&self) -> &Arc<LpgStore> {
473        &self.store
474    }
475
476    /// Returns the graph store as a trait object.
477    ///
478    /// This provides the [`GraphStoreMut`] interface for code that should work
479    /// with any storage backend. Use this when you only need graph read/write
480    /// operations and don't need admin methods like index management.
481    ///
482    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
483    #[must_use]
484    pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
485        if let Some(ref ext_store) = self.external_store {
486            Arc::clone(ext_store)
487        } else {
488            Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
489        }
490    }
491
492    /// Garbage collects old MVCC versions that are no longer visible.
493    ///
494    /// Determines the minimum epoch required by active transactions and prunes
495    /// version chains older than that threshold. Also cleans up completed
496    /// transaction metadata in the transaction manager.
497    pub fn gc(&self) {
498        let min_epoch = self.tx_manager.min_active_epoch();
499        self.store.gc_versions(min_epoch);
500        self.tx_manager.gc();
501    }
502
503    /// Returns the buffer manager for memory-aware operations.
504    #[must_use]
505    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
506        &self.buffer_manager
507    }
508
509    /// Returns the query cache.
510    #[must_use]
511    pub fn query_cache(&self) -> &Arc<QueryCache> {
512        &self.query_cache
513    }
514
515    // =========================================================================
516    // Lifecycle
517    // =========================================================================
518
519    /// Closes the database, flushing all pending writes.
520    ///
521    /// For persistent databases, this ensures everything is safely on disk.
522    /// Called automatically when the database is dropped, but you can call
523    /// it explicitly if you need to guarantee durability at a specific point.
524    ///
525    /// # Errors
526    ///
527    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
528    pub fn close(&self) -> Result<()> {
529        let mut is_open = self.is_open.write();
530        if !*is_open {
531            return Ok(());
532        }
533
534        // Commit and checkpoint WAL
535        #[cfg(feature = "wal")]
536        if let Some(ref wal) = self.wal {
537            let epoch = self.store.current_epoch();
538
539            // Use the last assigned transaction ID, or create a checkpoint-only tx
540            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
541                // No transactions have been started; begin one for checkpoint
542                self.tx_manager.begin()
543            });
544
545            // Log a TxCommit to mark all pending records as committed
546            wal.log(&WalRecord::TxCommit {
547                tx_id: checkpoint_tx,
548            })?;
549
550            // Then checkpoint
551            wal.checkpoint(checkpoint_tx, epoch)?;
552            wal.sync()?;
553        }
554
555        *is_open = false;
556        Ok(())
557    }
558
559    /// Returns the typed WAL if available.
560    #[cfg(feature = "wal")]
561    #[must_use]
562    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
563        self.wal.as_ref()
564    }
565
566    /// Logs a WAL record if WAL is enabled.
567    #[cfg(feature = "wal")]
568    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
569        if let Some(ref wal) = self.wal {
570            wal.log(record)?;
571        }
572        Ok(())
573    }
574}
575
576impl Drop for GrafeoDB {
577    fn drop(&mut self) {
578        if let Err(e) = self.close() {
579            tracing::error!("Error closing database: {}", e);
580        }
581    }
582}
583
584impl crate::admin::AdminService for GrafeoDB {
585    fn info(&self) -> crate::admin::DatabaseInfo {
586        self.info()
587    }
588
589    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
590        self.detailed_stats()
591    }
592
593    fn schema(&self) -> crate::admin::SchemaInfo {
594        self.schema()
595    }
596
597    fn validate(&self) -> crate::admin::ValidationResult {
598        self.validate()
599    }
600
601    fn wal_status(&self) -> crate::admin::WalStatus {
602        self.wal_status()
603    }
604
605    fn wal_checkpoint(&self) -> Result<()> {
606        self.wal_checkpoint()
607    }
608}
609
610// =========================================================================
611// Query Result Types
612// =========================================================================
613
614/// The result of running a query.
615///
616/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
617/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
618///
619/// # Examples
620///
621/// ```
622/// use grafeo_engine::GrafeoDB;
623///
624/// let db = GrafeoDB::new_in_memory();
625/// db.create_node(&["Person"]);
626///
627/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
628///
629/// // Check what we got
630/// println!("Columns: {:?}", result.columns);
631/// println!("Rows: {}", result.row_count());
632///
633/// // Iterate through results
634/// for row in result.iter() {
635///     println!("{:?}", row);
636/// }
637/// # Ok::<(), grafeo_common::utils::error::Error>(())
638/// ```
639#[derive(Debug)]
640pub struct QueryResult {
641    /// Column names from the RETURN clause.
642    pub columns: Vec<String>,
643    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
644    pub column_types: Vec<grafeo_common::types::LogicalType>,
645    /// The actual result rows.
646    pub rows: Vec<Vec<grafeo_common::types::Value>>,
647    /// Query execution time in milliseconds (if timing was enabled).
648    pub execution_time_ms: Option<f64>,
649    /// Number of rows scanned during query execution (estimate).
650    pub rows_scanned: Option<u64>,
651}
652
653impl QueryResult {
654    /// Creates a new empty query result.
655    #[must_use]
656    pub fn new(columns: Vec<String>) -> Self {
657        let len = columns.len();
658        Self {
659            columns,
660            column_types: vec![grafeo_common::types::LogicalType::Any; len],
661            rows: Vec::new(),
662            execution_time_ms: None,
663            rows_scanned: None,
664        }
665    }
666
667    /// Creates a new empty query result with column types.
668    #[must_use]
669    pub fn with_types(
670        columns: Vec<String>,
671        column_types: Vec<grafeo_common::types::LogicalType>,
672    ) -> Self {
673        Self {
674            columns,
675            column_types,
676            rows: Vec::new(),
677            execution_time_ms: None,
678            rows_scanned: None,
679        }
680    }
681
682    /// Sets the execution metrics on this result.
683    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
684        self.execution_time_ms = Some(execution_time_ms);
685        self.rows_scanned = Some(rows_scanned);
686        self
687    }
688
689    /// Returns the execution time in milliseconds, if available.
690    #[must_use]
691    pub fn execution_time_ms(&self) -> Option<f64> {
692        self.execution_time_ms
693    }
694
695    /// Returns the number of rows scanned, if available.
696    #[must_use]
697    pub fn rows_scanned(&self) -> Option<u64> {
698        self.rows_scanned
699    }
700
701    /// Returns the number of rows.
702    #[must_use]
703    pub fn row_count(&self) -> usize {
704        self.rows.len()
705    }
706
707    /// Returns the number of columns.
708    #[must_use]
709    pub fn column_count(&self) -> usize {
710        self.columns.len()
711    }
712
713    /// Returns true if the result is empty.
714    #[must_use]
715    pub fn is_empty(&self) -> bool {
716        self.rows.is_empty()
717    }
718
719    /// Extracts a single value from the result.
720    ///
721    /// Use this when your query returns exactly one row with one column,
722    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
723    ///
724    /// # Errors
725    ///
726    /// Returns an error if the result has multiple rows or columns.
727    pub fn scalar<T: FromValue>(&self) -> Result<T> {
728        if self.rows.len() != 1 || self.columns.len() != 1 {
729            return Err(grafeo_common::utils::error::Error::InvalidValue(
730                "Expected single value".to_string(),
731            ));
732        }
733        T::from_value(&self.rows[0][0])
734    }
735
736    /// Returns an iterator over the rows.
737    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
738        self.rows.iter()
739    }
740}
741
742/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
743///
744/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
745/// Used by [`QueryResult::scalar()`] to extract typed values.
746pub trait FromValue: Sized {
747    /// Attempts the conversion, returning an error on type mismatch.
748    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
749}
750
751impl FromValue for i64 {
752    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
753        value
754            .as_int64()
755            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
756                expected: "INT64".to_string(),
757                found: value.type_name().to_string(),
758            })
759    }
760}
761
762impl FromValue for f64 {
763    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
764        value
765            .as_float64()
766            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
767                expected: "FLOAT64".to_string(),
768                found: value.type_name().to_string(),
769            })
770    }
771}
772
773impl FromValue for String {
774    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
775        value.as_str().map(String::from).ok_or_else(|| {
776            grafeo_common::utils::error::Error::TypeMismatch {
777                expected: "STRING".to_string(),
778                found: value.type_name().to_string(),
779            }
780        })
781    }
782}
783
784impl FromValue for bool {
785    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
786        value
787            .as_bool()
788            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
789                expected: "BOOL".to_string(),
790                found: value.type_name().to_string(),
791            })
792    }
793}
794
795#[cfg(test)]
796mod tests {
797    use super::*;
798
799    #[test]
800    fn test_create_in_memory_database() {
801        let db = GrafeoDB::new_in_memory();
802        assert_eq!(db.node_count(), 0);
803        assert_eq!(db.edge_count(), 0);
804    }
805
806    #[test]
807    fn test_database_config() {
808        let config = Config::in_memory().with_threads(4).with_query_logging();
809
810        let db = GrafeoDB::with_config(config).unwrap();
811        assert_eq!(db.config().threads, 4);
812        assert!(db.config().query_logging);
813    }
814
815    #[test]
816    fn test_database_session() {
817        let db = GrafeoDB::new_in_memory();
818        let _session = db.session();
819        // Session should be created successfully
820    }
821
822    #[cfg(feature = "wal")]
823    #[test]
824    fn test_persistent_database_recovery() {
825        use grafeo_common::types::Value;
826        use tempfile::tempdir;
827
828        let dir = tempdir().unwrap();
829        let db_path = dir.path().join("test_db");
830
831        // Create database and add some data
832        {
833            let db = GrafeoDB::open(&db_path).unwrap();
834
835            let alice = db.create_node(&["Person"]);
836            db.set_node_property(alice, "name", Value::from("Alice"));
837
838            let bob = db.create_node(&["Person"]);
839            db.set_node_property(bob, "name", Value::from("Bob"));
840
841            let _edge = db.create_edge(alice, bob, "KNOWS");
842
843            // Explicitly close to flush WAL
844            db.close().unwrap();
845        }
846
847        // Reopen and verify data was recovered
848        {
849            let db = GrafeoDB::open(&db_path).unwrap();
850
851            assert_eq!(db.node_count(), 2);
852            assert_eq!(db.edge_count(), 1);
853
854            // Verify nodes exist
855            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
856            assert!(node0.is_some());
857
858            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
859            assert!(node1.is_some());
860        }
861    }
862
863    #[cfg(feature = "wal")]
864    #[test]
865    fn test_wal_logging() {
866        use tempfile::tempdir;
867
868        let dir = tempdir().unwrap();
869        let db_path = dir.path().join("wal_test_db");
870
871        let db = GrafeoDB::open(&db_path).unwrap();
872
873        // Create some data
874        let node = db.create_node(&["Test"]);
875        db.delete_node(node);
876
877        // WAL should have records
878        if let Some(wal) = db.wal() {
879            assert!(wal.record_count() > 0);
880        }
881
882        db.close().unwrap();
883    }
884
885    #[cfg(feature = "wal")]
886    #[test]
887    fn test_wal_recovery_multiple_sessions() {
888        // Tests that WAL recovery works correctly across multiple open/close cycles
889        use grafeo_common::types::Value;
890        use tempfile::tempdir;
891
892        let dir = tempdir().unwrap();
893        let db_path = dir.path().join("multi_session_db");
894
895        // Session 1: Create initial data
896        {
897            let db = GrafeoDB::open(&db_path).unwrap();
898            let alice = db.create_node(&["Person"]);
899            db.set_node_property(alice, "name", Value::from("Alice"));
900            db.close().unwrap();
901        }
902
903        // Session 2: Add more data
904        {
905            let db = GrafeoDB::open(&db_path).unwrap();
906            assert_eq!(db.node_count(), 1); // Previous data recovered
907            let bob = db.create_node(&["Person"]);
908            db.set_node_property(bob, "name", Value::from("Bob"));
909            db.close().unwrap();
910        }
911
912        // Session 3: Verify all data
913        {
914            let db = GrafeoDB::open(&db_path).unwrap();
915            assert_eq!(db.node_count(), 2);
916
917            // Verify properties were recovered correctly
918            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
919            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
920
921            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
922            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
923        }
924    }
925
926    #[cfg(feature = "wal")]
927    #[test]
928    fn test_database_consistency_after_mutations() {
929        // Tests that database remains consistent after a series of create/delete operations
930        use grafeo_common::types::Value;
931        use tempfile::tempdir;
932
933        let dir = tempdir().unwrap();
934        let db_path = dir.path().join("consistency_db");
935
936        {
937            let db = GrafeoDB::open(&db_path).unwrap();
938
939            // Create nodes
940            let a = db.create_node(&["Node"]);
941            let b = db.create_node(&["Node"]);
942            let c = db.create_node(&["Node"]);
943
944            // Create edges
945            let e1 = db.create_edge(a, b, "LINKS");
946            let _e2 = db.create_edge(b, c, "LINKS");
947
948            // Delete middle node and its edge
949            db.delete_edge(e1);
950            db.delete_node(b);
951
952            // Set properties on remaining nodes
953            db.set_node_property(a, "value", Value::Int64(1));
954            db.set_node_property(c, "value", Value::Int64(3));
955
956            db.close().unwrap();
957        }
958
959        // Reopen and verify consistency
960        {
961            let db = GrafeoDB::open(&db_path).unwrap();
962
963            // Should have 2 nodes (a and c), b was deleted
964            // Note: node_count includes deleted nodes in some implementations
965            // What matters is that the non-deleted nodes are accessible
966            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
967            assert!(node_a.is_some());
968
969            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
970            assert!(node_c.is_some());
971
972            // Middle node should be deleted
973            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
974            assert!(node_b.is_none());
975        }
976    }
977
978    #[cfg(feature = "wal")]
979    #[test]
980    fn test_close_is_idempotent() {
981        // Calling close() multiple times should not cause errors
982        use tempfile::tempdir;
983
984        let dir = tempdir().unwrap();
985        let db_path = dir.path().join("close_test_db");
986
987        let db = GrafeoDB::open(&db_path).unwrap();
988        db.create_node(&["Test"]);
989
990        // First close should succeed
991        assert!(db.close().is_ok());
992
993        // Second close should also succeed (idempotent)
994        assert!(db.close().is_ok());
995    }
996
997    #[test]
998    fn test_query_result_has_metrics() {
999        // Verifies that query results include execution metrics
1000        let db = GrafeoDB::new_in_memory();
1001        db.create_node(&["Person"]);
1002        db.create_node(&["Person"]);
1003
1004        #[cfg(feature = "gql")]
1005        {
1006            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1007
1008            // Metrics should be populated
1009            assert!(result.execution_time_ms.is_some());
1010            assert!(result.rows_scanned.is_some());
1011            assert!(result.execution_time_ms.unwrap() >= 0.0);
1012            assert_eq!(result.rows_scanned.unwrap(), 2);
1013        }
1014    }
1015
1016    #[test]
1017    fn test_empty_query_result_metrics() {
1018        // Verifies metrics are correct for queries returning no results
1019        let db = GrafeoDB::new_in_memory();
1020        db.create_node(&["Person"]);
1021
1022        #[cfg(feature = "gql")]
1023        {
1024            // Query that matches nothing
1025            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1026
1027            assert!(result.execution_time_ms.is_some());
1028            assert!(result.rows_scanned.is_some());
1029            assert_eq!(result.rows_scanned.unwrap(), 0);
1030        }
1031    }
1032
1033    #[cfg(feature = "cdc")]
1034    mod cdc_integration {
1035        use super::*;
1036
1037        #[test]
1038        fn test_node_lifecycle_history() {
1039            let db = GrafeoDB::new_in_memory();
1040
1041            // Create
1042            let id = db.create_node(&["Person"]);
1043            // Update
1044            db.set_node_property(id, "name", "Alice".into());
1045            db.set_node_property(id, "name", "Bob".into());
1046            // Delete
1047            db.delete_node(id);
1048
1049            let history = db.history(id).unwrap();
1050            assert_eq!(history.len(), 4); // create + 2 updates + delete
1051            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1052            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1053            assert!(history[1].before.is_none()); // first set_node_property has no prior value
1054            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1055            assert!(history[2].before.is_some()); // second update has prior "Alice"
1056            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1057        }
1058
1059        #[test]
1060        fn test_edge_lifecycle_history() {
1061            let db = GrafeoDB::new_in_memory();
1062
1063            let alice = db.create_node(&["Person"]);
1064            let bob = db.create_node(&["Person"]);
1065            let edge = db.create_edge(alice, bob, "KNOWS");
1066            db.set_edge_property(edge, "since", 2024i64.into());
1067            db.delete_edge(edge);
1068
1069            let history = db.history(edge).unwrap();
1070            assert_eq!(history.len(), 3); // create + update + delete
1071            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1072            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1073            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1074        }
1075
1076        #[test]
1077        fn test_create_node_with_props_cdc() {
1078            let db = GrafeoDB::new_in_memory();
1079
1080            let id = db.create_node_with_props(
1081                &["Person"],
1082                vec![
1083                    ("name", grafeo_common::types::Value::from("Alice")),
1084                    ("age", grafeo_common::types::Value::from(30i64)),
1085                ],
1086            );
1087
1088            let history = db.history(id).unwrap();
1089            assert_eq!(history.len(), 1);
1090            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1091            // Props should be captured
1092            let after = history[0].after.as_ref().unwrap();
1093            assert_eq!(after.len(), 2);
1094        }
1095
1096        #[test]
1097        fn test_changes_between() {
1098            let db = GrafeoDB::new_in_memory();
1099
1100            let id1 = db.create_node(&["A"]);
1101            let _id2 = db.create_node(&["B"]);
1102            db.set_node_property(id1, "x", 1i64.into());
1103
1104            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
1105            let changes = db
1106                .changes_between(
1107                    grafeo_common::types::EpochId(0),
1108                    grafeo_common::types::EpochId(u64::MAX),
1109                )
1110                .unwrap();
1111            assert_eq!(changes.len(), 3); // 2 creates + 1 update
1112        }
1113    }
1114}