Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(feature = "lpg")]
31mod crud;
32#[cfg(feature = "embed")]
33mod embed;
34#[cfg(feature = "grafeo-file")]
35pub(crate) mod flush;
36#[cfg(feature = "lpg")]
37mod import;
38#[cfg(feature = "lpg")]
39mod index;
40#[cfg(feature = "lpg")]
41mod persistence;
42mod query;
43#[cfg(feature = "triple-store")]
44mod rdf_ops;
45#[cfg(feature = "lpg")]
46mod search;
47pub(crate) mod section_consumer;
48#[cfg(all(feature = "wal", feature = "lpg"))]
49pub(crate) mod wal_store;
50
51use grafeo_common::{grafeo_error, grafeo_warn};
52#[cfg(feature = "wal")]
53use std::path::Path;
54use std::sync::Arc;
55use std::sync::atomic::AtomicUsize;
56
57use parking_lot::RwLock;
58
59use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
60use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
61#[cfg(feature = "lpg")]
62use grafeo_core::graph::lpg::LpgStore;
63#[cfg(feature = "triple-store")]
64use grafeo_core::graph::rdf::RdfStore;
65use grafeo_core::graph::{GraphStore, GraphStoreMut};
66#[cfg(feature = "grafeo-file")]
67use grafeo_storage::file::GrafeoFileManager;
68#[cfg(all(feature = "wal", feature = "lpg"))]
69use grafeo_storage::wal::WalRecovery;
70#[cfg(feature = "wal")]
71use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
72
73use crate::catalog::Catalog;
74use crate::config::Config;
75use crate::query::cache::QueryCache;
76use crate::session::Session;
77use crate::transaction::TransactionManager;
78
79/// Your handle to a Grafeo database.
80///
81/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
82/// quick experiments, or [`open()`](Self::open) for persistent storage.
83/// Then grab a [`session()`](Self::session) to start querying.
84///
85/// # Examples
86///
87/// ```
88/// use grafeo_engine::GrafeoDB;
89///
90/// // Quick in-memory database
91/// let db = GrafeoDB::new_in_memory();
92///
93/// // Add some data
94/// db.create_node(&["Person"]);
95///
96/// // Query it
97/// let session = db.session();
98/// let result = session.execute("MATCH (p:Person) RETURN p")?;
99/// # Ok::<(), grafeo_common::utils::error::Error>(())
100/// ```
101pub struct GrafeoDB {
102    /// Database configuration.
103    pub(super) config: Config,
104    /// The underlying graph store (None when using an external store).
105    #[cfg(feature = "lpg")]
106    pub(super) store: Option<Arc<LpgStore>>,
107    /// Schema and metadata catalog shared across sessions.
108    pub(super) catalog: Arc<Catalog>,
109    /// RDF triple store (if RDF feature is enabled).
110    #[cfg(feature = "triple-store")]
111    pub(super) rdf_store: Arc<RdfStore>,
112    /// Transaction manager.
113    pub(super) transaction_manager: Arc<TransactionManager>,
114    /// Unified buffer manager.
115    pub(super) buffer_manager: Arc<BufferManager>,
116    /// Write-ahead log manager (if durability is enabled).
117    #[cfg(feature = "wal")]
118    pub(super) wal: Option<Arc<LpgWal>>,
119    /// Shared WAL graph context tracker. Tracks which named graph was last
120    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
121    /// records only when the context actually changes.
122    #[cfg(feature = "wal")]
123    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
124    /// Query cache for parsed and optimized plans.
125    pub(super) query_cache: Arc<QueryCache>,
126    /// Shared commit counter for auto-GC across sessions.
127    pub(super) commit_counter: Arc<AtomicUsize>,
128    /// Whether the database is open.
129    pub(super) is_open: RwLock<bool>,
130    /// Change data capture log for tracking mutations.
131    #[cfg(feature = "cdc")]
132    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
133    /// Whether CDC is active for new sessions and direct CRUD (runtime-mutable).
134    #[cfg(feature = "cdc")]
135    cdc_enabled: std::sync::atomic::AtomicBool,
136    /// Registered embedding models for text-to-vector conversion.
137    #[cfg(feature = "embed")]
138    pub(super) embedding_models:
139        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
140    /// Single-file database manager (when using `.grafeo` format).
141    #[cfg(feature = "grafeo-file")]
142    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
143    /// Periodic checkpoint timer (when `checkpoint_interval` is configured).
144    /// Wrapped in Mutex because `close()` takes `&self` but needs to stop the timer.
145    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
146    checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
147    /// Shared registry of spilled vector storages.
148    /// Used by the search path to create `SpillableVectorAccessor` instances.
149    #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
150    vector_spill_storages: Option<
151        Arc<
152            parking_lot::RwLock<
153                std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
154            >,
155        >,
156    >,
157    /// External read-only graph store (when using with_store() or with_read_store()).
158    /// When set, sessions route queries through this store instead of the built-in LpgStore.
159    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
160    /// External writable graph store (when using with_store()).
161    /// None for read-only databases created via with_read_store().
162    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
163    /// Metrics registry shared across all sessions.
164    #[cfg(feature = "metrics")]
165    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
166    /// Persistent graph context for one-shot `execute()` calls.
167    /// When set, each call to `session()` pre-configures the session to this graph.
168    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
169    current_graph: RwLock<Option<String>>,
170    /// Persistent schema context for one-shot `execute()` calls.
171    /// When set, each call to `session()` pre-configures the session to this schema.
172    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
173    current_schema: RwLock<Option<String>>,
174    /// Whether this database is open in read-only mode.
175    /// When true, sessions automatically enforce read-only transactions.
176    read_only: bool,
177    /// Named graph projections (virtual subgraphs), shared with sessions.
178    projections:
179        Arc<RwLock<std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>>>,
180    /// Layered store (compact base + mutable overlay), set after `compact()`.
181    #[cfg(all(feature = "compact-store", feature = "lpg"))]
182    layered_store: Option<Arc<grafeo_core::graph::compact::layered::LayeredStore>>,
183}
184
185impl GrafeoDB {
186    /// Returns a reference to the built-in LPG store.
187    ///
188    /// # Panics
189    ///
190    /// Panics if the database was created with [`with_store()`](Self::with_store) or
191    /// [`with_read_store()`](Self::with_read_store), which use an external store
192    /// instead of the built-in LPG store.
193    #[cfg(feature = "lpg")]
194    fn lpg_store(&self) -> &Arc<LpgStore> {
195        self.store.as_ref().expect(
196            "no built-in LpgStore: this GrafeoDB was created with an external store \
197             (with_store / with_read_store). Use session() or graph_store() instead.",
198        )
199    }
200
201    /// Returns whether CDC is active (runtime check).
202    #[cfg(feature = "cdc")]
203    #[inline]
204    pub(super) fn cdc_active(&self) -> bool {
205        self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
206    }
207
208    /// Creates an in-memory database, fast to create, gone when dropped.
209    ///
210    /// Use this for tests, experiments, or when you don't need persistence.
211    /// For data that survives restarts, use [`open()`](Self::open) instead.
212    ///
213    /// # Panics
214    ///
215    /// Panics if the internal arena allocator cannot be initialized (out of memory).
216    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
217    ///
218    /// # Examples
219    ///
220    /// ```
221    /// use grafeo_engine::GrafeoDB;
222    ///
223    /// let db = GrafeoDB::new_in_memory();
224    /// let session = db.session();
225    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
226    /// # Ok::<(), grafeo_common::utils::error::Error>(())
227    /// ```
228    #[must_use]
229    pub fn new_in_memory() -> Self {
230        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
231    }
232
233    /// Opens a database at the given path, creating it if it doesn't exist.
234    ///
235    /// If you've used this path before, Grafeo recovers your data from the
236    /// write-ahead log automatically. First open on a new path creates an
237    /// empty database.
238    ///
239    /// # Errors
240    ///
241    /// Returns an error if the path isn't writable or recovery fails.
242    ///
243    /// # Examples
244    ///
245    /// ```no_run
246    /// use grafeo_engine::GrafeoDB;
247    ///
248    /// let db = GrafeoDB::open("./my_social_network")?;
249    /// # Ok::<(), grafeo_common::utils::error::Error>(())
250    /// ```
251    #[cfg(feature = "wal")]
252    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
253        Self::with_config(Config::persistent(path.as_ref()))
254    }
255
256    /// Opens an existing database in read-only mode.
257    ///
258    /// Uses a shared file lock, so multiple processes can read the same
259    /// `.grafeo` file concurrently. The database loads the last checkpoint
260    /// snapshot but does **not** replay the WAL or allow mutations.
261    ///
262    /// Currently only supports the single-file (`.grafeo`) format.
263    ///
264    /// # Errors
265    ///
266    /// Returns an error if the file doesn't exist or can't be read.
267    ///
268    /// # Examples
269    ///
270    /// ```no_run
271    /// use grafeo_engine::GrafeoDB;
272    ///
273    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
274    /// let session = db.session();
275    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
276    /// // Mutations will return an error:
277    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
278    /// # Ok::<(), grafeo_common::utils::error::Error>(())
279    /// ```
280    #[cfg(feature = "grafeo-file")]
281    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
282        Self::with_config(Config::read_only(path.as_ref()))
283    }
284
285    /// Creates a database with custom configuration.
286    ///
287    /// Use this when you need fine-grained control over memory limits,
288    /// thread counts, or persistence settings. For most cases,
289    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
290    /// are simpler.
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if the database can't be created or recovery fails.
295    ///
296    /// # Examples
297    ///
298    /// ```
299    /// use grafeo_engine::{GrafeoDB, Config};
300    ///
301    /// // In-memory with a 512MB limit
302    /// let config = Config::in_memory()
303    ///     .with_memory_limit(512 * 1024 * 1024);
304    ///
305    /// let db = GrafeoDB::with_config(config)?;
306    /// # Ok::<(), grafeo_common::utils::error::Error>(())
307    /// ```
308    pub fn with_config(config: Config) -> Result<Self> {
309        // Validate configuration before proceeding
310        config
311            .validate()
312            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
313
314        #[cfg(feature = "lpg")]
315        let store = Arc::new(LpgStore::new()?);
316        #[cfg(feature = "triple-store")]
317        let rdf_store = Arc::new(RdfStore::new());
318        let transaction_manager = Arc::new(TransactionManager::new());
319
320        // Create buffer manager with configured limits
321        let buffer_config = BufferManagerConfig {
322            budget: config.memory_limit.unwrap_or_else(|| {
323                // reason: product of system RAM and 0.75 is always a valid positive usize
324                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
325                let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
326                b
327            }),
328            spill_path: config.spill_path.clone().or_else(|| {
329                config.path.as_ref().and_then(|p| {
330                    let parent = p.parent()?;
331                    let name = p.file_name()?.to_str()?;
332                    Some(parent.join(format!("{name}.spill")))
333                })
334            }),
335            ..BufferManagerConfig::default()
336        };
337        let buffer_manager = BufferManager::new(buffer_config);
338
339        // Create catalog early so WAL replay can restore schema definitions
340        let catalog = Arc::new(Catalog::new());
341
342        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
343
344        // --- Single-file format (.grafeo) ---
345        #[cfg(feature = "grafeo-file")]
346        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
347            // Read-only mode: open with shared lock, load snapshot, skip WAL
348            if let Some(ref db_path) = config.path {
349                if db_path.exists() && db_path.is_file() {
350                    let fm = GrafeoFileManager::open_read_only(db_path)?;
351                    // Try v2 section-based format first
352                    #[cfg(feature = "lpg")]
353                    if fm.read_section_directory()?.is_some() {
354                        Self::load_from_sections(
355                            &fm,
356                            &store,
357                            &catalog,
358                            #[cfg(feature = "triple-store")]
359                            &rdf_store,
360                        )?;
361                    } else {
362                        // Fall back to v1 blob format
363                        let snapshot_data = fm.read_snapshot()?;
364                        if !snapshot_data.is_empty() {
365                            Self::apply_snapshot_data(
366                                &store,
367                                &catalog,
368                                #[cfg(feature = "triple-store")]
369                                &rdf_store,
370                                &snapshot_data,
371                            )?;
372                        }
373                    }
374                    Some(Arc::new(fm))
375                } else {
376                    return Err(grafeo_common::utils::error::Error::Internal(format!(
377                        "read-only open requires an existing .grafeo file: {}",
378                        db_path.display()
379                    )));
380                }
381            } else {
382                return Err(grafeo_common::utils::error::Error::Internal(
383                    "read-only mode requires a database path".to_string(),
384                ));
385            }
386        } else if let Some(ref db_path) = config.path {
387            // Initialize the file manager whenever single-file format is selected,
388            // regardless of whether WAL is enabled. Without this, a database opened
389            // with wal_enabled:false + StorageFormat::SingleFile would produce no
390            // output at all (the file manager was previously gated behind wal_enabled).
391            if Self::should_use_single_file(db_path, config.storage_format) {
392                let fm = if db_path.exists() && db_path.is_file() {
393                    GrafeoFileManager::open(db_path)?
394                } else if !db_path.exists() {
395                    GrafeoFileManager::create(db_path)?
396                } else {
397                    // Path exists but is not a file (directory, etc.)
398                    return Err(grafeo_common::utils::error::Error::Internal(format!(
399                        "path exists but is not a file: {}",
400                        db_path.display()
401                    )));
402                };
403
404                // Load data: try v2 section-based format, fall back to v1 blob
405                #[cfg(feature = "lpg")]
406                if fm.read_section_directory()?.is_some() {
407                    Self::load_from_sections(
408                        &fm,
409                        &store,
410                        &catalog,
411                        #[cfg(feature = "triple-store")]
412                        &rdf_store,
413                    )?;
414                } else {
415                    let snapshot_data = fm.read_snapshot()?;
416                    if !snapshot_data.is_empty() {
417                        Self::apply_snapshot_data(
418                            &store,
419                            &catalog,
420                            #[cfg(feature = "triple-store")]
421                            &rdf_store,
422                            &snapshot_data,
423                        )?;
424                    }
425                }
426
427                // Recover sidecar WAL if WAL is enabled and a sidecar exists
428                #[cfg(all(feature = "wal", feature = "lpg"))]
429                if config.wal_enabled && fm.has_sidecar_wal() {
430                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
431                    let records = recovery.recover()?;
432                    Self::apply_wal_records(
433                        &store,
434                        &catalog,
435                        #[cfg(feature = "triple-store")]
436                        &rdf_store,
437                        &records,
438                    )?;
439                }
440
441                Some(Arc::new(fm))
442            } else {
443                None
444            }
445        } else {
446            None
447        };
448
449        // Determine whether to use the WAL directory path (legacy) or sidecar
450        // Read-only mode skips WAL entirely (no recovery, no creation).
451        #[cfg(feature = "wal")]
452        let wal = if is_read_only {
453            None
454        } else if config.wal_enabled {
455            if let Some(ref db_path) = config.path {
456                // When using single-file format, the WAL is a sidecar directory
457                #[cfg(feature = "grafeo-file")]
458                let wal_path = if let Some(ref fm) = file_manager {
459                    let p = fm.sidecar_wal_path();
460                    std::fs::create_dir_all(&p)?;
461                    p
462                } else {
463                    // Legacy: WAL inside the database directory
464                    std::fs::create_dir_all(db_path)?;
465                    db_path.join("wal")
466                };
467
468                #[cfg(not(feature = "grafeo-file"))]
469                let wal_path = {
470                    std::fs::create_dir_all(db_path)?;
471                    db_path.join("wal")
472                };
473
474                // For legacy WAL directory format, check if WAL exists and recover
475                #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
476                let is_single_file = file_manager.is_some();
477                #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
478                let is_single_file = false;
479
480                #[cfg(feature = "lpg")]
481                if !is_single_file && wal_path.exists() {
482                    let recovery = WalRecovery::new(&wal_path);
483                    let records = recovery.recover()?;
484                    Self::apply_wal_records(
485                        &store,
486                        &catalog,
487                        #[cfg(feature = "triple-store")]
488                        &rdf_store,
489                        &records,
490                    )?;
491                }
492
493                // Open/create WAL manager with configured durability
494                let wal_durability = match config.wal_durability {
495                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
496                    crate::config::DurabilityMode::Batch {
497                        max_delay_ms,
498                        max_records,
499                    } => WalDurabilityMode::Batch {
500                        max_delay_ms,
501                        max_records,
502                    },
503                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
504                        WalDurabilityMode::Adaptive { target_interval_ms }
505                    }
506                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
507                };
508                let wal_config = WalConfig {
509                    durability: wal_durability,
510                    ..WalConfig::default()
511                };
512                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
513                Some(Arc::new(wal_manager))
514            } else {
515                None
516            }
517        } else {
518            None
519        };
520
521        // Create query cache with default capacity (1000 queries)
522        let query_cache = Arc::new(QueryCache::default());
523
524        // After all snapshot/WAL recovery, sync TransactionManager epoch
525        // with the store so queries use the correct viewing epoch.
526        #[cfg(all(feature = "temporal", feature = "lpg"))]
527        transaction_manager.sync_epoch(store.current_epoch());
528
529        #[cfg(feature = "cdc")]
530        let cdc_enabled_val = config.cdc_enabled;
531        #[cfg(feature = "cdc")]
532        let cdc_retention = config.cdc_retention.clone();
533
534        // Clone Arcs for the checkpoint timer before moving originals into the struct.
535        // The timer captures its own references and runs in a background thread.
536        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
537        let checkpoint_interval = config.checkpoint_interval;
538        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
539        let timer_store = Arc::clone(&store);
540        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
541        let timer_catalog = Arc::clone(&catalog);
542        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
543        let timer_tm = Arc::clone(&transaction_manager);
544        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
545        let timer_rdf = Arc::clone(&rdf_store);
546        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
547        let timer_wal = wal.clone();
548
549        let mut db = Self {
550            config,
551            #[cfg(feature = "lpg")]
552            store: Some(store),
553            catalog,
554            #[cfg(feature = "triple-store")]
555            rdf_store,
556            transaction_manager,
557            buffer_manager,
558            #[cfg(feature = "wal")]
559            wal,
560            #[cfg(feature = "wal")]
561            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
562            query_cache,
563            commit_counter: Arc::new(AtomicUsize::new(0)),
564            is_open: RwLock::new(true),
565            #[cfg(feature = "cdc")]
566            cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
567            #[cfg(feature = "cdc")]
568            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
569            #[cfg(feature = "embed")]
570            embedding_models: RwLock::new(hashbrown::HashMap::new()),
571            #[cfg(feature = "grafeo-file")]
572            file_manager,
573            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
574            checkpoint_timer: parking_lot::Mutex::new(None),
575            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
576            vector_spill_storages: None,
577            external_read_store: None,
578            external_write_store: None,
579            #[cfg(feature = "metrics")]
580            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
581            current_graph: RwLock::new(None),
582            current_schema: RwLock::new(None),
583            read_only: is_read_only,
584            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
585            #[cfg(all(feature = "compact-store", feature = "lpg"))]
586            layered_store: None,
587        };
588
589        // Register storage sections as memory consumers for pressure tracking
590        db.register_section_consumers();
591
592        // Start periodic checkpoint timer if configured
593        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
594        if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
595            && !is_read_only
596        {
597            *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
598                interval,
599                Arc::clone(fm),
600                timer_store,
601                timer_catalog,
602                timer_tm,
603                #[cfg(feature = "triple-store")]
604                timer_rdf,
605                #[cfg(feature = "wal")]
606                timer_wal,
607            ));
608        }
609
610        // Discover existing spill files from a previous session.
611        // If vectors were spilled before close, the spill files persist on disk
612        // and need to be re-mapped so search can read from them.
613        #[cfg(all(
614            feature = "lpg",
615            feature = "vector-index",
616            feature = "mmap",
617            not(feature = "temporal")
618        ))]
619        db.restore_spill_files();
620
621        // If VectorStore is configured as ForceDisk, immediately spill embeddings.
622        // This must happen after register_section_consumers() which creates the consumer.
623        #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
624        if db
625            .config
626            .section_configs
627            .get(&grafeo_common::storage::SectionType::VectorStore)
628            .is_some_and(|c| c.tier == grafeo_common::storage::TierOverride::ForceDisk)
629        {
630            db.buffer_manager.spill_all();
631        }
632
633        Ok(db)
634    }
635
636    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
637    ///
638    /// The external store handles all data persistence. WAL, CDC, and index
639    /// management are the responsibility of the store implementation.
640    ///
641    /// Query execution (all 6 languages, optimizer, planner) works through the
642    /// provided store. Admin operations (schema introspection, persistence,
643    /// vector/text indexes) are not available on external stores.
644    ///
645    /// # Examples
646    ///
647    /// ```no_run
648    /// use std::sync::Arc;
649    /// use grafeo_engine::{GrafeoDB, Config};
650    /// use grafeo_core::graph::GraphStoreMut;
651    ///
652    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
653    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
654    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
655    ///     Ok(())
656    /// }
657    /// ```
658    ///
659    /// # Errors
660    ///
661    /// Returns an error if config validation fails.
662    ///
663    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
664    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
665        config
666            .validate()
667            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
668
669        let transaction_manager = Arc::new(TransactionManager::new());
670
671        let buffer_config = BufferManagerConfig {
672            budget: config.memory_limit.unwrap_or_else(|| {
673                // reason: product of system RAM and 0.75 is always a valid positive usize
674                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
675                let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
676                b
677            }),
678            spill_path: None,
679            ..BufferManagerConfig::default()
680        };
681        let buffer_manager = BufferManager::new(buffer_config);
682
683        let query_cache = Arc::new(QueryCache::default());
684
685        #[cfg(feature = "cdc")]
686        let cdc_enabled_val = config.cdc_enabled;
687
688        Ok(Self {
689            config,
690            #[cfg(feature = "lpg")]
691            store: None,
692            catalog: Arc::new(Catalog::new()),
693            #[cfg(feature = "triple-store")]
694            rdf_store: Arc::new(RdfStore::new()),
695            transaction_manager,
696            buffer_manager,
697            #[cfg(feature = "wal")]
698            wal: None,
699            #[cfg(feature = "wal")]
700            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
701            query_cache,
702            commit_counter: Arc::new(AtomicUsize::new(0)),
703            is_open: RwLock::new(true),
704            #[cfg(feature = "cdc")]
705            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
706            #[cfg(feature = "cdc")]
707            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
708            #[cfg(feature = "embed")]
709            embedding_models: RwLock::new(hashbrown::HashMap::new()),
710            #[cfg(feature = "grafeo-file")]
711            file_manager: None,
712            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
713            checkpoint_timer: parking_lot::Mutex::new(None),
714            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
715            vector_spill_storages: None,
716            external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
717            external_write_store: Some(store),
718            #[cfg(feature = "metrics")]
719            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
720            current_graph: RwLock::new(None),
721            current_schema: RwLock::new(None),
722            read_only: false,
723            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
724            #[cfg(all(feature = "compact-store", feature = "lpg"))]
725            layered_store: None,
726        })
727    }
728
729    /// Creates a database backed by a read-only [`GraphStore`].
730    ///
731    /// The database is set to read-only mode. Write queries (CREATE, SET,
732    /// DELETE) will return `TransactionError::ReadOnly`.
733    ///
734    /// # Examples
735    ///
736    /// ```no_run
737    /// use std::sync::Arc;
738    /// use grafeo_engine::{GrafeoDB, Config};
739    /// use grafeo_core::graph::GraphStore;
740    ///
741    /// fn example(store: Arc<dyn GraphStore>) -> grafeo_common::utils::error::Result<()> {
742    ///     let db = GrafeoDB::with_read_store(store, Config::in_memory())?;
743    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
744    ///     Ok(())
745    /// }
746    /// ```
747    ///
748    /// # Errors
749    ///
750    /// Returns an error if config validation fails.
751    ///
752    /// [`GraphStore`]: grafeo_core::graph::GraphStore
753    pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
754        config
755            .validate()
756            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
757
758        let transaction_manager = Arc::new(TransactionManager::new());
759
760        let buffer_config = BufferManagerConfig {
761            budget: config.memory_limit.unwrap_or_else(|| {
762                // reason: product of system RAM and 0.75 is always a valid positive usize
763                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
764                let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
765                b
766            }),
767            spill_path: None,
768            ..BufferManagerConfig::default()
769        };
770        let buffer_manager = BufferManager::new(buffer_config);
771
772        let query_cache = Arc::new(QueryCache::default());
773
774        #[cfg(feature = "cdc")]
775        let cdc_enabled_val = config.cdc_enabled;
776
777        Ok(Self {
778            config,
779            #[cfg(feature = "lpg")]
780            store: None,
781            catalog: Arc::new(Catalog::new()),
782            #[cfg(feature = "triple-store")]
783            rdf_store: Arc::new(RdfStore::new()),
784            transaction_manager,
785            buffer_manager,
786            #[cfg(feature = "wal")]
787            wal: None,
788            #[cfg(feature = "wal")]
789            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
790            query_cache,
791            commit_counter: Arc::new(AtomicUsize::new(0)),
792            is_open: RwLock::new(true),
793            #[cfg(feature = "cdc")]
794            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
795            #[cfg(feature = "cdc")]
796            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
797            #[cfg(feature = "embed")]
798            embedding_models: RwLock::new(hashbrown::HashMap::new()),
799            #[cfg(feature = "grafeo-file")]
800            file_manager: None,
801            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
802            checkpoint_timer: parking_lot::Mutex::new(None),
803            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
804            vector_spill_storages: None,
805            external_read_store: Some(store),
806            external_write_store: None,
807            #[cfg(feature = "metrics")]
808            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
809            current_graph: RwLock::new(None),
810            current_schema: RwLock::new(None),
811            read_only: true,
812            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
813            #[cfg(all(feature = "compact-store", feature = "lpg"))]
814            layered_store: None,
815        })
816    }
817
818    /// Compacts the database into a two-layer store: columnar base + mutable overlay.
819    ///
820    /// Takes a snapshot of all nodes and edges from the current store, builds
821    /// a columnar `CompactStore` with CSR adjacency as the read-only base,
822    /// and creates a fresh `LpgStore` overlay for future mutations. The
823    /// original store is dropped to free memory.
824    ///
825    /// Unlike the pre-0.5.39 behavior, the database remains writable after
826    /// compaction: new writes go to the overlay. Call [`recompact()`](Self::recompact)
827    /// to merge the overlay back into the columnar base periodically.
828    ///
829    /// # Errors
830    ///
831    /// Returns an error if the conversion fails (e.g. more than 32,767
832    /// distinct labels or edge types).
833    ///
834    /// [`CompactStore`]: grafeo_core::graph::compact::CompactStore
835    #[cfg(all(feature = "compact-store", feature = "lpg"))]
836    pub fn compact(&mut self) -> Result<()> {
837        use grafeo_core::graph::compact::from_graph_store_preserving_ids;
838        use grafeo_core::graph::compact::layered::LayeredStore;
839
840        let current_store = self.graph_store();
841
842        // Determine max node/edge IDs for overlay seeding.
843        let max_node_id = if let Some(ref store) = self.store {
844            store.next_node_id().saturating_sub(1)
845        } else {
846            current_store.node_ids().last().map_or(0, |id| id.as_u64())
847        };
848        let max_edge_id = if let Some(ref store) = self.store {
849            store.next_edge_id().saturating_sub(1)
850        } else {
851            // Scan edges to find max ID.
852            let mut max_eid = 0u64;
853            for nid in current_store.node_ids() {
854                for (_, eid) in
855                    current_store.edges_from(nid, grafeo_core::graph::Direction::Outgoing)
856                {
857                    max_eid = max_eid.max(eid.as_u64());
858                }
859            }
860            max_eid
861        };
862
863        let compact = from_graph_store_preserving_ids(current_store.as_ref())
864            .map_err(|e| Error::Internal(e.to_string()))?;
865
866        let layered = Arc::new(
867            LayeredStore::new(compact, max_node_id, max_edge_id)
868                .map_err(|e| Error::Internal(e.to_string()))?,
869        );
870
871        // Sync the overlay's epoch with the TransactionManager so MVCC
872        // visibility works correctly for nodes created after compact().
873        let current_epoch = self.transaction_manager.current_epoch();
874        layered.overlay_store().sync_epoch(current_epoch);
875
876        self.external_read_store = Some(Arc::clone(&layered) as Arc<dyn GraphStore>);
877        self.external_write_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreMut>);
878        self.layered_store = Some(layered);
879        self.store = None;
880        self.read_only = false;
881        self.query_cache = Arc::new(QueryCache::default());
882        self.projections.write().clear();
883
884        Ok(())
885    }
886
887    /// Merges the overlay back into the columnar base.
888    ///
889    /// Reads the combined view (base + overlay), builds a fresh `CompactStore`,
890    /// and replaces the `LayeredStore` with one backed by the merged base and
891    /// an empty overlay.
892    ///
893    /// # Errors
894    ///
895    /// Returns an error if the database was not previously compacted, or if
896    /// the merge fails.
897    #[cfg(all(feature = "compact-store", feature = "lpg"))]
898    pub fn recompact(&mut self) -> Result<()> {
899        use grafeo_core::graph::compact::from_graph_store_preserving_ids;
900        use grafeo_core::graph::compact::layered::LayeredStore;
901
902        let layered = self
903            .layered_store
904            .as_ref()
905            .ok_or_else(|| Error::Internal("recompact() requires a prior compact()".into()))?;
906
907        // Read the combined view.
908        let combined: Arc<dyn GraphStore> = Arc::clone(layered) as Arc<dyn GraphStore>;
909
910        // Max IDs from the overlay's allocators.
911        let max_node_id = layered.overlay_store().next_node_id().saturating_sub(1);
912        let max_edge_id = layered.overlay_store().next_edge_id().saturating_sub(1);
913
914        let fresh_compact = from_graph_store_preserving_ids(combined.as_ref())
915            .map_err(|e| Error::Internal(e.to_string()))?;
916
917        let new_layered = Arc::new(
918            LayeredStore::new(fresh_compact, max_node_id, max_edge_id)
919                .map_err(|e| Error::Internal(e.to_string()))?,
920        );
921
922        // Sync overlay epoch.
923        let current_epoch = self.transaction_manager.current_epoch();
924        new_layered.overlay_store().sync_epoch(current_epoch);
925
926        self.external_read_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStore>);
927        self.external_write_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreMut>);
928        self.layered_store = Some(new_layered);
929        self.query_cache = Arc::new(QueryCache::default());
930
931        Ok(())
932    }
933
    /// Applies WAL records to restore the database state.
    ///
    /// Data mutation records are routed through a graph cursor that tracks
    /// `SwitchGraph` context markers, replaying mutations into the correct
    /// named graph (or the default graph when cursor is `None`).
    ///
    /// Schema, graph-type, schema-namespace, and procedure records are applied
    /// to `catalog`; the results of those catalog calls are discarded, so a
    /// failing catalog operation does not abort replay. RDF records are
    /// forwarded to `rdf_store` when the `triple-store` feature is enabled and
    /// ignored otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error if a `SwitchGraph` target cannot be resolved or
    /// created, or if re-creating a node or edge with its recorded ID fails.
    #[cfg(all(feature = "wal", feature = "lpg"))]
    fn apply_wal_records(
        store: &Arc<LpgStore>,
        catalog: &Catalog,
        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
        records: &[WalRecord],
    ) -> Result<()> {
        use crate::catalog::{
            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
        };
        use grafeo_common::utils::error::Error;

        // Graph cursor: tracks which named graph receives data mutations.
        // `None` means the default graph.
        let mut current_graph: Option<String> = None;
        let mut target_store: Arc<LpgStore> = Arc::clone(store);

        for record in records {
            match record {
                // --- Named graph lifecycle ---
                WalRecord::CreateNamedGraph { name } => {
                    // Result intentionally discarded; presumably the graph may
                    // already exist at this point (TODO confirm).
                    let _ = store.create_graph(name);
                }
                WalRecord::DropNamedGraph { name } => {
                    store.drop_graph(name);
                    // Reset cursor if the dropped graph was active
                    if current_graph.as_deref() == Some(name.as_str()) {
                        current_graph = None;
                        target_store = Arc::clone(store);
                    }
                }
                WalRecord::SwitchGraph { name } => {
                    // Move the cursor: `None` selects the root store, a name
                    // selects (or creates) the matching named graph.
                    current_graph.clone_from(name);
                    target_store = match &current_graph {
                        None => Arc::clone(store),
                        Some(graph_name) => store
                            .graph_or_create(graph_name)
                            .map_err(|e| Error::Internal(e.to_string()))?,
                    };
                }

                // --- Data mutations: routed through target_store ---
                WalRecord::CreateNode { id, labels } => {
                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
                    target_store.create_node_with_id(*id, &label_refs)?;
                }
                WalRecord::DeleteNode { id } => {
                    target_store.delete_node(*id);
                }
                WalRecord::CreateEdge {
                    id,
                    src,
                    dst,
                    edge_type,
                } => {
                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
                }
                WalRecord::DeleteEdge { id } => {
                    target_store.delete_edge(*id);
                }
                WalRecord::SetNodeProperty { id, key, value } => {
                    target_store.set_node_property(*id, key, value.clone());
                }
                WalRecord::SetEdgeProperty { id, key, value } => {
                    target_store.set_edge_property(*id, key, value.clone());
                }
                WalRecord::AddNodeLabel { id, label } => {
                    target_store.add_label(*id, label);
                }
                WalRecord::RemoveNodeLabel { id, label } => {
                    target_store.remove_label(*id, label);
                }
                WalRecord::RemoveNodeProperty { id, key } => {
                    target_store.remove_node_property(*id, key);
                }
                WalRecord::RemoveEdgeProperty { id, key } => {
                    target_store.remove_edge_property(*id, key);
                }

                // --- Schema DDL replay (always on root catalog) ---
                WalRecord::CreateNodeType {
                    name,
                    properties,
                    constraints,
                } => {
                    // Rebuild the definition from the recorded tuples. Default
                    // values and parent types are not restored here: they are
                    // always `None` / empty on replay.
                    let def = NodeTypeDefinition {
                        name: name.clone(),
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                // Only the first listed property is used.
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                // Unknown kinds (and `not_null` with no
                                // properties) fall back to a unique constraint.
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                        parent_types: Vec::new(),
                    };
                    let _ = catalog.register_node_type(def);
                }
                WalRecord::DropNodeType { name } => {
                    let _ = catalog.drop_node_type(name);
                }
                WalRecord::CreateEdgeType {
                    name,
                    properties,
                    constraints,
                } => {
                    // Same reconstruction as for node types; source and target
                    // node type lists are left empty on replay.
                    let def = EdgeTypeDefinition {
                        name: name.clone(),
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                // Only the first listed property is used.
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                // Unknown kinds (and `not_null` with no
                                // properties) fall back to a unique constraint.
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                        source_node_types: Vec::new(),
                        target_node_types: Vec::new(),
                    };
                    let _ = catalog.register_edge_type_def(def);
                }
                WalRecord::DropEdgeType { name } => {
                    let _ = catalog.drop_edge_type_def(name);
                }
                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
                    // Index recreation is handled by the store on startup
                    // (indexes are rebuilt from data, not WAL)
                }
                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
                    // Constraint definitions are part of type definitions
                    // and replayed via CreateNodeType/CreateEdgeType
                }
                WalRecord::CreateGraphType {
                    name,
                    node_types,
                    edge_types,
                    open,
                } => {
                    use crate::catalog::GraphTypeDefinition;
                    let def = GraphTypeDefinition {
                        name: name.clone(),
                        allowed_node_types: node_types.clone(),
                        allowed_edge_types: edge_types.clone(),
                        open: *open,
                    };
                    let _ = catalog.register_graph_type(def);
                }
                WalRecord::DropGraphType { name } => {
                    let _ = catalog.drop_graph_type(name);
                }
                WalRecord::CreateSchema { name } => {
                    let _ = catalog.register_schema_namespace(name.clone());
                }
                WalRecord::DropSchema { name } => {
                    let _ = catalog.drop_schema_namespace(name);
                }

                // --- Schema alterations: each entry carries an action tag;
                // unrecognized tags are skipped ---
                WalRecord::AlterNodeType { name, alterations } => {
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_node_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
                            }
                            _ => {}
                        }
                    }
                }
                WalRecord::AlterEdgeType { name, alterations } => {
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_edge_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
                            }
                            _ => {}
                        }
                    }
                }
                WalRecord::AlterGraphType { name, alterations } => {
                    for (action, type_name) in alterations {
                        match action.as_str() {
                            "add_node" => {
                                let _ =
                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
                            }
                            "drop_node" => {
                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
                            }
                            "add_edge" => {
                                let _ =
                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
                            }
                            "drop_edge" => {
                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
                            }
                            _ => {}
                        }
                    }
                }

                // --- Stored procedures ---
                WalRecord::CreateProcedure {
                    name,
                    params,
                    returns,
                    body,
                } => {
                    use crate::catalog::ProcedureDefinition;
                    let def = ProcedureDefinition {
                        name: name.clone(),
                        params: params.clone(),
                        returns: returns.clone(),
                        body: body.clone(),
                    };
                    let _ = catalog.register_procedure(def);
                }
                WalRecord::DropProcedure { name } => {
                    let _ = catalog.drop_procedure(name);
                }

                // --- RDF triple replay ---
                #[cfg(feature = "triple-store")]
                WalRecord::InsertRdfTriple { .. }
                | WalRecord::DeleteRdfTriple { .. }
                | WalRecord::ClearRdfGraph { .. }
                | WalRecord::CreateRdfGraph { .. }
                | WalRecord::DropRdfGraph { .. } => {
                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
                }
                // Without the triple store these records are matched (to keep
                // the match exhaustive) and ignored.
                #[cfg(not(feature = "triple-store"))]
                WalRecord::InsertRdfTriple { .. }
                | WalRecord::DeleteRdfTriple { .. }
                | WalRecord::ClearRdfGraph { .. }
                | WalRecord::CreateRdfGraph { .. }
                | WalRecord::DropRdfGraph { .. } => {}

                WalRecord::TransactionCommit { .. } => {
                    // In temporal mode, advance the store epoch on each committed
                    // transaction so that subsequent property/label operations
                    // are recorded at the correct epoch in their VersionLogs.
                    #[cfg(feature = "temporal")]
                    {
                        target_store.new_epoch();
                    }
                }
                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
                    // Transaction control records don't need replay action
                    // (recovery already filtered to only committed transactions)
                }
                WalRecord::EpochAdvance { .. } => {
                    // Metadata record: no store mutation needed.
                    // Used by incremental backup and point-in-time recovery.
                }
            }
        }
        Ok(())
    }
