we_trust_sqlite/writer/
wal.rs1use super::*;
2use yykv_types::DsError;
3
4type Result<T> = std::result::Result<T, DsError>;
5
6impl NativeWriter {
7 pub async fn checkpoint(&self) -> Result<()> {
9 let wal_path = format!("{}-wal", self.path);
10 let mut wal_file = match File::open(&wal_path).await {
11 Ok(f) => f,
12 Err(_) => return Ok(()), };
14
15 let page_size = self.get_page_size().await?;
16 let mut header = [0u8; 32];
17 wal_file.read_exact(&mut header).await?;
18
19 let magic = u32::from_be_bytes([header[0], header[1], header[2], header[3]]);
21 if magic != 0x377f0682 && magic != 0x377f0683 {
22 return Err(DsError::query("Invalid WAL magic"));
23 }
24
25 let mut db_file = OpenOptions::new().write(true).open(&self.path).await?;
26 let mut frame_header = [0u8; 24];
27 let mut page_data = vec![0u8; page_size as usize];
28
29 while wal_file.read_exact(&mut frame_header).await.is_ok() {
30 let page_id = u32::from_be_bytes([
31 frame_header[0],
32 frame_header[1],
33 frame_header[2],
34 frame_header[3],
35 ]);
36 wal_file.read_exact(&mut page_data).await?;
37
38 let offset = (page_id as u64 - 1) * page_size as u64;
39 db_file.seek(SeekFrom::Start(offset)).await?;
40 db_file.write_all(&page_data).await?;
41 }
42
43 drop(wal_file);
45 tokio::fs::remove_file(&wal_path).await?;
46
47 let shm_path = format!("{}-shm", self.path);
49 let _ = tokio::fs::remove_file(&shm_path).await;
50
51 Ok(())
52 }
53}
54
55pub struct WalWriter {
57 wal_path: String,
58 page_size: u32,
59 salt1: u32,
60 salt2: u32,
61}
62
63impl WalWriter {
64 pub fn new(db_path: &str, page_size: u32) -> Self {
65 Self {
66 wal_path: format!("{}-wal", db_path),
67 page_size,
68 salt1: 0x12345678, salt2: 0x87654321,
70 }
71 }
72
73 async fn write_header(&self, file: &mut File) -> Result<()> {
75 let mut header = [0u8; 32];
76 header[0..4].copy_from_slice(&0x377f0682u32.to_be_bytes()); header[4..8].copy_from_slice(&3007000u32.to_be_bytes()); header[8..12].copy_from_slice(&self.page_size.to_be_bytes());
79 header[12..16].copy_from_slice(&0u32.to_be_bytes()); header[16..20].copy_from_slice(&self.salt1.to_be_bytes());
81 header[20..24].copy_from_slice(&self.salt2.to_be_bytes());
82 header[24..28].copy_from_slice(&0u32.to_be_bytes());
84 header[28..32].copy_from_slice(&0u32.to_be_bytes());
85
86 file.seek(SeekFrom::Start(0)).await?;
87 file.write_all(&header).await?;
88 Ok(())
89 }
90
91 pub async fn write_frame(&self, page_id: u32, db_size: u32, data: &[u8]) -> Result<()> {
93 let mut file = OpenOptions::new()
94 .create(true)
95 .read(true)
96 .write(true)
97 .open(&self.wal_path)
98 .await?;
99
100 let metadata = file.metadata().await?;
101 if metadata.len() == 0 {
102 self.write_header(&mut file).await?;
103 }
104
105 file.seek(SeekFrom::End(0)).await?;
106
107 let mut frame_header = [0u8; 24];
108 frame_header[0..4].copy_from_slice(&page_id.to_be_bytes());
109 frame_header[4..8].copy_from_slice(&db_size.to_be_bytes());
110 frame_header[8..12].copy_from_slice(&self.salt1.to_be_bytes());
111 frame_header[12..16].copy_from_slice(&self.salt2.to_be_bytes());
112 frame_header[16..20].copy_from_slice(&0u32.to_be_bytes());
114 frame_header[20..24].copy_from_slice(&0u32.to_be_bytes());
115
116 file.write_all(&frame_header).await?;
117 file.write_all(data).await?;
118 Ok(())
119 }
120}