Skip to main content

we_trust_sqlite/writer/
wal.rs

1use super::*;
2use yykv_types::DsError;
3
4type Result<T> = std::result::Result<T, DsError>;
5
6impl NativeWriter {
7    /// 执行 WAL Checkpoint,将 WAL 中的页面同步回主数据库
8    pub async fn checkpoint(&self) -> Result<()> {
9        let wal_path = format!("{}-wal", self.path);
10        let mut wal_file = match File::open(&wal_path).await {
11            Ok(f) => f,
12            Err(_) => return Ok(()), // WAL 不存在,无需 checkpoint
13        };
14
15        let page_size = self.get_page_size().await?;
16        let mut header = [0u8; 32];
17        wal_file.read_exact(&mut header).await?;
18
19        // 验证 Magic
20        let magic = u32::from_be_bytes([header[0], header[1], header[2], header[3]]);
21        if magic != 0x377f0682 && magic != 0x377f0683 {
22            return Err(DsError::query("Invalid WAL magic"));
23        }
24
25        let mut db_file = OpenOptions::new().write(true).open(&self.path).await?;
26        let mut frame_header = [0u8; 24];
27        let mut page_data = vec![0u8; page_size as usize];
28
29        while wal_file.read_exact(&mut frame_header).await.is_ok() {
30            let page_id = u32::from_be_bytes([
31                frame_header[0],
32                frame_header[1],
33                frame_header[2],
34                frame_header[3],
35            ]);
36            wal_file.read_exact(&mut page_data).await?;
37
38            let offset = (page_id as u64 - 1) * page_size as u64;
39            db_file.seek(SeekFrom::Start(offset)).await?;
40            db_file.write_all(&page_data).await?;
41        }
42
43        // Checkpoint 完成后通常需要截断或删除 WAL
44        drop(wal_file);
45        tokio::fs::remove_file(&wal_path).await?;
46
47        // 同时删除 WAL-index 文件(如果存在)
48        let shm_path = format!("{}-shm", self.path);
49        let _ = tokio::fs::remove_file(&shm_path).await;
50
51        Ok(())
52    }
53}
54
55/// WAL 写入器
56pub struct WalWriter {
57    wal_path: String,
58    page_size: u32,
59    salt1: u32,
60    salt2: u32,
61}
62
63impl WalWriter {
64    pub fn new(db_path: &str, page_size: u32) -> Self {
65        Self {
66            wal_path: format!("{}-wal", db_path),
67            page_size,
68            salt1: 0x12345678, // 实际应用中应随机生成
69            salt2: 0x87654321,
70        }
71    }
72
73    /// 写入 WAL Header
74    async fn write_header(&self, file: &mut File) -> Result<()> {
75        let mut header = [0u8; 32];
76        header[0..4].copy_from_slice(&0x377f0682u32.to_be_bytes()); // Magic
77        header[4..8].copy_from_slice(&3007000u32.to_be_bytes()); // Version
78        header[8..12].copy_from_slice(&self.page_size.to_be_bytes());
79        header[12..16].copy_from_slice(&0u32.to_be_bytes()); // Checkpoint seq
80        header[16..20].copy_from_slice(&self.salt1.to_be_bytes());
81        header[20..24].copy_from_slice(&self.salt2.to_be_bytes());
82        // Checksum (暂时填充 0)
83        header[24..28].copy_from_slice(&0u32.to_be_bytes());
84        header[28..32].copy_from_slice(&0u32.to_be_bytes());
85
86        file.seek(SeekFrom::Start(0)).await?;
87        file.write_all(&header).await?;
88        Ok(())
89    }
90
91    /// 写入一个 WAL 帧
92    pub async fn write_frame(&self, page_id: u32, db_size: u32, data: &[u8]) -> Result<()> {
93        let mut file = OpenOptions::new()
94            .create(true)
95            .read(true)
96            .write(true)
97            .open(&self.wal_path)
98            .await?;
99
100        let metadata = file.metadata().await?;
101        if metadata.len() == 0 {
102            self.write_header(&mut file).await?;
103        }
104
105        file.seek(SeekFrom::End(0)).await?;
106
107        let mut frame_header = [0u8; 24];
108        frame_header[0..4].copy_from_slice(&page_id.to_be_bytes());
109        frame_header[4..8].copy_from_slice(&db_size.to_be_bytes());
110        frame_header[8..12].copy_from_slice(&self.salt1.to_be_bytes());
111        frame_header[12..16].copy_from_slice(&self.salt2.to_be_bytes());
112        // Checksum (暂时填充 0)
113        frame_header[16..20].copy_from_slice(&0u32.to_be_bytes());
114        frame_header[20..24].copy_from_slice(&0u32.to_be_bytes());
115
116        file.write_all(&frame_header).await?;
117        file.write_all(data).await?;
118        Ok(())
119    }
120}