Skip to main content

we_trust_sqlite/writer/
payload.rs

1use super::*;
2use yykv_types::DsError;
3
4impl NativeWriter {
5    /// 计算在页面上存储的 payload 长度
6    pub(super) fn get_stored_payload_info(
7        &self,
8        page_type: u8,
9        payload_size: u64,
10        page_size: u32,
11    ) -> (usize, bool) {
12        let u = page_size;
13        let (x, m) = match page_type {
14            0x0D => (u - 35, ((u - 12) * 32) / 255 + 23), // Table Leaf
15            0x0A | 0x02 => {
16                // Index Leaf or Interior
17                let x = ((u - 12) * 64) / 255 - 23;
18                let m = ((u - 12) * 32) / 255 + 23;
19                (x, m)
20            }
21            _ => return (payload_size as usize, false),
22        };
23
24        if payload_size <= x as u64 {
25            (payload_size as usize, false)
26        } else {
27            let k = m + ((payload_size - m as u64) % (u as u64 - 4)) as u32;
28            let stored_len = if k <= x { k } else { m };
29            (stored_len as usize, true)
30        }
31    }
32
33    /// 处理溢出页写入,返回第一个溢出页 ID
34    pub(super) async fn write_overflow_pages(&self, data: &[u8]) -> Result<u32> {
35        let page_size = self.get_page_size().await?;
36        let usable_size = page_size;
37        let overflow_payload_limit = usable_size - 4;
38
39        let mut first_page_id = 0;
40        let mut last_page_id = 0;
41        let mut remaining_data = data;
42
43        while !remaining_data.is_empty() {
44            let new_page_id = self.allocate_page().await?;
45            if first_page_id == 0 {
46                first_page_id = new_page_id;
47            }
48
49            if last_page_id != 0 {
50                // 更新上一个溢出页的 next 指针
51                let mut file = OpenOptions::new()
52                    .write(true)
53                    .open(&self.path)
54                    .await
55                    .map_err(|e| DsError::io(format!("Failed to open database file: {}", e)))?;
56                let prev_offset = (last_page_id as u64 - 1) * page_size as u64;
57                file.seek(SeekFrom::Start(prev_offset))
58                    .await
59                    .map_err(|e| DsError::io(e.to_string()))?;
60                file.write_all(&new_page_id.to_be_bytes())
61                    .await
62                    .map_err(|e| DsError::io(e.to_string()))?;
63            }
64
65            let chunk_size = std::cmp::min(remaining_data.len(), overflow_payload_limit as usize);
66            let mut page_data = vec![0u8; page_size as usize];
67            // next page pointer (initially 0)
68            page_data[0..4].copy_from_slice(&0u32.to_be_bytes());
69            page_data[4..4 + chunk_size].copy_from_slice(&remaining_data[..chunk_size]);
70
71            let mut file = OpenOptions::new()
72                .write(true)
73                .open(&self.path)
74                .await
75                .map_err(|e| DsError::io(format!("Failed to open database file: {}", e)))?;
76            let offset = (new_page_id as u64 - 1) * page_size as u64;
77            file.seek(SeekFrom::Start(offset))
78                .await
79                .map_err(|e| DsError::io(e.to_string()))?;
80            file.write_all(&page_data)
81                .await
82                .map_err(|e| DsError::io(e.to_string()))?;
83
84            remaining_data = &remaining_data[chunk_size..];
85            last_page_id = new_page_id;
86        }
87
88        Ok(first_page_id)
89    }
90
91    /// 读取完整的 payload (处理溢出页)
92    pub async fn read_full_payload(
93        &self,
94        page_data: &[u8],
95        cell_offset: usize,
96        page_type: u8,
97        page_size: u32,
98    ) -> Result<Vec<u8>> {
99        let (payload_size, consumed) = parse_varint(&page_data[cell_offset..]);
100        let (stored_len, has_overflow) =
101            self.get_stored_payload_info(page_type, payload_size, page_size);
102
103        let mut payload =
104            page_data[cell_offset + consumed..cell_offset + consumed + stored_len].to_vec();
105
106        if has_overflow {
107            let mut next_page_id = u32::from_be_bytes([
108                page_data[cell_offset + consumed + stored_len],
109                page_data[cell_offset + consumed + stored_len + 1],
110                page_data[cell_offset + consumed + stored_len + 2],
111                page_data[cell_offset + consumed + stored_len + 3],
112            ]);
113
114            let mut file = File::open(&self.path)
115                .await
116                .map_err(|e| DsError::io(format!("Failed to open database file: {}", e)))?;
117            while next_page_id > 0 {
118                let offset = (next_page_id as u64 - 1) * page_size as u64;
119                file.seek(SeekFrom::Start(offset))
120                    .await
121                    .map_err(|e| DsError::io(e.to_string()))?;
122                let mut ovfl_header = [0u8; 4];
123                file.read_exact(&mut ovfl_header)
124                    .await
125                    .map_err(|e| DsError::io(e.to_string()))?;
126
127                let next_ovfl_id = u32::from_be_bytes(ovfl_header);
128                let remaining = payload_size as usize - payload.len();
129                let chunk_size = std::cmp::min(remaining, (page_size - 4) as usize);
130
131                let mut chunk = vec![0u8; chunk_size];
132                file.read_exact(&mut chunk)
133                    .await
134                    .map_err(|e| DsError::io(e.to_string()))?;
135                payload.extend(chunk);
136
137                next_page_id = next_ovfl_id;
138            }
139        }
140
141        Ok(payload)
142    }
143}