// wasm_dbms_memory/table_registry/raw_table_reader.rs
1// Rust guideline compliant 2026-04-27
2
3//! Raw-bytes table reader.
4//!
5//! Mirrors [`TableReader`](super::TableReader) but yields the encoded record
6//! bytes plus its [`RecordAddress`], so callers can decode via a snapshot-
7//! driven decoder (used by the migration apply pipeline to read records
8//! under the **stored** snapshot independent of the compile-time `T`).
9
10use wasm_dbms_api::prelude::{DecodeError, MSize, MemoryError, MemoryResult, Page, PageOffset};
11
12use super::page_ledger::PageLedger;
13use super::raw_record::RAW_RECORD_HEADER_SIZE;
14use super::record_address::RecordAddress;
15use crate::MemoryAccess;
16
17/// Yielded by [`RawTableReader::try_next`].
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct RawRecordBytes {
20    /// Address of the record (page + aligned offset to the length header).
21    pub address: RecordAddress,
22    /// Decoded record body bytes (header stripped).
23    pub bytes: Vec<u8>,
24}
25
26/// Iterator over a table's records as raw bytes. Mirrors the scan logic of
27/// [`TableReader`](super::TableReader) but parameterised over runtime
28/// alignment (the migration codec slices records by snapshot, not by `T`).
29pub struct RawTableReader<'a, MA>
30where
31    MA: MemoryAccess,
32{
33    mm: &'a mut MA,
34    page_ledger: &'a PageLedger,
35    page_size: usize,
36    alignment: PageOffset,
37    cursor: Option<Cursor>,
38}
39
40#[derive(Debug, Copy, Clone)]
41struct Cursor {
42    page: Page,
43    offset: PageOffset,
44}
45
46impl<'a, MA> RawTableReader<'a, MA>
47where
48    MA: MemoryAccess,
49{
50    /// Build a reader. `alignment` must be the table's record alignment as
51    /// stored in the snapshot (matches the on-disk layout written by
52    /// `TableRegistry::insert`).
53    pub fn new(page_ledger: &'a PageLedger, alignment: PageOffset, mm: &'a mut MA) -> Self {
54        let page_size = mm.page_size() as usize;
55        let cursor = page_ledger.pages().first().map(|p| Cursor {
56            page: p.page,
57            offset: 0,
58        });
59        Self {
60            mm,
61            page_ledger,
62            page_size,
63            alignment,
64            cursor,
65        }
66    }
67
68    /// Pop the next live record's bytes, or `Ok(None)` at end of table.
69    pub fn try_next(&mut self) -> MemoryResult<Option<RawRecordBytes>> {
70        loop {
71            let Some(Cursor { page, offset }) = self.cursor else {
72                return Ok(None);
73            };
74            let aligned_usize = align_up_usize(offset as usize, self.alignment as usize);
75            // No room for a header on this page → advance to next page.
76            if aligned_usize + (RAW_RECORD_HEADER_SIZE as usize) > self.page_size {
77                self.cursor = self.next_page(page);
78                continue;
79            }
80            let aligned = aligned_usize as PageOffset;
81            let mut header = [0u8; RAW_RECORD_HEADER_SIZE as usize];
82            self.mm.read_at_raw(page, aligned, &mut header)?;
83            let length = u16::from_le_bytes(header) as MSize;
84            if length == 0 {
85                // Empty slot — skip one alignment.
86                let next_offset = aligned_usize + self.alignment as usize;
87                self.cursor = if next_offset >= self.page_size {
88                    self.next_page(page)
89                } else {
90                    Some(Cursor {
91                        page,
92                        offset: next_offset as PageOffset,
93                    })
94                };
95                continue;
96            }
97            let body_offset_usize = aligned_usize + RAW_RECORD_HEADER_SIZE as usize;
98            if body_offset_usize + (length as usize) > self.page_size {
99                return Err(MemoryError::DecodeError(DecodeError::TooShort));
100            }
101            let body_offset = body_offset_usize as PageOffset;
102            let mut body = vec![0u8; length as usize];
103            self.mm.read_at_raw(page, body_offset, &mut body)?;
104            let address = RecordAddress {
105                page,
106                offset: aligned,
107            };
108            // Advance past the record body, aligned to the next slot.
109            let next_offset =
110                align_up_usize(body_offset_usize + length as usize, self.alignment as usize);
111            self.cursor = if next_offset >= self.page_size {
112                self.next_page(page)
113            } else {
114                Some(Cursor {
115                    page,
116                    offset: next_offset as PageOffset,
117                })
118            };
119            return Ok(Some(RawRecordBytes {
120                address,
121                bytes: body,
122            }));
123        }
124    }
125
126    fn next_page(&self, current: Page) -> Option<Cursor> {
127        self.page_ledger
128            .pages()
129            .iter()
130            .find(|p| p.page > current)
131            .map(|p| Cursor {
132                page: p.page,
133                offset: 0,
134            })
135    }
136}
137
/// Round `offset` up to the next multiple of `alignment`.
///
/// Already-aligned offsets are returned unchanged; a zero `alignment` is a
/// no-op (the offset is returned as-is).
fn align_up_usize(offset: usize, alignment: usize) -> usize {
    match alignment {
        0 => offset,
        a => match offset % a {
            0 => offset,
            rem => offset + (a - rem),
        },
    }
}
149
#[cfg(test)]
mod tests {
    use super::*;
    use crate::table_registry::test_utils::{User, write_dummy_schema_snapshot};
    use crate::{HeapMemoryProvider, MemoryManager, TableRegistry, TableRegistryPage};

