Skip to main content

grafeo_engine/database/
mod.rs

//! The main database struct and operations.
//!
//! Start here with [`GrafeoDB`] - it's your handle to everything.
//!
//! Operations are split across focused submodules:
//! - `query` - Query execution (execute, execute_cypher, etc.)
//! - `crud` - Node/edge CRUD operations
//! - `index` - Property, vector, and text index management
//! - `search` - Vector, text, and hybrid search
//! - `embed` - Embedding model management
//! - `persistence` - Save, load, snapshots, iteration
//! - `admin` - Stats, introspection, diagnostics, CDC
13
14#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(feature = "lpg")]
31mod crud;
32#[cfg(feature = "embed")]
33mod embed;
34#[cfg(feature = "grafeo-file")]
35pub(crate) mod flush;
36#[cfg(feature = "lpg")]
37mod import;
38#[cfg(feature = "lpg")]
39mod index;
40#[cfg(feature = "lpg")]
41mod persistence;
42mod query;
43#[cfg(feature = "triple-store")]
44mod rdf_ops;
45#[cfg(feature = "lpg")]
46mod search;
47pub(crate) mod section_consumer;
48#[cfg(all(feature = "wal", feature = "lpg"))]
49pub(crate) mod wal_store;
50
51use grafeo_common::{grafeo_error, grafeo_warn};
52#[cfg(feature = "wal")]
53use std::path::Path;
54use std::sync::Arc;
55use std::sync::atomic::AtomicUsize;
56
57use parking_lot::RwLock;
58
59use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
60use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
61#[cfg(feature = "lpg")]
62use grafeo_core::graph::lpg::LpgStore;
63#[cfg(feature = "triple-store")]
64use grafeo_core::graph::rdf::RdfStore;
65use grafeo_core::graph::{GraphStore, GraphStoreMut};
66#[cfg(feature = "grafeo-file")]
67use grafeo_storage::file::GrafeoFileManager;
68#[cfg(all(feature = "wal", feature = "lpg"))]
69use grafeo_storage::wal::WalRecovery;
70#[cfg(feature = "wal")]
71use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
72
73use crate::catalog::Catalog;
74use crate::config::Config;
75use crate::query::cache::QueryCache;
76use crate::session::Session;
77use crate::transaction::TransactionManager;
78
79/// Your handle to a Grafeo database.
80///
81/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
82/// quick experiments, or [`open()`](Self::open) for persistent storage.
83/// Then grab a [`session()`](Self::session) to start querying.
84///
85/// # Examples
86///
87/// ```
88/// use grafeo_engine::GrafeoDB;
89///
90/// // Quick in-memory database
91/// let db = GrafeoDB::new_in_memory();
92///
93/// // Add some data
94/// db.create_node(&["Person"]);
95///
96/// // Query it
97/// let session = db.session();
98/// let result = session.execute("MATCH (p:Person) RETURN p")?;
99/// # Ok::<(), grafeo_common::utils::error::Error>(())
100/// ```
101pub struct GrafeoDB {
102    /// Database configuration.
103    pub(super) config: Config,
104    /// The underlying graph store (None when using an external store).
105    #[cfg(feature = "lpg")]
106    pub(super) store: Option<Arc<LpgStore>>,
107    /// Schema and metadata catalog shared across sessions.
108    pub(super) catalog: Arc<Catalog>,
109    /// RDF triple store (if RDF feature is enabled).
110    #[cfg(feature = "triple-store")]
111    pub(super) rdf_store: Arc<RdfStore>,
112    /// Transaction manager.
113    pub(super) transaction_manager: Arc<TransactionManager>,
114    /// Unified buffer manager.
115    pub(super) buffer_manager: Arc<BufferManager>,
116    /// Write-ahead log manager (if durability is enabled).
117    #[cfg(feature = "wal")]
118    pub(super) wal: Option<Arc<LpgWal>>,
119    /// Shared WAL graph context tracker. Tracks which named graph was last
120    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
121    /// records only when the context actually changes.
122    #[cfg(feature = "wal")]
123    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
124    /// Query cache for parsed and optimized plans.
125    pub(super) query_cache: Arc<QueryCache>,
126    /// Shared commit counter for auto-GC across sessions.
127    pub(super) commit_counter: Arc<AtomicUsize>,
128    /// Whether the database is open.
129    pub(super) is_open: RwLock<bool>,
130    /// Change data capture log for tracking mutations.
131    #[cfg(feature = "cdc")]
132    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
133    /// Whether CDC is active for new sessions and direct CRUD (runtime-mutable).
134    #[cfg(feature = "cdc")]
135    cdc_enabled: std::sync::atomic::AtomicBool,
136    /// Registered embedding models for text-to-vector conversion.
137    #[cfg(feature = "embed")]
138    pub(super) embedding_models:
139        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
140    /// Single-file database manager (when using `.grafeo` format).
141    #[cfg(feature = "grafeo-file")]
142    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
143    /// Periodic checkpoint timer (when `checkpoint_interval` is configured).
144    /// Wrapped in Mutex because `close()` takes `&self` but needs to stop the timer.
145    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
146    checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
147    /// Shared registry of spilled vector storages.
148    /// Used by the search path to create `SpillableVectorAccessor` instances.
149    #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
150    vector_spill_storages: Option<
151        Arc<
152            parking_lot::RwLock<
153                std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
154            >,
155        >,
156    >,
157    /// External read-only graph store (when using with_store() or with_read_store()).
158    /// When set, sessions route queries through this store instead of the built-in LpgStore.
159    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
160    /// External writable graph store (when using with_store()).
161    /// None for read-only databases created via with_read_store().
162    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
163    /// Metrics registry shared across all sessions.
164    #[cfg(feature = "metrics")]
165    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
166    /// Persistent graph context for one-shot `execute()` calls.
167    /// When set, each call to `session()` pre-configures the session to this graph.
168    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
169    current_graph: RwLock<Option<String>>,
170    /// Persistent schema context for one-shot `execute()` calls.
171    /// When set, each call to `session()` pre-configures the session to this schema.
172    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
173    current_schema: RwLock<Option<String>>,
174    /// Whether this database is open in read-only mode.
175    /// When true, sessions automatically enforce read-only transactions.
176    read_only: bool,
177    /// Named graph projections (virtual subgraphs), shared with sessions.
178    projections:
179        Arc<RwLock<std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>>>,
180}
181
182impl GrafeoDB {
/// Borrows the built-in LPG store.
///
/// # Panics
///
/// Panics when `self.store` is `None`, i.e. when the database was built
/// with [`with_store()`](Self::with_store) or
/// [`with_read_store()`](Self::with_read_store) and therefore runs on an
/// external store rather than the built-in one.
#[cfg(feature = "lpg")]
fn lpg_store(&self) -> &Arc<LpgStore> {
    let builtin = self.store.as_ref();
    // A missing built-in store is a caller bug, so fail loudly with guidance.
    builtin.expect(
        "no built-in LpgStore: this GrafeoDB was created with an external store \
         (with_store / with_read_store). Use session() or graph_store() instead.",
    )
}
197
/// Reports whether change data capture is currently switched on.
///
/// This is a runtime check: it reads the `cdc_enabled` flag with relaxed
/// ordering each time it is called.
#[cfg(feature = "cdc")]
#[inline]
pub(super) fn cdc_active(&self) -> bool {
    use std::sync::atomic::Ordering;

    self.cdc_enabled.load(Ordering::Relaxed)
}
204
205    /// Creates an in-memory database, fast to create, gone when dropped.
206    ///
207    /// Use this for tests, experiments, or when you don't need persistence.
208    /// For data that survives restarts, use [`open()`](Self::open) instead.
209    ///
210    /// # Panics
211    ///
212    /// Panics if the internal arena allocator cannot be initialized (out of memory).
213    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
214    ///
215    /// # Examples
216    ///
217    /// ```
218    /// use grafeo_engine::GrafeoDB;
219    ///
220    /// let db = GrafeoDB::new_in_memory();
221    /// let session = db.session();
222    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
223    /// # Ok::<(), grafeo_common::utils::error::Error>(())
224    /// ```
225    #[must_use]
226    pub fn new_in_memory() -> Self {
227        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
228    }
229
/// Opens the database stored at `path`, creating it when nothing is there yet.
///
/// On a path that was used before, Grafeo automatically recovers the data
/// from the write-ahead log. Opening a fresh path yields an empty database.
///
/// # Errors
///
/// Fails when the path is not writable or when recovery does not succeed.
///
/// # Examples
///
/// ```no_run
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::open("./my_social_network")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[cfg(feature = "wal")]
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    let persistent = Config::persistent(path.as_ref());
    Self::with_config(persistent)
}
252
/// Opens an already existing database without permitting writes.
///
/// The file is taken under a shared lock, which lets several processes read
/// the same `.grafeo` file at once. Only the last checkpoint snapshot is
/// loaded: the WAL is **not** replayed and mutations are rejected.
///
/// At present only the single-file (`.grafeo`) format is supported.
///
/// # Errors
///
/// Fails when the file does not exist or cannot be read.
///
/// # Examples
///
/// ```no_run
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
/// let session = db.session();
/// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
/// // Mutations will return an error:
/// // session.execute("INSERT (:Person)") => Err(ReadOnly)
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[cfg(feature = "grafeo-file")]
pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
    let read_only = Config::read_only(path.as_ref());
    Self::with_config(read_only)
}
281
282    /// Creates a database with custom configuration.
283    ///
284    /// Use this when you need fine-grained control over memory limits,
285    /// thread counts, or persistence settings. For most cases,
286    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
287    /// are simpler.
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if the database can't be created or recovery fails.
292    ///
293    /// # Examples
294    ///
295    /// ```
296    /// use grafeo_engine::{GrafeoDB, Config};
297    ///
298    /// // In-memory with a 512MB limit
299    /// let config = Config::in_memory()
300    ///     .with_memory_limit(512 * 1024 * 1024);
301    ///
302    /// let db = GrafeoDB::with_config(config)?;
303    /// # Ok::<(), grafeo_common::utils::error::Error>(())
304    /// ```
305    pub fn with_config(config: Config) -> Result<Self> {
306        // Validate configuration before proceeding
307        config
308            .validate()
309            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
310
311        #[cfg(feature = "lpg")]
312        let store = Arc::new(LpgStore::new()?);
313        #[cfg(feature = "triple-store")]
314        let rdf_store = Arc::new(RdfStore::new());
315        let transaction_manager = Arc::new(TransactionManager::new());
316
317        // Create buffer manager with configured limits
318        let buffer_config = BufferManagerConfig {
319            budget: config.memory_limit.unwrap_or_else(|| {
320                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
321            }),
322            spill_path: config.spill_path.clone().or_else(|| {
323                config.path.as_ref().and_then(|p| {
324                    let parent = p.parent()?;
325                    let name = p.file_name()?.to_str()?;
326                    Some(parent.join(format!("{name}.spill")))
327                })
328            }),
329            ..BufferManagerConfig::default()
330        };
331        let buffer_manager = BufferManager::new(buffer_config);
332
333        // Create catalog early so WAL replay can restore schema definitions
334        let catalog = Arc::new(Catalog::new());
335
336        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
337
338        // --- Single-file format (.grafeo) ---
339        #[cfg(feature = "grafeo-file")]
340        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
341            // Read-only mode: open with shared lock, load snapshot, skip WAL
342            if let Some(ref db_path) = config.path {
343                if db_path.exists() && db_path.is_file() {
344                    let fm = GrafeoFileManager::open_read_only(db_path)?;
345                    // Try v2 section-based format first
346                    #[cfg(feature = "lpg")]
347                    if fm.read_section_directory()?.is_some() {
348                        Self::load_from_sections(
349                            &fm,
350                            &store,
351                            &catalog,
352                            #[cfg(feature = "triple-store")]
353                            &rdf_store,
354                        )?;
355                    } else {
356                        // Fall back to v1 blob format
357                        let snapshot_data = fm.read_snapshot()?;
358                        if !snapshot_data.is_empty() {
359                            Self::apply_snapshot_data(
360                                &store,
361                                &catalog,
362                                #[cfg(feature = "triple-store")]
363                                &rdf_store,
364                                &snapshot_data,
365                            )?;
366                        }
367                    }
368                    Some(Arc::new(fm))
369                } else {
370                    return Err(grafeo_common::utils::error::Error::Internal(format!(
371                        "read-only open requires an existing .grafeo file: {}",
372                        db_path.display()
373                    )));
374                }
375            } else {
376                return Err(grafeo_common::utils::error::Error::Internal(
377                    "read-only mode requires a database path".to_string(),
378                ));
379            }
380        } else if let Some(ref db_path) = config.path {
381            // Initialize the file manager whenever single-file format is selected,
382            // regardless of whether WAL is enabled. Without this, a database opened
383            // with wal_enabled:false + StorageFormat::SingleFile would produce no
384            // output at all (the file manager was previously gated behind wal_enabled).
385            if Self::should_use_single_file(db_path, config.storage_format) {
386                let fm = if db_path.exists() && db_path.is_file() {
387                    GrafeoFileManager::open(db_path)?
388                } else if !db_path.exists() {
389                    GrafeoFileManager::create(db_path)?
390                } else {
391                    // Path exists but is not a file (directory, etc.)
392                    return Err(grafeo_common::utils::error::Error::Internal(format!(
393                        "path exists but is not a file: {}",
394                        db_path.display()
395                    )));
396                };
397
398                // Load data: try v2 section-based format, fall back to v1 blob
399                #[cfg(feature = "lpg")]
400                if fm.read_section_directory()?.is_some() {
401                    Self::load_from_sections(
402                        &fm,
403                        &store,
404                        &catalog,
405                        #[cfg(feature = "triple-store")]
406                        &rdf_store,
407                    )?;
408                } else {
409                    let snapshot_data = fm.read_snapshot()?;
410                    if !snapshot_data.is_empty() {
411                        Self::apply_snapshot_data(
412                            &store,
413                            &catalog,
414                            #[cfg(feature = "triple-store")]
415                            &rdf_store,
416                            &snapshot_data,
417                        )?;
418                    }
419                }
420
421                // Recover sidecar WAL if WAL is enabled and a sidecar exists
422                #[cfg(all(feature = "wal", feature = "lpg"))]
423                if config.wal_enabled && fm.has_sidecar_wal() {
424                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
425                    let records = recovery.recover()?;
426                    Self::apply_wal_records(
427                        &store,
428                        &catalog,
429                        #[cfg(feature = "triple-store")]
430                        &rdf_store,
431                        &records,
432                    )?;
433                }
434
435                Some(Arc::new(fm))
436            } else {
437                None
438            }
439        } else {
440            None
441        };
442
443        // Determine whether to use the WAL directory path (legacy) or sidecar
444        // Read-only mode skips WAL entirely (no recovery, no creation).
445        #[cfg(feature = "wal")]
446        let wal = if is_read_only {
447            None
448        } else if config.wal_enabled {
449            if let Some(ref db_path) = config.path {
450                // When using single-file format, the WAL is a sidecar directory
451                #[cfg(feature = "grafeo-file")]
452                let wal_path = if let Some(ref fm) = file_manager {
453                    let p = fm.sidecar_wal_path();
454                    std::fs::create_dir_all(&p)?;
455                    p
456                } else {
457                    // Legacy: WAL inside the database directory
458                    std::fs::create_dir_all(db_path)?;
459                    db_path.join("wal")
460                };
461
462                #[cfg(not(feature = "grafeo-file"))]
463                let wal_path = {
464                    std::fs::create_dir_all(db_path)?;
465                    db_path.join("wal")
466                };
467
468                // For legacy WAL directory format, check if WAL exists and recover
469                #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
470                let is_single_file = file_manager.is_some();
471                #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
472                let is_single_file = false;
473
474                #[cfg(feature = "lpg")]
475                if !is_single_file && wal_path.exists() {
476                    let recovery = WalRecovery::new(&wal_path);
477                    let records = recovery.recover()?;
478                    Self::apply_wal_records(
479                        &store,
480                        &catalog,
481                        #[cfg(feature = "triple-store")]
482                        &rdf_store,
483                        &records,
484                    )?;
485                }
486
487                // Open/create WAL manager with configured durability
488                let wal_durability = match config.wal_durability {
489                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
490                    crate::config::DurabilityMode::Batch {
491                        max_delay_ms,
492                        max_records,
493                    } => WalDurabilityMode::Batch {
494                        max_delay_ms,
495                        max_records,
496                    },
497                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
498                        WalDurabilityMode::Adaptive { target_interval_ms }
499                    }
500                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
501                };
502                let wal_config = WalConfig {
503                    durability: wal_durability,
504                    ..WalConfig::default()
505                };
506                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
507                Some(Arc::new(wal_manager))
508            } else {
509                None
510            }
511        } else {
512            None
513        };
514
515        // Create query cache with default capacity (1000 queries)
516        let query_cache = Arc::new(QueryCache::default());
517
518        // After all snapshot/WAL recovery, sync TransactionManager epoch
519        // with the store so queries use the correct viewing epoch.
520        #[cfg(all(feature = "temporal", feature = "lpg"))]
521        transaction_manager.sync_epoch(store.current_epoch());
522
523        #[cfg(feature = "cdc")]
524        let cdc_enabled_val = config.cdc_enabled;
525        #[cfg(feature = "cdc")]
526        let cdc_retention = config.cdc_retention.clone();
527
528        // Clone Arcs for the checkpoint timer before moving originals into the struct.
529        // The timer captures its own references and runs in a background thread.
530        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
531        let checkpoint_interval = config.checkpoint_interval;
532        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
533        let timer_store = Arc::clone(&store);
534        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
535        let timer_catalog = Arc::clone(&catalog);
536        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
537        let timer_tm = Arc::clone(&transaction_manager);
538        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
539        let timer_rdf = Arc::clone(&rdf_store);
540        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
541        let timer_wal = wal.clone();
542
543        let mut db = Self {
544            config,
545            #[cfg(feature = "lpg")]
546            store: Some(store),
547            catalog,
548            #[cfg(feature = "triple-store")]
549            rdf_store,
550            transaction_manager,
551            buffer_manager,
552            #[cfg(feature = "wal")]
553            wal,
554            #[cfg(feature = "wal")]
555            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
556            query_cache,
557            commit_counter: Arc::new(AtomicUsize::new(0)),
558            is_open: RwLock::new(true),
559            #[cfg(feature = "cdc")]
560            cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
561            #[cfg(feature = "cdc")]
562            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
563            #[cfg(feature = "embed")]
564            embedding_models: RwLock::new(hashbrown::HashMap::new()),
565            #[cfg(feature = "grafeo-file")]
566            file_manager,
567            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
568            checkpoint_timer: parking_lot::Mutex::new(None),
569            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
570            vector_spill_storages: None,
571            external_read_store: None,
572            external_write_store: None,
573            #[cfg(feature = "metrics")]
574            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
575            current_graph: RwLock::new(None),
576            current_schema: RwLock::new(None),
577            read_only: is_read_only,
578            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
579        };
580
581        // Register storage sections as memory consumers for pressure tracking
582        db.register_section_consumers();
583
584        // Start periodic checkpoint timer if configured
585        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
586        if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
587            && !is_read_only
588        {
589            *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
590                interval,
591                Arc::clone(fm),
592                timer_store,
593                timer_catalog,
594                timer_tm,
595                #[cfg(feature = "triple-store")]
596                timer_rdf,
597                #[cfg(feature = "wal")]
598                timer_wal,
599            ));
600        }
601
602        // Discover existing spill files from a previous session.
603        // If vectors were spilled before close, the spill files persist on disk
604        // and need to be re-mapped so search can read from them.
605        #[cfg(all(
606            feature = "lpg",
607            feature = "vector-index",
608            feature = "mmap",
609            not(feature = "temporal")
610        ))]
611        db.restore_spill_files();
612
613        // If VectorStore is configured as ForceDisk, immediately spill embeddings.
614        // This must happen after register_section_consumers() which creates the consumer.
615        #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
616        if db
617            .config
618            .section_configs
619            .get(&grafeo_common::storage::SectionType::VectorStore)
620            .is_some_and(|c| c.tier == grafeo_common::storage::TierOverride::ForceDisk)
621        {
622            db.buffer_manager.spill_all();
623        }
624
625        Ok(db)
626    }
627
628    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
629    ///
630    /// The external store handles all data persistence. WAL, CDC, and index
631    /// management are the responsibility of the store implementation.
632    ///
633    /// Query execution (all 6 languages, optimizer, planner) works through the
634    /// provided store. Admin operations (schema introspection, persistence,
635    /// vector/text indexes) are not available on external stores.
636    ///
637    /// # Examples
638    ///
639    /// ```no_run
640    /// use std::sync::Arc;
641    /// use grafeo_engine::{GrafeoDB, Config};
642    /// use grafeo_core::graph::GraphStoreMut;
643    ///
644    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
645    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
646    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
647    ///     Ok(())
648    /// }
649    /// ```
650    ///
651    /// # Errors
652    ///
653    /// Returns an error if config validation fails.
654    ///
655    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
656    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
657        config
658            .validate()
659            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
660
661        let transaction_manager = Arc::new(TransactionManager::new());
662
663        let buffer_config = BufferManagerConfig {
664            budget: config.memory_limit.unwrap_or_else(|| {
665                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
666            }),
667            spill_path: None,
668            ..BufferManagerConfig::default()
669        };
670        let buffer_manager = BufferManager::new(buffer_config);
671
672        let query_cache = Arc::new(QueryCache::default());
673
674        #[cfg(feature = "cdc")]
675        let cdc_enabled_val = config.cdc_enabled;
676
677        Ok(Self {
678            config,
679            #[cfg(feature = "lpg")]
680            store: None,
681            catalog: Arc::new(Catalog::new()),
682            #[cfg(feature = "triple-store")]
683            rdf_store: Arc::new(RdfStore::new()),
684            transaction_manager,
685            buffer_manager,
686            #[cfg(feature = "wal")]
687            wal: None,
688            #[cfg(feature = "wal")]
689            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
690            query_cache,
691            commit_counter: Arc::new(AtomicUsize::new(0)),
692            is_open: RwLock::new(true),
693            #[cfg(feature = "cdc")]
694            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
695            #[cfg(feature = "cdc")]
696            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
697            #[cfg(feature = "embed")]
698            embedding_models: RwLock::new(hashbrown::HashMap::new()),
699            #[cfg(feature = "grafeo-file")]
700            file_manager: None,
701            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
702            checkpoint_timer: parking_lot::Mutex::new(None),
703            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
704            vector_spill_storages: None,
705            external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
706            external_write_store: Some(store),
707            #[cfg(feature = "metrics")]
708            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
709            current_graph: RwLock::new(None),
710            current_schema: RwLock::new(None),
711            read_only: false,
712            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
713        })
714    }
715
716    /// Creates a database backed by a read-only [`GraphStore`].
717    ///
718    /// The database is set to read-only mode. Write queries (CREATE, SET,
719    /// DELETE) will return `TransactionError::ReadOnly`.
720    ///
721    /// # Examples
722    ///
723    /// ```no_run
724    /// use std::sync::Arc;
725    /// use grafeo_engine::{GrafeoDB, Config};
726    /// use grafeo_core::graph::GraphStore;
727    ///
728    /// fn example(store: Arc<dyn GraphStore>) -> grafeo_common::utils::error::Result<()> {
729    ///     let db = GrafeoDB::with_read_store(store, Config::in_memory())?;
730    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
731    ///     Ok(())
732    /// }
733    /// ```
734    ///
735    /// # Errors
736    ///
737    /// Returns an error if config validation fails.
738    ///
739    /// [`GraphStore`]: grafeo_core::graph::GraphStore
740    pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
741        config
742            .validate()
743            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
744
745        let transaction_manager = Arc::new(TransactionManager::new());
746
747        let buffer_config = BufferManagerConfig {
748            budget: config.memory_limit.unwrap_or_else(|| {
749                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
750            }),
751            spill_path: None,
752            ..BufferManagerConfig::default()
753        };
754        let buffer_manager = BufferManager::new(buffer_config);
755
756        let query_cache = Arc::new(QueryCache::default());
757
758        #[cfg(feature = "cdc")]
759        let cdc_enabled_val = config.cdc_enabled;
760
761        Ok(Self {
762            config,
763            #[cfg(feature = "lpg")]
764            store: None,
765            catalog: Arc::new(Catalog::new()),
766            #[cfg(feature = "triple-store")]
767            rdf_store: Arc::new(RdfStore::new()),
768            transaction_manager,
769            buffer_manager,
770            #[cfg(feature = "wal")]
771            wal: None,
772            #[cfg(feature = "wal")]
773            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
774            query_cache,
775            commit_counter: Arc::new(AtomicUsize::new(0)),
776            is_open: RwLock::new(true),
777            #[cfg(feature = "cdc")]
778            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
779            #[cfg(feature = "cdc")]
780            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
781            #[cfg(feature = "embed")]
782            embedding_models: RwLock::new(hashbrown::HashMap::new()),
783            #[cfg(feature = "grafeo-file")]
784            file_manager: None,
785            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
786            checkpoint_timer: parking_lot::Mutex::new(None),
787            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
788            vector_spill_storages: None,
789            external_read_store: Some(store),
790            external_write_store: None,
791            #[cfg(feature = "metrics")]
792            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
793            current_graph: RwLock::new(None),
794            current_schema: RwLock::new(None),
795            read_only: true,
796            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
797        })
798    }
799
800    /// Converts the database to a read-only [`CompactStore`] for faster queries.
801    ///
802    /// Takes a snapshot of all nodes and edges from the current store, builds
803    /// a columnar `CompactStore` with CSR adjacency, and switches the database
804    /// to read-only mode. The original store is dropped to free memory.
805    ///
806    /// After calling this, all write queries will fail with
807    /// `TransactionError::ReadOnly`. Read queries (across all supported
808    /// languages) continue to work and benefit from ~60x memory reduction
809    /// and 100x+ traversal speedup.
810    ///
811    /// # Errors
812    ///
813    /// Returns an error if the conversion fails (e.g. more than 32,767
814    /// distinct labels or edge types).
815    ///
816    /// [`CompactStore`]: grafeo_core::graph::compact::CompactStore
817    #[cfg(feature = "compact-store")]
818    pub fn compact(&mut self) -> Result<()> {
819        use grafeo_core::graph::compact::from_graph_store;
820
821        let current_store = self.graph_store();
822        let compact = from_graph_store(current_store.as_ref())
823            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
824
825        self.external_read_store = Some(Arc::new(compact) as Arc<dyn GraphStore>);
826        self.external_write_store = None;
827        #[cfg(feature = "lpg")]
828        {
829            self.store = None;
830        }
831        self.read_only = true;
832        self.query_cache = Arc::new(QueryCache::default());
833        // Projections hold Arc refs to the old store: clear them so they don't
834        // serve stale data or prevent the old store's memory from being freed.
835        self.projections.write().clear();
836
837        Ok(())
838    }
839
840    /// Applies WAL records to restore the database state.
841    ///
842    /// Data mutation records are routed through a graph cursor that tracks
843    /// `SwitchGraph` context markers, replaying mutations into the correct
844    /// named graph (or the default graph when cursor is `None`).
845    #[cfg(all(feature = "wal", feature = "lpg"))]
846    fn apply_wal_records(
847        store: &Arc<LpgStore>,
848        catalog: &Catalog,
849        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
850        records: &[WalRecord],
851    ) -> Result<()> {
852        use crate::catalog::{
853            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
854        };
855        use grafeo_common::utils::error::Error;
856
857        // Graph cursor: tracks which named graph receives data mutations.
858        // `None` means the default graph.
859        let mut current_graph: Option<String> = None;
860        let mut target_store: Arc<LpgStore> = Arc::clone(store);
861
862        for record in records {
863            match record {
864                // --- Named graph lifecycle ---
865                WalRecord::CreateNamedGraph { name } => {
866                    let _ = store.create_graph(name);
867                }
868                WalRecord::DropNamedGraph { name } => {
869                    store.drop_graph(name);
870                    // Reset cursor if the dropped graph was active
871                    if current_graph.as_deref() == Some(name.as_str()) {
872                        current_graph = None;
873                        target_store = Arc::clone(store);
874                    }
875                }
876                WalRecord::SwitchGraph { name } => {
877                    current_graph.clone_from(name);
878                    target_store = match &current_graph {
879                        None => Arc::clone(store),
880                        Some(graph_name) => store
881                            .graph_or_create(graph_name)
882                            .map_err(|e| Error::Internal(e.to_string()))?,
883                    };
884                }
885
886                // --- Data mutations: routed through target_store ---
887                WalRecord::CreateNode { id, labels } => {
888                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
889                    target_store.create_node_with_id(*id, &label_refs)?;
890                }
891                WalRecord::DeleteNode { id } => {
892                    target_store.delete_node(*id);
893                }
894                WalRecord::CreateEdge {
895                    id,
896                    src,
897                    dst,
898                    edge_type,
899                } => {
900                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
901                }
902                WalRecord::DeleteEdge { id } => {
903                    target_store.delete_edge(*id);
904                }
905                WalRecord::SetNodeProperty { id, key, value } => {
906                    target_store.set_node_property(*id, key, value.clone());
907                }
908                WalRecord::SetEdgeProperty { id, key, value } => {
909                    target_store.set_edge_property(*id, key, value.clone());
910                }
911                WalRecord::AddNodeLabel { id, label } => {
912                    target_store.add_label(*id, label);
913                }
914                WalRecord::RemoveNodeLabel { id, label } => {
915                    target_store.remove_label(*id, label);
916                }
917                WalRecord::RemoveNodeProperty { id, key } => {
918                    target_store.remove_node_property(*id, key);
919                }
920                WalRecord::RemoveEdgeProperty { id, key } => {
921                    target_store.remove_edge_property(*id, key);
922                }
923
924                // --- Schema DDL replay (always on root catalog) ---
925                WalRecord::CreateNodeType {
926                    name,
927                    properties,
928                    constraints,
929                } => {
930                    let def = NodeTypeDefinition {
931                        name: name.clone(),
932                        properties: properties
933                            .iter()
934                            .map(|(n, t, nullable)| TypedProperty {
935                                name: n.clone(),
936                                data_type: PropertyDataType::from_type_name(t),
937                                nullable: *nullable,
938                                default_value: None,
939                            })
940                            .collect(),
941                        constraints: constraints
942                            .iter()
943                            .map(|(kind, props)| match kind.as_str() {
944                                "unique" => TypeConstraint::Unique(props.clone()),
945                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
946                                "not_null" if !props.is_empty() => {
947                                    TypeConstraint::NotNull(props[0].clone())
948                                }
949                                _ => TypeConstraint::Unique(props.clone()),
950                            })
951                            .collect(),
952                        parent_types: Vec::new(),
953                    };
954                    let _ = catalog.register_node_type(def);
955                }
956                WalRecord::DropNodeType { name } => {
957                    let _ = catalog.drop_node_type(name);
958                }
959                WalRecord::CreateEdgeType {
960                    name,
961                    properties,
962                    constraints,
963                } => {
964                    let def = EdgeTypeDefinition {
965                        name: name.clone(),
966                        properties: properties
967                            .iter()
968                            .map(|(n, t, nullable)| TypedProperty {
969                                name: n.clone(),
970                                data_type: PropertyDataType::from_type_name(t),
971                                nullable: *nullable,
972                                default_value: None,
973                            })
974                            .collect(),
975                        constraints: constraints
976                            .iter()
977                            .map(|(kind, props)| match kind.as_str() {
978                                "unique" => TypeConstraint::Unique(props.clone()),
979                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
980                                "not_null" if !props.is_empty() => {
981                                    TypeConstraint::NotNull(props[0].clone())
982                                }
983                                _ => TypeConstraint::Unique(props.clone()),
984                            })
985                            .collect(),
986                        source_node_types: Vec::new(),
987                        target_node_types: Vec::new(),
988                    };
989                    let _ = catalog.register_edge_type_def(def);
990                }
991                WalRecord::DropEdgeType { name } => {
992                    let _ = catalog.drop_edge_type_def(name);
993                }
994                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
995                    // Index recreation is handled by the store on startup
996                    // (indexes are rebuilt from data, not WAL)
997                }
998                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
999                    // Constraint definitions are part of type definitions
1000                    // and replayed via CreateNodeType/CreateEdgeType
1001                }
1002                WalRecord::CreateGraphType {
1003                    name,
1004                    node_types,
1005                    edge_types,
1006                    open,
1007                } => {
1008                    use crate::catalog::GraphTypeDefinition;
1009                    let def = GraphTypeDefinition {
1010                        name: name.clone(),
1011                        allowed_node_types: node_types.clone(),
1012                        allowed_edge_types: edge_types.clone(),
1013                        open: *open,
1014                    };
1015                    let _ = catalog.register_graph_type(def);
1016                }
1017                WalRecord::DropGraphType { name } => {
1018                    let _ = catalog.drop_graph_type(name);
1019                }
1020                WalRecord::CreateSchema { name } => {
1021                    let _ = catalog.register_schema_namespace(name.clone());
1022                }
1023                WalRecord::DropSchema { name } => {
1024                    let _ = catalog.drop_schema_namespace(name);
1025                }
1026
1027                WalRecord::AlterNodeType { name, alterations } => {
1028                    for (action, prop_name, type_name, nullable) in alterations {
1029                        match action.as_str() {
1030                            "add" => {
1031                                let prop = TypedProperty {
1032                                    name: prop_name.clone(),
1033                                    data_type: PropertyDataType::from_type_name(type_name),
1034                                    nullable: *nullable,
1035                                    default_value: None,
1036                                };
1037                                let _ = catalog.alter_node_type_add_property(name, prop);
1038                            }
1039                            "drop" => {
1040                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
1041                            }
1042                            _ => {}
1043                        }
1044                    }
1045                }
1046                WalRecord::AlterEdgeType { name, alterations } => {
1047                    for (action, prop_name, type_name, nullable) in alterations {
1048                        match action.as_str() {
1049                            "add" => {
1050                                let prop = TypedProperty {
1051                                    name: prop_name.clone(),
1052                                    data_type: PropertyDataType::from_type_name(type_name),
1053                                    nullable: *nullable,
1054                                    default_value: None,
1055                                };
1056                                let _ = catalog.alter_edge_type_add_property(name, prop);
1057                            }
1058                            "drop" => {
1059                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
1060                            }
1061                            _ => {}
1062                        }
1063                    }
1064                }
1065                WalRecord::AlterGraphType { name, alterations } => {
1066                    for (action, type_name) in alterations {
1067                        match action.as_str() {
1068                            "add_node" => {
1069                                let _ =
1070                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
1071                            }
1072                            "drop_node" => {
1073                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
1074                            }
1075                            "add_edge" => {
1076                                let _ =
1077                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
1078                            }
1079                            "drop_edge" => {
1080                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
1081                            }
1082                            _ => {}
1083                        }
1084                    }
1085                }
1086
1087                WalRecord::CreateProcedure {
1088                    name,
1089                    params,
1090                    returns,
1091                    body,
1092                } => {
1093                    use crate::catalog::ProcedureDefinition;
1094                    let def = ProcedureDefinition {
1095                        name: name.clone(),
1096                        params: params.clone(),
1097                        returns: returns.clone(),
1098                        body: body.clone(),
1099                    };
1100                    let _ = catalog.register_procedure(def);
1101                }
1102                WalRecord::DropProcedure { name } => {
1103                    let _ = catalog.drop_procedure(name);
1104                }
1105
1106                // --- RDF triple replay ---
1107                #[cfg(feature = "triple-store")]
1108                WalRecord::InsertRdfTriple { .. }
1109                | WalRecord::DeleteRdfTriple { .. }
1110                | WalRecord::ClearRdfGraph { .. }
1111                | WalRecord::CreateRdfGraph { .. }
1112                | WalRecord::DropRdfGraph { .. } => {
1113                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
1114                }
1115                #[cfg(not(feature = "triple-store"))]
1116                WalRecord::InsertRdfTriple { .. }
1117                | WalRecord::DeleteRdfTriple { .. }
1118                | WalRecord::ClearRdfGraph { .. }
1119                | WalRecord::CreateRdfGraph { .. }
1120                | WalRecord::DropRdfGraph { .. } => {}
1121
1122                WalRecord::TransactionCommit { .. } => {
1123                    // In temporal mode, advance the store epoch on each committed
1124                    // transaction so that subsequent property/label operations
1125                    // are recorded at the correct epoch in their VersionLogs.
1126                    #[cfg(feature = "temporal")]
1127                    {
1128                        target_store.new_epoch();
1129                    }
1130                }
1131                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
1132                    // Transaction control records don't need replay action
1133                    // (recovery already filtered to only committed transactions)
1134                }
1135                WalRecord::EpochAdvance { .. } => {
1136                    // Metadata record: no store mutation needed.
1137                    // Used by incremental backup and point-in-time recovery.
1138                }
1139            }
1140        }
1141        Ok(())
1142    }
1143
1144    // =========================================================================
1145    // Single-file format helpers
1146    // =========================================================================
1147
1148    /// Returns `true` if the given path should use single-file format.
1149    #[cfg(feature = "grafeo-file")]
1150    fn should_use_single_file(
1151        path: &std::path::Path,
1152        configured: crate::config::StorageFormat,
1153    ) -> bool {
1154        use crate::config::StorageFormat;
1155        match configured {
1156            StorageFormat::SingleFile => true,
1157            StorageFormat::WalDirectory => false,
1158            StorageFormat::Auto => {
1159                // Existing file: check magic bytes
1160                if path.is_file() {
1161                    if let Ok(mut f) = std::fs::File::open(path) {
1162                        use std::io::Read;
1163                        let mut magic = [0u8; 4];
1164                        if f.read_exact(&mut magic).is_ok() && magic == grafeo_storage::file::MAGIC
1165                        {
1166                            return true;
1167                        }
1168                    }
1169                    return false;
1170                }
1171                // Existing directory: legacy format
1172                if path.is_dir() {
1173                    return false;
1174                }
1175                // New path: check extension
1176                path.extension().is_some_and(|ext| ext == "grafeo")
1177            }
1178        }
1179    }
1180
1181    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
1182    ///
1183    /// Supports both v1 (monolithic blob) and v2 (section-based) formats.
1184    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1185    fn apply_snapshot_data(
1186        store: &Arc<LpgStore>,
1187        catalog: &Arc<crate::catalog::Catalog>,
1188        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1189        data: &[u8],
1190    ) -> Result<()> {
1191        // v1 blob format: pass through to legacy loader
1192        persistence::load_snapshot_into_store(
1193            store,
1194            catalog,
1195            #[cfg(feature = "triple-store")]
1196            rdf_store,
1197            data,
1198        )
1199    }
1200
1201    /// Loads from a section-based `.grafeo` file (v2 format).
1202    ///
1203    /// Reads the section directory, then deserializes each section independently.
1204    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1205    fn load_from_sections(
1206        fm: &GrafeoFileManager,
1207        store: &Arc<LpgStore>,
1208        catalog: &Arc<crate::catalog::Catalog>,
1209        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1210    ) -> Result<()> {
1211        use grafeo_common::storage::{Section, SectionType};
1212
1213        let dir = fm.read_section_directory()?.ok_or_else(|| {
1214            grafeo_common::utils::error::Error::Internal(
1215                "expected v2 section directory but found none".to_string(),
1216            )
1217        })?;
1218
1219        // Load catalog section first (schema defs needed before data)
1220        if let Some(entry) = dir.find(SectionType::Catalog) {
1221            let data = fm.read_section_data(entry)?;
1222            let tm = Arc::new(crate::transaction::TransactionManager::new());
1223            let mut section = catalog_section::CatalogSection::new(
1224                Arc::clone(catalog),
1225                Arc::clone(store),
1226                move || tm.current_epoch().as_u64(),
1227            );
1228            section.deserialize(&data)?;
1229        }
1230
1231        // Load LPG store
1232        if let Some(entry) = dir.find(SectionType::LpgStore) {
1233            let data = fm.read_section_data(entry)?;
1234            let mut section = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1235            section.deserialize(&data)?;
1236        }
1237
1238        // Load RDF store
1239        #[cfg(feature = "triple-store")]
1240        if let Some(entry) = dir.find(SectionType::RdfStore) {
1241            let data = fm.read_section_data(entry)?;
1242            let mut section = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
1243            section.deserialize(&data)?;
1244        }
1245
1246        // Restore Ring Index (if persisted)
1247        #[cfg(feature = "ring-index")]
1248        if let Some(entry) = dir.find(SectionType::RdfRing) {
1249            let data = fm.read_section_data(entry)?;
1250            let mut section = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(rdf_store));
1251            section.deserialize(&data)?;
1252        }
1253
1254        // Restore HNSW topology (if vector indexes exist in both catalog and section)
1255        #[cfg(feature = "vector-index")]
1256        if let Some(entry) = dir.find(SectionType::VectorStore) {
1257            let data = fm.read_section_data(entry)?;
1258            let indexes = store.vector_index_entries();
1259            if !indexes.is_empty() {
1260                let mut section = grafeo_core::index::vector::VectorStoreSection::new(indexes);
1261                section.deserialize(&data)?;
1262            }
1263        }
1264
1265        // Restore BM25 postings (if text indexes exist in both catalog and section)
1266        #[cfg(feature = "text-index")]
1267        if let Some(entry) = dir.find(SectionType::TextIndex) {
1268            let data = fm.read_section_data(entry)?;
1269            let indexes = store.text_index_entries();
1270            if !indexes.is_empty() {
1271                let mut section = grafeo_core::index::text::TextIndexSection::new(indexes);
1272                section.deserialize(&data)?;
1273            }
1274        }
1275
1276        Ok(())
1277    }
1278
1279    // =========================================================================
1280    // Session & Configuration
1281    // =========================================================================
1282
1283    /// Opens a new session for running queries.
1284    ///
1285    /// Sessions are cheap to create: spin up as many as you need. Each
1286    /// gets its own transaction context, so concurrent sessions won't
1287    /// block each other on reads.
1288    ///
1289    /// # Panics
1290    ///
1291    /// Panics if the database was configured with an external graph store and
1292    /// the internal arena allocator cannot be initialized (out of memory).
1293    ///
1294    /// # Examples
1295    ///
1296    /// ```
1297    /// use grafeo_engine::GrafeoDB;
1298    ///
1299    /// let db = GrafeoDB::new_in_memory();
1300    /// let session = db.session();
1301    ///
1302    /// // Run queries through the session
1303    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
1304    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1305    /// ```
1306    #[must_use]
1307    pub fn session(&self) -> Session {
1308        self.create_session_inner(None)
1309    }
1310
1311    /// Creates a session scoped to the given identity.
1312    ///
1313    /// The identity determines what operations the session is allowed to
1314    /// perform. A [`Role::ReadOnly`](crate::auth::Role::ReadOnly) identity
1315    /// creates a read-only session; a [`Role::ReadWrite`](crate::auth::Role::ReadWrite)
1316    /// identity allows data mutations but not schema DDL; a
1317    /// [`Role::Admin`](crate::auth::Role::Admin) identity has full access.
1318    ///
1319    /// # Examples
1320    ///
1321    /// ```
1322    /// use grafeo_engine::{GrafeoDB, auth::{Identity, Role}};
1323    ///
1324    /// let db = GrafeoDB::new_in_memory();
1325    /// let identity = Identity::new("app-service", [Role::ReadWrite]);
1326    /// let session = db.session_with_identity(identity);
1327    /// ```
1328    #[must_use]
1329    pub fn session_with_identity(&self, identity: crate::auth::Identity) -> Session {
1330        let force_read_only = !identity.can_write();
1331        self.create_session_inner_full(None, force_read_only, identity)
1332    }
1333
1334    /// Creates a session scoped to a single role.
1335    ///
1336    /// Convenience shorthand for
1337    /// `session_with_identity(Identity::new("anonymous", [role]))`.
1338    ///
1339    /// # Examples
1340    ///
1341    /// ```
1342    /// use grafeo_engine::{GrafeoDB, auth::Role};
1343    ///
1344    /// let db = GrafeoDB::new_in_memory();
1345    /// let reader = db.session_with_role(Role::ReadOnly);
1346    /// ```
1347    #[must_use]
1348    pub fn session_with_role(&self, role: crate::auth::Role) -> Session {
1349        self.session_with_identity(crate::auth::Identity::new("anonymous", [role]))
1350    }
1351
1352    /// Creates a session with an explicit CDC override.
1353    ///
1354    /// When `cdc_enabled` is `true`, mutations in this session are tracked
1355    /// regardless of the database default. When `false`, mutations are not
1356    /// tracked regardless of the database default.
1357    ///
1358    /// # Examples
1359    ///
1360    /// ```
1361    /// use grafeo_engine::GrafeoDB;
1362    ///
1363    /// let db = GrafeoDB::new_in_memory();
1364    ///
1365    /// // Opt in to CDC for just this session
1366    /// let tracked = db.session_with_cdc(true);
1367    /// tracked.execute("INSERT (:Person {name: 'Alix'})")?;
1368    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1369    /// ```
1370    #[cfg(feature = "cdc")]
1371    #[must_use]
1372    pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1373        self.create_session_inner(Some(cdc_enabled))
1374    }
1375
1376    /// Creates a read-only session regardless of the database's access mode.
1377    ///
1378    /// Mutations executed through this session will fail with
1379    /// `TransactionError::ReadOnly`. Useful for replication replicas where
1380    /// the database itself must remain writable (for applying CDC changes)
1381    /// but client-facing queries must be read-only.
1382    ///
1383    /// **Deprecated**: Use `session_with_role(Role::ReadOnly)` instead.
1384    #[deprecated(
1385        since = "0.5.36",
1386        note = "use session_with_role(Role::ReadOnly) instead"
1387    )]
1388    #[must_use]
1389    pub fn session_read_only(&self) -> Session {
1390        self.session_with_role(crate::auth::Role::ReadOnly)
1391    }
1392
1393    /// Shared session creation logic.
1394    ///
1395    /// `cdc_override` overrides the database-wide `cdc_enabled` default when
1396    /// `Some`. `None` falls back to the database default.
1397    #[allow(unused_variables)] // cdc_override unused when cdc feature is off
1398    fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1399        self.create_session_inner_full(cdc_override, false, crate::auth::Identity::anonymous())
1400    }
1401
    /// Shared session creation with all overrides.
    ///
    /// - `cdc_override`: when `Some`, decides whether the CDC log is attached
    ///   to the session; `None` falls back to `self.cdc_active()`. Only read
    ///   when the `cdc` feature is enabled.
    /// - `force_read_only`: OR-ed with the database's own `read_only` flag.
    /// - `identity`: cloned into the session configuration.
    ///
    /// # Panics
    ///
    /// Panics if `Session::with_external_store` returns an error (external
    /// store path, and every build without the `lpg` feature).
    // Parameters are only consumed under some feature combinations.
    #[allow(unused_variables)]
    fn create_session_inner_full(
        &self,
        cdc_override: Option<bool>,
        force_read_only: bool,
        identity: crate::auth::Identity,
    ) -> Session {
        // Closure rather than a value: exactly one of the constructors below
        // is compiled/executed, and each call builds a fresh config.
        let session_cfg = || crate::session::SessionConfig {
            transaction_manager: Arc::clone(&self.transaction_manager),
            query_cache: Arc::clone(&self.query_cache),
            catalog: Arc::clone(&self.catalog),
            adaptive_config: self.config.adaptive.clone(),
            factorized_execution: self.config.factorized_execution,
            graph_model: self.config.graph_model,
            query_timeout: self.config.query_timeout,
            commit_counter: Arc::clone(&self.commit_counter),
            gc_interval: self.config.gc_interval,
            read_only: self.read_only || force_read_only,
            identity: identity.clone(),
            #[cfg(feature = "lpg")]
            projections: Arc::clone(&self.projections),
        };

        // External store takes priority over the built-in stores.
        // NOTE(review): this path returns early, so the WAL, CDC, metrics and
        // graph/schema context wiring below is not applied to such sessions;
        // confirm that is intentional.
        if let Some(ref ext_read) = self.external_read_store {
            return Session::with_external_store(
                Arc::clone(ext_read),
                self.external_write_store.as_ref().map(Arc::clone),
                session_cfg(),
            )
            .expect("arena allocation for external store session");
        }

        // Exactly one of the three `let mut session` bindings survives cfg
        // expansion, selected by the `lpg` / `triple-store` features.
        #[cfg(all(feature = "lpg", feature = "triple-store"))]
        let mut session = Session::with_rdf_store_and_adaptive(
            Arc::clone(self.lpg_store()),
            Arc::clone(&self.rdf_store),
            session_cfg(),
        );
        #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
        let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
        #[cfg(not(feature = "lpg"))]
        let mut session =
            Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
                .expect("session creation for non-lpg build");

        // Attach the WAL (and its graph context) when one is configured.
        #[cfg(all(feature = "wal", feature = "lpg"))]
        if let Some(ref wal) = self.wal {
            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
        }

        // Per-session override wins over the database-wide CDC state.
        #[cfg(feature = "cdc")]
        {
            let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
            if should_enable {
                session.set_cdc_log(Arc::clone(&self.cdc_log));
            }
        }

        // Hand the registry to the session and bump the created/active counters.
        #[cfg(feature = "metrics")]
        {
            if let Some(ref m) = self.metrics {
                session.set_metrics(Arc::clone(m));
                m.session_created
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                m.session_active
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
        }

        // Propagate persistent graph context to the new session
        if let Some(ref graph) = *self.current_graph.read() {
            session.use_graph(graph);
        }

        // Propagate persistent schema context to the new session
        if let Some(ref schema) = *self.current_schema.read() {
            session.set_schema(schema);
        }

        // Suppress unused_mut when cdc/wal are disabled
        let _ = &mut session;

        session
    }
1487
1488    /// Returns the current graph name, if any.
1489    ///
1490    /// This is the persistent graph context used by one-shot `execute()` calls.
1491    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
1492    /// or `SESSION RESET`.
1493    #[must_use]
1494    pub fn current_graph(&self) -> Option<String> {
1495        self.current_graph.read().clone()
1496    }
1497
1498    /// Sets the current graph context for subsequent one-shot `execute()` calls.
1499    ///
1500    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
1501    /// Pass `None` to reset to the default graph.
1502    ///
1503    /// # Errors
1504    ///
1505    /// Returns an error if the named graph does not exist.
1506    pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1507        #[cfg(feature = "lpg")]
1508        if let Some(name) = name
1509            && !name.eq_ignore_ascii_case("default")
1510            && let Some(store) = &self.store
1511            && store.graph(name).is_none()
1512        {
1513            return Err(Error::Query(QueryError::new(
1514                QueryErrorKind::Semantic,
1515                format!("Graph '{name}' does not exist"),
1516            )));
1517        }
1518        *self.current_graph.write() = name.map(ToString::to_string);
1519        Ok(())
1520    }
1521
1522    /// Returns the current schema name, if any.
1523    ///
1524    /// This is the persistent schema context used by one-shot `execute()` calls.
1525    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
1526    #[must_use]
1527    pub fn current_schema(&self) -> Option<String> {
1528        self.current_schema.read().clone()
1529    }
1530
1531    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1532    ///
1533    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1534    /// a session. Pass `None` to clear the schema context.
1535    ///
1536    /// # Errors
1537    ///
1538    /// Returns an error if the named schema does not exist.
1539    pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1540        if let Some(name) = name
1541            && !self.catalog.schema_exists(name)
1542        {
1543            return Err(Error::Query(QueryError::new(
1544                QueryErrorKind::Semantic,
1545                format!("Schema '{name}' does not exist"),
1546            )));
1547        }
1548        *self.current_schema.write() = name.map(ToString::to_string);
1549        Ok(())
1550    }
1551
    /// Returns the adaptive execution configuration.
    ///
    /// This is a borrow of the `adaptive` field of the database [`Config`];
    /// new sessions receive a clone of the same value.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
