Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(all(feature = "compact-store", feature = "mmap"))]
31pub mod compact_tiered;
32#[cfg(feature = "lpg")]
33mod crud;
34#[cfg(feature = "embed")]
35mod embed;
36#[cfg(feature = "grafeo-file")]
37pub(crate) mod flush;
38#[cfg(feature = "lpg")]
39mod import;
40#[cfg(feature = "lpg")]
41mod index;
42#[cfg(feature = "lpg")]
43mod persistence;
44mod query;
45#[cfg(feature = "triple-store")]
46mod rdf_ops;
47#[cfg(feature = "lpg")]
48mod search;
49pub(crate) mod section_consumer;
50#[cfg(all(feature = "wal", feature = "lpg"))]
51pub(crate) mod wal_store;
52
53use grafeo_common::{grafeo_error, grafeo_warn};
54#[cfg(feature = "wal")]
55use std::path::Path;
56use std::sync::Arc;
57use std::sync::atomic::AtomicUsize;
58
59use parking_lot::RwLock;
60
61use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
62use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
63#[cfg(feature = "lpg")]
64use grafeo_core::graph::lpg::LpgStore;
65#[cfg(feature = "triple-store")]
66use grafeo_core::graph::rdf::RdfStore;
67use grafeo_core::graph::{GraphStoreMut, GraphStoreSearch};
68#[cfg(feature = "grafeo-file")]
69use grafeo_storage::file::GrafeoFileManager;
70#[cfg(all(feature = "wal", feature = "lpg"))]
71use grafeo_storage::wal::WalRecovery;
72#[cfg(feature = "wal")]
73use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
74
75use crate::catalog::Catalog;
76use crate::config::Config;
77use crate::query::cache::QueryCache;
78use crate::session::Session;
79use crate::transaction::TransactionManager;
80
81/// Your handle to a Grafeo database.
82///
83/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
84/// quick experiments, or [`open()`](Self::open) for persistent storage.
85/// Then grab a [`session()`](Self::session) to start querying.
86///
87/// # Examples
88///
89/// ```
90/// use grafeo_engine::GrafeoDB;
91///
92/// // Quick in-memory database
93/// let db = GrafeoDB::new_in_memory();
94///
95/// // Add some data
96/// db.create_node(&["Person"]);
97///
98/// // Query it
99/// let session = db.session();
100/// let result = session.execute("MATCH (p:Person) RETURN p")?;
101/// # Ok::<(), grafeo_common::utils::error::Error>(())
102/// ```
103pub struct GrafeoDB {
104    /// Database configuration.
105    pub(super) config: Config,
106    /// The underlying graph store (None when using an external store).
107    #[cfg(feature = "lpg")]
108    pub(super) store: Option<Arc<LpgStore>>,
109    /// Schema and metadata catalog shared across sessions.
110    pub(super) catalog: Arc<Catalog>,
111    /// RDF triple store (if RDF feature is enabled).
112    #[cfg(feature = "triple-store")]
113    pub(super) rdf_store: Arc<RdfStore>,
114    /// Transaction manager.
115    pub(super) transaction_manager: Arc<TransactionManager>,
116    /// Unified buffer manager.
117    pub(super) buffer_manager: Arc<BufferManager>,
118    /// Write-ahead log manager (if durability is enabled).
119    #[cfg(feature = "wal")]
120    pub(super) wal: Option<Arc<LpgWal>>,
121    /// Shared WAL graph context tracker. Tracks which named graph was last
122    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
123    /// records only when the context actually changes.
124    #[cfg(feature = "wal")]
125    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
126    /// Query cache for parsed and optimized plans.
127    pub(super) query_cache: Arc<QueryCache>,
128    /// Shared commit counter for auto-GC across sessions.
129    pub(super) commit_counter: Arc<AtomicUsize>,
130    /// Whether the database is open.
131    pub(super) is_open: RwLock<bool>,
132    /// Change data capture log for tracking mutations.
133    #[cfg(feature = "cdc")]
134    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
135    /// Whether CDC is active for new sessions and direct CRUD (runtime-mutable).
136    #[cfg(feature = "cdc")]
137    cdc_enabled: std::sync::atomic::AtomicBool,
138    /// Registered embedding models for text-to-vector conversion.
139    #[cfg(feature = "embed")]
140    pub(super) embedding_models:
141        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
142    /// Single-file database manager (when using `.grafeo` format).
143    #[cfg(feature = "grafeo-file")]
144    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
145    /// Periodic checkpoint timer (when `checkpoint_interval` is configured).
146    /// Wrapped in Mutex because `close()` takes `&self` but needs to stop the timer.
147    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
148    checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
149    /// Shared registry of spilled vector storages.
150    /// Used by the search path to create `SpillableVectorAccessor` instances.
151    #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
152    vector_spill_storages: Option<
153        Arc<
154            parking_lot::RwLock<
155                std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
156            >,
157        >,
158    >,
159    /// External read-only graph store (when using with_store() or with_read_store()).
160    /// When set, sessions route queries through this store instead of the built-in LpgStore.
161    pub(super) external_read_store: Option<Arc<dyn GraphStoreSearch>>,
162    /// External writable graph store (when using with_store()).
163    /// None for read-only databases created via with_read_store().
164    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
165    /// Metrics registry shared across all sessions.
166    #[cfg(feature = "metrics")]
167    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
168    /// Persistent graph context for one-shot `execute()` calls.
169    /// When set, each call to `session()` pre-configures the session to this graph.
170    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
171    current_graph: RwLock<Option<String>>,
172    /// Persistent schema context for one-shot `execute()` calls.
173    /// When set, each call to `session()` pre-configures the session to this schema.
174    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
175    current_schema: RwLock<Option<String>>,
176    /// Whether this database is open in read-only mode.
177    /// When true, sessions automatically enforce read-only transactions.
178    read_only: bool,
179    /// Named graph projections (virtual subgraphs), shared with sessions.
180    projections:
181        Arc<RwLock<std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>>>,
182    /// Layered store (compact base + mutable overlay), set after `compact()`.
183    #[cfg(all(feature = "compact-store", feature = "lpg"))]
184    layered_store: Option<Arc<grafeo_core::graph::compact::layered::LayeredStore>>,
185    /// Disk-backed tier wrapper for the compact base, set after `compact()`.
186    ///
187    /// Provides the spill path for [`CompactStoreConsumer`]: when the buffer
188    /// manager signals memory pressure, the consumer calls
189    /// `persist_to_mmap()` here and then publishes the fresh base to
190    /// `layered_store` via `swap_base()`.
191    #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
192    compact_tiered: Option<Arc<compact_tiered::CompactStoreTiered>>,
193}
194
195impl GrafeoDB {
196    /// Returns a reference to the built-in LPG store.
197    ///
198    /// # Panics
199    ///
200    /// Panics if the database was created with [`with_store()`](Self::with_store) or
201    /// [`with_read_store()`](Self::with_read_store), which use an external store
202    /// instead of the built-in LPG store.
203    #[cfg(feature = "lpg")]
204    fn lpg_store(&self) -> &Arc<LpgStore> {
205        self.store.as_ref().expect(
206            "no built-in LpgStore: this GrafeoDB was created with an external store \
207             (with_store / with_read_store). Use session() or graph_store() instead.",
208        )
209    }
210
211    /// Returns a borrowed reference to the active graph store.
212    ///
213    /// In layered mode (after [`compact()`](Self::compact)), returns the
214    /// `LayeredStore` which merges the columnar base with the overlay.
215    /// Otherwise, returns the built-in `LpgStore`.
216    ///
217    /// Unlike [`graph_store()`](Self::graph_store) (which clones an `Arc`),
218    /// this borrows from `self` — suitable for constructing accessors that
219    /// need `&'a dyn GraphStore` tied to the database lifetime.
220    #[cfg(any(
221        feature = "vector-index",
222        feature = "text-index",
223        feature = "hybrid-search",
224        feature = "embed",
225    ))]
226    fn graph_store_ref(&self) -> &dyn grafeo_core::graph::GraphStore {
227        if let Some(ref ext_read) = self.external_read_store {
228            ext_read.as_ref()
229        } else {
230            #[cfg(feature = "lpg")]
231            {
232                &**self.lpg_store()
233            }
234            #[cfg(not(feature = "lpg"))]
235            unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
236        }
237    }
238
239    /// Returns whether CDC is active (runtime check).
240    #[cfg(feature = "cdc")]
241    #[inline]
242    pub(super) fn cdc_active(&self) -> bool {
243        self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
244    }
245
246    /// Creates an in-memory database, fast to create, gone when dropped.
247    ///
248    /// Use this for tests, experiments, or when you don't need persistence.
249    /// For data that survives restarts, use [`open()`](Self::open) instead.
250    ///
251    /// # Panics
252    ///
253    /// Panics if the internal arena allocator cannot be initialized (out of memory).
254    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
255    ///
256    /// # Examples
257    ///
258    /// ```
259    /// use grafeo_engine::GrafeoDB;
260    ///
261    /// let db = GrafeoDB::new_in_memory();
262    /// let session = db.session();
263    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
264    /// # Ok::<(), grafeo_common::utils::error::Error>(())
265    /// ```
266    #[must_use]
267    pub fn new_in_memory() -> Self {
268        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
269    }
270
271    /// Opens a database at the given path, creating it if it doesn't exist.
272    ///
273    /// If you've used this path before, Grafeo recovers your data from the
274    /// write-ahead log automatically. First open on a new path creates an
275    /// empty database.
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if the path isn't writable or recovery fails.
280    ///
281    /// # Examples
282    ///
283    /// ```no_run
284    /// use grafeo_engine::GrafeoDB;
285    ///
286    /// let db = GrafeoDB::open("./my_social_network")?;
287    /// # Ok::<(), grafeo_common::utils::error::Error>(())
288    /// ```
289    #[cfg(feature = "wal")]
290    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
291        Self::with_config(Config::persistent(path.as_ref()))
292    }
293
294    /// Opens an existing database in read-only mode.
295    ///
296    /// Uses a shared file lock, so multiple processes can read the same
297    /// `.grafeo` file concurrently. The database loads the last checkpoint
298    /// snapshot but does **not** replay the WAL or allow mutations.
299    ///
300    /// Currently only supports the single-file (`.grafeo`) format.
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the file doesn't exist or can't be read.
305    ///
306    /// # Examples
307    ///
308    /// ```no_run
309    /// use grafeo_engine::GrafeoDB;
310    ///
311    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
312    /// let session = db.session();
313    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
314    /// // Mutations will return an error:
315    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
316    /// # Ok::<(), grafeo_common::utils::error::Error>(())
317    /// ```
318    #[cfg(feature = "grafeo-file")]
319    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
320        Self::with_config(Config::read_only(path.as_ref()))
321    }
322
    /// Creates a database with custom configuration.
    ///
    /// Use this when you need fine-grained control over memory limits,
    /// thread counts, or persistence settings. For most cases,
    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
    /// are simpler.
    ///
    /// Construction proceeds in this order:
    ///
    /// 1. Validate `config`.
    /// 2. Create the stores, transaction manager, buffer manager and catalog.
    /// 3. With the `grafeo-file` feature: open or create the single-file
    ///    database and load its contents (v2 sections first, v1 snapshot blob
    ///    as fallback), then replay a sidecar WAL if WAL is enabled.
    /// 4. With the `wal` feature: replay a legacy WAL directory if present and
    ///    open the WAL manager (skipped entirely in read-only mode).
    /// 5. Assemble the `GrafeoDB`, register section consumers, rebuild the
    ///    layered store if a compact base was loaded, start the checkpoint
    ///    timer, restore spill files and apply force-disk overrides.
    ///
    /// # Errors
    ///
    /// Returns an error if the database can't be created or recovery fails.
    /// In particular:
    ///
    /// - config validation fails (reported as `Error::Internal`),
    /// - read-only mode is requested without a path, or the path is not an
    ///   existing file,
    /// - the single-file path exists but is not a regular file,
    /// - any store, file, snapshot or WAL operation fails (propagated).
    ///
    /// # Examples
    ///
    /// ```
    /// use grafeo_engine::{GrafeoDB, Config};
    ///
    /// // In-memory with a 512MB limit
    /// let config = Config::in_memory()
    ///     .with_memory_limit(512 * 1024 * 1024);
    ///
    /// let db = GrafeoDB::with_config(config)?;
    /// # Ok::<(), grafeo_common::utils::error::Error>(())
    /// ```
    pub fn with_config(config: Config) -> Result<Self> {
        // Validate configuration before proceeding
        config
            .validate()
            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

        // Freshly constructed stores; any persisted state is applied to them
        // further down (snapshot / section load and WAL replay).
        #[cfg(feature = "lpg")]
        let store = Arc::new(LpgStore::new()?);
        #[cfg(feature = "triple-store")]
        let rdf_store = Arc::new(RdfStore::new());
        let transaction_manager = Arc::new(TransactionManager::new());

        // Create buffer manager with configured limits
        let buffer_config = BufferManagerConfig {
            // Explicit limit wins; otherwise budget 75% of detected system memory.
            budget: config.memory_limit.unwrap_or_else(|| {
                // reason: product of system RAM and 0.75 is always a valid positive usize
                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
                let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
                b
            }),
            // Explicit spill path wins; otherwise derive `<parent>/<file name>.spill`
            // from the database path. Stays `None` when there is no database path,
            // the path has no parent, or its file name is not valid UTF-8.
            spill_path: config.spill_path.clone().or_else(|| {
                config.path.as_ref().and_then(|p| {
                    let parent = p.parent()?;
                    let name = p.file_name()?.to_str()?;
                    Some(parent.join(format!("{name}.spill")))
                })
            }),
            ..BufferManagerConfig::default()
        };
        let buffer_manager = BufferManager::new(buffer_config);

        // Create catalog early so WAL replay can restore schema definitions
        let catalog = Arc::new(Catalog::new());

        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;

        // Phase 5e: capture the deserialized CompactStore base when we
        // reload a v2 section file that was written by a previously
        // compacted database. The post-construction wiring uses this to
        // rebuild the LayeredStore + tier wrapper + overlay consumer.
        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "compact-store"))]
        let mut loaded_compact_base: Option<
            Arc<grafeo_core::graph::compact::CompactStore>,
        > = None;

        // Phase 5e: snapshot of the OverlayDeletions section (if present),
        // applied after the LayeredStore is wired so that previously-deleted
        // base nodes/edges remain deleted across reload.
        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "compact-store"))]
        let mut loaded_overlay_deletions: Option<(
            Vec<grafeo_common::types::NodeId>,
            Vec<grafeo_common::types::EdgeId>,
        )> = None;

        // --- Single-file format (.grafeo) ---
        // Three outcomes: read-only open of an existing file, read-write
        // open/create when the single-file format is selected, or `None`
        // (no path, or a path that does not use the single-file format).
        #[cfg(feature = "grafeo-file")]
        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
            // Read-only mode: open with shared lock, load snapshot, skip WAL
            if let Some(ref db_path) = config.path {
                if db_path.exists() && db_path.is_file() {
                    let fm = GrafeoFileManager::open_read_only(db_path)?;
                    // Try v2 section-based format first
                    #[cfg(feature = "lpg")]
                    if fm.read_section_directory()?.is_some() {
                        Self::load_from_sections(
                            &fm,
                            &store,
                            &catalog,
                            #[cfg(feature = "triple-store")]
                            &rdf_store,
                        )?;
                        // Remember the compact base and overlay deletions so the
                        // layered store can be rebuilt once `db` exists.
                        #[cfg(feature = "compact-store")]
                        {
                            loaded_compact_base = Self::extract_compact_base(&fm)?;
                            loaded_overlay_deletions = Self::extract_overlay_deletions(&fm)?;
                        }
                    } else {
                        // Fall back to v1 blob format
                        let snapshot_data = fm.read_snapshot()?;
                        // An empty snapshot means there is nothing to apply.
                        if !snapshot_data.is_empty() {
                            Self::apply_snapshot_data(
                                &store,
                                &catalog,
                                #[cfg(feature = "triple-store")]
                                &rdf_store,
                                &snapshot_data,
                            )?;
                        }
                    }
                    Some(Arc::new(fm))
                } else {
                    // Read-only mode never creates a file.
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "read-only open requires an existing .grafeo file: {}",
                        db_path.display()
                    )));
                }
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "read-only mode requires a database path".to_string(),
                ));
            }
        } else if let Some(ref db_path) = config.path {
            // Initialize the file manager whenever single-file format is selected,
            // regardless of whether WAL is enabled. Without this, a database opened
            // with wal_enabled:false + StorageFormat::SingleFile would produce no
            // output at all (the file manager was previously gated behind wal_enabled).
            if Self::should_use_single_file(db_path, config.storage_format) {
                // Open an existing file, create a missing one, reject anything else.
                let fm = if db_path.exists() && db_path.is_file() {
                    GrafeoFileManager::open(db_path)?
                } else if !db_path.exists() {
                    GrafeoFileManager::create(db_path)?
                } else {
                    // Path exists but is not a file (directory, etc.)
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "path exists but is not a file: {}",
                        db_path.display()
                    )));
                };

                // Load data: try v2 section-based format, fall back to v1 blob
                #[cfg(feature = "lpg")]
                if fm.read_section_directory()?.is_some() {
                    Self::load_from_sections(
                        &fm,
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                    )?;
                    // Same capture as in the read-only branch above.
                    #[cfg(feature = "compact-store")]
                    {
                        loaded_compact_base = Self::extract_compact_base(&fm)?;
                        loaded_overlay_deletions = Self::extract_overlay_deletions(&fm)?;
                    }
                } else {
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "triple-store")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }
                }

                // Recover sidecar WAL if WAL is enabled and a sidecar exists
                // NOTE(review): with `wal_enabled == false` an existing sidecar
                // WAL is left unreplayed; confirm this is intended.
                #[cfg(all(feature = "wal", feature = "lpg"))]
                if config.wal_enabled && fm.has_sidecar_wal() {
                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                        &records,
                    )?;
                }

                Some(Arc::new(fm))
            } else {
                None
            }
        } else {
            None
        };

        // Determine whether to use the WAL directory path (legacy) or sidecar
        // Read-only mode skips WAL entirely (no recovery, no creation).
        // The WAL manager also stays `None` when WAL is disabled or there is no path.
        #[cfg(feature = "wal")]
        let wal = if is_read_only {
            None
        } else if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                // When using single-file format, the WAL is a sidecar directory
                #[cfg(feature = "grafeo-file")]
                let wal_path = if let Some(ref fm) = file_manager {
                    let p = fm.sidecar_wal_path();
                    std::fs::create_dir_all(&p)?;
                    p
                } else {
                    // Legacy: WAL inside the database directory
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                // Without single-file support only the legacy layout exists.
                #[cfg(not(feature = "grafeo-file"))]
                let wal_path = {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                // For legacy WAL directory format, check if WAL exists and recover
                #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
                let is_single_file = file_manager.is_some();
                #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
                let is_single_file = false;

                // Single-file databases already replayed their sidecar WAL above,
                // so only the legacy directory layout is recovered here.
                #[cfg(feature = "lpg")]
                if !is_single_file && wal_path.exists() {
                    let recovery = WalRecovery::new(&wal_path);
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                        &records,
                    )?;
                }

                // Open/create WAL manager with configured durability
                // Each engine-level durability mode maps one-to-one onto the
                // storage-layer enum.
                let wal_durability = match config.wal_durability {
                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                    crate::config::DurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    } => WalDurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    },
                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                        WalDurabilityMode::Adaptive { target_interval_ms }
                    }
                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
                };
                let wal_config = WalConfig {
                    durability: wal_durability,
                    ..WalConfig::default()
                };
                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
                Some(Arc::new(wal_manager))
            } else {
                None
            }
        } else {
            None
        };

        // Create query cache with default capacity (1000 queries)
        let query_cache = Arc::new(QueryCache::default());

        // After all snapshot/WAL recovery, sync TransactionManager epoch
        // with the store so queries use the correct viewing epoch.
        #[cfg(all(feature = "temporal", feature = "lpg"))]
        transaction_manager.sync_epoch(store.current_epoch());

        // Copy the CDC settings out of `config` before it is moved into the struct.
        #[cfg(feature = "cdc")]
        let cdc_enabled_val = config.cdc_enabled;
        #[cfg(feature = "cdc")]
        let cdc_retention = config.cdc_retention.clone();

        // Clone Arcs for the checkpoint timer before moving originals into the struct.
        // The timer captures its own references and runs in a background thread.
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let checkpoint_interval = config.checkpoint_interval;
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let timer_store = Arc::clone(&store);
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let timer_catalog = Arc::clone(&catalog);
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let timer_tm = Arc::clone(&transaction_manager);
        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
        let timer_rdf = Arc::clone(&rdf_store);
        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
        let timer_wal = wal.clone();

        // Assemble the database. Fields that are only populated later
        // (checkpoint timer, layered store, tier wrapper, spill registry)
        // start out empty here.
        let mut db = Self {
            config,
            #[cfg(feature = "lpg")]
            store: Some(store),
            catalog,
            #[cfg(feature = "triple-store")]
            rdf_store,
            transaction_manager,
            buffer_manager,
            #[cfg(feature = "wal")]
            wal,
            #[cfg(feature = "wal")]
            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
            query_cache,
            commit_counter: Arc::new(AtomicUsize::new(0)),
            is_open: RwLock::new(true),
            #[cfg(feature = "cdc")]
            cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
            #[cfg(feature = "cdc")]
            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
            #[cfg(feature = "embed")]
            embedding_models: RwLock::new(hashbrown::HashMap::new()),
            #[cfg(feature = "grafeo-file")]
            file_manager,
            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
            checkpoint_timer: parking_lot::Mutex::new(None),
            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
            vector_spill_storages: None,
            external_read_store: None,
            external_write_store: None,
            #[cfg(feature = "metrics")]
            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
            current_graph: RwLock::new(None),
            current_schema: RwLock::new(None),
            read_only: is_read_only,
            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(all(feature = "compact-store", feature = "lpg"))]
            layered_store: None,
            #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
            compact_tiered: None,
        };

        // Register storage sections as memory consumers for pressure tracking
        db.register_section_consumers();

        // Phase 5e: if the loaded file has a CompactStore section, the
        // database was previously compacted. Reconstruct the LayeredStore
        // wiring (base + overlay + tier wrapper + consumers) so the
        // engine sees the full picture and the read/write paths route
        // through the layered store.
        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "compact-store"))]
        if let Some(compact_base) = loaded_compact_base {
            db.wire_layered_after_load(compact_base, loaded_overlay_deletions)?;
        }

        // Start periodic checkpoint timer if configured
        // Requires a configured interval, a file manager, and a writable database.
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
            && !is_read_only
        {
            *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
                interval,
                Arc::clone(fm),
                timer_store,
                timer_catalog,
                timer_tm,
                #[cfg(feature = "triple-store")]
                timer_rdf,
                #[cfg(feature = "wal")]
                timer_wal,
            ));
        }

        // Discover existing spill files from a previous session.
        // If vectors were spilled before close, the spill files persist on disk
        // and need to be re-mapped so search can read from them.
        #[cfg(all(
            feature = "lpg",
            feature = "vector-index",
            feature = "mmap",
            not(feature = "temporal")
        ))]
        db.restore_spill_files();

        // Phase 8a: apply per-section ForceDisk overrides. Each section
        // type configured as ForceDisk triggers a targeted spill of its
        // matching consumer; sections with Auto/ForceRam are left alone.
        // Must happen after register_section_consumers() which creates
        // the consumers we're about to spill.
        db.apply_force_disk_overrides();

        Ok(db)
    }
