Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(feature = "lpg")]
31mod crud;
32#[cfg(feature = "embed")]
33mod embed;
34#[cfg(feature = "grafeo-file")]
35pub(crate) mod flush;
36#[cfg(feature = "lpg")]
37mod import;
38#[cfg(feature = "lpg")]
39mod index;
40#[cfg(feature = "lpg")]
41mod persistence;
42mod query;
43#[cfg(feature = "triple-store")]
44mod rdf_ops;
45#[cfg(feature = "lpg")]
46mod search;
47pub(crate) mod section_consumer;
48#[cfg(all(feature = "wal", feature = "lpg"))]
49pub(crate) mod wal_store;
50
51use grafeo_common::{grafeo_error, grafeo_warn};
52#[cfg(feature = "wal")]
53use std::path::Path;
54use std::sync::Arc;
55use std::sync::atomic::AtomicUsize;
56
57use parking_lot::RwLock;
58
59use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
60use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
61#[cfg(feature = "lpg")]
62use grafeo_core::graph::lpg::LpgStore;
63#[cfg(feature = "triple-store")]
64use grafeo_core::graph::rdf::RdfStore;
65use grafeo_core::graph::{GraphStore, GraphStoreMut};
66#[cfg(feature = "grafeo-file")]
67use grafeo_storage::file::GrafeoFileManager;
68#[cfg(all(feature = "wal", feature = "lpg"))]
69use grafeo_storage::wal::WalRecovery;
70#[cfg(feature = "wal")]
71use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
72
73use crate::catalog::Catalog;
74use crate::config::Config;
75use crate::query::cache::QueryCache;
76use crate::session::Session;
77use crate::transaction::TransactionManager;
78
/// Your handle to a Grafeo database.
///
/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
/// quick experiments, or [`open()`](Self::open) for persistent storage.
/// Then grab a [`session()`](Self::session) to start querying.
///
/// A database can also be layered over a caller-supplied store with
/// [`with_store()`](Self::with_store) or
/// [`with_read_store()`](Self::with_read_store). In that case the built-in
/// LPG store is absent (`store` is `None`) and the external store fields are
/// populated instead.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// // Quick in-memory database
/// let db = GrafeoDB::new_in_memory();
///
/// // Add some data
/// db.create_node(&["Person"]);
///
/// // Query it
/// let session = db.session();
/// let result = session.execute("MATCH (p:Person) RETURN p")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
pub struct GrafeoDB {
    /// Database configuration, as passed to the constructor (after validation).
    pub(super) config: Config,
    /// The underlying graph store (None when using an external store).
    ///
    /// Access it through `lpg_store()`, which panics with a descriptive
    /// message when the store is absent.
    #[cfg(feature = "lpg")]
    pub(super) store: Option<Arc<LpgStore>>,
    /// Schema and metadata catalog shared across sessions.
    pub(super) catalog: Arc<Catalog>,
    /// RDF triple store (if RDF feature is enabled).
    #[cfg(feature = "triple-store")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Unified buffer manager.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager (if durability is enabled).
    ///
    /// `None` for read-only databases, when the WAL is disabled or no path is
    /// configured, and for databases created with `with_store()`.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Shared WAL graph context tracker. Tracks which named graph was last
    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
    /// records only when the context actually changes.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache for parsed and optimized plans.
    pub(super) query_cache: Arc<QueryCache>,
    /// Shared commit counter for auto-GC across sessions. Starts at zero.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// Whether the database is open. Initialized to `true` on construction.
    pub(super) is_open: RwLock<bool>,
    /// Change data capture log for tracking mutations.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Whether CDC is active for new sessions and direct CRUD (runtime-mutable).
    ///
    /// Seeded from `Config::cdc_enabled`; read through `cdc_active()`.
    #[cfg(feature = "cdc")]
    cdc_enabled: std::sync::atomic::AtomicBool,
    /// Registered embedding models for text-to-vector conversion, keyed by
    /// a string (presumably the model name; confirm in the `embed` submodule).
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Single-file database manager (when using `.grafeo` format).
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// Periodic checkpoint timer (when `checkpoint_interval` is configured).
    /// Wrapped in Mutex because `close()` takes `&self` but needs to stop the timer.
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
    /// Shared registry of spilled vector storages.
    /// Used by the search path to create `SpillableVectorAccessor` instances.
    /// Constructed as `None`; filled in later (see `restore_spill_files`).
    #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
    vector_spill_storages: Option<
        Arc<
            parking_lot::RwLock<
                std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
            >,
        >,
    >,
    /// External read-only graph store (when using with_store() or with_read_store()).
    /// When set, sessions route queries through this store instead of the built-in LpgStore.
    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
    /// External writable graph store (when using with_store()).
    /// None for read-only databases created via with_read_store().
    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry shared across all sessions.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Persistent graph context for one-shot `execute()` calls.
    /// When set, each call to `session()` pre-configures the session to this graph.
    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
    current_graph: RwLock<Option<String>>,
    /// Persistent schema context for one-shot `execute()` calls.
    /// When set, each call to `session()` pre-configures the session to this schema.
    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
    current_schema: RwLock<Option<String>>,
    /// Whether this database is open in read-only mode.
    /// When true, sessions automatically enforce read-only transactions.
    read_only: bool,
    /// Named graph projections (virtual subgraphs), shared with sessions.
    /// Starts out empty.
    projections:
        Arc<RwLock<std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>>>,
}
181
182impl GrafeoDB {
183    /// Returns a reference to the built-in LPG store.
184    ///
185    /// # Panics
186    ///
187    /// Panics if the database was created with [`with_store()`](Self::with_store) or
188    /// [`with_read_store()`](Self::with_read_store), which use an external store
189    /// instead of the built-in LPG store.
190    #[cfg(feature = "lpg")]
191    fn lpg_store(&self) -> &Arc<LpgStore> {
192        self.store.as_ref().expect(
193            "no built-in LpgStore: this GrafeoDB was created with an external store \
194             (with_store / with_read_store). Use session() or graph_store() instead.",
195        )
196    }
197
198    /// Returns whether CDC is active (runtime check).
199    #[cfg(feature = "cdc")]
200    #[inline]
201    pub(super) fn cdc_active(&self) -> bool {
202        self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
203    }
204
205    /// Creates an in-memory database, fast to create, gone when dropped.
206    ///
207    /// Use this for tests, experiments, or when you don't need persistence.
208    /// For data that survives restarts, use [`open()`](Self::open) instead.
209    ///
210    /// # Panics
211    ///
212    /// Panics if the internal arena allocator cannot be initialized (out of memory).
213    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
214    ///
215    /// # Examples
216    ///
217    /// ```
218    /// use grafeo_engine::GrafeoDB;
219    ///
220    /// let db = GrafeoDB::new_in_memory();
221    /// let session = db.session();
222    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
223    /// # Ok::<(), grafeo_common::utils::error::Error>(())
224    /// ```
225    #[must_use]
226    pub fn new_in_memory() -> Self {
227        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
228    }
229
230    /// Opens a database at the given path, creating it if it doesn't exist.
231    ///
232    /// If you've used this path before, Grafeo recovers your data from the
233    /// write-ahead log automatically. First open on a new path creates an
234    /// empty database.
235    ///
236    /// # Errors
237    ///
238    /// Returns an error if the path isn't writable or recovery fails.
239    ///
240    /// # Examples
241    ///
242    /// ```no_run
243    /// use grafeo_engine::GrafeoDB;
244    ///
245    /// let db = GrafeoDB::open("./my_social_network")?;
246    /// # Ok::<(), grafeo_common::utils::error::Error>(())
247    /// ```
248    #[cfg(feature = "wal")]
249    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
250        Self::with_config(Config::persistent(path.as_ref()))
251    }
252
253    /// Opens an existing database in read-only mode.
254    ///
255    /// Uses a shared file lock, so multiple processes can read the same
256    /// `.grafeo` file concurrently. The database loads the last checkpoint
257    /// snapshot but does **not** replay the WAL or allow mutations.
258    ///
259    /// Currently only supports the single-file (`.grafeo`) format.
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if the file doesn't exist or can't be read.
264    ///
265    /// # Examples
266    ///
267    /// ```no_run
268    /// use grafeo_engine::GrafeoDB;
269    ///
270    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
271    /// let session = db.session();
272    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
273    /// // Mutations will return an error:
274    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
275    /// # Ok::<(), grafeo_common::utils::error::Error>(())
276    /// ```
277    #[cfg(feature = "grafeo-file")]
278    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
279        Self::with_config(Config::read_only(path.as_ref()))
280    }
281
    /// Creates a database with custom configuration.
    ///
    /// Use this when you need fine-grained control over memory limits,
    /// thread counts, or persistence settings. For most cases,
    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
    /// are simpler.
    ///
    /// Construction proceeds in this order:
    ///
    /// 1. Validate `config`.
    /// 2. Create the built-in stores, transaction manager, buffer manager and catalog.
    /// 3. With the `grafeo-file` feature: open or create the single-file
    ///    database, load its contents (section format first, snapshot blob as
    ///    fallback) and, when not read-only, replay a sidecar WAL if present.
    /// 4. With the `wal` feature: replay the legacy directory WAL if one exists,
    ///    then open the WAL for writing. Skipped entirely in read-only mode.
    /// 5. Assemble the database, register memory consumers, start the
    ///    checkpoint timer and restore spill files where the corresponding
    ///    features are enabled.
    ///
    /// # Errors
    ///
    /// Returns an error if the database can't be created or recovery fails.
    /// In particular:
    ///
    /// - configuration validation failures are reported as `Error::Internal`;
    /// - with `grafeo-file`, read-only mode requires a configured path that
    ///   points at an existing file;
    /// - with `grafeo-file`, a single-file path that exists but is not a file
    ///   is rejected.
    ///
    /// # Examples
    ///
    /// ```
    /// use grafeo_engine::{GrafeoDB, Config};
    ///
    /// // In-memory with a 512MB limit
    /// let config = Config::in_memory()
    ///     .with_memory_limit(512 * 1024 * 1024);
    ///
    /// let db = GrafeoDB::with_config(config)?;
    /// # Ok::<(), grafeo_common::utils::error::Error>(())
    /// ```
    pub fn with_config(config: Config) -> Result<Self> {
        // Validate configuration before proceeding
        config
            .validate()
            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

        #[cfg(feature = "lpg")]
        let store = Arc::new(LpgStore::new()?);
        #[cfg(feature = "triple-store")]
        let rdf_store = Arc::new(RdfStore::new());
        let transaction_manager = Arc::new(TransactionManager::new());

        // Create buffer manager with configured limits
        let buffer_config = BufferManagerConfig {
            // No explicit limit: default to 75% of the detected system memory.
            budget: config.memory_limit.unwrap_or_else(|| {
                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
            }),
            // No explicit spill path: derive `<file name>.spill` next to the
            // database path, provided the path has a parent and a UTF-8 file name.
            spill_path: config.spill_path.clone().or_else(|| {
                config.path.as_ref().and_then(|p| {
                    let parent = p.parent()?;
                    let name = p.file_name()?.to_str()?;
                    Some(parent.join(format!("{name}.spill")))
                })
            }),
            ..BufferManagerConfig::default()
        };
        let buffer_manager = BufferManager::new(buffer_config);

        // Create catalog early so WAL replay can restore schema definitions
        let catalog = Arc::new(Catalog::new());

        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;

        // --- Single-file format (.grafeo) ---
        #[cfg(feature = "grafeo-file")]
        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
            // Read-only mode: open with shared lock, load snapshot, skip WAL
            if let Some(ref db_path) = config.path {
                if db_path.exists() && db_path.is_file() {
                    let fm = GrafeoFileManager::open_read_only(db_path)?;
                    // Try v2 section-based format first
                    #[cfg(feature = "lpg")]
                    if fm.read_section_directory()?.is_some() {
                        Self::load_from_sections(
                            &fm,
                            &store,
                            &catalog,
                            #[cfg(feature = "triple-store")]
                            &rdf_store,
                        )?;
                    } else {
                        // Fall back to v1 blob format
                        let snapshot_data = fm.read_snapshot()?;
                        // An empty snapshot means there is nothing to restore.
                        if !snapshot_data.is_empty() {
                            Self::apply_snapshot_data(
                                &store,
                                &catalog,
                                #[cfg(feature = "triple-store")]
                                &rdf_store,
                                &snapshot_data,
                            )?;
                        }
                    }
                    Some(Arc::new(fm))
                } else {
                    // Read-only never creates a file: the path must already be one.
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "read-only open requires an existing .grafeo file: {}",
                        db_path.display()
                    )));
                }
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "read-only mode requires a database path".to_string(),
                ));
            }
        } else if let Some(ref db_path) = config.path {
            // Initialize the file manager whenever single-file format is selected,
            // regardless of whether WAL is enabled. Without this, a database opened
            // with wal_enabled:false + StorageFormat::SingleFile would produce no
            // output at all (the file manager was previously gated behind wal_enabled).
            if Self::should_use_single_file(db_path, config.storage_format) {
                // Existing file: open it. Nothing at the path: create it.
                // Anything else at the path is an error.
                let fm = if db_path.exists() && db_path.is_file() {
                    GrafeoFileManager::open(db_path)?
                } else if !db_path.exists() {
                    GrafeoFileManager::create(db_path)?
                } else {
                    // Path exists but is not a file (directory, etc.)
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "path exists but is not a file: {}",
                        db_path.display()
                    )));
                };

                // Load data: try v2 section-based format, fall back to v1 blob
                #[cfg(feature = "lpg")]
                if fm.read_section_directory()?.is_some() {
                    Self::load_from_sections(
                        &fm,
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                    )?;
                } else {
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "triple-store")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }
                }

                // Recover sidecar WAL if WAL is enabled and a sidecar exists
                #[cfg(all(feature = "wal", feature = "lpg"))]
                if config.wal_enabled && fm.has_sidecar_wal() {
                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                        &records,
                    )?;
                }

                Some(Arc::new(fm))
            } else {
                None
            }
        } else {
            // No path configured: purely in-memory, no file manager.
            None
        };

        // Determine whether to use the WAL directory path (legacy) or sidecar
        // Read-only mode skips WAL entirely (no recovery, no creation).
        #[cfg(feature = "wal")]
        let wal = if is_read_only {
            None
        } else if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                // When using single-file format, the WAL is a sidecar directory
                #[cfg(feature = "grafeo-file")]
                let wal_path = if let Some(ref fm) = file_manager {
                    let p = fm.sidecar_wal_path();
                    std::fs::create_dir_all(&p)?;
                    p
                } else {
                    // Legacy: WAL inside the database directory
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                // Without single-file support only the legacy layout exists.
                #[cfg(not(feature = "grafeo-file"))]
                let wal_path = {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                // For legacy WAL directory format, check if WAL exists and recover
                #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
                let is_single_file = file_manager.is_some();
                #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
                let is_single_file = false;

                // Single-file databases already replayed their sidecar WAL while
                // the file manager was set up above, so only the legacy directory
                // layout is recovered here.
                #[cfg(feature = "lpg")]
                if !is_single_file && wal_path.exists() {
                    let recovery = WalRecovery::new(&wal_path);
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                        &records,
                    )?;
                }

                // Open/create WAL manager with configured durability
                // (one-to-one mapping from the engine config enum to the storage enum).
                let wal_durability = match config.wal_durability {
                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                    crate::config::DurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    } => WalDurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    },
                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                        WalDurabilityMode::Adaptive { target_interval_ms }
                    }
                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
                };
                let wal_config = WalConfig {
                    durability: wal_durability,
                    ..WalConfig::default()
                };
                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
                Some(Arc::new(wal_manager))
            } else {
                // WAL requested but there is no path to put it under.
                None
            }
        } else {
            None
        };

        // Create query cache with default capacity (1000 queries)
        let query_cache = Arc::new(QueryCache::default());

        // After all snapshot/WAL recovery, sync TransactionManager epoch
        // with the store so queries use the correct viewing epoch.
        #[cfg(all(feature = "temporal", feature = "lpg"))]
        transaction_manager.sync_epoch(store.current_epoch());

        // Copy the CDC settings out now: `config` is moved into the struct below.
        #[cfg(feature = "cdc")]
        let cdc_enabled_val = config.cdc_enabled;
        #[cfg(feature = "cdc")]
        let cdc_retention = config.cdc_retention.clone();

        // Clone Arcs for the checkpoint timer before moving originals into the struct.
        // The timer captures its own references and runs in a background thread.
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let checkpoint_interval = config.checkpoint_interval;
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let timer_store = Arc::clone(&store);
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let timer_catalog = Arc::clone(&catalog);
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let timer_tm = Arc::clone(&transaction_manager);
        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
        let timer_rdf = Arc::clone(&rdf_store);
        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
        let timer_wal = wal.clone();

        let mut db = Self {
            config,
            #[cfg(feature = "lpg")]
            store: Some(store),
            catalog,
            #[cfg(feature = "triple-store")]
            rdf_store,
            transaction_manager,
            buffer_manager,
            #[cfg(feature = "wal")]
            wal,
            #[cfg(feature = "wal")]
            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
            query_cache,
            commit_counter: Arc::new(AtomicUsize::new(0)),
            is_open: RwLock::new(true),
            #[cfg(feature = "cdc")]
            cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
            #[cfg(feature = "cdc")]
            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
            #[cfg(feature = "embed")]
            embedding_models: RwLock::new(hashbrown::HashMap::new()),
            #[cfg(feature = "grafeo-file")]
            file_manager,
            // The timer is started further down, once `db` exists.
            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
            checkpoint_timer: parking_lot::Mutex::new(None),
            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
            vector_spill_storages: None,
            external_read_store: None,
            external_write_store: None,
            #[cfg(feature = "metrics")]
            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
            current_graph: RwLock::new(None),
            current_schema: RwLock::new(None),
            read_only: is_read_only,
            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
        };

        // Register storage sections as memory consumers for pressure tracking
        db.register_section_consumers();

        // Start periodic checkpoint timer if configured
        // (needs both an interval and a file manager, and never runs read-only).
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
            && !is_read_only
        {
            *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
                interval,
                Arc::clone(fm),
                timer_store,
                timer_catalog,
                timer_tm,
                #[cfg(feature = "triple-store")]
                timer_rdf,
                #[cfg(feature = "wal")]
                timer_wal,
            ));
        }

        // Discover existing spill files from a previous session.
        // If vectors were spilled before close, the spill files persist on disk
        // and need to be re-mapped so search can read from them.
        #[cfg(all(
            feature = "lpg",
            feature = "vector-index",
            feature = "mmap",
            not(feature = "temporal")
        ))]
        db.restore_spill_files();

        // If VectorStore is configured as ForceDisk, immediately spill embeddings.
        // This must happen after register_section_consumers() which creates the consumer.
        #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
        if db
            .config
            .section_configs
            .get(&grafeo_common::storage::SectionType::VectorStore)
            .is_some_and(|c| c.tier == grafeo_common::storage::TierOverride::ForceDisk)
        {
            db.buffer_manager.spill_all();
        }

        Ok(db)
    }
