// robt 0.2.0
//
// Read-only, immutable B-tree for indexing key, value pairs.
// Documentation
use mkit::{
    cbor::{self, Cbor, IntoCbor},
    db,
};

use std::{cell::RefCell, convert::TryFrom, rc::Rc};

use crate::{
    config::Config, entry::Entry, flush::Flusher, scans::BuildScan, util, Result,
};

/// Iterator that groups `(key, fpos)` pairs pulled from a lower-level
/// [`BuildIter`] into fixed-size blocks and hands the blocks to a flusher.
///
/// Entries are encoded with `Entry::new_mm`; presumably these are the upper
/// intermediate levels of the btree — confirm against the builder that stacks
/// these iterators.
pub struct BuildMM<K, V, D, I> {
    // Size in bytes of every block emitted by this level.
    m_blocksize: usize,
    // Shared flusher that receives the finished blocks.
    iflush: Rc<RefCell<Flusher>>,
    // Lower level supplying `(key, fpos)` pairs. Boxed because `BuildIter`
    // can itself hold a `BuildMM`, which would otherwise be infinitely sized.
    iter: Box<BuildIter<K, V, D, I>>,
    // Pair that did not fit into the previous block; it is consumed first on
    // the next call to `next()`.
    entry: Option<(K, u64)>,
}

impl<K, V, D, I> BuildMM<K, V, D, I> {
    pub fn new(
        config: &Config,
        iflush: Rc<RefCell<Flusher>>,
        iter: BuildIter<K, V, D, I>,
    ) -> Self {
        BuildMM {
            m_blocksize: config.m_blocksize,
            iflush,
            iter: Box::new(iter),
            entry: None,
        }
    }
}

impl<K, V, D, I> Iterator for BuildMM<K, V, D, I>
where
    K: Clone + IntoCbor,
    V: Clone + IntoCbor,
    D: Clone + IntoCbor,
    I: Iterator<Item = db::Entry<K, V, D>>,
{
    type Item = Result<(K, u64)>;

    /// Assemble the next block from the lower level.
    ///
    /// Pairs are pulled (starting with any pair carried over from the previous
    /// call) and appended, CBOR-encoded, to an indefinite-length list until the
    /// next one would overflow the block or the lower level is exhausted.
    ///
    /// Returns:
    /// * `None` when the lower level is exhausted and nothing was pulled.
    /// * `Some(Err(_))` when the lower level, the encoding or the flush fails.
    /// * `Some(Ok((key, fpos)))` otherwise, where `key` is the first key pulled
    ///   in this call. If more than one pair was pulled, the block is flushed
    ///   and `fpos` is the flusher position taken just before the flush. If
    ///   exactly one pair was pulled, nothing is flushed and that pair's own
    ///   `fpos` is passed through unchanged.
    ///
    /// NOTE(review): if the very first pair of a call does not fit into an
    /// empty block, it is both returned (count is 1) and kept as the
    /// carried-over pair, so subsequent calls look like they would yield it
    /// again indefinitely — confirm that `m_blocksize` is always larger than
    /// any encoded entry.
    fn next(&mut self) -> Option<Self::Item> {
        let capacity = self.m_blocksize;
        // One byte of the block is kept free; presumably for the trailing
        // break marker appended after the loop — TODO confirm its encoded size.
        let limit = capacity.saturating_sub(1);

        let mut block = Vec::with_capacity(capacity);
        let mut head_key: Option<K> = None;
        let mut last_fpos = None;
        let mut seen = 0;

        iter_result!(Cbor::Major4(cbor::Info::Indefinite, vec![]).encode(&mut block));

        loop {
            // A pair left over from the previous call takes precedence over
            // pulling a fresh one from the lower level.
            let pulled = match self.entry.take() {
                Some(pending) => Some(Ok(pending)),
                None => self.iter.next(),
            };
            let (key, fpos) = match pulled {
                Some(Ok(pair)) => pair,
                Some(Err(err)) => return Some(Err(err)),
                None if head_key.is_none() => return None,
                None => break,
            };

            // The pair is counted even when it ends up not fitting below, so
            // the count reflects "pairs seen", not "pairs stored".
            last_fpos = Some(fpos);
            seen += 1;

            if head_key.is_none() {
                head_key = Some(key.clone());
            }

            let encoded = {
                let e = Entry::<K, V, D>::new_mm(key.clone(), fpos);
                iter_result!(util::into_cbor_bytes(e))
            };
            if block.len() + encoded.len() > limit {
                // Does not fit: carry it over to the next block.
                self.entry = Some((key, fpos));
                break;
            }
            block.extend_from_slice(&encoded);
        }

        // Terminate the indefinite-length list, then pad with zeros up to the
        // configured block size.
        let brk = iter_result!(util::into_cbor_bytes(cbor::SimpleValue::Break));
        block.extend_from_slice(&brk);
        block.resize(capacity, 0);

        if seen > 1 {
            // The position must be read before flushing the block.
            let block_fpos = self.iflush.borrow().to_fpos().unwrap_or(0);
            last_fpos = Some(block_fpos);
            iter_result!(self.iflush.borrow_mut().flush(block));
        }

        // Both options are populated on every path that leaves the loop via
        // `break`, so these unwraps cannot fail.
        Some(Ok((head_key.unwrap(), last_fpos.unwrap())))
    }
}

