use super::*;
use crate::btreemap::allocator::Allocator;
use crate::btreemap::node::v2::{
OVERFLOW_ADDRESS_OFFSET, OVERFLOW_MAGIC, PAGE_OVERFLOW_DATA_OFFSET, PAGE_OVERFLOW_NEXT_OFFSET,
};
use crate::{types::NULL, write_u64};
use std::cmp::min;
/// Read-only view of a node that presents the node's initial page plus its
/// overflow pages as one contiguous address space (via the `Memory` impl below).
pub struct NodeReader<'a, M: Memory> {
    /// Address of the node's initial page in `memory`.
    pub address: Address,
    /// Addresses of the node's overflow pages, in chain order.
    pub overflows: &'a [Address],
    /// The size of each of the node's pages.
    pub page_size: PageSize,
    /// The underlying memory the pages are stored in.
    pub memory: &'a M,
}
impl<M: Memory> Memory for NodeReader<'_, M> {
    /// Reads `count` bytes at the node-relative (virtual) `offset` into `dst`,
    /// stitching the result together from the initial page and overflow pages.
    ///
    /// # Safety
    /// Same contract as `Memory::read_unsafe`: `dst` must be valid for writes
    /// of `count` bytes.
    unsafe fn read_unsafe(&self, offset: u64, dst: *mut u8, count: usize) {
        // Fast path: the whole range lies inside the initial page, so a single
        // read from the underlying memory suffices. (The strict `<` is
        // conservative; a range ending exactly at the page boundary simply
        // takes the general path below.)
        if (offset + count as u64) < self.page_size.get() as u64 {
            self.memory
                .read_unsafe(self.address.get() + offset, dst, count);
            return;
        }
        // General path: split the virtual range into one real segment per page.
        let iter = NodeIterator::new(
            VirtualSegment {
                address: Address::from(offset),
                length: Bytes::from(count as u64),
            },
            Bytes::from(self.page_size.get()),
        );
        // Number of bytes copied into `dst` so far.
        let mut position = 0;
        for RealSegment {
            page_idx,
            offset,
            length,
        } in iter
        {
            let len = length.get() as usize;
            // Invariant: the segments never overrun the requested `count`.
            assert!(position + len <= count);
            match page_idx {
                // Page 0 is the node's initial page.
                0 => self
                    .memory
                    .read_unsafe((self.address + offset).get(), dst.add(position), len),
                // Page `i > 0` is the node's `i`-th overflow page.
                _ => self.memory.read_unsafe(
                    (self.overflows[page_idx - 1] + offset).get(),
                    dst.add(position),
                    len,
                ),
            }
            position += len;
        }
    }
    #[inline]
    fn read(&self, offset: u64, dst: &mut [u8]) {
        // SAFETY: `dst` is a valid mutable slice, so writing `dst.len()` bytes
        // through its pointer is sound.
        unsafe { self.read_unsafe(offset, dst.as_mut_ptr(), dst.len()) }
    }
    // `NodeReader` is read-only: the remaining `Memory` operations are
    // intentionally unsupported and indicate a logic error if reached.
    fn write(&self, _: u64, _: &[u8]) {
        unreachable!("NodeReader does not support write")
    }
    fn size(&self) -> u64 {
        unreachable!("NodeReader does not support size")
    }
    fn grow(&self, _: u64) -> i64 {
        unreachable!("NodeReader does not support grow")
    }
}
/// A contiguous range in a node's *virtual* address space, where offset 0 is
/// the start of the node and overflow pages appear as a seamless continuation.
#[derive(Debug)]
struct VirtualSegment {
    // Virtual offset of the segment's first byte (not a real memory address).
    address: Address,
    length: Bytes,
}
/// A contiguous range within a single real page of a node.
struct RealSegment {
    // 0 means the node's initial page; `i > 0` means overflow page `i - 1`.
    page_idx: usize,
    // Offset within that page (past the overflow header for overflow pages).
    offset: Bytes,
    length: Bytes,
}
/// Iterator that translates a `VirtualSegment` of a node into the sequence of
/// `RealSegment`s (per-page ranges) that back it.
struct NodeIterator {
    // Remaining virtual range still to be translated; shrinks on each `next()`.
    virtual_segment: VirtualSegment,
    page_size: Bytes,
    // Data bytes an overflow page can hold (page size minus its header).
    overflow_page_capacity: Bytes,
}
impl NodeIterator {
#[inline]
fn new(virtual_segment: VirtualSegment, page_size: Bytes) -> Self {
Self {
virtual_segment,
page_size,
overflow_page_capacity: page_size - PAGE_OVERFLOW_DATA_OFFSET,
}
}
}
impl Iterator for NodeIterator {
    type Item = RealSegment;
    /// Yields the next per-page segment, consuming the front of the remaining
    /// virtual range.
    fn next(&mut self) -> Option<Self::Item> {
        // Done once the whole virtual range has been translated.
        if self.virtual_segment.length == Bytes::from(0u64) {
            return None;
        }
        // The virtual `address` is really an offset from the node's start.
        let offset = Bytes::from(self.virtual_segment.address.get());
        // Virtual layout: [0, page_size) is the initial page; after that, each
        // overflow page contributes `overflow_page_capacity` data bytes.
        let page_idx = if offset < self.page_size {
            0
        } else {
            ((offset - self.page_size) / self.overflow_page_capacity).get() + 1
        };
        // Clamp the segment to the end of the current page.
        let length = {
            let page_end = self.page_size + self.overflow_page_capacity * page_idx;
            min(page_end - offset, self.virtual_segment.length)
        };
        // Convert the virtual offset into an offset within the page; overflow
        // pages carry data only after their header.
        let offset = if offset < self.page_size {
            offset
        } else {
            PAGE_OVERFLOW_DATA_OFFSET + (offset - self.page_size) % self.overflow_page_capacity
        };
        // Advance past the segment just produced.
        self.virtual_segment.length -= length;
        self.virtual_segment.address += length;
        Some(RealSegment {
            page_idx: page_idx as usize,
            offset,
            length,
        })
    }
}
/// Writer for a node that transparently allocates, links, and (on `finish`)
/// frees overflow pages as data is written past the initial page.
pub struct NodeWriter<'a, M: Memory> {
    /// Address of the node's initial page.
    address: Address,
    /// The size of each of the node's pages.
    page_size: PageSize,
    /// Addresses of the node's overflow pages, in chain order.
    overflows: Vec<Address>,
    /// Allocator used to allocate/deallocate overflow pages and access memory.
    allocator: &'a mut Allocator<M>,
    // Highest end-offset written so far; determines how many overflow pages
    // are actually needed when `finish` is called.
    max_offset: u64,
}
impl<'a, M: Memory> NodeWriter<'a, M> {
    /// Creates a writer for the node at `address` with the given existing
    /// overflow pages and page size.
    pub fn new(
        address: Address,
        overflows: Vec<Address>,
        page_size: PageSize,
        allocator: &'a mut Allocator<M>,
    ) -> Self {
        Self {
            address,
            overflows,
            page_size,
            allocator,
            max_offset: 0,
        }
    }
    /// Finishes writing: frees overflow pages beyond the highest offset
    /// written and returns the node's remaining overflow page addresses.
    pub fn finish(mut self) -> Vec<Address> {
        self.deallocate_unused_pages();
        self.overflows
    }
    /// Returns the underlying memory.
    pub fn memory(&self) -> &M {
        self.allocator.memory()
    }
    /// Writes `src` at the node-relative (virtual) `offset`, allocating new
    /// overflow pages as needed to fit the data.
    pub fn write(&mut self, offset: Address, src: &[u8]) {
        let offset = offset.get();
        let end_offset = offset + src.len() as u64;
        // Track the furthest byte written so unused pages can be freed later.
        self.max_offset = std::cmp::max(self.max_offset, end_offset);
        // Fast path: the write fits entirely inside the initial page. (The
        // strict `<` is conservative; a write ending exactly at the boundary
        // takes the general path.)
        if end_offset < self.page_size.get() as u64 {
            write(self.allocator.memory(), self.address.get() + offset, src);
            return;
        }
        // Ensure enough overflow pages exist to cover `end_offset`.
        let needed_pages =
            compute_num_overflow_pages_needed(end_offset, self.page_size.get() as u64) as usize;
        self.overflows
            .reserve(needed_pages.saturating_sub(self.overflows.len()));
        while self.overflows.len() < needed_pages {
            self.allocate_new_page();
        }
        // Split the virtual range into per-page segments and write each one.
        let iter = NodeIterator::new(
            VirtualSegment {
                address: Address::from(offset),
                length: Bytes::from(src.len() as u64),
            },
            Bytes::from(self.page_size.get()),
        );
        // Number of bytes of `src` written so far.
        let mut position = 0;
        for RealSegment {
            page_idx,
            offset,
            length,
        } in iter
        {
            let len = length.get() as usize;
            match page_idx {
                // Page 0 is the node's initial page.
                0 => write(
                    self.allocator.memory(),
                    (self.address + offset).get(),
                    &src[position..position + len],
                ),
                // Page `i > 0` is the node's `i`-th overflow page.
                _ => write(
                    self.allocator.memory(),
                    (self.overflows[page_idx - 1] + offset).get(),
                    &src[position..position + len],
                ),
            };
            position += len;
        }
    }
    /// Writes `val` in little-endian byte order at the virtual `offset`.
    pub fn write_u32(&mut self, offset: Address, val: u32) {
        self.write(offset, &val.to_le_bytes());
    }
    /// Writes `val` in little-endian byte order at the virtual `offset`.
    pub fn write_u64(&mut self, offset: Address, val: u64) {
        self.write(offset, &val.to_le_bytes());
    }
    /// Writes the raw in-memory bytes of `t` at the virtual `addr`.
    ///
    /// NOTE(review): this serializes `T`'s native representation, including
    /// any padding — presumably callers only use it with plain-old-data,
    /// fixed-layout structs; confirm at the call sites.
    pub fn write_struct<T>(&mut self, t: &T, addr: Address) {
        // SAFETY: any `&T` is readable as `size_of::<T>()` bytes.
        let slice = unsafe {
            core::slice::from_raw_parts(t as *const _ as *const u8, core::mem::size_of::<T>())
        };
        self.write(addr, slice)
    }
    // Allocates a fresh overflow page, initializes its header (magic + NULL
    // next-pointer), links it into the node's overflow chain, and records it.
    fn allocate_new_page(&mut self) {
        let new_page = self.allocator.allocate();
        // Stamp the overflow-page magic at the start of the page.
        self.allocator
            .memory()
            .write(new_page.get(), &OVERFLOW_MAGIC[..]);
        // New page is the tail of the chain: its next-pointer is NULL.
        write_u64(
            self.allocator.memory(),
            new_page + PAGE_OVERFLOW_NEXT_OFFSET,
            NULL.get(),
        );
        // Link the new page from the previous tail, or from the node's
        // overflow-address field if this is the first overflow page.
        write_u64(
            self.allocator.memory(),
            match self.overflows.last() {
                Some(prev_overflow) => {
                    *prev_overflow + PAGE_OVERFLOW_NEXT_OFFSET
                }
                None => {
                    self.address + OVERFLOW_ADDRESS_OFFSET
                }
            },
            new_page.get(),
        );
        self.overflows.push(new_page);
    }
    // Frees overflow pages not needed to hold `max_offset` bytes and
    // NULL-terminates the (possibly now shorter) overflow chain.
    fn deallocate_unused_pages(&mut self) {
        let overflow_pages_needed =
            compute_num_overflow_pages_needed(self.max_offset, self.page_size.get() as u64)
                as usize;
        // The writer only ever grows the chain, so it cannot need more pages
        // than it currently has.
        assert!(overflow_pages_needed <= self.overflows.len());
        // Pop and free pages from the tail of the chain.
        while self.overflows.len() > overflow_pages_needed {
            self.allocator.deallocate(self.overflows.pop().unwrap());
        }
        // Terminate the chain: clear the next-pointer of the new tail, or the
        // node's overflow-address field if no overflow pages remain.
        write_u64(
            self.allocator.memory(),
            match self.overflows.last() {
                Some(last_overflow) => {
                    *last_overflow + PAGE_OVERFLOW_NEXT_OFFSET
                }
                None => {
                    self.address + OVERFLOW_ADDRESS_OFFSET
                }
            },
            NULL.get(),
        );
    }
}
/// Returns the number of overflow pages needed to store a node of `size`
/// bytes, given that the initial page holds `page_size` bytes and each
/// overflow page holds `page_size - PAGE_OVERFLOW_DATA_OFFSET` data bytes.
fn compute_num_overflow_pages_needed(size: u64, page_size: u64) -> u64 {
    // Bytes that do not fit into the initial page.
    let overflow_data_len = size.saturating_sub(page_size);
    if overflow_data_len == 0 {
        return 0;
    }
    // Each overflow page stores data only after its fixed-size header.
    let capacity_per_page = page_size - PAGE_OVERFLOW_DATA_OFFSET.get();
    overflow_data_len.div_ceil(capacity_per_page)
}
#[cfg(test)]
mod tests {
    use super::*;
    // A node with no data needs no overflow pages.
    #[test]
    fn num_overflow_pages_no_overflows() {
        assert_eq!(compute_num_overflow_pages_needed(0, 100), 0);
    }
    // Data that exactly fills the initial page needs no overflow pages.
    #[test]
    fn num_overflow_pages_exactly_one_page() {
        assert_eq!(compute_num_overflow_pages_needed(100, 100), 0);
    }
    // One byte past the initial page requires one overflow page.
    #[test]
    fn num_overflow_pages_two_pages() {
        assert_eq!(compute_num_overflow_pages_needed(101, 100), 1);
    }
    // Exactly filling one overflow page's data capacity still needs only one
    // page (the overflow header consumes PAGE_OVERFLOW_DATA_OFFSET bytes).
    #[test]
    fn num_overflow_pages_exactly_two_pages() {
        assert_eq!(
            compute_num_overflow_pages_needed(200 - PAGE_OVERFLOW_DATA_OFFSET.get(), 100),
            1
        );
    }
    // One byte beyond a full overflow page tips over into a second one.
    #[test]
    fn num_overflow_pages_just_over_two_pages() {
        assert_eq!(
            compute_num_overflow_pages_needed(200 - PAGE_OVERFLOW_DATA_OFFSET.get() + 1, 100),
            2
        );
    }
}