708
709    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
710    ///
711    /// The external store handles all data persistence. WAL, CDC, and index
712    /// management are the responsibility of the store implementation.
713    ///
714    /// Query execution (all 6 languages, optimizer, planner) works through the
715    /// provided store. Admin operations (schema introspection, persistence,
716    /// vector/text indexes) are not available on external stores.
717    ///
718    /// # Examples
719    ///
720    /// ```no_run
721    /// use std::sync::Arc;
722    /// use grafeo_engine::{GrafeoDB, Config};
723    /// use grafeo_core::graph::GraphStoreMut;
724    ///
725    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
726    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
727    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
728    ///     Ok(())
729    /// }
730    /// ```
731    ///
732    /// # Errors
733    ///
734    /// Returns an error if config validation fails.
735    ///
736    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
737    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
738        config
739            .validate()
740            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
741
742        let transaction_manager = Arc::new(TransactionManager::new());
743
744        let buffer_config = BufferManagerConfig {
745            budget: config.memory_limit.unwrap_or_else(|| {
746                // reason: product of system RAM and 0.75 is always a valid positive usize
747                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
748                let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
749                b
750            }),
751            spill_path: None,
752            ..BufferManagerConfig::default()
753        };
754        let buffer_manager = BufferManager::new(buffer_config);
755
756        let query_cache = Arc::new(QueryCache::default());
757
758        #[cfg(feature = "cdc")]
759        let cdc_enabled_val = config.cdc_enabled;
760
761        Ok(Self {
762            config,
763            #[cfg(feature = "lpg")]
764            store: None,
765            catalog: Arc::new(Catalog::new()),
766            #[cfg(feature = "triple-store")]
767            rdf_store: Arc::new(RdfStore::new()),
768            transaction_manager,
769            buffer_manager,
770            #[cfg(feature = "wal")]
771            wal: None,
772            #[cfg(feature = "wal")]
773            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
774            query_cache,
775            commit_counter: Arc::new(AtomicUsize::new(0)),
776            is_open: RwLock::new(true),
777            #[cfg(feature = "cdc")]
778            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
779            #[cfg(feature = "cdc")]
780            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
781            #[cfg(feature = "embed")]
782            embedding_models: RwLock::new(hashbrown::HashMap::new()),
783            #[cfg(feature = "grafeo-file")]
784            file_manager: None,
785            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
786            checkpoint_timer: parking_lot::Mutex::new(None),
787            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
788            vector_spill_storages: None,
789            external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStoreSearch>),
790            external_write_store: Some(store),
791            #[cfg(feature = "metrics")]
792            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
793            current_graph: RwLock::new(None),
794            current_schema: RwLock::new(None),
795            read_only: false,
796            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
797            #[cfg(all(feature = "compact-store", feature = "lpg"))]
798            layered_store: None,
799            #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
800            compact_tiered: None,
801        })
802    }
803
804    /// Creates a database backed by a read-only [`GraphStore`].
805    ///
806    /// The database is set to read-only mode. Write queries (CREATE, SET,
807    /// DELETE) will return `TransactionError::ReadOnly`.
808    ///
809    /// # Examples
810    ///
811    /// ```no_run
812    /// use std::sync::Arc;
813    /// use grafeo_engine::{GrafeoDB, Config};
814    /// use grafeo_core::graph::GraphStoreSearch;
815    ///
816    /// fn example(store: Arc<dyn GraphStoreSearch>) -> grafeo_common::utils::error::Result<()> {
817    ///     let db = GrafeoDB::with_read_store(store, Config::in_memory())?;
818    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
819    ///     Ok(())
820    /// }
821    /// ```
822    ///
823    /// # Errors
824    ///
825    /// Returns an error if config validation fails.
826    ///
827    /// [`GraphStore`]: grafeo_core::graph::GraphStore
828    pub fn with_read_store(store: Arc<dyn GraphStoreSearch>, config: Config) -> Result<Self> {
829        config
830            .validate()
831            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
832
833        let transaction_manager = Arc::new(TransactionManager::new());
834
835        let buffer_config = BufferManagerConfig {
836            budget: config.memory_limit.unwrap_or_else(|| {
837                // reason: product of system RAM and 0.75 is always a valid positive usize
838                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
839                let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
840                b
841            }),
842            spill_path: None,
843            ..BufferManagerConfig::default()
844        };
845        let buffer_manager = BufferManager::new(buffer_config);
846
847        let query_cache = Arc::new(QueryCache::default());
848
849        #[cfg(feature = "cdc")]
850        let cdc_enabled_val = config.cdc_enabled;
851
852        Ok(Self {
853            config,
854            #[cfg(feature = "lpg")]
855            store: None,
856            catalog: Arc::new(Catalog::new()),
857            #[cfg(feature = "triple-store")]
858            rdf_store: Arc::new(RdfStore::new()),
859            transaction_manager,
860            buffer_manager,
861            #[cfg(feature = "wal")]
862            wal: None,
863            #[cfg(feature = "wal")]
864            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
865            query_cache,
866            commit_counter: Arc::new(AtomicUsize::new(0)),
867            is_open: RwLock::new(true),
868            #[cfg(feature = "cdc")]
869            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
870            #[cfg(feature = "cdc")]
871            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
872            #[cfg(feature = "embed")]
873            embedding_models: RwLock::new(hashbrown::HashMap::new()),
874            #[cfg(feature = "grafeo-file")]
875            file_manager: None,
876            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
877            checkpoint_timer: parking_lot::Mutex::new(None),
878            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
879            vector_spill_storages: None,
880            external_read_store: Some(store),
881            external_write_store: None,
882            #[cfg(feature = "metrics")]
883            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
884            current_graph: RwLock::new(None),
885            current_schema: RwLock::new(None),
886            read_only: true,
887            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
888            #[cfg(all(feature = "compact-store", feature = "lpg"))]
889            layered_store: None,
890            #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
891            compact_tiered: None,
892        })
893    }
894
895    /// Compacts the database into a two-layer store: columnar base + mutable overlay.
896    ///
897    /// Takes a snapshot of all nodes and edges from the current store, builds
898    /// a columnar `CompactStore` with CSR adjacency as the read-only base,
899    /// and creates a fresh `LpgStore` overlay for future mutations. The
900    /// original store is dropped to free memory.
901    ///
902    /// Unlike the pre-0.5.39 behavior, the database remains writable after
903    /// compaction: new writes go to the overlay. Call [`recompact()`](Self::recompact)
904    /// to merge the overlay back into the columnar base periodically.
905    ///
906    /// # Errors
907    ///
908    /// Returns an error if the conversion fails (e.g. more than 32,767
909    /// distinct labels or edge types).
910    ///
911    /// [`CompactStore`]: grafeo_core::graph::compact::CompactStore
912    #[cfg(all(feature = "compact-store", feature = "lpg"))]
913    pub fn compact(&mut self) -> Result<()> {
914        use grafeo_core::graph::compact::from_graph_store_preserving_ids;
915        use grafeo_core::graph::compact::layered::LayeredStore;
916
917        let current_store = self.graph_store();
918
919        // Determine max node/edge IDs for overlay seeding.
920        let max_node_id = if let Some(ref store) = self.store {
921            store.next_node_id().saturating_sub(1)
922        } else {
923            current_store.node_ids().last().map_or(0, |id| id.as_u64())
924        };
925        let max_edge_id = if let Some(ref store) = self.store {
926            store.next_edge_id().saturating_sub(1)
927        } else {
928            // Scan edges to find max ID.
929            let mut max_eid = 0u64;
930            for nid in current_store.node_ids() {
931                for (_, eid) in
932                    current_store.edges_from(nid, grafeo_core::graph::Direction::Outgoing)
933                {
934                    max_eid = max_eid.max(eid.as_u64());
935                }
936            }
937            max_eid
938        };
939
940        let compact = from_graph_store_preserving_ids(current_store.as_ref())
941            .map_err(|e| Error::Internal(e.to_string()))?;
942
943        let layered = Arc::new(
944            LayeredStore::new(compact, max_node_id, max_edge_id)
945                .map_err(|e| Error::Internal(e.to_string()))?,
946        );
947
948        // Sync the overlay's epoch with the TransactionManager so MVCC
949        // visibility works correctly for nodes created after compact().
950        let current_epoch = self.transaction_manager.current_epoch();
951        layered.overlay_store().sync_epoch(current_epoch);
952
953        // Named graphs are LPG-specific and outside the columnar base; move them
954        // from the pre-compact overlay into the new overlay so they survive
955        // compaction.
956        if let Some(ref old) = self.store {
957            layered
958                .overlay_store()
959                .install_named_graphs(old.take_named_graphs());
960        }
961
962        self.external_read_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreSearch>);
963        self.external_write_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreMut>);
964        self.store = Some(layered.overlay_store());
965
966        // Install the disk-backed tier wrapper and register its memory
967        // consumer so the BufferManager can spill the base to mmap under
968        // memory pressure.
969        #[cfg(feature = "mmap")]
970        {
971            let tiered = Arc::new(compact_tiered::CompactStoreTiered::new_in_memory(
972                layered.base_store_arc(),
973            ));
974            let spill_path = self.buffer_manager.config().spill_path.clone();
975            let consumer = Arc::new(section_consumer::CompactStoreConsumer::new(
976                &tiered, &layered, spill_path,
977            ));
978            self.buffer_manager.register_consumer(consumer);
979            self.compact_tiered = Some(tiered);
980        }
981
982        // Phase 5c: register the overlay consumer so growing-overlay
983        // pressure triggers an automatic merge-into-base.
984        let overlay_consumer = Arc::new(section_consumer::OverlayConsumer::new(&layered));
985        self.buffer_manager.register_consumer(overlay_consumer);
986
987        self.layered_store = Some(layered);
988        self.read_only = false;
989        self.query_cache = Arc::new(QueryCache::default());
990        self.projections.write().clear();
991
992        Ok(())
993    }
994
995    /// Merges the overlay back into the columnar base.
996    ///
997    /// Reads the combined view (base + overlay), builds a fresh `CompactStore`,
998    /// and replaces the `LayeredStore` with one backed by the merged base and
999    /// an empty overlay.
1000    ///
1001    /// # Errors
1002    ///
1003    /// Returns an error if the database was not previously compacted, or if
1004    /// the merge fails.
1005    #[cfg(all(feature = "compact-store", feature = "lpg"))]
1006    pub fn recompact(&mut self) -> Result<()> {
1007        use grafeo_core::graph::compact::from_graph_store_preserving_ids;
1008        use grafeo_core::graph::compact::layered::LayeredStore;
1009
1010        let layered = self
1011            .layered_store
1012            .as_ref()
1013            .ok_or_else(|| Error::Internal("recompact() requires a prior compact()".into()))?;
1014
1015        // Read the combined view.
1016        let combined: Arc<dyn GraphStoreSearch> = Arc::clone(layered) as Arc<dyn GraphStoreSearch>;
1017
1018        // Max IDs from the overlay's allocators.
1019        let max_node_id = layered.overlay_store().next_node_id().saturating_sub(1);
1020        let max_edge_id = layered.overlay_store().next_edge_id().saturating_sub(1);
1021
1022        let fresh_compact = from_graph_store_preserving_ids(combined.as_ref())
1023            .map_err(|e| Error::Internal(e.to_string()))?;
1024
1025        let new_layered = Arc::new(
1026            LayeredStore::new(fresh_compact, max_node_id, max_edge_id)
1027                .map_err(|e| Error::Internal(e.to_string()))?,
1028        );
1029
1030        // Sync overlay epoch.
1031        let current_epoch = self.transaction_manager.current_epoch();
1032        new_layered.overlay_store().sync_epoch(current_epoch);
1033
1034        // Carry named graphs forward: the old overlay is about to be dropped.
1035        new_layered
1036            .overlay_store()
1037            .install_named_graphs(layered.overlay_store().take_named_graphs());
1038
1039        self.external_read_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreSearch>);
1040        self.external_write_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreMut>);
1041        self.store = Some(new_layered.overlay_store());
1042
1043        // Replace the tier wrapper: old one's Weak refs will now return None,
1044        // and its consumer (unregistered below) no longer tracks the freshly
1045        // built base.
1046        #[cfg(feature = "mmap")]
1047        {
1048            self.buffer_manager
1049                .unregister_consumer("section:CompactStore");
1050            let tiered = Arc::new(compact_tiered::CompactStoreTiered::new_in_memory(
1051                new_layered.base_store_arc(),
1052            ));
1053            let spill_path = self.buffer_manager.config().spill_path.clone();
1054            let consumer = Arc::new(section_consumer::CompactStoreConsumer::new(
1055                &tiered,
1056                &new_layered,
1057                spill_path,
1058            ));
1059            self.buffer_manager.register_consumer(consumer);
1060            self.compact_tiered = Some(tiered);
1061        }
1062
1063        // Phase 5c: re-register overlay consumer for the new layered store.
1064        self.buffer_manager.unregister_consumer("overlay:LpgStore");
1065        let overlay_consumer = Arc::new(section_consumer::OverlayConsumer::new(&new_layered));
1066        self.buffer_manager.register_consumer(overlay_consumer);
1067
1068        self.layered_store = Some(new_layered);
1069        self.query_cache = Arc::new(QueryCache::default());
1070
1071        Ok(())
1072    }
1073
1074    /// Applies WAL records to restore the database state.
1075    ///
1076    /// Data mutation records are routed through a graph cursor that tracks
1077    /// `SwitchGraph` context markers, replaying mutations into the correct
1078    /// named graph (or the default graph when cursor is `None`).
1079    #[cfg(all(feature = "wal", feature = "lpg"))]
1080    fn apply_wal_records(
1081        store: &Arc<LpgStore>,
1082        catalog: &Catalog,
1083        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1084        records: &[WalRecord],
1085    ) -> Result<()> {
1086        use crate::catalog::{
1087            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
1088        };
1089        use grafeo_common::utils::error::Error;
1090
1091        // Graph cursor: tracks which named graph receives data mutations.
1092        // `None` means the default graph.
1093        let mut current_graph: Option<String> = None;
1094        let mut target_store: Arc<LpgStore> = Arc::clone(store);
1095
1096        for record in records {
1097            match record {
1098                // --- Named graph lifecycle ---
1099                WalRecord::CreateNamedGraph { name } => {
1100                    let _ = store.create_graph(name);
1101                }
1102                WalRecord::DropNamedGraph { name } => {
1103                    store.drop_graph(name);
1104                    // Reset cursor if the dropped graph was active
1105                    if current_graph.as_deref() == Some(name.as_str()) {
1106                        current_graph = None;
1107                        target_store = Arc::clone(store);
1108                    }
1109                }
1110                WalRecord::SwitchGraph { name } => {
1111                    current_graph.clone_from(name);
1112                    target_store = match &current_graph {
1113                        None => Arc::clone(store),
1114                        Some(graph_name) => store
1115                            .graph_or_create(graph_name)
1116                            .map_err(|e| Error::Internal(e.to_string()))?,
1117                    };
1118                }
1119
1120                // --- Data mutations: routed through target_store ---
1121                WalRecord::CreateNode { id, labels } => {
1122                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
1123                    target_store.create_node_with_id(*id, &label_refs)?;
1124                }
1125                WalRecord::DeleteNode { id } => {
1126                    target_store.delete_node(*id);
1127                }
1128                WalRecord::CreateEdge {
1129                    id,
1130                    src,
1131                    dst,
1132                    edge_type,
1133                } => {
1134                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
1135                }
1136                WalRecord::DeleteEdge { id } => {
1137                    target_store.delete_edge(*id);
1138                }
1139                WalRecord::SetNodeProperty { id, key, value } => {
1140                    target_store.set_node_property(*id, key, value.clone());
1141                }
1142                WalRecord::SetEdgeProperty { id, key, value } => {
1143                    target_store.set_edge_property(*id, key, value.clone());
1144                }
1145                WalRecord::AddNodeLabel { id, label } => {
1146                    target_store.add_label(*id, label);
1147                }
1148                WalRecord::RemoveNodeLabel { id, label } => {
1149                    target_store.remove_label(*id, label);
1150                }
1151                WalRecord::RemoveNodeProperty { id, key } => {
1152                    target_store.remove_node_property(*id, key);
1153                }
1154                WalRecord::RemoveEdgeProperty { id, key } => {
1155                    target_store.remove_edge_property(*id, key);
1156                }
1157
1158                // --- Schema DDL replay (always on root catalog) ---
1159                WalRecord::CreateNodeType {
1160                    name,
1161                    properties,
1162                    constraints,
1163                } => {
1164                    let def = NodeTypeDefinition {
1165                        name: name.clone(),
1166                        properties: properties
1167                            .iter()
1168                            .map(|(n, t, nullable)| TypedProperty {
1169                                name: n.clone(),
1170                                data_type: PropertyDataType::from_type_name(t),
1171                                nullable: *nullable,
1172                                default_value: None,
1173                            })
1174                            .collect(),
1175                        constraints: constraints
1176                            .iter()
1177                            .map(|(kind, props)| match kind.as_str() {
1178                                "unique" => TypeConstraint::Unique(props.clone()),
1179                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
1180                                "not_null" if !props.is_empty() => {
1181                                    TypeConstraint::NotNull(props[0].clone())
1182                                }
1183                                _ => TypeConstraint::Unique(props.clone()),
1184                            })
1185                            .collect(),
1186                        parent_types: Vec::new(),
1187                    };
1188                    let _ = catalog.register_node_type(def);
1189                }
1190                WalRecord::DropNodeType { name } => {
1191                    let _ = catalog.drop_node_type(name);
1192                }
1193                WalRecord::CreateEdgeType {
1194                    name,
1195                    properties,
1196                    constraints,
1197                } => {
1198                    let def = EdgeTypeDefinition {
1199                        name: name.clone(),
1200                        properties: properties
1201                            .iter()
1202                            .map(|(n, t, nullable)| TypedProperty {
1203                                name: n.clone(),
1204                                data_type: PropertyDataType::from_type_name(t),
1205                                nullable: *nullable,
1206                                default_value: None,
1207                            })
1208                            .collect(),
1209                        constraints: constraints
1210                            .iter()
1211                            .map(|(kind, props)| match kind.as_str() {
1212                                "unique" => TypeConstraint::Unique(props.clone()),
1213                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
1214                                "not_null" if !props.is_empty() => {
1215                                    TypeConstraint::NotNull(props[0].clone())
1216                                }
1217                                _ => TypeConstraint::Unique(props.clone()),
1218                            })
1219                            .collect(),
1220                        source_node_types: Vec::new(),
1221                        target_node_types: Vec::new(),
1222                    };
1223                    let _ = catalog.register_edge_type_def(def);
1224                }
1225                WalRecord::DropEdgeType { name } => {
1226                    let _ = catalog.drop_edge_type_def(name);
1227                }
1228                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
1229                    // Index recreation is handled by the store on startup
1230                    // (indexes are rebuilt from data, not WAL)
1231                }
1232                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
1233                    // Constraint definitions are part of type definitions
1234                    // and replayed via CreateNodeType/CreateEdgeType
1235                }
1236                WalRecord::CreateGraphType {
1237                    name,
1238                    node_types,
1239                    edge_types,
1240                    open,
1241                } => {
1242                    use crate::catalog::GraphTypeDefinition;
1243                    let def = GraphTypeDefinition {
1244                        name: name.clone(),
1245                        allowed_node_types: node_types.clone(),
1246                        allowed_edge_types: edge_types.clone(),
1247                        open: *open,
1248                    };
1249                    let _ = catalog.register_graph_type(def);
1250                }
1251                WalRecord::DropGraphType { name } => {
1252                    let _ = catalog.drop_graph_type(name);
1253                }
1254                WalRecord::CreateSchema { name } => {
1255                    let _ = catalog.register_schema_namespace(name.clone());
1256                }
1257                WalRecord::DropSchema { name } => {
1258                    let _ = catalog.drop_schema_namespace(name);
1259                }
1260
1261                WalRecord::AlterNodeType { name, alterations } => {
1262                    for (action, prop_name, type_name, nullable) in alterations {
1263                        match action.as_str() {
1264                            "add" => {
1265                                let prop = TypedProperty {
1266                                    name: prop_name.clone(),
1267                                    data_type: PropertyDataType::from_type_name(type_name),
1268                                    nullable: *nullable,
1269                                    default_value: None,
1270                                };
1271                                let _ = catalog.alter_node_type_add_property(name, prop);
1272                            }
1273                            "drop" => {
1274                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
1275                            }
1276                            _ => {}
1277                        }
1278                    }
1279                }
1280                WalRecord::AlterEdgeType { name, alterations } => {
1281                    for (action, prop_name, type_name, nullable) in alterations {
1282                        match action.as_str() {
1283                            "add" => {
1284                                let prop = TypedProperty {
1285                                    name: prop_name.clone(),
1286                                    data_type: PropertyDataType::from_type_name(type_name),
1287                                    nullable: *nullable,
1288                                    default_value: None,
1289                                };
1290                                let _ = catalog.alter_edge_type_add_property(name, prop);
1291                            }
1292                            "drop" => {
1293                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
1294                            }
1295                            _ => {}
1296                        }
1297                    }
1298                }
1299                WalRecord::AlterGraphType { name, alterations } => {
1300                    for (action, type_name) in alterations {
1301                        match action.as_str() {
1302                            "add_node" => {
1303                                let _ =
1304                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
1305                            }
1306                            "drop_node" => {
1307                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
1308                            }
1309                            "add_edge" => {
1310                                let _ =
1311                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
1312                            }
1313                            "drop_edge" => {
1314                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
1315                            }
1316                            _ => {}
1317                        }
1318                    }
1319                }
1320
1321                WalRecord::CreateProcedure {
1322                    name,
1323                    params,
1324                    returns,
1325                    body,
1326                } => {
1327                    use crate::catalog::ProcedureDefinition;
1328                    let def = ProcedureDefinition {
1329                        name: name.clone(),
1330                        params: params.clone(),
1331                        returns: returns.clone(),
1332                        body: body.clone(),
1333                    };
1334                    let _ = catalog.register_procedure(def);
1335                }
1336                WalRecord::DropProcedure { name } => {
1337                    let _ = catalog.drop_procedure(name);
1338                }
1339
1340                // --- RDF triple replay ---
1341                #[cfg(feature = "triple-store")]
1342                WalRecord::InsertRdfTriple { .. }
1343                | WalRecord::DeleteRdfTriple { .. }
1344                | WalRecord::ClearRdfGraph { .. }
1345                | WalRecord::CreateRdfGraph { .. }
1346                | WalRecord::DropRdfGraph { .. } => {
1347                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
1348                }
1349                #[cfg(not(feature = "triple-store"))]
1350                WalRecord::InsertRdfTriple { .. }
1351                | WalRecord::DeleteRdfTriple { .. }
1352                | WalRecord::ClearRdfGraph { .. }
1353                | WalRecord::CreateRdfGraph { .. }
1354                | WalRecord::DropRdfGraph { .. } => {}
1355
1356                WalRecord::TransactionCommit { .. } => {
1357                    // In temporal mode, advance the store epoch on each committed
1358                    // transaction so that subsequent property/label operations
1359                    // are recorded at the correct epoch in their VersionLogs.
1360                    #[cfg(feature = "temporal")]
1361                    {
1362                        target_store.new_epoch();
1363                    }
1364                }
1365                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
1366                    // Transaction control records don't need replay action
1367                    // (recovery already filtered to only committed transactions)
1368                }
1369                WalRecord::EpochAdvance { .. } => {
1370                    // Metadata record: no store mutation needed.
1371                    // Used by incremental backup and point-in-time recovery.
1372                }
1373            }
1374        }
1375        Ok(())
1376    }
1377
1378    // =========================================================================
1379    // Single-file format helpers
1380    // =========================================================================
1381
1382    /// Returns `true` if the given path should use single-file format.
1383    #[cfg(feature = "grafeo-file")]
1384    fn should_use_single_file(
1385        path: &std::path::Path,
1386        configured: crate::config::StorageFormat,
1387    ) -> bool {
1388        use crate::config::StorageFormat;
1389        match configured {
1390            StorageFormat::SingleFile => true,
1391            StorageFormat::WalDirectory => false,
1392            StorageFormat::Auto => {
1393                // Existing file: check magic bytes
1394                if path.is_file() {
1395                    if let Ok(mut f) = std::fs::File::open(path) {
1396                        use std::io::Read;
1397                        let mut magic = [0u8; 4];
1398                        if f.read_exact(&mut magic).is_ok() && magic == grafeo_storage::file::MAGIC
1399                        {
1400                            return true;
1401                        }
1402                    }
1403                    return false;
1404                }
1405                // Existing directory: legacy format
1406                if path.is_dir() {
1407                    return false;
1408                }
1409                // New path: check extension
1410                path.extension().is_some_and(|ext| ext == "grafeo")
1411            }
1412        }
1413    }
1414
1415    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
1416    ///
1417    /// Supports both v1 (monolithic blob) and v2 (section-based) formats.
1418    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1419    fn apply_snapshot_data(
1420        store: &Arc<LpgStore>,
1421        catalog: &Arc<crate::catalog::Catalog>,
1422        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1423        data: &[u8],
1424    ) -> Result<()> {
1425        // v1 blob format: pass through to legacy loader
1426        persistence::load_snapshot_into_store(
1427            store,
1428            catalog,
1429            #[cfg(feature = "triple-store")]
1430            rdf_store,
1431            data,
1432        )
1433    }
1434
1435    /// Phase 5e: post-load LayeredStore wiring.
1436    ///
1437    /// After `load_from_sections` has populated `self.store` (the LpgStore,
1438    /// which now holds the overlay data) and `extract_compact_base` has
1439    /// produced the base, this rebuilds the same engine state that
1440    /// `compact()` establishes:
1441    ///
1442    /// - `self.layered_store = Some(LayeredStore { base, overlay = self.store })`
1443    /// - `self.external_read_store / external_write_store = Arc::clone(layered)`
1444    /// - `self.store` swapped to the overlay (which is the same `Arc<LpgStore>`)
1445    /// - Tier wrapper installed and `CompactStoreConsumer` + `OverlayConsumer`
1446    ///   registered with the BufferManager
1447    #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "compact-store"))]
1448    fn wire_layered_after_load(
1449        &mut self,
1450        compact_base: Arc<grafeo_core::graph::compact::CompactStore>,
1451        deletion_log: Option<(
1452            Vec<grafeo_common::types::NodeId>,
1453            Vec<grafeo_common::types::EdgeId>,
1454        )>,
1455    ) -> Result<()> {
1456        use grafeo_core::graph::compact::layered::LayeredStore;
1457
1458        let overlay_store = self
1459            .store
1460            .as_ref()
1461            .ok_or_else(|| Error::Internal("wire_layered_after_load: no LpgStore".into()))?;
1462
1463        // Adopt the loaded base + the loaded overlay (id allocator state
1464        // is preserved on the overlay during deserialization).
1465        let layered = Arc::new(LayeredStore::with_overlay(
1466            Arc::clone(&compact_base),
1467            Arc::clone(overlay_store),
1468        ));
1469
1470        // Restore base-entity tombstones from the persisted deletion log,
1471        // if the file carried one. Without this, base nodes/edges deleted
1472        // by the previous session silently reappear after reload.
1473        if let Some((nodes, edges)) = deletion_log {
1474            layered.seed_deleted_from_base(nodes, edges);
1475        }
1476
1477        // Sync overlay epoch with the transaction manager.
1478        let current_epoch = self.transaction_manager.current_epoch();
1479        layered.overlay_store().sync_epoch(current_epoch);
1480
1481        self.external_read_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreSearch>);
1482        self.external_write_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreMut>);
1483
1484        // Install the tier wrapper + consumers (mirror of compact()'s flow).
1485        #[cfg(feature = "mmap")]
1486        {
1487            let tiered = Arc::new(compact_tiered::CompactStoreTiered::new_in_memory(
1488                layered.base_store_arc(),
1489            ));
1490            let spill_path = self.buffer_manager.config().spill_path.clone();
1491            let consumer = Arc::new(section_consumer::CompactStoreConsumer::new(
1492                &tiered, &layered, spill_path,
1493            ));
1494            self.buffer_manager.register_consumer(consumer);
1495            self.compact_tiered = Some(tiered);
1496        }
1497
1498        let overlay_consumer = Arc::new(section_consumer::OverlayConsumer::new(&layered));
1499        self.buffer_manager.register_consumer(overlay_consumer);
1500
1501        self.layered_store = Some(layered);
1502
1503        Ok(())
1504    }
1505
1506    /// Phase 5e: extracts the deserialized CompactStore base from a v2
1507    /// section file, if present. Used by the open path to reconstruct
1508    /// the LayeredStore wiring after a previously-compacted database
1509    /// reopens.
1510    #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "compact-store"))]
1511    fn extract_compact_base(
1512        fm: &GrafeoFileManager,
1513    ) -> Result<Option<Arc<grafeo_core::graph::compact::CompactStore>>> {
1514        use grafeo_common::storage::{Section, SectionType};
1515        let Some(dir) = fm.read_section_directory()? else {
1516            return Ok(None);
1517        };
1518        let Some(entry) = dir.find(SectionType::CompactStore) else {
1519            return Ok(None);
1520        };
1521        let data = fm.read_section_data(entry)?;
1522        let mut section = grafeo_core::graph::compact::section::CompactStoreSection::empty();
1523        section.deserialize(&data)?;
1524        Ok(section.store())
1525    }
1526
1527    /// Reads the persisted overlay deletion log from the container, if
1528    /// the file carries one. Returns `(deleted_nodes, deleted_edges)` so
1529    /// the caller can seed `LayeredStore::seed_deleted_from_base` after
1530    /// the layered store is constructed. Returns `None` when no
1531    /// deletions were recorded (the section is omitted in that case).
1532    #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "compact-store"))]
1533    fn extract_overlay_deletions(
1534        fm: &GrafeoFileManager,
1535    ) -> Result<
1536        Option<(
1537            Vec<grafeo_common::types::NodeId>,
1538            Vec<grafeo_common::types::EdgeId>,
1539        )>,
1540    > {
1541        use grafeo_common::storage::{Section, SectionType};
1542        let Some(dir) = fm.read_section_directory()? else {
1543            return Ok(None);
1544        };
1545        let Some(entry) = dir.find(SectionType::OverlayDeletions) else {
1546            return Ok(None);
1547        };
1548        let data = fm.read_section_data(entry)?;
1549        let mut section =
1550            grafeo_core::graph::compact::deletions_section::OverlayDeletionsSection::empty();
1551        section.deserialize(&data)?;
1552        Ok(Some(section.take()))
1553    }
1554
1555    /// Loads from a section-based `.grafeo` file (v2 format).
1556    ///
1557    /// Reads the section directory, then deserializes each section independently.
1558    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1559    fn load_from_sections(
1560        fm: &GrafeoFileManager,
1561        store: &Arc<LpgStore>,
1562        catalog: &Arc<crate::catalog::Catalog>,
1563        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1564    ) -> Result<()> {
1565        use grafeo_common::storage::{Section, SectionType};
1566
1567        let dir = fm.read_section_directory()?.ok_or_else(|| {
1568            grafeo_common::utils::error::Error::Internal(
1569                "expected v2 section directory but found none".to_string(),
1570            )
1571        })?;
1572
1573        // Load catalog section first (schema defs needed before data)
1574        if let Some(entry) = dir.find(SectionType::Catalog) {
1575            let data = fm.read_section_data(entry)?;
1576            let tm = Arc::new(crate::transaction::TransactionManager::new());
1577            let mut section = catalog_section::CatalogSection::new(
1578                Arc::clone(catalog),
1579                Arc::clone(store),
1580                move || tm.current_epoch().as_u64(),
1581            );
1582            section.deserialize(&data)?;
1583        }
1584
1585        // Load LPG store (Phase 5e: when the file has a CompactStore section,
1586        // this LpgStore data IS the overlay; the caller then wires it into
1587        // a LayeredStore via `extract_compact_base`).
1588        if let Some(entry) = dir.find(SectionType::LpgStore) {
1589            let data = fm.read_section_data(entry)?;
1590            let mut section = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1591            section.deserialize(&data)?;
1592        }
1593
1594        // Load RDF store
1595        #[cfg(feature = "triple-store")]
1596        if let Some(entry) = dir.find(SectionType::RdfStore) {
1597            let data = fm.read_section_data(entry)?;
1598            let mut section = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
1599            section.deserialize(&data)?;
1600        }
1601
1602        // Restore Ring Index (if persisted)
1603        #[cfg(feature = "ring-index")]
1604        if let Some(entry) = dir.find(SectionType::RdfRing) {
1605            let data = fm.read_section_data(entry)?;
1606            let mut section = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(rdf_store));
1607            section.deserialize(&data)?;
1608        }
1609
1610        // Restore HNSW topology (if vector indexes exist in both catalog and section)
1611        #[cfg(feature = "vector-index")]
1612        if let Some(entry) = dir.find(SectionType::VectorStore) {
1613            let data = fm.read_section_data(entry)?;
1614            let indexes = store.vector_index_entries();
1615            if !indexes.is_empty() {
1616                let mut section = grafeo_core::index::vector::VectorStoreSection::new(indexes);
1617                section.deserialize(&data)?;
1618            }
1619        }
1620
1621        // Restore BM25 postings (if text indexes exist in both catalog and section)
1622        #[cfg(feature = "text-index")]
1623        if let Some(entry) = dir.find(SectionType::TextIndex) {
1624            let data = fm.read_section_data(entry)?;
1625            let indexes = store.text_index_entries();
1626            if !indexes.is_empty() {
1627                let mut section = grafeo_core::index::text::TextIndexSection::new(indexes);
1628                section.deserialize(&data)?;
1629            }
1630        }
1631
1632        Ok(())
1633    }
1634
1635    // =========================================================================
1636    // Session & Configuration
1637    // =========================================================================
1638
1639    /// Opens a new session for running queries.
1640    ///
1641    /// Sessions are cheap to create: spin up as many as you need. Each
1642    /// gets its own transaction context, so concurrent sessions won't
1643    /// block each other on reads.
1644    ///
1645    /// # Panics
1646    ///
1647    /// Panics if the database was configured with an external graph store and
1648    /// the internal arena allocator cannot be initialized (out of memory).
1649    ///
1650    /// # Examples
1651    ///
1652    /// ```
1653    /// use grafeo_engine::GrafeoDB;
1654    ///
1655    /// let db = GrafeoDB::new_in_memory();
1656    /// let session = db.session();
1657    ///
1658    /// // Run queries through the session
1659    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
1660    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1661    /// ```
1662    #[must_use]
1663    pub fn session(&self) -> Session {
1664        self.create_session_inner(None)
1665    }
1666
1667    /// Creates a session scoped to the given identity.
1668    ///
1669    /// The identity determines what operations the session is allowed to
1670    /// perform. A [`Role::ReadOnly`](crate::auth::Role::ReadOnly) identity
1671    /// creates a read-only session; a [`Role::ReadWrite`](crate::auth::Role::ReadWrite)
1672    /// identity allows data mutations but not schema DDL; a
1673    /// [`Role::Admin`](crate::auth::Role::Admin) identity has full access.
1674    ///
1675    /// # Examples
1676    ///
1677    /// ```
1678    /// use grafeo_engine::{GrafeoDB, auth::{Identity, Role}};
1679    ///
1680    /// let db = GrafeoDB::new_in_memory();
1681    /// let identity = Identity::new("app-service", [Role::ReadWrite]);
1682    /// let session = db.session_with_identity(identity);
1683    /// ```
1684    #[must_use]
1685    pub fn session_with_identity(&self, identity: crate::auth::Identity) -> Session {
1686        let force_read_only = !identity.can_write();
1687        self.create_session_inner_full(None, force_read_only, identity)
1688    }
1689
1690    /// Creates a session scoped to a single role.
1691    ///
1692    /// Convenience shorthand for
1693    /// `session_with_identity(Identity::new("anonymous", [role]))`.
1694    ///
1695    /// # Examples
1696    ///
1697    /// ```
1698    /// use grafeo_engine::{GrafeoDB, auth::Role};
1699    ///
1700    /// let db = GrafeoDB::new_in_memory();
1701    /// let reader = db.session_with_role(Role::ReadOnly);
1702    /// ```
1703    #[must_use]
1704    pub fn session_with_role(&self, role: crate::auth::Role) -> Session {
1705        self.session_with_identity(crate::auth::Identity::new("anonymous", [role]))
1706    }
1707
1708    /// Creates a session with an explicit CDC override.
1709    ///
1710    /// When `cdc_enabled` is `true`, mutations in this session are tracked
1711    /// regardless of the database default. When `false`, mutations are not
1712    /// tracked regardless of the database default.
1713    ///
1714    /// # Examples
1715    ///
1716    /// ```
1717    /// use grafeo_engine::GrafeoDB;
1718    ///
1719    /// let db = GrafeoDB::new_in_memory();
1720    ///
1721    /// // Opt in to CDC for just this session
1722    /// let tracked = db.session_with_cdc(true);
1723    /// tracked.execute("INSERT (:Person {name: 'Alix'})")?;
1724    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1725    /// ```
1726    #[cfg(feature = "cdc")]
1727    #[must_use]
1728    pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1729        self.create_session_inner(Some(cdc_enabled))
1730    }
1731
1732    /// Creates a read-only session regardless of the database's access mode.
1733    ///
1734    /// Mutations executed through this session will fail with
1735    /// `TransactionError::ReadOnly`. Useful for replication replicas where
1736    /// the database itself must remain writable (for applying CDC changes)
1737    /// but client-facing queries must be read-only.
1738    ///
1739    /// **Deprecated**: Use `session_with_role(Role::ReadOnly)` instead.
1740    #[deprecated(
1741        since = "0.5.36",
1742        note = "use session_with_role(Role::ReadOnly) instead"
1743    )]
1744    #[must_use]
1745    pub fn session_read_only(&self) -> Session {
1746        self.session_with_role(crate::auth::Role::ReadOnly)
1747    }
1748
1749    /// Shared session creation logic.
1750    ///
1751    /// `cdc_override` overrides the database-wide `cdc_enabled` default when
1752    /// `Some`. `None` falls back to the database default.
1753    #[allow(unused_variables)] // cdc_override unused when cdc feature is off
1754    fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1755        self.create_session_inner_full(cdc_override, false, crate::auth::Identity::anonymous())
1756    }
1757
1758    /// Shared session creation with all overrides.
1759    #[allow(unused_variables)]
1760    fn create_session_inner_full(
1761        &self,
1762        cdc_override: Option<bool>,
1763        force_read_only: bool,
1764        identity: crate::auth::Identity,
1765    ) -> Session {
1766        let session_cfg = || crate::session::SessionConfig {
1767            transaction_manager: Arc::clone(&self.transaction_manager),
1768            query_cache: Arc::clone(&self.query_cache),
1769            catalog: Arc::clone(&self.catalog),
1770            adaptive_config: self.config.adaptive.clone(),
1771            factorized_execution: self.config.factorized_execution,
1772            graph_model: self.config.graph_model,
1773            query_timeout: self.config.query_timeout,
1774            max_property_size: self.config.max_property_size,
1775            buffer_manager: Some(Arc::clone(&self.buffer_manager)),
1776            commit_counter: Arc::clone(&self.commit_counter),
1777            gc_interval: self.config.gc_interval,
1778            read_only: self.read_only || force_read_only,
1779            identity: identity.clone(),
1780            #[cfg(feature = "lpg")]
1781            projections: Arc::clone(&self.projections),
1782        };
1783
1784        // Layered store: use the overlay LpgStore as the session's internal store
1785        // so MVCC operations (begin_tx, commit, visibility) work correctly.
1786        #[cfg(all(feature = "compact-store", feature = "lpg"))]
1787        if let Some(ref layered) = self.layered_store {
1788            let overlay = layered.overlay_store();
1789            let layered_arc = Arc::clone(layered);
1790            let mut session = Session::with_adaptive(overlay, session_cfg());
1791            // Override graph_store/graph_store_mut to use the LayeredStore
1792            // (which merges base + overlay), not just the overlay alone.
1793            session.override_stores(
1794                Arc::clone(&layered_arc) as Arc<dyn GraphStoreSearch>,
1795                Some(layered_arc as Arc<dyn GraphStoreMut>),
1796            );
1797            return session;
1798        }
1799
1800        if let Some(ref ext_read) = self.external_read_store {
1801            return Session::with_external_store(
1802                Arc::clone(ext_read),
1803                self.external_write_store.as_ref().map(Arc::clone),
1804                session_cfg(),
1805            )
1806            .expect("arena allocation for external store session");
1807        }
1808
1809        #[cfg(all(feature = "lpg", feature = "triple-store"))]
1810        let mut session = Session::with_rdf_store_and_adaptive(
1811            Arc::clone(self.lpg_store()),
1812            Arc::clone(&self.rdf_store),
1813            session_cfg(),
1814        );
1815        #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
1816        let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1817        #[cfg(not(feature = "lpg"))]
1818        let mut session =
1819            Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
1820                .expect("session creation for non-lpg build");
1821
1822        #[cfg(all(feature = "wal", feature = "lpg"))]
1823        if let Some(ref wal) = self.wal {
1824            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1825        }
1826
1827        #[cfg(feature = "cdc")]
1828        {
1829            let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1830            if should_enable {
1831                session.set_cdc_log(Arc::clone(&self.cdc_log));
1832            }
1833        }
1834
1835        #[cfg(feature = "metrics")]
1836        {
1837            if let Some(ref m) = self.metrics {
1838                session.set_metrics(Arc::clone(m));
1839                m.session_created
1840                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1841                m.session_active
1842                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1843            }
1844        }
1845
1846        // Propagate persistent graph context to the new session
1847        if let Some(ref graph) = *self.current_graph.read() {
1848            session.use_graph(graph);
1849        }
1850
1851        // Propagate persistent schema context to the new session
1852        if let Some(ref schema) = *self.current_schema.read() {
1853            session.set_schema(schema);
1854        }
1855
1856        // Suppress unused_mut when cdc/wal are disabled
1857        let _ = &mut session;
1858
1859        session
1860    }
1861
1862    /// Returns the current graph name, if any.
1863    ///
1864    /// This is the persistent graph context used by one-shot `execute()` calls.
1865    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
1866    /// or `SESSION RESET`.
1867    #[must_use]
1868    pub fn current_graph(&self) -> Option<String> {
1869        self.current_graph.read().clone()
1870    }
1871
1872    /// Sets the current graph context for subsequent one-shot `execute()` calls.
1873    ///
1874    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
1875    /// Pass `None` to reset to the default graph.
1876    ///
1877    /// # Errors
1878    ///
1879    /// Returns an error if the named graph does not exist.
1880    pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1881        #[cfg(feature = "lpg")]
1882        if let Some(name) = name
1883            && !name.eq_ignore_ascii_case("default")
1884            && let Some(store) = &self.store
1885            && store.graph(name).is_none()
1886        {
1887            return Err(Error::Query(QueryError::new(
1888                QueryErrorKind::Semantic,
1889                format!("Graph '{name}' does not exist"),
1890            )));
1891        }
1892        *self.current_graph.write() = name.map(ToString::to_string);
1893        Ok(())
1894    }
1895
1896    /// Returns the current schema name, if any.
1897    ///
1898    /// This is the persistent schema context used by one-shot `execute()` calls.
1899    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
1900    #[must_use]
1901    pub fn current_schema(&self) -> Option<String> {
1902        self.current_schema.read().clone()
1903    }
1904
1905    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1906    ///
1907    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1908    /// a session. Pass `None` to clear the schema context.
1909    ///
1910    /// # Errors
1911    ///
1912    /// Returns an error if the named schema does not exist.
1913    pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1914        if let Some(name) = name
1915            && !self.catalog.schema_exists(name)
1916        {
1917            return Err(Error::Query(QueryError::new(
1918                QueryErrorKind::Semantic,
1919                format!("Schema '{name}' does not exist"),
1920            )));
1921        }
1922        *self.current_schema.write() = name.map(ToString::to_string);
1923        Ok(())
1924    }
1925
1926    /// Returns the adaptive execution configuration.
1927    #[must_use]
1928    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1929        &self.config.adaptive
1930    }
1931
1932    /// Returns `true` if this database was opened in read-only mode.
1933    #[must_use]
1934    pub fn is_read_only(&self) -> bool {
1935        self.read_only
1936    }
1937
1938    /// Returns the configuration.
1939    #[must_use]
1940    pub fn config(&self) -> &Config {
1941        &self.config
1942    }
1943
1944    /// Returns the graph data model of this database.
1945    #[must_use]
1946    pub fn graph_model(&self) -> crate::config::GraphModel {
1947        self.config.graph_model
1948    }
1949
1950    /// Returns the configured memory limit in bytes, if any.
1951    #[must_use]
1952    pub fn memory_limit(&self) -> Option<usize> {
1953        self.config.memory_limit
1954    }
1955
1956    /// Returns a point-in-time snapshot of all metrics.
1957    ///
1958    /// If the `metrics` feature is disabled or the registry is not
1959    /// initialized, returns a default (all-zero) snapshot.
1960    #[cfg(feature = "metrics")]
1961    #[must_use]
1962    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1963        let mut snapshot = self
1964            .metrics
1965            .as_ref()
1966            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1967
1968        // Augment with cache stats from the query cache (not tracked in the registry)
1969        let cache_stats = self.query_cache.stats();
1970        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1971        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1972        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1973        snapshot.cache_invalidations = cache_stats.invalidations;
1974
1975        snapshot
1976    }
1977
1978    /// Returns all metrics in Prometheus text exposition format.
1979    ///
1980    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1981    #[cfg(feature = "metrics")]
1982    #[must_use]
1983    pub fn metrics_prometheus(&self) -> String {
1984        self.metrics
1985            .as_ref()
1986            .map_or_else(String::new, |m| m.to_prometheus())
1987    }
1988
1989    /// Resets all metrics counters and histograms to zero.
1990    #[cfg(feature = "metrics")]
1991    pub fn reset_metrics(&self) {
1992        if let Some(ref m) = self.metrics {
1993            m.reset();
1994        }
1995        self.query_cache.reset_stats();
1996    }
1997
1998    /// Returns the underlying (default) store.
1999    ///
2000    /// This provides direct access to the LPG store for algorithm implementations
2001    /// and admin operations (index management, schema introspection, MVCC internals).
2002    ///
2003    /// For code that only needs read/write graph operations, prefer
2004    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
2005    #[cfg(feature = "lpg")]
2006    #[must_use]
2007    pub fn store(&self) -> &Arc<LpgStore> {
2008        self.lpg_store()
2009    }
2010
2011    // === Named Graph Management ===
2012
2013    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
2014    ///
2015    /// # Errors
2016    ///
2017    /// Returns an error if arena allocation fails.
2018    #[cfg(feature = "lpg")]
2019    pub fn create_graph(&self, name: &str) -> Result<bool> {
2020        Ok(self.lpg_store().create_graph(name)?)
2021    }
2022
2023    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
2024    ///
2025    /// If the dropped graph was the active graph context, the context is reset
2026    /// to the default graph.
2027    #[cfg(feature = "lpg")]
2028    pub fn drop_graph(&self, name: &str) -> bool {
2029        let Some(store) = &self.store else {
2030            return false;
2031        };
2032        let dropped = store.drop_graph(name);
2033        if dropped {
2034            let mut current = self.current_graph.write();
2035            if current
2036                .as_deref()
2037                .is_some_and(|g| g.eq_ignore_ascii_case(name))
2038            {
2039                *current = None;
2040            }
2041        }
2042        dropped
2043    }
2044
2045    /// Returns all named graph names.
2046    #[cfg(feature = "lpg")]
2047    #[must_use]
2048    pub fn list_graphs(&self) -> Vec<String> {
2049        self.lpg_store().graph_names()
2050    }
2051
2052    // === Graph Projections ===
2053
2054    /// Creates a named graph projection (virtual subgraph).
2055    ///
2056    /// The projection filters the graph store to only include nodes with the
2057    /// specified labels and edges with the specified types. Returns `true` if
2058    /// created, `false` if a projection with that name already exists.
2059    ///
2060    /// # Examples
2061    ///
2062    /// ```
2063    /// use grafeo_engine::GrafeoDB;
2064    /// use grafeo_core::graph::ProjectionSpec;
2065    ///
2066    /// let db = GrafeoDB::new_in_memory();
2067    /// let spec = ProjectionSpec::new()
2068    ///     .with_node_labels(["Person", "City"])
2069    ///     .with_edge_types(["LIVES_IN"]);
2070    /// assert!(db.create_projection("social", spec));
2071    /// ```
2072    pub fn create_projection(
2073        &self,
2074        name: impl Into<String>,
2075        spec: grafeo_core::graph::ProjectionSpec,
2076    ) -> bool {
2077        use grafeo_core::graph::GraphProjection;
2078        use std::collections::hash_map::Entry;
2079
2080        let store = self.graph_store();
2081        let projection = Arc::new(GraphProjection::new(store, spec));
2082        let mut projections = self.projections.write();
2083        match projections.entry(name.into()) {
2084            Entry::Occupied(_) => false,
2085            Entry::Vacant(e) => {
2086                e.insert(projection);
2087                true
2088            }
2089        }
2090    }
2091
2092    /// Drops a named graph projection. Returns `true` if it existed.
2093    pub fn drop_projection(&self, name: &str) -> bool {
2094        self.projections.write().remove(name).is_some()
2095    }
2096
2097    /// Returns the names of all graph projections.
2098    #[must_use]
2099    pub fn list_projections(&self) -> Vec<String> {
2100        self.projections.read().keys().cloned().collect()
2101    }
2102
2103    /// Returns a named projection as a [`GraphStoreSearch`] trait object.
2104    #[must_use]
2105    pub fn projection(&self, name: &str) -> Option<Arc<dyn GraphStoreSearch>> {
2106        self.projections
2107            .read()
2108            .get(name)
2109            .map(|p| Arc::clone(p) as Arc<dyn GraphStoreSearch>)
2110    }
2111
2112    /// Returns the graph store as a trait object.
2113    ///
2114    /// Returns a read-only trait object for the active graph store.
2115    ///
2116    /// This provides the [`GraphStoreSearch`] interface (graph-structure reads
2117    /// plus text/vector search capabilities) for code that only needs read
2118    /// operations. For write access, use [`graph_store_mut()`](Self::graph_store_mut).
2119    #[must_use]
2120    pub fn graph_store(&self) -> Arc<dyn GraphStoreSearch> {
2121        if let Some(ref ext_read) = self.external_read_store {
2122            Arc::clone(ext_read)
2123        } else {
2124            #[cfg(feature = "lpg")]
2125            {
2126                Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreSearch>
2127            }
2128            #[cfg(not(feature = "lpg"))]
2129            unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
2130        }
2131    }
2132
2133    /// Returns the writable graph store, if available.
2134    ///
2135    /// Returns `None` for read-only databases created via
2136    /// [`with_read_store()`](Self::with_read_store).
2137    #[must_use]
2138    pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
2139        if self.external_read_store.is_some() {
2140            self.external_write_store.as_ref().map(Arc::clone)
2141        } else {
2142            #[cfg(feature = "lpg")]
2143            {
2144                Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
2145            }
2146            #[cfg(not(feature = "lpg"))]
2147            {
2148                None
2149            }
2150        }
2151    }
2152
2153    /// Garbage collects old MVCC versions that are no longer visible.
2154    ///
2155    /// Determines the minimum epoch required by active transactions and prunes
2156    /// version chains older than that threshold. Also cleans up completed
2157    /// transaction metadata in the transaction manager, and prunes the CDC
2158    /// event log according to its retention policy.
2159    pub fn gc(&self) {
2160        #[cfg(feature = "lpg")]
2161        {
2162            let min_epoch = self.transaction_manager.min_active_epoch();
2163            self.lpg_store().gc_versions(min_epoch);
2164        }
2165        #[cfg(all(feature = "lpg", feature = "cdc"))]
2166        let current_epoch = self.transaction_manager.current_epoch();
2167        self.transaction_manager.gc();
2168
2169        // Prune CDC events based on retention config (epoch + count limits)
2170        #[cfg(feature = "cdc")]
2171        if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
2172            #[cfg(feature = "lpg")]
2173            self.cdc_log.apply_retention(current_epoch);
2174        }
2175    }
2176
    /// Returns the buffer manager for memory-aware operations.
    ///
    /// The result borrows the database's own `Arc<BufferManager>` handle;
    /// clone the `Arc` if the manager must outlive this borrow.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
