Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(feature = "lpg")]
31mod crud;
32#[cfg(feature = "embed")]
33mod embed;
34#[cfg(feature = "grafeo-file")]
35pub(crate) mod flush;
36#[cfg(feature = "lpg")]
37mod import;
38#[cfg(feature = "lpg")]
39mod index;
40#[cfg(feature = "lpg")]
41mod persistence;
42mod query;
43#[cfg(feature = "triple-store")]
44mod rdf_ops;
45#[cfg(feature = "lpg")]
46mod search;
47pub(crate) mod section_consumer;
48#[cfg(all(feature = "wal", feature = "lpg"))]
49pub(crate) mod wal_store;
50
51use grafeo_common::{grafeo_error, grafeo_warn};
52#[cfg(feature = "wal")]
53use std::path::Path;
54use std::sync::Arc;
55use std::sync::atomic::AtomicUsize;
56
57use parking_lot::RwLock;
58
59use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
60use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
61#[cfg(feature = "lpg")]
62use grafeo_core::graph::lpg::LpgStore;
63#[cfg(feature = "triple-store")]
64use grafeo_core::graph::rdf::RdfStore;
65use grafeo_core::graph::{GraphStoreMut, GraphStoreSearch};
66#[cfg(feature = "grafeo-file")]
67use grafeo_storage::file::GrafeoFileManager;
68#[cfg(all(feature = "wal", feature = "lpg"))]
69use grafeo_storage::wal::WalRecovery;
70#[cfg(feature = "wal")]
71use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
72
73use crate::catalog::Catalog;
74use crate::config::Config;
75use crate::query::cache::QueryCache;
76use crate::session::Session;
77use crate::transaction::TransactionManager;
78
79/// Your handle to a Grafeo database.
80///
81/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
82/// quick experiments, or [`open()`](Self::open) for persistent storage.
83/// Then grab a [`session()`](Self::session) to start querying.
84///
85/// # Examples
86///
87/// ```
88/// use grafeo_engine::GrafeoDB;
89///
90/// // Quick in-memory database
91/// let db = GrafeoDB::new_in_memory();
92///
93/// // Add some data
94/// db.create_node(&["Person"]);
95///
96/// // Query it
97/// let session = db.session();
98/// let result = session.execute("MATCH (p:Person) RETURN p")?;
99/// # Ok::<(), grafeo_common::utils::error::Error>(())
100/// ```
101pub struct GrafeoDB {
102    /// Database configuration.
103    pub(super) config: Config,
104    /// The underlying graph store (None when using an external store).
105    #[cfg(feature = "lpg")]
106    pub(super) store: Option<Arc<LpgStore>>,
107    /// Schema and metadata catalog shared across sessions.
108    pub(super) catalog: Arc<Catalog>,
109    /// RDF triple store (if RDF feature is enabled).
110    #[cfg(feature = "triple-store")]
111    pub(super) rdf_store: Arc<RdfStore>,
112    /// Transaction manager.
113    pub(super) transaction_manager: Arc<TransactionManager>,
114    /// Unified buffer manager.
115    pub(super) buffer_manager: Arc<BufferManager>,
116    /// Write-ahead log manager (if durability is enabled).
117    #[cfg(feature = "wal")]
118    pub(super) wal: Option<Arc<LpgWal>>,
119    /// Shared WAL graph context tracker. Tracks which named graph was last
120    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
121    /// records only when the context actually changes.
122    #[cfg(feature = "wal")]
123    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
124    /// Query cache for parsed and optimized plans.
125    pub(super) query_cache: Arc<QueryCache>,
126    /// Shared commit counter for auto-GC across sessions.
127    pub(super) commit_counter: Arc<AtomicUsize>,
128    /// Whether the database is open.
129    pub(super) is_open: RwLock<bool>,
130    /// Change data capture log for tracking mutations.
131    #[cfg(feature = "cdc")]
132    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
133    /// Whether CDC is active for new sessions and direct CRUD (runtime-mutable).
134    #[cfg(feature = "cdc")]
135    cdc_enabled: std::sync::atomic::AtomicBool,
136    /// Registered embedding models for text-to-vector conversion.
137    #[cfg(feature = "embed")]
138    pub(super) embedding_models:
139        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
140    /// Single-file database manager (when using `.grafeo` format).
141    #[cfg(feature = "grafeo-file")]
142    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
143    /// Periodic checkpoint timer (when `checkpoint_interval` is configured).
144    /// Wrapped in Mutex because `close()` takes `&self` but needs to stop the timer.
145    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
146    checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
147    /// Shared registry of spilled vector storages.
148    /// Used by the search path to create `SpillableVectorAccessor` instances.
149    #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
150    vector_spill_storages: Option<
151        Arc<
152            parking_lot::RwLock<
153                std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
154            >,
155        >,
156    >,
157    /// External read-only graph store (when using with_store() or with_read_store()).
158    /// When set, sessions route queries through this store instead of the built-in LpgStore.
159    pub(super) external_read_store: Option<Arc<dyn GraphStoreSearch>>,
160    /// External writable graph store (when using with_store()).
161    /// None for read-only databases created via with_read_store().
162    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
163    /// Metrics registry shared across all sessions.
164    #[cfg(feature = "metrics")]
165    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
166    /// Persistent graph context for one-shot `execute()` calls.
167    /// When set, each call to `session()` pre-configures the session to this graph.
168    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
169    current_graph: RwLock<Option<String>>,
170    /// Persistent schema context for one-shot `execute()` calls.
171    /// When set, each call to `session()` pre-configures the session to this schema.
172    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
173    current_schema: RwLock<Option<String>>,
174    /// Whether this database is open in read-only mode.
175    /// When true, sessions automatically enforce read-only transactions.
176    read_only: bool,
177    /// Named graph projections (virtual subgraphs), shared with sessions.
178    projections:
179        Arc<RwLock<std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>>>,
180    /// Layered store (compact base + mutable overlay), set after `compact()`.
181    #[cfg(all(feature = "compact-store", feature = "lpg"))]
182    layered_store: Option<Arc<grafeo_core::graph::compact::layered::LayeredStore>>,
183}
184
185impl GrafeoDB {
186    /// Returns a reference to the built-in LPG store.
187    ///
188    /// # Panics
189    ///
190    /// Panics if the database was created with [`with_store()`](Self::with_store) or
191    /// [`with_read_store()`](Self::with_read_store), which use an external store
192    /// instead of the built-in LPG store.
193    #[cfg(feature = "lpg")]
194    fn lpg_store(&self) -> &Arc<LpgStore> {
195        self.store.as_ref().expect(
196            "no built-in LpgStore: this GrafeoDB was created with an external store \
197             (with_store / with_read_store). Use session() or graph_store() instead.",
198        )
199    }
200
201    /// Returns a borrowed reference to the active graph store.
202    ///
203    /// In layered mode (after [`compact()`](Self::compact)), returns the
204    /// `LayeredStore` which merges the columnar base with the overlay.
205    /// Otherwise, returns the built-in `LpgStore`.
206    ///
207    /// Unlike [`graph_store()`](Self::graph_store) (which clones an `Arc`),
208    /// this borrows from `self` — suitable for constructing accessors that
209    /// need `&'a dyn GraphStore` tied to the database lifetime.
210    #[cfg(any(
211        feature = "vector-index",
212        feature = "text-index",
213        feature = "hybrid-search",
214        feature = "embed",
215    ))]
216    fn graph_store_ref(&self) -> &dyn grafeo_core::graph::GraphStore {
217        if let Some(ref ext_read) = self.external_read_store {
218            ext_read.as_ref()
219        } else {
220            #[cfg(feature = "lpg")]
221            {
222                &**self.lpg_store()
223            }
224            #[cfg(not(feature = "lpg"))]
225            unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
226        }
227    }
228
229    /// Returns whether CDC is active (runtime check).
230    #[cfg(feature = "cdc")]
231    #[inline]
232    pub(super) fn cdc_active(&self) -> bool {
233        self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
234    }
235
236    /// Creates an in-memory database, fast to create, gone when dropped.
237    ///
238    /// Use this for tests, experiments, or when you don't need persistence.
239    /// For data that survives restarts, use [`open()`](Self::open) instead.
240    ///
241    /// # Panics
242    ///
243    /// Panics if the internal arena allocator cannot be initialized (out of memory).
244    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
245    ///
246    /// # Examples
247    ///
248    /// ```
249    /// use grafeo_engine::GrafeoDB;
250    ///
251    /// let db = GrafeoDB::new_in_memory();
252    /// let session = db.session();
253    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
254    /// # Ok::<(), grafeo_common::utils::error::Error>(())
255    /// ```
256    #[must_use]
257    pub fn new_in_memory() -> Self {
258        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
259    }
260
261    /// Opens a database at the given path, creating it if it doesn't exist.
262    ///
263    /// If you've used this path before, Grafeo recovers your data from the
264    /// write-ahead log automatically. First open on a new path creates an
265    /// empty database.
266    ///
267    /// # Errors
268    ///
269    /// Returns an error if the path isn't writable or recovery fails.
270    ///
271    /// # Examples
272    ///
273    /// ```no_run
274    /// use grafeo_engine::GrafeoDB;
275    ///
276    /// let db = GrafeoDB::open("./my_social_network")?;
277    /// # Ok::<(), grafeo_common::utils::error::Error>(())
278    /// ```
279    #[cfg(feature = "wal")]
280    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
281        Self::with_config(Config::persistent(path.as_ref()))
282    }
283
284    /// Opens an existing database in read-only mode.
285    ///
286    /// Uses a shared file lock, so multiple processes can read the same
287    /// `.grafeo` file concurrently. The database loads the last checkpoint
288    /// snapshot but does **not** replay the WAL or allow mutations.
289    ///
290    /// Currently only supports the single-file (`.grafeo`) format.
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if the file doesn't exist or can't be read.
295    ///
296    /// # Examples
297    ///
298    /// ```no_run
299    /// use grafeo_engine::GrafeoDB;
300    ///
301    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
302    /// let session = db.session();
303    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
304    /// // Mutations will return an error:
305    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
306    /// # Ok::<(), grafeo_common::utils::error::Error>(())
307    /// ```
308    #[cfg(feature = "grafeo-file")]
309    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
310        Self::with_config(Config::read_only(path.as_ref()))
311    }
312
313    /// Creates a database with custom configuration.
314    ///
315    /// Use this when you need fine-grained control over memory limits,
316    /// thread counts, or persistence settings. For most cases,
317    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
318    /// are simpler.
319    ///
320    /// # Errors
321    ///
322    /// Returns an error if the database can't be created or recovery fails.
323    ///
324    /// # Examples
325    ///
326    /// ```
327    /// use grafeo_engine::{GrafeoDB, Config};
328    ///
329    /// // In-memory with a 512MB limit
330    /// let config = Config::in_memory()
331    ///     .with_memory_limit(512 * 1024 * 1024);
332    ///
333    /// let db = GrafeoDB::with_config(config)?;
334    /// # Ok::<(), grafeo_common::utils::error::Error>(())
335    /// ```
336    pub fn with_config(config: Config) -> Result<Self> {
337        // Validate configuration before proceeding
338        config
339            .validate()
340            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
341
342        #[cfg(feature = "lpg")]
343        let store = Arc::new(LpgStore::new()?);
344        #[cfg(feature = "triple-store")]
345        let rdf_store = Arc::new(RdfStore::new());
346        let transaction_manager = Arc::new(TransactionManager::new());
347
348        // Create buffer manager with configured limits
349        let buffer_config = BufferManagerConfig {
350            budget: config.memory_limit.unwrap_or_else(|| {
351                // reason: product of system RAM and 0.75 is always a valid positive usize
352                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
353                let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
354                b
355            }),
356            spill_path: config.spill_path.clone().or_else(|| {
357                config.path.as_ref().and_then(|p| {
358                    let parent = p.parent()?;
359                    let name = p.file_name()?.to_str()?;
360                    Some(parent.join(format!("{name}.spill")))
361                })
362            }),
363            ..BufferManagerConfig::default()
364        };
365        let buffer_manager = BufferManager::new(buffer_config);
366
367        // Create catalog early so WAL replay can restore schema definitions
368        let catalog = Arc::new(Catalog::new());
369
370        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
371
372        // --- Single-file format (.grafeo) ---
373        #[cfg(feature = "grafeo-file")]
374        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
375            // Read-only mode: open with shared lock, load snapshot, skip WAL
376            if let Some(ref db_path) = config.path {
377                if db_path.exists() && db_path.is_file() {
378                    let fm = GrafeoFileManager::open_read_only(db_path)?;
379                    // Try v2 section-based format first
380                    #[cfg(feature = "lpg")]
381                    if fm.read_section_directory()?.is_some() {
382                        Self::load_from_sections(
383                            &fm,
384                            &store,
385                            &catalog,
386                            #[cfg(feature = "triple-store")]
387                            &rdf_store,
388                        )?;
389                    } else {
390                        // Fall back to v1 blob format
391                        let snapshot_data = fm.read_snapshot()?;
392                        if !snapshot_data.is_empty() {
393                            Self::apply_snapshot_data(
394                                &store,
395                                &catalog,
396                                #[cfg(feature = "triple-store")]
397                                &rdf_store,
398                                &snapshot_data,
399                            )?;
400                        }
401                    }
402                    Some(Arc::new(fm))
403                } else {
404                    return Err(grafeo_common::utils::error::Error::Internal(format!(
405                        "read-only open requires an existing .grafeo file: {}",
406                        db_path.display()
407                    )));
408                }
409            } else {
410                return Err(grafeo_common::utils::error::Error::Internal(
411                    "read-only mode requires a database path".to_string(),
412                ));
413            }
414        } else if let Some(ref db_path) = config.path {
415            // Initialize the file manager whenever single-file format is selected,
416            // regardless of whether WAL is enabled. Without this, a database opened
417            // with wal_enabled:false + StorageFormat::SingleFile would produce no
418            // output at all (the file manager was previously gated behind wal_enabled).
419            if Self::should_use_single_file(db_path, config.storage_format) {
420                let fm = if db_path.exists() && db_path.is_file() {
421                    GrafeoFileManager::open(db_path)?
422                } else if !db_path.exists() {
423                    GrafeoFileManager::create(db_path)?
424                } else {
425                    // Path exists but is not a file (directory, etc.)
426                    return Err(grafeo_common::utils::error::Error::Internal(format!(
427                        "path exists but is not a file: {}",
428                        db_path.display()
429                    )));
430                };
431
432                // Load data: try v2 section-based format, fall back to v1 blob
433                #[cfg(feature = "lpg")]
434                if fm.read_section_directory()?.is_some() {
435                    Self::load_from_sections(
436                        &fm,
437                        &store,
438                        &catalog,
439                        #[cfg(feature = "triple-store")]
440                        &rdf_store,
441                    )?;
442                } else {
443                    let snapshot_data = fm.read_snapshot()?;
444                    if !snapshot_data.is_empty() {
445                        Self::apply_snapshot_data(
446                            &store,
447                            &catalog,
448                            #[cfg(feature = "triple-store")]
449                            &rdf_store,
450                            &snapshot_data,
451                        )?;
452                    }
453                }
454
455                // Recover sidecar WAL if WAL is enabled and a sidecar exists
456                #[cfg(all(feature = "wal", feature = "lpg"))]
457                if config.wal_enabled && fm.has_sidecar_wal() {
458                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
459                    let records = recovery.recover()?;
460                    Self::apply_wal_records(
461                        &store,
462                        &catalog,
463                        #[cfg(feature = "triple-store")]
464                        &rdf_store,
465                        &records,
466                    )?;
467                }
468
469                Some(Arc::new(fm))
470            } else {
471                None
472            }
473        } else {
474            None
475        };
476
477        // Determine whether to use the WAL directory path (legacy) or sidecar
478        // Read-only mode skips WAL entirely (no recovery, no creation).
479        #[cfg(feature = "wal")]
480        let wal = if is_read_only {
481            None
482        } else if config.wal_enabled {
483            if let Some(ref db_path) = config.path {
484                // When using single-file format, the WAL is a sidecar directory
485                #[cfg(feature = "grafeo-file")]
486                let wal_path = if let Some(ref fm) = file_manager {
487                    let p = fm.sidecar_wal_path();
488                    std::fs::create_dir_all(&p)?;
489                    p
490                } else {
491                    // Legacy: WAL inside the database directory
492                    std::fs::create_dir_all(db_path)?;
493                    db_path.join("wal")
494                };
495
496                #[cfg(not(feature = "grafeo-file"))]
497                let wal_path = {
498                    std::fs::create_dir_all(db_path)?;
499                    db_path.join("wal")
500                };
501
502                // For legacy WAL directory format, check if WAL exists and recover
503                #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
504                let is_single_file = file_manager.is_some();
505                #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
506                let is_single_file = false;
507
508                #[cfg(feature = "lpg")]
509                if !is_single_file && wal_path.exists() {
510                    let recovery = WalRecovery::new(&wal_path);
511                    let records = recovery.recover()?;
512                    Self::apply_wal_records(
513                        &store,
514                        &catalog,
515                        #[cfg(feature = "triple-store")]
516                        &rdf_store,
517                        &records,
518                    )?;
519                }
520
521                // Open/create WAL manager with configured durability
522                let wal_durability = match config.wal_durability {
523                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
524                    crate::config::DurabilityMode::Batch {
525                        max_delay_ms,
526                        max_records,
527                    } => WalDurabilityMode::Batch {
528                        max_delay_ms,
529                        max_records,
530                    },
531                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
532                        WalDurabilityMode::Adaptive { target_interval_ms }
533                    }
534                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
535                };
536                let wal_config = WalConfig {
537                    durability: wal_durability,
538                    ..WalConfig::default()
539                };
540                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
541                Some(Arc::new(wal_manager))
542            } else {
543                None
544            }
545        } else {
546            None
547        };
548
549        // Create query cache with default capacity (1000 queries)
550        let query_cache = Arc::new(QueryCache::default());
551
552        // After all snapshot/WAL recovery, sync TransactionManager epoch
553        // with the store so queries use the correct viewing epoch.
554        #[cfg(all(feature = "temporal", feature = "lpg"))]
555        transaction_manager.sync_epoch(store.current_epoch());
556
557        #[cfg(feature = "cdc")]
558        let cdc_enabled_val = config.cdc_enabled;
559        #[cfg(feature = "cdc")]
560        let cdc_retention = config.cdc_retention.clone();
561
562        // Clone Arcs for the checkpoint timer before moving originals into the struct.
563        // The timer captures its own references and runs in a background thread.
564        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
565        let checkpoint_interval = config.checkpoint_interval;
566        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
567        let timer_store = Arc::clone(&store);
568        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
569        let timer_catalog = Arc::clone(&catalog);
570        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
571        let timer_tm = Arc::clone(&transaction_manager);
572        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
573        let timer_rdf = Arc::clone(&rdf_store);
574        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
575        let timer_wal = wal.clone();
576
577        let mut db = Self {
578            config,
579            #[cfg(feature = "lpg")]
580            store: Some(store),
581            catalog,
582            #[cfg(feature = "triple-store")]
583            rdf_store,
584            transaction_manager,
585            buffer_manager,
586            #[cfg(feature = "wal")]
587            wal,
588            #[cfg(feature = "wal")]
589            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
590            query_cache,
591            commit_counter: Arc::new(AtomicUsize::new(0)),
592            is_open: RwLock::new(true),
593            #[cfg(feature = "cdc")]
594            cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
595            #[cfg(feature = "cdc")]
596            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
597            #[cfg(feature = "embed")]
598            embedding_models: RwLock::new(hashbrown::HashMap::new()),
599            #[cfg(feature = "grafeo-file")]
600            file_manager,
601            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
602            checkpoint_timer: parking_lot::Mutex::new(None),
603            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
604            vector_spill_storages: None,
605            external_read_store: None,
606            external_write_store: None,
607            #[cfg(feature = "metrics")]
608            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
609            current_graph: RwLock::new(None),
610            current_schema: RwLock::new(None),
611            read_only: is_read_only,
612            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
613            #[cfg(all(feature = "compact-store", feature = "lpg"))]
614            layered_store: None,
615        };
616
617        // Register storage sections as memory consumers for pressure tracking
618        db.register_section_consumers();
619
620        // Start periodic checkpoint timer if configured
621        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
622        if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
623            && !is_read_only
624        {
625            *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
626                interval,
627                Arc::clone(fm),
628                timer_store,
629                timer_catalog,
630                timer_tm,
631                #[cfg(feature = "triple-store")]
632                timer_rdf,
633                #[cfg(feature = "wal")]
634                timer_wal,
635            ));
636        }
637
638        // Discover existing spill files from a previous session.
639        // If vectors were spilled before close, the spill files persist on disk
640        // and need to be re-mapped so search can read from them.
641        #[cfg(all(
642            feature = "lpg",
643            feature = "vector-index",
644            feature = "mmap",
645            not(feature = "temporal")
646        ))]
647        db.restore_spill_files();
648
649        // If VectorStore is configured as ForceDisk, immediately spill embeddings.
650        // This must happen after register_section_consumers() which creates the consumer.
651        #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
652        if db
653            .config
654            .section_configs
655            .get(&grafeo_common::storage::SectionType::VectorStore)
656            .is_some_and(|c| c.tier == grafeo_common::storage::TierOverride::ForceDisk)
657        {
658            db.buffer_manager.spill_all();
659        }
660
661        Ok(db)
662    }
663
664    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
665    ///
666    /// The external store handles all data persistence. WAL, CDC, and index
667    /// management are the responsibility of the store implementation.
668    ///
669    /// Query execution (all 6 languages, optimizer, planner) works through the
670    /// provided store. Admin operations (schema introspection, persistence,
671    /// vector/text indexes) are not available on external stores.
672    ///
673    /// # Examples
674    ///
675    /// ```no_run
676    /// use std::sync::Arc;
677    /// use grafeo_engine::{GrafeoDB, Config};
678    /// use grafeo_core::graph::GraphStoreMut;
679    ///
680    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
681    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
682    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
683    ///     Ok(())
684    /// }
685    /// ```
686    ///
687    /// # Errors
688    ///
689    /// Returns an error if config validation fails.
690    ///
691    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
692    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
693        config
694            .validate()
695            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
696
697        let transaction_manager = Arc::new(TransactionManager::new());
698
699        let buffer_config = BufferManagerConfig {
700            budget: config.memory_limit.unwrap_or_else(|| {
701                // reason: product of system RAM and 0.75 is always a valid positive usize
702                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
703                let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
704                b
705            }),
706            spill_path: None,
707            ..BufferManagerConfig::default()
708        };
709        let buffer_manager = BufferManager::new(buffer_config);
710
711        let query_cache = Arc::new(QueryCache::default());
712
713        #[cfg(feature = "cdc")]
714        let cdc_enabled_val = config.cdc_enabled;
715
716        Ok(Self {
717            config,
718            #[cfg(feature = "lpg")]
719            store: None,
720            catalog: Arc::new(Catalog::new()),
721            #[cfg(feature = "triple-store")]
722            rdf_store: Arc::new(RdfStore::new()),
723            transaction_manager,
724            buffer_manager,
725            #[cfg(feature = "wal")]
726            wal: None,
727            #[cfg(feature = "wal")]
728            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
729            query_cache,
730            commit_counter: Arc::new(AtomicUsize::new(0)),
731            is_open: RwLock::new(true),
732            #[cfg(feature = "cdc")]
733            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
734            #[cfg(feature = "cdc")]
735            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
736            #[cfg(feature = "embed")]
737            embedding_models: RwLock::new(hashbrown::HashMap::new()),
738            #[cfg(feature = "grafeo-file")]
739            file_manager: None,
740            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
741            checkpoint_timer: parking_lot::Mutex::new(None),
742            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
743            vector_spill_storages: None,
744            external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStoreSearch>),
745            external_write_store: Some(store),
746            #[cfg(feature = "metrics")]
747            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
748            current_graph: RwLock::new(None),
749            current_schema: RwLock::new(None),
750            read_only: false,
751            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
752            #[cfg(all(feature = "compact-store", feature = "lpg"))]
753            layered_store: None,
754        })
755    }
756
757    /// Creates a database backed by a read-only [`GraphStore`].
758    ///
759    /// The database is set to read-only mode. Write queries (CREATE, SET,
760    /// DELETE) will return `TransactionError::ReadOnly`.
761    ///
762    /// # Examples
763    ///
764    /// ```no_run
765    /// use std::sync::Arc;
766    /// use grafeo_engine::{GrafeoDB, Config};
767    /// use grafeo_core::graph::GraphStoreSearch;
768    ///
769    /// fn example(store: Arc<dyn GraphStoreSearch>) -> grafeo_common::utils::error::Result<()> {
770    ///     let db = GrafeoDB::with_read_store(store, Config::in_memory())?;
771    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
772    ///     Ok(())
773    /// }
774    /// ```
775    ///
776    /// # Errors
777    ///
778    /// Returns an error if config validation fails.
779    ///
780    /// [`GraphStore`]: grafeo_core::graph::GraphStore
781    pub fn with_read_store(store: Arc<dyn GraphStoreSearch>, config: Config) -> Result<Self> {
782        config
783            .validate()
784            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
785
786        let transaction_manager = Arc::new(TransactionManager::new());
787
788        let buffer_config = BufferManagerConfig {
789            budget: config.memory_limit.unwrap_or_else(|| {
790                // reason: product of system RAM and 0.75 is always a valid positive usize
791                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
792                let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
793                b
794            }),
795            spill_path: None,
796            ..BufferManagerConfig::default()
797        };
798        let buffer_manager = BufferManager::new(buffer_config);
799
800        let query_cache = Arc::new(QueryCache::default());
801
802        #[cfg(feature = "cdc")]
803        let cdc_enabled_val = config.cdc_enabled;
804
805        Ok(Self {
806            config,
807            #[cfg(feature = "lpg")]
808            store: None,
809            catalog: Arc::new(Catalog::new()),
810            #[cfg(feature = "triple-store")]
811            rdf_store: Arc::new(RdfStore::new()),
812            transaction_manager,
813            buffer_manager,
814            #[cfg(feature = "wal")]
815            wal: None,
816            #[cfg(feature = "wal")]
817            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
818            query_cache,
819            commit_counter: Arc::new(AtomicUsize::new(0)),
820            is_open: RwLock::new(true),
821            #[cfg(feature = "cdc")]
822            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
823            #[cfg(feature = "cdc")]
824            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
825            #[cfg(feature = "embed")]
826            embedding_models: RwLock::new(hashbrown::HashMap::new()),
827            #[cfg(feature = "grafeo-file")]
828            file_manager: None,
829            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
830            checkpoint_timer: parking_lot::Mutex::new(None),
831            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
832            vector_spill_storages: None,
833            external_read_store: Some(store),
834            external_write_store: None,
835            #[cfg(feature = "metrics")]
836            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
837            current_graph: RwLock::new(None),
838            current_schema: RwLock::new(None),
839            read_only: true,
840            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
841            #[cfg(all(feature = "compact-store", feature = "lpg"))]
842            layered_store: None,
843        })
844    }
845
846    /// Compacts the database into a two-layer store: columnar base + mutable overlay.
847    ///
848    /// Takes a snapshot of all nodes and edges from the current store, builds
849    /// a columnar `CompactStore` with CSR adjacency as the read-only base,
850    /// and creates a fresh `LpgStore` overlay for future mutations. The
851    /// original store is dropped to free memory.
852    ///
853    /// Unlike the pre-0.5.39 behavior, the database remains writable after
854    /// compaction: new writes go to the overlay. Call [`recompact()`](Self::recompact)
855    /// to merge the overlay back into the columnar base periodically.
856    ///
857    /// # Errors
858    ///
859    /// Returns an error if the conversion fails (e.g. more than 32,767
860    /// distinct labels or edge types).
861    ///
862    /// [`CompactStore`]: grafeo_core::graph::compact::CompactStore
863    #[cfg(all(feature = "compact-store", feature = "lpg"))]
864    pub fn compact(&mut self) -> Result<()> {
865        use grafeo_core::graph::compact::from_graph_store_preserving_ids;
866        use grafeo_core::graph::compact::layered::LayeredStore;
867
868        let current_store = self.graph_store();
869
870        // Determine max node/edge IDs for overlay seeding.
871        let max_node_id = if let Some(ref store) = self.store {
872            store.next_node_id().saturating_sub(1)
873        } else {
874            current_store.node_ids().last().map_or(0, |id| id.as_u64())
875        };
876        let max_edge_id = if let Some(ref store) = self.store {
877            store.next_edge_id().saturating_sub(1)
878        } else {
879            // Scan edges to find max ID.
880            let mut max_eid = 0u64;
881            for nid in current_store.node_ids() {
882                for (_, eid) in
883                    current_store.edges_from(nid, grafeo_core::graph::Direction::Outgoing)
884                {
885                    max_eid = max_eid.max(eid.as_u64());
886                }
887            }
888            max_eid
889        };
890
891        let compact = from_graph_store_preserving_ids(current_store.as_ref())
892            .map_err(|e| Error::Internal(e.to_string()))?;
893
894        let layered = Arc::new(
895            LayeredStore::new(compact, max_node_id, max_edge_id)
896                .map_err(|e| Error::Internal(e.to_string()))?,
897        );
898
899        // Sync the overlay's epoch with the TransactionManager so MVCC
900        // visibility works correctly for nodes created after compact().
901        let current_epoch = self.transaction_manager.current_epoch();
902        layered.overlay_store().sync_epoch(current_epoch);
903
904        // Named graphs are LPG-specific and outside the columnar base; move them
905        // from the pre-compact overlay into the new overlay so they survive
906        // compaction.
907        if let Some(ref old) = self.store {
908            layered
909                .overlay_store()
910                .install_named_graphs(old.take_named_graphs());
911        }
912
913        self.external_read_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreSearch>);
914        self.external_write_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreMut>);
915        self.store = Some(Arc::clone(layered.overlay_store()));
916        self.layered_store = Some(layered);
917        self.read_only = false;
918        self.query_cache = Arc::new(QueryCache::default());
919        self.projections.write().clear();
920
921        Ok(())
922    }
923
924    /// Merges the overlay back into the columnar base.
925    ///
926    /// Reads the combined view (base + overlay), builds a fresh `CompactStore`,
927    /// and replaces the `LayeredStore` with one backed by the merged base and
928    /// an empty overlay.
929    ///
930    /// # Errors
931    ///
932    /// Returns an error if the database was not previously compacted, or if
933    /// the merge fails.
934    #[cfg(all(feature = "compact-store", feature = "lpg"))]
935    pub fn recompact(&mut self) -> Result<()> {
936        use grafeo_core::graph::compact::from_graph_store_preserving_ids;
937        use grafeo_core::graph::compact::layered::LayeredStore;
938
939        let layered = self
940            .layered_store
941            .as_ref()
942            .ok_or_else(|| Error::Internal("recompact() requires a prior compact()".into()))?;
943
944        // Read the combined view.
945        let combined: Arc<dyn GraphStoreSearch> = Arc::clone(layered) as Arc<dyn GraphStoreSearch>;
946
947        // Max IDs from the overlay's allocators.
948        let max_node_id = layered.overlay_store().next_node_id().saturating_sub(1);
949        let max_edge_id = layered.overlay_store().next_edge_id().saturating_sub(1);
950
951        let fresh_compact = from_graph_store_preserving_ids(combined.as_ref())
952            .map_err(|e| Error::Internal(e.to_string()))?;
953
954        let new_layered = Arc::new(
955            LayeredStore::new(fresh_compact, max_node_id, max_edge_id)
956                .map_err(|e| Error::Internal(e.to_string()))?,
957        );
958
959        // Sync overlay epoch.
960        let current_epoch = self.transaction_manager.current_epoch();
961        new_layered.overlay_store().sync_epoch(current_epoch);
962
963        // Carry named graphs forward: the old overlay is about to be dropped.
964        new_layered
965            .overlay_store()
966            .install_named_graphs(layered.overlay_store().take_named_graphs());
967
968        self.external_read_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreSearch>);
969        self.external_write_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreMut>);
970        self.store = Some(Arc::clone(new_layered.overlay_store()));
971        self.layered_store = Some(new_layered);
972        self.query_cache = Arc::new(QueryCache::default());
973
974        Ok(())
975    }
976
    /// Applies WAL records to restore the database state.
    ///
    /// Data mutation records are routed through a graph cursor that tracks
    /// `SwitchGraph` context markers, replaying mutations into the correct
    /// named graph (or the default graph when cursor is `None`).
    ///
    /// Schema DDL records (node/edge/graph types, schemas, procedures) are
    /// always applied to the root `catalog`, whatever the cursor points at.
    /// RDF records are forwarded to `rdf_store` when the `triple-store` feature
    /// is enabled, and ignored otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error if a `SwitchGraph` target cannot be resolved, or if
    /// `create_node_with_id` / `create_edge_with_id` fail. Results of named-graph
    /// creation and of catalog operations are discarded (`let _ = ...`), so
    /// replay continues past those failures.
    #[cfg(all(feature = "wal", feature = "lpg"))]
    fn apply_wal_records(
        store: &Arc<LpgStore>,
        catalog: &Catalog,
        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
        records: &[WalRecord],
    ) -> Result<()> {
        use crate::catalog::{
            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
        };
        use grafeo_common::utils::error::Error;

        // Graph cursor: tracks which named graph receives data mutations.
        // `None` means the default graph.
        let mut current_graph: Option<String> = None;
        // Store that data mutations are applied to; kept in sync with the cursor.
        let mut target_store: Arc<LpgStore> = Arc::clone(store);

        for record in records {
            match record {
                // --- Named graph lifecycle ---
                WalRecord::CreateNamedGraph { name } => {
                    // Result intentionally ignored: replay is best-effort here.
                    let _ = store.create_graph(name);
                }
                WalRecord::DropNamedGraph { name } => {
                    store.drop_graph(name);
                    // Reset cursor if the dropped graph was active
                    if current_graph.as_deref() == Some(name.as_str()) {
                        current_graph = None;
                        target_store = Arc::clone(store);
                    }
                }
                WalRecord::SwitchGraph { name } => {
                    // `None` returns to the default graph. A named target is
                    // resolved via `graph_or_create`, which presumably creates
                    // the graph if it is missing.
                    current_graph.clone_from(name);
                    target_store = match &current_graph {
                        None => Arc::clone(store),
                        Some(graph_name) => store
                            .graph_or_create(graph_name)
                            .map_err(|e| Error::Internal(e.to_string()))?,
                    };
                }

                // --- Data mutations: routed through target_store ---
                // Entities are recreated with their logged IDs so that later
                // records referring to those IDs resolve to the same entities.
                WalRecord::CreateNode { id, labels } => {
                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
                    target_store.create_node_with_id(*id, &label_refs)?;
                }
                WalRecord::DeleteNode { id } => {
                    target_store.delete_node(*id);
                }
                WalRecord::CreateEdge {
                    id,
                    src,
                    dst,
                    edge_type,
                } => {
                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
                }
                WalRecord::DeleteEdge { id } => {
                    target_store.delete_edge(*id);
                }
                WalRecord::SetNodeProperty { id, key, value } => {
                    target_store.set_node_property(*id, key, value.clone());
                }
                WalRecord::SetEdgeProperty { id, key, value } => {
                    target_store.set_edge_property(*id, key, value.clone());
                }
                WalRecord::AddNodeLabel { id, label } => {
                    target_store.add_label(*id, label);
                }
                WalRecord::RemoveNodeLabel { id, label } => {
                    target_store.remove_label(*id, label);
                }
                WalRecord::RemoveNodeProperty { id, key } => {
                    target_store.remove_node_property(*id, key);
                }
                WalRecord::RemoveEdgeProperty { id, key } => {
                    target_store.remove_edge_property(*id, key);
                }

                // --- Schema DDL replay (always on root catalog) ---
                WalRecord::CreateNodeType {
                    name,
                    properties,
                    constraints,
                } => {
                    // Logged properties are (name, type name, nullable) triples;
                    // default values are not restored from the WAL.
                    let def = NodeTypeDefinition {
                        name: name.clone(),
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        // NOTE(review): unrecognised kinds (and `not_null` with an
                        // empty property list) fall back to `Unique`; confirm this
                        // is intended rather than skipping the constraint.
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                        parent_types: Vec::new(),
                    };
                    let _ = catalog.register_node_type(def);
                }
                WalRecord::DropNodeType { name } => {
                    let _ = catalog.drop_node_type(name);
                }
                WalRecord::CreateEdgeType {
                    name,
                    properties,
                    constraints,
                } => {
                    // Same property/constraint conversion as CreateNodeType;
                    // source/target node type restrictions are not restored here.
                    let def = EdgeTypeDefinition {
                        name: name.clone(),
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                        source_node_types: Vec::new(),
                        target_node_types: Vec::new(),
                    };
                    let _ = catalog.register_edge_type_def(def);
                }
                WalRecord::DropEdgeType { name } => {
                    let _ = catalog.drop_edge_type_def(name);
                }
                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
                    // Index recreation is handled by the store on startup
                    // (indexes are rebuilt from data, not WAL)
                }
                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
                    // Constraint definitions are part of type definitions
                    // and replayed via CreateNodeType/CreateEdgeType
                }
                WalRecord::CreateGraphType {
                    name,
                    node_types,
                    edge_types,
                    open,
                } => {
                    use crate::catalog::GraphTypeDefinition;
                    let def = GraphTypeDefinition {
                        name: name.clone(),
                        allowed_node_types: node_types.clone(),
                        allowed_edge_types: edge_types.clone(),
                        open: *open,
                    };
                    let _ = catalog.register_graph_type(def);
                }
                WalRecord::DropGraphType { name } => {
                    let _ = catalog.drop_graph_type(name);
                }
                WalRecord::CreateSchema { name } => {
                    let _ = catalog.register_schema_namespace(name.clone());
                }
                WalRecord::DropSchema { name } => {
                    let _ = catalog.drop_schema_namespace(name);
                }

                // Alterations are (action, property, type name, nullable) tuples;
                // actions other than "add" / "drop" are skipped.
                WalRecord::AlterNodeType { name, alterations } => {
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_node_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
                            }
                            _ => {}
                        }
                    }
                }
                WalRecord::AlterEdgeType { name, alterations } => {
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_edge_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
                            }
                            _ => {}
                        }
                    }
                }
                // Graph type alterations are (action, type name) pairs; unknown
                // actions are skipped.
                WalRecord::AlterGraphType { name, alterations } => {
                    for (action, type_name) in alterations {
                        match action.as_str() {
                            "add_node" => {
                                let _ =
                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
                            }
                            "drop_node" => {
                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
                            }
                            "add_edge" => {
                                let _ =
                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
                            }
                            "drop_edge" => {
                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
                            }
                            _ => {}
                        }
                    }
                }

                WalRecord::CreateProcedure {
                    name,
                    params,
                    returns,
                    body,
                } => {
                    use crate::catalog::ProcedureDefinition;
                    let def = ProcedureDefinition {
                        name: name.clone(),
                        params: params.clone(),
                        returns: returns.clone(),
                        body: body.clone(),
                    };
                    let _ = catalog.register_procedure(def);
                }
                WalRecord::DropProcedure { name } => {
                    let _ = catalog.drop_procedure(name);
                }

                // --- RDF triple replay ---
                #[cfg(feature = "triple-store")]
                WalRecord::InsertRdfTriple { .. }
                | WalRecord::DeleteRdfTriple { .. }
                | WalRecord::ClearRdfGraph { .. }
                | WalRecord::CreateRdfGraph { .. }
                | WalRecord::DropRdfGraph { .. } => {
                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
                }
                // Without the triple store there is nowhere to apply RDF records.
                #[cfg(not(feature = "triple-store"))]
                WalRecord::InsertRdfTriple { .. }
                | WalRecord::DeleteRdfTriple { .. }
                | WalRecord::ClearRdfGraph { .. }
                | WalRecord::CreateRdfGraph { .. }
                | WalRecord::DropRdfGraph { .. } => {}

                WalRecord::TransactionCommit { .. } => {
                    // In temporal mode, advance the store epoch on each committed
                    // transaction so that subsequent property/label operations
                    // are recorded at the correct epoch in their VersionLogs.
                    // NOTE(review): only the cursor's `target_store` is advanced;
                    // confirm the root/other named graphs need no advance here.
                    #[cfg(feature = "temporal")]
                    {
                        target_store.new_epoch();
                    }
                }
                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
                    // Transaction control records don't need replay action
                    // (recovery already filtered to only committed transactions)
                }
                WalRecord::EpochAdvance { .. } => {
                    // Metadata record: no store mutation needed.
                    // Used by incremental backup and point-in-time recovery.
                }
            }
        }
        Ok(())
    }