1237
1238    // =========================================================================
1239    // Single-file format helpers
1240    // =========================================================================
1241
1242    /// Returns `true` if the given path should use single-file format.
1243    #[cfg(feature = "grafeo-file")]
1244    fn should_use_single_file(
1245        path: &std::path::Path,
1246        configured: crate::config::StorageFormat,
1247    ) -> bool {
1248        use crate::config::StorageFormat;
1249        match configured {
1250            StorageFormat::SingleFile => true,
1251            StorageFormat::WalDirectory => false,
1252            StorageFormat::Auto => {
1253                // Existing file: check magic bytes
1254                if path.is_file() {
1255                    if let Ok(mut f) = std::fs::File::open(path) {
1256                        use std::io::Read;
1257                        let mut magic = [0u8; 4];
1258                        if f.read_exact(&mut magic).is_ok() && magic == grafeo_storage::file::MAGIC
1259                        {
1260                            return true;
1261                        }
1262                    }
1263                    return false;
1264                }
1265                // Existing directory: legacy format
1266                if path.is_dir() {
1267                    return false;
1268                }
1269                // New path: check extension
1270                path.extension().is_some_and(|ext| ext == "grafeo")
1271            }
1272        }
1273    }
1274
    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
    ///
    /// Forwards `data` unchanged to `persistence::load_snapshot_into_store`
    /// together with the target store, catalog, and (with the `triple-store`
    /// feature) the RDF store, and returns that call's result.
    ///
    /// Supports both v1 (monolithic blob) and v2 (section-based) formats.
    // NOTE(review): the inline comment below describes this as the v1 blob
    // path only; confirm whether the legacy loader really handles v2 data too.
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    fn apply_snapshot_data(
        store: &Arc<LpgStore>,
        catalog: &Arc<crate::catalog::Catalog>,
        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
        data: &[u8],
    ) -> Result<()> {
        // v1 blob format: pass through to legacy loader
        persistence::load_snapshot_into_store(
            store,
            catalog,
            #[cfg(feature = "triple-store")]
            rdf_store,
            data,
        )
    }
