1mod btree;
12mod buffer_pool;
13pub mod compression;
14mod eviction;
15mod file;
16mod freelist;
17mod page;
18mod wal;
19
20pub use btree::BTree;
21pub use buffer_pool::{BufferPool, BufferPoolStats, BufferPoolStatsSnapshot, PageGuard};
22pub use compression::{
23 create_compressor, CompressedPageHeader, CompressionStats, CompressionStatsSnapshot,
24 CompressionType, Compressor, PageCompressor,
25};
26pub use eviction::{
27 ClockEviction, EvictionPolicy, EvictionPolicyFactory, EvictionPolicyType, LirsEviction,
28 Lru2Eviction,
29};
30pub use file::FileManager;
31pub use freelist::FreeList;
32pub use page::{Page, PageType, SlotEntry};
33pub use wal::{Wal, WalRecord, WalRecordType, WalStats, WalStatsSnapshot};
34
/// Counters describing what a WAL recovery pass did.
///
/// `Default` yields all-zero counters, which is what
/// [`StorageEngine::recover`] returns when the WAL reports that no recovery
/// is needed. The type is `PartialEq`/`Eq` so callers and tests can compare
/// whole results with `assert_eq!`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecoveryStats {
    /// Total number of WAL records read during the analysis pass.
    pub wal_records: u64,
    /// Number of distinct transaction ids for which a commit record was seen.
    pub committed_txns: usize,
    /// Number of distinct transaction ids for which an abort record was seen.
    pub aborted_txns: usize,
    /// Number of write records re-applied to pages. A page touched by several
    /// committed write records is counted once per record.
    pub pages_recovered: u64,
}
47
/// Point-in-time view of storage consumption against optional limits.
///
/// A limit of `None` means there is nothing to compare against: the
/// `is_*exceeded` checks report `false` and the derived values are `None`.
#[derive(Debug, Clone)]
pub struct StorageQuota {
    /// Bytes currently used by the main storage.
    pub used_bytes: u64,
    /// Optional cap on `used_bytes`.
    pub limit_bytes: Option<u64>,
    /// Bytes currently used by the write-ahead log.
    pub wal_used_bytes: u64,
    /// Optional cap on `wal_used_bytes`.
    pub wal_limit_bytes: Option<u64>,
    /// Bytes available on the underlying disk.
    /// NOTE(review): `StorageEngine::storage_quota` currently always fills
    /// this with 0.
    pub disk_available_bytes: u64,
}

impl StorageQuota {
    /// Bytes left before `limit_bytes` is reached, clamped at zero.
    ///
    /// Returns `None` when no limit is set.
    pub fn remaining(&self) -> Option<u64> {
        let limit = self.limit_bytes?;
        Some(limit.saturating_sub(self.used_bytes))
    }

    /// Usage as a percentage of `limit_bytes`.
    ///
    /// Returns `None` when no limit is set. A limit of zero is reported as
    /// 100% regardless of usage, which avoids a division by zero. The value
    /// may exceed 100 when usage is above the limit.
    pub fn usage_percent(&self) -> Option<f64> {
        let limit = self.limit_bytes?;
        if limit == 0 {
            return Some(100.0);
        }
        Some((self.used_bytes as f64 / limit as f64) * 100.0)
    }

    /// `true` when a storage limit is set and usage has reached or passed it.
    pub fn is_exceeded(&self) -> bool {
        match self.limit_bytes {
            Some(limit) => self.used_bytes >= limit,
            None => false,
        }
    }

