Skip to main content

grafeo_engine/database/
mod.rs

//! The main database struct and operations.
//!
//! Start here with [`GrafeoDB`] - it's your handle to everything.
//!
//! Operations are split across focused submodules:
//! - `query` - Query execution (execute, execute_cypher, etc.)
//! - `crud` - Node/edge CRUD operations
//! - `index` - Property, vector, and text index management
//! - `search` - Vector, text, and hybrid search
//! - `embed` - Embedding model management
//! - `persistence` - Save, load, snapshots, iteration
//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15#[cfg(feature = "async-storage")]
16mod async_ops;
17#[cfg(feature = "async-storage")]
18pub(crate) mod async_wal_store;
19mod crud;
20#[cfg(feature = "embed")]
21mod embed;
22mod index;
23mod persistence;
24mod query;
25#[cfg(feature = "rdf")]
26mod rdf_ops;
27mod search;
28#[cfg(feature = "wal")]
29pub(crate) mod wal_store;
30
31use grafeo_common::grafeo_error;
32#[cfg(feature = "wal")]
33use std::path::Path;
34use std::sync::Arc;
35use std::sync::atomic::AtomicUsize;
36
37use parking_lot::RwLock;
38
39#[cfg(feature = "grafeo-file")]
40use grafeo_adapters::storage::file::GrafeoFileManager;
41#[cfg(feature = "wal")]
42use grafeo_adapters::storage::wal::{
43    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
44};
45use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
46use grafeo_common::utils::error::Result;
47use grafeo_core::graph::lpg::LpgStore;
48#[cfg(feature = "rdf")]
49use grafeo_core::graph::rdf::RdfStore;
50use grafeo_core::graph::{GraphStore, GraphStoreMut};
51
52use crate::catalog::Catalog;
53use crate::config::Config;
54use crate::query::cache::QueryCache;
55use crate::session::Session;
56use crate::transaction::TransactionManager;
57
58/// Your handle to a Grafeo database.
59///
60/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
61/// quick experiments, or [`open()`](Self::open) for persistent storage.
62/// Then grab a [`session()`](Self::session) to start querying.
63///
64/// # Examples
65///
66/// ```
67/// use grafeo_engine::GrafeoDB;
68///
69/// // Quick in-memory database
70/// let db = GrafeoDB::new_in_memory();
71///
72/// // Add some data
73/// db.create_node(&["Person"]);
74///
75/// // Query it
76/// let session = db.session();
77/// let result = session.execute("MATCH (p:Person) RETURN p")?;
78/// # Ok::<(), grafeo_common::utils::error::Error>(())
79/// ```
80pub struct GrafeoDB {
81    /// Database configuration.
82    pub(super) config: Config,
83    /// The underlying graph store (None when using an external store).
84    pub(super) store: Option<Arc<LpgStore>>,
85    /// Schema and metadata catalog shared across sessions.
86    pub(super) catalog: Arc<Catalog>,
87    /// RDF triple store (if RDF feature is enabled).
88    #[cfg(feature = "rdf")]
89    pub(super) rdf_store: Arc<RdfStore>,
90    /// Transaction manager.
91    pub(super) transaction_manager: Arc<TransactionManager>,
92    /// Unified buffer manager.
93    pub(super) buffer_manager: Arc<BufferManager>,
94    /// Write-ahead log manager (if durability is enabled).
95    #[cfg(feature = "wal")]
96    pub(super) wal: Option<Arc<LpgWal>>,
97    /// Shared WAL graph context tracker. Tracks which named graph was last
98    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
99    /// records only when the context actually changes.
100    #[cfg(feature = "wal")]
101    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
102    /// Query cache for parsed and optimized plans.
103    pub(super) query_cache: Arc<QueryCache>,
104    /// Shared commit counter for auto-GC across sessions.
105    pub(super) commit_counter: Arc<AtomicUsize>,
106    /// Whether the database is open.
107    pub(super) is_open: RwLock<bool>,
108    /// Change data capture log for tracking mutations.
109    #[cfg(feature = "cdc")]
110    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
111    /// Registered embedding models for text-to-vector conversion.
112    #[cfg(feature = "embed")]
113    pub(super) embedding_models:
114        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
115    /// Single-file database manager (when using `.grafeo` format).
116    #[cfg(feature = "grafeo-file")]
117    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
118    /// External read-only graph store (when using with_store() or with_read_store()).
119    /// When set, sessions route queries through this store instead of the built-in LpgStore.
120    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
121    /// External writable graph store (when using with_store()).
122    /// None for read-only databases created via with_read_store().
123    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
124    /// Metrics registry shared across all sessions.
125    #[cfg(feature = "metrics")]
126    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
127    /// Persistent graph context for one-shot `execute()` calls.
128    /// When set, each call to `session()` pre-configures the session to this graph.
129    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
130    current_graph: RwLock<Option<String>>,
131    /// Persistent schema context for one-shot `execute()` calls.
132    /// When set, each call to `session()` pre-configures the session to this schema.
133    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
134    current_schema: RwLock<Option<String>>,
135    /// Whether this database is open in read-only mode.
136    /// When true, sessions automatically enforce read-only transactions.
137    read_only: bool,
138}
139
140impl GrafeoDB {
141    /// Returns a reference to the built-in LPG store.
142    ///
143    /// # Panics
144    ///
145    /// Panics if the database was created with [`with_store()`](Self::with_store) or
146    /// [`with_read_store()`](Self::with_read_store), which use an external store
147    /// instead of the built-in LPG store.
148    fn lpg_store(&self) -> &Arc<LpgStore> {
149        self.store.as_ref().expect(
150            "no built-in LpgStore: this GrafeoDB was created with an external store \
151             (with_store / with_read_store). Use session() or graph_store() instead.",
152        )
153    }
154
155    /// Creates an in-memory database, fast to create, gone when dropped.
156    ///
157    /// Use this for tests, experiments, or when you don't need persistence.
158    /// For data that survives restarts, use [`open()`](Self::open) instead.
159    ///
160    /// # Panics
161    ///
162    /// Panics if the internal arena allocator cannot be initialized (out of memory).
163    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
164    ///
165    /// # Examples
166    ///
167    /// ```
168    /// use grafeo_engine::GrafeoDB;
169    ///
170    /// let db = GrafeoDB::new_in_memory();
171    /// let session = db.session();
172    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
173    /// # Ok::<(), grafeo_common::utils::error::Error>(())
174    /// ```
175    #[must_use]
176    pub fn new_in_memory() -> Self {
177        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
178    }
179
180    /// Opens a database at the given path, creating it if it doesn't exist.
181    ///
182    /// If you've used this path before, Grafeo recovers your data from the
183    /// write-ahead log automatically. First open on a new path creates an
184    /// empty database.
185    ///
186    /// # Errors
187    ///
188    /// Returns an error if the path isn't writable or recovery fails.
189    ///
190    /// # Examples
191    ///
192    /// ```no_run
193    /// use grafeo_engine::GrafeoDB;
194    ///
195    /// let db = GrafeoDB::open("./my_social_network")?;
196    /// # Ok::<(), grafeo_common::utils::error::Error>(())
197    /// ```
198    #[cfg(feature = "wal")]
199    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
200        Self::with_config(Config::persistent(path.as_ref()))
201    }
202
203    /// Opens an existing database in read-only mode.
204    ///
205    /// Uses a shared file lock, so multiple processes can read the same
206    /// `.grafeo` file concurrently. The database loads the last checkpoint
207    /// snapshot but does **not** replay the WAL or allow mutations.
208    ///
209    /// Currently only supports the single-file (`.grafeo`) format.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the file doesn't exist or can't be read.
214    ///
215    /// # Examples
216    ///
217    /// ```no_run
218    /// use grafeo_engine::GrafeoDB;
219    ///
220    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
221    /// let session = db.session();
222    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
223    /// // Mutations will return an error:
224    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
225    /// # Ok::<(), grafeo_common::utils::error::Error>(())
226    /// ```
227    #[cfg(feature = "grafeo-file")]
228    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
229        Self::with_config(Config::read_only(path.as_ref()))
230    }
231
    /// Creates a database with custom configuration.
    ///
    /// Use this when you need fine-grained control over memory limits,
    /// thread counts, or persistence settings. For most cases,
    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
    /// are simpler.
    ///
    /// Startup proceeds in this order:
    ///
    /// 1. The configuration is validated.
    /// 2. The built-in stores, transaction manager, buffer manager, and
    ///    catalog are created.
    /// 3. With the `grafeo-file` feature: the single-file database is opened
    ///    (or created), its snapshot is applied, and, when the WAL is enabled
    ///    and a sidecar WAL exists, the sidecar records are replayed.
    /// 4. With the `wal` feature: for the legacy directory layout an existing
    ///    WAL is replayed, then the WAL is opened with the configured
    ///    durability. Read-only databases skip the WAL entirely.
    /// 5. With the `temporal` feature: the transaction manager epoch is synced
    ///    with the store.
    ///
    /// # Errors
    ///
    /// Returns an error if the database can't be created or recovery fails.
    /// In particular:
    ///
    /// - configuration validation failures are reported as `Error::Internal`;
    /// - (`grafeo-file`) read-only mode without a path, or with a path that is
    ///   not an existing file, is reported as `Error::Internal`;
    /// - (`grafeo-file`) a single-file path that exists but is not a file is
    ///   reported as `Error::Internal`;
    /// - errors from store creation, snapshot loading, WAL recovery, WAL
    ///   creation, and directory creation are propagated.
    ///
    /// # Examples
    ///
    /// ```
    /// use grafeo_engine::{GrafeoDB, Config};
    ///
    /// // In-memory with a 512MB limit
    /// let config = Config::in_memory()
    ///     .with_memory_limit(512 * 1024 * 1024);
    ///
    /// let db = GrafeoDB::with_config(config)?;
    /// # Ok::<(), grafeo_common::utils::error::Error>(())
    /// ```
    pub fn with_config(config: Config) -> Result<Self> {
        // Validate configuration before proceeding; the validation error is
        // flattened to its string form inside `Error::Internal`.
        config
            .validate()
            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

        let store = Arc::new(LpgStore::new()?);
        #[cfg(feature = "rdf")]
        let rdf_store = Arc::new(RdfStore::new());
        let transaction_manager = Arc::new(TransactionManager::new());

        // Create buffer manager with configured limits.
        // Budget: the explicit memory limit, else 75% of detected system memory.
        // Spill path: the explicit one, else `<config.path>/spill`.
        // NOTE(review): when `config.path` is a single `.grafeo` file, the
        // fallback spill path is nested under that file - confirm this is
        // intended or handled by the buffer manager.
        let buffer_config = BufferManagerConfig {
            budget: config.memory_limit.unwrap_or_else(|| {
                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
            }),
            spill_path: config
                .spill_path
                .clone()
                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
            ..BufferManagerConfig::default()
        };
        let buffer_manager = BufferManager::new(buffer_config);

        // Create catalog early so WAL replay can restore schema definitions
        let catalog = Arc::new(Catalog::new());

        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;

        // --- Single-file format (.grafeo) ---
        // Yields `Some(manager)` when the database is backed by a single file,
        // `None` for in-memory databases and the legacy directory layout.
        #[cfg(feature = "grafeo-file")]
        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
            // Read-only mode: open with shared lock, load snapshot, skip WAL
            if let Some(ref db_path) = config.path {
                if db_path.exists() && db_path.is_file() {
                    let fm = GrafeoFileManager::open_read_only(db_path)?;
                    let snapshot_data = fm.read_snapshot()?;
                    // An empty snapshot means there is nothing to restore.
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "rdf")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }
                    Some(Arc::new(fm))
                } else {
                    // Read-only never creates a file: missing paths and
                    // non-file paths are both rejected.
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "read-only open requires an existing .grafeo file: {}",
                        db_path.display()
                    )));
                }
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "read-only mode requires a database path".to_string(),
                ));
            }
        } else if let Some(ref db_path) = config.path {
            // Initialize the file manager whenever single-file format is selected,
            // regardless of whether WAL is enabled. Without this, a database opened
            // with wal_enabled:false + StorageFormat::SingleFile would produce no
            // output at all (the file manager was previously gated behind wal_enabled).
            if Self::should_use_single_file(db_path, config.storage_format) {
                // Existing file -> open it; nothing at the path -> create it.
                let fm = if db_path.exists() && db_path.is_file() {
                    GrafeoFileManager::open(db_path)?
                } else if !db_path.exists() {
                    GrafeoFileManager::create(db_path)?
                } else {
                    // Path exists but is not a file (directory, etc.)
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "path exists but is not a file: {}",
                        db_path.display()
                    )));
                };

                // Load snapshot data from the file
                let snapshot_data = fm.read_snapshot()?;
                if !snapshot_data.is_empty() {
                    Self::apply_snapshot_data(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &snapshot_data,
                    )?;
                }

                // Recover sidecar WAL if WAL is enabled and a sidecar exists.
                // This runs after the snapshot has been applied, so the WAL
                // records are replayed on top of the snapshot state.
                #[cfg(feature = "wal")]
                if config.wal_enabled && fm.has_sidecar_wal() {
                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &records,
                    )?;
                }

                Some(Arc::new(fm))
            } else {
                // A path is configured but the single-file format is not selected.
                None
            }
        } else {
            // No path configured.
            None
        };

        // Determine whether to use the WAL directory path (legacy) or sidecar
        // Read-only mode skips WAL entirely (no recovery, no creation).
        // The WAL is also `None` when it is disabled or no path is configured.
        #[cfg(feature = "wal")]
        let wal = if is_read_only {
            None
        } else if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                // When using single-file format, the WAL is a sidecar directory
                #[cfg(feature = "grafeo-file")]
                let wal_path = if let Some(ref fm) = file_manager {
                    let p = fm.sidecar_wal_path();
                    std::fs::create_dir_all(&p)?;
                    p
                } else {
                    // Legacy: WAL inside the database directory
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                // Without the `grafeo-file` feature only the legacy layout exists.
                #[cfg(not(feature = "grafeo-file"))]
                let wal_path = {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                // For legacy WAL directory format, check if WAL exists and recover
                #[cfg(feature = "grafeo-file")]
                let is_single_file = file_manager.is_some();
                #[cfg(not(feature = "grafeo-file"))]
                let is_single_file = false;

                // Single-file databases replayed their sidecar WAL above, so
                // only the legacy layout is recovered here.
                if !is_single_file && wal_path.exists() {
                    let recovery = WalRecovery::new(&wal_path);
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &records,
                    )?;
                }

                // Open/create WAL manager with configured durability.
                // The engine-level durability mode is translated variant by
                // variant into the WAL adapter's own enum.
                let wal_durability = match config.wal_durability {
                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                    crate::config::DurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    } => WalDurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    },
                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                        WalDurabilityMode::Adaptive { target_interval_ms }
                    }
                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
                };
                let wal_config = WalConfig {
                    durability: wal_durability,
                    ..WalConfig::default()
                };
                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
                Some(Arc::new(wal_manager))
            } else {
                None
            }
        } else {
            None
        };

        // Create query cache with default capacity (1000 queries)
        let query_cache = Arc::new(QueryCache::default());

        // After all snapshot/WAL recovery, sync TransactionManager epoch
        // with the store so queries use the correct viewing epoch.
        #[cfg(feature = "temporal")]
        transaction_manager.sync_epoch(store.current_epoch());

        // Assemble the handle: built-in store in use, no external stores,
        // and no persistent graph/schema context yet.
        Ok(Self {
            config,
            store: Some(store),
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            transaction_manager,
            buffer_manager,
            #[cfg(feature = "wal")]
            wal,
            #[cfg(feature = "wal")]
            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
            query_cache,
            commit_counter: Arc::new(AtomicUsize::new(0)),
            is_open: RwLock::new(true),
            #[cfg(feature = "cdc")]
            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
            #[cfg(feature = "embed")]
            embedding_models: RwLock::new(hashbrown::HashMap::new()),
            #[cfg(feature = "grafeo-file")]
            file_manager,
            external_read_store: None,
            external_write_store: None,
            #[cfg(feature = "metrics")]
            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
            current_graph: RwLock::new(None),
            current_schema: RwLock::new(None),
            read_only: is_read_only,
        })
    }