1294
1295    /// Loads from a section-based `.grafeo` file (v2 format).
1296    ///
1297    /// Reads the section directory, then deserializes each section independently.
1298    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1299    fn load_from_sections(
1300        fm: &GrafeoFileManager,
1301        store: &Arc<LpgStore>,
1302        catalog: &Arc<crate::catalog::Catalog>,
1303        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1304    ) -> Result<()> {
1305        use grafeo_common::storage::{Section, SectionType};
1306
1307        let dir = fm.read_section_directory()?.ok_or_else(|| {
1308            grafeo_common::utils::error::Error::Internal(
1309                "expected v2 section directory but found none".to_string(),
1310            )
1311        })?;
1312
1313        // Load catalog section first (schema defs needed before data)
1314        if let Some(entry) = dir.find(SectionType::Catalog) {
1315            let data = fm.read_section_data(entry)?;
1316            let tm = Arc::new(crate::transaction::TransactionManager::new());
1317            let mut section = catalog_section::CatalogSection::new(
1318                Arc::clone(catalog),
1319                Arc::clone(store),
1320                move || tm.current_epoch().as_u64(),
1321            );
1322            section.deserialize(&data)?;
1323        }
1324
1325        // Load LPG store
1326        if let Some(entry) = dir.find(SectionType::LpgStore) {
1327            let data = fm.read_section_data(entry)?;
1328            let mut section = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1329            section.deserialize(&data)?;
1330        }
1331
1332        // Load RDF store
1333        #[cfg(feature = "triple-store")]
1334        if let Some(entry) = dir.find(SectionType::RdfStore) {
1335            let data = fm.read_section_data(entry)?;
1336            let mut section = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
1337            section.deserialize(&data)?;
1338        }
1339
1340        // Restore Ring Index (if persisted)
1341        #[cfg(feature = "ring-index")]
1342        if let Some(entry) = dir.find(SectionType::RdfRing) {
1343            let data = fm.read_section_data(entry)?;
1344            let mut section = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(rdf_store));
1345            section.deserialize(&data)?;
1346        }
1347
1348        // Restore HNSW topology (if vector indexes exist in both catalog and section)
1349        #[cfg(feature = "vector-index")]
1350        if let Some(entry) = dir.find(SectionType::VectorStore) {
1351            let data = fm.read_section_data(entry)?;
1352            let indexes = store.vector_index_entries();
1353            if !indexes.is_empty() {
1354                let mut section = grafeo_core::index::vector::VectorStoreSection::new(indexes);
1355                section.deserialize(&data)?;
1356            }
1357        }
1358
1359        // Restore BM25 postings (if text indexes exist in both catalog and section)
1360        #[cfg(feature = "text-index")]
1361        if let Some(entry) = dir.find(SectionType::TextIndex) {
1362            let data = fm.read_section_data(entry)?;
1363            let indexes = store.text_index_entries();
1364            if !indexes.is_empty() {
1365                let mut section = grafeo_core::index::text::TextIndexSection::new(indexes);
1366                section.deserialize(&data)?;
1367            }
1368        }
1369
1370        Ok(())
1371    }
1372
1373    // =========================================================================
1374    // Session & Configuration
1375    // =========================================================================
1376
    /// Opens a new session for running queries.
    ///
    /// Sessions are cheap to create: spin up as many as you need. Each
    /// gets its own transaction context, so concurrent sessions won't
    /// block each other on reads.
    ///
    /// This is a thin wrapper that delegates to `create_session_inner` with
    /// `None` as its argument.
    ///
    /// # Panics
    ///
    /// Panics if the database was configured with an external graph store and
    /// the internal arena allocator cannot be initialized (out of memory).
    ///
    /// # Examples
    ///
    /// ```
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let session = db.session();
    ///
    /// // Run queries through the session
    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
    /// # Ok::<(), grafeo_common::utils::error::Error>(())
    /// ```
    #[must_use]
    pub fn session(&self) -> Session {
        // NOTE(review): `None` presumably means "no per-session override" —
        // confirm against `create_session_inner`.
        self.create_session_inner(None)
    }