2182
    /// Returns the layered store (compact base + mutable overlay), if
    /// [`compact()`](Self::compact) has been called.
    ///
    /// Returns `None` while the `layered_store` field is unset.
    /// Only compiled when both the `compact-store` and `lpg` features are on.
    #[cfg(all(feature = "compact-store", feature = "lpg"))]
    #[must_use]
    pub fn layered_store(
        &self,
    ) -> Option<&Arc<grafeo_core::graph::compact::layered::LayeredStore>> {
        self.layered_store.as_ref()
    }
2192
    /// Returns the disk-backed tier wrapper for the compact base, if
    /// [`compact()`](Self::compact) has been called and `mmap` is enabled.
    ///
    /// Returns `None` while the `compact_tiered` field is unset.
    /// Only compiled when the `compact-store`, `mmap`, and `lpg` features are
    /// all on.
    #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
    #[must_use]
    pub fn compact_tiered(&self) -> Option<&Arc<compact_tiered::CompactStoreTiered>> {
        self.compact_tiered.as_ref()
    }
2200
    /// Returns the query cache.
    ///
    /// This is the same cache that [`clear_plan_cache()`](Self::clear_plan_cache)
    /// empties. The result borrows the database's own `Arc<QueryCache>`.
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
2206
    /// Clears all cached query plans.
    ///
    /// This is called automatically after DDL operations, but can also be
    /// invoked manually after external schema changes (e.g., WAL replay,
    /// import) or when you want to force re-optimization of all queries.
    ///
    /// Implemented as a single `clear()` call on the query cache.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