1557
    /// Returns `true` if this database was opened in read-only mode.
    ///
    /// Reports the database-level flag only; an individual session may
    /// additionally be forced read-only when it is created.
    #[must_use]
    pub fn is_read_only(&self) -> bool {
        self.read_only
    }
1563
    /// Returns a borrow of the configuration this database holds.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
1569
    /// Returns the graph data model of this database.
    ///
    /// The value is read from the database configuration.
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
1575
    /// Returns the configured memory limit in bytes, if any.
    ///
    /// `None` means no limit is set in the database configuration.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
1581
1582    /// Returns a point-in-time snapshot of all metrics.
1583    ///
1584    /// If the `metrics` feature is disabled or the registry is not
1585    /// initialized, returns a default (all-zero) snapshot.
1586    #[cfg(feature = "metrics")]
1587    #[must_use]
1588    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1589        let mut snapshot = self
1590            .metrics
1591            .as_ref()
1592            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1593
1594        // Augment with cache stats from the query cache (not tracked in the registry)
1595        let cache_stats = self.query_cache.stats();
1596        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1597        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1598        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1599        snapshot.cache_invalidations = cache_stats.invalidations;
1600
1601        snapshot
1602    }
1603
1604    /// Returns all metrics in Prometheus text exposition format.
1605    ///
1606    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1607    #[cfg(feature = "metrics")]
1608    #[must_use]
1609    pub fn metrics_prometheus(&self) -> String {
1610        self.metrics
1611            .as_ref()
1612            .map_or_else(String::new, |m| m.to_prometheus())
1613    }
1614
1615    /// Resets all metrics counters and histograms to zero.
1616    #[cfg(feature = "metrics")]
1617    pub fn reset_metrics(&self) {
1618        if let Some(ref m) = self.metrics {
1619            m.reset();
1620        }
1621        self.query_cache.reset_stats();
1622    }
1623
    /// Returns the underlying (default) store.
    ///
    /// This provides direct access to the LPG store for algorithm implementations
    /// and admin operations (index management, schema introspection, MVCC internals).
    ///
    /// For code that only needs read/write graph operations, prefer
    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
    ///
    /// This is a thin public alias for the internal `lpg_store()` accessor.
    #[cfg(feature = "lpg")]
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        self.lpg_store()
    }
