Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(feature = "lpg")]
31mod crud;
32#[cfg(feature = "embed")]
33mod embed;
34#[cfg(feature = "grafeo-file")]
35pub(crate) mod flush;
36#[cfg(feature = "lpg")]
37mod import;
38#[cfg(feature = "lpg")]
39mod index;
40#[cfg(feature = "lpg")]
41mod persistence;
42mod query;
43#[cfg(feature = "triple-store")]
44mod rdf_ops;
45#[cfg(feature = "lpg")]
46mod search;
47pub(crate) mod section_consumer;
48#[cfg(all(feature = "wal", feature = "lpg"))]
49pub(crate) mod wal_store;
50
51use grafeo_common::{grafeo_error, grafeo_warn};
52#[cfg(feature = "wal")]
53use std::path::Path;
54use std::sync::Arc;
55use std::sync::atomic::AtomicUsize;
56
57use parking_lot::RwLock;
58
59use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
60use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
61#[cfg(feature = "lpg")]
62use grafeo_core::graph::lpg::LpgStore;
63#[cfg(feature = "triple-store")]
64use grafeo_core::graph::rdf::RdfStore;
65use grafeo_core::graph::{GraphStore, GraphStoreMut};
66#[cfg(feature = "grafeo-file")]
67use grafeo_storage::file::GrafeoFileManager;
68#[cfg(all(feature = "wal", feature = "lpg"))]
69use grafeo_storage::wal::WalRecovery;
70#[cfg(feature = "wal")]
71use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
72
73use crate::catalog::Catalog;
74use crate::config::Config;
75use crate::query::cache::QueryCache;
76use crate::session::Session;
77use crate::transaction::TransactionManager;
78
79/// Your handle to a Grafeo database.
80///
81/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
82/// quick experiments, or [`open()`](Self::open) for persistent storage.
83/// Then grab a [`session()`](Self::session) to start querying.
84///
85/// # Examples
86///
87/// ```
88/// use grafeo_engine::GrafeoDB;
89///
90/// // Quick in-memory database
91/// let db = GrafeoDB::new_in_memory();
92///
93/// // Add some data
94/// db.create_node(&["Person"]);
95///
96/// // Query it
97/// let session = db.session();
98/// let result = session.execute("MATCH (p:Person) RETURN p")?;
99/// # Ok::<(), grafeo_common::utils::error::Error>(())
100/// ```
101pub struct GrafeoDB {
102    /// Database configuration.
103    pub(super) config: Config,
104    /// The underlying graph store (None when using an external store).
105    #[cfg(feature = "lpg")]
106    pub(super) store: Option<Arc<LpgStore>>,
107    /// Schema and metadata catalog shared across sessions.
108    pub(super) catalog: Arc<Catalog>,
109    /// RDF triple store (if RDF feature is enabled).
110    #[cfg(feature = "triple-store")]
111    pub(super) rdf_store: Arc<RdfStore>,
112    /// Transaction manager.
113    pub(super) transaction_manager: Arc<TransactionManager>,
114    /// Unified buffer manager.
115    pub(super) buffer_manager: Arc<BufferManager>,
116    /// Write-ahead log manager (if durability is enabled).
117    #[cfg(feature = "wal")]
118    pub(super) wal: Option<Arc<LpgWal>>,
119    /// Shared WAL graph context tracker. Tracks which named graph was last
120    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
121    /// records only when the context actually changes.
122    #[cfg(feature = "wal")]
123    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
124    /// Query cache for parsed and optimized plans.
125    pub(super) query_cache: Arc<QueryCache>,
126    /// Shared commit counter for auto-GC across sessions.
127    pub(super) commit_counter: Arc<AtomicUsize>,
128    /// Whether the database is open.
129    pub(super) is_open: RwLock<bool>,
130    /// Change data capture log for tracking mutations.
131    #[cfg(feature = "cdc")]
132    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
133    /// Whether CDC is active for new sessions and direct CRUD (runtime-mutable).
134    #[cfg(feature = "cdc")]
135    cdc_enabled: std::sync::atomic::AtomicBool,
136    /// Registered embedding models for text-to-vector conversion.
137    #[cfg(feature = "embed")]
138    pub(super) embedding_models:
139        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
140    /// Single-file database manager (when using `.grafeo` format).
141    #[cfg(feature = "grafeo-file")]
142    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
143    /// Periodic checkpoint timer (when `checkpoint_interval` is configured).
144    /// Wrapped in Mutex because `close()` takes `&self` but needs to stop the timer.
145    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
146    checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
147    /// Shared registry of spilled vector storages.
148    /// Used by the search path to create `SpillableVectorAccessor` instances.
149    #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
150    vector_spill_storages: Option<
151        Arc<
152            parking_lot::RwLock<
153                std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
154            >,
155        >,
156    >,
157    /// External read-only graph store (when using with_store() or with_read_store()).
158    /// When set, sessions route queries through this store instead of the built-in LpgStore.
159    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
160    /// External writable graph store (when using with_store()).
161    /// None for read-only databases created via with_read_store().
162    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
163    /// Metrics registry shared across all sessions.
164    #[cfg(feature = "metrics")]
165    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
166    /// Persistent graph context for one-shot `execute()` calls.
167    /// When set, each call to `session()` pre-configures the session to this graph.
168    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
169    current_graph: RwLock<Option<String>>,
170    /// Persistent schema context for one-shot `execute()` calls.
171    /// When set, each call to `session()` pre-configures the session to this schema.
172    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
173    current_schema: RwLock<Option<String>>,
174    /// Whether this database is open in read-only mode.
175    /// When true, sessions automatically enforce read-only transactions.
176    read_only: bool,
177}
178
179impl GrafeoDB {
180    /// Returns a reference to the built-in LPG store.
181    ///
182    /// # Panics
183    ///
184    /// Panics if the database was created with [`with_store()`](Self::with_store) or
185    /// [`with_read_store()`](Self::with_read_store), which use an external store
186    /// instead of the built-in LPG store.
187    #[cfg(feature = "lpg")]
188    fn lpg_store(&self) -> &Arc<LpgStore> {
189        self.store.as_ref().expect(
190            "no built-in LpgStore: this GrafeoDB was created with an external store \
191             (with_store / with_read_store). Use session() or graph_store() instead.",
192        )
193    }
194
195    /// Returns whether CDC is active (runtime check).
196    #[cfg(feature = "cdc")]
197    #[inline]
198    pub(super) fn cdc_active(&self) -> bool {
199        self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
200    }
201
202    /// Creates an in-memory database, fast to create, gone when dropped.
203    ///
204    /// Use this for tests, experiments, or when you don't need persistence.
205    /// For data that survives restarts, use [`open()`](Self::open) instead.
206    ///
207    /// # Panics
208    ///
209    /// Panics if the internal arena allocator cannot be initialized (out of memory).
210    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
211    ///
212    /// # Examples
213    ///
214    /// ```
215    /// use grafeo_engine::GrafeoDB;
216    ///
217    /// let db = GrafeoDB::new_in_memory();
218    /// let session = db.session();
219    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
220    /// # Ok::<(), grafeo_common::utils::error::Error>(())
221    /// ```
222    #[must_use]
223    pub fn new_in_memory() -> Self {
224        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
225    }
226
227    /// Opens a database at the given path, creating it if it doesn't exist.
228    ///
229    /// If you've used this path before, Grafeo recovers your data from the
230    /// write-ahead log automatically. First open on a new path creates an
231    /// empty database.
232    ///
233    /// # Errors
234    ///
235    /// Returns an error if the path isn't writable or recovery fails.
236    ///
237    /// # Examples
238    ///
239    /// ```no_run
240    /// use grafeo_engine::GrafeoDB;
241    ///
242    /// let db = GrafeoDB::open("./my_social_network")?;
243    /// # Ok::<(), grafeo_common::utils::error::Error>(())
244    /// ```
245    #[cfg(feature = "wal")]
246    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
247        Self::with_config(Config::persistent(path.as_ref()))
248    }
249
250    /// Opens an existing database in read-only mode.
251    ///
252    /// Uses a shared file lock, so multiple processes can read the same
253    /// `.grafeo` file concurrently. The database loads the last checkpoint
254    /// snapshot but does **not** replay the WAL or allow mutations.
255    ///
256    /// Currently only supports the single-file (`.grafeo`) format.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if the file doesn't exist or can't be read.
261    ///
262    /// # Examples
263    ///
264    /// ```no_run
265    /// use grafeo_engine::GrafeoDB;
266    ///
267    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
268    /// let session = db.session();
269    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
270    /// // Mutations will return an error:
271    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
272    /// # Ok::<(), grafeo_common::utils::error::Error>(())
273    /// ```
274    #[cfg(feature = "grafeo-file")]
275    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
276        Self::with_config(Config::read_only(path.as_ref()))
277    }
278
    /// Creates a database with custom configuration.
    ///
    /// Use this when you need fine-grained control over memory limits,
    /// thread counts, or persistence settings. For most cases,
    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
    /// are simpler.
    ///
    /// Construction happens in this order:
    ///
    /// 1. Validate `config`.
    /// 2. Create empty stores and managers.
    /// 3. Load any existing single-file (`.grafeo`) snapshot.
    /// 4. Replay the write-ahead log (skipped in read-only mode).
    /// 5. Assemble the struct.
    /// 6. Start post-construction work: register section consumers, start the
    ///    checkpoint timer, restore spill files, and handle a forced vector
    ///    spill.
    ///
    /// # Errors
    ///
    /// Returns an error if the database can't be created or recovery fails.
    /// This includes:
    ///
    /// - an invalid `config`;
    /// - read-only mode without a path, or without an existing `.grafeo` file;
    /// - a single-file path that exists but is not a file;
    /// - I/O failures while creating WAL directories.
    ///
    /// # Examples
    ///
    /// ```
    /// use grafeo_engine::{GrafeoDB, Config};
    ///
    /// // In-memory with a 512MB limit
    /// let config = Config::in_memory()
    ///     .with_memory_limit(512 * 1024 * 1024);
    ///
    /// let db = GrafeoDB::with_config(config)?;
    /// # Ok::<(), grafeo_common::utils::error::Error>(())
    /// ```
    pub fn with_config(config: Config) -> Result<Self> {
        // Validate configuration before proceeding
        config
            .validate()
            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

        #[cfg(feature = "lpg")]
        let store = Arc::new(LpgStore::new()?);
        #[cfg(feature = "triple-store")]
        let rdf_store = Arc::new(RdfStore::new());
        let transaction_manager = Arc::new(TransactionManager::new());

        // Create buffer manager with configured limits
        let buffer_config = BufferManagerConfig {
            // Without an explicit limit, budget 75% of detected system memory.
            budget: config.memory_limit.unwrap_or_else(|| {
                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
            }),
            // An explicit spill path wins. Otherwise spill into a sibling of the
            // database path named `<file name>.spill`. There is no spill path
            // when the database path has no parent, no file name, or a
            // non-UTF-8 file name.
            spill_path: config.spill_path.clone().or_else(|| {
                config.path.as_ref().and_then(|p| {
                    let parent = p.parent()?;
                    let name = p.file_name()?.to_str()?;
                    Some(parent.join(format!("{name}.spill")))
                })
            }),
            ..BufferManagerConfig::default()
        };
        let buffer_manager = BufferManager::new(buffer_config);

        // Create catalog early so WAL replay can restore schema definitions
        let catalog = Arc::new(Catalog::new());

        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;

        // --- Single-file format (.grafeo) ---
        // Evaluates to `Some(file manager)` once the file's existing contents
        // have been loaded into the stores: the snapshot, plus the sidecar WAL
        // in read-write mode. It is `None` when no single-file database is in use.
        #[cfg(feature = "grafeo-file")]
        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
            // Read-only mode: open with shared lock, load snapshot, skip WAL
            if let Some(ref db_path) = config.path {
                if db_path.exists() && db_path.is_file() {
                    let fm = GrafeoFileManager::open_read_only(db_path)?;
                    // Try v2 section-based format first
                    #[cfg(feature = "lpg")]
                    if fm.read_section_directory()?.is_some() {
                        Self::load_from_sections(
                            &fm,
                            &store,
                            &catalog,
                            #[cfg(feature = "triple-store")]
                            &rdf_store,
                        )?;
                    } else {
                        // Fall back to v1 blob format
                        let snapshot_data = fm.read_snapshot()?;
                        // Nothing to apply when the stored snapshot is empty.
                        if !snapshot_data.is_empty() {
                            Self::apply_snapshot_data(
                                &store,
                                &catalog,
                                #[cfg(feature = "triple-store")]
                                &rdf_store,
                                &snapshot_data,
                            )?;
                        }
                    }
                    Some(Arc::new(fm))
                } else {
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "read-only open requires an existing .grafeo file: {}",
                        db_path.display()
                    )));
                }
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "read-only mode requires a database path".to_string(),
                ));
            }
        } else if let Some(ref db_path) = config.path {
            // Initialize the file manager whenever single-file format is selected,
            // regardless of whether WAL is enabled. Without this, a database opened
            // with wal_enabled:false + StorageFormat::SingleFile would produce no
            // output at all (the file manager was previously gated behind wal_enabled).
            if Self::should_use_single_file(db_path, config.storage_format) {
                let fm = if db_path.exists() && db_path.is_file() {
                    GrafeoFileManager::open(db_path)?
                } else if !db_path.exists() {
                    GrafeoFileManager::create(db_path)?
                } else {
                    // Path exists but is not a file (directory, etc.)
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "path exists but is not a file: {}",
                        db_path.display()
                    )));
                };

                // Load data: try v2 section-based format, fall back to v1 blob
                #[cfg(feature = "lpg")]
                if fm.read_section_directory()?.is_some() {
                    Self::load_from_sections(
                        &fm,
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                    )?;
                } else {
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "triple-store")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }
                }

                // Recover sidecar WAL if WAL is enabled and a sidecar exists.
                // Its records are applied on top of the snapshot loaded above.
                #[cfg(all(feature = "wal", feature = "lpg"))]
                if config.wal_enabled && fm.has_sidecar_wal() {
                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                        &records,
                    )?;
                }

                Some(Arc::new(fm))
            } else {
                None
            }
        } else {
            None
        };

        // Determine whether to use the WAL directory path (legacy) or sidecar
        // Read-only mode skips WAL entirely (no recovery, no creation).
        #[cfg(feature = "wal")]
        let wal = if is_read_only {
            None
        } else if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                // When using single-file format, the WAL is a sidecar directory
                #[cfg(feature = "grafeo-file")]
                let wal_path = if let Some(ref fm) = file_manager {
                    let p = fm.sidecar_wal_path();
                    std::fs::create_dir_all(&p)?;
                    p
                } else {
                    // Legacy: WAL inside the database directory
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                #[cfg(not(feature = "grafeo-file"))]
                let wal_path = {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                // For legacy WAL directory format, check if WAL exists and recover.
                // A single-file database is skipped here because its sidecar WAL
                // was already replayed when the file manager was opened above.
                #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
                let is_single_file = file_manager.is_some();
                #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
                let is_single_file = false;

                #[cfg(feature = "lpg")]
                if !is_single_file && wal_path.exists() {
                    let recovery = WalRecovery::new(&wal_path);
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                        &records,
                    )?;
                }

                // Open/create WAL manager with configured durability.
                // The engine's durability enum is translated variant by variant
                // into the storage crate's equivalent.
                let wal_durability = match config.wal_durability {
                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                    crate::config::DurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    } => WalDurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    },
                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                        WalDurabilityMode::Adaptive { target_interval_ms }
                    }
                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
                };
                let wal_config = WalConfig {
                    durability: wal_durability,
                    ..WalConfig::default()
                };
                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
                Some(Arc::new(wal_manager))
            } else {
                // WAL requested, but no path is configured to write it to.
                None
            }
        } else {
            None
        };

        // Create query cache with default capacity (1000 queries)
        let query_cache = Arc::new(QueryCache::default());

        // After all snapshot/WAL recovery, sync TransactionManager epoch
        // with the store so queries use the correct viewing epoch.
        #[cfg(all(feature = "temporal", feature = "lpg"))]
        transaction_manager.sync_epoch(store.current_epoch());

        // Copy the CDC settings out before `config` is moved into the struct.
        #[cfg(feature = "cdc")]
        let cdc_enabled_val = config.cdc_enabled;
        #[cfg(feature = "cdc")]
        let cdc_retention = config.cdc_retention.clone();

        // Clone Arcs for the checkpoint timer before moving originals into the struct.
        // The timer captures its own references and runs in a background thread.
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let checkpoint_interval = config.checkpoint_interval;
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let timer_store = Arc::clone(&store);
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let timer_catalog = Arc::clone(&catalog);
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let timer_tm = Arc::clone(&transaction_manager);
        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
        let timer_rdf = Arc::clone(&rdf_store);
        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
        let timer_wal = wal.clone();

        let mut db = Self {
            config,
            #[cfg(feature = "lpg")]
            store: Some(store),
            catalog,
            #[cfg(feature = "triple-store")]
            rdf_store,
            transaction_manager,
            buffer_manager,
            #[cfg(feature = "wal")]
            wal,
            #[cfg(feature = "wal")]
            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
            query_cache,
            commit_counter: Arc::new(AtomicUsize::new(0)),
            is_open: RwLock::new(true),
            #[cfg(feature = "cdc")]
            cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
            #[cfg(feature = "cdc")]
            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
            #[cfg(feature = "embed")]
            embedding_models: RwLock::new(hashbrown::HashMap::new()),
            #[cfg(feature = "grafeo-file")]
            file_manager,
            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
            checkpoint_timer: parking_lot::Mutex::new(None),
            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
            vector_spill_storages: None,
            external_read_store: None,
            external_write_store: None,
            #[cfg(feature = "metrics")]
            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
            current_graph: RwLock::new(None),
            current_schema: RwLock::new(None),
            read_only: is_read_only,
        };

        // Register storage sections as memory consumers for pressure tracking
        db.register_section_consumers();

        // Start periodic checkpoint timer if configured.
        // This only happens for single-file databases opened in read-write mode.
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
            && !is_read_only
        {
            *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
                interval,
                Arc::clone(fm),
                timer_store,
                timer_catalog,
                timer_tm,
                #[cfg(feature = "triple-store")]
                timer_rdf,
                #[cfg(feature = "wal")]
                timer_wal,
            ));
        }

        // Discover existing spill files from a previous session.
        // If vectors were spilled before close, the spill files persist on disk
        // and need to be re-mapped so search can read from them.
        #[cfg(all(
            feature = "lpg",
            feature = "vector-index",
            feature = "mmap",
            not(feature = "temporal")
        ))]
        db.restore_spill_files();

        // If VectorStore is configured as ForceDisk, immediately spill embeddings.
        // This must happen after register_section_consumers() which creates the consumer.
        #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
        if db
            .config
            .section_configs
            .get(&grafeo_common::storage::SectionType::VectorStore)
            .is_some_and(|c| c.tier == grafeo_common::storage::TierOverride::ForceDisk)
        {
            db.buffer_manager.spill_all();
        }

        Ok(db)
    }
