data_bucket 0.3.13

DataBucket is a container for WorkTable's data
Documentation
use std::fmt::Debug;
use std::io::SeekFrom;

use data_bucket_codegen::Persistable;
use indexset::core::pair::Pair;
use rkyv::de::Pool;
use rkyv::rancor::Strategy;
use rkyv::ser::allocator::ArenaHandle;
use rkyv::ser::sharing::Share;
use rkyv::ser::Serializer;
use rkyv::util::AlignedVec;
use rkyv::{Archive, Deserialize, Serialize};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

use crate::page::index::IndexPageUtility;
use crate::page::PageId;
use crate::{align8, VariableSizeMeasurable};
use crate::{seek_to_page_start, IndexValue, SizeMeasurable, GENERAL_HEADER_SIZE};
use crate::{Link, Persistable};

/// Index page whose values have a variable serialized size.
///
/// When serialized through the `Persistable` impl in this file the page becomes a buffer of
/// `DATA_LENGTH` bytes: the utility header (every field except `index_values`) is written at the
/// start of the buffer, and each value is written at `DATA_LENGTH - offset` for its slot, i.e.
/// values are addressed from the end of the page.
#[derive(Archive, Clone, Deserialize, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct UnsizedIndexPage<
    T: Default + SizeMeasurable + VariableSizeMeasurable,
    const DATA_LENGTH: u32,
> {
    /// Number of entries in `slots` (the constructors and `rebuild` set it to the slot count).
    pub slots_size: u16,
    /// `aligned_size()` of `node_id`, stored as `u16`.
    pub node_id_size: u16,
    /// Value identifying this node. The constructors and `rebuild` in this file set it to the
    /// last element of `index_values`.
    pub node_id: IndexValue<T>,
    /// Offset of the most recently laid out value, measured back from the end of the page. As
    /// computed in this file it equals the sum of the aligned sizes of all stored values.
    pub last_value_offset: u32,
    /// Set to `0` by the constructors and by `rebuild`; nothing else in this file modifies it.
    /// NOTE(review): presumably the number of bytes freed by removed values - confirm.
    pub removed_len: u32,
    /// One `(offset, len)` pair per value: `offset` is the distance from the end of the page back
    /// to the first byte of the value, `len` is the number of bytes reserved for it.
    pub slots: Vec<(u32, u16)>,
    /// Values stored in the page, in the same order as `slots`.
    pub index_values: Vec<IndexValue<T>>,
}

/// Header ("utility") part of an [`UnsizedIndexPage`]: every field of the page except the values
/// themselves.
///
/// Code in this file calls `slots_size_size()`, `node_id_size_size()`, `persisted_size(..)`,
/// `as_bytes()` and `from_bytes(..)` on this type.
/// NOTE(review): those are presumably generated by the `Persistable` derive with the
/// `by_parts, unsized_gens` options - confirm against `data_bucket_codegen`.
#[derive(
    Archive, Clone, Deserialize, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Persistable,
)]
#[persistable(by_parts, unsized_gens)]
pub struct UnsizedIndexPageUtility<T: Default + SizeMeasurable + VariableSizeMeasurable> {
    /// Number of entries in `slots`; read first when a page is parsed.
    pub slots_size: u16,
    /// `aligned_size()` of `node_id`, stored as `u16`; read right after `slots_size`.
    pub node_id_size: u16,
    /// Value identifying the node this page belongs to.
    pub node_id: IndexValue<T>,
    /// Offset of the most recently laid out value, measured back from the end of the page.
    pub last_value_offset: u32,
    /// Copied verbatim from the page's `removed_len`.
    /// NOTE(review): presumably the number of bytes freed by removed values - confirm.
    pub removed_len: u32,
    /// One `(offset, len)` pair per stored value; `offset` is counted back from the page end.
    pub slots: Vec<(u32, u16)>,
}

impl<T: Default + SizeMeasurable + VariableSizeMeasurable> UnsizedIndexPageUtility<T> {
    /// Replaces the stored node id and keeps `node_id_size` in sync with it.
    ///
    /// `node_id_size` becomes the `aligned_size()` of the new id, cast to `u16`.
    ///
    /// # Errors
    ///
    /// The current implementation never fails and always returns `Ok(())`.
    pub fn update_node_id(&mut self, node_id: IndexValue<T>) -> eyre::Result<()> {
        let size = node_id.aligned_size() as u16;
        self.node_id = node_id;
        self.node_id_size = size;
        Ok(())
    }
}