474
475    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
476    ///
477    /// The external store handles all data persistence. WAL, CDC, and index
478    /// management are the responsibility of the store implementation.
479    ///
480    /// Query execution (all 6 languages, optimizer, planner) works through the
481    /// provided store. Admin operations (schema introspection, persistence,
482    /// vector/text indexes) are not available on external stores.
483    ///
484    /// # Examples
485    ///
486    /// ```no_run
487    /// use std::sync::Arc;
488    /// use grafeo_engine::{GrafeoDB, Config};
489    /// use grafeo_core::graph::GraphStoreMut;
490    ///
491    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
492    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
493    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
494    ///     Ok(())
495    /// }
496    /// ```
497    ///
498    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
499    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
500        config
501            .validate()
502            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
503
504        let transaction_manager = Arc::new(TransactionManager::new());
505
506        let buffer_config = BufferManagerConfig {
507            budget: config.memory_limit.unwrap_or_else(|| {
508                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
509            }),
510            spill_path: None,
511            ..BufferManagerConfig::default()
512        };
513        let buffer_manager = BufferManager::new(buffer_config);
514
515        let query_cache = Arc::new(QueryCache::default());
516
517        Ok(Self {
518            config,
519            store: None,
520            catalog: Arc::new(Catalog::new()),
521            #[cfg(feature = "rdf")]
522            rdf_store: Arc::new(RdfStore::new()),
523            transaction_manager,
524            buffer_manager,
525            #[cfg(feature = "wal")]
526            wal: None,
527            #[cfg(feature = "wal")]
528            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
529            query_cache,
530            commit_counter: Arc::new(AtomicUsize::new(0)),
531            is_open: RwLock::new(true),
532            #[cfg(feature = "cdc")]
533            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
534            #[cfg(feature = "embed")]
535            embedding_models: RwLock::new(hashbrown::HashMap::new()),
536            #[cfg(feature = "grafeo-file")]
537            file_manager: None,
538            external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
539            external_write_store: Some(store),
540            #[cfg(feature = "metrics")]
541            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
542            current_graph: RwLock::new(None),
543            current_schema: RwLock::new(None),
544            read_only: false,
545        })
546    }
547
548    /// Creates a database backed by a read-only [`GraphStore`].
549    ///
550    /// The database is set to read-only mode. Write queries (CREATE, SET,
551    /// DELETE) will return `TransactionError::ReadOnly`.
552    ///
553    /// # Examples
554    ///
555    /// ```no_run
556    /// use std::sync::Arc;
557    /// use grafeo_engine::{GrafeoDB, Config};
558    /// use grafeo_core::graph::GraphStore;
559    ///
560    /// fn example(store: Arc<dyn GraphStore>) -> grafeo_common::utils::error::Result<()> {
561    ///     let db = GrafeoDB::with_read_store(store, Config::in_memory())?;
562    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
563    ///     Ok(())
564    /// }
565    /// ```
566    ///
567    /// [`GraphStore`]: grafeo_core::graph::GraphStore
568    pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
569        config
570            .validate()
571            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
572
573        let transaction_manager = Arc::new(TransactionManager::new());
574
575        let buffer_config = BufferManagerConfig {
576            budget: config.memory_limit.unwrap_or_else(|| {
577                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
578            }),
579            spill_path: None,
580            ..BufferManagerConfig::default()
581        };
582        let buffer_manager = BufferManager::new(buffer_config);
583
584        let query_cache = Arc::new(QueryCache::default());
585
586        Ok(Self {
587            config,
588            store: None,
589            catalog: Arc::new(Catalog::new()),
590            #[cfg(feature = "rdf")]
591            rdf_store: Arc::new(RdfStore::new()),
592            transaction_manager,
593            buffer_manager,
594            #[cfg(feature = "wal")]
595            wal: None,
596            #[cfg(feature = "wal")]
597            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
598            query_cache,
599            commit_counter: Arc::new(AtomicUsize::new(0)),
600            is_open: RwLock::new(true),
601            #[cfg(feature = "cdc")]
602            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
603            #[cfg(feature = "embed")]
604            embedding_models: RwLock::new(hashbrown::HashMap::new()),
605            #[cfg(feature = "grafeo-file")]
606            file_manager: None,
607            external_read_store: Some(store),
608            external_write_store: None,
609            #[cfg(feature = "metrics")]
610            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
611            current_graph: RwLock::new(None),
612            current_schema: RwLock::new(None),
613            read_only: true,
614        })
615    }
616
617    /// Applies WAL records to restore the database state.
618    ///
619    /// Data mutation records are routed through a graph cursor that tracks
620    /// `SwitchGraph` context markers, replaying mutations into the correct
621    /// named graph (or the default graph when cursor is `None`).
622    #[cfg(feature = "wal")]
623    fn apply_wal_records(
624        store: &Arc<LpgStore>,
625        catalog: &Catalog,
626        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
627        records: &[WalRecord],
628    ) -> Result<()> {
629        use crate::catalog::{
630            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
631        };
632        use grafeo_common::utils::error::Error;
633
634        // Graph cursor: tracks which named graph receives data mutations.
635        // `None` means the default graph.
636        let mut current_graph: Option<String> = None;
637        let mut target_store: Arc<LpgStore> = Arc::clone(store);
638
639        for record in records {
640            match record {
641                // --- Named graph lifecycle ---
642                WalRecord::CreateNamedGraph { name } => {
643                    let _ = store.create_graph(name);
644                }
645                WalRecord::DropNamedGraph { name } => {
646                    store.drop_graph(name);
647                    // Reset cursor if the dropped graph was active
648                    if current_graph.as_deref() == Some(name.as_str()) {
649                        current_graph = None;
650                        target_store = Arc::clone(store);
651                    }
652                }
653                WalRecord::SwitchGraph { name } => {
654                    current_graph.clone_from(name);
655                    target_store = match &current_graph {
656                        None => Arc::clone(store),
657                        Some(graph_name) => store
658                            .graph_or_create(graph_name)
659                            .map_err(|e| Error::Internal(e.to_string()))?,
660                    };
661                }
662
663                // --- Data mutations: routed through target_store ---
664                WalRecord::CreateNode { id, labels } => {
665                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
666                    target_store.create_node_with_id(*id, &label_refs)?;
667                }
668                WalRecord::DeleteNode { id } => {
669                    target_store.delete_node(*id);
670                }
671                WalRecord::CreateEdge {
672                    id,
673                    src,
674                    dst,
675                    edge_type,
676                } => {
677                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
678                }
679                WalRecord::DeleteEdge { id } => {
680                    target_store.delete_edge(*id);
681                }
682                WalRecord::SetNodeProperty { id, key, value } => {
683                    target_store.set_node_property(*id, key, value.clone());
684                }
685                WalRecord::SetEdgeProperty { id, key, value } => {
686                    target_store.set_edge_property(*id, key, value.clone());
687                }
688                WalRecord::AddNodeLabel { id, label } => {
689                    target_store.add_label(*id, label);
690                }
691                WalRecord::RemoveNodeLabel { id, label } => {
692                    target_store.remove_label(*id, label);
693                }
694                WalRecord::RemoveNodeProperty { id, key } => {
695                    target_store.remove_node_property(*id, key);
696                }
697                WalRecord::RemoveEdgeProperty { id, key } => {
698                    target_store.remove_edge_property(*id, key);
699                }
700
701                // --- Schema DDL replay (always on root catalog) ---
702                WalRecord::CreateNodeType {
703                    name,
704                    properties,
705                    constraints,
706                } => {
707                    let def = NodeTypeDefinition {
708                        name: name.clone(),
709                        properties: properties
710                            .iter()
711                            .map(|(n, t, nullable)| TypedProperty {
712                                name: n.clone(),
713                                data_type: PropertyDataType::from_type_name(t),
714                                nullable: *nullable,
715                                default_value: None,
716                            })
717                            .collect(),
718                        constraints: constraints
719                            .iter()
720                            .map(|(kind, props)| match kind.as_str() {
721                                "unique" => TypeConstraint::Unique(props.clone()),
722                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
723                                "not_null" if !props.is_empty() => {
724                                    TypeConstraint::NotNull(props[0].clone())
725                                }
726                                _ => TypeConstraint::Unique(props.clone()),
727                            })
728                            .collect(),
729                        parent_types: Vec::new(),
730                    };
731                    let _ = catalog.register_node_type(def);
732                }
733                WalRecord::DropNodeType { name } => {
734                    let _ = catalog.drop_node_type(name);
735                }
736                WalRecord::CreateEdgeType {
737                    name,
738                    properties,
739                    constraints,
740                } => {
741                    let def = EdgeTypeDefinition {
742                        name: name.clone(),
743                        properties: properties
744                            .iter()
745                            .map(|(n, t, nullable)| TypedProperty {
746                                name: n.clone(),
747                                data_type: PropertyDataType::from_type_name(t),
748                                nullable: *nullable,
749                                default_value: None,
750                            })
751                            .collect(),
752                        constraints: constraints
753                            .iter()
754                            .map(|(kind, props)| match kind.as_str() {
755                                "unique" => TypeConstraint::Unique(props.clone()),
756                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
757                                "not_null" if !props.is_empty() => {
758                                    TypeConstraint::NotNull(props[0].clone())
759                                }
760                                _ => TypeConstraint::Unique(props.clone()),
761                            })
762                            .collect(),
763                        source_node_types: Vec::new(),
764                        target_node_types: Vec::new(),
765                    };
766                    let _ = catalog.register_edge_type_def(def);
767                }
768                WalRecord::DropEdgeType { name } => {
769                    let _ = catalog.drop_edge_type_def(name);
770                }
771                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
772                    // Index recreation is handled by the store on startup
773                    // (indexes are rebuilt from data, not WAL)
774                }
775                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
776                    // Constraint definitions are part of type definitions
777                    // and replayed via CreateNodeType/CreateEdgeType
778                }
779                WalRecord::CreateGraphType {
780                    name,
781                    node_types,
782                    edge_types,
783                    open,
784                } => {
785                    use crate::catalog::GraphTypeDefinition;
786                    let def = GraphTypeDefinition {
787                        name: name.clone(),
788                        allowed_node_types: node_types.clone(),
789                        allowed_edge_types: edge_types.clone(),
790                        open: *open,
791                    };
792                    let _ = catalog.register_graph_type(def);
793                }
794                WalRecord::DropGraphType { name } => {
795                    let _ = catalog.drop_graph_type(name);
796                }
797                WalRecord::CreateSchema { name } => {
798                    let _ = catalog.register_schema_namespace(name.clone());
799                }
800                WalRecord::DropSchema { name } => {
801                    let _ = catalog.drop_schema_namespace(name);
802                }
803
804                WalRecord::AlterNodeType { name, alterations } => {
805                    for (action, prop_name, type_name, nullable) in alterations {
806                        match action.as_str() {
807                            "add" => {
808                                let prop = TypedProperty {
809                                    name: prop_name.clone(),
810                                    data_type: PropertyDataType::from_type_name(type_name),
811                                    nullable: *nullable,
812                                    default_value: None,
813                                };
814                                let _ = catalog.alter_node_type_add_property(name, prop);
815                            }
816                            "drop" => {
817                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
818                            }
819                            _ => {}
820                        }
821                    }
822                }
823                WalRecord::AlterEdgeType { name, alterations } => {
824                    for (action, prop_name, type_name, nullable) in alterations {
825                        match action.as_str() {
826                            "add" => {
827                                let prop = TypedProperty {
828                                    name: prop_name.clone(),
829                                    data_type: PropertyDataType::from_type_name(type_name),
830                                    nullable: *nullable,
831                                    default_value: None,
832                                };
833                                let _ = catalog.alter_edge_type_add_property(name, prop);
834                            }
835                            "drop" => {
836                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
837                            }
838                            _ => {}
839                        }
840                    }
841                }
842                WalRecord::AlterGraphType { name, alterations } => {
843                    for (action, type_name) in alterations {
844                        match action.as_str() {
845                            "add_node" => {
846                                let _ =
847                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
848                            }
849                            "drop_node" => {
850                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
851                            }
852                            "add_edge" => {
853                                let _ =
854                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
855                            }
856                            "drop_edge" => {
857                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
858                            }
859                            _ => {}
860                        }
861                    }
862                }
863
864                WalRecord::CreateProcedure {
865                    name,
866                    params,
867                    returns,
868                    body,
869                } => {
870                    use crate::catalog::ProcedureDefinition;
871                    let def = ProcedureDefinition {
872                        name: name.clone(),
873                        params: params.clone(),
874                        returns: returns.clone(),
875                        body: body.clone(),
876                    };
877                    let _ = catalog.register_procedure(def);
878                }
879                WalRecord::DropProcedure { name } => {
880                    let _ = catalog.drop_procedure(name);
881                }
882
883                // --- RDF triple replay ---
884                #[cfg(feature = "rdf")]
885                WalRecord::InsertRdfTriple { .. }
886                | WalRecord::DeleteRdfTriple { .. }
887                | WalRecord::ClearRdfGraph { .. }
888                | WalRecord::CreateRdfGraph { .. }
889                | WalRecord::DropRdfGraph { .. } => {
890                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
891                }
892                #[cfg(not(feature = "rdf"))]
893                WalRecord::InsertRdfTriple { .. }
894                | WalRecord::DeleteRdfTriple { .. }
895                | WalRecord::ClearRdfGraph { .. }
896                | WalRecord::CreateRdfGraph { .. }
897                | WalRecord::DropRdfGraph { .. } => {}
898
899                WalRecord::TransactionCommit { .. } => {
900                    // In temporal mode, advance the store epoch on each committed
901                    // transaction so that subsequent property/label operations
902                    // are recorded at the correct epoch in their VersionLogs.
903                    #[cfg(feature = "temporal")]
904                    {
905                        target_store.new_epoch();
906                    }
907                }
908                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
909                    // Transaction control records don't need replay action
910                    // (recovery already filtered to only committed transactions)
911                }
912            }
913        }
914        Ok(())
915    }
916
917    // =========================================================================
918    // Single-file format helpers
919    // =========================================================================
920
921    /// Returns `true` if the given path should use single-file format.
922    #[cfg(feature = "grafeo-file")]
923    fn should_use_single_file(
924        path: &std::path::Path,
925        configured: crate::config::StorageFormat,
926    ) -> bool {
927        use crate::config::StorageFormat;
928        match configured {
929            StorageFormat::SingleFile => true,
930            StorageFormat::WalDirectory => false,
931            StorageFormat::Auto => {
932                // Existing file: check magic bytes
933                if path.is_file() {
934                    if let Ok(mut f) = std::fs::File::open(path) {
935                        use std::io::Read;
936                        let mut magic = [0u8; 4];
937                        if f.read_exact(&mut magic).is_ok()
938                            && magic == grafeo_adapters::storage::file::MAGIC
939                        {
940                            return true;
941                        }
942                    }
943                    return false;
944                }
945                // Existing directory: legacy format
946                if path.is_dir() {
947                    return false;
948                }
949                // New path: check extension
950                path.extension().is_some_and(|ext| ext == "grafeo")
951            }
952        }
953    }
954
    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
    ///
    /// Thin wrapper that forwards every argument unchanged to
    /// `persistence::load_snapshot_into_store` and returns its result.
    /// The `rdf_store` parameter only exists when the `rdf` feature is enabled.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `load_snapshot_into_store` reports.
    #[cfg(feature = "grafeo-file")]
    fn apply_snapshot_data(
        store: &Arc<LpgStore>,
        catalog: &Arc<crate::catalog::Catalog>,
        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
        data: &[u8],
    ) -> Result<()> {
        persistence::load_snapshot_into_store(
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            data,
        )
    }