1404
1405    /// Creates a session scoped to the given identity.
1406    ///
1407    /// The identity determines what operations the session is allowed to
1408    /// perform. A [`Role::ReadOnly`](crate::auth::Role::ReadOnly) identity
1409    /// creates a read-only session; a [`Role::ReadWrite`](crate::auth::Role::ReadWrite)
1410    /// identity allows data mutations but not schema DDL; a
1411    /// [`Role::Admin`](crate::auth::Role::Admin) identity has full access.
1412    ///
1413    /// # Examples
1414    ///
1415    /// ```
1416    /// use grafeo_engine::{GrafeoDB, auth::{Identity, Role}};
1417    ///
1418    /// let db = GrafeoDB::new_in_memory();
1419    /// let identity = Identity::new("app-service", [Role::ReadWrite]);
1420    /// let session = db.session_with_identity(identity);
1421    /// ```
1422    #[must_use]
1423    pub fn session_with_identity(&self, identity: crate::auth::Identity) -> Session {
1424        let force_read_only = !identity.can_write();
1425        self.create_session_inner_full(None, force_read_only, identity)
1426    }
1427
1428    /// Creates a session scoped to a single role.
1429    ///
1430    /// Convenience shorthand for
1431    /// `session_with_identity(Identity::new("anonymous", [role]))`.
1432    ///
1433    /// # Examples
1434    ///
1435    /// ```
1436    /// use grafeo_engine::{GrafeoDB, auth::Role};
1437    ///
1438    /// let db = GrafeoDB::new_in_memory();
1439    /// let reader = db.session_with_role(Role::ReadOnly);
1440    /// ```
1441    #[must_use]
1442    pub fn session_with_role(&self, role: crate::auth::Role) -> Session {
1443        self.session_with_identity(crate::auth::Identity::new("anonymous", [role]))
1444    }
1445
1446    /// Creates a session with an explicit CDC override.
1447    ///
1448    /// When `cdc_enabled` is `true`, mutations in this session are tracked
1449    /// regardless of the database default. When `false`, mutations are not
1450    /// tracked regardless of the database default.
1451    ///
1452    /// # Examples
1453    ///
1454    /// ```
1455    /// use grafeo_engine::GrafeoDB;
1456    ///
1457    /// let db = GrafeoDB::new_in_memory();
1458    ///
1459    /// // Opt in to CDC for just this session
1460    /// let tracked = db.session_with_cdc(true);
1461    /// tracked.execute("INSERT (:Person {name: 'Alix'})")?;
1462    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1463    /// ```
1464    #[cfg(feature = "cdc")]
1465    #[must_use]
1466    pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1467        self.create_session_inner(Some(cdc_enabled))
1468    }
1469
1470    /// Creates a read-only session regardless of the database's access mode.
1471    ///
1472    /// Mutations executed through this session will fail with
1473    /// `TransactionError::ReadOnly`. Useful for replication replicas where
1474    /// the database itself must remain writable (for applying CDC changes)
1475    /// but client-facing queries must be read-only.
1476    ///
1477    /// **Deprecated**: Use `session_with_role(Role::ReadOnly)` instead.
1478    #[deprecated(
1479        since = "0.5.36",
1480        note = "use session_with_role(Role::ReadOnly) instead"
1481    )]
1482    #[must_use]
1483    pub fn session_read_only(&self) -> Session {
1484        self.session_with_role(crate::auth::Role::ReadOnly)
1485    }
1486
1487    /// Shared session creation logic.
1488    ///
1489    /// `cdc_override` overrides the database-wide `cdc_enabled` default when
1490    /// `Some`. `None` falls back to the database default.
1491    #[allow(unused_variables)] // cdc_override unused when cdc feature is off
1492    fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1493        self.create_session_inner_full(cdc_override, false, crate::auth::Identity::anonymous())
1494    }
1495
1496    /// Shared session creation with all overrides.
1497    #[allow(unused_variables)]
1498    fn create_session_inner_full(
1499        &self,
1500        cdc_override: Option<bool>,
1501        force_read_only: bool,
1502        identity: crate::auth::Identity,
1503    ) -> Session {
1504        let session_cfg = || crate::session::SessionConfig {
1505            transaction_manager: Arc::clone(&self.transaction_manager),
1506            query_cache: Arc::clone(&self.query_cache),
1507            catalog: Arc::clone(&self.catalog),
1508            adaptive_config: self.config.adaptive.clone(),
1509            factorized_execution: self.config.factorized_execution,
1510            graph_model: self.config.graph_model,
1511            query_timeout: self.config.query_timeout,
1512            max_property_size: self.config.max_property_size,
1513            buffer_manager: Some(Arc::clone(&self.buffer_manager)),
1514            commit_counter: Arc::clone(&self.commit_counter),
1515            gc_interval: self.config.gc_interval,
1516            read_only: self.read_only || force_read_only,
1517            identity: identity.clone(),
1518            #[cfg(feature = "lpg")]
1519            projections: Arc::clone(&self.projections),
1520        };
1521
1522        // Layered store: use the overlay LpgStore as the session's internal store
1523        // so MVCC operations (begin_tx, commit, visibility) work correctly.
1524        #[cfg(all(feature = "compact-store", feature = "lpg"))]
1525        if let Some(ref layered) = self.layered_store {
1526            let overlay = Arc::clone(layered.overlay_store());
1527            let layered_arc = Arc::clone(layered);
1528            let mut session = Session::with_adaptive(overlay, session_cfg());
1529            // Override graph_store/graph_store_mut to use the LayeredStore
1530            // (which merges base + overlay), not just the overlay alone.
1531            session.override_stores(
1532                Arc::clone(&layered_arc) as Arc<dyn GraphStore>,
1533                Some(layered_arc as Arc<dyn GraphStoreMut>),
1534            );
1535            return session;
1536        }
1537
1538        if let Some(ref ext_read) = self.external_read_store {
1539            return Session::with_external_store(
1540                Arc::clone(ext_read),
1541                self.external_write_store.as_ref().map(Arc::clone),
1542                session_cfg(),
1543            )
1544            .expect("arena allocation for external store session");
1545        }
1546
1547        #[cfg(all(feature = "lpg", feature = "triple-store"))]
1548        let mut session = Session::with_rdf_store_and_adaptive(
1549            Arc::clone(self.lpg_store()),
1550            Arc::clone(&self.rdf_store),
1551            session_cfg(),
1552        );
1553        #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
1554        let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1555        #[cfg(not(feature = "lpg"))]
1556        let mut session =
1557            Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
1558                .expect("session creation for non-lpg build");
1559
1560        #[cfg(all(feature = "wal", feature = "lpg"))]
1561        if let Some(ref wal) = self.wal {
1562            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1563        }
1564
1565        #[cfg(feature = "cdc")]
1566        {
1567            let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1568            if should_enable {
1569                session.set_cdc_log(Arc::clone(&self.cdc_log));
1570            }
1571        }
1572
1573        #[cfg(feature = "metrics")]
1574        {
1575            if let Some(ref m) = self.metrics {
1576                session.set_metrics(Arc::clone(m));
1577                m.session_created
1578                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1579                m.session_active
1580                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1581            }
1582        }
1583
1584        // Propagate persistent graph context to the new session
1585        if let Some(ref graph) = *self.current_graph.read() {
1586            session.use_graph(graph);
1587        }
1588
1589        // Propagate persistent schema context to the new session
1590        if let Some(ref schema) = *self.current_schema.read() {
1591            session.set_schema(schema);
1592        }
1593
1594        // Suppress unused_mut when cdc/wal are disabled
1595        let _ = &mut session;
1596
1597        session
1598    }
1599
1600    /// Returns the current graph name, if any.
1601    ///
1602    /// This is the persistent graph context used by one-shot `execute()` calls.
1603    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
1604    /// or `SESSION RESET`.
1605    #[must_use]
1606    pub fn current_graph(&self) -> Option<String> {
1607        self.current_graph.read().clone()
1608    }
1609
1610    /// Sets the current graph context for subsequent one-shot `execute()` calls.
1611    ///
1612    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
1613    /// Pass `None` to reset to the default graph.
1614    ///
1615    /// # Errors
1616    ///
1617    /// Returns an error if the named graph does not exist.
1618    pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1619        #[cfg(feature = "lpg")]
1620        if let Some(name) = name
1621            && !name.eq_ignore_ascii_case("default")
1622            && let Some(store) = &self.store
1623            && store.graph(name).is_none()
1624        {
1625            return Err(Error::Query(QueryError::new(
1626                QueryErrorKind::Semantic,
1627                format!("Graph '{name}' does not exist"),
1628            )));
1629        }
1630        *self.current_graph.write() = name.map(ToString::to_string);
1631        Ok(())
1632    }
1633
1634    /// Returns the current schema name, if any.
1635    ///
1636    /// This is the persistent schema context used by one-shot `execute()` calls.
1637    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
1638    #[must_use]
1639    pub fn current_schema(&self) -> Option<String> {
1640        self.current_schema.read().clone()
1641    }
1642
1643    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1644    ///
1645    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1646    /// a session. Pass `None` to clear the schema context.
1647    ///
1648    /// # Errors
1649    ///
1650    /// Returns an error if the named schema does not exist.
1651    pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1652        if let Some(name) = name
1653            && !self.catalog.schema_exists(name)
1654        {
1655            return Err(Error::Query(QueryError::new(
1656                QueryErrorKind::Semantic,
1657                format!("Schema '{name}' does not exist"),
1658            )));
1659        }
1660        *self.current_schema.write() = name.map(ToString::to_string);
1661        Ok(())
1662    }
1663
1664    /// Returns the adaptive execution configuration.
1665    #[must_use]
1666    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1667        &self.config.adaptive
1668    }
1669
1670    /// Returns `true` if this database was opened in read-only mode.
1671    #[must_use]
1672    pub fn is_read_only(&self) -> bool {
1673        self.read_only
1674    }
1675
1676    /// Returns the configuration.
1677    #[must_use]
1678    pub fn config(&self) -> &Config {
1679        &self.config
1680    }
1681
1682    /// Returns the graph data model of this database.
1683    #[must_use]
1684    pub fn graph_model(&self) -> crate::config::GraphModel {
1685        self.config.graph_model
1686    }
1687
1688    /// Returns the configured memory limit in bytes, if any.
1689    #[must_use]
1690    pub fn memory_limit(&self) -> Option<usize> {
1691        self.config.memory_limit
1692    }
1693
1694    /// Returns a point-in-time snapshot of all metrics.
1695    ///
1696    /// If the `metrics` feature is disabled or the registry is not
1697    /// initialized, returns a default (all-zero) snapshot.
1698    #[cfg(feature = "metrics")]
1699    #[must_use]
1700    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1701        let mut snapshot = self
1702            .metrics
1703            .as_ref()
1704            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1705
1706        // Augment with cache stats from the query cache (not tracked in the registry)
1707        let cache_stats = self.query_cache.stats();
1708        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1709        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1710        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1711        snapshot.cache_invalidations = cache_stats.invalidations;
1712
1713        snapshot
1714    }
1715
1716    /// Returns all metrics in Prometheus text exposition format.
1717    ///
1718    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1719    #[cfg(feature = "metrics")]
1720    #[must_use]
1721    pub fn metrics_prometheus(&self) -> String {
1722        self.metrics
1723            .as_ref()
1724            .map_or_else(String::new, |m| m.to_prometheus())
1725    }
1726
1727    /// Resets all metrics counters and histograms to zero.
1728    #[cfg(feature = "metrics")]
1729    pub fn reset_metrics(&self) {
1730        if let Some(ref m) = self.metrics {
1731            m.reset();
1732        }
1733        self.query_cache.reset_stats();
1734    }
1735
1736    /// Returns the underlying (default) store.
1737    ///
1738    /// This provides direct access to the LPG store for algorithm implementations
1739    /// and admin operations (index management, schema introspection, MVCC internals).
1740    ///
1741    /// For code that only needs read/write graph operations, prefer
1742    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
1743    #[cfg(feature = "lpg")]
1744    #[must_use]
1745    pub fn store(&self) -> &Arc<LpgStore> {
1746        self.lpg_store()
1747    }
1748
1749    // === Named Graph Management ===
1750
1751    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1752    ///
1753    /// # Errors
1754    ///
1755    /// Returns an error if arena allocation fails.
1756    #[cfg(feature = "lpg")]
1757    pub fn create_graph(&self, name: &str) -> Result<bool> {
1758        Ok(self.lpg_store().create_graph(name)?)
1759    }
1760
1761    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
1762    ///
1763    /// If the dropped graph was the active graph context, the context is reset
1764    /// to the default graph.
1765    #[cfg(feature = "lpg")]
1766    pub fn drop_graph(&self, name: &str) -> bool {
1767        let Some(store) = &self.store else {
1768            return false;
1769        };
1770        let dropped = store.drop_graph(name);
1771        if dropped {
1772            let mut current = self.current_graph.write();
1773            if current
1774                .as_deref()
1775                .is_some_and(|g| g.eq_ignore_ascii_case(name))
1776            {
1777                *current = None;
1778            }
1779        }
1780        dropped
1781    }
1782
1783    /// Returns all named graph names.
1784    #[cfg(feature = "lpg")]
1785    #[must_use]
1786    pub fn list_graphs(&self) -> Vec<String> {
1787        self.lpg_store().graph_names()
1788    }
1789
1790    // === Graph Projections ===
1791
1792    /// Creates a named graph projection (virtual subgraph).
1793    ///
1794    /// The projection filters the graph store to only include nodes with the
1795    /// specified labels and edges with the specified types. Returns `true` if
1796    /// created, `false` if a projection with that name already exists.
1797    ///
1798    /// # Examples
1799    ///
1800    /// ```
1801    /// use grafeo_engine::GrafeoDB;
1802    /// use grafeo_core::graph::ProjectionSpec;
1803    ///
1804    /// let db = GrafeoDB::new_in_memory();
1805    /// let spec = ProjectionSpec::new()
1806    ///     .with_node_labels(["Person", "City"])
1807    ///     .with_edge_types(["LIVES_IN"]);
1808    /// assert!(db.create_projection("social", spec));
1809    /// ```
1810    pub fn create_projection(
1811        &self,
1812        name: impl Into<String>,
1813        spec: grafeo_core::graph::ProjectionSpec,
1814    ) -> bool {
1815        use grafeo_core::graph::GraphProjection;
1816        use std::collections::hash_map::Entry;
1817
1818        let store = self.graph_store();
1819        let projection = Arc::new(GraphProjection::new(store, spec));
1820        let mut projections = self.projections.write();
1821        match projections.entry(name.into()) {
1822            Entry::Occupied(_) => false,
1823            Entry::Vacant(e) => {
1824                e.insert(projection);
1825                true
1826            }
1827        }
1828    }
1829
1830    /// Drops a named graph projection. Returns `true` if it existed.
1831    pub fn drop_projection(&self, name: &str) -> bool {
1832        self.projections.write().remove(name).is_some()
1833    }
1834
1835    /// Returns the names of all graph projections.
1836    #[must_use]
1837    pub fn list_projections(&self) -> Vec<String> {
1838        self.projections.read().keys().cloned().collect()
1839    }
1840
1841    /// Returns a named projection as a [`GraphStore`] trait object.
1842    #[must_use]
1843    pub fn projection(&self, name: &str) -> Option<Arc<dyn GraphStore>> {
1844        self.projections
1845            .read()
1846            .get(name)
1847            .map(|p| Arc::clone(p) as Arc<dyn GraphStore>)
1848    }
1849
1850    /// Returns the graph store as a trait object.
1851    ///
1852    /// Returns a read-only trait object for the active graph store.
1853    ///
1854    /// This provides the [`GraphStore`] interface for code that only needs
1855    /// read operations. For write access, use [`graph_store_mut()`](Self::graph_store_mut).
1856    ///
1857    /// [`GraphStore`]: grafeo_core::graph::GraphStore
1858    #[must_use]
1859    pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1860        if let Some(ref ext_read) = self.external_read_store {
1861            Arc::clone(ext_read)
1862        } else {
1863            #[cfg(feature = "lpg")]
1864            {
1865                Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1866            }
1867            #[cfg(not(feature = "lpg"))]
1868            unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
1869        }
1870    }
1871
1872    /// Returns the writable graph store, if available.
1873    ///
1874    /// Returns `None` for read-only databases created via
1875    /// [`with_read_store()`](Self::with_read_store).
1876    #[must_use]
1877    pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1878        if self.external_read_store.is_some() {
1879            self.external_write_store.as_ref().map(Arc::clone)
1880        } else {
1881            #[cfg(feature = "lpg")]
1882            {
1883                Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1884            }
1885            #[cfg(not(feature = "lpg"))]
1886            {
1887                None
1888            }
1889        }
1890    }
1891
1892    /// Garbage collects old MVCC versions that are no longer visible.
1893    ///
1894    /// Determines the minimum epoch required by active transactions and prunes
1895    /// version chains older than that threshold. Also cleans up completed
1896    /// transaction metadata in the transaction manager, and prunes the CDC
1897    /// event log according to its retention policy.
1898    pub fn gc(&self) {
1899        #[cfg(feature = "lpg")]
1900        let current_epoch = {
1901            let min_epoch = self.transaction_manager.min_active_epoch();
1902            self.lpg_store().gc_versions(min_epoch);
1903            self.transaction_manager.current_epoch()
1904        };
1905        self.transaction_manager.gc();
1906
1907        // Prune CDC events based on retention config (epoch + count limits)
1908        #[cfg(feature = "cdc")]
1909        if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
1910            #[cfg(feature = "lpg")]
1911            self.cdc_log.apply_retention(current_epoch);
1912        }
1913    }
1914
1915    /// Returns the buffer manager for memory-aware operations.
1916    #[must_use]
1917    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1918        &self.buffer_manager
1919    }
1920
1921    /// Returns the query cache.
1922    #[must_use]
1923    pub fn query_cache(&self) -> &Arc<QueryCache> {
1924        &self.query_cache
1925    }
1926
1927    /// Clears all cached query plans.
1928    ///
1929    /// This is called automatically after DDL operations, but can also be
1930    /// invoked manually after external schema changes (e.g., WAL replay,
1931    /// import) or when you want to force re-optimization of all queries.
1932    pub fn clear_plan_cache(&self) {
1933        self.query_cache.clear();
1934    }
1935
1936    // =========================================================================
1937    // Lifecycle
1938    // =========================================================================
1939
1940    /// Closes the database, flushing all pending writes.
1941    ///
1942    /// For persistent databases, this ensures everything is safely on disk.
1943    /// Called automatically when the database is dropped, but you can call
1944    /// it explicitly if you need to guarantee durability at a specific point.
1945    ///
1946    /// # Errors
1947    ///
1948    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
1949    pub fn close(&self) -> Result<()> {
1950        let mut is_open = self.is_open.write();
1951        if !*is_open {
1952            return Ok(());
1953        }
1954
1955        // Stop the periodic checkpoint timer first, even for read-only databases.
1956        // compact() can switch a writable DB to read-only after the timer started,
1957        // so the timer must be stopped before any early return to avoid racing
1958        // with the closed file manager.
1959        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1960        if let Some(mut timer) = self.checkpoint_timer.lock().take() {
1961            timer.stop();
1962        }
1963
1964        // Read-only databases: just release the shared lock, no checkpointing
1965        if self.read_only {
1966            #[cfg(feature = "grafeo-file")]
1967            if let Some(ref fm) = self.file_manager {
1968                fm.close()?;
1969            }
1970            *is_open = false;
1971            return Ok(());
1972        }
1973
1974        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
1975        // We must do this BEFORE the WAL close path because checkpoint_to_file
1976        // removes the sidecar WAL directory.
1977        #[cfg(feature = "grafeo-file")]
1978        let is_single_file = self.file_manager.is_some();
1979        #[cfg(not(feature = "grafeo-file"))]
1980        let is_single_file = false;
1981
1982        #[cfg(feature = "grafeo-file")]
1983        if let Some(ref fm) = self.file_manager {
1984            // Flush WAL first so all records are on disk before we snapshot
1985            #[cfg(feature = "wal")]
1986            if let Some(ref wal) = self.wal {
1987                wal.sync()?;
1988            }
1989            let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
1990
1991            // Safety check: if WAL has records but the checkpoint was a no-op
1992            // (zero sections written), the container file may not contain the
1993            // latest data. This can happen when sections are not marked dirty
1994            // despite mutations going through the WAL. Force-dirty all sections
1995            // and retry before removing the sidecar.
1996            #[cfg(feature = "wal")]
1997            let flush_result = if flush_result.sections_written == 0 {
1998                if let Some(ref wal) = self.wal {
1999                    if wal.record_count() > 0 {
2000                        grafeo_warn!(
2001                            "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
2002                            wal.record_count()
2003                        );
2004                        self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
2005                    } else {
2006                        flush_result
2007                    }
2008                } else {
2009                    flush_result
2010                }
2011            } else {
2012                flush_result
2013            };
2014
2015            // Release WAL file handles before removing sidecar directory.
2016            // On Windows, open handles prevent directory deletion.
2017            #[cfg(feature = "wal")]
2018            if let Some(ref wal) = self.wal {
2019                wal.close_active_log();
2020            }
2021
2022            // Only remove the sidecar WAL after verifying the checkpoint wrote
2023            // data to the container. If nothing was written and the WAL had
2024            // records, keep the sidecar so the next open can recover from it.
2025            #[cfg(feature = "wal")]
2026            let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
2027            #[cfg(not(feature = "wal"))]
2028            let has_wal_records = false;
2029
2030            if flush_result.sections_written > 0 || !has_wal_records {
2031                {
2032                    use grafeo_common::testing::crash::maybe_crash;
2033                    maybe_crash("close:before_remove_sidecar_wal");
2034                }
2035                fm.remove_sidecar_wal()?;
2036            } else {
2037                grafeo_warn!(
2038                    "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
2039                );
2040            }
2041            fm.close()?;
2042        }
2043
2044        // Commit and sync WAL (legacy directory format only).
2045        // We intentionally do NOT call wal.checkpoint() here. Directory format
2046        // has no snapshot: the WAL files are the sole source of truth. Writing
2047        // checkpoint.meta would cause recovery to skip older WAL files, losing
2048        // all data that predates the current log sequence.
2049        #[cfg(feature = "wal")]
2050        if !is_single_file && let Some(ref wal) = self.wal {
2051            // Use the last assigned transaction ID, or create one for the commit record
2052            let commit_tx = self
2053                .transaction_manager
2054                .last_assigned_transaction_id()
2055                .unwrap_or_else(|| self.transaction_manager.begin());
2056
2057            // Log a TransactionCommit to mark all pending records as committed
2058            wal.log(&WalRecord::TransactionCommit {
2059                transaction_id: commit_tx,
2060            })?;
2061
2062            wal.sync()?;
2063        }
2064
2065        *is_open = false;
2066        Ok(())
2067    }
2068
2069    /// Returns the typed WAL if available.
2070    #[cfg(feature = "wal")]
2071    #[must_use]
2072    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
2073        self.wal.as_ref()
2074    }
2075
2076    /// Logs a WAL record if WAL is enabled.
2077    #[cfg(feature = "wal")]
2078    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
2079        if let Some(ref wal) = self.wal {
2080            wal.log(record)?;
2081        }
2082        Ok(())
2083    }
2084
2085    /// Registers storage sections as [`MemoryConsumer`]s with the BufferManager.
2086    ///
2087    /// Each section reports its memory usage to the buffer manager, enabling
2088    /// accurate pressure tracking. Called once after database construction.
2089    fn register_section_consumers(&mut self) {
2090        #[cfg(feature = "lpg")]
2091        let store_ref = self.store.as_ref();
2092        #[cfg(not(feature = "lpg"))]
2093        // LPG store section
2094        #[cfg(feature = "lpg")]
2095        if let Some(store) = store_ref {
2096            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2097            self.buffer_manager.register_consumer(Arc::new(
2098                section_consumer::SectionConsumer::new(Arc::new(lpg)),
2099            ));
2100        }
2101
2102        // RDF store: only when data exists
2103        #[cfg(feature = "triple-store")]
2104        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2105            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2106            self.buffer_manager.register_consumer(Arc::new(
2107                section_consumer::SectionConsumer::new(Arc::new(rdf)),
2108            ));
2109        }
2110
2111        // Ring Index: only when Ring has been built
2112        #[cfg(feature = "ring-index")]
2113        if self.rdf_store.ring().is_some() {
2114            let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2115            self.buffer_manager.register_consumer(Arc::new(
2116                section_consumer::SectionConsumer::new(Arc::new(ring)),
2117            ));
2118        }
2119
2120        // Vector indexes: dynamic consumer that re-queries the store on each
2121        // memory_usage() call, so dropped indexes are freed and new ones tracked.
2122        #[cfg(all(
2123            feature = "lpg",
2124            feature = "vector-index",
2125            feature = "mmap",
2126            not(feature = "temporal")
2127        ))]
2128        if let Some(store) = store_ref {
2129            let spill_path = self.buffer_manager.config().spill_path.clone();
2130            let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
2131                store, spill_path,
2132            ));
2133            // Share the spill registry with the search path
2134            self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
2135            self.buffer_manager.register_consumer(consumer);
2136        }
2137
2138        // Text indexes: same dynamic approach as vector indexes.
2139        #[cfg(all(feature = "lpg", feature = "text-index"))]
2140        if let Some(store) = store_ref {
2141            self.buffer_manager
2142                .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
2143        }
2144
2145        // CDC log: register as memory consumer so the buffer manager can
2146        // prune events under memory pressure.
2147        #[cfg(feature = "cdc")]
2148        self.buffer_manager.register_consumer(
2149            Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
2150        );
2151    }
2152
2153    /// Discovers and re-opens spill files from a previous session.
2154    ///
2155    /// When the database was closed with spilled vector embeddings, the
2156    /// `vectors_*.bin` files persist in the spill directory. This method
2157    /// scans for them, opens each as `MmapStorage`, and registers them
2158    /// in the `vector_spill_storages` map so search can read from them.
2159    #[cfg(all(
2160        feature = "lpg",
2161        feature = "vector-index",
2162        feature = "mmap",
2163        not(feature = "temporal")
2164    ))]
2165    fn restore_spill_files(&mut self) {
2166        use grafeo_core::index::vector::MmapStorage;
2167
2168        let spill_dir = match self.buffer_manager.config().spill_path {
2169            Some(ref path) => path.clone(),
2170            None => return,
2171        };
2172
2173        if !spill_dir.exists() {
2174            return;
2175        }
2176
2177        let spill_map = match self.vector_spill_storages {
2178            Some(ref map) => Arc::clone(map),
2179            None => return,
2180        };
2181
2182        let Ok(entries) = std::fs::read_dir(&spill_dir) else {
2183            return;
2184        };
2185
2186        let Some(ref store) = self.store else {
2187            return;
2188        };
2189
2190        for entry in entries.flatten() {
2191            let path = entry.path();
2192            let file_name = match path.file_name().and_then(|n| n.to_str()) {
2193                Some(name) => name.to_string(),
2194                None => continue,
2195            };
2196
2197            // Match pattern: vectors_{key}.bin where key is percent-encoded
2198            if !file_name.starts_with("vectors_")
2199                || !std::path::Path::new(&file_name)
2200                    .extension()
2201                    .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
2202            {
2203                continue;
2204            }
2205
2206            // Extract and decode key: "vectors_Label%3Aembedding.bin" -> "Label:embedding"
2207            let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];
2208
2209            // Percent-decode: %3A -> ':', %25 -> '%'
2210            let key = key_part.replace("%3A", ":").replace("%25", "%");
2211
2212            // Key must contain ':' (label:property format)
2213            if !key.contains(':') {
2214                // Legacy file with old encoding, skip (will be re-created on next spill)
2215                continue;
2216            }
2217
2218            // Only restore if the corresponding vector index exists
2219            if store.get_vector_index_by_key(&key).is_none() {
2220                // Stale spill file (index was dropped), clean it up
2221                let _ = std::fs::remove_file(&path);
2222                continue;
2223            }
2224
2225            // Open the MmapStorage
2226            match MmapStorage::open(&path) {
2227                Ok(mmap_storage) => {
2228                    // Mark the property column as spilled so get() returns None
2229                    let property = key.split(':').nth(1).unwrap_or("");
2230                    let prop_key = grafeo_common::types::PropertyKey::new(property);
2231                    store.node_properties_mark_spilled(&prop_key);
2232
2233                    spill_map.write().insert(key, Arc::new(mmap_storage));
2234                }
2235                Err(e) => {
2236                    eprintln!("failed to restore spill file {}: {e}", path.display());
2237                    // Remove corrupt spill file
2238                    let _ = std::fs::remove_file(&path);
2239                }
2240            }
2241        }
2242    }
2243
2244    /// Builds section objects for the current database state.
2245    #[cfg(feature = "grafeo-file")]
2246    fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
2247        let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();
2248
2249        // Layered store: serialize both the compact base and the overlay.
2250        #[cfg(all(feature = "compact-store", feature = "lpg"))]
2251        if let Some(ref layered) = self.layered_store {
2252            // Compact base section.
2253            let compact_section = grafeo_core::graph::compact::section::CompactStoreSection::new(
2254                layered.base_store_arc(),
2255            );
2256            sections.push(Box::new(compact_section));
2257
2258            // Overlay LPG section.
2259            let overlay = layered.overlay_store();
2260            let overlay_section =
2261                grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(overlay));
2262            sections.push(Box::new(overlay_section));
2263
2264            return sections;
2265        }
2266
2267        // LPG sections: store, catalog, vector indexes, text indexes
2268        #[cfg(feature = "lpg")]
2269        if let Some(store) = self.store.as_ref() {
2270            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2271
2272            let catalog = catalog_section::CatalogSection::new(
2273                Arc::clone(&self.catalog),
2274                Arc::clone(store),
2275                {
2276                    let tm = Arc::clone(&self.transaction_manager);
2277                    move || tm.current_epoch().as_u64()
2278                },
2279            );
2280
2281            sections.push(Box::new(catalog));
2282            sections.push(Box::new(lpg));
2283
2284            // Vector indexes: persist HNSW topology to avoid rebuild on load
2285            #[cfg(feature = "vector-index")]
2286            {
2287                let indexes = store.vector_index_entries();
2288                if !indexes.is_empty() {
2289                    let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
2290                    sections.push(Box::new(vector));
2291                }
2292            }
2293
2294            // Text indexes: persist BM25 postings to avoid rebuild on load
2295            #[cfg(feature = "text-index")]
2296            {
2297                let indexes = store.text_index_entries();
2298                if !indexes.is_empty() {
2299                    let text = grafeo_core::index::text::TextIndexSection::new(indexes);
2300                    sections.push(Box::new(text));
2301                }
2302            }
2303        }
2304
2305        #[cfg(feature = "triple-store")]
2306        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2307            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2308            sections.push(Box::new(rdf));
2309        }
2310
2311        #[cfg(feature = "ring-index")]
2312        if self.rdf_store.ring().is_some() {
2313            let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2314            sections.push(Box::new(ring));
2315        }
2316
2317        sections
2318    }
2319
2320    // =========================================================================
2321    // Backup API
2322    // =========================================================================
2323
2324    /// Creates a full backup of the database in the given directory.
2325    ///
2326    /// Checkpoints the database, copies the `.grafeo` file, and creates a
2327    /// backup manifest. Subsequent incremental backups will use this as the
2328    /// base.
2329    ///
2330    /// # Errors
2331    ///
2332    /// Returns an error if the database has no file manager or I/O fails.
2333    #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2334    pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
2335        let fm = self
2336            .file_manager
2337            .as_ref()
2338            .ok_or_else(|| Error::Internal("backup requires a persistent database".to_string()))?;
2339
2340        // Checkpoint to ensure the container has the latest data.
2341        // Skip for read-only databases: the on-disk file is already a valid
2342        // snapshot and the file manager rejects writes.
2343        if !self.read_only {
2344            let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2345        }
2346
2347        let current_epoch = self.transaction_manager.current_epoch();
2348        backup::do_backup_full(backup_dir, fm, self.wal.as_deref(), current_epoch)
2349    }
2350
2351    /// Creates an incremental backup containing WAL records since the last backup.
2352    ///
2353    /// Requires a prior full backup in the backup directory.
2354    ///
2355    /// # Errors
2356    ///
2357    /// Returns an error if no full backup exists, or if the WAL has no new records.
2358    #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2359    pub fn backup_incremental(
2360        &self,
2361        backup_dir: &std::path::Path,
2362    ) -> Result<backup::BackupSegment> {
2363        let wal = self
2364            .wal
2365            .as_ref()
2366            .ok_or_else(|| Error::Internal("incremental backup requires WAL".to_string()))?;
2367
2368        let current_epoch = self.transaction_manager.current_epoch();
2369        backup::do_backup_incremental(backup_dir, wal, current_epoch)
2370    }
2371
    /// Returns the backup manifest for a backup directory, if one exists.
    ///
    /// This is an associated function (it takes no `self`), so no open
    /// database is needed to call it. It forwards `backup_dir` to
    /// `backup::read_manifest` and returns that result unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if the manifest file exists but cannot be parsed.
    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
    pub fn read_backup_manifest(
        backup_dir: &std::path::Path,
    ) -> Result<Option<backup::BackupManifest>> {
        backup::read_manifest(backup_dir)
    }
