Skip to main content

grafeo_engine/
database.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4
5use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
11use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::lpg::LpgStore;
14#[cfg(feature = "rdf")]
15use grafeo_core::graph::rdf::RdfStore;
16
17use crate::config::Config;
18use crate::query::cache::QueryCache;
19use crate::session::Session;
20use crate::transaction::TransactionManager;
21
22/// Your handle to a Grafeo database.
23///
24/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
25/// quick experiments, or [`open()`](Self::open) for persistent storage.
26/// Then grab a [`session()`](Self::session) to start querying.
27///
28/// # Examples
29///
30/// ```
31/// use grafeo_engine::GrafeoDB;
32///
33/// // Quick in-memory database
34/// let db = GrafeoDB::new_in_memory();
35///
36/// // Add some data
37/// db.create_node(&["Person"]);
38///
39/// // Query it
40/// let session = db.session();
41/// let result = session.execute("MATCH (p:Person) RETURN p")?;
42/// # Ok::<(), grafeo_common::utils::error::Error>(())
43/// ```
44pub struct GrafeoDB {
45    /// Database configuration.
46    config: Config,
47    /// The underlying graph store.
48    store: Arc<LpgStore>,
49    /// RDF triple store (if RDF feature is enabled).
50    #[cfg(feature = "rdf")]
51    rdf_store: Arc<RdfStore>,
52    /// Transaction manager.
53    tx_manager: Arc<TransactionManager>,
54    /// Unified buffer manager.
55    buffer_manager: Arc<BufferManager>,
56    /// Write-ahead log manager (if durability is enabled).
57    wal: Option<Arc<WalManager>>,
58    /// Query cache for parsed and optimized plans.
59    query_cache: Arc<QueryCache>,
60    /// Whether the database is open.
61    is_open: RwLock<bool>,
62}
63
64impl GrafeoDB {
65    /// Creates an in-memory database - fast to create, gone when dropped.
66    ///
67    /// Use this for tests, experiments, or when you don't need persistence.
68    /// For data that survives restarts, use [`open()`](Self::open) instead.
69    ///
70    /// # Examples
71    ///
72    /// ```
73    /// use grafeo_engine::GrafeoDB;
74    ///
75    /// let db = GrafeoDB::new_in_memory();
76    /// let session = db.session();
77    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
78    /// # Ok::<(), grafeo_common::utils::error::Error>(())
79    /// ```
80    #[must_use]
81    pub fn new_in_memory() -> Self {
82        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
83    }
84
85    /// Opens a database at the given path, creating it if it doesn't exist.
86    ///
87    /// If you've used this path before, Grafeo recovers your data from the
88    /// write-ahead log automatically. First open on a new path creates an
89    /// empty database.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the path isn't writable or recovery fails.
94    ///
95    /// # Examples
96    ///
97    /// ```no_run
98    /// use grafeo_engine::GrafeoDB;
99    ///
100    /// let db = GrafeoDB::open("./my_social_network")?;
101    /// # Ok::<(), grafeo_common::utils::error::Error>(())
102    /// ```
103    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
104        Self::with_config(Config::persistent(path.as_ref()))
105    }
106
107    /// Creates a database with custom configuration.
108    ///
109    /// Use this when you need fine-grained control over memory limits,
110    /// thread counts, or persistence settings. For most cases,
111    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
112    /// are simpler.
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if the database can't be created or recovery fails.
117    ///
118    /// # Examples
119    ///
120    /// ```
121    /// use grafeo_engine::{GrafeoDB, Config};
122    ///
123    /// // In-memory with a 512MB limit
124    /// let config = Config::in_memory()
125    ///     .with_memory_limit(512 * 1024 * 1024);
126    ///
127    /// let db = GrafeoDB::with_config(config)?;
128    /// # Ok::<(), grafeo_common::utils::error::Error>(())
129    /// ```
130    pub fn with_config(config: Config) -> Result<Self> {
131        let store = Arc::new(LpgStore::new());
132        #[cfg(feature = "rdf")]
133        let rdf_store = Arc::new(RdfStore::new());
134        let tx_manager = Arc::new(TransactionManager::new());
135
136        // Create buffer manager with configured limits
137        let buffer_config = BufferManagerConfig {
138            budget: config.memory_limit.unwrap_or_else(|| {
139                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
140            }),
141            spill_path: config
142                .spill_path
143                .clone()
144                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
145            ..BufferManagerConfig::default()
146        };
147        let buffer_manager = BufferManager::new(buffer_config);
148
149        // Initialize WAL if persistence is enabled
150        let wal = if config.wal_enabled {
151            if let Some(ref db_path) = config.path {
152                // Create database directory if it doesn't exist
153                std::fs::create_dir_all(db_path)?;
154
155                let wal_path = db_path.join("wal");
156
157                // Check if WAL exists and recover if needed
158                if wal_path.exists() {
159                    let recovery = WalRecovery::new(&wal_path);
160                    let records = recovery.recover()?;
161                    Self::apply_wal_records(&store, &records)?;
162                }
163
164                // Open/create WAL manager
165                let wal_config = WalConfig::default();
166                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
167                Some(Arc::new(wal_manager))
168            } else {
169                None
170            }
171        } else {
172            None
173        };
174
175        // Create query cache with default capacity (1000 queries)
176        let query_cache = Arc::new(QueryCache::default());
177
178        Ok(Self {
179            config,
180            store,
181            #[cfg(feature = "rdf")]
182            rdf_store,
183            tx_manager,
184            buffer_manager,
185            wal,
186            query_cache,
187            is_open: RwLock::new(true),
188        })
189    }
190
191    /// Applies WAL records to restore the database state.
192    fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
193        for record in records {
194            match record {
195                WalRecord::CreateNode { id, labels } => {
196                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
197                    store.create_node_with_id(*id, &label_refs);
198                }
199                WalRecord::DeleteNode { id } => {
200                    store.delete_node(*id);
201                }
202                WalRecord::CreateEdge {
203                    id,
204                    src,
205                    dst,
206                    edge_type,
207                } => {
208                    store.create_edge_with_id(*id, *src, *dst, edge_type);
209                }
210                WalRecord::DeleteEdge { id } => {
211                    store.delete_edge(*id);
212                }
213                WalRecord::SetNodeProperty { id, key, value } => {
214                    store.set_node_property(*id, key, value.clone());
215                }
216                WalRecord::SetEdgeProperty { id, key, value } => {
217                    store.set_edge_property(*id, key, value.clone());
218                }
219                WalRecord::AddNodeLabel { id, label } => {
220                    store.add_label(*id, label);
221                }
222                WalRecord::RemoveNodeLabel { id, label } => {
223                    store.remove_label(*id, label);
224                }
225                WalRecord::TxCommit { .. }
226                | WalRecord::TxAbort { .. }
227                | WalRecord::Checkpoint { .. } => {
228                    // Transaction control records don't need replay action
229                    // (recovery already filtered to only committed transactions)
230                }
231            }
232        }
233        Ok(())
234    }
235
236    /// Opens a new session for running queries.
237    ///
238    /// Sessions are cheap to create - spin up as many as you need. Each
239    /// gets its own transaction context, so concurrent sessions won't
240    /// block each other on reads.
241    ///
242    /// # Examples
243    ///
244    /// ```
245    /// use grafeo_engine::GrafeoDB;
246    ///
247    /// let db = GrafeoDB::new_in_memory();
248    /// let session = db.session();
249    ///
250    /// // Run queries through the session
251    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
252    /// # Ok::<(), grafeo_common::utils::error::Error>(())
253    /// ```
254    #[must_use]
255    pub fn session(&self) -> Session {
256        #[cfg(feature = "rdf")]
257        {
258            Session::with_rdf_store_and_adaptive(
259                Arc::clone(&self.store),
260                Arc::clone(&self.rdf_store),
261                Arc::clone(&self.tx_manager),
262                Arc::clone(&self.query_cache),
263                self.config.adaptive.clone(),
264                self.config.factorized_execution,
265            )
266        }
267        #[cfg(not(feature = "rdf"))]
268        {
269            Session::with_adaptive(
270                Arc::clone(&self.store),
271                Arc::clone(&self.tx_manager),
272                Arc::clone(&self.query_cache),
273                self.config.adaptive.clone(),
274                self.config.factorized_execution,
275            )
276        }
277    }
278
279    /// Returns the adaptive execution configuration.
280    #[must_use]
281    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
282        &self.config.adaptive
283    }
284
285    /// Runs a query directly on the database.
286    ///
287    /// A convenience method that creates a temporary session behind the
288    /// scenes. If you're running multiple queries, grab a
289    /// [`session()`](Self::session) instead to avoid the overhead.
290    ///
291    /// # Errors
292    ///
293    /// Returns an error if parsing or execution fails.
294    pub fn execute(&self, query: &str) -> Result<QueryResult> {
295        let session = self.session();
296        session.execute(query)
297    }
298
299    /// Executes a query with parameters and returns the result.
300    ///
301    /// # Errors
302    ///
303    /// Returns an error if the query fails.
304    pub fn execute_with_params(
305        &self,
306        query: &str,
307        params: std::collections::HashMap<String, grafeo_common::types::Value>,
308    ) -> Result<QueryResult> {
309        let session = self.session();
310        session.execute_with_params(query, params)
311    }
312
313    /// Executes a Cypher query and returns the result.
314    ///
315    /// # Errors
316    ///
317    /// Returns an error if the query fails.
318    #[cfg(feature = "cypher")]
319    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
320        let session = self.session();
321        session.execute_cypher(query)
322    }
323
324    /// Executes a Cypher query with parameters and returns the result.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if the query fails.
329    #[cfg(feature = "cypher")]
330    pub fn execute_cypher_with_params(
331        &self,
332        query: &str,
333        params: std::collections::HashMap<String, grafeo_common::types::Value>,
334    ) -> Result<QueryResult> {
335        use crate::query::processor::{QueryLanguage, QueryProcessor};
336
337        // Create processor
338        let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
339        processor.process(query, QueryLanguage::Cypher, Some(&params))
340    }
341
342    /// Executes a Gremlin query and returns the result.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if the query fails.
347    #[cfg(feature = "gremlin")]
348    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
349        let session = self.session();
350        session.execute_gremlin(query)
351    }
352
353    /// Executes a Gremlin query with parameters and returns the result.
354    ///
355    /// # Errors
356    ///
357    /// Returns an error if the query fails.
358    #[cfg(feature = "gremlin")]
359    pub fn execute_gremlin_with_params(
360        &self,
361        query: &str,
362        params: std::collections::HashMap<String, grafeo_common::types::Value>,
363    ) -> Result<QueryResult> {
364        let session = self.session();
365        session.execute_gremlin_with_params(query, params)
366    }
367
368    /// Executes a GraphQL query and returns the result.
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if the query fails.
373    #[cfg(feature = "graphql")]
374    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
375        let session = self.session();
376        session.execute_graphql(query)
377    }
378
379    /// Executes a GraphQL query with parameters and returns the result.
380    ///
381    /// # Errors
382    ///
383    /// Returns an error if the query fails.
384    #[cfg(feature = "graphql")]
385    pub fn execute_graphql_with_params(
386        &self,
387        query: &str,
388        params: std::collections::HashMap<String, grafeo_common::types::Value>,
389    ) -> Result<QueryResult> {
390        let session = self.session();
391        session.execute_graphql_with_params(query, params)
392    }
393
394    /// Executes a SPARQL query and returns the result.
395    ///
396    /// SPARQL queries operate on the RDF triple store.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if the query fails.
401    ///
402    /// # Examples
403    ///
404    /// ```ignore
405    /// use grafeo_engine::GrafeoDB;
406    ///
407    /// let db = GrafeoDB::new_in_memory();
408    /// let result = db.execute_sparql("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?;
409    /// ```
410    #[cfg(all(feature = "sparql", feature = "rdf"))]
411    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
412        use crate::query::{
413            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
414        };
415
416        // Parse and translate the SPARQL query to a logical plan
417        let logical_plan = sparql_translator::translate(query)?;
418
419        // Optimize the plan
420        let optimizer = Optimizer::new();
421        let optimized_plan = optimizer.optimize(logical_plan)?;
422
423        // Convert to physical plan using RDF planner
424        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
425        let mut physical_plan = planner.plan(&optimized_plan)?;
426
427        // Execute the plan
428        let executor = Executor::with_columns(physical_plan.columns.clone());
429        executor.execute(physical_plan.operator.as_mut())
430    }
431
432    /// Returns the RDF store.
433    ///
434    /// This provides direct access to the RDF store for triple operations.
435    #[cfg(feature = "rdf")]
436    #[must_use]
437    pub fn rdf_store(&self) -> &Arc<RdfStore> {
438        &self.rdf_store
439    }
440
441    /// Executes a query and returns a single scalar value.
442    ///
443    /// # Errors
444    ///
445    /// Returns an error if the query fails or doesn't return exactly one row.
446    pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
447        let result = self.execute(query)?;
448        result.scalar()
449    }
450
451    /// Returns the configuration.
452    #[must_use]
453    pub fn config(&self) -> &Config {
454        &self.config
455    }
456
457    /// Returns the underlying store.
458    ///
459    /// This provides direct access to the LPG store for algorithm implementations.
460    #[must_use]
461    pub fn store(&self) -> &Arc<LpgStore> {
462        &self.store
463    }
464
465    /// Returns the buffer manager for memory-aware operations.
466    #[must_use]
467    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
468        &self.buffer_manager
469    }
470
471    /// Closes the database, flushing all pending writes.
472    ///
473    /// For persistent databases, this ensures everything is safely on disk.
474    /// Called automatically when the database is dropped, but you can call
475    /// it explicitly if you need to guarantee durability at a specific point.
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
480    pub fn close(&self) -> Result<()> {
481        let mut is_open = self.is_open.write();
482        if !*is_open {
483            return Ok(());
484        }
485
486        // Commit and checkpoint WAL
487        if let Some(ref wal) = self.wal {
488            let epoch = self.store.current_epoch();
489
490            // Use the last assigned transaction ID, or create a checkpoint-only tx
491            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
492                // No transactions have been started; begin one for checkpoint
493                self.tx_manager.begin()
494            });
495
496            // Log a TxCommit to mark all pending records as committed
497            wal.log(&WalRecord::TxCommit {
498                tx_id: checkpoint_tx,
499            })?;
500
501            // Then checkpoint
502            wal.checkpoint(checkpoint_tx, epoch)?;
503            wal.sync()?;
504        }
505
506        *is_open = false;
507        Ok(())
508    }
509
510    /// Returns the WAL manager if available.
511    #[must_use]
512    pub fn wal(&self) -> Option<&Arc<WalManager>> {
513        self.wal.as_ref()
514    }
515
516    /// Logs a WAL record if WAL is enabled.
517    fn log_wal(&self, record: &WalRecord) -> Result<()> {
518        if let Some(ref wal) = self.wal {
519            wal.log(record)?;
520        }
521        Ok(())
522    }
523
524    /// Returns the number of nodes in the database.
525    #[must_use]
526    pub fn node_count(&self) -> usize {
527        self.store.node_count()
528    }
529
530    /// Returns the number of edges in the database.
531    #[must_use]
532    pub fn edge_count(&self) -> usize {
533        self.store.edge_count()
534    }
535
536    /// Returns the number of distinct labels in the database.
537    #[must_use]
538    pub fn label_count(&self) -> usize {
539        self.store.label_count()
540    }
541
542    /// Returns the number of distinct property keys in the database.
543    #[must_use]
544    pub fn property_key_count(&self) -> usize {
545        self.store.property_key_count()
546    }
547
548    /// Returns the number of distinct edge types in the database.
549    #[must_use]
550    pub fn edge_type_count(&self) -> usize {
551        self.store.edge_type_count()
552    }
553
554    // === Node Operations ===
555
556    /// Creates a node with the given labels and returns its ID.
557    ///
558    /// Labels categorize nodes - think of them like tags. A node can have
559    /// multiple labels (e.g., `["Person", "Employee"]`).
560    ///
561    /// # Examples
562    ///
563    /// ```
564    /// use grafeo_engine::GrafeoDB;
565    ///
566    /// let db = GrafeoDB::new_in_memory();
567    /// let alice = db.create_node(&["Person"]);
568    /// let company = db.create_node(&["Company", "Startup"]);
569    /// ```
570    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
571        let id = self.store.create_node(labels);
572
573        // Log to WAL if enabled
574        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
575            id,
576            labels: labels.iter().map(|s| s.to_string()).collect(),
577        }) {
578            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
579        }
580
581        id
582    }
583
584    /// Creates a new node with labels and properties.
585    ///
586    /// If WAL is enabled, the operation is logged for durability.
587    pub fn create_node_with_props(
588        &self,
589        labels: &[&str],
590        properties: impl IntoIterator<
591            Item = (
592                impl Into<grafeo_common::types::PropertyKey>,
593                impl Into<grafeo_common::types::Value>,
594            ),
595        >,
596    ) -> grafeo_common::types::NodeId {
597        // Collect properties first so we can log them to WAL
598        let props: Vec<(
599            grafeo_common::types::PropertyKey,
600            grafeo_common::types::Value,
601        )> = properties
602            .into_iter()
603            .map(|(k, v)| (k.into(), v.into()))
604            .collect();
605
606        let id = self
607            .store
608            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
609
610        // Log node creation to WAL
611        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
612            id,
613            labels: labels.iter().map(|s| s.to_string()).collect(),
614        }) {
615            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
616        }
617
618        // Log each property to WAL for full durability
619        for (key, value) in props {
620            if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
621                id,
622                key: key.to_string(),
623                value,
624            }) {
625                tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
626            }
627        }
628
629        id
630    }
631
632    /// Gets a node by ID.
633    #[must_use]
634    pub fn get_node(
635        &self,
636        id: grafeo_common::types::NodeId,
637    ) -> Option<grafeo_core::graph::lpg::Node> {
638        self.store.get_node(id)
639    }
640
641    /// Deletes a node and all its edges.
642    ///
643    /// If WAL is enabled, the operation is logged for durability.
644    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
645        let result = self.store.delete_node(id);
646
647        if result {
648            if let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
649                tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
650            }
651        }
652
653        result
654    }
655
656    /// Sets a property on a node.
657    ///
658    /// If WAL is enabled, the operation is logged for durability.
659    pub fn set_node_property(
660        &self,
661        id: grafeo_common::types::NodeId,
662        key: &str,
663        value: grafeo_common::types::Value,
664    ) {
665        // Log to WAL first
666        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
667            id,
668            key: key.to_string(),
669            value: value.clone(),
670        }) {
671            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
672        }
673
674        self.store.set_node_property(id, key, value);
675    }
676
677    /// Adds a label to an existing node.
678    ///
679    /// Returns `true` if the label was added, `false` if the node doesn't exist
680    /// or already has the label.
681    ///
682    /// # Examples
683    ///
684    /// ```
685    /// use grafeo_engine::GrafeoDB;
686    ///
687    /// let db = GrafeoDB::new_in_memory();
688    /// let alice = db.create_node(&["Person"]);
689    ///
690    /// // Promote Alice to Employee
691    /// let added = db.add_node_label(alice, "Employee");
692    /// assert!(added);
693    /// ```
694    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
695        let result = self.store.add_label(id, label);
696
697        if result {
698            // Log to WAL if enabled
699            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
700                id,
701                label: label.to_string(),
702            }) {
703                tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
704            }
705        }
706
707        result
708    }
709
710    /// Removes a label from a node.
711    ///
712    /// Returns `true` if the label was removed, `false` if the node doesn't exist
713    /// or doesn't have the label.
714    ///
715    /// # Examples
716    ///
717    /// ```
718    /// use grafeo_engine::GrafeoDB;
719    ///
720    /// let db = GrafeoDB::new_in_memory();
721    /// let alice = db.create_node(&["Person", "Employee"]);
722    ///
723    /// // Remove Employee status
724    /// let removed = db.remove_node_label(alice, "Employee");
725    /// assert!(removed);
726    /// ```
727    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
728        let result = self.store.remove_label(id, label);
729
730        if result {
731            // Log to WAL if enabled
732            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
733                id,
734                label: label.to_string(),
735            }) {
736                tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
737            }
738        }
739
740        result
741    }
742
743    /// Gets all labels for a node.
744    ///
745    /// Returns `None` if the node doesn't exist.
746    ///
747    /// # Examples
748    ///
749    /// ```
750    /// use grafeo_engine::GrafeoDB;
751    ///
752    /// let db = GrafeoDB::new_in_memory();
753    /// let alice = db.create_node(&["Person", "Employee"]);
754    ///
755    /// let labels = db.get_node_labels(alice).unwrap();
756    /// assert!(labels.contains(&"Person".to_string()));
757    /// assert!(labels.contains(&"Employee".to_string()));
758    /// ```
759    #[must_use]
760    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
761        self.store
762            .get_node(id)
763            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
764    }
765
766    // === Edge Operations ===
767
768    /// Creates an edge (relationship) between two nodes.
769    ///
770    /// Edges connect nodes and have a type that describes the relationship.
771    /// They're directed - the order of `src` and `dst` matters.
772    ///
773    /// # Examples
774    ///
775    /// ```
776    /// use grafeo_engine::GrafeoDB;
777    ///
778    /// let db = GrafeoDB::new_in_memory();
779    /// let alice = db.create_node(&["Person"]);
780    /// let bob = db.create_node(&["Person"]);
781    ///
782    /// // Alice knows Bob (directed: Alice -> Bob)
783    /// let edge = db.create_edge(alice, bob, "KNOWS");
784    /// ```
785    pub fn create_edge(
786        &self,
787        src: grafeo_common::types::NodeId,
788        dst: grafeo_common::types::NodeId,
789        edge_type: &str,
790    ) -> grafeo_common::types::EdgeId {
791        let id = self.store.create_edge(src, dst, edge_type);
792
793        // Log to WAL if enabled
794        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
795            id,
796            src,
797            dst,
798            edge_type: edge_type.to_string(),
799        }) {
800            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
801        }
802
803        id
804    }
805
806    /// Creates a new edge with properties.
807    ///
808    /// If WAL is enabled, the operation is logged for durability.
809    pub fn create_edge_with_props(
810        &self,
811        src: grafeo_common::types::NodeId,
812        dst: grafeo_common::types::NodeId,
813        edge_type: &str,
814        properties: impl IntoIterator<
815            Item = (
816                impl Into<grafeo_common::types::PropertyKey>,
817                impl Into<grafeo_common::types::Value>,
818            ),
819        >,
820    ) -> grafeo_common::types::EdgeId {
821        // Collect properties first so we can log them to WAL
822        let props: Vec<(
823            grafeo_common::types::PropertyKey,
824            grafeo_common::types::Value,
825        )> = properties
826            .into_iter()
827            .map(|(k, v)| (k.into(), v.into()))
828            .collect();
829
830        let id = self.store.create_edge_with_props(
831            src,
832            dst,
833            edge_type,
834            props.iter().map(|(k, v)| (k.clone(), v.clone())),
835        );
836
837        // Log edge creation to WAL
838        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
839            id,
840            src,
841            dst,
842            edge_type: edge_type.to_string(),
843        }) {
844            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
845        }
846
847        // Log each property to WAL for full durability
848        for (key, value) in props {
849            if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
850                id,
851                key: key.to_string(),
852                value,
853            }) {
854                tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
855            }
856        }
857
858        id
859    }
860
861    /// Gets an edge by ID.
862    #[must_use]
863    pub fn get_edge(
864        &self,
865        id: grafeo_common::types::EdgeId,
866    ) -> Option<grafeo_core::graph::lpg::Edge> {
867        self.store.get_edge(id)
868    }
869
870    /// Deletes an edge.
871    ///
872    /// If WAL is enabled, the operation is logged for durability.
873    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
874        let result = self.store.delete_edge(id);
875
876        if result {
877            if let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
878                tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
879            }
880        }
881
882        result
883    }
884
885    /// Sets a property on an edge.
886    ///
887    /// If WAL is enabled, the operation is logged for durability.
888    pub fn set_edge_property(
889        &self,
890        id: grafeo_common::types::EdgeId,
891        key: &str,
892        value: grafeo_common::types::Value,
893    ) {
894        // Log to WAL first
895        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
896            id,
897            key: key.to_string(),
898            value: value.clone(),
899        }) {
900            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
901        }
902        self.store.set_edge_property(id, key, value);
903    }
904
905    /// Removes a property from a node.
906    ///
907    /// Returns true if the property existed and was removed, false otherwise.
908    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
909        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
910        self.store.remove_node_property(id, key).is_some()
911    }
912
913    /// Removes a property from an edge.
914    ///
915    /// Returns true if the property existed and was removed, false otherwise.
916    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
917        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
918        self.store.remove_edge_property(id, key).is_some()
919    }
920
921    // =========================================================================
922    // ADMIN API: Introspection
923    // =========================================================================
924
925    /// Returns true if this database is backed by a file (persistent).
926    ///
927    /// In-memory databases return false.
928    #[must_use]
929    pub fn is_persistent(&self) -> bool {
930        self.config.path.is_some()
931    }
932
933    /// Returns the database file path, if persistent.
934    ///
935    /// In-memory databases return None.
936    #[must_use]
937    pub fn path(&self) -> Option<&Path> {
938        self.config.path.as_deref()
939    }
940
941    /// Returns high-level database information.
942    ///
943    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
944    #[must_use]
945    pub fn info(&self) -> crate::admin::DatabaseInfo {
946        crate::admin::DatabaseInfo {
947            mode: crate::admin::DatabaseMode::Lpg,
948            node_count: self.store.node_count(),
949            edge_count: self.store.edge_count(),
950            is_persistent: self.is_persistent(),
951            path: self.config.path.clone(),
952            wal_enabled: self.config.wal_enabled,
953            version: env!("CARGO_PKG_VERSION").to_string(),
954        }
955    }
956
957    /// Returns detailed database statistics.
958    ///
959    /// Includes counts, memory usage, and index information.
960    #[must_use]
961    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
962        let disk_bytes = self.config.path.as_ref().and_then(|p| {
963            if p.exists() {
964                Self::calculate_disk_usage(p).ok()
965            } else {
966                None
967            }
968        });
969
970        crate::admin::DatabaseStats {
971            node_count: self.store.node_count(),
972            edge_count: self.store.edge_count(),
973            label_count: self.store.label_count(),
974            edge_type_count: self.store.edge_type_count(),
975            property_key_count: self.store.property_key_count(),
976            index_count: 0, // TODO: implement index tracking
977            memory_bytes: self.buffer_manager.allocated(),
978            disk_bytes,
979        }
980    }
981
982    /// Calculates total disk usage for the database directory.
983    fn calculate_disk_usage(path: &Path) -> Result<usize> {
984        let mut total = 0usize;
985        if path.is_dir() {
986            for entry in std::fs::read_dir(path)? {
987                let entry = entry?;
988                let metadata = entry.metadata()?;
989                if metadata.is_file() {
990                    total += metadata.len() as usize;
991                } else if metadata.is_dir() {
992                    total += Self::calculate_disk_usage(&entry.path())?;
993                }
994            }
995        }
996        Ok(total)
997    }
998
999    /// Returns schema information (labels, edge types, property keys).
1000    ///
1001    /// For LPG mode, returns label and edge type information.
1002    /// For RDF mode, returns predicate and named graph information.
1003    #[must_use]
1004    pub fn schema(&self) -> crate::admin::SchemaInfo {
1005        let labels = self
1006            .store
1007            .all_labels()
1008            .into_iter()
1009            .map(|name| crate::admin::LabelInfo {
1010                name: name.clone(),
1011                count: self.store.nodes_with_label(&name).count(),
1012            })
1013            .collect();
1014
1015        let edge_types = self
1016            .store
1017            .all_edge_types()
1018            .into_iter()
1019            .map(|name| crate::admin::EdgeTypeInfo {
1020                name: name.clone(),
1021                count: self.store.edges_with_type(&name).count(),
1022            })
1023            .collect();
1024
1025        let property_keys = self.store.all_property_keys();
1026
1027        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1028            labels,
1029            edge_types,
1030            property_keys,
1031        })
1032    }
1033
1034    /// Returns RDF schema information.
1035    ///
1036    /// Only available when the RDF feature is enabled.
1037    #[cfg(feature = "rdf")]
1038    #[must_use]
1039    pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
1040        let stats = self.rdf_store.stats();
1041
1042        let predicates = self
1043            .rdf_store
1044            .predicates()
1045            .into_iter()
1046            .map(|predicate| {
1047                let count = self.rdf_store.triples_with_predicate(&predicate).len();
1048                crate::admin::PredicateInfo {
1049                    iri: predicate.to_string(),
1050                    count,
1051                }
1052            })
1053            .collect();
1054
1055        crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
1056            predicates,
1057            named_graphs: Vec::new(), // Named graphs not yet implemented in RdfStore
1058            subject_count: stats.subject_count,
1059            object_count: stats.object_count,
1060        })
1061    }
1062
1063    /// Validates database integrity.
1064    ///
1065    /// Checks for:
1066    /// - Dangling edge references (edges pointing to non-existent nodes)
1067    /// - Internal index consistency
1068    ///
1069    /// Returns a list of errors and warnings. Empty errors = valid.
1070    #[must_use]
1071    pub fn validate(&self) -> crate::admin::ValidationResult {
1072        let mut result = crate::admin::ValidationResult::default();
1073
1074        // Check for dangling edge references
1075        for edge in self.store.all_edges() {
1076            if self.store.get_node(edge.src).is_none() {
1077                result.errors.push(crate::admin::ValidationError {
1078                    code: "DANGLING_SRC".to_string(),
1079                    message: format!(
1080                        "Edge {} references non-existent source node {}",
1081                        edge.id.0, edge.src.0
1082                    ),
1083                    context: Some(format!("edge:{}", edge.id.0)),
1084                });
1085            }
1086            if self.store.get_node(edge.dst).is_none() {
1087                result.errors.push(crate::admin::ValidationError {
1088                    code: "DANGLING_DST".to_string(),
1089                    message: format!(
1090                        "Edge {} references non-existent destination node {}",
1091                        edge.id.0, edge.dst.0
1092                    ),
1093                    context: Some(format!("edge:{}", edge.id.0)),
1094                });
1095            }
1096        }
1097
1098        // Add warnings for potential issues
1099        if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1100            result.warnings.push(crate::admin::ValidationWarning {
1101                code: "NO_EDGES".to_string(),
1102                message: "Database has nodes but no edges".to_string(),
1103                context: None,
1104            });
1105        }
1106
1107        result
1108    }
1109
1110    /// Returns WAL (Write-Ahead Log) status.
1111    ///
1112    /// Returns None if WAL is not enabled.
1113    #[must_use]
1114    pub fn wal_status(&self) -> crate::admin::WalStatus {
1115        if let Some(ref wal) = self.wal {
1116            crate::admin::WalStatus {
1117                enabled: true,
1118                path: self.config.path.as_ref().map(|p| p.join("wal")),
1119                size_bytes: wal.size_bytes(),
1120                record_count: wal.record_count() as usize,
1121                last_checkpoint: wal.last_checkpoint_timestamp(),
1122                current_epoch: self.store.current_epoch().as_u64(),
1123            }
1124        } else {
1125            crate::admin::WalStatus {
1126                enabled: false,
1127                path: None,
1128                size_bytes: 0,
1129                record_count: 0,
1130                last_checkpoint: None,
1131                current_epoch: self.store.current_epoch().as_u64(),
1132            }
1133        }
1134    }
1135
1136    /// Forces a WAL checkpoint.
1137    ///
1138    /// Flushes all pending WAL records to the main storage.
1139    ///
1140    /// # Errors
1141    ///
1142    /// Returns an error if the checkpoint fails.
1143    pub fn wal_checkpoint(&self) -> Result<()> {
1144        if let Some(ref wal) = self.wal {
1145            let epoch = self.store.current_epoch();
1146            let tx_id = self
1147                .tx_manager
1148                .last_assigned_tx_id()
1149                .unwrap_or_else(|| self.tx_manager.begin());
1150            wal.checkpoint(tx_id, epoch)?;
1151            wal.sync()?;
1152        }
1153        Ok(())
1154    }
1155
1156    // =========================================================================
1157    // ADMIN API: Persistence Control
1158    // =========================================================================
1159
1160    /// Saves the database to a file path.
1161    ///
1162    /// - If in-memory: creates a new persistent database at path
1163    /// - If file-backed: creates a copy at the new path
1164    ///
1165    /// The original database remains unchanged.
1166    ///
1167    /// # Errors
1168    ///
1169    /// Returns an error if the save operation fails.
1170    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1171        let path = path.as_ref();
1172
1173        // Create target database with WAL enabled
1174        let target_config = Config::persistent(path);
1175        let target = Self::with_config(target_config)?;
1176
1177        // Copy all nodes using WAL-enabled methods
1178        for node in self.store.all_nodes() {
1179            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1180            target.store.create_node_with_id(node.id, &label_refs);
1181
1182            // Log to WAL
1183            target.log_wal(&WalRecord::CreateNode {
1184                id: node.id,
1185                labels: node.labels.iter().map(|s| s.to_string()).collect(),
1186            })?;
1187
1188            // Copy properties
1189            for (key, value) in node.properties {
1190                target
1191                    .store
1192                    .set_node_property(node.id, key.as_str(), value.clone());
1193                target.log_wal(&WalRecord::SetNodeProperty {
1194                    id: node.id,
1195                    key: key.to_string(),
1196                    value,
1197                })?;
1198            }
1199        }
1200
1201        // Copy all edges using WAL-enabled methods
1202        for edge in self.store.all_edges() {
1203            target
1204                .store
1205                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1206
1207            // Log to WAL
1208            target.log_wal(&WalRecord::CreateEdge {
1209                id: edge.id,
1210                src: edge.src,
1211                dst: edge.dst,
1212                edge_type: edge.edge_type.to_string(),
1213            })?;
1214
1215            // Copy properties
1216            for (key, value) in edge.properties {
1217                target
1218                    .store
1219                    .set_edge_property(edge.id, key.as_str(), value.clone());
1220                target.log_wal(&WalRecord::SetEdgeProperty {
1221                    id: edge.id,
1222                    key: key.to_string(),
1223                    value,
1224                })?;
1225            }
1226        }
1227
1228        // Checkpoint and close the target database
1229        target.close()?;
1230
1231        Ok(())
1232    }
1233
1234    /// Creates an in-memory copy of this database.
1235    ///
1236    /// Returns a new database that is completely independent.
1237    /// Useful for:
1238    /// - Testing modifications without affecting the original
1239    /// - Faster operations when persistence isn't needed
1240    ///
1241    /// # Errors
1242    ///
1243    /// Returns an error if the copy operation fails.
1244    pub fn to_memory(&self) -> Result<Self> {
1245        let config = Config::in_memory();
1246        let target = Self::with_config(config)?;
1247
1248        // Copy all nodes
1249        for node in self.store.all_nodes() {
1250            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1251            target.store.create_node_with_id(node.id, &label_refs);
1252
1253            // Copy properties
1254            for (key, value) in node.properties {
1255                target.store.set_node_property(node.id, key.as_str(), value);
1256            }
1257        }
1258
1259        // Copy all edges
1260        for edge in self.store.all_edges() {
1261            target
1262                .store
1263                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1264
1265            // Copy properties
1266            for (key, value) in edge.properties {
1267                target.store.set_edge_property(edge.id, key.as_str(), value);
1268            }
1269        }
1270
1271        Ok(target)
1272    }
1273
1274    /// Opens a database file and loads it entirely into memory.
1275    ///
1276    /// The returned database has no connection to the original file.
1277    /// Changes will NOT be written back to the file.
1278    ///
1279    /// # Errors
1280    ///
1281    /// Returns an error if the file can't be opened or loaded.
1282    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1283        // Open the source database (triggers WAL recovery)
1284        let source = Self::open(path)?;
1285
1286        // Create in-memory copy
1287        let target = source.to_memory()?;
1288
1289        // Close the source (releases file handles)
1290        source.close()?;
1291
1292        Ok(target)
1293    }
1294
1295    // =========================================================================
1296    // ADMIN API: Iteration
1297    // =========================================================================
1298
1299    /// Returns an iterator over all nodes in the database.
1300    ///
1301    /// Useful for dump/export operations.
1302    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1303        self.store.all_nodes()
1304    }
1305
1306    /// Returns an iterator over all edges in the database.
1307    ///
1308    /// Useful for dump/export operations.
1309    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1310        self.store.all_edges()
1311    }
1312}
1313
1314impl Drop for GrafeoDB {
1315    fn drop(&mut self) {
1316        if let Err(e) = self.close() {
1317            tracing::error!("Error closing database: {}", e);
1318        }
1319    }
1320}
1321
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so callers can read them directly or fill in
/// `rows` themselves after constructing an empty result.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// Intended to line up one-to-one with `columns`; `with_types` stores
    /// whatever it is given without checking the lengths match.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows, one `Vec` of values per row.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
}
1356
1357impl QueryResult {
1358    /// Creates a new empty query result.
1359    #[must_use]
1360    pub fn new(columns: Vec<String>) -> Self {
1361        let len = columns.len();
1362        Self {
1363            columns,
1364            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1365            rows: Vec::new(),
1366        }
1367    }
1368
1369    /// Creates a new empty query result with column types.
1370    #[must_use]
1371    pub fn with_types(
1372        columns: Vec<String>,
1373        column_types: Vec<grafeo_common::types::LogicalType>,
1374    ) -> Self {
1375        Self {
1376            columns,
1377            column_types,
1378            rows: Vec::new(),
1379        }
1380    }
1381
1382    /// Returns the number of rows.
1383    #[must_use]
1384    pub fn row_count(&self) -> usize {
1385        self.rows.len()
1386    }
1387
1388    /// Returns the number of columns.
1389    #[must_use]
1390    pub fn column_count(&self) -> usize {
1391        self.columns.len()
1392    }
1393
1394    /// Returns true if the result is empty.
1395    #[must_use]
1396    pub fn is_empty(&self) -> bool {
1397        self.rows.is_empty()
1398    }
1399
1400    /// Extracts a single value from the result.
1401    ///
1402    /// Use this when your query returns exactly one row with one column,
1403    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1404    ///
1405    /// # Errors
1406    ///
1407    /// Returns an error if the result has multiple rows or columns.
1408    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1409        if self.rows.len() != 1 || self.columns.len() != 1 {
1410            return Err(grafeo_common::utils::error::Error::InvalidValue(
1411                "Expected single value".to_string(),
1412            ));
1413        }
1414        T::from_value(&self.rows[0][0])
1415    }
1416
1417    /// Returns an iterator over the rows.
1418    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1419        self.rows.iter()
1420    }
1421}
1422
/// Converts a [`Value`](grafeo_common::types::Value) to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// The value is only borrowed; implementations produce an owned `Self`.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1431
1432impl FromValue for i64 {
1433    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1434        value
1435            .as_int64()
1436            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1437                expected: "INT64".to_string(),
1438                found: value.type_name().to_string(),
1439            })
1440    }
1441}
1442
1443impl FromValue for f64 {
1444    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1445        value
1446            .as_float64()
1447            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1448                expected: "FLOAT64".to_string(),
1449                found: value.type_name().to_string(),
1450            })
1451    }
1452}
1453
1454impl FromValue for String {
1455    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1456        value.as_str().map(String::from).ok_or_else(|| {
1457            grafeo_common::utils::error::Error::TypeMismatch {
1458                expected: "STRING".to_string(),
1459                found: value.type_name().to_string(),
1460            }
1461        })
1462    }
1463}
1464
1465impl FromValue for bool {
1466    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1467        value
1468            .as_bool()
1469            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1470                expected: "BOOL".to_string(),
1471                found: value.type_name().to_string(),
1472            })
1473    }
1474}
1475
#[cfg(test)]
mod tests {
    use super::*;

