Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(all(feature = "compact-store", feature = "mmap"))]
31pub mod compact_tiered;
32#[cfg(feature = "lpg")]
33mod crud;
34#[cfg(feature = "embed")]
35mod embed;
36#[cfg(feature = "grafeo-file")]
37pub(crate) mod flush;
38#[cfg(feature = "lpg")]
39mod import;
40#[cfg(feature = "lpg")]
41mod index;
42#[cfg(feature = "lpg")]
43mod persistence;
44mod query;
45#[cfg(feature = "triple-store")]
46mod rdf_ops;
47#[cfg(feature = "lpg")]
48mod search;
49pub(crate) mod section_consumer;
50#[cfg(all(feature = "wal", feature = "lpg"))]
51pub(crate) mod wal_store;
52
53use grafeo_common::{grafeo_error, grafeo_warn};
54#[cfg(feature = "wal")]
55use std::path::Path;
56use std::sync::Arc;
57use std::sync::atomic::AtomicUsize;
58
59use parking_lot::RwLock;
60
61use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
62use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
63#[cfg(feature = "lpg")]
64use grafeo_core::graph::lpg::LpgStore;
65#[cfg(feature = "triple-store")]
66use grafeo_core::graph::rdf::RdfStore;
67use grafeo_core::graph::{GraphStoreMut, GraphStoreSearch};
68#[cfg(feature = "grafeo-file")]
69use grafeo_storage::file::GrafeoFileManager;
70#[cfg(all(feature = "wal", feature = "lpg"))]
71use grafeo_storage::wal::WalRecovery;
72#[cfg(feature = "wal")]
73use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
74
75use crate::catalog::Catalog;
76use crate::config::Config;
77use crate::query::cache::QueryCache;
78use crate::session::Session;
79use crate::transaction::TransactionManager;
80
81/// Your handle to a Grafeo database.
82///
83/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
84/// quick experiments, or [`open()`](Self::open) for persistent storage.
85/// Then grab a [`session()`](Self::session) to start querying.
86///
87/// # Examples
88///
89/// ```
90/// use grafeo_engine::GrafeoDB;
91///
92/// // Quick in-memory database
93/// let db = GrafeoDB::new_in_memory();
94///
95/// // Add some data
96/// db.create_node(&["Person"]);
97///
98/// // Query it
99/// let session = db.session();
100/// let result = session.execute("MATCH (p:Person) RETURN p")?;
101/// # Ok::<(), grafeo_common::utils::error::Error>(())
102/// ```
103pub struct GrafeoDB {
104    /// Database configuration.
105    pub(super) config: Config,
106    /// The underlying graph store (None when using an external store).
107    #[cfg(feature = "lpg")]
108    pub(super) store: Option<Arc<LpgStore>>,
109    /// Schema and metadata catalog shared across sessions.
110    pub(super) catalog: Arc<Catalog>,
111    /// RDF triple store (if RDF feature is enabled).
112    #[cfg(feature = "triple-store")]
113    pub(super) rdf_store: Arc<RdfStore>,
114    /// Transaction manager.
115    pub(super) transaction_manager: Arc<TransactionManager>,
116    /// Unified buffer manager.
117    pub(super) buffer_manager: Arc<BufferManager>,
118    /// Write-ahead log manager (if durability is enabled).
119    #[cfg(feature = "wal")]
120    pub(super) wal: Option<Arc<LpgWal>>,
121    /// Shared WAL graph context tracker. Tracks which named graph was last
122    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
123    /// records only when the context actually changes.
124    #[cfg(feature = "wal")]
125    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
126    /// Query cache for parsed and optimized plans.
127    pub(super) query_cache: Arc<QueryCache>,
128    /// Shared commit counter for auto-GC across sessions.
129    pub(super) commit_counter: Arc<AtomicUsize>,
130    /// Whether the database is open.
131    pub(super) is_open: RwLock<bool>,
132    /// Change data capture log for tracking mutations.
133    #[cfg(feature = "cdc")]
134    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
135    /// Whether CDC is active for new sessions and direct CRUD (runtime-mutable).
136    #[cfg(feature = "cdc")]
137    cdc_enabled: std::sync::atomic::AtomicBool,
138    /// Registered embedding models for text-to-vector conversion.
139    #[cfg(feature = "embed")]
140    pub(super) embedding_models:
141        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
142    /// Single-file database manager (when using `.grafeo` format).
143    #[cfg(feature = "grafeo-file")]
144    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
145    /// Periodic checkpoint timer (when `checkpoint_interval` is configured).
146    /// Wrapped in Mutex because `close()` takes `&self` but needs to stop the timer.
147    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
148    checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
149    /// Shared registry of spilled vector storages.
150    /// Used by the search path to create `SpillableVectorAccessor` instances.
151    #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
152    vector_spill_storages: Option<
153        Arc<
154            parking_lot::RwLock<
155                std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
156            >,
157        >,
158    >,
159    /// External read-only graph store (when using with_store() or with_read_store()).
160    /// When set, sessions route queries through this store instead of the built-in LpgStore.
161    pub(super) external_read_store: Option<Arc<dyn GraphStoreSearch>>,
162    /// External writable graph store (when using with_store()).
163    /// None for read-only databases created via with_read_store().
164    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
165    /// Metrics registry shared across all sessions.
166    #[cfg(feature = "metrics")]
167    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
168    /// Persistent graph context for one-shot `execute()` calls.
169    /// When set, each call to `session()` pre-configures the session to this graph.
170    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
171    current_graph: RwLock<Option<String>>,
172    /// Persistent schema context for one-shot `execute()` calls.
173    /// When set, each call to `session()` pre-configures the session to this schema.
174    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
175    current_schema: RwLock<Option<String>>,
176    /// Whether this database is open in read-only mode.
177    /// When true, sessions automatically enforce read-only transactions.
178    read_only: bool,
179    /// Named graph projections (virtual subgraphs), shared with sessions.
180    projections:
181        Arc<RwLock<std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>>>,
182    /// Layered store (compact base + mutable overlay), set after `compact()`.
183    #[cfg(all(feature = "compact-store", feature = "lpg"))]
184    layered_store: Option<Arc<grafeo_core::graph::compact::layered::LayeredStore>>,
185    /// Disk-backed tier wrapper for the compact base, set after `compact()`.
186    ///
187    /// Provides the spill path for [`CompactStoreConsumer`]: when the buffer
188    /// manager signals memory pressure, the consumer calls
189    /// `persist_to_mmap()` here and then publishes the fresh base to
190    /// `layered_store` via `swap_base()`.
191    #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
192    compact_tiered: Option<Arc<compact_tiered::CompactStoreTiered>>,
193}
194
195impl GrafeoDB {
196    /// Returns a reference to the built-in LPG store.
197    ///
198    /// # Panics
199    ///
200    /// Panics if the database was created with [`with_store()`](Self::with_store) or
201    /// [`with_read_store()`](Self::with_read_store), which use an external store
202    /// instead of the built-in LPG store.
203    #[cfg(feature = "lpg")]
204    fn lpg_store(&self) -> &Arc<LpgStore> {
205        self.store.as_ref().expect(
206            "no built-in LpgStore: this GrafeoDB was created with an external store \
207             (with_store / with_read_store). Use session() or graph_store() instead.",
208        )
209    }
210
211    /// Returns a borrowed reference to the active graph store.
212    ///
213    /// In layered mode (after [`compact()`](Self::compact)), returns the
214    /// `LayeredStore` which merges the columnar base with the overlay.
215    /// Otherwise, returns the built-in `LpgStore`.
216    ///
217    /// Unlike [`graph_store()`](Self::graph_store) (which clones an `Arc`),
218    /// this borrows from `self` — suitable for constructing accessors that
219    /// need `&'a dyn GraphStore` tied to the database lifetime.
220    #[cfg(any(
221        feature = "vector-index",
222        feature = "text-index",
223        feature = "hybrid-search",
224        feature = "embed",
225    ))]
226    fn graph_store_ref(&self) -> &dyn grafeo_core::graph::GraphStore {
227        if let Some(ref ext_read) = self.external_read_store {
228            ext_read.as_ref()
229        } else {
230            #[cfg(feature = "lpg")]
231            {
232                &**self.lpg_store()
233            }
234            #[cfg(not(feature = "lpg"))]
235            unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
236        }
237    }
238
239    /// Returns whether CDC is active (runtime check).
240    #[cfg(feature = "cdc")]
241    #[inline]
242    pub(super) fn cdc_active(&self) -> bool {
243        self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
244    }
245
246    /// Creates an in-memory database, fast to create, gone when dropped.
247    ///
248    /// Use this for tests, experiments, or when you don't need persistence.
249    /// For data that survives restarts, use [`open()`](Self::open) instead.
250    ///
251    /// # Panics
252    ///
253    /// Panics if the internal arena allocator cannot be initialized (out of memory).
254    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
255    ///
256    /// # Examples
257    ///
258    /// ```
259    /// use grafeo_engine::GrafeoDB;
260    ///
261    /// let db = GrafeoDB::new_in_memory();
262    /// let session = db.session();
263    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
264    /// # Ok::<(), grafeo_common::utils::error::Error>(())
265    /// ```
266    #[must_use]
267    pub fn new_in_memory() -> Self {
268        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
269    }
270
271    /// Opens a database at the given path, creating it if it doesn't exist.
272    ///
273    /// If you've used this path before, Grafeo recovers your data from the
274    /// write-ahead log automatically. First open on a new path creates an
275    /// empty database.
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if the path isn't writable or recovery fails.
280    ///
281    /// # Examples
282    ///
283    /// ```no_run
284    /// use grafeo_engine::GrafeoDB;
285    ///
286    /// let db = GrafeoDB::open("./my_social_network")?;
287    /// # Ok::<(), grafeo_common::utils::error::Error>(())
288    /// ```
289    #[cfg(feature = "wal")]
290    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
291        Self::with_config(Config::persistent(path.as_ref()))
292    }
293
294    /// Opens an existing database in read-only mode.
295    ///
296    /// Uses a shared file lock, so multiple processes can read the same
297    /// `.grafeo` file concurrently. The database loads the last checkpoint
298    /// snapshot but does **not** replay the WAL or allow mutations.
299    ///
300    /// Currently only supports the single-file (`.grafeo`) format.
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the file doesn't exist or can't be read.
305    ///
306    /// # Examples
307    ///
308    /// ```no_run
309    /// use grafeo_engine::GrafeoDB;
310    ///
311    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
312    /// let session = db.session();
313    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
314    /// // Mutations will return an error:
315    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
316    /// # Ok::<(), grafeo_common::utils::error::Error>(())
317    /// ```
318    #[cfg(feature = "grafeo-file")]
319    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
320        Self::with_config(Config::read_only(path.as_ref()))
321    }
322
323    /// Creates a database with custom configuration.
324    ///
325    /// Use this when you need fine-grained control over memory limits,
326    /// thread counts, or persistence settings. For most cases,
327    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
328    /// are simpler.
329    ///
330    /// # Errors
331    ///
332    /// Returns an error if the database can't be created or recovery fails.
333    ///
334    /// # Examples
335    ///
336    /// ```
337    /// use grafeo_engine::{GrafeoDB, Config};
338    ///
339    /// // In-memory with a 512MB limit
340    /// let config = Config::in_memory()
341    ///     .with_memory_limit(512 * 1024 * 1024);
342    ///
343    /// let db = GrafeoDB::with_config(config)?;
344    /// # Ok::<(), grafeo_common::utils::error::Error>(())
345    /// ```
346    pub fn with_config(config: Config) -> Result<Self> {
347        // Validate configuration before proceeding
348        config
349            .validate()
350            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
351
352        #[cfg(feature = "lpg")]
353        let store = Arc::new(LpgStore::new()?);
354        #[cfg(feature = "triple-store")]
355        let rdf_store = Arc::new(RdfStore::new());
356        let transaction_manager = Arc::new(TransactionManager::new());
357
358        // Create buffer manager with configured limits
359        let buffer_config = BufferManagerConfig {
360            budget: config.memory_limit.unwrap_or_else(|| {
361                // reason: product of system RAM and 0.75 is always a valid positive usize
362                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
363                let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
364                b
365            }),
366            spill_path: config.spill_path.clone().or_else(|| {
367                config.path.as_ref().and_then(|p| {
368                    let parent = p.parent()?;
369                    let name = p.file_name()?.to_str()?;
370                    Some(parent.join(format!("{name}.spill")))
371                })
372            }),
373            ..BufferManagerConfig::default()
374        };
375        let buffer_manager = BufferManager::new(buffer_config);
376
377        // Create catalog early so WAL replay can restore schema definitions
378        let catalog = Arc::new(Catalog::new());
379
380        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
381
382        // --- Single-file format (.grafeo) ---
383        #[cfg(feature = "grafeo-file")]
384        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
385            // Read-only mode: open with shared lock, load snapshot, skip WAL
386            if let Some(ref db_path) = config.path {
387                if db_path.exists() && db_path.is_file() {
388                    let fm = GrafeoFileManager::open_read_only(db_path)?;
389                    // Try v2 section-based format first
390                    #[cfg(feature = "lpg")]
391                    if fm.read_section_directory()?.is_some() {
392                        Self::load_from_sections(
393                            &fm,
394                            &store,
395                            &catalog,
396                            #[cfg(feature = "triple-store")]
397                            &rdf_store,
398                        )?;
399                    } else {
400                        // Fall back to v1 blob format
401                        let snapshot_data = fm.read_snapshot()?;
402                        if !snapshot_data.is_empty() {
403                            Self::apply_snapshot_data(
404                                &store,
405                                &catalog,
406                                #[cfg(feature = "triple-store")]
407                                &rdf_store,
408                                &snapshot_data,
409                            )?;
410                        }
411                    }
412                    Some(Arc::new(fm))
413                } else {
414                    return Err(grafeo_common::utils::error::Error::Internal(format!(
415                        "read-only open requires an existing .grafeo file: {}",
416                        db_path.display()
417                    )));
418                }
419            } else {
420                return Err(grafeo_common::utils::error::Error::Internal(
421                    "read-only mode requires a database path".to_string(),
422                ));
423            }
424        } else if let Some(ref db_path) = config.path {
425            // Initialize the file manager whenever single-file format is selected,
426            // regardless of whether WAL is enabled. Without this, a database opened
427            // with wal_enabled:false + StorageFormat::SingleFile would produce no
428            // output at all (the file manager was previously gated behind wal_enabled).
429            if Self::should_use_single_file(db_path, config.storage_format) {
430                let fm = if db_path.exists() && db_path.is_file() {
431                    GrafeoFileManager::open(db_path)?
432                } else if !db_path.exists() {
433                    GrafeoFileManager::create(db_path)?
434                } else {
435                    // Path exists but is not a file (directory, etc.)
436                    return Err(grafeo_common::utils::error::Error::Internal(format!(
437                        "path exists but is not a file: {}",
438                        db_path.display()
439                    )));
440                };
441
442                // Load data: try v2 section-based format, fall back to v1 blob
443                #[cfg(feature = "lpg")]
444                if fm.read_section_directory()?.is_some() {
445                    Self::load_from_sections(
446                        &fm,
447                        &store,
448                        &catalog,
449                        #[cfg(feature = "triple-store")]
450                        &rdf_store,
451                    )?;
452                } else {
453                    let snapshot_data = fm.read_snapshot()?;
454                    if !snapshot_data.is_empty() {
455                        Self::apply_snapshot_data(
456                            &store,
457                            &catalog,
458                            #[cfg(feature = "triple-store")]
459                            &rdf_store,
460                            &snapshot_data,
461                        )?;
462                    }
463                }
464
465                // Recover sidecar WAL if WAL is enabled and a sidecar exists
466                #[cfg(all(feature = "wal", feature = "lpg"))]
467                if config.wal_enabled && fm.has_sidecar_wal() {
468                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
469                    let records = recovery.recover()?;
470                    Self::apply_wal_records(
471                        &store,
472                        &catalog,
473                        #[cfg(feature = "triple-store")]
474                        &rdf_store,
475                        &records,
476                    )?;
477                }
478
479                Some(Arc::new(fm))
480            } else {
481                None
482            }
483        } else {
484            None
485        };
486
487        // Determine whether to use the WAL directory path (legacy) or sidecar
488        // Read-only mode skips WAL entirely (no recovery, no creation).
489        #[cfg(feature = "wal")]
490        let wal = if is_read_only {
491            None
492        } else if config.wal_enabled {
493            if let Some(ref db_path) = config.path {
494                // When using single-file format, the WAL is a sidecar directory
495                #[cfg(feature = "grafeo-file")]
496                let wal_path = if let Some(ref fm) = file_manager {
497                    let p = fm.sidecar_wal_path();
498                    std::fs::create_dir_all(&p)?;
499                    p
500                } else {
501                    // Legacy: WAL inside the database directory
502                    std::fs::create_dir_all(db_path)?;
503                    db_path.join("wal")
504                };
505
506                #[cfg(not(feature = "grafeo-file"))]
507                let wal_path = {
508                    std::fs::create_dir_all(db_path)?;
509                    db_path.join("wal")
510                };
511
512                // For legacy WAL directory format, check if WAL exists and recover
513                #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
514                let is_single_file = file_manager.is_some();
515                #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
516                let is_single_file = false;
517
518                #[cfg(feature = "lpg")]
519                if !is_single_file && wal_path.exists() {
520                    let recovery = WalRecovery::new(&wal_path);
521                    let records = recovery.recover()?;
522                    Self::apply_wal_records(
523                        &store,
524                        &catalog,
525                        #[cfg(feature = "triple-store")]
526                        &rdf_store,
527                        &records,
528                    )?;
529                }
530
531                // Open/create WAL manager with configured durability
532                let wal_durability = match config.wal_durability {
533                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
534                    crate::config::DurabilityMode::Batch {
535                        max_delay_ms,
536                        max_records,
537                    } => WalDurabilityMode::Batch {
538                        max_delay_ms,
539                        max_records,
540                    },
541                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
542                        WalDurabilityMode::Adaptive { target_interval_ms }
543                    }
544                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
545                };
546                let wal_config = WalConfig {
547                    durability: wal_durability,
548                    ..WalConfig::default()
549                };
550                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
551                Some(Arc::new(wal_manager))
552            } else {
553                None
554            }
555        } else {
556            None
557        };
558
559        // Create query cache with default capacity (1000 queries)
560        let query_cache = Arc::new(QueryCache::default());
561
562        // After all snapshot/WAL recovery, sync TransactionManager epoch
563        // with the store so queries use the correct viewing epoch.
564        #[cfg(all(feature = "temporal", feature = "lpg"))]
565        transaction_manager.sync_epoch(store.current_epoch());
566
567        #[cfg(feature = "cdc")]
568        let cdc_enabled_val = config.cdc_enabled;
569        #[cfg(feature = "cdc")]
570        let cdc_retention = config.cdc_retention.clone();
571
572        // Clone Arcs for the checkpoint timer before moving originals into the struct.
573        // The timer captures its own references and runs in a background thread.
574        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
575        let checkpoint_interval = config.checkpoint_interval;
576        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
577        let timer_store = Arc::clone(&store);
578        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
579        let timer_catalog = Arc::clone(&catalog);
580        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
581        let timer_tm = Arc::clone(&transaction_manager);
582        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
583        let timer_rdf = Arc::clone(&rdf_store);
584        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
585        let timer_wal = wal.clone();
586
587        let mut db = Self {
588            config,
589            #[cfg(feature = "lpg")]
590            store: Some(store),
591            catalog,
592            #[cfg(feature = "triple-store")]
593            rdf_store,
594            transaction_manager,
595            buffer_manager,
596            #[cfg(feature = "wal")]
597            wal,
598            #[cfg(feature = "wal")]
599            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
600            query_cache,
601            commit_counter: Arc::new(AtomicUsize::new(0)),
602            is_open: RwLock::new(true),
603            #[cfg(feature = "cdc")]
604            cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
605            #[cfg(feature = "cdc")]
606            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
607            #[cfg(feature = "embed")]
608            embedding_models: RwLock::new(hashbrown::HashMap::new()),
609            #[cfg(feature = "grafeo-file")]
610            file_manager,
611            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
612            checkpoint_timer: parking_lot::Mutex::new(None),
613            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
614            vector_spill_storages: None,
615            external_read_store: None,
616            external_write_store: None,
617            #[cfg(feature = "metrics")]
618            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
619            current_graph: RwLock::new(None),
620            current_schema: RwLock::new(None),
621            read_only: is_read_only,
622            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
623            #[cfg(all(feature = "compact-store", feature = "lpg"))]
624            layered_store: None,
625            #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
626            compact_tiered: None,
627        };
628
629        // Register storage sections as memory consumers for pressure tracking
630        db.register_section_consumers();
631
632        // Start periodic checkpoint timer if configured
633        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
634        if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
635            && !is_read_only
636        {
637            *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
638                interval,
639                Arc::clone(fm),
640                timer_store,
641                timer_catalog,
642                timer_tm,
643                #[cfg(feature = "triple-store")]
644                timer_rdf,
645                #[cfg(feature = "wal")]
646                timer_wal,
647            ));
648        }
649
650        // Discover existing spill files from a previous session.
651        // If vectors were spilled before close, the spill files persist on disk
652        // and need to be re-mapped so search can read from them.
653        #[cfg(all(
654            feature = "lpg",
655            feature = "vector-index",
656            feature = "mmap",
657            not(feature = "temporal")
658        ))]
659        db.restore_spill_files();
660
661        // If VectorStore is configured as ForceDisk, immediately spill embeddings.
662        // This must happen after register_section_consumers() which creates the consumer.
663        #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
664        if db
665            .config
666            .section_configs
667            .get(&grafeo_common::storage::SectionType::VectorStore)
668            .is_some_and(|c| c.tier == grafeo_common::storage::TierOverride::ForceDisk)
669        {
670            db.buffer_manager.spill_all();
671        }
672
673        Ok(db)
674    }
675
676    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
677    ///
678    /// The external store handles all data persistence. WAL, CDC, and index
679    /// management are the responsibility of the store implementation.
680    ///
681    /// Query execution (all 6 languages, optimizer, planner) works through the
682    /// provided store. Admin operations (schema introspection, persistence,
683    /// vector/text indexes) are not available on external stores.
684    ///
685    /// # Examples
686    ///
687    /// ```no_run
688    /// use std::sync::Arc;
689    /// use grafeo_engine::{GrafeoDB, Config};
690    /// use grafeo_core::graph::GraphStoreMut;
691    ///
692    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
693    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
694    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
695    ///     Ok(())
696    /// }
697    /// ```
698    ///
699    /// # Errors
700    ///
701    /// Returns an error if config validation fails.
702    ///
703    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
704    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
705        config
706            .validate()
707            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
708
709        let transaction_manager = Arc::new(TransactionManager::new());
710
711        let buffer_config = BufferManagerConfig {
712            budget: config.memory_limit.unwrap_or_else(|| {
713                // reason: product of system RAM and 0.75 is always a valid positive usize
714                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
715                let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
716                b
717            }),
718            spill_path: None,
719            ..BufferManagerConfig::default()
720        };
721        let buffer_manager = BufferManager::new(buffer_config);
722
723        let query_cache = Arc::new(QueryCache::default());
724
725        #[cfg(feature = "cdc")]
726        let cdc_enabled_val = config.cdc_enabled;
727
728        Ok(Self {
729            config,
730            #[cfg(feature = "lpg")]
731            store: None,
732            catalog: Arc::new(Catalog::new()),
733            #[cfg(feature = "triple-store")]
734            rdf_store: Arc::new(RdfStore::new()),
735            transaction_manager,
736            buffer_manager,
737            #[cfg(feature = "wal")]
738            wal: None,
739            #[cfg(feature = "wal")]
740            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
741            query_cache,
742            commit_counter: Arc::new(AtomicUsize::new(0)),
743            is_open: RwLock::new(true),
744            #[cfg(feature = "cdc")]
745            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
746            #[cfg(feature = "cdc")]
747            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
748            #[cfg(feature = "embed")]
749            embedding_models: RwLock::new(hashbrown::HashMap::new()),
750            #[cfg(feature = "grafeo-file")]
751            file_manager: None,
752            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
753            checkpoint_timer: parking_lot::Mutex::new(None),
754            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
755            vector_spill_storages: None,
756            external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStoreSearch>),
757            external_write_store: Some(store),
758            #[cfg(feature = "metrics")]
759            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
760            current_graph: RwLock::new(None),
761            current_schema: RwLock::new(None),
762            read_only: false,
763            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
764            #[cfg(all(feature = "compact-store", feature = "lpg"))]
765            layered_store: None,
766            #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
767            compact_tiered: None,
768        })
769    }
770
771    /// Creates a database backed by a read-only [`GraphStore`].
772    ///
773    /// The database is set to read-only mode. Write queries (CREATE, SET,
774    /// DELETE) will return `TransactionError::ReadOnly`.
775    ///
776    /// # Examples
777    ///
778    /// ```no_run
779    /// use std::sync::Arc;
780    /// use grafeo_engine::{GrafeoDB, Config};
781    /// use grafeo_core::graph::GraphStoreSearch;
782    ///
783    /// fn example(store: Arc<dyn GraphStoreSearch>) -> grafeo_common::utils::error::Result<()> {
784    ///     let db = GrafeoDB::with_read_store(store, Config::in_memory())?;
785    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
786    ///     Ok(())
787    /// }
788    /// ```
789    ///
790    /// # Errors
791    ///
792    /// Returns an error if config validation fails.
793    ///
794    /// [`GraphStore`]: grafeo_core::graph::GraphStore
795    pub fn with_read_store(store: Arc<dyn GraphStoreSearch>, config: Config) -> Result<Self> {
796        config
797            .validate()
798            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
799
800        let transaction_manager = Arc::new(TransactionManager::new());
801
802        let buffer_config = BufferManagerConfig {
803            budget: config.memory_limit.unwrap_or_else(|| {
804                // reason: product of system RAM and 0.75 is always a valid positive usize
805                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
806                let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
807                b
808            }),
809            spill_path: None,
810            ..BufferManagerConfig::default()
811        };
812        let buffer_manager = BufferManager::new(buffer_config);
813
814        let query_cache = Arc::new(QueryCache::default());
815
816        #[cfg(feature = "cdc")]
817        let cdc_enabled_val = config.cdc_enabled;
818
819        Ok(Self {
820            config,
821            #[cfg(feature = "lpg")]
822            store: None,
823            catalog: Arc::new(Catalog::new()),
824            #[cfg(feature = "triple-store")]
825            rdf_store: Arc::new(RdfStore::new()),
826            transaction_manager,
827            buffer_manager,
828            #[cfg(feature = "wal")]
829            wal: None,
830            #[cfg(feature = "wal")]
831            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
832            query_cache,
833            commit_counter: Arc::new(AtomicUsize::new(0)),
834            is_open: RwLock::new(true),
835            #[cfg(feature = "cdc")]
836            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
837            #[cfg(feature = "cdc")]
838            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
839            #[cfg(feature = "embed")]
840            embedding_models: RwLock::new(hashbrown::HashMap::new()),
841            #[cfg(feature = "grafeo-file")]
842            file_manager: None,
843            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
844            checkpoint_timer: parking_lot::Mutex::new(None),
845            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
846            vector_spill_storages: None,
847            external_read_store: Some(store),
848            external_write_store: None,
849            #[cfg(feature = "metrics")]
850            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
851            current_graph: RwLock::new(None),
852            current_schema: RwLock::new(None),
853            read_only: true,
854            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
855            #[cfg(all(feature = "compact-store", feature = "lpg"))]
856            layered_store: None,
857            #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
858            compact_tiered: None,
859        })
860    }
861
862    /// Compacts the database into a two-layer store: columnar base + mutable overlay.
863    ///
864    /// Takes a snapshot of all nodes and edges from the current store, builds
865    /// a columnar `CompactStore` with CSR adjacency as the read-only base,
866    /// and creates a fresh `LpgStore` overlay for future mutations. The
867    /// original store is dropped to free memory.
868    ///
869    /// Unlike the pre-0.5.39 behavior, the database remains writable after
870    /// compaction: new writes go to the overlay. Call [`recompact()`](Self::recompact)
871    /// to merge the overlay back into the columnar base periodically.
872    ///
873    /// # Errors
874    ///
875    /// Returns an error if the conversion fails (e.g. more than 32,767
876    /// distinct labels or edge types).
877    ///
878    /// [`CompactStore`]: grafeo_core::graph::compact::CompactStore
879    #[cfg(all(feature = "compact-store", feature = "lpg"))]
880    pub fn compact(&mut self) -> Result<()> {
881        use grafeo_core::graph::compact::from_graph_store_preserving_ids;
882        use grafeo_core::graph::compact::layered::LayeredStore;
883
884        let current_store = self.graph_store();
885
886        // Determine max node/edge IDs for overlay seeding.
887        let max_node_id = if let Some(ref store) = self.store {
888            store.next_node_id().saturating_sub(1)
889        } else {
890            current_store.node_ids().last().map_or(0, |id| id.as_u64())
891        };
892        let max_edge_id = if let Some(ref store) = self.store {
893            store.next_edge_id().saturating_sub(1)
894        } else {
895            // Scan edges to find max ID.
896            let mut max_eid = 0u64;
897            for nid in current_store.node_ids() {
898                for (_, eid) in
899                    current_store.edges_from(nid, grafeo_core::graph::Direction::Outgoing)
900                {
901                    max_eid = max_eid.max(eid.as_u64());
902                }
903            }
904            max_eid
905        };
906
907        let compact = from_graph_store_preserving_ids(current_store.as_ref())
908            .map_err(|e| Error::Internal(e.to_string()))?;
909
910        let layered = Arc::new(
911            LayeredStore::new(compact, max_node_id, max_edge_id)
912                .map_err(|e| Error::Internal(e.to_string()))?,
913        );
914
915        // Sync the overlay's epoch with the TransactionManager so MVCC
916        // visibility works correctly for nodes created after compact().
917        let current_epoch = self.transaction_manager.current_epoch();
918        layered.overlay_store().sync_epoch(current_epoch);
919
920        // Named graphs are LPG-specific and outside the columnar base; move them
921        // from the pre-compact overlay into the new overlay so they survive
922        // compaction.
923        if let Some(ref old) = self.store {
924            layered
925                .overlay_store()
926                .install_named_graphs(old.take_named_graphs());
927        }
928
929        self.external_read_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreSearch>);
930        self.external_write_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreMut>);
931        self.store = Some(Arc::clone(layered.overlay_store()));
932
933        // Install the disk-backed tier wrapper and register its memory
934        // consumer so the BufferManager can spill the base to mmap under
935        // memory pressure.
936        #[cfg(feature = "mmap")]
937        {
938            let tiered = Arc::new(compact_tiered::CompactStoreTiered::new_in_memory(
939                layered.base_store_arc(),
940            ));
941            let spill_path = self.buffer_manager.config().spill_path.clone();
942            let consumer = Arc::new(section_consumer::CompactStoreConsumer::new(
943                &tiered, &layered, spill_path,
944            ));
945            self.buffer_manager.register_consumer(consumer);
946            self.compact_tiered = Some(tiered);
947        }
948
949        self.layered_store = Some(layered);
950        self.read_only = false;
951        self.query_cache = Arc::new(QueryCache::default());
952        self.projections.write().clear();
953
954        Ok(())
955    }
956
957    /// Merges the overlay back into the columnar base.
958    ///
959    /// Reads the combined view (base + overlay), builds a fresh `CompactStore`,
960    /// and replaces the `LayeredStore` with one backed by the merged base and
961    /// an empty overlay.
962    ///
963    /// # Errors
964    ///
965    /// Returns an error if the database was not previously compacted, or if
966    /// the merge fails.
967    #[cfg(all(feature = "compact-store", feature = "lpg"))]
968    pub fn recompact(&mut self) -> Result<()> {
969        use grafeo_core::graph::compact::from_graph_store_preserving_ids;
970        use grafeo_core::graph::compact::layered::LayeredStore;
971
972        let layered = self
973            .layered_store
974            .as_ref()
975            .ok_or_else(|| Error::Internal("recompact() requires a prior compact()".into()))?;
976
977        // Read the combined view.
978        let combined: Arc<dyn GraphStoreSearch> = Arc::clone(layered) as Arc<dyn GraphStoreSearch>;
979
980        // Max IDs from the overlay's allocators.
981        let max_node_id = layered.overlay_store().next_node_id().saturating_sub(1);
982        let max_edge_id = layered.overlay_store().next_edge_id().saturating_sub(1);
983
984        let fresh_compact = from_graph_store_preserving_ids(combined.as_ref())
985            .map_err(|e| Error::Internal(e.to_string()))?;
986
987        let new_layered = Arc::new(
988            LayeredStore::new(fresh_compact, max_node_id, max_edge_id)
989                .map_err(|e| Error::Internal(e.to_string()))?,
990        );
991
992        // Sync overlay epoch.
993        let current_epoch = self.transaction_manager.current_epoch();
994        new_layered.overlay_store().sync_epoch(current_epoch);
995
996        // Carry named graphs forward: the old overlay is about to be dropped.
997        new_layered
998            .overlay_store()
999            .install_named_graphs(layered.overlay_store().take_named_graphs());
1000
1001        self.external_read_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreSearch>);
1002        self.external_write_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreMut>);
1003        self.store = Some(Arc::clone(new_layered.overlay_store()));
1004
1005        // Replace the tier wrapper: old one's Weak refs will now return None,
1006        // and its consumer (unregistered below) no longer tracks the freshly
1007        // built base.
1008        #[cfg(feature = "mmap")]
1009        {
1010            self.buffer_manager
1011                .unregister_consumer("section:CompactStore");
1012            let tiered = Arc::new(compact_tiered::CompactStoreTiered::new_in_memory(
1013                new_layered.base_store_arc(),
1014            ));
1015            let spill_path = self.buffer_manager.config().spill_path.clone();
1016            let consumer = Arc::new(section_consumer::CompactStoreConsumer::new(
1017                &tiered,
1018                &new_layered,
1019                spill_path,
1020            ));
1021            self.buffer_manager.register_consumer(consumer);
1022            self.compact_tiered = Some(tiered);
1023        }
1024
1025        self.layered_store = Some(new_layered);
1026        self.query_cache = Arc::new(QueryCache::default());
1027
1028        Ok(())
1029    }
1030
    /// Applies WAL records to restore the database state.
    ///
    /// Data mutation records are routed through a graph cursor that tracks
    /// `SwitchGraph` context markers, replaying mutations into the correct
    /// named graph (or the default graph when cursor is `None`).
    ///
    /// Routing of the other record kinds:
    /// - `CreateNamedGraph` / `DropNamedGraph` always act on the root `store`.
    /// - Schema DDL always acts on the root `catalog`, regardless of the cursor.
    /// - RDF records go to `rdf_store` with the `triple-store` feature and are
    ///   skipped without it.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - `graph_or_create` fails for a `SwitchGraph` record, or
    /// - re-creating a node or edge with its logged ID fails.
    ///
    /// The results of catalog DDL calls and of `create_graph` are discarded,
    /// so failures there do not abort replay.
    #[cfg(all(feature = "wal", feature = "lpg"))]
    fn apply_wal_records(
        store: &Arc<LpgStore>,
        catalog: &Catalog,
        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
        records: &[WalRecord],
    ) -> Result<()> {
        use crate::catalog::{
            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
        };
        use grafeo_common::utils::error::Error;

        // Graph cursor: tracks which named graph receives data mutations.
        // `None` means the default graph.
        let mut current_graph: Option<String> = None;
        // Store that data-mutation records are applied to. It is either the
        // root `store` or the graph resolved for `current_graph`.
        let mut target_store: Arc<LpgStore> = Arc::clone(store);

        for record in records {
            match record {
                // --- Named graph lifecycle ---
                WalRecord::CreateNamedGraph { name } => {
                    // Result discarded: a failed creation does not stop replay.
                    let _ = store.create_graph(name);
                }
                WalRecord::DropNamedGraph { name } => {
                    store.drop_graph(name);
                    // Reset cursor if the dropped graph was active
                    if current_graph.as_deref() == Some(name.as_str()) {
                        current_graph = None;
                        target_store = Arc::clone(store);
                    }
                }
                WalRecord::SwitchGraph { name } => {
                    current_graph.clone_from(name);
                    // A named target is resolved via `graph_or_create`; a
                    // failure here aborts the whole replay.
                    target_store = match &current_graph {
                        None => Arc::clone(store),
                        Some(graph_name) => store
                            .graph_or_create(graph_name)
                            .map_err(|e| Error::Internal(e.to_string()))?,
                    };
                }

                // --- Data mutations: routed through target_store ---
                // Creations reuse the logged IDs; their errors are propagated.
                WalRecord::CreateNode { id, labels } => {
                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
                    target_store.create_node_with_id(*id, &label_refs)?;
                }
                WalRecord::DeleteNode { id } => {
                    target_store.delete_node(*id);
                }
                WalRecord::CreateEdge {
                    id,
                    src,
                    dst,
                    edge_type,
                } => {
                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
                }
                WalRecord::DeleteEdge { id } => {
                    target_store.delete_edge(*id);
                }
                WalRecord::SetNodeProperty { id, key, value } => {
                    target_store.set_node_property(*id, key, value.clone());
                }
                WalRecord::SetEdgeProperty { id, key, value } => {
                    target_store.set_edge_property(*id, key, value.clone());
                }
                WalRecord::AddNodeLabel { id, label } => {
                    target_store.add_label(*id, label);
                }
                WalRecord::RemoveNodeLabel { id, label } => {
                    target_store.remove_label(*id, label);
                }
                WalRecord::RemoveNodeProperty { id, key } => {
                    target_store.remove_node_property(*id, key);
                }
                WalRecord::RemoveEdgeProperty { id, key } => {
                    target_store.remove_edge_property(*id, key);
                }

                // --- Schema DDL replay (always on root catalog) ---
                // Catalog results are discarded with `let _`, so a rejected
                // definition does not abort replay.
                WalRecord::CreateNodeType {
                    name,
                    properties,
                    constraints,
                } => {
                    let def = NodeTypeDefinition {
                        name: name.clone(),
                        // Logged as (name, type name, nullable) triples; no
                        // default values are restored.
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        // Unrecognised kinds, and `not_null` with no property,
                        // fall back to a `Unique` constraint over the listed
                        // properties. `not_null` uses only the first property.
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                        parent_types: Vec::new(),
                    };
                    let _ = catalog.register_node_type(def);
                }
                WalRecord::DropNodeType { name } => {
                    let _ = catalog.drop_node_type(name);
                }
                WalRecord::CreateEdgeType {
                    name,
                    properties,
                    constraints,
                } => {
                    // Same property/constraint mapping as `CreateNodeType`;
                    // source/target node-type restrictions are not restored here.
                    let def = EdgeTypeDefinition {
                        name: name.clone(),
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                        source_node_types: Vec::new(),
                        target_node_types: Vec::new(),
                    };
                    let _ = catalog.register_edge_type_def(def);
                }
                WalRecord::DropEdgeType { name } => {
                    let _ = catalog.drop_edge_type_def(name);
                }
                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
                    // Index recreation is handled by the store on startup
                    // (indexes are rebuilt from data, not WAL)
                }
                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
                    // Constraint definitions are part of type definitions
                    // and replayed via CreateNodeType/CreateEdgeType
                }
                WalRecord::CreateGraphType {
                    name,
                    node_types,
                    edge_types,
                    open,
                } => {
                    use crate::catalog::GraphTypeDefinition;
                    let def = GraphTypeDefinition {
                        name: name.clone(),
                        allowed_node_types: node_types.clone(),
                        allowed_edge_types: edge_types.clone(),
                        open: *open,
                    };
                    let _ = catalog.register_graph_type(def);
                }
                WalRecord::DropGraphType { name } => {
                    let _ = catalog.drop_graph_type(name);
                }
                WalRecord::CreateSchema { name } => {
                    let _ = catalog.register_schema_namespace(name.clone());
                }
                WalRecord::DropSchema { name } => {
                    let _ = catalog.drop_schema_namespace(name);
                }

                // ALTER records carry (action, ...) tuples. Only "add" and
                // "drop" (or "add_node"/"drop_node"/"add_edge"/"drop_edge" for
                // graph types) are applied; any other action is ignored.
                WalRecord::AlterNodeType { name, alterations } => {
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_node_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
                            }
                            _ => {}
                        }
                    }
                }
                WalRecord::AlterEdgeType { name, alterations } => {
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_edge_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
                            }
                            _ => {}
                        }
                    }
                }
                WalRecord::AlterGraphType { name, alterations } => {
                    for (action, type_name) in alterations {
                        match action.as_str() {
                            "add_node" => {
                                let _ =
                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
                            }
                            "drop_node" => {
                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
                            }
                            "add_edge" => {
                                let _ =
                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
                            }
                            "drop_edge" => {
                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
                            }
                            _ => {}
                        }
                    }
                }

                WalRecord::CreateProcedure {
                    name,
                    params,
                    returns,
                    body,
                } => {
                    use crate::catalog::ProcedureDefinition;
                    let def = ProcedureDefinition {
                        name: name.clone(),
                        params: params.clone(),
                        returns: returns.clone(),
                        body: body.clone(),
                    };
                    let _ = catalog.register_procedure(def);
                }
                WalRecord::DropProcedure { name } => {
                    let _ = catalog.drop_procedure(name);
                }

                // --- RDF triple replay ---
                #[cfg(feature = "triple-store")]
                WalRecord::InsertRdfTriple { .. }
                | WalRecord::DeleteRdfTriple { .. }
                | WalRecord::ClearRdfGraph { .. }
                | WalRecord::CreateRdfGraph { .. }
                | WalRecord::DropRdfGraph { .. } => {
                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
                }
                // Without the triple store there is nowhere to apply these.
                #[cfg(not(feature = "triple-store"))]
                WalRecord::InsertRdfTriple { .. }
                | WalRecord::DeleteRdfTriple { .. }
                | WalRecord::ClearRdfGraph { .. }
                | WalRecord::CreateRdfGraph { .. }
                | WalRecord::DropRdfGraph { .. } => {}

                WalRecord::TransactionCommit { .. } => {
                    // In temporal mode, advance the store epoch on each committed
                    // transaction so that subsequent property/label operations
                    // are recorded at the correct epoch in their VersionLogs.
                    // NOTE(review): only `target_store` (the graph selected by the
                    // last SwitchGraph) gets a new epoch; confirm other graphs
                    // touched in the same transaction don't need one too.
                    #[cfg(feature = "temporal")]
                    {
                        target_store.new_epoch();
                    }
                }
                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
                    // Transaction control records don't need replay action
                    // (recovery already filtered to only committed transactions)
                }
                WalRecord::EpochAdvance { .. } => {
                    // Metadata record: no store mutation needed.
                    // Used by incremental backup and point-in-time recovery.
                }
            }
        }
        Ok(())
    }
