1use std::path::{Path, PathBuf};
4use std::sync::{Arc, RwLock};
5
6use kyu_catalog::Catalog;
7use kyu_common::{KyuError, KyuResult};
8use kyu_extension::Extension;
9use kyu_transaction::{Checkpointer, TransactionManager, Wal};
10
11use crate::connection::Connection;
12use crate::storage::NodeGroupStorage;
13
14pub struct Database {
19 catalog: Arc<Catalog>,
20 storage: Arc<RwLock<NodeGroupStorage>>,
21 txn_mgr: Arc<TransactionManager>,
22 wal: Arc<Wal>,
23 checkpointer: Arc<Checkpointer>,
24 extensions: Arc<Vec<Box<dyn Extension>>>,
25 db_path: Option<PathBuf>,
27}
28
29impl Database {
30 pub fn in_memory() -> Self {
32 let txn_mgr = Arc::new(TransactionManager::new());
33 let wal = Arc::new(Wal::in_memory());
34 let checkpointer = Arc::new(Checkpointer::new(Arc::clone(&txn_mgr), Arc::clone(&wal)));
35 Self {
36 catalog: Arc::new(Catalog::new()),
37 storage: Arc::new(RwLock::new(NodeGroupStorage::new())),
38 txn_mgr,
39 wal,
40 checkpointer,
41 extensions: Arc::new(Vec::new()),
42 db_path: None,
43 }
44 }
45
46 pub fn open(path: &Path) -> KyuResult<Self> {
50 use crate::persistence;
51 use kyu_transaction::{WalReplayer, WalRecord};
52
53 std::fs::create_dir_all(path).map_err(|e| {
54 KyuError::Storage(format!("cannot create database directory '{}': {e}", path.display()))
55 })?;
56
57 let mut catalog_content = match persistence::load_catalog(path)? {
59 Some(mut c) => {
60 c.rebuild_indexes();
61 c
62 }
63 None => kyu_catalog::CatalogContent::new(),
64 };
65
66 let wal_path = path.join("wal.bin");
69 if wal_path.exists() {
70 let replayer = WalReplayer::new(&wal_path);
71 if let Ok(result) = replayer.replay() {
72 for record in result.committed_records.iter().rev() {
74 if let WalRecord::CatalogSnapshot { json_bytes } = record
75 && let Ok(json) = std::str::from_utf8(json_bytes)
76 && let Ok(recovered) = kyu_catalog::CatalogContent::deserialize_json(json)
77 {
78 catalog_content = recovered;
79 break;
80 }
81 }
82 }
83 }
84
85 let storage = persistence::load_storage(path, &catalog_content)?;
87
88 let catalog = Arc::new(Catalog::from_content(catalog_content));
89 let storage = Arc::new(RwLock::new(storage));
90
91 let wal = Wal::new(path, false).map_err(|e| {
92 KyuError::Storage(format!("cannot open WAL at '{}': {e}", path.display()))
93 })?;
94 let txn_mgr = Arc::new(TransactionManager::new());
95 let wal = Arc::new(wal);
96
97 let flush_catalog = Arc::clone(&catalog);
99 let flush_storage = Arc::clone(&storage);
100 let flush_path = path.to_path_buf();
101 let flush_fn = Arc::new(move || {
102 let cat = flush_catalog.read();
103 let stor = flush_storage.read().map_err(|e| format!("storage lock: {e}"))?;
104 persistence::save_catalog(&flush_path, &cat)
105 .map_err(|e| format!("{e}"))?;
106 persistence::save_storage(&flush_path, &stor, &cat)
107 .map_err(|e| format!("{e}"))?;
108 Ok(())
109 });
110
111 let checkpointer = Arc::new(
112 Checkpointer::new(Arc::clone(&txn_mgr), Arc::clone(&wal))
113 .with_flush(flush_fn),
114 );
115
116 Ok(Self {
117 catalog,
118 storage,
119 txn_mgr,
120 wal,
121 checkpointer,
122 extensions: Arc::new(Vec::new()),
123 db_path: Some(path.to_path_buf()),
124 })
125 }
126
127 pub fn register_extension(&mut self, ext: Box<dyn Extension>) {
129 Arc::get_mut(&mut self.extensions)
130 .expect("cannot register extension while connections exist")
131 .push(ext);
132 }
133
134 pub fn connect(&self) -> Connection {
136 Connection::new(
137 Arc::clone(&self.catalog),
138 Arc::clone(&self.storage),
139 Arc::clone(&self.txn_mgr),
140 Arc::clone(&self.wal),
141 Arc::clone(&self.checkpointer),
142 Arc::clone(&self.extensions),
143 )
144 }
145
146 pub fn checkpoint(&self) -> KyuResult<u64> {
148 self.checkpointer.checkpoint().map_err(|e| {
149 KyuError::Transaction(format!("checkpoint failed: {e}"))
150 })
151 }
152
153 pub fn storage(&self) -> &Arc<RwLock<NodeGroupStorage>> {
155 &self.storage
156 }
157
158 pub fn catalog(&self) -> &Arc<Catalog> {
160 &self.catalog
161 }
162
163 pub fn txn_manager(&self) -> &Arc<TransactionManager> {
165 &self.txn_mgr
166 }
167
168 pub fn wal(&self) -> &Arc<Wal> {
170 &self.wal
171 }
172}
173
174impl Drop for Database {
175 fn drop(&mut self) {
176 if self.db_path.is_some() {
178 let _ = self.checkpointer.checkpoint();
179 }
180 }
181}