1636
1637    // === Named Graph Management ===
1638
1639    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1640    ///
1641    /// # Errors
1642    ///
1643    /// Returns an error if arena allocation fails.
1644    #[cfg(feature = "lpg")]
1645    pub fn create_graph(&self, name: &str) -> Result<bool> {
1646        Ok(self.lpg_store().create_graph(name)?)
1647    }
1648
1649    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
1650    ///
1651    /// If the dropped graph was the active graph context, the context is reset
1652    /// to the default graph.
1653    #[cfg(feature = "lpg")]
1654    pub fn drop_graph(&self, name: &str) -> bool {
1655        let Some(store) = &self.store else {
1656            return false;
1657        };
1658        let dropped = store.drop_graph(name);
1659        if dropped {
1660            let mut current = self.current_graph.write();
1661            if current
1662                .as_deref()
1663                .is_some_and(|g| g.eq_ignore_ascii_case(name))
1664            {
1665                *current = None;
1666            }
1667        }
1668        dropped
1669    }
1670
    /// Returns all named graph names.
    ///
    /// The list is whatever the LPG store's `graph_names()` reports.
    #[cfg(feature = "lpg")]
    #[must_use]
    pub fn list_graphs(&self) -> Vec<String> {
        self.lpg_store().graph_names()
    }