623
624    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
625    ///
626    /// The external store handles all data persistence. WAL, CDC, and index
627    /// management are the responsibility of the store implementation.
628    ///
629    /// Query execution (all 6 languages, optimizer, planner) works through the
630    /// provided store. Admin operations (schema introspection, persistence,
631    /// vector/text indexes) are not available on external stores.
632    ///
633    /// # Examples
634    ///
635    /// ```no_run
636    /// use std::sync::Arc;
637    /// use grafeo_engine::{GrafeoDB, Config};
638    /// use grafeo_core::graph::GraphStoreMut;
639    ///
640    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
641    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
642    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
643    ///     Ok(())
644    /// }
645    /// ```
646    ///
647    /// # Errors
648    ///
649    /// Returns an error if config validation fails.
650    ///
651    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
652    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
653        config
654            .validate()
655            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
656
657        let transaction_manager = Arc::new(TransactionManager::new());
658
659        let buffer_config = BufferManagerConfig {
660            budget: config.memory_limit.unwrap_or_else(|| {
661                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
662            }),
663            spill_path: None,
664            ..BufferManagerConfig::default()
665        };
666        let buffer_manager = BufferManager::new(buffer_config);
667
668        let query_cache = Arc::new(QueryCache::default());
669
670        #[cfg(feature = "cdc")]
671        let cdc_enabled_val = config.cdc_enabled;
672
673        Ok(Self {
674            config,
675            #[cfg(feature = "lpg")]
676            store: None,
677            catalog: Arc::new(Catalog::new()),
678            #[cfg(feature = "triple-store")]
679            rdf_store: Arc::new(RdfStore::new()),
680            transaction_manager,
681            buffer_manager,
682            #[cfg(feature = "wal")]
683            wal: None,
684            #[cfg(feature = "wal")]
685            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
686            query_cache,
687            commit_counter: Arc::new(AtomicUsize::new(0)),
688            is_open: RwLock::new(true),
689            #[cfg(feature = "cdc")]
690            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
691            #[cfg(feature = "cdc")]
692            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
693            #[cfg(feature = "embed")]
694            embedding_models: RwLock::new(hashbrown::HashMap::new()),
695            #[cfg(feature = "grafeo-file")]
696            file_manager: None,
697            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
698            checkpoint_timer: parking_lot::Mutex::new(None),
699            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
700            vector_spill_storages: None,
701            external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
702            external_write_store: Some(store),
703            #[cfg(feature = "metrics")]
704            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
705            current_graph: RwLock::new(None),
706            current_schema: RwLock::new(None),
707            read_only: false,
708        })
709    }
710
711    /// Creates a database backed by a read-only [`GraphStore`].
712    ///
713    /// The database is set to read-only mode. Write queries (CREATE, SET,
714    /// DELETE) will return `TransactionError::ReadOnly`.
715    ///
716    /// # Examples
717    ///
718    /// ```no_run
719    /// use std::sync::Arc;
720    /// use grafeo_engine::{GrafeoDB, Config};
721    /// use grafeo_core::graph::GraphStore;
722    ///
723    /// fn example(store: Arc<dyn GraphStore>) -> grafeo_common::utils::error::Result<()> {
724    ///     let db = GrafeoDB::with_read_store(store, Config::in_memory())?;
725    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
726    ///     Ok(())
727    /// }
728    /// ```
729    ///
730    /// # Errors
731    ///
732    /// Returns an error if config validation fails.
733    ///
734    /// [`GraphStore`]: grafeo_core::graph::GraphStore
735    pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
736        config
737            .validate()
738            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
739
740        let transaction_manager = Arc::new(TransactionManager::new());
741
742        let buffer_config = BufferManagerConfig {
743            budget: config.memory_limit.unwrap_or_else(|| {
744                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
745            }),
746            spill_path: None,
747            ..BufferManagerConfig::default()
748        };
749        let buffer_manager = BufferManager::new(buffer_config);
750
751        let query_cache = Arc::new(QueryCache::default());
752
753        #[cfg(feature = "cdc")]
754        let cdc_enabled_val = config.cdc_enabled;
755
756        Ok(Self {
757            config,
758            #[cfg(feature = "lpg")]
759            store: None,
760            catalog: Arc::new(Catalog::new()),
761            #[cfg(feature = "triple-store")]
762            rdf_store: Arc::new(RdfStore::new()),
763            transaction_manager,
764            buffer_manager,
765            #[cfg(feature = "wal")]
766            wal: None,
767            #[cfg(feature = "wal")]
768            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
769            query_cache,
770            commit_counter: Arc::new(AtomicUsize::new(0)),
771            is_open: RwLock::new(true),
772            #[cfg(feature = "cdc")]
773            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
774            #[cfg(feature = "cdc")]
775            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
776            #[cfg(feature = "embed")]
777            embedding_models: RwLock::new(hashbrown::HashMap::new()),
778            #[cfg(feature = "grafeo-file")]
779            file_manager: None,
780            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
781            checkpoint_timer: parking_lot::Mutex::new(None),
782            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
783            vector_spill_storages: None,
784            external_read_store: Some(store),
785            external_write_store: None,
786            #[cfg(feature = "metrics")]
787            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
788            current_graph: RwLock::new(None),
789            current_schema: RwLock::new(None),
790            read_only: true,
791        })
792    }
793
794    /// Converts the database to a read-only [`CompactStore`] for faster queries.
795    ///
796    /// Takes a snapshot of all nodes and edges from the current store, builds
797    /// a columnar `CompactStore` with CSR adjacency, and switches the database
798    /// to read-only mode. The original store is dropped to free memory.
799    ///
800    /// After calling this, all write queries will fail with
801    /// `TransactionError::ReadOnly`. Read queries (across all supported
802    /// languages) continue to work and benefit from ~60x memory reduction
803    /// and 100x+ traversal speedup.
804    ///
805    /// # Errors
806    ///
807    /// Returns an error if the conversion fails (e.g. more than 32,767
808    /// distinct labels or edge types).
809    ///
810    /// [`CompactStore`]: grafeo_core::graph::compact::CompactStore
811    #[cfg(feature = "compact-store")]
812    pub fn compact(&mut self) -> Result<()> {
813        use grafeo_core::graph::compact::from_graph_store;
814
815        let current_store = self.graph_store();
816        let compact = from_graph_store(current_store.as_ref())
817            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
818
819        self.external_read_store = Some(Arc::new(compact) as Arc<dyn GraphStore>);
820        self.external_write_store = None;
821        #[cfg(feature = "lpg")]
822        {
823            self.store = None;
824        }
825        self.read_only = true;
826        self.query_cache = Arc::new(QueryCache::default());
827
828        Ok(())
829    }
830
831    /// Applies WAL records to restore the database state.
832    ///
833    /// Data mutation records are routed through a graph cursor that tracks
834    /// `SwitchGraph` context markers, replaying mutations into the correct
835    /// named graph (or the default graph when cursor is `None`).
836    #[cfg(all(feature = "wal", feature = "lpg"))]
837    fn apply_wal_records(
838        store: &Arc<LpgStore>,
839        catalog: &Catalog,
840        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
841        records: &[WalRecord],
842    ) -> Result<()> {
843        use crate::catalog::{
844            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
845        };
846        use grafeo_common::utils::error::Error;
847
848        // Graph cursor: tracks which named graph receives data mutations.
849        // `None` means the default graph.
850        let mut current_graph: Option<String> = None;
851        let mut target_store: Arc<LpgStore> = Arc::clone(store);
852
853        for record in records {
854            match record {
855                // --- Named graph lifecycle ---
856                WalRecord::CreateNamedGraph { name } => {
857                    let _ = store.create_graph(name);
858                }
859                WalRecord::DropNamedGraph { name } => {
860                    store.drop_graph(name);
861                    // Reset cursor if the dropped graph was active
862                    if current_graph.as_deref() == Some(name.as_str()) {
863                        current_graph = None;
864                        target_store = Arc::clone(store);
865                    }
866                }
867                WalRecord::SwitchGraph { name } => {
868                    current_graph.clone_from(name);
869                    target_store = match &current_graph {
870                        None => Arc::clone(store),
871                        Some(graph_name) => store
872                            .graph_or_create(graph_name)
873                            .map_err(|e| Error::Internal(e.to_string()))?,
874                    };
875                }
876
877                // --- Data mutations: routed through target_store ---
878                WalRecord::CreateNode { id, labels } => {
879                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
880                    target_store.create_node_with_id(*id, &label_refs)?;
881                }
882                WalRecord::DeleteNode { id } => {
883                    target_store.delete_node(*id);
884                }
885                WalRecord::CreateEdge {
886                    id,
887                    src,
888                    dst,
889                    edge_type,
890                } => {
891                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
892                }
893                WalRecord::DeleteEdge { id } => {
894                    target_store.delete_edge(*id);
895                }
896                WalRecord::SetNodeProperty { id, key, value } => {
897                    target_store.set_node_property(*id, key, value.clone());
898                }
899                WalRecord::SetEdgeProperty { id, key, value } => {
900                    target_store.set_edge_property(*id, key, value.clone());
901                }
902                WalRecord::AddNodeLabel { id, label } => {
903                    target_store.add_label(*id, label);
904                }
905                WalRecord::RemoveNodeLabel { id, label } => {
906                    target_store.remove_label(*id, label);
907                }
908                WalRecord::RemoveNodeProperty { id, key } => {
909                    target_store.remove_node_property(*id, key);
910                }
911                WalRecord::RemoveEdgeProperty { id, key } => {
912                    target_store.remove_edge_property(*id, key);
913                }
914
915                // --- Schema DDL replay (always on root catalog) ---
916                WalRecord::CreateNodeType {
917                    name,
918                    properties,
919                    constraints,
920                } => {
921                    let def = NodeTypeDefinition {
922                        name: name.clone(),
923                        properties: properties
924                            .iter()
925                            .map(|(n, t, nullable)| TypedProperty {
926                                name: n.clone(),
927                                data_type: PropertyDataType::from_type_name(t),
928                                nullable: *nullable,
929                                default_value: None,
930                            })
931                            .collect(),
932                        constraints: constraints
933                            .iter()
934                            .map(|(kind, props)| match kind.as_str() {
935                                "unique" => TypeConstraint::Unique(props.clone()),
936                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
937                                "not_null" if !props.is_empty() => {
938                                    TypeConstraint::NotNull(props[0].clone())
939                                }
940                                _ => TypeConstraint::Unique(props.clone()),
941                            })
942                            .collect(),
943                        parent_types: Vec::new(),
944                    };
945                    let _ = catalog.register_node_type(def);
946                }
947                WalRecord::DropNodeType { name } => {
948                    let _ = catalog.drop_node_type(name);
949                }
950                WalRecord::CreateEdgeType {
951                    name,
952                    properties,
953                    constraints,
954                } => {
955                    let def = EdgeTypeDefinition {
956                        name: name.clone(),
957                        properties: properties
958                            .iter()
959                            .map(|(n, t, nullable)| TypedProperty {
960                                name: n.clone(),
961                                data_type: PropertyDataType::from_type_name(t),
962                                nullable: *nullable,
963                                default_value: None,
964                            })
965                            .collect(),
966                        constraints: constraints
967                            .iter()
968                            .map(|(kind, props)| match kind.as_str() {
969                                "unique" => TypeConstraint::Unique(props.clone()),
970                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
971                                "not_null" if !props.is_empty() => {
972                                    TypeConstraint::NotNull(props[0].clone())
973                                }
974                                _ => TypeConstraint::Unique(props.clone()),
975                            })
976                            .collect(),
977                        source_node_types: Vec::new(),
978                        target_node_types: Vec::new(),
979                    };
980                    let _ = catalog.register_edge_type_def(def);
981                }
982                WalRecord::DropEdgeType { name } => {
983                    let _ = catalog.drop_edge_type_def(name);
984                }
985                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
986                    // Index recreation is handled by the store on startup
987                    // (indexes are rebuilt from data, not WAL)
988                }
989                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
990                    // Constraint definitions are part of type definitions
991                    // and replayed via CreateNodeType/CreateEdgeType
992                }
993                WalRecord::CreateGraphType {
994                    name,
995                    node_types,
996                    edge_types,
997                    open,
998                } => {
999                    use crate::catalog::GraphTypeDefinition;
1000                    let def = GraphTypeDefinition {
1001                        name: name.clone(),
1002                        allowed_node_types: node_types.clone(),
1003                        allowed_edge_types: edge_types.clone(),
1004                        open: *open,
1005                    };
1006                    let _ = catalog.register_graph_type(def);
1007                }
1008                WalRecord::DropGraphType { name } => {
1009                    let _ = catalog.drop_graph_type(name);
1010                }
1011                WalRecord::CreateSchema { name } => {
1012                    let _ = catalog.register_schema_namespace(name.clone());
1013                }
1014                WalRecord::DropSchema { name } => {
1015                    let _ = catalog.drop_schema_namespace(name);
1016                }
1017
1018                WalRecord::AlterNodeType { name, alterations } => {
1019                    for (action, prop_name, type_name, nullable) in alterations {
1020                        match action.as_str() {
1021                            "add" => {
1022                                let prop = TypedProperty {
1023                                    name: prop_name.clone(),
1024                                    data_type: PropertyDataType::from_type_name(type_name),
1025                                    nullable: *nullable,
1026                                    default_value: None,
1027                                };
1028                                let _ = catalog.alter_node_type_add_property(name, prop);
1029                            }
1030                            "drop" => {
1031                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
1032                            }
1033                            _ => {}
1034                        }
1035                    }
1036                }
1037                WalRecord::AlterEdgeType { name, alterations } => {
1038                    for (action, prop_name, type_name, nullable) in alterations {
1039                        match action.as_str() {
1040                            "add" => {
1041                                let prop = TypedProperty {
1042                                    name: prop_name.clone(),
1043                                    data_type: PropertyDataType::from_type_name(type_name),
1044                                    nullable: *nullable,
1045                                    default_value: None,
1046                                };
1047                                let _ = catalog.alter_edge_type_add_property(name, prop);
1048                            }
1049                            "drop" => {
1050                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
1051                            }
1052                            _ => {}
1053                        }
1054                    }
1055                }
1056                WalRecord::AlterGraphType { name, alterations } => {
1057                    for (action, type_name) in alterations {
1058                        match action.as_str() {
1059                            "add_node" => {
1060                                let _ =
1061                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
1062                            }
1063                            "drop_node" => {
1064                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
1065                            }
1066                            "add_edge" => {
1067                                let _ =
1068                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
1069                            }
1070                            "drop_edge" => {
1071                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
1072                            }
1073                            _ => {}
1074                        }
1075                    }
1076                }
1077
1078                WalRecord::CreateProcedure {
1079                    name,
1080                    params,
1081                    returns,
1082                    body,
1083                } => {
1084                    use crate::catalog::ProcedureDefinition;
1085                    let def = ProcedureDefinition {
1086                        name: name.clone(),
1087                        params: params.clone(),
1088                        returns: returns.clone(),
1089                        body: body.clone(),
1090                    };
1091                    let _ = catalog.register_procedure(def);
1092                }
1093                WalRecord::DropProcedure { name } => {
1094                    let _ = catalog.drop_procedure(name);
1095                }
1096
1097                // --- RDF triple replay ---
1098                #[cfg(feature = "triple-store")]
1099                WalRecord::InsertRdfTriple { .. }
1100                | WalRecord::DeleteRdfTriple { .. }
1101                | WalRecord::ClearRdfGraph { .. }
1102                | WalRecord::CreateRdfGraph { .. }
1103                | WalRecord::DropRdfGraph { .. } => {
1104                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
1105                }
1106                #[cfg(not(feature = "triple-store"))]
1107                WalRecord::InsertRdfTriple { .. }
1108                | WalRecord::DeleteRdfTriple { .. }
1109                | WalRecord::ClearRdfGraph { .. }
1110                | WalRecord::CreateRdfGraph { .. }
1111                | WalRecord::DropRdfGraph { .. } => {}
1112
1113                WalRecord::TransactionCommit { .. } => {
1114                    // In temporal mode, advance the store epoch on each committed
1115                    // transaction so that subsequent property/label operations
1116                    // are recorded at the correct epoch in their VersionLogs.
1117                    #[cfg(feature = "temporal")]
1118                    {
1119                        target_store.new_epoch();
1120                    }
1121                }
1122                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
1123                    // Transaction control records don't need replay action
1124                    // (recovery already filtered to only committed transactions)
1125                }
1126                WalRecord::EpochAdvance { .. } => {
1127                    // Metadata record: no store mutation needed.
1128                    // Used by incremental backup and point-in-time recovery.
1129                }
1130            }
1131        }
1132        Ok(())
1133    }
1134
1135    // =========================================================================
1136    // Single-file format helpers
1137    // =========================================================================
1138
1139    /// Returns `true` if the given path should use single-file format.
1140    #[cfg(feature = "grafeo-file")]
1141    fn should_use_single_file(
1142        path: &std::path::Path,
1143        configured: crate::config::StorageFormat,
1144    ) -> bool {
1145        use crate::config::StorageFormat;
1146        match configured {
1147            StorageFormat::SingleFile => true,
1148            StorageFormat::WalDirectory => false,
1149            StorageFormat::Auto => {
1150                // Existing file: check magic bytes
1151                if path.is_file() {
1152                    if let Ok(mut f) = std::fs::File::open(path) {
1153                        use std::io::Read;
1154                        let mut magic = [0u8; 4];
1155                        if f.read_exact(&mut magic).is_ok() && magic == grafeo_storage::file::MAGIC
1156                        {
1157                            return true;
1158                        }
1159                    }
1160                    return false;
1161                }
1162                // Existing directory: legacy format
1163                if path.is_dir() {
1164                    return false;
1165                }
1166                // New path: check extension
1167                path.extension().is_some_and(|ext| ext == "grafeo")
1168            }
1169        }
1170    }
1171
1172    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
1173    ///
1174    /// Supports both v1 (monolithic blob) and v2 (section-based) formats.
1175    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1176    fn apply_snapshot_data(
1177        store: &Arc<LpgStore>,
1178        catalog: &Arc<crate::catalog::Catalog>,
1179        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1180        data: &[u8],
1181    ) -> Result<()> {
1182        // v1 blob format: pass through to legacy loader
1183        persistence::load_snapshot_into_store(
1184            store,
1185            catalog,
1186            #[cfg(feature = "triple-store")]
1187            rdf_store,
1188            data,
1189        )
1190    }
1191
1192    /// Loads from a section-based `.grafeo` file (v2 format).
1193    ///
1194    /// Reads the section directory, then deserializes each section independently.
1195    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1196    fn load_from_sections(
1197        fm: &GrafeoFileManager,
1198        store: &Arc<LpgStore>,
1199        catalog: &Arc<crate::catalog::Catalog>,
1200        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1201    ) -> Result<()> {
1202        use grafeo_common::storage::{Section, SectionType};
1203
1204        let dir = fm.read_section_directory()?.ok_or_else(|| {
1205            grafeo_common::utils::error::Error::Internal(
1206                "expected v2 section directory but found none".to_string(),
1207            )
1208        })?;
1209
1210        // Load catalog section first (schema defs needed before data)
1211        if let Some(entry) = dir.find(SectionType::Catalog) {
1212            let data = fm.read_section_data(entry)?;
1213            let tm = Arc::new(crate::transaction::TransactionManager::new());
1214            let mut section = catalog_section::CatalogSection::new(
1215                Arc::clone(catalog),
1216                Arc::clone(store),
1217                move || tm.current_epoch().as_u64(),
1218            );
1219            section.deserialize(&data)?;
1220        }
1221
1222        // Load LPG store
1223        if let Some(entry) = dir.find(SectionType::LpgStore) {
1224            let data = fm.read_section_data(entry)?;
1225            let mut section = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1226            section.deserialize(&data)?;
1227        }
1228
1229        // Load RDF store
1230        #[cfg(feature = "triple-store")]
1231        if let Some(entry) = dir.find(SectionType::RdfStore) {
1232            let data = fm.read_section_data(entry)?;
1233            let mut section = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
1234            section.deserialize(&data)?;
1235        }
1236
1237        // Restore HNSW topology (if vector indexes exist in both catalog and section)
1238        #[cfg(feature = "vector-index")]
1239        if let Some(entry) = dir.find(SectionType::VectorStore) {
1240            let data = fm.read_section_data(entry)?;
1241            let indexes = store.vector_index_entries();
1242            if !indexes.is_empty() {
1243                let mut section = grafeo_core::index::vector::VectorStoreSection::new(indexes);
1244                section.deserialize(&data)?;
1245            }
1246        }
1247
1248        // Restore BM25 postings (if text indexes exist in both catalog and section)
1249        #[cfg(feature = "text-index")]
1250        if let Some(entry) = dir.find(SectionType::TextIndex) {
1251            let data = fm.read_section_data(entry)?;
1252            let indexes = store.text_index_entries();
1253            if !indexes.is_empty() {
1254                let mut section = grafeo_core::index::text::TextIndexSection::new(indexes);
1255                section.deserialize(&data)?;
1256            }
1257        }
1258
1259        Ok(())
1260    }
1261
1262    // =========================================================================
1263    // Session & Configuration
1264    // =========================================================================
1265
1266    /// Opens a new session for running queries.
1267    ///
1268    /// Sessions are cheap to create: spin up as many as you need. Each
1269    /// gets its own transaction context, so concurrent sessions won't
1270    /// block each other on reads.
1271    ///
1272    /// # Panics
1273    ///
1274    /// Panics if the database was configured with an external graph store and
1275    /// the internal arena allocator cannot be initialized (out of memory).
1276    ///
1277    /// # Examples
1278    ///
1279    /// ```
1280    /// use grafeo_engine::GrafeoDB;
1281    ///
1282    /// let db = GrafeoDB::new_in_memory();
1283    /// let session = db.session();
1284    ///
1285    /// // Run queries through the session
1286    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
1287    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1288    /// ```
1289    #[must_use]
1290    pub fn session(&self) -> Session {
1291        self.create_session_inner(None)
1292    }
1293
1294    /// Creates a session with an explicit CDC override.
1295    ///
1296    /// When `cdc_enabled` is `true`, mutations in this session are tracked
1297    /// regardless of the database default. When `false`, mutations are not
1298    /// tracked regardless of the database default.
1299    ///
1300    /// # Examples
1301    ///
1302    /// ```
1303    /// use grafeo_engine::GrafeoDB;
1304    ///
1305    /// let db = GrafeoDB::new_in_memory();
1306    ///
1307    /// // Opt in to CDC for just this session
1308    /// let tracked = db.session_with_cdc(true);
1309    /// tracked.execute("INSERT (:Person {name: 'Alix'})")?;
1310    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1311    /// ```
1312    #[cfg(feature = "cdc")]
1313    #[must_use]
1314    pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1315        self.create_session_inner(Some(cdc_enabled))
1316    }
1317
1318    /// Creates a read-only session regardless of the database's access mode.
1319    ///
1320    /// Mutations executed through this session will fail with
1321    /// `TransactionError::ReadOnly`. Useful for replication replicas where
1322    /// the database itself must remain writable (for applying CDC changes)
1323    /// but client-facing queries must be read-only.
1324    #[must_use]
1325    pub fn session_read_only(&self) -> Session {
1326        self.create_session_inner_opts(None, true)
1327    }
1328
1329    /// Shared session creation logic.
1330    ///
1331    /// `cdc_override` overrides the database-wide `cdc_enabled` default when
1332    /// `Some`. `None` falls back to the database default.
1333    #[allow(unused_variables)] // cdc_override unused when cdc feature is off
1334    fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1335        self.create_session_inner_opts(cdc_override, false)
1336    }
1337
1338    /// Shared session creation with all overrides.
1339    #[allow(unused_variables)]
1340    fn create_session_inner_opts(
1341        &self,
1342        cdc_override: Option<bool>,
1343        force_read_only: bool,
1344    ) -> Session {
1345        let session_cfg = || crate::session::SessionConfig {
1346            transaction_manager: Arc::clone(&self.transaction_manager),
1347            query_cache: Arc::clone(&self.query_cache),
1348            catalog: Arc::clone(&self.catalog),
1349            adaptive_config: self.config.adaptive.clone(),
1350            factorized_execution: self.config.factorized_execution,
1351            graph_model: self.config.graph_model,
1352            query_timeout: self.config.query_timeout,
1353            commit_counter: Arc::clone(&self.commit_counter),
1354            gc_interval: self.config.gc_interval,
1355            read_only: self.read_only || force_read_only,
1356        };
1357
1358        if let Some(ref ext_read) = self.external_read_store {
1359            return Session::with_external_store(
1360                Arc::clone(ext_read),
1361                self.external_write_store.as_ref().map(Arc::clone),
1362                session_cfg(),
1363            )
1364            .expect("arena allocation for external store session");
1365        }
1366
1367        #[cfg(all(feature = "lpg", feature = "triple-store"))]
1368        let mut session = Session::with_rdf_store_and_adaptive(
1369            Arc::clone(self.lpg_store()),
1370            Arc::clone(&self.rdf_store),
1371            session_cfg(),
1372        );
1373        #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
1374        let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1375        #[cfg(not(feature = "lpg"))]
1376        let mut session =
1377            Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
1378                .expect("session creation for non-lpg build");
1379
1380        #[cfg(all(feature = "wal", feature = "lpg"))]
1381        if let Some(ref wal) = self.wal {
1382            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1383        }
1384
1385        #[cfg(feature = "cdc")]
1386        {
1387            let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1388            if should_enable {
1389                session.set_cdc_log(Arc::clone(&self.cdc_log));
1390            }
1391        }
1392
1393        #[cfg(feature = "metrics")]
1394        {
1395            if let Some(ref m) = self.metrics {
1396                session.set_metrics(Arc::clone(m));
1397                m.session_created
1398                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1399                m.session_active
1400                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1401            }
1402        }
1403
1404        // Propagate persistent graph context to the new session
1405        if let Some(ref graph) = *self.current_graph.read() {
1406            session.use_graph(graph);
1407        }
1408
1409        // Propagate persistent schema context to the new session
1410        if let Some(ref schema) = *self.current_schema.read() {
1411            session.set_schema(schema);
1412        }
1413
1414        // Suppress unused_mut when cdc/wal are disabled
1415        let _ = &mut session;
1416
1417        session
1418    }
1419
1420    /// Returns the current graph name, if any.
1421    ///
1422    /// This is the persistent graph context used by one-shot `execute()` calls.
1423    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
1424    /// or `SESSION RESET`.
1425    #[must_use]
1426    pub fn current_graph(&self) -> Option<String> {
1427        self.current_graph.read().clone()
1428    }
1429
1430    /// Sets the current graph context for subsequent one-shot `execute()` calls.
1431    ///
1432    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
1433    /// Pass `None` to reset to the default graph.
1434    ///
1435    /// # Errors
1436    ///
1437    /// Returns an error if the named graph does not exist.
1438    pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1439        #[cfg(feature = "lpg")]
1440        if let Some(name) = name
1441            && !name.eq_ignore_ascii_case("default")
1442            && let Some(store) = &self.store
1443            && store.graph(name).is_none()
1444        {
1445            return Err(Error::Query(QueryError::new(
1446                QueryErrorKind::Semantic,
1447                format!("Graph '{name}' does not exist"),
1448            )));
1449        }
1450        *self.current_graph.write() = name.map(ToString::to_string);
1451        Ok(())
1452    }
1453
1454    /// Returns the current schema name, if any.
1455    ///
1456    /// This is the persistent schema context used by one-shot `execute()` calls.
1457    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
1458    #[must_use]
1459    pub fn current_schema(&self) -> Option<String> {
1460        self.current_schema.read().clone()
1461    }
1462
1463    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1464    ///
1465    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1466    /// a session. Pass `None` to clear the schema context.
1467    ///
1468    /// # Errors
1469    ///
1470    /// Returns an error if the named schema does not exist.
1471    pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1472        if let Some(name) = name
1473            && !self.catalog.schema_exists(name)
1474        {
1475            return Err(Error::Query(QueryError::new(
1476                QueryErrorKind::Semantic,
1477                format!("Schema '{name}' does not exist"),
1478            )));
1479        }
1480        *self.current_schema.write() = name.map(ToString::to_string);
1481        Ok(())
1482    }
1483
1484    /// Returns the adaptive execution configuration.
1485    #[must_use]
1486    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1487        &self.config.adaptive
1488    }
1489
1490    /// Returns `true` if this database was opened in read-only mode.
1491    #[must_use]
1492    pub fn is_read_only(&self) -> bool {
1493        self.read_only
1494    }
1495
1496    /// Returns the configuration.
1497    #[must_use]
1498    pub fn config(&self) -> &Config {
1499        &self.config
1500    }
1501
1502    /// Returns the graph data model of this database.
1503    #[must_use]
1504    pub fn graph_model(&self) -> crate::config::GraphModel {
1505        self.config.graph_model
1506    }
1507
1508    /// Returns the configured memory limit in bytes, if any.
1509    #[must_use]
1510    pub fn memory_limit(&self) -> Option<usize> {
1511        self.config.memory_limit
1512    }
1513
1514    /// Returns a point-in-time snapshot of all metrics.
1515    ///
1516    /// If the `metrics` feature is disabled or the registry is not
1517    /// initialized, returns a default (all-zero) snapshot.
1518    #[cfg(feature = "metrics")]
1519    #[must_use]
1520    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1521        let mut snapshot = self
1522            .metrics
1523            .as_ref()
1524            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1525
1526        // Augment with cache stats from the query cache (not tracked in the registry)
1527        let cache_stats = self.query_cache.stats();
1528        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1529        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1530        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1531        snapshot.cache_invalidations = cache_stats.invalidations;
1532
1533        snapshot
1534    }
1535
1536    /// Returns all metrics in Prometheus text exposition format.
1537    ///
1538    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1539    #[cfg(feature = "metrics")]
1540    #[must_use]
1541    pub fn metrics_prometheus(&self) -> String {
1542        self.metrics
1543            .as_ref()
1544            .map_or_else(String::new, |m| m.to_prometheus())
1545    }
1546
1547    /// Resets all metrics counters and histograms to zero.
1548    #[cfg(feature = "metrics")]
1549    pub fn reset_metrics(&self) {
1550        if let Some(ref m) = self.metrics {
1551            m.reset();
1552        }
1553        self.query_cache.reset_stats();
1554    }
1555
1556    /// Returns the underlying (default) store.
1557    ///
1558    /// This provides direct access to the LPG store for algorithm implementations
1559    /// and admin operations (index management, schema introspection, MVCC internals).
1560    ///
1561    /// For code that only needs read/write graph operations, prefer
1562    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
1563    #[cfg(feature = "lpg")]
1564    #[must_use]
1565    pub fn store(&self) -> &Arc<LpgStore> {
1566        self.lpg_store()
1567    }
1568
1569    // === Named Graph Management ===
1570
1571    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1572    ///
1573    /// # Errors
1574    ///
1575    /// Returns an error if arena allocation fails.
1576    #[cfg(feature = "lpg")]
1577    pub fn create_graph(&self, name: &str) -> Result<bool> {
1578        Ok(self.lpg_store().create_graph(name)?)
1579    }
1580
1581    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
1582    ///
1583    /// If the dropped graph was the active graph context, the context is reset
1584    /// to the default graph.
1585    #[cfg(feature = "lpg")]
1586    pub fn drop_graph(&self, name: &str) -> bool {
1587        let Some(store) = &self.store else {
1588            return false;
1589        };
1590        let dropped = store.drop_graph(name);
1591        if dropped {
1592            let mut current = self.current_graph.write();
1593            if current
1594                .as_deref()
1595                .is_some_and(|g| g.eq_ignore_ascii_case(name))
1596            {
1597                *current = None;
1598            }
1599        }
1600        dropped
1601    }
1602
1603    /// Returns all named graph names.
1604    #[cfg(feature = "lpg")]
1605    #[must_use]
1606    pub fn list_graphs(&self) -> Vec<String> {
1607        self.lpg_store().graph_names()
1608    }
1609
1610    /// Returns the graph store as a trait object.
1611    ///
1612    /// Returns a read-only trait object for the active graph store.
1613    ///
1614    /// This provides the [`GraphStore`] interface for code that only needs
1615    /// read operations. For write access, use [`graph_store_mut()`](Self::graph_store_mut).
1616    ///
1617    /// [`GraphStore`]: grafeo_core::graph::GraphStore
1618    #[must_use]
1619    pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1620        if let Some(ref ext_read) = self.external_read_store {
1621            Arc::clone(ext_read)
1622        } else {
1623            #[cfg(feature = "lpg")]
1624            {
1625                Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1626            }
1627            #[cfg(not(feature = "lpg"))]
1628            unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
1629        }
1630    }
1631
1632    /// Returns the writable graph store, if available.
1633    ///
1634    /// Returns `None` for read-only databases created via
1635    /// [`with_read_store()`](Self::with_read_store).
1636    #[must_use]
1637    pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1638        if self.external_read_store.is_some() {
1639            self.external_write_store.as_ref().map(Arc::clone)
1640        } else {
1641            #[cfg(feature = "lpg")]
1642            {
1643                Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1644            }
1645            #[cfg(not(feature = "lpg"))]
1646            {
1647                None
1648            }
1649        }
1650    }
1651
1652    /// Garbage collects old MVCC versions that are no longer visible.
1653    ///
1654    /// Determines the minimum epoch required by active transactions and prunes
1655    /// version chains older than that threshold. Also cleans up completed
1656    /// transaction metadata in the transaction manager, and prunes the CDC
1657    /// event log according to its retention policy.
1658    pub fn gc(&self) {
1659        #[cfg(feature = "lpg")]
1660        let current_epoch = {
1661            let min_epoch = self.transaction_manager.min_active_epoch();
1662            self.lpg_store().gc_versions(min_epoch);
1663            self.transaction_manager.current_epoch()
1664        };
1665        self.transaction_manager.gc();
1666
1667        // Prune CDC events based on retention config (epoch + count limits)
1668        #[cfg(feature = "cdc")]
1669        if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
1670            #[cfg(feature = "lpg")]
1671            self.cdc_log.apply_retention(current_epoch);
1672        }
1673    }
1674
1675    /// Returns the buffer manager for memory-aware operations.
1676    #[must_use]
1677    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1678        &self.buffer_manager
1679    }
1680
1681    /// Returns the query cache.
1682    #[must_use]
1683    pub fn query_cache(&self) -> &Arc<QueryCache> {
1684        &self.query_cache
1685    }
1686
1687    /// Clears all cached query plans.
1688    ///
1689    /// This is called automatically after DDL operations, but can also be
1690    /// invoked manually after external schema changes (e.g., WAL replay,
1691    /// import) or when you want to force re-optimization of all queries.
1692    pub fn clear_plan_cache(&self) {
1693        self.query_cache.clear();
1694    }
1695
1696    // =========================================================================
1697    // Lifecycle
1698    // =========================================================================
1699
    /// Closes the database, flushing all pending writes.
    ///
    /// For persistent databases, this ensures everything is safely on disk.
    /// Called automatically when the database is dropped, but you can call
    /// it explicitly if you need to guarantee durability at a specific point.
    ///
    /// Calling `close()` on an already-closed database returns `Ok(())` and
    /// does nothing. The `is_open` write lock is held for the whole call, so
    /// concurrent `close()` calls run one after another. The database is only
    /// marked closed after every step succeeds; if a step fails, the error is
    /// returned and the database stays marked open.
    ///
    /// Steps, in order:
    /// 1. Stop the periodic checkpoint timer, if one is running.
    /// 2. Read-only databases: close the file manager and stop.
    /// 3. Single-file (`.grafeo`) databases: sync the WAL, checkpoint into the
    ///    container, release WAL file handles, and remove the sidecar WAL when
    ///    it is safe to do so. Then close the file manager.
    /// 4. Directory-format databases with a WAL: append a `TransactionCommit`
    ///    record and sync the WAL.
    ///
    /// # Errors
    ///
    /// Returns an error if syncing or writing the WAL, checkpointing to the
    /// `.grafeo` file, removing the sidecar WAL, or closing the file manager
    /// fails (check disk space/permissions).
    pub fn close(&self) -> Result<()> {
        let mut is_open = self.is_open.write();
        // Already closed: closing again is a no-op.
        if !*is_open {
            return Ok(());
        }

        // Stop the periodic checkpoint timer first, even for read-only databases.
        // compact() can switch a writable DB to read-only after the timer started,
        // so the timer must be stopped before any early return to avoid racing
        // with the closed file manager.
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        if let Some(mut timer) = self.checkpoint_timer.lock().take() {
            timer.stop();
        }

        // Read-only databases: just release the shared lock, no checkpointing
        if self.read_only {
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
        // We must do this BEFORE the WAL close path because checkpoint_to_file
        // removes the sidecar WAL directory.
        // NOTE(review): checkpoint_to_file's own doc says it does NOT remove the
        // sidecar; removal happens explicitly via fm.remove_sidecar_wal() below.
        // The ordering constraint here is that the directory-format commit path
        // further down is skipped when `is_single_file` is true.
        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Flush WAL first so all records are on disk before we snapshot
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;

            // Safety check: if the WAL has records but the checkpoint was a no-op
            // (zero sections written), the container file may not contain the
            // latest data. This can happen when sections are not marked dirty
            // despite mutations going through the WAL. Retry the checkpoint once
            // before deciding whether the sidecar can be removed.
            // NOTE(review): the retry calls checkpoint_to_file with the same
            // arguments; nothing visible here force-dirties sections, so whether
            // the second attempt can write anything depends on flush::flush.
            // Confirm.
            #[cfg(feature = "wal")]
            let flush_result = if flush_result.sections_written == 0 {
                if let Some(ref wal) = self.wal {
                    if wal.record_count() > 0 {
                        grafeo_warn!(
                            "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
                            wal.record_count()
                        );
                        self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
                    } else {
                        flush_result
                    }
                } else {
                    flush_result
                }
            } else {
                flush_result
            };

            // Release WAL file handles before removing sidecar directory.
            // On Windows, open handles prevent directory deletion.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            // Only remove the sidecar WAL after verifying the checkpoint wrote
            // data to the container. If nothing was written and the WAL had
            // records, keep the sidecar so the next open can recover from it.
            #[cfg(feature = "wal")]
            let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
            #[cfg(not(feature = "wal"))]
            let has_wal_records = false;

            if flush_result.sections_written > 0 || !has_wal_records {
                {
                    // Test hook: crash-injection point between the checkpoint and
                    // sidecar removal.
                    use grafeo_common::testing::crash::maybe_crash;
                    maybe_crash("close:before_remove_sidecar_wal");
                }
                fm.remove_sidecar_wal()?;
            } else {
                grafeo_warn!(
                    "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
                );
            }
            fm.close()?;
        }

        // Commit and sync WAL (legacy directory format only).
        // We intentionally do NOT call wal.checkpoint() here. Directory format
        // has no snapshot: the WAL files are the sole source of truth. Writing
        // checkpoint.meta would cause recovery to skip older WAL files, losing
        // all data that predates the current log sequence.
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Use the last assigned transaction ID, or create one for the commit record.
            // NOTE(review): when no ID was ever assigned, begin() starts a new
            // transaction whose ID is used only for this record; nothing here
            // completes that transaction in the transaction manager.
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            // Log a TransactionCommit to mark all pending records as committed
            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
1828
1829    /// Returns the typed WAL if available.
1830    #[cfg(feature = "wal")]
1831    #[must_use]
1832    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1833        self.wal.as_ref()
1834    }
1835
1836    /// Logs a WAL record if WAL is enabled.
1837    #[cfg(feature = "wal")]
1838    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1839        if let Some(ref wal) = self.wal {
1840            wal.log(record)?;
1841        }
1842        Ok(())
1843    }
1844
1845    /// Registers storage sections as [`MemoryConsumer`]s with the BufferManager.
1846    ///
1847    /// Each section reports its memory usage to the buffer manager, enabling
1848    /// accurate pressure tracking. Called once after database construction.
1849    fn register_section_consumers(&mut self) {
1850        #[cfg(feature = "lpg")]
1851        let store_ref = self.store.as_ref();
1852        #[cfg(not(feature = "lpg"))]
1853        // LPG store section
1854        #[cfg(feature = "lpg")]
1855        if let Some(store) = store_ref {
1856            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1857            self.buffer_manager.register_consumer(Arc::new(
1858                section_consumer::SectionConsumer::new(Arc::new(lpg)),
1859            ));
1860        }
1861
1862        // RDF store: only when data exists
1863        #[cfg(feature = "triple-store")]
1864        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
1865            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
1866            self.buffer_manager.register_consumer(Arc::new(
1867                section_consumer::SectionConsumer::new(Arc::new(rdf)),
1868            ));
1869        }
1870
1871        // Vector indexes: dynamic consumer that re-queries the store on each
1872        // memory_usage() call, so dropped indexes are freed and new ones tracked.
1873        #[cfg(all(
1874            feature = "lpg",
1875            feature = "vector-index",
1876            feature = "mmap",
1877            not(feature = "temporal")
1878        ))]
1879        if let Some(store) = store_ref {
1880            let spill_path = self.buffer_manager.config().spill_path.clone();
1881            let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
1882                store, spill_path,
1883            ));
1884            // Share the spill registry with the search path
1885            self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
1886            self.buffer_manager.register_consumer(consumer);
1887        }
1888
1889        // Text indexes: same dynamic approach as vector indexes.
1890        #[cfg(all(feature = "lpg", feature = "text-index"))]
1891        if let Some(store) = store_ref {
1892            self.buffer_manager
1893                .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
1894        }
1895
1896        // CDC log: register as memory consumer so the buffer manager can
1897        // prune events under memory pressure.
1898        #[cfg(feature = "cdc")]
1899        self.buffer_manager.register_consumer(
1900            Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
1901        );
1902    }
1903
1904    /// Discovers and re-opens spill files from a previous session.
1905    ///
1906    /// When the database was closed with spilled vector embeddings, the
1907    /// `vectors_*.bin` files persist in the spill directory. This method
1908    /// scans for them, opens each as `MmapStorage`, and registers them
1909    /// in the `vector_spill_storages` map so search can read from them.
1910    #[cfg(all(
1911        feature = "lpg",
1912        feature = "vector-index",
1913        feature = "mmap",
1914        not(feature = "temporal")
1915    ))]
1916    fn restore_spill_files(&mut self) {
1917        use grafeo_core::index::vector::MmapStorage;
1918
1919        let spill_dir = match self.buffer_manager.config().spill_path {
1920            Some(ref path) => path.clone(),
1921            None => return,
1922        };
1923
1924        if !spill_dir.exists() {
1925            return;
1926        }
1927
1928        let spill_map = match self.vector_spill_storages {
1929            Some(ref map) => Arc::clone(map),
1930            None => return,
1931        };
1932
1933        let Ok(entries) = std::fs::read_dir(&spill_dir) else {
1934            return;
1935        };
1936
1937        let Some(ref store) = self.store else {
1938            return;
1939        };
1940
1941        for entry in entries.flatten() {
1942            let path = entry.path();
1943            let file_name = match path.file_name().and_then(|n| n.to_str()) {
1944                Some(name) => name.to_string(),
1945                None => continue,
1946            };
1947
1948            // Match pattern: vectors_{key}.bin where key is percent-encoded
1949            if !file_name.starts_with("vectors_")
1950                || !std::path::Path::new(&file_name)
1951                    .extension()
1952                    .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
1953            {
1954                continue;
1955            }
1956
1957            // Extract and decode key: "vectors_Label%3Aembedding.bin" -> "Label:embedding"
1958            let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];
1959
1960            // Percent-decode: %3A -> ':', %25 -> '%'
1961            let key = key_part.replace("%3A", ":").replace("%25", "%");
1962
1963            // Key must contain ':' (label:property format)
1964            if !key.contains(':') {
1965                // Legacy file with old encoding, skip (will be re-created on next spill)
1966                continue;
1967            }
1968
1969            // Only restore if the corresponding vector index exists
1970            if store.get_vector_index_by_key(&key).is_none() {
1971                // Stale spill file (index was dropped), clean it up
1972                let _ = std::fs::remove_file(&path);
1973                continue;
1974            }
1975
1976            // Open the MmapStorage
1977            match MmapStorage::open(&path) {
1978                Ok(mmap_storage) => {
1979                    // Mark the property column as spilled so get() returns None
1980                    let property = key.split(':').nth(1).unwrap_or("");
1981                    let prop_key = grafeo_common::types::PropertyKey::new(property);
1982                    store.node_properties_mark_spilled(&prop_key);
1983
1984                    spill_map.write().insert(key, Arc::new(mmap_storage));
1985                }
1986                Err(e) => {
1987                    eprintln!("failed to restore spill file {}: {e}", path.display());
1988                    // Remove corrupt spill file
1989                    let _ = std::fs::remove_file(&path);
1990                }
1991            }
1992        }
1993    }
1994
1995    /// Builds section objects for the current database state.
1996    #[cfg(feature = "grafeo-file")]
1997    fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
1998        let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();
1999
2000        // LPG sections: store, catalog, vector indexes, text indexes
2001        #[cfg(feature = "lpg")]
2002        if let Some(store) = self.store.as_ref() {
2003            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2004
2005            let catalog = catalog_section::CatalogSection::new(
2006                Arc::clone(&self.catalog),
2007                Arc::clone(store),
2008                {
2009                    let tm = Arc::clone(&self.transaction_manager);
2010                    move || tm.current_epoch().as_u64()
2011                },
2012            );
2013
2014            sections.push(Box::new(catalog));
2015            sections.push(Box::new(lpg));
2016
2017            // Vector indexes: persist HNSW topology to avoid rebuild on load
2018            #[cfg(feature = "vector-index")]
2019            {
2020                let indexes = store.vector_index_entries();
2021                if !indexes.is_empty() {
2022                    let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
2023                    sections.push(Box::new(vector));
2024                }
2025            }
2026
2027            // Text indexes: persist BM25 postings to avoid rebuild on load
2028            #[cfg(feature = "text-index")]
2029            {
2030                let indexes = store.text_index_entries();
2031                if !indexes.is_empty() {
2032                    let text = grafeo_core::index::text::TextIndexSection::new(indexes);
2033                    sections.push(Box::new(text));
2034                }
2035            }
2036        }
2037
2038        #[cfg(feature = "triple-store")]
2039        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2040            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2041            sections.push(Box::new(rdf));
2042        }
2043
2044        sections
2045    }
2046
2047    // =========================================================================
2048    // Backup API
2049    // =========================================================================
2050
2051    /// Creates a full backup of the database in the given directory.
2052    ///
2053    /// Checkpoints the database, copies the `.grafeo` file, and creates a
2054    /// backup manifest. Subsequent incremental backups will use this as the
2055    /// base.
2056    ///
2057    /// # Errors
2058    ///
2059    /// Returns an error if the database has no file manager or I/O fails.
2060    #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2061    pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
2062        let fm = self
2063            .file_manager
2064            .as_ref()
2065            .ok_or_else(|| Error::Internal("backup requires a persistent database".to_string()))?;
2066
2067        // Checkpoint first to ensure the container has latest data
2068        let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2069
2070        let current_epoch = self.transaction_manager.current_epoch();
2071        backup::do_backup_full(backup_dir, fm.path(), self.wal.as_deref(), current_epoch)
2072    }
2073
2074    /// Creates an incremental backup containing WAL records since the last backup.
2075    ///
2076    /// Requires a prior full backup in the backup directory.
2077    ///
2078    /// # Errors
2079    ///
2080    /// Returns an error if no full backup exists, or if the WAL has no new records.
2081    #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2082    pub fn backup_incremental(
2083        &self,
2084        backup_dir: &std::path::Path,
2085    ) -> Result<backup::BackupSegment> {
2086        let wal = self
2087            .wal
2088            .as_ref()
2089            .ok_or_else(|| Error::Internal("incremental backup requires WAL".to_string()))?;
2090
2091        let current_epoch = self.transaction_manager.current_epoch();
2092        backup::do_backup_incremental(backup_dir, wal, current_epoch)
2093    }
2094
2095    /// Returns the backup manifest for a backup directory, if one exists.
2096    ///
2097    /// # Errors
2098    ///
2099    /// Returns an error if the manifest file exists but cannot be parsed.
2100    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2101    pub fn read_backup_manifest(
2102        backup_dir: &std::path::Path,
2103    ) -> Result<Option<backup::BackupManifest>> {
2104        backup::read_manifest(backup_dir)
2105    }
2106
2107    /// Returns the current backup cursor (last backed-up position), if any.
2108    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2109    #[must_use]
2110    pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
2111        self.wal
2112            .as_ref()
2113            .and_then(|wal| backup::read_backup_cursor(wal.dir()).ok().flatten())
2114    }
2115
2116    /// Restores a database from a backup chain to a specific epoch.
2117    ///
2118    /// Copies the full backup to `output_path`, then replays incremental
2119    /// WAL segments up to `target_epoch`. The restored database can be
2120    /// opened with [`GrafeoDB::open`].
2121    ///
2122    /// # Errors
2123    ///
2124    /// Returns an error if the backup chain does not cover the target epoch,
2125    /// segment checksums fail, or I/O fails.
2126    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2127    pub fn restore_to_epoch(
2128        backup_dir: &std::path::Path,
2129        target_epoch: grafeo_common::types::EpochId,
2130        output_path: &std::path::Path,
2131    ) -> Result<()> {
2132        backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
2133    }
2134
2135    /// Writes the current database state to the `.grafeo` file using the unified flush.
2136    ///
2137    /// Does NOT remove the sidecar WAL: callers that want to clean up
2138    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
2139    /// separately after this returns.
2140    #[cfg(feature = "grafeo-file")]
2141    fn checkpoint_to_file(
2142        &self,
2143        fm: &GrafeoFileManager,
2144        reason: flush::FlushReason,
2145    ) -> Result<flush::FlushResult> {
2146        let sections = self.build_sections();
2147        let section_refs: Vec<&dyn grafeo_common::storage::Section> =
2148            sections.iter().map(|s| s.as_ref()).collect();
2149        #[cfg(feature = "lpg")]
2150        let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
2151        #[cfg(not(feature = "lpg"))]
2152        let context = flush::build_context_minimal(&self.transaction_manager);
2153
2154        flush::flush(
2155            fm,
2156            &section_refs,
2157            &context,
2158            reason,
2159            #[cfg(feature = "wal")]
2160            self.wal.as_deref(),
2161        )
2162    }
2163
2164    /// Returns the file manager if using single-file format.
2165    #[cfg(feature = "grafeo-file")]
2166    #[must_use]
2167    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
2168        self.file_manager.as_ref()
2169    }
2170}
2171
2172impl Drop for GrafeoDB {
2173    fn drop(&mut self) {
2174        if let Err(e) = self.close() {
2175            grafeo_error!("Error closing database: {}", e);
2176        }
2177    }
2178}
2179
#[cfg(feature = "lpg")]
impl crate::admin::AdminService for GrafeoDB {
    // Every method forwards to the inherent `GrafeoDB` method of the same name.
    // Inherent items take priority over trait items in path resolution, so the
    // fully qualified calls below cannot recurse back into this trait impl.