impl<T: Default + SizeMeasurable + VariableSizeMeasurable, const DATA_LENGTH: u32>
    IndexPageUtility<T> for UnsizedIndexPage<T, DATA_LENGTH>
where
    T: Archive
        + Debug
        + for<'a> Serialize<
            Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
        > + Send
        + Sync,
    <T as Archive>::Archived: Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
{
    type Utility = UnsizedIndexPageUtility<T>;

    async fn parse_index_page_utility(
        file: &mut File,
        page_id: PageId,
    ) -> eyre::Result<Self::Utility> {
        seek_to_page_start(file, page_id.0).await?;
        let offset = GENERAL_HEADER_SIZE as i64;
        file.seek(SeekFrom::Current(offset)).await?;

        let mut slot_size_bytes = vec![0u8; UnsizedIndexPageUtility::<T>::slots_size_size()];
        file.read_exact(slot_size_bytes.as_mut_slice()).await?;
        let archived = unsafe {
            rkyv::access_unchecked::<<u16 as Archive>::Archived>(
                &slot_size_bytes[0..UnsizedIndexPageUtility::<T>::slots_size_size()],
            )
        };
        let slots_size =
            rkyv::deserialize::<u16, rkyv::rancor::Error>(archived).expect("data should be valid");
        let mut node_id_size_bytes = vec![0u8; UnsizedIndexPageUtility::<T>::node_id_size_size()];
        file.read_exact(node_id_size_bytes.as_mut_slice()).await?;
        let archived = unsafe {
            rkyv::access_unchecked::<<u16 as Archive>::Archived>(
                &node_id_size_bytes[0..UnsizedIndexPageUtility::<T>::node_id_size_size()],
            )
        };
        let node_id_size =
            rkyv::deserialize::<u16, rkyv::rancor::Error>(archived).expect("data should be valid");

        let index_utility_len = UnsizedIndexPageUtility::<T>::persisted_size(
            slots_size as usize,
            node_id_size as usize,
        );
        file.seek(SeekFrom::Current(
            -(UnsizedIndexPageUtility::<T>::slots_size_size() as i64
                + UnsizedIndexPageUtility::<T>::node_id_size_size() as i64),
        ))
        .await?;
        let mut index_utility_bytes = vec![0u8; index_utility_len];
        file.read_exact(index_utility_bytes.as_mut_slice()).await?;
        let utility = UnsizedIndexPageUtility::<T>::from_bytes(&index_utility_bytes);

        Ok(utility)
    }
}