971
972    // =========================================================================
973    // Session & Configuration
974    // =========================================================================
975
976    /// Opens a new session for running queries.
977    ///
978    /// Sessions are cheap to create: spin up as many as you need. Each
979    /// gets its own transaction context, so concurrent sessions won't
980    /// block each other on reads.
981    ///
982    /// # Panics
983    ///
984    /// Panics if the database was configured with an external graph store and
985    /// the internal arena allocator cannot be initialized (out of memory).
986    ///
987    /// # Examples
988    ///
989    /// ```
990    /// use grafeo_engine::GrafeoDB;
991    ///
992    /// let db = GrafeoDB::new_in_memory();
993    /// let session = db.session();
994    ///
995    /// // Run queries through the session
996    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
997    /// # Ok::<(), grafeo_common::utils::error::Error>(())
998    /// ```
999    #[must_use]
1000    pub fn session(&self) -> Session {
1001        let session_cfg = || crate::session::SessionConfig {
1002            transaction_manager: Arc::clone(&self.transaction_manager),
1003            query_cache: Arc::clone(&self.query_cache),
1004            catalog: Arc::clone(&self.catalog),
1005            adaptive_config: self.config.adaptive.clone(),
1006            factorized_execution: self.config.factorized_execution,
1007            graph_model: self.config.graph_model,
1008            query_timeout: self.config.query_timeout,
1009            commit_counter: Arc::clone(&self.commit_counter),
1010            gc_interval: self.config.gc_interval,
1011            read_only: self.read_only,
1012        };
1013
1014        if let Some(ref ext_read) = self.external_read_store {
1015            return Session::with_external_store(
1016                Arc::clone(ext_read),
1017                self.external_write_store.as_ref().map(Arc::clone),
1018                session_cfg(),
1019            )
1020            .expect("arena allocation for external store session");
1021        }
1022
1023        #[cfg(feature = "rdf")]
1024        let mut session = Session::with_rdf_store_and_adaptive(
1025            Arc::clone(self.lpg_store()),
1026            Arc::clone(&self.rdf_store),
1027            session_cfg(),
1028        );
1029        #[cfg(not(feature = "rdf"))]
1030        let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1031
1032        #[cfg(feature = "wal")]
1033        if let Some(ref wal) = self.wal {
1034            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1035        }
1036
1037        #[cfg(feature = "cdc")]
1038        session.set_cdc_log(Arc::clone(&self.cdc_log));
1039
1040        #[cfg(feature = "metrics")]
1041        {
1042            if let Some(ref m) = self.metrics {
1043                session.set_metrics(Arc::clone(m));
1044                m.session_created
1045                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1046                m.session_active
1047                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1048            }
1049        }
1050
1051        // Propagate persistent graph context to the new session
1052        if let Some(ref graph) = *self.current_graph.read() {
1053            session.use_graph(graph);
1054        }
1055
1056        // Propagate persistent schema context to the new session
1057        if let Some(ref schema) = *self.current_schema.read() {
1058            session.set_schema(schema);
1059        }
1060
1061        // Suppress unused_mut when cdc/wal are disabled
1062        let _ = &mut session;
1063
1064        session
1065    }
1066
    /// Returns the current graph name, if any.
    ///
    /// This is the persistent graph context used by one-shot `execute()` calls.
    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
    /// or `SESSION RESET`.
    ///
    /// The name is returned as an owned copy taken under a read lock, so later
    /// changes to the context do not affect the returned value.
    #[must_use]
    pub fn current_graph(&self) -> Option<String> {
        self.current_graph.read().clone()
    }