627
628    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
629    ///
630    /// The external store handles all data persistence. WAL, CDC, and index
631    /// management are the responsibility of the store implementation.
632    ///
633    /// Query execution (all 6 languages, optimizer, planner) works through the
634    /// provided store. Admin operations (schema introspection, persistence,
635    /// vector/text indexes) are not available on external stores.
636    ///
637    /// # Examples
638    ///
639    /// ```no_run
640    /// use std::sync::Arc;
641    /// use grafeo_engine::{GrafeoDB, Config};
642    /// use grafeo_core::graph::GraphStoreMut;
643    ///
644    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
645    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
646    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
647    ///     Ok(())
648    /// }
649    /// ```
650    ///
651    /// # Errors
652    ///
653    /// Returns an error if config validation fails.
654    ///
655    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
656    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
657        config
658            .validate()
659            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
660
661        let transaction_manager = Arc::new(TransactionManager::new());
662
663        let buffer_config = BufferManagerConfig {
664            budget: config.memory_limit.unwrap_or_else(|| {
665                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
666            }),
667            spill_path: None,
668            ..BufferManagerConfig::default()
669        };
670        let buffer_manager = BufferManager::new(buffer_config);
671
672        let query_cache = Arc::new(QueryCache::default());
673
674        #[cfg(feature = "cdc")]
675        let cdc_enabled_val = config.cdc_enabled;
676
677        Ok(Self {
678            config,
679            #[cfg(feature = "lpg")]
680            store: None,
681            catalog: Arc::new(Catalog::new()),
682            #[cfg(feature = "triple-store")]
683            rdf_store: Arc::new(RdfStore::new()),
684            transaction_manager,
685            buffer_manager,
686            #[cfg(feature = "wal")]
687            wal: None,
688            #[cfg(feature = "wal")]
689            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
690            query_cache,
691            commit_counter: Arc::new(AtomicUsize::new(0)),
692            is_open: RwLock::new(true),
693            #[cfg(feature = "cdc")]
694            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
695            #[cfg(feature = "cdc")]
696            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
697            #[cfg(feature = "embed")]
698            embedding_models: RwLock::new(hashbrown::HashMap::new()),
699            #[cfg(feature = "grafeo-file")]
700            file_manager: None,
701            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
702            checkpoint_timer: parking_lot::Mutex::new(None),
703            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
704            vector_spill_storages: None,
705            external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
706            external_write_store: Some(store),
707            #[cfg(feature = "metrics")]
708            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
709            current_graph: RwLock::new(None),
710            current_schema: RwLock::new(None),
711            read_only: false,
712            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
713        })
714    }
715
716    /// Creates a database backed by a read-only [`GraphStore`].
717    ///
718    /// The database is set to read-only mode. Write queries (CREATE, SET,
719    /// DELETE) will return `TransactionError::ReadOnly`.
720    ///
721    /// # Examples
722    ///
723    /// ```no_run
724    /// use std::sync::Arc;
725    /// use grafeo_engine::{GrafeoDB, Config};
726    /// use grafeo_core::graph::GraphStore;
727    ///
728    /// fn example(store: Arc<dyn GraphStore>) -> grafeo_common::utils::error::Result<()> {
729    ///     let db = GrafeoDB::with_read_store(store, Config::in_memory())?;
730    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
731    ///     Ok(())
732    /// }
733    /// ```
734    ///
735    /// # Errors
736    ///
737    /// Returns an error if config validation fails.
738    ///
739    /// [`GraphStore`]: grafeo_core::graph::GraphStore
740    pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
741        config
742            .validate()
743            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
744
745        let transaction_manager = Arc::new(TransactionManager::new());
746
747        let buffer_config = BufferManagerConfig {
748            budget: config.memory_limit.unwrap_or_else(|| {
749                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
750            }),
751            spill_path: None,
752            ..BufferManagerConfig::default()
753        };
754        let buffer_manager = BufferManager::new(buffer_config);
755
756        let query_cache = Arc::new(QueryCache::default());
757
758        #[cfg(feature = "cdc")]
759        let cdc_enabled_val = config.cdc_enabled;
760
761        Ok(Self {
762            config,
763            #[cfg(feature = "lpg")]
764            store: None,
765            catalog: Arc::new(Catalog::new()),
766            #[cfg(feature = "triple-store")]
767            rdf_store: Arc::new(RdfStore::new()),
768            transaction_manager,
769            buffer_manager,
770            #[cfg(feature = "wal")]
771            wal: None,
772            #[cfg(feature = "wal")]
773            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
774            query_cache,
775            commit_counter: Arc::new(AtomicUsize::new(0)),
776            is_open: RwLock::new(true),
777            #[cfg(feature = "cdc")]
778            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
779            #[cfg(feature = "cdc")]
780            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
781            #[cfg(feature = "embed")]
782            embedding_models: RwLock::new(hashbrown::HashMap::new()),
783            #[cfg(feature = "grafeo-file")]
784            file_manager: None,
785            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
786            checkpoint_timer: parking_lot::Mutex::new(None),
787            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
788            vector_spill_storages: None,
789            external_read_store: Some(store),
790            external_write_store: None,
791            #[cfg(feature = "metrics")]
792            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
793            current_graph: RwLock::new(None),
794            current_schema: RwLock::new(None),
795            read_only: true,
796            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
797        })
798    }
799
800    /// Converts the database to a read-only [`CompactStore`] for faster queries.
801    ///
802    /// Takes a snapshot of all nodes and edges from the current store, builds
803    /// a columnar `CompactStore` with CSR adjacency, and switches the database
804    /// to read-only mode. The original store is dropped to free memory.
805    ///
806    /// After calling this, all write queries will fail with
807    /// `TransactionError::ReadOnly`. Read queries (across all supported
808    /// languages) continue to work and benefit from ~60x memory reduction
809    /// and 100x+ traversal speedup.
810    ///
811    /// # Errors
812    ///
813    /// Returns an error if the conversion fails (e.g. more than 32,767
814    /// distinct labels or edge types).
815    ///
816    /// [`CompactStore`]: grafeo_core::graph::compact::CompactStore
817    #[cfg(feature = "compact-store")]
818    pub fn compact(&mut self) -> Result<()> {
819        use grafeo_core::graph::compact::from_graph_store;
820
821        let current_store = self.graph_store();
822        let compact = from_graph_store(current_store.as_ref())
823            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
824
825        self.external_read_store = Some(Arc::new(compact) as Arc<dyn GraphStore>);
826        self.external_write_store = None;
827        #[cfg(feature = "lpg")]
828        {
829            self.store = None;
830        }
831        self.read_only = true;
832        self.query_cache = Arc::new(QueryCache::default());
833        // Projections hold Arc refs to the old store: clear them so they don't
834        // serve stale data or prevent the old store's memory from being freed.
835        self.projections.write().clear();
836
837        Ok(())
838    }
839
    /// Applies WAL records to restore the database state.
    ///
    /// Data mutation records are routed through a graph cursor that tracks
    /// `SwitchGraph` context markers, replaying mutations into the correct
    /// named graph (or the default graph when cursor is `None`).
    ///
    /// Schema, graph-type, schema-namespace, and procedure records are applied
    /// to `catalog`. RDF records are forwarded to
    /// `rdf_ops::replay_rdf_wal_record` when the `triple-store` feature is
    /// enabled and are skipped otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error if `create_node_with_id` or `create_edge_with_id`
    /// fails, or if the graph named by a `SwitchGraph` record cannot be
    /// obtained through `graph_or_create` (reported as `Error::Internal`).
    /// Results of `create_graph` and of all catalog operations are discarded
    /// (`let _ = ...`), so failures there do not abort the replay.
    #[cfg(all(feature = "wal", feature = "lpg"))]
    fn apply_wal_records(
        store: &Arc<LpgStore>,
        catalog: &Catalog,
        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
        records: &[WalRecord],
    ) -> Result<()> {
        use crate::catalog::{
            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
        };
        use grafeo_common::utils::error::Error;

        // Graph cursor: tracks which named graph receives data mutations.
        // `None` means the default graph.
        let mut current_graph: Option<String> = None;
        // Store that data mutations are applied to; starts at the root store.
        let mut target_store: Arc<LpgStore> = Arc::clone(store);

        for record in records {
            match record {
                // --- Named graph lifecycle ---
                WalRecord::CreateNamedGraph { name } => {
                    // Result is discarded.
                    // NOTE(review): presumably so that replaying onto an
                    // already existing graph does not abort recovery; confirm.
                    let _ = store.create_graph(name);
                }
                WalRecord::DropNamedGraph { name } => {
                    store.drop_graph(name);
                    // Reset cursor if the dropped graph was active
                    if current_graph.as_deref() == Some(name.as_str()) {
                        current_graph = None;
                        target_store = Arc::clone(store);
                    }
                }
                WalRecord::SwitchGraph { name } => {
                    // Copy the record's optional graph name into the cursor,
                    // then resolve the store that matches it.
                    current_graph.clone_from(name);
                    target_store = match &current_graph {
                        None => Arc::clone(store),
                        Some(graph_name) => store
                            .graph_or_create(graph_name)
                            .map_err(|e| Error::Internal(e.to_string()))?,
                    };
                }

                // --- Data mutations: routed through target_store ---
                WalRecord::CreateNode { id, labels } => {
                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
                    // Failure here aborts the whole replay.
                    target_store.create_node_with_id(*id, &label_refs)?;
                }
                WalRecord::DeleteNode { id } => {
                    target_store.delete_node(*id);
                }
                WalRecord::CreateEdge {
                    id,
                    src,
                    dst,
                    edge_type,
                } => {
                    // Failure here aborts the whole replay.
                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
                }
                WalRecord::DeleteEdge { id } => {
                    target_store.delete_edge(*id);
                }
                WalRecord::SetNodeProperty { id, key, value } => {
                    target_store.set_node_property(*id, key, value.clone());
                }
                WalRecord::SetEdgeProperty { id, key, value } => {
                    target_store.set_edge_property(*id, key, value.clone());
                }
                WalRecord::AddNodeLabel { id, label } => {
                    target_store.add_label(*id, label);
                }
                WalRecord::RemoveNodeLabel { id, label } => {
                    target_store.remove_label(*id, label);
                }
                WalRecord::RemoveNodeProperty { id, key } => {
                    target_store.remove_node_property(*id, key);
                }
                WalRecord::RemoveEdgeProperty { id, key } => {
                    target_store.remove_edge_property(*id, key);
                }

                // --- Schema DDL replay (always on root catalog) ---
                WalRecord::CreateNodeType {
                    name,
                    properties,
                    constraints,
                } => {
                    let def = NodeTypeDefinition {
                        name: name.clone(),
                        // Each property entry carries (name, type name, nullable);
                        // no default value is restored from the record.
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        // Constraint kinds are matched by string. Unknown kinds,
                        // and `not_null` with an empty property list, fall back
                        // to `Unique`.
                        // NOTE(review): confirm this fallback is intended.
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                        // Parent types are not carried by this record.
                        parent_types: Vec::new(),
                    };
                    let _ = catalog.register_node_type(def);
                }
                WalRecord::DropNodeType { name } => {
                    let _ = catalog.drop_node_type(name);
                }
                WalRecord::CreateEdgeType {
                    name,
                    properties,
                    constraints,
                } => {
                    let def = EdgeTypeDefinition {
                        name: name.clone(),
                        // Same (name, type name, nullable) mapping as for node types.
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        // Same string-keyed constraint mapping (with `Unique`
                        // fallback) as for node types.
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                        // Endpoint type restrictions are not carried by this record.
                        source_node_types: Vec::new(),
                        target_node_types: Vec::new(),
                    };
                    let _ = catalog.register_edge_type_def(def);
                }
                WalRecord::DropEdgeType { name } => {
                    let _ = catalog.drop_edge_type_def(name);
                }
                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
                    // Index recreation is handled by the store on startup
                    // (indexes are rebuilt from data, not WAL)
                }
                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
                    // Constraint definitions are part of type definitions
                    // and replayed via CreateNodeType/CreateEdgeType
                }
                WalRecord::CreateGraphType {
                    name,
                    node_types,
                    edge_types,
                    open,
                } => {
                    use crate::catalog::GraphTypeDefinition;
                    let def = GraphTypeDefinition {
                        name: name.clone(),
                        allowed_node_types: node_types.clone(),
                        allowed_edge_types: edge_types.clone(),
                        open: *open,
                    };
                    let _ = catalog.register_graph_type(def);
                }
                WalRecord::DropGraphType { name } => {
                    let _ = catalog.drop_graph_type(name);
                }
                WalRecord::CreateSchema { name } => {
                    let _ = catalog.register_schema_namespace(name.clone());
                }
                WalRecord::DropSchema { name } => {
                    let _ = catalog.drop_schema_namespace(name);
                }

                // --- Type alterations: each entry is applied in record order ---
                WalRecord::AlterNodeType { name, alterations } => {
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_node_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
                            }
                            // Unrecognized actions are ignored.
                            _ => {}
                        }
                    }
                }
                WalRecord::AlterEdgeType { name, alterations } => {
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_edge_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
                            }
                            // Unrecognized actions are ignored.
                            _ => {}
                        }
                    }
                }
                WalRecord::AlterGraphType { name, alterations } => {
                    for (action, type_name) in alterations {
                        match action.as_str() {
                            "add_node" => {
                                let _ =
                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
                            }
                            "drop_node" => {
                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
                            }
                            "add_edge" => {
                                let _ =
                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
                            }
                            "drop_edge" => {
                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
                            }
                            // Unrecognized actions are ignored.
                            _ => {}
                        }
                    }
                }

                // --- Stored procedures ---
                WalRecord::CreateProcedure {
                    name,
                    params,
                    returns,
                    body,
                } => {
                    use crate::catalog::ProcedureDefinition;
                    let def = ProcedureDefinition {
                        name: name.clone(),
                        params: params.clone(),
                        returns: returns.clone(),
                        body: body.clone(),
                    };
                    let _ = catalog.register_procedure(def);
                }
                WalRecord::DropProcedure { name } => {
                    let _ = catalog.drop_procedure(name);
                }

                // --- RDF triple replay ---
                #[cfg(feature = "triple-store")]
                WalRecord::InsertRdfTriple { .. }
                | WalRecord::DeleteRdfTriple { .. }
                | WalRecord::ClearRdfGraph { .. }
                | WalRecord::CreateRdfGraph { .. }
                | WalRecord::DropRdfGraph { .. } => {
                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
                }
                // Without the triple store, RDF records are matched only to
                // keep the `match` exhaustive and are otherwise skipped.
                #[cfg(not(feature = "triple-store"))]
                WalRecord::InsertRdfTriple { .. }
                | WalRecord::DeleteRdfTriple { .. }
                | WalRecord::ClearRdfGraph { .. }
                | WalRecord::CreateRdfGraph { .. }
                | WalRecord::DropRdfGraph { .. } => {}

                WalRecord::TransactionCommit { .. } => {
                    // In temporal mode, advance the store epoch on each committed
                    // transaction so that subsequent property/label operations
                    // are recorded at the correct epoch in their VersionLogs.
                    // NOTE(review): the epoch is advanced on `target_store`,
                    // i.e. whichever graph the cursor currently selects;
                    // confirm that is the intended store.
                    #[cfg(feature = "temporal")]
                    {
                        target_store.new_epoch();
                    }
                }
                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
                    // Transaction control records don't need replay action
                    // (recovery already filtered to only committed transactions)
                }
                WalRecord::EpochAdvance { .. } => {
                    // Metadata record: no store mutation needed.
                    // Used by incremental backup and point-in-time recovery.
                }
            }
        }
        Ok(())
    }