1280
1281    // =========================================================================
1282    // Single-file format helpers
1283    // =========================================================================
1284
1285    /// Returns `true` if the given path should use single-file format.
1286    #[cfg(feature = "grafeo-file")]
1287    fn should_use_single_file(
1288        path: &std::path::Path,
1289        configured: crate::config::StorageFormat,
1290    ) -> bool {
1291        use crate::config::StorageFormat;
1292        match configured {
1293            StorageFormat::SingleFile => true,
1294            StorageFormat::WalDirectory => false,
1295            StorageFormat::Auto => {
1296                // Existing file: check magic bytes
1297                if path.is_file() {
1298                    if let Ok(mut f) = std::fs::File::open(path) {
1299                        use std::io::Read;
1300                        let mut magic = [0u8; 4];
1301                        if f.read_exact(&mut magic).is_ok() && magic == grafeo_storage::file::MAGIC
1302                        {
1303                            return true;
1304                        }
1305                    }
1306                    return false;
1307                }
1308                // Existing directory: legacy format
1309                if path.is_dir() {
1310                    return false;
1311                }
1312                // New path: check extension
1313                path.extension().is_some_and(|ext| ext == "grafeo")
1314            }
1315        }
1316    }
1317
1318    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
1319    ///
1320    /// Supports both v1 (monolithic blob) and v2 (section-based) formats.
1321    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1322    fn apply_snapshot_data(
1323        store: &Arc<LpgStore>,
1324        catalog: &Arc<crate::catalog::Catalog>,
1325        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1326        data: &[u8],
1327    ) -> Result<()> {
1328        // v1 blob format: pass through to legacy loader
1329        persistence::load_snapshot_into_store(
1330            store,
1331            catalog,
1332            #[cfg(feature = "triple-store")]
1333            rdf_store,
1334            data,
1335        )
1336    }
1337
1338    /// Loads from a section-based `.grafeo` file (v2 format).
1339    ///
1340    /// Reads the section directory, then deserializes each section independently.
1341    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1342    fn load_from_sections(
1343        fm: &GrafeoFileManager,
1344        store: &Arc<LpgStore>,
1345        catalog: &Arc<crate::catalog::Catalog>,
1346        #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1347    ) -> Result<()> {
1348        use grafeo_common::storage::{Section, SectionType};
1349
1350        let dir = fm.read_section_directory()?.ok_or_else(|| {
1351            grafeo_common::utils::error::Error::Internal(
1352                "expected v2 section directory but found none".to_string(),
1353            )
1354        })?;
1355
1356        // Load catalog section first (schema defs needed before data)
1357        if let Some(entry) = dir.find(SectionType::Catalog) {
1358            let data = fm.read_section_data(entry)?;
1359            let tm = Arc::new(crate::transaction::TransactionManager::new());
1360            let mut section = catalog_section::CatalogSection::new(
1361                Arc::clone(catalog),
1362                Arc::clone(store),
1363                move || tm.current_epoch().as_u64(),
1364            );
1365            section.deserialize(&data)?;
1366        }
1367
1368        // Load LPG store
1369        if let Some(entry) = dir.find(SectionType::LpgStore) {
1370            let data = fm.read_section_data(entry)?;
1371            let mut section = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1372            section.deserialize(&data)?;
1373        }
1374
1375        // Load RDF store
1376        #[cfg(feature = "triple-store")]
1377        if let Some(entry) = dir.find(SectionType::RdfStore) {
1378            let data = fm.read_section_data(entry)?;
1379            let mut section = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
1380            section.deserialize(&data)?;
1381        }
1382
1383        // Restore Ring Index (if persisted)
1384        #[cfg(feature = "ring-index")]
1385        if let Some(entry) = dir.find(SectionType::RdfRing) {
1386            let data = fm.read_section_data(entry)?;
1387            let mut section = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(rdf_store));
1388            section.deserialize(&data)?;
1389        }
1390
1391        // Restore HNSW topology (if vector indexes exist in both catalog and section)
1392        #[cfg(feature = "vector-index")]
1393        if let Some(entry) = dir.find(SectionType::VectorStore) {
1394            let data = fm.read_section_data(entry)?;
1395            let indexes = store.vector_index_entries();
1396            if !indexes.is_empty() {
1397                let mut section = grafeo_core::index::vector::VectorStoreSection::new(indexes);
1398                section.deserialize(&data)?;
1399            }
1400        }
1401
1402        // Restore BM25 postings (if text indexes exist in both catalog and section)
1403        #[cfg(feature = "text-index")]
1404        if let Some(entry) = dir.find(SectionType::TextIndex) {
1405            let data = fm.read_section_data(entry)?;
1406            let indexes = store.text_index_entries();
1407            if !indexes.is_empty() {
1408                let mut section = grafeo_core::index::text::TextIndexSection::new(indexes);
1409                section.deserialize(&data)?;
1410            }
1411        }
1412
1413        Ok(())
1414    }
1415
1416    // =========================================================================
1417    // Session & Configuration
1418    // =========================================================================
1419
1420    /// Opens a new session for running queries.
1421    ///
1422    /// Sessions are cheap to create: spin up as many as you need. Each
1423    /// gets its own transaction context, so concurrent sessions won't
1424    /// block each other on reads.
1425    ///
1426    /// # Panics
1427    ///
1428    /// Panics if the database was configured with an external graph store and
1429    /// the internal arena allocator cannot be initialized (out of memory).
1430    ///
1431    /// # Examples
1432    ///
1433    /// ```
1434    /// use grafeo_engine::GrafeoDB;
1435    ///
1436    /// let db = GrafeoDB::new_in_memory();
1437    /// let session = db.session();
1438    ///
1439    /// // Run queries through the session
1440    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
1441    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1442    /// ```
1443    #[must_use]
1444    pub fn session(&self) -> Session {
1445        self.create_session_inner(None)
1446    }
1447
1448    /// Creates a session scoped to the given identity.
1449    ///
1450    /// The identity determines what operations the session is allowed to
1451    /// perform. A [`Role::ReadOnly`](crate::auth::Role::ReadOnly) identity
1452    /// creates a read-only session; a [`Role::ReadWrite`](crate::auth::Role::ReadWrite)
1453    /// identity allows data mutations but not schema DDL; a
1454    /// [`Role::Admin`](crate::auth::Role::Admin) identity has full access.
1455    ///
1456    /// # Examples
1457    ///
1458    /// ```
1459    /// use grafeo_engine::{GrafeoDB, auth::{Identity, Role}};
1460    ///
1461    /// let db = GrafeoDB::new_in_memory();
1462    /// let identity = Identity::new("app-service", [Role::ReadWrite]);
1463    /// let session = db.session_with_identity(identity);
1464    /// ```
1465    #[must_use]
1466    pub fn session_with_identity(&self, identity: crate::auth::Identity) -> Session {
1467        let force_read_only = !identity.can_write();
1468        self.create_session_inner_full(None, force_read_only, identity)
1469    }
1470
1471    /// Creates a session scoped to a single role.
1472    ///
1473    /// Convenience shorthand for
1474    /// `session_with_identity(Identity::new("anonymous", [role]))`.
1475    ///
1476    /// # Examples
1477    ///
1478    /// ```
1479    /// use grafeo_engine::{GrafeoDB, auth::Role};
1480    ///
1481    /// let db = GrafeoDB::new_in_memory();
1482    /// let reader = db.session_with_role(Role::ReadOnly);
1483    /// ```
1484    #[must_use]
1485    pub fn session_with_role(&self, role: crate::auth::Role) -> Session {
1486        self.session_with_identity(crate::auth::Identity::new("anonymous", [role]))
1487    }
1488
1489    /// Creates a session with an explicit CDC override.
1490    ///
1491    /// When `cdc_enabled` is `true`, mutations in this session are tracked
1492    /// regardless of the database default. When `false`, mutations are not
1493    /// tracked regardless of the database default.
1494    ///
1495    /// # Examples
1496    ///
1497    /// ```
1498    /// use grafeo_engine::GrafeoDB;
1499    ///
1500    /// let db = GrafeoDB::new_in_memory();
1501    ///
1502    /// // Opt in to CDC for just this session
1503    /// let tracked = db.session_with_cdc(true);
1504    /// tracked.execute("INSERT (:Person {name: 'Alix'})")?;
1505    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1506    /// ```
1507    #[cfg(feature = "cdc")]
1508    #[must_use]
1509    pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1510        self.create_session_inner(Some(cdc_enabled))
1511    }
1512
1513    /// Creates a read-only session regardless of the database's access mode.
1514    ///
1515    /// Mutations executed through this session will fail with
1516    /// `TransactionError::ReadOnly`. Useful for replication replicas where
1517    /// the database itself must remain writable (for applying CDC changes)
1518    /// but client-facing queries must be read-only.
1519    ///
1520    /// **Deprecated**: Use `session_with_role(Role::ReadOnly)` instead.
1521    #[deprecated(
1522        since = "0.5.36",
1523        note = "use session_with_role(Role::ReadOnly) instead"
1524    )]
1525    #[must_use]
1526    pub fn session_read_only(&self) -> Session {
1527        self.session_with_role(crate::auth::Role::ReadOnly)
1528    }
1529
1530    /// Shared session creation logic.
1531    ///
1532    /// `cdc_override` overrides the database-wide `cdc_enabled` default when
1533    /// `Some`. `None` falls back to the database default.
1534    #[allow(unused_variables)] // cdc_override unused when cdc feature is off
1535    fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1536        self.create_session_inner_full(cdc_override, false, crate::auth::Identity::anonymous())
1537    }
1538
    /// Shared session creation with all overrides.
    ///
    /// All session constructors funnel into this method. The resulting
    /// [`Session`] gets a `SessionConfig` built from this database's shared
    /// handles (transaction manager, query cache, catalog, buffer manager,
    /// commit counter) and its configuration values.
    ///
    /// - `cdc_override`: `Some(b)` turns CDC tracking on or off for this
    ///   session; `None` falls back to `cdc_active()`. Only consulted when the
    ///   `cdc` feature is enabled.
    /// - `force_read_only`: makes the session read-only even if the database is
    ///   writable. The session is read-only if either this flag or the database
    ///   is read-only.
    /// - `identity`: stored in the session's configuration.
    ///
    /// Store selection (first match wins):
    /// 1. `compact-store` + `lpg` with a layered store: the overlay `LpgStore`
    ///    is the session's internal store, and the graph-store trait objects
    ///    are overridden to the layered store.
    /// 2. An external read store, with the external write store if any.
    /// 3. The internal LPG store (paired with the RDF store under
    ///    `triple-store`), or the trait-object stores in non-`lpg` builds.
    ///
    /// NOTE(review): paths 1 and 2 return early, so they skip the WAL, CDC,
    /// metrics wiring and graph/schema context propagation applied to path 3.
    /// Confirm this is intentional.
    ///
    /// # Panics
    ///
    /// Panics if `Session::with_external_store` returns an error (path 2, and
    /// path 3 in non-`lpg` builds).
    // `cdc_override` is only read inside the `#[cfg(feature = "cdc")]` block.
    #[allow(unused_variables)]
    fn create_session_inner_full(
        &self,
        cdc_override: Option<bool>,
        force_read_only: bool,
        identity: crate::auth::Identity,
    ) -> Session {
        // A closure rather than a value: exactly one branch below consumes a
        // freshly built `SessionConfig`.
        let session_cfg = || crate::session::SessionConfig {
            transaction_manager: Arc::clone(&self.transaction_manager),
            query_cache: Arc::clone(&self.query_cache),
            catalog: Arc::clone(&self.catalog),
            adaptive_config: self.config.adaptive.clone(),
            factorized_execution: self.config.factorized_execution,
            graph_model: self.config.graph_model,
            query_timeout: self.config.query_timeout,
            max_property_size: self.config.max_property_size,
            buffer_manager: Some(Arc::clone(&self.buffer_manager)),
            commit_counter: Arc::clone(&self.commit_counter),
            gc_interval: self.config.gc_interval,
            read_only: self.read_only || force_read_only,
            identity: identity.clone(),
            #[cfg(feature = "lpg")]
            projections: Arc::clone(&self.projections),
        };

        // Layered store: use the overlay LpgStore as the session's internal store
        // so MVCC operations (begin_tx, commit, visibility) work correctly.
        #[cfg(all(feature = "compact-store", feature = "lpg"))]
        if let Some(ref layered) = self.layered_store {
            let overlay = Arc::clone(layered.overlay_store());
            let layered_arc = Arc::clone(layered);
            let mut session = Session::with_adaptive(overlay, session_cfg());
            // Override graph_store/graph_store_mut to use the LayeredStore
            // (which merges base + overlay), not just the overlay alone.
            session.override_stores(
                Arc::clone(&layered_arc) as Arc<dyn GraphStoreSearch>,
                Some(layered_arc as Arc<dyn GraphStoreMut>),
            );
            return session;
        }

        // External store: the write store is optional (None = read-only backend).
        if let Some(ref ext_read) = self.external_read_store {
            return Session::with_external_store(
                Arc::clone(ext_read),
                self.external_write_store.as_ref().map(Arc::clone),
                session_cfg(),
            )
            .expect("arena allocation for external store session");
        }

        // Internal store: exactly one of these three bindings is compiled in.
        #[cfg(all(feature = "lpg", feature = "triple-store"))]
        let mut session = Session::with_rdf_store_and_adaptive(
            Arc::clone(self.lpg_store()),
            Arc::clone(&self.rdf_store),
            session_cfg(),
        );
        #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
        let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
        #[cfg(not(feature = "lpg"))]
        let mut session =
            Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
                .expect("session creation for non-lpg build");

        // Attach the WAL (and its graph context) when durability is enabled.
        #[cfg(all(feature = "wal", feature = "lpg"))]
        if let Some(ref wal) = self.wal {
            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
        }

        // Per-session override wins; otherwise use the database-wide CDC state.
        #[cfg(feature = "cdc")]
        {
            let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
            if should_enable {
                session.set_cdc_log(Arc::clone(&self.cdc_log));
            }
        }

        // Hand the registry to the session and count it as created and active.
        #[cfg(feature = "metrics")]
        {
            if let Some(ref m) = self.metrics {
                session.set_metrics(Arc::clone(m));
                m.session_created
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                m.session_active
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
        }

        // Propagate persistent graph context to the new session
        if let Some(ref graph) = *self.current_graph.read() {
            session.use_graph(graph);
        }

        // Propagate persistent schema context to the new session
        if let Some(ref schema) = *self.current_schema.read() {
            session.set_schema(schema);
        }

        // Suppress unused_mut when cdc/wal are disabled
        let _ = &mut session;

        session
    }
1642
1643    /// Returns the current graph name, if any.
1644    ///
1645    /// This is the persistent graph context used by one-shot `execute()` calls.
1646    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
1647    /// or `SESSION RESET`.
1648    #[must_use]
1649    pub fn current_graph(&self) -> Option<String> {
1650        self.current_graph.read().clone()
1651    }
1652
1653    /// Sets the current graph context for subsequent one-shot `execute()` calls.
1654    ///
1655    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
1656    /// Pass `None` to reset to the default graph.
1657    ///
1658    /// # Errors
1659    ///
1660    /// Returns an error if the named graph does not exist.
1661    pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1662        #[cfg(feature = "lpg")]
1663        if let Some(name) = name
1664            && !name.eq_ignore_ascii_case("default")
1665            && let Some(store) = &self.store
1666            && store.graph(name).is_none()
1667        {
1668            return Err(Error::Query(QueryError::new(
1669                QueryErrorKind::Semantic,
1670                format!("Graph '{name}' does not exist"),
1671            )));
1672        }
1673        *self.current_graph.write() = name.map(ToString::to_string);
1674        Ok(())
1675    }
1676
1677    /// Returns the current schema name, if any.
1678    ///
1679    /// This is the persistent schema context used by one-shot `execute()` calls.
1680    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
1681    #[must_use]
1682    pub fn current_schema(&self) -> Option<String> {
1683        self.current_schema.read().clone()
1684    }
1685
1686    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1687    ///
1688    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1689    /// a session. Pass `None` to clear the schema context.
1690    ///
1691    /// # Errors
1692    ///
1693    /// Returns an error if the named schema does not exist.
1694    pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1695        if let Some(name) = name
1696            && !self.catalog.schema_exists(name)
1697        {
1698            return Err(Error::Query(QueryError::new(
1699                QueryErrorKind::Semantic,
1700                format!("Schema '{name}' does not exist"),
1701            )));
1702        }
1703        *self.current_schema.write() = name.map(ToString::to_string);
1704        Ok(())
1705    }
1706
1707    /// Returns the adaptive execution configuration.
1708    #[must_use]
1709    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1710        &self.config.adaptive
1711    }
1712
1713    /// Returns `true` if this database was opened in read-only mode.
1714    #[must_use]
1715    pub fn is_read_only(&self) -> bool {
1716        self.read_only
1717    }
1718
1719    /// Returns the configuration.
1720    #[must_use]
1721    pub fn config(&self) -> &Config {
1722        &self.config
1723    }
1724
1725    /// Returns the graph data model of this database.
1726    #[must_use]
1727    pub fn graph_model(&self) -> crate::config::GraphModel {
1728        self.config.graph_model
1729    }
1730
1731    /// Returns the configured memory limit in bytes, if any.
1732    #[must_use]
1733    pub fn memory_limit(&self) -> Option<usize> {
1734        self.config.memory_limit
1735    }
1736
1737    /// Returns a point-in-time snapshot of all metrics.
1738    ///
1739    /// If the `metrics` feature is disabled or the registry is not
1740    /// initialized, returns a default (all-zero) snapshot.
1741    #[cfg(feature = "metrics")]
1742    #[must_use]
1743    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1744        let mut snapshot = self
1745            .metrics
1746            .as_ref()
1747            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1748
1749        // Augment with cache stats from the query cache (not tracked in the registry)
1750        let cache_stats = self.query_cache.stats();
1751        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1752        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1753        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1754        snapshot.cache_invalidations = cache_stats.invalidations;
1755
1756        snapshot
1757    }
1758
1759    /// Returns all metrics in Prometheus text exposition format.
1760    ///
1761    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1762    #[cfg(feature = "metrics")]
1763    #[must_use]
1764    pub fn metrics_prometheus(&self) -> String {
1765        self.metrics
1766            .as_ref()
1767            .map_or_else(String::new, |m| m.to_prometheus())
1768    }
1769
1770    /// Resets all metrics counters and histograms to zero.
1771    #[cfg(feature = "metrics")]
1772    pub fn reset_metrics(&self) {
1773        if let Some(ref m) = self.metrics {
1774            m.reset();
1775        }
1776        self.query_cache.reset_stats();
1777    }
1778
1779    /// Returns the underlying (default) store.
1780    ///
1781    /// This provides direct access to the LPG store for algorithm implementations
1782    /// and admin operations (index management, schema introspection, MVCC internals).
1783    ///
1784    /// For code that only needs read/write graph operations, prefer
1785    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
1786    #[cfg(feature = "lpg")]
1787    #[must_use]
1788    pub fn store(&self) -> &Arc<LpgStore> {
1789        self.lpg_store()
1790    }
1791
1792    // === Named Graph Management ===
1793
1794    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1795    ///
1796    /// # Errors
1797    ///
1798    /// Returns an error if arena allocation fails.
1799    #[cfg(feature = "lpg")]
1800    pub fn create_graph(&self, name: &str) -> Result<bool> {
1801        Ok(self.lpg_store().create_graph(name)?)
1802    }
1803
1804    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
1805    ///
1806    /// If the dropped graph was the active graph context, the context is reset
1807    /// to the default graph.
1808    #[cfg(feature = "lpg")]
1809    pub fn drop_graph(&self, name: &str) -> bool {
1810        let Some(store) = &self.store else {
1811            return false;
1812        };
1813        let dropped = store.drop_graph(name);
1814        if dropped {
1815            let mut current = self.current_graph.write();
1816            if current
1817                .as_deref()
1818                .is_some_and(|g| g.eq_ignore_ascii_case(name))
1819            {
1820                *current = None;
1821            }
1822        }
1823        dropped
1824    }
1825
1826    /// Returns all named graph names.
1827    #[cfg(feature = "lpg")]
1828    #[must_use]
1829    pub fn list_graphs(&self) -> Vec<String> {
1830        self.lpg_store().graph_names()
1831    }
1832
1833    // === Graph Projections ===
1834
1835    /// Creates a named graph projection (virtual subgraph).
1836    ///
1837    /// The projection filters the graph store to only include nodes with the
1838    /// specified labels and edges with the specified types. Returns `true` if
1839    /// created, `false` if a projection with that name already exists.
1840    ///
1841    /// # Examples
1842    ///
1843    /// ```
1844    /// use grafeo_engine::GrafeoDB;
1845    /// use grafeo_core::graph::ProjectionSpec;
1846    ///
1847    /// let db = GrafeoDB::new_in_memory();
1848    /// let spec = ProjectionSpec::new()
1849    ///     .with_node_labels(["Person", "City"])
1850    ///     .with_edge_types(["LIVES_IN"]);
1851    /// assert!(db.create_projection("social", spec));
1852    /// ```
1853    pub fn create_projection(
1854        &self,
1855        name: impl Into<String>,
1856        spec: grafeo_core::graph::ProjectionSpec,
1857    ) -> bool {
1858        use grafeo_core::graph::GraphProjection;
1859        use std::collections::hash_map::Entry;
1860
1861        let store = self.graph_store();
1862        let projection = Arc::new(GraphProjection::new(store, spec));
1863        let mut projections = self.projections.write();
1864        match projections.entry(name.into()) {
1865            Entry::Occupied(_) => false,
1866            Entry::Vacant(e) => {
1867                e.insert(projection);
1868                true
1869            }
1870        }
1871    }
1872
1873    /// Drops a named graph projection. Returns `true` if it existed.
1874    pub fn drop_projection(&self, name: &str) -> bool {
1875        self.projections.write().remove(name).is_some()
1876    }
1877
1878    /// Returns the names of all graph projections.
1879    #[must_use]
1880    pub fn list_projections(&self) -> Vec<String> {
1881        self.projections.read().keys().cloned().collect()
1882    }
1883
1884    /// Returns a named projection as a [`GraphStoreSearch`] trait object.
1885    #[must_use]
1886    pub fn projection(&self, name: &str) -> Option<Arc<dyn GraphStoreSearch>> {
1887        self.projections
1888            .read()
1889            .get(name)
1890            .map(|p| Arc::clone(p) as Arc<dyn GraphStoreSearch>)
1891    }
1892
1893    /// Returns the graph store as a trait object.
1894    ///
1895    /// Returns a read-only trait object for the active graph store.
1896    ///
1897    /// This provides the [`GraphStoreSearch`] interface (graph-structure reads
1898    /// plus text/vector search capabilities) for code that only needs read
1899    /// operations. For write access, use [`graph_store_mut()`](Self::graph_store_mut).
1900    #[must_use]
1901    pub fn graph_store(&self) -> Arc<dyn GraphStoreSearch> {
1902        if let Some(ref ext_read) = self.external_read_store {
1903            Arc::clone(ext_read)
1904        } else {
1905            #[cfg(feature = "lpg")]
1906            {
1907                Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreSearch>
1908            }
1909            #[cfg(not(feature = "lpg"))]
1910            unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
1911        }
1912    }
1913
1914    /// Returns the writable graph store, if available.
1915    ///
1916    /// Returns `None` for read-only databases created via
1917    /// [`with_read_store()`](Self::with_read_store).
1918    #[must_use]
1919    pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1920        if self.external_read_store.is_some() {
1921            self.external_write_store.as_ref().map(Arc::clone)
1922        } else {
1923            #[cfg(feature = "lpg")]
1924            {
1925                Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1926            }
1927            #[cfg(not(feature = "lpg"))]
1928            {
1929                None
1930            }
1931        }
1932    }
1933
1934    /// Garbage collects old MVCC versions that are no longer visible.
1935    ///
1936    /// Determines the minimum epoch required by active transactions and prunes
1937    /// version chains older than that threshold. Also cleans up completed
1938    /// transaction metadata in the transaction manager, and prunes the CDC
1939    /// event log according to its retention policy.
1940    pub fn gc(&self) {
1941        #[cfg(feature = "lpg")]
1942        {
1943            let min_epoch = self.transaction_manager.min_active_epoch();
1944            self.lpg_store().gc_versions(min_epoch);
1945        }
1946        #[cfg(all(feature = "lpg", feature = "cdc"))]
1947        let current_epoch = self.transaction_manager.current_epoch();
1948        self.transaction_manager.gc();
1949
1950        // Prune CDC events based on retention config (epoch + count limits)
1951        #[cfg(feature = "cdc")]
1952        if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
1953            #[cfg(feature = "lpg")]
1954            self.cdc_log.apply_retention(current_epoch);
1955        }
1956    }
1957
1958    /// Returns the buffer manager for memory-aware operations.
1959    #[must_use]
1960    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1961        &self.buffer_manager
1962    }
1963
1964    /// Returns the query cache.
1965    #[must_use]
1966    pub fn query_cache(&self) -> &Arc<QueryCache> {
1967        &self.query_cache
1968    }
1969
1970    /// Clears all cached query plans.
1971    ///
1972    /// This is called automatically after DDL operations, but can also be
1973    /// invoked manually after external schema changes (e.g., WAL replay,
1974    /// import) or when you want to force re-optimization of all queries.
1975    pub fn clear_plan_cache(&self) {
1976        self.query_cache.clear();
1977    }
1978
1979    // =========================================================================
1980    // Lifecycle
1981    // =========================================================================
1982
    /// Closes the database, flushing all pending writes.
    ///
    /// For persistent databases, this ensures everything is safely on disk.
    /// Called automatically when the database is dropped, but you can call
    /// it explicitly if you need to guarantee durability at a specific point.
    ///
    /// Calling `close` on an already-closed database is a no-op returning
    /// `Ok(())`. The `is_open` write lock is held for the whole call. The
    /// database is only marked closed on success: if any step returns an
    /// error, it stays marked open.
    ///
    /// # Errors
    ///
    /// Returns an error if any durability step fails: syncing or logging to
    /// the WAL, checkpointing to the single-file container, removing the
    /// sidecar WAL, or closing the file manager (check disk space/permissions).
    pub fn close(&self) -> Result<()> {
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        // Stop the periodic checkpoint timer first, even for read-only databases.
        // compact() can switch a writable DB to read-only after the timer started,
        // so the timer must be stopped before any early return to avoid racing
        // with the closed file manager.
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        if let Some(mut timer) = self.checkpoint_timer.lock().take() {
            timer.stop();
        }

        // Read-only databases: just release the shared lock, no checkpointing
        if self.read_only {
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        // For single-file format: checkpoint to the .grafeo file, then clean up
        // the sidecar WAL. `is_single_file` also disables the legacy-directory
        // WAL commit path at the end of this function, which must not run once
        // the active log has been closed and the sidecar removed below.
        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Flush WAL first so all records are on disk before we snapshot
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;

            // Safety check: if the WAL has records but the checkpoint was a
            // no-op (zero sections written), the container file may not hold
            // the latest data, e.g. when sections were not marked dirty despite
            // mutations going through the WAL. Retry the checkpoint once.
            // NOTE(review): nothing here marks sections dirty before the retry;
            // it calls `checkpoint_to_file` again with the same reason, so it
            // may also write 0 sections. The sidecar-retention check below
            // keeps the WAL for recovery in that case.
            #[cfg(feature = "wal")]
            let flush_result = if flush_result.sections_written == 0 {
                if let Some(ref wal) = self.wal {
                    if wal.record_count() > 0 {
                        grafeo_warn!(
                            "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
                            wal.record_count()
                        );
                        self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
                    } else {
                        flush_result
                    }
                } else {
                    flush_result
                }
            } else {
                flush_result
            };

            // Release WAL file handles before removing sidecar directory.
            // On Windows, open handles prevent directory deletion.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            // Only remove the sidecar WAL after verifying the checkpoint wrote
            // data to the container. If nothing was written and the WAL had
            // records, keep the sidecar so the next open can recover from it.
            #[cfg(feature = "wal")]
            let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
            #[cfg(not(feature = "wal"))]
            let has_wal_records = false;

            if flush_result.sections_written > 0 || !has_wal_records {
                // Crash-injection point used by tests to simulate a failure
                // between checkpoint and sidecar removal.
                {
                    use grafeo_common::testing::crash::maybe_crash;
                    maybe_crash("close:before_remove_sidecar_wal");
                }
                fm.remove_sidecar_wal()?;
            } else {
                grafeo_warn!(
                    "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
                );
            }
            fm.close()?;
        }

        // Commit and sync WAL (legacy directory format only).
        // We intentionally do NOT call wal.checkpoint() here. Directory format
        // has no snapshot: the WAL files are the sole source of truth. Writing
        // checkpoint.meta would cause recovery to skip older WAL files, losing
        // all data that predates the current log sequence.
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Use the last assigned transaction ID, or create one for the commit record.
            // NOTE(review): in the fallback case a transaction is begun solely
            // to obtain an ID; it is not committed or aborted through the
            // transaction manager here.
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            // Log a TransactionCommit to mark all pending records as committed
            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
2111
2112    /// Returns the typed WAL if available.
2113    #[cfg(feature = "wal")]
2114    #[must_use]
2115    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
2116        self.wal.as_ref()
2117    }
2118
2119    /// Logs a WAL record if WAL is enabled.
2120    #[cfg(feature = "wal")]
2121    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
2122        if let Some(ref wal) = self.wal {
2123            wal.log(record)?;
2124        }
2125        Ok(())
2126    }
2127
    /// Registers storage sections as [`MemoryConsumer`]s with the BufferManager.
    ///
    /// Each section reports its memory usage to the buffer manager, enabling
    /// accurate pressure tracking. Called once after database construction.
    ///
    /// Registered consumers (each subject to its feature flags):
    /// - the LPG store section, when an internal LPG store exists;
    /// - the RDF store section, only if at call time the RDF store is
    ///   non-empty or has at least one named graph;
    /// - the Ring index section, only if the ring has already been built;
    /// - a dynamic vector-index consumer, whose spill registry is also stored
    ///   in `self.vector_spill_storages` for the search path;
    /// - a dynamic text-index consumer;
    /// - the CDC log itself.
    ///
    /// NOTE(review): the RDF and Ring consumers are decided once, here. An
    /// RDF store or Ring index that becomes non-empty or built later is not
    /// registered by this method. Confirm that case is handled elsewhere.
    fn register_section_consumers(&mut self) {
        // LPG store section
        // Borrow only the `store` field, so the disjoint field
        // `self.vector_spill_storages` can still be assigned further down.
        #[cfg(feature = "lpg")]
        let store_ref = self.store.as_ref();
        #[cfg(feature = "lpg")]
        if let Some(store) = store_ref {
            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
            self.buffer_manager.register_consumer(Arc::new(
                section_consumer::SectionConsumer::new(Arc::new(lpg)),
            ));
        }

        // RDF store: only when data exists
        #[cfg(feature = "triple-store")]
        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
            self.buffer_manager.register_consumer(Arc::new(
                section_consumer::SectionConsumer::new(Arc::new(rdf)),
            ));
        }

        // Ring Index: only when Ring has been built
        #[cfg(feature = "ring-index")]
        if self.rdf_store.ring().is_some() {
            let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
            self.buffer_manager.register_consumer(Arc::new(
                section_consumer::SectionConsumer::new(Arc::new(ring)),
            ));
        }

        // Vector indexes: dynamic consumer that re-queries the store on each
        // memory_usage() call, so dropped indexes are freed and new ones tracked.
        #[cfg(all(
            feature = "lpg",
            feature = "vector-index",
            feature = "mmap",
            not(feature = "temporal")
        ))]
        if let Some(store) = store_ref {
            let spill_path = self.buffer_manager.config().spill_path.clone();
            let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
                store, spill_path,
            ));
            // Share the spill registry with the search path
            self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
            self.buffer_manager.register_consumer(consumer);
        }

        // Text indexes: same dynamic approach as vector indexes.
        #[cfg(all(feature = "lpg", feature = "text-index"))]
        if let Some(store) = store_ref {
            self.buffer_manager
                .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
        }

        // CDC log: register as memory consumer so the buffer manager can
        // prune events under memory pressure.
        #[cfg(feature = "cdc")]
        self.buffer_manager.register_consumer(
            Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
        );
    }