impl<T, const DATA_LENGTH: u32> UnsizedIndexPage<T, DATA_LENGTH>
where
    T: Archive
        + Default
        + Clone
        + SizeMeasurable
        + VariableSizeMeasurable
        + for<'a> Serialize<
            Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
        >,
    <T as Archive>::Archived: Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
{
    /// Creates a page that stores exactly one value, `node_id`, which is also used as the
    /// page's node id.
    ///
    /// # Errors
    ///
    /// The current implementation never fails and always returns `Ok`.
    pub fn new(node_id: IndexValue<T>) -> eyre::Result<Self> {
        let len = node_id.aligned_size() as u32;
        Ok(Self {
            slots_size: 1,
            node_id_size: len as u16,
            node_id: node_id.clone(),
            last_value_offset: len,
            removed_len: 0,
            slots: vec![(len, len as u16)],
            index_values: vec![node_id],
        })
    }

    /// Lays `values` out back to back and returns the resulting slots together with the offset
    /// of the last value (the sum of all aligned sizes; `0` for an empty slice).
    ///
    /// Each slot is `(offset, len)`, where `offset` is the running total *including* the value
    /// itself, so it points at the value's first byte when counted back from the page end.
    fn layout_slots(values: &[IndexValue<T>]) -> (Vec<(u32, u16)>, u32) {
        let mut end_offset = 0u32;
        let slots: Vec<(u32, u16)> = values
            .iter()
            .map(|value| {
                let len = value.aligned_size() as u32;
                end_offset += len;
                (end_offset, len as u16)
            })
            .collect();
        (slots, end_offset)
    }

    /// Builds a page from `values`; the last value becomes the node id.
    ///
    /// # Panics
    ///
    /// Panics with `"Node should be not empty"` when `values` is empty.
    fn new_with_values(values: Vec<IndexValue<T>>) -> Self
    where
        T: Clone,
    {
        let node_id = values.last().expect("Node should be not empty").clone();
        let (slots, last_value_offset) = Self::layout_slots(&values);
        Self {
            slots_size: values.len() as u16,
            node_id_size: node_id.aligned_size() as u16,
            node_id,
            last_value_offset,
            slots,
            removed_len: 0,
            index_values: values,
        }
    }

    /// Recomputes every derived field (`node_id`, `node_id_size`, `slots`, `slots_size`,
    /// `last_value_offset`) from the current `index_values` and resets `removed_len` to `0`.
    ///
    /// # Panics
    ///
    /// Panics with `"Node should be not empty"` when `index_values` is empty.
    pub fn rebuild(&mut self)
    where
        T: Clone,
    {
        // Same invariant and message as `new_with_values`: a page always holds a value.
        self.node_id = self
            .index_values
            .last()
            .expect("Node should be not empty")
            .clone();
        self.node_id_size = self.node_id.aligned_size() as u16;

        let (slots, last_value_offset) = Self::layout_slots(&self.index_values);
        self.last_value_offset = last_value_offset;
        self.removed_len = 0;
        self.slots_size = slots.len() as u16;
        self.slots = slots;
    }

    /// Moves the values from position `index` onwards into a new page and rebuilds `self` from
    /// the values that remain.
    ///
    /// # Panics
    ///
    /// Panics when `index` is greater than the number of values, or when either half would be
    /// empty (`index == 0` or `index == len`), because both pages need a last value to use as
    /// node id.
    pub fn split(&mut self, index: usize) -> UnsizedIndexPage<T, DATA_LENGTH>
    where
        T: Clone,
    {
        let moved_values = self.index_values.split_off(index);
        let new_page = UnsizedIndexPage::new_with_values(moved_values);
        self.rebuild();

        new_page
    }

    /// Serializes `value` and writes it into page `page_id` of `file` so that the value starts
    /// `current_offset + serialized_len` bytes before the end of that page.
    ///
    /// Returns that new offset, which is the `current_offset` to pass for the next value.
    ///
    /// # Errors
    ///
    /// Returns an error when serialization, seeking or writing fails.
    pub async fn persist_value(
        file: &mut File,
        page_id: PageId,
        current_offset: u32,
        value: IndexValue<T>,
    ) -> eyre::Result<u32>
    where
        T: Archive
            + Eq
            + for<'a> Serialize<
                Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
            >,
        <T as Archive>::Archived: Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
    {
        // We seek to page's end and will write values from tail.
        seek_to_page_start(file, page_id.0 + 1).await?;

        let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&value)?;
        let offset = current_offset + bytes.len() as u32;
        file.seek(SeekFrom::Current(-(offset as i64))).await?;
        file.write_all(bytes.as_slice()).await?;

        Ok(offset)
    }

    /// Reads `len` bytes at the current cursor position of `file` and deserializes them as one
    /// value.
    ///
    /// # Errors
    ///
    /// Returns an error when reading from the file fails.
    ///
    /// # Panics
    ///
    /// Panics with `"data should be valid"` if deserialization fails.
    async fn read_value(file: &mut File, len: u16) -> eyre::Result<IndexValue<T>>
    where
        T: Archive,
        <T as Archive>::Archived: Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
    {
        let len = usize::from(len);
        // Read straight into the aligned buffer instead of copying through a scratch `Vec`.
        // 16 is the alignment of rkyv's default `AlignedVec`, i.e. of the buffer `to_bytes`
        // serialized the value into, so archived data that needs more than 4-byte alignment is
        // still correctly aligned when accessed.
        let mut buffer = AlignedVec::<16>::with_capacity(len);
        buffer.resize(len, 0);
        file.read_exact(buffer.as_mut_slice()).await?;
        // The bytes are accessed without validation; they must have been written by
        // `persist_value` / `as_bytes` for a value of the same type.
        let archived =
            unsafe { rkyv::access_unchecked::<<IndexValue<T> as Archive>::Archived>(&buffer[..]) };
        Ok(rkyv::deserialize(archived).expect("data should be valid"))
    }

    /// Reads the value of `len` bytes that starts `offset` bytes before the end of page
    /// `page_id`.
    ///
    /// # Errors
    ///
    /// Returns an error when seeking or reading fails.
    ///
    /// # Panics
    ///
    /// Panics with `"data should be valid"` if deserialization fails.
    pub async fn read_value_with_offset(
        file: &mut File,
        page_id: PageId,
        offset: u32,
        len: u16,
    ) -> eyre::Result<IndexValue<T>>
    where
        T: Archive,
        <T as Archive>::Archived: Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
    {
        // The start of the next page is the end of this one; offsets count back from there.
        seek_to_page_start(file, page_id.0 + 1).await?;
        file.seek(SeekFrom::Current(-(offset as i64))).await?;
        Self::read_value(file, len).await
    }

    /// Returns the page's values converted into `Pair<T, Link>` entries, in stored order.
    pub fn get_node(&self) -> Vec<Pair<T, Link>>
    where
        T: Clone + Ord,
    {
        self.index_values.iter().cloned().map(Into::into).collect()
    }

    /// Builds a page from node entries that convert into `IndexValue<T>`.
    ///
    /// # Panics
    ///
    /// Panics with `"Node should be not empty"` when `node` is empty.
    pub fn from_node(node: &[impl Into<IndexValue<T>> + Clone]) -> Self
    where
        T: Clone + Ord + Default,
    {
        let values: Vec<IndexValue<T>> = node.iter().cloned().map(Into::into).collect();
        Self::new_with_values(values)
    }
}