1334
1335    // =========================================================================
1336    // Single-file format helpers
1337    // =========================================================================
1338
1339    /// Returns `true` if the given path should use single-file format.
1340    #[cfg(feature = "grafeo-file")]
1341    fn should_use_single_file(
1342        path: &std::path::Path,
1343        configured: crate::config::StorageFormat,
1344    ) -> bool {
1345        use crate::config::StorageFormat;
1346        match configured {
1347            StorageFormat::SingleFile => true,
1348            StorageFormat::WalDirectory => false,
1349            StorageFormat::Auto => {
1350                // Existing file: check magic bytes
1351                if path.is_file() {
1352                    if let Ok(mut f) = std::fs::File::open(path) {
1353                        use std::io::Read;
1354                        let mut magic = [0u8; 4];
1355                        if f.read_exact(&mut magic).is_ok() && magic == grafeo_storage::file::MAGIC
1356                        {
1357                            return true;
1358                        }
1359                    }
1360                    return false;
1361                }
1362                // Existing directory: legacy format
1363                if path.is_dir() {
1364                    return false;
1365                }
1366                // New path: check extension
1367                path.extension().is_some_and(|ext| ext == "grafeo")
1368            }
1369        }
1370    }
1371
1372    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
1373    ///
1374    /// Supports both v1 (monolithic blob) and v2 (section-based) formats.
1375    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1376    fn apply_snapshot_data(
1377        store: &Arc<LpgStore>,
1378        catalog: &Arc<crate::catalog::Catalog>,
1379        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1380        data: &[u8],
1381    ) -> Result<()> {
1382        // v1 blob format: pass through to legacy loader
1383        persistence::load_snapshot_into_store(
1384            store,
1385            catalog,
1386            #[cfg(feature = "triple-store")]
1387            rdf_store,
1388            data,
1389        )
1390    }
1391
1392    /// Loads from a section-based `.grafeo` file (v2 format).
1393    ///
1394    /// Reads the section directory, then deserializes each section independently.
1395    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1396    fn load_from_sections(
1397        fm: &GrafeoFileManager,
1398        store: &Arc<LpgStore>,
1399        catalog: &Arc<crate::catalog::Catalog>,
1400        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1401    ) -> Result<()> {
1402        use grafeo_common::storage::{Section, SectionType};
1403
1404        let dir = fm.read_section_directory()?.ok_or_else(|| {
1405            grafeo_common::utils::error::Error::Internal(
1406                "expected v2 section directory but found none".to_string(),
1407            )
1408        })?;
1409
1410        // Load catalog section first (schema defs needed before data)
1411        if let Some(entry) = dir.find(SectionType::Catalog) {
1412            let data = fm.read_section_data(entry)?;
1413            let tm = Arc::new(crate::transaction::TransactionManager::new());
1414            let mut section = catalog_section::CatalogSection::new(
1415                Arc::clone(catalog),
1416                Arc::clone(store),
1417                move || tm.current_epoch().as_u64(),
1418            );
1419            section.deserialize(&data)?;
1420        }
1421
1422        // Load LPG store
1423        if let Some(entry) = dir.find(SectionType::LpgStore) {
1424            let data = fm.read_section_data(entry)?;
1425            let mut section = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1426            section.deserialize(&data)?;
1427        }
1428
1429        // Load RDF store
1430        #[cfg(feature = "triple-store")]
1431        if let Some(entry) = dir.find(SectionType::RdfStore) {
1432            let data = fm.read_section_data(entry)?;
1433            let mut section = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
1434            section.deserialize(&data)?;
1435        }
1436
1437        // Restore Ring Index (if persisted)
1438        #[cfg(feature = "ring-index")]
1439        if let Some(entry) = dir.find(SectionType::RdfRing) {
1440            let data = fm.read_section_data(entry)?;
1441            let mut section = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(rdf_store));
1442            section.deserialize(&data)?;
1443        }
1444
1445        // Restore HNSW topology (if vector indexes exist in both catalog and section)
1446        #[cfg(feature = "vector-index")]
1447        if let Some(entry) = dir.find(SectionType::VectorStore) {
1448            let data = fm.read_section_data(entry)?;
1449            let indexes = store.vector_index_entries();
1450            if !indexes.is_empty() {
1451                let mut section = grafeo_core::index::vector::VectorStoreSection::new(indexes);
1452                section.deserialize(&data)?;
1453            }
1454        }
1455
1456        // Restore BM25 postings (if text indexes exist in both catalog and section)
1457        #[cfg(feature = "text-index")]
1458        if let Some(entry) = dir.find(SectionType::TextIndex) {
1459            let data = fm.read_section_data(entry)?;
1460            let indexes = store.text_index_entries();
1461            if !indexes.is_empty() {
1462                let mut section = grafeo_core::index::text::TextIndexSection::new(indexes);
1463                section.deserialize(&data)?;
1464            }
1465        }
1466
1467        Ok(())
1468    }
1469
1470    // =========================================================================
1471    // Session & Configuration
1472    // =========================================================================
1473
1474    /// Opens a new session for running queries.
1475    ///
1476    /// Sessions are cheap to create: spin up as many as you need. Each
1477    /// gets its own transaction context, so concurrent sessions won't
1478    /// block each other on reads.
1479    ///
1480    /// # Panics
1481    ///
1482    /// Panics if the database was configured with an external graph store and
1483    /// the internal arena allocator cannot be initialized (out of memory).
1484    ///
1485    /// # Examples
1486    ///
1487    /// ```
1488    /// use grafeo_engine::GrafeoDB;
1489    ///
1490    /// let db = GrafeoDB::new_in_memory();
1491    /// let session = db.session();
1492    ///
1493    /// // Run queries through the session
1494    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
1495    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1496    /// ```
1497    #[must_use]
1498    pub fn session(&self) -> Session {
1499        self.create_session_inner(None)
1500    }
1501
1502    /// Creates a session scoped to the given identity.
1503    ///
1504    /// The identity determines what operations the session is allowed to
1505    /// perform. A [`Role::ReadOnly`](crate::auth::Role::ReadOnly) identity
1506    /// creates a read-only session; a [`Role::ReadWrite`](crate::auth::Role::ReadWrite)
1507    /// identity allows data mutations but not schema DDL; a
1508    /// [`Role::Admin`](crate::auth::Role::Admin) identity has full access.
1509    ///
1510    /// # Examples
1511    ///
1512    /// ```
1513    /// use grafeo_engine::{GrafeoDB, auth::{Identity, Role}};
1514    ///
1515    /// let db = GrafeoDB::new_in_memory();
1516    /// let identity = Identity::new("app-service", [Role::ReadWrite]);
1517    /// let session = db.session_with_identity(identity);
1518    /// ```
1519    #[must_use]
1520    pub fn session_with_identity(&self, identity: crate::auth::Identity) -> Session {
1521        let force_read_only = !identity.can_write();
1522        self.create_session_inner_full(None, force_read_only, identity)
1523    }
1524
1525    /// Creates a session scoped to a single role.
1526    ///
1527    /// Convenience shorthand for
1528    /// `session_with_identity(Identity::new("anonymous", [role]))`.
1529    ///
1530    /// # Examples
1531    ///
1532    /// ```
1533    /// use grafeo_engine::{GrafeoDB, auth::Role};
1534    ///
1535    /// let db = GrafeoDB::new_in_memory();
1536    /// let reader = db.session_with_role(Role::ReadOnly);
1537    /// ```
1538    #[must_use]
1539    pub fn session_with_role(&self, role: crate::auth::Role) -> Session {
1540        self.session_with_identity(crate::auth::Identity::new("anonymous", [role]))
1541    }
1542
1543    /// Creates a session with an explicit CDC override.
1544    ///
1545    /// When `cdc_enabled` is `true`, mutations in this session are tracked
1546    /// regardless of the database default. When `false`, mutations are not
1547    /// tracked regardless of the database default.
1548    ///
1549    /// # Examples
1550    ///
1551    /// ```
1552    /// use grafeo_engine::GrafeoDB;
1553    ///
1554    /// let db = GrafeoDB::new_in_memory();
1555    ///
1556    /// // Opt in to CDC for just this session
1557    /// let tracked = db.session_with_cdc(true);
1558    /// tracked.execute("INSERT (:Person {name: 'Alix'})")?;
1559    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1560    /// ```
1561    #[cfg(feature = "cdc")]
1562    #[must_use]
1563    pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1564        self.create_session_inner(Some(cdc_enabled))
1565    }
1566
1567    /// Creates a read-only session regardless of the database's access mode.
1568    ///
1569    /// Mutations executed through this session will fail with
1570    /// `TransactionError::ReadOnly`. Useful for replication replicas where
1571    /// the database itself must remain writable (for applying CDC changes)
1572    /// but client-facing queries must be read-only.
1573    ///
1574    /// **Deprecated**: Use `session_with_role(Role::ReadOnly)` instead.
1575    #[deprecated(
1576        since = "0.5.36",
1577        note = "use session_with_role(Role::ReadOnly) instead"
1578    )]
1579    #[must_use]
1580    pub fn session_read_only(&self) -> Session {
1581        self.session_with_role(crate::auth::Role::ReadOnly)
1582    }
1583
1584    /// Shared session creation logic.
1585    ///
1586    /// `cdc_override` overrides the database-wide `cdc_enabled` default when
1587    /// `Some`. `None` falls back to the database default.
1588    #[allow(unused_variables)] // cdc_override unused when cdc feature is off
1589    fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1590        self.create_session_inner_full(cdc_override, false, crate::auth::Identity::anonymous())
1591    }
1592
    /// Shared session creation with all overrides.
    ///
    /// - `cdc_override`: when `Some`, decides whether the CDC log is attached.
    ///   When `None`, falls back to `self.cdc_active()`. It is only read with
    ///   the `cdc` feature enabled.
    /// - `force_read_only`: marks the session read-only even if the database
    ///   is writable. The effective flag is `self.read_only || force_read_only`.
    /// - `identity`: the identity stored in the session config.
    ///
    /// The backing store is chosen in priority order: the layered store (after
    /// `compact()`), then an external read/write store, then the internal
    /// LPG store (plus the RDF store with `triple-store`).
    ///
    /// NOTE(review): the layered-store and external-store paths return early,
    /// so they skip WAL attachment, CDC attachment, metrics registration, and
    /// propagation of the persistent graph and schema context. Confirm this
    /// is intended.
    ///
    /// # Panics
    ///
    /// Panics (via `expect`) if building a session over an external store
    /// returns an error.
    #[allow(unused_variables)] // cdc_override is only read when the `cdc` feature is enabled
    fn create_session_inner_full(
        &self,
        cdc_override: Option<bool>,
        force_read_only: bool,
        identity: crate::auth::Identity,
    ) -> Session {
        // A closure rather than a value: exactly one of the construction paths
        // below consumes the config.
        let session_cfg = || crate::session::SessionConfig {
            transaction_manager: Arc::clone(&self.transaction_manager),
            query_cache: Arc::clone(&self.query_cache),
            catalog: Arc::clone(&self.catalog),
            adaptive_config: self.config.adaptive.clone(),
            factorized_execution: self.config.factorized_execution,
            graph_model: self.config.graph_model,
            query_timeout: self.config.query_timeout,
            max_property_size: self.config.max_property_size,
            buffer_manager: Some(Arc::clone(&self.buffer_manager)),
            commit_counter: Arc::clone(&self.commit_counter),
            gc_interval: self.config.gc_interval,
            read_only: self.read_only || force_read_only,
            identity: identity.clone(),
            #[cfg(feature = "lpg")]
            projections: Arc::clone(&self.projections),
        };

        // Layered store: use the overlay LpgStore as the session's internal store
        // so MVCC operations (begin_tx, commit, visibility) work correctly.
        #[cfg(all(feature = "compact-store", feature = "lpg"))]
        if let Some(ref layered) = self.layered_store {
            let overlay = Arc::clone(layered.overlay_store());
            let layered_arc = Arc::clone(layered);
            let mut session = Session::with_adaptive(overlay, session_cfg());
            // Override graph_store/graph_store_mut to use the LayeredStore
            // (which merges base + overlay), not just the overlay alone.
            session.override_stores(
                Arc::clone(&layered_arc) as Arc<dyn GraphStoreSearch>,
                Some(layered_arc as Arc<dyn GraphStoreMut>),
            );
            return session;
        }

        // External store: the caller-supplied read store, plus the write store if one was given.
        if let Some(ref ext_read) = self.external_read_store {
            return Session::with_external_store(
                Arc::clone(ext_read),
                self.external_write_store.as_ref().map(Arc::clone),
                session_cfg(),
            )
            .expect("arena allocation for external store session");
        }

        // Internal stores: exactly one of these three bindings is compiled in.
        #[cfg(all(feature = "lpg", feature = "triple-store"))]
        let mut session = Session::with_rdf_store_and_adaptive(
            Arc::clone(self.lpg_store()),
            Arc::clone(&self.rdf_store),
            session_cfg(),
        );
        #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
        let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
        #[cfg(not(feature = "lpg"))]
        let mut session =
            Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
                .expect("session creation for non-lpg build");

        // Attach the WAL (and its graph context) when durability is enabled.
        #[cfg(all(feature = "wal", feature = "lpg"))]
        if let Some(ref wal) = self.wal {
            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
        }

        // An explicit per-session override wins; otherwise use the database-wide CDC state.
        #[cfg(feature = "cdc")]
        {
            let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
            if should_enable {
                session.set_cdc_log(Arc::clone(&self.cdc_log));
            }
        }

        // Register the session with the metrics registry and bump the created/active counters.
        #[cfg(feature = "metrics")]
        {
            if let Some(ref m) = self.metrics {
                session.set_metrics(Arc::clone(m));
                m.session_created
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                m.session_active
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
        }

        // Propagate persistent graph context to the new session
        if let Some(ref graph) = *self.current_graph.read() {
            session.use_graph(graph);
        }

        // Propagate persistent schema context to the new session
        if let Some(ref schema) = *self.current_schema.read() {
            session.set_schema(schema);
        }

        // Suppress unused_mut when cdc/wal are disabled
        let _ = &mut session;

        session
    }
1696
1697    /// Returns the current graph name, if any.
1698    ///
1699    /// This is the persistent graph context used by one-shot `execute()` calls.
1700    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
1701    /// or `SESSION RESET`.
1702    #[must_use]
1703    pub fn current_graph(&self) -> Option<String> {
1704        self.current_graph.read().clone()
1705    }
1706
1707    /// Sets the current graph context for subsequent one-shot `execute()` calls.
1708    ///
1709    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
1710    /// Pass `None` to reset to the default graph.
1711    ///
1712    /// # Errors
1713    ///
1714    /// Returns an error if the named graph does not exist.
1715    pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1716        #[cfg(feature = "lpg")]
1717        if let Some(name) = name
1718            && !name.eq_ignore_ascii_case("default")
1719            && let Some(store) = &self.store
1720            && store.graph(name).is_none()
1721        {
1722            return Err(Error::Query(QueryError::new(
1723                QueryErrorKind::Semantic,
1724                format!("Graph '{name}' does not exist"),
1725            )));
1726        }
1727        *self.current_graph.write() = name.map(ToString::to_string);
1728        Ok(())
1729    }
1730
1731    /// Returns the current schema name, if any.
1732    ///
1733    /// This is the persistent schema context used by one-shot `execute()` calls.
1734    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
1735    #[must_use]
1736    pub fn current_schema(&self) -> Option<String> {
1737        self.current_schema.read().clone()
1738    }
1739
1740    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1741    ///
1742    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1743    /// a session. Pass `None` to clear the schema context.
1744    ///
1745    /// # Errors
1746    ///
1747    /// Returns an error if the named schema does not exist.
1748    pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1749        if let Some(name) = name
1750            && !self.catalog.schema_exists(name)
1751        {
1752            return Err(Error::Query(QueryError::new(
1753                QueryErrorKind::Semantic,
1754                format!("Schema '{name}' does not exist"),
1755            )));
1756        }
1757        *self.current_schema.write() = name.map(ToString::to_string);
1758        Ok(())
1759    }
1760
1761    /// Returns the adaptive execution configuration.
1762    #[must_use]
1763    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1764        &self.config.adaptive
1765    }
1766
1767    /// Returns `true` if this database was opened in read-only mode.
1768    #[must_use]
1769    pub fn is_read_only(&self) -> bool {
1770        self.read_only
1771    }
1772
1773    /// Returns the configuration.
1774    #[must_use]
1775    pub fn config(&self) -> &Config {
1776        &self.config
1777    }
1778
1779    /// Returns the graph data model of this database.
1780    #[must_use]
1781    pub fn graph_model(&self) -> crate::config::GraphModel {
1782        self.config.graph_model
1783    }
1784
1785    /// Returns the configured memory limit in bytes, if any.
1786    #[must_use]
1787    pub fn memory_limit(&self) -> Option<usize> {
1788        self.config.memory_limit
1789    }
1790
1791    /// Returns a point-in-time snapshot of all metrics.
1792    ///
1793    /// If the `metrics` feature is disabled or the registry is not
1794    /// initialized, returns a default (all-zero) snapshot.
1795    #[cfg(feature = "metrics")]
1796    #[must_use]
1797    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1798        let mut snapshot = self
1799            .metrics
1800            .as_ref()
1801            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1802
1803        // Augment with cache stats from the query cache (not tracked in the registry)
1804        let cache_stats = self.query_cache.stats();
1805        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1806        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1807        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1808        snapshot.cache_invalidations = cache_stats.invalidations;
1809
1810        snapshot
1811    }
1812
1813    /// Returns all metrics in Prometheus text exposition format.
1814    ///
1815    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1816    #[cfg(feature = "metrics")]
1817    #[must_use]
1818    pub fn metrics_prometheus(&self) -> String {
1819        self.metrics
1820            .as_ref()
1821            .map_or_else(String::new, |m| m.to_prometheus())
1822    }
1823
1824    /// Resets all metrics counters and histograms to zero.
1825    #[cfg(feature = "metrics")]
1826    pub fn reset_metrics(&self) {
1827        if let Some(ref m) = self.metrics {
1828            m.reset();
1829        }
1830        self.query_cache.reset_stats();
1831    }
1832
1833    /// Returns the underlying (default) store.
1834    ///
1835    /// This provides direct access to the LPG store for algorithm implementations
1836    /// and admin operations (index management, schema introspection, MVCC internals).
1837    ///
1838    /// For code that only needs read/write graph operations, prefer
1839    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
1840    #[cfg(feature = "lpg")]
1841    #[must_use]
1842    pub fn store(&self) -> &Arc<LpgStore> {
1843        self.lpg_store()
1844    }
1845
1846    // === Named Graph Management ===
1847
1848    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1849    ///
1850    /// # Errors
1851    ///
1852    /// Returns an error if arena allocation fails.
1853    #[cfg(feature = "lpg")]
1854    pub fn create_graph(&self, name: &str) -> Result<bool> {
1855        Ok(self.lpg_store().create_graph(name)?)
1856    }
1857
1858    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
1859    ///
1860    /// If the dropped graph was the active graph context, the context is reset
1861    /// to the default graph.
1862    #[cfg(feature = "lpg")]
1863    pub fn drop_graph(&self, name: &str) -> bool {
1864        let Some(store) = &self.store else {
1865            return false;
1866        };
1867        let dropped = store.drop_graph(name);
1868        if dropped {
1869            let mut current = self.current_graph.write();
1870            if current
1871                .as_deref()
1872                .is_some_and(|g| g.eq_ignore_ascii_case(name))
1873            {
1874                *current = None;
1875            }
1876        }
1877        dropped
1878    }
1879
1880    /// Returns all named graph names.
1881    #[cfg(feature = "lpg")]
1882    #[must_use]
1883    pub fn list_graphs(&self) -> Vec<String> {
1884        self.lpg_store().graph_names()
1885    }
1886
1887    // === Graph Projections ===
1888
1889    /// Creates a named graph projection (virtual subgraph).
1890    ///
1891    /// The projection filters the graph store to only include nodes with the
1892    /// specified labels and edges with the specified types. Returns `true` if
1893    /// created, `false` if a projection with that name already exists.
1894    ///
1895    /// # Examples
1896    ///
1897    /// ```
1898    /// use grafeo_engine::GrafeoDB;
1899    /// use grafeo_core::graph::ProjectionSpec;
1900    ///
1901    /// let db = GrafeoDB::new_in_memory();
1902    /// let spec = ProjectionSpec::new()
1903    ///     .with_node_labels(["Person", "City"])
1904    ///     .with_edge_types(["LIVES_IN"]);
1905    /// assert!(db.create_projection("social", spec));
1906    /// ```
1907    pub fn create_projection(
1908        &self,
1909        name: impl Into<String>,
1910        spec: grafeo_core::graph::ProjectionSpec,
1911    ) -> bool {
1912        use grafeo_core::graph::GraphProjection;
1913        use std::collections::hash_map::Entry;
1914
1915        let store = self.graph_store();
1916        let projection = Arc::new(GraphProjection::new(store, spec));
1917        let mut projections = self.projections.write();
1918        match projections.entry(name.into()) {
1919            Entry::Occupied(_) => false,
1920            Entry::Vacant(e) => {
1921                e.insert(projection);
1922                true
1923            }
1924        }
1925    }
1926
1927    /// Drops a named graph projection. Returns `true` if it existed.
1928    pub fn drop_projection(&self, name: &str) -> bool {
1929        self.projections.write().remove(name).is_some()
1930    }
1931
1932    /// Returns the names of all graph projections.
1933    #[must_use]
1934    pub fn list_projections(&self) -> Vec<String> {
1935        self.projections.read().keys().cloned().collect()
1936    }
1937
1938    /// Returns a named projection as a [`GraphStoreSearch`] trait object.
1939    #[must_use]
1940    pub fn projection(&self, name: &str) -> Option<Arc<dyn GraphStoreSearch>> {
1941        self.projections
1942            .read()
1943            .get(name)
1944            .map(|p| Arc::clone(p) as Arc<dyn GraphStoreSearch>)
1945    }
1946
1947    /// Returns the graph store as a trait object.
1948    ///
1949    /// Returns a read-only trait object for the active graph store.
1950    ///
1951    /// This provides the [`GraphStoreSearch`] interface (graph-structure reads
1952    /// plus text/vector search capabilities) for code that only needs read
1953    /// operations. For write access, use [`graph_store_mut()`](Self::graph_store_mut).
1954    #[must_use]
1955    pub fn graph_store(&self) -> Arc<dyn GraphStoreSearch> {
1956        if let Some(ref ext_read) = self.external_read_store {
1957            Arc::clone(ext_read)
1958        } else {
1959            #[cfg(feature = "lpg")]
1960            {
1961                Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreSearch>
1962            }
1963            #[cfg(not(feature = "lpg"))]
1964            unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
1965        }
1966    }
1967
1968    /// Returns the writable graph store, if available.
1969    ///
1970    /// Returns `None` for read-only databases created via
1971    /// [`with_read_store()`](Self::with_read_store).
1972    #[must_use]
1973    pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1974        if self.external_read_store.is_some() {
1975            self.external_write_store.as_ref().map(Arc::clone)
1976        } else {
1977            #[cfg(feature = "lpg")]
1978            {
1979                Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1980            }
1981            #[cfg(not(feature = "lpg"))]
1982            {
1983                None
1984            }
1985        }
1986    }
1987
    /// Garbage collects old MVCC versions that are no longer visible.
    ///
    /// Determines the minimum epoch required by active transactions and prunes
    /// version chains older than that threshold. Also cleans up completed
    /// transaction metadata in the transaction manager, and prunes the CDC
    /// event log according to its retention policy.
    ///
    /// Without the `lpg` feature, only the transaction-manager cleanup runs.
    /// CDC pruning additionally requires both `cdc` and `lpg`.
    ///
    /// NOTE(review): CDC retention runs only when the database-wide
    /// `cdc_enabled` flag is set. Sessions can attach `cdc_log` through a
    /// per-session override even when that flag is off, so their events would
    /// not be pruned here. Confirm this is intended.
    pub fn gc(&self) {
        #[cfg(feature = "lpg")]
        {
            let min_epoch = self.transaction_manager.min_active_epoch();
            self.lpg_store().gc_versions(min_epoch);
        }
        // Captured before the transaction manager's own GC runs; used as the CDC retention cutoff.
        #[cfg(all(feature = "lpg", feature = "cdc"))]
        let current_epoch = self.transaction_manager.current_epoch();
        self.transaction_manager.gc();

        // Prune CDC events based on retention config (epoch + count limits)
        #[cfg(feature = "cdc")]
        if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
            #[cfg(feature = "lpg")]
            self.cdc_log.apply_retention(current_epoch);
        }
    }
