Skip to main content

kyu_api/
database.rs

1//! Database — top-level entry point owning catalog + storage + transactions.
2
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, RwLock};
5
6use kyu_catalog::Catalog;
7use kyu_common::{KyuError, KyuResult};
8use kyu_extension::Extension;
9use kyu_transaction::{Checkpointer, TransactionManager, Wal};
10
11use crate::connection::Connection;
12use crate::storage::NodeGroupStorage;
13
14/// A graph database instance.
15///
16/// Owns the catalog (schema), columnar storage, transaction manager, and WAL.
17/// Create connections via [`Database::connect`] to execute Cypher queries and DDL.
18pub struct Database {
19    catalog: Arc<Catalog>,
20    storage: Arc<RwLock<NodeGroupStorage>>,
21    txn_mgr: Arc<TransactionManager>,
22    wal: Arc<Wal>,
23    checkpointer: Arc<Checkpointer>,
24    extensions: Arc<Vec<Box<dyn Extension>>>,
25    /// Database directory path for persistent databases, `None` for in-memory.
26    db_path: Option<PathBuf>,
27}
28
29impl Database {
30    /// Create a new in-memory database with empty catalog and storage.
31    pub fn in_memory() -> Self {
32        let txn_mgr = Arc::new(TransactionManager::new());
33        let wal = Arc::new(Wal::in_memory());
34        let checkpointer = Arc::new(Checkpointer::new(Arc::clone(&txn_mgr), Arc::clone(&wal)));
35        Self {
36            catalog: Arc::new(Catalog::new()),
37            storage: Arc::new(RwLock::new(NodeGroupStorage::new())),
38            txn_mgr,
39            wal,
40            checkpointer,
41            extensions: Arc::new(Vec::new()),
42            db_path: None,
43        }
44    }
45
46    /// Open a persistent database at the given directory path.
47    /// Creates the directory if it doesn't exist.
48    /// Loads catalog and storage from checkpoint, then replays WAL for DDL recovery.
49    pub fn open(path: &Path) -> KyuResult<Self> {
50        use crate::persistence;
51        use kyu_transaction::{WalReplayer, WalRecord};
52
53        std::fs::create_dir_all(path).map_err(|e| {
54            KyuError::Storage(format!("cannot create database directory '{}': {e}", path.display()))
55        })?;
56
57        // Load catalog from checkpoint, or start fresh.
58        let mut catalog_content = match persistence::load_catalog(path)? {
59            Some(mut c) => {
60                c.rebuild_indexes();
61                c
62            }
63            None => kyu_catalog::CatalogContent::new(),
64        };
65
66        // Replay WAL for DDL recovery: apply the last CatalogSnapshot from
67        // committed transactions to recover any DDL after the last checkpoint.
68        let wal_path = path.join("wal.bin");
69        if wal_path.exists() {
70            let replayer = WalReplayer::new(&wal_path);
71            if let Ok(result) = replayer.replay() {
72                // Find the last CatalogSnapshot in committed records.
73                for record in result.committed_records.iter().rev() {
74                    if let WalRecord::CatalogSnapshot { json_bytes } = record
75                        && let Ok(json) = std::str::from_utf8(json_bytes)
76                        && let Ok(recovered) = kyu_catalog::CatalogContent::deserialize_json(json)
77                    {
78                        catalog_content = recovered;
79                        break;
80                    }
81                }
82            }
83        }
84
85        // Load storage from checkpoint (uses recovered catalog for schema).
86        let storage = persistence::load_storage(path, &catalog_content)?;
87
88        let catalog = Arc::new(Catalog::from_content(catalog_content));
89        let storage = Arc::new(RwLock::new(storage));
90
91        let wal = Wal::new(path, false).map_err(|e| {
92            KyuError::Storage(format!("cannot open WAL at '{}': {e}", path.display()))
93        })?;
94        let txn_mgr = Arc::new(TransactionManager::new());
95        let wal = Arc::new(wal);
96
97        // Build the flush callback for the checkpointer.
98        let flush_catalog = Arc::clone(&catalog);
99        let flush_storage = Arc::clone(&storage);
100        let flush_path = path.to_path_buf();
101        let flush_fn = Arc::new(move || {
102            let cat = flush_catalog.read();
103            let stor = flush_storage.read().map_err(|e| format!("storage lock: {e}"))?;
104            persistence::save_catalog(&flush_path, &cat)
105                .map_err(|e| format!("{e}"))?;
106            persistence::save_storage(&flush_path, &stor, &cat)
107                .map_err(|e| format!("{e}"))?;
108            Ok(())
109        });
110
111        let checkpointer = Arc::new(
112            Checkpointer::new(Arc::clone(&txn_mgr), Arc::clone(&wal))
113                .with_flush(flush_fn),
114        );
115
116        Ok(Self {
117            catalog,
118            storage,
119            txn_mgr,
120            wal,
121            checkpointer,
122            extensions: Arc::new(Vec::new()),
123            db_path: Some(path.to_path_buf()),
124        })
125    }
126
127    /// Register an extension. Must be called before creating connections.
128    pub fn register_extension(&mut self, ext: Box<dyn Extension>) {
129        Arc::get_mut(&mut self.extensions)
130            .expect("cannot register extension while connections exist")
131            .push(ext);
132    }
133
134    /// Create a connection to this database.
135    pub fn connect(&self) -> Connection {
136        Connection::new(
137            Arc::clone(&self.catalog),
138            Arc::clone(&self.storage),
139            Arc::clone(&self.txn_mgr),
140            Arc::clone(&self.wal),
141            Arc::clone(&self.checkpointer),
142            Arc::clone(&self.extensions),
143        )
144    }
145
146    /// Manually trigger a checkpoint.
147    pub fn checkpoint(&self) -> KyuResult<u64> {
148        self.checkpointer.checkpoint().map_err(|e| {
149            KyuError::Transaction(format!("checkpoint failed: {e}"))
150        })
151    }
152
153    /// Get a reference to the underlying storage (for direct insertion).
154    pub fn storage(&self) -> &Arc<RwLock<NodeGroupStorage>> {
155        &self.storage
156    }
157
158    /// Get a reference to the underlying catalog.
159    pub fn catalog(&self) -> &Arc<Catalog> {
160        &self.catalog
161    }
162
163    /// Get a reference to the transaction manager.
164    pub fn txn_manager(&self) -> &Arc<TransactionManager> {
165        &self.txn_mgr
166    }
167
168    /// Get a reference to the WAL.
169    pub fn wal(&self) -> &Arc<Wal> {
170        &self.wal
171    }
172}
173
174impl Drop for Database {
175    fn drop(&mut self) {
176        // For persistent databases, flush state to disk on orderly shutdown.
177        if self.db_path.is_some() {
178            let _ = self.checkpointer.checkpoint();
179        }
180    }
181}