impl<T, const DATA_LENGTH: u32> Persistable for UnsizedIndexPage<T, DATA_LENGTH>
where
    T: Archive
        + Clone
        + Default
        + Debug
        + SizeMeasurable
        + VariableSizeMeasurable
        + for<'a> Serialize<
            Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
        >,
    <T as Archive>::Archived: Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
{
    /// Serializes the page into a zero-initialised buffer of `DATA_LENGTH` bytes.
    ///
    /// The utility header is written at the start of the buffer; every value is written at
    /// `DATA_LENGTH - offset` for its slot, so values are addressed from the end of the page.
    ///
    /// # Panics
    ///
    /// Panics if the header or a slot does not fit into `DATA_LENGTH` bytes, or if a value
    /// cannot be serialized.
    fn as_bytes(&self) -> impl AsRef<[u8]> + Send {
        let data_length = DATA_LENGTH as usize;
        let utility = UnsizedIndexPageUtility {
            slots_size: self.slots_size,
            node_id_size: self.node_id_size,
            node_id: self.node_id.clone(),
            last_value_offset: self.last_value_offset,
            removed_len: self.removed_len,
            slots: self.slots.clone(),
        };
        let utility_repr = utility.as_bytes();
        let utility_bytes: &[u8] = utility_repr.as_ref();

        let mut bytes = vec![0u8; data_length];
        bytes[..utility_bytes.len()].copy_from_slice(utility_bytes);

        for ((offset, len), value) in self.slots.iter().zip(self.index_values.iter()) {
            let offset = data_length - *offset as usize;
            let len = *len as usize;
            let value_bytes = rkyv::to_bytes::<rkyv::rancor::Error>(value).unwrap();
            // NOTE(review): `splice` keeps the buffer at `DATA_LENGTH` bytes only while the
            // serialized value is exactly `len` bytes long - presumably guaranteed by
            // `aligned_size()`; confirm.
            bytes.splice(offset..(offset + len), value_bytes.iter().copied());
        }

        bytes
    }

    /// Rebuilds a page from a buffer produced by [`Persistable::as_bytes`].
    ///
    /// The two `u16` size prefixes are decoded first to find the length of the utility header;
    /// the header is then parsed and every slot is used to locate its value, counting back from
    /// `bytes.len()`.
    ///
    /// The bytes are accessed without validation, so `bytes` must be a complete, unmodified
    /// page.
    ///
    /// # Panics
    ///
    /// Panics if a slice range falls outside `bytes` or if deserialization fails
    /// (`"data should be valid"`).
    fn from_bytes(bytes: &[u8]) -> Self {
        // The `slots_size` prefix is followed directly by the `node_id_size` prefix, so the
        // second prefix ends at the sum of both widths.
        let slots_size_end = UnsizedIndexPageUtility::<T>::slots_size_size();
        let node_id_size_end = slots_size_end + UnsizedIndexPageUtility::<T>::node_id_size_size();

        let archived = unsafe {
            rkyv::access_unchecked::<<u16 as Archive>::Archived>(&bytes[0..slots_size_end])
        };
        let slots_size =
            rkyv::deserialize::<u16, rkyv::rancor::Error>(archived).expect("data should be valid");

        let archived = unsafe {
            rkyv::access_unchecked::<<u16 as Archive>::Archived>(
                &bytes[slots_size_end..node_id_size_end],
            )
        };
        let node_id_size =
            rkyv::deserialize::<u16, rkyv::rancor::Error>(archived).expect("data should be valid");

        let utility_len = UnsizedIndexPageUtility::<T>::persisted_size(
            slots_size as usize,
            node_id_size as usize,
        );
        let utility = UnsizedIndexPageUtility::<T>::from_bytes(&bytes[0..utility_len]);

        let mut index_values = Vec::with_capacity(utility.slots.len());
        for (offset, len) in &utility.slots {
            // Slot offsets count back from the end of the page.
            let offset = bytes.len() - *offset as usize;
            let len = *len as usize;
            let value_bytes = &bytes[offset..(offset + len)];
            let archived = unsafe {
                rkyv::access_unchecked::<<IndexValue<T> as Archive>::Archived>(value_bytes)
            };
            let val = rkyv::deserialize::<_, rkyv::rancor::Error>(archived)
                .expect("data should be valid");
            index_values.push(val)
        }

        Self {
            slots_size,
            node_id_size,
            node_id: utility.node_id,
            last_value_offset: utility.last_value_offset,
            slots: utility.slots,
            removed_len: utility.removed_len,
            index_values,
        }
    }
}

