wasm_dbms_memory/table_registry/
table_reader.rs1use std::marker::PhantomData;
4
5use wasm_dbms_api::prelude::{
6 DecodeError, Encode, MSize, MemoryError, MemoryResult, Page, PageOffset,
7};
8
9use super::page_ledger::PageLedger;
10use super::raw_record::{RAW_RECORD_HEADER_SIZE, RawRecord};
11use crate::{MemoryAccess, align_up};
12
13#[derive(Debug, Copy, Clone, PartialEq, Eq)]
15struct Position {
16 page: Page,
17 offset: PageOffset,
18 size: u64,
19}
20
21#[derive(Debug, Copy, Clone, PartialEq, Eq)]
24struct FoundRecord {
25 page: Page,
26 offset: PageOffset,
27 length: MSize,
28 new_position: Option<Position>,
29}
30
31#[derive(Debug, Copy, Clone, PartialEq, Eq)]
33pub struct NextRecord<E>
34where
35 E: Encode,
36{
37 pub record: E,
38 pub page: Page,
39 pub offset: PageOffset,
40}
41
42pub struct TableReader<'a, E, MA>
47where
48 E: Encode,
49 MA: MemoryAccess,
50{
51 buffer: Vec<u8>,
53 mm: &'a MA,
55 page_ledger: &'a PageLedger,
56 page_size: usize,
57 phantom: PhantomData<E>,
58 position: Option<Position>,
61}
62
63impl<'a, E, MA> TableReader<'a, E, MA>
64where
65 E: Encode,
66 MA: MemoryAccess,
67{
68 pub fn new(page_ledger: &'a PageLedger, mm: &'a MA) -> Self {
70 let position = page_ledger.pages().first().map(|page_record| Position {
72 page: page_record.page,
73 offset: 0,
74 size: mm.page_size().saturating_sub(page_record.free),
75 });
76 let page_size = mm.page_size() as usize;
77 Self {
78 buffer: vec![0u8; page_size],
79 mm,
80 page_ledger,
81 phantom: PhantomData,
82 position,
83 page_size,
84 }
85 }
86
87 pub fn try_next(&mut self) -> MemoryResult<Option<NextRecord<E>>> {
89 let Some(Position { page, offset, size }) = self.position else {
90 return Ok(None);
91 };
92
93 let Some(next_record) = self.find_next_record(page, offset, size)? else {
95 self.position = None;
97 return Ok(None);
98 };
99
100 let record: RawRecord<E> = self.mm.read_at(next_record.page, next_record.offset)?;
102
103 self.position = next_record.new_position;
105
106 Ok(Some(NextRecord {
107 record: record.data,
108 page: next_record.page,
109 offset: next_record.offset,
110 }))
111 }
112
113 fn find_next_record(
118 &mut self,
119 mut page: Page,
120 mut offset: PageOffset,
121 mut page_size: u64,
122 ) -> MemoryResult<Option<FoundRecord>> {
123 loop {
124 let read_len =
126 std::cmp::min(self.page_size, page_size as usize).saturating_sub(offset as usize);
127 if offset == 0 {
129 self.mm.read_at_raw(page, 0, &mut self.buffer[..read_len])?;
130 }
131
132 let buf_end = (page_size as usize).max(offset as usize);
134 if let Some((next_segment_offset, next_segment_size)) =
135 self.find_next_record_position(&self.buffer[..buf_end], offset as usize)?
136 {
137 let new_offset = next_segment_offset + next_segment_size as PageOffset;
139 let new_position = if new_offset as u64 >= page_size {
140 self.next_page(page)
142 } else {
143 Some(Position {
144 page,
145 offset: new_offset,
146 size: page_size,
147 })
148 };
149 return Ok(Some(FoundRecord {
150 page,
151 offset: next_segment_offset,
152 length: next_segment_size,
153 new_position,
154 }));
155 }
156
157 match self.next_page(page) {
159 Some(pos) => {
160 page = pos.page;
161 offset = pos.offset;
162 page_size = pos.size;
163 }
164 None => break,
165 }
166 }
167
168 Ok(None)
169 }
170
171 fn next_page(&self, current_page: Page) -> Option<Position> {
173 self.page_ledger
174 .pages()
175 .iter()
176 .find(|p| p.page > current_page)
177 .map(|page_record| Position {
178 page: page_record.page,
179 offset: 0,
180 size: (self.page_size as u64).saturating_sub(page_record.free),
181 })
182 }
183
184 fn find_next_record_position(
191 &self,
192 buf: &[u8],
193 mut offset: usize,
194 ) -> MemoryResult<Option<(PageOffset, MSize)>> {
195 offset = align_up::<RawRecord<E>>(offset);
197 let mut data_len;
199 loop {
200 if offset + 1 >= buf.len() {
202 return Ok(None);
203 }
204 data_len = u16::from_le_bytes([buf[offset], buf[offset + 1]]) as MSize;
206 if data_len != 0 {
207 break;
208 }
209 offset += E::ALIGNMENT as usize;
211 }
212
213 let data_offset = offset + RAW_RECORD_HEADER_SIZE as usize;
214 if buf.len() < data_offset + data_len as usize {
215 return Err(MemoryError::DecodeError(DecodeError::TooShort));
216 }
217
218 Ok(Some((
219 offset as PageOffset,
220 data_len + RAW_RECORD_HEADER_SIZE,
221 )))
222 }
223}
224
#[cfg(test)]
mod tests {