2194
2195    /// Discovers and re-opens spill files from a previous session.
2196    ///
2197    /// When the database was closed with spilled vector embeddings, the
2198    /// `vectors_*.bin` files persist in the spill directory. This method
2199    /// scans for them, opens each as `MmapStorage`, and registers them
2200    /// in the `vector_spill_storages` map so search can read from them.
2201    #[cfg(all(
2202        feature = "lpg",
2203        feature = "vector-index",
2204        feature = "mmap",
2205        not(feature = "temporal")
2206    ))]
2207    fn restore_spill_files(&mut self) {
2208        use grafeo_core::index::vector::MmapStorage;
2209
2210        let spill_dir = match self.buffer_manager.config().spill_path {
2211            Some(ref path) => path.clone(),
2212            None => return,
2213        };
2214
2215        if !spill_dir.exists() {
2216            return;
2217        }
2218
2219        let spill_map = match self.vector_spill_storages {
2220            Some(ref map) => Arc::clone(map),
2221            None => return,
2222        };
2223
2224        let Ok(entries) = std::fs::read_dir(&spill_dir) else {
2225            return;
2226        };
2227
2228        let Some(ref store) = self.store else {
2229            return;
2230        };
2231
2232        for entry in entries.flatten() {
2233            let path = entry.path();
2234            let file_name = match path.file_name().and_then(|n| n.to_str()) {
2235                Some(name) => name.to_string(),
2236                None => continue,
2237            };
2238
2239            // Match pattern: vectors_{key}.bin where key is percent-encoded
2240            if !file_name.starts_with("vectors_")
2241                || !std::path::Path::new(&file_name)
2242                    .extension()
2243                    .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
2244            {
2245                continue;
2246            }
2247
2248            // Extract and decode key: "vectors_Label%3Aembedding.bin" -> "Label:embedding"
2249            let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];
2250
2251            // Percent-decode: %3A -> ':', %25 -> '%'
2252            let key = key_part.replace("%3A", ":").replace("%25", "%");
2253
2254            // Key must contain ':' (label:property format)
2255            if !key.contains(':') {
2256                // Legacy file with old encoding, skip (will be re-created on next spill)
2257                continue;
2258            }
2259
2260            // Only restore if the corresponding vector index exists
2261            if store.get_vector_index_by_key(&key).is_none() {
2262                // Stale spill file (index was dropped), clean it up
2263                let _ = std::fs::remove_file(&path);
2264                continue;
2265            }
2266
2267            // Open the MmapStorage
2268            match MmapStorage::open(&path) {
2269                Ok(mmap_storage) => {
2270                    // Mark the property column as spilled so get() returns None
2271                    let property = key.split(':').nth(1).unwrap_or("");
2272                    let prop_key = grafeo_common::types::PropertyKey::new(property);
2273                    store.node_properties_mark_spilled(&prop_key);
2274
2275                    spill_map.write().insert(key, Arc::new(mmap_storage));
2276                }
2277                Err(e) => {
2278                    eprintln!("failed to restore spill file {}: {e}", path.display());
2279                    // Remove corrupt spill file
2280                    let _ = std::fs::remove_file(&path);
2281                }
2282            }
2283        }
2284    }
2285
    /// Builds section objects for the current database state.
    ///
    /// In layered (compact-store) mode only two sections are produced: the
    /// compact base and the LPG overlay. Otherwise the list contains, in
    /// push order:
    /// - the catalog and the LPG store (when an LPG store exists);
    /// - non-empty vector/text index sections;
    /// - the RDF store, if it holds data;
    /// - the Ring index, if it has been built.
    #[cfg(feature = "grafeo-file")]
    fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
        let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();

        // Layered store: serialize both the compact base and the overlay.
        // NOTE(review): this branch returns early, so no catalog, index, RDF
        // or Ring sections are emitted in layered mode. Confirm this is
        // intended.
        #[cfg(all(feature = "compact-store", feature = "lpg"))]
        if let Some(ref layered) = self.layered_store {
            // Compact base section.
            let compact_section = grafeo_core::graph::compact::section::CompactStoreSection::new(
                layered.base_store_arc(),
            );
            sections.push(Box::new(compact_section));

            // Overlay LPG section.
            let overlay = layered.overlay_store();
            let overlay_section =
                grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(overlay));
            sections.push(Box::new(overlay_section));

            return sections;
        }

        // LPG sections, in push order: catalog, store, vector indexes, text indexes.
        // NOTE(review): the catalog is pushed before the store; keep this order
        // unless the loader is confirmed not to depend on it.
        #[cfg(feature = "lpg")]
        if let Some(store) = self.store.as_ref() {
            let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));

            // The catalog section gets a closure that reads the transaction
            // manager's current epoch (as u64) whenever it is invoked.
            let catalog = catalog_section::CatalogSection::new(
                Arc::clone(&self.catalog),
                Arc::clone(store),
                {
                    let tm = Arc::clone(&self.transaction_manager);
                    move || tm.current_epoch().as_u64()
                },
            );

            sections.push(Box::new(catalog));
            sections.push(Box::new(lpg));

            // Vector indexes: persist HNSW topology to avoid rebuild on load.
            // Skipped entirely when there are no vector indexes.
            #[cfg(feature = "vector-index")]
            {
                let indexes = store.vector_index_entries();
                if !indexes.is_empty() {
                    let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
                    sections.push(Box::new(vector));
                }
            }

            // Text indexes: persist BM25 postings to avoid rebuild on load.
            // Skipped entirely when there are no text indexes.
            #[cfg(feature = "text-index")]
            {
                let indexes = store.text_index_entries();
                if !indexes.is_empty() {
                    let text = grafeo_core::index::text::TextIndexSection::new(indexes);
                    sections.push(Box::new(text));
                }
            }
        }

        // RDF store: only written when it holds triples or named graphs.
        #[cfg(feature = "triple-store")]
        if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
            let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
            sections.push(Box::new(rdf));
        }

        // Ring index: only written once the Ring has been built.
        #[cfg(feature = "ring-index")]
        if self.rdf_store.ring().is_some() {
            let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
            sections.push(Box::new(ring));
        }

        sections
    }