2011
2012    /// Returns the buffer manager for memory-aware operations.
2013    #[must_use]
2014    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
2015        &self.buffer_manager
2016    }
2017
2018    /// Returns the layered store (compact base + mutable overlay), if
2019    /// [`compact()`](Self::compact) has been called.
2020    #[cfg(all(feature = "compact-store", feature = "lpg"))]
2021    #[must_use]
2022    pub fn layered_store(
2023        &self,
2024    ) -> Option<&Arc<grafeo_core::graph::compact::layered::LayeredStore>> {
2025        self.layered_store.as_ref()
2026    }
2027
2028    /// Returns the disk-backed tier wrapper for the compact base, if
2029    /// [`compact()`](Self::compact) has been called and `mmap` is enabled.
2030    #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
2031    #[must_use]
2032    pub fn compact_tiered(&self) -> Option<&Arc<compact_tiered::CompactStoreTiered>> {
2033        self.compact_tiered.as_ref()
2034    }
2035
2036    /// Returns the query cache.
2037    #[must_use]
2038    pub fn query_cache(&self) -> &Arc<QueryCache> {
2039        &self.query_cache
2040    }
2041
2042    /// Clears all cached query plans.
2043    ///
2044    /// This is called automatically after DDL operations, but can also be
2045    /// invoked manually after external schema changes (e.g., WAL replay,
2046    /// import) or when you want to force re-optimization of all queries.
2047    pub fn clear_plan_cache(&self) {
2048        self.query_cache.clear();
2049    }
2050
2051    // =========================================================================
2052    // Lifecycle
2053    // =========================================================================
2054
    /// Closes the database, flushing all pending writes.
    ///
    /// For persistent databases, this ensures everything is safely on disk.
    /// Called automatically when the database is dropped, but you can call
    /// it explicitly if you need to guarantee durability at a specific point.
    ///
    /// Calling `close` on an already-closed database returns `Ok(())`
    /// immediately. The `is_open` write lock is held for the whole call, so
    /// concurrent `close` calls are serialized. If any step fails, the error
    /// is returned before `is_open` is cleared, so the database still reports
    /// as open.
    ///
    /// Shutdown order:
    /// 1. Stop the periodic checkpoint timer, if any.
    /// 2. Read-only databases: close the file manager and return.
    /// 3. Single-file (`.grafeo`) databases: sync the WAL, checkpoint into the
    ///    container file, then remove the sidecar WAL when that is safe.
    /// 4. Legacy directory databases: log a `TransactionCommit` and sync the WAL.
    ///
    /// # Errors
    ///
    /// Returns an error if the WAL can't be flushed (check disk space/permissions),
    /// or if checkpointing, removing the sidecar WAL, or closing the file
    /// manager fails.
    pub fn close(&self) -> Result<()> {
        // Held until return: serializes concurrent close() calls.
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        // Stop the periodic checkpoint timer first, even for read-only databases.
        // compact() can switch a writable DB to read-only after the timer started,
        // so the timer must be stopped before any early return to avoid racing
        // with the closed file manager.
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        if let Some(mut timer) = self.checkpoint_timer.lock().take() {
            timer.stop();
        }

        // Read-only databases: just release the shared lock, no checkpointing
        if self.read_only {
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
        // We must do this BEFORE the WAL close path because checkpoint_to_file
        // removes the sidecar WAL directory.
        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Flush WAL first so all records are on disk before we snapshot
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;

            // Safety check: if WAL has records but the checkpoint was a no-op
            // (zero sections written), the container file may not contain the
            // latest data. This can happen when sections are not marked dirty
            // despite mutations going through the WAL. Force-dirty all sections
            // and retry before removing the sidecar.
            // NOTE(review): the retry below calls checkpoint_to_file with the
            // same FlushReason::Explicit, and nothing visible here marks any
            // section dirty first, so the "forced flush" may write 0 sections
            // again. Confirm the retry can make progress.
            #[cfg(feature = "wal")]
            let flush_result = if flush_result.sections_written == 0 {
                if let Some(ref wal) = self.wal {
                    if wal.record_count() > 0 {
                        grafeo_warn!(
                            "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
                            wal.record_count()
                        );
                        self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
                    } else {
                        flush_result
                    }
                } else {
                    flush_result
                }
            } else {
                flush_result
            };

            // Release WAL file handles before removing sidecar directory.
            // On Windows, open handles prevent directory deletion.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            // Only remove the sidecar WAL after verifying the checkpoint wrote
            // data to the container. If nothing was written and the WAL had
            // records, keep the sidecar so the next open can recover from it.
            #[cfg(feature = "wal")]
            let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
            #[cfg(not(feature = "wal"))]
            let has_wal_records = false;

            if flush_result.sections_written > 0 || !has_wal_records {
                {
                    // Crash-injection hook for testing this window.
                    use grafeo_common::testing::crash::maybe_crash;
                    maybe_crash("close:before_remove_sidecar_wal");
                }
                fm.remove_sidecar_wal()?;
            } else {
                grafeo_warn!(
                    "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
                );
            }
            fm.close()?;
        }

        // Commit and sync WAL (legacy directory format only).
        // We intentionally do NOT call wal.checkpoint() here. Directory format
        // has no snapshot: the WAL files are the sole source of truth. Writing
        // checkpoint.meta would cause recovery to skip older WAL files, losing
        // all data that predates the current log sequence.
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Use the last assigned transaction ID, or create one for the commit record
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            // Log a TransactionCommit to mark all pending records as committed
            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
2183
2184    /// Returns the typed WAL if available.
2185    #[cfg(feature = "wal")]
2186    #[must_use]
2187    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
2188        self.wal.as_ref()
2189    }
2190
2191    /// Logs a WAL record if WAL is enabled.
2192    #[cfg(feature = "wal")]
2193    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
2194        if let Some(ref wal) = self.wal {
2195            wal.log(record)?;
2196        }
2197        Ok(())
2198    }
2199
    /// Registers storage sections as [`MemoryConsumer`]s with the BufferManager.
    ///
    /// Each section reports its memory usage to the buffer manager, enabling
    /// accurate pressure tracking. Called once after database construction.
    ///
    /// Which consumers are registered depends on the enabled cargo features
    /// and on the state of the stores at the time of the call:
    /// - the LPG store section, when an LPG store is present;
    /// - the RDF store section, only if the RDF store holds data or graphs;
    /// - the RDF Ring index section, only if the Ring has been built;
    /// - a dynamic vector-index consumer, which also installs its spill
    ///   registry into `self.vector_spill_storages`;
    /// - a dynamic text-index consumer;
    /// - the CDC log.
    ///
    /// NOTE(review): the RDF and Ring checks are evaluated only here, so RDF
    /// data or a Ring built after this call is presumably not tracked unless
    /// consumers are registered again. Confirm this is intended.
    fn register_section_consumers(&mut self) {
        // LPG store section
        // `store_ref` is borrowed once and reused by the vector/text index
        // consumers further down.
        #[cfg(feature = "lpg")]
        let store_ref = self.store.as_ref();
        #[cfg(feature = "lpg")]
        if let Some(store) = store_ref {
            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
            self.buffer_manager.register_consumer(Arc::new(
                section_consumer::SectionConsumer::new(Arc::new(lpg)),
            ));
        }

        // RDF store: only when data exists
        #[cfg(feature = "triple-store")]
        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
            self.buffer_manager.register_consumer(Arc::new(
                section_consumer::SectionConsumer::new(Arc::new(rdf)),
            ));
        }

        // Ring Index: only when Ring has been built
        #[cfg(feature = "ring-index")]
        if self.rdf_store.ring().is_some() {
            let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
            self.buffer_manager.register_consumer(Arc::new(
                section_consumer::SectionConsumer::new(Arc::new(ring)),
            ));
        }

        // Vector indexes: dynamic consumer that re-queries the store on each
        // memory_usage() call, so dropped indexes are freed and new ones tracked.
        #[cfg(all(
            feature = "lpg",
            feature = "vector-index",
            feature = "mmap",
            not(feature = "temporal")
        ))]
        if let Some(store) = store_ref {
            // The consumer is given the configured spill path (if any).
            let spill_path = self.buffer_manager.config().spill_path.clone();
            let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
                store, spill_path,
            ));
            // Share the spill registry with the search path
            self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
            self.buffer_manager.register_consumer(consumer);
        }

        // Text indexes: same dynamic approach as vector indexes.
        #[cfg(all(feature = "lpg", feature = "text-index"))]
        if let Some(store) = store_ref {
            self.buffer_manager
                .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
        }

        // CDC log: register as memory consumer so the buffer manager can
        // prune events under memory pressure.
        // The explicit cast turns the concrete `Arc` into the trait-object
        // type that `register_consumer` is called with here.
        #[cfg(feature = "cdc")]
        self.buffer_manager.register_consumer(
            Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
        );
    }