2383
2384    /// Returns the current backup cursor (last backed-up position), if any.
2385    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2386    #[must_use]
2387    pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
2388        self.wal
2389            .as_ref()
2390            .and_then(|wal| backup::read_backup_cursor(wal.dir()).ok().flatten())
2391    }
2392
    /// Restores a database from a backup chain to a specific epoch.
    ///
    /// Copies the full backup to `output_path`, then replays incremental
    /// WAL segments up to `target_epoch`. The restored database can be
    /// opened with [`GrafeoDB::open`].
    ///
    /// This is an associated function (it takes no `self`); all three
    /// arguments are forwarded unchanged to `backup::do_restore_to_epoch`.
    ///
    /// # Errors
    ///
    /// Returns an error if the backup chain does not cover the target epoch,
    /// segment checksums fail, or I/O fails.
    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
    pub fn restore_to_epoch(
        backup_dir: &std::path::Path,
        target_epoch: grafeo_common::types::EpochId,
        output_path: &std::path::Path,
    ) -> Result<()> {
        backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
    }
2411
2412    /// Writes the current database state to the `.grafeo` file using the unified flush.
2413    ///
2414    /// Does NOT remove the sidecar WAL: callers that want to clean up
2415    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
2416    /// separately after this returns.
2417    #[cfg(feature = "grafeo-file")]
2418    fn checkpoint_to_file(
2419        &self,
2420        fm: &GrafeoFileManager,
2421        reason: flush::FlushReason,
2422    ) -> Result<flush::FlushResult> {
2423        let sections = self.build_sections();
2424        let section_refs: Vec<&dyn grafeo_common::storage::Section> =
2425            sections.iter().map(|s| s.as_ref()).collect();
2426        #[cfg(feature = "lpg")]
2427        let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
2428        #[cfg(not(feature = "lpg"))]
2429        let context = flush::build_context_minimal(&self.transaction_manager);
2430
2431        flush::flush(
2432            fm,
2433            &section_refs,
2434            &context,
2435            reason,
2436            #[cfg(feature = "wal")]
2437            self.wal.as_deref(),
2438        )
2439    }
2440
    /// Returns the file manager if using single-file format.
    ///
    /// Borrows the `Arc` held in `self.file_manager`; the result is `None`
    /// when that field is unset.
    #[cfg(feature = "grafeo-file")]
    #[must_use]
    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
        self.file_manager.as_ref()
    }