    fn info(&self) -> crate::admin::DatabaseInfo {
        GrafeoDB::info(self)
    }

    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        GrafeoDB::detailed_stats(self)
    }

    fn schema(&self) -> crate::admin::SchemaInfo {
        GrafeoDB::schema(self)
    }

    fn validate(&self) -> crate::admin::ValidationResult {
        GrafeoDB::validate(self)
    }

    fn wal_status(&self) -> crate::admin::WalStatus {
        GrafeoDB::wal_status(self)
    }

    fn wal_checkpoint(&self) -> Result<()> {
        GrafeoDB::wal_checkpoint(self)
    }
}
2206
2207// =========================================================================
2208// Query Result Types
2209// =========================================================================
2210
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
/// DDL and session commands may also report a human-readable
/// [`status_message`](Self::status_message).
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// NOTE(review): presumably one entry per column, in the same order as
    /// `columns`; confirm against the result builders.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows; each inner `Vec` holds the values of one row.
    ///
    /// Use [`rows()`](Self::rows) for borrowed access or
    /// [`into_rows()`](Self::into_rows) to take ownership.
    pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
2256
2257impl QueryResult {
2258    /// Creates a fully empty query result (no columns, no rows).
2259    #[must_use]
2260    pub fn empty() -> Self {
2261        Self {
2262            columns: Vec::new(),
2263            column_types: Vec::new(),
2264            rows: Vec::new(),
2265            execution_time_ms: None,
2266            rows_scanned: None,
2267            status_message: None,
2268            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2269        }
2270    }
2271
2272    /// Creates a query result with only a status message (for DDL commands).
2273    #[must_use]
2274    pub fn status(msg: impl Into<String>) -> Self {
2275        Self {
2276            columns: Vec::new(),
2277            column_types: Vec::new(),
2278            rows: Vec::new(),
2279            execution_time_ms: None,
2280            rows_scanned: None,
2281            status_message: Some(msg.into()),
2282            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2283        }
2284    }
2285
2286    /// Creates a new empty query result.
2287    #[must_use]
2288    pub fn new(columns: Vec<String>) -> Self {
2289        let len = columns.len();
2290        Self {
2291            columns,
2292            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2293            rows: Vec::new(),
2294            execution_time_ms: None,
2295            rows_scanned: None,
2296            status_message: None,
2297            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2298        }
2299    }
2300
2301    /// Creates a new empty query result with column types.
2302    #[must_use]
2303    pub fn with_types(
2304        columns: Vec<String>,
2305        column_types: Vec<grafeo_common::types::LogicalType>,
2306    ) -> Self {
2307        Self {
2308            columns,
2309            column_types,
2310            rows: Vec::new(),
2311            execution_time_ms: None,
2312            rows_scanned: None,
2313            status_message: None,
2314            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2315        }
2316    }
2317
2318    /// Creates a query result with pre-populated rows.
2319    #[must_use]
2320    pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
2321        let len = columns.len();
2322        Self {
2323            columns,
2324            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2325            rows,
2326            execution_time_ms: None,
2327            rows_scanned: None,
2328            status_message: None,
2329            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2330        }
2331    }
2332
2333    /// Appends a row to this result.
2334    pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
2335        self.rows.push(row);
2336    }
2337
2338    /// Sets the execution metrics on this result.
2339    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2340        self.execution_time_ms = Some(execution_time_ms);
2341        self.rows_scanned = Some(rows_scanned);
2342        self
2343    }
2344
2345    /// Returns the execution time in milliseconds, if available.
2346    #[must_use]
2347    pub fn execution_time_ms(&self) -> Option<f64> {
2348        self.execution_time_ms
2349    }
2350
2351    /// Returns the number of rows scanned, if available.
2352    #[must_use]
2353    pub fn rows_scanned(&self) -> Option<u64> {
2354        self.rows_scanned
2355    }
2356
2357    /// Returns the number of rows.
2358    #[must_use]
2359    pub fn row_count(&self) -> usize {
2360        self.rows.len()
2361    }
2362
2363    /// Returns the number of columns.
2364    #[must_use]
2365    pub fn column_count(&self) -> usize {
2366        self.columns.len()
2367    }
2368
2369    /// Returns true if the result is empty.
2370    #[must_use]
2371    pub fn is_empty(&self) -> bool {
2372        self.rows.is_empty()
2373    }
2374
2375    /// Extracts a single value from the result.
2376    ///
2377    /// Use this when your query returns exactly one row with one column,
2378    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
2379    ///
2380    /// # Errors
2381    ///
2382    /// Returns an error if the result has multiple rows or columns.
2383    pub fn scalar<T: FromValue>(&self) -> Result<T> {
2384        if self.rows.len() != 1 || self.columns.len() != 1 {
2385            return Err(grafeo_common::utils::error::Error::InvalidValue(
2386                "Expected single value".to_string(),
2387            ));
2388        }
2389        T::from_value(&self.rows[0][0])
2390    }
2391
2392    /// Returns a slice of all result rows.
2393    #[must_use]
2394    pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
2395        &self.rows
2396    }
2397
2398    /// Takes ownership of all result rows.
2399    #[must_use]
2400    pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
2401        self.rows
2402    }
2403
2404    /// Returns an iterator over the rows.
2405    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2406        self.rows.iter()
2407    }
2408
2409    /// Converts this query result to an Arrow [`RecordBatch`](arrow_array::RecordBatch).
2410    ///
2411    /// Each column in the result becomes an Arrow array. Type mapping:
2412    /// - `Int64` / `Float64` / `Bool` / `String` / `Bytes`: direct Arrow equivalents
2413    /// - `Timestamp` / `ZonedDatetime`: `Timestamp(Microsecond, UTC)`
2414    /// - `Date`: `Date32`, `Time`: `Time64(Nanosecond)`
2415    /// - `Vector`: `FixedSizeList(Float32, dim)`
2416    /// - `Duration` / `List` / `Map` / `Path`: serialized as `Utf8`
2417    ///
2418    /// Heterogeneous columns (mixed types) fall back to `Utf8`.
2419    ///
2420    /// # Errors
2421    ///
2422    /// Returns [`ArrowExportError`](arrow::ArrowExportError) if Arrow array construction fails.
2423    #[cfg(feature = "arrow-export")]
2424    pub fn to_record_batch(
2425        &self,
2426    ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
2427        arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
2428    }
2429
2430    /// Serializes this query result as Arrow IPC stream bytes.
2431    ///
2432    /// The returned bytes can be read by any Arrow implementation:
2433    /// - Python: `pyarrow.ipc.open_stream(buf).read_all()`
2434    /// - Polars: `pl.read_ipc(buf)`
2435    /// - Node.js: `apache-arrow` `RecordBatchStreamReader`
2436    ///
2437    /// # Errors
2438    ///
2439    /// Returns [`ArrowExportError`](arrow::ArrowExportError) on conversion or serialization failure.
2440    #[cfg(feature = "arrow-export")]
2441    pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
2442        let batch = self.to_record_batch()?;
2443        arrow::record_batch_to_ipc_stream(&batch)
2444    }
2445}
2446
2447impl std::fmt::Display for QueryResult {
2448    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2449        let table = grafeo_common::fmt::format_result_table(
2450            &self.columns,
2451            &self.rows,
2452            self.execution_time_ms,
2453            self.status_message.as_deref(),
2454        );
2455        f.write_str(&table)
2456    }
2457}
2458
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// Implement this for your own types to make them usable with
/// [`QueryResult::scalar()`].
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// # Errors
    ///
    /// Returns `Error::TypeMismatch` if the value is not the expected type.
    /// The built-in implementations put the target type's name (e.g. `"INT64"`)
    /// in `expected` and the value's `type_name()` in `found`.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2471
