Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22#[cfg(feature = "wal")]
23pub(crate) mod wal_store;
24
25#[cfg(feature = "wal")]
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::atomic::AtomicUsize;
29
30use parking_lot::RwLock;
31
32#[cfg(feature = "grafeo-file")]
33use grafeo_adapters::storage::file::GrafeoFileManager;
34#[cfg(feature = "wal")]
35use grafeo_adapters::storage::wal::{
36    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
37};
38use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
39use grafeo_common::utils::error::Result;
40use grafeo_core::graph::GraphStoreMut;
41use grafeo_core::graph::lpg::LpgStore;
42#[cfg(feature = "rdf")]
43use grafeo_core::graph::rdf::RdfStore;
44
45use crate::catalog::Catalog;
46use crate::config::Config;
47use crate::query::cache::QueryCache;
48use crate::session::Session;
49use crate::transaction::TransactionManager;
50
51/// Your handle to a Grafeo database.
52///
53/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
54/// quick experiments, or [`open()`](Self::open) for persistent storage.
55/// Then grab a [`session()`](Self::session) to start querying.
56///
57/// # Examples
58///
59/// ```
60/// use grafeo_engine::GrafeoDB;
61///
62/// // Quick in-memory database
63/// let db = GrafeoDB::new_in_memory();
64///
65/// // Add some data
66/// db.create_node(&["Person"]);
67///
68/// // Query it
69/// let session = db.session();
70/// let result = session.execute("MATCH (p:Person) RETURN p")?;
71/// # Ok::<(), grafeo_common::utils::error::Error>(())
72/// ```
73pub struct GrafeoDB {
74    /// Database configuration.
75    pub(super) config: Config,
76    /// The underlying graph store.
77    pub(super) store: Arc<LpgStore>,
78    /// Schema and metadata catalog shared across sessions.
79    pub(super) catalog: Arc<Catalog>,
80    /// RDF triple store (if RDF feature is enabled).
81    #[cfg(feature = "rdf")]
82    pub(super) rdf_store: Arc<RdfStore>,
83    /// Transaction manager.
84    pub(super) transaction_manager: Arc<TransactionManager>,
85    /// Unified buffer manager.
86    pub(super) buffer_manager: Arc<BufferManager>,
87    /// Write-ahead log manager (if durability is enabled).
88    #[cfg(feature = "wal")]
89    pub(super) wal: Option<Arc<LpgWal>>,
90    /// Shared WAL graph context tracker. Tracks which named graph was last
91    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
92    /// records only when the context actually changes.
93    #[cfg(feature = "wal")]
94    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
95    /// Query cache for parsed and optimized plans.
96    pub(super) query_cache: Arc<QueryCache>,
97    /// Shared commit counter for auto-GC across sessions.
98    pub(super) commit_counter: Arc<AtomicUsize>,
99    /// Whether the database is open.
100    pub(super) is_open: RwLock<bool>,
101    /// Change data capture log for tracking mutations.
102    #[cfg(feature = "cdc")]
103    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
104    /// Registered embedding models for text-to-vector conversion.
105    #[cfg(feature = "embed")]
106    pub(super) embedding_models:
107        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
108    /// Single-file database manager (when using `.grafeo` format).
109    #[cfg(feature = "grafeo-file")]
110    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
111    /// External graph store (when using with_store()).
112    /// When set, sessions route queries through this store instead of the built-in LpgStore.
113    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
114    /// Persistent graph context for one-shot `execute()` calls.
115    /// When set, each call to `session()` pre-configures the session to this graph.
116    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
117    current_graph: RwLock<Option<String>>,
118}
119
120impl GrafeoDB {
121    /// Creates an in-memory database, fast to create, gone when dropped.
122    ///
123    /// Use this for tests, experiments, or when you don't need persistence.
124    /// For data that survives restarts, use [`open()`](Self::open) instead.
125    ///
126    /// # Panics
127    ///
128    /// Panics if the internal arena allocator cannot be initialized (out of memory).
129    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
130    ///
131    /// # Examples
132    ///
133    /// ```
134    /// use grafeo_engine::GrafeoDB;
135    ///
136    /// let db = GrafeoDB::new_in_memory();
137    /// let session = db.session();
138    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
139    /// # Ok::<(), grafeo_common::utils::error::Error>(())
140    /// ```
141    #[must_use]
142    pub fn new_in_memory() -> Self {
143        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
144    }
145
146    /// Opens a database at the given path, creating it if it doesn't exist.
147    ///
148    /// If you've used this path before, Grafeo recovers your data from the
149    /// write-ahead log automatically. First open on a new path creates an
150    /// empty database.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the path isn't writable or recovery fails.
155    ///
156    /// # Examples
157    ///
158    /// ```no_run
159    /// use grafeo_engine::GrafeoDB;
160    ///
161    /// let db = GrafeoDB::open("./my_social_network")?;
162    /// # Ok::<(), grafeo_common::utils::error::Error>(())
163    /// ```
164    #[cfg(feature = "wal")]
165    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
166        Self::with_config(Config::persistent(path.as_ref()))
167    }
168
169    /// Creates a database with custom configuration.
170    ///
171    /// Use this when you need fine-grained control over memory limits,
172    /// thread counts, or persistence settings. For most cases,
173    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
174    /// are simpler.
175    ///
176    /// # Errors
177    ///
178    /// Returns an error if the database can't be created or recovery fails.
179    ///
180    /// # Examples
181    ///
182    /// ```
183    /// use grafeo_engine::{GrafeoDB, Config};
184    ///
185    /// // In-memory with a 512MB limit
186    /// let config = Config::in_memory()
187    ///     .with_memory_limit(512 * 1024 * 1024);
188    ///
189    /// let db = GrafeoDB::with_config(config)?;
190    /// # Ok::<(), grafeo_common::utils::error::Error>(())
191    /// ```
192    pub fn with_config(config: Config) -> Result<Self> {
193        // Validate configuration before proceeding
194        config
195            .validate()
196            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
197
198        let store = Arc::new(LpgStore::new()?);
199        #[cfg(feature = "rdf")]
200        let rdf_store = Arc::new(RdfStore::new());
201        let transaction_manager = Arc::new(TransactionManager::new());
202
203        // Create buffer manager with configured limits
204        let buffer_config = BufferManagerConfig {
205            budget: config.memory_limit.unwrap_or_else(|| {
206                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
207            }),
208            spill_path: config
209                .spill_path
210                .clone()
211                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
212            ..BufferManagerConfig::default()
213        };
214        let buffer_manager = BufferManager::new(buffer_config);
215
216        // Create catalog early so WAL replay can restore schema definitions
217        let catalog = Arc::new(Catalog::new());
218
219        // --- Single-file format (.grafeo) ---
220        #[cfg(feature = "grafeo-file")]
221        let file_manager: Option<Arc<GrafeoFileManager>> = if config.wal_enabled {
222            if let Some(ref db_path) = config.path {
223                if Self::should_use_single_file(db_path, config.storage_format) {
224                    let fm = if db_path.exists() && db_path.is_file() {
225                        GrafeoFileManager::open(db_path)?
226                    } else if !db_path.exists() {
227                        GrafeoFileManager::create(db_path)?
228                    } else {
229                        // Path exists but is not a file (directory, etc.)
230                        return Err(grafeo_common::utils::error::Error::Internal(format!(
231                            "path exists but is not a file: {}",
232                            db_path.display()
233                        )));
234                    };
235
236                    // Load snapshot data from the file
237                    let snapshot_data = fm.read_snapshot()?;
238                    if !snapshot_data.is_empty() {
239                        Self::apply_snapshot_data(
240                            &store,
241                            &catalog,
242                            #[cfg(feature = "rdf")]
243                            &rdf_store,
244                            &snapshot_data,
245                        )?;
246                    }
247
248                    // Recover sidecar WAL if present
249                    if fm.has_sidecar_wal() {
250                        let recovery = WalRecovery::new(fm.sidecar_wal_path());
251                        let records = recovery.recover()?;
252                        Self::apply_wal_records(
253                            &store,
254                            &catalog,
255                            #[cfg(feature = "rdf")]
256                            &rdf_store,
257                            &records,
258                        )?;
259                    }
260
261                    Some(Arc::new(fm))
262                } else {
263                    None
264                }
265            } else {
266                None
267            }
268        } else {
269            None
270        };
271
272        // Determine whether to use the WAL directory path (legacy) or sidecar
273        #[cfg(feature = "wal")]
274        let wal = if config.wal_enabled {
275            if let Some(ref db_path) = config.path {
276                // When using single-file format, the WAL is a sidecar directory
277                #[cfg(feature = "grafeo-file")]
278                let wal_path = if let Some(ref fm) = file_manager {
279                    let p = fm.sidecar_wal_path();
280                    std::fs::create_dir_all(&p)?;
281                    p
282                } else {
283                    // Legacy: WAL inside the database directory
284                    std::fs::create_dir_all(db_path)?;
285                    db_path.join("wal")
286                };
287
288                #[cfg(not(feature = "grafeo-file"))]
289                let wal_path = {
290                    std::fs::create_dir_all(db_path)?;
291                    db_path.join("wal")
292                };
293
294                // For legacy WAL directory format, check if WAL exists and recover
295                #[cfg(feature = "grafeo-file")]
296                let is_single_file = file_manager.is_some();
297                #[cfg(not(feature = "grafeo-file"))]
298                let is_single_file = false;
299
300                if !is_single_file && wal_path.exists() {
301                    let recovery = WalRecovery::new(&wal_path);
302                    let records = recovery.recover()?;
303                    Self::apply_wal_records(
304                        &store,
305                        &catalog,
306                        #[cfg(feature = "rdf")]
307                        &rdf_store,
308                        &records,
309                    )?;
310                }
311
312                // Open/create WAL manager with configured durability
313                let wal_durability = match config.wal_durability {
314                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
315                    crate::config::DurabilityMode::Batch {
316                        max_delay_ms,
317                        max_records,
318                    } => WalDurabilityMode::Batch {
319                        max_delay_ms,
320                        max_records,
321                    },
322                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
323                        WalDurabilityMode::Adaptive { target_interval_ms }
324                    }
325                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
326                };
327                let wal_config = WalConfig {
328                    durability: wal_durability,
329                    ..WalConfig::default()
330                };
331                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
332                Some(Arc::new(wal_manager))
333            } else {
334                None
335            }
336        } else {
337            None
338        };
339
340        // Create query cache with default capacity (1000 queries)
341        let query_cache = Arc::new(QueryCache::default());
342
343        Ok(Self {
344            config,
345            store,
346            catalog,
347            #[cfg(feature = "rdf")]
348            rdf_store,
349            transaction_manager,
350            buffer_manager,
351            #[cfg(feature = "wal")]
352            wal,
353            #[cfg(feature = "wal")]
354            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
355            query_cache,
356            commit_counter: Arc::new(AtomicUsize::new(0)),
357            is_open: RwLock::new(true),
358            #[cfg(feature = "cdc")]
359            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
360            #[cfg(feature = "embed")]
361            embedding_models: RwLock::new(hashbrown::HashMap::new()),
362            #[cfg(feature = "grafeo-file")]
363            file_manager,
364            external_store: None,
365            current_graph: RwLock::new(None),
366        })
367    }
368
369    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
370    ///
371    /// The external store handles all data persistence. WAL, CDC, and index
372    /// management are the responsibility of the store implementation.
373    ///
374    /// Query execution (all 6 languages, optimizer, planner) works through the
375    /// provided store. Admin operations (schema introspection, persistence,
376    /// vector/text indexes) are not available on external stores.
377    ///
378    /// # Examples
379    ///
380    /// ```no_run
381    /// use std::sync::Arc;
382    /// use grafeo_engine::{GrafeoDB, Config};
383    /// use grafeo_core::graph::GraphStoreMut;
384    ///
385    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
386    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
387    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
388    ///     Ok(())
389    /// }
390    /// ```
391    ///
392    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
393    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
394        config
395            .validate()
396            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
397
398        let dummy_store = Arc::new(LpgStore::new()?);
399        let transaction_manager = Arc::new(TransactionManager::new());
400
401        let buffer_config = BufferManagerConfig {
402            budget: config.memory_limit.unwrap_or_else(|| {
403                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
404            }),
405            spill_path: None,
406            ..BufferManagerConfig::default()
407        };
408        let buffer_manager = BufferManager::new(buffer_config);
409
410        let query_cache = Arc::new(QueryCache::default());
411
412        Ok(Self {
413            config,
414            store: dummy_store,
415            catalog: Arc::new(Catalog::new()),
416            #[cfg(feature = "rdf")]
417            rdf_store: Arc::new(RdfStore::new()),
418            transaction_manager,
419            buffer_manager,
420            #[cfg(feature = "wal")]
421            wal: None,
422            #[cfg(feature = "wal")]
423            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
424            query_cache,
425            commit_counter: Arc::new(AtomicUsize::new(0)),
426            is_open: RwLock::new(true),
427            #[cfg(feature = "cdc")]
428            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
429            #[cfg(feature = "embed")]
430            embedding_models: RwLock::new(hashbrown::HashMap::new()),
431            #[cfg(feature = "grafeo-file")]
432            file_manager: None,
433            external_store: Some(store),
434            current_graph: RwLock::new(None),
435        })
436    }
437
438    /// Applies WAL records to restore the database state.
439    ///
440    /// Data mutation records are routed through a graph cursor that tracks
441    /// `SwitchGraph` context markers, replaying mutations into the correct
442    /// named graph (or the default graph when cursor is `None`).
443    #[cfg(feature = "wal")]
444    fn apply_wal_records(
445        store: &Arc<LpgStore>,
446        catalog: &Catalog,
447        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
448        records: &[WalRecord],
449    ) -> Result<()> {
450        use crate::catalog::{
451            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
452        };
453        use grafeo_common::utils::error::Error;
454
455        // Graph cursor: tracks which named graph receives data mutations.
456        // `None` means the default graph.
457        let mut current_graph: Option<String> = None;
458        let mut target_store: Arc<LpgStore> = Arc::clone(store);
459
460        for record in records {
461            match record {
462                // --- Named graph lifecycle ---
463                WalRecord::CreateNamedGraph { name } => {
464                    let _ = store.create_graph(name);
465                }
466                WalRecord::DropNamedGraph { name } => {
467                    store.drop_graph(name);
468                    // Reset cursor if the dropped graph was active
469                    if current_graph.as_deref() == Some(name.as_str()) {
470                        current_graph = None;
471                        target_store = Arc::clone(store);
472                    }
473                }
474                WalRecord::SwitchGraph { name } => {
475                    current_graph.clone_from(name);
476                    target_store = match &current_graph {
477                        None => Arc::clone(store),
478                        Some(graph_name) => store
479                            .graph_or_create(graph_name)
480                            .map_err(|e| Error::Internal(e.to_string()))?,
481                    };
482                }
483
484                // --- Data mutations: routed through target_store ---
485                WalRecord::CreateNode { id, labels } => {
486                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
487                    target_store.create_node_with_id(*id, &label_refs)?;
488                }
489                WalRecord::DeleteNode { id } => {
490                    target_store.delete_node(*id);
491                }
492                WalRecord::CreateEdge {
493                    id,
494                    src,
495                    dst,
496                    edge_type,
497                } => {
498                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
499                }
500                WalRecord::DeleteEdge { id } => {
501                    target_store.delete_edge(*id);
502                }
503                WalRecord::SetNodeProperty { id, key, value } => {
504                    target_store.set_node_property(*id, key, value.clone());
505                }
506                WalRecord::SetEdgeProperty { id, key, value } => {
507                    target_store.set_edge_property(*id, key, value.clone());
508                }
509                WalRecord::AddNodeLabel { id, label } => {
510                    target_store.add_label(*id, label);
511                }
512                WalRecord::RemoveNodeLabel { id, label } => {
513                    target_store.remove_label(*id, label);
514                }
515                WalRecord::RemoveNodeProperty { id, key } => {
516                    target_store.remove_node_property(*id, key);
517                }
518                WalRecord::RemoveEdgeProperty { id, key } => {
519                    target_store.remove_edge_property(*id, key);
520                }
521
522                // --- Schema DDL replay (always on root catalog) ---
523                WalRecord::CreateNodeType {
524                    name,
525                    properties,
526                    constraints,
527                } => {
528                    let def = NodeTypeDefinition {
529                        name: name.clone(),
530                        properties: properties
531                            .iter()
532                            .map(|(n, t, nullable)| TypedProperty {
533                                name: n.clone(),
534                                data_type: PropertyDataType::from_type_name(t),
535                                nullable: *nullable,
536                                default_value: None,
537                            })
538                            .collect(),
539                        constraints: constraints
540                            .iter()
541                            .map(|(kind, props)| match kind.as_str() {
542                                "unique" => TypeConstraint::Unique(props.clone()),
543                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
544                                "not_null" if !props.is_empty() => {
545                                    TypeConstraint::NotNull(props[0].clone())
546                                }
547                                _ => TypeConstraint::Unique(props.clone()),
548                            })
549                            .collect(),
550                        parent_types: Vec::new(),
551                    };
552                    let _ = catalog.register_node_type(def);
553                }
554                WalRecord::DropNodeType { name } => {
555                    let _ = catalog.drop_node_type(name);
556                }
557                WalRecord::CreateEdgeType {
558                    name,
559                    properties,
560                    constraints,
561                } => {
562                    let def = EdgeTypeDefinition {
563                        name: name.clone(),
564                        properties: properties
565                            .iter()
566                            .map(|(n, t, nullable)| TypedProperty {
567                                name: n.clone(),
568                                data_type: PropertyDataType::from_type_name(t),
569                                nullable: *nullable,
570                                default_value: None,
571                            })
572                            .collect(),
573                        constraints: constraints
574                            .iter()
575                            .map(|(kind, props)| match kind.as_str() {
576                                "unique" => TypeConstraint::Unique(props.clone()),
577                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
578                                "not_null" if !props.is_empty() => {
579                                    TypeConstraint::NotNull(props[0].clone())
580                                }
581                                _ => TypeConstraint::Unique(props.clone()),
582                            })
583                            .collect(),
584                        source_node_types: Vec::new(),
585                        target_node_types: Vec::new(),
586                    };
587                    let _ = catalog.register_edge_type_def(def);
588                }
589                WalRecord::DropEdgeType { name } => {
590                    let _ = catalog.drop_edge_type_def(name);
591                }
592                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
593                    // Index recreation is handled by the store on startup
594                    // (indexes are rebuilt from data, not WAL)
595                }
596                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
597                    // Constraint definitions are part of type definitions
598                    // and replayed via CreateNodeType/CreateEdgeType
599                }
600                WalRecord::CreateGraphType {
601                    name,
602                    node_types,
603                    edge_types,
604                    open,
605                } => {
606                    use crate::catalog::GraphTypeDefinition;
607                    let def = GraphTypeDefinition {
608                        name: name.clone(),
609                        allowed_node_types: node_types.clone(),
610                        allowed_edge_types: edge_types.clone(),
611                        open: *open,
612                    };
613                    let _ = catalog.register_graph_type(def);
614                }
615                WalRecord::DropGraphType { name } => {
616                    let _ = catalog.drop_graph_type(name);
617                }
618                WalRecord::CreateSchema { name } => {
619                    let _ = catalog.register_schema_namespace(name.clone());
620                }
621                WalRecord::DropSchema { name } => {
622                    let _ = catalog.drop_schema_namespace(name);
623                }
624
625                WalRecord::AlterNodeType { name, alterations } => {
626                    for (action, prop_name, type_name, nullable) in alterations {
627                        match action.as_str() {
628                            "add" => {
629                                let prop = TypedProperty {
630                                    name: prop_name.clone(),
631                                    data_type: PropertyDataType::from_type_name(type_name),
632                                    nullable: *nullable,
633                                    default_value: None,
634                                };
635                                let _ = catalog.alter_node_type_add_property(name, prop);
636                            }
637                            "drop" => {
638                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
639                            }
640                            _ => {}
641                        }
642                    }
643                }
644                WalRecord::AlterEdgeType { name, alterations } => {
645                    for (action, prop_name, type_name, nullable) in alterations {
646                        match action.as_str() {
647                            "add" => {
648                                let prop = TypedProperty {
649                                    name: prop_name.clone(),
650                                    data_type: PropertyDataType::from_type_name(type_name),
651                                    nullable: *nullable,
652                                    default_value: None,
653                                };
654                                let _ = catalog.alter_edge_type_add_property(name, prop);
655                            }
656                            "drop" => {
657                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
658                            }
659                            _ => {}
660                        }
661                    }
662                }
663                WalRecord::AlterGraphType { name, alterations } => {
664                    for (action, type_name) in alterations {
665                        match action.as_str() {
666                            "add_node" => {
667                                let _ =
668                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
669                            }
670                            "drop_node" => {
671                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
672                            }
673                            "add_edge" => {
674                                let _ =
675                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
676                            }
677                            "drop_edge" => {
678                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
679                            }
680                            _ => {}
681                        }
682                    }
683                }
684
685                WalRecord::CreateProcedure {
686                    name,
687                    params,
688                    returns,
689                    body,
690                } => {
691                    use crate::catalog::ProcedureDefinition;
692                    let def = ProcedureDefinition {
693                        name: name.clone(),
694                        params: params.clone(),
695                        returns: returns.clone(),
696                        body: body.clone(),
697                    };
698                    let _ = catalog.register_procedure(def);
699                }
700                WalRecord::DropProcedure { name } => {
701                    let _ = catalog.drop_procedure(name);
702                }
703
704                // --- RDF triple replay ---
705                #[cfg(feature = "rdf")]
706                WalRecord::InsertRdfTriple {
707                    subject,
708                    predicate,
709                    object,
710                    graph,
711                } => {
712                    use grafeo_core::graph::rdf::Term;
713                    if let (Some(s), Some(p), Some(o)) = (
714                        Term::from_ntriples(subject),
715                        Term::from_ntriples(predicate),
716                        Term::from_ntriples(object),
717                    ) {
718                        let triple = grafeo_core::graph::rdf::Triple::new(s, p, o);
719                        let target = match graph {
720                            Some(name) => rdf_store.graph_or_create(name),
721                            None => Arc::clone(rdf_store),
722                        };
723                        target.insert(triple);
724                    }
725                }
726                #[cfg(feature = "rdf")]
727                WalRecord::DeleteRdfTriple {
728                    subject,
729                    predicate,
730                    object,
731                    graph,
732                } => {
733                    use grafeo_core::graph::rdf::Term;
734                    if let (Some(s), Some(p), Some(o)) = (
735                        Term::from_ntriples(subject),
736                        Term::from_ntriples(predicate),
737                        Term::from_ntriples(object),
738                    ) {
739                        let triple = grafeo_core::graph::rdf::Triple::new(s, p, o);
740                        let target = match graph {
741                            Some(name) => rdf_store.graph_or_create(name),
742                            None => Arc::clone(rdf_store),
743                        };
744                        target.remove(&triple);
745                    }
746                }
747                #[cfg(feature = "rdf")]
748                WalRecord::ClearRdfGraph { graph } => {
749                    rdf_store.clear_graph(graph.as_deref());
750                }
751                #[cfg(feature = "rdf")]
752                WalRecord::CreateRdfGraph { name } => {
753                    let _ = rdf_store.create_graph(name);
754                }
755                #[cfg(feature = "rdf")]
756                WalRecord::DropRdfGraph { name } => match name {
757                    None => rdf_store.clear(),
758                    Some(graph_name) => {
759                        rdf_store.drop_graph(graph_name);
760                    }
761                },
762                // Skip RDF records when rdf feature is disabled
763                #[cfg(not(feature = "rdf"))]
764                WalRecord::InsertRdfTriple { .. }
765                | WalRecord::DeleteRdfTriple { .. }
766                | WalRecord::ClearRdfGraph { .. }
767                | WalRecord::CreateRdfGraph { .. }
768                | WalRecord::DropRdfGraph { .. } => {}
769
770                WalRecord::TransactionCommit { .. }
771                | WalRecord::TransactionAbort { .. }
772                | WalRecord::Checkpoint { .. } => {
773                    // Transaction control records don't need replay action
774                    // (recovery already filtered to only committed transactions)
775                }
776            }
777        }
778        Ok(())
779    }
780
781    // =========================================================================
782    // Single-file format helpers
783    // =========================================================================
784
785    /// Returns `true` if the given path should use single-file format.
786    #[cfg(feature = "grafeo-file")]
787    fn should_use_single_file(
788        path: &std::path::Path,
789        configured: crate::config::StorageFormat,
790    ) -> bool {
791        use crate::config::StorageFormat;
792        match configured {
793            StorageFormat::SingleFile => true,
794            StorageFormat::WalDirectory => false,
795            StorageFormat::Auto => {
796                // Existing file: check magic bytes
797                if path.is_file() {
798                    if let Ok(mut f) = std::fs::File::open(path) {
799                        use std::io::Read;
800                        let mut magic = [0u8; 4];
801                        if f.read_exact(&mut magic).is_ok()
802                            && magic == grafeo_adapters::storage::file::MAGIC
803                        {
804                            return true;
805                        }
806                    }
807                    return false;
808                }
809                // Existing directory: legacy format
810                if path.is_dir() {
811                    return false;
812                }
813                // New path: check extension
814                path.extension().is_some_and(|ext| ext == "grafeo")
815            }
816        }
817    }
818
819    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
820    #[cfg(feature = "grafeo-file")]
821    fn apply_snapshot_data(
822        store: &Arc<LpgStore>,
823        catalog: &Arc<crate::catalog::Catalog>,
824        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
825        data: &[u8],
826    ) -> Result<()> {
827        persistence::load_snapshot_into_store(
828            store,
829            catalog,
830            #[cfg(feature = "rdf")]
831            rdf_store,
832            data,
833        )
834    }
835
836    // =========================================================================
837    // Session & Configuration
838    // =========================================================================
839
840    /// Opens a new session for running queries.
841    ///
842    /// Sessions are cheap to create: spin up as many as you need. Each
843    /// gets its own transaction context, so concurrent sessions won't
844    /// block each other on reads.
845    ///
846    /// # Panics
847    ///
848    /// Panics if the database was configured with an external graph store and
849    /// the internal arena allocator cannot be initialized (out of memory).
850    ///
851    /// # Examples
852    ///
853    /// ```
854    /// use grafeo_engine::GrafeoDB;
855    ///
856    /// let db = GrafeoDB::new_in_memory();
857    /// let session = db.session();
858    ///
859    /// // Run queries through the session
860    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
861    /// # Ok::<(), grafeo_common::utils::error::Error>(())
862    /// ```
863    #[must_use]
864    pub fn session(&self) -> Session {
865        let session_cfg = || crate::session::SessionConfig {
866            transaction_manager: Arc::clone(&self.transaction_manager),
867            query_cache: Arc::clone(&self.query_cache),
868            catalog: Arc::clone(&self.catalog),
869            adaptive_config: self.config.adaptive.clone(),
870            factorized_execution: self.config.factorized_execution,
871            graph_model: self.config.graph_model,
872            query_timeout: self.config.query_timeout,
873            commit_counter: Arc::clone(&self.commit_counter),
874            gc_interval: self.config.gc_interval,
875        };
876
877        if let Some(ref ext_store) = self.external_store {
878            return Session::with_external_store(Arc::clone(ext_store), session_cfg())
879                .expect("arena allocation for external store session");
880        }
881
882        #[cfg(feature = "rdf")]
883        let mut session = Session::with_rdf_store_and_adaptive(
884            Arc::clone(&self.store),
885            Arc::clone(&self.rdf_store),
886            session_cfg(),
887        );
888        #[cfg(not(feature = "rdf"))]
889        let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
890
891        #[cfg(feature = "wal")]
892        if let Some(ref wal) = self.wal {
893            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
894        }
895
896        #[cfg(feature = "cdc")]
897        session.set_cdc_log(Arc::clone(&self.cdc_log));
898
899        // Propagate persistent graph context to the new session
900        if let Some(ref graph) = *self.current_graph.read() {
901            session.use_graph(graph);
902        }
903
904        // Suppress unused_mut when cdc/wal are disabled
905        let _ = &mut session;
906
907        session
908    }
909
910    /// Returns the current graph name, if any.
911    ///
912    /// This is the persistent graph context used by one-shot `execute()` calls.
913    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
914    /// or `SESSION RESET`.
915    #[must_use]
916    pub fn current_graph(&self) -> Option<String> {
917        self.current_graph.read().clone()
918    }
919
920    /// Sets the current graph context for subsequent one-shot `execute()` calls.
921    ///
922    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
923    /// Pass `None` to reset to the default graph.
924    pub fn set_current_graph(&self, name: Option<&str>) {
925        *self.current_graph.write() = name.map(ToString::to_string);
926    }
927
928    /// Returns the adaptive execution configuration.
929    #[must_use]
930    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
931        &self.config.adaptive
932    }
933
934    /// Returns the configuration.
935    #[must_use]
936    pub fn config(&self) -> &Config {
937        &self.config
938    }
939
940    /// Returns the graph data model of this database.
941    #[must_use]
942    pub fn graph_model(&self) -> crate::config::GraphModel {
943        self.config.graph_model
944    }
945
946    /// Returns the configured memory limit in bytes, if any.
947    #[must_use]
948    pub fn memory_limit(&self) -> Option<usize> {
949        self.config.memory_limit
950    }
951
952    /// Returns the underlying (default) store.
953    ///
954    /// This provides direct access to the LPG store for algorithm implementations
955    /// and admin operations (index management, schema introspection, MVCC internals).
956    ///
957    /// For code that only needs read/write graph operations, prefer
958    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
959    #[must_use]
960    pub fn store(&self) -> &Arc<LpgStore> {
961        &self.store
962    }
963
964    /// Returns the LPG store for the currently active graph.
965    ///
966    /// If [`current_graph`](Self::current_graph) is `None` or `"default"`, returns
967    /// the default store. Otherwise looks up the named graph in the root store.
968    /// Falls back to the default store if the named graph does not exist.
969    #[allow(dead_code)] // Reserved for future graph-aware CRUD methods
970    fn active_store(&self) -> Arc<LpgStore> {
971        let graph_name = self.current_graph.read().clone();
972        match graph_name {
973            None => Arc::clone(&self.store),
974            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
975            Some(ref name) => self
976                .store
977                .graph(name)
978                .unwrap_or_else(|| Arc::clone(&self.store)),
979        }
980    }
981
982    // === Named Graph Management ===
983
984    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
985    ///
986    /// # Errors
987    ///
988    /// Returns an error if arena allocation fails.
989    pub fn create_graph(&self, name: &str) -> Result<bool> {
990        Ok(self.store.create_graph(name)?)
991    }
992
993    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
994    pub fn drop_graph(&self, name: &str) -> bool {
995        self.store.drop_graph(name)
996    }
997
998    /// Returns all named graph names.
999    #[must_use]
1000    pub fn list_graphs(&self) -> Vec<String> {
1001        self.store.graph_names()
1002    }
1003
1004    /// Returns the graph store as a trait object.
1005    ///
1006    /// This provides the [`GraphStoreMut`] interface for code that should work
1007    /// with any storage backend. Use this when you only need graph read/write
1008    /// operations and don't need admin methods like index management.
1009    ///
1010    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
1011    #[must_use]
1012    pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1013        if let Some(ref ext_store) = self.external_store {
1014            Arc::clone(ext_store)
1015        } else {
1016            Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1017        }
1018    }
1019
1020    /// Garbage collects old MVCC versions that are no longer visible.
1021    ///
1022    /// Determines the minimum epoch required by active transactions and prunes
1023    /// version chains older than that threshold. Also cleans up completed
1024    /// transaction metadata in the transaction manager.
1025    pub fn gc(&self) {
1026        let min_epoch = self.transaction_manager.min_active_epoch();
1027        self.store.gc_versions(min_epoch);
1028        self.transaction_manager.gc();
1029    }
1030
1031    /// Returns the buffer manager for memory-aware operations.
1032    #[must_use]
1033    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1034        &self.buffer_manager
1035    }
1036
1037    /// Returns the query cache.
1038    #[must_use]
1039    pub fn query_cache(&self) -> &Arc<QueryCache> {
1040        &self.query_cache
1041    }
1042
1043    /// Clears all cached query plans.
1044    ///
1045    /// This is called automatically after DDL operations, but can also be
1046    /// invoked manually after external schema changes (e.g., WAL replay,
1047    /// import) or when you want to force re-optimization of all queries.
1048    pub fn clear_plan_cache(&self) {
1049        self.query_cache.clear();
1050    }
1051
1052    // =========================================================================
1053    // Lifecycle
1054    // =========================================================================
1055
1056    /// Closes the database, flushing all pending writes.
1057    ///
1058    /// For persistent databases, this ensures everything is safely on disk.
1059    /// Called automatically when the database is dropped, but you can call
1060    /// it explicitly if you need to guarantee durability at a specific point.
1061    ///
1062    /// # Errors
1063    ///
1064    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
1065    pub fn close(&self) -> Result<()> {
1066        let mut is_open = self.is_open.write();
1067        if !*is_open {
1068            return Ok(());
1069        }
1070
1071        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
1072        // We must do this BEFORE the WAL close path because checkpoint_to_file
1073        // removes the sidecar WAL directory.
1074        #[cfg(feature = "grafeo-file")]
1075        let is_single_file = self.file_manager.is_some();
1076        #[cfg(not(feature = "grafeo-file"))]
1077        let is_single_file = false;
1078
1079        #[cfg(feature = "grafeo-file")]
1080        if let Some(ref fm) = self.file_manager {
1081            // Flush WAL first so all records are on disk before we snapshot
1082            #[cfg(feature = "wal")]
1083            if let Some(ref wal) = self.wal {
1084                wal.sync()?;
1085            }
1086            self.checkpoint_to_file(fm)?;
1087
1088            // Release WAL file handles before removing sidecar directory.
1089            // On Windows, open handles prevent directory deletion.
1090            #[cfg(feature = "wal")]
1091            if let Some(ref wal) = self.wal {
1092                wal.close_active_log();
1093            }
1094
1095            fm.remove_sidecar_wal()?;
1096            fm.close()?;
1097        }
1098
1099        // Commit and checkpoint WAL (legacy directory format only)
1100        #[cfg(feature = "wal")]
1101        if !is_single_file && let Some(ref wal) = self.wal {
1102            let epoch = self.store.current_epoch();
1103
1104            // Use the last assigned transaction ID, or create a checkpoint-only tx
1105            let checkpoint_tx = self
1106                .transaction_manager
1107                .last_assigned_transaction_id()
1108                .unwrap_or_else(|| {
1109                    // No transactions have been started; begin one for checkpoint
1110                    self.transaction_manager.begin()
1111                });
1112
1113            // Log a TransactionCommit to mark all pending records as committed
1114            wal.log(&WalRecord::TransactionCommit {
1115                transaction_id: checkpoint_tx,
1116            })?;
1117
1118            // Then checkpoint
1119            wal.checkpoint(checkpoint_tx, epoch)?;
1120            wal.sync()?;
1121        }
1122
1123        *is_open = false;
1124        Ok(())
1125    }
1126
1127    /// Returns the typed WAL if available.
1128    #[cfg(feature = "wal")]
1129    #[must_use]
1130    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1131        self.wal.as_ref()
1132    }
1133
1134    /// Logs a WAL record if WAL is enabled.
1135    #[cfg(feature = "wal")]
1136    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1137        if let Some(ref wal) = self.wal {
1138            wal.log(record)?;
1139        }
1140        Ok(())
1141    }
1142
1143    /// Writes the current database snapshot to the `.grafeo` file.
1144    ///
1145    /// Does NOT remove the sidecar WAL: callers that want to clean up
1146    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
1147    /// separately after this returns.
1148    #[cfg(feature = "grafeo-file")]
1149    fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1150        use grafeo_core::testing::crash::maybe_crash;
1151
1152        maybe_crash("checkpoint_to_file:before_export");
1153        let snapshot_data = self.export_snapshot()?;
1154        maybe_crash("checkpoint_to_file:after_export");
1155
1156        let epoch = self.store.current_epoch();
1157        let transaction_id = self
1158            .transaction_manager
1159            .last_assigned_transaction_id()
1160            .map_or(0, |t| t.0);
1161        let node_count = self.store.node_count() as u64;
1162        let edge_count = self.store.edge_count() as u64;
1163
1164        fm.write_snapshot(
1165            &snapshot_data,
1166            epoch.0,
1167            transaction_id,
1168            node_count,
1169            edge_count,
1170        )?;
1171
1172        maybe_crash("checkpoint_to_file:after_write_snapshot");
1173        Ok(())
1174    }
1175
1176    /// Returns the file manager if using single-file format.
1177    #[cfg(feature = "grafeo-file")]
1178    #[must_use]
1179    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
1180        self.file_manager.as_ref()
1181    }
1182}
1183
1184impl Drop for GrafeoDB {
1185    fn drop(&mut self) {
1186        if let Err(e) = self.close() {
1187            tracing::error!("Error closing database: {}", e);
1188        }
1189    }
1190}
1191
1192impl crate::admin::AdminService for GrafeoDB {
1193    fn info(&self) -> crate::admin::DatabaseInfo {
1194        self.info()
1195    }
1196
1197    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1198        self.detailed_stats()
1199    }
1200
1201    fn schema(&self) -> crate::admin::SchemaInfo {
1202        self.schema()
1203    }
1204
1205    fn validate(&self) -> crate::admin::ValidationResult {
1206        self.validate()
1207    }
1208
1209    fn wal_status(&self) -> crate::admin::WalStatus {
1210        self.wal_status()
1211    }
1212
1213    fn wal_checkpoint(&self) -> Result<()> {
1214        self.wal_checkpoint()
1215    }
1216}
1217
1218// =========================================================================
1219// Query Result Types
1220// =========================================================================
1221
1222/// The result of running a query.
1223///
1224/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
1225/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
1226///
1227/// # Examples
1228///
1229/// ```
1230/// use grafeo_engine::GrafeoDB;
1231///
1232/// let db = GrafeoDB::new_in_memory();
1233/// db.create_node(&["Person"]);
1234///
1235/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
1236///
1237/// // Check what we got
1238/// println!("Columns: {:?}", result.columns);
1239/// println!("Rows: {}", result.row_count());
1240///
1241/// // Iterate through results
1242/// for row in result.iter() {
1243///     println!("{:?}", row);
1244/// }
1245/// # Ok::<(), grafeo_common::utils::error::Error>(())
1246/// ```
1247#[derive(Debug)]
1248pub struct QueryResult {
1249    /// Column names from the RETURN clause.
1250    pub columns: Vec<String>,
1251    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
1252    pub column_types: Vec<grafeo_common::types::LogicalType>,
1253    /// The actual result rows.
1254    pub rows: Vec<Vec<grafeo_common::types::Value>>,
1255    /// Query execution time in milliseconds (if timing was enabled).
1256    pub execution_time_ms: Option<f64>,
1257    /// Number of rows scanned during query execution (estimate).
1258    pub rows_scanned: Option<u64>,
1259    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
1260    pub status_message: Option<String>,
1261    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
1262    pub gql_status: grafeo_common::utils::GqlStatus,
1263}
1264
1265impl QueryResult {
1266    /// Creates a fully empty query result (no columns, no rows).
1267    #[must_use]
1268    pub fn empty() -> Self {
1269        Self {
1270            columns: Vec::new(),
1271            column_types: Vec::new(),
1272            rows: Vec::new(),
1273            execution_time_ms: None,
1274            rows_scanned: None,
1275            status_message: None,
1276            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1277        }
1278    }
1279
1280    /// Creates a query result with only a status message (for DDL commands).
1281    #[must_use]
1282    pub fn status(msg: impl Into<String>) -> Self {
1283        Self {
1284            columns: Vec::new(),
1285            column_types: Vec::new(),
1286            rows: Vec::new(),
1287            execution_time_ms: None,
1288            rows_scanned: None,
1289            status_message: Some(msg.into()),
1290            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1291        }
1292    }
1293
1294    /// Creates a new empty query result.
1295    #[must_use]
1296    pub fn new(columns: Vec<String>) -> Self {
1297        let len = columns.len();
1298        Self {
1299            columns,
1300            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1301            rows: Vec::new(),
1302            execution_time_ms: None,
1303            rows_scanned: None,
1304            status_message: None,
1305            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1306        }
1307    }
1308
1309    /// Creates a new empty query result with column types.
1310    #[must_use]
1311    pub fn with_types(
1312        columns: Vec<String>,
1313        column_types: Vec<grafeo_common::types::LogicalType>,
1314    ) -> Self {
1315        Self {
1316            columns,
1317            column_types,
1318            rows: Vec::new(),
1319            execution_time_ms: None,
1320            rows_scanned: None,
1321            status_message: None,
1322            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1323        }
1324    }
1325
1326    /// Sets the execution metrics on this result.
1327    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1328        self.execution_time_ms = Some(execution_time_ms);
1329        self.rows_scanned = Some(rows_scanned);
1330        self
1331    }
1332
1333    /// Returns the execution time in milliseconds, if available.
1334    #[must_use]
1335    pub fn execution_time_ms(&self) -> Option<f64> {
1336        self.execution_time_ms
1337    }
1338
1339    /// Returns the number of rows scanned, if available.
1340    #[must_use]
1341    pub fn rows_scanned(&self) -> Option<u64> {
1342        self.rows_scanned
1343    }
1344
1345    /// Returns the number of rows.
1346    #[must_use]
1347    pub fn row_count(&self) -> usize {
1348        self.rows.len()
1349    }
1350
1351    /// Returns the number of columns.
1352    #[must_use]
1353    pub fn column_count(&self) -> usize {
1354        self.columns.len()
1355    }
1356
1357    /// Returns true if the result is empty.
1358    #[must_use]
1359    pub fn is_empty(&self) -> bool {
1360        self.rows.is_empty()
1361    }
1362
1363    /// Extracts a single value from the result.
1364    ///
1365    /// Use this when your query returns exactly one row with one column,
1366    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1367    ///
1368    /// # Errors
1369    ///
1370    /// Returns an error if the result has multiple rows or columns.
1371    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1372        if self.rows.len() != 1 || self.columns.len() != 1 {
1373            return Err(grafeo_common::utils::error::Error::InvalidValue(
1374                "Expected single value".to_string(),
1375            ));
1376        }
1377        T::from_value(&self.rows[0][0])
1378    }
1379
1380    /// Returns an iterator over the rows.
1381    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1382        self.rows.iter()
1383    }
1384}
1385
1386/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
1387///
1388/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
1389/// Used by [`QueryResult::scalar()`] to extract typed values.
1390pub trait FromValue: Sized {
1391    /// Attempts the conversion, returning an error on type mismatch.
1392    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1393}
1394
1395impl FromValue for i64 {
1396    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1397        value
1398            .as_int64()
1399            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1400                expected: "INT64".to_string(),
1401                found: value.type_name().to_string(),
1402            })
1403    }
1404}
1405
1406impl FromValue for f64 {
1407    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1408        value
1409            .as_float64()
1410            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1411                expected: "FLOAT64".to_string(),
1412                found: value.type_name().to_string(),
1413            })
1414    }
1415}
1416
1417impl FromValue for String {
1418    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1419        value.as_str().map(String::from).ok_or_else(|| {
1420            grafeo_common::utils::error::Error::TypeMismatch {
1421                expected: "STRING".to_string(),
1422                found: value.type_name().to_string(),
1423            }
1424        })
1425    }
1426}
1427
1428impl FromValue for bool {
1429    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1430        value
1431            .as_bool()
1432            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1433                expected: "BOOL".to_string(),
1434                found: value.type_name().to_string(),
1435            })
1436    }
1437}
1438
1439#[cfg(test)]
1440mod tests {
1441    use super::*;
1442
1443    #[test]
1444    fn test_create_in_memory_database() {
1445        let db = GrafeoDB::new_in_memory();
1446        assert_eq!(db.node_count(), 0);
1447        assert_eq!(db.edge_count(), 0);
1448    }
1449
1450    #[test]
1451    fn test_database_config() {
1452        let config = Config::in_memory().with_threads(4).with_query_logging();
1453
1454        let db = GrafeoDB::with_config(config).unwrap();
1455        assert_eq!(db.config().threads, 4);
1456        assert!(db.config().query_logging);
1457    }
1458
1459    #[test]
1460    fn test_database_session() {
1461        let db = GrafeoDB::new_in_memory();
1462        let _session = db.session();
1463        // Session should be created successfully
1464    }
1465
1466    #[cfg(feature = "wal")]
1467    #[test]
1468    fn test_persistent_database_recovery() {
1469        use grafeo_common::types::Value;
1470        use tempfile::tempdir;
1471
1472        let dir = tempdir().unwrap();
1473        let db_path = dir.path().join("test_db");
1474
1475        // Create database and add some data
1476        {
1477            let db = GrafeoDB::open(&db_path).unwrap();
1478
1479            let alix = db.create_node(&["Person"]);
1480            db.set_node_property(alix, "name", Value::from("Alix"));
1481
1482            let gus = db.create_node(&["Person"]);
1483            db.set_node_property(gus, "name", Value::from("Gus"));
1484
1485            let _edge = db.create_edge(alix, gus, "KNOWS");
1486
1487            // Explicitly close to flush WAL
1488            db.close().unwrap();
1489        }
1490
1491        // Reopen and verify data was recovered
1492        {
1493            let db = GrafeoDB::open(&db_path).unwrap();
1494
1495            assert_eq!(db.node_count(), 2);
1496            assert_eq!(db.edge_count(), 1);
1497
1498            // Verify nodes exist
1499            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1500            assert!(node0.is_some());
1501
1502            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1503            assert!(node1.is_some());
1504        }
1505    }
1506
1507    #[cfg(feature = "wal")]
1508    #[test]
1509    fn test_wal_logging() {
1510        use tempfile::tempdir;
1511
1512        let dir = tempdir().unwrap();
1513        let db_path = dir.path().join("wal_test_db");
1514
1515        let db = GrafeoDB::open(&db_path).unwrap();
1516
1517        // Create some data
1518        let node = db.create_node(&["Test"]);
1519        db.delete_node(node);
1520
1521        // WAL should have records
1522        if let Some(wal) = db.wal() {
1523            assert!(wal.record_count() > 0);
1524        }
1525
1526        db.close().unwrap();
1527    }
1528
1529    #[cfg(feature = "wal")]
1530    #[test]
1531    fn test_wal_recovery_multiple_sessions() {
1532        // Tests that WAL recovery works correctly across multiple open/close cycles
1533        use grafeo_common::types::Value;
1534        use tempfile::tempdir;
1535
1536        let dir = tempdir().unwrap();
1537        let db_path = dir.path().join("multi_session_db");
1538
1539        // Session 1: Create initial data
1540        {
1541            let db = GrafeoDB::open(&db_path).unwrap();
1542            let alix = db.create_node(&["Person"]);
1543            db.set_node_property(alix, "name", Value::from("Alix"));
1544            db.close().unwrap();
1545        }
1546
1547        // Session 2: Add more data
1548        {
1549            let db = GrafeoDB::open(&db_path).unwrap();
1550            assert_eq!(db.node_count(), 1); // Previous data recovered
1551            let gus = db.create_node(&["Person"]);
1552            db.set_node_property(gus, "name", Value::from("Gus"));
1553            db.close().unwrap();
1554        }
1555
1556        // Session 3: Verify all data
1557        {
1558            let db = GrafeoDB::open(&db_path).unwrap();
1559            assert_eq!(db.node_count(), 2);
1560
1561            // Verify properties were recovered correctly
1562            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1563            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1564
1565            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1566            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1567        }
1568    }
1569
1570    #[cfg(feature = "wal")]
1571    #[test]
1572    fn test_database_consistency_after_mutations() {
1573        // Tests that database remains consistent after a series of create/delete operations
1574        use grafeo_common::types::Value;
1575        use tempfile::tempdir;
1576
1577        let dir = tempdir().unwrap();
1578        let db_path = dir.path().join("consistency_db");
1579
1580        {
1581            let db = GrafeoDB::open(&db_path).unwrap();
1582
1583            // Create nodes
1584            let a = db.create_node(&["Node"]);
1585            let b = db.create_node(&["Node"]);
1586            let c = db.create_node(&["Node"]);
1587
1588            // Create edges
1589            let e1 = db.create_edge(a, b, "LINKS");
1590            let _e2 = db.create_edge(b, c, "LINKS");
1591
1592            // Delete middle node and its edge
1593            db.delete_edge(e1);
1594            db.delete_node(b);
1595
1596            // Set properties on remaining nodes
1597            db.set_node_property(a, "value", Value::Int64(1));
1598            db.set_node_property(c, "value", Value::Int64(3));
1599
1600            db.close().unwrap();
1601        }
1602
1603        // Reopen and verify consistency
1604        {
1605            let db = GrafeoDB::open(&db_path).unwrap();
1606
1607            // Should have 2 nodes (a and c), b was deleted
1608            // Note: node_count includes deleted nodes in some implementations
1609            // What matters is that the non-deleted nodes are accessible
1610            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1611            assert!(node_a.is_some());
1612
1613            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1614            assert!(node_c.is_some());
1615
1616            // Middle node should be deleted
1617            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1618            assert!(node_b.is_none());
1619        }
1620    }
1621
1622    #[cfg(feature = "wal")]
1623    #[test]
1624    fn test_close_is_idempotent() {
1625        // Calling close() multiple times should not cause errors
1626        use tempfile::tempdir;
1627
1628        let dir = tempdir().unwrap();
1629        let db_path = dir.path().join("close_test_db");
1630
1631        let db = GrafeoDB::open(&db_path).unwrap();
1632        db.create_node(&["Test"]);
1633
1634        // First close should succeed
1635        assert!(db.close().is_ok());
1636
1637        // Second close should also succeed (idempotent)
1638        assert!(db.close().is_ok());
1639    }
1640
1641    #[test]
1642    fn test_query_result_has_metrics() {
1643        // Verifies that query results include execution metrics
1644        let db = GrafeoDB::new_in_memory();
1645        db.create_node(&["Person"]);
1646        db.create_node(&["Person"]);
1647
1648        #[cfg(feature = "gql")]
1649        {
1650            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1651
1652            // Metrics should be populated
1653            assert!(result.execution_time_ms.is_some());
1654            assert!(result.rows_scanned.is_some());
1655            assert!(result.execution_time_ms.unwrap() >= 0.0);
1656            assert_eq!(result.rows_scanned.unwrap(), 2);
1657        }
1658    }
1659
1660    #[test]
1661    fn test_empty_query_result_metrics() {
1662        // Verifies metrics are correct for queries returning no results
1663        let db = GrafeoDB::new_in_memory();
1664        db.create_node(&["Person"]);
1665
1666        #[cfg(feature = "gql")]
1667        {
1668            // Query that matches nothing
1669            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1670
1671            assert!(result.execution_time_ms.is_some());
1672            assert!(result.rows_scanned.is_some());
1673            assert_eq!(result.rows_scanned.unwrap(), 0);
1674        }
1675    }
1676
1677    #[cfg(feature = "cdc")]
1678    mod cdc_integration {
1679        use super::*;
1680
1681        #[test]
1682        fn test_node_lifecycle_history() {
1683            let db = GrafeoDB::new_in_memory();
1684
1685            // Create
1686            let id = db.create_node(&["Person"]);
1687            // Update
1688            db.set_node_property(id, "name", "Alix".into());
1689            db.set_node_property(id, "name", "Gus".into());
1690            // Delete
1691            db.delete_node(id);
1692
1693            let history = db.history(id).unwrap();
1694            assert_eq!(history.len(), 4); // create + 2 updates + delete
1695            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1696            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1697            assert!(history[1].before.is_none()); // first set_node_property has no prior value
1698            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1699            assert!(history[2].before.is_some()); // second update has prior "Alix"
1700            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1701        }
1702
1703        #[test]
1704        fn test_edge_lifecycle_history() {
1705            let db = GrafeoDB::new_in_memory();
1706
1707            let alix = db.create_node(&["Person"]);
1708            let gus = db.create_node(&["Person"]);
1709            let edge = db.create_edge(alix, gus, "KNOWS");
1710            db.set_edge_property(edge, "since", 2024i64.into());
1711            db.delete_edge(edge);
1712
1713            let history = db.history(edge).unwrap();
1714            assert_eq!(history.len(), 3); // create + update + delete
1715            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1716            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1717            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1718        }
1719
1720        #[test]
1721        fn test_create_node_with_props_cdc() {
1722            let db = GrafeoDB::new_in_memory();
1723
1724            let id = db.create_node_with_props(
1725                &["Person"],
1726                vec![
1727                    ("name", grafeo_common::types::Value::from("Alix")),
1728                    ("age", grafeo_common::types::Value::from(30i64)),
1729                ],
1730            );
1731
1732            let history = db.history(id).unwrap();
1733            assert_eq!(history.len(), 1);
1734            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1735            // Props should be captured
1736            let after = history[0].after.as_ref().unwrap();
1737            assert_eq!(after.len(), 2);
1738        }
1739
1740        #[test]
1741        fn test_changes_between() {
1742            let db = GrafeoDB::new_in_memory();
1743
1744            let id1 = db.create_node(&["A"]);
1745            let _id2 = db.create_node(&["B"]);
1746            db.set_node_property(id1, "x", 1i64.into());
1747
1748            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
1749            let changes = db
1750                .changes_between(
1751                    grafeo_common::types::EpochId(0),
1752                    grafeo_common::types::EpochId(u64::MAX),
1753                )
1754                .unwrap();
1755            assert_eq!(changes.len(), 3); // 2 creates + 1 update
1756        }
1757    }
1758
1759    #[test]
1760    fn test_with_store_basic() {
1761        use grafeo_core::graph::lpg::LpgStore;
1762
1763        let store = Arc::new(LpgStore::new().unwrap());
1764        let n1 = store.create_node(&["Person"]);
1765        store.set_node_property(n1, "name", "Alix".into());
1766
1767        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1768        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1769
1770        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1771        assert_eq!(result.rows.len(), 1);
1772    }
1773
1774    #[test]
1775    fn test_with_store_session() {
1776        use grafeo_core::graph::lpg::LpgStore;
1777
1778        let store = Arc::new(LpgStore::new().unwrap());
1779        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1780        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1781
1782        let session = db.session();
1783        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1784        assert_eq!(result.rows.len(), 1);
1785    }
1786
1787    #[test]
1788    fn test_with_store_mutations() {
1789        use grafeo_core::graph::lpg::LpgStore;
1790
1791        let store = Arc::new(LpgStore::new().unwrap());
1792        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1793        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1794
1795        let mut session = db.session();
1796
1797        // Use an explicit transaction so INSERT and MATCH share the same
1798        // transaction context. With PENDING epochs, uncommitted versions are
1799        // only visible to the owning transaction.
1800        session.begin_transaction().unwrap();
1801        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1802
1803        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1804        assert_eq!(result.rows.len(), 1);
1805
1806        session.commit().unwrap();
1807    }
1808}