Skip to main content

grafeo_engine/
database.rs

//! The main database struct and operations.
//!
//! Start here with [`GrafeoDB`] - it's your handle to everything.
4
5use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
11use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::lpg::LpgStore;
14#[cfg(feature = "rdf")]
15use grafeo_core::graph::rdf::RdfStore;
16
17use crate::config::Config;
18use crate::session::Session;
19use crate::transaction::TransactionManager;
20
/// Your handle to a Grafeo database.
///
/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
/// quick experiments, or [`open()`](Self::open) for persistent storage.
/// Then grab a [`session()`](Self::session) to start querying.
///
/// The handle owns the graph store, the transaction manager, the buffer
/// manager and (for persistent databases) the write-ahead log. These are
/// held behind `Arc`s so that sessions can share them.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// // Quick in-memory database
/// let db = GrafeoDB::new_in_memory();
///
/// // Add some data
/// db.create_node(&["Person"]);
///
/// // Query it
/// let session = db.session();
/// let result = session.execute("MATCH (p:Person) RETURN p")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
pub struct GrafeoDB {
    /// Database configuration this instance was created with.
    config: Config,
    /// The underlying labeled-property-graph store.
    store: Arc<LpgStore>,
    /// RDF triple store (only compiled in when the `rdf` feature is enabled).
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session.
    tx_manager: Arc<TransactionManager>,
    /// Unified buffer manager; its budget and spill path come from `config`.
    buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager. `None` when WAL is disabled in the config or
    /// when the database has no on-disk path.
    wal: Option<Arc<WalManager>>,
    /// Whether the database is open. Starts as `true` and is flipped to
    /// `false` by `close()`, which holds the write lock while it flushes.
    is_open: RwLock<bool>,
}
60
61impl GrafeoDB {
62    /// Creates an in-memory database - fast to create, gone when dropped.
63    ///
64    /// Use this for tests, experiments, or when you don't need persistence.
65    /// For data that survives restarts, use [`open()`](Self::open) instead.
66    ///
67    /// # Examples
68    ///
69    /// ```
70    /// use grafeo_engine::GrafeoDB;
71    ///
72    /// let db = GrafeoDB::new_in_memory();
73    /// let session = db.session();
74    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
75    /// # Ok::<(), grafeo_common::utils::error::Error>(())
76    /// ```
77    #[must_use]
78    pub fn new_in_memory() -> Self {
79        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
80    }
81
82    /// Opens a database at the given path, creating it if it doesn't exist.
83    ///
84    /// If you've used this path before, Grafeo recovers your data from the
85    /// write-ahead log automatically. First open on a new path creates an
86    /// empty database.
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if the path isn't writable or recovery fails.
91    ///
92    /// # Examples
93    ///
94    /// ```no_run
95    /// use grafeo_engine::GrafeoDB;
96    ///
97    /// let db = GrafeoDB::open("./my_social_network")?;
98    /// # Ok::<(), grafeo_common::utils::error::Error>(())
99    /// ```
100    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
101        Self::with_config(Config::persistent(path.as_ref()))
102    }
103
104    /// Creates a database with custom configuration.
105    ///
106    /// Use this when you need fine-grained control over memory limits,
107    /// thread counts, or persistence settings. For most cases,
108    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
109    /// are simpler.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if the database can't be created or recovery fails.
114    ///
115    /// # Examples
116    ///
117    /// ```
118    /// use grafeo_engine::{GrafeoDB, Config};
119    ///
120    /// // In-memory with a 512MB limit
121    /// let config = Config::in_memory()
122    ///     .with_memory_limit(512 * 1024 * 1024);
123    ///
124    /// let db = GrafeoDB::with_config(config)?;
125    /// # Ok::<(), grafeo_common::utils::error::Error>(())
126    /// ```
127    pub fn with_config(config: Config) -> Result<Self> {
128        let store = Arc::new(LpgStore::new());
129        #[cfg(feature = "rdf")]
130        let rdf_store = Arc::new(RdfStore::new());
131        let tx_manager = Arc::new(TransactionManager::new());
132
133        // Create buffer manager with configured limits
134        let buffer_config = BufferManagerConfig {
135            budget: config.memory_limit.unwrap_or_else(|| {
136                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
137            }),
138            spill_path: config
139                .spill_path
140                .clone()
141                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
142            ..BufferManagerConfig::default()
143        };
144        let buffer_manager = BufferManager::new(buffer_config);
145
146        // Initialize WAL if persistence is enabled
147        let wal = if config.wal_enabled {
148            if let Some(ref db_path) = config.path {
149                // Create database directory if it doesn't exist
150                std::fs::create_dir_all(db_path)?;
151
152                let wal_path = db_path.join("wal");
153
154                // Check if WAL exists and recover if needed
155                if wal_path.exists() {
156                    let recovery = WalRecovery::new(&wal_path);
157                    let records = recovery.recover()?;
158                    Self::apply_wal_records(&store, &records)?;
159                }
160
161                // Open/create WAL manager
162                let wal_config = WalConfig::default();
163                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
164                Some(Arc::new(wal_manager))
165            } else {
166                None
167            }
168        } else {
169            None
170        };
171
172        Ok(Self {
173            config,
174            store,
175            #[cfg(feature = "rdf")]
176            rdf_store,
177            tx_manager,
178            buffer_manager,
179            wal,
180            is_open: RwLock::new(true),
181        })
182    }
183
184    /// Applies WAL records to restore the database state.
185    fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
186        for record in records {
187            match record {
188                WalRecord::CreateNode { id, labels } => {
189                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
190                    store.create_node_with_id(*id, &label_refs);
191                }
192                WalRecord::DeleteNode { id } => {
193                    store.delete_node(*id);
194                }
195                WalRecord::CreateEdge {
196                    id,
197                    src,
198                    dst,
199                    edge_type,
200                } => {
201                    store.create_edge_with_id(*id, *src, *dst, edge_type);
202                }
203                WalRecord::DeleteEdge { id } => {
204                    store.delete_edge(*id);
205                }
206                WalRecord::SetNodeProperty { id, key, value } => {
207                    store.set_node_property(*id, key, value.clone());
208                }
209                WalRecord::SetEdgeProperty { id, key, value } => {
210                    store.set_edge_property(*id, key, value.clone());
211                }
212                WalRecord::AddNodeLabel { id, label } => {
213                    store.add_label(*id, label);
214                }
215                WalRecord::RemoveNodeLabel { id, label } => {
216                    store.remove_label(*id, label);
217                }
218                WalRecord::TxCommit { .. }
219                | WalRecord::TxAbort { .. }
220                | WalRecord::Checkpoint { .. } => {
221                    // Transaction control records don't need replay action
222                    // (recovery already filtered to only committed transactions)
223                }
224            }
225        }
226        Ok(())
227    }
228
229    /// Opens a new session for running queries.
230    ///
231    /// Sessions are cheap to create - spin up as many as you need. Each
232    /// gets its own transaction context, so concurrent sessions won't
233    /// block each other on reads.
234    ///
235    /// # Examples
236    ///
237    /// ```
238    /// use grafeo_engine::GrafeoDB;
239    ///
240    /// let db = GrafeoDB::new_in_memory();
241    /// let session = db.session();
242    ///
243    /// // Run queries through the session
244    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
245    /// # Ok::<(), grafeo_common::utils::error::Error>(())
246    /// ```
247    #[must_use]
248    pub fn session(&self) -> Session {
249        #[cfg(feature = "rdf")]
250        {
251            Session::with_rdf_store_and_adaptive(
252                Arc::clone(&self.store),
253                Arc::clone(&self.rdf_store),
254                Arc::clone(&self.tx_manager),
255                self.config.adaptive.clone(),
256            )
257        }
258        #[cfg(not(feature = "rdf"))]
259        {
260            Session::with_adaptive(
261                Arc::clone(&self.store),
262                Arc::clone(&self.tx_manager),
263                self.config.adaptive.clone(),
264            )
265        }
266    }
267
268    /// Returns the adaptive execution configuration.
269    #[must_use]
270    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
271        &self.config.adaptive
272    }
273
274    /// Runs a query directly on the database.
275    ///
276    /// A convenience method that creates a temporary session behind the
277    /// scenes. If you're running multiple queries, grab a
278    /// [`session()`](Self::session) instead to avoid the overhead.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if parsing or execution fails.
283    pub fn execute(&self, query: &str) -> Result<QueryResult> {
284        let session = self.session();
285        session.execute(query)
286    }
287
288    /// Executes a query with parameters and returns the result.
289    ///
290    /// # Errors
291    ///
292    /// Returns an error if the query fails.
293    pub fn execute_with_params(
294        &self,
295        query: &str,
296        params: std::collections::HashMap<String, grafeo_common::types::Value>,
297    ) -> Result<QueryResult> {
298        let session = self.session();
299        session.execute_with_params(query, params)
300    }
301
302    /// Executes a Cypher query and returns the result.
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if the query fails.
307    #[cfg(feature = "cypher")]
308    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
309        let session = self.session();
310        session.execute_cypher(query)
311    }
312
313    /// Executes a Cypher query with parameters and returns the result.
314    ///
315    /// # Errors
316    ///
317    /// Returns an error if the query fails.
318    #[cfg(feature = "cypher")]
319    pub fn execute_cypher_with_params(
320        &self,
321        query: &str,
322        params: std::collections::HashMap<String, grafeo_common::types::Value>,
323    ) -> Result<QueryResult> {
324        use crate::query::processor::{QueryLanguage, QueryProcessor};
325
326        // Create processor
327        let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
328        processor.process(query, QueryLanguage::Cypher, Some(&params))
329    }
330
331    /// Executes a Gremlin query and returns the result.
332    ///
333    /// # Errors
334    ///
335    /// Returns an error if the query fails.
336    #[cfg(feature = "gremlin")]
337    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
338        let session = self.session();
339        session.execute_gremlin(query)
340    }
341
342    /// Executes a Gremlin query with parameters and returns the result.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if the query fails.
347    #[cfg(feature = "gremlin")]
348    pub fn execute_gremlin_with_params(
349        &self,
350        query: &str,
351        params: std::collections::HashMap<String, grafeo_common::types::Value>,
352    ) -> Result<QueryResult> {
353        let session = self.session();
354        session.execute_gremlin_with_params(query, params)
355    }
356
357    /// Executes a GraphQL query and returns the result.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if the query fails.
362    #[cfg(feature = "graphql")]
363    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
364        let session = self.session();
365        session.execute_graphql(query)
366    }
367
368    /// Executes a GraphQL query with parameters and returns the result.
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if the query fails.
373    #[cfg(feature = "graphql")]
374    pub fn execute_graphql_with_params(
375        &self,
376        query: &str,
377        params: std::collections::HashMap<String, grafeo_common::types::Value>,
378    ) -> Result<QueryResult> {
379        let session = self.session();
380        session.execute_graphql_with_params(query, params)
381    }
382
383    /// Executes a SPARQL query and returns the result.
384    ///
385    /// SPARQL queries operate on the RDF triple store.
386    ///
387    /// # Errors
388    ///
389    /// Returns an error if the query fails.
390    ///
391    /// # Examples
392    ///
393    /// ```ignore
394    /// use grafeo_engine::GrafeoDB;
395    ///
396    /// let db = GrafeoDB::new_in_memory();
397    /// let result = db.execute_sparql("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?;
398    /// ```
399    #[cfg(all(feature = "sparql", feature = "rdf"))]
400    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
401        use crate::query::{
402            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
403        };
404
405        // Parse and translate the SPARQL query to a logical plan
406        let logical_plan = sparql_translator::translate(query)?;
407
408        // Optimize the plan
409        let optimizer = Optimizer::new();
410        let optimized_plan = optimizer.optimize(logical_plan)?;
411
412        // Convert to physical plan using RDF planner
413        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
414        let mut physical_plan = planner.plan(&optimized_plan)?;
415
416        // Execute the plan
417        let executor = Executor::with_columns(physical_plan.columns.clone());
418        executor.execute(physical_plan.operator.as_mut())
419    }
420
421    /// Returns the RDF store.
422    ///
423    /// This provides direct access to the RDF store for triple operations.
424    #[cfg(feature = "rdf")]
425    #[must_use]
426    pub fn rdf_store(&self) -> &Arc<RdfStore> {
427        &self.rdf_store
428    }
429
430    /// Executes a query and returns a single scalar value.
431    ///
432    /// # Errors
433    ///
434    /// Returns an error if the query fails or doesn't return exactly one row.
435    pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
436        let result = self.execute(query)?;
437        result.scalar()
438    }
439
440    /// Returns the configuration.
441    #[must_use]
442    pub fn config(&self) -> &Config {
443        &self.config
444    }
445
446    /// Returns the underlying store.
447    ///
448    /// This provides direct access to the LPG store for algorithm implementations.
449    #[must_use]
450    pub fn store(&self) -> &Arc<LpgStore> {
451        &self.store
452    }
453
454    /// Returns the buffer manager for memory-aware operations.
455    #[must_use]
456    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
457        &self.buffer_manager
458    }
459
460    /// Closes the database, flushing all pending writes.
461    ///
462    /// For persistent databases, this ensures everything is safely on disk.
463    /// Called automatically when the database is dropped, but you can call
464    /// it explicitly if you need to guarantee durability at a specific point.
465    ///
466    /// # Errors
467    ///
468    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
469    pub fn close(&self) -> Result<()> {
470        let mut is_open = self.is_open.write();
471        if !*is_open {
472            return Ok(());
473        }
474
475        // Commit and checkpoint WAL
476        if let Some(ref wal) = self.wal {
477            let epoch = self.store.current_epoch();
478
479            // Use the last assigned transaction ID, or create a checkpoint-only tx
480            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
481                // No transactions have been started; begin one for checkpoint
482                self.tx_manager.begin()
483            });
484
485            // Log a TxCommit to mark all pending records as committed
486            wal.log(&WalRecord::TxCommit {
487                tx_id: checkpoint_tx,
488            })?;
489
490            // Then checkpoint
491            wal.checkpoint(checkpoint_tx, epoch)?;
492            wal.sync()?;
493        }
494
495        *is_open = false;
496        Ok(())
497    }
498
499    /// Returns the WAL manager if available.
500    #[must_use]
501    pub fn wal(&self) -> Option<&Arc<WalManager>> {
502        self.wal.as_ref()
503    }
504
505    /// Logs a WAL record if WAL is enabled.
506    fn log_wal(&self, record: &WalRecord) -> Result<()> {
507        if let Some(ref wal) = self.wal {
508            wal.log(record)?;
509        }
510        Ok(())
511    }
512
513    /// Returns the number of nodes in the database.
514    #[must_use]
515    pub fn node_count(&self) -> usize {
516        self.store.node_count()
517    }
518
519    /// Returns the number of edges in the database.
520    #[must_use]
521    pub fn edge_count(&self) -> usize {
522        self.store.edge_count()
523    }
524
525    /// Returns the number of distinct labels in the database.
526    #[must_use]
527    pub fn label_count(&self) -> usize {
528        self.store.label_count()
529    }
530
531    /// Returns the number of distinct property keys in the database.
532    #[must_use]
533    pub fn property_key_count(&self) -> usize {
534        self.store.property_key_count()
535    }
536
537    /// Returns the number of distinct edge types in the database.
538    #[must_use]
539    pub fn edge_type_count(&self) -> usize {
540        self.store.edge_type_count()
541    }
542
543    // === Node Operations ===
544
545    /// Creates a node with the given labels and returns its ID.
546    ///
547    /// Labels categorize nodes - think of them like tags. A node can have
548    /// multiple labels (e.g., `["Person", "Employee"]`).
549    ///
550    /// # Examples
551    ///
552    /// ```
553    /// use grafeo_engine::GrafeoDB;
554    ///
555    /// let db = GrafeoDB::new_in_memory();
556    /// let alice = db.create_node(&["Person"]);
557    /// let company = db.create_node(&["Company", "Startup"]);
558    /// ```
559    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
560        let id = self.store.create_node(labels);
561
562        // Log to WAL if enabled
563        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
564            id,
565            labels: labels.iter().map(|s| s.to_string()).collect(),
566        }) {
567            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
568        }
569
570        id
571    }
572
573    /// Creates a new node with labels and properties.
574    ///
575    /// If WAL is enabled, the operation is logged for durability.
576    pub fn create_node_with_props(
577        &self,
578        labels: &[&str],
579        properties: impl IntoIterator<
580            Item = (
581                impl Into<grafeo_common::types::PropertyKey>,
582                impl Into<grafeo_common::types::Value>,
583            ),
584        >,
585    ) -> grafeo_common::types::NodeId {
586        // Collect properties first so we can log them to WAL
587        let props: Vec<(
588            grafeo_common::types::PropertyKey,
589            grafeo_common::types::Value,
590        )> = properties
591            .into_iter()
592            .map(|(k, v)| (k.into(), v.into()))
593            .collect();
594
595        let id = self
596            .store
597            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
598
599        // Log node creation to WAL
600        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
601            id,
602            labels: labels.iter().map(|s| s.to_string()).collect(),
603        }) {
604            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
605        }
606
607        // Log each property to WAL for full durability
608        for (key, value) in props {
609            if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
610                id,
611                key: key.to_string(),
612                value,
613            }) {
614                tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
615            }
616        }
617
618        id
619    }
620
621    /// Gets a node by ID.
622    #[must_use]
623    pub fn get_node(
624        &self,
625        id: grafeo_common::types::NodeId,
626    ) -> Option<grafeo_core::graph::lpg::Node> {
627        self.store.get_node(id)
628    }
629
630    /// Deletes a node and all its edges.
631    ///
632    /// If WAL is enabled, the operation is logged for durability.
633    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
634        let result = self.store.delete_node(id);
635
636        if result {
637            if let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
638                tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
639            }
640        }
641
642        result
643    }
644
645    /// Sets a property on a node.
646    ///
647    /// If WAL is enabled, the operation is logged for durability.
648    pub fn set_node_property(
649        &self,
650        id: grafeo_common::types::NodeId,
651        key: &str,
652        value: grafeo_common::types::Value,
653    ) {
654        // Log to WAL first
655        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
656            id,
657            key: key.to_string(),
658            value: value.clone(),
659        }) {
660            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
661        }
662
663        self.store.set_node_property(id, key, value);
664    }
665
666    /// Adds a label to an existing node.
667    ///
668    /// Returns `true` if the label was added, `false` if the node doesn't exist
669    /// or already has the label.
670    ///
671    /// # Examples
672    ///
673    /// ```
674    /// use grafeo_engine::GrafeoDB;
675    ///
676    /// let db = GrafeoDB::new_in_memory();
677    /// let alice = db.create_node(&["Person"]);
678    ///
679    /// // Promote Alice to Employee
680    /// let added = db.add_node_label(alice, "Employee");
681    /// assert!(added);
682    /// ```
683    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
684        let result = self.store.add_label(id, label);
685
686        if result {
687            // Log to WAL if enabled
688            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
689                id,
690                label: label.to_string(),
691            }) {
692                tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
693            }
694        }
695
696        result
697    }
698
699    /// Removes a label from a node.
700    ///
701    /// Returns `true` if the label was removed, `false` if the node doesn't exist
702    /// or doesn't have the label.
703    ///
704    /// # Examples
705    ///
706    /// ```
707    /// use grafeo_engine::GrafeoDB;
708    ///
709    /// let db = GrafeoDB::new_in_memory();
710    /// let alice = db.create_node(&["Person", "Employee"]);
711    ///
712    /// // Remove Employee status
713    /// let removed = db.remove_node_label(alice, "Employee");
714    /// assert!(removed);
715    /// ```
716    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
717        let result = self.store.remove_label(id, label);
718
719        if result {
720            // Log to WAL if enabled
721            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
722                id,
723                label: label.to_string(),
724            }) {
725                tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
726            }
727        }
728
729        result
730    }
731
732    /// Gets all labels for a node.
733    ///
734    /// Returns `None` if the node doesn't exist.
735    ///
736    /// # Examples
737    ///
738    /// ```
739    /// use grafeo_engine::GrafeoDB;
740    ///
741    /// let db = GrafeoDB::new_in_memory();
742    /// let alice = db.create_node(&["Person", "Employee"]);
743    ///
744    /// let labels = db.get_node_labels(alice).unwrap();
745    /// assert!(labels.contains(&"Person".to_string()));
746    /// assert!(labels.contains(&"Employee".to_string()));
747    /// ```
748    #[must_use]
749    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
750        self.store
751            .get_node(id)
752            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
753    }
754
755    // === Edge Operations ===
756
757    /// Creates an edge (relationship) between two nodes.
758    ///
759    /// Edges connect nodes and have a type that describes the relationship.
760    /// They're directed - the order of `src` and `dst` matters.
761    ///
762    /// # Examples
763    ///
764    /// ```
765    /// use grafeo_engine::GrafeoDB;
766    ///
767    /// let db = GrafeoDB::new_in_memory();
768    /// let alice = db.create_node(&["Person"]);
769    /// let bob = db.create_node(&["Person"]);
770    ///
771    /// // Alice knows Bob (directed: Alice -> Bob)
772    /// let edge = db.create_edge(alice, bob, "KNOWS");
773    /// ```
774    pub fn create_edge(
775        &self,
776        src: grafeo_common::types::NodeId,
777        dst: grafeo_common::types::NodeId,
778        edge_type: &str,
779    ) -> grafeo_common::types::EdgeId {
780        let id = self.store.create_edge(src, dst, edge_type);
781
782        // Log to WAL if enabled
783        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
784            id,
785            src,
786            dst,
787            edge_type: edge_type.to_string(),
788        }) {
789            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
790        }
791
792        id
793    }
794
795    /// Creates a new edge with properties.
796    ///
797    /// If WAL is enabled, the operation is logged for durability.
798    pub fn create_edge_with_props(
799        &self,
800        src: grafeo_common::types::NodeId,
801        dst: grafeo_common::types::NodeId,
802        edge_type: &str,
803        properties: impl IntoIterator<
804            Item = (
805                impl Into<grafeo_common::types::PropertyKey>,
806                impl Into<grafeo_common::types::Value>,
807            ),
808        >,
809    ) -> grafeo_common::types::EdgeId {
810        // Collect properties first so we can log them to WAL
811        let props: Vec<(
812            grafeo_common::types::PropertyKey,
813            grafeo_common::types::Value,
814        )> = properties
815            .into_iter()
816            .map(|(k, v)| (k.into(), v.into()))
817            .collect();
818
819        let id = self.store.create_edge_with_props(
820            src,
821            dst,
822            edge_type,
823            props.iter().map(|(k, v)| (k.clone(), v.clone())),
824        );
825
826        // Log edge creation to WAL
827        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
828            id,
829            src,
830            dst,
831            edge_type: edge_type.to_string(),
832        }) {
833            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
834        }
835
836        // Log each property to WAL for full durability
837        for (key, value) in props {
838            if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
839                id,
840                key: key.to_string(),
841                value,
842            }) {
843                tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
844            }
845        }
846
847        id
848    }
849
850    /// Gets an edge by ID.
851    #[must_use]
852    pub fn get_edge(
853        &self,
854        id: grafeo_common::types::EdgeId,
855    ) -> Option<grafeo_core::graph::lpg::Edge> {
856        self.store.get_edge(id)
857    }
858
859    /// Deletes an edge.
860    ///
861    /// If WAL is enabled, the operation is logged for durability.
862    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
863        let result = self.store.delete_edge(id);
864
865        if result {
866            if let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
867                tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
868            }
869        }
870
871        result
872    }
873
874    /// Sets a property on an edge.
875    ///
876    /// If WAL is enabled, the operation is logged for durability.
877    pub fn set_edge_property(
878        &self,
879        id: grafeo_common::types::EdgeId,
880        key: &str,
881        value: grafeo_common::types::Value,
882    ) {
883        // Log to WAL first
884        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
885            id,
886            key: key.to_string(),
887            value: value.clone(),
888        }) {
889            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
890        }
891        self.store.set_edge_property(id, key, value);
892    }
893
894    /// Removes a property from a node.
895    ///
896    /// Returns true if the property existed and was removed, false otherwise.
897    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
898        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
899        self.store.remove_node_property(id, key).is_some()
900    }
901
902    /// Removes a property from an edge.
903    ///
904    /// Returns true if the property existed and was removed, false otherwise.
905    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
906        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
907        self.store.remove_edge_property(id, key).is_some()
908    }
909
910    // =========================================================================
911    // ADMIN API: Introspection
912    // =========================================================================
913
914    /// Returns true if this database is backed by a file (persistent).
915    ///
916    /// In-memory databases return false.
917    #[must_use]
918    pub fn is_persistent(&self) -> bool {
919        self.config.path.is_some()
920    }
921
922    /// Returns the database file path, if persistent.
923    ///
924    /// In-memory databases return None.
925    #[must_use]
926    pub fn path(&self) -> Option<&Path> {
927        self.config.path.as_deref()
928    }
929
930    /// Returns high-level database information.
931    ///
932    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
933    #[must_use]
934    pub fn info(&self) -> crate::admin::DatabaseInfo {
935        crate::admin::DatabaseInfo {
936            mode: crate::admin::DatabaseMode::Lpg,
937            node_count: self.store.node_count(),
938            edge_count: self.store.edge_count(),
939            is_persistent: self.is_persistent(),
940            path: self.config.path.clone(),
941            wal_enabled: self.config.wal_enabled,
942            version: env!("CARGO_PKG_VERSION").to_string(),
943        }
944    }
945
946    /// Returns detailed database statistics.
947    ///
948    /// Includes counts, memory usage, and index information.
949    #[must_use]
950    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
951        let disk_bytes = self.config.path.as_ref().and_then(|p| {
952            if p.exists() {
953                Self::calculate_disk_usage(p).ok()
954            } else {
955                None
956            }
957        });
958
959        crate::admin::DatabaseStats {
960            node_count: self.store.node_count(),
961            edge_count: self.store.edge_count(),
962            label_count: self.store.label_count(),
963            edge_type_count: self.store.edge_type_count(),
964            property_key_count: self.store.property_key_count(),
965            index_count: 0, // TODO: implement index tracking
966            memory_bytes: self.buffer_manager.allocated(),
967            disk_bytes,
968        }
969    }
970
971    /// Calculates total disk usage for the database directory.
972    fn calculate_disk_usage(path: &Path) -> Result<usize> {
973        let mut total = 0usize;
974        if path.is_dir() {
975            for entry in std::fs::read_dir(path)? {
976                let entry = entry?;
977                let metadata = entry.metadata()?;
978                if metadata.is_file() {
979                    total += metadata.len() as usize;
980                } else if metadata.is_dir() {
981                    total += Self::calculate_disk_usage(&entry.path())?;
982                }
983            }
984        }
985        Ok(total)
986    }
987
988    /// Returns schema information (labels, edge types, property keys).
989    ///
990    /// For LPG mode, returns label and edge type information.
991    /// For RDF mode, returns predicate and named graph information.
992    #[must_use]
993    pub fn schema(&self) -> crate::admin::SchemaInfo {
994        let labels = self
995            .store
996            .all_labels()
997            .into_iter()
998            .map(|name| crate::admin::LabelInfo {
999                name: name.clone(),
1000                count: self.store.nodes_with_label(&name).count(),
1001            })
1002            .collect();
1003
1004        let edge_types = self
1005            .store
1006            .all_edge_types()
1007            .into_iter()
1008            .map(|name| crate::admin::EdgeTypeInfo {
1009                name: name.clone(),
1010                count: self.store.edges_with_type(&name).count(),
1011            })
1012            .collect();
1013
1014        let property_keys = self.store.all_property_keys();
1015
1016        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1017            labels,
1018            edge_types,
1019            property_keys,
1020        })
1021    }
1022
1023    /// Returns RDF schema information.
1024    ///
1025    /// Only available when the RDF feature is enabled.
1026    #[cfg(feature = "rdf")]
1027    #[must_use]
1028    pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
1029        let stats = self.rdf_store.stats();
1030
1031        let predicates = self
1032            .rdf_store
1033            .predicates()
1034            .into_iter()
1035            .map(|predicate| {
1036                let count = self.rdf_store.triples_with_predicate(&predicate).len();
1037                crate::admin::PredicateInfo {
1038                    iri: predicate.to_string(),
1039                    count,
1040                }
1041            })
1042            .collect();
1043
1044        crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
1045            predicates,
1046            named_graphs: Vec::new(), // Named graphs not yet implemented in RdfStore
1047            subject_count: stats.subject_count,
1048            object_count: stats.object_count,
1049        })
1050    }
1051
1052    /// Validates database integrity.
1053    ///
1054    /// Checks for:
1055    /// - Dangling edge references (edges pointing to non-existent nodes)
1056    /// - Internal index consistency
1057    ///
1058    /// Returns a list of errors and warnings. Empty errors = valid.
1059    #[must_use]
1060    pub fn validate(&self) -> crate::admin::ValidationResult {
1061        let mut result = crate::admin::ValidationResult::default();
1062
1063        // Check for dangling edge references
1064        for edge in self.store.all_edges() {
1065            if self.store.get_node(edge.src).is_none() {
1066                result.errors.push(crate::admin::ValidationError {
1067                    code: "DANGLING_SRC".to_string(),
1068                    message: format!(
1069                        "Edge {} references non-existent source node {}",
1070                        edge.id.0, edge.src.0
1071                    ),
1072                    context: Some(format!("edge:{}", edge.id.0)),
1073                });
1074            }
1075            if self.store.get_node(edge.dst).is_none() {
1076                result.errors.push(crate::admin::ValidationError {
1077                    code: "DANGLING_DST".to_string(),
1078                    message: format!(
1079                        "Edge {} references non-existent destination node {}",
1080                        edge.id.0, edge.dst.0
1081                    ),
1082                    context: Some(format!("edge:{}", edge.id.0)),
1083                });
1084            }
1085        }
1086
1087        // Add warnings for potential issues
1088        if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1089            result.warnings.push(crate::admin::ValidationWarning {
1090                code: "NO_EDGES".to_string(),
1091                message: "Database has nodes but no edges".to_string(),
1092                context: None,
1093            });
1094        }
1095
1096        result
1097    }
1098
1099    /// Returns WAL (Write-Ahead Log) status.
1100    ///
1101    /// Returns None if WAL is not enabled.
1102    #[must_use]
1103    pub fn wal_status(&self) -> crate::admin::WalStatus {
1104        if let Some(ref wal) = self.wal {
1105            crate::admin::WalStatus {
1106                enabled: true,
1107                path: self.config.path.as_ref().map(|p| p.join("wal")),
1108                size_bytes: wal.size_bytes(),
1109                record_count: wal.record_count() as usize,
1110                last_checkpoint: wal.last_checkpoint_timestamp(),
1111                current_epoch: self.store.current_epoch().as_u64(),
1112            }
1113        } else {
1114            crate::admin::WalStatus {
1115                enabled: false,
1116                path: None,
1117                size_bytes: 0,
1118                record_count: 0,
1119                last_checkpoint: None,
1120                current_epoch: self.store.current_epoch().as_u64(),
1121            }
1122        }
1123    }
1124
1125    /// Forces a WAL checkpoint.
1126    ///
1127    /// Flushes all pending WAL records to the main storage.
1128    ///
1129    /// # Errors
1130    ///
1131    /// Returns an error if the checkpoint fails.
1132    pub fn wal_checkpoint(&self) -> Result<()> {
1133        if let Some(ref wal) = self.wal {
1134            let epoch = self.store.current_epoch();
1135            let tx_id = self
1136                .tx_manager
1137                .last_assigned_tx_id()
1138                .unwrap_or_else(|| self.tx_manager.begin());
1139            wal.checkpoint(tx_id, epoch)?;
1140            wal.sync()?;
1141        }
1142        Ok(())
1143    }
1144
1145    // =========================================================================
1146    // ADMIN API: Persistence Control
1147    // =========================================================================
1148
1149    /// Saves the database to a file path.
1150    ///
1151    /// - If in-memory: creates a new persistent database at path
1152    /// - If file-backed: creates a copy at the new path
1153    ///
1154    /// The original database remains unchanged.
1155    ///
1156    /// # Errors
1157    ///
1158    /// Returns an error if the save operation fails.
1159    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1160        let path = path.as_ref();
1161
1162        // Create target database with WAL enabled
1163        let target_config = Config::persistent(path);
1164        let target = Self::with_config(target_config)?;
1165
1166        // Copy all nodes using WAL-enabled methods
1167        for node in self.store.all_nodes() {
1168            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1169            target.store.create_node_with_id(node.id, &label_refs);
1170
1171            // Log to WAL
1172            target.log_wal(&WalRecord::CreateNode {
1173                id: node.id,
1174                labels: node.labels.iter().map(|s| s.to_string()).collect(),
1175            })?;
1176
1177            // Copy properties
1178            for (key, value) in node.properties {
1179                target
1180                    .store
1181                    .set_node_property(node.id, key.as_str(), value.clone());
1182                target.log_wal(&WalRecord::SetNodeProperty {
1183                    id: node.id,
1184                    key: key.to_string(),
1185                    value,
1186                })?;
1187            }
1188        }
1189
1190        // Copy all edges using WAL-enabled methods
1191        for edge in self.store.all_edges() {
1192            target
1193                .store
1194                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1195
1196            // Log to WAL
1197            target.log_wal(&WalRecord::CreateEdge {
1198                id: edge.id,
1199                src: edge.src,
1200                dst: edge.dst,
1201                edge_type: edge.edge_type.to_string(),
1202            })?;
1203
1204            // Copy properties
1205            for (key, value) in edge.properties {
1206                target
1207                    .store
1208                    .set_edge_property(edge.id, key.as_str(), value.clone());
1209                target.log_wal(&WalRecord::SetEdgeProperty {
1210                    id: edge.id,
1211                    key: key.to_string(),
1212                    value,
1213                })?;
1214            }
1215        }
1216
1217        // Checkpoint and close the target database
1218        target.close()?;
1219
1220        Ok(())
1221    }
1222
1223    /// Creates an in-memory copy of this database.
1224    ///
1225    /// Returns a new database that is completely independent.
1226    /// Useful for:
1227    /// - Testing modifications without affecting the original
1228    /// - Faster operations when persistence isn't needed
1229    ///
1230    /// # Errors
1231    ///
1232    /// Returns an error if the copy operation fails.
1233    pub fn to_memory(&self) -> Result<Self> {
1234        let config = Config::in_memory();
1235        let target = Self::with_config(config)?;
1236
1237        // Copy all nodes
1238        for node in self.store.all_nodes() {
1239            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1240            target.store.create_node_with_id(node.id, &label_refs);
1241
1242            // Copy properties
1243            for (key, value) in node.properties {
1244                target.store.set_node_property(node.id, key.as_str(), value);
1245            }
1246        }
1247
1248        // Copy all edges
1249        for edge in self.store.all_edges() {
1250            target
1251                .store
1252                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1253
1254            // Copy properties
1255            for (key, value) in edge.properties {
1256                target.store.set_edge_property(edge.id, key.as_str(), value);
1257            }
1258        }
1259
1260        Ok(target)
1261    }
1262
1263    /// Opens a database file and loads it entirely into memory.
1264    ///
1265    /// The returned database has no connection to the original file.
1266    /// Changes will NOT be written back to the file.
1267    ///
1268    /// # Errors
1269    ///
1270    /// Returns an error if the file can't be opened or loaded.
1271    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1272        // Open the source database (triggers WAL recovery)
1273        let source = Self::open(path)?;
1274
1275        // Create in-memory copy
1276        let target = source.to_memory()?;
1277
1278        // Close the source (releases file handles)
1279        source.close()?;
1280
1281        Ok(target)
1282    }
1283
1284    // =========================================================================
1285    // ADMIN API: Iteration
1286    // =========================================================================
1287
1288    /// Returns an iterator over all nodes in the database.
1289    ///
1290    /// Useful for dump/export operations.
1291    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1292        self.store.all_nodes()
1293    }
1294
1295    /// Returns an iterator over all edges in the database.
1296    ///
1297    /// Useful for dump/export operations.
1298    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1299        self.store.all_edges()
1300    }
1301}
1302
1303impl Drop for GrafeoDB {
1304    fn drop(&mut self) {
1305        if let Err(e) = self.close() {
1306            tracing::error!("Error closing database: {}", e);
1307        }
1308    }
1309}
1310
1311/// The result of running a query.
1312///
1313/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
1314/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
1315///
1316/// # Examples
1317///
1318/// ```
1319/// use grafeo_engine::GrafeoDB;
1320///
1321/// let db = GrafeoDB::new_in_memory();
1322/// db.create_node(&["Person"]);
1323///
1324/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
1325///
1326/// // Check what we got
1327/// println!("Columns: {:?}", result.columns);
1328/// println!("Rows: {}", result.row_count());
1329///
1330/// // Iterate through results
1331/// for row in result.iter() {
1332///     println!("{:?}", row);
1333/// }
1334/// # Ok::<(), grafeo_common::utils::error::Error>(())
1335/// ```
1336#[derive(Debug)]
1337pub struct QueryResult {
1338    /// Column names from the RETURN clause.
1339    pub columns: Vec<String>,
1340    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
1341    pub column_types: Vec<grafeo_common::types::LogicalType>,
1342    /// The actual result rows.
1343    pub rows: Vec<Vec<grafeo_common::types::Value>>,
1344}
1345
1346impl QueryResult {
1347    /// Creates a new empty query result.
1348    #[must_use]
1349    pub fn new(columns: Vec<String>) -> Self {
1350        let len = columns.len();
1351        Self {
1352            columns,
1353            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1354            rows: Vec::new(),
1355        }
1356    }
1357
1358    /// Creates a new empty query result with column types.
1359    #[must_use]
1360    pub fn with_types(
1361        columns: Vec<String>,
1362        column_types: Vec<grafeo_common::types::LogicalType>,
1363    ) -> Self {
1364        Self {
1365            columns,
1366            column_types,
1367            rows: Vec::new(),
1368        }
1369    }
1370
1371    /// Returns the number of rows.
1372    #[must_use]
1373    pub fn row_count(&self) -> usize {
1374        self.rows.len()
1375    }
1376
1377    /// Returns the number of columns.
1378    #[must_use]
1379    pub fn column_count(&self) -> usize {
1380        self.columns.len()
1381    }
1382
1383    /// Returns true if the result is empty.
1384    #[must_use]
1385    pub fn is_empty(&self) -> bool {
1386        self.rows.is_empty()
1387    }
1388
1389    /// Extracts a single value from the result.
1390    ///
1391    /// Use this when your query returns exactly one row with one column,
1392    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1393    ///
1394    /// # Errors
1395    ///
1396    /// Returns an error if the result has multiple rows or columns.
1397    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1398        if self.rows.len() != 1 || self.columns.len() != 1 {
1399            return Err(grafeo_common::utils::error::Error::InvalidValue(
1400                "Expected single value".to_string(),
1401            ));
1402        }
1403        T::from_value(&self.rows[0][0])
1404    }
1405
1406    /// Returns an iterator over the rows.
1407    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1408        self.rows.iter()
1409    }
1410}
1411
1412/// Converts a [`Value`](grafeo_common::types::Value) to a concrete Rust type.
1413///
1414/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
1415/// Used by [`QueryResult::scalar()`] to extract typed values.
1416pub trait FromValue: Sized {
1417    /// Attempts the conversion, returning an error on type mismatch.
1418    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1419}
1420
1421impl FromValue for i64 {
1422    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1423        value
1424            .as_int64()
1425            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1426                expected: "INT64".to_string(),
1427                found: value.type_name().to_string(),
1428            })
1429    }
1430}
1431
1432impl FromValue for f64 {
1433    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1434        value
1435            .as_float64()
1436            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1437                expected: "FLOAT64".to_string(),
1438                found: value.type_name().to_string(),
1439            })
1440    }
1441}
1442
1443impl FromValue for String {
1444    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1445        value.as_str().map(String::from).ok_or_else(|| {
1446            grafeo_common::utils::error::Error::TypeMismatch {
1447                expected: "STRING".to_string(),
1448                found: value.type_name().to_string(),
1449            }
1450        })
1451    }
1452}
1453
1454impl FromValue for bool {
1455    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1456        value
1457            .as_bool()
1458            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1459                expected: "BOOL".to_string(),
1460                found: value.type_name().to_string(),
1461            })
1462    }
1463}
1464
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created in-memory database holds no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let fresh = GrafeoDB::new_in_memory();
        assert_eq!(fresh.node_count(), 0);
        assert_eq!(fresh.edge_count(), 0);
    }

    /// Options set on the config are visible through the database handle.
    #[test]
    fn test_database_config() {
        let db = GrafeoDB::with_config(Config::in_memory().with_threads(4).with_query_logging())
            .unwrap();

        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    /// Creating a session on an in-memory database must not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    /// Data written to a persistent database is still there after reopening.
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::{NodeId, Value};

        let scratch = tempfile::tempdir().unwrap();
        let db_path = scratch.path().join("test_db");

        // First session: write two people and one relationship.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();
        }

        // Second session: everything must have been recovered.
        {
            let reopened = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(reopened.node_count(), 2);
            assert_eq!(reopened.edge_count(), 1);

            // Both nodes must be reachable under the ids 0 and 1.
            let first = reopened.get_node(NodeId::new(0));
            assert!(first.is_some());

            let second = reopened.get_node(NodeId::new(1));
            assert!(second.is_some());
        }
    }

    /// Mutations on a persistent database leave records in the WAL.
    #[test]
    fn test_wal_logging() {
        let scratch = tempfile::tempdir().unwrap();
        let db_path = scratch.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // One create plus one delete is enough to produce WAL traffic.
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // Only checked when the database actually exposes a WAL.
        match db.wal() {
            Some(wal) => assert!(wal.record_count() > 0),
            None => {}
        }

        db.close().unwrap();
    }
}