2472impl FromValue for i64 {
2473    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2474        value
2475            .as_int64()
2476            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2477                expected: "INT64".to_string(),
2478                found: value.type_name().to_string(),
2479            })
2480    }
2481}
2482
2483impl FromValue for f64 {
2484    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2485        value
2486            .as_float64()
2487            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2488                expected: "FLOAT64".to_string(),
2489                found: value.type_name().to_string(),
2490            })
2491    }
2492}
2493
2494impl FromValue for String {
2495    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2496        value.as_str().map(String::from).ok_or_else(|| {
2497            grafeo_common::utils::error::Error::TypeMismatch {
2498                expected: "STRING".to_string(),
2499                found: value.type_name().to_string(),
2500            }
2501        })
2502    }
2503}
2504
2505impl FromValue for bool {
2506    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2507        value
2508            .as_bool()
2509            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2510                expected: "BOOL".to_string(),
2511                found: value.type_name().to_string(),
2512            })
2513    }
2514}
2515
2516#[cfg(test)]
2517mod tests {
2518    use super::*;
2519
2520    #[test]
2521    fn test_create_in_memory_database() {
2522        let db = GrafeoDB::new_in_memory();
2523        assert_eq!(db.node_count(), 0);
2524        assert_eq!(db.edge_count(), 0);
2525    }
2526
2527    #[test]
2528    fn test_database_config() {
2529        let config = Config::in_memory().with_threads(4).with_query_logging();
2530
2531        let db = GrafeoDB::with_config(config).unwrap();
2532        assert_eq!(db.config().threads, 4);
2533        assert!(db.config().query_logging);
2534    }
2535
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        // Opening a session on a fresh in-memory database must not panic; the
        // binding keeps the session alive until the end of the test.
        let _session = db.session();
        // Session should be created successfully
    }
