Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22
23use std::path::Path;
24use std::sync::Arc;
25use std::sync::atomic::AtomicUsize;
26
27use parking_lot::RwLock;
28
29#[cfg(feature = "wal")]
30use grafeo_adapters::storage::wal::{
31    DurabilityMode as WalDurabilityMode, WalConfig, WalManager, WalRecord, WalRecovery,
32};
33use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
34use grafeo_common::utils::error::Result;
35use grafeo_core::graph::lpg::LpgStore;
36#[cfg(feature = "rdf")]
37use grafeo_core::graph::rdf::RdfStore;
38
39use crate::config::Config;
40use crate::query::cache::QueryCache;
41use crate::session::Session;
42use crate::transaction::TransactionManager;
43
/// Your handle to a Grafeo database.
///
/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
/// quick experiments, or [`open()`](Self::open) for persistent storage.
/// Then grab a [`session()`](Self::session) to start querying.
///
/// The shared components (store, transaction manager, query cache, commit
/// counter) are held behind `Arc`s; [`session()`](Self::session) hands clones
/// of those `Arc`s to every session it creates. Dropping the handle calls
/// [`close()`](Self::close).
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// // Quick in-memory database
/// let db = GrafeoDB::new_in_memory();
///
/// // Add some data
/// db.create_node(&["Person"]);
///
/// // Query it
/// let session = db.session();
/// let result = session.execute("MATCH (p:Person) RETURN p")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
pub struct GrafeoDB {
    /// Database configuration, validated once in `with_config()`.
    pub(super) config: Config,
    /// The underlying labeled-property-graph store.
    pub(super) store: Arc<LpgStore>,
    /// RDF triple store (only compiled in when the `rdf` feature is enabled).
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session.
    pub(super) tx_manager: Arc<TransactionManager>,
    /// Unified buffer manager, built from the configured memory limit and spill path.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager.
    ///
    /// `None` unless the configuration enables the WAL *and* provides a
    /// database path.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<WalManager>>,
    /// Query cache for parsed and optimized plans.
    pub(super) query_cache: Arc<QueryCache>,
    /// Shared commit counter for auto-GC across sessions (starts at zero).
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// Whether the database is open: `true` after construction, flipped to
    /// `false` by a successful `close()`.
    pub(super) is_open: RwLock<bool>,
    /// Change data capture log for tracking mutations.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Registered embedding models for text-to-vector conversion.
    // NOTE(review): the `String` key is presumably the model name - confirm in `embed`.
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
}
95
96impl GrafeoDB {
97    /// Creates an in-memory database - fast to create, gone when dropped.
98    ///
99    /// Use this for tests, experiments, or when you don't need persistence.
100    /// For data that survives restarts, use [`open()`](Self::open) instead.
101    ///
102    /// # Examples
103    ///
104    /// ```
105    /// use grafeo_engine::GrafeoDB;
106    ///
107    /// let db = GrafeoDB::new_in_memory();
108    /// let session = db.session();
109    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
110    /// # Ok::<(), grafeo_common::utils::error::Error>(())
111    /// ```
112    #[must_use]
113    pub fn new_in_memory() -> Self {
114        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
115    }
116
117    /// Opens a database at the given path, creating it if it doesn't exist.
118    ///
119    /// If you've used this path before, Grafeo recovers your data from the
120    /// write-ahead log automatically. First open on a new path creates an
121    /// empty database.
122    ///
123    /// # Errors
124    ///
125    /// Returns an error if the path isn't writable or recovery fails.
126    ///
127    /// # Examples
128    ///
129    /// ```no_run
130    /// use grafeo_engine::GrafeoDB;
131    ///
132    /// let db = GrafeoDB::open("./my_social_network")?;
133    /// # Ok::<(), grafeo_common::utils::error::Error>(())
134    /// ```
135    #[cfg(feature = "wal")]
136    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
137        Self::with_config(Config::persistent(path.as_ref()))
138    }
139
140    /// Creates a database with custom configuration.
141    ///
142    /// Use this when you need fine-grained control over memory limits,
143    /// thread counts, or persistence settings. For most cases,
144    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
145    /// are simpler.
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if the database can't be created or recovery fails.
150    ///
151    /// # Examples
152    ///
153    /// ```
154    /// use grafeo_engine::{GrafeoDB, Config};
155    ///
156    /// // In-memory with a 512MB limit
157    /// let config = Config::in_memory()
158    ///     .with_memory_limit(512 * 1024 * 1024);
159    ///
160    /// let db = GrafeoDB::with_config(config)?;
161    /// # Ok::<(), grafeo_common::utils::error::Error>(())
162    /// ```
163    pub fn with_config(config: Config) -> Result<Self> {
164        // Validate configuration before proceeding
165        config
166            .validate()
167            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
168
169        let store = Arc::new(LpgStore::new());
170        #[cfg(feature = "rdf")]
171        let rdf_store = Arc::new(RdfStore::new());
172        let tx_manager = Arc::new(TransactionManager::new());
173
174        // Create buffer manager with configured limits
175        let buffer_config = BufferManagerConfig {
176            budget: config.memory_limit.unwrap_or_else(|| {
177                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
178            }),
179            spill_path: config
180                .spill_path
181                .clone()
182                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
183            ..BufferManagerConfig::default()
184        };
185        let buffer_manager = BufferManager::new(buffer_config);
186
187        // Initialize WAL if persistence is enabled
188        #[cfg(feature = "wal")]
189        let wal = if config.wal_enabled {
190            if let Some(ref db_path) = config.path {
191                // Create database directory if it doesn't exist
192                std::fs::create_dir_all(db_path)?;
193
194                let wal_path = db_path.join("wal");
195
196                // Check if WAL exists and recover if needed
197                if wal_path.exists() {
198                    let recovery = WalRecovery::new(&wal_path);
199                    let records = recovery.recover()?;
200                    Self::apply_wal_records(&store, &records)?;
201                }
202
203                // Open/create WAL manager with configured durability
204                let wal_durability = match config.wal_durability {
205                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
206                    crate::config::DurabilityMode::Batch {
207                        max_delay_ms,
208                        max_records,
209                    } => WalDurabilityMode::Batch {
210                        max_delay_ms,
211                        max_records,
212                    },
213                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
214                        WalDurabilityMode::Adaptive { target_interval_ms }
215                    }
216                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
217                };
218                let wal_config = WalConfig {
219                    durability: wal_durability,
220                    ..WalConfig::default()
221                };
222                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
223                Some(Arc::new(wal_manager))
224            } else {
225                None
226            }
227        } else {
228            None
229        };
230
231        // Create query cache with default capacity (1000 queries)
232        let query_cache = Arc::new(QueryCache::default());
233
234        Ok(Self {
235            config,
236            store,
237            #[cfg(feature = "rdf")]
238            rdf_store,
239            tx_manager,
240            buffer_manager,
241            #[cfg(feature = "wal")]
242            wal,
243            query_cache,
244            commit_counter: Arc::new(AtomicUsize::new(0)),
245            is_open: RwLock::new(true),
246            #[cfg(feature = "cdc")]
247            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
248            #[cfg(feature = "embed")]
249            embedding_models: RwLock::new(hashbrown::HashMap::new()),
250        })
251    }
252
253    /// Applies WAL records to restore the database state.
254    #[cfg(feature = "wal")]
255    fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
256        for record in records {
257            match record {
258                WalRecord::CreateNode { id, labels } => {
259                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
260                    store.create_node_with_id(*id, &label_refs);
261                }
262                WalRecord::DeleteNode { id } => {
263                    store.delete_node(*id);
264                }
265                WalRecord::CreateEdge {
266                    id,
267                    src,
268                    dst,
269                    edge_type,
270                } => {
271                    store.create_edge_with_id(*id, *src, *dst, edge_type);
272                }
273                WalRecord::DeleteEdge { id } => {
274                    store.delete_edge(*id);
275                }
276                WalRecord::SetNodeProperty { id, key, value } => {
277                    store.set_node_property(*id, key, value.clone());
278                }
279                WalRecord::SetEdgeProperty { id, key, value } => {
280                    store.set_edge_property(*id, key, value.clone());
281                }
282                WalRecord::AddNodeLabel { id, label } => {
283                    store.add_label(*id, label);
284                }
285                WalRecord::RemoveNodeLabel { id, label } => {
286                    store.remove_label(*id, label);
287                }
288                WalRecord::TxCommit { .. }
289                | WalRecord::TxAbort { .. }
290                | WalRecord::Checkpoint { .. } => {
291                    // Transaction control records don't need replay action
292                    // (recovery already filtered to only committed transactions)
293                }
294            }
295        }
296        Ok(())
297    }
298
299    // =========================================================================
300    // Session & Configuration
301    // =========================================================================
302
303    /// Opens a new session for running queries.
304    ///
305    /// Sessions are cheap to create - spin up as many as you need. Each
306    /// gets its own transaction context, so concurrent sessions won't
307    /// block each other on reads.
308    ///
309    /// # Examples
310    ///
311    /// ```
312    /// use grafeo_engine::GrafeoDB;
313    ///
314    /// let db = GrafeoDB::new_in_memory();
315    /// let session = db.session();
316    ///
317    /// // Run queries through the session
318    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
319    /// # Ok::<(), grafeo_common::utils::error::Error>(())
320    /// ```
321    #[must_use]
322    pub fn session(&self) -> Session {
323        #[cfg(feature = "rdf")]
324        let mut session = Session::with_rdf_store_and_adaptive(
325            Arc::clone(&self.store),
326            Arc::clone(&self.rdf_store),
327            Arc::clone(&self.tx_manager),
328            Arc::clone(&self.query_cache),
329            self.config.adaptive.clone(),
330            self.config.factorized_execution,
331            self.config.graph_model,
332            self.config.query_timeout,
333            Arc::clone(&self.commit_counter),
334            self.config.gc_interval,
335        );
336        #[cfg(not(feature = "rdf"))]
337        let mut session = Session::with_adaptive(
338            Arc::clone(&self.store),
339            Arc::clone(&self.tx_manager),
340            Arc::clone(&self.query_cache),
341            self.config.adaptive.clone(),
342            self.config.factorized_execution,
343            self.config.graph_model,
344            self.config.query_timeout,
345            Arc::clone(&self.commit_counter),
346            self.config.gc_interval,
347        );
348
349        #[cfg(feature = "cdc")]
350        session.set_cdc_log(Arc::clone(&self.cdc_log));
351
352        // Suppress unused_mut when cdc is disabled
353        let _ = &mut session;
354
355        session
356    }
357
358    /// Returns the adaptive execution configuration.
359    #[must_use]
360    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
361        &self.config.adaptive
362    }
363
364    /// Returns the configuration.
365    #[must_use]
366    pub fn config(&self) -> &Config {
367        &self.config
368    }
369
370    /// Returns the graph data model of this database.
371    #[must_use]
372    pub fn graph_model(&self) -> crate::config::GraphModel {
373        self.config.graph_model
374    }
375
376    /// Returns the configured memory limit in bytes, if any.
377    #[must_use]
378    pub fn memory_limit(&self) -> Option<usize> {
379        self.config.memory_limit
380    }
381
382    /// Returns the underlying store.
383    ///
384    /// This provides direct access to the LPG store for algorithm implementations.
385    #[must_use]
386    pub fn store(&self) -> &Arc<LpgStore> {
387        &self.store
388    }
389
390    /// Garbage collects old MVCC versions that are no longer visible.
391    ///
392    /// Determines the minimum epoch required by active transactions and prunes
393    /// version chains older than that threshold. Also cleans up completed
394    /// transaction metadata in the transaction manager.
395    pub fn gc(&self) {
396        let min_epoch = self.tx_manager.min_active_epoch();
397        self.store.gc_versions(min_epoch);
398        self.tx_manager.gc();
399    }
400
401    /// Returns the buffer manager for memory-aware operations.
402    #[must_use]
403    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
404        &self.buffer_manager
405    }
406
407    /// Returns the query cache.
408    #[must_use]
409    pub fn query_cache(&self) -> &Arc<QueryCache> {
410        &self.query_cache
411    }
412
413    // =========================================================================
414    // Lifecycle
415    // =========================================================================
416
417    /// Closes the database, flushing all pending writes.
418    ///
419    /// For persistent databases, this ensures everything is safely on disk.
420    /// Called automatically when the database is dropped, but you can call
421    /// it explicitly if you need to guarantee durability at a specific point.
422    ///
423    /// # Errors
424    ///
425    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
426    pub fn close(&self) -> Result<()> {
427        let mut is_open = self.is_open.write();
428        if !*is_open {
429            return Ok(());
430        }
431
432        // Commit and checkpoint WAL
433        #[cfg(feature = "wal")]
434        if let Some(ref wal) = self.wal {
435            let epoch = self.store.current_epoch();
436
437            // Use the last assigned transaction ID, or create a checkpoint-only tx
438            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
439                // No transactions have been started; begin one for checkpoint
440                self.tx_manager.begin()
441            });
442
443            // Log a TxCommit to mark all pending records as committed
444            wal.log(&WalRecord::TxCommit {
445                tx_id: checkpoint_tx,
446            })?;
447
448            // Then checkpoint
449            wal.checkpoint(checkpoint_tx, epoch)?;
450            wal.sync()?;
451        }
452
453        *is_open = false;
454        Ok(())
455    }
456
457    /// Returns the WAL manager if available.
458    #[cfg(feature = "wal")]
459    #[must_use]
460    pub fn wal(&self) -> Option<&Arc<WalManager>> {
461        self.wal.as_ref()
462    }
463
464    /// Logs a WAL record if WAL is enabled.
465    #[cfg(feature = "wal")]
466    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
467        if let Some(ref wal) = self.wal {
468            wal.log(record)?;
469        }
470        Ok(())
471    }
472}
473
474impl Drop for GrafeoDB {
475    fn drop(&mut self) {
476        if let Err(e) = self.close() {
477            tracing::error!("Error closing database: {}", e);
478        }
479    }
480}
481
482impl crate::admin::AdminService for GrafeoDB {
483    fn info(&self) -> crate::admin::DatabaseInfo {
484        self.info()
485    }
486
487    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
488        self.detailed_stats()
489    }
490
491    fn schema(&self) -> crate::admin::SchemaInfo {
492        self.schema()
493    }
494
495    fn validate(&self) -> crate::admin::ValidationResult {
496        self.validate()
497    }
498
499    fn wal_status(&self) -> crate::admin::WalStatus {
500        self.wal_status()
501    }
502
503    fn wal_checkpoint(&self) -> Result<()> {
504        self.wal_checkpoint()
505    }
506}
507
508// =========================================================================
509// Query Result Types
510// =========================================================================
511
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so a result can also be assembled or inspected
/// directly; nothing in this type enforces that each row has as many values
/// as there are columns.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    /// `QueryResult::new` fills this with one `Any` entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows; each inner vector holds the values of one row.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    /// `None` until metrics are attached with `with_metrics`.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    /// `None` until metrics are attached with `with_metrics`.
    pub rows_scanned: Option<u64>,
}
550
551impl QueryResult {
552    /// Creates a new empty query result.
553    #[must_use]
554    pub fn new(columns: Vec<String>) -> Self {
555        let len = columns.len();
556        Self {
557            columns,
558            column_types: vec![grafeo_common::types::LogicalType::Any; len],
559            rows: Vec::new(),
560            execution_time_ms: None,
561            rows_scanned: None,
562        }
563    }
564
565    /// Creates a new empty query result with column types.
566    #[must_use]
567    pub fn with_types(
568        columns: Vec<String>,
569        column_types: Vec<grafeo_common::types::LogicalType>,
570    ) -> Self {
571        Self {
572            columns,
573            column_types,
574            rows: Vec::new(),
575            execution_time_ms: None,
576            rows_scanned: None,
577        }
578    }
579
580    /// Sets the execution metrics on this result.
581    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
582        self.execution_time_ms = Some(execution_time_ms);
583        self.rows_scanned = Some(rows_scanned);
584        self
585    }
586
587    /// Returns the execution time in milliseconds, if available.
588    #[must_use]
589    pub fn execution_time_ms(&self) -> Option<f64> {
590        self.execution_time_ms
591    }
592
593    /// Returns the number of rows scanned, if available.
594    #[must_use]
595    pub fn rows_scanned(&self) -> Option<u64> {
596        self.rows_scanned
597    }
598
599    /// Returns the number of rows.
600    #[must_use]
601    pub fn row_count(&self) -> usize {
602        self.rows.len()
603    }
604
605    /// Returns the number of columns.
606    #[must_use]
607    pub fn column_count(&self) -> usize {
608        self.columns.len()
609    }
610
611    /// Returns true if the result is empty.
612    #[must_use]
613    pub fn is_empty(&self) -> bool {
614        self.rows.is_empty()
615    }
616
617    /// Extracts a single value from the result.
618    ///
619    /// Use this when your query returns exactly one row with one column,
620    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
621    ///
622    /// # Errors
623    ///
624    /// Returns an error if the result has multiple rows or columns.
625    pub fn scalar<T: FromValue>(&self) -> Result<T> {
626        if self.rows.len() != 1 || self.columns.len() != 1 {
627            return Err(grafeo_common::utils::error::Error::InvalidValue(
628                "Expected single value".to_string(),
629            ));
630        }
631        T::from_value(&self.rows[0][0])
632    }
633
634    /// Returns an iterator over the rows.
635    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
636        self.rows.iter()
637    }
638}
639
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// Implement it for your own type to make that type usable with
/// [`QueryResult::scalar()`].
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// The value is only borrowed; implementations copy or clone what they
    /// need out of it.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