/// Iterator that groups the `(key, fpos)` pairs produced by a [`BuildZZ`]
/// into fixed-size blocks and hands the blocks to a flusher.
///
/// Entries are encoded with `Entry::new_mz`; presumably this is the
/// intermediate level sitting directly above the leaf blocks — confirm
/// against the builder that stacks these iterators.
pub struct BuildMZ<K, V, D, I> {
    // Size in bytes of every block emitted by this level.
    m_blocksize: usize,
    // Shared flusher that receives the finished blocks.
    iflush: Rc<RefCell<Flusher>>,
    // Lower level supplying one `(key, fpos)` pair per block it flushed.
    iter: BuildZZ<K, V, D, I>,
    // Pair that did not fit into the previous block; it is consumed first on
    // the next call to `next()`.
    entry: Option<(K, u64)>,
}

impl<K, V, D, I> BuildMZ<K, V, D, I> {
    pub fn new(
        config: &Config,
        iflush: Rc<RefCell<Flusher>>,
        iter: BuildZZ<K, V, D, I>,
    ) -> Self {
        BuildMZ {
            m_blocksize: config.m_blocksize,
            iflush,
            iter,
            entry: None,
        }
    }
}

impl<K, V, D, I> Iterator for BuildMZ<K, V, D, I>
where
    K: Clone + IntoCbor,
    V: Clone + IntoCbor,
    D: Clone + IntoCbor,
    I: Iterator<Item = db::Entry<K, V, D>>,
{
    type Item = Result<(K, u64)>;

    /// Assemble and flush the next block from the lower level.
    ///
    /// Pairs are pulled (starting with any pair carried over from the previous
    /// call) and appended, CBOR-encoded, to an indefinite-length list until the
    /// next one would overflow the block or the lower level is exhausted.
    ///
    /// Returns:
    /// * `None` when the lower level is exhausted and nothing was pulled.
    /// * `Some(Err(_))` when the lower level, the encoding or the flush fails.
    /// * `Some(Ok((key, fpos)))` otherwise, where `key` is the first key pulled
    ///   in this call and `fpos` is the flusher position taken just before
    ///   this block was flushed.
    ///
    /// NOTE(review): if the very first pair of a call does not fit into an
    /// empty block, a block without entries is flushed and the pair stays
    /// carried over, so subsequent calls look like they would repeat this
    /// indefinitely — confirm that `m_blocksize` is always larger than any
    /// encoded entry.
    fn next(&mut self) -> Option<Self::Item> {
        let capacity = self.m_blocksize;
        // One byte of the block is kept free; presumably for the trailing
        // break marker appended after the loop — TODO confirm its encoded size.
        let limit = capacity.saturating_sub(1);

        let mut block = Vec::with_capacity(capacity);
        let mut head_key: Option<K> = None;

        iter_result!(Cbor::Major4(cbor::Info::Indefinite, vec![]).encode(&mut block));

        loop {
            // A pair left over from the previous call takes precedence over
            // pulling a fresh one from the lower level.
            let pulled = match self.entry.take() {
                Some(pending) => Some(Ok(pending)),
                None => self.iter.next(),
            };
            let (key, fpos) = match pulled {
                Some(Ok(pair)) => pair,
                Some(Err(err)) => return Some(Err(err)),
                None if head_key.is_none() => return None,
                None => break,
            };

            if head_key.is_none() {
                head_key = Some(key.clone());
            }

            let encoded = {
                let e = Entry::<K, V, D>::new_mz(key.clone(), fpos);
                iter_result!(util::into_cbor_bytes(e))
            };
            if block.len() + encoded.len() > limit {
                // Does not fit: carry it over to the next block.
                self.entry = Some((key, fpos));
                break;
            }
            block.extend_from_slice(&encoded);
        }

        // Terminate the indefinite-length list, then pad with zeros up to the
        // configured block size.
        let brk = iter_result!(util::into_cbor_bytes(cbor::SimpleValue::Break));
        block.extend_from_slice(&brk);
        block.resize(capacity, 0);

        // The position must be read before flushing the block.
        let block_fpos = self.iflush.borrow().to_fpos().unwrap_or(0);
        iter_result!(self.iflush.borrow_mut().flush(block));

        // `head_key` is populated on every path that leaves the loop via
        // `break`, so this unwrap cannot fail.
        Some(Ok((head_key.unwrap(), block_fpos)))
    }
}

