Skip to main content

kyu_api/
database.rs

1//! Database — top-level entry point owning catalog + storage + transactions.
2
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, RwLock};
5
6use kyu_catalog::Catalog;
7use kyu_common::{KyuError, KyuResult};
8use kyu_extension::Extension;
9use kyu_transaction::{Checkpointer, TransactionManager, Wal};
10
11use crate::connection::Connection;
12use crate::storage::NodeGroupStorage;
13
14/// A graph database instance.
15///
16/// Owns the catalog (schema), columnar storage, transaction manager, and WAL.
17/// Create connections via [`Database::connect`] to execute Cypher queries and DDL.
18pub struct Database {
19    catalog: Arc<Catalog>,
20    storage: Arc<RwLock<NodeGroupStorage>>,
21    txn_mgr: Arc<TransactionManager>,
22    wal: Arc<Wal>,
23    checkpointer: Arc<Checkpointer>,
24    extensions: Arc<Vec<Box<dyn Extension>>>,
25    /// Database directory path for persistent databases, `None` for in-memory.
26    db_path: Option<PathBuf>,
27}
28
29impl Database {
30    /// Create a new in-memory database with empty catalog and storage.
31    pub fn in_memory() -> Self {
32        let txn_mgr = Arc::new(TransactionManager::new());
33        let wal = Arc::new(Wal::in_memory());
34        let checkpointer = Arc::new(Checkpointer::new(Arc::clone(&txn_mgr), Arc::clone(&wal)));
35        Self {
36            catalog: Arc::new(Catalog::new()),
37            storage: Arc::new(RwLock::new(NodeGroupStorage::new())),
38            txn_mgr,
39            wal,
40            checkpointer,
41            extensions: Arc::new(Vec::new()),
42            db_path: None,
43        }
44    }
45
46    /// Open a persistent database at the given directory path.
47    /// Creates the directory if it doesn't exist.
48    /// Loads catalog and storage from checkpoint, then replays WAL for DDL recovery.
49    pub fn open(path: &Path) -> KyuResult<Self> {
50        use crate::persistence;
51        use kyu_transaction::{WalRecord, WalReplayer};
52
53        std::fs::create_dir_all(path).map_err(|e| {
54            KyuError::Storage(format!(
55                "cannot create database directory '{}': {e}",
56                path.display()
57            ))
58        })?;
59
60        // Load catalog from checkpoint, or start fresh.
61        let mut catalog_content = match persistence::load_catalog(path)? {
62            Some(mut c) => {
63                c.rebuild_indexes();
64                c
65            }
66            None => kyu_catalog::CatalogContent::new(),
67        };
68
69        // Replay WAL for DDL recovery: apply the last CatalogSnapshot from
70        // committed transactions to recover any DDL after the last checkpoint.
71        let wal_path = path.join("wal.bin");
72        if wal_path.exists() {
73            let replayer = WalReplayer::new(&wal_path);
74            if let Ok(result) = replayer.replay() {
75                // Find the last CatalogSnapshot in committed records.
76                for record in result.committed_records.iter().rev() {
77                    if let WalRecord::CatalogSnapshot { json_bytes } = record
78                        && let Ok(json) = std::str::from_utf8(json_bytes)
79                        && let Ok(recovered) = kyu_catalog::CatalogContent::deserialize_json(json)
80                    {
81                        catalog_content = recovered;
82                        break;
83                    }
84                }
85            }
86        }
87
88        // Load storage from checkpoint (uses recovered catalog for schema).
89        let storage = persistence::load_storage(path, &catalog_content)?;
90
91        let catalog = Arc::new(Catalog::from_content(catalog_content));
92        let storage = Arc::new(RwLock::new(storage));
93
94        let wal = Wal::new(path, false).map_err(|e| {
95            KyuError::Storage(format!("cannot open WAL at '{}': {e}", path.display()))
96        })?;
97        let txn_mgr = Arc::new(TransactionManager::new());
98        let wal = Arc::new(wal);
99
100        // Build the flush callback for the checkpointer.
101        let flush_catalog = Arc::clone(&catalog);
102        let flush_storage = Arc::clone(&storage);
103        let flush_path = path.to_path_buf();
104        let flush_fn = Arc::new(move || {
105            let cat = flush_catalog.read();
106            let stor = flush_storage
107                .read()
108                .map_err(|e| format!("storage lock: {e}"))?;
109            persistence::save_catalog(&flush_path, &cat).map_err(|e| format!("{e}"))?;
110            persistence::save_storage(&flush_path, &stor, &cat).map_err(|e| format!("{e}"))?;
111            Ok(())
112        });
113
114        let checkpointer = Arc::new(
115            Checkpointer::new(Arc::clone(&txn_mgr), Arc::clone(&wal)).with_flush(flush_fn),
116        );
117
118        Ok(Self {
119            catalog,
120            storage,
121            txn_mgr,
122            wal,
123            checkpointer,
124            extensions: Arc::new(Vec::new()),
125            db_path: Some(path.to_path_buf()),
126        })
127    }
128
129    /// Register an extension. Must be called before creating connections.
130    pub fn register_extension(&mut self, ext: Box<dyn Extension>) {
131        Arc::get_mut(&mut self.extensions)
132            .expect("cannot register extension while connections exist")
133            .push(ext);
134    }
135
136    /// Create a connection to this database.
137    pub fn connect(&self) -> Connection {
138        Connection::new(
139            Arc::clone(&self.catalog),
140            Arc::clone(&self.storage),
141            Arc::clone(&self.txn_mgr),
142            Arc::clone(&self.wal),
143            Arc::clone(&self.checkpointer),
144            Arc::clone(&self.extensions),
145        )
146    }
147
148    /// Manually trigger a checkpoint.
149    pub fn checkpoint(&self) -> KyuResult<u64> {
150        self.checkpointer
151            .checkpoint()
152            .map_err(|e| KyuError::Transaction(format!("checkpoint failed: {e}")))
153    }
154
155    /// Get a reference to the underlying storage (for direct insertion).
156    pub fn storage(&self) -> &Arc<RwLock<NodeGroupStorage>> {
157        &self.storage
158    }
159
160    /// Get a reference to the underlying catalog.
161    pub fn catalog(&self) -> &Arc<Catalog> {
162        &self.catalog
163    }
164
165    /// Get a reference to the transaction manager.
166    pub fn txn_manager(&self) -> &Arc<TransactionManager> {
167        &self.txn_mgr
168    }
169
170    /// Get a reference to the WAL.
171    pub fn wal(&self) -> &Arc<Wal> {
172        &self.wal
173    }
174}
175
176impl Drop for Database {
177    fn drop(&mut self) {
178        // For persistent databases, flush state to disk on orderly shutdown.
179        if self.db_path.is_some() {
180            let _ = self.checkpointer.checkpoint();
181        }
182    }
183}