Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22
23#[cfg(feature = "wal")]
24use std::path::Path;
25use std::sync::Arc;
26use std::sync::atomic::AtomicUsize;
27
28use parking_lot::RwLock;
29
30#[cfg(feature = "wal")]
31use grafeo_adapters::storage::wal::{
32    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
33};
34use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
35use grafeo_common::utils::error::Result;
36use grafeo_core::graph::GraphStoreMut;
37use grafeo_core::graph::lpg::LpgStore;
38#[cfg(feature = "rdf")]
39use grafeo_core::graph::rdf::RdfStore;
40
41use crate::catalog::Catalog;
42use crate::config::Config;
43use crate::query::cache::QueryCache;
44use crate::session::Session;
45use crate::transaction::TransactionManager;
46
47/// Your handle to a Grafeo database.
48///
49/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
50/// quick experiments, or [`open()`](Self::open) for persistent storage.
51/// Then grab a [`session()`](Self::session) to start querying.
52///
53/// # Examples
54///
55/// ```
56/// use grafeo_engine::GrafeoDB;
57///
58/// // Quick in-memory database
59/// let db = GrafeoDB::new_in_memory();
60///
61/// // Add some data
62/// db.create_node(&["Person"]);
63///
64/// // Query it
65/// let session = db.session();
66/// let result = session.execute("MATCH (p:Person) RETURN p")?;
67/// # Ok::<(), grafeo_common::utils::error::Error>(())
68/// ```
69pub struct GrafeoDB {
70    /// Database configuration.
71    pub(super) config: Config,
72    /// The underlying graph store.
73    pub(super) store: Arc<LpgStore>,
74    /// Schema and metadata catalog shared across sessions.
75    pub(super) catalog: Arc<Catalog>,
76    /// RDF triple store (if RDF feature is enabled).
77    #[cfg(feature = "rdf")]
78    pub(super) rdf_store: Arc<RdfStore>,
79    /// Transaction manager.
80    pub(super) tx_manager: Arc<TransactionManager>,
81    /// Unified buffer manager.
82    pub(super) buffer_manager: Arc<BufferManager>,
83    /// Write-ahead log manager (if durability is enabled).
84    #[cfg(feature = "wal")]
85    pub(super) wal: Option<Arc<LpgWal>>,
86    /// Query cache for parsed and optimized plans.
87    pub(super) query_cache: Arc<QueryCache>,
88    /// Shared commit counter for auto-GC across sessions.
89    pub(super) commit_counter: Arc<AtomicUsize>,
90    /// Whether the database is open.
91    pub(super) is_open: RwLock<bool>,
92    /// Change data capture log for tracking mutations.
93    #[cfg(feature = "cdc")]
94    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
95    /// Registered embedding models for text-to-vector conversion.
96    #[cfg(feature = "embed")]
97    pub(super) embedding_models:
98        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
99    /// External graph store (when using with_store()).
100    /// When set, sessions route queries through this store instead of the built-in LpgStore.
101    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
102}
103
104impl GrafeoDB {
105    /// Creates an in-memory database - fast to create, gone when dropped.
106    ///
107    /// Use this for tests, experiments, or when you don't need persistence.
108    /// For data that survives restarts, use [`open()`](Self::open) instead.
109    ///
110    /// # Examples
111    ///
112    /// ```
113    /// use grafeo_engine::GrafeoDB;
114    ///
115    /// let db = GrafeoDB::new_in_memory();
116    /// let session = db.session();
117    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
118    /// # Ok::<(), grafeo_common::utils::error::Error>(())
119    /// ```
120    #[must_use]
121    pub fn new_in_memory() -> Self {
122        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
123    }
124
125    /// Opens a database at the given path, creating it if it doesn't exist.
126    ///
127    /// If you've used this path before, Grafeo recovers your data from the
128    /// write-ahead log automatically. First open on a new path creates an
129    /// empty database.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the path isn't writable or recovery fails.
134    ///
135    /// # Examples
136    ///
137    /// ```no_run
138    /// use grafeo_engine::GrafeoDB;
139    ///
140    /// let db = GrafeoDB::open("./my_social_network")?;
141    /// # Ok::<(), grafeo_common::utils::error::Error>(())
142    /// ```
143    #[cfg(feature = "wal")]
144    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
145        Self::with_config(Config::persistent(path.as_ref()))
146    }
147
148    /// Creates a database with custom configuration.
149    ///
150    /// Use this when you need fine-grained control over memory limits,
151    /// thread counts, or persistence settings. For most cases,
152    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
153    /// are simpler.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if the database can't be created or recovery fails.
158    ///
159    /// # Examples
160    ///
161    /// ```
162    /// use grafeo_engine::{GrafeoDB, Config};
163    ///
164    /// // In-memory with a 512MB limit
165    /// let config = Config::in_memory()
166    ///     .with_memory_limit(512 * 1024 * 1024);
167    ///
168    /// let db = GrafeoDB::with_config(config)?;
169    /// # Ok::<(), grafeo_common::utils::error::Error>(())
170    /// ```
171    pub fn with_config(config: Config) -> Result<Self> {
172        // Validate configuration before proceeding
173        config
174            .validate()
175            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
176
177        let store = Arc::new(LpgStore::new());
178        #[cfg(feature = "rdf")]
179        let rdf_store = Arc::new(RdfStore::new());
180        let tx_manager = Arc::new(TransactionManager::new());
181
182        // Create buffer manager with configured limits
183        let buffer_config = BufferManagerConfig {
184            budget: config.memory_limit.unwrap_or_else(|| {
185                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
186            }),
187            spill_path: config
188                .spill_path
189                .clone()
190                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
191            ..BufferManagerConfig::default()
192        };
193        let buffer_manager = BufferManager::new(buffer_config);
194
195        // Create catalog early so WAL replay can restore schema definitions
196        let catalog = Arc::new(Catalog::new());
197
198        // Initialize WAL if persistence is enabled
199        #[cfg(feature = "wal")]
200        let wal = if config.wal_enabled {
201            if let Some(ref db_path) = config.path {
202                // Create database directory if it doesn't exist
203                std::fs::create_dir_all(db_path)?;
204
205                let wal_path = db_path.join("wal");
206
207                // Check if WAL exists and recover if needed
208                if wal_path.exists() {
209                    let recovery = WalRecovery::new(&wal_path);
210                    let records = recovery.recover()?;
211                    Self::apply_wal_records(&store, &catalog, &records)?;
212                }
213
214                // Open/create WAL manager with configured durability
215                let wal_durability = match config.wal_durability {
216                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
217                    crate::config::DurabilityMode::Batch {
218                        max_delay_ms,
219                        max_records,
220                    } => WalDurabilityMode::Batch {
221                        max_delay_ms,
222                        max_records,
223                    },
224                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
225                        WalDurabilityMode::Adaptive { target_interval_ms }
226                    }
227                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
228                };
229                let wal_config = WalConfig {
230                    durability: wal_durability,
231                    ..WalConfig::default()
232                };
233                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
234                Some(Arc::new(wal_manager))
235            } else {
236                None
237            }
238        } else {
239            None
240        };
241
242        // Create query cache with default capacity (1000 queries)
243        let query_cache = Arc::new(QueryCache::default());
244
245        Ok(Self {
246            config,
247            store,
248            catalog,
249            #[cfg(feature = "rdf")]
250            rdf_store,
251            tx_manager,
252            buffer_manager,
253            #[cfg(feature = "wal")]
254            wal,
255            query_cache,
256            commit_counter: Arc::new(AtomicUsize::new(0)),
257            is_open: RwLock::new(true),
258            #[cfg(feature = "cdc")]
259            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
260            #[cfg(feature = "embed")]
261            embedding_models: RwLock::new(hashbrown::HashMap::new()),
262            external_store: None,
263        })
264    }
265
266    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
267    ///
268    /// The external store handles all data persistence. WAL, CDC, and index
269    /// management are the responsibility of the store implementation.
270    ///
271    /// Query execution (all 6 languages, optimizer, planner) works through the
272    /// provided store. Admin operations (schema introspection, persistence,
273    /// vector/text indexes) are not available on external stores.
274    ///
275    /// # Examples
276    ///
277    /// ```no_run
278    /// use std::sync::Arc;
279    /// use grafeo_engine::{GrafeoDB, Config};
280    /// use grafeo_core::graph::GraphStoreMut;
281    ///
282    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
283    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
284    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
285    ///     Ok(())
286    /// }
287    /// ```
288    ///
289    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
290    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
291        config
292            .validate()
293            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
294
295        let dummy_store = Arc::new(LpgStore::new());
296        let tx_manager = Arc::new(TransactionManager::new());
297
298        let buffer_config = BufferManagerConfig {
299            budget: config.memory_limit.unwrap_or_else(|| {
300                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
301            }),
302            spill_path: None,
303            ..BufferManagerConfig::default()
304        };
305        let buffer_manager = BufferManager::new(buffer_config);
306
307        let query_cache = Arc::new(QueryCache::default());
308
309        Ok(Self {
310            config,
311            store: dummy_store,
312            catalog: Arc::new(Catalog::new()),
313            #[cfg(feature = "rdf")]
314            rdf_store: Arc::new(RdfStore::new()),
315            tx_manager,
316            buffer_manager,
317            #[cfg(feature = "wal")]
318            wal: None,
319            query_cache,
320            commit_counter: Arc::new(AtomicUsize::new(0)),
321            is_open: RwLock::new(true),
322            #[cfg(feature = "cdc")]
323            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
324            #[cfg(feature = "embed")]
325            embedding_models: RwLock::new(hashbrown::HashMap::new()),
326            external_store: Some(store),
327        })
328    }
329
330    /// Applies WAL records to restore the database state.
331    #[cfg(feature = "wal")]
332    fn apply_wal_records(store: &LpgStore, catalog: &Catalog, records: &[WalRecord]) -> Result<()> {
333        use crate::catalog::{
334            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
335        };
336
337        for record in records {
338            match record {
339                WalRecord::CreateNode { id, labels } => {
340                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
341                    store.create_node_with_id(*id, &label_refs);
342                }
343                WalRecord::DeleteNode { id } => {
344                    store.delete_node(*id);
345                }
346                WalRecord::CreateEdge {
347                    id,
348                    src,
349                    dst,
350                    edge_type,
351                } => {
352                    store.create_edge_with_id(*id, *src, *dst, edge_type);
353                }
354                WalRecord::DeleteEdge { id } => {
355                    store.delete_edge(*id);
356                }
357                WalRecord::SetNodeProperty { id, key, value } => {
358                    store.set_node_property(*id, key, value.clone());
359                }
360                WalRecord::SetEdgeProperty { id, key, value } => {
361                    store.set_edge_property(*id, key, value.clone());
362                }
363                WalRecord::AddNodeLabel { id, label } => {
364                    store.add_label(*id, label);
365                }
366                WalRecord::RemoveNodeLabel { id, label } => {
367                    store.remove_label(*id, label);
368                }
369
370                // Schema DDL replay
371                WalRecord::CreateNodeType {
372                    name,
373                    properties,
374                    constraints,
375                } => {
376                    let def = NodeTypeDefinition {
377                        name: name.clone(),
378                        properties: properties
379                            .iter()
380                            .map(|(n, t, nullable)| TypedProperty {
381                                name: n.clone(),
382                                data_type: PropertyDataType::from_type_name(t),
383                                nullable: *nullable,
384                                default_value: None,
385                            })
386                            .collect(),
387                        constraints: constraints
388                            .iter()
389                            .map(|(kind, props)| match kind.as_str() {
390                                "unique" => TypeConstraint::Unique(props.clone()),
391                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
392                                "not_null" if !props.is_empty() => {
393                                    TypeConstraint::NotNull(props[0].clone())
394                                }
395                                _ => TypeConstraint::Unique(props.clone()),
396                            })
397                            .collect(),
398                    };
399                    let _ = catalog.register_node_type(def);
400                }
401                WalRecord::DropNodeType { name } => {
402                    let _ = catalog.drop_node_type(name);
403                }
404                WalRecord::CreateEdgeType {
405                    name,
406                    properties,
407                    constraints,
408                } => {
409                    let def = EdgeTypeDefinition {
410                        name: name.clone(),
411                        properties: properties
412                            .iter()
413                            .map(|(n, t, nullable)| TypedProperty {
414                                name: n.clone(),
415                                data_type: PropertyDataType::from_type_name(t),
416                                nullable: *nullable,
417                                default_value: None,
418                            })
419                            .collect(),
420                        constraints: constraints
421                            .iter()
422                            .map(|(kind, props)| match kind.as_str() {
423                                "unique" => TypeConstraint::Unique(props.clone()),
424                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
425                                "not_null" if !props.is_empty() => {
426                                    TypeConstraint::NotNull(props[0].clone())
427                                }
428                                _ => TypeConstraint::Unique(props.clone()),
429                            })
430                            .collect(),
431                    };
432                    let _ = catalog.register_edge_type_def(def);
433                }
434                WalRecord::DropEdgeType { name } => {
435                    let _ = catalog.drop_edge_type_def(name);
436                }
437                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
438                    // Index recreation is handled by the store on startup
439                    // (indexes are rebuilt from data, not WAL)
440                }
441                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
442                    // Constraint definitions are part of type definitions
443                    // and replayed via CreateNodeType/CreateEdgeType
444                }
445                WalRecord::CreateGraphType {
446                    name,
447                    node_types,
448                    edge_types,
449                    open,
450                } => {
451                    use crate::catalog::GraphTypeDefinition;
452                    let def = GraphTypeDefinition {
453                        name: name.clone(),
454                        allowed_node_types: node_types.clone(),
455                        allowed_edge_types: edge_types.clone(),
456                        open: *open,
457                    };
458                    let _ = catalog.register_graph_type(def);
459                }
460                WalRecord::DropGraphType { name } => {
461                    let _ = catalog.drop_graph_type(name);
462                }
463                WalRecord::CreateSchema { name } => {
464                    let _ = catalog.register_schema_namespace(name.clone());
465                }
466                WalRecord::DropSchema { name } => {
467                    let _ = catalog.drop_schema_namespace(name);
468                }
469
470                WalRecord::AlterNodeType { name, alterations } => {
471                    for (action, prop_name, type_name, nullable) in alterations {
472                        match action.as_str() {
473                            "add" => {
474                                let prop = TypedProperty {
475                                    name: prop_name.clone(),
476                                    data_type: PropertyDataType::from_type_name(type_name),
477                                    nullable: *nullable,
478                                    default_value: None,
479                                };
480                                let _ = catalog.alter_node_type_add_property(name, prop);
481                            }
482                            "drop" => {
483                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
484                            }
485                            _ => {}
486                        }
487                    }
488                }
489                WalRecord::AlterEdgeType { name, alterations } => {
490                    for (action, prop_name, type_name, nullable) in alterations {
491                        match action.as_str() {
492                            "add" => {
493                                let prop = TypedProperty {
494                                    name: prop_name.clone(),
495                                    data_type: PropertyDataType::from_type_name(type_name),
496                                    nullable: *nullable,
497                                    default_value: None,
498                                };
499                                let _ = catalog.alter_edge_type_add_property(name, prop);
500                            }
501                            "drop" => {
502                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
503                            }
504                            _ => {}
505                        }
506                    }
507                }
508                WalRecord::AlterGraphType { name, alterations } => {
509                    for (action, type_name) in alterations {
510                        match action.as_str() {
511                            "add_node" => {
512                                let _ =
513                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
514                            }
515                            "drop_node" => {
516                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
517                            }
518                            "add_edge" => {
519                                let _ =
520                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
521                            }
522                            "drop_edge" => {
523                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
524                            }
525                            _ => {}
526                        }
527                    }
528                }
529
530                WalRecord::CreateProcedure {
531                    name,
532                    params,
533                    returns,
534                    body,
535                } => {
536                    use crate::catalog::ProcedureDefinition;
537                    let def = ProcedureDefinition {
538                        name: name.clone(),
539                        params: params.clone(),
540                        returns: returns.clone(),
541                        body: body.clone(),
542                    };
543                    let _ = catalog.register_procedure(def);
544                }
545                WalRecord::DropProcedure { name } => {
546                    let _ = catalog.drop_procedure(name);
547                }
548
549                WalRecord::TxCommit { .. }
550                | WalRecord::TxAbort { .. }
551                | WalRecord::Checkpoint { .. } => {
552                    // Transaction control records don't need replay action
553                    // (recovery already filtered to only committed transactions)
554                }
555            }
556        }
557        Ok(())
558    }
559
560    // =========================================================================
561    // Session & Configuration
562    // =========================================================================
563
564    /// Opens a new session for running queries.
565    ///
566    /// Sessions are cheap to create - spin up as many as you need. Each
567    /// gets its own transaction context, so concurrent sessions won't
568    /// block each other on reads.
569    ///
570    /// # Examples
571    ///
572    /// ```
573    /// use grafeo_engine::GrafeoDB;
574    ///
575    /// let db = GrafeoDB::new_in_memory();
576    /// let session = db.session();
577    ///
578    /// // Run queries through the session
579    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
580    /// # Ok::<(), grafeo_common::utils::error::Error>(())
581    /// ```
582    #[must_use]
583    pub fn session(&self) -> Session {
584        if let Some(ref ext_store) = self.external_store {
585            return Session::with_external_store(
586                Arc::clone(ext_store),
587                Arc::clone(&self.tx_manager),
588                Arc::clone(&self.query_cache),
589                Arc::clone(&self.catalog),
590                self.config.adaptive.clone(),
591                self.config.factorized_execution,
592                self.config.graph_model,
593                self.config.query_timeout,
594                Arc::clone(&self.commit_counter),
595                self.config.gc_interval,
596            );
597        }
598
599        #[cfg(feature = "rdf")]
600        let mut session = Session::with_rdf_store_and_adaptive(
601            Arc::clone(&self.store),
602            Arc::clone(&self.rdf_store),
603            Arc::clone(&self.tx_manager),
604            Arc::clone(&self.query_cache),
605            Arc::clone(&self.catalog),
606            self.config.adaptive.clone(),
607            self.config.factorized_execution,
608            self.config.graph_model,
609            self.config.query_timeout,
610            Arc::clone(&self.commit_counter),
611            self.config.gc_interval,
612        );
613        #[cfg(not(feature = "rdf"))]
614        let mut session = Session::with_adaptive(
615            Arc::clone(&self.store),
616            Arc::clone(&self.tx_manager),
617            Arc::clone(&self.query_cache),
618            Arc::clone(&self.catalog),
619            self.config.adaptive.clone(),
620            self.config.factorized_execution,
621            self.config.graph_model,
622            self.config.query_timeout,
623            Arc::clone(&self.commit_counter),
624            self.config.gc_interval,
625        );
626
627        #[cfg(feature = "wal")]
628        if let Some(ref wal) = self.wal {
629            session.set_wal(Arc::clone(wal));
630        }
631
632        #[cfg(feature = "cdc")]
633        session.set_cdc_log(Arc::clone(&self.cdc_log));
634
635        // Suppress unused_mut when cdc/wal are disabled
636        let _ = &mut session;
637
638        session
639    }
640
641    /// Returns the adaptive execution configuration.
642    #[must_use]
643    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
644        &self.config.adaptive
645    }
646
647    /// Returns the configuration.
648    #[must_use]
649    pub fn config(&self) -> &Config {
650        &self.config
651    }
652
653    /// Returns the graph data model of this database.
654    #[must_use]
655    pub fn graph_model(&self) -> crate::config::GraphModel {
656        self.config.graph_model
657    }
658
659    /// Returns the configured memory limit in bytes, if any.
660    #[must_use]
661    pub fn memory_limit(&self) -> Option<usize> {
662        self.config.memory_limit
663    }
664
665    /// Returns the underlying store.
666    ///
667    /// This provides direct access to the LPG store for algorithm implementations
668    /// and admin operations (index management, schema introspection, MVCC internals).
669    ///
670    /// For code that only needs read/write graph operations, prefer
671    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
672    #[must_use]
673    pub fn store(&self) -> &Arc<LpgStore> {
674        &self.store
675    }
676
677    // === Named Graph Management ===
678
679    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
680    pub fn create_graph(&self, name: &str) -> bool {
681        self.store.create_graph(name)
682    }
683
684    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
685    pub fn drop_graph(&self, name: &str) -> bool {
686        self.store.drop_graph(name)
687    }
688
689    /// Returns all named graph names.
690    #[must_use]
691    pub fn list_graphs(&self) -> Vec<String> {
692        self.store.graph_names()
693    }
694
695    /// Returns the graph store as a trait object.
696    ///
697    /// This provides the [`GraphStoreMut`] interface for code that should work
698    /// with any storage backend. Use this when you only need graph read/write
699    /// operations and don't need admin methods like index management.
700    ///
701    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
702    #[must_use]
703    pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
704        if let Some(ref ext_store) = self.external_store {
705            Arc::clone(ext_store)
706        } else {
707            Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
708        }
709    }
710
711    /// Garbage collects old MVCC versions that are no longer visible.
712    ///
713    /// Determines the minimum epoch required by active transactions and prunes
714    /// version chains older than that threshold. Also cleans up completed
715    /// transaction metadata in the transaction manager.
716    pub fn gc(&self) {
717        let min_epoch = self.tx_manager.min_active_epoch();
718        self.store.gc_versions(min_epoch);
719        self.tx_manager.gc();
720    }
721
722    /// Returns the buffer manager for memory-aware operations.
723    #[must_use]
724    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
725        &self.buffer_manager
726    }
727
728    /// Returns the query cache.
729    #[must_use]
730    pub fn query_cache(&self) -> &Arc<QueryCache> {
731        &self.query_cache
732    }
733
734    // =========================================================================
735    // Lifecycle
736    // =========================================================================
737
738    /// Closes the database, flushing all pending writes.
739    ///
740    /// For persistent databases, this ensures everything is safely on disk.
741    /// Called automatically when the database is dropped, but you can call
742    /// it explicitly if you need to guarantee durability at a specific point.
743    ///
744    /// # Errors
745    ///
746    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
747    pub fn close(&self) -> Result<()> {
748        let mut is_open = self.is_open.write();
749        if !*is_open {
750            return Ok(());
751        }
752
753        // Commit and checkpoint WAL
754        #[cfg(feature = "wal")]
755        if let Some(ref wal) = self.wal {
756            let epoch = self.store.current_epoch();
757
758            // Use the last assigned transaction ID, or create a checkpoint-only tx
759            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
760                // No transactions have been started; begin one for checkpoint
761                self.tx_manager.begin()
762            });
763
764            // Log a TxCommit to mark all pending records as committed
765            wal.log(&WalRecord::TxCommit {
766                tx_id: checkpoint_tx,
767            })?;
768
769            // Then checkpoint
770            wal.checkpoint(checkpoint_tx, epoch)?;
771            wal.sync()?;
772        }
773
774        *is_open = false;
775        Ok(())
776    }
777
778    /// Returns the typed WAL if available.
779    #[cfg(feature = "wal")]
780    #[must_use]
781    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
782        self.wal.as_ref()
783    }
784
785    /// Logs a WAL record if WAL is enabled.
786    #[cfg(feature = "wal")]
787    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
788        if let Some(ref wal) = self.wal {
789            wal.log(record)?;
790        }
791        Ok(())
792    }
793}
794
795impl Drop for GrafeoDB {
796    fn drop(&mut self) {
797        if let Err(e) = self.close() {
798            tracing::error!("Error closing database: {}", e);
799        }
800    }
801}
802
803impl crate::admin::AdminService for GrafeoDB {
804    fn info(&self) -> crate::admin::DatabaseInfo {
805        self.info()
806    }
807
808    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
809        self.detailed_stats()
810    }
811
812    fn schema(&self) -> crate::admin::SchemaInfo {
813        self.schema()
814    }
815
816    fn validate(&self) -> crate::admin::ValidationResult {
817        self.validate()
818    }
819
820    fn wal_status(&self) -> crate::admin::WalStatus {
821        self.wal_status()
822    }
823
824    fn wal_checkpoint(&self) -> Result<()> {
825        self.wal_checkpoint()
826    }
827}
828
829// =========================================================================
830// Query Result Types
831// =========================================================================
832
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so nothing in this type enforces that `columns`,
/// `column_types`, and the individual rows have matching lengths; code that
/// builds a result by hand is responsible for keeping them aligned.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// [`new()`](Self::new) fills this with one `LogicalType::Any` per column;
    /// [`with_types()`](Self::with_types) stores whatever the caller passes.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows, one inner `Vec` of values per row.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    ///
    /// `None` on every freshly constructed result; set by
    /// [`with_metrics()`](Self::with_metrics).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    ///
    /// `None` on every freshly constructed result; set by
    /// [`with_metrics()`](Self::with_metrics).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    ///
    /// Only [`status()`](Self::status) populates this; the other constructors leave it `None`.
    pub status_message: Option<String>,
}
873
874impl QueryResult {
875    /// Creates a fully empty query result (no columns, no rows).
876    #[must_use]
877    pub fn empty() -> Self {
878        Self {
879            columns: Vec::new(),
880            column_types: Vec::new(),
881            rows: Vec::new(),
882            execution_time_ms: None,
883            rows_scanned: None,
884            status_message: None,
885        }
886    }
887
888    /// Creates a query result with only a status message (for DDL commands).
889    #[must_use]
890    pub fn status(msg: impl Into<String>) -> Self {
891        Self {
892            columns: Vec::new(),
893            column_types: Vec::new(),
894            rows: Vec::new(),
895            execution_time_ms: None,
896            rows_scanned: None,
897            status_message: Some(msg.into()),
898        }
899    }
900
901    /// Creates a new empty query result.
902    #[must_use]
903    pub fn new(columns: Vec<String>) -> Self {
904        let len = columns.len();
905        Self {
906            columns,
907            column_types: vec![grafeo_common::types::LogicalType::Any; len],
908            rows: Vec::new(),
909            execution_time_ms: None,
910            rows_scanned: None,
911            status_message: None,
912        }
913    }
914
915    /// Creates a new empty query result with column types.
916    #[must_use]
917    pub fn with_types(
918        columns: Vec<String>,
919        column_types: Vec<grafeo_common::types::LogicalType>,
920    ) -> Self {
921        Self {
922            columns,
923            column_types,
924            rows: Vec::new(),
925            execution_time_ms: None,
926            rows_scanned: None,
927            status_message: None,
928        }
929    }
930
931    /// Sets the execution metrics on this result.
932    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
933        self.execution_time_ms = Some(execution_time_ms);
934        self.rows_scanned = Some(rows_scanned);
935        self
936    }
937
938    /// Returns the execution time in milliseconds, if available.
939    #[must_use]
940    pub fn execution_time_ms(&self) -> Option<f64> {
941        self.execution_time_ms
942    }
943
944    /// Returns the number of rows scanned, if available.
945    #[must_use]
946    pub fn rows_scanned(&self) -> Option<u64> {
947        self.rows_scanned
948    }
949
950    /// Returns the number of rows.
951    #[must_use]
952    pub fn row_count(&self) -> usize {
953        self.rows.len()
954    }
955
956    /// Returns the number of columns.
957    #[must_use]
958    pub fn column_count(&self) -> usize {
959        self.columns.len()
960    }
961
962    /// Returns true if the result is empty.
963    #[must_use]
964    pub fn is_empty(&self) -> bool {
965        self.rows.is_empty()
966    }
967
968    /// Extracts a single value from the result.
969    ///
970    /// Use this when your query returns exactly one row with one column,
971    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
972    ///
973    /// # Errors
974    ///
975    /// Returns an error if the result has multiple rows or columns.
976    pub fn scalar<T: FromValue>(&self) -> Result<T> {
977        if self.rows.len() != 1 || self.columns.len() != 1 {
978            return Err(grafeo_common::utils::error::Error::InvalidValue(
979                "Expected single value".to_string(),
980            ));
981        }
982        T::from_value(&self.rows[0][0])
983    }
984
985    /// Returns an iterator over the rows.
986    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
987        self.rows.iter()
988    }
989}
990
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// The `Sized` bound is needed because [`from_value`](Self::from_value)
/// returns `Self` by value.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// # Errors
    ///
    /// The implementations in this module return `Error::TypeMismatch`,
    /// carrying the expected type name and the name reported by
    /// `value.type_name()`, when `value` does not hold the requested type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