2215
2216    // =========================================================================
2217    // Lifecycle
2218    // =========================================================================
2219
    /// Closes the database, flushing all pending writes.
    ///
    /// For persistent databases, this ensures everything is safely on disk.
    /// Called automatically when the database is dropped, but you can call
    /// it explicitly if you need to guarantee durability at a specific point.
    ///
    /// Calling `close()` on an already-closed database is a no-op that returns
    /// `Ok(())`. The `is_open` write guard is held for the whole call.
    ///
    /// Outline:
    ///
    /// 1. Stop the periodic checkpoint timer (if any).
    /// 2. Read-only databases: close the file manager and return.
    /// 3. Single-file format: sync the WAL, checkpoint to the container file,
    ///    release WAL handles, remove the sidecar WAL when safe, close the
    ///    file manager.
    /// 4. Legacy directory format: append a `TransactionCommit` record and
    ///    sync the WAL.
    ///
    /// # Errors
    ///
    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
    /// Errors from the checkpoint, the sidecar WAL removal, and the file
    /// manager close are propagated as well. On any error the database is
    /// left marked as open, because the flag is only cleared on success.
    pub fn close(&self) -> Result<()> {
        // Held until return so the open flag can be flipped at the end.
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        // Stop the periodic checkpoint timer first, even for read-only databases.
        // compact() can switch a writable DB to read-only after the timer started,
        // so the timer must be stopped before any early return to avoid racing
        // with the closed file manager.
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        if let Some(mut timer) = self.checkpoint_timer.lock().take() {
            timer.stop();
        }

        // Read-only databases: just release the shared lock, no checkpointing
        if self.read_only {
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
        // We must do this BEFORE the WAL close path because checkpoint_to_file
        // removes the sidecar WAL directory.
        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Flush WAL first so all records are on disk before we snapshot
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;

            // Safety check: if WAL has records but the checkpoint was a no-op
            // (zero sections written), the container file may not contain the
            // latest data. This can happen when sections are not marked dirty
            // despite mutations going through the WAL. Force-dirty all sections
            // and retry before removing the sidecar.
            //
            // NOTE(review): the retry below issues the same
            // `checkpoint_to_file(fm, FlushReason::Explicit)` call as the first
            // attempt; nothing visible in this function force-dirties sections
            // in between. Confirm whether the flush path does so internally.
            #[cfg(feature = "wal")]
            let flush_result = if flush_result.sections_written == 0 {
                if let Some(ref wal) = self.wal {
                    if wal.record_count() > 0 {
                        grafeo_warn!(
                            "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
                            wal.record_count()
                        );
                        self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
                    } else {
                        flush_result
                    }
                } else {
                    flush_result
                }
            } else {
                flush_result
            };

            // Release WAL file handles before removing sidecar directory.
            // On Windows, open handles prevent directory deletion.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            // Only remove the sidecar WAL after verifying the checkpoint wrote
            // data to the container. If nothing was written and the WAL had
            // records, keep the sidecar so the next open can recover from it.
            #[cfg(feature = "wal")]
            let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
            #[cfg(not(feature = "wal"))]
            let has_wal_records = false;

            if flush_result.sections_written > 0 || !has_wal_records {
                {
                    // Named crash point from the testing module, placed just
                    // before the sidecar is deleted.
                    use grafeo_common::testing::crash::maybe_crash;
                    maybe_crash("close:before_remove_sidecar_wal");
                }
                fm.remove_sidecar_wal()?;
            } else {
                grafeo_warn!(
                    "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
                );
            }
            fm.close()?;
        }

        // Commit and sync WAL (legacy directory format only).
        // We intentionally do NOT call wal.checkpoint() here. Directory format
        // has no snapshot: the WAL files are the sole source of truth. Writing
        // checkpoint.meta would cause recovery to skip older WAL files, losing
        // all data that predates the current log sequence.
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Use the last assigned transaction ID, or create one for the commit record
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            // Log a TransactionCommit to mark all pending records as committed
            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        // Reached only when every step above succeeded.
        *is_open = false;
        Ok(())
    }