#[cfg(test)]
mod test {
    use crate::{IndexValue, Link, Persistable, UnsizedIndexPage};

    /// A single-value page serializes to exactly `DATA_LENGTH` bytes and round-trips unchanged.
    #[test]
    fn to_bytes_and_back() {
        let page = UnsizedIndexPage::<_, 1024>::new(IndexValue {
            key: "Someone from somewhere".to_string(),
            link: Default::default(),
        })
        .unwrap();
        let bytes = page.as_bytes();
        assert_eq!(bytes.as_ref().len(), 1024);
        let page_back = UnsizedIndexPage::from_bytes(bytes.as_ref());
        assert_eq!(page_back, page)
    }

    /// Splitting a ten-value page in the middle leaves two consistent five-value pages.
    #[test]
    fn split() {
        let mut values = vec![];
        for i in 0..10 {
            values.push(IndexValue {
                key: format!("{i}___________________{i}"),
                link: Link {
                    page_id: 0.into(),
                    offset: i * 24,
                    length: 24,
                },
            })
        }
        let mut page = UnsizedIndexPage::<String, 1024>::new_with_values(values);
        let split = page.split(5);

        // Left half: offsets must match the sum of its own slot lengths.
        assert_eq!(page.slots_size, 5);
        let offset = page.slots.iter().map(|(_, l)| *l).sum::<u16>();
        assert_eq!(page.last_value_offset, offset as u32);
        assert_eq!(page.last_value_offset, page.slots.last().unwrap().0);

        assert_ne!(page.node_id, split.node_id);

        // Right half: checked against its own slots, not the left page's.
        assert_eq!(split.slots_size, 5);
        let offset = split.slots.iter().map(|(_, l)| *l).sum::<u16>();
        assert_eq!(split.last_value_offset, offset as u32);
        assert_eq!(split.last_value_offset, split.slots.last().unwrap().0)
    }
}