2542
2543    #[cfg(feature = "wal")]
2544    #[test]
2545    fn test_persistent_database_recovery() {
2546        use grafeo_common::types::Value;
2547        use tempfile::tempdir;
2548
2549        let dir = tempdir().unwrap();
2550        let db_path = dir.path().join("test_db");
2551
2552        // Create database and add some data
2553        {
2554            let db = GrafeoDB::open(&db_path).unwrap();
2555
2556            let alix = db.create_node(&["Person"]);
2557            db.set_node_property(alix, "name", Value::from("Alix"));
2558
2559            let gus = db.create_node(&["Person"]);
2560            db.set_node_property(gus, "name", Value::from("Gus"));
2561
2562            let _edge = db.create_edge(alix, gus, "KNOWS");
2563
2564            // Explicitly close to flush WAL
2565            db.close().unwrap();
2566        }
2567
2568        // Reopen and verify data was recovered
2569        {
2570            let db = GrafeoDB::open(&db_path).unwrap();
2571
2572            assert_eq!(db.node_count(), 2);
2573            assert_eq!(db.edge_count(), 1);
2574
2575            // Verify nodes exist
2576            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2577            assert!(node0.is_some());
2578
2579            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2580            assert!(node1.is_some());
2581        }
2582    }
2583
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        // Mutations on a database opened from a path should be recorded in the WAL.
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // Create some data
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // WAL should have records
        // NOTE(review): when `db.wal()` returns `None` the assertion is skipped and
        // the test passes vacuously; consider asserting the WAL is present if that
        // is guaranteed for databases opened from a path.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }
