Skip to main content

wasm_dbms_memory/table_registry/
table_reader.rs

1// Rust guideline compliant 2026-02-28
2
3use std::marker::PhantomData;
4
5use wasm_dbms_api::prelude::{
6    DecodeError, Encode, MSize, MemoryError, MemoryResult, Page, PageOffset,
7};
8
9use super::page_ledger::PageLedger;
10use super::raw_record::{RAW_RECORD_HEADER_SIZE, RawRecord};
11use crate::{MemoryAccess, align_up};
12
13/// Stores the current position to read/write in memory.
14#[derive(Debug, Copy, Clone, PartialEq, Eq)]
15struct Position {
16    page: Page,
17    offset: PageOffset,
18}
19
20/// Represents the next record to read from memory.
21/// It also contains the new [`Position`] after reading the record.
22#[derive(Debug, Copy, Clone, PartialEq, Eq)]
23struct FoundRecord {
24    page: Page,
25    offset: PageOffset,
26    length: MSize,
27    new_position: Option<Position>,
28}
29
30/// Represents the next record read by the [`TableReader`].
31#[derive(Debug, Copy, Clone, PartialEq, Eq)]
32pub struct NextRecord<E>
33where
34    E: Encode,
35{
36    pub record: E,
37    pub page: Page,
38    pub offset: PageOffset,
39}
40
41/// A reader for the table registry that allows reading records from memory.
42///
43/// The table reader provides methods to read records from the table registry one by one,
44/// using the underlying [`PageLedger`] to locate the records in memory.
45pub struct TableReader<'a, E, MA>
46where
47    E: Encode,
48    MA: MemoryAccess,
49{
50    /// Buffer used to read records from memory.
51    buffer: Vec<u8>,
52    /// Reference to the memory access implementor.
53    mm: &'a mut MA,
54    page_ledger: &'a PageLedger,
55    page_size: usize,
56    phantom: PhantomData<E>,
57    /// Current position in the table registry.
58    /// If `None`, the reader has reached the end of the table.
59    position: Option<Position>,
60}
61
62impl<'a, E, MA> TableReader<'a, E, MA>
63where
64    E: Encode,
65    MA: MemoryAccess,
66{
67    /// Creates a new table reader starting from the beginning of the table registry.
68    pub fn new(page_ledger: &'a PageLedger, mm: &'a mut MA) -> Self {
69        // init position
70        let position = page_ledger.pages().first().map(|page_record| Position {
71            page: page_record.page,
72            offset: 0,
73        });
74        let page_size = mm.page_size() as usize;
75        Self {
76            buffer: vec![0u8; page_size],
77            mm,
78            page_ledger,
79            phantom: PhantomData,
80            position,
81            page_size,
82        }
83    }
84
85    /// Reads the next record from the table registry.
86    pub fn try_next(&mut self) -> MemoryResult<Option<NextRecord<E>>> {
87        let Some(Position { page, offset }) = self.position else {
88            return Ok(None);
89        };
90
91        // find next record segment
92        let Some(next_record) = self.find_next_record(page, offset)? else {
93            // no more records
94            self.position = None;
95            return Ok(None);
96        };
97
98        // read raw record
99        let record: RawRecord<E> = self.mm.read_at(next_record.page, next_record.offset)?;
100
101        // update position
102        self.position = next_record.new_position;
103
104        Ok(Some(NextRecord {
105            record: record.data,
106            page: next_record.page,
107            offset: next_record.offset,
108        }))
109    }
110
111    /// Finds the next record starting from the given position.
112    ///
113    /// If a record is found, returns [`Some<NextRecord<E>>`], otherwise returns [`None`].
114    /// If [`None`] is returned, the reader has reached the end of the table.
115    fn find_next_record(
116        &mut self,
117        mut page: Page,
118        mut offset: PageOffset,
119    ) -> MemoryResult<Option<FoundRecord>> {
120        loop {
121            // if offset is zero, read page; otherwise, just reuse buffer
122            if offset == 0 {
123                self.mm.read_at_raw(page, 0, &mut self.buffer)?;
124            }
125
126            // find next record in buffer; if found, return it
127            if let Some((next_segment_offset, next_segment_size)) =
128                self.find_next_record_position(&self.buffer, offset as usize)?
129            {
130                // found a record; return it
131                let new_offset = next_segment_offset + next_segment_size as PageOffset;
132                let new_position = if new_offset as usize >= self.page_size {
133                    // move to next page
134                    self.next_page(page)
135                } else {
136                    Some(Position {
137                        page,
138                        offset: new_offset,
139                    })
140                };
141                return Ok(Some(FoundRecord {
142                    page,
143                    offset: next_segment_offset,
144                    length: next_segment_size,
145                    new_position,
146                }));
147            }
148
149            // read next page
150            match self.next_page(page) {
151                Some(pos) => {
152                    page = pos.page;
153                    offset = pos.offset;
154                }
155                None => break,
156            }
157        }
158
159        Ok(None)
160    }
161
162    /// Gets the next page after the given current page.
163    fn next_page(&self, current_page: Page) -> Option<Position> {
164        self.page_ledger
165            .pages()
166            .iter()
167            .find(|p| p.page > current_page)
168            .map(|page_record| Position {
169                page: page_record.page,
170                offset: 0,
171            })
172    }
173
174    /// Finds the next record segment position.
175    ///
176    /// This is done by starting from the current offset
177    /// and searching for each multiple of [`E::ALIGNMENT`] until we find a size different from `0x00`.
178    ///
179    /// Returns the offset and size of the next record segment if found.
180    fn find_next_record_position(
181        &self,
182        buf: &[u8],
183        mut offset: usize,
184    ) -> MemoryResult<Option<(PageOffset, MSize)>> {
185        // first round the offset to the next alignment
186        offset = align_up::<RawRecord<E>>(offset);
187        // search for the first non-zero record length
188        let mut data_len;
189        loop {
190            // check whether we are at the end of the page
191            if offset + 1 >= buf.len() {
192                return Ok(None);
193            }
194            // read next two bytes
195            data_len = u16::from_le_bytes([buf[offset], buf[offset + 1]]) as MSize;
196            if data_len != 0 {
197                break;
198            }
199            // move to next alignment
200            offset += E::ALIGNMENT as usize;
201        }
202
203        let data_offset = offset + RAW_RECORD_HEADER_SIZE as usize;
204        if buf.len() < data_offset + data_len as usize {
205            return Err(MemoryError::DecodeError(DecodeError::TooShort));
206        }
207
208        Ok(Some((
209            offset as PageOffset,
210            data_len + RAW_RECORD_HEADER_SIZE,
211        )))
212    }
213}
214
#[cfg(test)]
mod tests {
    //! Unit tests for the table reader, run against a heap-backed memory manager.