2447}
2448
2449impl Drop for GrafeoDB {
2450    fn drop(&mut self) {
2451        if let Err(e) = self.close() {
2452            grafeo_error!("Error closing database: {}", e);
2453        }
2454    }
2455}
2456
/// Exposes the database's admin operations through the `AdminService` trait.
///
/// Every trait method forwards to the method of the same name on
/// [`GrafeoDB`]. The calls are written in path form to make it obvious that
/// the target is the inherent method rather than this trait method.
#[cfg(feature = "lpg")]
impl crate::admin::AdminService for GrafeoDB {
    fn info(&self) -> crate::admin::DatabaseInfo {
        GrafeoDB::info(self)
    }

    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        GrafeoDB::detailed_stats(self)
    }

    fn schema(&self) -> crate::admin::SchemaInfo {
        GrafeoDB::schema(self)
    }

    fn validate(&self) -> crate::admin::ValidationResult {
        GrafeoDB::validate(self)
    }

    fn wal_status(&self) -> crate::admin::WalStatus {
        GrafeoDB::wal_status(self)
    }

    fn wal_checkpoint(&self) -> Result<()> {
        GrafeoDB::wal_checkpoint(self)
    }
}
2483
2484// =========================================================================
2485// Query Result Types
2486// =========================================================================
2487
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// Results that carry only a status message (see [`status()`](Self::status))
/// have no columns and no rows.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// Constructors that are given no explicit types fill this with one
    /// `LogicalType::Any` per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    ///
    /// Crate-private: use [`rows()`](Self::rows) for borrowed access or
    /// [`into_rows()`](Self::into_rows) to take ownership.
    pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    ///
    /// Every constructor on this type initializes it to `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