2605
2606    #[cfg(feature = "wal")]
2607    #[test]
2608    fn test_wal_recovery_multiple_sessions() {
2609        // Tests that WAL recovery works correctly across multiple open/close cycles
2610        use grafeo_common::types::Value;
2611        use tempfile::tempdir;
2612
2613        let dir = tempdir().unwrap();
2614        let db_path = dir.path().join("multi_session_db");
2615
2616        // Session 1: Create initial data
2617        {
2618            let db = GrafeoDB::open(&db_path).unwrap();
2619            let alix = db.create_node(&["Person"]);
2620            db.set_node_property(alix, "name", Value::from("Alix"));
2621            db.close().unwrap();
2622        }
2623
2624        // Session 2: Add more data
2625        {
2626            let db = GrafeoDB::open(&db_path).unwrap();
2627            assert_eq!(db.node_count(), 1); // Previous data recovered
2628            let gus = db.create_node(&["Person"]);
2629            db.set_node_property(gus, "name", Value::from("Gus"));
2630            db.close().unwrap();
2631        }
2632
2633        // Session 3: Verify all data
2634        {
2635            let db = GrafeoDB::open(&db_path).unwrap();
2636            assert_eq!(db.node_count(), 2);
2637
2638            // Verify properties were recovered correctly
2639            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
2640            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
2641
2642            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
2643            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
2644        }
2645    }
2646
2647    #[cfg(feature = "wal")]
2648    #[test]
2649    fn test_database_consistency_after_mutations() {
2650        // Tests that database remains consistent after a series of create/delete operations
2651        use grafeo_common::types::Value;
2652        use tempfile::tempdir;
2653
2654        let dir = tempdir().unwrap();
2655        let db_path = dir.path().join("consistency_db");
2656
2657        {
2658            let db = GrafeoDB::open(&db_path).unwrap();
2659
2660            // Create nodes
2661            let a = db.create_node(&["Node"]);
2662            let b = db.create_node(&["Node"]);
2663            let c = db.create_node(&["Node"]);
2664
2665            // Create edges
2666            let e1 = db.create_edge(a, b, "LINKS");
2667            let _e2 = db.create_edge(b, c, "LINKS");
2668
2669            // Delete middle node and its edge
2670            db.delete_edge(e1);
2671            db.delete_node(b);
2672
2673            // Set properties on remaining nodes
2674            db.set_node_property(a, "value", Value::Int64(1));
2675            db.set_node_property(c, "value", Value::Int64(3));
2676
2677            db.close().unwrap();
2678        }
2679
2680        // Reopen and verify consistency
2681        {
2682            let db = GrafeoDB::open(&db_path).unwrap();
2683
2684            // Should have 2 nodes (a and c), b was deleted
2685            // Note: node_count includes deleted nodes in some implementations
2686            // What matters is that the non-deleted nodes are accessible
2687            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
2688            assert!(node_a.is_some());
2689
2690            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
2691            assert!(node_c.is_some());
2692
2693            // Middle node should be deleted
2694            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
2695            assert!(node_b.is_none());
2696        }
2697    }
2698
2699    #[cfg(feature = "wal")]
2700    #[test]
2701    fn test_close_is_idempotent() {
2702        // Calling close() multiple times should not cause errors
2703        use tempfile::tempdir;
2704
2705        let dir = tempdir().unwrap();
2706        let db_path = dir.path().join("close_test_db");
2707
2708        let db = GrafeoDB::open(&db_path).unwrap();
2709        db.create_node(&["Test"]);
2710
2711        // First close should succeed
2712        assert!(db.close().is_ok());
2713
2714        // Second close should also succeed (idempotent)
2715        assert!(db.close().is_ok());
2716    }
2717
2718    #[test]
2719    fn test_with_store_external_backend() {
2720        use grafeo_core::graph::lpg::LpgStore;
2721
2722        let external = Arc::new(LpgStore::new().unwrap());
2723
2724        // Seed data on the external store directly
2725        let n1 = external.create_node(&["Person"]);
2726        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
2727
2728        let db = GrafeoDB::with_store(
2729            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
2730            Config::in_memory(),
2731        )
2732        .unwrap();
2733
2734        let session = db.session();
2735
2736        // Session should see data from the external store via execute
2737        #[cfg(feature = "gql")]
2738        {
2739            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
2740            assert_eq!(result.rows.len(), 1);
2741        }
2742    }
2743
2744    #[test]
2745    fn test_with_config_custom_memory_limit() {
2746        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB
2747
2748        let db = GrafeoDB::with_config(config).unwrap();
2749        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
2750        assert_eq!(db.node_count(), 0);
2751    }
2752
    #[cfg(feature = "metrics")]
    #[test]
    fn test_database_metrics_registry() {
        let db = GrafeoDB::new_in_memory();

        // Perform some operations (direct API calls, not queries)
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        // Check that metrics snapshot returns data
        let snap = db.metrics();
        // Direct CRUD calls are not queries, so the query counter is expected to be zero
        assert_eq!(snap.query_count, 0); // No queries executed yet
    }