    use super::*;
    use crate::table_registry::test_utils::User;
    use crate::{HeapMemoryProvider, MemoryManager, TableRegistry, TableRegistryPage};

    /// Every inserted user must come back, in insertion order.
    #[test]
    fn test_should_read_all_records() {
        const COUNT: u32 = 4_000;
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        let table_registry = mock_table_registry(COUNT, &mut mm);
        let mut reader = mocked(&table_registry, &mm);

        // Doubles as the expected id of the next record and the running count.
        let mut seen = 0;
        while let Some(NextRecord { record: user, .. }) =
            reader.try_next().expect("failed to read user")
        {
            assert_eq!(user.id, seen);
            assert_eq!(user.name, format!("User {}", seen));

            seen += 1;
        }
        assert_eq!(seen, COUNT);
    }

    /// Walks the ledger page by page until `next_page` reports the end.
    #[test]
    fn test_should_find_next_page() {
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        let table_registry = mock_table_registry(4_000, &mut mm);
        let reader = mocked(&table_registry, &mm);

        let first_page = reader.position.expect("should have position").page;

        let second = reader
            .next_page(first_page)
            .expect("should have next page");
        assert_eq!(second.page, first_page + 1);

        let third = reader.next_page(second.page);
        assert!(third.is_some());

        let fourth = reader.next_page(third.unwrap().page);
        assert!(fourth.is_some());

        let past_end = reader.next_page(fourth.unwrap().page);
        assert!(
            past_end.is_none(),
            "should not have next page, but got {:?}",
            past_end
        );
    }

    /// An empty slot is skipped and the record after it is reported.
    #[test]
    fn test_should_find_next_record_position() {
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        let table_registry = mock_table_registry(1, &mut mm);
        let reader = mocked(&table_registry, &mm);

        // One zeroed slot, then a header announcing 5 payload bytes.
        let mut bytes = vec![0u8; User::ALIGNMENT as usize];
        bytes.extend_from_slice(&[5u8, 0u8, 0u8, 0, 0, 0, 0, 0, 0]);

        let found = reader
            .find_next_record_position(&bytes, 0)
            .expect("failed to get next record")
            .expect("should have next record");

        assert_eq!(found.0, 32);
        assert_eq!(found.1, 7);
    }

    /// A buffer of zeroes contains no record.
    #[test]
    fn test_should_not_find_next_record_position_none() {
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        let table_registry = mock_table_registry(1, &mut mm);
        let reader = mocked(&table_registry, &mm);

        let zeroes = vec![0u8; User::ALIGNMENT as usize * 2];
        let outcome = reader
            .find_next_record_position(&zeroes, 0)
            .expect("failed to get next record");

        assert!(outcome.is_none());
    }

    /// A header with no payload bytes after it is a decode error.
    #[test]
    fn test_should_not_find_next_record_position_too_short_for_length() {
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        let table_registry = mock_table_registry(1, &mut mm);
        let reader = mocked(&table_registry, &mm);

        let header_only = [5u8, 16u8];
        let outcome = reader.find_next_record_position(&header_only, 0);
        assert!(outcome.is_err(), "expected error but got {:?}", outcome);
        let failure = outcome.unwrap_err();

        assert!(matches!(
            failure,
            MemoryError::DecodeError(DecodeError::TooShort)
        ));
    }

    /// A header followed by fewer payload bytes than announced is a decode error.
    #[test]
    fn test_should_not_find_next_record_position_too_short_for_data() {
        let mut mm = MemoryManager::init(HeapMemoryProvider::default());
        let table_registry = mock_table_registry(1, &mut mm);
        let reader = mocked(&table_registry, &mm);

        let truncated = [5u8, 0u8, 0u8, 0, 0];
        let outcome = reader.find_next_record_position(&truncated, 0);

        assert!(matches!(
            outcome,
            Err(MemoryError::DecodeError(DecodeError::TooShort))
        ));
    }

    /// Builds a registry backed by two freshly allocated pages and fills it
    /// with `entries` users whose ids run from 0 to `entries - 1`.
    fn mock_table_registry(
        entries: u32,
        mm: &mut MemoryManager<HeapMemoryProvider>,
    ) -> TableRegistry {
        let page_ledger_page = mm.allocate_page().expect("failed to get page");
        let free_segments_page = mm.allocate_page().expect("failed to get page");
        let mut registry = TableRegistry::load(
            TableRegistryPage {
                pages_list_page: page_ledger_page,
                free_segments_page,
            },
            mm,
        )
        .expect("failed to load registry");

        for id in 0..entries {
            let name = format!("User {}", id);
            let email = "new_user@example.com".to_string();
            let user = User {
                id,
                name,
                email,
                age: 20 + id,
            };
            registry.insert(user, mm).expect("failed to insert user");
        }

        registry
    }

    /// Creates a reader over the registry's page ledger.
    fn mocked<'a>(
        table_registry: &'a TableRegistry,
        mm: &'a MemoryManager<HeapMemoryProvider>,
    ) -> TableReader<'a, User, MemoryManager<HeapMemoryProvider>> {
        TableReader::new(&table_registry.page_ledger, mm)
    }
}