    /// A brand-new in-memory database contains no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        let counts = (db.node_count(), db.edge_count());
        assert_eq!(counts, (0, 0));
    }

    /// Settings passed through `with_config` are visible via `config()`.
    #[test]
    fn test_database_config() {
        let requested = Config::in_memory().with_threads(4).with_query_logging();
        let db = GrafeoDB::with_config(requested).unwrap();

        let active = db.config();
        assert_eq!(active.threads, 4);
        assert!(active.query_logging);
    }

    /// Creating a session on an in-memory database must not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        // Session should be created successfully
        let _session = db.session();
    }

    /// Data written to a persistent database is still there after the
    /// database has been closed and opened again.
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::{NodeId, Value};
        use tempfile::tempdir;

        let scratch = tempdir().unwrap();
        let location = scratch.path().join("test_db");

        // First session: write two nodes and one edge, then close.
        {
            let writer = GrafeoDB::open(&location).unwrap();

            let alice = writer.create_node(&["Person"]);
            writer.set_node_property(alice, "name", Value::from("Alice"));

            let bob = writer.create_node(&["Person"]);
            writer.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = writer.create_edge(alice, bob, "KNOWS");

            // Explicitly close to flush WAL
            writer.close().unwrap();
        }

        // Second session: reopen and verify data was recovered.
        {
            let reader = GrafeoDB::open(&location).unwrap();

            assert_eq!(reader.node_count(), 2);
            assert_eq!(reader.edge_count(), 1);

            // Both nodes must be reachable under their original IDs.
            for raw_id in 0..2 {
                let recovered = reader.get_node(NodeId::new(raw_id));
                assert!(recovered.is_some());
            }
        }
    }

    /// Mutations on a persistent database leave records in the WAL.
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let scratch = tempdir().unwrap();
        let location = scratch.path().join("wal_test_db");
        let db = GrafeoDB::open(&location).unwrap();

        // Create some data: one node creation followed by its deletion.
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // WAL should have records
        if let Some(wal) = db.wal() {
            let logged = wal.record_count();
            assert!(logged > 0);
        }

        db.close().unwrap();
    }
}