2361
2362    // =========================================================================
2363    // Backup API
2364    // =========================================================================
2365
2366    /// Creates a full backup of the database in the given directory.
2367    ///
2368    /// Checkpoints the database, copies the `.grafeo` file, and creates a
2369    /// backup manifest. Subsequent incremental backups will use this as the
2370    /// base.
2371    ///
2372    /// # Errors
2373    ///
2374    /// Returns an error if the database has no file manager or I/O fails.
2375    #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2376    pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
2377        let fm = self
2378            .file_manager
2379            .as_ref()
2380            .ok_or_else(|| Error::Internal("backup requires a persistent database".to_string()))?;
2381
2382        // Checkpoint to ensure the container has the latest data.
2383        // Skip for read-only databases: the on-disk file is already a valid
2384        // snapshot and the file manager rejects writes.
2385        if !self.read_only {
2386            let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2387        }
2388
2389        let current_epoch = self.transaction_manager.current_epoch();
2390        backup::do_backup_full(backup_dir, fm, self.wal.as_deref(), current_epoch)
2391    }
2392
2393    /// Creates an incremental backup containing WAL records since the last backup.
2394    ///
2395    /// Requires a prior full backup in the backup directory.
2396    ///
2397    /// # Errors
2398    ///
2399    /// Returns an error if no full backup exists, or if the WAL has no new records.
2400    #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2401    pub fn backup_incremental(
2402        &self,
2403        backup_dir: &std::path::Path,
2404    ) -> Result<backup::BackupSegment> {
2405        let wal = self
2406            .wal
2407            .as_ref()
2408            .ok_or_else(|| Error::Internal("incremental backup requires WAL".to_string()))?;
2409
2410        let current_epoch = self.transaction_manager.current_epoch();
2411        backup::do_backup_incremental(backup_dir, wal, current_epoch)
2412    }
2413
2414    /// Returns the backup manifest for a backup directory, if one exists.
2415    ///
2416    /// # Errors
2417    ///
2418    /// Returns an error if the manifest file exists but cannot be parsed.
2419    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2420    pub fn read_backup_manifest(
2421        backup_dir: &std::path::Path,
2422    ) -> Result<Option<backup::BackupManifest>> {
2423        backup::read_manifest(backup_dir)
2424    }
2425
2426    /// Returns the current backup cursor (last backed-up position), if any.
2427    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2428    #[must_use]
2429    pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
2430        self.wal
2431            .as_ref()
2432            .and_then(|wal| backup::read_backup_cursor(wal.dir()).ok().flatten())
2433    }
2434
2435    /// Restores a database from a backup chain to a specific epoch.
2436    ///
2437    /// Copies the full backup to `output_path`, then replays incremental
2438    /// WAL segments up to `target_epoch`. The restored database can be
2439    /// opened with [`GrafeoDB::open`].
2440    ///
2441    /// # Errors
2442    ///
2443    /// Returns an error if the backup chain does not cover the target epoch,
2444    /// segment checksums fail, or I/O fails.
2445    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2446    pub fn restore_to_epoch(
2447        backup_dir: &std::path::Path,
2448        target_epoch: grafeo_common::types::EpochId,
2449        output_path: &std::path::Path,
2450    ) -> Result<()> {
2451        backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
2452    }
2453
2454    /// Writes the current database state to the `.grafeo` file using the unified flush.
2455    ///
2456    /// Does NOT remove the sidecar WAL: callers that want to clean up
2457    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
2458    /// separately after this returns.
2459    #[cfg(feature = "grafeo-file")]
2460    fn checkpoint_to_file(
2461        &self,
2462        fm: &GrafeoFileManager,
2463        reason: flush::FlushReason,
2464    ) -> Result<flush::FlushResult> {
2465        let sections = self.build_sections();
2466        let section_refs: Vec<&dyn grafeo_common::storage::Section> =
2467            sections.iter().map(|s| s.as_ref()).collect();
2468        #[cfg(feature = "lpg")]
2469        let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
2470        #[cfg(not(feature = "lpg"))]
2471        let context = flush::build_context_minimal(&self.transaction_manager);
2472
2473        flush::flush(
2474            fm,
2475            &section_refs,
2476            &context,
2477            reason,
2478            #[cfg(feature = "wal")]
2479            self.wal.as_deref(),
2480        )
2481    }
2482
2483    /// Returns the file manager if using single-file format.
2484    #[cfg(feature = "grafeo-file")]
2485    #[must_use]
2486    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
2487        self.file_manager.as_ref()
2488    }
2489}
2490
2491impl Drop for GrafeoDB {
2492    fn drop(&mut self) {
2493        if let Err(e) = self.close() {
2494            grafeo_error!("Error closing database: {}", e);
2495        }
2496    }
2497}
2498
#[cfg(feature = "lpg")]
impl crate::admin::AdminService for GrafeoDB {
    // Each method forwards to the inherent `GrafeoDB` method of the same name.
    // When both an inherent and a trait item share a name, path resolution on
    // `GrafeoDB::` (like method-call syntax) picks the inherent one, so these
    // calls do not re-enter this trait impl as long as the inherent methods
    // exist.

