Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15#[cfg(feature = "async-storage")]
16mod async_ops;
17#[cfg(feature = "async-storage")]
18pub(crate) mod async_wal_store;
19#[cfg(feature = "cdc")]
20pub(crate) mod cdc_store;
21mod crud;
22#[cfg(feature = "embed")]
23mod embed;
24mod index;
25mod persistence;
26mod query;
27#[cfg(feature = "rdf")]
28mod rdf_ops;
29mod search;
30#[cfg(feature = "wal")]
31pub(crate) mod wal_store;
32
33use grafeo_common::grafeo_error;
34#[cfg(feature = "wal")]
35use std::path::Path;
36use std::sync::Arc;
37use std::sync::atomic::AtomicUsize;
38
39use parking_lot::RwLock;
40
41#[cfg(feature = "grafeo-file")]
42use grafeo_adapters::storage::file::GrafeoFileManager;
43#[cfg(feature = "wal")]
44use grafeo_adapters::storage::wal::{
45    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
46};
47use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
48use grafeo_common::utils::error::Result;
49use grafeo_core::graph::lpg::LpgStore;
50#[cfg(feature = "rdf")]
51use grafeo_core::graph::rdf::RdfStore;
52use grafeo_core::graph::{GraphStore, GraphStoreMut};
53
54use crate::catalog::Catalog;
55use crate::config::Config;
56use crate::query::cache::QueryCache;
57use crate::session::Session;
58use crate::transaction::TransactionManager;
59
60/// Your handle to a Grafeo database.
61///
62/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
63/// quick experiments, or [`open()`](Self::open) for persistent storage.
64/// Then grab a [`session()`](Self::session) to start querying.
65///
66/// # Examples
67///
68/// ```
69/// use grafeo_engine::GrafeoDB;
70///
71/// // Quick in-memory database
72/// let db = GrafeoDB::new_in_memory();
73///
74/// // Add some data
75/// db.create_node(&["Person"]);
76///
77/// // Query it
78/// let session = db.session();
79/// let result = session.execute("MATCH (p:Person) RETURN p")?;
80/// # Ok::<(), grafeo_common::utils::error::Error>(())
81/// ```
82pub struct GrafeoDB {
83    /// Database configuration.
84    pub(super) config: Config,
85    /// The underlying graph store (None when using an external store).
86    pub(super) store: Option<Arc<LpgStore>>,
87    /// Schema and metadata catalog shared across sessions.
88    pub(super) catalog: Arc<Catalog>,
89    /// RDF triple store (if RDF feature is enabled).
90    #[cfg(feature = "rdf")]
91    pub(super) rdf_store: Arc<RdfStore>,
92    /// Transaction manager.
93    pub(super) transaction_manager: Arc<TransactionManager>,
94    /// Unified buffer manager.
95    pub(super) buffer_manager: Arc<BufferManager>,
96    /// Write-ahead log manager (if durability is enabled).
97    #[cfg(feature = "wal")]
98    pub(super) wal: Option<Arc<LpgWal>>,
99    /// Shared WAL graph context tracker. Tracks which named graph was last
100    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
101    /// records only when the context actually changes.
102    #[cfg(feature = "wal")]
103    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
104    /// Query cache for parsed and optimized plans.
105    pub(super) query_cache: Arc<QueryCache>,
106    /// Shared commit counter for auto-GC across sessions.
107    pub(super) commit_counter: Arc<AtomicUsize>,
108    /// Whether the database is open.
109    pub(super) is_open: RwLock<bool>,
110    /// Change data capture log for tracking mutations.
111    #[cfg(feature = "cdc")]
112    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
113    /// Whether CDC is active for new sessions and direct CRUD (runtime-mutable).
114    #[cfg(feature = "cdc")]
115    cdc_enabled: std::sync::atomic::AtomicBool,
116    /// Registered embedding models for text-to-vector conversion.
117    #[cfg(feature = "embed")]
118    pub(super) embedding_models:
119        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
120    /// Single-file database manager (when using `.grafeo` format).
121    #[cfg(feature = "grafeo-file")]
122    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
123    /// External read-only graph store (when using with_store() or with_read_store()).
124    /// When set, sessions route queries through this store instead of the built-in LpgStore.
125    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
126    /// External writable graph store (when using with_store()).
127    /// None for read-only databases created via with_read_store().
128    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
129    /// Metrics registry shared across all sessions.
130    #[cfg(feature = "metrics")]
131    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
132    /// Persistent graph context for one-shot `execute()` calls.
133    /// When set, each call to `session()` pre-configures the session to this graph.
134    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
135    current_graph: RwLock<Option<String>>,
136    /// Persistent schema context for one-shot `execute()` calls.
137    /// When set, each call to `session()` pre-configures the session to this schema.
138    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
139    current_schema: RwLock<Option<String>>,
140    /// Whether this database is open in read-only mode.
141    /// When true, sessions automatically enforce read-only transactions.
142    read_only: bool,
143}
144
145impl GrafeoDB {
146    /// Returns a reference to the built-in LPG store.
147    ///
148    /// # Panics
149    ///
150    /// Panics if the database was created with [`with_store()`](Self::with_store) or
151    /// [`with_read_store()`](Self::with_read_store), which use an external store
152    /// instead of the built-in LPG store.
153    fn lpg_store(&self) -> &Arc<LpgStore> {
154        self.store.as_ref().expect(
155            "no built-in LpgStore: this GrafeoDB was created with an external store \
156             (with_store / with_read_store). Use session() or graph_store() instead.",
157        )
158    }
159
160    /// Returns whether CDC is active (runtime check).
161    #[cfg(feature = "cdc")]
162    #[inline]
163    pub(super) fn cdc_active(&self) -> bool {
164        self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
165    }
166
167    /// Creates an in-memory database, fast to create, gone when dropped.
168    ///
169    /// Use this for tests, experiments, or when you don't need persistence.
170    /// For data that survives restarts, use [`open()`](Self::open) instead.
171    ///
172    /// # Panics
173    ///
174    /// Panics if the internal arena allocator cannot be initialized (out of memory).
175    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
176    ///
177    /// # Examples
178    ///
179    /// ```
180    /// use grafeo_engine::GrafeoDB;
181    ///
182    /// let db = GrafeoDB::new_in_memory();
183    /// let session = db.session();
184    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
185    /// # Ok::<(), grafeo_common::utils::error::Error>(())
186    /// ```
187    #[must_use]
188    pub fn new_in_memory() -> Self {
189        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
190    }
191
192    /// Opens a database at the given path, creating it if it doesn't exist.
193    ///
194    /// If you've used this path before, Grafeo recovers your data from the
195    /// write-ahead log automatically. First open on a new path creates an
196    /// empty database.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if the path isn't writable or recovery fails.
201    ///
202    /// # Examples
203    ///
204    /// ```no_run
205    /// use grafeo_engine::GrafeoDB;
206    ///
207    /// let db = GrafeoDB::open("./my_social_network")?;
208    /// # Ok::<(), grafeo_common::utils::error::Error>(())
209    /// ```
210    #[cfg(feature = "wal")]
211    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
212        Self::with_config(Config::persistent(path.as_ref()))
213    }
214
215    /// Opens an existing database in read-only mode.
216    ///
217    /// Uses a shared file lock, so multiple processes can read the same
218    /// `.grafeo` file concurrently. The database loads the last checkpoint
219    /// snapshot but does **not** replay the WAL or allow mutations.
220    ///
221    /// Currently only supports the single-file (`.grafeo`) format.
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if the file doesn't exist or can't be read.
226    ///
227    /// # Examples
228    ///
229    /// ```no_run
230    /// use grafeo_engine::GrafeoDB;
231    ///
232    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
233    /// let session = db.session();
234    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
235    /// // Mutations will return an error:
236    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
237    /// # Ok::<(), grafeo_common::utils::error::Error>(())
238    /// ```
239    #[cfg(feature = "grafeo-file")]
240    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
241        Self::with_config(Config::read_only(path.as_ref()))
242    }
243
244    /// Creates a database with custom configuration.
245    ///
246    /// Use this when you need fine-grained control over memory limits,
247    /// thread counts, or persistence settings. For most cases,
248    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
249    /// are simpler.
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if the database can't be created or recovery fails.
254    ///
255    /// # Examples
256    ///
257    /// ```
258    /// use grafeo_engine::{GrafeoDB, Config};
259    ///
260    /// // In-memory with a 512MB limit
261    /// let config = Config::in_memory()
262    ///     .with_memory_limit(512 * 1024 * 1024);
263    ///
264    /// let db = GrafeoDB::with_config(config)?;
265    /// # Ok::<(), grafeo_common::utils::error::Error>(())
266    /// ```
267    pub fn with_config(config: Config) -> Result<Self> {
268        // Validate configuration before proceeding
269        config
270            .validate()
271            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
272
273        let store = Arc::new(LpgStore::new()?);
274        #[cfg(feature = "rdf")]
275        let rdf_store = Arc::new(RdfStore::new());
276        let transaction_manager = Arc::new(TransactionManager::new());
277
278        // Create buffer manager with configured limits
279        let buffer_config = BufferManagerConfig {
280            budget: config.memory_limit.unwrap_or_else(|| {
281                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
282            }),
283            spill_path: config
284                .spill_path
285                .clone()
286                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
287            ..BufferManagerConfig::default()
288        };
289        let buffer_manager = BufferManager::new(buffer_config);
290
291        // Create catalog early so WAL replay can restore schema definitions
292        let catalog = Arc::new(Catalog::new());
293
294        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
295
296        // --- Single-file format (.grafeo) ---
297        #[cfg(feature = "grafeo-file")]
298        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
299            // Read-only mode: open with shared lock, load snapshot, skip WAL
300            if let Some(ref db_path) = config.path {
301                if db_path.exists() && db_path.is_file() {
302                    let fm = GrafeoFileManager::open_read_only(db_path)?;
303                    let snapshot_data = fm.read_snapshot()?;
304                    if !snapshot_data.is_empty() {
305                        Self::apply_snapshot_data(
306                            &store,
307                            &catalog,
308                            #[cfg(feature = "rdf")]
309                            &rdf_store,
310                            &snapshot_data,
311                        )?;
312                    }
313                    Some(Arc::new(fm))
314                } else {
315                    return Err(grafeo_common::utils::error::Error::Internal(format!(
316                        "read-only open requires an existing .grafeo file: {}",
317                        db_path.display()
318                    )));
319                }
320            } else {
321                return Err(grafeo_common::utils::error::Error::Internal(
322                    "read-only mode requires a database path".to_string(),
323                ));
324            }
325        } else if let Some(ref db_path) = config.path {
326            // Initialize the file manager whenever single-file format is selected,
327            // regardless of whether WAL is enabled. Without this, a database opened
328            // with wal_enabled:false + StorageFormat::SingleFile would produce no
329            // output at all (the file manager was previously gated behind wal_enabled).
330            if Self::should_use_single_file(db_path, config.storage_format) {
331                let fm = if db_path.exists() && db_path.is_file() {
332                    GrafeoFileManager::open(db_path)?
333                } else if !db_path.exists() {
334                    GrafeoFileManager::create(db_path)?
335                } else {
336                    // Path exists but is not a file (directory, etc.)
337                    return Err(grafeo_common::utils::error::Error::Internal(format!(
338                        "path exists but is not a file: {}",
339                        db_path.display()
340                    )));
341                };
342
343                // Load snapshot data from the file
344                let snapshot_data = fm.read_snapshot()?;
345                if !snapshot_data.is_empty() {
346                    Self::apply_snapshot_data(
347                        &store,
348                        &catalog,
349                        #[cfg(feature = "rdf")]
350                        &rdf_store,
351                        &snapshot_data,
352                    )?;
353                }
354
355                // Recover sidecar WAL if WAL is enabled and a sidecar exists
356                #[cfg(feature = "wal")]
357                if config.wal_enabled && fm.has_sidecar_wal() {
358                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
359                    let records = recovery.recover()?;
360                    Self::apply_wal_records(
361                        &store,
362                        &catalog,
363                        #[cfg(feature = "rdf")]
364                        &rdf_store,
365                        &records,
366                    )?;
367                }
368
369                Some(Arc::new(fm))
370            } else {
371                None
372            }
373        } else {
374            None
375        };
376
377        // Determine whether to use the WAL directory path (legacy) or sidecar
378        // Read-only mode skips WAL entirely (no recovery, no creation).
379        #[cfg(feature = "wal")]
380        let wal = if is_read_only {
381            None
382        } else if config.wal_enabled {
383            if let Some(ref db_path) = config.path {
384                // When using single-file format, the WAL is a sidecar directory
385                #[cfg(feature = "grafeo-file")]
386                let wal_path = if let Some(ref fm) = file_manager {
387                    let p = fm.sidecar_wal_path();
388                    std::fs::create_dir_all(&p)?;
389                    p
390                } else {
391                    // Legacy: WAL inside the database directory
392                    std::fs::create_dir_all(db_path)?;
393                    db_path.join("wal")
394                };
395
396                #[cfg(not(feature = "grafeo-file"))]
397                let wal_path = {
398                    std::fs::create_dir_all(db_path)?;
399                    db_path.join("wal")
400                };
401
402                // For legacy WAL directory format, check if WAL exists and recover
403                #[cfg(feature = "grafeo-file")]
404                let is_single_file = file_manager.is_some();
405                #[cfg(not(feature = "grafeo-file"))]
406                let is_single_file = false;
407
408                if !is_single_file && wal_path.exists() {
409                    let recovery = WalRecovery::new(&wal_path);
410                    let records = recovery.recover()?;
411                    Self::apply_wal_records(
412                        &store,
413                        &catalog,
414                        #[cfg(feature = "rdf")]
415                        &rdf_store,
416                        &records,
417                    )?;
418                }
419
420                // Open/create WAL manager with configured durability
421                let wal_durability = match config.wal_durability {
422                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
423                    crate::config::DurabilityMode::Batch {
424                        max_delay_ms,
425                        max_records,
426                    } => WalDurabilityMode::Batch {
427                        max_delay_ms,
428                        max_records,
429                    },
430                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
431                        WalDurabilityMode::Adaptive { target_interval_ms }
432                    }
433                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
434                };
435                let wal_config = WalConfig {
436                    durability: wal_durability,
437                    ..WalConfig::default()
438                };
439                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
440                Some(Arc::new(wal_manager))
441            } else {
442                None
443            }
444        } else {
445            None
446        };
447
448        // Create query cache with default capacity (1000 queries)
449        let query_cache = Arc::new(QueryCache::default());
450
451        // After all snapshot/WAL recovery, sync TransactionManager epoch
452        // with the store so queries use the correct viewing epoch.
453        #[cfg(feature = "temporal")]
454        transaction_manager.sync_epoch(store.current_epoch());
455
456        #[cfg(feature = "cdc")]
457        let cdc_enabled_val = config.cdc_enabled;
458
459        Ok(Self {
460            config,
461            store: Some(store),
462            catalog,
463            #[cfg(feature = "rdf")]
464            rdf_store,
465            transaction_manager,
466            buffer_manager,
467            #[cfg(feature = "wal")]
468            wal,
469            #[cfg(feature = "wal")]
470            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
471            query_cache,
472            commit_counter: Arc::new(AtomicUsize::new(0)),
473            is_open: RwLock::new(true),
474            #[cfg(feature = "cdc")]
475            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
476            #[cfg(feature = "cdc")]
477            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
478            #[cfg(feature = "embed")]
479            embedding_models: RwLock::new(hashbrown::HashMap::new()),
480            #[cfg(feature = "grafeo-file")]
481            file_manager,
482            external_read_store: None,
483            external_write_store: None,
484            #[cfg(feature = "metrics")]
485            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
486            current_graph: RwLock::new(None),
487            current_schema: RwLock::new(None),
488            read_only: is_read_only,
489        })
490    }
491
492    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
493    ///
494    /// The external store handles all data persistence. WAL, CDC, and index
495    /// management are the responsibility of the store implementation.
496    ///
497    /// Query execution (all 6 languages, optimizer, planner) works through the
498    /// provided store. Admin operations (schema introspection, persistence,
499    /// vector/text indexes) are not available on external stores.
500    ///
501    /// # Examples
502    ///
503    /// ```no_run
504    /// use std::sync::Arc;
505    /// use grafeo_engine::{GrafeoDB, Config};
506    /// use grafeo_core::graph::GraphStoreMut;
507    ///
508    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
509    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
510    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
511    ///     Ok(())
512    /// }
513    /// ```
514    ///
515    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
516    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
517        config
518            .validate()
519            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
520
521        let transaction_manager = Arc::new(TransactionManager::new());
522
523        let buffer_config = BufferManagerConfig {
524            budget: config.memory_limit.unwrap_or_else(|| {
525                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
526            }),
527            spill_path: None,
528            ..BufferManagerConfig::default()
529        };
530        let buffer_manager = BufferManager::new(buffer_config);
531
532        let query_cache = Arc::new(QueryCache::default());
533
534        #[cfg(feature = "cdc")]
535        let cdc_enabled_val = config.cdc_enabled;
536
537        Ok(Self {
538            config,
539            store: None,
540            catalog: Arc::new(Catalog::new()),
541            #[cfg(feature = "rdf")]
542            rdf_store: Arc::new(RdfStore::new()),
543            transaction_manager,
544            buffer_manager,
545            #[cfg(feature = "wal")]
546            wal: None,
547            #[cfg(feature = "wal")]
548            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
549            query_cache,
550            commit_counter: Arc::new(AtomicUsize::new(0)),
551            is_open: RwLock::new(true),
552            #[cfg(feature = "cdc")]
553            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
554            #[cfg(feature = "cdc")]
555            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
556            #[cfg(feature = "embed")]
557            embedding_models: RwLock::new(hashbrown::HashMap::new()),
558            #[cfg(feature = "grafeo-file")]
559            file_manager: None,
560            external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
561            external_write_store: Some(store),
562            #[cfg(feature = "metrics")]
563            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
564            current_graph: RwLock::new(None),
565            current_schema: RwLock::new(None),
566            read_only: false,
567        })
568    }
569
570    /// Creates a database backed by a read-only [`GraphStore`].
571    ///
572    /// The database is set to read-only mode. Write queries (CREATE, SET,
573    /// DELETE) will return `TransactionError::ReadOnly`.
574    ///
575    /// # Examples
576    ///
577    /// ```no_run
578    /// use std::sync::Arc;
579    /// use grafeo_engine::{GrafeoDB, Config};
580    /// use grafeo_core::graph::GraphStore;
581    ///
582    /// fn example(store: Arc<dyn GraphStore>) -> grafeo_common::utils::error::Result<()> {
583    ///     let db = GrafeoDB::with_read_store(store, Config::in_memory())?;
584    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
585    ///     Ok(())
586    /// }
587    /// ```
588    ///
589    /// [`GraphStore`]: grafeo_core::graph::GraphStore
590    pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
591        config
592            .validate()
593            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
594
595        let transaction_manager = Arc::new(TransactionManager::new());
596
597        let buffer_config = BufferManagerConfig {
598            budget: config.memory_limit.unwrap_or_else(|| {
599                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
600            }),
601            spill_path: None,
602            ..BufferManagerConfig::default()
603        };
604        let buffer_manager = BufferManager::new(buffer_config);
605
606        let query_cache = Arc::new(QueryCache::default());
607
608        #[cfg(feature = "cdc")]
609        let cdc_enabled_val = config.cdc_enabled;
610
611        Ok(Self {
612            config,
613            store: None,
614            catalog: Arc::new(Catalog::new()),
615            #[cfg(feature = "rdf")]
616            rdf_store: Arc::new(RdfStore::new()),
617            transaction_manager,
618            buffer_manager,
619            #[cfg(feature = "wal")]
620            wal: None,
621            #[cfg(feature = "wal")]
622            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
623            query_cache,
624            commit_counter: Arc::new(AtomicUsize::new(0)),
625            is_open: RwLock::new(true),
626            #[cfg(feature = "cdc")]
627            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
628            #[cfg(feature = "cdc")]
629            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
630            #[cfg(feature = "embed")]
631            embedding_models: RwLock::new(hashbrown::HashMap::new()),
632            #[cfg(feature = "grafeo-file")]
633            file_manager: None,
634            external_read_store: Some(store),
635            external_write_store: None,
636            #[cfg(feature = "metrics")]
637            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
638            current_graph: RwLock::new(None),
639            current_schema: RwLock::new(None),
640            read_only: true,
641        })
642    }
643
644    /// Converts the database to a read-only [`CompactStore`] for faster queries.
645    ///
646    /// Takes a snapshot of all nodes and edges from the current store, builds
647    /// a columnar `CompactStore` with CSR adjacency, and switches the database
648    /// to read-only mode. The original store is dropped to free memory.
649    ///
650    /// After calling this, all write queries will fail with
651    /// `TransactionError::ReadOnly`. Read queries (across all supported
652    /// languages) continue to work and benefit from ~60x memory reduction
653    /// and 100x+ traversal speedup.
654    ///
655    /// # Errors
656    ///
657    /// Returns an error if the conversion fails (e.g. more than 32,767
658    /// distinct labels or edge types).
659    ///
660    /// [`CompactStore`]: grafeo_core::graph::compact::CompactStore
661    #[cfg(feature = "compact-store")]
662    pub fn compact(&mut self) -> Result<()> {
663        use grafeo_core::graph::compact::from_graph_store;
664
665        let current_store = self.graph_store();
666        let compact = from_graph_store(current_store.as_ref())
667            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
668
669        self.external_read_store = Some(Arc::new(compact) as Arc<dyn GraphStore>);
670        self.external_write_store = None;
671        self.store = None;
672        self.read_only = true;
673        self.query_cache = Arc::new(QueryCache::default());
674
675        Ok(())
676    }
677
678    /// Applies WAL records to restore the database state.
679    ///
680    /// Data mutation records are routed through a graph cursor that tracks
681    /// `SwitchGraph` context markers, replaying mutations into the correct
682    /// named graph (or the default graph when cursor is `None`).
683    #[cfg(feature = "wal")]
684    fn apply_wal_records(
685        store: &Arc<LpgStore>,
686        catalog: &Catalog,
687        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
688        records: &[WalRecord],
689    ) -> Result<()> {
690        use crate::catalog::{
691            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
692        };
693        use grafeo_common::utils::error::Error;
694
695        // Graph cursor: tracks which named graph receives data mutations.
696        // `None` means the default graph.
697        let mut current_graph: Option<String> = None;
698        let mut target_store: Arc<LpgStore> = Arc::clone(store);
699
700        for record in records {
701            match record {
702                // --- Named graph lifecycle ---
703                WalRecord::CreateNamedGraph { name } => {
704                    let _ = store.create_graph(name);
705                }
706                WalRecord::DropNamedGraph { name } => {
707                    store.drop_graph(name);
708                    // Reset cursor if the dropped graph was active
709                    if current_graph.as_deref() == Some(name.as_str()) {
710                        current_graph = None;
711                        target_store = Arc::clone(store);
712                    }
713                }
714                WalRecord::SwitchGraph { name } => {
715                    current_graph.clone_from(name);
716                    target_store = match &current_graph {
717                        None => Arc::clone(store),
718                        Some(graph_name) => store
719                            .graph_or_create(graph_name)
720                            .map_err(|e| Error::Internal(e.to_string()))?,
721                    };
722                }
723
724                // --- Data mutations: routed through target_store ---
725                WalRecord::CreateNode { id, labels } => {
726                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
727                    target_store.create_node_with_id(*id, &label_refs)?;
728                }
729                WalRecord::DeleteNode { id } => {
730                    target_store.delete_node(*id);
731                }
732                WalRecord::CreateEdge {
733                    id,
734                    src,
735                    dst,
736                    edge_type,
737                } => {
738                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
739                }
740                WalRecord::DeleteEdge { id } => {
741                    target_store.delete_edge(*id);
742                }
743                WalRecord::SetNodeProperty { id, key, value } => {
744                    target_store.set_node_property(*id, key, value.clone());
745                }
746                WalRecord::SetEdgeProperty { id, key, value } => {
747                    target_store.set_edge_property(*id, key, value.clone());
748                }
749                WalRecord::AddNodeLabel { id, label } => {
750                    target_store.add_label(*id, label);
751                }
752                WalRecord::RemoveNodeLabel { id, label } => {
753                    target_store.remove_label(*id, label);
754                }
755                WalRecord::RemoveNodeProperty { id, key } => {
756                    target_store.remove_node_property(*id, key);
757                }
758                WalRecord::RemoveEdgeProperty { id, key } => {
759                    target_store.remove_edge_property(*id, key);
760                }
761
762                // --- Schema DDL replay (always on root catalog) ---
763                WalRecord::CreateNodeType {
764                    name,
765                    properties,
766                    constraints,
767                } => {
768                    let def = NodeTypeDefinition {
769                        name: name.clone(),
770                        properties: properties
771                            .iter()
772                            .map(|(n, t, nullable)| TypedProperty {
773                                name: n.clone(),
774                                data_type: PropertyDataType::from_type_name(t),
775                                nullable: *nullable,
776                                default_value: None,
777                            })
778                            .collect(),
779                        constraints: constraints
780                            .iter()
781                            .map(|(kind, props)| match kind.as_str() {
782                                "unique" => TypeConstraint::Unique(props.clone()),
783                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
784                                "not_null" if !props.is_empty() => {
785                                    TypeConstraint::NotNull(props[0].clone())
786                                }
787                                _ => TypeConstraint::Unique(props.clone()),
788                            })
789                            .collect(),
790                        parent_types: Vec::new(),
791                    };
792                    let _ = catalog.register_node_type(def);
793                }
794                WalRecord::DropNodeType { name } => {
795                    let _ = catalog.drop_node_type(name);
796                }
797                WalRecord::CreateEdgeType {
798                    name,
799                    properties,
800                    constraints,
801                } => {
802                    let def = EdgeTypeDefinition {
803                        name: name.clone(),
804                        properties: properties
805                            .iter()
806                            .map(|(n, t, nullable)| TypedProperty {
807                                name: n.clone(),
808                                data_type: PropertyDataType::from_type_name(t),
809                                nullable: *nullable,
810                                default_value: None,
811                            })
812                            .collect(),
813                        constraints: constraints
814                            .iter()
815                            .map(|(kind, props)| match kind.as_str() {
816                                "unique" => TypeConstraint::Unique(props.clone()),
817                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
818                                "not_null" if !props.is_empty() => {
819                                    TypeConstraint::NotNull(props[0].clone())
820                                }
821                                _ => TypeConstraint::Unique(props.clone()),
822                            })
823                            .collect(),
824                        source_node_types: Vec::new(),
825                        target_node_types: Vec::new(),
826                    };
827                    let _ = catalog.register_edge_type_def(def);
828                }
829                WalRecord::DropEdgeType { name } => {
830                    let _ = catalog.drop_edge_type_def(name);
831                }
832                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
833                    // Index recreation is handled by the store on startup
834                    // (indexes are rebuilt from data, not WAL)
835                }
836                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
837                    // Constraint definitions are part of type definitions
838                    // and replayed via CreateNodeType/CreateEdgeType
839                }
840                WalRecord::CreateGraphType {
841                    name,
842                    node_types,
843                    edge_types,
844                    open,
845                } => {
846                    use crate::catalog::GraphTypeDefinition;
847                    let def = GraphTypeDefinition {
848                        name: name.clone(),
849                        allowed_node_types: node_types.clone(),
850                        allowed_edge_types: edge_types.clone(),
851                        open: *open,
852                    };
853                    let _ = catalog.register_graph_type(def);
854                }
855                WalRecord::DropGraphType { name } => {
856                    let _ = catalog.drop_graph_type(name);
857                }
858                WalRecord::CreateSchema { name } => {
859                    let _ = catalog.register_schema_namespace(name.clone());
860                }
861                WalRecord::DropSchema { name } => {
862                    let _ = catalog.drop_schema_namespace(name);
863                }
864
865                WalRecord::AlterNodeType { name, alterations } => {
866                    for (action, prop_name, type_name, nullable) in alterations {
867                        match action.as_str() {
868                            "add" => {
869                                let prop = TypedProperty {
870                                    name: prop_name.clone(),
871                                    data_type: PropertyDataType::from_type_name(type_name),
872                                    nullable: *nullable,
873                                    default_value: None,
874                                };
875                                let _ = catalog.alter_node_type_add_property(name, prop);
876                            }
877                            "drop" => {
878                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
879                            }
880                            _ => {}
881                        }
882                    }
883                }
884                WalRecord::AlterEdgeType { name, alterations } => {
885                    for (action, prop_name, type_name, nullable) in alterations {
886                        match action.as_str() {
887                            "add" => {
888                                let prop = TypedProperty {
889                                    name: prop_name.clone(),
890                                    data_type: PropertyDataType::from_type_name(type_name),
891                                    nullable: *nullable,
892                                    default_value: None,
893                                };
894                                let _ = catalog.alter_edge_type_add_property(name, prop);
895                            }
896                            "drop" => {
897                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
898                            }
899                            _ => {}
900                        }
901                    }
902                }
903                WalRecord::AlterGraphType { name, alterations } => {
904                    for (action, type_name) in alterations {
905                        match action.as_str() {
906                            "add_node" => {
907                                let _ =
908                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
909                            }
910                            "drop_node" => {
911                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
912                            }
913                            "add_edge" => {
914                                let _ =
915                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
916                            }
917                            "drop_edge" => {
918                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
919                            }
920                            _ => {}
921                        }
922                    }
923                }
924
925                WalRecord::CreateProcedure {
926                    name,
927                    params,
928                    returns,
929                    body,
930                } => {
931                    use crate::catalog::ProcedureDefinition;
932                    let def = ProcedureDefinition {
933                        name: name.clone(),
934                        params: params.clone(),
935                        returns: returns.clone(),
936                        body: body.clone(),
937                    };
938                    let _ = catalog.register_procedure(def);
939                }
940                WalRecord::DropProcedure { name } => {
941                    let _ = catalog.drop_procedure(name);
942                }
943
944                // --- RDF triple replay ---
945                #[cfg(feature = "rdf")]
946                WalRecord::InsertRdfTriple { .. }
947                | WalRecord::DeleteRdfTriple { .. }
948                | WalRecord::ClearRdfGraph { .. }
949                | WalRecord::CreateRdfGraph { .. }
950                | WalRecord::DropRdfGraph { .. } => {
951                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
952                }
953                #[cfg(not(feature = "rdf"))]
954                WalRecord::InsertRdfTriple { .. }
955                | WalRecord::DeleteRdfTriple { .. }
956                | WalRecord::ClearRdfGraph { .. }
957                | WalRecord::CreateRdfGraph { .. }
958                | WalRecord::DropRdfGraph { .. } => {}
959
960                WalRecord::TransactionCommit { .. } => {
961                    // In temporal mode, advance the store epoch on each committed
962                    // transaction so that subsequent property/label operations
963                    // are recorded at the correct epoch in their VersionLogs.
964                    #[cfg(feature = "temporal")]
965                    {
966                        target_store.new_epoch();
967                    }
968                }
969                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
970                    // Transaction control records don't need replay action
971                    // (recovery already filtered to only committed transactions)
972                }
973            }
974        }
975        Ok(())
976    }
977
978    // =========================================================================
979    // Single-file format helpers
980    // =========================================================================
981
982    /// Returns `true` if the given path should use single-file format.
983    #[cfg(feature = "grafeo-file")]
984    fn should_use_single_file(
985        path: &std::path::Path,
986        configured: crate::config::StorageFormat,
987    ) -> bool {
988        use crate::config::StorageFormat;
989        match configured {
990            StorageFormat::SingleFile => true,
991            StorageFormat::WalDirectory => false,
992            StorageFormat::Auto => {
993                // Existing file: check magic bytes
994                if path.is_file() {
995                    if let Ok(mut f) = std::fs::File::open(path) {
996                        use std::io::Read;
997                        let mut magic = [0u8; 4];
998                        if f.read_exact(&mut magic).is_ok()
999                            && magic == grafeo_adapters::storage::file::MAGIC
1000                        {
1001                            return true;
1002                        }
1003                    }
1004                    return false;
1005                }
1006                // Existing directory: legacy format
1007                if path.is_dir() {
1008                    return false;
1009                }
1010                // New path: check extension
1011                path.extension().is_some_and(|ext| ext == "grafeo")
1012            }
1013        }
1014    }
1015
1016    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
1017    #[cfg(feature = "grafeo-file")]
1018    fn apply_snapshot_data(
1019        store: &Arc<LpgStore>,
1020        catalog: &Arc<crate::catalog::Catalog>,
1021        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
1022        data: &[u8],
1023    ) -> Result<()> {
1024        persistence::load_snapshot_into_store(
1025            store,
1026            catalog,
1027            #[cfg(feature = "rdf")]
1028            rdf_store,
1029            data,
1030        )
1031    }
1032
1033    // =========================================================================
1034    // Session & Configuration
1035    // =========================================================================
1036
1037    /// Opens a new session for running queries.
1038    ///
1039    /// Sessions are cheap to create: spin up as many as you need. Each
1040    /// gets its own transaction context, so concurrent sessions won't
1041    /// block each other on reads.
1042    ///
1043    /// # Panics
1044    ///
1045    /// Panics if the database was configured with an external graph store and
1046    /// the internal arena allocator cannot be initialized (out of memory).
1047    ///
1048    /// # Examples
1049    ///
1050    /// ```
1051    /// use grafeo_engine::GrafeoDB;
1052    ///
1053    /// let db = GrafeoDB::new_in_memory();
1054    /// let session = db.session();
1055    ///
1056    /// // Run queries through the session
1057    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
1058    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1059    /// ```
1060    #[must_use]
1061    pub fn session(&self) -> Session {
1062        self.create_session_inner(None)
1063    }
1064
1065    /// Creates a session with an explicit CDC override.
1066    ///
1067    /// When `cdc_enabled` is `true`, mutations in this session are tracked
1068    /// regardless of the database default. When `false`, mutations are not
1069    /// tracked regardless of the database default.
1070    ///
1071    /// # Examples
1072    ///
1073    /// ```
1074    /// use grafeo_engine::GrafeoDB;
1075    ///
1076    /// let db = GrafeoDB::new_in_memory();
1077    ///
1078    /// // Opt in to CDC for just this session
1079    /// let tracked = db.session_with_cdc(true);
1080    /// tracked.execute("INSERT (:Person {name: 'Alix'})")?;
1081    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1082    /// ```
1083    #[cfg(feature = "cdc")]
1084    #[must_use]
1085    pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1086        self.create_session_inner(Some(cdc_enabled))
1087    }
1088
1089    /// Creates a read-only session regardless of the database's access mode.
1090    ///
1091    /// Mutations executed through this session will fail with
1092    /// `TransactionError::ReadOnly`. Useful for replication replicas where
1093    /// the database itself must remain writable (for applying CDC changes)
1094    /// but client-facing queries must be read-only.
1095    #[must_use]
1096    pub fn session_read_only(&self) -> Session {
1097        self.create_session_inner_opts(None, true)
1098    }
1099
1100    /// Shared session creation logic.
1101    ///
1102    /// `cdc_override` overrides the database-wide `cdc_enabled` default when
1103    /// `Some`. `None` falls back to the database default.
1104    #[allow(unused_variables)] // cdc_override unused when cdc feature is off
1105    fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1106        self.create_session_inner_opts(cdc_override, false)
1107    }
1108
1109    /// Shared session creation with all overrides.
1110    #[allow(unused_variables)]
1111    fn create_session_inner_opts(
1112        &self,
1113        cdc_override: Option<bool>,
1114        force_read_only: bool,
1115    ) -> Session {
1116        let session_cfg = || crate::session::SessionConfig {
1117            transaction_manager: Arc::clone(&self.transaction_manager),
1118            query_cache: Arc::clone(&self.query_cache),
1119            catalog: Arc::clone(&self.catalog),
1120            adaptive_config: self.config.adaptive.clone(),
1121            factorized_execution: self.config.factorized_execution,
1122            graph_model: self.config.graph_model,
1123            query_timeout: self.config.query_timeout,
1124            commit_counter: Arc::clone(&self.commit_counter),
1125            gc_interval: self.config.gc_interval,
1126            read_only: self.read_only || force_read_only,
1127        };
1128
1129        if let Some(ref ext_read) = self.external_read_store {
1130            return Session::with_external_store(
1131                Arc::clone(ext_read),
1132                self.external_write_store.as_ref().map(Arc::clone),
1133                session_cfg(),
1134            )
1135            .expect("arena allocation for external store session");
1136        }
1137
1138        #[cfg(feature = "rdf")]
1139        let mut session = Session::with_rdf_store_and_adaptive(
1140            Arc::clone(self.lpg_store()),
1141            Arc::clone(&self.rdf_store),
1142            session_cfg(),
1143        );
1144        #[cfg(not(feature = "rdf"))]
1145        let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1146
1147        #[cfg(feature = "wal")]
1148        if let Some(ref wal) = self.wal {
1149            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1150        }
1151
1152        #[cfg(feature = "cdc")]
1153        {
1154            let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1155            if should_enable {
1156                session.set_cdc_log(Arc::clone(&self.cdc_log));
1157            }
1158        }
1159
1160        #[cfg(feature = "metrics")]
1161        {
1162            if let Some(ref m) = self.metrics {
1163                session.set_metrics(Arc::clone(m));
1164                m.session_created
1165                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1166                m.session_active
1167                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1168            }
1169        }
1170
1171        // Propagate persistent graph context to the new session
1172        if let Some(ref graph) = *self.current_graph.read() {
1173            session.use_graph(graph);
1174        }
1175
1176        // Propagate persistent schema context to the new session
1177        if let Some(ref schema) = *self.current_schema.read() {
1178            session.set_schema(schema);
1179        }
1180
1181        // Suppress unused_mut when cdc/wal are disabled
1182        let _ = &mut session;
1183
1184        session
1185    }
1186
1187    /// Returns the current graph name, if any.
1188    ///
1189    /// This is the persistent graph context used by one-shot `execute()` calls.
1190    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
1191    /// or `SESSION RESET`.
1192    #[must_use]
1193    pub fn current_graph(&self) -> Option<String> {
1194        self.current_graph.read().clone()
1195    }
1196
1197    /// Sets the current graph context for subsequent one-shot `execute()` calls.
1198    ///
1199    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
1200    /// Pass `None` to reset to the default graph.
1201    pub fn set_current_graph(&self, name: Option<&str>) {
1202        *self.current_graph.write() = name.map(ToString::to_string);
1203    }
1204
1205    /// Returns the current schema name, if any.
1206    ///
1207    /// This is the persistent schema context used by one-shot `execute()` calls.
1208    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
1209    #[must_use]
1210    pub fn current_schema(&self) -> Option<String> {
1211        self.current_schema.read().clone()
1212    }
1213
1214    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1215    ///
1216    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1217    /// a session. Pass `None` to clear the schema context.
1218    pub fn set_current_schema(&self, name: Option<&str>) {
1219        *self.current_schema.write() = name.map(ToString::to_string);
1220    }
1221
1222    /// Returns the adaptive execution configuration.
1223    #[must_use]
1224    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1225        &self.config.adaptive
1226    }
1227
1228    /// Returns `true` if this database was opened in read-only mode.
1229    #[must_use]
1230    pub fn is_read_only(&self) -> bool {
1231        self.read_only
1232    }
1233
1234    /// Returns the configuration.
1235    #[must_use]
1236    pub fn config(&self) -> &Config {
1237        &self.config
1238    }
1239
1240    /// Returns the graph data model of this database.
1241    #[must_use]
1242    pub fn graph_model(&self) -> crate::config::GraphModel {
1243        self.config.graph_model
1244    }
1245
1246    /// Returns the configured memory limit in bytes, if any.
1247    #[must_use]
1248    pub fn memory_limit(&self) -> Option<usize> {
1249        self.config.memory_limit
1250    }
1251
1252    /// Returns a point-in-time snapshot of all metrics.
1253    ///
1254    /// If the `metrics` feature is disabled or the registry is not
1255    /// initialized, returns a default (all-zero) snapshot.
1256    #[cfg(feature = "metrics")]
1257    #[must_use]
1258    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1259        let mut snapshot = self
1260            .metrics
1261            .as_ref()
1262            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1263
1264        // Augment with cache stats from the query cache (not tracked in the registry)
1265        let cache_stats = self.query_cache.stats();
1266        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1267        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1268        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1269        snapshot.cache_invalidations = cache_stats.invalidations;
1270
1271        snapshot
1272    }
1273
1274    /// Returns all metrics in Prometheus text exposition format.
1275    ///
1276    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1277    #[cfg(feature = "metrics")]
1278    #[must_use]
1279    pub fn metrics_prometheus(&self) -> String {
1280        self.metrics
1281            .as_ref()
1282            .map_or_else(String::new, |m| m.to_prometheus())
1283    }
1284
1285    /// Resets all metrics counters and histograms to zero.
1286    #[cfg(feature = "metrics")]
1287    pub fn reset_metrics(&self) {
1288        if let Some(ref m) = self.metrics {
1289            m.reset();
1290        }
1291        self.query_cache.reset_stats();
1292    }
1293
1294    /// Returns the underlying (default) store.
1295    ///
1296    /// This provides direct access to the LPG store for algorithm implementations
1297    /// and admin operations (index management, schema introspection, MVCC internals).
1298    ///
1299    /// For code that only needs read/write graph operations, prefer
1300    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
1301    #[must_use]
1302    pub fn store(&self) -> &Arc<LpgStore> {
1303        self.lpg_store()
1304    }
1305
1306    /// Returns the LPG store for the currently active graph.
1307    ///
1308    /// If [`current_graph`](Self::current_graph) is `None` or `"default"`, returns
1309    /// the default store. Otherwise looks up the named graph in the root store.
1310    /// Falls back to the default store if the named graph does not exist.
1311    #[allow(dead_code)] // Reserved for future graph-aware CRUD methods
1312    fn active_store(&self) -> Arc<LpgStore> {
1313        let store = self.lpg_store();
1314        let graph_name = self.current_graph.read().clone();
1315        match graph_name {
1316            None => Arc::clone(store),
1317            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(store),
1318            Some(ref name) => store.graph(name).unwrap_or_else(|| Arc::clone(store)),
1319        }
1320    }
1321
1322    // === Named Graph Management ===
1323
1324    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1325    ///
1326    /// # Errors
1327    ///
1328    /// Returns an error if arena allocation fails.
1329    pub fn create_graph(&self, name: &str) -> Result<bool> {
1330        Ok(self.lpg_store().create_graph(name)?)
1331    }
1332
1333    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
1334    pub fn drop_graph(&self, name: &str) -> bool {
1335        self.lpg_store().drop_graph(name)
1336    }
1337
1338    /// Returns all named graph names.
1339    #[must_use]
1340    pub fn list_graphs(&self) -> Vec<String> {
1341        self.lpg_store().graph_names()
1342    }
1343
1344    /// Returns the graph store as a trait object.
1345    ///
1346    /// Returns a read-only trait object for the active graph store.
1347    ///
1348    /// This provides the [`GraphStore`] interface for code that only needs
1349    /// read operations. For write access, use [`graph_store_mut()`](Self::graph_store_mut).
1350    ///
1351    /// [`GraphStore`]: grafeo_core::graph::GraphStore
1352    #[must_use]
1353    pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1354        if let Some(ref ext_read) = self.external_read_store {
1355            Arc::clone(ext_read)
1356        } else {
1357            Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1358        }
1359    }
1360
1361    /// Returns the writable graph store, if available.
1362    ///
1363    /// Returns `None` for read-only databases created via
1364    /// [`with_read_store()`](Self::with_read_store).
1365    #[must_use]
1366    pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1367        if self.external_read_store.is_some() {
1368            self.external_write_store.as_ref().map(Arc::clone)
1369        } else {
1370            Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1371        }
1372    }
1373
1374    /// Garbage collects old MVCC versions that are no longer visible.
1375    ///
1376    /// Determines the minimum epoch required by active transactions and prunes
1377    /// version chains older than that threshold. Also cleans up completed
1378    /// transaction metadata in the transaction manager.
1379    pub fn gc(&self) {
1380        let min_epoch = self.transaction_manager.min_active_epoch();
1381        self.lpg_store().gc_versions(min_epoch);
1382        self.transaction_manager.gc();
1383    }
1384
1385    /// Returns the buffer manager for memory-aware operations.
1386    #[must_use]
1387    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1388        &self.buffer_manager
1389    }
1390
1391    /// Returns the query cache.
1392    #[must_use]
1393    pub fn query_cache(&self) -> &Arc<QueryCache> {
1394        &self.query_cache
1395    }
1396
1397    /// Clears all cached query plans.
1398    ///
1399    /// This is called automatically after DDL operations, but can also be
1400    /// invoked manually after external schema changes (e.g., WAL replay,
1401    /// import) or when you want to force re-optimization of all queries.
1402    pub fn clear_plan_cache(&self) {
1403        self.query_cache.clear();
1404    }
1405
1406    // =========================================================================
1407    // Lifecycle
1408    // =========================================================================
1409
1410    /// Closes the database, flushing all pending writes.
1411    ///
1412    /// For persistent databases, this ensures everything is safely on disk.
1413    /// Called automatically when the database is dropped, but you can call
1414    /// it explicitly if you need to guarantee durability at a specific point.
1415    ///
1416    /// # Errors
1417    ///
1418    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
1419    pub fn close(&self) -> Result<()> {
1420        let mut is_open = self.is_open.write();
1421        if !*is_open {
1422            return Ok(());
1423        }
1424
1425        // Read-only databases: just release the shared lock, no checkpointing
1426        if self.read_only {
1427            #[cfg(feature = "grafeo-file")]
1428            if let Some(ref fm) = self.file_manager {
1429                fm.close()?;
1430            }
1431            *is_open = false;
1432            return Ok(());
1433        }
1434
1435        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
1436        // We must do this BEFORE the WAL close path because checkpoint_to_file
1437        // removes the sidecar WAL directory.
1438        #[cfg(feature = "grafeo-file")]
1439        let is_single_file = self.file_manager.is_some();
1440        #[cfg(not(feature = "grafeo-file"))]
1441        let is_single_file = false;
1442
1443        #[cfg(feature = "grafeo-file")]
1444        if let Some(ref fm) = self.file_manager {
1445            // Flush WAL first so all records are on disk before we snapshot
1446            #[cfg(feature = "wal")]
1447            if let Some(ref wal) = self.wal {
1448                wal.sync()?;
1449            }
1450            self.checkpoint_to_file(fm)?;
1451
1452            // Release WAL file handles before removing sidecar directory.
1453            // On Windows, open handles prevent directory deletion.
1454            #[cfg(feature = "wal")]
1455            if let Some(ref wal) = self.wal {
1456                wal.close_active_log();
1457            }
1458
1459            {
1460                use grafeo_core::testing::crash::maybe_crash;
1461                maybe_crash("close:before_remove_sidecar_wal");
1462            }
1463            fm.remove_sidecar_wal()?;
1464            fm.close()?;
1465        }
1466
1467        // Commit and sync WAL (legacy directory format only).
1468        // We intentionally do NOT call wal.checkpoint() here. Directory format
1469        // has no snapshot: the WAL files are the sole source of truth. Writing
1470        // checkpoint.meta would cause recovery to skip older WAL files, losing
1471        // all data that predates the current log sequence.
1472        #[cfg(feature = "wal")]
1473        if !is_single_file && let Some(ref wal) = self.wal {
1474            // Use the last assigned transaction ID, or create one for the commit record
1475            let commit_tx = self
1476                .transaction_manager
1477                .last_assigned_transaction_id()
1478                .unwrap_or_else(|| self.transaction_manager.begin());
1479
1480            // Log a TransactionCommit to mark all pending records as committed
1481            wal.log(&WalRecord::TransactionCommit {
1482                transaction_id: commit_tx,
1483            })?;
1484
1485            wal.sync()?;
1486        }
1487
1488        *is_open = false;
1489        Ok(())
1490    }
1491
/// Borrows the typed write-ahead log handle held by this database.
///
/// Yields `None` when no WAL is attached to this instance.
#[cfg(feature = "wal")]
#[must_use]
pub fn wal(&self) -> Option<&Arc<LpgWal>> {
    let slot = &self.wal;
    slot.as_ref()
}
1498
/// Appends `record` to the WAL when one is attached; does nothing otherwise.
///
/// # Errors
///
/// Propagates any error reported by the WAL's `log` call.
#[cfg(feature = "wal")]
pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
    let Some(wal) = self.wal.as_ref() else {
        // No WAL attached: there is nothing to record.
        return Ok(());
    };
    wal.log(record)?;
    Ok(())
}
1507
/// Exports the current database snapshot and writes it to the `.grafeo`
/// file through `fm`.
///
/// Alongside the snapshot bytes, the file manager receives the store's
/// current epoch, the last assigned transaction ID (`0` when none has been
/// assigned yet), and the node and edge counts of the LPG store.
///
/// The sidecar WAL is left untouched: a caller that wants it gone
/// (e.g. `close()`) must call `fm.remove_sidecar_wal()` itself once this
/// function has returned.
///
/// # Errors
///
/// Propagates failures from `export_snapshot` and from `write_snapshot`.
#[cfg(feature = "grafeo-file")]
fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
    use grafeo_core::testing::crash::maybe_crash;

    // The `maybe_crash` calls mark named points around each step
    // (presumably crash-injection hooks, given the `testing::crash` path).
    maybe_crash("checkpoint_to_file:before_export");
    let bytes = self.export_snapshot()?;
    maybe_crash("checkpoint_to_file:after_export");

    // Header metadata is read only after the export has completed.
    let current_epoch = self.lpg_store().current_epoch();
    let last_transaction = match self.transaction_manager.last_assigned_transaction_id() {
        Some(id) => id.0,
        None => 0,
    };
    let nodes = self.lpg_store().node_count() as u64;
    let edges = self.lpg_store().edge_count() as u64;

    fm.write_snapshot(&bytes, current_epoch.0, last_transaction, nodes, edges)?;

    maybe_crash("checkpoint_to_file:after_write_snapshot");
    Ok(())
}
1540
/// Borrows the file manager used by the single-file format.
///
/// Yields `None` when this database has no file manager attached.
#[cfg(feature = "grafeo-file")]
#[must_use]
pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
    let slot = &self.file_manager;
    slot.as_ref()
}
1547}
1548
1549impl Drop for GrafeoDB {
1550    fn drop(&mut self) {
1551        if let Err(e) = self.close() {
1552            grafeo_error!("Error closing database: {}", e);
1553        }
1554    }
1555}
1556
1557impl crate::admin::AdminService for GrafeoDB {
1558    fn info(&self) -> crate::admin::DatabaseInfo {
1559        self.info()
1560    }
1561
1562    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1563        self.detailed_stats()
1564    }
1565
1566    fn schema(&self) -> crate::admin::SchemaInfo {
1567        self.schema()
1568    }
1569
1570    fn validate(&self) -> crate::admin::ValidationResult {
1571        self.validate()
1572    }
1573
1574    fn wal_status(&self) -> crate::admin::WalStatus {
1575        self.wal_status()
1576    }
1577
1578    fn wal_checkpoint(&self) -> Result<()> {
1579        self.wal_checkpoint()
1580    }
1581}
1582
1583// =========================================================================
1584// Query Result Types
1585// =========================================================================
1586
1587/// The result of running a query.
1588///
1589/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
1590/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
1591///
1592/// # Examples
1593///
1594/// ```
1595/// use grafeo_engine::GrafeoDB;
1596///
1597/// let db = GrafeoDB::new_in_memory();
1598/// db.create_node(&["Person"]);
1599///
1600/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
1601///
1602/// // Check what we got
1603/// println!("Columns: {:?}", result.columns);
1604/// println!("Rows: {}", result.row_count());
1605///
1606/// // Iterate through results
1607/// for row in result.iter() {
1608///     println!("{:?}", row);
1609/// }
1610/// # Ok::<(), grafeo_common::utils::error::Error>(())
1611/// ```
1612#[derive(Debug)]
1613pub struct QueryResult {
1614    /// Column names from the RETURN clause.
1615    pub columns: Vec<String>,
1616    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
1617    pub column_types: Vec<grafeo_common::types::LogicalType>,
1618    /// The actual result rows.
1619    pub rows: Vec<Vec<grafeo_common::types::Value>>,
1620    /// Query execution time in milliseconds (if timing was enabled).
1621    pub execution_time_ms: Option<f64>,
1622    /// Number of rows scanned during query execution (estimate).
1623    pub rows_scanned: Option<u64>,
1624    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
1625    pub status_message: Option<String>,
1626    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
1627    pub gql_status: grafeo_common::utils::GqlStatus,
1628}
1629
1630impl QueryResult {
1631    /// Creates a fully empty query result (no columns, no rows).
1632    #[must_use]
1633    pub fn empty() -> Self {
1634        Self {
1635            columns: Vec::new(),
1636            column_types: Vec::new(),
1637            rows: Vec::new(),
1638            execution_time_ms: None,
1639            rows_scanned: None,
1640            status_message: None,
1641            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1642        }
1643    }
1644
1645    /// Creates a query result with only a status message (for DDL commands).
1646    #[must_use]
1647    pub fn status(msg: impl Into<String>) -> Self {
1648        Self {
1649            columns: Vec::new(),
1650            column_types: Vec::new(),
1651            rows: Vec::new(),
1652            execution_time_ms: None,
1653            rows_scanned: None,
1654            status_message: Some(msg.into()),
1655            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1656        }
1657    }
1658
1659    /// Creates a new empty query result.
1660    #[must_use]
1661    pub fn new(columns: Vec<String>) -> Self {
1662        let len = columns.len();
1663        Self {
1664            columns,
1665            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1666            rows: Vec::new(),
1667            execution_time_ms: None,
1668            rows_scanned: None,
1669            status_message: None,
1670            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1671        }
1672    }
1673
1674    /// Creates a new empty query result with column types.
1675    #[must_use]
1676    pub fn with_types(
1677        columns: Vec<String>,
1678        column_types: Vec<grafeo_common::types::LogicalType>,
1679    ) -> Self {
1680        Self {
1681            columns,
1682            column_types,
1683            rows: Vec::new(),
1684            execution_time_ms: None,
1685            rows_scanned: None,
1686            status_message: None,
1687            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1688        }
1689    }
1690
1691    /// Sets the execution metrics on this result.
1692    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1693        self.execution_time_ms = Some(execution_time_ms);
1694        self.rows_scanned = Some(rows_scanned);
1695        self
1696    }
1697
1698    /// Returns the execution time in milliseconds, if available.
1699    #[must_use]
1700    pub fn execution_time_ms(&self) -> Option<f64> {
1701        self.execution_time_ms
1702    }
1703
1704    /// Returns the number of rows scanned, if available.
1705    #[must_use]
1706    pub fn rows_scanned(&self) -> Option<u64> {
1707        self.rows_scanned
1708    }
1709
1710    /// Returns the number of rows.
1711    #[must_use]
1712    pub fn row_count(&self) -> usize {
1713        self.rows.len()
1714    }
1715
1716    /// Returns the number of columns.
1717    #[must_use]
1718    pub fn column_count(&self) -> usize {
1719        self.columns.len()
1720    }
1721
1722    /// Returns true if the result is empty.
1723    #[must_use]
1724    pub fn is_empty(&self) -> bool {
1725        self.rows.is_empty()
1726    }
1727
1728    /// Extracts a single value from the result.
1729    ///
1730    /// Use this when your query returns exactly one row with one column,
1731    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1732    ///
1733    /// # Errors
1734    ///
1735    /// Returns an error if the result has multiple rows or columns.
1736    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1737        if self.rows.len() != 1 || self.columns.len() != 1 {
1738            return Err(grafeo_common::utils::error::Error::InvalidValue(
1739                "Expected single value".to_string(),
1740            ));
1741        }
1742        T::from_value(&self.rows[0][0])
1743    }
1744
1745    /// Returns an iterator over the rows.
1746    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1747        self.rows.iter()
1748    }
1749}
1750
1751impl std::fmt::Display for QueryResult {
1752    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1753        let table = grafeo_common::fmt::format_result_table(
1754            &self.columns,
1755            &self.rows,
1756            self.execution_time_ms,
1757            self.status_message.as_deref(),
1758        );
1759        f.write_str(&table)
1760    }
1761}
1762
1763/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
1764///
1765/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
1766/// Used by [`QueryResult::scalar()`] to extract typed values.
1767pub trait FromValue: Sized {
1768    /// Attempts the conversion, returning an error on type mismatch.
1769    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1770}
1771
1772impl FromValue for i64 {
1773    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1774        value
1775            .as_int64()
1776            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1777                expected: "INT64".to_string(),
1778                found: value.type_name().to_string(),
1779            })
1780    }
1781}
1782
1783impl FromValue for f64 {
1784    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1785        value
1786            .as_float64()
1787            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1788                expected: "FLOAT64".to_string(),
1789                found: value.type_name().to_string(),
1790            })
1791    }
1792}
1793
1794impl FromValue for String {
1795    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1796        value.as_str().map(String::from).ok_or_else(|| {
1797            grafeo_common::utils::error::Error::TypeMismatch {
1798                expected: "STRING".to_string(),
1799                found: value.type_name().to_string(),
1800            }
1801        })
1802    }
1803}
1804
1805impl FromValue for bool {
1806    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1807        value
1808            .as_bool()
1809            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1810                expected: "BOOL".to_string(),
1811                found: value.type_name().to_string(),
1812            })
1813    }
1814}
1815
1816#[cfg(test)]
1817mod tests {
1818    use super::*;
1819
#[test]
fn test_create_in_memory_database() {
    // A fresh in-memory database holds neither nodes nor edges
    let db = GrafeoDB::new_in_memory();
    let (nodes, edges) = (db.node_count(), db.edge_count());
    assert_eq!(nodes, 0);
    assert_eq!(edges, 0);
}
1826
#[test]
fn test_database_config() {
    // Settings chosen on the builder must be visible through `db.config()`
    let db = GrafeoDB::with_config(Config::in_memory().with_threads(4).with_query_logging())
        .unwrap();

    let applied = db.config();
    assert_eq!(applied.threads, 4);
    assert!(applied.query_logging);
}
1835
#[test]
fn test_database_session() {
    // Passing means that creating a session did not panic; the session
    // itself is never used.
    let db = GrafeoDB::new_in_memory();
    let _unused_session = db.session();
}
1842
#[cfg(feature = "wal")]
#[test]
fn test_persistent_database_recovery() {
    use grafeo_common::types::{NodeId, Value};
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let path = tmp.path().join("test_db");

    // First lifetime: two named people joined by one edge
    {
        let db = GrafeoDB::open(&path).unwrap();

        let people: Vec<_> = ["Alix", "Gus"]
            .into_iter()
            .map(|name| {
                let person = db.create_node(&["Person"]);
                db.set_node_property(person, "name", Value::from(name));
                person
            })
            .collect();

        let _edge = db.create_edge(people[0], people[1], "KNOWS");

        // Close explicitly so the WAL is flushed before reopening
        db.close().unwrap();
    }

    // Second lifetime: everything written above must have been recovered
    {
        let db = GrafeoDB::open(&path).unwrap();

        assert_eq!(db.node_count(), 2);
        assert_eq!(db.edge_count(), 1);

        // Both nodes are reachable under the IDs they were created with
        for raw_id in [0, 1] {
            assert!(db.get_node(NodeId::new(raw_id)).is_some());
        }
    }
}
1883
#[cfg(feature = "wal")]
#[test]
fn test_wal_logging() {
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let path = tmp.path().join("wal_test_db");

    let db = GrafeoDB::open(&path).unwrap();

    // Produce some mutations for the WAL to record
    let scratch = db.create_node(&["Test"]);
    db.delete_node(scratch);

    // When a WAL is attached, it must have recorded something
    if let Some(recorded) = db.wal().map(|wal| wal.record_count()) {
        assert!(recorded > 0);
    }

    db.close().unwrap();
}
1905
#[cfg(feature = "wal")]
#[test]
fn test_wal_recovery_multiple_sessions() {
    // WAL recovery has to keep working across several open/close cycles
    use grafeo_common::types::{NodeId, Value};
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let path = tmp.path().join("multi_session_db");

    // Cycles 1 and 2: each adds one person. The second cycle first checks
    // that the node written by the first one was recovered.
    for (recovered_nodes, name) in [(None, "Alix"), (Some(1), "Gus")] {
        let db = GrafeoDB::open(&path).unwrap();
        if let Some(expected) = recovered_nodes {
            assert_eq!(db.node_count(), expected);
        }
        let person = db.create_node(&["Person"]);
        db.set_node_property(person, "name", Value::from(name));
        db.close().unwrap();
    }

    // Cycle 3: both nodes are back, each still carrying its label
    {
        let db = GrafeoDB::open(&path).unwrap();
        assert_eq!(db.node_count(), 2);

        for raw_id in [0, 1] {
            let node = db.get_node(NodeId::new(raw_id)).unwrap();
            assert!(node.labels.iter().any(|label| label.as_str() == "Person"));
        }
    }
}
1946
#[cfg(feature = "wal")]
#[test]
fn test_database_consistency_after_mutations() {
    // A mix of creates and deletes must leave a consistent database behind
    // once it has been closed and reopened
    use grafeo_common::types::{NodeId, Value};
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let path = tmp.path().join("consistency_db");

    {
        let db = GrafeoDB::open(&path).unwrap();

        // Three nodes in a chain: head -> middle -> tail
        let head = db.create_node(&["Node"]);
        let middle = db.create_node(&["Node"]);
        let tail = db.create_node(&["Node"]);

        let head_link = db.create_edge(head, middle, "LINKS");
        let _tail_link = db.create_edge(middle, tail, "LINKS");

        // Remove the first edge and then the middle node
        db.delete_edge(head_link);
        db.delete_node(middle);

        // The two remaining nodes each get a property
        db.set_node_property(head, "value", Value::Int64(1));
        db.set_node_property(tail, "value", Value::Int64(3));

        db.close().unwrap();
    }

    // Reopen and check which nodes are still reachable
    {
        let db = GrafeoDB::open(&path).unwrap();

        // node_count may include deleted nodes in some implementations, so
        // the check is on accessibility: IDs 0 and 2 were kept, ID 1 (the
        // middle node) was deleted.
        for (raw_id, should_exist) in [(0, true), (2, true), (1, false)] {
            let found = db.get_node(NodeId::new(raw_id));
            assert_eq!(found.is_some(), should_exist);
        }
    }
}
1998
#[cfg(feature = "wal")]
#[test]
fn test_close_is_idempotent() {
    // close() may be called repeatedly without reporting an error
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let path = tmp.path().join("close_test_db");

    let db = GrafeoDB::open(&path).unwrap();
    db.create_node(&["Test"]);

    // The first call does the real close; the second must succeed as well
    for _ in 0..2 {
        assert!(db.close().is_ok());
    }
}
2017
#[test]
fn test_with_store_external_backend() {
    use grafeo_core::graph::lpg::LpgStore;

    let external = Arc::new(LpgStore::new().unwrap());

    // Write one person straight into the external store, bypassing the DB
    let seeded = external.create_node(&["Person"]);
    external.set_node_property(seeded, "name", "Alix".into());

    let backend = Arc::clone(&external) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(backend, Config::in_memory()).unwrap();

    let session = db.session();

    // A query through the session must see the externally seeded node
    #[cfg(feature = "gql")]
    {
        let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }
}
2043
#[test]
fn test_with_config_custom_memory_limit() {
    // 64 MB limit requested through the config builder
    let db = GrafeoDB::with_config(Config::in_memory().with_memory_limit(64 * 1024 * 1024))
        .unwrap();

    let applied = db.config();
    assert_eq!(applied.memory_limit, Some(64 * 1024 * 1024));
    assert_eq!(db.node_count(), 0);
}
2052
#[cfg(feature = "metrics")]
#[test]
fn test_database_metrics_registry() {
    let db = GrafeoDB::new_in_memory();

    // Do some work that does not go through the query path
    for _ in 0..2 {
        db.create_node(&["Person"]);
    }

    // A snapshot is available, and no queries have been executed yet
    let snapshot = db.metrics();
    assert_eq!(snapshot.query_count, 0);
}
2067
#[test]
fn test_query_result_has_metrics() {
    // Query results must come back with execution metrics attached
    let db = GrafeoDB::new_in_memory();
    for _ in 0..2 {
        db.create_node(&["Person"]);
    }

    #[cfg(feature = "gql")]
    {
        let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

        // Timing is present and non-negative; both Person nodes were scanned
        assert!(matches!(result.execution_time_ms, Some(ms) if ms >= 0.0));
        assert_eq!(result.rows_scanned, Some(2));
    }
}
2086
#[test]
fn test_empty_query_result_metrics() {
    // Metrics must also be populated when a query matches nothing
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    #[cfg(feature = "gql")]
    {
        // No node carries this label, so nothing is scanned
        let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

        assert!(result.execution_time_ms.is_some());
        assert_eq!(result.rows_scanned, Some(0));
    }
}
2103
#[cfg(feature = "cdc")]
mod cdc_integration {
    use super::*;
    use crate::cdc::ChangeKind;
    use grafeo_common::types::{EpochId, Value};

    /// Helper: creates an in-memory database with CDC enabled.
    fn cdc_db() -> GrafeoDB {
        GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
    }

    /// Helper: number of CDC events recorded over the whole epoch range.
    fn recorded_event_count(db: &GrafeoDB) -> usize {
        db.changes_between(EpochId(0), EpochId(u64::MAX))
            .unwrap()
            .len()
    }

    #[test]
    fn test_node_lifecycle_history() {
        let db = cdc_db();

        // Create, update twice, then delete
        let id = db.create_node(&["Person"]);
        db.set_node_property(id, "name", "Alix".into());
        db.set_node_property(id, "name", "Gus".into());
        db.delete_node(id);

        let history = db.history(id).unwrap();
        assert_eq!(history.len(), 4); // create + 2 updates + delete

        let expected_kinds = [
            ChangeKind::Create,
            ChangeKind::Update,
            ChangeKind::Update,
            ChangeKind::Delete,
        ];
        for (position, expected) in expected_kinds.into_iter().enumerate() {
            assert_eq!(history[position].kind, expected);
        }

        // The first set_node_property has no prior value; the second one
        // overwrites "Alix", so it carries a `before` image.
        assert!(history[1].before.is_none());
        assert!(history[2].before.is_some());
    }

    #[test]
    fn test_edge_lifecycle_history() {
        let db = cdc_db();

        let alix = db.create_node(&["Person"]);
        let gus = db.create_node(&["Person"]);
        let edge = db.create_edge(alix, gus, "KNOWS");
        db.set_edge_property(edge, "since", 2024i64.into());
        db.delete_edge(edge);

        let history = db.history(edge).unwrap();
        assert_eq!(history.len(), 3); // create + update + delete

        let expected_kinds = [ChangeKind::Create, ChangeKind::Update, ChangeKind::Delete];
        for (position, expected) in expected_kinds.into_iter().enumerate() {
            assert_eq!(history[position].kind, expected);
        }
    }

    #[test]
    fn test_create_node_with_props_cdc() {
        let db = cdc_db();

        let id = db.create_node_with_props(
            &["Person"],
            vec![("name", Value::from("Alix")), ("age", Value::from(30i64))],
        );

        let history = db.history(id).unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].kind, ChangeKind::Create);

        // Both initial properties are captured in the `after` image
        let after = history[0].after.as_ref().unwrap();
        assert_eq!(after.len(), 2);
    }

    #[test]
    fn test_changes_between() {
        let db = cdc_db();

        let id1 = db.create_node(&["A"]);
        let _id2 = db.create_node(&["B"]);
        db.set_node_property(id1, "x", 1i64.into());

        // All events should be at the same epoch (in-memory, epoch doesn't
        // advance without tx), so the full range must cover them all.
        assert_eq!(recorded_event_count(&db), 3); // 2 creates + 1 update
    }

    #[test]
    fn test_cdc_disabled_by_default() {
        let db = GrafeoDB::new_in_memory();
        assert!(!db.is_cdc_enabled());

        let id = db.create_node(&["Person"]);
        db.set_node_property(id, "name", "Alix".into());

        let history = db.history(id).unwrap();
        assert!(history.is_empty(), "CDC off by default: no events recorded");
    }

    // Guarded on `gql` because it runs a query through `session.execute`,
    // like the other query-executing tests in this file.
    #[cfg(feature = "gql")]
    #[test]
    fn test_session_with_cdc_override_on() {
        // Database default is off, but session opts in
        let db = GrafeoDB::new_in_memory();
        let session = db.session_with_cdc(true);
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        // The CDC log should have events from the opted-in session
        assert!(
            recorded_event_count(&db) > 0,
            "session_with_cdc(true) should record events"
        );
    }

    // Guarded on `gql` because it runs a query through `session.execute`,
    // like the other query-executing tests in this file.
    #[cfg(feature = "gql")]
    #[test]
    fn test_session_with_cdc_override_off() {
        // Database default is on, but session opts out
        let db = cdc_db();
        let session = db.session_with_cdc(false);
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        assert_eq!(
            recorded_event_count(&db),
            0,
            "session_with_cdc(false) should not record events"
        );
    }

    #[test]
    fn test_set_cdc_enabled_runtime() {
        let db = GrafeoDB::new_in_memory();
        assert!(!db.is_cdc_enabled());

        // Enable at runtime
        db.set_cdc_enabled(true);
        assert!(db.is_cdc_enabled());

        let id = db.create_node(&["Person"]);
        let history = db.history(id).unwrap();
        assert_eq!(history.len(), 1, "CDC enabled at runtime records events");

        // Disable again
        db.set_cdc_enabled(false);
        let id2 = db.create_node(&["Person"]);
        let history2 = db.history(id2).unwrap();
        assert!(
            history2.is_empty(),
            "CDC disabled at runtime stops recording"
        );
    }
}
2262
// Guarded on `gql` because it runs a query through `db.execute`, like the
// other query-executing tests in this file.
#[cfg(feature = "gql")]
#[test]
fn test_with_store_basic() {
    use grafeo_core::graph::lpg::LpgStore;

    // Seed one person directly on the store before handing it to the DB
    let store = Arc::new(LpgStore::new().unwrap());
    let n1 = store.create_node(&["Person"]);
    store.set_node_property(n1, "name", "Alix".into());

    let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

    // The database must see the node that was seeded on the store
    let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(result.rows.len(), 1);
}
2277
// Guarded on `gql` because it runs a query through `session.execute`, like
// the other query-executing tests in this file.
#[cfg(feature = "gql")]
#[test]
fn test_with_store_session() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

    // An aggregate over the empty store still yields exactly one row
    let session = db.session();
    let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
    assert_eq!(result.rows.len(), 1);
}
2290
// Guarded on `gql` because it runs queries through `session.execute`, like
// the other query-executing tests in this file.
#[cfg(feature = "gql")]
#[test]
fn test_with_store_mutations() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

    let mut session = db.session();

    // Use an explicit transaction so INSERT and MATCH share the same
    // transaction context. With PENDING epochs, uncommitted versions are
    // only visible to the owning transaction.
    session.begin_transaction().unwrap();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(result.rows.len(), 1);

    session.commit().unwrap();
}
2307        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
2308        assert_eq!(result.rows.len(), 1);
2309
2310        session.commit().unwrap();
2311    }
2312}