2266
2267    /// Discovers and re-opens spill files from a previous session.
2268    ///
2269    /// When the database was closed with spilled vector embeddings, the
2270    /// `vectors_*.bin` files persist in the spill directory. This method
2271    /// scans for them, opens each as `MmapStorage`, and registers them
2272    /// in the `vector_spill_storages` map so search can read from them.
2273    #[cfg(all(
2274        feature = "lpg",
2275        feature = "vector-index",
2276        feature = "mmap",
2277        not(feature = "temporal")
2278    ))]
2279    fn restore_spill_files(&mut self) {
2280        use grafeo_core::index::vector::MmapStorage;
2281
2282        let spill_dir = match self.buffer_manager.config().spill_path {
2283            Some(ref path) => path.clone(),
2284            None => return,
2285        };
2286
2287        if !spill_dir.exists() {
2288            return;
2289        }
2290
2291        let spill_map = match self.vector_spill_storages {
2292            Some(ref map) => Arc::clone(map),
2293            None => return,
2294        };
2295
2296        let Ok(entries) = std::fs::read_dir(&spill_dir) else {
2297            return;
2298        };
2299
2300        let Some(ref store) = self.store else {
2301            return;
2302        };
2303
2304        for entry in entries.flatten() {
2305            let path = entry.path();
2306            let file_name = match path.file_name().and_then(|n| n.to_str()) {
2307                Some(name) => name.to_string(),
2308                None => continue,
2309            };
2310
2311            // Match pattern: vectors_{key}.bin where key is percent-encoded
2312            if !file_name.starts_with("vectors_")
2313                || !std::path::Path::new(&file_name)
2314                    .extension()
2315                    .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
2316            {
2317                continue;
2318            }
2319
2320            // Extract and decode key: "vectors_Label%3Aembedding.bin" -> "Label:embedding"
2321            let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];
2322
2323            // Percent-decode: %3A -> ':', %25 -> '%'
2324            let key = key_part.replace("%3A", ":").replace("%25", "%");
2325
2326            // Key must contain ':' (label:property format)
2327            if !key.contains(':') {
2328                // Legacy file with old encoding, skip (will be re-created on next spill)
2329                continue;
2330            }
2331
2332            // Only restore if the corresponding vector index exists
2333            if store.get_vector_index_by_key(&key).is_none() {
2334                // Stale spill file (index was dropped), clean it up
2335                let _ = std::fs::remove_file(&path);
2336                continue;
2337            }
2338
2339            // Open the MmapStorage
2340            match MmapStorage::open(&path) {
2341                Ok(mmap_storage) => {
2342                    // Mark the property column as spilled so get() returns None
2343                    let property = key.split(':').nth(1).unwrap_or("");
2344                    let prop_key = grafeo_common::types::PropertyKey::new(property);
2345                    store.node_properties_mark_spilled(&prop_key);
2346
2347                    spill_map.write().insert(key, Arc::new(mmap_storage));
2348                }
2349                Err(e) => {
2350                    eprintln!("failed to restore spill file {}: {e}", path.display());
2351                    // Remove corrupt spill file
2352                    let _ = std::fs::remove_file(&path);
2353                }
2354            }
2355        }
2356    }
2357
    /// Builds section objects for the current database state.
    ///
    /// Sections are pushed in a fixed order:
    /// - With a layered store (`compact-store` + `lpg`): the compact base
    ///   section, then the overlay LPG section. The function returns right
    ///   after these two.
    /// - Otherwise:
    ///   1. the catalog section and the LPG store section;
    ///   2. the vector-index section, if any vector indexes exist;
    ///   3. the text-index section, if any text indexes exist;
    ///   4. the RDF store section, if the RDF store holds data or graphs;
    ///   5. the RDF Ring section, if the Ring has been built.
    ///
    /// NOTE(review): the layered-store early return skips the catalog,
    /// vector/text index and RDF/Ring sections. Confirm they are persisted
    /// elsewhere or deliberately omitted for layered databases.
    #[cfg(feature = "grafeo-file")]
    fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
        let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();

        // Layered store: serialize both the compact base and the overlay.
        #[cfg(all(feature = "compact-store", feature = "lpg"))]
        if let Some(ref layered) = self.layered_store {
            // Compact base section.
            let compact_section = grafeo_core::graph::compact::section::CompactStoreSection::new(
                layered.base_store_arc(),
            );
            sections.push(Box::new(compact_section));

            // Overlay LPG section.
            let overlay = layered.overlay_store();
            let overlay_section =
                grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(overlay));
            sections.push(Box::new(overlay_section));

            // Nothing else is emitted for a layered store (see NOTE above).
            return sections;
        }

        // LPG sections, pushed in this order: catalog, store, vector indexes,
        // text indexes.
        #[cfg(feature = "lpg")]
        if let Some(store) = self.store.as_ref() {
            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));

            // The catalog section receives a closure rather than an epoch
            // value, so it can query the transaction manager's current epoch
            // whenever it calls the closure.
            let catalog = catalog_section::CatalogSection::new(
                Arc::clone(&self.catalog),
                Arc::clone(store),
                {
                    let tm = Arc::clone(&self.transaction_manager);
                    move || tm.current_epoch().as_u64()
                },
            );

            sections.push(Box::new(catalog));
            sections.push(Box::new(lpg));

            // Vector indexes: persist HNSW topology to avoid rebuild on load
            #[cfg(feature = "vector-index")]
            {
                let indexes = store.vector_index_entries();
                if !indexes.is_empty() {
                    let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
                    sections.push(Box::new(vector));
                }
            }

            // Text indexes: persist BM25 postings to avoid rebuild on load
            #[cfg(feature = "text-index")]
            {
                let indexes = store.text_index_entries();
                if !indexes.is_empty() {
                    let text = grafeo_core::index::text::TextIndexSection::new(indexes);
                    sections.push(Box::new(text));
                }
            }
        }

        // RDF store: only when it holds data or named graphs.
        #[cfg(feature = "triple-store")]
        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
            sections.push(Box::new(rdf));
        }

        // Ring index: only when the Ring has been built.
        #[cfg(feature = "ring-index")]
        if self.rdf_store.ring().is_some() {
            let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
            sections.push(Box::new(ring));
        }

        sections
    }