    fn info(&self) -> crate::admin::DatabaseInfo {
        GrafeoDB::info(self)
    }

    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        GrafeoDB::detailed_stats(self)
    }

    fn schema(&self) -> crate::admin::SchemaInfo {
        GrafeoDB::schema(self)
    }

    fn validate(&self) -> crate::admin::ValidationResult {
        GrafeoDB::validate(self)
    }

    fn wal_status(&self) -> crate::admin::WalStatus {
        GrafeoDB::wal_status(self)
    }

    fn wal_checkpoint(&self) -> Result<()> {
        GrafeoDB::wal_checkpoint(self)
    }
}
2525
2526// =========================================================================
2527// Query Result Types
2528// =========================================================================
2529
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in output order (e.g. from the RETURN clause).
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// Expected to line up with `columns`. The untyped constructors (`new`,
    /// `from_rows`) fill every entry with `LogicalType::Any`.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows, each meant to hold one value per column.
    ///
    /// Row width is not checked against `columns` by the constructors or by
    /// `push_row`. Use [`rows()`](Self::rows) for borrowed access or
    /// [`into_rows()`](Self::into_rows) to take ownership.
    pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    ///
    /// The constructors on this type all initialize it to `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
2575
2576impl QueryResult {
2577    /// Creates a fully empty query result (no columns, no rows).
2578    #[must_use]
2579    pub fn empty() -> Self {
2580        Self {
2581            columns: Vec::new(),
2582            column_types: Vec::new(),
2583            rows: Vec::new(),
2584            execution_time_ms: None,
2585            rows_scanned: None,
2586            status_message: None,
2587            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2588        }
2589    }
2590
2591    /// Creates a query result with only a status message (for DDL commands).
2592    #[must_use]
2593    pub fn status(msg: impl Into<String>) -> Self {
2594        Self {
2595            columns: Vec::new(),
2596            column_types: Vec::new(),
2597            rows: Vec::new(),
2598            execution_time_ms: None,
2599            rows_scanned: None,
2600            status_message: Some(msg.into()),
2601            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2602        }
2603    }
2604
2605    /// Creates a new empty query result.
2606    #[must_use]
2607    pub fn new(columns: Vec<String>) -> Self {
2608        let len = columns.len();
2609        Self {
2610            columns,
2611            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2612            rows: Vec::new(),
2613            execution_time_ms: None,
2614            rows_scanned: None,
2615            status_message: None,
2616            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2617        }
2618    }
2619
2620    /// Creates a new empty query result with column types.
2621    #[must_use]
2622    pub fn with_types(
2623        columns: Vec<String>,
2624        column_types: Vec<grafeo_common::types::LogicalType>,
2625    ) -> Self {
2626        Self {
2627            columns,
2628            column_types,
2629            rows: Vec::new(),
2630            execution_time_ms: None,
2631            rows_scanned: None,
2632            status_message: None,
2633            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2634        }
2635    }
2636
2637    /// Creates a query result with pre-populated rows.
2638    #[must_use]
2639    pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
2640        let len = columns.len();
2641        Self {
2642            columns,
2643            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2644            rows,
2645            execution_time_ms: None,
2646            rows_scanned: None,
2647            status_message: None,
2648            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2649        }
2650    }
2651
2652    /// Appends a row to this result.
2653    pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
2654        self.rows.push(row);
2655    }
2656
2657    /// Sets the execution metrics on this result.
2658    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2659        self.execution_time_ms = Some(execution_time_ms);
2660        self.rows_scanned = Some(rows_scanned);
2661        self
2662    }
2663
2664    /// Returns the execution time in milliseconds, if available.
2665    #[must_use]
2666    pub fn execution_time_ms(&self) -> Option<f64> {
2667        self.execution_time_ms
2668    }
2669
2670    /// Returns the number of rows scanned, if available.
2671    #[must_use]
2672    pub fn rows_scanned(&self) -> Option<u64> {
2673        self.rows_scanned
2674    }
2675
2676    /// Returns the number of rows.
2677    #[must_use]
2678    pub fn row_count(&self) -> usize {
2679        self.rows.len()
2680    }
2681
2682    /// Returns the number of columns.
2683    #[must_use]
2684    pub fn column_count(&self) -> usize {
2685        self.columns.len()
2686    }
2687
2688    /// Returns true if the result is empty.
2689    #[must_use]
2690    pub fn is_empty(&self) -> bool {
2691        self.rows.is_empty()
2692    }
2693
2694    /// Extracts a single value from the result.
2695    ///
2696    /// Use this when your query returns exactly one row with one column,
2697    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
2698    ///
2699    /// # Errors
2700    ///
2701    /// Returns an error if the result has multiple rows or columns.
2702    pub fn scalar<T: FromValue>(&self) -> Result<T> {
2703        if self.rows.len() != 1 || self.columns.len() != 1 {
2704            return Err(grafeo_common::utils::error::Error::InvalidValue(
2705                "Expected single value".to_string(),
2706            ));
2707        }
2708        T::from_value(&self.rows[0][0])
2709    }
2710
2711    /// Returns a slice of all result rows.
2712    #[must_use]
2713    pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
2714        &self.rows
2715    }
2716
2717    /// Takes ownership of all result rows.
2718    #[must_use]
2719    pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
2720        self.rows
2721    }
2722
2723    /// Returns an iterator over the rows.
2724    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2725        self.rows.iter()
2726    }
2727
2728    /// Converts this query result to an Arrow [`RecordBatch`](arrow_array::RecordBatch).
2729    ///
2730    /// Each column in the result becomes an Arrow array. Type mapping:
2731    /// - `Int64` / `Float64` / `Bool` / `String` / `Bytes`: direct Arrow equivalents
2732    /// - `Timestamp` / `ZonedDatetime`: `Timestamp(Microsecond, UTC)`
2733    /// - `Date`: `Date32`, `Time`: `Time64(Nanosecond)`
2734    /// - `Vector`: `FixedSizeList(Float32, dim)`
2735    /// - `Duration` / `List` / `Map` / `Path`: serialized as `Utf8`
2736    ///
2737    /// Heterogeneous columns (mixed types) fall back to `Utf8`.
2738    ///
2739    /// # Errors
2740    ///
2741    /// Returns [`ArrowExportError`](arrow::ArrowExportError) if Arrow array construction fails.
2742    #[cfg(feature = "arrow-export")]
2743    pub fn to_record_batch(
2744        &self,
2745    ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
2746        arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
2747    }
2748
2749    /// Serializes this query result as Arrow IPC stream bytes.
2750    ///
2751    /// The returned bytes can be read by any Arrow implementation:
2752    /// - Python: `pyarrow.ipc.open_stream(buf).read_all()`
2753    /// - Polars: `pl.read_ipc(buf)`
2754    /// - Node.js: `apache-arrow` `RecordBatchStreamReader`
2755    ///
2756    /// # Errors
2757    ///
2758    /// Returns [`ArrowExportError`](arrow::ArrowExportError) on conversion or serialization failure.
2759    #[cfg(feature = "arrow-export")]
2760    pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
2761        let batch = self.to_record_batch()?;
2762        arrow::record_batch_to_ipc_stream(&batch)
2763    }
2764}
2765
2766impl std::fmt::Display for QueryResult {
2767    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2768        let table = grafeo_common::fmt::format_result_table(
2769            &self.columns,
2770            &self.rows,
2771            self.execution_time_ms,
2772            self.status_message.as_deref(),
2773        );
2774        f.write_str(&table)
2775    }
2776}
2777
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// The implementations in this module each delegate to the matching `Value`
/// accessor (`as_int64`, `as_float64`, `as_str`, `as_bool`). They report
/// `Error::TypeMismatch` when that accessor returns `None`.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// # Errors
    ///
    /// Returns `Error::TypeMismatch` if the value is not the expected type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2790