    /// A raw insert must be readable back both directly (`read_raw_at`) and
    /// via a full `RawTableReader` scan, and the scan must end after that row.
    #[test]
    fn test_insert_raw_round_trips_through_raw_reader() {
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        // Claim the four bookkeeping pages the registry needs, then seed a
        // dummy schema snapshot so `TableRegistry::load` succeeds.
        let schema_snapshot_page = mm.claim_page().unwrap();
        let pages_list_page = mm.claim_page().unwrap();
        let free_segments_page = mm.claim_page().unwrap();
        let index_registry_page = mm.claim_page().unwrap();
        write_dummy_schema_snapshot(schema_snapshot_page, &mut mm);

        let mut registry = TableRegistry::load(
            TableRegistryPage {
                schema_snapshot_page,
                pages_list_page,
                free_segments_page,
                index_registry_page,
                autoincrement_registry_page: None,
            },
            &mut mm,
        )
        .unwrap();

        // Insert one raw record with an explicit runtime alignment, matching
        // how the migration pipeline writes records without a compile-time T.
        let alignment = 32u16;
        let payload = vec![1u8, 2, 3, 4, 5, 6, 7];
        let address = registry.insert_raw(&payload, alignment, &mut mm).unwrap();
        let read_back = registry.read_raw_at(address, &mut mm).unwrap();
        assert_eq!(read_back, payload);

        // The scan must yield exactly the payload bytes (header stripped),
        // then report end-of-table.
        let mut reader = RawTableReader::new(&registry.page_ledger, alignment, &mut mm);
        let row = reader.try_next().unwrap().expect("missing row");
        assert_eq!(row.bytes, payload);
        assert!(reader.try_next().unwrap().is_none());
    }

    /// Deleting a raw record must leave the table empty from the reader's
    /// point of view (the freed slot is skipped, not yielded).
    #[test]
    fn test_delete_raw_frees_segment() {
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        // Same four-page registry bootstrap as above.
        let schema_snapshot_page = mm.claim_page().unwrap();
        let pages_list_page = mm.claim_page().unwrap();
        let free_segments_page = mm.claim_page().unwrap();
        let index_registry_page = mm.claim_page().unwrap();
        write_dummy_schema_snapshot(schema_snapshot_page, &mut mm);

        let mut registry = TableRegistry::load(
            TableRegistryPage {
                schema_snapshot_page,
                pages_list_page,
                free_segments_page,
                index_registry_page,
                autoincrement_registry_page: None,
            },
            &mut mm,
        )
        .unwrap();

        // Insert then delete; delete_raw takes the stored length + alignment
        // because raw records carry no compile-time size information.
        let alignment = 32u16;
        let bytes = vec![9u8; 10];
        let addr = registry.insert_raw(&bytes, alignment, &mut mm).unwrap();
        registry
            .delete_raw(addr, bytes.len() as MSize, alignment, &mut mm)
            .unwrap();

        // A fresh scan over the table must find nothing.
        let mut reader = RawTableReader::new(&registry.page_ledger, alignment, &mut mm);
        assert!(reader.try_next().unwrap().is_none());
    }

    /// Records inserted through the typed `insert` path must all be visible
    /// to the raw reader when scanned with the type's declared alignment.
    #[test]
    fn test_raw_table_reader_reads_all_records_as_bytes() {
        use wasm_dbms_api::prelude::Encode;

        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        // Same four-page registry bootstrap as above.
        let schema_snapshot_page = mm.claim_page().unwrap();
        let pages_list_page = mm.claim_page().unwrap();
        let free_segments_page = mm.claim_page().unwrap();
        let index_registry_page = mm.claim_page().unwrap();
        write_dummy_schema_snapshot(schema_snapshot_page, &mut mm);

        let mut registry = TableRegistry::load(
            TableRegistryPage {
                schema_snapshot_page,
                pages_list_page,
                free_segments_page,
                index_registry_page,
                autoincrement_registry_page: None,
            },
            &mut mm,
        )
        .unwrap();

        // Three typed inserts; the raw reader should see one record each.
        for id in 0..3u32 {
            let user = User {
                id,
                name: format!("U{id}"),
                email: "x@x".into(),
                age: 20,
            };
            registry.insert(user, &mut mm).unwrap();
        }

        // Scan with the alignment the typed path used (`User::ALIGNMENT`,
        // from the `Encode` impl) and count every yielded record.
        let mut reader = RawTableReader::new(&registry.page_ledger, User::ALIGNMENT, &mut mm);
        let mut count = 0;
        while reader.try_next().unwrap().is_some() {
            count += 1;
        }
        assert_eq!(count, 3);
    }
}
261}