2433
2434    // =========================================================================
2435    // Backup API
2436    // =========================================================================
2437
2438    /// Creates a full backup of the database in the given directory.
2439    ///
2440    /// Checkpoints the database, copies the `.grafeo` file, and creates a
2441    /// backup manifest. Subsequent incremental backups will use this as the
2442    /// base.
2443    ///
2444    /// # Errors
2445    ///
2446    /// Returns an error if the database has no file manager or I/O fails.
2447    #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2448    pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
2449        let fm = self
2450            .file_manager
2451            .as_ref()
2452            .ok_or_else(|| Error::Internal("backup requires a persistent database".to_string()))?;
2453
2454        // Checkpoint to ensure the container has the latest data.
2455        // Skip for read-only databases: the on-disk file is already a valid
2456        // snapshot and the file manager rejects writes.
2457        if !self.read_only {
2458            let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2459        }
2460
2461        let current_epoch = self.transaction_manager.current_epoch();
2462        backup::do_backup_full(backup_dir, fm, self.wal.as_deref(), current_epoch)
2463    }
2464
2465    /// Creates an incremental backup containing WAL records since the last backup.
2466    ///
2467    /// Requires a prior full backup in the backup directory.
2468    ///
2469    /// # Errors
2470    ///
2471    /// Returns an error if no full backup exists, or if the WAL has no new records.
2472    #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2473    pub fn backup_incremental(
2474        &self,
2475        backup_dir: &std::path::Path,
2476    ) -> Result<backup::BackupSegment> {
2477        let wal = self
2478            .wal
2479            .as_ref()
2480            .ok_or_else(|| Error::Internal("incremental backup requires WAL".to_string()))?;
2481
2482        let current_epoch = self.transaction_manager.current_epoch();
2483        backup::do_backup_incremental(backup_dir, wal, current_epoch)
2484    }
2485
2486    /// Returns the backup manifest for a backup directory, if one exists.
2487    ///
2488    /// # Errors
2489    ///
2490    /// Returns an error if the manifest file exists but cannot be parsed.
2491    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2492    pub fn read_backup_manifest(
2493        backup_dir: &std::path::Path,
2494    ) -> Result<Option<backup::BackupManifest>> {
2495        backup::read_manifest(backup_dir)
2496    }
2497
2498    /// Returns the current backup cursor (last backed-up position), if any.
2499    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2500    #[must_use]
2501    pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
2502        self.wal
2503            .as_ref()
2504            .and_then(|wal| backup::read_backup_cursor(wal.dir()).ok().flatten())
2505    }
2506
2507    /// Restores a database from a backup chain to a specific epoch.
2508    ///
2509    /// Copies the full backup to `output_path`, then replays incremental
2510    /// WAL segments up to `target_epoch`. The restored database can be
2511    /// opened with [`GrafeoDB::open`].
2512    ///
2513    /// # Errors
2514    ///
2515    /// Returns an error if the backup chain does not cover the target epoch,
2516    /// segment checksums fail, or I/O fails.
2517    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2518    pub fn restore_to_epoch(
2519        backup_dir: &std::path::Path,
2520        target_epoch: grafeo_common::types::EpochId,
2521        output_path: &std::path::Path,
2522    ) -> Result<()> {
2523        backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
2524    }
2525
2526    /// Writes the current database state to the `.grafeo` file using the unified flush.
2527    ///
2528    /// Does NOT remove the sidecar WAL: callers that want to clean up
2529    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
2530    /// separately after this returns.
2531    #[cfg(feature = "grafeo-file")]
2532    fn checkpoint_to_file(
2533        &self,
2534        fm: &GrafeoFileManager,
2535        reason: flush::FlushReason,
2536    ) -> Result<flush::FlushResult> {
2537        let sections = self.build_sections();
2538        let section_refs: Vec<&dyn grafeo_common::storage::Section> =
2539            sections.iter().map(|s| s.as_ref()).collect();
2540        #[cfg(feature = "lpg")]
2541        let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
2542        #[cfg(not(feature = "lpg"))]
2543        let context = flush::build_context_minimal(&self.transaction_manager);
2544
2545        flush::flush(
2546            fm,
2547            &section_refs,
2548            &context,
2549            reason,
2550            #[cfg(feature = "wal")]
2551            self.wal.as_deref(),
2552        )
2553    }
2554
2555    /// Returns the file manager if using single-file format.
2556    #[cfg(feature = "grafeo-file")]
2557    #[must_use]
2558    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
2559        self.file_manager.as_ref()
2560    }
2561}
2562
2563impl Drop for GrafeoDB {
2564    fn drop(&mut self) {
2565        if let Err(e) = self.close() {
2566            grafeo_error!("Error closing database: {}", e);
2567        }
2568    }
2569}
2570
/// Exposes `GrafeoDB`'s admin and introspection methods through
/// [`crate::admin::AdminService`].
///
/// Every trait method forwards to the inherent `GrafeoDB` method of the same
/// name. The calls use an explicit `GrafeoDB::` path (inherent items take
/// precedence over trait items) so it is obvious they do not recurse.
#[cfg(feature = "lpg")]
impl crate::admin::AdminService for GrafeoDB {
    fn info(&self) -> crate::admin::DatabaseInfo {
        GrafeoDB::info(self)
    }

    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        GrafeoDB::detailed_stats(self)
    }

    fn schema(&self) -> crate::admin::SchemaInfo {
        GrafeoDB::schema(self)
    }

    fn validate(&self) -> crate::admin::ValidationResult {
        GrafeoDB::validate(self)
    }

    fn wal_status(&self) -> crate::admin::WalStatus {
        GrafeoDB::wal_status(self)
    }

    fn wal_checkpoint(&self) -> Result<()> {
        GrafeoDB::wal_checkpoint(self)
    }
}
2597
2598// =========================================================================
2599// Query Result Types
2600// =========================================================================
2601
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// Results built with [`QueryResult::status`] have no columns or rows and
/// carry only a [`status_message`](Self::status_message).
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// [`new`](Self::new) and [`from_rows`](Self::from_rows) fill this with
    /// `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    ///
    /// Use [`rows()`](Self::rows) for borrowed access or
    /// [`into_rows()`](Self::into_rows) to take ownership.
    ///
    /// NOTE(review): `push_row` and `from_rows` do not check row lengths
    /// against `columns.len()`. Callers are presumably expected to keep
    /// them in sync.
    pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    ///
    /// The `empty`, `status`, `new`, `with_types` and `from_rows`
    /// constructors set this to `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