1677
1678    // === Graph Projections ===
1679
1680    /// Creates a named graph projection (virtual subgraph).
1681    ///
1682    /// The projection filters the graph store to only include nodes with the
1683    /// specified labels and edges with the specified types. Returns `true` if
1684    /// created, `false` if a projection with that name already exists.
1685    ///
1686    /// # Examples
1687    ///
1688    /// ```
1689    /// use grafeo_engine::GrafeoDB;
1690    /// use grafeo_core::graph::ProjectionSpec;
1691    ///
1692    /// let db = GrafeoDB::new_in_memory();
1693    /// let spec = ProjectionSpec::new()
1694    ///     .with_node_labels(["Person", "City"])
1695    ///     .with_edge_types(["LIVES_IN"]);
1696    /// assert!(db.create_projection("social", spec));
1697    /// ```
1698    pub fn create_projection(
1699        &self,
1700        name: impl Into<String>,
1701        spec: grafeo_core::graph::ProjectionSpec,
1702    ) -> bool {
1703        use grafeo_core::graph::GraphProjection;
1704        use std::collections::hash_map::Entry;
1705
1706        let store = self.graph_store();
1707        let projection = Arc::new(GraphProjection::new(store, spec));
1708        let mut projections = self.projections.write();
1709        match projections.entry(name.into()) {
1710            Entry::Occupied(_) => false,
1711            Entry::Vacant(e) => {
1712                e.insert(projection);
1713                true
1714            }
1715        }
1716    }
1717
1718    /// Drops a named graph projection. Returns `true` if it existed.
1719    pub fn drop_projection(&self, name: &str) -> bool {
1720        self.projections.write().remove(name).is_some()
1721    }
1722
1723    /// Returns the names of all graph projections.
1724    #[must_use]
1725    pub fn list_projections(&self) -> Vec<String> {
1726        self.projections.read().keys().cloned().collect()
1727    }
1728
1729    /// Returns a named projection as a [`GraphStore`] trait object.
1730    #[must_use]
1731    pub fn projection(&self, name: &str) -> Option<Arc<dyn GraphStore>> {
1732        self.projections
1733            .read()
1734            .get(name)
1735            .map(|p| Arc::clone(p) as Arc<dyn GraphStore>)
1736    }
1737
    /// Returns a read-only trait object for the active graph store.
    ///
    /// An external read store, when configured, takes priority; otherwise the
    /// built-in LPG store is returned (requires the `lpg` feature).
    ///
    /// This provides the [`GraphStore`] interface for code that only needs
    /// read operations. For write access, use [`graph_store_mut()`](Self::graph_store_mut).
    ///
    /// # Panics
    ///
    /// Panics when the crate is built without the `lpg` feature and no
    /// external read store is configured.
    ///
    /// [`GraphStore`]: grafeo_core::graph::GraphStore
    #[must_use]
    pub fn graph_store(&self) -> Arc<dyn GraphStore> {
        if let Some(ref ext_read) = self.external_read_store {
            Arc::clone(ext_read)
        } else {
            #[cfg(feature = "lpg")]
            {
                Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
            }
            #[cfg(not(feature = "lpg"))]
            unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
        }
    }
1759
1760    /// Returns the writable graph store, if available.
1761    ///
1762    /// Returns `None` for read-only databases created via
1763    /// [`with_read_store()`](Self::with_read_store).
1764    #[must_use]
1765    pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1766        if self.external_read_store.is_some() {
1767            self.external_write_store.as_ref().map(Arc::clone)
1768        } else {
1769            #[cfg(feature = "lpg")]
1770            {
1771                Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1772            }
1773            #[cfg(not(feature = "lpg"))]
1774            {
1775                None
1776            }
1777        }
1778    }
1779
1780    /// Garbage collects old MVCC versions that are no longer visible.
1781    ///
1782    /// Determines the minimum epoch required by active transactions and prunes
1783    /// version chains older than that threshold. Also cleans up completed
1784    /// transaction metadata in the transaction manager, and prunes the CDC
1785    /// event log according to its retention policy.
1786    pub fn gc(&self) {
1787        #[cfg(feature = "lpg")]
1788        let current_epoch = {
1789            let min_epoch = self.transaction_manager.min_active_epoch();
1790            self.lpg_store().gc_versions(min_epoch);
1791            self.transaction_manager.current_epoch()
1792        };
1793        self.transaction_manager.gc();
1794
1795        // Prune CDC events based on retention config (epoch + count limits)
1796        #[cfg(feature = "cdc")]
1797        if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
1798            #[cfg(feature = "lpg")]
1799            self.cdc_log.apply_retention(current_epoch);
1800        }
1801    }
1802
    /// Returns the buffer manager for memory-aware operations.
    ///
    /// The returned reference borrows the shared handle; clone the `Arc` to
    /// keep it beyond the borrow of `self`.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
1808
    /// Returns the query cache.
    ///
    /// This is the same cache handle that is cloned into every new session.
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
1814
    /// Clears all cached query plans.
    ///
    /// This is called automatically after DDL operations, but can also be
    /// invoked manually after external schema changes (e.g., WAL replay,
    /// import) or when you want to force re-optimization of all queries.
    ///
    /// Delegates to `QueryCache::clear` on the shared query cache.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
