we_trust_sqlite/writer/
payload.rs1use super::*;
2use yykv_types::DsError;
3
4impl NativeWriter {
5 pub(super) fn get_stored_payload_info(
7 &self,
8 page_type: u8,
9 payload_size: u64,
10 page_size: u32,
11 ) -> (usize, bool) {
12 let u = page_size;
13 let (x, m) = match page_type {
14 0x0D => (u - 35, ((u - 12) * 32) / 255 + 23), 0x0A | 0x02 => {
16 let x = ((u - 12) * 64) / 255 - 23;
18 let m = ((u - 12) * 32) / 255 + 23;
19 (x, m)
20 }
21 _ => return (payload_size as usize, false),
22 };
23
24 if payload_size <= x as u64 {
25 (payload_size as usize, false)
26 } else {
27 let k = m + ((payload_size - m as u64) % (u as u64 - 4)) as u32;
28 let stored_len = if k <= x { k } else { m };
29 (stored_len as usize, true)
30 }
31 }
32
33 pub(super) async fn write_overflow_pages(&self, data: &[u8]) -> Result<u32> {
35 let page_size = self.get_page_size().await?;
36 let usable_size = page_size;
37 let overflow_payload_limit = usable_size - 4;
38
39 let mut first_page_id = 0;
40 let mut last_page_id = 0;
41 let mut remaining_data = data;
42
43 while !remaining_data.is_empty() {
44 let new_page_id = self.allocate_page().await?;
45 if first_page_id == 0 {
46 first_page_id = new_page_id;
47 }
48
49 if last_page_id != 0 {
50 let mut file = OpenOptions::new()
52 .write(true)
53 .open(&self.path)
54 .await
55 .map_err(|e| DsError::io(format!("Failed to open database file: {}", e)))?;
56 let prev_offset = (last_page_id as u64 - 1) * page_size as u64;
57 file.seek(SeekFrom::Start(prev_offset))
58 .await
59 .map_err(|e| DsError::io(e.to_string()))?;
60 file.write_all(&new_page_id.to_be_bytes())
61 .await
62 .map_err(|e| DsError::io(e.to_string()))?;
63 }
64
65 let chunk_size = std::cmp::min(remaining_data.len(), overflow_payload_limit as usize);
66 let mut page_data = vec![0u8; page_size as usize];
67 page_data[0..4].copy_from_slice(&0u32.to_be_bytes());
69 page_data[4..4 + chunk_size].copy_from_slice(&remaining_data[..chunk_size]);
70
71 let mut file = OpenOptions::new()
72 .write(true)
73 .open(&self.path)
74 .await
75 .map_err(|e| DsError::io(format!("Failed to open database file: {}", e)))?;
76 let offset = (new_page_id as u64 - 1) * page_size as u64;
77 file.seek(SeekFrom::Start(offset))
78 .await
79 .map_err(|e| DsError::io(e.to_string()))?;
80 file.write_all(&page_data)
81 .await
82 .map_err(|e| DsError::io(e.to_string()))?;
83
84 remaining_data = &remaining_data[chunk_size..];
85 last_page_id = new_page_id;
86 }
87
88 Ok(first_page_id)
89 }
90
91 pub async fn read_full_payload(
93 &self,
94 page_data: &[u8],
95 cell_offset: usize,
96 page_type: u8,
97 page_size: u32,
98 ) -> Result<Vec<u8>> {
99 let (payload_size, consumed) = parse_varint(&page_data[cell_offset..]);
100 let (stored_len, has_overflow) =
101 self.get_stored_payload_info(page_type, payload_size, page_size);
102
103 let mut payload =
104 page_data[cell_offset + consumed..cell_offset + consumed + stored_len].to_vec();
105
106 if has_overflow {
107 let mut next_page_id = u32::from_be_bytes([
108 page_data[cell_offset + consumed + stored_len],
109 page_data[cell_offset + consumed + stored_len + 1],
110 page_data[cell_offset + consumed + stored_len + 2],
111 page_data[cell_offset + consumed + stored_len + 3],
112 ]);
113
114 let mut file = File::open(&self.path)
115 .await
116 .map_err(|e| DsError::io(format!("Failed to open database file: {}", e)))?;
117 while next_page_id > 0 {
118 let offset = (next_page_id as u64 - 1) * page_size as u64;
119 file.seek(SeekFrom::Start(offset))
120 .await
121 .map_err(|e| DsError::io(e.to_string()))?;
122 let mut ovfl_header = [0u8; 4];
123 file.read_exact(&mut ovfl_header)
124 .await
125 .map_err(|e| DsError::io(e.to_string()))?;
126
127 let next_ovfl_id = u32::from_be_bytes(ovfl_header);
128 let remaining = payload_size as usize - payload.len();
129 let chunk_size = std::cmp::min(remaining, (page_size - 4) as usize);
130
131 let mut chunk = vec![0u8; chunk_size];
132 file.read_exact(&mut chunk)
133 .await
134 .map_err(|e| DsError::io(e.to_string()))?;
135 payload.extend(chunk);
136
137 next_page_id = next_ovfl_id;
138 }
139 }
140
141 Ok(payload)
142 }
143}