use std::{
borrow::Borrow,
cmp::Ordering,
marker::PhantomData,
mem,
ops::{Deref, Range},
slice,
};
use super::{
codec::*, data::*, ItemIter, PageBuf, PageBuilder, PageKind, PageRef, PageTier,
RewindableIterator, SeekableIterator, SliceIter,
};
pub(crate) struct SortedPageBuilder<I> {
base: PageBuilder,
iter: Option<I>,
num_items: usize,
content_size: usize,
}
impl<I, K, V> SortedPageBuilder<I>
where
I: RewindableIterator<Item = (K, V)>,
K: SortedPageKey,
V: SortedPageValue,
{
pub(crate) fn new(tier: PageTier, kind: PageKind) -> Self {
Self {
base: PageBuilder::new(tier, kind),
iter: None,
num_items: 0,
content_size: 0,
}
}
pub(crate) fn with_iter(mut self, mut iter: I) -> Self {
for (k, v) in &mut iter {
self.num_items += 1;
self.content_size += k.encode_size() + v.encode_size();
}
self.content_size += self.num_items * mem::size_of::<u32>();
assert!(self.content_size <= u32::MAX as usize);
self.iter = Some(iter);
self
}
pub(crate) fn size(&self) -> usize {
self.base.size(self.content_size)
}
pub(crate) fn build(mut self, page: &mut PageBuf<'_>) {
assert!(page.size() >= self.size());
self.base.build(page);
if let Some(iter) = self.iter.as_mut() {
unsafe {
let mut buf = SortedPageBuf::new(page.content_mut(), self.num_items);
iter.rewind();
for (k, v) in iter {
buf.add(k, v);
}
}
}
}
}
impl<K, V> SortedPageBuilder<ItemIter<(K, V)>>
where
K: SortedPageKey,
V: SortedPageValue,
{
pub(crate) fn with_item(self, item: (K, V)) -> Self {
self.with_iter(ItemIter::new(item))
}
}
impl<'a, K, V> SortedPageBuilder<SliceIter<'a, (K, V)>>
where
K: SortedPageKey,
V: SortedPageValue,
{
pub(crate) fn with_slice(self, slice: &'a [(K, V)]) -> Self {
self.with_iter(SliceIter::new(slice))
}
}
struct SortedPageBuf<K, V> {
offsets: Encoder,
payload: Encoder,
_marker: PhantomData<(K, V)>,
}
impl<K, V> SortedPageBuf<K, V>
where
K: SortedPageKey,
V: SortedPageValue,
{
unsafe fn new(content: &mut [u8], num_items: usize) -> Self {
let offsets_size = num_items * mem::size_of::<u32>();
let (offsets, payload) = content.split_at_mut(offsets_size);
Self {
offsets: Encoder::new(offsets),
payload: Encoder::new(payload),
_marker: PhantomData,
}
}
unsafe fn add(&mut self, key: K, value: V) {
let offset = self.offsets.len() + self.payload.offset();
self.offsets.put_u32(offset as u32);
key.encode_to(&mut self.payload);
value.encode_to(&mut self.payload);
}
}
#[derive(Clone)]
pub(crate) struct SortedPageRef<'a, K, V> {
page: PageRef<'a>,
content: &'a [u8],
offsets: &'a [u32],
_marker: PhantomData<(K, V)>,
}
impl<'a, K, V> SortedPageRef<'a, K, V>
where
K: SortedPageKey,
V: SortedPageValue,
{
pub(crate) fn new(page: PageRef<'a>) -> Self {
let content = page.content();
let offsets = unsafe {
let ptr = content.as_ptr() as *const u32;
let len = if content.is_empty() {
0
} else {
let size = u32::from_le(ptr.read());
size as usize / mem::size_of::<u32>()
};
slice::from_raw_parts(ptr, len)
};
Self {
page,
content,
offsets,
_marker: PhantomData,
}
}
pub(crate) fn len(&self) -> usize {
self.offsets.len()
}
pub(crate) fn get(&self, index: usize) -> Option<(K, V)> {
if let Some(item) = self.item(index) {
let mut dec = Decoder::new(item);
unsafe {
let k = K::decode_from(&mut dec);
let v = V::decode_from(&mut dec);
Some((k, v))
}
} else {
None
}
}
pub(crate) fn rank<Q: ?Sized>(&self, target: &Q) -> Result<usize, usize>
where
K: Borrow<Q>,
Q: Ord,
{
let mut left = 0;
let mut right = self.len();
while left < right {
let mid = (left + right) / 2;
let key = unsafe {
let item = self.item(mid).unwrap();
let mut dec = Decoder::new(item);
K::decode_from(&mut dec)
};
match key.borrow().cmp(target) {
Ordering::Less => left = mid + 1,
Ordering::Greater => right = mid,
Ordering::Equal => return Ok(mid),
}
}
Err(left)
}
#[allow(clippy::type_complexity)]
pub(crate) fn into_split_iter(
self,
) -> Option<(
K,
SortedPageRangeIter<'a, K, V>,
SortedPageRangeIter<'a, K, V>,
)> {
let len = self.len();
if let Some((mid, _)) = self.get(len / 2) {
let sep = mid.as_split_separator();
let index = match self.rank(&sep) {
Ok(i) => i,
Err(i) => i,
};
if index > 0 {
let left_iter = SortedPageRangeIter::new(self.clone(), 0..index);
let right_iter = SortedPageRangeIter::new(self, index..len);
return Some((sep, left_iter, right_iter));
}
}
None
}
fn item(&self, index: usize) -> Option<&[u8]> {
if let Some(offset) = self.item_offset(index) {
let next_offset = self.item_offset(index + 1).unwrap_or(self.content.len());
Some(&self.content[offset..next_offset])
} else {
None
}
}
fn item_offset(&self, index: usize) -> Option<usize> {
self.offsets.get(index).map(|v| u32::from_le(*v) as usize)
}
}
impl<'a, K, V> Deref for SortedPageRef<'a, K, V> {
type Target = PageRef<'a>;
fn deref(&self) -> &Self::Target {
&self.page
}
}
impl<'a, K, V, T> From<T> for SortedPageRef<'a, K, V>
where
K: SortedPageKey,
V: SortedPageValue,
T: Into<PageRef<'a>>,
{
fn from(page: T) -> Self {
Self::new(page.into())
}
}
#[derive(Clone)]
pub(crate) struct SortedPageIter<'a, K, V> {
page: SortedPageRef<'a, K, V>,
next: usize,
}
impl<'a, K, V> SortedPageIter<'a, K, V> {
pub(crate) fn new(page: SortedPageRef<'a, K, V>) -> Self {
Self { page, next: 0 }
}
}
impl<'a, K, V, T> From<T> for SortedPageIter<'a, K, V>
where
K: SortedPageKey,
V: SortedPageValue,
T: Into<SortedPageRef<'a, K, V>>,
{
fn from(page: T) -> Self {
Self::new(page.into())
}
}
impl<'a, K, V> Iterator for SortedPageIter<'a, K, V>
where
K: SortedPageKey,
V: SortedPageValue,
{
type Item = (K, V);
fn next(&mut self) -> Option<Self::Item> {
if let Some(item) = self.page.get(self.next) {
self.next += 1;
Some(item)
} else {
None
}
}
}
impl<'a, K, V> RewindableIterator for SortedPageIter<'a, K, V>
where
K: SortedPageKey,
V: SortedPageValue,
{
fn rewind(&mut self) {
self.next = 0;
}
}
impl<'a, V> SeekableIterator<Key<'_>> for SortedPageIter<'a, Key<'_>, V>
where
V: SortedPageValue,
{
fn seek(&mut self, target: &Key<'_>) -> bool {
match self.page.rank(target) {
Ok(i) => {
self.next = i;
true
}
Err(i) => {
self.next = i;
false
}
}
}
}
impl<'a, V> SeekableIterator<[u8]> for SortedPageIter<'a, &'a [u8], V>
where
V: SortedPageValue,
{
fn seek(&mut self, target: &[u8]) -> bool {
match self.page.rank(target) {
Ok(i) => {
self.next = i;
true
}
Err(i) => {
self.next = i;
false
}
}
}
}
#[derive(Clone)]
pub(crate) struct SortedPageRangeIter<'a, K, V> {
page: SortedPageRef<'a, K, V>,
range: Range<usize>,
index: usize,
}
impl<'a, K, V> SortedPageRangeIter<'a, K, V> {
pub(crate) fn new(page: SortedPageRef<'a, K, V>, range: Range<usize>) -> Self {
let index = range.start;
Self { page, range, index }
}
}
impl<'a, K, V> Iterator for SortedPageRangeIter<'a, K, V>
where
K: SortedPageKey,
V: SortedPageValue,
{
type Item = (K, V);
fn next(&mut self) -> Option<Self::Item> {
if self.index < self.range.end {
if let Some(item) = self.page.get(self.index) {
self.index += 1;
return Some(item);
}
}
None
}
}
impl<'a, K, V> RewindableIterator for SortedPageRangeIter<'a, K, V>
where
K: SortedPageKey,
V: SortedPageValue,
{
fn rewind(&mut self) {
self.index = self.range.start;
}
}
pub(crate) trait SortedPageKey: Codec + Clone + Ord {
fn as_raw(&self) -> &[u8];
fn as_split_separator(&self) -> Self;
}
pub(crate) trait SortedPageValue: Codec + Clone {}
impl<T> SortedPageValue for T where T: Codec + Clone {}
impl Codec for &[u8] {
fn encode_size(&self) -> usize {
mem::size_of::<u32>() + self.len()
}
unsafe fn encode_to(&self, enc: &mut Encoder) {
enc.put_u32(self.len() as u32);
enc.put_slice(self);
}
unsafe fn decode_from(dec: &mut Decoder) -> Self {
let len = dec.get_u32() as usize;
dec.get_slice(len)
}
}
impl SortedPageKey for &[u8] {
fn as_raw(&self) -> &[u8] {
self
}
fn as_split_separator(&self) -> Self {
self
}
}
impl Codec for Key<'_> {
fn encode_size(&self) -> usize {
self.raw.encode_size() + mem::size_of::<u64>()
}
unsafe fn encode_to(&self, enc: &mut Encoder) {
self.raw.encode_to(enc);
enc.put_u64(self.lsn);
}
unsafe fn decode_from(dec: &mut Decoder) -> Self {
let raw = Codec::decode_from(dec);
let lsn = dec.get_u64();
Self::new(raw, lsn)
}
}
impl SortedPageKey for Key<'_> {
fn as_raw(&self) -> &[u8] {
self.raw
}
fn as_split_separator(&self) -> Self {
Key::new(self.raw, u64::MAX)
}
}
const VALUE_KIND_PUT: u8 = 0;
const VALUE_KIND_DELETE: u8 = 1;
impl Codec for Value<'_> {
fn encode_size(&self) -> usize {
1 + match self {
Self::Put(v) => v.len(),
Self::Delete => 0,
}
}
unsafe fn encode_to(&self, enc: &mut Encoder) {
match self {
Value::Put(v) => {
enc.put_u8(VALUE_KIND_PUT);
enc.put_slice(v);
}
Value::Delete => enc.put_u8(VALUE_KIND_DELETE),
}
}
unsafe fn decode_from(dec: &mut Decoder) -> Self {
let kind = dec.get_u8();
match kind {
VALUE_KIND_PUT => Self::Put(dec.get_slice(dec.remaining())),
VALUE_KIND_DELETE => Self::Delete,
_ => unreachable!(),
}
}
}
impl Codec for Index {
fn encode_size(&self) -> usize {
mem::size_of::<u64>() * 2
}
unsafe fn encode_to(&self, enc: &mut Encoder) {
enc.put_u64(self.id);
enc.put_u64(self.epoch);
}
unsafe fn decode_from(dec: &mut Decoder) -> Self {
let id = dec.get_u64();
let epoch = dec.get_u64();
Self::new(id, epoch)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::page::tests::*;
#[test]
fn sorted_page() {
let data = raw_slice(&[[1], [3], [5]]);
let owned_page = OwnedSortedPage::from_slice(&data);
let page = owned_page.as_ref();
assert_eq!(page.tier(), PageTier::Leaf);
assert_eq!(page.kind(), PageKind::Data);
assert_eq!(page.len(), data.len());
assert_eq!(page.get(0), Some(data[0]));
assert_eq!(page.get(1), Some(data[1]));
assert_eq!(page.get(2), Some(data[2]));
assert_eq!(page.get(3), None);
assert_eq!(page.rank([0].as_slice()), Err(0));
assert_eq!(page.rank([1].as_slice()), Ok(0));
assert_eq!(page.rank([2].as_slice()), Err(1));
assert_eq!(page.rank([3].as_slice()), Ok(1));
assert_eq!(page.rank([4].as_slice()), Err(2));
assert_eq!(page.rank([5].as_slice()), Ok(2));
let mut iter = SortedPageIter::from(page);
for _ in 0..2 {
for (a, b) in (&mut iter).zip(data.clone()) {
assert_eq!(a, b);
}
iter.rewind();
}
}
#[test]
fn sorted_page_split() {
let data = key_slice(&[([1], 2), ([1], 1), ([3], 3), ([3], 2), ([3], 1), ([3], 0)]);
let left_data = key_slice(&[([1], 2), ([1], 1)]);
let right_data = key_slice(&[([3], 3), ([3], 2), ([3], 1), ([3], 0)]);
let owned_page = OwnedSortedPage::from_slice(&data);
let page = owned_page.as_ref();
let (split_key, mut left_iter, mut right_iter) = page.into_split_iter().unwrap();
assert_eq!(split_key, Key::new(&[3], u64::MAX));
for _ in 0..2 {
for (a, b) in (&mut left_iter).zip(left_data.clone()) {
assert_eq!(a, b);
}
left_iter.rewind();
for (a, b) in (&mut right_iter).zip(right_data.clone()) {
assert_eq!(a, b);
}
right_iter.rewind();
}
}
#[test]
fn sorted_page_split_none() {
{
let data = raw_slice(&[[1]]);
let owned_page = OwnedSortedPage::from_slice(&data);
assert!(owned_page.as_ref().into_split_iter().is_none());
}
{
let data = key_slice(&[([1], 2), ([1], 1), ([3], 3)]);
let owned_page = OwnedSortedPage::from_slice(&data);
assert!(owned_page.as_ref().into_split_iter().is_none());
}
}
#[test]
fn sorted_page_iter() {
let data = raw_slice(&[[1], [3], [5]]);
let owned_page = OwnedSortedPage::from_slice(&data);
let mut iter = owned_page.as_iter();
for _ in 0..2 {
for (a, b) in (&mut iter).zip(data.clone()) {
assert_eq!(a, b);
}
iter.rewind();
}
assert!(!iter.seek([0].as_slice()));
assert_eq!(iter.next(), Some(data[0]));
assert!(iter.seek([1].as_slice()));
assert_eq!(iter.next(), Some(data[0]));
assert!(!iter.seek([2].as_slice()));
assert_eq!(iter.next(), Some(data[1]));
assert!(iter.seek([3].as_slice()));
assert_eq!(iter.next(), Some(data[1]));
assert!(!iter.seek([4].as_slice()));
assert_eq!(iter.next(), Some(data[2]));
assert!(iter.seek([5].as_slice()));
assert_eq!(iter.next(), Some(data[2]));
assert!(!iter.seek([6].as_slice()));
assert_eq!(iter.next(), None);
}
}