2348
    /// Returns the typed WAL if available.
    ///
    /// Returns `None` when the database has no WAL attached.
    /// Only compiled with the `wal` feature.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
2355
2356    /// Logs a WAL record if WAL is enabled.
2357    #[cfg(feature = "wal")]
2358    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
2359        if let Some(ref wal) = self.wal {
2360            wal.log(record)?;
2361        }
2362        Ok(())
2363    }
2364
    /// Registers storage sections as [`MemoryConsumer`]s with the BufferManager.
    ///
    /// Each section reports its memory usage to the buffer manager, enabling
    /// accurate pressure tracking. Called once after database construction.
    ///
    /// Which consumers get registered depends on enabled features and on the
    /// current state: the LPG store (when present), the RDF store (only when
    /// it holds data), the Ring index (only when built), the vector and text
    /// index consumers (when an LPG store is present), and the CDC log.
    ///
    /// As a side effect, when the vector index consumer is registered,
    /// `self.vector_spill_storages` is set to that consumer's spill registry.
    fn register_section_consumers(&mut self) {
        // LPG store section
        // `store_ref` is reused by the vector/text index blocks further down.
        #[cfg(feature = "lpg")]
        let store_ref = self.store.as_ref();
        #[cfg(feature = "lpg")]
        if let Some(store) = store_ref {
            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
            self.buffer_manager.register_consumer(Arc::new(
                section_consumer::SectionConsumer::new(Arc::new(lpg)),
            ));
        }

        // RDF store: only when data exists
        #[cfg(feature = "triple-store")]
        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
            self.buffer_manager.register_consumer(Arc::new(
                section_consumer::SectionConsumer::new(Arc::new(rdf)),
            ));
        }

        // Ring Index: only when Ring has been built
        #[cfg(feature = "ring-index")]
        if self.rdf_store.ring().is_some() {
            let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
            // Ring is the first index section to opt into spill wiring.
            // The Ring's `swap_to_mmap` still returns `NotSupported` until
            // its packed disk format lands, so a spill attempt today fails
            // cleanly without leaking files; once the format is in place,
            // no engine-side change is required to enable real eviction.
            //
            // Spill wiring is only attached when a spill path is configured.
            let consumer = match self.buffer_manager.config().spill_path.clone() {
                Some(path) => section_consumer::SectionConsumer::with_spill(Arc::new(ring), path),
                None => section_consumer::SectionConsumer::new(Arc::new(ring)),
            };
            self.buffer_manager.register_consumer(Arc::new(consumer));
        }

        // Vector indexes: dynamic consumer that re-queries the store on each
        // memory_usage() call, so dropped indexes are freed and new ones tracked.
        #[cfg(all(
            feature = "lpg",
            feature = "vector-index",
            feature = "mmap",
            not(feature = "temporal")
        ))]
        if let Some(store) = store_ref {
            let spill_path = self.buffer_manager.config().spill_path.clone();
            let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
                store, spill_path,
            ));
            // Share the spill registry with the search path
            self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
            self.buffer_manager.register_consumer(consumer);
        }

        // Text indexes: same dynamic approach as vector indexes.
        #[cfg(all(feature = "lpg", feature = "text-index"))]
        if let Some(store) = store_ref {
            self.buffer_manager
                .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
        }

        // CDC log: register as memory consumer so the buffer manager can
        // prune events under memory pressure.
        // Unlike the sections above, this registration is unconditional
        // whenever the `cdc` feature is compiled in.
        #[cfg(feature = "cdc")]
        self.buffer_manager.register_consumer(
            Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
        );
    }
2438
2439    /// Applies `TierOverride::ForceDisk` and `TierOverride::ForceRam`
2440    /// overrides at database open time.
2441    ///
2442    /// For each section type in [`Config::section_configs`]:
2443    ///
2444    /// - `ForceDisk`: spills the matching registered consumer (named
2445    ///   `"section:<TypeName>"`) once.
2446    /// - `ForceRam` (Phase 8g): pins the matching consumer in the buffer
2447    ///   manager so subsequent spill loops (pressure-driven, explicit, or
2448    ///   targeted) skip it.
2449    /// - `Auto`: no action; the BufferManager applies its default policy.
2450    ///
2451    /// Must be called after [`Self::register_section_consumers`].
2452    fn apply_force_disk_overrides(&self) {
2453        use grafeo_common::storage::TierOverride;
2454
2455        for (section_type, mem_config) in &self.config.section_configs {
2456            let consumer_name = format!("section:{section_type:?}");
2457            match mem_config.tier {
2458                TierOverride::ForceDisk => {
2459                    #[cfg(feature = "tracing")]
2460                    tracing::info!(
2461                        target: "grafeo::tier",
2462                        section = ?section_type,
2463                        tier = "ForceDisk",
2464                        "applying tier override at db open"
2465                    );
2466                    self.buffer_manager.spill_consumer_by_name(&consumer_name);
2467                }
2468                TierOverride::ForceRam => {
2469                    #[cfg(feature = "tracing")]
2470                    tracing::info!(
2471                        target: "grafeo::tier",
2472                        section = ?section_type,
2473                        tier = "ForceRam",
2474                        "pinning consumer to RAM"
2475                    );
2476                    self.buffer_manager.mark_force_ram(&consumer_name);
2477                }
2478                TierOverride::Auto => {}
2479                _ => {}
2480            }
2481        }
2482    }
2483
    /// Reloads spilled consumers back into RAM up to a target memory fraction.
    ///
    /// Phase 9a: closes the spill / reload loop. After memory pressure drops
    /// (e.g. a workload finishes, or a checkpoint freed mutation overlay
    /// state), call this to bring spilled section data back into RAM for
    /// faster subsequent reads.
    ///
    /// Walks consumers currently reporting `StorageTier::OnDisk`, in priority
    /// order (highest first), reloading each as long as projected memory
    /// usage stays below `target_fraction * memory_limit`.
    ///
    /// Returns the number of consumers successfully reloaded.
    ///
    /// `target_fraction` is clamped to `[0.0, 1.0]`. A typical value is `0.7`
    /// (matching the default `soft_limit_fraction`).
    ///
    /// This method only forwards to the buffer manager's `reload_eligible`;
    /// the walking and clamping described above happen there.
    pub fn reload_eligible(&self, target_fraction: f64) -> usize {
        self.buffer_manager.reload_eligible(target_fraction)
    }