1143
1144    // =========================================================================
1145    // Single-file format helpers
1146    // =========================================================================
1147
1148    /// Returns `true` if the given path should use single-file format.
1149    #[cfg(feature = "grafeo-file")]
1150    fn should_use_single_file(
1151        path: &std::path::Path,
1152        configured: crate::config::StorageFormat,
1153    ) -> bool {
1154        use crate::config::StorageFormat;
1155        match configured {
1156            StorageFormat::SingleFile => true,
1157            StorageFormat::WalDirectory => false,
1158            StorageFormat::Auto => {
1159                // Existing file: check magic bytes
1160                if path.is_file() {
1161                    if let Ok(mut f) = std::fs::File::open(path) {
1162                        use std::io::Read;
1163                        let mut magic = [0u8; 4];
1164                        if f.read_exact(&mut magic).is_ok() && magic == grafeo_storage::file::MAGIC
1165                        {
1166                            return true;
1167                        }
1168                    }
1169                    return false;
1170                }
1171                // Existing directory: legacy format
1172                if path.is_dir() {
1173                    return false;
1174                }
1175                // New path: check extension
1176                path.extension().is_some_and(|ext| ext == "grafeo")
1177            }
1178        }
1179    }
1180
    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
    ///
    /// This is a thin wrapper: `data` is forwarded unchanged, together with
    /// `store`, `catalog`, and (with the `triple-store` feature) `rdf_store`,
    /// to `persistence::load_snapshot_into_store`, and that call's result is
    /// returned as-is.
    ///
    /// Supports both v1 (monolithic blob) and v2 (section-based) formats.
    /// NOTE(review): only the pass-through to the loader is visible here, and
    /// the inline comment below describes it as the v1/legacy path; confirm
    /// that the loader really handles v2 data before relying on this.
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    fn apply_snapshot_data(
        store: &Arc<LpgStore>,
        catalog: &Arc<crate::catalog::Catalog>,
        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
        data: &[u8],
    ) -> Result<()> {
        // v1 blob format: pass through to legacy loader
        persistence::load_snapshot_into_store(
            store,
            catalog,
            #[cfg(feature = "triple-store")]
            rdf_store,
            data,
        )
    }
