wasm_dbms_memory/table_registry/
table_reader.rs1use std::marker::PhantomData;
4
5use wasm_dbms_api::prelude::{
6 DecodeError, Encode, MSize, MemoryError, MemoryResult, Page, PageOffset,
7};
8
9use super::page_ledger::PageLedger;
10use super::raw_record::{RAW_RECORD_HEADER_SIZE, RawRecord};
11use crate::{MemoryAccess, align_up};
12
13#[derive(Debug, Copy, Clone, PartialEq, Eq)]
15struct Position {
16 page: Page,
17 offset: PageOffset,
18}
19
20#[derive(Debug, Copy, Clone, PartialEq, Eq)]
23struct FoundRecord {
24 page: Page,
25 offset: PageOffset,
26 length: MSize,
27 new_position: Option<Position>,
28}
29
30#[derive(Debug, Copy, Clone, PartialEq, Eq)]
32pub struct NextRecord<E>
33where
34 E: Encode,
35{
36 pub record: E,
37 pub page: Page,
38 pub offset: PageOffset,
39}
40
41pub struct TableReader<'a, E, MA>
46where
47 E: Encode,
48 MA: MemoryAccess,
49{
50 buffer: Vec<u8>,
52 mm: &'a mut MA,
54 page_ledger: &'a PageLedger,
55 page_size: usize,
56 phantom: PhantomData<E>,
57 position: Option<Position>,
60}
61
62impl<'a, E, MA> TableReader<'a, E, MA>
63where
64 E: Encode,
65 MA: MemoryAccess,
66{
67 pub fn new(page_ledger: &'a PageLedger, mm: &'a mut MA) -> Self {
69 let position = page_ledger.pages().first().map(|page_record| Position {
71 page: page_record.page,
72 offset: 0,
73 });
74 let page_size = mm.page_size() as usize;
75 Self {
76 buffer: vec![0u8; page_size],
77 mm,
78 page_ledger,
79 phantom: PhantomData,
80 position,
81 page_size,
82 }
83 }
84
85 pub fn try_next(&mut self) -> MemoryResult<Option<NextRecord<E>>> {
87 let Some(Position { page, offset }) = self.position else {
88 return Ok(None);
89 };
90
91 let Some(next_record) = self.find_next_record(page, offset)? else {
93 self.position = None;
95 return Ok(None);
96 };
97
98 let record: RawRecord<E> = self.mm.read_at(next_record.page, next_record.offset)?;
100
101 self.position = next_record.new_position;
103
104 Ok(Some(NextRecord {
105 record: record.data,
106 page: next_record.page,
107 offset: next_record.offset,
108 }))
109 }
110
111 fn find_next_record(
116 &mut self,
117 mut page: Page,
118 mut offset: PageOffset,
119 ) -> MemoryResult<Option<FoundRecord>> {
120 loop {
121 if offset == 0 {
123 self.mm.read_at_raw(page, 0, &mut self.buffer)?;
124 }
125
126 if let Some((next_segment_offset, next_segment_size)) =
128 self.find_next_record_position(&self.buffer, offset as usize)?
129 {
130 let new_offset = next_segment_offset + next_segment_size as PageOffset;
132 let new_position = if new_offset as usize >= self.page_size {
133 self.next_page(page)
135 } else {
136 Some(Position {
137 page,
138 offset: new_offset,
139 })
140 };
141 return Ok(Some(FoundRecord {
142 page,
143 offset: next_segment_offset,
144 length: next_segment_size,
145 new_position,
146 }));
147 }
148
149 match self.next_page(page) {
151 Some(pos) => {
152 page = pos.page;
153 offset = pos.offset;
154 }
155 None => break,
156 }
157 }
158
159 Ok(None)
160 }
161
162 fn next_page(&self, current_page: Page) -> Option<Position> {
164 self.page_ledger
165 .pages()
166 .iter()
167 .find(|p| p.page > current_page)
168 .map(|page_record| Position {
169 page: page_record.page,
170 offset: 0,
171 })
172 }
173
174 fn find_next_record_position(
181 &self,
182 buf: &[u8],
183 mut offset: usize,
184 ) -> MemoryResult<Option<(PageOffset, MSize)>> {
185 offset = align_up::<RawRecord<E>>(offset);
187 let mut data_len;
189 loop {
190 if offset + 1 >= buf.len() {
192 return Ok(None);
193 }
194 data_len = u16::from_le_bytes([buf[offset], buf[offset + 1]]) as MSize;
196 if data_len != 0 {
197 break;
198 }
199 offset += E::ALIGNMENT as usize;
201 }
202
203 let data_offset = offset + RAW_RECORD_HEADER_SIZE as usize;
204 if buf.len() < data_offset + data_len as usize {
205 return Err(MemoryError::DecodeError(DecodeError::TooShort));
206 }
207
208 Ok(Some((
209 offset as PageOffset,
210 data_len + RAW_RECORD_HEADER_SIZE,
211 )))
212 }
213}
214
215#[cfg(test)]
216mod tests {
217
218 use super::*;
219 use crate::table_registry::test_utils::User;
220 use crate::{HeapMemoryProvider, MemoryManager, TableRegistry, TableRegistryPage};
221
222 #[test]
223 fn test_should_read_all_records() {
224 const COUNT: u32 = 4_000;
225 let mut mm = MemoryManager::init(HeapMemoryProvider::default());
226 let table_registry = mock_table_registry(COUNT, &mut mm);
227 let mut reader = mocked(&table_registry, &mut mm);
228
229 let mut id = 0;
231 while let Some(NextRecord { record: user, .. }) =
232 reader.try_next().expect("failed to read user")
233 {
234 assert_eq!(user.id, id);
235 assert_eq!(user.name, format!("User {}", id));
236
237 id += 1;
238 }
239 assert_eq!(id, COUNT);
240 }
241
242 #[test]
243 fn test_should_find_next_page() {
244 let mut mm = MemoryManager::init(HeapMemoryProvider::default());
245 let table_registry = mock_table_registry(4_000, &mut mm);
246 let reader = mocked(&table_registry, &mut mm);
247
248 let page = reader.position.expect("should have position").page;
249
250 let next_page = reader.next_page(page).expect("should have next page");
251 assert_eq!(next_page.page, page + 1);
252 let next_page = reader.next_page(next_page.page);
253 assert!(next_page.is_some());
254 let next_page = reader.next_page(next_page.unwrap().page);
255 assert!(next_page.is_some());
256 let next_page = reader.next_page(next_page.unwrap().page);
257 assert!(
258 next_page.is_none(),
259 "should not have next page, but got {:?}",
260 next_page
261 );
262 }
263
264 #[test]
265 fn test_should_find_next_record_position() {
266 let mut mm = MemoryManager::init(HeapMemoryProvider::default());
267 let table_registry = mock_table_registry(1, &mut mm);
268 let reader = mocked(&table_registry, &mut mm);
269
270 let mut buf = vec![0u8; User::ALIGNMENT as usize];
271 buf.extend_from_slice(&[5u8, 0u8, 0u8, 0, 0, 0, 0, 0, 0]);
272
273 let (offset, size) = reader
274 .find_next_record_position(&buf, 0)
275 .expect("failed to get next record")
276 .expect("should have next record");
277
278 assert_eq!(offset, 32);
279 assert_eq!(size, 7);
280 }
281
282 #[test]
283 fn test_should_not_find_next_record_position_none() {
284 let mut mm = MemoryManager::init(HeapMemoryProvider::default());
285 let table_registry = mock_table_registry(1, &mut mm);
286 let reader = mocked(&table_registry, &mut mm);
287
288 let buf = vec![0u8; User::ALIGNMENT as usize * 2];
289 let result = reader
290 .find_next_record_position(&buf, 0)
291 .expect("failed to get next record");
292
293 assert!(result.is_none());
294 }
295
296 #[test]
297 fn test_should_not_find_next_record_position_too_short_for_length() {
298 let mut mm = MemoryManager::init(HeapMemoryProvider::default());
299 let table_registry = mock_table_registry(1, &mut mm);
300 let reader = mocked(&table_registry, &mut mm);
301
302 let buf = [5u8, 16u8];
303 let result = reader.find_next_record_position(&buf, 0);
304 assert!(result.is_err(), "expected error but got {:?}", result);
305 let err = result.unwrap_err();
306
307 assert!(matches!(
308 err,
309 MemoryError::DecodeError(DecodeError::TooShort)
310 ));
311 }
312
313 #[test]
314 fn test_should_not_find_next_record_position_too_short_for_data() {
315 let mut mm = MemoryManager::init(HeapMemoryProvider::default());
316 let table_registry = mock_table_registry(1, &mut mm);
317 let reader = mocked(&table_registry, &mut mm);
318
319 let buf = [5u8, 0u8, 0u8, 0, 0];
320 let result = reader.find_next_record_position(&buf, 0);
321
322 assert!(matches!(
323 result,
324 Err(MemoryError::DecodeError(DecodeError::TooShort))
325 ));
326 }
327
328 fn mock_table_registry(
329 entries: u32,
330 mm: &mut MemoryManager<HeapMemoryProvider>,
331 ) -> TableRegistry {
332 let schema_snapshot_page = mm.claim_page().expect("failed to get page");
333 let page_ledger_page = mm.claim_page().expect("failed to get page");
334 let free_segments_page = mm.claim_page().expect("failed to get page");
335 let index_registry_page = mm.claim_page().expect("failed to get page");
336 super::super::test_utils::write_dummy_schema_snapshot(schema_snapshot_page, mm);
337 let mut registry = TableRegistry::load(
338 TableRegistryPage {
339 schema_snapshot_page,
340 pages_list_page: page_ledger_page,
341 free_segments_page,
342 index_registry_page,
343 autoincrement_registry_page: None,
344 },
345 mm,
346 )
347 .expect("failed to load registry");
348
349 for id in 0..entries {
351 let user = User {
352 id,
353 name: format!("User {}", id),
354 email: "new_user@example.com".to_string(),
355 age: 20 + id,
356 };
357 registry.insert(user, mm).expect("failed to insert user");
358 }
359
360 registry
361 }
362
363 fn mocked<'a>(
364 table_registry: &'a TableRegistry,
365 mm: &'a mut MemoryManager<HeapMemoryProvider>,
366 ) -> TableReader<'a, User, MemoryManager<HeapMemoryProvider>> {
367 TableReader::new(&table_registry.page_ledger, mm)
368 }
369}