Skip to main content

featherdb_storage/
lib.rs

//! Storage engine for FeatherDB
//!
//! This crate provides the low-level storage components:
//! - Page management and file I/O
//! - B-tree data structure
//! - Buffer pool with pluggable eviction policies (Clock, LRU-2, LIRS)
//! - Write-ahead logging (WAL)
//! - Crash recovery
//! - Adaptive page compression (LZ4/ZSTD)
10
11mod btree;
12mod buffer_pool;
13pub mod compression;
14mod eviction;
15mod file;
16mod freelist;
17mod page;
18mod wal;
19
20pub use btree::BTree;
21pub use buffer_pool::{BufferPool, BufferPoolStats, BufferPoolStatsSnapshot, PageGuard};
22pub use compression::{
23    create_compressor, CompressedPageHeader, CompressionStats, CompressionStatsSnapshot,
24    CompressionType, Compressor, PageCompressor,
25};
26pub use eviction::{
27    ClockEviction, EvictionPolicy, EvictionPolicyFactory, EvictionPolicyType, LirsEviction,
28    Lru2Eviction,
29};
30pub use file::FileManager;
31pub use freelist::FreeList;
32pub use page::{Page, PageType, SlotEntry};
33pub use wal::{Wal, WalRecord, WalRecordType, WalStats, WalStatsSnapshot};
34
/// Statistics returned from crash recovery.
///
/// Produced by `StorageEngine::recover`. Every counter is zero (the `Default`
/// value) when the WAL did not need recovery.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecoveryStats {
    /// Total number of WAL records scanned.
    pub wal_records: u64,
    /// Number of distinct transactions for which a commit record was found.
    pub committed_txns: usize,
    /// Number of distinct transactions for which an abort record was found.
    pub aborted_txns: usize,
    /// Number of redo applications performed during replay.
    ///
    /// Incremented once per replayed write record, so a page touched by
    /// several committed writes is counted once for each of them.
    pub pages_recovered: u64,
}
47
/// Storage quota information
///
/// Provides information about current database and WAL file sizes,
/// configured limits, and available disk space.
#[derive(Debug, Clone)]
pub struct StorageQuota {
    /// Current database file size in bytes
    pub used_bytes: u64,
    /// Maximum database size in bytes (None if unlimited)
    pub limit_bytes: Option<u64>,
    /// Current WAL file size in bytes
    pub wal_used_bytes: u64,
    /// Maximum WAL size in bytes (None if unlimited)
    pub wal_limit_bytes: Option<u64>,
    /// Available disk space in bytes (0 if unknown)
    pub disk_available_bytes: u64,
}

impl StorageQuota {
    /// Bytes left before the database limit is reached.
    ///
    /// Returns `None` when no limit is configured. The subtraction saturates,
    /// so a database already larger than its limit reports `Some(0)`.
    pub fn remaining(&self) -> Option<u64> {
        let limit = self.limit_bytes?;
        Some(limit.saturating_sub(self.used_bytes))
    }

    /// Database usage as a percentage of the configured limit.
    ///
    /// Returns `None` when no limit is configured. The result is not clamped:
    /// it is greater than `100.0` whenever `used_bytes` exceeds the limit.
    /// A limit of zero always reports `100.0`.
    pub fn usage_percent(&self) -> Option<f64> {
        let limit = self.limit_bytes?;
        if limit == 0 {
            // Avoid dividing by zero; a zero-byte limit counts as fully used.
            return Some(100.0);
        }
        Some(self.used_bytes as f64 / limit as f64 * 100.0)
    }

    /// Check whether the database limit has been reached or exceeded.
    ///
    /// Returns `true` when `used_bytes` is greater than *or equal to* the
    /// limit, and `false` when no limit is configured.
    pub fn is_exceeded(&self) -> bool {
        matches!(self.limit_bytes, Some(limit) if self.used_bytes >= limit)
    }