1076
1077    /// Sets the current graph context for subsequent one-shot `execute()` calls.
1078    ///
1079    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
1080    /// Pass `None` to reset to the default graph.
1081    pub fn set_current_graph(&self, name: Option<&str>) {
1082        *self.current_graph.write() = name.map(ToString::to_string);
1083    }
1084
    /// Returns the current schema name, if any.
    ///
    /// This is the persistent schema context used by one-shot `execute()` calls.
    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
    ///
    /// The name is returned as an owned copy taken under a read lock, so later
    /// changes to the context do not affect the returned value.
    #[must_use]
    pub fn current_schema(&self) -> Option<String> {
        self.current_schema.read().clone()
    }
1093
1094    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1095    ///
1096    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1097    /// a session. Pass `None` to clear the schema context.
1098    pub fn set_current_schema(&self, name: Option<&str>) {
1099        *self.current_schema.write() = name.map(ToString::to_string);
1100    }
1101
    /// Returns the adaptive execution configuration.
    ///
    /// Borrowed from this database's [`Config`] (its `adaptive` field).
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
1107
    /// Returns `true` if this database was opened in read-only mode.
    ///
    /// The same flag is handed to every session created via
    /// [`session()`](Self::session).
    #[must_use]
    pub fn is_read_only(&self) -> bool {
        self.read_only
    }