2767
2768    #[test]
2769    fn test_query_result_has_metrics() {
2770        // Verifies that query results include execution metrics
2771        let db = GrafeoDB::new_in_memory();
2772        db.create_node(&["Person"]);
2773        db.create_node(&["Person"]);
2774
2775        #[cfg(feature = "gql")]
2776        {
2777            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
2778
2779            // Metrics should be populated
2780            assert!(result.execution_time_ms.is_some());
2781            assert!(result.rows_scanned.is_some());
2782            assert!(result.execution_time_ms.unwrap() >= 0.0);
2783            assert_eq!(result.rows_scanned.unwrap(), 2);
2784        }
2785    }
2786
2787    #[test]
2788    fn test_empty_query_result_metrics() {
2789        // Verifies metrics are correct for queries returning no results
2790        let db = GrafeoDB::new_in_memory();
2791        db.create_node(&["Person"]);
2792
2793        #[cfg(feature = "gql")]
2794        {
2795            // Query that matches nothing
2796            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
2797
2798            assert!(result.execution_time_ms.is_some());
2799            assert!(result.rows_scanned.is_some());
2800            assert_eq!(result.rows_scanned.unwrap(), 0);
2801        }
2802    }
2803
    // End-to-end tests for change data capture (CDC): which mutations are recorded,
    // in what order, and how the database/session switches affect recording.
    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        /// Helper: creates an in-memory database with CDC enabled.
        fn cdc_db() -> GrafeoDB {
            GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
        }

        // Every mutation of a node appears in its history, in the order performed.
        #[test]
        fn test_node_lifecycle_history() {
            let db = cdc_db();

            // Create
            let id = db.create_node(&["Person"]);
            // Update
            db.set_node_property(id, "name", "Alix".into());
            db.set_node_property(id, "name", "Gus".into());
            // Delete
            db.delete_node(id);

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 4); // create + 2 updates + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert!(history[1].before.is_none()); // first set_node_property has no prior value
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            assert!(history[2].before.is_some()); // second update has prior "Alix"
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        // Edges are tracked the same way as nodes: create, property update, delete.
        #[test]
        fn test_edge_lifecycle_history() {
            let db = cdc_db();

            let alix = db.create_node(&["Person"]);
            let gus = db.create_node(&["Person"]);
            let edge = db.create_edge(alix, gus, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            assert_eq!(history.len(), 3); // create + update + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        // Creating a node together with its properties emits a single Create event
        // whose `after` snapshot holds every supplied property.
        #[test]
        fn test_create_node_with_props_cdc() {
            let db = cdc_db();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alix")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            // Props should be captured
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        // A range covering every possible epoch returns all recorded events.
        #[test]
        fn test_changes_between() {
            let db = cdc_db();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert_eq!(changes.len(), 3); // 2 creates + 1 update
        }

        // Without `with_cdc()`, mutations leave no history behind.
        #[test]
        fn test_cdc_disabled_by_default() {
            let db = GrafeoDB::new_in_memory();
            assert!(!db.is_cdc_enabled());

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alix".into());

            let history = db.history(id).unwrap();
            assert!(history.is_empty(), "CDC off by default: no events recorded");
        }

        // A per-session opt-in records events even when the database default is off.
        #[test]
        fn test_session_with_cdc_override_on() {
            // Database default is off, but session opts in
            let db = GrafeoDB::new_in_memory();
            let session = db.session_with_cdc(true);
            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
            // The CDC log should have events from the opted-in session
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert!(
                !changes.is_empty(),
                "session_with_cdc(true) should record events"
            );
        }

        // A per-session opt-out suppresses events even when the database default is on.
        #[test]
        fn test_session_with_cdc_override_off() {
            // Database default is on, but session opts out
            let db = cdc_db();
            let session = db.session_with_cdc(false);
            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert!(
                changes.is_empty(),
                "session_with_cdc(false) should not record events"
            );
        }

        // `set_cdc_enabled` toggles recording for direct API calls made afterwards.
        #[test]
        fn test_set_cdc_enabled_runtime() {
            let db = GrafeoDB::new_in_memory();
            assert!(!db.is_cdc_enabled());

            // Enable at runtime
            db.set_cdc_enabled(true);
            assert!(db.is_cdc_enabled());

            let id = db.create_node(&["Person"]);
            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1, "CDC enabled at runtime records events");

            // Disable again
            db.set_cdc_enabled(false);
            let id2 = db.create_node(&["Person"]);
            let history2 = db.history(id2).unwrap();
            assert!(
                history2.is_empty(),
                "CDC disabled at runtime stops recording"
            );
        }
    }
2962
2963    #[test]
2964    fn test_with_store_basic() {
2965        use grafeo_core::graph::lpg::LpgStore;
2966
2967        let store = Arc::new(LpgStore::new().unwrap());
2968        let n1 = store.create_node(&["Person"]);
2969        store.set_node_property(n1, "name", "Alix".into());
2970
2971        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
2972        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
2973
2974        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
2975        assert_eq!(result.rows.len(), 1);
2976    }
2977
2978    #[test]
2979    fn test_with_store_session() {
2980        use grafeo_core::graph::lpg::LpgStore;
2981
2982        let store = Arc::new(LpgStore::new().unwrap());
2983        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
2984        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
2985
2986        let session = db.session();
2987        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
2988        assert_eq!(result.rows.len(), 1);
2989    }
2990
    #[test]
    fn test_with_store_mutations() {
        // Writes issued through a session land in an externally supplied store and
        // are visible to later reads in the same transaction.
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let mut session = db.session();

        // Use an explicit transaction so INSERT and MATCH share the same
        // transaction context. With PENDING epochs, uncommitted versions are
        // only visible to the owning transaction.
        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);

        session.commit().unwrap();
    }