2647
2648impl QueryResult {
2649    /// Creates a fully empty query result (no columns, no rows).
2650    #[must_use]
2651    pub fn empty() -> Self {
2652        Self {
2653            columns: Vec::new(),
2654            column_types: Vec::new(),
2655            rows: Vec::new(),
2656            execution_time_ms: None,
2657            rows_scanned: None,
2658            status_message: None,
2659            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2660        }
2661    }
2662
2663    /// Creates a query result with only a status message (for DDL commands).
2664    #[must_use]
2665    pub fn status(msg: impl Into<String>) -> Self {
2666        Self {
2667            columns: Vec::new(),
2668            column_types: Vec::new(),
2669            rows: Vec::new(),
2670            execution_time_ms: None,
2671            rows_scanned: None,
2672            status_message: Some(msg.into()),
2673            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2674        }
2675    }
2676
2677    /// Creates a new empty query result.
2678    #[must_use]
2679    pub fn new(columns: Vec<String>) -> Self {
2680        let len = columns.len();
2681        Self {
2682            columns,
2683            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2684            rows: Vec::new(),
2685            execution_time_ms: None,
2686            rows_scanned: None,
2687            status_message: None,
2688            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2689        }
2690    }
2691
2692    /// Creates a new empty query result with column types.
2693    #[must_use]
2694    pub fn with_types(
2695        columns: Vec<String>,
2696        column_types: Vec<grafeo_common::types::LogicalType>,
2697    ) -> Self {
2698        Self {
2699            columns,
2700            column_types,
2701            rows: Vec::new(),
2702            execution_time_ms: None,
2703            rows_scanned: None,
2704            status_message: None,
2705            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2706        }
2707    }
2708
2709    /// Creates a query result with pre-populated rows.
2710    #[must_use]
2711    pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
2712        let len = columns.len();
2713        Self {
2714            columns,
2715            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2716            rows,
2717            execution_time_ms: None,
2718            rows_scanned: None,
2719            status_message: None,
2720            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2721        }
2722    }
2723
2724    /// Appends a row to this result.
2725    pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
2726        self.rows.push(row);
2727    }
2728
2729    /// Sets the execution metrics on this result.
2730    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2731        self.execution_time_ms = Some(execution_time_ms);
2732        self.rows_scanned = Some(rows_scanned);
2733        self
2734    }
2735
2736    /// Returns the execution time in milliseconds, if available.
2737    #[must_use]
2738    pub fn execution_time_ms(&self) -> Option<f64> {
2739        self.execution_time_ms
2740    }
2741
2742    /// Returns the number of rows scanned, if available.
2743    #[must_use]
2744    pub fn rows_scanned(&self) -> Option<u64> {
2745        self.rows_scanned
2746    }
2747
2748    /// Returns the number of rows.
2749    #[must_use]
2750    pub fn row_count(&self) -> usize {
2751        self.rows.len()
2752    }
2753
2754    /// Returns the number of columns.
2755    #[must_use]
2756    pub fn column_count(&self) -> usize {
2757        self.columns.len()
2758    }
2759
2760    /// Returns true if the result is empty.
2761    #[must_use]
2762    pub fn is_empty(&self) -> bool {
2763        self.rows.is_empty()
2764    }
2765
2766    /// Extracts a single value from the result.
2767    ///
2768    /// Use this when your query returns exactly one row with one column,
2769    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
2770    ///
2771    /// # Errors
2772    ///
2773    /// Returns an error if the result has multiple rows or columns.
2774    pub fn scalar<T: FromValue>(&self) -> Result<T> {
2775        if self.rows.len() != 1 || self.columns.len() != 1 {
2776            return Err(grafeo_common::utils::error::Error::InvalidValue(
2777                "Expected single value".to_string(),
2778            ));
2779        }
2780        T::from_value(&self.rows[0][0])
2781    }
2782
2783    /// Returns a slice of all result rows.
2784    #[must_use]
2785    pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
2786        &self.rows
2787    }
2788
2789    /// Takes ownership of all result rows.
2790    #[must_use]
2791    pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
2792        self.rows
2793    }
2794
2795    /// Returns an iterator over the rows.
2796    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2797        self.rows.iter()
2798    }
2799
2800    /// Converts this query result to an Arrow [`RecordBatch`](arrow_array::RecordBatch).
2801    ///
2802    /// Each column in the result becomes an Arrow array. Type mapping:
2803    /// - `Int64` / `Float64` / `Bool` / `String` / `Bytes`: direct Arrow equivalents
2804    /// - `Timestamp` / `ZonedDatetime`: `Timestamp(Microsecond, UTC)`
2805    /// - `Date`: `Date32`, `Time`: `Time64(Nanosecond)`
2806    /// - `Vector`: `FixedSizeList(Float32, dim)`
2807    /// - `Duration` / `List` / `Map` / `Path`: serialized as `Utf8`
2808    ///
2809    /// Heterogeneous columns (mixed types) fall back to `Utf8`.
2810    ///
2811    /// # Errors
2812    ///
2813    /// Returns [`ArrowExportError`](arrow::ArrowExportError) if Arrow array construction fails.
2814    #[cfg(feature = "arrow-export")]
2815    pub fn to_record_batch(
2816        &self,
2817    ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
2818        arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
2819    }
2820
2821    /// Serializes this query result as Arrow IPC stream bytes.
2822    ///
2823    /// The returned bytes can be read by any Arrow implementation:
2824    /// - Python: `pyarrow.ipc.open_stream(buf).read_all()`
2825    /// - Polars: `pl.read_ipc(buf)`
2826    /// - Node.js: `apache-arrow` `RecordBatchStreamReader`
2827    ///
2828    /// # Errors
2829    ///
2830    /// Returns [`ArrowExportError`](arrow::ArrowExportError) on conversion or serialization failure.
2831    #[cfg(feature = "arrow-export")]
2832    pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
2833        let batch = self.to_record_batch()?;
2834        arrow::record_batch_to_ipc_stream(&batch)
2835    }
2836}
2837
2838impl std::fmt::Display for QueryResult {
2839    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2840        let table = grafeo_common::fmt::format_result_table(
2841            &self.columns,
2842            &self.rows,
2843            self.execution_time_ms,
2844            self.status_message.as_deref(),
2845        );
2846        f.write_str(&table)
2847    }
2848}
2849
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// Conversions take the value by reference. Implementations that produce
/// owned data (such as `String`) copy it out of the value.
// `Sized` is required because implementors are returned by value inside
// `Result<Self>`.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// # Errors
    ///
    /// Returns `Error::TypeMismatch` if the value is not the expected type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2862
