Skip to main content

we_trust_sqlite/reader/
wal.rs

1use yykv_types::DsError;
2type Result<T> = std::result::Result<T, DsError>;
3use parking_lot::Mutex;
4use std::collections::HashMap;
5use tokio::{
6    fs::File,
7    io::{AsyncReadExt, AsyncSeekExt, SeekFrom},
8};
9
/// Header that precedes every page image (frame) in a SQLite WAL file.
///
/// The on-disk header is 24 bytes long: six big-endian `u32` values stored in
/// the same order as the fields below (per the SQLite WAL file format).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WalFrameHeader {
    /// Number of the database page whose content follows this header.
    pub page_id: u32,
    /// Database size in pages after a commit; zero for non-commit frames.
    pub db_size: u32,
    /// First salt value; a valid frame carries the same salt as the WAL header.
    pub salt1: u32,
    /// Second salt value; a valid frame carries the same salt as the WAL header.
    pub salt2: u32,
    /// First half of the frame checksum.
    pub checksum1: u32,
    /// Second half of the frame checksum.
    pub checksum2: u32,
}
20
21/// WAL 文件管理器
22pub struct WalReader {
23    wal_path: String,
24    page_size: u32,
25    index_cache: Mutex<HashMap<u32, u64>>, // page_id -> frame_offset
26    last_indexed_offset: Mutex<u64>,       // 上次建立索引的文件偏移量
27}
28
29impl WalReader {
30    pub fn new(db_path: &str, page_size: u32) -> Self {
31        Self {
32            wal_path: format!("{}-wal", db_path),
33            page_size,
34            index_cache: Mutex::new(HashMap::new()),
35            last_indexed_offset: Mutex::new(32), // WAL Header 是 32 字节
36        }
37    }
38
39    /// 增量更新 WAL 索引
40    async fn update_index(&self, file: &mut File) -> Result<()> {
41        let metadata = file
42            .metadata()
43            .await
44            .map_err(|e| DsError::io(e.to_string()))?;
45        let file_len = metadata.len();
46
47        let start_offset = {
48            let last_off = self.last_indexed_offset.lock();
49            if file_len <= *last_off {
50                return Ok(());
51            }
52            *last_off
53        };
54
55        let frame_size = 24 + self.page_size as u64;
56        let mut current_offset = start_offset;
57
58        while current_offset + frame_size <= file_len {
59            file.seek(SeekFrom::Start(current_offset))
60                .await
61                .map_err(|e| DsError::io(e.to_string()))?;
62            let mut frame_header_buf = [0u8; 4]; // 只需要前 4 字节 page_id
63            file.read_exact(&mut frame_header_buf)
64                .await
65                .map_err(|e| DsError::io(e.to_string()))?;
66
67            let frame_page_id = u32::from_be_bytes(frame_header_buf);
68
69            {
70                let mut cache = self.index_cache.lock();
71                cache.insert(frame_page_id, current_offset);
72            }
73
74            current_offset += frame_size;
75        }
76
77        {
78            let mut last_off = self.last_indexed_offset.lock();
79            if current_offset > *last_off {
80                *last_off = current_offset;
81            }
82        }
83        Ok(())
84    }
85
86    /// 尝试从 WAL 文件中查找最新的页面内容
87    pub async fn read_page(&self, page_id: u32) -> Result<Option<Vec<u8>>> {
88        let mut file = match File::open(&self.wal_path).await {
89            Ok(f) => f,
90            Err(_) => return Ok(None),
91        };
92
93        // 增量扫描
94        self.update_index(&mut file).await?;
95
96        // 查缓存
97        let offset = {
98            let cache = self.index_cache.lock();
99            cache.get(&page_id).cloned()
100        };
101
102        if let Some(off) = offset {
103            file.seek(SeekFrom::Start(off + 24))
104                .await
105                .map_err(|e| DsError::io(e.to_string()))?;
106            let mut page_data = vec![0u8; self.page_size as usize];
107            file.read_exact(&mut page_data)
108                .await
109                .map_err(|e| DsError::io(e.to_string()))?;
110            return Ok(Some(page_data));
111        }
112
113        Ok(None)
114    }
115}