1use std::path::{Path, PathBuf};
4use std::sync::{Arc, RwLock};
5
6use kyu_catalog::Catalog;
7use kyu_common::{KyuError, KyuResult};
8use kyu_extension::Extension;
9use kyu_transaction::{Checkpointer, TransactionManager, Wal};
10
11use crate::connection::Connection;
12use crate::storage::NodeGroupStorage;
13
14pub struct Database {
19 catalog: Arc<Catalog>,
20 storage: Arc<RwLock<NodeGroupStorage>>,
21 txn_mgr: Arc<TransactionManager>,
22 wal: Arc<Wal>,
23 checkpointer: Arc<Checkpointer>,
24 extensions: Arc<Vec<Box<dyn Extension>>>,
25 db_path: Option<PathBuf>,
27}
28
29impl Database {
30 pub fn in_memory() -> Self {
32 let txn_mgr = Arc::new(TransactionManager::new());
33 let wal = Arc::new(Wal::in_memory());
34 let checkpointer = Arc::new(Checkpointer::new(Arc::clone(&txn_mgr), Arc::clone(&wal)));
35 Self {
36 catalog: Arc::new(Catalog::new()),
37 storage: Arc::new(RwLock::new(NodeGroupStorage::new())),
38 txn_mgr,
39 wal,
40 checkpointer,
41 extensions: Arc::new(Vec::new()),
42 db_path: None,
43 }
44 }
45
46 pub fn open(path: &Path) -> KyuResult<Self> {
50 use crate::persistence;
51 use kyu_transaction::{WalRecord, WalReplayer};
52
53 std::fs::create_dir_all(path).map_err(|e| {
54 KyuError::Storage(format!(
55 "cannot create database directory '{}': {e}",
56 path.display()
57 ))
58 })?;
59
60 let mut catalog_content = match persistence::load_catalog(path)? {
62 Some(mut c) => {
63 c.rebuild_indexes();
64 c
65 }
66 None => kyu_catalog::CatalogContent::new(),
67 };
68
69 let wal_path = path.join("wal.bin");
72 if wal_path.exists() {
73 let replayer = WalReplayer::new(&wal_path);
74 if let Ok(result) = replayer.replay() {
75 for record in result.committed_records.iter().rev() {
77 if let WalRecord::CatalogSnapshot { json_bytes } = record
78 && let Ok(json) = std::str::from_utf8(json_bytes)
79 && let Ok(recovered) = kyu_catalog::CatalogContent::deserialize_json(json)
80 {
81 catalog_content = recovered;
82 break;
83 }
84 }
85 }
86 }
87
88 let storage = persistence::load_storage(path, &catalog_content)?;
90
91 let catalog = Arc::new(Catalog::from_content(catalog_content));
92 let storage = Arc::new(RwLock::new(storage));
93
94 let wal = Wal::new(path, false).map_err(|e| {
95 KyuError::Storage(format!("cannot open WAL at '{}': {e}", path.display()))
96 })?;
97 let txn_mgr = Arc::new(TransactionManager::new());
98 let wal = Arc::new(wal);
99
100 let flush_catalog = Arc::clone(&catalog);
102 let flush_storage = Arc::clone(&storage);
103 let flush_path = path.to_path_buf();
104 let flush_fn = Arc::new(move || {
105 let cat = flush_catalog.read();
106 let stor = flush_storage
107 .read()
108 .map_err(|e| format!("storage lock: {e}"))?;
109 persistence::save_catalog(&flush_path, &cat).map_err(|e| format!("{e}"))?;
110 persistence::save_storage(&flush_path, &stor, &cat).map_err(|e| format!("{e}"))?;
111 Ok(())
112 });
113
114 let checkpointer = Arc::new(
115 Checkpointer::new(Arc::clone(&txn_mgr), Arc::clone(&wal)).with_flush(flush_fn),
116 );
117
118 Ok(Self {
119 catalog,
120 storage,
121 txn_mgr,
122 wal,
123 checkpointer,
124 extensions: Arc::new(Vec::new()),
125 db_path: Some(path.to_path_buf()),
126 })
127 }
128
129 pub fn register_extension(&mut self, ext: Box<dyn Extension>) {
131 Arc::get_mut(&mut self.extensions)
132 .expect("cannot register extension while connections exist")
133 .push(ext);
134 }
135
136 pub fn connect(&self) -> Connection {
138 Connection::new(
139 Arc::clone(&self.catalog),
140 Arc::clone(&self.storage),
141 Arc::clone(&self.txn_mgr),
142 Arc::clone(&self.wal),
143 Arc::clone(&self.checkpointer),
144 Arc::clone(&self.extensions),
145 )
146 }
147
148 pub fn checkpoint(&self) -> KyuResult<u64> {
150 self.checkpointer
151 .checkpoint()
152 .map_err(|e| KyuError::Transaction(format!("checkpoint failed: {e}")))
153 }
154
155 pub fn storage(&self) -> &Arc<RwLock<NodeGroupStorage>> {
157 &self.storage
158 }
159
160 pub fn catalog(&self) -> &Arc<Catalog> {
162 &self.catalog
163 }
164
165 pub fn txn_manager(&self) -> &Arc<TransactionManager> {
167 &self.txn_mgr
168 }
169
170 pub fn wal(&self) -> &Arc<Wal> {
172 &self.wal
173 }
174}
175
176impl Drop for Database {
177 fn drop(&mut self) {
178 if self.db_path.is_some() {
180 let _ = self.checkpointer.checkpoint();
181 }
182 }
183}