1200
1201    /// Loads from a section-based `.grafeo` file (v2 format).
1202    ///
1203    /// Reads the section directory, then deserializes each section independently.
1204    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1205    fn load_from_sections(
1206        fm: &GrafeoFileManager,
1207        store: &Arc<LpgStore>,
1208        catalog: &Arc<crate::catalog::Catalog>,
1209        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1210    ) -> Result<()> {
1211        use grafeo_common::storage::{Section, SectionType};
1212
1213        let dir = fm.read_section_directory()?.ok_or_else(|| {
1214            grafeo_common::utils::error::Error::Internal(
1215                "expected v2 section directory but found none".to_string(),
1216            )
1217        })?;
1218
1219        // Load catalog section first (schema defs needed before data)
1220        if let Some(entry) = dir.find(SectionType::Catalog) {
1221            let data = fm.read_section_data(entry)?;
1222            let tm = Arc::new(crate::transaction::TransactionManager::new());
1223            let mut section = catalog_section::CatalogSection::new(
1224                Arc::clone(catalog),
1225                Arc::clone(store),
1226                move || tm.current_epoch().as_u64(),
1227            );
1228            section.deserialize(&data)?;
1229        }
1230
1231        // Load LPG store
1232        if let Some(entry) = dir.find(SectionType::LpgStore) {
1233            let data = fm.read_section_data(entry)?;
1234            let mut section = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1235            section.deserialize(&data)?;
1236        }
1237
1238        // Load RDF store
1239        #[cfg(feature = "triple-store")]
1240        if let Some(entry) = dir.find(SectionType::RdfStore) {
1241            let data = fm.read_section_data(entry)?;
1242            let mut section = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
1243            section.deserialize(&data)?;
1244        }
1245
1246        // Restore HNSW topology (if vector indexes exist in both catalog and section)
1247        #[cfg(feature = "vector-index")]
1248        if let Some(entry) = dir.find(SectionType::VectorStore) {
1249            let data = fm.read_section_data(entry)?;
1250            let indexes = store.vector_index_entries();
1251            if !indexes.is_empty() {
1252                let mut section = grafeo_core::index::vector::VectorStoreSection::new(indexes);
1253                section.deserialize(&data)?;
1254            }
1255        }
1256
1257        // Restore BM25 postings (if text indexes exist in both catalog and section)
1258        #[cfg(feature = "text-index")]
1259        if let Some(entry) = dir.find(SectionType::TextIndex) {
1260            let data = fm.read_section_data(entry)?;
1261            let indexes = store.text_index_entries();
1262            if !indexes.is_empty() {
1263                let mut section = grafeo_core::index::text::TextIndexSection::new(indexes);
1264                section.deserialize(&data)?;
1265            }
1266        }
1267
1268        Ok(())
1269    }
1270
1271    // =========================================================================
1272    // Session & Configuration
1273    // =========================================================================
1274
1275    /// Opens a new session for running queries.
1276    ///
1277    /// Sessions are cheap to create: spin up as many as you need. Each
1278    /// gets its own transaction context, so concurrent sessions won't
1279    /// block each other on reads.
1280    ///
1281    /// # Panics
1282    ///
1283    /// Panics if the database was configured with an external graph store and
1284    /// the internal arena allocator cannot be initialized (out of memory).
1285    ///
1286    /// # Examples
1287    ///
1288    /// ```
1289    /// use grafeo_engine::GrafeoDB;
1290    ///
1291    /// let db = GrafeoDB::new_in_memory();
1292    /// let session = db.session();
1293    ///
1294    /// // Run queries through the session
1295    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
1296    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1297    /// ```
1298    #[must_use]
1299    pub fn session(&self) -> Session {
1300        self.create_session_inner(None)
1301    }
1302
1303    /// Creates a session scoped to the given identity.
1304    ///
1305    /// The identity determines what operations the session is allowed to
1306    /// perform. A [`Role::ReadOnly`](crate::auth::Role::ReadOnly) identity
1307    /// creates a read-only session; a [`Role::ReadWrite`](crate::auth::Role::ReadWrite)
1308    /// identity allows data mutations but not schema DDL; a
1309    /// [`Role::Admin`](crate::auth::Role::Admin) identity has full access.
1310    ///
1311    /// # Examples
1312    ///
1313    /// ```
1314    /// use grafeo_engine::{GrafeoDB, auth::{Identity, Role}};
1315    ///
1316    /// let db = GrafeoDB::new_in_memory();
1317    /// let identity = Identity::new("app-service", [Role::ReadWrite]);
1318    /// let session = db.session_with_identity(identity);
1319    /// ```
1320    #[must_use]
1321    pub fn session_with_identity(&self, identity: crate::auth::Identity) -> Session {
1322        let force_read_only = !identity.can_write();
1323        self.create_session_inner_full(None, force_read_only, identity)
1324    }
1325
1326    /// Creates a session scoped to a single role.
1327    ///
1328    /// Convenience shorthand for
1329    /// `session_with_identity(Identity::new("anonymous", [role]))`.
1330    ///
1331    /// # Examples
1332    ///
1333    /// ```
1334    /// use grafeo_engine::{GrafeoDB, auth::Role};
1335    ///
1336    /// let db = GrafeoDB::new_in_memory();
1337    /// let reader = db.session_with_role(Role::ReadOnly);
1338    /// ```
1339    #[must_use]
1340    pub fn session_with_role(&self, role: crate::auth::Role) -> Session {
1341        self.session_with_identity(crate::auth::Identity::new("anonymous", [role]))
1342    }
1343
1344    /// Creates a session with an explicit CDC override.
1345    ///
1346    /// When `cdc_enabled` is `true`, mutations in this session are tracked
1347    /// regardless of the database default. When `false`, mutations are not
1348    /// tracked regardless of the database default.
1349    ///
1350    /// # Examples
1351    ///
1352    /// ```
1353    /// use grafeo_engine::GrafeoDB;
1354    ///
1355    /// let db = GrafeoDB::new_in_memory();
1356    ///
1357    /// // Opt in to CDC for just this session
1358    /// let tracked = db.session_with_cdc(true);
1359    /// tracked.execute("INSERT (:Person {name: 'Alix'})")?;
1360    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1361    /// ```
1362    #[cfg(feature = "cdc")]
1363    #[must_use]
1364    pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1365        self.create_session_inner(Some(cdc_enabled))
1366    }
1367
1368    /// Creates a read-only session regardless of the database's access mode.
1369    ///
1370    /// Mutations executed through this session will fail with
1371    /// `TransactionError::ReadOnly`. Useful for replication replicas where
1372    /// the database itself must remain writable (for applying CDC changes)
1373    /// but client-facing queries must be read-only.
1374    ///
1375    /// **Deprecated**: Use `session_with_role(Role::ReadOnly)` instead.
1376    #[deprecated(
1377        since = "0.5.36",
1378        note = "use session_with_role(Role::ReadOnly) instead"
1379    )]
1380    #[must_use]
1381    pub fn session_read_only(&self) -> Session {
1382        self.session_with_role(crate::auth::Role::ReadOnly)
1383    }
1384
1385    /// Shared session creation logic.
1386    ///
1387    /// `cdc_override` overrides the database-wide `cdc_enabled` default when
1388    /// `Some`. `None` falls back to the database default.
1389    #[allow(unused_variables)] // cdc_override unused when cdc feature is off
1390    fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1391        self.create_session_inner_full(cdc_override, false, crate::auth::Identity::anonymous())
1392    }
1393
1394    /// Shared session creation with all overrides.
1395    #[allow(unused_variables)]
1396    fn create_session_inner_full(
1397        &self,
1398        cdc_override: Option<bool>,
1399        force_read_only: bool,
1400        identity: crate::auth::Identity,
1401    ) -> Session {
1402        let session_cfg = || crate::session::SessionConfig {
1403            transaction_manager: Arc::clone(&self.transaction_manager),
1404            query_cache: Arc::clone(&self.query_cache),
1405            catalog: Arc::clone(&self.catalog),
1406            adaptive_config: self.config.adaptive.clone(),
1407            factorized_execution: self.config.factorized_execution,
1408            graph_model: self.config.graph_model,
1409            query_timeout: self.config.query_timeout,
1410            commit_counter: Arc::clone(&self.commit_counter),
1411            gc_interval: self.config.gc_interval,
1412            read_only: self.read_only || force_read_only,
1413            identity: identity.clone(),
1414            #[cfg(feature = "lpg")]
1415            projections: Arc::clone(&self.projections),
1416        };
1417
1418        if let Some(ref ext_read) = self.external_read_store {
1419            return Session::with_external_store(
1420                Arc::clone(ext_read),
1421                self.external_write_store.as_ref().map(Arc::clone),
1422                session_cfg(),
1423            )
1424            .expect("arena allocation for external store session");
1425        }
1426
1427        #[cfg(all(feature = "lpg", feature = "triple-store"))]
1428        let mut session = Session::with_rdf_store_and_adaptive(
1429            Arc::clone(self.lpg_store()),
1430            Arc::clone(&self.rdf_store),
1431            session_cfg(),
1432        );
1433        #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
1434        let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1435        #[cfg(not(feature = "lpg"))]
1436        let mut session =
1437            Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
1438                .expect("session creation for non-lpg build");
1439
1440        #[cfg(all(feature = "wal", feature = "lpg"))]
1441        if let Some(ref wal) = self.wal {
1442            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1443        }
1444
1445        #[cfg(feature = "cdc")]
1446        {
1447            let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1448            if should_enable {
1449                session.set_cdc_log(Arc::clone(&self.cdc_log));
1450            }
1451        }
1452
1453        #[cfg(feature = "metrics")]
1454        {
1455            if let Some(ref m) = self.metrics {
1456                session.set_metrics(Arc::clone(m));
1457                m.session_created
1458                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1459                m.session_active
1460                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1461            }
1462        }
1463
1464        // Propagate persistent graph context to the new session
1465        if let Some(ref graph) = *self.current_graph.read() {
1466            session.use_graph(graph);
1467        }
1468
1469        // Propagate persistent schema context to the new session
1470        if let Some(ref schema) = *self.current_schema.read() {
1471            session.set_schema(schema);
1472        }
1473
1474        // Suppress unused_mut when cdc/wal are disabled
1475        let _ = &mut session;
1476
1477        session
1478    }
1479
1480    /// Returns the current graph name, if any.
1481    ///
1482    /// This is the persistent graph context used by one-shot `execute()` calls.
1483    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
1484    /// or `SESSION RESET`.
1485    #[must_use]
1486    pub fn current_graph(&self) -> Option<String> {
1487        self.current_graph.read().clone()
1488    }
1489
1490    /// Sets the current graph context for subsequent one-shot `execute()` calls.
1491    ///
1492    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
1493    /// Pass `None` to reset to the default graph.
1494    ///
1495    /// # Errors
1496    ///
1497    /// Returns an error if the named graph does not exist.
1498    pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1499        #[cfg(feature = "lpg")]
1500        if let Some(name) = name
1501            && !name.eq_ignore_ascii_case("default")
1502            && let Some(store) = &self.store
1503            && store.graph(name).is_none()
1504        {
1505            return Err(Error::Query(QueryError::new(
1506                QueryErrorKind::Semantic,
1507                format!("Graph '{name}' does not exist"),
1508            )));
1509        }
1510        *self.current_graph.write() = name.map(ToString::to_string);
1511        Ok(())
1512    }
1513
1514    /// Returns the current schema name, if any.
1515    ///
1516    /// This is the persistent schema context used by one-shot `execute()` calls.
1517    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
1518    #[must_use]
1519    pub fn current_schema(&self) -> Option<String> {
1520        self.current_schema.read().clone()
1521    }
1522
1523    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1524    ///
1525    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1526    /// a session. Pass `None` to clear the schema context.
1527    ///
1528    /// # Errors
1529    ///
1530    /// Returns an error if the named schema does not exist.
1531    pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1532        if let Some(name) = name
1533            && !self.catalog.schema_exists(name)
1534        {
1535            return Err(Error::Query(QueryError::new(
1536                QueryErrorKind::Semantic,
1537                format!("Schema '{name}' does not exist"),
1538            )));
1539        }
1540        *self.current_schema.write() = name.map(ToString::to_string);
1541        Ok(())
1542    }
1543
1544    /// Returns the adaptive execution configuration.
1545    #[must_use]
1546    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1547        &self.config.adaptive
1548    }
1549
1550    /// Returns `true` if this database was opened in read-only mode.
1551    #[must_use]
1552    pub fn is_read_only(&self) -> bool {
1553        self.read_only
1554    }
1555
1556    /// Returns the configuration.
1557    #[must_use]
1558    pub fn config(&self) -> &Config {
1559        &self.config
1560    }
1561
1562    /// Returns the graph data model of this database.
1563    #[must_use]
1564    pub fn graph_model(&self) -> crate::config::GraphModel {
1565        self.config.graph_model
1566    }
1567
1568    /// Returns the configured memory limit in bytes, if any.
1569    #[must_use]
1570    pub fn memory_limit(&self) -> Option<usize> {
1571        self.config.memory_limit
1572    }
1573
1574    /// Returns a point-in-time snapshot of all metrics.
1575    ///
1576    /// If the `metrics` feature is disabled or the registry is not
1577    /// initialized, returns a default (all-zero) snapshot.
1578    #[cfg(feature = "metrics")]
1579    #[must_use]
1580    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1581        let mut snapshot = self
1582            .metrics
1583            .as_ref()
1584            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1585
1586        // Augment with cache stats from the query cache (not tracked in the registry)
1587        let cache_stats = self.query_cache.stats();
1588        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1589        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1590        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1591        snapshot.cache_invalidations = cache_stats.invalidations;
1592
1593        snapshot
1594    }
1595
1596    /// Returns all metrics in Prometheus text exposition format.
1597    ///
1598    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1599    #[cfg(feature = "metrics")]
1600    #[must_use]
1601    pub fn metrics_prometheus(&self) -> String {
1602        self.metrics
1603            .as_ref()
1604            .map_or_else(String::new, |m| m.to_prometheus())
1605    }
1606
1607    /// Resets all metrics counters and histograms to zero.
1608    #[cfg(feature = "metrics")]
1609    pub fn reset_metrics(&self) {
1610        if let Some(ref m) = self.metrics {
1611            m.reset();
1612        }
1613        self.query_cache.reset_stats();
1614    }
1615
1616    /// Returns the underlying (default) store.
1617    ///
1618    /// This provides direct access to the LPG store for algorithm implementations
1619    /// and admin operations (index management, schema introspection, MVCC internals).
1620    ///
1621    /// For code that only needs read/write graph operations, prefer
1622    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
1623    #[cfg(feature = "lpg")]
1624    #[must_use]
1625    pub fn store(&self) -> &Arc<LpgStore> {
1626        self.lpg_store()
1627    }
1628
1629    // === Named Graph Management ===
1630
1631    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1632    ///
1633    /// # Errors
1634    ///
1635    /// Returns an error if arena allocation fails.
1636    #[cfg(feature = "lpg")]
1637    pub fn create_graph(&self, name: &str) -> Result<bool> {
1638        Ok(self.lpg_store().create_graph(name)?)
1639    }
1640
1641    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
1642    ///
1643    /// If the dropped graph was the active graph context, the context is reset
1644    /// to the default graph.
1645    #[cfg(feature = "lpg")]
1646    pub fn drop_graph(&self, name: &str) -> bool {
1647        let Some(store) = &self.store else {
1648            return false;
1649        };
1650        let dropped = store.drop_graph(name);
1651        if dropped {
1652            let mut current = self.current_graph.write();
1653            if current
1654                .as_deref()
1655                .is_some_and(|g| g.eq_ignore_ascii_case(name))
1656            {
1657                *current = None;
1658            }
1659        }
1660        dropped
1661    }
1662
1663    /// Returns all named graph names.
1664    #[cfg(feature = "lpg")]
1665    #[must_use]
1666    pub fn list_graphs(&self) -> Vec<String> {
1667        self.lpg_store().graph_names()
1668    }
1669
1670    // === Graph Projections ===
1671
1672    /// Creates a named graph projection (virtual subgraph).
1673    ///
1674    /// The projection filters the graph store to only include nodes with the
1675    /// specified labels and edges with the specified types. Returns `true` if
1676    /// created, `false` if a projection with that name already exists.
1677    ///
1678    /// # Examples
1679    ///
1680    /// ```
1681    /// use grafeo_engine::GrafeoDB;
1682    /// use grafeo_core::graph::ProjectionSpec;
1683    ///
1684    /// let db = GrafeoDB::new_in_memory();
1685    /// let spec = ProjectionSpec::new()
1686    ///     .with_node_labels(["Person", "City"])
1687    ///     .with_edge_types(["LIVES_IN"]);
1688    /// assert!(db.create_projection("social", spec));
1689    /// ```
1690    pub fn create_projection(
1691        &self,
1692        name: impl Into<String>,
1693        spec: grafeo_core::graph::ProjectionSpec,
1694    ) -> bool {
1695        use grafeo_core::graph::GraphProjection;
1696        use std::collections::hash_map::Entry;
1697
1698        let store = self.graph_store();
1699        let projection = Arc::new(GraphProjection::new(store, spec));
1700        let mut projections = self.projections.write();
1701        match projections.entry(name.into()) {
1702            Entry::Occupied(_) => false,
1703            Entry::Vacant(e) => {
1704                e.insert(projection);
1705                true
1706            }
1707        }
1708    }
1709
1710    /// Drops a named graph projection. Returns `true` if it existed.
1711    pub fn drop_projection(&self, name: &str) -> bool {
1712        self.projections.write().remove(name).is_some()
1713    }
1714
1715    /// Returns the names of all graph projections.
1716    #[must_use]
1717    pub fn list_projections(&self) -> Vec<String> {
1718        self.projections.read().keys().cloned().collect()
1719    }
1720
1721    /// Returns a named projection as a [`GraphStore`] trait object.
1722    #[must_use]
1723    pub fn projection(&self, name: &str) -> Option<Arc<dyn GraphStore>> {
1724        self.projections
1725            .read()
1726            .get(name)
1727            .map(|p| Arc::clone(p) as Arc<dyn GraphStore>)
1728    }
1729
1730    /// Returns the graph store as a trait object.
1731    ///
1732    /// Returns a read-only trait object for the active graph store.
1733    ///
1734    /// This provides the [`GraphStore`] interface for code that only needs
1735    /// read operations. For write access, use [`graph_store_mut()`](Self::graph_store_mut).
1736    ///
1737    /// [`GraphStore`]: grafeo_core::graph::GraphStore
1738    #[must_use]
1739    pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1740        if let Some(ref ext_read) = self.external_read_store {
1741            Arc::clone(ext_read)
1742        } else {
1743            #[cfg(feature = "lpg")]
1744            {
1745                Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1746            }
1747            #[cfg(not(feature = "lpg"))]
1748            unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
1749        }
1750    }
1751
1752    /// Returns the writable graph store, if available.
1753    ///
1754    /// Returns `None` for read-only databases created via
1755    /// [`with_read_store()`](Self::with_read_store).
1756    #[must_use]
1757    pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1758        if self.external_read_store.is_some() {
1759            self.external_write_store.as_ref().map(Arc::clone)
1760        } else {
1761            #[cfg(feature = "lpg")]
1762            {
1763                Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1764            }
1765            #[cfg(not(feature = "lpg"))]
1766            {
1767                None
1768            }
1769        }
1770    }
1771
1772    /// Garbage collects old MVCC versions that are no longer visible.
1773    ///
1774    /// Determines the minimum epoch required by active transactions and prunes
1775    /// version chains older than that threshold. Also cleans up completed
1776    /// transaction metadata in the transaction manager, and prunes the CDC
1777    /// event log according to its retention policy.
1778    pub fn gc(&self) {
1779        #[cfg(feature = "lpg")]
1780        let current_epoch = {
1781            let min_epoch = self.transaction_manager.min_active_epoch();
1782            self.lpg_store().gc_versions(min_epoch);
1783            self.transaction_manager.current_epoch()
1784        };
1785        self.transaction_manager.gc();
1786
1787        // Prune CDC events based on retention config (epoch + count limits)
1788        #[cfg(feature = "cdc")]
1789        if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
1790            #[cfg(feature = "lpg")]
1791            self.cdc_log.apply_retention(current_epoch);
1792        }
1793    }
1794
1795    /// Returns the buffer manager for memory-aware operations.
1796    #[must_use]
1797    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1798        &self.buffer_manager
1799    }
1800
1801    /// Returns the query cache.
1802    #[must_use]
1803    pub fn query_cache(&self) -> &Arc<QueryCache> {
1804        &self.query_cache
1805    }
1806
1807    /// Clears all cached query plans.
1808    ///
1809    /// This is called automatically after DDL operations, but can also be
1810    /// invoked manually after external schema changes (e.g., WAL replay,
1811    /// import) or when you want to force re-optimization of all queries.
1812    pub fn clear_plan_cache(&self) {
1813        self.query_cache.clear();
1814    }
1815
1816    // =========================================================================
1817    // Lifecycle
1818    // =========================================================================
1819
1820    /// Closes the database, flushing all pending writes.
1821    ///
1822    /// For persistent databases, this ensures everything is safely on disk.
1823    /// Called automatically when the database is dropped, but you can call
1824    /// it explicitly if you need to guarantee durability at a specific point.
1825    ///
1826    /// # Errors
1827    ///
1828    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
1829    pub fn close(&self) -> Result<()> {
1830        let mut is_open = self.is_open.write();
1831        if !*is_open {
1832            return Ok(());
1833        }
1834
1835        // Stop the periodic checkpoint timer first, even for read-only databases.
1836        // compact() can switch a writable DB to read-only after the timer started,
1837        // so the timer must be stopped before any early return to avoid racing
1838        // with the closed file manager.
1839        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1840        if let Some(mut timer) = self.checkpoint_timer.lock().take() {
1841            timer.stop();
1842        }
1843
1844        // Read-only databases: just release the shared lock, no checkpointing
1845        if self.read_only {
1846            #[cfg(feature = "grafeo-file")]
1847            if let Some(ref fm) = self.file_manager {
1848                fm.close()?;
1849            }
1850            *is_open = false;
1851            return Ok(());
1852        }
1853
1854        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
1855        // We must do this BEFORE the WAL close path because checkpoint_to_file
1856        // removes the sidecar WAL directory.
1857        #[cfg(feature = "grafeo-file")]
1858        let is_single_file = self.file_manager.is_some();
1859        #[cfg(not(feature = "grafeo-file"))]
1860        let is_single_file = false;
1861
1862        #[cfg(feature = "grafeo-file")]
1863        if let Some(ref fm) = self.file_manager {
1864            // Flush WAL first so all records are on disk before we snapshot
1865            #[cfg(feature = "wal")]
1866            if let Some(ref wal) = self.wal {
1867                wal.sync()?;
1868            }
1869            let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
1870
1871            // Safety check: if WAL has records but the checkpoint was a no-op
1872            // (zero sections written), the container file may not contain the
1873            // latest data. This can happen when sections are not marked dirty
1874            // despite mutations going through the WAL. Force-dirty all sections
1875            // and retry before removing the sidecar.
1876            #[cfg(feature = "wal")]
1877            let flush_result = if flush_result.sections_written == 0 {
1878                if let Some(ref wal) = self.wal {
1879                    if wal.record_count() > 0 {
1880                        grafeo_warn!(
1881                            "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
1882                            wal.record_count()
1883                        );
1884                        self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
1885                    } else {
1886                        flush_result
1887                    }
1888                } else {
1889                    flush_result
1890                }
1891            } else {
1892                flush_result
1893            };
1894
1895            // Release WAL file handles before removing sidecar directory.
1896            // On Windows, open handles prevent directory deletion.
1897            #[cfg(feature = "wal")]
1898            if let Some(ref wal) = self.wal {
1899                wal.close_active_log();
1900            }
1901
1902            // Only remove the sidecar WAL after verifying the checkpoint wrote
1903            // data to the container. If nothing was written and the WAL had
1904            // records, keep the sidecar so the next open can recover from it.
1905            #[cfg(feature = "wal")]
1906            let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
1907            #[cfg(not(feature = "wal"))]
1908            let has_wal_records = false;
1909
1910            if flush_result.sections_written > 0 || !has_wal_records {
1911                {
1912                    use grafeo_common::testing::crash::maybe_crash;
1913                    maybe_crash("close:before_remove_sidecar_wal");
1914                }
1915                fm.remove_sidecar_wal()?;
1916            } else {
1917                grafeo_warn!(
1918                    "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
1919                );
1920            }
1921            fm.close()?;
1922        }
1923
1924        // Commit and sync WAL (legacy directory format only).
1925        // We intentionally do NOT call wal.checkpoint() here. Directory format
1926        // has no snapshot: the WAL files are the sole source of truth. Writing
1927        // checkpoint.meta would cause recovery to skip older WAL files, losing
1928        // all data that predates the current log sequence.
1929        #[cfg(feature = "wal")]
1930        if !is_single_file && let Some(ref wal) = self.wal {
1931            // Use the last assigned transaction ID, or create one for the commit record
1932            let commit_tx = self
1933                .transaction_manager
1934                .last_assigned_transaction_id()
1935                .unwrap_or_else(|| self.transaction_manager.begin());
1936
1937            // Log a TransactionCommit to mark all pending records as committed
1938            wal.log(&WalRecord::TransactionCommit {
1939                transaction_id: commit_tx,
1940            })?;
1941
1942            wal.sync()?;
1943        }
1944
1945        *is_open = false;
1946        Ok(())
1947    }
1948
1949    /// Returns the typed WAL if available.
1950    #[cfg(feature = "wal")]
1951    #[must_use]
1952    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1953        self.wal.as_ref()
1954    }
1955
1956    /// Logs a WAL record if WAL is enabled.
1957    #[cfg(feature = "wal")]
1958    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1959        if let Some(ref wal) = self.wal {
1960            wal.log(record)?;
1961        }
1962        Ok(())
1963    }
1964
1965    /// Registers storage sections as [`MemoryConsumer`]s with the BufferManager.
1966    ///
1967    /// Each section reports its memory usage to the buffer manager, enabling
1968    /// accurate pressure tracking. Called once after database construction.
1969    fn register_section_consumers(&mut self) {
1970        #[cfg(feature = "lpg")]
1971        let store_ref = self.store.as_ref();
1972        #[cfg(not(feature = "lpg"))]
1973        // LPG store section
1974        #[cfg(feature = "lpg")]
1975        if let Some(store) = store_ref {
1976            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1977            self.buffer_manager.register_consumer(Arc::new(
1978                section_consumer::SectionConsumer::new(Arc::new(lpg)),
1979            ));
1980        }
1981
1982        // RDF store: only when data exists
1983        #[cfg(feature = "triple-store")]
1984        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
1985            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
1986            self.buffer_manager.register_consumer(Arc::new(
1987                section_consumer::SectionConsumer::new(Arc::new(rdf)),
1988            ));
1989        }
1990
1991        // Vector indexes: dynamic consumer that re-queries the store on each
1992        // memory_usage() call, so dropped indexes are freed and new ones tracked.
1993        #[cfg(all(
1994            feature = "lpg",
1995            feature = "vector-index",
1996            feature = "mmap",
1997            not(feature = "temporal")
1998        ))]
1999        if let Some(store) = store_ref {
2000            let spill_path = self.buffer_manager.config().spill_path.clone();
2001            let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
2002                store, spill_path,
2003            ));
2004            // Share the spill registry with the search path
2005            self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
2006            self.buffer_manager.register_consumer(consumer);
2007        }
2008
2009        // Text indexes: same dynamic approach as vector indexes.
2010        #[cfg(all(feature = "lpg", feature = "text-index"))]
2011        if let Some(store) = store_ref {
2012            self.buffer_manager
2013                .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
2014        }
2015
2016        // CDC log: register as memory consumer so the buffer manager can
2017        // prune events under memory pressure.
2018        #[cfg(feature = "cdc")]
2019        self.buffer_manager.register_consumer(
2020            Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
2021        );
2022    }
2023
2024    /// Discovers and re-opens spill files from a previous session.
2025    ///
2026    /// When the database was closed with spilled vector embeddings, the
2027    /// `vectors_*.bin` files persist in the spill directory. This method
2028    /// scans for them, opens each as `MmapStorage`, and registers them
2029    /// in the `vector_spill_storages` map so search can read from them.
2030    #[cfg(all(
2031        feature = "lpg",
2032        feature = "vector-index",
2033        feature = "mmap",
2034        not(feature = "temporal")
2035    ))]
2036    fn restore_spill_files(&mut self) {
2037        use grafeo_core::index::vector::MmapStorage;
2038
2039        let spill_dir = match self.buffer_manager.config().spill_path {
2040            Some(ref path) => path.clone(),
2041            None => return,
2042        };
2043
2044        if !spill_dir.exists() {
2045            return;
2046        }
2047
2048        let spill_map = match self.vector_spill_storages {
2049            Some(ref map) => Arc::clone(map),
2050            None => return,
2051        };
2052
2053        let Ok(entries) = std::fs::read_dir(&spill_dir) else {
2054            return;
2055        };
2056
2057        let Some(ref store) = self.store else {
2058            return;
2059        };
2060
2061        for entry in entries.flatten() {
2062            let path = entry.path();
2063            let file_name = match path.file_name().and_then(|n| n.to_str()) {
2064                Some(name) => name.to_string(),
2065                None => continue,
2066            };
2067
2068            // Match pattern: vectors_{key}.bin where key is percent-encoded
2069            if !file_name.starts_with("vectors_")
2070                || !std::path::Path::new(&file_name)
2071                    .extension()
2072                    .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
2073            {
2074                continue;
2075            }
2076
2077            // Extract and decode key: "vectors_Label%3Aembedding.bin" -> "Label:embedding"
2078            let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];
2079
2080            // Percent-decode: %3A -> ':', %25 -> '%'
2081            let key = key_part.replace("%3A", ":").replace("%25", "%");
2082
2083            // Key must contain ':' (label:property format)
2084            if !key.contains(':') {
2085                // Legacy file with old encoding, skip (will be re-created on next spill)
2086                continue;
2087            }
2088
2089            // Only restore if the corresponding vector index exists
2090            if store.get_vector_index_by_key(&key).is_none() {
2091                // Stale spill file (index was dropped), clean it up
2092                let _ = std::fs::remove_file(&path);
2093                continue;
2094            }
2095
2096            // Open the MmapStorage
2097            match MmapStorage::open(&path) {
2098                Ok(mmap_storage) => {
2099                    // Mark the property column as spilled so get() returns None
2100                    let property = key.split(':').nth(1).unwrap_or("");
2101                    let prop_key = grafeo_common::types::PropertyKey::new(property);
2102                    store.node_properties_mark_spilled(&prop_key);
2103
2104                    spill_map.write().insert(key, Arc::new(mmap_storage));
2105                }
2106                Err(e) => {
2107                    eprintln!("failed to restore spill file {}: {e}", path.display());
2108                    // Remove corrupt spill file
2109                    let _ = std::fs::remove_file(&path);
2110                }
2111            }
2112        }
2113    }
2114
2115    /// Builds section objects for the current database state.
2116    #[cfg(feature = "grafeo-file")]
2117    fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
2118        let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();
2119
2120        // LPG sections: store, catalog, vector indexes, text indexes
2121        #[cfg(feature = "lpg")]
2122        if let Some(store) = self.store.as_ref() {
2123            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2124
2125            let catalog = catalog_section::CatalogSection::new(
2126                Arc::clone(&self.catalog),
2127                Arc::clone(store),
2128                {
2129                    let tm = Arc::clone(&self.transaction_manager);
2130                    move || tm.current_epoch().as_u64()
2131                },
2132            );
2133
2134            sections.push(Box::new(catalog));
2135            sections.push(Box::new(lpg));
2136
2137            // Vector indexes: persist HNSW topology to avoid rebuild on load
2138            #[cfg(feature = "vector-index")]
2139            {
2140                let indexes = store.vector_index_entries();
2141                if !indexes.is_empty() {
2142                    let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
2143                    sections.push(Box::new(vector));
2144                }
2145            }
2146
2147            // Text indexes: persist BM25 postings to avoid rebuild on load
2148            #[cfg(feature = "text-index")]
2149            {
2150                let indexes = store.text_index_entries();
2151                if !indexes.is_empty() {
2152                    let text = grafeo_core::index::text::TextIndexSection::new(indexes);
2153                    sections.push(Box::new(text));
2154                }
2155            }
2156        }
2157
2158        #[cfg(feature = "triple-store")]
2159        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2160            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2161            sections.push(Box::new(rdf));
2162        }
2163
2164        sections
2165    }
2166
2167    // =========================================================================
2168    // Backup API
2169    // =========================================================================
2170
2171    /// Creates a full backup of the database in the given directory.
2172    ///
2173    /// Checkpoints the database, copies the `.grafeo` file, and creates a
2174    /// backup manifest. Subsequent incremental backups will use this as the
2175    /// base.
2176    ///
2177    /// # Errors
2178    ///
2179    /// Returns an error if the database has no file manager or I/O fails.
2180    #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2181    pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
2182        let fm = self
2183            .file_manager
2184            .as_ref()
2185            .ok_or_else(|| Error::Internal("backup requires a persistent database".to_string()))?;
2186
2187        // Checkpoint first to ensure the container has latest data
2188        let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2189
2190        let current_epoch = self.transaction_manager.current_epoch();
2191        backup::do_backup_full(backup_dir, fm.path(), self.wal.as_deref(), current_epoch)
2192    }
2193
2194    /// Creates an incremental backup containing WAL records since the last backup.
2195    ///
2196    /// Requires a prior full backup in the backup directory.
2197    ///
2198    /// # Errors
2199    ///
2200    /// Returns an error if no full backup exists, or if the WAL has no new records.
2201    #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2202    pub fn backup_incremental(
2203        &self,
2204        backup_dir: &std::path::Path,
2205    ) -> Result<backup::BackupSegment> {
2206        let wal = self
2207            .wal
2208            .as_ref()
2209            .ok_or_else(|| Error::Internal("incremental backup requires WAL".to_string()))?;
2210
2211        let current_epoch = self.transaction_manager.current_epoch();
2212        backup::do_backup_incremental(backup_dir, wal, current_epoch)
2213    }
2214
2215    /// Returns the backup manifest for a backup directory, if one exists.
2216    ///
2217    /// # Errors
2218    ///
2219    /// Returns an error if the manifest file exists but cannot be parsed.
2220    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2221    pub fn read_backup_manifest(
2222        backup_dir: &std::path::Path,
2223    ) -> Result<Option<backup::BackupManifest>> {
2224        backup::read_manifest(backup_dir)
2225    }
2226
2227    /// Returns the current backup cursor (last backed-up position), if any.
2228    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2229    #[must_use]
2230    pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
2231        self.wal
2232            .as_ref()
2233            .and_then(|wal| backup::read_backup_cursor(wal.dir()).ok().flatten())
2234    }
2235
2236    /// Restores a database from a backup chain to a specific epoch.
2237    ///
2238    /// Copies the full backup to `output_path`, then replays incremental
2239    /// WAL segments up to `target_epoch`. The restored database can be
2240    /// opened with [`GrafeoDB::open`].
2241    ///
2242    /// # Errors
2243    ///
2244    /// Returns an error if the backup chain does not cover the target epoch,
2245    /// segment checksums fail, or I/O fails.
2246    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2247    pub fn restore_to_epoch(
2248        backup_dir: &std::path::Path,
2249        target_epoch: grafeo_common::types::EpochId,
2250        output_path: &std::path::Path,
2251    ) -> Result<()> {
2252        backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
2253    }
2254
2255    /// Writes the current database state to the `.grafeo` file using the unified flush.
2256    ///
2257    /// Does NOT remove the sidecar WAL: callers that want to clean up
2258    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
2259    /// separately after this returns.
2260    #[cfg(feature = "grafeo-file")]
2261    fn checkpoint_to_file(
2262        &self,
2263        fm: &GrafeoFileManager,
2264        reason: flush::FlushReason,
2265    ) -> Result<flush::FlushResult> {
2266        let sections = self.build_sections();
2267        let section_refs: Vec<&dyn grafeo_common::storage::Section> =
2268            sections.iter().map(|s| s.as_ref()).collect();
2269        #[cfg(feature = "lpg")]
2270        let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
2271        #[cfg(not(feature = "lpg"))]
2272        let context = flush::build_context_minimal(&self.transaction_manager);
2273
2274        flush::flush(
2275            fm,
2276            &section_refs,
2277            &context,
2278            reason,
2279            #[cfg(feature = "wal")]
2280            self.wal.as_deref(),
2281        )
2282    }
2283
2284    /// Returns the file manager if using single-file format.
2285    #[cfg(feature = "grafeo-file")]
2286    #[must_use]
2287    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
2288        self.file_manager.as_ref()
2289    }
2290}
2291
2292impl Drop for GrafeoDB {
2293    fn drop(&mut self) {
2294        if let Err(e) = self.close() {
2295            grafeo_error!("Error closing database: {}", e);
2296        }
2297    }
2298}
2299
/// Exposes the database's admin operations through [`crate::admin::AdminService`].
///
/// Each trait method forwards to the same-named method on `GrafeoDB`
/// (presumably the inherent methods from the `admin` submodule; inherent
/// associated functions take precedence over trait methods in path resolution).
#[cfg(feature = "lpg")]
impl crate::admin::AdminService for GrafeoDB {
    fn info(&self) -> crate::admin::DatabaseInfo {
        Self::info(self)
    }

    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        Self::detailed_stats(self)
    }

    fn schema(&self) -> crate::admin::SchemaInfo {
        Self::schema(self)
    }

    fn validate(&self) -> crate::admin::ValidationResult {
        Self::validate(self)
    }

    fn wal_status(&self) -> crate::admin::WalStatus {
        Self::wal_status(self)
    }

    fn wal_checkpoint(&self) -> Result<()> {
        Self::wal_checkpoint(self)
    }
}
2326
2327// =========================================================================
2328// Query Result Types
2329// =========================================================================
2330
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// Besides the tabular data, a result carries optional execution metrics, an
/// optional status message (used by DDL and session commands) and a GQLSTATUS
/// code.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// Constructors that are not given explicit types fill this with
    /// `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    ///
    /// Crate-private: outside the crate, use [`rows()`](Self::rows) for borrowed
    /// access or [`into_rows()`](Self::into_rows) to take ownership.
    pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
