wasm_dbms_memory/table_registry/
raw_table_reader.rs1use wasm_dbms_api::prelude::{DecodeError, MSize, MemoryError, MemoryResult, Page, PageOffset};
11
12use super::page_ledger::PageLedger;
13use super::raw_record::RAW_RECORD_HEADER_SIZE;
14use super::record_address::RecordAddress;
15use crate::MemoryAccess;
16
17#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct RawRecordBytes {
20 pub address: RecordAddress,
22 pub bytes: Vec<u8>,
24}
25
26pub struct RawTableReader<'a, MA>
30where
31 MA: MemoryAccess,
32{
33 mm: &'a mut MA,
34 page_ledger: &'a PageLedger,
35 page_size: usize,
36 alignment: PageOffset,
37 cursor: Option<Cursor>,
38}
39
40#[derive(Debug, Copy, Clone)]
41struct Cursor {
42 page: Page,
43 offset: PageOffset,
44}
45
46impl<'a, MA> RawTableReader<'a, MA>
47where
48 MA: MemoryAccess,
49{
50 pub fn new(page_ledger: &'a PageLedger, alignment: PageOffset, mm: &'a mut MA) -> Self {
54 let page_size = mm.page_size() as usize;
55 let cursor = page_ledger.pages().first().map(|p| Cursor {
56 page: p.page,
57 offset: 0,
58 });
59 Self {
60 mm,
61 page_ledger,
62 page_size,
63 alignment,
64 cursor,
65 }
66 }
67
68 pub fn try_next(&mut self) -> MemoryResult<Option<RawRecordBytes>> {
70 loop {
71 let Some(Cursor { page, offset }) = self.cursor else {
72 return Ok(None);
73 };
74 let aligned_usize = align_up_usize(offset as usize, self.alignment as usize);
75 if aligned_usize + (RAW_RECORD_HEADER_SIZE as usize) > self.page_size {
77 self.cursor = self.next_page(page);
78 continue;
79 }
80 let aligned = aligned_usize as PageOffset;
81 let mut header = [0u8; RAW_RECORD_HEADER_SIZE as usize];
82 self.mm.read_at_raw(page, aligned, &mut header)?;
83 let length = u16::from_le_bytes(header) as MSize;
84 if length == 0 {
85 let next_offset = aligned_usize + self.alignment as usize;
87 self.cursor = if next_offset >= self.page_size {
88 self.next_page(page)
89 } else {
90 Some(Cursor {
91 page,
92 offset: next_offset as PageOffset,
93 })
94 };
95 continue;
96 }
97 let body_offset_usize = aligned_usize + RAW_RECORD_HEADER_SIZE as usize;
98 if body_offset_usize + (length as usize) > self.page_size {
99 return Err(MemoryError::DecodeError(DecodeError::TooShort));
100 }
101 let body_offset = body_offset_usize as PageOffset;
102 let mut body = vec![0u8; length as usize];
103 self.mm.read_at_raw(page, body_offset, &mut body)?;
104 let address = RecordAddress {
105 page,
106 offset: aligned,
107 };
108 let next_offset =
110 align_up_usize(body_offset_usize + length as usize, self.alignment as usize);
111 self.cursor = if next_offset >= self.page_size {
112 self.next_page(page)
113 } else {
114 Some(Cursor {
115 page,
116 offset: next_offset as PageOffset,
117 })
118 };
119 return Ok(Some(RawRecordBytes {
120 address,
121 bytes: body,
122 }));
123 }
124 }
125
126 fn next_page(&self, current: Page) -> Option<Cursor> {
127 self.page_ledger
128 .pages()
129 .iter()
130 .find(|p| p.page > current)
131 .map(|p| Cursor {
132 page: p.page,
133 offset: 0,
134 })
135 }
136}
137
138fn align_up_usize(offset: usize, alignment: usize) -> usize {
139 if alignment == 0 {
140 return offset;
141 }
142 let rem = offset % alignment;
143 if rem == 0 {
144 offset
145 } else {
146 offset + (alignment - rem)
147 }
148}
149
150#[cfg(test)]
151mod tests {
152 use super::*;
153 use crate::table_registry::test_utils::{User, write_dummy_schema_snapshot};
154 use crate::{HeapMemoryProvider, MemoryManager, TableRegistry, TableRegistryPage};
155
156 #[test]
157 fn test_insert_raw_round_trips_through_raw_reader() {
158 let mut mm = MemoryManager::init(HeapMemoryProvider::default());
159 let schema_snapshot_page = mm.claim_page().unwrap();
160 let pages_list_page = mm.claim_page().unwrap();
161 let free_segments_page = mm.claim_page().unwrap();
162 let index_registry_page = mm.claim_page().unwrap();
163 write_dummy_schema_snapshot(schema_snapshot_page, &mut mm);
164
165 let mut registry = TableRegistry::load(
166 TableRegistryPage {
167 schema_snapshot_page,
168 pages_list_page,
169 free_segments_page,
170 index_registry_page,
171 autoincrement_registry_page: None,
172 },
173 &mut mm,
174 )
175 .unwrap();
176
177 let alignment = 32u16;
178 let payload = vec![1u8, 2, 3, 4, 5, 6, 7];
179 let address = registry.insert_raw(&payload, alignment, &mut mm).unwrap();
180 let read_back = registry.read_raw_at(address, &mut mm).unwrap();
181 assert_eq!(read_back, payload);
182
183 let mut reader = RawTableReader::new(®istry.page_ledger, alignment, &mut mm);
184 let row = reader.try_next().unwrap().expect("missing row");
185 assert_eq!(row.bytes, payload);
186 assert!(reader.try_next().unwrap().is_none());
187 }
188
189 #[test]
190 fn test_delete_raw_frees_segment() {
191 let mut mm = MemoryManager::init(HeapMemoryProvider::default());
192 let schema_snapshot_page = mm.claim_page().unwrap();
193 let pages_list_page = mm.claim_page().unwrap();
194 let free_segments_page = mm.claim_page().unwrap();
195 let index_registry_page = mm.claim_page().unwrap();
196 write_dummy_schema_snapshot(schema_snapshot_page, &mut mm);
197
198 let mut registry = TableRegistry::load(
199 TableRegistryPage {
200 schema_snapshot_page,
201 pages_list_page,
202 free_segments_page,
203 index_registry_page,
204 autoincrement_registry_page: None,
205 },
206 &mut mm,
207 )
208 .unwrap();
209
210 let alignment = 32u16;
211 let bytes = vec![9u8; 10];
212 let addr = registry.insert_raw(&bytes, alignment, &mut mm).unwrap();
213 registry
214 .delete_raw(addr, bytes.len() as MSize, alignment, &mut mm)
215 .unwrap();
216
217 let mut reader = RawTableReader::new(®istry.page_ledger, alignment, &mut mm);
218 assert!(reader.try_next().unwrap().is_none());
219 }
220
221 #[test]
222 fn test_raw_table_reader_reads_all_records_as_bytes() {
223 use wasm_dbms_api::prelude::Encode;
224
225 let mut mm = MemoryManager::init(HeapMemoryProvider::default());
226 let schema_snapshot_page = mm.claim_page().unwrap();
227 let pages_list_page = mm.claim_page().unwrap();
228 let free_segments_page = mm.claim_page().unwrap();
229 let index_registry_page = mm.claim_page().unwrap();
230 write_dummy_schema_snapshot(schema_snapshot_page, &mut mm);
231
232 let mut registry = TableRegistry::load(
233 TableRegistryPage {
234 schema_snapshot_page,
235 pages_list_page,
236 free_segments_page,
237 index_registry_page,
238 autoincrement_registry_page: None,
239 },
240 &mut mm,
241 )
242 .unwrap();
243
244 for id in 0..3u32 {
245 let user = User {
246 id,
247 name: format!("U{id}"),
248 email: "x@x".into(),
249 age: 20,
250 };
251 registry.insert(user, &mut mm).unwrap();
252 }
253
254 let mut reader = RawTableReader::new(®istry.page_ledger, User::ALIGNMENT, &mut mm);
255 let mut count = 0;
256 while reader.try_next().unwrap().is_some() {
257 count += 1;
258 }
259 assert_eq!(count, 3);
260 }
261}