Skip to main content

regulus_db/persistence/
mod.rs

1pub mod wal;
2pub mod snapshot;
3
4use std::path::Path;
5use crate::storage::{MemoryEngine, StorageEngine, RowId};
6use crate::persistence::wal::{WalManager, WalOperation};
7use crate::persistence::snapshot::SnapshotManager;
8use crate::types::{DbResult, DbError};
9
10/// 持久化管理器
11pub struct PersistenceManager {
12    wal: WalManager,
13    snapshot: SnapshotManager,
14    checkpoint_threshold: usize,  // WAL 文件大小阈值(字节)
15    wal_size: u64,
16}
17
18impl PersistenceManager {
19    pub fn new(base_path: &Path) -> DbResult<Self> {
20        // 确保目录存在
21        std::fs::create_dir_all(base_path)
22            .map_err(|e| DbError::IoError(e))?;
23
24        let wal_path = base_path.join("data.rdb.wal");
25        let wal = WalManager::new(&wal_path)?;
26        let snapshot = SnapshotManager::new(base_path);
27
28        Ok(PersistenceManager {
29            wal,
30            snapshot,
31            checkpoint_threshold: 10 * 1024 * 1024, // 10MB
32            wal_size: 0,
33        })
34    }
35
36    /// 恢复数据库
37    pub fn restore(&mut self) -> DbResult<MemoryEngine> {
38        // 1. 加载快照
39        let mut engine = if let Some(e) = self.snapshot.load()? {
40            e
41        } else {
42            MemoryEngine::new()
43        };
44
45        // 2. 重放 WAL
46        self.wal.replay(|op| {
47            Self::apply_operation(&mut engine, op)
48        })?;
49
50        // 3. 更新 WAL 大小
51        self.wal_size = self.wal.size()?;
52
53        Ok(engine)
54    }
55
56    /// 应用单个 WAL 操作
57    fn apply_operation(engine: &mut MemoryEngine, op: WalOperation) -> DbResult<()> {
58        match op {
59            WalOperation::CreateTable { schema } => {
60                engine.create_table(schema)?;
61            }
62            WalOperation::DropTable { name } => {
63                engine.drop_table(&name)?;
64            }
65            WalOperation::Insert { table, row_id, row } => {
66                // 直接插入到指定 row_id(用于恢复)
67                engine.insert_restored(&table, RowId(row_id), row)?;
68            }
69            WalOperation::Update { table, row_id, row } => {
70                engine.update(&table, RowId(row_id), row)?;
71            }
72            WalOperation::Delete { table, row_id } => {
73                engine.delete(&table, RowId(row_id))?;
74            }
75        }
76        Ok(())
77    }
78
79    /// 记录操作到 WAL
80    pub fn log_operation(&mut self, op: WalOperation) -> DbResult<u64> {
81        let lsn = self.wal.append(op)?;
82        self.wal_size = self.wal.size()?;
83        Ok(lsn)
84    }
85
86    /// 检查是否需要 checkpoint
87    pub fn needs_checkpoint(&self) -> bool {
88        self.wal_size > self.checkpoint_threshold as u64
89    }
90
91    /// 执行 checkpoint
92    pub fn checkpoint(&mut self, engine: &MemoryEngine) -> DbResult<()> {
93        // 1. 保存快照
94        self.snapshot.save(engine)?;
95
96        // 2. 截断 WAL
97        self.wal.truncate()?;
98        self.wal_size = 0;
99
100        Ok(())
101    }
102
103    /// 获取 WAL 大小
104    pub fn wal_size(&self) -> u64 {
105        self.wal_size
106    }
107}