1113
    /// Returns the configuration this database is running with.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
1119
    /// Returns the graph data model of this database.
    ///
    /// Read from the `graph_model` field of the database [`Config`].
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
1125
    /// Returns the configured memory limit in bytes, if any.
    ///
    /// Read from the `memory_limit` field of the database [`Config`];
    /// `None` means no limit was configured.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
1131
1132    /// Returns a point-in-time snapshot of all metrics.
1133    ///
1134    /// If the `metrics` feature is disabled or the registry is not
1135    /// initialized, returns a default (all-zero) snapshot.
1136    #[cfg(feature = "metrics")]
1137    #[must_use]
1138    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1139        let mut snapshot = self
1140            .metrics
1141            .as_ref()
1142            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1143
1144        // Augment with cache stats from the query cache (not tracked in the registry)
1145        let cache_stats = self.query_cache.stats();
1146        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1147        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1148        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1149        snapshot.cache_invalidations = cache_stats.invalidations;
1150
1151        snapshot
1152    }
1153
1154    /// Returns all metrics in Prometheus text exposition format.
1155    ///
1156    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1157    #[cfg(feature = "metrics")]
1158    #[must_use]
1159    pub fn metrics_prometheus(&self) -> String {
1160        self.metrics
1161            .as_ref()
1162            .map_or_else(String::new, |m| m.to_prometheus())
1163    }
1164
1165    /// Resets all metrics counters and histograms to zero.
1166    #[cfg(feature = "metrics")]
1167    pub fn reset_metrics(&self) {
1168        if let Some(ref m) = self.metrics {
1169            m.reset();
1170        }
1171        self.query_cache.reset_stats();
1172    }
1173
    /// Returns the underlying (default) store.
    ///
    /// This provides direct access to the LPG store for algorithm implementations
    /// and admin operations (index management, schema introspection, MVCC internals).
    ///
    /// For code that only needs read/write graph operations, prefer
    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
    ///
    /// Unlike `active_store()`, this does not consult the current graph
    /// context.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        self.lpg_store()
    }
1185
1186    /// Returns the LPG store for the currently active graph.
1187    ///
1188    /// If [`current_graph`](Self::current_graph) is `None` or `"default"`, returns
1189    /// the default store. Otherwise looks up the named graph in the root store.
1190    /// Falls back to the default store if the named graph does not exist.
1191    #[allow(dead_code)] // Reserved for future graph-aware CRUD methods
1192    fn active_store(&self) -> Arc<LpgStore> {
1193        let store = self.lpg_store();
1194        let graph_name = self.current_graph.read().clone();
1195        match graph_name {
1196            None => Arc::clone(store),
1197            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(store),
1198            Some(ref name) => store.graph(name).unwrap_or_else(|| Arc::clone(store)),
1199        }
1200    }
1201
1202    // === Named Graph Management ===
1203
1204    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1205    ///
1206    /// # Errors
1207    ///
1208    /// Returns an error if arena allocation fails.
1209    pub fn create_graph(&self, name: &str) -> Result<bool> {
1210        Ok(self.lpg_store().create_graph(name)?)
1211    }
1212
    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
    ///
    /// Delegates to the LPG store. The database's current graph context is
    /// not touched by this method.
    pub fn drop_graph(&self, name: &str) -> bool {
        self.lpg_store().drop_graph(name)
    }
1217
    /// Returns all named graph names, as reported by the LPG store.
    #[must_use]
    pub fn list_graphs(&self) -> Vec<String> {
        self.lpg_store().graph_names()
    }
1223
1224    /// Returns the graph store as a trait object.
1225    ///
1226    /// Returns a read-only trait object for the active graph store.
1227    ///
1228    /// This provides the [`GraphStore`] interface for code that only needs
1229    /// read operations. For write access, use [`graph_store_mut()`](Self::graph_store_mut).
1230    ///
1231    /// [`GraphStore`]: grafeo_core::graph::GraphStore
1232    #[must_use]
1233    pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1234        if let Some(ref ext_read) = self.external_read_store {
1235            Arc::clone(ext_read)
1236        } else {
1237            Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1238        }
1239    }
1240
1241    /// Returns the writable graph store, if available.
1242    ///
1243    /// Returns `None` for read-only databases created via
1244    /// [`with_read_store()`](Self::with_read_store).
1245    #[must_use]
1246    pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1247        if self.external_read_store.is_some() {
1248            self.external_write_store.as_ref().map(Arc::clone)
1249        } else {
1250            Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1251        }
1252    }
1253
1254    /// Garbage collects old MVCC versions that are no longer visible.
1255    ///
1256    /// Determines the minimum epoch required by active transactions and prunes
1257    /// version chains older than that threshold. Also cleans up completed
1258    /// transaction metadata in the transaction manager.
1259    pub fn gc(&self) {
1260        let min_epoch = self.transaction_manager.min_active_epoch();
1261        self.lpg_store().gc_versions(min_epoch);
1262        self.transaction_manager.gc();
1263    }
1264
    /// Returns the buffer manager for memory-aware operations.
    ///
    /// The handle is borrowed; clone the `Arc` to keep it beyond the
    /// lifetime of this reference.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