999
1000impl FromValue for i64 {
1001    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1002        value
1003            .as_int64()
1004            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1005                expected: "INT64".to_string(),
1006                found: value.type_name().to_string(),
1007            })
1008    }
1009}
1010
1011impl FromValue for f64 {
1012    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1013        value
1014            .as_float64()
1015            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1016                expected: "FLOAT64".to_string(),
1017                found: value.type_name().to_string(),
1018            })
1019    }
1020}
1021
1022impl FromValue for String {
1023    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1024        value.as_str().map(String::from).ok_or_else(|| {
1025            grafeo_common::utils::error::Error::TypeMismatch {
1026                expected: "STRING".to_string(),
1027                found: value.type_name().to_string(),
1028            }
1029        })
1030    }
1031}
1032
1033impl FromValue for bool {
1034    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1035        value
1036            .as_bool()
1037            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1038                expected: "BOOL".to_string(),
1039                found: value.type_name().to_string(),
1040            })
1041    }
1042}
1043
1044#[cfg(test)]
1045mod tests {
1046    use super::*;
1047
1048    #[test]
1049    fn test_create_in_memory_database() {
1050        let db = GrafeoDB::new_in_memory();
1051        assert_eq!(db.node_count(), 0);
1052        assert_eq!(db.edge_count(), 0);
1053    }
1054
1055    #[test]
1056    fn test_database_config() {
1057        let config = Config::in_memory().with_threads(4).with_query_logging();
1058
1059        let db = GrafeoDB::with_config(config).unwrap();
1060        assert_eq!(db.config().threads, 4);
1061        assert!(db.config().query_logging);
1062    }
1063
1064    #[test]
1065    fn test_database_session() {
1066        let db = GrafeoDB::new_in_memory();
1067        let _session = db.session();
1068        // Session should be created successfully
1069    }
1070
1071    #[cfg(feature = "wal")]
1072    #[test]
1073    fn test_persistent_database_recovery() {
1074        use grafeo_common::types::Value;
1075        use tempfile::tempdir;
1076
1077        let dir = tempdir().unwrap();
1078        let db_path = dir.path().join("test_db");
1079
1080        // Create database and add some data
1081        {
1082            let db = GrafeoDB::open(&db_path).unwrap();
1083
1084            let alice = db.create_node(&["Person"]);
1085            db.set_node_property(alice, "name", Value::from("Alice"));
1086
1087            let bob = db.create_node(&["Person"]);
1088            db.set_node_property(bob, "name", Value::from("Bob"));
1089
1090            let _edge = db.create_edge(alice, bob, "KNOWS");
1091
1092            // Explicitly close to flush WAL
1093            db.close().unwrap();
1094        }
1095
1096        // Reopen and verify data was recovered
1097        {
1098            let db = GrafeoDB::open(&db_path).unwrap();
1099
1100            assert_eq!(db.node_count(), 2);
1101            assert_eq!(db.edge_count(), 1);
1102
1103            // Verify nodes exist
1104            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1105            assert!(node0.is_some());
1106
1107            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1108            assert!(node1.is_some());
1109        }
1110    }
1111
1112    #[cfg(feature = "wal")]
1113    #[test]
1114    fn test_wal_logging() {
1115        use tempfile::tempdir;
1116
1117        let dir = tempdir().unwrap();
1118        let db_path = dir.path().join("wal_test_db");
1119
1120        let db = GrafeoDB::open(&db_path).unwrap();
1121
1122        // Create some data
1123        let node = db.create_node(&["Test"]);
1124        db.delete_node(node);
1125
1126        // WAL should have records
1127        if let Some(wal) = db.wal() {
1128            assert!(wal.record_count() > 0);
1129        }
1130
1131        db.close().unwrap();
1132    }
1133
1134    #[cfg(feature = "wal")]
1135    #[test]
1136    fn test_wal_recovery_multiple_sessions() {
1137        // Tests that WAL recovery works correctly across multiple open/close cycles
1138        use grafeo_common::types::Value;
1139        use tempfile::tempdir;
1140
1141        let dir = tempdir().unwrap();
1142        let db_path = dir.path().join("multi_session_db");
1143
1144        // Session 1: Create initial data
1145        {
1146            let db = GrafeoDB::open(&db_path).unwrap();
1147            let alice = db.create_node(&["Person"]);
1148            db.set_node_property(alice, "name", Value::from("Alice"));
1149            db.close().unwrap();
1150        }
1151
1152        // Session 2: Add more data
1153        {
1154            let db = GrafeoDB::open(&db_path).unwrap();
1155            assert_eq!(db.node_count(), 1); // Previous data recovered
1156            let bob = db.create_node(&["Person"]);
1157            db.set_node_property(bob, "name", Value::from("Bob"));
1158            db.close().unwrap();
1159        }
1160
1161        // Session 3: Verify all data
1162        {
1163            let db = GrafeoDB::open(&db_path).unwrap();
1164            assert_eq!(db.node_count(), 2);
1165
1166            // Verify properties were recovered correctly
1167            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1168            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1169
1170            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1171            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1172        }
1173    }
1174
1175    #[cfg(feature = "wal")]
1176    #[test]
1177    fn test_database_consistency_after_mutations() {
1178        // Tests that database remains consistent after a series of create/delete operations
1179        use grafeo_common::types::Value;
1180        use tempfile::tempdir;
1181
1182        let dir = tempdir().unwrap();
1183        let db_path = dir.path().join("consistency_db");
1184
1185        {
1186            let db = GrafeoDB::open(&db_path).unwrap();
1187
1188            // Create nodes
1189            let a = db.create_node(&["Node"]);
1190            let b = db.create_node(&["Node"]);
1191            let c = db.create_node(&["Node"]);
1192
1193            // Create edges
1194            let e1 = db.create_edge(a, b, "LINKS");
1195            let _e2 = db.create_edge(b, c, "LINKS");
1196
1197            // Delete middle node and its edge
1198            db.delete_edge(e1);
1199            db.delete_node(b);
1200
1201            // Set properties on remaining nodes
1202            db.set_node_property(a, "value", Value::Int64(1));
1203            db.set_node_property(c, "value", Value::Int64(3));
1204
1205            db.close().unwrap();
1206        }
1207
1208        // Reopen and verify consistency
1209        {
1210            let db = GrafeoDB::open(&db_path).unwrap();
1211
1212            // Should have 2 nodes (a and c), b was deleted
1213            // Note: node_count includes deleted nodes in some implementations
1214            // What matters is that the non-deleted nodes are accessible
1215            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1216            assert!(node_a.is_some());
1217
1218            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1219            assert!(node_c.is_some());
1220
1221            // Middle node should be deleted
1222            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1223            assert!(node_b.is_none());
1224        }
1225    }
1226
1227    #[cfg(feature = "wal")]
1228    #[test]
1229    fn test_close_is_idempotent() {
1230        // Calling close() multiple times should not cause errors
1231        use tempfile::tempdir;
1232
1233        let dir = tempdir().unwrap();
1234        let db_path = dir.path().join("close_test_db");
1235
1236        let db = GrafeoDB::open(&db_path).unwrap();
1237        db.create_node(&["Test"]);
1238
1239        // First close should succeed
1240        assert!(db.close().is_ok());
1241
1242        // Second close should also succeed (idempotent)
1243        assert!(db.close().is_ok());
1244    }
1245
1246    #[test]
1247    fn test_query_result_has_metrics() {
1248        // Verifies that query results include execution metrics
1249        let db = GrafeoDB::new_in_memory();
1250        db.create_node(&["Person"]);
1251        db.create_node(&["Person"]);
1252
1253        #[cfg(feature = "gql")]
1254        {
1255            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1256
1257            // Metrics should be populated
1258            assert!(result.execution_time_ms.is_some());
1259            assert!(result.rows_scanned.is_some());
1260            assert!(result.execution_time_ms.unwrap() >= 0.0);
1261            assert_eq!(result.rows_scanned.unwrap(), 2);
1262        }
1263    }
1264
1265    #[test]
1266    fn test_empty_query_result_metrics() {
1267        // Verifies metrics are correct for queries returning no results
1268        let db = GrafeoDB::new_in_memory();
1269        db.create_node(&["Person"]);
1270
1271        #[cfg(feature = "gql")]
1272        {
1273            // Query that matches nothing
1274            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1275
1276            assert!(result.execution_time_ms.is_some());
1277            assert!(result.rows_scanned.is_some());
1278            assert_eq!(result.rows_scanned.unwrap(), 0);
1279        }
1280    }
1281
1282    #[cfg(feature = "cdc")]
1283    mod cdc_integration {
1284        use super::*;
1285
1286        #[test]
1287        fn test_node_lifecycle_history() {
1288            let db = GrafeoDB::new_in_memory();
1289
1290            // Create
1291            let id = db.create_node(&["Person"]);
1292            // Update
1293            db.set_node_property(id, "name", "Alice".into());
1294            db.set_node_property(id, "name", "Bob".into());
1295            // Delete
1296            db.delete_node(id);
1297
1298            let history = db.history(id).unwrap();
1299            assert_eq!(history.len(), 4); // create + 2 updates + delete
1300            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1301            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1302            assert!(history[1].before.is_none()); // first set_node_property has no prior value
1303            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1304            assert!(history[2].before.is_some()); // second update has prior "Alice"
1305            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1306        }
1307
1308        #[test]
1309        fn test_edge_lifecycle_history() {
1310            let db = GrafeoDB::new_in_memory();
1311
1312            let alice = db.create_node(&["Person"]);
1313            let bob = db.create_node(&["Person"]);
1314            let edge = db.create_edge(alice, bob, "KNOWS");
1315            db.set_edge_property(edge, "since", 2024i64.into());
1316            db.delete_edge(edge);
1317
1318            let history = db.history(edge).unwrap();
1319            assert_eq!(history.len(), 3); // create + update + delete
1320            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1321            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1322            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1323        }
1324
1325        #[test]
1326        fn test_create_node_with_props_cdc() {
1327            let db = GrafeoDB::new_in_memory();
1328
1329            let id = db.create_node_with_props(
1330                &["Person"],
1331                vec![
1332                    ("name", grafeo_common::types::Value::from("Alice")),
1333                    ("age", grafeo_common::types::Value::from(30i64)),
1334                ],
1335            );
1336
1337            let history = db.history(id).unwrap();
1338            assert_eq!(history.len(), 1);
1339            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1340            // Props should be captured
1341            let after = history[0].after.as_ref().unwrap();
1342            assert_eq!(after.len(), 2);
1343        }
1344
1345        #[test]
1346        fn test_changes_between() {
1347            let db = GrafeoDB::new_in_memory();
1348
1349            let id1 = db.create_node(&["A"]);
1350            let _id2 = db.create_node(&["B"]);
1351            db.set_node_property(id1, "x", 1i64.into());
1352
1353            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
1354            let changes = db
1355                .changes_between(
1356                    grafeo_common::types::EpochId(0),
1357                    grafeo_common::types::EpochId(u64::MAX),
1358                )
1359                .unwrap();
1360            assert_eq!(changes.len(), 3); // 2 creates + 1 update
1361        }
1362    }
1363
1364    #[test]
1365    fn test_with_store_basic() {
1366        use grafeo_core::graph::lpg::LpgStore;
1367
1368        let store = Arc::new(LpgStore::new());
1369        let n1 = store.create_node(&["Person"]);
1370        store.set_node_property(n1, "name", "Alice".into());
1371
1372        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1373        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1374
1375        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1376        assert_eq!(result.rows.len(), 1);
1377    }
1378
1379    #[test]
1380    fn test_with_store_session() {
1381        use grafeo_core::graph::lpg::LpgStore;
1382
1383        let store = Arc::new(LpgStore::new());
1384        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1385        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1386
1387        let session = db.session();
1388        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1389        assert_eq!(result.rows.len(), 1);
1390    }
1391
1392    #[test]
1393    fn test_with_store_mutations() {
1394        use grafeo_core::graph::lpg::LpgStore;
1395
1396        let store = Arc::new(LpgStore::new());
1397        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1398        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1399
1400        let session = db.session();
1401        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
1402
1403        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1404        assert_eq!(result.rows.len(), 1);
1405
1406        // Data should also be visible via the original store
1407        assert_eq!(store.node_count(), 1);
1408    }
1409}