    use super::*;
    use crate::table_registry::test_utils::User;
    use crate::{HeapMemoryProvider, MemoryManager, TableRegistry, TableRegistryPage};

    /// Every inserted record must come back, in insertion order.
    #[test]
    fn test_should_read_all_records() {
        const COUNT: u32 = 4_000;
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        let table_registry = mock_table_registry(COUNT, &mut mm);
        let mut reader = mocked(&table_registry, &mut mm);

        // should read all records
        let mut id = 0;
        while let Some(NextRecord { record: user, .. }) =
            reader.try_next().expect("failed to read user")
        {
            assert_eq!(user.id, id);
            assert_eq!(user.name, format!("User {id}"));

            id += 1;
        }
        assert_eq!(id, COUNT);
    }

    /// 4,000 users are expected to span exactly four consecutive ledger pages.
    #[test]
    fn test_should_find_next_page() {
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        let table_registry = mock_table_registry(4_000, &mut mm);
        let reader = mocked(&table_registry, &mut mm);

        let first = reader.position.expect("should have position").page;

        let second = reader.next_page(first).expect("should have next page");
        assert_eq!(second.page, first + 1);
        let third = reader
            .next_page(second.page)
            .expect("should have a third page");
        let fourth = reader
            .next_page(third.page)
            .expect("should have a fourth page");
        let after_last = reader.next_page(fourth.page);
        assert!(
            after_last.is_none(),
            "should not have next page, but got {:?}",
            after_last
        );
    }