1270
    /// Returns the query cache.
    ///
    /// This is the same cache instance that is shared with every session
    /// created via [`session()`](Self::session).
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
1276
    /// Clears all cached query plans.
    ///
    /// This is called automatically after DDL operations, but can also be
    /// invoked manually after external schema changes (e.g., WAL replay,
    /// import) or when you want to force re-optimization of all queries.
    ///
    /// Implemented as a plain `clear()` on the shared query cache.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
1285
1286    // =========================================================================
1287    // Lifecycle
1288    // =========================================================================
1289
1290    /// Closes the database, flushing all pending writes.
1291    ///
1292    /// For persistent databases, this ensures everything is safely on disk.
1293    /// Called automatically when the database is dropped, but you can call
1294    /// it explicitly if you need to guarantee durability at a specific point.
1295    ///
1296    /// # Errors
1297    ///
1298    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
1299    pub fn close(&self) -> Result<()> {
1300        let mut is_open = self.is_open.write();
1301        if !*is_open {
1302            return Ok(());
1303        }
1304
1305        // Read-only databases: just release the shared lock, no checkpointing
1306        if self.read_only {
1307            #[cfg(feature = "grafeo-file")]
1308            if let Some(ref fm) = self.file_manager {
1309                fm.close()?;
1310            }
1311            *is_open = false;
1312            return Ok(());
1313        }
1314
1315        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
1316        // We must do this BEFORE the WAL close path because checkpoint_to_file
1317        // removes the sidecar WAL directory.
1318        #[cfg(feature = "grafeo-file")]
1319        let is_single_file = self.file_manager.is_some();
1320        #[cfg(not(feature = "grafeo-file"))]
1321        let is_single_file = false;
1322
1323        #[cfg(feature = "grafeo-file")]
1324        if let Some(ref fm) = self.file_manager {
1325            // Flush WAL first so all records are on disk before we snapshot
1326            #[cfg(feature = "wal")]
1327            if let Some(ref wal) = self.wal {
1328                wal.sync()?;
1329            }
1330            self.checkpoint_to_file(fm)?;
1331
1332            // Release WAL file handles before removing sidecar directory.
1333            // On Windows, open handles prevent directory deletion.
1334            #[cfg(feature = "wal")]
1335            if let Some(ref wal) = self.wal {
1336                wal.close_active_log();
1337            }
1338
1339            {
1340                use grafeo_core::testing::crash::maybe_crash;
1341                maybe_crash("close:before_remove_sidecar_wal");
1342            }
1343            fm.remove_sidecar_wal()?;
1344            fm.close()?;
1345        }
1346
1347        // Commit and sync WAL (legacy directory format only).
1348        // We intentionally do NOT call wal.checkpoint() here. Directory format
1349        // has no snapshot: the WAL files are the sole source of truth. Writing
1350        // checkpoint.meta would cause recovery to skip older WAL files, losing
1351        // all data that predates the current log sequence.
1352        #[cfg(feature = "wal")]
1353        if !is_single_file && let Some(ref wal) = self.wal {
1354            // Use the last assigned transaction ID, or create one for the commit record
1355            let commit_tx = self
1356                .transaction_manager
1357                .last_assigned_transaction_id()
1358                .unwrap_or_else(|| self.transaction_manager.begin());
1359
1360            // Log a TransactionCommit to mark all pending records as committed
1361            wal.log(&WalRecord::TransactionCommit {
1362                transaction_id: commit_tx,
1363            })?;
1364
1365            wal.sync()?;
1366        }
1367
1368        *is_open = false;
1369        Ok(())
1370    }
1371
    /// Returns the typed WAL if available.
    ///
    /// `None` means this database instance has no WAL attached.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
1378
1379    /// Logs a WAL record if WAL is enabled.
1380    #[cfg(feature = "wal")]
1381    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1382        if let Some(ref wal) = self.wal {
1383            wal.log(record)?;
1384        }
1385        Ok(())
1386    }
1387
1388    /// Writes the current database snapshot to the `.grafeo` file.
1389    ///
1390    /// Does NOT remove the sidecar WAL: callers that want to clean up
1391    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
1392    /// separately after this returns.
1393    #[cfg(feature = "grafeo-file")]
1394    fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1395        use grafeo_core::testing::crash::maybe_crash;
1396
1397        maybe_crash("checkpoint_to_file:before_export");
1398        let snapshot_data = self.export_snapshot()?;
1399        maybe_crash("checkpoint_to_file:after_export");
1400
1401        let epoch = self.lpg_store().current_epoch();
1402        let transaction_id = self
1403            .transaction_manager
1404            .last_assigned_transaction_id()
1405            .map_or(0, |t| t.0);
1406        let node_count = self.lpg_store().node_count() as u64;
1407        let edge_count = self.lpg_store().edge_count() as u64;
1408
1409        fm.write_snapshot(
1410            &snapshot_data,
1411            epoch.0,
1412            transaction_id,
1413            node_count,
1414            edge_count,
1415        )?;
1416
1417        maybe_crash("checkpoint_to_file:after_write_snapshot");
1418        Ok(())
1419    }
1420
    /// Returns the file manager if using single-file format.
    ///
    /// `None` means this database has no file manager; `close()` uses the
    /// presence of a file manager to detect single-file format.
    #[cfg(feature = "grafeo-file")]
    #[must_use]
    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
        self.file_manager.as_ref()
    }
1427}
1428
1429impl Drop for GrafeoDB {
1430    fn drop(&mut self) {
1431        if let Err(e) = self.close() {
1432            grafeo_error!("Error closing database: {}", e);
1433        }
1434    }
1435}
1436
1437impl crate::admin::AdminService for GrafeoDB {
1438    fn info(&self) -> crate::admin::DatabaseInfo {
1439        self.info()
1440    }
1441
1442    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1443        self.detailed_stats()
1444    }
1445
1446    fn schema(&self) -> crate::admin::SchemaInfo {
1447        self.schema()
1448    }
1449
1450    fn validate(&self) -> crate::admin::ValidationResult {
1451        self.validate()
1452    }
1453
1454    fn wal_status(&self) -> crate::admin::WalStatus {
1455        self.wal_status()
1456    }
1457
1458    fn wal_checkpoint(&self) -> Result<()> {
1459        self.wal_checkpoint()
1460    }
1461}
1462
1463// =========================================================================
1464// Query Result Types
1465// =========================================================================
1466
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so a result can also be inspected or assembled
/// field by field. DDL and session commands typically carry only a
/// [`status_message`](Self::status_message) and no rows.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// [`new()`](Self::new) fills this with one `LogicalType::Any` per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows; each row is a vector of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    ///
    /// The constructors `empty()`, `status()` and `new()` set this to
    /// `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1509