2502
2503    /// Returns the current [`StorageTier`] of every registered section consumer.
2504    ///
2505    /// The map keys are the [`SectionType`]s parsed from each consumer's name
2506    /// (consumers whose names don't follow the `"section:<TypeName>"` convention
2507    /// are skipped). Tier classification is best-effort: a consumer reporting
2508    /// zero `memory_usage()` and `can_spill() == true` is reported as `OnDisk`,
2509    /// otherwise `InMemory` (or `Uninitialized` if both are zero).
2510    ///
2511    /// Useful for tests, observability, and binding-side introspection.
2512    ///
2513    /// [`StorageTier`]: grafeo_common::memory::buffer::StorageTier
2514    /// [`SectionType`]: grafeo_common::storage::SectionType
2515    #[must_use]
2516    pub fn storage_tiers(
2517        &self,
2518    ) -> hashbrown::HashMap<
2519        grafeo_common::storage::SectionType,
2520        grafeo_common::memory::buffer::StorageTier,
2521    > {
2522        use grafeo_common::storage::SectionType;
2523        let snapshot = self.buffer_manager.snapshot_consumer_tiers();
2524        let mut out = hashbrown::HashMap::new();
2525        for (name, tier) in snapshot {
2526            let Some(suffix) = name.strip_prefix("section:") else {
2527                continue;
2528            };
2529            let section_type = match suffix {
2530                "LpgStore" => SectionType::LpgStore,
2531                "RdfStore" => SectionType::RdfStore,
2532                "CompactStore" => SectionType::CompactStore,
2533                "VectorStore" => SectionType::VectorStore,
2534                "TextIndex" => SectionType::TextIndex,
2535                "RdfRing" => SectionType::RdfRing,
2536                "PropertyIndex" => SectionType::PropertyIndex,
2537                "Catalog" => SectionType::Catalog,
2538                _ => continue,
2539            };
2540            out.insert(section_type, tier);
2541        }
2542        out
2543    }
2544
2545    /// Discovers and re-opens spill files from a previous session.
2546    ///
2547    /// When the database was closed with spilled vector embeddings, the
2548    /// `vectors_*.bin` files persist in the spill directory. This method
2549    /// scans for them, opens each as `MmapStorage`, and registers them
2550    /// in the `vector_spill_storages` map so search can read from them.
2551    #[cfg(all(
2552        feature = "lpg",
2553        feature = "vector-index",
2554        feature = "mmap",
2555        not(feature = "temporal")
2556    ))]
2557    fn restore_spill_files(&mut self) {
2558        use grafeo_core::index::vector::MmapStorage;
2559
2560        let spill_dir = match self.buffer_manager.config().spill_path {
2561            Some(ref path) => path.clone(),
2562            None => return,
2563        };
2564
2565        if !spill_dir.exists() {
2566            return;
2567        }
2568
2569        let spill_map = match self.vector_spill_storages {
2570            Some(ref map) => Arc::clone(map),
2571            None => return,
2572        };
2573
2574        let Ok(entries) = std::fs::read_dir(&spill_dir) else {
2575            return;
2576        };
2577
2578        let Some(ref store) = self.store else {
2579            return;
2580        };
2581
2582        for entry in entries.flatten() {
2583            let path = entry.path();
2584            let file_name = match path.file_name().and_then(|n| n.to_str()) {
2585                Some(name) => name.to_string(),
2586                None => continue,
2587            };
2588
2589            // Match pattern: vectors_{key}.bin where key is percent-encoded
2590            if !file_name.starts_with("vectors_")
2591                || !std::path::Path::new(&file_name)
2592                    .extension()
2593                    .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
2594            {
2595                continue;
2596            }
2597
2598            // Extract and decode key: "vectors_Label%3Aembedding.bin" -> "Label:embedding"
2599            let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];
2600
2601            // Percent-decode: %3A -> ':', %25 -> '%'
2602            let key = key_part.replace("%3A", ":").replace("%25", "%");
2603
2604            // Key must contain ':' (label:property format)
2605            if !key.contains(':') {
2606                // Legacy file with old encoding, skip (will be re-created on next spill)
2607                continue;
2608            }
2609
2610            // Only restore if the corresponding vector index exists
2611            if store.get_vector_index_by_key(&key).is_none() {
2612                // Stale spill file (index was dropped), clean it up
2613                let _ = std::fs::remove_file(&path);
2614                continue;
2615            }
2616
2617            // Open the MmapStorage
2618            match MmapStorage::open(&path) {
2619                Ok(mmap_storage) => {
2620                    // Mark the property column as spilled so get() returns None
2621                    let property = key.split(':').nth(1).unwrap_or("");
2622                    let prop_key = grafeo_common::types::PropertyKey::new(property);
2623                    store.node_properties_mark_spilled(&prop_key);
2624
2625                    spill_map.write().insert(key, Arc::new(mmap_storage));
2626                }
2627                Err(e) => {
2628                    eprintln!("failed to restore spill file {}: {e}", path.display());
2629                    // Remove corrupt spill file
2630                    let _ = std::fs::remove_file(&path);
2631                }
2632            }
2633        }
2634    }
2635
    /// Builds section objects for the current database state.
    ///
    /// Two shapes are produced:
    ///
    /// - With a layered store (`compact-store` + `lpg`): the compact base
    ///   section, the overlay LPG section, and the overlay deletions section
    ///   when it is non-empty. The function returns right after these, so
    ///   none of the sections below are added in that case.
    /// - Otherwise: catalog and LPG store sections (when an LPG store is
    ///   present), non-empty vector and text index sections, the RDF store
    ///   section when it holds data, and the Ring section when a Ring exists.
    ///
    /// Sections appear in the returned `Vec` in the order they are pushed.
    #[cfg(feature = "grafeo-file")]
    fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
        let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();

        // Layered store: serialize both the compact base and the overlay.
        #[cfg(all(feature = "compact-store", feature = "lpg"))]
        if let Some(ref layered) = self.layered_store {
            // Compact base section.
            let compact_section = grafeo_core::graph::compact::section::CompactStoreSection::new(
                layered.base_store_arc(),
            );
            sections.push(Box::new(compact_section));

            // Overlay LPG section.
            let overlay = layered.overlay_store();
            let overlay_section = grafeo_core::graph::lpg::LpgStoreSection::new(overlay);
            sections.push(Box::new(overlay_section));

            // Overlay deletion log: persists base-node/edge tombstones
            // that have not yet been merged into the base. Without this,
            // close+reopen silently un-deletes those entities. Only push
            // when there is actually something to record so we don't
            // emit an empty section on every checkpoint.
            let deletions = grafeo_core::graph::compact::deletions_section::OverlayDeletionsSection::from_layered(
                Arc::clone(layered),
            );
            if !deletions.is_empty() {
                sections.push(Box::new(deletions));
            } else {
                // The set may have transitioned from non-empty to empty
                // (e.g. a compact merged the deletes); make sure the
                // dirty flag is cleared so subsequent checkpoints don't
                // think they need to keep flushing.
                layered.mark_deletions_clean();
            }

            // Layered mode is exclusive: skip the plain LPG/RDF sections below.
            return sections;
        }

        // LPG sections: store, catalog, vector indexes, text indexes
        #[cfg(feature = "lpg")]
        if let Some(store) = self.store.as_ref() {
            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));

            // The closure hands the catalog section the transaction manager's
            // current epoch as a `u64` each time it is invoked.
            let catalog = catalog_section::CatalogSection::new(
                Arc::clone(&self.catalog),
                Arc::clone(store),
                {
                    let tm = Arc::clone(&self.transaction_manager);
                    move || tm.current_epoch().as_u64()
                },
            );

            // Catalog is pushed ahead of the LPG store section.
            sections.push(Box::new(catalog));
            sections.push(Box::new(lpg));

            // Vector indexes: persist HNSW topology to avoid rebuild on load
            #[cfg(feature = "vector-index")]
            {
                let indexes = store.vector_index_entries();
                if !indexes.is_empty() {
                    let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
                    sections.push(Box::new(vector));
                }
            }

            // Text indexes: persist BM25 postings to avoid rebuild on load
            #[cfg(feature = "text-index")]
            {
                let indexes = store.text_index_entries();
                if !indexes.is_empty() {
                    let text = grafeo_core::index::text::TextIndexSection::new(indexes);
                    sections.push(Box::new(text));
                }
            }
        }

        // RDF store: only when it holds triples or named graphs.
        #[cfg(feature = "triple-store")]
        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
            sections.push(Box::new(rdf));
        }

        // Ring index: only when a Ring has been built.
        #[cfg(feature = "ring-index")]
        if self.rdf_store.ring().is_some() {
            let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
            sections.push(Box::new(ring));
        }

        sections
    }
2728
2729    // =========================================================================
2730    // Backup API
2731    // =========================================================================
2732
2733    /// Creates a full backup of the database in the given directory.
2734    ///
2735    /// Checkpoints the database, copies the `.grafeo` file, and creates a
2736    /// backup manifest. Subsequent incremental backups will use this as the
2737    /// base.
2738    ///
2739    /// # Errors
2740    ///
2741    /// Returns an error if the database has no file manager or I/O fails.
2742    #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2743    pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
2744        let fm = self
2745            .file_manager
2746            .as_ref()
2747            .ok_or_else(|| Error::Internal("backup requires a persistent database".to_string()))?;
2748
2749        // Checkpoint to ensure the container has the latest data.
2750        // Skip for read-only databases: the on-disk file is already a valid
2751        // snapshot and the file manager rejects writes.
2752        if !self.read_only {
2753            let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2754        }
2755
2756        let current_epoch = self.transaction_manager.current_epoch();
2757        backup::do_backup_full(backup_dir, fm, self.wal.as_deref(), current_epoch)
2758    }
2759
2760    /// Creates an incremental backup containing WAL records since the last backup.
2761    ///
2762    /// Requires a prior full backup in the backup directory.
2763    ///
2764    /// # Errors
2765    ///
2766    /// Returns an error if no full backup exists, or if the WAL has no new records.
2767    #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2768    pub fn backup_incremental(
2769        &self,
2770        backup_dir: &std::path::Path,
2771    ) -> Result<backup::BackupSegment> {
2772        let wal = self
2773            .wal
2774            .as_ref()
2775            .ok_or_else(|| Error::Internal("incremental backup requires WAL".to_string()))?;
2776
2777        let current_epoch = self.transaction_manager.current_epoch();
2778        backup::do_backup_incremental(backup_dir, wal, current_epoch)
2779    }
2780
    /// Returns the backup manifest for a backup directory, if one exists.
    ///
    /// This is an associated function: it needs no open database and simply
    /// forwards to `backup::read_manifest`.
    ///
    /// # Errors
    ///
    /// Returns an error if the manifest file exists but cannot be parsed.
    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
    pub fn read_backup_manifest(
        backup_dir: &std::path::Path,
    ) -> Result<Option<backup::BackupManifest>> {
        backup::read_manifest(backup_dir)
    }
2792
2793    /// Returns the current backup cursor (last backed-up position), if any.
2794    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2795    #[must_use]
2796    pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
2797        self.wal
2798            .as_ref()
2799            .and_then(|wal| backup::read_backup_cursor(wal.dir()).ok().flatten())
2800    }
2801
    /// Restores a database from a backup chain to a specific epoch.
    ///
    /// Copies the full backup to `output_path`, then replays incremental
    /// WAL segments up to `target_epoch`. The restored database can be
    /// opened with [`GrafeoDB::open`].
    ///
    /// This is an associated function: it needs no open database and simply
    /// forwards to `backup::do_restore_to_epoch`.
    ///
    /// # Errors
    ///
    /// Returns an error if the backup chain does not cover the target epoch,
    /// segment checksums fail, or I/O fails.
    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
    pub fn restore_to_epoch(
        backup_dir: &std::path::Path,
        target_epoch: grafeo_common::types::EpochId,
        output_path: &std::path::Path,
    ) -> Result<()> {
        backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
    }
2820
2821    /// Writes the current database state to the `.grafeo` file using the unified flush.
2822    ///
2823    /// Does NOT remove the sidecar WAL: callers that want to clean up
2824    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
2825    /// separately after this returns.
2826    #[cfg(feature = "grafeo-file")]
2827    fn checkpoint_to_file(
2828        &self,
2829        fm: &GrafeoFileManager,
2830        reason: flush::FlushReason,
2831    ) -> Result<flush::FlushResult> {
2832        let sections = self.build_sections();
2833        let section_refs: Vec<&dyn grafeo_common::storage::Section> =
2834            sections.iter().map(|s| s.as_ref()).collect();
2835        #[cfg(feature = "lpg")]
2836        let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
2837        #[cfg(not(feature = "lpg"))]
2838        let context = flush::build_context_minimal(&self.transaction_manager);
2839
2840        flush::flush(
2841            fm,
2842            &section_refs,
2843            &context,
2844            reason,
2845            #[cfg(feature = "wal")]
2846            self.wal.as_deref(),
2847        )
2848    }
2849
    /// Returns the file manager if using single-file format.
    ///
    /// Returns `None` when the database has no file manager attached.
    /// Only compiled with the `grafeo-file` feature.
    #[cfg(feature = "grafeo-file")]
    #[must_use]
    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
        self.file_manager.as_ref()
    }
2856}
2857
2858impl Drop for GrafeoDB {
2859    fn drop(&mut self) {
2860        if let Err(e) = self.close() {
2861            grafeo_error!("Error closing database: {}", e);
2862        }
2863    }
2864}
2865
// Every trait method forwards to the inherent `GrafeoDB` method of the same
// name. The calls are spelled as `GrafeoDB::method(self)` to make it explicit
// that the inherent method (which takes priority in path resolution) is the
// target, not the trait method being defined.
#[cfg(feature = "lpg")]
impl crate::admin::AdminService for GrafeoDB {
    /// Forwards to the inherent [`GrafeoDB::info`].
    fn info(&self) -> crate::admin::DatabaseInfo {
        GrafeoDB::info(self)
    }

    /// Forwards to the inherent [`GrafeoDB::detailed_stats`].
    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        GrafeoDB::detailed_stats(self)
    }

    /// Forwards to the inherent [`GrafeoDB::schema`].
    fn schema(&self) -> crate::admin::SchemaInfo {
        GrafeoDB::schema(self)
    }

    /// Forwards to the inherent [`GrafeoDB::validate`].
    fn validate(&self) -> crate::admin::ValidationResult {
        GrafeoDB::validate(self)
    }

    /// Forwards to the inherent [`GrafeoDB::wal_status`].
    fn wal_status(&self) -> crate::admin::WalStatus {
        GrafeoDB::wal_status(self)
    }

    /// Forwards to the inherent [`GrafeoDB::wal_checkpoint`].
    fn wal_checkpoint(&self) -> Result<()> {
        GrafeoDB::wal_checkpoint(self)
    }
}
2892
2893// =========================================================================
2894// Query Result Types
2895// =========================================================================
2896
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// The rows themselves are crate-private; outside the crate they are reached
/// through [`rows()`](Self::rows), [`into_rows()`](Self::into_rows) or
/// [`iter()`](Self::iter).
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// Constructors that take no explicit types ([`new()`](Self::new),
    /// [`from_rows()`](Self::from_rows)) fill this with one `LogicalType::Any`
    /// per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    ///
    /// Use [`rows()`](Self::rows) for borrowed access or
    /// [`into_rows()`](Self::into_rows) to take ownership.
    pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    ///
    /// Every constructor on this type initialises it to `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
2942
2943impl QueryResult {
2944    /// Creates a fully empty query result (no columns, no rows).
2945    #[must_use]
2946    pub fn empty() -> Self {
2947        Self {
2948            columns: Vec::new(),
2949            column_types: Vec::new(),
2950            rows: Vec::new(),
2951            execution_time_ms: None,
2952            rows_scanned: None,
2953            status_message: None,
2954            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2955        }
2956    }
2957
2958    /// Creates a query result with only a status message (for DDL commands).
2959    #[must_use]
2960    pub fn status(msg: impl Into<String>) -> Self {
2961        Self {
2962            columns: Vec::new(),
2963            column_types: Vec::new(),
2964            rows: Vec::new(),
2965            execution_time_ms: None,
2966            rows_scanned: None,
2967            status_message: Some(msg.into()),
2968            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2969        }
2970    }
2971
2972    /// Creates a new empty query result.
2973    #[must_use]
2974    pub fn new(columns: Vec<String>) -> Self {
2975        let len = columns.len();
2976        Self {
2977            columns,
2978            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2979            rows: Vec::new(),
2980            execution_time_ms: None,
2981            rows_scanned: None,
2982            status_message: None,
2983            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2984        }
2985    }
2986
2987    /// Creates a new empty query result with column types.
2988    #[must_use]
2989    pub fn with_types(
2990        columns: Vec<String>,
2991        column_types: Vec<grafeo_common::types::LogicalType>,
2992    ) -> Self {
2993        Self {
2994            columns,
2995            column_types,
2996            rows: Vec::new(),
2997            execution_time_ms: None,
2998            rows_scanned: None,
2999            status_message: None,
3000            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3001        }
3002    }
3003
3004    /// Creates a query result with pre-populated rows.
3005    #[must_use]
3006    pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
3007        let len = columns.len();
3008        Self {
3009            columns,
3010            column_types: vec![grafeo_common::types::LogicalType::Any; len],
3011            rows,
3012            execution_time_ms: None,
3013            rows_scanned: None,
3014            status_message: None,
3015            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3016        }
3017    }
3018
3019    /// Appends a row to this result.
3020    pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
3021        self.rows.push(row);
3022    }
3023
3024    /// Sets the execution metrics on this result.
3025    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
3026        self.execution_time_ms = Some(execution_time_ms);
3027        self.rows_scanned = Some(rows_scanned);
3028        self
3029    }
3030
3031    /// Returns the execution time in milliseconds, if available.
3032    #[must_use]
3033    pub fn execution_time_ms(&self) -> Option<f64> {
3034        self.execution_time_ms
3035    }
3036
3037    /// Returns the number of rows scanned, if available.
3038    #[must_use]
3039    pub fn rows_scanned(&self) -> Option<u64> {
3040        self.rows_scanned
3041    }
3042
3043    /// Returns the number of rows.
3044    #[must_use]
3045    pub fn row_count(&self) -> usize {
3046        self.rows.len()
3047    }
3048
3049    /// Returns the number of columns.
3050    #[must_use]
3051    pub fn column_count(&self) -> usize {
3052        self.columns.len()
3053    }
3054
3055    /// Returns true if the result is empty.
3056    #[must_use]
3057    pub fn is_empty(&self) -> bool {
3058        self.rows.is_empty()
3059    }
3060
3061    /// Extracts a single value from the result.
3062    ///
3063    /// Use this when your query returns exactly one row with one column,
3064    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
3065    ///
3066    /// # Errors
3067    ///
3068    /// Returns an error if the result has multiple rows or columns.
3069    pub fn scalar<T: FromValue>(&self) -> Result<T> {
3070        if self.rows.len() != 1 || self.columns.len() != 1 {
3071            return Err(grafeo_common::utils::error::Error::InvalidValue(
3072                "Expected single value".to_string(),
3073            ));
3074        }
3075        T::from_value(&self.rows[0][0])
3076    }
3077
3078    /// Returns a slice of all result rows.
3079    #[must_use]
3080    pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
3081        &self.rows
3082    }
3083
3084    /// Takes ownership of all result rows.
3085    #[must_use]
3086    pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
3087        self.rows
3088    }
3089
3090    /// Returns an iterator over the rows.
3091    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
3092        self.rows.iter()
3093    }
3094
3095    /// Converts this query result to an Arrow [`RecordBatch`](arrow_array::RecordBatch).
3096    ///
3097    /// Each column in the result becomes an Arrow array. Type mapping:
3098    /// - `Int64` / `Float64` / `Bool` / `String` / `Bytes`: direct Arrow equivalents
3099    /// - `Timestamp` / `ZonedDatetime`: `Timestamp(Microsecond, UTC)`
3100    /// - `Date`: `Date32`, `Time`: `Time64(Nanosecond)`
3101    /// - `Vector`: `FixedSizeList(Float32, dim)`
3102    /// - `Duration` / `List` / `Map` / `Path`: serialized as `Utf8`
3103    ///
3104    /// Heterogeneous columns (mixed types) fall back to `Utf8`.
3105    ///
3106    /// # Errors
3107    ///
3108    /// Returns [`ArrowExportError`](arrow::ArrowExportError) if Arrow array construction fails.
3109    #[cfg(feature = "arrow-export")]
3110    pub fn to_record_batch(
3111        &self,
3112    ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
3113        arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
3114    }
3115
3116    /// Serializes this query result as Arrow IPC stream bytes.
3117    ///
3118    /// The returned bytes can be read by any Arrow implementation:
3119    /// - Python: `pyarrow.ipc.open_stream(buf).read_all()`
3120    /// - Polars: `pl.read_ipc(buf)`
3121    /// - Node.js: `apache-arrow` `RecordBatchStreamReader`
3122    ///
3123    /// # Errors
3124    ///
3125    /// Returns [`ArrowExportError`](arrow::ArrowExportError) on conversion or serialization failure.
3126    #[cfg(feature = "arrow-export")]
3127    pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
3128        let batch = self.to_record_batch()?;
3129        arrow::record_batch_to_ipc_stream(&batch)
3130    }
3131}
3132
3133impl std::fmt::Display for QueryResult {
3134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3135        let table = grafeo_common::fmt::format_result_table(
3136            &self.columns,
3137            &self.rows,
3138            self.execution_time_ms,
3139            self.status_message.as_deref(),
3140        );
3141        f.write_str(&table)
3142    }
3143}
3144
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// The conversion borrows the value and produces an owned `Self`, so the
/// query result it came from is left untouched.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// # Errors
    ///
    /// Returns `Error::TypeMismatch` if the value is not the expected type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