2376
2377impl QueryResult {
2378    /// Creates a fully empty query result (no columns, no rows).
2379    #[must_use]
2380    pub fn empty() -> Self {
2381        Self {
2382            columns: Vec::new(),
2383            column_types: Vec::new(),
2384            rows: Vec::new(),
2385            execution_time_ms: None,
2386            rows_scanned: None,
2387            status_message: None,
2388            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2389        }
2390    }
2391
2392    /// Creates a query result with only a status message (for DDL commands).
2393    #[must_use]
2394    pub fn status(msg: impl Into<String>) -> Self {
2395        Self {
2396            columns: Vec::new(),
2397            column_types: Vec::new(),
2398            rows: Vec::new(),
2399            execution_time_ms: None,
2400            rows_scanned: None,
2401            status_message: Some(msg.into()),
2402            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2403        }
2404    }
2405
2406    /// Creates a new empty query result.
2407    #[must_use]
2408    pub fn new(columns: Vec<String>) -> Self {
2409        let len = columns.len();
2410        Self {
2411            columns,
2412            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2413            rows: Vec::new(),
2414            execution_time_ms: None,
2415            rows_scanned: None,
2416            status_message: None,
2417            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2418        }
2419    }
2420
2421    /// Creates a new empty query result with column types.
2422    #[must_use]
2423    pub fn with_types(
2424        columns: Vec<String>,
2425        column_types: Vec<grafeo_common::types::LogicalType>,
2426    ) -> Self {
2427        Self {
2428            columns,
2429            column_types,
2430            rows: Vec::new(),
2431            execution_time_ms: None,
2432            rows_scanned: None,
2433            status_message: None,
2434            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2435        }
2436    }
2437
2438    /// Creates a query result with pre-populated rows.
2439    #[must_use]
2440    pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
2441        let len = columns.len();
2442        Self {
2443            columns,
2444            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2445            rows,
2446            execution_time_ms: None,
2447            rows_scanned: None,
2448            status_message: None,
2449            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2450        }
2451    }
2452
2453    /// Appends a row to this result.
2454    pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
2455        self.rows.push(row);
2456    }
2457
2458    /// Sets the execution metrics on this result.
2459    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2460        self.execution_time_ms = Some(execution_time_ms);
2461        self.rows_scanned = Some(rows_scanned);
2462        self
2463    }
2464
2465    /// Returns the execution time in milliseconds, if available.
2466    #[must_use]
2467    pub fn execution_time_ms(&self) -> Option<f64> {
2468        self.execution_time_ms
2469    }
2470
2471    /// Returns the number of rows scanned, if available.
2472    #[must_use]
2473    pub fn rows_scanned(&self) -> Option<u64> {
2474        self.rows_scanned
2475    }
2476
2477    /// Returns the number of rows.
2478    #[must_use]
2479    pub fn row_count(&self) -> usize {
2480        self.rows.len()
2481    }
2482
2483    /// Returns the number of columns.
2484    #[must_use]
2485    pub fn column_count(&self) -> usize {
2486        self.columns.len()
2487    }
2488
2489    /// Returns true if the result is empty.
2490    #[must_use]
2491    pub fn is_empty(&self) -> bool {
2492        self.rows.is_empty()
2493    }
2494
2495    /// Extracts a single value from the result.
2496    ///
2497    /// Use this when your query returns exactly one row with one column,
2498    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
2499    ///
2500    /// # Errors
2501    ///
2502    /// Returns an error if the result has multiple rows or columns.
2503    pub fn scalar<T: FromValue>(&self) -> Result<T> {
2504        if self.rows.len() != 1 || self.columns.len() != 1 {
2505            return Err(grafeo_common::utils::error::Error::InvalidValue(
2506                "Expected single value".to_string(),
2507            ));
2508        }
2509        T::from_value(&self.rows[0][0])
2510    }
2511
2512    /// Returns a slice of all result rows.
2513    #[must_use]
2514    pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
2515        &self.rows
2516    }
2517
2518    /// Takes ownership of all result rows.
2519    #[must_use]
2520    pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
2521        self.rows
2522    }
2523
2524    /// Returns an iterator over the rows.
2525    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2526        self.rows.iter()
2527    }
2528
2529    /// Converts this query result to an Arrow [`RecordBatch`](arrow_array::RecordBatch).
2530    ///
2531    /// Each column in the result becomes an Arrow array. Type mapping:
2532    /// - `Int64` / `Float64` / `Bool` / `String` / `Bytes`: direct Arrow equivalents
2533    /// - `Timestamp` / `ZonedDatetime`: `Timestamp(Microsecond, UTC)`
2534    /// - `Date`: `Date32`, `Time`: `Time64(Nanosecond)`
2535    /// - `Vector`: `FixedSizeList(Float32, dim)`
2536    /// - `Duration` / `List` / `Map` / `Path`: serialized as `Utf8`
2537    ///
2538    /// Heterogeneous columns (mixed types) fall back to `Utf8`.
2539    ///
2540    /// # Errors
2541    ///
2542    /// Returns [`ArrowExportError`](arrow::ArrowExportError) if Arrow array construction fails.
2543    #[cfg(feature = "arrow-export")]
2544    pub fn to_record_batch(
2545        &self,
2546    ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
2547        arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
2548    }
2549
2550    /// Serializes this query result as Arrow IPC stream bytes.
2551    ///
2552    /// The returned bytes can be read by any Arrow implementation:
2553    /// - Python: `pyarrow.ipc.open_stream(buf).read_all()`
2554    /// - Polars: `pl.read_ipc(buf)`
2555    /// - Node.js: `apache-arrow` `RecordBatchStreamReader`
2556    ///
2557    /// # Errors
2558    ///
2559    /// Returns [`ArrowExportError`](arrow::ArrowExportError) on conversion or serialization failure.
2560    #[cfg(feature = "arrow-export")]
2561    pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
2562        let batch = self.to_record_batch()?;
2563        arrow::record_batch_to_ipc_stream(&batch)
2564    }
2565}
2566
2567impl std::fmt::Display for QueryResult {
2568    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2569        let table = grafeo_common::fmt::format_result_table(
2570            &self.columns,
2571            &self.rows,
2572            self.execution_time_ms,
2573            self.status_message.as_deref(),
2574        );
2575        f.write_str(&table)
2576    }
2577}
2578
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// The conversion borrows the value; implementations return an owned `Self`.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// # Errors
    ///
    /// Returns `Error::TypeMismatch` if the value is not the expected type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2591
