Skip to main content

wasm_dbms_memory/table_registry/
table_reader.rs

1// Rust guideline compliant 2026-02-28
2
3use std::marker::PhantomData;
4
5use wasm_dbms_api::prelude::{
6    DecodeError, Encode, MSize, MemoryError, MemoryResult, Page, PageOffset,
7};
8
9use super::page_ledger::PageLedger;
10use super::raw_record::{RAW_RECORD_HEADER_SIZE, RawRecord};
11use crate::{MemoryAccess, align_up};
12
13/// Stores the current position to read/write in memory.
14#[derive(Debug, Copy, Clone, PartialEq, Eq)]
15struct Position {
16    page: Page,
17    offset: PageOffset,
18    size: u64,
19}
20
21/// Represents the next record to read from memory.
22/// It also contains the new [`Position`] after reading the record.
23#[derive(Debug, Copy, Clone, PartialEq, Eq)]
24struct FoundRecord {
25    page: Page,
26    offset: PageOffset,
27    length: MSize,
28    new_position: Option<Position>,
29}
30
31/// Represents the next record read by the [`TableReader`].
32#[derive(Debug, Copy, Clone, PartialEq, Eq)]
33pub struct NextRecord<E>
34where
35    E: Encode,
36{
37    pub record: E,
38    pub page: Page,
39    pub offset: PageOffset,
40}
41
42/// A reader for the table registry that allows reading records from memory.
43///
44/// The table reader provides methods to read records from the table registry one by one,
45/// using the underlying [`PageLedger`] to locate the records in memory.
46pub struct TableReader<'a, E, MA>
47where
48    E: Encode,
49    MA: MemoryAccess,
50{
51    /// Buffer used to read records from memory.
52    buffer: Vec<u8>,
53    /// Reference to the memory access implementor.
54    mm: &'a MA,
55    page_ledger: &'a PageLedger,
56    page_size: usize,
57    phantom: PhantomData<E>,
58    /// Current position in the table registry.
59    /// If `None`, the reader has reached the end of the table.
60    position: Option<Position>,
61}
62
63impl<'a, E, MA> TableReader<'a, E, MA>
64where
65    E: Encode,
66    MA: MemoryAccess,
67{
68    /// Creates a new table reader starting from the beginning of the table registry.
69    pub fn new(page_ledger: &'a PageLedger, mm: &'a MA) -> Self {
70        // init position
71        let position = page_ledger.pages().first().map(|page_record| Position {
72            page: page_record.page,
73            offset: 0,
74            size: mm.page_size().saturating_sub(page_record.free),
75        });
76        let page_size = mm.page_size() as usize;
77        Self {
78            buffer: vec![0u8; page_size],
79            mm,
80            page_ledger,
81            phantom: PhantomData,
82            position,
83            page_size,
84        }
85    }
86
87    /// Reads the next record from the table registry.
88    pub fn try_next(&mut self) -> MemoryResult<Option<NextRecord<E>>> {
89        let Some(Position { page, offset, size }) = self.position else {
90            return Ok(None);
91        };
92
93        // find next record segment
94        let Some(next_record) = self.find_next_record(page, offset, size)? else {
95            // no more records
96            self.position = None;
97            return Ok(None);
98        };
99
100        // read raw record
101        let record: RawRecord<E> = self.mm.read_at(next_record.page, next_record.offset)?;
102
103        // update position
104        self.position = next_record.new_position;
105
106        Ok(Some(NextRecord {
107            record: record.data,
108            page: next_record.page,
109            offset: next_record.offset,
110        }))
111    }
112
113    /// Finds the next record starting from the given position.
114    ///
115    /// If a record is found, returns [`Some<NextRecord<E>>`], otherwise returns [`None`].
116    /// If [`None`] is returned, the reader has reached the end of the table.
117    fn find_next_record(
118        &mut self,
119        mut page: Page,
120        mut offset: PageOffset,
121        mut page_size: u64,
122    ) -> MemoryResult<Option<FoundRecord>> {
123        loop {
124            // get read_len (cannot read more than page_size)
125            let read_len =
126                std::cmp::min(self.page_size, page_size as usize).saturating_sub(offset as usize);
127            // if offset is zero, read page; otherwise, just reuse buffer
128            if offset == 0 {
129                self.mm.read_at_raw(page, 0, &mut self.buffer[..read_len])?;
130            }
131
132            // find next record in buffer; if found, return it
133            let buf_end = (page_size as usize).max(offset as usize);
134            if let Some((next_segment_offset, next_segment_size)) =
135                self.find_next_record_position(&self.buffer[..buf_end], offset as usize)?
136            {
137                // found a record; return it
138                let new_offset = next_segment_offset + next_segment_size as PageOffset;
139                let new_position = if new_offset as u64 >= page_size {
140                    // move to next page
141                    self.next_page(page)
142                } else {
143                    Some(Position {
144                        page,
145                        offset: new_offset,
146                        size: page_size,
147                    })
148                };
149                return Ok(Some(FoundRecord {
150                    page,
151                    offset: next_segment_offset,
152                    length: next_segment_size,
153                    new_position,
154                }));
155            }
156
157            // read next page
158            match self.next_page(page) {
159                Some(pos) => {
160                    page = pos.page;
161                    offset = pos.offset;
162                    page_size = pos.size;
163                }
164                None => break,
165            }
166        }
167
168        Ok(None)
169    }
170
171    /// Gets the next page after the given current page.
172    fn next_page(&self, current_page: Page) -> Option<Position> {
173        self.page_ledger
174            .pages()
175            .iter()
176            .find(|p| p.page > current_page)
177            .map(|page_record| Position {
178                page: page_record.page,
179                offset: 0,
180                size: (self.page_size as u64).saturating_sub(page_record.free),
181            })
182    }
183
184    /// Finds the next record segment position.
185    ///
186    /// This is done by starting from the current offset
187    /// and searching for each multiple of [`E::ALIGNMENT`] until we find a size different from `0x00`.
188    ///
189    /// Returns the offset and size of the next record segment if found.
190    fn find_next_record_position(
191        &self,
192        buf: &[u8],
193        mut offset: usize,
194    ) -> MemoryResult<Option<(PageOffset, MSize)>> {
195        // first round the offset to the next alignment
196        offset = align_up::<RawRecord<E>>(offset);
197        // search for the first non-zero record length
198        let mut data_len;
199        loop {
200            // check whether we are at the end of the page
201            if offset + 1 >= buf.len() {
202                return Ok(None);
203            }
204            // read next two bytes
205            data_len = u16::from_le_bytes([buf[offset], buf[offset + 1]]) as MSize;
206            if data_len != 0 {
207                break;
208            }
209            // move to next alignment
210            offset += E::ALIGNMENT as usize;
211        }
212
213        let data_offset = offset + RAW_RECORD_HEADER_SIZE as usize;
214        if buf.len() < data_offset + data_len as usize {
215            return Err(MemoryError::DecodeError(DecodeError::TooShort));
216        }
217
218        Ok(Some((
219            offset as PageOffset,
220            data_len + RAW_RECORD_HEADER_SIZE,
221        )))
222    }
223}
224
225#[cfg(test)]
226mod tests {
227
228    use super::*;
229    use crate::table_registry::test_utils::User;
230    use crate::{HeapMemoryProvider, MemoryManager, TableRegistry, TableRegistryPage};
231
232    #[test]
233    fn test_should_read_all_records() {
234        const COUNT: u32 = 4_000;
235        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
236        let table_registry = mock_table_registry(COUNT, &mut mm);
237        let mut reader = mocked(&table_registry, &mm);
238
239        // should read all records
240        let mut id = 0;
241        while let Some(NextRecord { record: user, .. }) =
242            reader.try_next().expect("failed to read user")
243        {
244            assert_eq!(user.id, id);
245            assert_eq!(user.name, format!("User {}", id));
246
247            id += 1;
248        }
249        assert_eq!(id, COUNT);
250    }
251
252    #[test]
253    fn test_should_find_next_page() {
254        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
255        let table_registry = mock_table_registry(4_000, &mut mm);
256        let reader = mocked(&table_registry, &mm);
257
258        let page = reader.position.expect("should have position").page;
259
260        let next_page = reader.next_page(page).expect("should have next page");
261        assert_eq!(next_page.page, page + 1);
262        let next_page = reader.next_page(next_page.page);
263        assert!(next_page.is_some());
264        let next_page = reader.next_page(next_page.unwrap().page);
265        assert!(next_page.is_some());
266        let next_page = reader.next_page(next_page.unwrap().page);
267        assert!(
268            next_page.is_none(),
269            "should not have next page, but got {:?}",
270            next_page
271        );
272    }
273
274    #[test]
275    fn test_should_find_next_record_position() {
276        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
277        let table_registry = mock_table_registry(1, &mut mm);
278        let reader = mocked(&table_registry, &mm);
279
280        let mut buf = vec![0u8; User::ALIGNMENT as usize];
281        buf.extend_from_slice(&[5u8, 0u8, 0u8, 0, 0, 0, 0, 0, 0]);
282
283        let (offset, size) = reader
284            .find_next_record_position(&buf, 0)
285            .expect("failed to get next record")
286            .expect("should have next record");
287
288        assert_eq!(offset, 32);
289        assert_eq!(size, 7);
290    }
291
292    #[test]
293    fn test_should_not_find_next_record_position_none() {
294        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
295        let table_registry = mock_table_registry(1, &mut mm);
296        let reader = mocked(&table_registry, &mm);
297
298        let buf = vec![0u8; User::ALIGNMENT as usize * 2];
299        let result = reader
300            .find_next_record_position(&buf, 0)
301            .expect("failed to get next record");
302
303        assert!(result.is_none());
304    }
305
306    #[test]
307    fn test_should_not_find_next_record_position_too_short_for_length() {
308        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
309        let table_registry = mock_table_registry(1, &mut mm);
310        let reader = mocked(&table_registry, &mm);
311
312        let buf = [5u8, 16u8];
313        let result = reader.find_next_record_position(&buf, 0);
314        assert!(result.is_err(), "expected error but got {:?}", result);
315        let err = result.unwrap_err();
316
317        assert!(matches!(
318            err,
319            MemoryError::DecodeError(DecodeError::TooShort)
320        ));
321    }
322
323    #[test]
324    fn test_should_not_find_next_record_position_too_short_for_data() {
325        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
326        let table_registry = mock_table_registry(1, &mut mm);
327        let reader = mocked(&table_registry, &mm);
328
329        let buf = [5u8, 0u8, 0u8, 0, 0];
330        let result = reader.find_next_record_position(&buf, 0);
331
332        assert!(matches!(
333            result,
334            Err(MemoryError::DecodeError(DecodeError::TooShort))
335        ));
336    }
337
338    fn mock_table_registry(
339        entries: u32,
340        mm: &mut MemoryManager<HeapMemoryProvider>,
341    ) -> TableRegistry {
342        let page_ledger_page = mm.allocate_page().expect("failed to get page");
343        let free_segments_page = mm.allocate_page().expect("failed to get page");
344        let mut registry = TableRegistry::load(
345            TableRegistryPage {
346                pages_list_page: page_ledger_page,
347                free_segments_page,
348            },
349            mm,
350        )
351        .expect("failed to load registry");
352
353        // insert `entries` records
354        for id in 0..entries {
355            let user = User {
356                id,
357                name: format!("User {}", id),
358                email: "new_user@example.com".to_string(),
359                age: 20 + id,
360            };
361            registry.insert(user, mm).expect("failed to insert user");
362        }
363
364        registry
365    }
366
367    fn mocked<'a>(
368        table_registry: &'a TableRegistry,
369        mm: &'a MemoryManager<HeapMemoryProvider>,
370    ) -> TableReader<'a, User, MemoryManager<HeapMemoryProvider>> {
371        TableReader::new(&table_registry.page_ledger, mm)
372    }
373}