    /// Check whether the WAL limit has been reached or exceeded.
    ///
    /// Returns `true` when `wal_used_bytes` is greater than *or equal to* the
    /// limit, and `false` when no limit is configured.
    pub fn is_wal_exceeded(&self) -> bool {
        matches!(self.wal_limit_bytes, Some(limit) if self.wal_used_bytes >= limit)
    }
}
96
97use featherdb_core::{constants, Config, Lsn, PageId, Result};
98use parking_lot::RwLock;
99use std::collections::HashSet;
100use std::sync::Arc;
101
102/// The storage engine, managing all disk I/O and page caching
103pub struct StorageEngine {
104    config: Config,
105    file_manager: Arc<FileManager>,
106    buffer_pool: Arc<BufferPool>,
107    wal: Arc<RwLock<Wal>>,
108    free_list: Arc<RwLock<FreeList>>,
109}
110
111impl StorageEngine {
112    /// Open or create a storage engine
113    pub fn open(config: Config) -> Result<Self> {
114        let file_manager = Arc::new(FileManager::open(&config)?);
115
116        // Initialize or validate the superblock
117        file_manager.init_if_needed(&config)?;
118
119        // Create buffer pool with configured eviction policy
120        let buffer_pool = Arc::new(BufferPool::with_policy(
121            config.buffer_pool_pages,
122            file_manager.clone(),
123            config.eviction_policy,
124        ));
125
126        let wal_path = config.path.with_extension("wal");
127        let wal = Arc::new(RwLock::new(Wal::open_with_config(
128            &wal_path,
129            config.wal_config.clone(),
130            config.storage_limits.max_wal_size,
131        )?));
132
133        let free_list = Arc::new(RwLock::new(FreeList::new(PageId::from(
134            constants::FREELIST_ROOT_PAGE,
135        ))));
136
137        Ok(StorageEngine {
138            config,
139            file_manager,
140            buffer_pool,
141            wal,
142            free_list,
143        })
144    }
145
146    /// Get the buffer pool
147    pub fn buffer_pool(&self) -> &Arc<BufferPool> {
148        &self.buffer_pool
149    }
150
151    /// Get the file manager
152    pub fn file_manager(&self) -> &Arc<FileManager> {
153        &self.file_manager
154    }
155
156    /// Get the WAL
157    pub fn wal(&self) -> &Arc<RwLock<Wal>> {
158        &self.wal
159    }
160
161    /// Allocate a new page
162    pub fn allocate_page(&self) -> Result<PageId> {
163        self.free_list.write().allocate(&self.buffer_pool)
164    }
165
166    /// Free a page
167    pub fn free_page(&self, page_id: PageId) -> Result<()> {
168        self.free_list.write().free(page_id, &self.buffer_pool)
169    }
170
171    /// Sync all data to disk
172    ///
173    /// This ensures all buffered writes are persisted to disk.
174    /// Should be called after flush_all() to ensure durability.
175    pub fn sync(&self) -> Result<()> {
176        self.file_manager.sync()
177    }
178
179    /// Checkpoint: flush all dirty pages and truncate WAL
180    pub fn checkpoint(&self) -> Result<Lsn> {
181        // First, flush all dirty pages
182        self.buffer_pool.flush_all()?;
183
184        // Sync to disk
185        self.sync()?;
186
187        // Get current LSN and truncate WAL
188        let lsn = self.wal.read().current_lsn();
189        self.wal.write().truncate()?;
190
191        Ok(lsn)
192    }
193
194    /// Graceful shutdown: flush all pages, sync, and shut down WAL
195    pub fn shutdown(&self) -> Result<()> {
196        // 1. Flush all dirty pages to disk
197        self.buffer_pool.flush_all()?;
198        // 2. Sync data file to disk
199        self.sync()?;
200        // 3. Truncate WAL (checkpoint)
201        self.wal.write().truncate()?;
202        // 4. Shutdown WAL (joins flush thread, final sync)
203        self.wal.write().shutdown()?;
204        Ok(())
205    }
206
207    /// Recover from crash using WAL with two-pass replay
208    ///
209    /// Pass 1: Scan WAL to find committed vs aborted transactions
210    /// Pass 2: Replay only writes from committed transactions
211    pub fn recover(&self) -> Result<RecoveryStats> {
212        let wal = self.wal.read();
213
214        if !wal.needs_recovery() {
215            return Ok(RecoveryStats::default());
216        }
217
218        // Pass 1: Build set of committed transaction IDs
219        let mut committed_txns = HashSet::new();
220        let mut aborted_txns = HashSet::new();
221        let mut total_records = 0u64;
222
223        for record in wal.iter()? {
224            let record = record?;
225            total_records += 1;
226            match record.record_type {
227                WalRecordType::Commit => {
228                    committed_txns.insert(record.txn_id);
229                }
230                WalRecordType::Abort => {
231                    aborted_txns.insert(record.txn_id);
232                }
233                _ => {}
234            }
235        }
236
237        // Pass 2: Apply only writes from committed transactions
238        let mut pages_recovered = 0u64;
239        for record in wal.iter()? {
240            let record = record?;
241            if record.record_type == WalRecordType::Write && committed_txns.contains(&record.txn_id)
242            {
243                let page_id = PageId::from(record.page_id);
244                let mut guard = self.buffer_pool.get_page_for_write(page_id)?;
245                guard.apply_redo(&record.new_data)?;
246                pages_recovered += 1;
247            }
248        }
249
250        // Flush recovered pages and checkpoint
251        if pages_recovered > 0 {
252            self.buffer_pool.flush_all()?;
253            self.sync()?;
254        }
255
256        // Truncate WAL after successful recovery
257        drop(wal);
258        self.wal.write().truncate()?;
259
260        Ok(RecoveryStats {
261            wal_records: total_records,
262            committed_txns: committed_txns.len(),
263            aborted_txns: aborted_txns.len(),
264            pages_recovered,
265        })
266    }
267
268    /// Check if sync-on-commit is enabled
269    pub fn sync_on_commit(&self) -> bool {
270        self.config.sync_on_commit
271    }
272
273    /// Get configuration
274    pub fn config(&self) -> &Config {
275        &self.config
276    }
277
278    /// Get WAL statistics snapshot
279    pub fn wal_stats(&self) -> WalStatsSnapshot {
280        self.wal.read().stats().snapshot()
281    }
282
283    /// Get compression statistics snapshot
284    pub fn compression_stats(&self) -> CompressionStatsSnapshot {
285        self.file_manager.compression_stats().snapshot()
286    }
287
288    /// Get current storage quota information
289    ///
290    /// Returns information about database and WAL file sizes,
291    /// configured limits, and available disk space.
292    pub fn storage_quota(&self) -> StorageQuota {
293        let wal = self.wal.read();
294        StorageQuota {
295            used_bytes: self.file_manager.current_size(),
296            limit_bytes: self.file_manager.max_size(),
297            wal_used_bytes: wal.current_size(),
298            wal_limit_bytes: wal.max_size(),
299            disk_available_bytes: 0, // Not implemented yet (would require platform-specific syscalls)
300        }
301    }
302}
303
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A freshly opened engine exposes zeroed WAL and compression statistics.
    #[test]
    fn test_storage_engine_stats_exposure() {
        // The temp dir must outlive the engine; it is removed on drop.
        let tmp = TempDir::new().unwrap();
        let db_path = tmp.path().join("test.db");

        let config = Config {
            path: db_path,
            ..Default::default()
        };

        let engine = StorageEngine::open(config).unwrap();

        // Test WAL stats snapshot
        let wal_stats = engine.wal_stats();
        assert_eq!(wal_stats.total_records, 0); // No operations yet
        assert_eq!(wal_stats.group_commits, 0);
        assert_eq!(wal_stats.fsync_count, 0);
        assert_eq!(wal_stats.average_batch_size(), 0.0);

        // Test compression stats snapshot
        let compression_stats = engine.compression_stats();
        assert_eq!(compression_stats.pages_compressed, 0);
        assert_eq!(compression_stats.pages_decompressed, 0);
        assert_eq!(compression_stats.compression_ratio(), 1.0);
        assert_eq!(compression_stats.space_savings_percent(), 0.0);
    }
}
335}