    /// An empty first slot is skipped and the record in the second slot is reported.
    #[test]
    fn test_should_find_next_record_position() {
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        let table_registry = mock_table_registry(1, &mut mm);
        let reader = mocked(&table_registry, &mut mm);

        // one empty slot, then a header announcing 5 bytes followed by enough payload
        let mut buf = vec![0u8; User::ALIGNMENT as usize];
        buf.extend_from_slice(&[5u8, 0u8, 0u8, 0, 0, 0, 0, 0, 0]);

        let (offset, size) = reader
            .find_next_record_position(&buf, 0)
            .expect("failed to get next record")
            .expect("should have next record");

        // expectations are derived from the layout constants instead of magic numbers
        assert_eq!(offset, User::ALIGNMENT as PageOffset);
        assert_eq!(size, 5 + RAW_RECORD_HEADER_SIZE);
    }

    /// A buffer made only of empty slots contains no record.
    #[test]
    fn test_should_not_find_next_record_position_none() {
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        let table_registry = mock_table_registry(1, &mut mm);
        let reader = mocked(&table_registry, &mut mm);

        let buf = vec![0u8; User::ALIGNMENT as usize * 2];
        let result = reader
            .find_next_record_position(&buf, 0)
            .expect("failed to get next record");

        assert!(result.is_none());
    }

    /// A header announcing far more data than the buffer holds is rejected.
    #[test]
    fn test_should_not_find_next_record_position_too_short_for_length() {
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        let table_registry = mock_table_registry(1, &mut mm);
        let reader = mocked(&table_registry, &mut mm);

        let buf = [5u8, 16u8];
        let result = reader.find_next_record_position(&buf, 0);

        assert!(
            matches!(
                result,
                Err(MemoryError::DecodeError(DecodeError::TooShort))
            ),
            "expected error but got {:?}",
            result
        );
    }

    /// A header announcing 5 bytes with only 3 bytes of payload available is rejected.
    #[test]
    fn test_should_not_find_next_record_position_too_short_for_data() {
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        let table_registry = mock_table_registry(1, &mut mm);
        let reader = mocked(&table_registry, &mut mm);

        let buf = [5u8, 0u8, 0u8, 0, 0];
        let result = reader.find_next_record_position(&buf, 0);

        assert!(
            matches!(
                result,
                Err(MemoryError::DecodeError(DecodeError::TooShort))
            ),
            "expected error but got {:?}",
            result
        );
    }

    /// Builds a table registry on freshly claimed pages and inserts `entries` users
    /// with ids `0..entries`.
    fn mock_table_registry(
        entries: u32,
        mm: &mut MemoryManager<HeapMemoryProvider>,
    ) -> TableRegistry {
        let schema_snapshot_page = mm.claim_page().expect("failed to get page");
        let page_ledger_page = mm.claim_page().expect("failed to get page");
        let free_segments_page = mm.claim_page().expect("failed to get page");
        let index_registry_page = mm.claim_page().expect("failed to get page");
        super::super::test_utils::write_dummy_schema_snapshot(schema_snapshot_page, mm);
        let mut registry = TableRegistry::load(
            TableRegistryPage {
                schema_snapshot_page,
                pages_list_page: page_ledger_page,
                free_segments_page,
                index_registry_page,
                autoincrement_registry_page: None,
            },
            mm,
        )
        .expect("failed to load registry");

        // insert `entries` records
        for id in 0..entries {
            let user = User {
                id,
                name: format!("User {id}"),
                email: "new_user@example.com".to_string(),
                age: 20 + id,
            };
            registry.insert(user, mm).expect("failed to insert user");
        }

        registry
    }

    /// Creates a reader over the page ledger of `table_registry`.
    fn mocked<'a>(
        table_registry: &'a TableRegistry,
        mm: &'a mut MemoryManager<HeapMemoryProvider>,
    ) -> TableReader<'a, User, MemoryManager<HeapMemoryProvider>> {
        TableReader::new(&table_registry.page_ledger, mm)
    }
}