we_trust_sqlite/reader/
wal.rs1use yykv_types::DsError;
2type Result<T> = std::result::Result<T, DsError>;
3use parking_lot::Mutex;
4use std::collections::HashMap;
5use tokio::{
6 fs::File,
7 io::{AsyncReadExt, AsyncSeekExt, SeekFrom},
8};
9
10#[derive(Debug)]
12pub struct WalFrameHeader {
13 pub page_id: u32,
14 pub db_size: u32,
15 pub salt1: u32,
16 pub salt2: u32,
17 pub checksum1: u32,
18 pub checksum2: u32,
19}
20
21pub struct WalReader {
23 wal_path: String,
24 page_size: u32,
25 index_cache: Mutex<HashMap<u32, u64>>, last_indexed_offset: Mutex<u64>, }
28
29impl WalReader {
30 pub fn new(db_path: &str, page_size: u32) -> Self {
31 Self {
32 wal_path: format!("{}-wal", db_path),
33 page_size,
34 index_cache: Mutex::new(HashMap::new()),
35 last_indexed_offset: Mutex::new(32), }
37 }
38
39 async fn update_index(&self, file: &mut File) -> Result<()> {
41 let metadata = file
42 .metadata()
43 .await
44 .map_err(|e| DsError::io(e.to_string()))?;
45 let file_len = metadata.len();
46
47 let start_offset = {
48 let last_off = self.last_indexed_offset.lock();
49 if file_len <= *last_off {
50 return Ok(());
51 }
52 *last_off
53 };
54
55 let frame_size = 24 + self.page_size as u64;
56 let mut current_offset = start_offset;
57
58 while current_offset + frame_size <= file_len {
59 file.seek(SeekFrom::Start(current_offset))
60 .await
61 .map_err(|e| DsError::io(e.to_string()))?;
62 let mut frame_header_buf = [0u8; 4]; file.read_exact(&mut frame_header_buf)
64 .await
65 .map_err(|e| DsError::io(e.to_string()))?;
66
67 let frame_page_id = u32::from_be_bytes(frame_header_buf);
68
69 {
70 let mut cache = self.index_cache.lock();
71 cache.insert(frame_page_id, current_offset);
72 }
73
74 current_offset += frame_size;
75 }
76
77 {
78 let mut last_off = self.last_indexed_offset.lock();
79 if current_offset > *last_off {
80 *last_off = current_offset;
81 }
82 }
83 Ok(())
84 }
85
86 pub async fn read_page(&self, page_id: u32) -> Result<Option<Vec<u8>>> {
88 let mut file = match File::open(&self.wal_path).await {
89 Ok(f) => f,
90 Err(_) => return Ok(None),
91 };
92
93 self.update_index(&mut file).await?;
95
96 let offset = {
98 let cache = self.index_cache.lock();
99 cache.get(&page_id).cloned()
100 };
101
102 if let Some(off) = offset {
103 file.seek(SeekFrom::Start(off + 24))
104 .await
105 .map_err(|e| DsError::io(e.to_string()))?;
106 let mut page_data = vec![0u8; self.page_size as usize];
107 file.read_exact(&mut page_data)
108 .await
109 .map_err(|e| DsError::io(e.to_string()))?;
110 return Ok(Some(page_data));
111 }
112
113 Ok(None)
114 }
115}