2863impl FromValue for i64 {
2864    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2865        value
2866            .as_int64()
2867            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2868                expected: "INT64".to_string(),
2869                found: value.type_name().to_string(),
2870            })
2871    }
2872}
2873
2874impl FromValue for f64 {
2875    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2876        value
2877            .as_float64()
2878            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2879                expected: "FLOAT64".to_string(),
2880                found: value.type_name().to_string(),
2881            })
2882    }
2883}
2884
2885impl FromValue for String {
2886    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2887        value.as_str().map(String::from).ok_or_else(|| {
2888            grafeo_common::utils::error::Error::TypeMismatch {
2889                expected: "STRING".to_string(),
2890                found: value.type_name().to_string(),
2891            }
2892        })
2893    }
2894}
2895
2896impl FromValue for bool {
2897    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2898        value
2899            .as_bool()
2900            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2901                expected: "BOOL".to_string(),
2902                found: value.type_name().to_string(),
2903            })
2904    }
2905}
2906
2907#[cfg(test)]
2908mod tests {
2909    use super::*;
2910
2911    #[test]
2912    fn test_create_in_memory_database() {
2913        let db = GrafeoDB::new_in_memory();
2914        assert_eq!(db.node_count(), 0);
2915        assert_eq!(db.edge_count(), 0);
2916    }
2917
2918    #[test]
2919    fn test_database_config() {
2920        let config = Config::in_memory().with_threads(4).with_query_logging();
2921
2922        let db = GrafeoDB::with_config(config).unwrap();
2923        assert_eq!(db.config().threads, 4);
2924        assert!(db.config().query_logging);
2925    }
2926
2927    #[test]
2928    fn test_database_session() {
2929        let db = GrafeoDB::new_in_memory();
2930        let _session = db.session();
2931        // Session should be created successfully
2932    }
2933
2934    #[cfg(feature = "wal")]
2935    #[test]
2936    fn test_persistent_database_recovery() {
2937        use grafeo_common::types::Value;
2938        use tempfile::tempdir;
2939
2940        let dir = tempdir().unwrap();
2941        let db_path = dir.path().join("test_db");
2942
2943        // Create database and add some data
2944        {
2945            let db = GrafeoDB::open(&db_path).unwrap();
2946
2947            let alix = db.create_node(&["Person"]);
2948            db.set_node_property(alix, "name", Value::from("Alix"));
2949
2950            let gus = db.create_node(&["Person"]);
2951            db.set_node_property(gus, "name", Value::from("Gus"));
2952
2953            let _edge = db.create_edge(alix, gus, "KNOWS");
2954
2955            // Explicitly close to flush WAL
2956            db.close().unwrap();
2957        }
2958
2959        // Reopen and verify data was recovered
2960        {
2961            let db = GrafeoDB::open(&db_path).unwrap();
2962
2963            assert_eq!(db.node_count(), 2);
2964            assert_eq!(db.edge_count(), 1);
2965
2966            // Verify nodes exist
2967            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2968            assert!(node0.is_some());
2969
2970            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2971            assert!(node1.is_some());
2972        }
2973    }
2974
2975    #[cfg(feature = "wal")]
2976    #[test]
2977    fn test_wal_logging() {
2978        use tempfile::tempdir;
2979
2980        let dir = tempdir().unwrap();
2981        let db_path = dir.path().join("wal_test_db");
2982
2983        let db = GrafeoDB::open(&db_path).unwrap();
2984
2985        // Create some data
2986        let node = db.create_node(&["Test"]);
2987        db.delete_node(node);
2988
2989        // WAL should have records
2990        if let Some(wal) = db.wal() {
2991            assert!(wal.record_count() > 0);
2992        }
2993
2994        db.close().unwrap();
2995    }
2996
2997    #[cfg(feature = "wal")]
2998    #[test]
2999    fn test_wal_recovery_multiple_sessions() {
3000        // Tests that WAL recovery works correctly across multiple open/close cycles
3001        use grafeo_common::types::Value;
3002        use tempfile::tempdir;
3003
3004        let dir = tempdir().unwrap();
3005        let db_path = dir.path().join("multi_session_db");
3006
3007        // Session 1: Create initial data
3008        {
3009            let db = GrafeoDB::open(&db_path).unwrap();
3010            let alix = db.create_node(&["Person"]);
3011            db.set_node_property(alix, "name", Value::from("Alix"));
3012            db.close().unwrap();
3013        }
3014
3015        // Session 2: Add more data
3016        {
3017            let db = GrafeoDB::open(&db_path).unwrap();
3018            assert_eq!(db.node_count(), 1); // Previous data recovered
3019            let gus = db.create_node(&["Person"]);
3020            db.set_node_property(gus, "name", Value::from("Gus"));
3021            db.close().unwrap();
3022        }
3023
3024        // Session 3: Verify all data
3025        {
3026            let db = GrafeoDB::open(&db_path).unwrap();
3027            assert_eq!(db.node_count(), 2);
3028
3029            // Verify properties were recovered correctly
3030            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
3031            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
3032
3033            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
3034            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
3035        }
3036    }
3037
3038    #[cfg(feature = "wal")]
3039    #[test]
3040    fn test_database_consistency_after_mutations() {
3041        // Tests that database remains consistent after a series of create/delete operations
3042        use grafeo_common::types::Value;
3043        use tempfile::tempdir;
3044
3045        let dir = tempdir().unwrap();
3046        let db_path = dir.path().join("consistency_db");
3047
3048        {
3049            let db = GrafeoDB::open(&db_path).unwrap();
3050
3051            // Create nodes
3052            let a = db.create_node(&["Node"]);
3053            let b = db.create_node(&["Node"]);
3054            let c = db.create_node(&["Node"]);
3055
3056            // Create edges
3057            let e1 = db.create_edge(a, b, "LINKS");
3058            let _e2 = db.create_edge(b, c, "LINKS");
3059
3060            // Delete middle node and its edge
3061            db.delete_edge(e1);
3062            db.delete_node(b);
3063
3064            // Set properties on remaining nodes
3065            db.set_node_property(a, "value", Value::Int64(1));
3066            db.set_node_property(c, "value", Value::Int64(3));
3067
3068            db.close().unwrap();
3069        }
3070
3071        // Reopen and verify consistency
3072        {
3073            let db = GrafeoDB::open(&db_path).unwrap();
3074
3075            // Should have 2 nodes (a and c), b was deleted
3076            // Note: node_count includes deleted nodes in some implementations
3077            // What matters is that the non-deleted nodes are accessible
3078            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
3079            assert!(node_a.is_some());
3080
3081            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
3082            assert!(node_c.is_some());
3083
3084            // Middle node should be deleted
3085            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
3086            assert!(node_b.is_none());
3087        }
3088    }
3089
3090    #[cfg(feature = "wal")]
3091    #[test]
3092    fn test_close_is_idempotent() {
3093        // Calling close() multiple times should not cause errors
3094        use tempfile::tempdir;
3095
3096        let dir = tempdir().unwrap();
3097        let db_path = dir.path().join("close_test_db");
3098
3099        let db = GrafeoDB::open(&db_path).unwrap();
3100        db.create_node(&["Test"]);
3101
3102        // First close should succeed
3103        assert!(db.close().is_ok());
3104
3105        // Second close should also succeed (idempotent)
3106        assert!(db.close().is_ok());
3107    }
3108
3109    #[test]
3110    fn test_with_store_external_backend() {
3111        use grafeo_core::graph::lpg::LpgStore;
3112
3113        let external = Arc::new(LpgStore::new().unwrap());
3114
3115        // Seed data on the external store directly
3116        let n1 = external.create_node(&["Person"]);
3117        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
3118
3119        let db = GrafeoDB::with_store(
3120            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
3121            Config::in_memory(),
3122        )
3123        .unwrap();
3124
3125        let session = db.session();
3126
3127        // Session should see data from the external store via execute
3128        #[cfg(feature = "gql")]
3129        {
3130            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
3131            assert_eq!(result.rows.len(), 1);
3132        }
3133    }
3134
3135    #[test]
3136    fn test_with_config_custom_memory_limit() {
3137        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB
3138
3139        let db = GrafeoDB::with_config(config).unwrap();
3140        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
3141        assert_eq!(db.node_count(), 0);
3142    }
3143
3144    #[cfg(feature = "metrics")]
3145    #[test]
3146    fn test_database_metrics_registry() {
3147        let db = GrafeoDB::new_in_memory();
3148
3149        // Perform some operations
3150        db.create_node(&["Person"]);
3151        db.create_node(&["Person"]);
3152
3153        // Check that metrics snapshot returns data
3154        let snap = db.metrics();
3155        // Session created counter should reflect at least 0 (metrics is initialized)
3156        assert_eq!(snap.query_count, 0); // No queries executed yet
3157    }
3158
3159    #[test]
3160    fn test_query_result_has_metrics() {
3161        // Verifies that query results include execution metrics
3162        let db = GrafeoDB::new_in_memory();
3163        db.create_node(&["Person"]);
3164        db.create_node(&["Person"]);
3165
3166        #[cfg(feature = "gql")]
3167        {
3168            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
3169
3170            // Metrics should be populated
3171            assert!(result.execution_time_ms.is_some());
3172            assert!(result.rows_scanned.is_some());
3173            assert!(result.execution_time_ms.unwrap() >= 0.0);
3174            assert_eq!(result.rows_scanned.unwrap(), 2);
3175        }
3176    }
3177
3178    #[test]
3179    fn test_empty_query_result_metrics() {
3180        // Verifies metrics are correct for queries returning no results
3181        let db = GrafeoDB::new_in_memory();
3182        db.create_node(&["Person"]);
3183
3184        #[cfg(feature = "gql")]
3185        {
3186            // Query that matches nothing
3187            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
3188
3189            assert!(result.execution_time_ms.is_some());
3190            assert!(result.rows_scanned.is_some());
3191            assert_eq!(result.rows_scanned.unwrap(), 0);
3192        }
3193    }
3194
3195    #[cfg(feature = "cdc")]
3196    mod cdc_integration {
3197        use super::*;
3198
3199        /// Helper: creates an in-memory database with CDC enabled.
3200        fn cdc_db() -> GrafeoDB {
3201            GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
3202        }
3203
3204        #[test]
3205        fn test_node_lifecycle_history() {
3206            let db = cdc_db();
3207
3208            // Create
3209            let id = db.create_node(&["Person"]);
3210            // Update
3211            db.set_node_property(id, "name", "Alix".into());
3212            db.set_node_property(id, "name", "Gus".into());
3213            // Delete
3214            db.delete_node(id);
3215
3216            let history = db.history(id).unwrap();
3217            assert_eq!(history.len(), 4); // create + 2 updates + delete
3218            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3219            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3220            assert!(history[1].before.is_none()); // first set_node_property has no prior value
3221            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
3222            assert!(history[2].before.is_some()); // second update has prior "Alix"
3223            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
3224        }
3225
3226        #[test]
3227        fn test_edge_lifecycle_history() {
3228            let db = cdc_db();
3229
3230            let alix = db.create_node(&["Person"]);
3231            let gus = db.create_node(&["Person"]);
3232            let edge = db.create_edge(alix, gus, "KNOWS");
3233            db.set_edge_property(edge, "since", 2024i64.into());
3234            db.delete_edge(edge);
3235
3236            let history = db.history(edge).unwrap();
3237            assert_eq!(history.len(), 3); // create + update + delete
3238            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3239            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3240            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
3241        }
3242
3243        #[test]
3244        fn test_create_node_with_props_cdc() {
3245            let db = cdc_db();
3246
3247            let id = db.create_node_with_props(
3248                &["Person"],
3249                vec![
3250                    ("name", grafeo_common::types::Value::from("Alix")),
3251                    ("age", grafeo_common::types::Value::from(30i64)),
3252                ],
3253            );
3254
3255            let history = db.history(id).unwrap();
3256            assert_eq!(history.len(), 1);
3257            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3258            // Props should be captured
3259            let after = history[0].after.as_ref().unwrap();
3260            assert_eq!(after.len(), 2);
3261        }
3262
3263        #[test]
3264        fn test_changes_between() {
3265            let db = cdc_db();
3266
3267            let id1 = db.create_node(&["A"]);
3268            let _id2 = db.create_node(&["B"]);
3269            db.set_node_property(id1, "x", 1i64.into());
3270
3271            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
3272            let changes = db
3273                .changes_between(
3274                    grafeo_common::types::EpochId(0),
3275                    grafeo_common::types::EpochId(u64::MAX),
3276                )
3277                .unwrap();
3278            assert_eq!(changes.len(), 3); // 2 creates + 1 update
3279        }
3280
3281        #[test]
3282        fn test_cdc_disabled_by_default() {
3283            let db = GrafeoDB::new_in_memory();
3284            assert!(!db.is_cdc_enabled());
3285
3286            let id = db.create_node(&["Person"]);
3287            db.set_node_property(id, "name", "Alix".into());
3288
3289            let history = db.history(id).unwrap();
3290            assert!(history.is_empty(), "CDC off by default: no events recorded");
3291        }
3292
3293        #[test]
3294        fn test_session_with_cdc_override_on() {
3295            // Database default is off, but session opts in
3296            let db = GrafeoDB::new_in_memory();
3297            let session = db.session_with_cdc(true);
3298            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3299            // The CDC log should have events from the opted-in session
3300            let changes = db
3301                .changes_between(
3302                    grafeo_common::types::EpochId(0),
3303                    grafeo_common::types::EpochId(u64::MAX),
3304                )
3305                .unwrap();
3306            assert!(
3307                !changes.is_empty(),
3308                "session_with_cdc(true) should record events"
3309            );
3310        }
3311
3312        #[test]
3313        fn test_session_with_cdc_override_off() {
3314            // Database default is on, but session opts out
3315            let db = cdc_db();
3316            let session = db.session_with_cdc(false);
3317            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3318            let changes = db
3319                .changes_between(
3320                    grafeo_common::types::EpochId(0),
3321                    grafeo_common::types::EpochId(u64::MAX),
3322                )
3323                .unwrap();
3324            assert!(
3325                changes.is_empty(),
3326                "session_with_cdc(false) should not record events"
3327            );
3328        }
3329
3330        #[test]
3331        fn test_set_cdc_enabled_runtime() {
3332            let db = GrafeoDB::new_in_memory();
3333            assert!(!db.is_cdc_enabled());
3334
3335            // Enable at runtime
3336            db.set_cdc_enabled(true);
3337            assert!(db.is_cdc_enabled());
3338
3339            let id = db.create_node(&["Person"]);
3340            let history = db.history(id).unwrap();
3341            assert_eq!(history.len(), 1, "CDC enabled at runtime records events");
3342
3343            // Disable again
3344            db.set_cdc_enabled(false);
3345            let id2 = db.create_node(&["Person"]);
3346            let history2 = db.history(id2).unwrap();
3347            assert!(
3348                history2.is_empty(),
3349                "CDC disabled at runtime stops recording"
3350            );
3351        }
3352    }
3353
3354    #[test]
3355    fn test_with_store_basic() {
3356        use grafeo_core::graph::lpg::LpgStore;
3357
3358        let store = Arc::new(LpgStore::new().unwrap());
3359        let n1 = store.create_node(&["Person"]);
3360        store.set_node_property(n1, "name", "Alix".into());
3361
3362        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3363        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3364
3365        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
3366        assert_eq!(result.rows.len(), 1);
3367    }
3368
3369    #[test]
3370    fn test_with_store_session() {
3371        use grafeo_core::graph::lpg::LpgStore;
3372
3373        let store = Arc::new(LpgStore::new().unwrap());
3374        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3375        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3376
3377        let session = db.session();
3378        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3379        assert_eq!(result.rows.len(), 1);
3380    }
3381
3382    #[test]
3383    fn test_with_store_mutations() {
3384        use grafeo_core::graph::lpg::LpgStore;
3385
3386        let store = Arc::new(LpgStore::new().unwrap());
3387        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3388        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3389
3390        let mut session = db.session();
3391
3392        // Use an explicit transaction so INSERT and MATCH share the same
3393        // transaction context. With PENDING epochs, uncommitted versions are
3394        // only visible to the owning transaction.
3395        session.begin_transaction().unwrap();
3396        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3397
3398        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3399        assert_eq!(result.rows.len(), 1);
3400
3401        session.commit().unwrap();
3402    }
3403
3404    // =========================================================================
3405    // QueryResult tests
3406    // =========================================================================
3407
3408    #[test]
3409    fn test_query_result_empty() {
3410        let result = QueryResult::empty();
3411        assert!(result.is_empty());
3412        assert_eq!(result.row_count(), 0);
3413        assert_eq!(result.column_count(), 0);
3414        assert!(result.execution_time_ms().is_none());
3415        assert!(result.rows_scanned().is_none());
3416        assert!(result.status_message.is_none());
3417    }
3418
3419    #[test]
3420    fn test_query_result_status() {
3421        let result = QueryResult::status("Created node type 'Person'");
3422        assert!(result.is_empty());
3423        assert_eq!(result.column_count(), 0);
3424        assert_eq!(
3425            result.status_message.as_deref(),
3426            Some("Created node type 'Person'")
3427        );
3428    }
3429
3430    #[test]
3431    fn test_query_result_new_with_columns() {
3432        let result = QueryResult::new(vec!["name".into(), "age".into()]);
3433        assert_eq!(result.column_count(), 2);
3434        assert_eq!(result.row_count(), 0);
3435        assert!(result.is_empty());
3436        // Column types should default to Any
3437        assert_eq!(
3438            result.column_types,
3439            vec![
3440                grafeo_common::types::LogicalType::Any,
3441                grafeo_common::types::LogicalType::Any
3442            ]
3443        );
3444    }
3445
3446    #[test]
3447    fn test_query_result_with_types() {
3448        use grafeo_common::types::LogicalType;
3449        let result = QueryResult::with_types(
3450            vec!["name".into(), "age".into()],
3451            vec![LogicalType::String, LogicalType::Int64],
3452        );
3453        assert_eq!(result.column_count(), 2);
3454        assert_eq!(result.column_types[0], LogicalType::String);
3455        assert_eq!(result.column_types[1], LogicalType::Int64);
3456    }
3457
3458    #[test]
3459    fn test_query_result_with_metrics() {
3460        let result = QueryResult::new(vec!["x".into()]).with_metrics(42.5, 100);
3461        assert_eq!(result.execution_time_ms(), Some(42.5));
3462        assert_eq!(result.rows_scanned(), Some(100));
3463    }
3464
3465    #[test]
3466    fn test_query_result_scalar_success() {
3467        use grafeo_common::types::Value;
3468        let mut result = QueryResult::new(vec!["count".into()]);
3469        result.rows.push(vec![Value::Int64(42)]);
3470
3471        let val: i64 = result.scalar().unwrap();
3472        assert_eq!(val, 42);
3473    }
3474
3475    #[test]
3476    fn test_query_result_scalar_wrong_shape() {
3477        use grafeo_common::types::Value;
3478        // Multiple rows
3479        let mut result = QueryResult::new(vec!["x".into()]);
3480        result.rows.push(vec![Value::Int64(1)]);
3481        result.rows.push(vec![Value::Int64(2)]);
3482        assert!(result.scalar::<i64>().is_err());
3483
3484        // Multiple columns
3485        let mut result2 = QueryResult::new(vec!["a".into(), "b".into()]);
3486        result2.rows.push(vec![Value::Int64(1), Value::Int64(2)]);
3487        assert!(result2.scalar::<i64>().is_err());
3488
3489        // Empty
3490        let result3 = QueryResult::new(vec!["x".into()]);
3491        assert!(result3.scalar::<i64>().is_err());
3492    }
3493
3494    #[test]
3495    fn test_query_result_iter() {
3496        use grafeo_common::types::Value;
3497        let mut result = QueryResult::new(vec!["x".into()]);
3498        result.rows.push(vec![Value::Int64(1)]);
3499        result.rows.push(vec![Value::Int64(2)]);
3500
3501        let collected: Vec<_> = result.iter().collect();
3502        assert_eq!(collected.len(), 2);
3503    }
3504
3505    #[test]
3506    fn test_query_result_display() {
3507        use grafeo_common::types::Value;
3508        let mut result = QueryResult::new(vec!["name".into()]);
3509        result.rows.push(vec![Value::from("Alix")]);
3510        let display = result.to_string();
3511        assert!(display.contains("name"));
3512        assert!(display.contains("Alix"));
3513    }
3514
3515    // =========================================================================
3516    // FromValue error paths
3517    // =========================================================================
3518
3519    #[test]
3520    fn test_from_value_i64_type_mismatch() {
3521        use grafeo_common::types::Value;
3522        let val = Value::from("not a number");
3523        assert!(i64::from_value(&val).is_err());
3524    }
3525
3526    #[test]
3527    fn test_from_value_f64_type_mismatch() {
3528        use grafeo_common::types::Value;
3529        let val = Value::from("not a float");
3530        assert!(f64::from_value(&val).is_err());
3531    }
3532
3533    #[test]
3534    fn test_from_value_string_type_mismatch() {
3535        use grafeo_common::types::Value;
3536        let val = Value::Int64(42);
3537        assert!(String::from_value(&val).is_err());
3538    }
3539
3540    #[test]
3541    fn test_from_value_bool_type_mismatch() {
3542        use grafeo_common::types::Value;
3543        let val = Value::Int64(1);
3544        assert!(bool::from_value(&val).is_err());
3545    }
3546
3547    #[test]
3548    fn test_from_value_all_success() {
3549        use grafeo_common::types::Value;
3550        assert_eq!(i64::from_value(&Value::Int64(99)).unwrap(), 99);
3551        assert!((f64::from_value(&Value::Float64(2.72)).unwrap() - 2.72).abs() < f64::EPSILON);
3552        assert_eq!(String::from_value(&Value::from("hello")).unwrap(), "hello");
3553        assert!(bool::from_value(&Value::Bool(true)).unwrap());
3554    }
3555
3556    // =========================================================================
3557    // GrafeoDB accessor tests
3558    // =========================================================================
3559
3560    #[test]
3561    fn test_database_is_read_only_false_by_default() {
3562        let db = GrafeoDB::new_in_memory();
3563        assert!(!db.is_read_only());
3564    }
3565
3566    #[test]
3567    fn test_database_graph_model() {
3568        let db = GrafeoDB::new_in_memory();
3569        assert_eq!(db.graph_model(), crate::config::GraphModel::Lpg);
3570    }
3571
3572    #[test]
3573    fn test_database_memory_limit_none_by_default() {
3574        let db = GrafeoDB::new_in_memory();
3575        assert!(db.memory_limit().is_none());
3576    }
3577
3578    #[test]
3579    fn test_database_memory_limit_custom() {
3580        let config = Config::in_memory().with_memory_limit(128 * 1024 * 1024);
3581        let db = GrafeoDB::with_config(config).unwrap();
3582        assert_eq!(db.memory_limit(), Some(128 * 1024 * 1024));
3583    }
3584
3585    #[test]
3586    fn test_database_adaptive_config() {
3587        let db = GrafeoDB::new_in_memory();
3588        let adaptive = db.adaptive_config();
3589        assert!(adaptive.enabled);
3590        assert!((adaptive.threshold - 3.0).abs() < f64::EPSILON);
3591    }
3592
3593    #[test]
3594    fn test_database_buffer_manager() {
3595        let db = GrafeoDB::new_in_memory();
3596        let _bm = db.buffer_manager();
3597        // Just verify it doesn't panic
3598    }
3599
3600    #[test]
3601    fn test_database_query_cache() {
3602        let db = GrafeoDB::new_in_memory();
3603        let _qc = db.query_cache();
3604    }
3605
3606    #[test]
3607    fn test_database_clear_plan_cache() {
3608        let db = GrafeoDB::new_in_memory();
3609        // Execute a query to populate the cache
3610        #[cfg(feature = "gql")]
3611        {
3612            let _ = db.execute("MATCH (n) RETURN count(n)");
3613        }
3614        db.clear_plan_cache();
3615        // No panic means success
3616    }
3617
3618    #[test]
3619    fn test_database_gc() {
3620        let db = GrafeoDB::new_in_memory();
3621        db.create_node(&["Person"]);
3622        db.gc();
3623        // Verify no panic, node still accessible
3624        assert_eq!(db.node_count(), 1);
3625    }
3626
3627    // =========================================================================
3628    // Named graph management
3629    // =========================================================================
3630
3631    #[test]
3632    fn test_create_and_list_graphs() {
3633        let db = GrafeoDB::new_in_memory();
3634        let created = db.create_graph("social").unwrap();
3635        assert!(created);
3636
3637        // Creating same graph again returns false
3638        let created_again = db.create_graph("social").unwrap();
3639        assert!(!created_again);
3640
3641        let names = db.list_graphs();
3642        assert!(names.contains(&"social".to_string()));
3643    }
3644
3645    #[test]
3646    fn test_drop_graph() {
3647        let db = GrafeoDB::new_in_memory();
3648        db.create_graph("temp").unwrap();
3649        assert!(db.drop_graph("temp"));
3650        assert!(!db.drop_graph("temp")); // Already dropped
3651    }
3652
3653    #[test]
3654    fn test_drop_graph_resets_current_graph() {
3655        let db = GrafeoDB::new_in_memory();
3656        db.create_graph("active").unwrap();
3657        db.set_current_graph(Some("active")).unwrap();
3658        assert_eq!(db.current_graph(), Some("active".to_string()));
3659
3660        db.drop_graph("active");
3661        assert_eq!(db.current_graph(), None);
3662    }
3663
3664    // =========================================================================
3665    // Current graph / schema context
3666    // =========================================================================
3667
3668    #[test]
3669    fn test_current_graph_default_none() {
3670        let db = GrafeoDB::new_in_memory();
3671        assert_eq!(db.current_graph(), None);
3672    }
3673
3674    #[test]
3675    fn test_set_current_graph_valid() {
3676        let db = GrafeoDB::new_in_memory();
3677        db.create_graph("social").unwrap();
3678        db.set_current_graph(Some("social")).unwrap();
3679        assert_eq!(db.current_graph(), Some("social".to_string()));
3680    }
3681
3682    #[test]
3683    fn test_set_current_graph_nonexistent() {
3684        let db = GrafeoDB::new_in_memory();
3685        let result = db.set_current_graph(Some("nonexistent"));
3686        assert!(result.is_err());
3687    }
3688
3689    #[test]
3690    fn test_set_current_graph_none_resets() {
3691        let db = GrafeoDB::new_in_memory();
3692        db.create_graph("social").unwrap();
3693        db.set_current_graph(Some("social")).unwrap();
3694        db.set_current_graph(None).unwrap();
3695        assert_eq!(db.current_graph(), None);
3696    }
3697
3698    #[test]
3699    fn test_set_current_graph_default_keyword() {
3700        let db = GrafeoDB::new_in_memory();
3701        // "default" is a special case that always succeeds
3702        db.set_current_graph(Some("default")).unwrap();
3703        assert_eq!(db.current_graph(), Some("default".to_string()));
3704    }
3705
3706    #[test]
3707    fn test_current_schema_default_none() {
3708        let db = GrafeoDB::new_in_memory();
3709        assert_eq!(db.current_schema(), None);
3710    }
3711
3712    #[test]
3713    fn test_set_current_schema_nonexistent() {
3714        let db = GrafeoDB::new_in_memory();
3715        let result = db.set_current_schema(Some("nonexistent"));
3716        assert!(result.is_err());
3717    }
3718
3719    #[test]
3720    fn test_set_current_schema_none_resets() {
3721        let db = GrafeoDB::new_in_memory();
3722        db.set_current_schema(None).unwrap();
3723        assert_eq!(db.current_schema(), None);
3724    }
3725
3726    // =========================================================================
3727    // graph_store / graph_store_mut
3728    // =========================================================================
3729
3730    #[test]
3731    fn test_graph_store_returns_lpg_by_default() {
3732        let db = GrafeoDB::new_in_memory();
3733        db.create_node(&["Person"]);
3734        let store = db.graph_store();
3735        assert_eq!(store.node_count(), 1);
3736    }
3737
3738    #[test]
3739    fn test_graph_store_mut_returns_some_by_default() {
3740        let db = GrafeoDB::new_in_memory();
3741        assert!(db.graph_store_mut().is_some());
3742    }
3743
3744    #[test]
3745    fn test_with_read_store() {
3746        use grafeo_core::graph::lpg::LpgStore;
3747
3748        let store = Arc::new(LpgStore::new().unwrap());
3749        store.create_node(&["Person"]);
3750
3751        let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
3752        let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3753
3754        assert!(db.is_read_only());
3755        assert!(db.graph_store_mut().is_none());
3756
3757        // Read queries should work
3758        let gs = db.graph_store();
3759        assert_eq!(gs.node_count(), 1);
3760    }
3761
3762    #[test]
3763    fn test_with_store_graph_store_methods() {
3764        use grafeo_core::graph::lpg::LpgStore;
3765
3766        let store = Arc::new(LpgStore::new().unwrap());
3767        store.create_node(&["Person"]);
3768
3769        let db = GrafeoDB::with_store(
3770            Arc::clone(&store) as Arc<dyn GraphStoreMut>,
3771            Config::in_memory(),
3772        )
3773        .unwrap();
3774
3775        assert!(!db.is_read_only());
3776        assert!(db.graph_store_mut().is_some());
3777        assert_eq!(db.graph_store().node_count(), 1);
3778    }
3779
3780    // =========================================================================
3781    // session_read_only
3782    // =========================================================================
3783
3784    #[test]
3785    #[allow(deprecated)]
3786    fn test_session_read_only() {
3787        let db = GrafeoDB::new_in_memory();
3788        db.create_node(&["Person"]);
3789
3790        let session = db.session_read_only();
3791        // Read queries should work
3792        #[cfg(feature = "gql")]
3793        {
3794            let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3795            assert_eq!(result.rows.len(), 1);
3796        }
3797    }
3798
3799    // =========================================================================
3800    // close on in-memory database
3801    // =========================================================================
3802
3803    #[test]
3804    fn test_close_in_memory_database() {
3805        let db = GrafeoDB::new_in_memory();
3806        db.create_node(&["Person"]);
3807        assert!(db.close().is_ok());
3808        // Second close should also be fine (idempotent)
3809        assert!(db.close().is_ok());
3810    }
3811
3812    // =========================================================================
3813    // with_config validation failure
3814    // =========================================================================
3815
3816    #[test]
3817    fn test_with_config_invalid_config_zero_threads() {
3818        let config = Config::in_memory().with_threads(0);
3819        let result = GrafeoDB::with_config(config);
3820        assert!(result.is_err());
3821    }
3822
3823    #[test]
3824    fn test_with_config_invalid_config_zero_memory_limit() {
3825        let config = Config::in_memory().with_memory_limit(0);
3826        let result = GrafeoDB::with_config(config);
3827        assert!(result.is_err());
3828    }
3829
3830    // =========================================================================
3831    // StorageFormat display (for config.rs coverage)
3832    // =========================================================================
3833
3834    #[test]
3835    fn test_storage_format_display() {
3836        use crate::config::StorageFormat;
3837        assert_eq!(StorageFormat::Auto.to_string(), "auto");
3838        assert_eq!(StorageFormat::WalDirectory.to_string(), "wal-directory");
3839        assert_eq!(StorageFormat::SingleFile.to_string(), "single-file");
3840    }
3841
3842    #[test]
3843    fn test_storage_format_default() {
3844        use crate::config::StorageFormat;
3845        assert_eq!(StorageFormat::default(), StorageFormat::Auto);
3846    }
3847
3848    #[test]
3849    fn test_config_with_storage_format() {
3850        use crate::config::StorageFormat;
3851        let config = Config::in_memory().with_storage_format(StorageFormat::SingleFile);
3852        assert_eq!(config.storage_format, StorageFormat::SingleFile);
3853    }
3854
3855    // =========================================================================
3856    // Config CDC
3857    // =========================================================================
3858
3859    #[test]
3860    fn test_config_with_cdc() {
3861        let config = Config::in_memory().with_cdc();
3862        assert!(config.cdc_enabled);
3863    }
3864
3865    #[test]
3866    fn test_config_cdc_default_false() {
3867        let config = Config::in_memory();
3868        assert!(!config.cdc_enabled);
3869    }
3870
3871    // =========================================================================
3872    // ConfigError as std::error::Error
3873    // =========================================================================
3874
3875    #[test]
3876    fn test_config_error_is_error_trait() {
3877        use crate::config::ConfigError;
3878        let err: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);
3879        assert!(err.source().is_none());
3880    }
3881
3882    // =========================================================================
3883    // Metrics tests
3884    // =========================================================================
3885
3886    #[cfg(feature = "metrics")]
3887    #[test]
3888    fn test_metrics_prometheus_output() {
3889        let db = GrafeoDB::new_in_memory();
3890        let prom = db.metrics_prometheus();
3891        // Should contain at least some metric names
3892        assert!(!prom.is_empty());
3893    }
3894
3895    #[cfg(feature = "metrics")]
3896    #[test]
3897    fn test_reset_metrics() {
3898        let db = GrafeoDB::new_in_memory();
3899        // Execute something to generate metrics
3900        let _session = db.session();
3901        db.reset_metrics();
3902        let snap = db.metrics();
3903        assert_eq!(snap.query_count, 0);
3904    }
3905
3906    // =========================================================================
3907    // drop_graph on external store
3908    // =========================================================================
3909
3910    #[test]
3911    fn test_drop_graph_on_external_store() {
3912        use grafeo_core::graph::lpg::LpgStore;
3913
3914        let store = Arc::new(LpgStore::new().unwrap());
3915        let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
3916        let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3917
3918        // drop_graph with external store (no built-in store) returns false
3919        assert!(!db.drop_graph("anything"));
3920    }
3921}