2533
2534impl QueryResult {
2535    /// Creates a fully empty query result (no columns, no rows).
2536    #[must_use]
2537    pub fn empty() -> Self {
2538        Self {
2539            columns: Vec::new(),
2540            column_types: Vec::new(),
2541            rows: Vec::new(),
2542            execution_time_ms: None,
2543            rows_scanned: None,
2544            status_message: None,
2545            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2546        }
2547    }
2548
2549    /// Creates a query result with only a status message (for DDL commands).
2550    #[must_use]
2551    pub fn status(msg: impl Into<String>) -> Self {
2552        Self {
2553            columns: Vec::new(),
2554            column_types: Vec::new(),
2555            rows: Vec::new(),
2556            execution_time_ms: None,
2557            rows_scanned: None,
2558            status_message: Some(msg.into()),
2559            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2560        }
2561    }
2562
2563    /// Creates a new empty query result.
2564    #[must_use]
2565    pub fn new(columns: Vec<String>) -> Self {
2566        let len = columns.len();
2567        Self {
2568            columns,
2569            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2570            rows: Vec::new(),
2571            execution_time_ms: None,
2572            rows_scanned: None,
2573            status_message: None,
2574            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2575        }
2576    }
2577
2578    /// Creates a new empty query result with column types.
2579    #[must_use]
2580    pub fn with_types(
2581        columns: Vec<String>,
2582        column_types: Vec<grafeo_common::types::LogicalType>,
2583    ) -> Self {
2584        Self {
2585            columns,
2586            column_types,
2587            rows: Vec::new(),
2588            execution_time_ms: None,
2589            rows_scanned: None,
2590            status_message: None,
2591            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2592        }
2593    }
2594
2595    /// Creates a query result with pre-populated rows.
2596    #[must_use]
2597    pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
2598        let len = columns.len();
2599        Self {
2600            columns,
2601            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2602            rows,
2603            execution_time_ms: None,
2604            rows_scanned: None,
2605            status_message: None,
2606            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2607        }
2608    }
2609
2610    /// Appends a row to this result.
2611    pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
2612        self.rows.push(row);
2613    }
2614
2615    /// Sets the execution metrics on this result.
2616    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2617        self.execution_time_ms = Some(execution_time_ms);
2618        self.rows_scanned = Some(rows_scanned);
2619        self
2620    }
2621
2622    /// Returns the execution time in milliseconds, if available.
2623    #[must_use]
2624    pub fn execution_time_ms(&self) -> Option<f64> {
2625        self.execution_time_ms
2626    }
2627
2628    /// Returns the number of rows scanned, if available.
2629    #[must_use]
2630    pub fn rows_scanned(&self) -> Option<u64> {
2631        self.rows_scanned
2632    }
2633
2634    /// Returns the number of rows.
2635    #[must_use]
2636    pub fn row_count(&self) -> usize {
2637        self.rows.len()
2638    }
2639
2640    /// Returns the number of columns.
2641    #[must_use]
2642    pub fn column_count(&self) -> usize {
2643        self.columns.len()
2644    }
2645
2646    /// Returns true if the result is empty.
2647    #[must_use]
2648    pub fn is_empty(&self) -> bool {
2649        self.rows.is_empty()
2650    }
2651
2652    /// Extracts a single value from the result.
2653    ///
2654    /// Use this when your query returns exactly one row with one column,
2655    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
2656    ///
2657    /// # Errors
2658    ///
2659    /// Returns an error if the result has multiple rows or columns.
2660    pub fn scalar<T: FromValue>(&self) -> Result<T> {
2661        if self.rows.len() != 1 || self.columns.len() != 1 {
2662            return Err(grafeo_common::utils::error::Error::InvalidValue(
2663                "Expected single value".to_string(),
2664            ));
2665        }
2666        T::from_value(&self.rows[0][0])
2667    }
2668
2669    /// Returns a slice of all result rows.
2670    #[must_use]
2671    pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
2672        &self.rows
2673    }
2674
2675    /// Takes ownership of all result rows.
2676    #[must_use]
2677    pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
2678        self.rows
2679    }
2680
2681    /// Returns an iterator over the rows.
2682    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2683        self.rows.iter()
2684    }
2685
2686    /// Converts this query result to an Arrow [`RecordBatch`](arrow_array::RecordBatch).
2687    ///
2688    /// Each column in the result becomes an Arrow array. Type mapping:
2689    /// - `Int64` / `Float64` / `Bool` / `String` / `Bytes`: direct Arrow equivalents
2690    /// - `Timestamp` / `ZonedDatetime`: `Timestamp(Microsecond, UTC)`
2691    /// - `Date`: `Date32`, `Time`: `Time64(Nanosecond)`
2692    /// - `Vector`: `FixedSizeList(Float32, dim)`
2693    /// - `Duration` / `List` / `Map` / `Path`: serialized as `Utf8`
2694    ///
2695    /// Heterogeneous columns (mixed types) fall back to `Utf8`.
2696    ///
2697    /// # Errors
2698    ///
2699    /// Returns [`ArrowExportError`](arrow::ArrowExportError) if Arrow array construction fails.
2700    #[cfg(feature = "arrow-export")]
2701    pub fn to_record_batch(
2702        &self,
2703    ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
2704        arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
2705    }
2706
2707    /// Serializes this query result as Arrow IPC stream bytes.
2708    ///
2709    /// The returned bytes can be read by any Arrow implementation:
2710    /// - Python: `pyarrow.ipc.open_stream(buf).read_all()`
2711    /// - Polars: `pl.read_ipc(buf)`
2712    /// - Node.js: `apache-arrow` `RecordBatchStreamReader`
2713    ///
2714    /// # Errors
2715    ///
2716    /// Returns [`ArrowExportError`](arrow::ArrowExportError) on conversion or serialization failure.
2717    #[cfg(feature = "arrow-export")]
2718    pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
2719        let batch = self.to_record_batch()?;
2720        arrow::record_batch_to_ipc_stream(&batch)
2721    }
2722}
2723
2724impl std::fmt::Display for QueryResult {
2725    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2726        let table = grafeo_common::fmt::format_result_table(
2727            &self.columns,
2728            &self.rows,
2729            self.execution_time_ms,
2730            self.status_message.as_deref(),
2731        );
2732        f.write_str(&table)
2733    }
2734}
2735
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented in this module for `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// The value is only borrowed; implementations produce an owned `Self`.
    ///
    /// # Errors
    ///
    /// Returns `Error::TypeMismatch` if the value is not the expected type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2748
2749impl FromValue for i64 {
2750    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2751        value
2752            .as_int64()
2753            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2754                expected: "INT64".to_string(),
2755                found: value.type_name().to_string(),
2756            })
2757    }
2758}
2759
2760impl FromValue for f64 {
2761    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2762        value
2763            .as_float64()
2764            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2765                expected: "FLOAT64".to_string(),
2766                found: value.type_name().to_string(),
2767            })
2768    }
2769}
2770
2771impl FromValue for String {
2772    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2773        value.as_str().map(String::from).ok_or_else(|| {
2774            grafeo_common::utils::error::Error::TypeMismatch {
2775                expected: "STRING".to_string(),
2776                found: value.type_name().to_string(),
2777            }
2778        })
2779    }
2780}
2781
2782impl FromValue for bool {
2783    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2784        value
2785            .as_bool()
2786            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2787                expected: "BOOL".to_string(),
2788                found: value.type_name().to_string(),
2789            })
2790    }
2791}
2792
2793#[cfg(test)]
2794mod tests {
2795    use super::*;
2796
2797    #[test]
2798    fn test_create_in_memory_database() {
2799        let db = GrafeoDB::new_in_memory();
2800        assert_eq!(db.node_count(), 0);
2801        assert_eq!(db.edge_count(), 0);
2802    }
2803
2804    #[test]
2805    fn test_database_config() {
2806        let config = Config::in_memory().with_threads(4).with_query_logging();
2807
2808        let db = GrafeoDB::with_config(config).unwrap();
2809        assert_eq!(db.config().threads, 4);
2810        assert!(db.config().query_logging);
2811    }
2812
2813    #[test]
2814    fn test_database_session() {
2815        let db = GrafeoDB::new_in_memory();
2816        let _session = db.session();
2817        // Session should be created successfully
2818    }
2819
2820    #[cfg(feature = "wal")]
2821    #[test]
2822    fn test_persistent_database_recovery() {
2823        use grafeo_common::types::Value;
2824        use tempfile::tempdir;
2825
2826        let dir = tempdir().unwrap();
2827        let db_path = dir.path().join("test_db");
2828
2829        // Create database and add some data
2830        {
2831            let db = GrafeoDB::open(&db_path).unwrap();
2832
2833            let alix = db.create_node(&["Person"]);
2834            db.set_node_property(alix, "name", Value::from("Alix"));
2835
2836            let gus = db.create_node(&["Person"]);
2837            db.set_node_property(gus, "name", Value::from("Gus"));
2838
2839            let _edge = db.create_edge(alix, gus, "KNOWS");
2840
2841            // Explicitly close to flush WAL
2842            db.close().unwrap();
2843        }
2844
2845        // Reopen and verify data was recovered
2846        {
2847            let db = GrafeoDB::open(&db_path).unwrap();
2848
2849            assert_eq!(db.node_count(), 2);
2850            assert_eq!(db.edge_count(), 1);
2851
2852            // Verify nodes exist
2853            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2854            assert!(node0.is_some());
2855
2856            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2857            assert!(node1.is_some());
2858        }
2859    }
2860
2861    #[cfg(feature = "wal")]
2862    #[test]
2863    fn test_wal_logging() {
2864        use tempfile::tempdir;
2865
2866        let dir = tempdir().unwrap();
2867        let db_path = dir.path().join("wal_test_db");
2868
2869        let db = GrafeoDB::open(&db_path).unwrap();
2870
2871        // Create some data
2872        let node = db.create_node(&["Test"]);
2873        db.delete_node(node);
2874
2875        // WAL should have records
2876        if let Some(wal) = db.wal() {
2877            assert!(wal.record_count() > 0);
2878        }
2879
2880        db.close().unwrap();
2881    }
2882
2883    #[cfg(feature = "wal")]
2884    #[test]
2885    fn test_wal_recovery_multiple_sessions() {
2886        // Tests that WAL recovery works correctly across multiple open/close cycles
2887        use grafeo_common::types::Value;
2888        use tempfile::tempdir;
2889
2890        let dir = tempdir().unwrap();
2891        let db_path = dir.path().join("multi_session_db");
2892
2893        // Session 1: Create initial data
2894        {
2895            let db = GrafeoDB::open(&db_path).unwrap();
2896            let alix = db.create_node(&["Person"]);
2897            db.set_node_property(alix, "name", Value::from("Alix"));
2898            db.close().unwrap();
2899        }
2900
2901        // Session 2: Add more data
2902        {
2903            let db = GrafeoDB::open(&db_path).unwrap();
2904            assert_eq!(db.node_count(), 1); // Previous data recovered
2905            let gus = db.create_node(&["Person"]);
2906            db.set_node_property(gus, "name", Value::from("Gus"));
2907            db.close().unwrap();
2908        }
2909
2910        // Session 3: Verify all data
2911        {
2912            let db = GrafeoDB::open(&db_path).unwrap();
2913            assert_eq!(db.node_count(), 2);
2914
2915            // Verify properties were recovered correctly
2916            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
2917            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
2918
2919            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
2920            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
2921        }
2922    }
2923
2924    #[cfg(feature = "wal")]
2925    #[test]
2926    fn test_database_consistency_after_mutations() {
2927        // Tests that database remains consistent after a series of create/delete operations
2928        use grafeo_common::types::Value;
2929        use tempfile::tempdir;
2930
2931        let dir = tempdir().unwrap();
2932        let db_path = dir.path().join("consistency_db");
2933
2934        {
2935            let db = GrafeoDB::open(&db_path).unwrap();
2936
2937            // Create nodes
2938            let a = db.create_node(&["Node"]);
2939            let b = db.create_node(&["Node"]);
2940            let c = db.create_node(&["Node"]);
2941
2942            // Create edges
2943            let e1 = db.create_edge(a, b, "LINKS");
2944            let _e2 = db.create_edge(b, c, "LINKS");
2945
2946            // Delete middle node and its edge
2947            db.delete_edge(e1);
2948            db.delete_node(b);
2949
2950            // Set properties on remaining nodes
2951            db.set_node_property(a, "value", Value::Int64(1));
2952            db.set_node_property(c, "value", Value::Int64(3));
2953
2954            db.close().unwrap();
2955        }
2956
2957        // Reopen and verify consistency
2958        {
2959            let db = GrafeoDB::open(&db_path).unwrap();
2960
2961            // Should have 2 nodes (a and c), b was deleted
2962            // Note: node_count includes deleted nodes in some implementations
2963            // What matters is that the non-deleted nodes are accessible
2964            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
2965            assert!(node_a.is_some());
2966
2967            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
2968            assert!(node_c.is_some());
2969
2970            // Middle node should be deleted
2971            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
2972            assert!(node_b.is_none());
2973        }
2974    }
2975
2976    #[cfg(feature = "wal")]
2977    #[test]
2978    fn test_close_is_idempotent() {
2979        // Calling close() multiple times should not cause errors
2980        use tempfile::tempdir;
2981
2982        let dir = tempdir().unwrap();
2983        let db_path = dir.path().join("close_test_db");
2984
2985        let db = GrafeoDB::open(&db_path).unwrap();
2986        db.create_node(&["Test"]);
2987
2988        // First close should succeed
2989        assert!(db.close().is_ok());
2990
2991        // Second close should also succeed (idempotent)
2992        assert!(db.close().is_ok());
2993    }
2994
2995    #[test]
2996    fn test_with_store_external_backend() {
2997        use grafeo_core::graph::lpg::LpgStore;
2998
2999        let external = Arc::new(LpgStore::new().unwrap());
3000
3001        // Seed data on the external store directly
3002        let n1 = external.create_node(&["Person"]);
3003        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
3004
3005        let db = GrafeoDB::with_store(
3006            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
3007            Config::in_memory(),
3008        )
3009        .unwrap();
3010
3011        let session = db.session();
3012
3013        // Session should see data from the external store via execute
3014        #[cfg(feature = "gql")]
3015        {
3016            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
3017            assert_eq!(result.rows.len(), 1);
3018        }
3019    }
3020
3021    #[test]
3022    fn test_with_config_custom_memory_limit() {
3023        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB
3024
3025        let db = GrafeoDB::with_config(config).unwrap();
3026        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
3027        assert_eq!(db.node_count(), 0);
3028    }
3029
3030    #[cfg(feature = "metrics")]
3031    #[test]
3032    fn test_database_metrics_registry() {
3033        let db = GrafeoDB::new_in_memory();
3034
3035        // Perform some operations
3036        db.create_node(&["Person"]);
3037        db.create_node(&["Person"]);
3038
3039        // Check that metrics snapshot returns data
3040        let snap = db.metrics();
3041        // Session created counter should reflect at least 0 (metrics is initialized)
3042        assert_eq!(snap.query_count, 0); // No queries executed yet
3043    }
3044
3045    #[test]
3046    fn test_query_result_has_metrics() {
3047        // Verifies that query results include execution metrics
3048        let db = GrafeoDB::new_in_memory();
3049        db.create_node(&["Person"]);
3050        db.create_node(&["Person"]);
3051
3052        #[cfg(feature = "gql")]
3053        {
3054            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
3055
3056            // Metrics should be populated
3057            assert!(result.execution_time_ms.is_some());
3058            assert!(result.rows_scanned.is_some());
3059            assert!(result.execution_time_ms.unwrap() >= 0.0);
3060            assert_eq!(result.rows_scanned.unwrap(), 2);
3061        }
3062    }
3063
3064    #[test]
3065    fn test_empty_query_result_metrics() {
3066        // Verifies metrics are correct for queries returning no results
3067        let db = GrafeoDB::new_in_memory();
3068        db.create_node(&["Person"]);
3069
3070        #[cfg(feature = "gql")]
3071        {
3072            // Query that matches nothing
3073            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
3074
3075            assert!(result.execution_time_ms.is_some());
3076            assert!(result.rows_scanned.is_some());
3077            assert_eq!(result.rows_scanned.unwrap(), 0);
3078        }
3079    }
3080
3081    #[cfg(feature = "cdc")]
3082    mod cdc_integration {
3083        use super::*;
3084
3085        /// Helper: creates an in-memory database with CDC enabled.
3086        fn cdc_db() -> GrafeoDB {
3087            GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
3088        }
3089
3090        #[test]
3091        fn test_node_lifecycle_history() {
3092            let db = cdc_db();
3093
3094            // Create
3095            let id = db.create_node(&["Person"]);
3096            // Update
3097            db.set_node_property(id, "name", "Alix".into());
3098            db.set_node_property(id, "name", "Gus".into());
3099            // Delete
3100            db.delete_node(id);
3101
3102            let history = db.history(id).unwrap();
3103            assert_eq!(history.len(), 4); // create + 2 updates + delete
3104            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3105            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3106            assert!(history[1].before.is_none()); // first set_node_property has no prior value
3107            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
3108            assert!(history[2].before.is_some()); // second update has prior "Alix"
3109            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
3110        }
3111
3112        #[test]
3113        fn test_edge_lifecycle_history() {
3114            let db = cdc_db();
3115
3116            let alix = db.create_node(&["Person"]);
3117            let gus = db.create_node(&["Person"]);
3118            let edge = db.create_edge(alix, gus, "KNOWS");
3119            db.set_edge_property(edge, "since", 2024i64.into());
3120            db.delete_edge(edge);
3121
3122            let history = db.history(edge).unwrap();
3123            assert_eq!(history.len(), 3); // create + update + delete
3124            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3125            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3126            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
3127        }
3128
3129        #[test]
3130        fn test_create_node_with_props_cdc() {
3131            let db = cdc_db();
3132
3133            let id = db.create_node_with_props(
3134                &["Person"],
3135                vec![
3136                    ("name", grafeo_common::types::Value::from("Alix")),
3137                    ("age", grafeo_common::types::Value::from(30i64)),
3138                ],
3139            );
3140
3141            let history = db.history(id).unwrap();
3142            assert_eq!(history.len(), 1);
3143            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3144            // Props should be captured
3145            let after = history[0].after.as_ref().unwrap();
3146            assert_eq!(after.len(), 2);
3147        }
3148
3149        #[test]
3150        fn test_changes_between() {
3151            let db = cdc_db();
3152
3153            let id1 = db.create_node(&["A"]);
3154            let _id2 = db.create_node(&["B"]);
3155            db.set_node_property(id1, "x", 1i64.into());
3156
3157            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
3158            let changes = db
3159                .changes_between(
3160                    grafeo_common::types::EpochId(0),
3161                    grafeo_common::types::EpochId(u64::MAX),
3162                )
3163                .unwrap();
3164            assert_eq!(changes.len(), 3); // 2 creates + 1 update
3165        }
3166
3167        #[test]
3168        fn test_cdc_disabled_by_default() {
3169            let db = GrafeoDB::new_in_memory();
3170            assert!(!db.is_cdc_enabled());
3171
3172            let id = db.create_node(&["Person"]);
3173            db.set_node_property(id, "name", "Alix".into());
3174
3175            let history = db.history(id).unwrap();
3176            assert!(history.is_empty(), "CDC off by default: no events recorded");
3177        }
3178
3179        #[test]
3180        fn test_session_with_cdc_override_on() {
3181            // Database default is off, but session opts in
3182            let db = GrafeoDB::new_in_memory();
3183            let session = db.session_with_cdc(true);
3184            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3185            // The CDC log should have events from the opted-in session
3186            let changes = db
3187                .changes_between(
3188                    grafeo_common::types::EpochId(0),
3189                    grafeo_common::types::EpochId(u64::MAX),
3190                )
3191                .unwrap();
3192            assert!(
3193                !changes.is_empty(),
3194                "session_with_cdc(true) should record events"
3195            );
3196        }
3197
3198        #[test]
3199        fn test_session_with_cdc_override_off() {
3200            // Database default is on, but session opts out
3201            let db = cdc_db();
3202            let session = db.session_with_cdc(false);
3203            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3204            let changes = db
3205                .changes_between(
3206                    grafeo_common::types::EpochId(0),
3207                    grafeo_common::types::EpochId(u64::MAX),
3208                )
3209                .unwrap();
3210            assert!(
3211                changes.is_empty(),
3212                "session_with_cdc(false) should not record events"
3213            );
3214        }
3215
3216        #[test]
3217        fn test_set_cdc_enabled_runtime() {
3218            let db = GrafeoDB::new_in_memory();
3219            assert!(!db.is_cdc_enabled());
3220
3221            // Enable at runtime
3222            db.set_cdc_enabled(true);
3223            assert!(db.is_cdc_enabled());
3224
3225            let id = db.create_node(&["Person"]);
3226            let history = db.history(id).unwrap();
3227            assert_eq!(history.len(), 1, "CDC enabled at runtime records events");
3228
3229            // Disable again
3230            db.set_cdc_enabled(false);
3231            let id2 = db.create_node(&["Person"]);
3232            let history2 = db.history(id2).unwrap();
3233            assert!(
3234                history2.is_empty(),
3235                "CDC disabled at runtime stops recording"
3236            );
3237        }
3238    }
3239
3240    #[test]
3241    fn test_with_store_basic() {
3242        use grafeo_core::graph::lpg::LpgStore;
3243
3244        let store = Arc::new(LpgStore::new().unwrap());
3245        let n1 = store.create_node(&["Person"]);
3246        store.set_node_property(n1, "name", "Alix".into());
3247
3248        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3249        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3250
3251        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
3252        assert_eq!(result.rows.len(), 1);
3253    }
3254
3255    #[test]
3256    fn test_with_store_session() {
3257        use grafeo_core::graph::lpg::LpgStore;
3258
3259        let store = Arc::new(LpgStore::new().unwrap());
3260        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3261        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3262
3263        let session = db.session();
3264        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3265        assert_eq!(result.rows.len(), 1);
3266    }
3267
3268    #[test]
3269    fn test_with_store_mutations() {
3270        use grafeo_core::graph::lpg::LpgStore;
3271
3272        let store = Arc::new(LpgStore::new().unwrap());
3273        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3274        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3275
3276        let mut session = db.session();
3277
3278        // Use an explicit transaction so INSERT and MATCH share the same
3279        // transaction context. With PENDING epochs, uncommitted versions are
3280        // only visible to the owning transaction.
3281        session.begin_transaction().unwrap();
3282        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3283
3284        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3285        assert_eq!(result.rows.len(), 1);
3286
3287        session.commit().unwrap();
3288    }
3289
3290    // =========================================================================
3291    // QueryResult tests
3292    // =========================================================================
3293
3294    #[test]
3295    fn test_query_result_empty() {
3296        let result = QueryResult::empty();
3297        assert!(result.is_empty());
3298        assert_eq!(result.row_count(), 0);
3299        assert_eq!(result.column_count(), 0);
3300        assert!(result.execution_time_ms().is_none());
3301        assert!(result.rows_scanned().is_none());
3302        assert!(result.status_message.is_none());
3303    }
3304
3305    #[test]
3306    fn test_query_result_status() {
3307        let result = QueryResult::status("Created node type 'Person'");
3308        assert!(result.is_empty());
3309        assert_eq!(result.column_count(), 0);
3310        assert_eq!(
3311            result.status_message.as_deref(),
3312            Some("Created node type 'Person'")
3313        );
3314    }
3315
3316    #[test]
3317    fn test_query_result_new_with_columns() {
3318        let result = QueryResult::new(vec!["name".into(), "age".into()]);
3319        assert_eq!(result.column_count(), 2);
3320        assert_eq!(result.row_count(), 0);
3321        assert!(result.is_empty());
3322        // Column types should default to Any
3323        assert_eq!(
3324            result.column_types,
3325            vec![
3326                grafeo_common::types::LogicalType::Any,
3327                grafeo_common::types::LogicalType::Any
3328            ]
3329        );
3330    }
3331
3332    #[test]
3333    fn test_query_result_with_types() {
3334        use grafeo_common::types::LogicalType;
3335        let result = QueryResult::with_types(
3336            vec!["name".into(), "age".into()],
3337            vec![LogicalType::String, LogicalType::Int64],
3338        );
3339        assert_eq!(result.column_count(), 2);
3340        assert_eq!(result.column_types[0], LogicalType::String);
3341        assert_eq!(result.column_types[1], LogicalType::Int64);
3342    }
3343
3344    #[test]
3345    fn test_query_result_with_metrics() {
3346        let result = QueryResult::new(vec!["x".into()]).with_metrics(42.5, 100);
3347        assert_eq!(result.execution_time_ms(), Some(42.5));
3348        assert_eq!(result.rows_scanned(), Some(100));
3349    }
3350
3351    #[test]
3352    fn test_query_result_scalar_success() {
3353        use grafeo_common::types::Value;
3354        let mut result = QueryResult::new(vec!["count".into()]);
3355        result.rows.push(vec![Value::Int64(42)]);
3356
3357        let val: i64 = result.scalar().unwrap();
3358        assert_eq!(val, 42);
3359    }
3360
3361    #[test]
3362    fn test_query_result_scalar_wrong_shape() {
3363        use grafeo_common::types::Value;
3364        // Multiple rows
3365        let mut result = QueryResult::new(vec!["x".into()]);
3366        result.rows.push(vec![Value::Int64(1)]);
3367        result.rows.push(vec![Value::Int64(2)]);
3368        assert!(result.scalar::<i64>().is_err());
3369
3370        // Multiple columns
3371        let mut result2 = QueryResult::new(vec!["a".into(), "b".into()]);
3372        result2.rows.push(vec![Value::Int64(1), Value::Int64(2)]);
3373        assert!(result2.scalar::<i64>().is_err());
3374
3375        // Empty
3376        let result3 = QueryResult::new(vec!["x".into()]);
3377        assert!(result3.scalar::<i64>().is_err());
3378    }
3379
3380    #[test]
3381    fn test_query_result_iter() {
3382        use grafeo_common::types::Value;
3383        let mut result = QueryResult::new(vec!["x".into()]);
3384        result.rows.push(vec![Value::Int64(1)]);
3385        result.rows.push(vec![Value::Int64(2)]);
3386
3387        let collected: Vec<_> = result.iter().collect();
3388        assert_eq!(collected.len(), 2);
3389    }
3390
3391    #[test]
3392    fn test_query_result_display() {
3393        use grafeo_common::types::Value;
3394        let mut result = QueryResult::new(vec!["name".into()]);
3395        result.rows.push(vec![Value::from("Alix")]);
3396        let display = result.to_string();
3397        assert!(display.contains("name"));
3398        assert!(display.contains("Alix"));
3399    }
3400
3401    // =========================================================================
3402    // FromValue error paths
3403    // =========================================================================
3404
3405    #[test]
3406    fn test_from_value_i64_type_mismatch() {
3407        use grafeo_common::types::Value;
3408        let val = Value::from("not a number");
3409        assert!(i64::from_value(&val).is_err());
3410    }
3411
3412    #[test]
3413    fn test_from_value_f64_type_mismatch() {
3414        use grafeo_common::types::Value;
3415        let val = Value::from("not a float");
3416        assert!(f64::from_value(&val).is_err());
3417    }
3418
3419    #[test]
3420    fn test_from_value_string_type_mismatch() {
3421        use grafeo_common::types::Value;
3422        let val = Value::Int64(42);
3423        assert!(String::from_value(&val).is_err());
3424    }
3425
3426    #[test]
3427    fn test_from_value_bool_type_mismatch() {
3428        use grafeo_common::types::Value;
3429        let val = Value::Int64(1);
3430        assert!(bool::from_value(&val).is_err());
3431    }
3432
3433    #[test]
3434    fn test_from_value_all_success() {
3435        use grafeo_common::types::Value;
3436        assert_eq!(i64::from_value(&Value::Int64(99)).unwrap(), 99);
3437        assert!((f64::from_value(&Value::Float64(2.72)).unwrap() - 2.72).abs() < f64::EPSILON);
3438        assert_eq!(String::from_value(&Value::from("hello")).unwrap(), "hello");
3439        assert!(bool::from_value(&Value::Bool(true)).unwrap());
3440    }
3441
3442    // =========================================================================
3443    // GrafeoDB accessor tests
3444    // =========================================================================
3445
3446    #[test]
3447    fn test_database_is_read_only_false_by_default() {
3448        let db = GrafeoDB::new_in_memory();
3449        assert!(!db.is_read_only());
3450    }
3451
3452    #[test]
3453    fn test_database_graph_model() {
3454        let db = GrafeoDB::new_in_memory();
3455        assert_eq!(db.graph_model(), crate::config::GraphModel::Lpg);
3456    }
3457
3458    #[test]
3459    fn test_database_memory_limit_none_by_default() {
3460        let db = GrafeoDB::new_in_memory();
3461        assert!(db.memory_limit().is_none());
3462    }
3463
3464    #[test]
3465    fn test_database_memory_limit_custom() {
3466        let config = Config::in_memory().with_memory_limit(128 * 1024 * 1024);
3467        let db = GrafeoDB::with_config(config).unwrap();
3468        assert_eq!(db.memory_limit(), Some(128 * 1024 * 1024));
3469    }
3470
3471    #[test]
3472    fn test_database_adaptive_config() {
3473        let db = GrafeoDB::new_in_memory();
3474        let adaptive = db.adaptive_config();
3475        assert!(adaptive.enabled);
3476        assert!((adaptive.threshold - 3.0).abs() < f64::EPSILON);
3477    }
3478
3479    #[test]
3480    fn test_database_buffer_manager() {
3481        let db = GrafeoDB::new_in_memory();
3482        let _bm = db.buffer_manager();
3483        // Just verify it doesn't panic
3484    }
3485
3486    #[test]
3487    fn test_database_query_cache() {
3488        let db = GrafeoDB::new_in_memory();
3489        let _qc = db.query_cache();
3490    }
3491
3492    #[test]
3493    fn test_database_clear_plan_cache() {
3494        let db = GrafeoDB::new_in_memory();
3495        // Execute a query to populate the cache
3496        #[cfg(feature = "gql")]
3497        {
3498            let _ = db.execute("MATCH (n) RETURN count(n)");
3499        }
3500        db.clear_plan_cache();
3501        // No panic means success
3502    }
3503
3504    #[test]
3505    fn test_database_gc() {
3506        let db = GrafeoDB::new_in_memory();
3507        db.create_node(&["Person"]);
3508        db.gc();
3509        // Verify no panic, node still accessible
3510        assert_eq!(db.node_count(), 1);
3511    }
3512
3513    // =========================================================================
3514    // Named graph management
3515    // =========================================================================
3516
3517    #[test]
3518    fn test_create_and_list_graphs() {
3519        let db = GrafeoDB::new_in_memory();
3520        let created = db.create_graph("social").unwrap();
3521        assert!(created);
3522
3523        // Creating same graph again returns false
3524        let created_again = db.create_graph("social").unwrap();
3525        assert!(!created_again);
3526
3527        let names = db.list_graphs();
3528        assert!(names.contains(&"social".to_string()));
3529    }
3530
3531    #[test]
3532    fn test_drop_graph() {
3533        let db = GrafeoDB::new_in_memory();
3534        db.create_graph("temp").unwrap();
3535        assert!(db.drop_graph("temp"));
3536        assert!(!db.drop_graph("temp")); // Already dropped
3537    }
3538
3539    #[test]
3540    fn test_drop_graph_resets_current_graph() {
3541        let db = GrafeoDB::new_in_memory();
3542        db.create_graph("active").unwrap();
3543        db.set_current_graph(Some("active")).unwrap();
3544        assert_eq!(db.current_graph(), Some("active".to_string()));
3545
3546        db.drop_graph("active");
3547        assert_eq!(db.current_graph(), None);
3548    }
3549
3550    // =========================================================================
3551    // Current graph / schema context
3552    // =========================================================================
3553
3554    #[test]
3555    fn test_current_graph_default_none() {
3556        let db = GrafeoDB::new_in_memory();
3557        assert_eq!(db.current_graph(), None);
3558    }
3559
3560    #[test]
3561    fn test_set_current_graph_valid() {
3562        let db = GrafeoDB::new_in_memory();
3563        db.create_graph("social").unwrap();
3564        db.set_current_graph(Some("social")).unwrap();
3565        assert_eq!(db.current_graph(), Some("social".to_string()));
3566    }
3567
3568    #[test]
3569    fn test_set_current_graph_nonexistent() {
3570        let db = GrafeoDB::new_in_memory();
3571        let result = db.set_current_graph(Some("nonexistent"));
3572        assert!(result.is_err());
3573    }
3574
3575    #[test]
3576    fn test_set_current_graph_none_resets() {
3577        let db = GrafeoDB::new_in_memory();
3578        db.create_graph("social").unwrap();
3579        db.set_current_graph(Some("social")).unwrap();
3580        db.set_current_graph(None).unwrap();
3581        assert_eq!(db.current_graph(), None);
3582    }
3583
3584    #[test]
3585    fn test_set_current_graph_default_keyword() {
3586        let db = GrafeoDB::new_in_memory();
3587        // "default" is a special case that always succeeds
3588        db.set_current_graph(Some("default")).unwrap();
3589        assert_eq!(db.current_graph(), Some("default".to_string()));
3590    }
3591
3592    #[test]
3593    fn test_current_schema_default_none() {
3594        let db = GrafeoDB::new_in_memory();
3595        assert_eq!(db.current_schema(), None);
3596    }
3597
3598    #[test]
3599    fn test_set_current_schema_nonexistent() {
3600        let db = GrafeoDB::new_in_memory();
3601        let result = db.set_current_schema(Some("nonexistent"));
3602        assert!(result.is_err());
3603    }
3604
3605    #[test]
3606    fn test_set_current_schema_none_resets() {
3607        let db = GrafeoDB::new_in_memory();
3608        db.set_current_schema(None).unwrap();
3609        assert_eq!(db.current_schema(), None);
3610    }
3611
3612    // =========================================================================
3613    // graph_store / graph_store_mut
3614    // =========================================================================
3615
3616    #[test]
3617    fn test_graph_store_returns_lpg_by_default() {
3618        let db = GrafeoDB::new_in_memory();
3619        db.create_node(&["Person"]);
3620        let store = db.graph_store();
3621        assert_eq!(store.node_count(), 1);
3622    }
3623
3624    #[test]
3625    fn test_graph_store_mut_returns_some_by_default() {
3626        let db = GrafeoDB::new_in_memory();
3627        assert!(db.graph_store_mut().is_some());
3628    }
3629
3630    #[test]
3631    fn test_with_read_store() {
3632        use grafeo_core::graph::lpg::LpgStore;
3633
3634        let store = Arc::new(LpgStore::new().unwrap());
3635        store.create_node(&["Person"]);
3636
3637        let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
3638        let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3639
3640        assert!(db.is_read_only());
3641        assert!(db.graph_store_mut().is_none());
3642
3643        // Read queries should work
3644        let gs = db.graph_store();
3645        assert_eq!(gs.node_count(), 1);
3646    }
3647
3648    #[test]
3649    fn test_with_store_graph_store_methods() {
3650        use grafeo_core::graph::lpg::LpgStore;
3651
3652        let store = Arc::new(LpgStore::new().unwrap());
3653        store.create_node(&["Person"]);
3654
3655        let db = GrafeoDB::with_store(
3656            Arc::clone(&store) as Arc<dyn GraphStoreMut>,
3657            Config::in_memory(),
3658        )
3659        .unwrap();
3660
3661        assert!(!db.is_read_only());
3662        assert!(db.graph_store_mut().is_some());
3663        assert_eq!(db.graph_store().node_count(), 1);
3664    }
3665
3666    // =========================================================================
3667    // session_read_only
3668    // =========================================================================
3669
3670    #[test]
3671    #[allow(deprecated)]
3672    fn test_session_read_only() {
3673        let db = GrafeoDB::new_in_memory();
3674        db.create_node(&["Person"]);
3675
3676        let session = db.session_read_only();
3677        // Read queries should work
3678        #[cfg(feature = "gql")]
3679        {
3680            let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3681            assert_eq!(result.rows.len(), 1);
3682        }
3683    }
3684
3685    // =========================================================================
3686    // close on in-memory database
3687    // =========================================================================
3688
3689    #[test]
3690    fn test_close_in_memory_database() {
3691        let db = GrafeoDB::new_in_memory();
3692        db.create_node(&["Person"]);
3693        assert!(db.close().is_ok());
3694        // Second close should also be fine (idempotent)
3695        assert!(db.close().is_ok());
3696    }
3697
3698    // =========================================================================
3699    // with_config validation failure
3700    // =========================================================================
3701
3702    #[test]
3703    fn test_with_config_invalid_config_zero_threads() {
3704        let config = Config::in_memory().with_threads(0);
3705        let result = GrafeoDB::with_config(config);
3706        assert!(result.is_err());
3707    }
3708
3709    #[test]
3710    fn test_with_config_invalid_config_zero_memory_limit() {
3711        let config = Config::in_memory().with_memory_limit(0);
3712        let result = GrafeoDB::with_config(config);
3713        assert!(result.is_err());
3714    }
3715
3716    // =========================================================================
3717    // StorageFormat display (for config.rs coverage)
3718    // =========================================================================
3719
3720    #[test]
3721    fn test_storage_format_display() {
3722        use crate::config::StorageFormat;
3723        assert_eq!(StorageFormat::Auto.to_string(), "auto");
3724        assert_eq!(StorageFormat::WalDirectory.to_string(), "wal-directory");
3725        assert_eq!(StorageFormat::SingleFile.to_string(), "single-file");
3726    }
3727
3728    #[test]
3729    fn test_storage_format_default() {
3730        use crate::config::StorageFormat;
3731        assert_eq!(StorageFormat::default(), StorageFormat::Auto);
3732    }
3733
3734    #[test]
3735    fn test_config_with_storage_format() {
3736        use crate::config::StorageFormat;
3737        let config = Config::in_memory().with_storage_format(StorageFormat::SingleFile);
3738        assert_eq!(config.storage_format, StorageFormat::SingleFile);
3739    }
3740
3741    // =========================================================================
3742    // Config CDC
3743    // =========================================================================
3744
3745    #[test]
3746    fn test_config_with_cdc() {
3747        let config = Config::in_memory().with_cdc();
3748        assert!(config.cdc_enabled);
3749    }
3750
3751    #[test]
3752    fn test_config_cdc_default_false() {
3753        let config = Config::in_memory();
3754        assert!(!config.cdc_enabled);
3755    }
3756
3757    // =========================================================================
3758    // ConfigError as std::error::Error
3759    // =========================================================================
3760
3761    #[test]
3762    fn test_config_error_is_error_trait() {
3763        use crate::config::ConfigError;
3764        let err: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);
3765        assert!(err.source().is_none());
3766    }
3767
3768    // =========================================================================
3769    // Metrics tests
3770    // =========================================================================
3771
3772    #[cfg(feature = "metrics")]
3773    #[test]
3774    fn test_metrics_prometheus_output() {
3775        let db = GrafeoDB::new_in_memory();
3776        let prom = db.metrics_prometheus();
3777        // Should contain at least some metric names
3778        assert!(!prom.is_empty());
3779    }
3780
3781    #[cfg(feature = "metrics")]
3782    #[test]
3783    fn test_reset_metrics() {
3784        let db = GrafeoDB::new_in_memory();
3785        // Execute something to generate metrics
3786        let _session = db.session();
3787        db.reset_metrics();
3788        let snap = db.metrics();
3789        assert_eq!(snap.query_count, 0);
3790    }
3791
3792    // =========================================================================
3793    // drop_graph on external store
3794    // =========================================================================
3795
3796    #[test]
3797    fn test_drop_graph_on_external_store() {
3798        use grafeo_core::graph::lpg::LpgStore;
3799
3800        let store = Arc::new(LpgStore::new().unwrap());
3801        let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
3802        let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3803
3804        // drop_graph with external store (no built-in store) returns false
3805        assert!(!db.drop_graph("anything"));
3806    }
3807}