648
649impl FromValue for i64 {
650    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
651        value
652            .as_int64()
653            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
654                expected: "INT64".to_string(),
655                found: value.type_name().to_string(),
656            })
657    }
658}
659
660impl FromValue for f64 {
661    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
662        value
663            .as_float64()
664            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
665                expected: "FLOAT64".to_string(),
666                found: value.type_name().to_string(),
667            })
668    }
669}
670
671impl FromValue for String {
672    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
673        value.as_str().map(String::from).ok_or_else(|| {
674            grafeo_common::utils::error::Error::TypeMismatch {
675                expected: "STRING".to_string(),
676                found: value.type_name().to_string(),
677            }
678        })
679    }
680}
681
682impl FromValue for bool {
683    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
684        value
685            .as_bool()
686            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
687                expected: "BOOL".to_string(),
688                found: value.type_name().to_string(),
689            })
690    }
691}
692
693#[cfg(test)]
694mod tests {
695    use super::*;
696
697    #[test]
698    fn test_create_in_memory_database() {
699        let db = GrafeoDB::new_in_memory();
700        assert_eq!(db.node_count(), 0);
701        assert_eq!(db.edge_count(), 0);
702    }
703
704    #[test]
705    fn test_database_config() {
706        let config = Config::in_memory().with_threads(4).with_query_logging();
707
708        let db = GrafeoDB::with_config(config).unwrap();
709        assert_eq!(db.config().threads, 4);
710        assert!(db.config().query_logging);
711    }
712
713    #[test]
714    fn test_database_session() {
715        let db = GrafeoDB::new_in_memory();
716        let _session = db.session();
717        // Session should be created successfully
718    }
719
720    #[cfg(feature = "wal")]
721    #[test]
722    fn test_persistent_database_recovery() {
723        use grafeo_common::types::Value;
724        use tempfile::tempdir;
725
726        let dir = tempdir().unwrap();
727        let db_path = dir.path().join("test_db");
728
729        // Create database and add some data
730        {
731            let db = GrafeoDB::open(&db_path).unwrap();
732
733            let alice = db.create_node(&["Person"]);
734            db.set_node_property(alice, "name", Value::from("Alice"));
735
736            let bob = db.create_node(&["Person"]);
737            db.set_node_property(bob, "name", Value::from("Bob"));
738
739            let _edge = db.create_edge(alice, bob, "KNOWS");
740
741            // Explicitly close to flush WAL
742            db.close().unwrap();
743        }
744
745        // Reopen and verify data was recovered
746        {
747            let db = GrafeoDB::open(&db_path).unwrap();
748
749            assert_eq!(db.node_count(), 2);
750            assert_eq!(db.edge_count(), 1);
751
752            // Verify nodes exist
753            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
754            assert!(node0.is_some());
755
756            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
757            assert!(node1.is_some());
758        }
759    }
760
761    #[cfg(feature = "wal")]
762    #[test]
763    fn test_wal_logging() {
764        use tempfile::tempdir;
765
766        let dir = tempdir().unwrap();
767        let db_path = dir.path().join("wal_test_db");
768
769        let db = GrafeoDB::open(&db_path).unwrap();
770
771        // Create some data
772        let node = db.create_node(&["Test"]);
773        db.delete_node(node);
774
775        // WAL should have records
776        if let Some(wal) = db.wal() {
777            assert!(wal.record_count() > 0);
778        }
779
780        db.close().unwrap();
781    }
782
783    #[cfg(feature = "wal")]
784    #[test]
785    fn test_wal_recovery_multiple_sessions() {
786        // Tests that WAL recovery works correctly across multiple open/close cycles
787        use grafeo_common::types::Value;
788        use tempfile::tempdir;
789
790        let dir = tempdir().unwrap();
791        let db_path = dir.path().join("multi_session_db");
792
793        // Session 1: Create initial data
794        {
795            let db = GrafeoDB::open(&db_path).unwrap();
796            let alice = db.create_node(&["Person"]);
797            db.set_node_property(alice, "name", Value::from("Alice"));
798            db.close().unwrap();
799        }
800
801        // Session 2: Add more data
802        {
803            let db = GrafeoDB::open(&db_path).unwrap();
804            assert_eq!(db.node_count(), 1); // Previous data recovered
805            let bob = db.create_node(&["Person"]);
806            db.set_node_property(bob, "name", Value::from("Bob"));
807            db.close().unwrap();
808        }
809
810        // Session 3: Verify all data
811        {
812            let db = GrafeoDB::open(&db_path).unwrap();
813            assert_eq!(db.node_count(), 2);
814
815            // Verify properties were recovered correctly
816            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
817            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
818
819            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
820            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
821        }
822    }
823
824    #[cfg(feature = "wal")]
825    #[test]
826    fn test_database_consistency_after_mutations() {
827        // Tests that database remains consistent after a series of create/delete operations
828        use grafeo_common::types::Value;
829        use tempfile::tempdir;
830
831        let dir = tempdir().unwrap();
832        let db_path = dir.path().join("consistency_db");
833
834        {
835            let db = GrafeoDB::open(&db_path).unwrap();
836
837            // Create nodes
838            let a = db.create_node(&["Node"]);
839            let b = db.create_node(&["Node"]);
840            let c = db.create_node(&["Node"]);
841
842            // Create edges
843            let e1 = db.create_edge(a, b, "LINKS");
844            let _e2 = db.create_edge(b, c, "LINKS");
845
846            // Delete middle node and its edge
847            db.delete_edge(e1);
848            db.delete_node(b);
849
850            // Set properties on remaining nodes
851            db.set_node_property(a, "value", Value::Int64(1));
852            db.set_node_property(c, "value", Value::Int64(3));
853
854            db.close().unwrap();
855        }
856
857        // Reopen and verify consistency
858        {
859            let db = GrafeoDB::open(&db_path).unwrap();
860
861            // Should have 2 nodes (a and c), b was deleted
862            // Note: node_count includes deleted nodes in some implementations
863            // What matters is that the non-deleted nodes are accessible
864            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
865            assert!(node_a.is_some());
866
867            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
868            assert!(node_c.is_some());
869
870            // Middle node should be deleted
871            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
872            assert!(node_b.is_none());
873        }
874    }
875
876    #[cfg(feature = "wal")]
877    #[test]
878    fn test_close_is_idempotent() {
879        // Calling close() multiple times should not cause errors
880        use tempfile::tempdir;
881
882        let dir = tempdir().unwrap();
883        let db_path = dir.path().join("close_test_db");
884
885        let db = GrafeoDB::open(&db_path).unwrap();
886        db.create_node(&["Test"]);
887
888        // First close should succeed
889        assert!(db.close().is_ok());
890
891        // Second close should also succeed (idempotent)
892        assert!(db.close().is_ok());
893    }
894
895    #[test]
896    fn test_query_result_has_metrics() {
897        // Verifies that query results include execution metrics
898        let db = GrafeoDB::new_in_memory();
899        db.create_node(&["Person"]);
900        db.create_node(&["Person"]);
901
902        #[cfg(feature = "gql")]
903        {
904            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
905
906            // Metrics should be populated
907            assert!(result.execution_time_ms.is_some());
908            assert!(result.rows_scanned.is_some());
909            assert!(result.execution_time_ms.unwrap() >= 0.0);
910            assert_eq!(result.rows_scanned.unwrap(), 2);
911        }
912    }
913
914    #[test]
915    fn test_empty_query_result_metrics() {
916        // Verifies metrics are correct for queries returning no results
917        let db = GrafeoDB::new_in_memory();
918        db.create_node(&["Person"]);
919
920        #[cfg(feature = "gql")]
921        {
922            // Query that matches nothing
923            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
924
925            assert!(result.execution_time_ms.is_some());
926            assert!(result.rows_scanned.is_some());
927            assert_eq!(result.rows_scanned.unwrap(), 0);
928        }
929    }
930
931    #[cfg(feature = "cdc")]
932    mod cdc_integration {
933        use super::*;
934
935        #[test]
936        fn test_node_lifecycle_history() {
937            let db = GrafeoDB::new_in_memory();
938
939            // Create
940            let id = db.create_node(&["Person"]);
941            // Update
942            db.set_node_property(id, "name", "Alice".into());
943            db.set_node_property(id, "name", "Bob".into());
944            // Delete
945            db.delete_node(id);
946
947            let history = db.history(id).unwrap();
948            assert_eq!(history.len(), 4); // create + 2 updates + delete
949            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
950            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
951            assert!(history[1].before.is_none()); // first set_node_property has no prior value
952            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
953            assert!(history[2].before.is_some()); // second update has prior "Alice"
954            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
955        }
956
957        #[test]
958        fn test_edge_lifecycle_history() {
959            let db = GrafeoDB::new_in_memory();
960
961            let alice = db.create_node(&["Person"]);
962            let bob = db.create_node(&["Person"]);
963            let edge = db.create_edge(alice, bob, "KNOWS");
964            db.set_edge_property(edge, "since", 2024i64.into());
965            db.delete_edge(edge);
966
967            let history = db.history(edge).unwrap();
968            assert_eq!(history.len(), 3); // create + update + delete
969            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
970            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
971            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
972        }
973
974        #[test]
975        fn test_create_node_with_props_cdc() {
976            let db = GrafeoDB::new_in_memory();
977
978            let id = db.create_node_with_props(
979                &["Person"],
980                vec![
981                    ("name", grafeo_common::types::Value::from("Alice")),
982                    ("age", grafeo_common::types::Value::from(30i64)),
983                ],
984            );
985
986            let history = db.history(id).unwrap();
987            assert_eq!(history.len(), 1);
988            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
989            // Props should be captured
990            let after = history[0].after.as_ref().unwrap();
991            assert_eq!(after.len(), 2);
992        }
993
994        #[test]
995        fn test_changes_between() {
996            let db = GrafeoDB::new_in_memory();
997
998            let id1 = db.create_node(&["A"]);
999            let _id2 = db.create_node(&["B"]);
1000            db.set_node_property(id1, "x", 1i64.into());
1001
1002            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
1003            let changes = db
1004                .changes_between(
1005                    grafeo_common::types::EpochId(0),
1006                    grafeo_common::types::EpochId(u64::MAX),
1007                )
1008                .unwrap();
1009            assert_eq!(changes.len(), 3); // 2 creates + 1 update
1010        }
1011    }
1012}