/// Iterator that packs entries from a shared [`BuildScan`] into fixed-size
/// index blocks, collecting the bytes produced by `Entry::into_reference`
/// into a separate value buffer that goes to its own flusher.
///
/// Presumably this is the leaf level of the btree — confirm against the
/// builder that stacks these iterators.
pub struct BuildZZ<K, V, D, I> {
    // Size in bytes of every index block emitted by this level.
    z_blocksize: usize,
    // Initial capacity reserved for the per-block value buffer.
    v_blocksize: usize,
    // Forwarded to `Entry::into_reference` for every entry.
    value_in_vlog: bool,
    // When false, deltas are drained from each entry before it is encoded.
    delta_ok: bool,
    // Shared flusher that receives the index blocks.
    iflush: Rc<RefCell<Flusher>>,
    // Shared flusher that receives the value buffer.
    vflush: Rc<RefCell<Flusher>>,
    // Shared source of entries; an entry that does not fit into the current
    // block is handed back to it through `push`.
    iter: Rc<RefCell<BuildScan<K, V, D, I>>>,
}

impl<K, V, D, I> BuildZZ<K, V, D, I> {
    pub fn new(
        config: &Config,
        iflush: Rc<RefCell<Flusher>>,
        vflush: Rc<RefCell<Flusher>>,
        iter: Rc<RefCell<BuildScan<K, V, D, I>>>,
    ) -> Self {
        BuildZZ {
            z_blocksize: config.z_blocksize,
            v_blocksize: config.v_blocksize,
            value_in_vlog: config.value_in_vlog,
            delta_ok: config.delta_ok,
            iflush,
            vflush,
            iter,
        }
    }
}