3157
3158impl FromValue for i64 {
3159    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
3160        value
3161            .as_int64()
3162            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
3163                expected: "INT64".to_string(),
3164                found: value.type_name().to_string(),
3165            })
3166    }
3167}
3168
3169impl FromValue for f64 {
3170    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
3171        value
3172            .as_float64()
3173            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
3174                expected: "FLOAT64".to_string(),
3175                found: value.type_name().to_string(),
3176            })
3177    }
3178}
3179
3180impl FromValue for String {
3181    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
3182        value.as_str().map(String::from).ok_or_else(|| {
3183            grafeo_common::utils::error::Error::TypeMismatch {
3184                expected: "STRING".to_string(),
3185                found: value.type_name().to_string(),
3186            }
3187        })
3188    }
3189}
3190
3191impl FromValue for bool {
3192    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
3193        value
3194            .as_bool()
3195            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
3196                expected: "BOOL".to_string(),
3197                found: value.type_name().to_string(),
3198            })
3199    }
3200}
3201
3202#[cfg(test)]
3203mod tests {
3204    use super::*;
3205
3206    #[test]
3207    fn test_create_in_memory_database() {
3208        let db = GrafeoDB::new_in_memory();
3209        assert_eq!(db.node_count(), 0);
3210        assert_eq!(db.edge_count(), 0);
3211    }
3212
3213    #[test]
3214    fn test_database_config() {
3215        let config = Config::in_memory().with_threads(4).with_query_logging();
3216
3217        let db = GrafeoDB::with_config(config).unwrap();
3218        assert_eq!(db.config().threads, 4);
3219        assert!(db.config().query_logging);
3220    }
3221
3222    #[test]
3223    fn test_database_session() {
3224        let db = GrafeoDB::new_in_memory();
3225        let _session = db.session();
3226        // Session should be created successfully
3227    }
3228
3229    #[cfg(feature = "wal")]
3230    #[test]
3231    fn test_persistent_database_recovery() {
3232        use grafeo_common::types::Value;
3233        use tempfile::tempdir;
3234
3235        let dir = tempdir().unwrap();
3236        let db_path = dir.path().join("test_db");
3237
3238        // Create database and add some data
3239        {
3240            let db = GrafeoDB::open(&db_path).unwrap();
3241
3242            let alix = db.create_node(&["Person"]);
3243            db.set_node_property(alix, "name", Value::from("Alix"));
3244
3245            let gus = db.create_node(&["Person"]);
3246            db.set_node_property(gus, "name", Value::from("Gus"));
3247
3248            let _edge = db.create_edge(alix, gus, "KNOWS");
3249
3250            // Explicitly close to flush WAL
3251            db.close().unwrap();
3252        }
3253
3254        // Reopen and verify data was recovered
3255        {
3256            let db = GrafeoDB::open(&db_path).unwrap();
3257
3258            assert_eq!(db.node_count(), 2);
3259            assert_eq!(db.edge_count(), 1);
3260
3261            // Verify nodes exist
3262            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
3263            assert!(node0.is_some());
3264
3265            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
3266            assert!(node1.is_some());
3267        }
3268    }
3269
3270    #[cfg(feature = "wal")]
3271    #[test]
3272    fn test_wal_logging() {
3273        use tempfile::tempdir;
3274
3275        let dir = tempdir().unwrap();
3276        let db_path = dir.path().join("wal_test_db");
3277
3278        let db = GrafeoDB::open(&db_path).unwrap();
3279
3280        // Create some data
3281        let node = db.create_node(&["Test"]);
3282        db.delete_node(node);
3283
3284        // WAL should have records
3285        if let Some(wal) = db.wal() {
3286            assert!(wal.record_count() > 0);
3287        }
3288
3289        db.close().unwrap();
3290    }
3291
3292    #[cfg(feature = "wal")]
3293    #[test]
3294    fn test_wal_recovery_multiple_sessions() {
3295        // Tests that WAL recovery works correctly across multiple open/close cycles
3296        use grafeo_common::types::Value;
3297        use tempfile::tempdir;
3298
3299        let dir = tempdir().unwrap();
3300        let db_path = dir.path().join("multi_session_db");
3301
3302        // Session 1: Create initial data
3303        {
3304            let db = GrafeoDB::open(&db_path).unwrap();
3305            let alix = db.create_node(&["Person"]);
3306            db.set_node_property(alix, "name", Value::from("Alix"));
3307            db.close().unwrap();
3308        }
3309
3310        // Session 2: Add more data
3311        {
3312            let db = GrafeoDB::open(&db_path).unwrap();
3313            assert_eq!(db.node_count(), 1); // Previous data recovered
3314            let gus = db.create_node(&["Person"]);
3315            db.set_node_property(gus, "name", Value::from("Gus"));
3316            db.close().unwrap();
3317        }
3318
3319        // Session 3: Verify all data
3320        {
3321            let db = GrafeoDB::open(&db_path).unwrap();
3322            assert_eq!(db.node_count(), 2);
3323
3324            // Verify properties were recovered correctly
3325            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
3326            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
3327
3328            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
3329            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
3330        }
3331    }
3332
3333    #[cfg(feature = "wal")]
3334    #[test]
3335    fn test_database_consistency_after_mutations() {
3336        // Tests that database remains consistent after a series of create/delete operations
3337        use grafeo_common::types::Value;
3338        use tempfile::tempdir;
3339
3340        let dir = tempdir().unwrap();
3341        let db_path = dir.path().join("consistency_db");
3342
3343        {
3344            let db = GrafeoDB::open(&db_path).unwrap();
3345
3346            // Create nodes
3347            let a = db.create_node(&["Node"]);
3348            let b = db.create_node(&["Node"]);
3349            let c = db.create_node(&["Node"]);
3350
3351            // Create edges
3352            let e1 = db.create_edge(a, b, "LINKS");
3353            let _e2 = db.create_edge(b, c, "LINKS");
3354
3355            // Delete middle node and its edge
3356            db.delete_edge(e1);
3357            db.delete_node(b);
3358
3359            // Set properties on remaining nodes
3360            db.set_node_property(a, "value", Value::Int64(1));
3361            db.set_node_property(c, "value", Value::Int64(3));
3362
3363            db.close().unwrap();
3364        }
3365
3366        // Reopen and verify consistency
3367        {
3368            let db = GrafeoDB::open(&db_path).unwrap();
3369
3370            // Should have 2 nodes (a and c), b was deleted
3371            // Note: node_count includes deleted nodes in some implementations
3372            // What matters is that the non-deleted nodes are accessible
3373            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
3374            assert!(node_a.is_some());
3375
3376            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
3377            assert!(node_c.is_some());
3378
3379            // Middle node should be deleted
3380            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
3381            assert!(node_b.is_none());
3382        }
3383    }
3384
3385    #[cfg(feature = "wal")]
3386    #[test]
3387    fn test_close_is_idempotent() {
3388        // Calling close() multiple times should not cause errors
3389        use tempfile::tempdir;
3390
3391        let dir = tempdir().unwrap();
3392        let db_path = dir.path().join("close_test_db");
3393
3394        let db = GrafeoDB::open(&db_path).unwrap();
3395        db.create_node(&["Test"]);
3396
3397        // First close should succeed
3398        assert!(db.close().is_ok());
3399
3400        // Second close should also succeed (idempotent)
3401        assert!(db.close().is_ok());
3402    }
3403
3404    #[test]
3405    fn test_with_store_external_backend() {
3406        use grafeo_core::graph::lpg::LpgStore;
3407
3408        let external = Arc::new(LpgStore::new().unwrap());
3409
3410        // Seed data on the external store directly
3411        let n1 = external.create_node(&["Person"]);
3412        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
3413
3414        let db = GrafeoDB::with_store(
3415            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
3416            Config::in_memory(),
3417        )
3418        .unwrap();
3419
3420        let session = db.session();
3421
3422        // Session should see data from the external store via execute
3423        #[cfg(feature = "gql")]
3424        {
3425            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
3426            assert_eq!(result.rows.len(), 1);
3427        }
3428    }
3429
3430    #[test]
3431    fn test_with_config_custom_memory_limit() {
3432        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB
3433
3434        let db = GrafeoDB::with_config(config).unwrap();
3435        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
3436        assert_eq!(db.node_count(), 0);
3437    }
3438
3439    #[cfg(feature = "metrics")]
3440    #[test]
3441    fn test_database_metrics_registry() {
3442        let db = GrafeoDB::new_in_memory();
3443
3444        // Perform some operations
3445        db.create_node(&["Person"]);
3446        db.create_node(&["Person"]);
3447
3448        // Check that metrics snapshot returns data
3449        let snap = db.metrics();
3450        // Session created counter should reflect at least 0 (metrics is initialized)
3451        assert_eq!(snap.query_count, 0); // No queries executed yet
3452    }
3453
3454    #[test]
3455    fn test_query_result_has_metrics() {
3456        // Verifies that query results include execution metrics
3457        let db = GrafeoDB::new_in_memory();
3458        db.create_node(&["Person"]);
3459        db.create_node(&["Person"]);
3460
3461        #[cfg(feature = "gql")]
3462        {
3463            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
3464
3465            // Metrics should be populated
3466            assert!(result.execution_time_ms.is_some());
3467            assert!(result.rows_scanned.is_some());
3468            assert!(result.execution_time_ms.unwrap() >= 0.0);
3469            assert_eq!(result.rows_scanned.unwrap(), 2);
3470        }
3471    }
3472
3473    #[test]
3474    fn test_empty_query_result_metrics() {
3475        // Verifies metrics are correct for queries returning no results
3476        let db = GrafeoDB::new_in_memory();
3477        db.create_node(&["Person"]);
3478
3479        #[cfg(feature = "gql")]
3480        {
3481            // Query that matches nothing
3482            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
3483
3484            assert!(result.execution_time_ms.is_some());
3485            assert!(result.rows_scanned.is_some());
3486            assert_eq!(result.rows_scanned.unwrap(), 0);
3487        }
3488    }
3489
3490    #[cfg(feature = "cdc")]
3491    mod cdc_integration {
3492        use super::*;
3493
3494        /// Helper: creates an in-memory database with CDC enabled.
3495        fn cdc_db() -> GrafeoDB {
3496            GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
3497        }
3498
3499        #[test]
3500        fn test_node_lifecycle_history() {
3501            let db = cdc_db();
3502
3503            // Create
3504            let id = db.create_node(&["Person"]);
3505            // Update
3506            db.set_node_property(id, "name", "Alix".into());
3507            db.set_node_property(id, "name", "Gus".into());
3508            // Delete
3509            db.delete_node(id);
3510
3511            let history = db.history(id).unwrap();
3512            assert_eq!(history.len(), 4); // create + 2 updates + delete
3513            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3514            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3515            assert!(history[1].before.is_none()); // first set_node_property has no prior value
3516            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
3517            assert!(history[2].before.is_some()); // second update has prior "Alix"
3518            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
3519        }
3520
3521        #[test]
3522        fn test_edge_lifecycle_history() {
3523            let db = cdc_db();
3524
3525            let alix = db.create_node(&["Person"]);
3526            let gus = db.create_node(&["Person"]);
3527            let edge = db.create_edge(alix, gus, "KNOWS");
3528            db.set_edge_property(edge, "since", 2024i64.into());
3529            db.delete_edge(edge);
3530
3531            let history = db.history(edge).unwrap();
3532            assert_eq!(history.len(), 3); // create + update + delete
3533            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3534            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3535            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
3536        }
3537
3538        #[test]
3539        fn test_create_node_with_props_cdc() {
3540            let db = cdc_db();
3541
3542            let id = db.create_node_with_props(
3543                &["Person"],
3544                vec![
3545                    ("name", grafeo_common::types::Value::from("Alix")),
3546                    ("age", grafeo_common::types::Value::from(30i64)),
3547                ],
3548            );
3549
3550            let history = db.history(id).unwrap();
3551            assert_eq!(history.len(), 1);
3552            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3553            // Props should be captured
3554            let after = history[0].after.as_ref().unwrap();
3555            assert_eq!(after.len(), 2);
3556        }
3557
3558        #[test]
3559        fn test_changes_between() {
3560            let db = cdc_db();
3561
3562            let id1 = db.create_node(&["A"]);
3563            let _id2 = db.create_node(&["B"]);
3564            db.set_node_property(id1, "x", 1i64.into());
3565
3566            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
3567            let changes = db
3568                .changes_between(
3569                    grafeo_common::types::EpochId(0),
3570                    grafeo_common::types::EpochId(u64::MAX),
3571                )
3572                .unwrap();
3573            assert_eq!(changes.len(), 3); // 2 creates + 1 update
3574        }
3575
3576        #[test]
3577        fn test_cdc_disabled_by_default() {
3578            let db = GrafeoDB::new_in_memory();
3579            assert!(!db.is_cdc_enabled());
3580
3581            let id = db.create_node(&["Person"]);
3582            db.set_node_property(id, "name", "Alix".into());
3583
3584            let history = db.history(id).unwrap();
3585            assert!(history.is_empty(), "CDC off by default: no events recorded");
3586        }
3587
3588        #[test]
3589        fn test_session_with_cdc_override_on() {
3590            // Database default is off, but session opts in
3591            let db = GrafeoDB::new_in_memory();
3592            let session = db.session_with_cdc(true);
3593            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3594            // The CDC log should have events from the opted-in session
3595            let changes = db
3596                .changes_between(
3597                    grafeo_common::types::EpochId(0),
3598                    grafeo_common::types::EpochId(u64::MAX),
3599                )
3600                .unwrap();
3601            assert!(
3602                !changes.is_empty(),
3603                "session_with_cdc(true) should record events"
3604            );
3605        }
3606
3607        #[test]
3608        fn test_session_with_cdc_override_off() {
3609            // Database default is on, but session opts out
3610            let db = cdc_db();
3611            let session = db.session_with_cdc(false);
3612            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3613            let changes = db
3614                .changes_between(
3615                    grafeo_common::types::EpochId(0),
3616                    grafeo_common::types::EpochId(u64::MAX),
3617                )
3618                .unwrap();
3619            assert!(
3620                changes.is_empty(),
3621                "session_with_cdc(false) should not record events"
3622            );
3623        }
3624
3625        #[test]
3626        fn test_set_cdc_enabled_runtime() {
3627            let db = GrafeoDB::new_in_memory();
3628            assert!(!db.is_cdc_enabled());
3629
3630            // Enable at runtime
3631            db.set_cdc_enabled(true);
3632            assert!(db.is_cdc_enabled());
3633
3634            let id = db.create_node(&["Person"]);
3635            let history = db.history(id).unwrap();
3636            assert_eq!(history.len(), 1, "CDC enabled at runtime records events");
3637
3638            // Disable again
3639            db.set_cdc_enabled(false);
3640            let id2 = db.create_node(&["Person"]);
3641            let history2 = db.history(id2).unwrap();
3642            assert!(
3643                history2.is_empty(),
3644                "CDC disabled at runtime stops recording"
3645            );
3646        }
3647    }
3648
3649    #[test]
3650    fn test_with_store_basic() {
3651        use grafeo_core::graph::lpg::LpgStore;
3652
3653        let store = Arc::new(LpgStore::new().unwrap());
3654        let n1 = store.create_node(&["Person"]);
3655        store.set_node_property(n1, "name", "Alix".into());
3656
3657        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3658        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3659
3660        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
3661        assert_eq!(result.rows.len(), 1);
3662    }
3663
3664    #[test]
3665    fn test_with_store_session() {
3666        use grafeo_core::graph::lpg::LpgStore;
3667
3668        let store = Arc::new(LpgStore::new().unwrap());
3669        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3670        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3671
3672        let session = db.session();
3673        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3674        assert_eq!(result.rows.len(), 1);
3675    }
3676
3677    #[test]
3678    fn test_with_store_mutations() {
3679        use grafeo_core::graph::lpg::LpgStore;
3680
3681        let store = Arc::new(LpgStore::new().unwrap());
3682        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3683        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3684
3685        let mut session = db.session();
3686
3687        // Use an explicit transaction so INSERT and MATCH share the same
3688        // transaction context. With PENDING epochs, uncommitted versions are
3689        // only visible to the owning transaction.
3690        session.begin_transaction().unwrap();
3691        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3692
3693        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3694        assert_eq!(result.rows.len(), 1);
3695
3696        session.commit().unwrap();
3697    }
3698
3699    // =========================================================================
3700    // QueryResult tests
3701    // =========================================================================
3702
3703    #[test]
3704    fn test_query_result_empty() {
3705        let result = QueryResult::empty();
3706        assert!(result.is_empty());
3707        assert_eq!(result.row_count(), 0);
3708        assert_eq!(result.column_count(), 0);
3709        assert!(result.execution_time_ms().is_none());
3710        assert!(result.rows_scanned().is_none());
3711        assert!(result.status_message.is_none());
3712    }
3713
3714    #[test]
3715    fn test_query_result_status() {
3716        let result = QueryResult::status("Created node type 'Person'");
3717        assert!(result.is_empty());
3718        assert_eq!(result.column_count(), 0);
3719        assert_eq!(
3720            result.status_message.as_deref(),
3721            Some("Created node type 'Person'")
3722        );
3723    }
3724
3725    #[test]
3726    fn test_query_result_new_with_columns() {
3727        let result = QueryResult::new(vec!["name".into(), "age".into()]);
3728        assert_eq!(result.column_count(), 2);
3729        assert_eq!(result.row_count(), 0);
3730        assert!(result.is_empty());
3731        // Column types should default to Any
3732        assert_eq!(
3733            result.column_types,
3734            vec![
3735                grafeo_common::types::LogicalType::Any,
3736                grafeo_common::types::LogicalType::Any
3737            ]
3738        );
3739    }
3740
3741    #[test]
3742    fn test_query_result_with_types() {
3743        use grafeo_common::types::LogicalType;
3744        let result = QueryResult::with_types(
3745            vec!["name".into(), "age".into()],
3746            vec![LogicalType::String, LogicalType::Int64],
3747        );
3748        assert_eq!(result.column_count(), 2);
3749        assert_eq!(result.column_types[0], LogicalType::String);
3750        assert_eq!(result.column_types[1], LogicalType::Int64);
3751    }
3752
3753    #[test]
3754    fn test_query_result_with_metrics() {
3755        let result = QueryResult::new(vec!["x".into()]).with_metrics(42.5, 100);
3756        assert_eq!(result.execution_time_ms(), Some(42.5));
3757        assert_eq!(result.rows_scanned(), Some(100));
3758    }
3759
3760    #[test]
3761    fn test_query_result_scalar_success() {
3762        use grafeo_common::types::Value;
3763        let mut result = QueryResult::new(vec!["count".into()]);
3764        result.rows.push(vec![Value::Int64(42)]);
3765
3766        let val: i64 = result.scalar().unwrap();
3767        assert_eq!(val, 42);
3768    }
3769
3770    #[test]
3771    fn test_query_result_scalar_wrong_shape() {
3772        use grafeo_common::types::Value;
3773        // Multiple rows
3774        let mut result = QueryResult::new(vec!["x".into()]);
3775        result.rows.push(vec![Value::Int64(1)]);
3776        result.rows.push(vec![Value::Int64(2)]);
3777        assert!(result.scalar::<i64>().is_err());
3778
3779        // Multiple columns
3780        let mut result2 = QueryResult::new(vec!["a".into(), "b".into()]);
3781        result2.rows.push(vec![Value::Int64(1), Value::Int64(2)]);
3782        assert!(result2.scalar::<i64>().is_err());
3783
3784        // Empty
3785        let result3 = QueryResult::new(vec!["x".into()]);
3786        assert!(result3.scalar::<i64>().is_err());
3787    }
3788
3789    #[test]
3790    fn test_query_result_iter() {
3791        use grafeo_common::types::Value;
3792        let mut result = QueryResult::new(vec!["x".into()]);
3793        result.rows.push(vec![Value::Int64(1)]);
3794        result.rows.push(vec![Value::Int64(2)]);
3795
3796        let collected: Vec<_> = result.iter().collect();
3797        assert_eq!(collected.len(), 2);
3798    }
3799
3800    #[test]
3801    fn test_query_result_display() {
3802        use grafeo_common::types::Value;
3803        let mut result = QueryResult::new(vec!["name".into()]);
3804        result.rows.push(vec![Value::from("Alix")]);
3805        let display = result.to_string();
3806        assert!(display.contains("name"));
3807        assert!(display.contains("Alix"));
3808    }
3809
3810    // =========================================================================
3811    // FromValue error paths
3812    // =========================================================================
3813
3814    #[test]
3815    fn test_from_value_i64_type_mismatch() {
3816        use grafeo_common::types::Value;
3817        let val = Value::from("not a number");
3818        assert!(i64::from_value(&val).is_err());
3819    }
3820
3821    #[test]
3822    fn test_from_value_f64_type_mismatch() {
3823        use grafeo_common::types::Value;
3824        let val = Value::from("not a float");
3825        assert!(f64::from_value(&val).is_err());
3826    }
3827
3828    #[test]
3829    fn test_from_value_string_type_mismatch() {
3830        use grafeo_common::types::Value;
3831        let val = Value::Int64(42);
3832        assert!(String::from_value(&val).is_err());
3833    }
3834
3835    #[test]
3836    fn test_from_value_bool_type_mismatch() {
3837        use grafeo_common::types::Value;
3838        let val = Value::Int64(1);
3839        assert!(bool::from_value(&val).is_err());
3840    }
3841
3842    #[test]
3843    fn test_from_value_all_success() {
3844        use grafeo_common::types::Value;
3845        assert_eq!(i64::from_value(&Value::Int64(99)).unwrap(), 99);
3846        assert!((f64::from_value(&Value::Float64(2.72)).unwrap() - 2.72).abs() < f64::EPSILON);
3847        assert_eq!(String::from_value(&Value::from("hello")).unwrap(), "hello");
3848        assert!(bool::from_value(&Value::Bool(true)).unwrap());
3849    }
3850
3851    // =========================================================================
3852    // GrafeoDB accessor tests
3853    // =========================================================================
3854
3855    #[test]
3856    fn test_database_is_read_only_false_by_default() {
3857        let db = GrafeoDB::new_in_memory();
3858        assert!(!db.is_read_only());
3859    }
3860
3861    #[test]
3862    fn test_database_graph_model() {
3863        let db = GrafeoDB::new_in_memory();
3864        assert_eq!(db.graph_model(), crate::config::GraphModel::Lpg);
3865    }
3866
3867    #[test]
3868    fn test_database_memory_limit_none_by_default() {
3869        let db = GrafeoDB::new_in_memory();
3870        assert!(db.memory_limit().is_none());
3871    }
3872
3873    #[test]
3874    fn test_database_memory_limit_custom() {
3875        let config = Config::in_memory().with_memory_limit(128 * 1024 * 1024);
3876        let db = GrafeoDB::with_config(config).unwrap();
3877        assert_eq!(db.memory_limit(), Some(128 * 1024 * 1024));
3878    }
3879
3880    #[test]
3881    fn test_database_adaptive_config() {
3882        let db = GrafeoDB::new_in_memory();
3883        let adaptive = db.adaptive_config();
3884        assert!(adaptive.enabled);
3885        assert!((adaptive.threshold - 3.0).abs() < f64::EPSILON);
3886    }
3887
3888    #[test]
3889    fn test_database_buffer_manager() {
3890        let db = GrafeoDB::new_in_memory();
3891        let _bm = db.buffer_manager();
3892        // Just verify it doesn't panic
3893    }
3894
3895    #[test]
3896    fn test_database_query_cache() {
3897        let db = GrafeoDB::new_in_memory();
3898        let _qc = db.query_cache();
3899    }
3900
3901    #[test]
3902    fn test_database_clear_plan_cache() {
3903        let db = GrafeoDB::new_in_memory();
3904        // Execute a query to populate the cache
3905        #[cfg(feature = "gql")]
3906        {
3907            let _ = db.execute("MATCH (n) RETURN count(n)");
3908        }
3909        db.clear_plan_cache();
3910        // No panic means success
3911    }
3912
3913    #[test]
3914    fn test_database_gc() {
3915        let db = GrafeoDB::new_in_memory();
3916        db.create_node(&["Person"]);
3917        db.gc();
3918        // Verify no panic, node still accessible
3919        assert_eq!(db.node_count(), 1);
3920    }
3921
3922    // =========================================================================
3923    // Named graph management
3924    // =========================================================================
3925
3926    #[test]
3927    fn test_create_and_list_graphs() {
3928        let db = GrafeoDB::new_in_memory();
3929        let created = db.create_graph("social").unwrap();
3930        assert!(created);
3931
3932        // Creating same graph again returns false
3933        let created_again = db.create_graph("social").unwrap();
3934        assert!(!created_again);
3935
3936        let names = db.list_graphs();
3937        assert!(names.contains(&"social".to_string()));
3938    }
3939
3940    #[test]
3941    fn test_drop_graph() {
3942        let db = GrafeoDB::new_in_memory();
3943        db.create_graph("temp").unwrap();
3944        assert!(db.drop_graph("temp"));
3945        assert!(!db.drop_graph("temp")); // Already dropped
3946    }
3947
3948    #[test]
3949    fn test_drop_graph_resets_current_graph() {
3950        let db = GrafeoDB::new_in_memory();
3951        db.create_graph("active").unwrap();
3952        db.set_current_graph(Some("active")).unwrap();
3953        assert_eq!(db.current_graph(), Some("active".to_string()));
3954
3955        db.drop_graph("active");
3956        assert_eq!(db.current_graph(), None);
3957    }
3958
3959    // =========================================================================
3960    // Current graph / schema context
3961    // =========================================================================
3962
3963    #[test]
3964    fn test_current_graph_default_none() {
3965        let db = GrafeoDB::new_in_memory();
3966        assert_eq!(db.current_graph(), None);
3967    }
3968
3969    #[test]
3970    fn test_set_current_graph_valid() {
3971        let db = GrafeoDB::new_in_memory();
3972        db.create_graph("social").unwrap();
3973        db.set_current_graph(Some("social")).unwrap();
3974        assert_eq!(db.current_graph(), Some("social".to_string()));
3975    }
3976
3977    #[test]
3978    fn test_set_current_graph_nonexistent() {
3979        let db = GrafeoDB::new_in_memory();
3980        let result = db.set_current_graph(Some("nonexistent"));
3981        assert!(result.is_err());
3982    }
3983
3984    #[test]
3985    fn test_set_current_graph_none_resets() {
3986        let db = GrafeoDB::new_in_memory();
3987        db.create_graph("social").unwrap();
3988        db.set_current_graph(Some("social")).unwrap();
3989        db.set_current_graph(None).unwrap();
3990        assert_eq!(db.current_graph(), None);
3991    }
3992
3993    #[test]
3994    fn test_set_current_graph_default_keyword() {
3995        let db = GrafeoDB::new_in_memory();
3996        // "default" is a special case that always succeeds
3997        db.set_current_graph(Some("default")).unwrap();
3998        assert_eq!(db.current_graph(), Some("default".to_string()));
3999    }
4000
4001    #[test]
4002    fn test_current_schema_default_none() {
4003        let db = GrafeoDB::new_in_memory();
4004        assert_eq!(db.current_schema(), None);
4005    }
4006
4007    #[test]
4008    fn test_set_current_schema_nonexistent() {
4009        let db = GrafeoDB::new_in_memory();
4010        let result = db.set_current_schema(Some("nonexistent"));
4011        assert!(result.is_err());
4012    }
4013
4014    #[test]
4015    fn test_set_current_schema_none_resets() {
4016        let db = GrafeoDB::new_in_memory();
4017        db.set_current_schema(None).unwrap();
4018        assert_eq!(db.current_schema(), None);
4019    }
4020
4021    // =========================================================================
4022    // graph_store / graph_store_mut
4023    // =========================================================================
4024
4025    #[test]
4026    fn test_graph_store_returns_lpg_by_default() {
4027        let db = GrafeoDB::new_in_memory();
4028        db.create_node(&["Person"]);
4029        let store = db.graph_store();
4030        assert_eq!(store.node_count(), 1);
4031    }
4032
4033    #[test]
4034    fn test_graph_store_mut_returns_some_by_default() {
4035        let db = GrafeoDB::new_in_memory();
4036        assert!(db.graph_store_mut().is_some());
4037    }
4038
4039    #[test]
4040    fn test_with_read_store() {
4041        use grafeo_core::graph::lpg::LpgStore;
4042
4043        let store = Arc::new(LpgStore::new().unwrap());
4044        store.create_node(&["Person"]);
4045
4046        let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
4047        let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
4048
4049        assert!(db.is_read_only());
4050        assert!(db.graph_store_mut().is_none());
4051
4052        // Read queries should work
4053        let gs = db.graph_store();
4054        assert_eq!(gs.node_count(), 1);
4055    }
4056
4057    #[test]
4058    fn test_with_store_graph_store_methods() {
4059        use grafeo_core::graph::lpg::LpgStore;
4060
4061        let store = Arc::new(LpgStore::new().unwrap());
4062        store.create_node(&["Person"]);
4063
4064        let db = GrafeoDB::with_store(
4065            Arc::clone(&store) as Arc<dyn GraphStoreMut>,
4066            Config::in_memory(),
4067        )
4068        .unwrap();
4069
4070        assert!(!db.is_read_only());
4071        assert!(db.graph_store_mut().is_some());
4072        assert_eq!(db.graph_store().node_count(), 1);
4073    }
4074
4075    // =========================================================================
4076    // session_read_only
4077    // =========================================================================
4078
4079    #[test]
4080    #[allow(deprecated)]
4081    fn test_session_read_only() {
4082        let db = GrafeoDB::new_in_memory();
4083        db.create_node(&["Person"]);
4084
4085        let session = db.session_read_only();
4086        // Read queries should work
4087        #[cfg(feature = "gql")]
4088        {
4089            let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
4090            assert_eq!(result.rows.len(), 1);
4091        }
4092    }
4093
4094    // =========================================================================
4095    // close on in-memory database
4096    // =========================================================================
4097
4098    #[test]
4099    fn test_close_in_memory_database() {
4100        let db = GrafeoDB::new_in_memory();
4101        db.create_node(&["Person"]);
4102        assert!(db.close().is_ok());
4103        // Second close should also be fine (idempotent)
4104        assert!(db.close().is_ok());
4105    }
4106
4107    // =========================================================================
4108    // with_config validation failure
4109    // =========================================================================
4110
4111    #[test]
4112    fn test_with_config_invalid_config_zero_threads() {
4113        let config = Config::in_memory().with_threads(0);
4114        let result = GrafeoDB::with_config(config);
4115        assert!(result.is_err());
4116    }
4117
4118    #[test]
4119    fn test_with_config_invalid_config_zero_memory_limit() {
4120        let config = Config::in_memory().with_memory_limit(0);
4121        let result = GrafeoDB::with_config(config);
4122        assert!(result.is_err());
4123    }
4124
4125    // =========================================================================
4126    // StorageFormat display (for config.rs coverage)
4127    // =========================================================================
4128
4129    #[test]
4130    fn test_storage_format_display() {
4131        use crate::config::StorageFormat;
4132        assert_eq!(StorageFormat::Auto.to_string(), "auto");
4133        assert_eq!(StorageFormat::WalDirectory.to_string(), "wal-directory");
4134        assert_eq!(StorageFormat::SingleFile.to_string(), "single-file");
4135    }
4136
4137    #[test]
4138    fn test_storage_format_default() {
4139        use crate::config::StorageFormat;
4140        assert_eq!(StorageFormat::default(), StorageFormat::Auto);
4141    }
4142
4143    #[test]
4144    fn test_config_with_storage_format() {
4145        use crate::config::StorageFormat;
4146        let config = Config::in_memory().with_storage_format(StorageFormat::SingleFile);
4147        assert_eq!(config.storage_format, StorageFormat::SingleFile);
4148    }
4149
4150    // =========================================================================
4151    // Config CDC
4152    // =========================================================================
4153
4154    #[test]
4155    fn test_config_with_cdc() {
4156        let config = Config::in_memory().with_cdc();
4157        assert!(config.cdc_enabled);
4158    }
4159
4160    #[test]
4161    fn test_config_cdc_default_false() {
4162        let config = Config::in_memory();
4163        assert!(!config.cdc_enabled);
4164    }
4165
4166    // =========================================================================
4167    // ConfigError as std::error::Error
4168    // =========================================================================
4169
4170    #[test]
4171    fn test_config_error_is_error_trait() {
4172        use crate::config::ConfigError;
4173        let err: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);
4174        assert!(err.source().is_none());
4175    }
4176
4177    // =========================================================================
4178    // Metrics tests
4179    // =========================================================================
4180
4181    #[cfg(feature = "metrics")]
4182    #[test]
4183    fn test_metrics_prometheus_output() {
4184        let db = GrafeoDB::new_in_memory();
4185        let prom = db.metrics_prometheus();
4186        // Should contain at least some metric names
4187        assert!(!prom.is_empty());
4188    }
4189
4190    #[cfg(feature = "metrics")]
4191    #[test]
4192    fn test_reset_metrics() {
4193        let db = GrafeoDB::new_in_memory();
4194        // Execute something to generate metrics
4195        let _session = db.session();
4196        db.reset_metrics();
4197        let snap = db.metrics();
4198        assert_eq!(snap.query_count, 0);
4199    }
4200
4201    // =========================================================================
4202    // drop_graph on external store
4203    // =========================================================================
4204
4205    #[test]
4206    fn test_drop_graph_on_external_store() {
4207        use grafeo_core::graph::lpg::LpgStore;
4208
4209        let store = Arc::new(LpgStore::new().unwrap());
4210        let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
4211        let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
4212
4213        // drop_graph with external store (no built-in store) returns false
4214        assert!(!db.drop_graph("anything"));
4215    }
4216}