3012
3013    // =========================================================================
3014    // QueryResult tests
3015    // =========================================================================
3016
3017    #[test]
3018    fn test_query_result_empty() {
3019        let result = QueryResult::empty();
3020        assert!(result.is_empty());
3021        assert_eq!(result.row_count(), 0);
3022        assert_eq!(result.column_count(), 0);
3023        assert!(result.execution_time_ms().is_none());
3024        assert!(result.rows_scanned().is_none());
3025        assert!(result.status_message.is_none());
3026    }
3027
3028    #[test]
3029    fn test_query_result_status() {
3030        let result = QueryResult::status("Created node type 'Person'");
3031        assert!(result.is_empty());
3032        assert_eq!(result.column_count(), 0);
3033        assert_eq!(
3034            result.status_message.as_deref(),
3035            Some("Created node type 'Person'")
3036        );
3037    }
3038
3039    #[test]
3040    fn test_query_result_new_with_columns() {
3041        let result = QueryResult::new(vec!["name".into(), "age".into()]);
3042        assert_eq!(result.column_count(), 2);
3043        assert_eq!(result.row_count(), 0);
3044        assert!(result.is_empty());
3045        // Column types should default to Any
3046        assert_eq!(
3047            result.column_types,
3048            vec![
3049                grafeo_common::types::LogicalType::Any,
3050                grafeo_common::types::LogicalType::Any
3051            ]
3052        );
3053    }
3054
3055    #[test]
3056    fn test_query_result_with_types() {
3057        use grafeo_common::types::LogicalType;
3058        let result = QueryResult::with_types(
3059            vec!["name".into(), "age".into()],
3060            vec![LogicalType::String, LogicalType::Int64],
3061        );
3062        assert_eq!(result.column_count(), 2);
3063        assert_eq!(result.column_types[0], LogicalType::String);
3064        assert_eq!(result.column_types[1], LogicalType::Int64);
3065    }
3066
3067    #[test]
3068    fn test_query_result_with_metrics() {
3069        let result = QueryResult::new(vec!["x".into()]).with_metrics(42.5, 100);
3070        assert_eq!(result.execution_time_ms(), Some(42.5));
3071        assert_eq!(result.rows_scanned(), Some(100));
3072    }
3073
3074    #[test]
3075    fn test_query_result_scalar_success() {
3076        use grafeo_common::types::Value;
3077        let mut result = QueryResult::new(vec!["count".into()]);
3078        result.rows.push(vec![Value::Int64(42)]);
3079
3080        let val: i64 = result.scalar().unwrap();
3081        assert_eq!(val, 42);
3082    }
3083
3084    #[test]
3085    fn test_query_result_scalar_wrong_shape() {
3086        use grafeo_common::types::Value;
3087        // Multiple rows
3088        let mut result = QueryResult::new(vec!["x".into()]);
3089        result.rows.push(vec![Value::Int64(1)]);
3090        result.rows.push(vec![Value::Int64(2)]);
3091        assert!(result.scalar::<i64>().is_err());
3092
3093        // Multiple columns
3094        let mut result2 = QueryResult::new(vec!["a".into(), "b".into()]);
3095        result2.rows.push(vec![Value::Int64(1), Value::Int64(2)]);
3096        assert!(result2.scalar::<i64>().is_err());
3097
3098        // Empty
3099        let result3 = QueryResult::new(vec!["x".into()]);
3100        assert!(result3.scalar::<i64>().is_err());
3101    }
3102
3103    #[test]
3104    fn test_query_result_iter() {
3105        use grafeo_common::types::Value;
3106        let mut result = QueryResult::new(vec!["x".into()]);
3107        result.rows.push(vec![Value::Int64(1)]);
3108        result.rows.push(vec![Value::Int64(2)]);
3109
3110        let collected: Vec<_> = result.iter().collect();
3111        assert_eq!(collected.len(), 2);
3112    }
3113
3114    #[test]
3115    fn test_query_result_display() {
3116        use grafeo_common::types::Value;
3117        let mut result = QueryResult::new(vec!["name".into()]);
3118        result.rows.push(vec![Value::from("Alix")]);
3119        let display = result.to_string();
3120        assert!(display.contains("name"));
3121        assert!(display.contains("Alix"));
3122    }
3123
3124    // =========================================================================
3125    // FromValue error paths
3126    // =========================================================================
3127
3128    #[test]
3129    fn test_from_value_i64_type_mismatch() {
3130        use grafeo_common::types::Value;
3131        let val = Value::from("not a number");
3132        assert!(i64::from_value(&val).is_err());
3133    }
3134
3135    #[test]
3136    fn test_from_value_f64_type_mismatch() {
3137        use grafeo_common::types::Value;
3138        let val = Value::from("not a float");
3139        assert!(f64::from_value(&val).is_err());
3140    }
3141
3142    #[test]
3143    fn test_from_value_string_type_mismatch() {
3144        use grafeo_common::types::Value;
3145        let val = Value::Int64(42);
3146        assert!(String::from_value(&val).is_err());
3147    }
3148
3149    #[test]
3150    fn test_from_value_bool_type_mismatch() {
3151        use grafeo_common::types::Value;
3152        let val = Value::Int64(1);
3153        assert!(bool::from_value(&val).is_err());
3154    }
3155
3156    #[test]
3157    fn test_from_value_all_success() {
3158        use grafeo_common::types::Value;
3159        assert_eq!(i64::from_value(&Value::Int64(99)).unwrap(), 99);
3160        assert!((f64::from_value(&Value::Float64(2.72)).unwrap() - 2.72).abs() < f64::EPSILON);
3161        assert_eq!(String::from_value(&Value::from("hello")).unwrap(), "hello");
3162        assert!(bool::from_value(&Value::Bool(true)).unwrap());
3163    }
3164
3165    // =========================================================================
3166    // GrafeoDB accessor tests
3167    // =========================================================================
3168
3169    #[test]
3170    fn test_database_is_read_only_false_by_default() {
3171        let db = GrafeoDB::new_in_memory();
3172        assert!(!db.is_read_only());
3173    }
3174
3175    #[test]
3176    fn test_database_graph_model() {
3177        let db = GrafeoDB::new_in_memory();
3178        assert_eq!(db.graph_model(), crate::config::GraphModel::Lpg);
3179    }
3180
3181    #[test]
3182    fn test_database_memory_limit_none_by_default() {
3183        let db = GrafeoDB::new_in_memory();
3184        assert!(db.memory_limit().is_none());
3185    }
3186
3187    #[test]
3188    fn test_database_memory_limit_custom() {
3189        let config = Config::in_memory().with_memory_limit(128 * 1024 * 1024);
3190        let db = GrafeoDB::with_config(config).unwrap();
3191        assert_eq!(db.memory_limit(), Some(128 * 1024 * 1024));
3192    }
3193
3194    #[test]
3195    fn test_database_adaptive_config() {
3196        let db = GrafeoDB::new_in_memory();
3197        let adaptive = db.adaptive_config();
3198        assert!(adaptive.enabled);
3199        assert!((adaptive.threshold - 3.0).abs() < f64::EPSILON);
3200    }
3201
3202    #[test]
3203    fn test_database_buffer_manager() {
3204        let db = GrafeoDB::new_in_memory();
3205        let _bm = db.buffer_manager();
3206        // Just verify it doesn't panic
3207    }
3208
3209    #[test]
3210    fn test_database_query_cache() {
3211        let db = GrafeoDB::new_in_memory();
3212        let _qc = db.query_cache();
3213    }
3214
3215    #[test]
3216    fn test_database_clear_plan_cache() {
3217        let db = GrafeoDB::new_in_memory();
3218        // Execute a query to populate the cache
3219        #[cfg(feature = "gql")]
3220        {
3221            let _ = db.execute("MATCH (n) RETURN count(n)");
3222        }
3223        db.clear_plan_cache();
3224        // No panic means success
3225    }
3226
3227    #[test]
3228    fn test_database_gc() {
3229        let db = GrafeoDB::new_in_memory();
3230        db.create_node(&["Person"]);
3231        db.gc();
3232        // Verify no panic, node still accessible
3233        assert_eq!(db.node_count(), 1);
3234    }
3235
3236    // =========================================================================
3237    // Named graph management
3238    // =========================================================================
3239
3240    #[test]
3241    fn test_create_and_list_graphs() {
3242        let db = GrafeoDB::new_in_memory();
3243        let created = db.create_graph("social").unwrap();
3244        assert!(created);
3245
3246        // Creating same graph again returns false
3247        let created_again = db.create_graph("social").unwrap();
3248        assert!(!created_again);
3249
3250        let names = db.list_graphs();
3251        assert!(names.contains(&"social".to_string()));
3252    }
3253
3254    #[test]
3255    fn test_drop_graph() {
3256        let db = GrafeoDB::new_in_memory();
3257        db.create_graph("temp").unwrap();
3258        assert!(db.drop_graph("temp"));
3259        assert!(!db.drop_graph("temp")); // Already dropped
3260    }
3261
3262    #[test]
3263    fn test_drop_graph_resets_current_graph() {
3264        let db = GrafeoDB::new_in_memory();
3265        db.create_graph("active").unwrap();
3266        db.set_current_graph(Some("active")).unwrap();
3267        assert_eq!(db.current_graph(), Some("active".to_string()));
3268
3269        db.drop_graph("active");
3270        assert_eq!(db.current_graph(), None);
3271    }
3272
3273    // =========================================================================
3274    // Current graph / schema context
3275    // =========================================================================
3276
3277    #[test]
3278    fn test_current_graph_default_none() {
3279        let db = GrafeoDB::new_in_memory();
3280        assert_eq!(db.current_graph(), None);
3281    }
3282
3283    #[test]
3284    fn test_set_current_graph_valid() {
3285        let db = GrafeoDB::new_in_memory();
3286        db.create_graph("social").unwrap();
3287        db.set_current_graph(Some("social")).unwrap();
3288        assert_eq!(db.current_graph(), Some("social".to_string()));
3289    }
3290
3291    #[test]
3292    fn test_set_current_graph_nonexistent() {
3293        let db = GrafeoDB::new_in_memory();
3294        let result = db.set_current_graph(Some("nonexistent"));
3295        assert!(result.is_err());
3296    }
3297
3298    #[test]
3299    fn test_set_current_graph_none_resets() {
3300        let db = GrafeoDB::new_in_memory();
3301        db.create_graph("social").unwrap();
3302        db.set_current_graph(Some("social")).unwrap();
3303        db.set_current_graph(None).unwrap();
3304        assert_eq!(db.current_graph(), None);
3305    }
3306
3307    #[test]
3308    fn test_set_current_graph_default_keyword() {
3309        let db = GrafeoDB::new_in_memory();
3310        // "default" is a special case that always succeeds
3311        db.set_current_graph(Some("default")).unwrap();
3312        assert_eq!(db.current_graph(), Some("default".to_string()));
3313    }
3314
3315    #[test]
3316    fn test_current_schema_default_none() {
3317        let db = GrafeoDB::new_in_memory();
3318        assert_eq!(db.current_schema(), None);
3319    }
3320
3321    #[test]
3322    fn test_set_current_schema_nonexistent() {
3323        let db = GrafeoDB::new_in_memory();
3324        let result = db.set_current_schema(Some("nonexistent"));
3325        assert!(result.is_err());
3326    }
3327
3328    #[test]
3329    fn test_set_current_schema_none_resets() {
3330        let db = GrafeoDB::new_in_memory();
3331        db.set_current_schema(None).unwrap();
3332        assert_eq!(db.current_schema(), None);
3333    }
3334
3335    // =========================================================================
3336    // graph_store / graph_store_mut
3337    // =========================================================================
3338
3339    #[test]
3340    fn test_graph_store_returns_lpg_by_default() {
3341        let db = GrafeoDB::new_in_memory();
3342        db.create_node(&["Person"]);
3343        let store = db.graph_store();
3344        assert_eq!(store.node_count(), 1);
3345    }
3346
3347    #[test]
3348    fn test_graph_store_mut_returns_some_by_default() {
3349        let db = GrafeoDB::new_in_memory();
3350        assert!(db.graph_store_mut().is_some());
3351    }
3352
3353    #[test]
3354    fn test_with_read_store() {
3355        use grafeo_core::graph::lpg::LpgStore;
3356
3357        let store = Arc::new(LpgStore::new().unwrap());
3358        store.create_node(&["Person"]);
3359
3360        let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
3361        let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3362
3363        assert!(db.is_read_only());
3364        assert!(db.graph_store_mut().is_none());
3365
3366        // Read queries should work
3367        let gs = db.graph_store();
3368        assert_eq!(gs.node_count(), 1);
3369    }
3370
3371    #[test]
3372    fn test_with_store_graph_store_methods() {
3373        use grafeo_core::graph::lpg::LpgStore;
3374
3375        let store = Arc::new(LpgStore::new().unwrap());
3376        store.create_node(&["Person"]);
3377
3378        let db = GrafeoDB::with_store(
3379            Arc::clone(&store) as Arc<dyn GraphStoreMut>,
3380            Config::in_memory(),
3381        )
3382        .unwrap();
3383
3384        assert!(!db.is_read_only());
3385        assert!(db.graph_store_mut().is_some());
3386        assert_eq!(db.graph_store().node_count(), 1);
3387    }
3388
3389    // =========================================================================
3390    // session_read_only
3391    // =========================================================================
3392
3393    #[test]
3394    fn test_session_read_only() {
3395        let db = GrafeoDB::new_in_memory();
3396        db.create_node(&["Person"]);
3397
3398        let session = db.session_read_only();
3399        // Read queries should work
3400        #[cfg(feature = "gql")]
3401        {
3402            let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3403            assert_eq!(result.rows.len(), 1);
3404        }
3405    }
3406
3407    // =========================================================================
3408    // close on in-memory database
3409    // =========================================================================
3410
3411    #[test]
3412    fn test_close_in_memory_database() {
3413        let db = GrafeoDB::new_in_memory();
3414        db.create_node(&["Person"]);
3415        assert!(db.close().is_ok());
3416        // Second close should also be fine (idempotent)
3417        assert!(db.close().is_ok());
3418    }
3419
3420    // =========================================================================
3421    // with_config validation failure
3422    // =========================================================================
3423
3424    #[test]
3425    fn test_with_config_invalid_config_zero_threads() {
3426        let config = Config::in_memory().with_threads(0);
3427        let result = GrafeoDB::with_config(config);
3428        assert!(result.is_err());
3429    }
3430
3431    #[test]
3432    fn test_with_config_invalid_config_zero_memory_limit() {
3433        let config = Config::in_memory().with_memory_limit(0);
3434        let result = GrafeoDB::with_config(config);
3435        assert!(result.is_err());
3436    }
3437
3438    // =========================================================================
3439    // StorageFormat display (for config.rs coverage)
3440    // =========================================================================
3441
3442    #[test]
3443    fn test_storage_format_display() {
3444        use crate::config::StorageFormat;
3445        assert_eq!(StorageFormat::Auto.to_string(), "auto");
3446        assert_eq!(StorageFormat::WalDirectory.to_string(), "wal-directory");
3447        assert_eq!(StorageFormat::SingleFile.to_string(), "single-file");
3448    }
3449
3450    #[test]
3451    fn test_storage_format_default() {
3452        use crate::config::StorageFormat;
3453        assert_eq!(StorageFormat::default(), StorageFormat::Auto);
3454    }
3455
3456    #[test]
3457    fn test_config_with_storage_format() {
3458        use crate::config::StorageFormat;
3459        let config = Config::in_memory().with_storage_format(StorageFormat::SingleFile);
3460        assert_eq!(config.storage_format, StorageFormat::SingleFile);
3461    }
3462
3463    // =========================================================================
3464    // Config CDC
3465    // =========================================================================
3466
3467    #[test]
3468    fn test_config_with_cdc() {
3469        let config = Config::in_memory().with_cdc();
3470        assert!(config.cdc_enabled);
3471    }
3472
3473    #[test]
3474    fn test_config_cdc_default_false() {
3475        let config = Config::in_memory();
3476        assert!(!config.cdc_enabled);
3477    }
3478
3479    // =========================================================================
3480    // ConfigError as std::error::Error
3481    // =========================================================================
3482
3483    #[test]
3484    fn test_config_error_is_error_trait() {
3485        use crate::config::ConfigError;
3486        let err: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);
3487        assert!(err.source().is_none());
3488    }
3489
3490    // =========================================================================
3491    // Metrics tests
3492    // =========================================================================
3493
3494    #[cfg(feature = "metrics")]
3495    #[test]
3496    fn test_metrics_prometheus_output() {
3497        let db = GrafeoDB::new_in_memory();
3498        let prom = db.metrics_prometheus();
3499        // Should contain at least some metric names
3500        assert!(!prom.is_empty());
3501    }
3502
3503    #[cfg(feature = "metrics")]
3504    #[test]
3505    fn test_reset_metrics() {
3506        let db = GrafeoDB::new_in_memory();
3507        // Execute something to generate metrics
3508        let _session = db.session();
3509        db.reset_metrics();
3510        let snap = db.metrics();
3511        assert_eq!(snap.query_count, 0);
3512    }
3513
3514    // =========================================================================
3515    // drop_graph on external store
3516    // =========================================================================
3517
3518    #[test]
3519    fn test_drop_graph_on_external_store() {
3520        use grafeo_core::graph::lpg::LpgStore;
3521
3522        let store = Arc::new(LpgStore::new().unwrap());
3523        let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
3524        let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3525
3526        // drop_graph with external store (no built-in store) returns false
3527        assert!(!db.drop_graph("anything"));
3528    }
3529}