impl<K, V, D, I> Iterator for BuildZZ<K, V, D, I>
where
    K: Clone + IntoCbor,
    V: Clone + IntoCbor,
    D: Clone + IntoCbor,
    I: Iterator<Item = db::Entry<K, V, D>>,
{
    type Item = Result<(K, u64)>;

    /// Assemble and flush the next index block together with its value bytes.
    ///
    /// Entries are pulled from the shared scan, converted with
    /// `Entry::into_reference` and appended, CBOR-encoded, to an
    /// indefinite-length list until the next one would overflow the block or
    /// the scan is exhausted. An entry that does not fit is pushed back to the
    /// scan.
    ///
    /// Returns:
    /// * `None` when the scan is exhausted and nothing was pulled.
    /// * `Some(Err(_))` when conversion, encoding or either flush fails.
    /// * `Some(Ok((key, fpos)))` otherwise, where `key` is the first key pulled
    ///   in this call and `fpos` is the index-flusher position taken before
    ///   either buffer was flushed.
    ///
    /// NOTE(review): if the very first entry of a call does not fit into an
    /// empty block, a block without entries is flushed and the entry is pushed
    /// back; assuming `push` makes it the next entry returned by the scan,
    /// subsequent calls would repeat this indefinitely — confirm that
    /// `z_blocksize` is always larger than any encoded entry.
    fn next(&mut self) -> Option<Self::Item> {
        let mut index_block = Vec::with_capacity(self.z_blocksize);
        let mut value_block = Vec::with_capacity(self.v_blocksize);
        // One byte of the block is kept free; presumably for the trailing
        // break marker appended after the loop — TODO confirm its encoded size.
        let limit = self.z_blocksize.saturating_sub(1);

        let mut head_key: Option<K> = None;

        iter_result!(Cbor::Major4(cbor::Info::Indefinite, vec![]).encode(&mut index_block));

        let mut scan = self.iter.borrow_mut();
        // Running position handed to `into_reference`; it starts at the value
        // flusher's current position and advances by the bytes stored so far.
        let mut value_fpos = self.vflush.borrow().to_fpos().unwrap_or(0);

        // A `for` loop cannot be used here: the body needs `scan` mutably to
        // push back the entry that overflows the block.
        while let Some(mut entry) = scan.next() {
            if !self.delta_ok {
                entry.drain_deltas();
            }
            if head_key.is_none() {
                head_key = Some(entry.key.clone());
            }

            // The entry is cloned because the original may still have to be
            // pushed back to the scan below.
            let converted = Entry::<K, V, D>::from(entry.clone());
            let (reference, value_bytes) =
                iter_result!(converted.into_reference(value_fpos, self.value_in_vlog));
            let encoded = iter_result!(util::into_cbor_bytes(reference));

            if index_block.len() + encoded.len() > limit {
                scan.push(entry);
                break;
            }

            index_block.extend_from_slice(&encoded);
            value_block.extend_from_slice(&value_bytes);
            value_fpos += u64::try_from(value_bytes.len()).unwrap();
        }

        // Nothing was pulled at all: the scan is exhausted.
        let head_key = match head_key {
            Some(key) => key,
            None => return None,
        };

        // Terminate the indefinite-length list, then pad with zeros up to the
        // configured block size.
        let brk = iter_result!(util::into_cbor_bytes(cbor::SimpleValue::Break));
        index_block.extend_from_slice(&brk);
        index_block.resize(self.z_blocksize, 0);

        // The position must be read before flushing the index block.
        let block_fpos = self.iflush.borrow().to_fpos().unwrap_or(0);

        iter_result!(self.vflush.borrow_mut().flush(value_block));
        iter_result!(self.iflush.borrow_mut().flush(index_block));
        Some(Ok((head_key, block_fpos)))
    }
}

/// Either kind of block-building iterator that can sit underneath a
/// [`BuildMM`], so that levels can be stacked to arbitrary depth.
pub enum BuildIter<K, V, D, I> {
    // A level built on top of another `BuildIter`.
    MM(BuildMM<K, V, D, I>),
    // A level built directly on top of a `BuildZZ`.
    MZ(BuildMZ<K, V, D, I>),
}

impl<K, V, D, I> From<BuildMZ<K, V, D, I>> for BuildIter<K, V, D, I> {
    fn from(val: BuildMZ<K, V, D, I>) -> Self {
        BuildIter::MZ(val)
    }
}

impl<K, V, D, I> From<BuildMM<K, V, D, I>> for BuildIter<K, V, D, I> {
    fn from(val: BuildMM<K, V, D, I>) -> Self {
        BuildIter::MM(val)
    }
}

impl<K, V, D, I> Iterator for BuildIter<K, V, D, I>
where
    K: Clone + IntoCbor,
    V: Clone + IntoCbor,
    D: Clone + IntoCbor,
    I: Iterator<Item = db::Entry<K, V, D>>,
{
    type Item = Result<(K, u64)>;

    /// Forward to the wrapped level, whichever variant it is.
    fn next(&mut self) -> Option<Self::Item> {
        match *self {
            Self::MM(ref mut level) => level.next(),
            Self::MZ(ref mut level) => level.next(),
        }
    }
}