1510impl QueryResult {
1511    /// Creates a fully empty query result (no columns, no rows).
1512    #[must_use]
1513    pub fn empty() -> Self {
1514        Self {
1515            columns: Vec::new(),
1516            column_types: Vec::new(),
1517            rows: Vec::new(),
1518            execution_time_ms: None,
1519            rows_scanned: None,
1520            status_message: None,
1521            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1522        }
1523    }
1524
1525    /// Creates a query result with only a status message (for DDL commands).
1526    #[must_use]
1527    pub fn status(msg: impl Into<String>) -> Self {
1528        Self {
1529            columns: Vec::new(),
1530            column_types: Vec::new(),
1531            rows: Vec::new(),
1532            execution_time_ms: None,
1533            rows_scanned: None,
1534            status_message: Some(msg.into()),
1535            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1536        }
1537    }
1538
1539    /// Creates a new empty query result.
1540    #[must_use]
1541    pub fn new(columns: Vec<String>) -> Self {
1542        let len = columns.len();
1543        Self {
1544            columns,
1545            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1546            rows: Vec::new(),
1547            execution_time_ms: None,
1548            rows_scanned: None,
1549            status_message: None,
1550            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1551        }
1552    }
1553
1554    /// Creates a new empty query result with column types.
1555    #[must_use]
1556    pub fn with_types(
1557        columns: Vec<String>,
1558        column_types: Vec<grafeo_common::types::LogicalType>,
1559    ) -> Self {
1560        Self {
1561            columns,
1562            column_types,
1563            rows: Vec::new(),
1564            execution_time_ms: None,
1565            rows_scanned: None,
1566            status_message: None,
1567            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1568        }
1569    }
1570
1571    /// Sets the execution metrics on this result.
1572    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1573        self.execution_time_ms = Some(execution_time_ms);
1574        self.rows_scanned = Some(rows_scanned);
1575        self
1576    }
1577
1578    /// Returns the execution time in milliseconds, if available.
1579    #[must_use]
1580    pub fn execution_time_ms(&self) -> Option<f64> {
1581        self.execution_time_ms
1582    }
1583
1584    /// Returns the number of rows scanned, if available.
1585    #[must_use]
1586    pub fn rows_scanned(&self) -> Option<u64> {
1587        self.rows_scanned
1588    }
1589
1590    /// Returns the number of rows.
1591    #[must_use]
1592    pub fn row_count(&self) -> usize {
1593        self.rows.len()
1594    }
1595
1596    /// Returns the number of columns.
1597    #[must_use]
1598    pub fn column_count(&self) -> usize {
1599        self.columns.len()
1600    }
1601
1602    /// Returns true if the result is empty.
1603    #[must_use]
1604    pub fn is_empty(&self) -> bool {
1605        self.rows.is_empty()
1606    }
1607
1608    /// Extracts a single value from the result.
1609    ///
1610    /// Use this when your query returns exactly one row with one column,
1611    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1612    ///
1613    /// # Errors
1614    ///
1615    /// Returns an error if the result has multiple rows or columns.
1616    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1617        if self.rows.len() != 1 || self.columns.len() != 1 {
1618            return Err(grafeo_common::utils::error::Error::InvalidValue(
1619                "Expected single value".to_string(),
1620            ));
1621        }
1622        T::from_value(&self.rows[0][0])
1623    }
1624
1625    /// Returns an iterator over the rows.
1626    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1627        self.rows.iter()
1628    }
1629}
1630
1631impl std::fmt::Display for QueryResult {
1632    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1633        let table = grafeo_common::fmt::format_result_table(
1634            &self.columns,
1635            &self.rows,
1636            self.execution_time_ms,
1637            self.status_message.as_deref(),
1638        );
1639        f.write_str(&table)
1640    }
1641}
1642
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// The conversion borrows the value and produces an owned `Self`.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// # Errors
    ///
    /// The implementations in this file return `Error::TypeMismatch`, naming
    /// the expected type and the value's actual type name.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1651