1823
1824    // =========================================================================
1825    // Lifecycle
1826    // =========================================================================
1827
    /// Closes the database, flushing all pending writes.
    ///
    /// For persistent databases, this ensures everything is safely on disk.
    /// Called automatically when the database is dropped, but you can call
    /// it explicitly if you need to guarantee durability at a specific point.
    ///
    /// Calling `close()` on an already-closed database is a no-op that
    /// returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the WAL can't be flushed (check disk space/permissions),
    /// or if checkpointing, sidecar WAL removal, or closing the file manager fails.
    /// On any such error the database stays marked as open, because the
    /// `is_open` flag is only cleared on the success paths.
    pub fn close(&self) -> Result<()> {
        // The write guard is held for the whole shutdown sequence, so
        // concurrent close() calls are serialized on this lock.
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        // Stop the periodic checkpoint timer first, even for read-only databases.
        // compact() can switch a writable DB to read-only after the timer started,
        // so the timer must be stopped before any early return to avoid racing
        // with the closed file manager.
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        if let Some(mut timer) = self.checkpoint_timer.lock().take() {
            timer.stop();
        }

        // Read-only databases: just release the shared lock, no checkpointing
        if self.read_only {
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
        // We must do this BEFORE the WAL close path because checkpoint_to_file
        // removes the sidecar WAL directory.
        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Flush WAL first so all records are on disk before we snapshot
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;

            // Safety check: if WAL has records but the checkpoint was a no-op
            // (zero sections written), the container file may not contain the
            // latest data. This can happen when sections are not marked dirty
            // despite mutations going through the WAL. Force-dirty all sections
            // and retry before removing the sidecar.
            // NOTE(review): the retry below calls `checkpoint_to_file` with the
            // same arguments as the first attempt and nothing in this function
            // marks sections dirty; confirm the forced flush happens elsewhere.
            #[cfg(feature = "wal")]
            let flush_result = if flush_result.sections_written == 0 {
                if let Some(ref wal) = self.wal {
                    if wal.record_count() > 0 {
                        grafeo_warn!(
                            "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
                            wal.record_count()
                        );
                        self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
                    } else {
                        flush_result
                    }
                } else {
                    flush_result
                }
            } else {
                flush_result
            };

            // Release WAL file handles before removing sidecar directory.
            // On Windows, open handles prevent directory deletion.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            // Only remove the sidecar WAL after verifying the checkpoint wrote
            // data to the container. If nothing was written and the WAL had
            // records, keep the sidecar so the next open can recover from it.
            #[cfg(feature = "wal")]
            let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
            #[cfg(not(feature = "wal"))]
            let has_wal_records = false;

            if flush_result.sections_written > 0 || !has_wal_records {
                {
                    // Named crash point immediately before the sidecar is removed.
                    use grafeo_common::testing::crash::maybe_crash;
                    maybe_crash("close:before_remove_sidecar_wal");
                }
                fm.remove_sidecar_wal()?;
            } else {
                grafeo_warn!(
                    "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
                );
            }
            fm.close()?;
        }

        // Commit and sync WAL (legacy directory format only).
        // We intentionally do NOT call wal.checkpoint() here. Directory format
        // has no snapshot: the WAL files are the sole source of truth. Writing
        // checkpoint.meta would cause recovery to skip older WAL files, losing
        // all data that predates the current log sequence.
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Use the last assigned transaction ID, or create one for the commit record
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            // Log a TransactionCommit to mark all pending records as committed
            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
1956
    /// Returns the typed WAL if available.
    ///
    /// `None` means this database instance has no WAL attached.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
1963
1964    /// Logs a WAL record if WAL is enabled.
1965    #[cfg(feature = "wal")]
1966    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1967        if let Some(ref wal) = self.wal {
1968            wal.log(record)?;
1969        }
1970        Ok(())
1971    }
1972
1973    /// Registers storage sections as [`MemoryConsumer`]s with the BufferManager.
1974    ///
1975    /// Each section reports its memory usage to the buffer manager, enabling
1976    /// accurate pressure tracking. Called once after database construction.
1977    fn register_section_consumers(&mut self) {
1978        #[cfg(feature = "lpg")]
1979        let store_ref = self.store.as_ref();
1980        #[cfg(not(feature = "lpg"))]
1981        // LPG store section
1982        #[cfg(feature = "lpg")]
1983        if let Some(store) = store_ref {
1984            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1985            self.buffer_manager.register_consumer(Arc::new(
1986                section_consumer::SectionConsumer::new(Arc::new(lpg)),
1987            ));
1988        }
1989
1990        // RDF store: only when data exists
1991        #[cfg(feature = "triple-store")]
1992        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
1993            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
1994            self.buffer_manager.register_consumer(Arc::new(
1995                section_consumer::SectionConsumer::new(Arc::new(rdf)),
1996            ));
1997        }
1998
1999        // Ring Index: only when Ring has been built
2000        #[cfg(feature = "ring-index")]
2001        if self.rdf_store.ring().is_some() {
2002            let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2003            self.buffer_manager.register_consumer(Arc::new(
2004                section_consumer::SectionConsumer::new(Arc::new(ring)),
2005            ));
2006        }
2007
2008        // Vector indexes: dynamic consumer that re-queries the store on each
2009        // memory_usage() call, so dropped indexes are freed and new ones tracked.
2010        #[cfg(all(
2011            feature = "lpg",
2012            feature = "vector-index",
2013            feature = "mmap",
2014            not(feature = "temporal")
2015        ))]
2016        if let Some(store) = store_ref {
2017            let spill_path = self.buffer_manager.config().spill_path.clone();
2018            let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
2019                store, spill_path,
2020            ));
2021            // Share the spill registry with the search path
2022            self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
2023            self.buffer_manager.register_consumer(consumer);
2024        }
2025
2026        // Text indexes: same dynamic approach as vector indexes.
2027        #[cfg(all(feature = "lpg", feature = "text-index"))]
2028        if let Some(store) = store_ref {
2029            self.buffer_manager
2030                .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
2031        }
2032
2033        // CDC log: register as memory consumer so the buffer manager can
2034        // prune events under memory pressure.
2035        #[cfg(feature = "cdc")]
2036        self.buffer_manager.register_consumer(
2037            Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
2038        );
2039    }
2040
2041    /// Discovers and re-opens spill files from a previous session.
2042    ///
2043    /// When the database was closed with spilled vector embeddings, the
2044    /// `vectors_*.bin` files persist in the spill directory. This method
2045    /// scans for them, opens each as `MmapStorage`, and registers them
2046    /// in the `vector_spill_storages` map so search can read from them.
2047    #[cfg(all(
2048        feature = "lpg",
2049        feature = "vector-index",
2050        feature = "mmap",
2051        not(feature = "temporal")
2052    ))]
2053    fn restore_spill_files(&mut self) {
2054        use grafeo_core::index::vector::MmapStorage;
2055
2056        let spill_dir = match self.buffer_manager.config().spill_path {
2057            Some(ref path) => path.clone(),
2058            None => return,
2059        };
2060
2061        if !spill_dir.exists() {
2062            return;
2063        }
2064
2065        let spill_map = match self.vector_spill_storages {
2066            Some(ref map) => Arc::clone(map),
2067            None => return,
2068        };
2069
2070        let Ok(entries) = std::fs::read_dir(&spill_dir) else {
2071            return;
2072        };
2073
2074        let Some(ref store) = self.store else {
2075            return;
2076        };
2077
2078        for entry in entries.flatten() {
2079            let path = entry.path();
2080            let file_name = match path.file_name().and_then(|n| n.to_str()) {
2081                Some(name) => name.to_string(),
2082                None => continue,
2083            };
2084
2085            // Match pattern: vectors_{key}.bin where key is percent-encoded
2086            if !file_name.starts_with("vectors_")
2087                || !std::path::Path::new(&file_name)
2088                    .extension()
2089                    .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
2090            {
2091                continue;
2092            }
2093
2094            // Extract and decode key: "vectors_Label%3Aembedding.bin" -> "Label:embedding"
2095            let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];
2096
2097            // Percent-decode: %3A -> ':', %25 -> '%'
2098            let key = key_part.replace("%3A", ":").replace("%25", "%");
2099
2100            // Key must contain ':' (label:property format)
2101            if !key.contains(':') {
2102                // Legacy file with old encoding, skip (will be re-created on next spill)
2103                continue;
2104            }
2105
2106            // Only restore if the corresponding vector index exists
2107            if store.get_vector_index_by_key(&key).is_none() {
2108                // Stale spill file (index was dropped), clean it up
2109                let _ = std::fs::remove_file(&path);
2110                continue;
2111            }
2112
2113            // Open the MmapStorage
2114            match MmapStorage::open(&path) {
2115                Ok(mmap_storage) => {
2116                    // Mark the property column as spilled so get() returns None
2117                    let property = key.split(':').nth(1).unwrap_or("");
2118                    let prop_key = grafeo_common::types::PropertyKey::new(property);
2119                    store.node_properties_mark_spilled(&prop_key);
2120
2121                    spill_map.write().insert(key, Arc::new(mmap_storage));
2122                }
2123                Err(e) => {
2124                    eprintln!("failed to restore spill file {}: {e}", path.display());
2125                    // Remove corrupt spill file
2126                    let _ = std::fs::remove_file(&path);
2127                }
2128            }
2129        }
2130    }
2131
2132    /// Builds section objects for the current database state.
2133    #[cfg(feature = "grafeo-file")]
2134    fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
2135        let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();
2136
2137        // LPG sections: store, catalog, vector indexes, text indexes
2138        #[cfg(feature = "lpg")]
2139        if let Some(store) = self.store.as_ref() {
2140            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2141
2142            let catalog = catalog_section::CatalogSection::new(
2143                Arc::clone(&self.catalog),
2144                Arc::clone(store),
2145                {
2146                    let tm = Arc::clone(&self.transaction_manager);
2147                    move || tm.current_epoch().as_u64()
2148                },
2149            );
2150
2151            sections.push(Box::new(catalog));
2152            sections.push(Box::new(lpg));
2153
2154            // Vector indexes: persist HNSW topology to avoid rebuild on load
2155            #[cfg(feature = "vector-index")]
2156            {
2157                let indexes = store.vector_index_entries();
2158                if !indexes.is_empty() {
2159                    let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
2160                    sections.push(Box::new(vector));
2161                }
2162            }
2163
2164            // Text indexes: persist BM25 postings to avoid rebuild on load
2165            #[cfg(feature = "text-index")]
2166            {
2167                let indexes = store.text_index_entries();
2168                if !indexes.is_empty() {
2169                    let text = grafeo_core::index::text::TextIndexSection::new(indexes);
2170                    sections.push(Box::new(text));
2171                }
2172            }
2173        }
2174
2175        #[cfg(feature = "triple-store")]
2176        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2177            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2178            sections.push(Box::new(rdf));
2179        }
2180
2181        #[cfg(feature = "ring-index")]
2182        if self.rdf_store.ring().is_some() {
2183            let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2184            sections.push(Box::new(ring));
2185        }
2186
2187        sections
2188    }
2189
2190    // =========================================================================
2191    // Backup API
2192    // =========================================================================
2193
/// Takes a full backup of this database into `backup_dir`.
///
/// A writable database is checkpointed first; the file manager, the WAL (if
/// any) and the current epoch are then handed to `backup::do_backup_full`.
/// Subsequent incremental backups will use this backup as their base.
///
/// # Errors
///
/// Returns `Error::Internal` if the database has no file manager, and
/// propagates any failure from the checkpoint or from the backup routine.
#[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
    let file_manager = match self.file_manager.as_ref() {
        Some(manager) => manager,
        None => {
            return Err(Error::Internal(
                "backup requires a persistent database".to_string(),
            ));
        }
    };

    // Bring the container up to date before copying it. Read-only databases
    // are skipped: their on-disk file is already a valid snapshot and the
    // file manager rejects writes.
    if !self.read_only {
        let _ = self.checkpoint_to_file(file_manager, flush::FlushReason::Explicit)?;
    }

    let epoch = self.transaction_manager.current_epoch();
    let wal = self.wal.as_deref();
    backup::do_backup_full(backup_dir, file_manager, wal, epoch)
}
2220
/// Takes an incremental backup holding the WAL records written since the
/// previous backup.
///
/// A full backup must already exist in `backup_dir`.
///
/// # Errors
///
/// Returns `Error::Internal` if this database has no WAL. Also returns an
/// error if no full backup exists, or if the WAL has no new records.
#[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
pub fn backup_incremental(
    &self,
    backup_dir: &std::path::Path,
) -> Result<backup::BackupSegment> {
    match self.wal.as_ref() {
        Some(wal) => {
            let epoch = self.transaction_manager.current_epoch();
            backup::do_backup_incremental(backup_dir, wal, epoch)
        }
        None => Err(Error::Internal(
            "incremental backup requires WAL".to_string(),
        )),
    }
}
2241
/// Loads the backup manifest stored in `backup_dir`, if there is one.
///
/// This is an associated function: it needs no open database and simply
/// forwards to `backup::read_manifest`.
///
/// # Errors
///
/// Returns an error if the manifest file exists but cannot be parsed.
#[cfg(all(feature = "wal", feature = "grafeo-file"))]
pub fn read_backup_manifest(
    backup_dir: &std::path::Path,
) -> Result<Option<backup::BackupManifest>> {
    backup::read_manifest(backup_dir)
}
2253
/// Reports the last backed-up position recorded in the WAL directory.
///
/// Yields `None` when the database has no WAL, when no cursor is stored,
/// or when the cursor cannot be read.
#[cfg(all(feature = "wal", feature = "grafeo-file"))]
#[must_use]
pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
    let wal = self.wal.as_ref()?;
    // Read failures are collapsed into `None` on purpose: this accessor is
    // best-effort and has no error channel.
    backup::read_backup_cursor(wal.dir()).ok().flatten()
}
2262
/// Rebuilds a database at `output_path` from the backup chain in
/// `backup_dir`, stopping at `target_epoch`.
///
/// The full backup is copied to `output_path` and the incremental WAL
/// segments are replayed on top of it up to `target_epoch`. Open the
/// restored database with [`GrafeoDB::open`].
///
/// This is an associated function: no open database is required.
///
/// # Errors
///
/// Returns an error if the backup chain does not cover the target epoch,
/// segment checksums fail, or I/O fails.
#[cfg(all(feature = "wal", feature = "grafeo-file"))]
pub fn restore_to_epoch(
    backup_dir: &std::path::Path,
    target_epoch: grafeo_common::types::EpochId,
    output_path: &std::path::Path,
) -> Result<()> {
    backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
}
2281
/// Flushes the current database state into the `.grafeo` file through the
/// unified flush path.
///
/// The sections returned by `build_sections` are passed to `flush::flush`
/// together with a flush context, the given `reason` and, when the `wal`
/// feature is enabled, the WAL.
///
/// The sidecar WAL is NOT removed here: callers that want it gone (e.g.
/// `close()`) should call `fm.remove_sidecar_wal()` themselves once this
/// returns.
#[cfg(feature = "grafeo-file")]
fn checkpoint_to_file(
    &self,
    fm: &GrafeoFileManager,
    reason: flush::FlushReason,
) -> Result<flush::FlushResult> {
    // The owned sections must outlive the borrowed view handed to the flush.
    let owned_sections = self.build_sections();
    let borrowed_sections: Vec<&dyn grafeo_common::storage::Section> = owned_sections
        .iter()
        .map(|section| section.as_ref())
        .collect();

    // The context builder depends on whether the LPG store is compiled in.
    #[cfg(feature = "lpg")]
    let flush_context = flush::build_context(self.lpg_store(), &self.transaction_manager);
    #[cfg(not(feature = "lpg"))]
    let flush_context = flush::build_context_minimal(&self.transaction_manager);

    flush::flush(
        fm,
        &borrowed_sections,
        &flush_context,
        reason,
        #[cfg(feature = "wal")]
        self.wal.as_deref(),
    )
}
2310
/// Borrows the file manager used for the single-file format.
///
/// Yields `None` when this database has no file manager attached.
#[cfg(feature = "grafeo-file")]
#[must_use]
pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
    self.file_manager.as_ref()
}
2317}
2318
2319impl Drop for GrafeoDB {
2320    fn drop(&mut self) {
2321        if let Err(e) = self.close() {
2322            grafeo_error!("Error closing database: {}", e);
2323        }
2324    }
2325}
2326
/// Exposes the database's administrative operations through the
/// `AdminService` trait.
///
/// Every method forwards to the `GrafeoDB` method of the same name.
// NOTE(review): those are presumably the inherent methods from the `admin`
// submodule, which take priority over the trait methods defined here; confirm
// each one exists, otherwise the forwarding call would recurse.
#[cfg(feature = "lpg")]
impl crate::admin::AdminService for GrafeoDB {
    fn info(&self) -> crate::admin::DatabaseInfo {
        GrafeoDB::info(self)
    }

    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        GrafeoDB::detailed_stats(self)
    }

    fn schema(&self) -> crate::admin::SchemaInfo {
        GrafeoDB::schema(self)
    }

    fn validate(&self) -> crate::admin::ValidationResult {
        GrafeoDB::validate(self)
    }

    fn wal_status(&self) -> crate::admin::WalStatus {
        GrafeoDB::wal_status(self)
    }

    fn wal_checkpoint(&self) -> Result<()> {
        GrafeoDB::wal_checkpoint(self)
    }
}
2353
2354// =========================================================================
2355// Query Result Types
2356// =========================================================================
2357
2358/// The result of running a query.
2359///
2360/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
2361/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
2362///
2363/// # Examples
2364///
2365/// ```
2366/// use grafeo_engine::GrafeoDB;
2367///
2368/// let db = GrafeoDB::new_in_memory();
2369/// db.create_node(&["Person"]);
2370///
2371/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
2372///
2373/// // Check what we got
2374/// println!("Columns: {:?}", result.columns);
2375/// println!("Rows: {}", result.row_count());
2376///
2377/// // Iterate through results
2378/// for row in result.iter() {
2379///     println!("{:?}", row);
2380/// }
2381/// # Ok::<(), grafeo_common::utils::error::Error>(())
2382/// ```
2383#[derive(Debug)]
2384pub struct QueryResult {
2385    /// Column names from the RETURN clause.
2386    pub columns: Vec<String>,
2387    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
2388    pub column_types: Vec<grafeo_common::types::LogicalType>,
2389    /// The actual result rows.
2390    ///
2391    /// Use [`rows()`](Self::rows) for borrowed access or
2392    /// [`into_rows()`](Self::into_rows) to take ownership.
2393    pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
2394    /// Query execution time in milliseconds (if timing was enabled).
2395    pub execution_time_ms: Option<f64>,
2396    /// Number of rows scanned during query execution (estimate).
2397    pub rows_scanned: Option<u64>,
2398    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
2399    pub status_message: Option<String>,
2400    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
2401    pub gql_status: grafeo_common::utils::GqlStatus,
2402}
2403
2404impl QueryResult {
2405    /// Creates a fully empty query result (no columns, no rows).
2406    #[must_use]
2407    pub fn empty() -> Self {
2408        Self {
2409            columns: Vec::new(),
2410            column_types: Vec::new(),
2411            rows: Vec::new(),
2412            execution_time_ms: None,
2413            rows_scanned: None,
2414            status_message: None,
2415            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2416        }
2417    }
2418
2419    /// Creates a query result with only a status message (for DDL commands).
2420    #[must_use]
2421    pub fn status(msg: impl Into<String>) -> Self {
2422        Self {
2423            columns: Vec::new(),
2424            column_types: Vec::new(),
2425            rows: Vec::new(),
2426            execution_time_ms: None,
2427            rows_scanned: None,
2428            status_message: Some(msg.into()),
2429            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2430        }
2431    }
2432
2433    /// Creates a new empty query result.
2434    #[must_use]
2435    pub fn new(columns: Vec<String>) -> Self {
2436        let len = columns.len();
2437        Self {
2438            columns,
2439            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2440            rows: Vec::new(),
2441            execution_time_ms: None,
2442            rows_scanned: None,
2443            status_message: None,
2444            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2445        }
2446    }
2447
2448    /// Creates a new empty query result with column types.
2449    #[must_use]
2450    pub fn with_types(
2451        columns: Vec<String>,
2452        column_types: Vec<grafeo_common::types::LogicalType>,
2453    ) -> Self {
2454        Self {
2455            columns,
2456            column_types,
2457            rows: Vec::new(),
2458            execution_time_ms: None,
2459            rows_scanned: None,
2460            status_message: None,
2461            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2462        }
2463    }
2464
2465    /// Creates a query result with pre-populated rows.
2466    #[must_use]
2467    pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
2468        let len = columns.len();
2469        Self {
2470            columns,
2471            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2472            rows,
2473            execution_time_ms: None,
2474            rows_scanned: None,
2475            status_message: None,
2476            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2477        }
2478    }
2479
2480    /// Appends a row to this result.
2481    pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
2482        self.rows.push(row);
2483    }
2484
2485    /// Sets the execution metrics on this result.
2486    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2487        self.execution_time_ms = Some(execution_time_ms);
2488        self.rows_scanned = Some(rows_scanned);
2489        self
2490    }
2491
2492    /// Returns the execution time in milliseconds, if available.
2493    #[must_use]
2494    pub fn execution_time_ms(&self) -> Option<f64> {
2495        self.execution_time_ms
2496    }
2497
2498    /// Returns the number of rows scanned, if available.
2499    #[must_use]
2500    pub fn rows_scanned(&self) -> Option<u64> {
2501        self.rows_scanned
2502    }
2503
2504    /// Returns the number of rows.
2505    #[must_use]
2506    pub fn row_count(&self) -> usize {
2507        self.rows.len()
2508    }
2509
2510    /// Returns the number of columns.
2511    #[must_use]
2512    pub fn column_count(&self) -> usize {
2513        self.columns.len()
2514    }
2515
2516    /// Returns true if the result is empty.
2517    #[must_use]
2518    pub fn is_empty(&self) -> bool {
2519        self.rows.is_empty()
2520    }
2521
2522    /// Extracts a single value from the result.
2523    ///
2524    /// Use this when your query returns exactly one row with one column,
2525    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
2526    ///
2527    /// # Errors
2528    ///
2529    /// Returns an error if the result has multiple rows or columns.
2530    pub fn scalar<T: FromValue>(&self) -> Result<T> {
2531        if self.rows.len() != 1 || self.columns.len() != 1 {
2532            return Err(grafeo_common::utils::error::Error::InvalidValue(
2533                "Expected single value".to_string(),
2534            ));
2535        }
2536        T::from_value(&self.rows[0][0])
2537    }
2538
2539    /// Returns a slice of all result rows.
2540    #[must_use]
2541    pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
2542        &self.rows
2543    }
2544
2545    /// Takes ownership of all result rows.
2546    #[must_use]
2547    pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
2548        self.rows
2549    }
2550
2551    /// Returns an iterator over the rows.
2552    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2553        self.rows.iter()
2554    }
2555
2556    /// Converts this query result to an Arrow [`RecordBatch`](arrow_array::RecordBatch).
2557    ///
2558    /// Each column in the result becomes an Arrow array. Type mapping:
2559    /// - `Int64` / `Float64` / `Bool` / `String` / `Bytes`: direct Arrow equivalents
2560    /// - `Timestamp` / `ZonedDatetime`: `Timestamp(Microsecond, UTC)`
2561    /// - `Date`: `Date32`, `Time`: `Time64(Nanosecond)`
2562    /// - `Vector`: `FixedSizeList(Float32, dim)`
2563    /// - `Duration` / `List` / `Map` / `Path`: serialized as `Utf8`
2564    ///
2565    /// Heterogeneous columns (mixed types) fall back to `Utf8`.
2566    ///
2567    /// # Errors
2568    ///
2569    /// Returns [`ArrowExportError`](arrow::ArrowExportError) if Arrow array construction fails.
2570    #[cfg(feature = "arrow-export")]
2571    pub fn to_record_batch(
2572        &self,
2573    ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
2574        arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
2575    }
2576
2577    /// Serializes this query result as Arrow IPC stream bytes.
2578    ///
2579    /// The returned bytes can be read by any Arrow implementation:
2580    /// - Python: `pyarrow.ipc.open_stream(buf).read_all()`
2581    /// - Polars: `pl.read_ipc(buf)`
2582    /// - Node.js: `apache-arrow` `RecordBatchStreamReader`
2583    ///
2584    /// # Errors
2585    ///
2586    /// Returns [`ArrowExportError`](arrow::ArrowExportError) on conversion or serialization failure.
2587    #[cfg(feature = "arrow-export")]
2588    pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
2589        let batch = self.to_record_batch()?;
2590        arrow::record_batch_to_ipc_stream(&batch)
2591    }
2592}
2593
2594impl std::fmt::Display for QueryResult {
2595    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2596        let table = grafeo_common::fmt::format_result_table(
2597            &self.columns,
2598            &self.rows,
2599            self.execution_time_ms,
2600            self.status_message.as_deref(),
2601        );
2602        f.write_str(&table)
2603    }
2604}
2605
2606/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
2607///
2608/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
2609/// Used by [`QueryResult::scalar()`] to extract typed values.
2610pub trait FromValue: Sized {
2611    /// Attempts the conversion, returning an error on type mismatch.
2612    ///
2613    /// # Errors
2614    ///
2615    /// Returns `Error::TypeMismatch` if the value is not the expected type.
2616    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
2617}
2618
2619impl FromValue for i64 {
2620    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2621        value
2622            .as_int64()
2623            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2624                expected: "INT64".to_string(),
2625                found: value.type_name().to_string(),
2626            })
2627    }
2628}
2629
2630impl FromValue for f64 {
2631    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2632        value
2633            .as_float64()
2634            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2635                expected: "FLOAT64".to_string(),
2636                found: value.type_name().to_string(),
2637            })
2638    }
2639}
2640
2641impl FromValue for String {
2642    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2643        value.as_str().map(String::from).ok_or_else(|| {
2644            grafeo_common::utils::error::Error::TypeMismatch {
2645                expected: "STRING".to_string(),
2646                found: value.type_name().to_string(),
2647            }
2648        })
2649    }
2650}
2651
2652impl FromValue for bool {
2653    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2654        value
2655            .as_bool()
2656            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2657                expected: "BOOL".to_string(),
2658                found: value.type_name().to_string(),
2659            })
2660    }
2661}
2662
2663#[cfg(test)]
2664mod tests {
2665    use super::*;
2666
#[test]
fn test_create_in_memory_database() {
    // A brand-new in-memory database holds neither nodes nor edges.
    let fresh = GrafeoDB::new_in_memory();
    assert_eq!(fresh.node_count(), 0);
    assert_eq!(fresh.edge_count(), 0);
}
2673
#[test]
fn test_database_config() {
    // Settings chosen on the builder must be visible on the opened database.
    let requested = Config::in_memory().with_threads(4).with_query_logging();
    let db = GrafeoDB::with_config(requested).unwrap();

    assert_eq!(db.config().threads, 4);
    assert!(db.config().query_logging);
}
2682
#[test]
fn test_database_session() {
    // Smoke test: opening a session on a fresh database must not panic.
    let db = GrafeoDB::new_in_memory();
    let _handle = db.session();
}
2689
#[cfg(feature = "wal")]
#[test]
fn test_persistent_database_recovery() {
    // Data written before `close()` must be there again after reopening.
    use grafeo_common::types::Value;
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let db_path = scratch.path().join("test_db");

    // First lifetime: two named people joined by one edge.
    {
        let db = GrafeoDB::open(&db_path).unwrap();

        let first = db.create_node(&["Person"]);
        db.set_node_property(first, "name", Value::from("Alix"));

        let second = db.create_node(&["Person"]);
        db.set_node_property(second, "name", Value::from("Gus"));

        let _knows = db.create_edge(first, second, "KNOWS");

        // Close explicitly so the WAL is flushed before the handle drops.
        db.close().unwrap();
    }

    // Second lifetime: counts and both node ids must have been recovered.
    {
        let reopened = GrafeoDB::open(&db_path).unwrap();

        assert_eq!(reopened.node_count(), 2);
        assert_eq!(reopened.edge_count(), 1);

        let recovered_first = reopened.get_node(grafeo_common::types::NodeId::new(0));
        assert!(recovered_first.is_some());

        let recovered_second = reopened.get_node(grafeo_common::types::NodeId::new(1));
        assert!(recovered_second.is_some());
    }
}
2730
#[cfg(feature = "wal")]
#[test]
fn test_wal_logging() {
    // Mutations on a persistent database should leave records in the WAL.
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let db_path = scratch.path().join("wal_test_db");
    let db = GrafeoDB::open(&db_path).unwrap();

    // One create followed by one delete.
    let doomed = db.create_node(&["Test"]);
    db.delete_node(doomed);

    // Only checked when the database actually exposes a WAL.
    if let Some(log) = db.wal() {
        assert!(log.record_count() > 0);
    }

    db.close().unwrap();
}
2752
#[cfg(feature = "wal")]
#[test]
fn test_wal_recovery_multiple_sessions() {
    // Recovery must keep working across several open/close cycles, with each
    // cycle seeing everything the earlier ones wrote.
    use grafeo_common::types::Value;
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let db_path = scratch.path().join("multi_session_db");

    // Cycle 1: write the first person.
    {
        let db = GrafeoDB::open(&db_path).unwrap();
        let first = db.create_node(&["Person"]);
        db.set_node_property(first, "name", Value::from("Alix"));
        db.close().unwrap();
    }

    // Cycle 2: the first person is back; add a second one.
    {
        let db = GrafeoDB::open(&db_path).unwrap();
        assert_eq!(db.node_count(), 1);
        let second = db.create_node(&["Person"]);
        db.set_node_property(second, "name", Value::from("Gus"));
        db.close().unwrap();
    }

    // Cycle 3: both people are present and still carry their label.
    {
        let db = GrafeoDB::open(&db_path).unwrap();
        assert_eq!(db.node_count(), 2);

        let recovered_first = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
        assert!(recovered_first.labels.iter().any(|l| l.as_str() == "Person"));

        let recovered_second = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
        assert!(recovered_second.labels.iter().any(|l| l.as_str() == "Person"));
    }
}
2793
#[cfg(feature = "wal")]
#[test]
fn test_database_consistency_after_mutations() {
    // A mix of creates, deletes and property writes must survive a
    // close/reopen cycle without resurrecting deleted entities.
    use grafeo_common::types::Value;
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let db_path = scratch.path().join("consistency_db");

    {
        let db = GrafeoDB::open(&db_path).unwrap();

        // Three nodes chained by two edges: first -> middle -> last.
        let first = db.create_node(&["Node"]);
        let middle = db.create_node(&["Node"]);
        let last = db.create_node(&["Node"]);

        let first_to_middle = db.create_edge(first, middle, "LINKS");
        let _middle_to_last = db.create_edge(middle, last, "LINKS");

        // Remove one edge explicitly, then the middle node.
        db.delete_edge(first_to_middle);
        db.delete_node(middle);

        // The two survivors each get a property.
        db.set_node_property(first, "value", Value::Int64(1));
        db.set_node_property(last, "value", Value::Int64(3));

        db.close().unwrap();
    }

    {
        let reopened = GrafeoDB::open(&db_path).unwrap();

        // `node_count` is deliberately not asserted: some implementations
        // include deleted nodes in it. What matters is which ids resolve.
        let survivor_first = reopened.get_node(grafeo_common::types::NodeId::new(0));
        assert!(survivor_first.is_some());

        let survivor_last = reopened.get_node(grafeo_common::types::NodeId::new(2));
        assert!(survivor_last.is_some());

        // The deleted middle node must stay gone.
        let deleted_middle = reopened.get_node(grafeo_common::types::NodeId::new(1));
        assert!(deleted_middle.is_none());
    }
}
2845
#[cfg(feature = "wal")]
#[test]
fn test_close_is_idempotent() {
    // Closing an already-closed database must succeed as well.
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let db_path = scratch.path().join("close_test_db");

    let db = GrafeoDB::open(&db_path).unwrap();
    db.create_node(&["Test"]);

    // Initial close.
    assert!(db.close().is_ok());

    // Repeated close on the same handle.
    assert!(db.close().is_ok());
}
2864
#[test]
fn test_with_store_external_backend() {
    use grafeo_core::graph::lpg::LpgStore;

    // Populate a store that lives outside of any database handle.
    let backend = Arc::new(LpgStore::new().unwrap());
    let person = backend.create_node(&["Person"]);
    backend.set_node_property(person, "name", grafeo_common::types::Value::from("Alix"));

    // Hand a shared reference to that store over to a new database.
    let shared_store = Arc::clone(&backend) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(shared_store, Config::in_memory()).unwrap();

    let session = db.session();

    // Queries issued through the session must see the pre-seeded node.
    #[cfg(feature = "gql")]
    {
        let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }
}
2890
#[test]
fn test_with_config_custom_memory_limit() {
    // A 64 MB limit set on the builder must be reported back by the database.
    let limited = Config::in_memory().with_memory_limit(64 * 1024 * 1024);
    let db = GrafeoDB::with_config(limited).unwrap();

    assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
    assert_eq!(db.node_count(), 0);
}
2899
#[cfg(feature = "metrics")]
#[test]
fn test_database_metrics_registry() {
    let db = GrafeoDB::new_in_memory();

    // Direct CRUD calls only; no query is executed in this test.
    db.create_node(&["Person"]);
    db.create_node(&["Person"]);

    // A snapshot is available, and its query counter is still at zero.
    let snapshot = db.metrics();
    assert_eq!(snapshot.query_count, 0);
}
2914
#[test]
fn test_query_result_has_metrics() {
    // A query result should come back with its execution metrics filled in.
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);
    db.create_node(&["Person"]);

    #[cfg(feature = "gql")]
    {
        let outcome = db.execute("MATCH (n:Person) RETURN n").unwrap();

        // Both metrics are present...
        assert!(outcome.execution_time_ms.is_some());
        assert!(outcome.rows_scanned.is_some());
        // ...the timing is non-negative and the scan covered both nodes.
        assert!(outcome.execution_time_ms.unwrap() >= 0.0);
        assert_eq!(outcome.rows_scanned.unwrap(), 2);
    }
}
2933
#[test]
fn test_empty_query_result_metrics() {
    // Metrics must also be reported when a query matches nothing.
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    #[cfg(feature = "gql")]
    {
        // No node carries this label, so the match is empty.
        let outcome = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

        assert!(outcome.execution_time_ms.is_some());
        assert!(outcome.rows_scanned.is_some());
        assert_eq!(outcome.rows_scanned.unwrap(), 0);
    }
}
2950
2951    #[cfg(feature = "cdc")]
2952    mod cdc_integration {
2953        use super::*;
2954
2955        /// Helper: creates an in-memory database with CDC enabled.
2956        fn cdc_db() -> GrafeoDB {
2957            GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
2958        }
2959
2960        #[test]
2961        fn test_node_lifecycle_history() {
2962            let db = cdc_db();
2963
2964            // Create
2965            let id = db.create_node(&["Person"]);
2966            // Update
2967            db.set_node_property(id, "name", "Alix".into());
2968            db.set_node_property(id, "name", "Gus".into());
2969            // Delete
2970            db.delete_node(id);
2971
2972            let history = db.history(id).unwrap();
2973            assert_eq!(history.len(), 4); // create + 2 updates + delete
2974            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2975            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
2976            assert!(history[1].before.is_none()); // first set_node_property has no prior value
2977            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
2978            assert!(history[2].before.is_some()); // second update has prior "Alix"
2979            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
2980        }
2981
2982        #[test]
2983        fn test_edge_lifecycle_history() {
2984            let db = cdc_db();
2985
2986            let alix = db.create_node(&["Person"]);
2987            let gus = db.create_node(&["Person"]);
2988            let edge = db.create_edge(alix, gus, "KNOWS");
2989            db.set_edge_property(edge, "since", 2024i64.into());
2990            db.delete_edge(edge);
2991
2992            let history = db.history(edge).unwrap();
2993            assert_eq!(history.len(), 3); // create + update + delete
2994            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2995            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
2996            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
2997        }
2998
2999        #[test]
3000        fn test_create_node_with_props_cdc() {
3001            let db = cdc_db();
3002
3003            let id = db.create_node_with_props(
3004                &["Person"],
3005                vec![
3006                    ("name", grafeo_common::types::Value::from("Alix")),
3007                    ("age", grafeo_common::types::Value::from(30i64)),
3008                ],
3009            );
3010
3011            let history = db.history(id).unwrap();
3012            assert_eq!(history.len(), 1);
3013            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3014            // Props should be captured
3015            let after = history[0].after.as_ref().unwrap();
3016            assert_eq!(after.len(), 2);
3017        }
3018
3019        #[test]
3020        fn test_changes_between() {
3021            let db = cdc_db();
3022
3023            let id1 = db.create_node(&["A"]);
3024            let _id2 = db.create_node(&["B"]);
3025            db.set_node_property(id1, "x", 1i64.into());
3026
3027            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
3028            let changes = db
3029                .changes_between(
3030                    grafeo_common::types::EpochId(0),
3031                    grafeo_common::types::EpochId(u64::MAX),
3032                )
3033                .unwrap();
3034            assert_eq!(changes.len(), 3); // 2 creates + 1 update
3035        }
3036
3037        #[test]
3038        fn test_cdc_disabled_by_default() {
3039            let db = GrafeoDB::new_in_memory();
3040            assert!(!db.is_cdc_enabled());
3041
3042            let id = db.create_node(&["Person"]);
3043            db.set_node_property(id, "name", "Alix".into());
3044
3045            let history = db.history(id).unwrap();
3046            assert!(history.is_empty(), "CDC off by default: no events recorded");
3047        }
3048
/// A session opened with `session_with_cdc(true)` records events even though
/// the database was created through plain `new_in_memory`.
#[test]
fn test_session_with_cdc_override_on() {
    use grafeo_common::types::EpochId;

    let db = GrafeoDB::new_in_memory();
    let opted_in = db.session_with_cdc(true);
    opted_in.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // Look across every epoch so the inserted node cannot be missed.
    let recorded = db
        .changes_between(EpochId(0), EpochId(u64::MAX))
        .unwrap();
    assert!(
        !recorded.is_empty(),
        "session_with_cdc(true) should record events"
    );
}
3067
/// A session opened with `session_with_cdc(false)` leaves the change log
/// empty on a database obtained from the `cdc_db()` test helper.
#[test]
fn test_session_with_cdc_override_off() {
    use grafeo_common::types::EpochId;

    // NOTE(review): `cdc_db()` is defined outside this view; presumably it
    // builds a database with CDC switched on by default - confirm.
    let db = cdc_db();
    let opted_out = db.session_with_cdc(false);
    opted_out.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    let recorded = db
        .changes_between(EpochId(0), EpochId(u64::MAX))
        .unwrap();
    assert!(
        recorded.is_empty(),
        "session_with_cdc(false) should not record events"
    );
}
3085
/// Toggling CDC through `set_cdc_enabled` affects nodes created afterwards.
#[test]
fn test_set_cdc_enabled_runtime() {
    let db = GrafeoDB::new_in_memory();
    assert!(!db.is_cdc_enabled());

    // Switch CDC on: the next node creation must appear in its history.
    db.set_cdc_enabled(true);
    assert!(db.is_cdc_enabled());
    let tracked = db.create_node(&["Person"]);
    let tracked_events = db.history(tracked).unwrap();
    assert_eq!(
        tracked_events.len(),
        1,
        "CDC enabled at runtime records events"
    );

    // Switch CDC off again: a node created now has an empty history.
    db.set_cdc_enabled(false);
    let untracked = db.create_node(&["Person"]);
    let untracked_events = db.history(untracked).unwrap();
    assert!(
        untracked_events.is_empty(),
        "CDC disabled at runtime stops recording"
    );
}
3108    }
3109
/// A node written straight into an external `LpgStore` is returned by a query
/// run through a `GrafeoDB` built on top of that store.
#[test]
fn test_with_store_basic() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    let alix = store.create_node(&["Person"]);
    store.set_node_property(alix, "name", "Alix".into());

    // Hand the database its own handle to the store, coerced to the trait object.
    let db = GrafeoDB::with_store(
        Arc::clone(&store) as Arc<dyn GraphStoreMut>,
        Config::in_memory(),
    )
    .unwrap();

    let people = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(people.rows.len(), 1);
}
3124
/// A session created on a store-backed database can run a counting query.
#[test]
fn test_with_store_session() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    let db = GrafeoDB::with_store(
        Arc::clone(&store) as Arc<dyn GraphStoreMut>,
        Config::in_memory(),
    )
    .unwrap();

    // `count(n)` yields a single row.
    let counted = db.session().execute("MATCH (n) RETURN count(n)").unwrap();
    assert_eq!(counted.rows.len(), 1);
}
3137
/// An INSERT issued inside an explicit transaction is visible to a MATCH run
/// in that same transaction on a store-backed database.
#[test]
fn test_with_store_mutations() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    let db = GrafeoDB::with_store(
        Arc::clone(&store) as Arc<dyn GraphStoreMut>,
        Config::in_memory(),
    )
    .unwrap();

    let mut session = db.session();

    // Both statements run inside one explicit transaction so that they share
    // a transaction context: with PENDING epochs, uncommitted versions are
    // visible only to the transaction that owns them.
    session.begin_transaction().unwrap();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(people.rows.len(), 1);

    session.commit().unwrap();
}
3159
3160    // =========================================================================
3161    // QueryResult tests
3162    // =========================================================================
3163
/// `QueryResult::empty()` has no rows, no columns, no metrics and no status.
#[test]
fn test_query_result_empty() {
    let empty = QueryResult::empty();

    // Shape: nothing in either dimension.
    assert!(empty.is_empty());
    assert_eq!(empty.row_count(), 0);
    assert_eq!(empty.column_count(), 0);

    // Metadata: nothing attached.
    assert!(empty.execution_time_ms().is_none());
    assert!(empty.rows_scanned().is_none());
    assert!(empty.status_message.is_none());
}
3174
/// `QueryResult::status` stores the message and produces no rows or columns.
#[test]
fn test_query_result_status() {
    let status_only = QueryResult::status("Created node type 'Person'");

    assert!(status_only.is_empty());
    assert_eq!(status_only.column_count(), 0);

    let message = status_only.status_message.as_deref();
    assert_eq!(message, Some("Created node type 'Person'"));
}
3185
/// `QueryResult::new` keeps the column names, starts with zero rows, and
/// assigns `LogicalType::Any` to every column.
#[test]
fn test_query_result_new_with_columns() {
    use grafeo_common::types::LogicalType;

    let fresh = QueryResult::new(vec!["name".into(), "age".into()]);

    assert_eq!(fresh.column_count(), 2);
    assert_eq!(fresh.row_count(), 0);
    assert!(fresh.is_empty());

    // One `Any` entry per declared column.
    let expected_types = vec![LogicalType::Any, LogicalType::Any];
    assert_eq!(fresh.column_types, expected_types);
}
3201
/// `QueryResult::with_types` preserves the supplied column types in order.
#[test]
fn test_query_result_with_types() {
    use grafeo_common::types::LogicalType;

    let names = vec!["name".into(), "age".into()];
    let types = vec![LogicalType::String, LogicalType::Int64];
    let typed = QueryResult::with_types(names, types);

    assert_eq!(typed.column_count(), 2);
    assert_eq!(typed.column_types[0], LogicalType::String);
    assert_eq!(typed.column_types[1], LogicalType::Int64);
}
3213
/// Values passed to `with_metrics` are readable through the metric accessors.
#[test]
fn test_query_result_with_metrics() {
    let columns = vec!["x".into()];
    let measured = QueryResult::new(columns).with_metrics(42.5, 100);

    assert_eq!(measured.execution_time_ms(), Some(42.5));
    assert_eq!(measured.rows_scanned(), Some(100));
}
3220
/// `scalar` extracts the single value of a one-row, one-column result.
#[test]
fn test_query_result_scalar_success() {
    use grafeo_common::types::Value;

    let mut single = QueryResult::new(vec!["count".into()]);
    single.rows.push(vec![Value::Int64(42)]);

    let extracted = single.scalar::<i64>().unwrap();
    assert_eq!(extracted, 42);
}
3230
/// `scalar` fails for any result that is not exactly one row by one column.
#[test]
fn test_query_result_scalar_wrong_shape() {
    use grafeo_common::types::Value;

    // Too many rows.
    let mut two_rows = QueryResult::new(vec!["x".into()]);
    two_rows.rows.push(vec![Value::Int64(1)]);
    two_rows.rows.push(vec![Value::Int64(2)]);
    assert!(two_rows.scalar::<i64>().is_err());

    // Too many columns.
    let mut two_columns = QueryResult::new(vec!["a".into(), "b".into()]);
    two_columns.rows.push(vec![Value::Int64(1), Value::Int64(2)]);
    assert!(two_columns.scalar::<i64>().is_err());

    // No rows at all.
    let no_rows = QueryResult::new(vec!["x".into()]);
    assert!(no_rows.scalar::<i64>().is_err());
}
3249
/// `iter` yields one item per stored row.
#[test]
fn test_query_result_iter() {
    use grafeo_common::types::Value;

    let mut two_rows = QueryResult::new(vec!["x".into()]);
    for n in [1, 2] {
        two_rows.rows.push(vec![Value::Int64(n)]);
    }

    assert_eq!(two_rows.iter().count(), 2);
}
3260
/// The textual rendering of a result mentions the column name and cell value.
#[test]
fn test_query_result_display() {
    use grafeo_common::types::Value;

    let mut table = QueryResult::new(vec!["name".into()]);
    table.rows.push(vec![Value::from("Alix")]);

    let rendered = table.to_string();
    for needle in ["name", "Alix"] {
        assert!(rendered.contains(needle));
    }
}
3270
3271    // =========================================================================
3272    // FromValue error paths
3273    // =========================================================================
3274
/// Converting a string value to `i64` is an error.
#[test]
fn test_from_value_i64_type_mismatch() {
    use grafeo_common::types::Value;

    assert!(i64::from_value(&Value::from("not a number")).is_err());
}
3281
/// Converting a string value to `f64` is an error.
#[test]
fn test_from_value_f64_type_mismatch() {
    use grafeo_common::types::Value;

    assert!(f64::from_value(&Value::from("not a float")).is_err());
}
3288
/// Converting an integer value to `String` is an error.
#[test]
fn test_from_value_string_type_mismatch() {
    use grafeo_common::types::Value;

    assert!(String::from_value(&Value::Int64(42)).is_err());
}
3295
/// Converting an integer value to `bool` is an error.
#[test]
fn test_from_value_bool_type_mismatch() {
    use grafeo_common::types::Value;

    assert!(bool::from_value(&Value::Int64(1)).is_err());
}
3302
/// Each supported primitive converts successfully from its matching variant.
#[test]
fn test_from_value_all_success() {
    use grafeo_common::types::Value;

    let integer = i64::from_value(&Value::Int64(99)).unwrap();
    assert_eq!(integer, 99);

    // Floats are compared with a tolerance rather than for exact equality.
    let float = f64::from_value(&Value::Float64(2.72)).unwrap();
    assert!((float - 2.72).abs() < f64::EPSILON);

    let text = String::from_value(&Value::from("hello")).unwrap();
    assert_eq!(text, "hello");

    let flag = bool::from_value(&Value::Bool(true)).unwrap();
    assert!(flag);
}
3311
3312    // =========================================================================
3313    // GrafeoDB accessor tests
3314    // =========================================================================
3315
/// A database from `new_in_memory` is not read-only.
#[test]
fn test_database_is_read_only_false_by_default() {
    let read_only = GrafeoDB::new_in_memory().is_read_only();
    assert!(!read_only);
}
3321
/// A database from `new_in_memory` reports the LPG graph model.
#[test]
fn test_database_graph_model() {
    use crate::config::GraphModel;

    let model = GrafeoDB::new_in_memory().graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
3327
/// A database from `new_in_memory` has no memory limit set.
#[test]
fn test_database_memory_limit_none_by_default() {
    let limit = GrafeoDB::new_in_memory().memory_limit();
    assert!(limit.is_none());
}
3333
/// A memory limit set on the config is reported back by the database.
#[test]
fn test_database_memory_limit_custom() {
    // 128 MiB, configured up front and read back afterwards.
    let db = GrafeoDB::with_config(Config::in_memory().with_memory_limit(128 * 1024 * 1024))
        .unwrap();

    let reported = db.memory_limit();
    assert_eq!(reported, Some(128 * 1024 * 1024));
}
3340
/// The adaptive config of a fresh in-memory database is enabled with a
/// threshold of 3.0.
#[test]
fn test_database_adaptive_config() {
    let db = GrafeoDB::new_in_memory();
    let settings = db.adaptive_config();

    assert!(settings.enabled);
    // Tolerance-based comparison for the floating-point threshold.
    let deviation = (settings.threshold - 3.0).abs();
    assert!(deviation < f64::EPSILON);
}
3348
/// Smoke test: obtaining the buffer manager must not panic.
#[test]
fn test_database_buffer_manager() {
    let db = GrafeoDB::new_in_memory();
    // The returned handle is not inspected; reaching this point is the check.
    let _buffer_manager = db.buffer_manager();
}
3355
/// Smoke test: obtaining the query cache must not panic.
#[test]
fn test_database_query_cache() {
    let db = GrafeoDB::new_in_memory();
    // The returned handle is not inspected; reaching this point is the check.
    let _query_cache = db.query_cache();
}
3361
/// Smoke test: clearing the plan cache must not panic.
#[test]
fn test_database_clear_plan_cache() {
    let db = GrafeoDB::new_in_memory();

    // When GQL is compiled in, run one query first so the cache has had a
    // chance to be populated; the query outcome itself is irrelevant.
    #[cfg(feature = "gql")]
    let _ = db.execute("MATCH (n) RETURN count(n)");

    // Returning normally from this call is the whole assertion.
    db.clear_plan_cache();
}
3373
/// Running `gc` does not panic and leaves the node count at one.
#[test]
fn test_database_gc() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    db.gc();

    let remaining = db.node_count();
    assert_eq!(remaining, 1);
}
3382
3383    // =========================================================================
3384    // Named graph management
3385    // =========================================================================
3386
/// `create_graph` returns `true` once, `false` for a duplicate, and the graph
/// name then appears in `list_graphs`.
#[test]
fn test_create_and_list_graphs() {
    let db = GrafeoDB::new_in_memory();

    let first_attempt = db.create_graph("social").unwrap();
    assert!(first_attempt);

    // A second creation under the same name reports that nothing was created.
    let second_attempt = db.create_graph("social").unwrap();
    assert!(!second_attempt);

    let listed = db.list_graphs();
    assert!(listed.iter().any(|name| name == "social"));
}
3400
/// `drop_graph` returns `true` for an existing graph and `false` afterwards.
#[test]
fn test_drop_graph() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("temp").unwrap();

    let dropped = db.drop_graph("temp");
    assert!(dropped);

    // The graph is gone, so a repeat call has nothing to remove.
    let dropped_again = db.drop_graph("temp");
    assert!(!dropped_again);
}
3408
/// Dropping the graph that is currently selected clears the selection.
#[test]
fn test_drop_graph_resets_current_graph() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("active").unwrap();
    db.set_current_graph(Some("active")).unwrap();
    assert_eq!(db.current_graph().as_deref(), Some("active"));

    db.drop_graph("active");

    assert!(db.current_graph().is_none());
}
3419
3420    // =========================================================================
3421    // Current graph / schema context
3422    // =========================================================================
3423
/// A fresh in-memory database has no current graph selected.
#[test]
fn test_current_graph_default_none() {
    let selected = GrafeoDB::new_in_memory().current_graph();
    assert!(selected.is_none());
}
3429
/// Selecting an existing graph makes it the current graph.
#[test]
fn test_set_current_graph_valid() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("social").unwrap();

    db.set_current_graph(Some("social")).unwrap();

    let selected = db.current_graph();
    assert_eq!(selected.as_deref(), Some("social"));
}
3437
/// Selecting a graph that was never created is an error.
#[test]
fn test_set_current_graph_nonexistent() {
    let db = GrafeoDB::new_in_memory();
    let outcome = db.set_current_graph(Some("nonexistent"));
    assert!(outcome.is_err());
}
3444
/// Passing `None` to `set_current_graph` clears a previous selection.
#[test]
fn test_set_current_graph_none_resets() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("social").unwrap();

    // Select, then deselect.
    db.set_current_graph(Some("social")).unwrap();
    db.set_current_graph(None).unwrap();

    assert!(db.current_graph().is_none());
}
3453
/// The name "default" can be selected without a prior `create_graph` call.
#[test]
fn test_set_current_graph_default_keyword() {
    let db = GrafeoDB::new_in_memory();

    // No graph named "default" was created above, yet selection succeeds.
    db.set_current_graph(Some("default")).unwrap();

    let selected = db.current_graph();
    assert_eq!(selected.as_deref(), Some("default"));
}
3461
/// A fresh in-memory database has no current schema selected.
#[test]
fn test_current_schema_default_none() {
    let selected = GrafeoDB::new_in_memory().current_schema();
    assert!(selected.is_none());
}
3467
/// Selecting a schema that does not exist is an error.
#[test]
fn test_set_current_schema_nonexistent() {
    let db = GrafeoDB::new_in_memory();
    let outcome = db.set_current_schema(Some("nonexistent"));
    assert!(outcome.is_err());
}
3474
/// Passing `None` to `set_current_schema` succeeds and leaves no schema selected.
#[test]
fn test_set_current_schema_none_resets() {
    let db = GrafeoDB::new_in_memory();

    db.set_current_schema(None).unwrap();

    assert!(db.current_schema().is_none());
}
3481
3482    // =========================================================================
3483    // graph_store / graph_store_mut
3484    // =========================================================================
3485
/// The store returned by `graph_store` sees nodes created through the database.
#[test]
fn test_graph_store_returns_lpg_by_default() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    let visible_nodes = db.graph_store().node_count();
    assert_eq!(visible_nodes, 1);
}
3493
/// A fresh in-memory database exposes a mutable graph store.
#[test]
fn test_graph_store_mut_returns_some_by_default() {
    let writable = GrafeoDB::new_in_memory().graph_store_mut();
    assert!(writable.is_some());
}
3499
/// A database built with `with_read_store` is read-only, exposes no mutable
/// store, and still reports the node already present in the wrapped store.
#[test]
fn test_with_read_store() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    store.create_node(&["Person"]);

    let db = GrafeoDB::with_read_store(
        Arc::clone(&store) as Arc<dyn GraphStore>,
        Config::in_memory(),
    )
    .unwrap();

    // Write access is unavailable.
    assert!(db.is_read_only());
    assert!(db.graph_store_mut().is_none());

    // Read access reaches the pre-existing node.
    let visible_nodes = db.graph_store().node_count();
    assert_eq!(visible_nodes, 1);
}
3517
/// A database built with `with_store` is writable and both store accessors
/// work against the wrapped store.
#[test]
fn test_with_store_graph_store_methods() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    store.create_node(&["Person"]);

    let writable_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(writable_store, Config::in_memory()).unwrap();

    assert!(!db.is_read_only());
    assert!(db.graph_store_mut().is_some());

    let visible_nodes = db.graph_store().node_count();
    assert_eq!(visible_nodes, 1);
}
3535
3536    // =========================================================================
3537    // session_read_only
3538    // =========================================================================
3539
/// A session from `session_read_only` can run a read query (checked only when
/// the `gql` feature is compiled in).
#[test]
#[allow(deprecated)]
fn test_session_read_only() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    let session = db.session_read_only();

    // The query check needs the GQL front end; without it the test only
    // verifies that the session can be created.
    #[cfg(feature = "gql")]
    {
        let counted = session.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(counted.rows.len(), 1);
    }
}
3554
3555    // =========================================================================
3556    // close on in-memory database
3557    // =========================================================================
3558
/// Closing an in-memory database succeeds, and so does closing it again.
#[test]
fn test_close_in_memory_database() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    let first_close = db.close();
    assert!(first_close.is_ok());

    // A repeated close must succeed as well.
    let second_close = db.close();
    assert!(second_close.is_ok());
}
3567
3568    // =========================================================================
3569    // with_config validation failure
3570    // =========================================================================
3571
/// `with_config` rejects a configuration with zero threads.
#[test]
fn test_with_config_invalid_config_zero_threads() {
    let outcome = GrafeoDB::with_config(Config::in_memory().with_threads(0));
    assert!(outcome.is_err());
}
3578
/// `with_config` rejects a configuration with a zero memory limit.
#[test]
fn test_with_config_invalid_config_zero_memory_limit() {
    let outcome = GrafeoDB::with_config(Config::in_memory().with_memory_limit(0));
    assert!(outcome.is_err());
}
3585
3586    // =========================================================================
3587    // StorageFormat display (for config.rs coverage)
3588    // =========================================================================
3589
/// Each `StorageFormat` variant renders to its kebab-case name.
#[test]
fn test_storage_format_display() {
    use crate::config::StorageFormat;

    let expectations = [
        (StorageFormat::Auto, "auto"),
        (StorageFormat::WalDirectory, "wal-directory"),
        (StorageFormat::SingleFile, "single-file"),
    ];
    for (format, rendered) in expectations {
        assert_eq!(format.to_string(), rendered);
    }
}
3597
/// The default `StorageFormat` is `Auto`.
#[test]
fn test_storage_format_default() {
    use crate::config::StorageFormat;

    let fallback = StorageFormat::default();
    assert_eq!(fallback, StorageFormat::Auto);
}
3603
/// `with_storage_format` stores the chosen format on the config.
#[test]
fn test_config_with_storage_format() {
    use crate::config::StorageFormat;

    let chosen = Config::in_memory()
        .with_storage_format(StorageFormat::SingleFile)
        .storage_format;
    assert_eq!(chosen, StorageFormat::SingleFile);
}
3610
3611    // =========================================================================
3612    // Config CDC
3613    // =========================================================================
3614
/// `with_cdc` turns the `cdc_enabled` flag on.
#[test]
fn test_config_with_cdc() {
    let cdc_enabled = Config::in_memory().with_cdc().cdc_enabled;
    assert!(cdc_enabled);
}
3620
/// An in-memory config has `cdc_enabled` off unless asked otherwise.
#[test]
fn test_config_cdc_default_false() {
    let cdc_enabled = Config::in_memory().cdc_enabled;
    assert!(!cdc_enabled);
}
3626
3627    // =========================================================================
3628    // ConfigError as std::error::Error
3629    // =========================================================================
3630
/// `ConfigError` can be boxed as a `std::error::Error` and has no source.
#[test]
fn test_config_error_is_error_trait() {
    use crate::config::ConfigError;

    // The boxing itself is the compile-time proof of the trait implementation.
    let boxed: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);

    let cause = boxed.source();
    assert!(cause.is_none());
}
3637
3638    // =========================================================================
3639    // Metrics tests
3640    // =========================================================================
3641
/// The Prometheus export of a fresh database is not an empty string.
#[cfg(feature = "metrics")]
#[test]
fn test_metrics_prometheus_output() {
    let db = GrafeoDB::new_in_memory();

    // Only non-emptiness is checked; no particular metric name is required.
    let exported = db.metrics_prometheus();
    assert!(!exported.is_empty());
}
3650
/// After `reset_metrics`, the metrics snapshot reports zero queries.
#[cfg(feature = "metrics")]
#[test]
fn test_reset_metrics() {
    let db = GrafeoDB::new_in_memory();

    // Open a session before resetting; it stays alive until the end of the test.
    let _session = db.session();
    db.reset_metrics();

    let query_count = db.metrics().query_count;
    assert_eq!(query_count, 0);
}
3661
3662    // =========================================================================
3663    // drop_graph on external store
3664    // =========================================================================
3665
/// `drop_graph` returns `false` on a database built with `with_read_store`.
#[test]
fn test_drop_graph_on_external_store() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    let db = GrafeoDB::with_read_store(
        Arc::clone(&store) as Arc<dyn GraphStore>,
        Config::in_memory(),
    )
    .unwrap();

    // The database wraps an external store, so the drop reports nothing removed.
    let dropped = db.drop_graph("anything");
    assert!(!dropped);
}
3677}