2592impl FromValue for i64 {
2593    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2594        value
2595            .as_int64()
2596            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2597                expected: "INT64".to_string(),
2598                found: value.type_name().to_string(),
2599            })
2600    }
2601}
2602
2603impl FromValue for f64 {
2604    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2605        value
2606            .as_float64()
2607            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2608                expected: "FLOAT64".to_string(),
2609                found: value.type_name().to_string(),
2610            })
2611    }
2612}
2613
2614impl FromValue for String {
2615    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2616        value.as_str().map(String::from).ok_or_else(|| {
2617            grafeo_common::utils::error::Error::TypeMismatch {
2618                expected: "STRING".to_string(),
2619                found: value.type_name().to_string(),
2620            }
2621        })
2622    }
2623}
2624
2625impl FromValue for bool {
2626    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2627        value
2628            .as_bool()
2629            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2630                expected: "BOOL".to_string(),
2631                found: value.type_name().to_string(),
2632            })
2633    }
2634}
2635
2636#[cfg(test)]
2637mod tests {
2638    use super::*;
2639
2640    #[test]
2641    fn test_create_in_memory_database() {
2642        let db = GrafeoDB::new_in_memory();
2643        assert_eq!(db.node_count(), 0);
2644        assert_eq!(db.edge_count(), 0);
2645    }
2646
2647    #[test]
2648    fn test_database_config() {
2649        let config = Config::in_memory().with_threads(4).with_query_logging();
2650
2651        let db = GrafeoDB::with_config(config).unwrap();
2652        assert_eq!(db.config().threads, 4);
2653        assert!(db.config().query_logging);
2654    }
2655
2656    #[test]
2657    fn test_database_session() {
2658        let db = GrafeoDB::new_in_memory();
2659        let _session = db.session();
2660        // Session should be created successfully
2661    }
2662
2663    #[cfg(feature = "wal")]
2664    #[test]
2665    fn test_persistent_database_recovery() {
2666        use grafeo_common::types::Value;
2667        use tempfile::tempdir;
2668
2669        let dir = tempdir().unwrap();
2670        let db_path = dir.path().join("test_db");
2671
2672        // Create database and add some data
2673        {
2674            let db = GrafeoDB::open(&db_path).unwrap();
2675
2676            let alix = db.create_node(&["Person"]);
2677            db.set_node_property(alix, "name", Value::from("Alix"));
2678
2679            let gus = db.create_node(&["Person"]);
2680            db.set_node_property(gus, "name", Value::from("Gus"));
2681
2682            let _edge = db.create_edge(alix, gus, "KNOWS");
2683
2684            // Explicitly close to flush WAL
2685            db.close().unwrap();
2686        }
2687
2688        // Reopen and verify data was recovered
2689        {
2690            let db = GrafeoDB::open(&db_path).unwrap();
2691
2692            assert_eq!(db.node_count(), 2);
2693            assert_eq!(db.edge_count(), 1);
2694
2695            // Verify nodes exist
2696            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2697            assert!(node0.is_some());
2698
2699            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2700            assert!(node1.is_some());
2701        }
2702    }
2703
2704    #[cfg(feature = "wal")]
2705    #[test]
2706    fn test_wal_logging() {
2707        use tempfile::tempdir;
2708
2709        let dir = tempdir().unwrap();
2710        let db_path = dir.path().join("wal_test_db");
2711
2712        let db = GrafeoDB::open(&db_path).unwrap();
2713
2714        // Create some data
2715        let node = db.create_node(&["Test"]);
2716        db.delete_node(node);
2717
2718        // WAL should have records
2719        if let Some(wal) = db.wal() {
2720            assert!(wal.record_count() > 0);
2721        }
2722
2723        db.close().unwrap();
2724    }
2725
2726    #[cfg(feature = "wal")]
2727    #[test]
2728    fn test_wal_recovery_multiple_sessions() {
2729        // Tests that WAL recovery works correctly across multiple open/close cycles
2730        use grafeo_common::types::Value;
2731        use tempfile::tempdir;
2732
2733        let dir = tempdir().unwrap();
2734        let db_path = dir.path().join("multi_session_db");
2735
2736        // Session 1: Create initial data
2737        {
2738            let db = GrafeoDB::open(&db_path).unwrap();
2739            let alix = db.create_node(&["Person"]);
2740            db.set_node_property(alix, "name", Value::from("Alix"));
2741            db.close().unwrap();
2742        }
2743
2744        // Session 2: Add more data
2745        {
2746            let db = GrafeoDB::open(&db_path).unwrap();
2747            assert_eq!(db.node_count(), 1); // Previous data recovered
2748            let gus = db.create_node(&["Person"]);
2749            db.set_node_property(gus, "name", Value::from("Gus"));
2750            db.close().unwrap();
2751        }
2752
2753        // Session 3: Verify all data
2754        {
2755            let db = GrafeoDB::open(&db_path).unwrap();
2756            assert_eq!(db.node_count(), 2);
2757
2758            // Verify properties were recovered correctly
2759            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
2760            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
2761
2762            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
2763            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
2764        }
2765    }
2766
2767    #[cfg(feature = "wal")]
2768    #[test]
2769    fn test_database_consistency_after_mutations() {
2770        // Tests that database remains consistent after a series of create/delete operations
2771        use grafeo_common::types::Value;
2772        use tempfile::tempdir;
2773
2774        let dir = tempdir().unwrap();
2775        let db_path = dir.path().join("consistency_db");
2776
2777        {
2778            let db = GrafeoDB::open(&db_path).unwrap();
2779
2780            // Create nodes
2781            let a = db.create_node(&["Node"]);
2782            let b = db.create_node(&["Node"]);
2783            let c = db.create_node(&["Node"]);
2784
2785            // Create edges
2786            let e1 = db.create_edge(a, b, "LINKS");
2787            let _e2 = db.create_edge(b, c, "LINKS");
2788
2789            // Delete middle node and its edge
2790            db.delete_edge(e1);
2791            db.delete_node(b);
2792
2793            // Set properties on remaining nodes
2794            db.set_node_property(a, "value", Value::Int64(1));
2795            db.set_node_property(c, "value", Value::Int64(3));
2796
2797            db.close().unwrap();
2798        }
2799
2800        // Reopen and verify consistency
2801        {
2802            let db = GrafeoDB::open(&db_path).unwrap();
2803
2804            // Should have 2 nodes (a and c), b was deleted
2805            // Note: node_count includes deleted nodes in some implementations
2806            // What matters is that the non-deleted nodes are accessible
2807            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
2808            assert!(node_a.is_some());
2809
2810            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
2811            assert!(node_c.is_some());
2812
2813            // Middle node should be deleted
2814            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
2815            assert!(node_b.is_none());
2816        }
2817    }
2818
2819    #[cfg(feature = "wal")]
2820    #[test]
2821    fn test_close_is_idempotent() {
2822        // Calling close() multiple times should not cause errors
2823        use tempfile::tempdir;
2824
2825        let dir = tempdir().unwrap();
2826        let db_path = dir.path().join("close_test_db");
2827
2828        let db = GrafeoDB::open(&db_path).unwrap();
2829        db.create_node(&["Test"]);
2830
2831        // First close should succeed
2832        assert!(db.close().is_ok());
2833
2834        // Second close should also succeed (idempotent)
2835        assert!(db.close().is_ok());
2836    }
2837
2838    #[test]
2839    fn test_with_store_external_backend() {
2840        use grafeo_core::graph::lpg::LpgStore;
2841
2842        let external = Arc::new(LpgStore::new().unwrap());
2843
2844        // Seed data on the external store directly
2845        let n1 = external.create_node(&["Person"]);
2846        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
2847
2848        let db = GrafeoDB::with_store(
2849            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
2850            Config::in_memory(),
2851        )
2852        .unwrap();
2853
2854        let session = db.session();
2855
2856        // Session should see data from the external store via execute
2857        #[cfg(feature = "gql")]
2858        {
2859            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
2860            assert_eq!(result.rows.len(), 1);
2861        }
2862    }
2863
2864    #[test]
2865    fn test_with_config_custom_memory_limit() {
2866        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB
2867
2868        let db = GrafeoDB::with_config(config).unwrap();
2869        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
2870        assert_eq!(db.node_count(), 0);
2871    }
2872
2873    #[cfg(feature = "metrics")]
2874    #[test]
2875    fn test_database_metrics_registry() {
2876        let db = GrafeoDB::new_in_memory();
2877
2878        // Perform some operations
2879        db.create_node(&["Person"]);
2880        db.create_node(&["Person"]);
2881
2882        // Check that metrics snapshot returns data
2883        let snap = db.metrics();
2884        // Session created counter should reflect at least 0 (metrics is initialized)
2885        assert_eq!(snap.query_count, 0); // No queries executed yet
2886    }
2887
2888    #[test]
2889    fn test_query_result_has_metrics() {
2890        // Verifies that query results include execution metrics
2891        let db = GrafeoDB::new_in_memory();
2892        db.create_node(&["Person"]);
2893        db.create_node(&["Person"]);
2894
2895        #[cfg(feature = "gql")]
2896        {
2897            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
2898
2899            // Metrics should be populated
2900            assert!(result.execution_time_ms.is_some());
2901            assert!(result.rows_scanned.is_some());
2902            assert!(result.execution_time_ms.unwrap() >= 0.0);
2903            assert_eq!(result.rows_scanned.unwrap(), 2);
2904        }
2905    }
2906
2907    #[test]
2908    fn test_empty_query_result_metrics() {
2909        // Verifies metrics are correct for queries returning no results
2910        let db = GrafeoDB::new_in_memory();
2911        db.create_node(&["Person"]);
2912
2913        #[cfg(feature = "gql")]
2914        {
2915            // Query that matches nothing
2916            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
2917
2918            assert!(result.execution_time_ms.is_some());
2919            assert!(result.rows_scanned.is_some());
2920            assert_eq!(result.rows_scanned.unwrap(), 0);
2921        }
2922    }
2923
2924    #[cfg(feature = "cdc")]
2925    mod cdc_integration {
2926        use super::*;
2927
2928        /// Helper: creates an in-memory database with CDC enabled.
2929        fn cdc_db() -> GrafeoDB {
2930            GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
2931        }
2932
2933        #[test]
2934        fn test_node_lifecycle_history() {
2935            let db = cdc_db();
2936
2937            // Create
2938            let id = db.create_node(&["Person"]);
2939            // Update
2940            db.set_node_property(id, "name", "Alix".into());
2941            db.set_node_property(id, "name", "Gus".into());
2942            // Delete
2943            db.delete_node(id);
2944
2945            let history = db.history(id).unwrap();
2946            assert_eq!(history.len(), 4); // create + 2 updates + delete
2947            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2948            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
2949            assert!(history[1].before.is_none()); // first set_node_property has no prior value
2950            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
2951            assert!(history[2].before.is_some()); // second update has prior "Alix"
2952            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
2953        }
2954
2955        #[test]
2956        fn test_edge_lifecycle_history() {
2957            let db = cdc_db();
2958
2959            let alix = db.create_node(&["Person"]);
2960            let gus = db.create_node(&["Person"]);
2961            let edge = db.create_edge(alix, gus, "KNOWS");
2962            db.set_edge_property(edge, "since", 2024i64.into());
2963            db.delete_edge(edge);
2964
2965            let history = db.history(edge).unwrap();
2966            assert_eq!(history.len(), 3); // create + update + delete
2967            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2968            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
2969            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
2970        }
2971
2972        #[test]
2973        fn test_create_node_with_props_cdc() {
2974            let db = cdc_db();
2975
2976            let id = db.create_node_with_props(
2977                &["Person"],
2978                vec![
2979                    ("name", grafeo_common::types::Value::from("Alix")),
2980                    ("age", grafeo_common::types::Value::from(30i64)),
2981                ],
2982            );
2983
2984            let history = db.history(id).unwrap();
2985            assert_eq!(history.len(), 1);
2986            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2987            // Props should be captured
2988            let after = history[0].after.as_ref().unwrap();
2989            assert_eq!(after.len(), 2);
2990        }
2991
2992        #[test]
2993        fn test_changes_between() {
2994            let db = cdc_db();
2995
2996            let id1 = db.create_node(&["A"]);
2997            let _id2 = db.create_node(&["B"]);
2998            db.set_node_property(id1, "x", 1i64.into());
2999
3000            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
3001            let changes = db
3002                .changes_between(
3003                    grafeo_common::types::EpochId(0),
3004                    grafeo_common::types::EpochId(u64::MAX),
3005                )
3006                .unwrap();
3007            assert_eq!(changes.len(), 3); // 2 creates + 1 update
3008        }
3009
3010        #[test]
3011        fn test_cdc_disabled_by_default() {
3012            let db = GrafeoDB::new_in_memory();
3013            assert!(!db.is_cdc_enabled());
3014
3015            let id = db.create_node(&["Person"]);
3016            db.set_node_property(id, "name", "Alix".into());
3017
3018            let history = db.history(id).unwrap();
3019            assert!(history.is_empty(), "CDC off by default: no events recorded");
3020        }
3021
3022        #[test]
3023        fn test_session_with_cdc_override_on() {
3024            // Database default is off, but session opts in
3025            let db = GrafeoDB::new_in_memory();
3026            let session = db.session_with_cdc(true);
3027            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3028            // The CDC log should have events from the opted-in session
3029            let changes = db
3030                .changes_between(
3031                    grafeo_common::types::EpochId(0),
3032                    grafeo_common::types::EpochId(u64::MAX),
3033                )
3034                .unwrap();
3035            assert!(
3036                !changes.is_empty(),
3037                "session_with_cdc(true) should record events"
3038            );
3039        }
3040
3041        #[test]
3042        fn test_session_with_cdc_override_off() {
3043            // Database default is on, but session opts out
3044            let db = cdc_db();
3045            let session = db.session_with_cdc(false);
3046            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3047            let changes = db
3048                .changes_between(
3049                    grafeo_common::types::EpochId(0),
3050                    grafeo_common::types::EpochId(u64::MAX),
3051                )
3052                .unwrap();
3053            assert!(
3054                changes.is_empty(),
3055                "session_with_cdc(false) should not record events"
3056            );
3057        }
3058
3059        #[test]
3060        fn test_set_cdc_enabled_runtime() {
3061            let db = GrafeoDB::new_in_memory();
3062            assert!(!db.is_cdc_enabled());
3063
3064            // Enable at runtime
3065            db.set_cdc_enabled(true);
3066            assert!(db.is_cdc_enabled());
3067
3068            let id = db.create_node(&["Person"]);
3069            let history = db.history(id).unwrap();
3070            assert_eq!(history.len(), 1, "CDC enabled at runtime records events");
3071
3072            // Disable again
3073            db.set_cdc_enabled(false);
3074            let id2 = db.create_node(&["Person"]);
3075            let history2 = db.history(id2).unwrap();
3076            assert!(
3077                history2.is_empty(),
3078                "CDC disabled at runtime stops recording"
3079            );
3080        }
3081    }
3082
3083    #[test]
3084    fn test_with_store_basic() {
3085        use grafeo_core::graph::lpg::LpgStore;
3086
3087        let store = Arc::new(LpgStore::new().unwrap());
3088        let n1 = store.create_node(&["Person"]);
3089        store.set_node_property(n1, "name", "Alix".into());
3090
3091        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3092        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3093
3094        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
3095        assert_eq!(result.rows.len(), 1);
3096    }
3097
3098    #[test]
3099    fn test_with_store_session() {
3100        use grafeo_core::graph::lpg::LpgStore;
3101
3102        let store = Arc::new(LpgStore::new().unwrap());
3103        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3104        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3105
3106        let session = db.session();
3107        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3108        assert_eq!(result.rows.len(), 1);
3109    }
3110
3111    #[test]
3112    fn test_with_store_mutations() {
3113        use grafeo_core::graph::lpg::LpgStore;
3114
3115        let store = Arc::new(LpgStore::new().unwrap());
3116        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3117        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3118
3119        let mut session = db.session();
3120
3121        // Use an explicit transaction so INSERT and MATCH share the same
3122        // transaction context. With PENDING epochs, uncommitted versions are
3123        // only visible to the owning transaction.
3124        session.begin_transaction().unwrap();
3125        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3126
3127        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3128        assert_eq!(result.rows.len(), 1);
3129
3130        session.commit().unwrap();
3131    }
3132
3133    // =========================================================================
3134    // QueryResult tests
3135    // =========================================================================
3136
3137    #[test]
3138    fn test_query_result_empty() {
3139        let result = QueryResult::empty();
3140        assert!(result.is_empty());
3141        assert_eq!(result.row_count(), 0);
3142        assert_eq!(result.column_count(), 0);
3143        assert!(result.execution_time_ms().is_none());
3144        assert!(result.rows_scanned().is_none());
3145        assert!(result.status_message.is_none());
3146    }
3147
3148    #[test]
3149    fn test_query_result_status() {
3150        let result = QueryResult::status("Created node type 'Person'");
3151        assert!(result.is_empty());
3152        assert_eq!(result.column_count(), 0);
3153        assert_eq!(
3154            result.status_message.as_deref(),
3155            Some("Created node type 'Person'")
3156        );
3157    }
3158
3159    #[test]
3160    fn test_query_result_new_with_columns() {
3161        let result = QueryResult::new(vec!["name".into(), "age".into()]);
3162        assert_eq!(result.column_count(), 2);
3163        assert_eq!(result.row_count(), 0);
3164        assert!(result.is_empty());
3165        // Column types should default to Any
3166        assert_eq!(
3167            result.column_types,
3168            vec![
3169                grafeo_common::types::LogicalType::Any,
3170                grafeo_common::types::LogicalType::Any
3171            ]
3172        );
3173    }
3174
3175    #[test]
3176    fn test_query_result_with_types() {
3177        use grafeo_common::types::LogicalType;
3178        let result = QueryResult::with_types(
3179            vec!["name".into(), "age".into()],
3180            vec![LogicalType::String, LogicalType::Int64],
3181        );
3182        assert_eq!(result.column_count(), 2);
3183        assert_eq!(result.column_types[0], LogicalType::String);
3184        assert_eq!(result.column_types[1], LogicalType::Int64);
3185    }
3186
3187    #[test]
3188    fn test_query_result_with_metrics() {
3189        let result = QueryResult::new(vec!["x".into()]).with_metrics(42.5, 100);
3190        assert_eq!(result.execution_time_ms(), Some(42.5));
3191        assert_eq!(result.rows_scanned(), Some(100));
3192    }
3193
3194    #[test]
3195    fn test_query_result_scalar_success() {
3196        use grafeo_common::types::Value;
3197        let mut result = QueryResult::new(vec!["count".into()]);
3198        result.rows.push(vec![Value::Int64(42)]);
3199
3200        let val: i64 = result.scalar().unwrap();
3201        assert_eq!(val, 42);
3202    }
3203
3204    #[test]
3205    fn test_query_result_scalar_wrong_shape() {
3206        use grafeo_common::types::Value;
3207        // Multiple rows
3208        let mut result = QueryResult::new(vec!["x".into()]);
3209        result.rows.push(vec![Value::Int64(1)]);
3210        result.rows.push(vec![Value::Int64(2)]);
3211        assert!(result.scalar::<i64>().is_err());
3212
3213        // Multiple columns
3214        let mut result2 = QueryResult::new(vec!["a".into(), "b".into()]);
3215        result2.rows.push(vec![Value::Int64(1), Value::Int64(2)]);
3216        assert!(result2.scalar::<i64>().is_err());
3217
3218        // Empty
3219        let result3 = QueryResult::new(vec!["x".into()]);
3220        assert!(result3.scalar::<i64>().is_err());
3221    }
3222
3223    #[test]
3224    fn test_query_result_iter() {
3225        use grafeo_common::types::Value;
3226        let mut result = QueryResult::new(vec!["x".into()]);
3227        result.rows.push(vec![Value::Int64(1)]);
3228        result.rows.push(vec![Value::Int64(2)]);
3229
3230        let collected: Vec<_> = result.iter().collect();
3231        assert_eq!(collected.len(), 2);
3232    }
3233
3234    #[test]
3235    fn test_query_result_display() {
3236        use grafeo_common::types::Value;
3237        let mut result = QueryResult::new(vec!["name".into()]);
3238        result.rows.push(vec![Value::from("Alix")]);
3239        let display = result.to_string();
3240        assert!(display.contains("name"));
3241        assert!(display.contains("Alix"));
3242    }
3243
3244    // =========================================================================
3245    // FromValue error paths
3246    // =========================================================================
3247
3248    #[test]
3249    fn test_from_value_i64_type_mismatch() {
3250        use grafeo_common::types::Value;
3251        let val = Value::from("not a number");
3252        assert!(i64::from_value(&val).is_err());
3253    }
3254
3255    #[test]
3256    fn test_from_value_f64_type_mismatch() {
3257        use grafeo_common::types::Value;
3258        let val = Value::from("not a float");
3259        assert!(f64::from_value(&val).is_err());
3260    }
3261
3262    #[test]
3263    fn test_from_value_string_type_mismatch() {
3264        use grafeo_common::types::Value;
3265        let val = Value::Int64(42);
3266        assert!(String::from_value(&val).is_err());
3267    }
3268
3269    #[test]
3270    fn test_from_value_bool_type_mismatch() {
3271        use grafeo_common::types::Value;
3272        let val = Value::Int64(1);
3273        assert!(bool::from_value(&val).is_err());
3274    }
3275
3276    #[test]
3277    fn test_from_value_all_success() {
3278        use grafeo_common::types::Value;
3279        assert_eq!(i64::from_value(&Value::Int64(99)).unwrap(), 99);
3280        assert!((f64::from_value(&Value::Float64(2.72)).unwrap() - 2.72).abs() < f64::EPSILON);
3281        assert_eq!(String::from_value(&Value::from("hello")).unwrap(), "hello");
3282        assert!(bool::from_value(&Value::Bool(true)).unwrap());
3283    }
3284
3285    // =========================================================================
3286    // GrafeoDB accessor tests
3287    // =========================================================================
3288
3289    #[test]
3290    fn test_database_is_read_only_false_by_default() {
3291        let db = GrafeoDB::new_in_memory();
3292        assert!(!db.is_read_only());
3293    }
3294
3295    #[test]
3296    fn test_database_graph_model() {
3297        let db = GrafeoDB::new_in_memory();
3298        assert_eq!(db.graph_model(), crate::config::GraphModel::Lpg);
3299    }
3300
3301    #[test]
3302    fn test_database_memory_limit_none_by_default() {
3303        let db = GrafeoDB::new_in_memory();
3304        assert!(db.memory_limit().is_none());
3305    }
3306
3307    #[test]
3308    fn test_database_memory_limit_custom() {
3309        let config = Config::in_memory().with_memory_limit(128 * 1024 * 1024);
3310        let db = GrafeoDB::with_config(config).unwrap();
3311        assert_eq!(db.memory_limit(), Some(128 * 1024 * 1024));
3312    }
3313
3314    #[test]
3315    fn test_database_adaptive_config() {
3316        let db = GrafeoDB::new_in_memory();
3317        let adaptive = db.adaptive_config();
3318        assert!(adaptive.enabled);
3319        assert!((adaptive.threshold - 3.0).abs() < f64::EPSILON);
3320    }
3321
3322    #[test]
3323    fn test_database_buffer_manager() {
3324        let db = GrafeoDB::new_in_memory();
3325        let _bm = db.buffer_manager();
3326        // Just verify it doesn't panic
3327    }
3328
3329    #[test]
3330    fn test_database_query_cache() {
3331        let db = GrafeoDB::new_in_memory();
3332        let _qc = db.query_cache();
3333    }
3334
3335    #[test]
3336    fn test_database_clear_plan_cache() {
3337        let db = GrafeoDB::new_in_memory();
3338        // Execute a query to populate the cache
3339        #[cfg(feature = "gql")]
3340        {
3341            let _ = db.execute("MATCH (n) RETURN count(n)");
3342        }
3343        db.clear_plan_cache();
3344        // No panic means success
3345    }
3346
3347    #[test]
3348    fn test_database_gc() {
3349        let db = GrafeoDB::new_in_memory();
3350        db.create_node(&["Person"]);
3351        db.gc();
3352        // Verify no panic, node still accessible
3353        assert_eq!(db.node_count(), 1);
3354    }
3355
3356    // =========================================================================
3357    // Named graph management
3358    // =========================================================================
3359
3360    #[test]
3361    fn test_create_and_list_graphs() {
3362        let db = GrafeoDB::new_in_memory();
3363        let created = db.create_graph("social").unwrap();
3364        assert!(created);
3365
3366        // Creating same graph again returns false
3367        let created_again = db.create_graph("social").unwrap();
3368        assert!(!created_again);
3369
3370        let names = db.list_graphs();
3371        assert!(names.contains(&"social".to_string()));
3372    }
3373
3374    #[test]
3375    fn test_drop_graph() {
3376        let db = GrafeoDB::new_in_memory();
3377        db.create_graph("temp").unwrap();
3378        assert!(db.drop_graph("temp"));
3379        assert!(!db.drop_graph("temp")); // Already dropped
3380    }
3381
3382    #[test]
3383    fn test_drop_graph_resets_current_graph() {
3384        let db = GrafeoDB::new_in_memory();
3385        db.create_graph("active").unwrap();
3386        db.set_current_graph(Some("active")).unwrap();
3387        assert_eq!(db.current_graph(), Some("active".to_string()));
3388
3389        db.drop_graph("active");
3390        assert_eq!(db.current_graph(), None);
3391    }
3392
3393    // =========================================================================
3394    // Current graph / schema context
3395    // =========================================================================
3396
3397    #[test]
3398    fn test_current_graph_default_none() {
3399        let db = GrafeoDB::new_in_memory();
3400        assert_eq!(db.current_graph(), None);
3401    }
3402
3403    #[test]
3404    fn test_set_current_graph_valid() {
3405        let db = GrafeoDB::new_in_memory();
3406        db.create_graph("social").unwrap();
3407        db.set_current_graph(Some("social")).unwrap();
3408        assert_eq!(db.current_graph(), Some("social".to_string()));
3409    }
3410
3411    #[test]
3412    fn test_set_current_graph_nonexistent() {
3413        let db = GrafeoDB::new_in_memory();
3414        let result = db.set_current_graph(Some("nonexistent"));
3415        assert!(result.is_err());
3416    }
3417
3418    #[test]
3419    fn test_set_current_graph_none_resets() {
3420        let db = GrafeoDB::new_in_memory();
3421        db.create_graph("social").unwrap();
3422        db.set_current_graph(Some("social")).unwrap();
3423        db.set_current_graph(None).unwrap();
3424        assert_eq!(db.current_graph(), None);
3425    }
3426
3427    #[test]
3428    fn test_set_current_graph_default_keyword() {
3429        let db = GrafeoDB::new_in_memory();
3430        // "default" is a special case that always succeeds
3431        db.set_current_graph(Some("default")).unwrap();
3432        assert_eq!(db.current_graph(), Some("default".to_string()));
3433    }
3434
3435    #[test]
3436    fn test_current_schema_default_none() {
3437        let db = GrafeoDB::new_in_memory();
3438        assert_eq!(db.current_schema(), None);
3439    }
3440
3441    #[test]
3442    fn test_set_current_schema_nonexistent() {
3443        let db = GrafeoDB::new_in_memory();
3444        let result = db.set_current_schema(Some("nonexistent"));
3445        assert!(result.is_err());
3446    }
3447
3448    #[test]
3449    fn test_set_current_schema_none_resets() {
3450        let db = GrafeoDB::new_in_memory();
3451        db.set_current_schema(None).unwrap();
3452        assert_eq!(db.current_schema(), None);
3453    }
3454
3455    // =========================================================================
3456    // graph_store / graph_store_mut
3457    // =========================================================================
3458
3459    #[test]
3460    fn test_graph_store_returns_lpg_by_default() {
3461        let db = GrafeoDB::new_in_memory();
3462        db.create_node(&["Person"]);
3463        let store = db.graph_store();
3464        assert_eq!(store.node_count(), 1);
3465    }
3466
3467    #[test]
3468    fn test_graph_store_mut_returns_some_by_default() {
3469        let db = GrafeoDB::new_in_memory();
3470        assert!(db.graph_store_mut().is_some());
3471    }
3472
3473    #[test]
3474    fn test_with_read_store() {
3475        use grafeo_core::graph::lpg::LpgStore;
3476
3477        let store = Arc::new(LpgStore::new().unwrap());
3478        store.create_node(&["Person"]);
3479
3480        let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
3481        let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3482
3483        assert!(db.is_read_only());
3484        assert!(db.graph_store_mut().is_none());
3485
3486        // Read queries should work
3487        let gs = db.graph_store();
3488        assert_eq!(gs.node_count(), 1);
3489    }
3490
3491    #[test]
3492    fn test_with_store_graph_store_methods() {
3493        use grafeo_core::graph::lpg::LpgStore;
3494
3495        let store = Arc::new(LpgStore::new().unwrap());
3496        store.create_node(&["Person"]);
3497
3498        let db = GrafeoDB::with_store(
3499            Arc::clone(&store) as Arc<dyn GraphStoreMut>,
3500            Config::in_memory(),
3501        )
3502        .unwrap();
3503
3504        assert!(!db.is_read_only());
3505        assert!(db.graph_store_mut().is_some());
3506        assert_eq!(db.graph_store().node_count(), 1);
3507    }
3508
3509    // =========================================================================
3510    // session_read_only
3511    // =========================================================================
3512
3513    #[test]
3514    #[allow(deprecated)]
3515    fn test_session_read_only() {
3516        let db = GrafeoDB::new_in_memory();
3517        db.create_node(&["Person"]);
3518
3519        let session = db.session_read_only();
3520        // Read queries should work
3521        #[cfg(feature = "gql")]
3522        {
3523            let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3524            assert_eq!(result.rows.len(), 1);
3525        }
3526    }
3527
3528    // =========================================================================
3529    // close on in-memory database
3530    // =========================================================================
3531
3532    #[test]
3533    fn test_close_in_memory_database() {
3534        let db = GrafeoDB::new_in_memory();
3535        db.create_node(&["Person"]);
3536        assert!(db.close().is_ok());
3537        // Second close should also be fine (idempotent)
3538        assert!(db.close().is_ok());
3539    }
3540
3541    // =========================================================================
3542    // with_config validation failure
3543    // =========================================================================
3544
3545    #[test]
3546    fn test_with_config_invalid_config_zero_threads() {
3547        let config = Config::in_memory().with_threads(0);
3548        let result = GrafeoDB::with_config(config);
3549        assert!(result.is_err());
3550    }
3551
3552    #[test]
3553    fn test_with_config_invalid_config_zero_memory_limit() {
3554        let config = Config::in_memory().with_memory_limit(0);
3555        let result = GrafeoDB::with_config(config);
3556        assert!(result.is_err());
3557    }
3558
3559    // =========================================================================
3560    // StorageFormat display (for config.rs coverage)
3561    // =========================================================================
3562
3563    #[test]
3564    fn test_storage_format_display() {
3565        use crate::config::StorageFormat;
3566        assert_eq!(StorageFormat::Auto.to_string(), "auto");
3567        assert_eq!(StorageFormat::WalDirectory.to_string(), "wal-directory");
3568        assert_eq!(StorageFormat::SingleFile.to_string(), "single-file");
3569    }
3570
3571    #[test]
3572    fn test_storage_format_default() {
3573        use crate::config::StorageFormat;
3574        assert_eq!(StorageFormat::default(), StorageFormat::Auto);
3575    }
3576
3577    #[test]
3578    fn test_config_with_storage_format() {
3579        use crate::config::StorageFormat;
3580        let config = Config::in_memory().with_storage_format(StorageFormat::SingleFile);
3581        assert_eq!(config.storage_format, StorageFormat::SingleFile);
3582    }
3583
3584    // =========================================================================
3585    // Config CDC
3586    // =========================================================================
3587
3588    #[test]
3589    fn test_config_with_cdc() {
3590        let config = Config::in_memory().with_cdc();
3591        assert!(config.cdc_enabled);
3592    }
3593
3594    #[test]
3595    fn test_config_cdc_default_false() {
3596        let config = Config::in_memory();
3597        assert!(!config.cdc_enabled);
3598    }
3599
3600    // =========================================================================
3601    // ConfigError as std::error::Error
3602    // =========================================================================
3603
3604    #[test]
3605    fn test_config_error_is_error_trait() {
3606        use crate::config::ConfigError;
3607        let err: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);
3608        assert!(err.source().is_none());
3609    }
3610
3611    // =========================================================================
3612    // Metrics tests
3613    // =========================================================================
3614
3615    #[cfg(feature = "metrics")]
3616    #[test]
3617    fn test_metrics_prometheus_output() {
3618        let db = GrafeoDB::new_in_memory();
3619        let prom = db.metrics_prometheus();
3620        // Should contain at least some metric names
3621        assert!(!prom.is_empty());
3622    }
3623
3624    #[cfg(feature = "metrics")]
3625    #[test]
3626    fn test_reset_metrics() {
3627        let db = GrafeoDB::new_in_memory();
3628        // Execute something to generate metrics
3629        let _session = db.session();
3630        db.reset_metrics();
3631        let snap = db.metrics();
3632        assert_eq!(snap.query_count, 0);
3633    }
3634
3635    // =========================================================================
3636    // drop_graph on external store
3637    // =========================================================================
3638
3639    #[test]
3640    fn test_drop_graph_on_external_store() {
3641        use grafeo_core::graph::lpg::LpgStore;
3642
3643        let store = Arc::new(LpgStore::new().unwrap());
3644        let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
3645        let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3646
3647        // drop_graph with external store (no built-in store) returns false
3648        assert!(!db.drop_graph("anything"));
3649    }
3650}