use std::marker::PhantomData;
use wasm_dbms_api::prelude::{
DecodeError, Encode, MSize, MemoryError, MemoryResult, Page, PageOffset,
};
use super::page_ledger::PageLedger;
use super::raw_record::{RAW_RECORD_HEADER_SIZE, RawRecord};
use crate::{MemoryAccess, align_up};
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct Position {
page: Page,
offset: PageOffset,
size: u64,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct FoundRecord {
page: Page,
offset: PageOffset,
length: MSize,
new_position: Option<Position>,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NextRecord<E>
where
E: Encode,
{
pub record: E,
pub page: Page,
pub offset: PageOffset,
}
pub struct TableReader<'a, E, MA>
where
E: Encode,
MA: MemoryAccess,
{
buffer: Vec<u8>,
mm: &'a mut MA,
page_ledger: &'a PageLedger,
page_size: usize,
phantom: PhantomData<E>,
position: Option<Position>,
}
impl<'a, E, MA> TableReader<'a, E, MA>
where
E: Encode,
MA: MemoryAccess,
{
pub fn new(page_ledger: &'a PageLedger, mm: &'a mut MA) -> Self {
let position = page_ledger.pages().first().map(|page_record| Position {
page: page_record.page,
offset: 0,
size: mm.page_size().saturating_sub(page_record.free),
});
let page_size = mm.page_size() as usize;
Self {
buffer: vec![0u8; page_size],
mm,
page_ledger,
phantom: PhantomData,
position,
page_size,
}
}
pub fn try_next(&mut self) -> MemoryResult<Option<NextRecord<E>>> {
let Some(Position { page, offset, size }) = self.position else {
return Ok(None);
};
let Some(next_record) = self.find_next_record(page, offset, size)? else {
self.position = None;
return Ok(None);
};
let record: RawRecord<E> = self.mm.read_at(next_record.page, next_record.offset)?;
self.position = next_record.new_position;
Ok(Some(NextRecord {
record: record.data,
page: next_record.page,
offset: next_record.offset,
}))
}
fn find_next_record(
&mut self,
mut page: Page,
mut offset: PageOffset,
mut page_size: u64,
) -> MemoryResult<Option<FoundRecord>> {
loop {
let read_len =
std::cmp::min(self.page_size, page_size as usize).saturating_sub(offset as usize);
if offset == 0 {
self.mm.read_at_raw(page, 0, &mut self.buffer[..read_len])?;
}
let buf_end = (page_size as usize).max(offset as usize);
if let Some((next_segment_offset, next_segment_size)) =
self.find_next_record_position(&self.buffer[..buf_end], offset as usize)?
{
let new_offset = next_segment_offset + next_segment_size as PageOffset;
let new_position = if new_offset as u64 >= page_size {
self.next_page(page)
} else {
Some(Position {
page,
offset: new_offset,
size: page_size,
})
};
return Ok(Some(FoundRecord {
page,
offset: next_segment_offset,
length: next_segment_size,
new_position,
}));
}
match self.next_page(page) {
Some(pos) => {
page = pos.page;
offset = pos.offset;
page_size = pos.size;
}
None => break,
}
}
Ok(None)
}
fn next_page(&self, current_page: Page) -> Option<Position> {
self.page_ledger
.pages()
.iter()
.find(|p| p.page > current_page)
.map(|page_record| Position {
page: page_record.page,
offset: 0,
size: (self.page_size as u64).saturating_sub(page_record.free),
})
}
fn find_next_record_position(
&self,
buf: &[u8],
mut offset: usize,
) -> MemoryResult<Option<(PageOffset, MSize)>> {
offset = align_up::<RawRecord<E>>(offset);
let mut data_len;
loop {
if offset + 1 >= buf.len() {
return Ok(None);
}
data_len = u16::from_le_bytes([buf[offset], buf[offset + 1]]) as MSize;
if data_len != 0 {
break;
}
offset += E::ALIGNMENT as usize;
}
let data_offset = offset + RAW_RECORD_HEADER_SIZE as usize;
if buf.len() < data_offset + data_len as usize {
return Err(MemoryError::DecodeError(DecodeError::TooShort));
}
Ok(Some((
offset as PageOffset,
data_len + RAW_RECORD_HEADER_SIZE,
)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::table_registry::test_utils::User;
use crate::{HeapMemoryProvider, MemoryManager, TableRegistry, TableRegistryPage};
#[test]
fn test_should_read_all_records() {
const COUNT: u32 = 4_000;
let mut mm = MemoryManager::init(HeapMemoryProvider::default());
let table_registry = mock_table_registry(COUNT, &mut mm);
let mut reader = mocked(&table_registry, &mut mm);
let mut id = 0;
while let Some(NextRecord { record: user, .. }) =
reader.try_next().expect("failed to read user")
{
assert_eq!(user.id, id);
assert_eq!(user.name, format!("User {}", id));
id += 1;
}
assert_eq!(id, COUNT);
}
#[test]
fn test_should_find_next_page() {
let mut mm = MemoryManager::init(HeapMemoryProvider::default());
let table_registry = mock_table_registry(4_000, &mut mm);
let reader = mocked(&table_registry, &mut mm);
let page = reader.position.expect("should have position").page;
let next_page = reader.next_page(page).expect("should have next page");
assert_eq!(next_page.page, page + 1);
let next_page = reader.next_page(next_page.page);
assert!(next_page.is_some());
let next_page = reader.next_page(next_page.unwrap().page);
assert!(next_page.is_some());
let next_page = reader.next_page(next_page.unwrap().page);
assert!(
next_page.is_none(),
"should not have next page, but got {:?}",
next_page
);
}
#[test]
fn test_should_find_next_record_position() {
let mut mm = MemoryManager::init(HeapMemoryProvider::default());
let table_registry = mock_table_registry(1, &mut mm);
let reader = mocked(&table_registry, &mut mm);
let mut buf = vec![0u8; User::ALIGNMENT as usize];
buf.extend_from_slice(&[5u8, 0u8, 0u8, 0, 0, 0, 0, 0, 0]);
let (offset, size) = reader
.find_next_record_position(&buf, 0)
.expect("failed to get next record")
.expect("should have next record");
assert_eq!(offset, 32);
assert_eq!(size, 7);
}
#[test]
fn test_should_not_find_next_record_position_none() {
let mut mm = MemoryManager::init(HeapMemoryProvider::default());
let table_registry = mock_table_registry(1, &mut mm);
let reader = mocked(&table_registry, &mut mm);
let buf = vec![0u8; User::ALIGNMENT as usize * 2];
let result = reader
.find_next_record_position(&buf, 0)
.expect("failed to get next record");
assert!(result.is_none());
}
#[test]
fn test_should_not_find_next_record_position_too_short_for_length() {
let mut mm = MemoryManager::init(HeapMemoryProvider::default());
let table_registry = mock_table_registry(1, &mut mm);
let reader = mocked(&table_registry, &mut mm);
let buf = [5u8, 16u8];
let result = reader.find_next_record_position(&buf, 0);
assert!(result.is_err(), "expected error but got {:?}", result);
let err = result.unwrap_err();
assert!(matches!(
err,
MemoryError::DecodeError(DecodeError::TooShort)
));
}
#[test]
fn test_should_not_find_next_record_position_too_short_for_data() {
let mut mm = MemoryManager::init(HeapMemoryProvider::default());
let table_registry = mock_table_registry(1, &mut mm);
let reader = mocked(&table_registry, &mut mm);
let buf = [5u8, 0u8, 0u8, 0, 0];
let result = reader.find_next_record_position(&buf, 0);
assert!(matches!(
result,
Err(MemoryError::DecodeError(DecodeError::TooShort))
));
}
fn mock_table_registry(
entries: u32,
mm: &mut MemoryManager<HeapMemoryProvider>,
) -> TableRegistry {
let page_ledger_page = mm.allocate_page().expect("failed to get page");
let free_segments_page = mm.allocate_page().expect("failed to get page");
let index_registry_page = mm.allocate_page().expect("failed to get page");
let mut registry = TableRegistry::load(
TableRegistryPage {
pages_list_page: page_ledger_page,
free_segments_page,
index_registry_page,
autoincrement_registry_page: None,
},
mm,
)
.expect("failed to load registry");
for id in 0..entries {
let user = User {
id,
name: format!("User {}", id),
email: "new_user@example.com".to_string(),
age: 20 + id,
};
registry.insert(user, mm).expect("failed to insert user");
}
registry
}
fn mocked<'a>(
table_registry: &'a TableRegistry,
mm: &'a mut MemoryManager<HeapMemoryProvider>,
) -> TableReader<'a, User, MemoryManager<HeapMemoryProvider>> {
TableReader::new(&table_registry.page_ledger, mm)
}
}