2791impl FromValue for i64 {
2792    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2793        value
2794            .as_int64()
2795            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2796                expected: "INT64".to_string(),
2797                found: value.type_name().to_string(),
2798            })
2799    }
2800}
2801
2802impl FromValue for f64 {
2803    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2804        value
2805            .as_float64()
2806            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2807                expected: "FLOAT64".to_string(),
2808                found: value.type_name().to_string(),
2809            })
2810    }
2811}
2812
2813impl FromValue for String {
2814    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2815        value.as_str().map(String::from).ok_or_else(|| {
2816            grafeo_common::utils::error::Error::TypeMismatch {
2817                expected: "STRING".to_string(),
2818                found: value.type_name().to_string(),
2819            }
2820        })
2821    }
2822}
2823
2824impl FromValue for bool {
2825    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2826        value
2827            .as_bool()
2828            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2829                expected: "BOOL".to_string(),
2830                found: value.type_name().to_string(),
2831            })
2832    }
2833}
2834
2835#[cfg(test)]
2836mod tests {
2837    use super::*;
2838
2839    #[test]
2840    fn test_create_in_memory_database() {
2841        let db = GrafeoDB::new_in_memory();
2842        assert_eq!(db.node_count(), 0);
2843        assert_eq!(db.edge_count(), 0);
2844    }
2845
2846    #[test]
2847    fn test_database_config() {
2848        let config = Config::in_memory().with_threads(4).with_query_logging();
2849
2850        let db = GrafeoDB::with_config(config).unwrap();
2851        assert_eq!(db.config().threads, 4);
2852        assert!(db.config().query_logging);
2853    }
2854
2855    #[test]
2856    fn test_database_session() {
2857        let db = GrafeoDB::new_in_memory();
2858        let _session = db.session();
2859        // Session should be created successfully
2860    }
2861
    /// Data written before `close()` must be visible after reopening the same path.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // Create database and add some data: two Person nodes joined by one edge.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));

            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));

            let _edge = db.create_edge(alix, gus, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();
        }

        // Reopen and verify data was recovered
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // Verify nodes exist. This relies on the first two nodes having
            // received ids 0 and 1.
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }
2902
    /// Mutations on a persistent database should be recorded in the WAL.
    ///
    /// If `db.wal()` returns `None`, the record-count assertion is skipped and
    /// the test only checks that `close()` succeeds.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // Create some data (one create and one delete)
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // WAL should have records (only checked when a WAL is present)
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }
2924
    /// Data must accumulate correctly across several open/close cycles of the same path.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        // Tests that WAL recovery works correctly across multiple open/close cycles
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: Create initial data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));
            db.close().unwrap();
        }

        // Session 2: Add more data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1); // Previous data recovered
            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));
            db.close().unwrap();
        }

        // Session 3: Verify all data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            // Verify both nodes came back with their `Person` label. The
            // `name` properties are written above but not asserted here.
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }
2965
    /// Deletions and property updates made before `close()` must be reflected after reopening.
    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        // Tests that database remains consistent after a series of create/delete operations
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Create nodes
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            // Create edges: a -> b and b -> c
            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Delete the a -> b edge, then the middle node. Note: e2 (b -> c)
            // is not deleted explicitly, and the test does not check what
            // happens to it.
            db.delete_edge(e1);
            db.delete_node(b);

            // Set properties on remaining nodes
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // Reopen and verify consistency
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Expect a and c to survive and b to be gone. This relies on a, b
            // and c having received ids 0, 1 and 2.
            // node_count() is intentionally not asserted: it may include
            // deleted nodes in some implementations (NOTE(review): confirm),
            // so accessibility of each node is checked instead.
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // Middle node should be deleted
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }
3017
3018    #[cfg(feature = "wal")]
3019    #[test]
3020    fn test_close_is_idempotent() {
3021        // Calling close() multiple times should not cause errors
3022        use tempfile::tempdir;
3023
3024        let dir = tempdir().unwrap();
3025        let db_path = dir.path().join("close_test_db");
3026
3027        let db = GrafeoDB::open(&db_path).unwrap();
3028        db.create_node(&["Test"]);
3029
3030        // First close should succeed
3031        assert!(db.close().is_ok());
3032
3033        // Second close should also succeed (idempotent)
3034        assert!(db.close().is_ok());
3035    }
3036
3037    #[test]
3038    fn test_with_store_external_backend() {
3039        use grafeo_core::graph::lpg::LpgStore;
3040
3041        let external = Arc::new(LpgStore::new().unwrap());
3042
3043        // Seed data on the external store directly
3044        let n1 = external.create_node(&["Person"]);
3045        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
3046
3047        let db = GrafeoDB::with_store(
3048            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
3049            Config::in_memory(),
3050        )
3051        .unwrap();
3052
3053        let session = db.session();
3054
3055        // Session should see data from the external store via execute
3056        #[cfg(feature = "gql")]
3057        {
3058            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
3059            assert_eq!(result.rows.len(), 1);
3060        }
3061    }
3062
3063    #[test]
3064    fn test_with_config_custom_memory_limit() {
3065        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB
3066
3067        let db = GrafeoDB::with_config(config).unwrap();
3068        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
3069        assert_eq!(db.node_count(), 0);
3070    }
3071
3072    #[cfg(feature = "metrics")]
3073    #[test]
3074    fn test_database_metrics_registry() {
3075        let db = GrafeoDB::new_in_memory();
3076
3077        // Perform some operations
3078        db.create_node(&["Person"]);
3079        db.create_node(&["Person"]);
3080
3081        // Check that metrics snapshot returns data
3082        let snap = db.metrics();
3083        // Session created counter should reflect at least 0 (metrics is initialized)
3084        assert_eq!(snap.query_count, 0); // No queries executed yet
3085    }
3086
3087    #[test]
3088    fn test_query_result_has_metrics() {
3089        // Verifies that query results include execution metrics
3090        let db = GrafeoDB::new_in_memory();
3091        db.create_node(&["Person"]);
3092        db.create_node(&["Person"]);
3093
3094        #[cfg(feature = "gql")]
3095        {
3096            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
3097
3098            // Metrics should be populated
3099            assert!(result.execution_time_ms.is_some());
3100            assert!(result.rows_scanned.is_some());
3101            assert!(result.execution_time_ms.unwrap() >= 0.0);
3102            assert_eq!(result.rows_scanned.unwrap(), 2);
3103        }
3104    }
3105
3106    #[test]
3107    fn test_empty_query_result_metrics() {
3108        // Verifies metrics are correct for queries returning no results
3109        let db = GrafeoDB::new_in_memory();
3110        db.create_node(&["Person"]);
3111
3112        #[cfg(feature = "gql")]
3113        {
3114            // Query that matches nothing
3115            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
3116
3117            assert!(result.execution_time_ms.is_some());
3118            assert!(result.rows_scanned.is_some());
3119            assert_eq!(result.rows_scanned.unwrap(), 0);
3120        }
3121    }
3122
3123    #[cfg(feature = "cdc")]
3124    mod cdc_integration {
3125        use super::*;
3126
3127        /// Helper: creates an in-memory database with CDC enabled.
3128        fn cdc_db() -> GrafeoDB {
3129            GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
3130        }
3131
3132        #[test]
3133        fn test_node_lifecycle_history() {
3134            let db = cdc_db();
3135
3136            // Create
3137            let id = db.create_node(&["Person"]);
3138            // Update
3139            db.set_node_property(id, "name", "Alix".into());
3140            db.set_node_property(id, "name", "Gus".into());
3141            // Delete
3142            db.delete_node(id);
3143
3144            let history = db.history(id).unwrap();
3145            assert_eq!(history.len(), 4); // create + 2 updates + delete
3146            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3147            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3148            assert!(history[1].before.is_none()); // first set_node_property has no prior value
3149            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
3150            assert!(history[2].before.is_some()); // second update has prior "Alix"
3151            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
3152        }
3153
3154        #[test]
3155        fn test_edge_lifecycle_history() {
3156            let db = cdc_db();
3157
3158            let alix = db.create_node(&["Person"]);
3159            let gus = db.create_node(&["Person"]);
3160            let edge = db.create_edge(alix, gus, "KNOWS");
3161            db.set_edge_property(edge, "since", 2024i64.into());
3162            db.delete_edge(edge);
3163
3164            let history = db.history(edge).unwrap();
3165            assert_eq!(history.len(), 3); // create + update + delete
3166            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3167            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3168            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
3169        }
3170
3171        #[test]
3172        fn test_create_node_with_props_cdc() {
3173            let db = cdc_db();
3174
3175            let id = db.create_node_with_props(
3176                &["Person"],
3177                vec![
3178                    ("name", grafeo_common::types::Value::from("Alix")),
3179                    ("age", grafeo_common::types::Value::from(30i64)),
3180                ],
3181            );
3182
3183            let history = db.history(id).unwrap();
3184            assert_eq!(history.len(), 1);
3185            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3186            // Props should be captured
3187            let after = history[0].after.as_ref().unwrap();
3188            assert_eq!(after.len(), 2);
3189        }
3190
3191        #[test]
3192        fn test_changes_between() {
3193            let db = cdc_db();
3194
3195            let id1 = db.create_node(&["A"]);
3196            let _id2 = db.create_node(&["B"]);
3197            db.set_node_property(id1, "x", 1i64.into());
3198
3199            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
3200            let changes = db
3201                .changes_between(
3202                    grafeo_common::types::EpochId(0),
3203                    grafeo_common::types::EpochId(u64::MAX),
3204                )
3205                .unwrap();
3206            assert_eq!(changes.len(), 3); // 2 creates + 1 update
3207        }
3208
3209        #[test]
3210        fn test_cdc_disabled_by_default() {
3211            let db = GrafeoDB::new_in_memory();
3212            assert!(!db.is_cdc_enabled());
3213
3214            let id = db.create_node(&["Person"]);
3215            db.set_node_property(id, "name", "Alix".into());
3216
3217            let history = db.history(id).unwrap();
3218            assert!(history.is_empty(), "CDC off by default: no events recorded");
3219        }
3220
3221        #[test]
3222        fn test_session_with_cdc_override_on() {
3223            // Database default is off, but session opts in
3224            let db = GrafeoDB::new_in_memory();
3225            let session = db.session_with_cdc(true);
3226            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3227            // The CDC log should have events from the opted-in session
3228            let changes = db
3229                .changes_between(
3230                    grafeo_common::types::EpochId(0),
3231                    grafeo_common::types::EpochId(u64::MAX),
3232                )
3233                .unwrap();
3234            assert!(
3235                !changes.is_empty(),
3236                "session_with_cdc(true) should record events"
3237            );
3238        }
3239
3240        #[test]
3241        fn test_session_with_cdc_override_off() {
3242            // Database default is on, but session opts out
3243            let db = cdc_db();
3244            let session = db.session_with_cdc(false);
3245            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3246            let changes = db
3247                .changes_between(
3248                    grafeo_common::types::EpochId(0),
3249                    grafeo_common::types::EpochId(u64::MAX),
3250                )
3251                .unwrap();
3252            assert!(
3253                changes.is_empty(),
3254                "session_with_cdc(false) should not record events"
3255            );
3256        }
3257
3258        #[test]
3259        fn test_set_cdc_enabled_runtime() {
3260            let db = GrafeoDB::new_in_memory();
3261            assert!(!db.is_cdc_enabled());
3262
3263            // Enable at runtime
3264            db.set_cdc_enabled(true);
3265            assert!(db.is_cdc_enabled());
3266
3267            let id = db.create_node(&["Person"]);
3268            let history = db.history(id).unwrap();
3269            assert_eq!(history.len(), 1, "CDC enabled at runtime records events");
3270
3271            // Disable again
3272            db.set_cdc_enabled(false);
3273            let id2 = db.create_node(&["Person"]);
3274            let history2 = db.history(id2).unwrap();
3275            assert!(
3276                history2.is_empty(),
3277                "CDC disabled at runtime stops recording"
3278            );
3279        }
3280    }
3281
3282    #[test]
3283    fn test_with_store_basic() {
3284        use grafeo_core::graph::lpg::LpgStore;
3285
3286        let store = Arc::new(LpgStore::new().unwrap());
3287        let n1 = store.create_node(&["Person"]);
3288        store.set_node_property(n1, "name", "Alix".into());
3289
3290        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3291        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3292
3293        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
3294        assert_eq!(result.rows.len(), 1);
3295    }
3296
3297    #[test]
3298    fn test_with_store_session() {
3299        use grafeo_core::graph::lpg::LpgStore;
3300
3301        let store = Arc::new(LpgStore::new().unwrap());
3302        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3303        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3304
3305        let session = db.session();
3306        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3307        assert_eq!(result.rows.len(), 1);
3308    }
3309
3310    #[test]
3311    fn test_with_store_mutations() {
3312        use grafeo_core::graph::lpg::LpgStore;
3313
3314        let store = Arc::new(LpgStore::new().unwrap());
3315        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3316        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3317
3318        let mut session = db.session();
3319
3320        // Use an explicit transaction so INSERT and MATCH share the same
3321        // transaction context. With PENDING epochs, uncommitted versions are
3322        // only visible to the owning transaction.
3323        session.begin_transaction().unwrap();
3324        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3325
3326        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3327        assert_eq!(result.rows.len(), 1);
3328
3329        session.commit().unwrap();
3330    }
3331
3332    // =========================================================================
3333    // QueryResult tests
3334    // =========================================================================
3335
3336    #[test]
3337    fn test_query_result_empty() {
3338        let result = QueryResult::empty();
3339        assert!(result.is_empty());
3340        assert_eq!(result.row_count(), 0);
3341        assert_eq!(result.column_count(), 0);
3342        assert!(result.execution_time_ms().is_none());
3343        assert!(result.rows_scanned().is_none());
3344        assert!(result.status_message.is_none());
3345    }
3346
3347    #[test]
3348    fn test_query_result_status() {
3349        let result = QueryResult::status("Created node type 'Person'");
3350        assert!(result.is_empty());
3351        assert_eq!(result.column_count(), 0);
3352        assert_eq!(
3353            result.status_message.as_deref(),
3354            Some("Created node type 'Person'")
3355        );
3356    }
3357
3358    #[test]
3359    fn test_query_result_new_with_columns() {
3360        let result = QueryResult::new(vec!["name".into(), "age".into()]);
3361        assert_eq!(result.column_count(), 2);
3362        assert_eq!(result.row_count(), 0);
3363        assert!(result.is_empty());
3364        // Column types should default to Any
3365        assert_eq!(
3366            result.column_types,
3367            vec![
3368                grafeo_common::types::LogicalType::Any,
3369                grafeo_common::types::LogicalType::Any
3370            ]
3371        );
3372    }
3373
3374    #[test]
3375    fn test_query_result_with_types() {
3376        use grafeo_common::types::LogicalType;
3377        let result = QueryResult::with_types(
3378            vec!["name".into(), "age".into()],
3379            vec![LogicalType::String, LogicalType::Int64],
3380        );
3381        assert_eq!(result.column_count(), 2);
3382        assert_eq!(result.column_types[0], LogicalType::String);
3383        assert_eq!(result.column_types[1], LogicalType::Int64);
3384    }
3385
3386    #[test]
3387    fn test_query_result_with_metrics() {
3388        let result = QueryResult::new(vec!["x".into()]).with_metrics(42.5, 100);
3389        assert_eq!(result.execution_time_ms(), Some(42.5));
3390        assert_eq!(result.rows_scanned(), Some(100));
3391    }
3392
3393    #[test]
3394    fn test_query_result_scalar_success() {
3395        use grafeo_common::types::Value;
3396        let mut result = QueryResult::new(vec!["count".into()]);
3397        result.rows.push(vec![Value::Int64(42)]);
3398
3399        let val: i64 = result.scalar().unwrap();
3400        assert_eq!(val, 42);
3401    }
3402
3403    #[test]
3404    fn test_query_result_scalar_wrong_shape() {
3405        use grafeo_common::types::Value;
3406        // Multiple rows
3407        let mut result = QueryResult::new(vec!["x".into()]);
3408        result.rows.push(vec![Value::Int64(1)]);
3409        result.rows.push(vec![Value::Int64(2)]);
3410        assert!(result.scalar::<i64>().is_err());
3411
3412        // Multiple columns
3413        let mut result2 = QueryResult::new(vec!["a".into(), "b".into()]);
3414        result2.rows.push(vec![Value::Int64(1), Value::Int64(2)]);
3415        assert!(result2.scalar::<i64>().is_err());
3416
3417        // Empty
3418        let result3 = QueryResult::new(vec!["x".into()]);
3419        assert!(result3.scalar::<i64>().is_err());
3420    }
3421
3422    #[test]
3423    fn test_query_result_iter() {
3424        use grafeo_common::types::Value;
3425        let mut result = QueryResult::new(vec!["x".into()]);
3426        result.rows.push(vec![Value::Int64(1)]);
3427        result.rows.push(vec![Value::Int64(2)]);
3428
3429        let collected: Vec<_> = result.iter().collect();
3430        assert_eq!(collected.len(), 2);
3431    }
3432
3433    #[test]
3434    fn test_query_result_display() {
3435        use grafeo_common::types::Value;
3436        let mut result = QueryResult::new(vec!["name".into()]);
3437        result.rows.push(vec![Value::from("Alix")]);
3438        let display = result.to_string();
3439        assert!(display.contains("name"));
3440        assert!(display.contains("Alix"));
3441    }
3442
3443    // =========================================================================
3444    // FromValue error paths
3445    // =========================================================================
3446
3447    #[test]
3448    fn test_from_value_i64_type_mismatch() {
3449        use grafeo_common::types::Value;
3450        let val = Value::from("not a number");
3451        assert!(i64::from_value(&val).is_err());
3452    }
3453
3454    #[test]
3455    fn test_from_value_f64_type_mismatch() {
3456        use grafeo_common::types::Value;
3457        let val = Value::from("not a float");
3458        assert!(f64::from_value(&val).is_err());
3459    }
3460
3461    #[test]
3462    fn test_from_value_string_type_mismatch() {
3463        use grafeo_common::types::Value;
3464        let val = Value::Int64(42);
3465        assert!(String::from_value(&val).is_err());
3466    }
3467
3468    #[test]
3469    fn test_from_value_bool_type_mismatch() {
3470        use grafeo_common::types::Value;
3471        let val = Value::Int64(1);
3472        assert!(bool::from_value(&val).is_err());
3473    }
3474
3475    #[test]
3476    fn test_from_value_all_success() {
3477        use grafeo_common::types::Value;
3478        assert_eq!(i64::from_value(&Value::Int64(99)).unwrap(), 99);
3479        assert!((f64::from_value(&Value::Float64(2.72)).unwrap() - 2.72).abs() < f64::EPSILON);
3480        assert_eq!(String::from_value(&Value::from("hello")).unwrap(), "hello");
3481        assert!(bool::from_value(&Value::Bool(true)).unwrap());
3482    }
3483
3484    // =========================================================================
3485    // GrafeoDB accessor tests
3486    // =========================================================================
3487
3488    #[test]
3489    fn test_database_is_read_only_false_by_default() {
3490        let db = GrafeoDB::new_in_memory();
3491        assert!(!db.is_read_only());
3492    }
3493
3494    #[test]
3495    fn test_database_graph_model() {
3496        let db = GrafeoDB::new_in_memory();
3497        assert_eq!(db.graph_model(), crate::config::GraphModel::Lpg);
3498    }
3499
3500    #[test]
3501    fn test_database_memory_limit_none_by_default() {
3502        let db = GrafeoDB::new_in_memory();
3503        assert!(db.memory_limit().is_none());
3504    }
3505
3506    #[test]
3507    fn test_database_memory_limit_custom() {
3508        let config = Config::in_memory().with_memory_limit(128 * 1024 * 1024);
3509        let db = GrafeoDB::with_config(config).unwrap();
3510        assert_eq!(db.memory_limit(), Some(128 * 1024 * 1024));
3511    }
3512
3513    #[test]
3514    fn test_database_adaptive_config() {
3515        let db = GrafeoDB::new_in_memory();
3516        let adaptive = db.adaptive_config();
3517        assert!(adaptive.enabled);
3518        assert!((adaptive.threshold - 3.0).abs() < f64::EPSILON);
3519    }
3520
3521    #[test]
3522    fn test_database_buffer_manager() {
3523        let db = GrafeoDB::new_in_memory();
3524        let _bm = db.buffer_manager();
3525        // Just verify it doesn't panic
3526    }
3527
3528    #[test]
3529    fn test_database_query_cache() {
3530        let db = GrafeoDB::new_in_memory();
3531        let _qc = db.query_cache();
3532    }
3533
3534    #[test]
3535    fn test_database_clear_plan_cache() {
3536        let db = GrafeoDB::new_in_memory();
3537        // Execute a query to populate the cache
3538        #[cfg(feature = "gql")]
3539        {
3540            let _ = db.execute("MATCH (n) RETURN count(n)");
3541        }
3542        db.clear_plan_cache();
3543        // No panic means success
3544    }
3545
3546    #[test]
3547    fn test_database_gc() {
3548        let db = GrafeoDB::new_in_memory();
3549        db.create_node(&["Person"]);
3550        db.gc();
3551        // Verify no panic, node still accessible
3552        assert_eq!(db.node_count(), 1);
3553    }
3554
3555    // =========================================================================
3556    // Named graph management
3557    // =========================================================================
3558
3559    #[test]
3560    fn test_create_and_list_graphs() {
3561        let db = GrafeoDB::new_in_memory();
3562        let created = db.create_graph("social").unwrap();
3563        assert!(created);
3564
3565        // Creating same graph again returns false
3566        let created_again = db.create_graph("social").unwrap();
3567        assert!(!created_again);
3568
3569        let names = db.list_graphs();
3570        assert!(names.contains(&"social".to_string()));
3571    }
3572
3573    #[test]
3574    fn test_drop_graph() {
3575        let db = GrafeoDB::new_in_memory();
3576        db.create_graph("temp").unwrap();
3577        assert!(db.drop_graph("temp"));
3578        assert!(!db.drop_graph("temp")); // Already dropped
3579    }
3580
3581    #[test]
3582    fn test_drop_graph_resets_current_graph() {
3583        let db = GrafeoDB::new_in_memory();
3584        db.create_graph("active").unwrap();
3585        db.set_current_graph(Some("active")).unwrap();
3586        assert_eq!(db.current_graph(), Some("active".to_string()));
3587
3588        db.drop_graph("active");
3589        assert_eq!(db.current_graph(), None);
3590    }
3591
3592    // =========================================================================
3593    // Current graph / schema context
3594    // =========================================================================
3595
3596    #[test]
3597    fn test_current_graph_default_none() {
3598        let db = GrafeoDB::new_in_memory();
3599        assert_eq!(db.current_graph(), None);
3600    }
3601
3602    #[test]
3603    fn test_set_current_graph_valid() {
3604        let db = GrafeoDB::new_in_memory();
3605        db.create_graph("social").unwrap();
3606        db.set_current_graph(Some("social")).unwrap();
3607        assert_eq!(db.current_graph(), Some("social".to_string()));
3608    }
3609
3610    #[test]
3611    fn test_set_current_graph_nonexistent() {
3612        let db = GrafeoDB::new_in_memory();
3613        let result = db.set_current_graph(Some("nonexistent"));
3614        assert!(result.is_err());
3615    }
3616
3617    #[test]
3618    fn test_set_current_graph_none_resets() {
3619        let db = GrafeoDB::new_in_memory();
3620        db.create_graph("social").unwrap();
3621        db.set_current_graph(Some("social")).unwrap();
3622        db.set_current_graph(None).unwrap();
3623        assert_eq!(db.current_graph(), None);
3624    }
3625
3626    #[test]
3627    fn test_set_current_graph_default_keyword() {
3628        let db = GrafeoDB::new_in_memory();
3629        // "default" is a special case that always succeeds
3630        db.set_current_graph(Some("default")).unwrap();
3631        assert_eq!(db.current_graph(), Some("default".to_string()));
3632    }
3633
3634    #[test]
3635    fn test_current_schema_default_none() {
3636        let db = GrafeoDB::new_in_memory();
3637        assert_eq!(db.current_schema(), None);
3638    }
3639
3640    #[test]
3641    fn test_set_current_schema_nonexistent() {
3642        let db = GrafeoDB::new_in_memory();
3643        let result = db.set_current_schema(Some("nonexistent"));
3644        assert!(result.is_err());
3645    }
3646
3647    #[test]
3648    fn test_set_current_schema_none_resets() {
3649        let db = GrafeoDB::new_in_memory();
3650        db.set_current_schema(None).unwrap();
3651        assert_eq!(db.current_schema(), None);
3652    }
3653
3654    // =========================================================================
3655    // graph_store / graph_store_mut
3656    // =========================================================================
3657
3658    #[test]
3659    fn test_graph_store_returns_lpg_by_default() {
3660        let db = GrafeoDB::new_in_memory();
3661        db.create_node(&["Person"]);
3662        let store = db.graph_store();
3663        assert_eq!(store.node_count(), 1);
3664    }
3665
3666    #[test]
3667    fn test_graph_store_mut_returns_some_by_default() {
3668        let db = GrafeoDB::new_in_memory();
3669        assert!(db.graph_store_mut().is_some());
3670    }
3671
3672    #[test]
3673    fn test_with_read_store() {
3674        use grafeo_core::graph::lpg::LpgStore;
3675
3676        let store = Arc::new(LpgStore::new().unwrap());
3677        store.create_node(&["Person"]);
3678
3679        let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
3680        let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3681
3682        assert!(db.is_read_only());
3683        assert!(db.graph_store_mut().is_none());
3684
3685        // Read queries should work
3686        let gs = db.graph_store();
3687        assert_eq!(gs.node_count(), 1);
3688    }
3689
3690    #[test]
3691    fn test_with_store_graph_store_methods() {
3692        use grafeo_core::graph::lpg::LpgStore;
3693
3694        let store = Arc::new(LpgStore::new().unwrap());
3695        store.create_node(&["Person"]);
3696
3697        let db = GrafeoDB::with_store(
3698            Arc::clone(&store) as Arc<dyn GraphStoreMut>,
3699            Config::in_memory(),
3700        )
3701        .unwrap();
3702
3703        assert!(!db.is_read_only());
3704        assert!(db.graph_store_mut().is_some());
3705        assert_eq!(db.graph_store().node_count(), 1);
3706    }
3707
3708    // =========================================================================
3709    // session_read_only
3710    // =========================================================================
3711
3712    #[test]
3713    #[allow(deprecated)]
3714    fn test_session_read_only() {
3715        let db = GrafeoDB::new_in_memory();
3716        db.create_node(&["Person"]);
3717
3718        let session = db.session_read_only();
3719        // Read queries should work
3720        #[cfg(feature = "gql")]
3721        {
3722            let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3723            assert_eq!(result.rows.len(), 1);
3724        }
3725    }
3726
3727    // =========================================================================
3728    // close on in-memory database
3729    // =========================================================================
3730
3731    #[test]
3732    fn test_close_in_memory_database() {
3733        let db = GrafeoDB::new_in_memory();
3734        db.create_node(&["Person"]);
3735        assert!(db.close().is_ok());
3736        // Second close should also be fine (idempotent)
3737        assert!(db.close().is_ok());
3738    }
3739
3740    // =========================================================================
3741    // with_config validation failure
3742    // =========================================================================
3743
3744    #[test]
3745    fn test_with_config_invalid_config_zero_threads() {
3746        let config = Config::in_memory().with_threads(0);
3747        let result = GrafeoDB::with_config(config);
3748        assert!(result.is_err());
3749    }
3750
3751    #[test]
3752    fn test_with_config_invalid_config_zero_memory_limit() {
3753        let config = Config::in_memory().with_memory_limit(0);
3754        let result = GrafeoDB::with_config(config);
3755        assert!(result.is_err());
3756    }
3757
3758    // =========================================================================
3759    // StorageFormat display (for config.rs coverage)
3760    // =========================================================================
3761
3762    #[test]
3763    fn test_storage_format_display() {
3764        use crate::config::StorageFormat;
3765        assert_eq!(StorageFormat::Auto.to_string(), "auto");
3766        assert_eq!(StorageFormat::WalDirectory.to_string(), "wal-directory");
3767        assert_eq!(StorageFormat::SingleFile.to_string(), "single-file");
3768    }
3769
3770    #[test]
3771    fn test_storage_format_default() {
3772        use crate::config::StorageFormat;
3773        assert_eq!(StorageFormat::default(), StorageFormat::Auto);
3774    }
3775
3776    #[test]
3777    fn test_config_with_storage_format() {
3778        use crate::config::StorageFormat;
3779        let config = Config::in_memory().with_storage_format(StorageFormat::SingleFile);
3780        assert_eq!(config.storage_format, StorageFormat::SingleFile);
3781    }
3782
3783    // =========================================================================
3784    // Config CDC
3785    // =========================================================================
3786
3787    #[test]
3788    fn test_config_with_cdc() {
3789        let config = Config::in_memory().with_cdc();
3790        assert!(config.cdc_enabled);
3791    }
3792
3793    #[test]
3794    fn test_config_cdc_default_false() {
3795        let config = Config::in_memory();
3796        assert!(!config.cdc_enabled);
3797    }
3798
3799    // =========================================================================
3800    // ConfigError as std::error::Error
3801    // =========================================================================
3802
3803    #[test]
3804    fn test_config_error_is_error_trait() {
3805        use crate::config::ConfigError;
3806        let err: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);
3807        assert!(err.source().is_none());
3808    }
3809
3810    // =========================================================================
3811    // Metrics tests
3812    // =========================================================================
3813
3814    #[cfg(feature = "metrics")]
3815    #[test]
3816    fn test_metrics_prometheus_output() {
3817        let db = GrafeoDB::new_in_memory();
3818        let prom = db.metrics_prometheus();
3819        // Should contain at least some metric names
3820        assert!(!prom.is_empty());
3821    }
3822
3823    #[cfg(feature = "metrics")]
3824    #[test]
3825    fn test_reset_metrics() {
3826        let db = GrafeoDB::new_in_memory();
3827        // Execute something to generate metrics
3828        let _session = db.session();
3829        db.reset_metrics();
3830        let snap = db.metrics();
3831        assert_eq!(snap.query_count, 0);
3832    }
3833
3834    // =========================================================================
3835    // drop_graph on external store
3836    // =========================================================================
3837
3838    #[test]
3839    fn test_drop_graph_on_external_store() {
3840        use grafeo_core::graph::lpg::LpgStore;
3841
3842        let store = Arc::new(LpgStore::new().unwrap());
3843        let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
3844        let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3845
3846        // drop_graph with external store (no built-in store) returns false
3847        assert!(!db.drop_graph("anything"));
3848    }
3849}