1652impl FromValue for i64 {
1653    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1654        value
1655            .as_int64()
1656            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1657                expected: "INT64".to_string(),
1658                found: value.type_name().to_string(),
1659            })
1660    }
1661}
1662
1663impl FromValue for f64 {
1664    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1665        value
1666            .as_float64()
1667            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1668                expected: "FLOAT64".to_string(),
1669                found: value.type_name().to_string(),
1670            })
1671    }
1672}
1673
1674impl FromValue for String {
1675    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1676        value.as_str().map(String::from).ok_or_else(|| {
1677            grafeo_common::utils::error::Error::TypeMismatch {
1678                expected: "STRING".to_string(),
1679                found: value.type_name().to_string(),
1680            }
1681        })
1682    }
1683}
1684
1685impl FromValue for bool {
1686    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1687        value
1688            .as_bool()
1689            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1690                expected: "BOOL".to_string(),
1691                found: value.type_name().to_string(),
1692            })
1693    }
1694}
1695
1696#[cfg(test)]
1697mod tests {
1698    use super::*;
1699
1700    #[test]
1701    fn test_create_in_memory_database() {
1702        let db = GrafeoDB::new_in_memory();
1703        assert_eq!(db.node_count(), 0);
1704        assert_eq!(db.edge_count(), 0);
1705    }
1706
1707    #[test]
1708    fn test_database_config() {
1709        let config = Config::in_memory().with_threads(4).with_query_logging();
1710
1711        let db = GrafeoDB::with_config(config).unwrap();
1712        assert_eq!(db.config().threads, 4);
1713        assert!(db.config().query_logging);
1714    }
1715
1716    #[test]
1717    fn test_database_session() {
1718        let db = GrafeoDB::new_in_memory();
1719        let _session = db.session();
1720        // Session should be created successfully
1721    }
1722
1723    #[cfg(feature = "wal")]
1724    #[test]
1725    fn test_persistent_database_recovery() {
1726        use grafeo_common::types::Value;
1727        use tempfile::tempdir;
1728
1729        let dir = tempdir().unwrap();
1730        let db_path = dir.path().join("test_db");
1731
1732        // Create database and add some data
1733        {
1734            let db = GrafeoDB::open(&db_path).unwrap();
1735
1736            let alix = db.create_node(&["Person"]);
1737            db.set_node_property(alix, "name", Value::from("Alix"));
1738
1739            let gus = db.create_node(&["Person"]);
1740            db.set_node_property(gus, "name", Value::from("Gus"));
1741
1742            let _edge = db.create_edge(alix, gus, "KNOWS");
1743
1744            // Explicitly close to flush WAL
1745            db.close().unwrap();
1746        }
1747
1748        // Reopen and verify data was recovered
1749        {
1750            let db = GrafeoDB::open(&db_path).unwrap();
1751
1752            assert_eq!(db.node_count(), 2);
1753            assert_eq!(db.edge_count(), 1);
1754
1755            // Verify nodes exist
1756            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1757            assert!(node0.is_some());
1758
1759            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1760            assert!(node1.is_some());
1761        }
1762    }
1763
1764    #[cfg(feature = "wal")]
1765    #[test]
1766    fn test_wal_logging() {
1767        use tempfile::tempdir;
1768
1769        let dir = tempdir().unwrap();
1770        let db_path = dir.path().join("wal_test_db");
1771
1772        let db = GrafeoDB::open(&db_path).unwrap();
1773
1774        // Create some data
1775        let node = db.create_node(&["Test"]);
1776        db.delete_node(node);
1777
1778        // WAL should have records
1779        if let Some(wal) = db.wal() {
1780            assert!(wal.record_count() > 0);
1781        }
1782
1783        db.close().unwrap();
1784    }
1785
1786    #[cfg(feature = "wal")]
1787    #[test]
1788    fn test_wal_recovery_multiple_sessions() {
1789        // Tests that WAL recovery works correctly across multiple open/close cycles
1790        use grafeo_common::types::Value;
1791        use tempfile::tempdir;
1792
1793        let dir = tempdir().unwrap();
1794        let db_path = dir.path().join("multi_session_db");
1795
1796        // Session 1: Create initial data
1797        {
1798            let db = GrafeoDB::open(&db_path).unwrap();
1799            let alix = db.create_node(&["Person"]);
1800            db.set_node_property(alix, "name", Value::from("Alix"));
1801            db.close().unwrap();
1802        }
1803
1804        // Session 2: Add more data
1805        {
1806            let db = GrafeoDB::open(&db_path).unwrap();
1807            assert_eq!(db.node_count(), 1); // Previous data recovered
1808            let gus = db.create_node(&["Person"]);
1809            db.set_node_property(gus, "name", Value::from("Gus"));
1810            db.close().unwrap();
1811        }
1812
1813        // Session 3: Verify all data
1814        {
1815            let db = GrafeoDB::open(&db_path).unwrap();
1816            assert_eq!(db.node_count(), 2);
1817
1818            // Verify properties were recovered correctly
1819            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1820            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1821
1822            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1823            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1824        }
1825    }
1826
1827    #[cfg(feature = "wal")]
1828    #[test]
1829    fn test_database_consistency_after_mutations() {
1830        // Tests that database remains consistent after a series of create/delete operations
1831        use grafeo_common::types::Value;
1832        use tempfile::tempdir;
1833
1834        let dir = tempdir().unwrap();
1835        let db_path = dir.path().join("consistency_db");
1836
1837        {
1838            let db = GrafeoDB::open(&db_path).unwrap();
1839
1840            // Create nodes
1841            let a = db.create_node(&["Node"]);
1842            let b = db.create_node(&["Node"]);
1843            let c = db.create_node(&["Node"]);
1844
1845            // Create edges
1846            let e1 = db.create_edge(a, b, "LINKS");
1847            let _e2 = db.create_edge(b, c, "LINKS");
1848
1849            // Delete middle node and its edge
1850            db.delete_edge(e1);
1851            db.delete_node(b);
1852
1853            // Set properties on remaining nodes
1854            db.set_node_property(a, "value", Value::Int64(1));
1855            db.set_node_property(c, "value", Value::Int64(3));
1856
1857            db.close().unwrap();
1858        }
1859
1860        // Reopen and verify consistency
1861        {
1862            let db = GrafeoDB::open(&db_path).unwrap();
1863
1864            // Should have 2 nodes (a and c), b was deleted
1865            // Note: node_count includes deleted nodes in some implementations
1866            // What matters is that the non-deleted nodes are accessible
1867            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1868            assert!(node_a.is_some());
1869
1870            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1871            assert!(node_c.is_some());
1872
1873            // Middle node should be deleted
1874            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1875            assert!(node_b.is_none());
1876        }
1877    }
1878
1879    #[cfg(feature = "wal")]
1880    #[test]
1881    fn test_close_is_idempotent() {
1882        // Calling close() multiple times should not cause errors
1883        use tempfile::tempdir;
1884
1885        let dir = tempdir().unwrap();
1886        let db_path = dir.path().join("close_test_db");
1887
1888        let db = GrafeoDB::open(&db_path).unwrap();
1889        db.create_node(&["Test"]);
1890
1891        // First close should succeed
1892        assert!(db.close().is_ok());
1893
1894        // Second close should also succeed (idempotent)
1895        assert!(db.close().is_ok());
1896    }
1897
1898    #[test]
1899    fn test_with_store_external_backend() {
1900        use grafeo_core::graph::lpg::LpgStore;
1901
1902        let external = Arc::new(LpgStore::new().unwrap());
1903
1904        // Seed data on the external store directly
1905        let n1 = external.create_node(&["Person"]);
1906        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1907
1908        let db = GrafeoDB::with_store(
1909            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1910            Config::in_memory(),
1911        )
1912        .unwrap();
1913
1914        let session = db.session();
1915
1916        // Session should see data from the external store via execute
1917        #[cfg(feature = "gql")]
1918        {
1919            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1920            assert_eq!(result.rows.len(), 1);
1921        }
1922    }
1923
1924    #[test]
1925    fn test_with_config_custom_memory_limit() {
1926        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB
1927
1928        let db = GrafeoDB::with_config(config).unwrap();
1929        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1930        assert_eq!(db.node_count(), 0);
1931    }
1932
1933    #[cfg(feature = "metrics")]
1934    #[test]
1935    fn test_database_metrics_registry() {
1936        let db = GrafeoDB::new_in_memory();
1937
1938        // Perform some operations
1939        db.create_node(&["Person"]);
1940        db.create_node(&["Person"]);
1941
1942        // Check that metrics snapshot returns data
1943        let snap = db.metrics();
1944        // Session created counter should reflect at least 0 (metrics is initialized)
1945        assert_eq!(snap.query_count, 0); // No queries executed yet
1946    }
1947
1948    #[test]
1949    fn test_query_result_has_metrics() {
1950        // Verifies that query results include execution metrics
1951        let db = GrafeoDB::new_in_memory();
1952        db.create_node(&["Person"]);
1953        db.create_node(&["Person"]);
1954
1955        #[cfg(feature = "gql")]
1956        {
1957            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1958
1959            // Metrics should be populated
1960            assert!(result.execution_time_ms.is_some());
1961            assert!(result.rows_scanned.is_some());
1962            assert!(result.execution_time_ms.unwrap() >= 0.0);
1963            assert_eq!(result.rows_scanned.unwrap(), 2);
1964        }
1965    }
1966
1967    #[test]
1968    fn test_empty_query_result_metrics() {
1969        // Verifies metrics are correct for queries returning no results
1970        let db = GrafeoDB::new_in_memory();
1971        db.create_node(&["Person"]);
1972
1973        #[cfg(feature = "gql")]
1974        {
1975            // Query that matches nothing
1976            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1977
1978            assert!(result.execution_time_ms.is_some());
1979            assert!(result.rows_scanned.is_some());
1980            assert_eq!(result.rows_scanned.unwrap(), 0);
1981        }
1982    }
1983
1984    #[cfg(feature = "cdc")]
1985    mod cdc_integration {
1986        use super::*;
1987
1988        #[test]
1989        fn test_node_lifecycle_history() {
1990            let db = GrafeoDB::new_in_memory();
1991
1992            // Create
1993            let id = db.create_node(&["Person"]);
1994            // Update
1995            db.set_node_property(id, "name", "Alix".into());
1996            db.set_node_property(id, "name", "Gus".into());
1997            // Delete
1998            db.delete_node(id);
1999
2000            let history = db.history(id).unwrap();
2001            assert_eq!(history.len(), 4); // create + 2 updates + delete
2002            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2003            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
2004            assert!(history[1].before.is_none()); // first set_node_property has no prior value
2005            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
2006            assert!(history[2].before.is_some()); // second update has prior "Alix"
2007            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
2008        }
2009
2010        #[test]
2011        fn test_edge_lifecycle_history() {
2012            let db = GrafeoDB::new_in_memory();
2013
2014            let alix = db.create_node(&["Person"]);
2015            let gus = db.create_node(&["Person"]);
2016            let edge = db.create_edge(alix, gus, "KNOWS");
2017            db.set_edge_property(edge, "since", 2024i64.into());
2018            db.delete_edge(edge);
2019
2020            let history = db.history(edge).unwrap();
2021            assert_eq!(history.len(), 3); // create + update + delete
2022            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2023            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
2024            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
2025        }
2026
2027        #[test]
2028        fn test_create_node_with_props_cdc() {
2029            let db = GrafeoDB::new_in_memory();
2030
2031            let id = db.create_node_with_props(
2032                &["Person"],
2033                vec![
2034                    ("name", grafeo_common::types::Value::from("Alix")),
2035                    ("age", grafeo_common::types::Value::from(30i64)),
2036                ],
2037            );
2038
2039            let history = db.history(id).unwrap();
2040            assert_eq!(history.len(), 1);
2041            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2042            // Props should be captured
2043            let after = history[0].after.as_ref().unwrap();
2044            assert_eq!(after.len(), 2);
2045        }
2046
2047        #[test]
2048        fn test_changes_between() {
2049            let db = GrafeoDB::new_in_memory();
2050
2051            let id1 = db.create_node(&["A"]);
2052            let _id2 = db.create_node(&["B"]);
2053            db.set_node_property(id1, "x", 1i64.into());
2054
2055            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
2056            let changes = db
2057                .changes_between(
2058                    grafeo_common::types::EpochId(0),
2059                    grafeo_common::types::EpochId(u64::MAX),
2060                )
2061                .unwrap();
2062            assert_eq!(changes.len(), 3); // 2 creates + 1 update
2063        }
2064    }
2065
2066    #[test]
2067    fn test_with_store_basic() {
2068        use grafeo_core::graph::lpg::LpgStore;
2069
2070        let store = Arc::new(LpgStore::new().unwrap());
2071        let n1 = store.create_node(&["Person"]);
2072        store.set_node_property(n1, "name", "Alix".into());
2073
2074        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
2075        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
2076
2077        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
2078        assert_eq!(result.rows.len(), 1);
2079    }
2080
2081    #[test]
2082    fn test_with_store_session() {
2083        use grafeo_core::graph::lpg::LpgStore;
2084
2085        let store = Arc::new(LpgStore::new().unwrap());
2086        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
2087        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
2088
2089        let session = db.session();
2090        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
2091        assert_eq!(result.rows.len(), 1);
2092    }
2093
2094    #[test]
2095    fn test_with_store_mutations() {
2096        use grafeo_core::graph::lpg::LpgStore;
2097
2098        let store = Arc::new(LpgStore::new().unwrap());
2099        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
2100        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
2101
2102        let mut session = db.session();
2103
2104        // Use an explicit transaction so INSERT and MATCH share the same
2105        // transaction context. With PENDING epochs, uncommitted versions are
2106        // only visible to the owning transaction.
2107        session.begin_transaction().unwrap();
2108        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
2109
2110        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
2111        assert_eq!(result.rows.len(), 1);
2112
2113        session.commit().unwrap();
2114    }
2115}