    /// `true` when a WAL limit is set and WAL usage has reached or passed it.
    pub fn is_wal_exceeded(&self) -> bool {
        match self.wal_limit_bytes {
            Some(limit) => self.wal_used_bytes >= limit,
            None => false,
        }
    }
}
96
97use featherdb_core::{constants, Config, Lsn, PageId, Result};
98use parking_lot::RwLock;
99use std::collections::HashSet;
100use std::sync::Arc;
101
102pub struct StorageEngine {
104 config: Config,
105 file_manager: Arc<FileManager>,
106 buffer_pool: Arc<BufferPool>,
107 wal: Arc<RwLock<Wal>>,
108 free_list: Arc<RwLock<FreeList>>,
109}
110
111impl StorageEngine {
112 pub fn open(config: Config) -> Result<Self> {
114 let file_manager = Arc::new(FileManager::open(&config)?);
115
116 file_manager.init_if_needed(&config)?;
118
119 let buffer_pool = Arc::new(BufferPool::with_policy(
121 config.buffer_pool_pages,
122 file_manager.clone(),
123 config.eviction_policy,
124 ));
125
126 let wal_path = config.path.with_extension("wal");
127 let wal = Arc::new(RwLock::new(Wal::open_with_config(
128 &wal_path,
129 config.wal_config.clone(),
130 config.storage_limits.max_wal_size,
131 )?));
132
133 let free_list = Arc::new(RwLock::new(FreeList::new(PageId::from(
134 constants::FREELIST_ROOT_PAGE,
135 ))));
136
137 Ok(StorageEngine {
138 config,
139 file_manager,
140 buffer_pool,
141 wal,
142 free_list,
143 })
144 }
145
146 pub fn buffer_pool(&self) -> &Arc<BufferPool> {
148 &self.buffer_pool
149 }
150
151 pub fn file_manager(&self) -> &Arc<FileManager> {
153 &self.file_manager
154 }
155
156 pub fn wal(&self) -> &Arc<RwLock<Wal>> {
158 &self.wal
159 }
160
161 pub fn allocate_page(&self) -> Result<PageId> {
163 self.free_list.write().allocate(&self.buffer_pool)
164 }
165
166 pub fn free_page(&self, page_id: PageId) -> Result<()> {
168 self.free_list.write().free(page_id, &self.buffer_pool)
169 }
170
171 pub fn sync(&self) -> Result<()> {
176 self.file_manager.sync()
177 }
178
179 pub fn checkpoint(&self) -> Result<Lsn> {
181 self.buffer_pool.flush_all()?;
183
184 self.sync()?;
186
187 let lsn = self.wal.read().current_lsn();
189 self.wal.write().truncate()?;
190
191 Ok(lsn)
192 }
193
194 pub fn shutdown(&self) -> Result<()> {
196 self.buffer_pool.flush_all()?;
198 self.sync()?;
200 self.wal.write().truncate()?;
202 self.wal.write().shutdown()?;
204 Ok(())
205 }
206
207 pub fn recover(&self) -> Result<RecoveryStats> {
212 let wal = self.wal.read();
213
214 if !wal.needs_recovery() {
215 return Ok(RecoveryStats::default());
216 }
217
218 let mut committed_txns = HashSet::new();
220 let mut aborted_txns = HashSet::new();
221 let mut total_records = 0u64;
222
223 for record in wal.iter()? {
224 let record = record?;
225 total_records += 1;
226 match record.record_type {
227 WalRecordType::Commit => {
228 committed_txns.insert(record.txn_id);
229 }
230 WalRecordType::Abort => {
231 aborted_txns.insert(record.txn_id);
232 }
233 _ => {}
234 }
235 }
236
237 let mut pages_recovered = 0u64;
239 for record in wal.iter()? {
240 let record = record?;
241 if record.record_type == WalRecordType::Write && committed_txns.contains(&record.txn_id)
242 {
243 let page_id = PageId::from(record.page_id);
244 let mut guard = self.buffer_pool.get_page_for_write(page_id)?;
245 guard.apply_redo(&record.new_data)?;
246 pages_recovered += 1;
247 }
248 }
249
250 if pages_recovered > 0 {
252 self.buffer_pool.flush_all()?;
253 self.sync()?;
254 }
255
256 drop(wal);
258 self.wal.write().truncate()?;
259
260 Ok(RecoveryStats {
261 wal_records: total_records,
262 committed_txns: committed_txns.len(),
263 aborted_txns: aborted_txns.len(),
264 pages_recovered,
265 })
266 }
267
268 pub fn sync_on_commit(&self) -> bool {
270 self.config.sync_on_commit
271 }
272
273 pub fn config(&self) -> &Config {
275 &self.config
276 }
277
278 pub fn wal_stats(&self) -> WalStatsSnapshot {
280 self.wal.read().stats().snapshot()
281 }
282
283 pub fn compression_stats(&self) -> CompressionStatsSnapshot {
285 self.file_manager.compression_stats().snapshot()
286 }
287
288 pub fn storage_quota(&self) -> StorageQuota {
293 let wal = self.wal.read();
294 StorageQuota {
295 used_bytes: self.file_manager.current_size(),
296 limit_bytes: self.file_manager.max_size(),
297 wal_used_bytes: wal.current_size(),
298 wal_limit_bytes: wal.max_size(),
299 disk_available_bytes: 0, }
301 }
302}
303
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A freshly opened engine exposes WAL and compression statistics, and
    /// both start out in their empty state.
    #[test]
    fn test_storage_engine_stats_exposure() {
        // The temporary directory must outlive the engine, so bind it first.
        let scratch = TempDir::new().unwrap();
        let config = Config {
            path: scratch.path().join("test.db"),
            ..Default::default()
        };

        let engine = StorageEngine::open(config).unwrap();

        // WAL: nothing has been logged yet.
        let wal = engine.wal_stats();
        assert_eq!(wal.total_records, 0);
        assert_eq!(wal.group_commits, 0);
        assert_eq!(wal.fsync_count, 0);
        assert_eq!(wal.average_batch_size(), 0.0);

        // Compression: no pages have passed through the compressor yet.
        let compression = engine.compression_stats();
        assert_eq!(compression.pages_compressed, 0);
        assert_eq!(compression.pages_decompressed, 0);
        assert_eq!(compression.compression_ratio(), 1.0);
        assert_eq!(compression.space_savings_percent(), 0.0);
    }
}