// tonbo 0.3.2
//
// An embedded persistent KV database in Rust.
// Documentation
pub(crate) mod level;
pub(crate) mod mem_projection;
pub(crate) mod merge;
pub(crate) mod package;
pub(crate) mod record_batch;

use std::{
    fmt::{self, Debug, Formatter},
    mem::transmute,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use futures_core::Stream;
use futures_util::{ready, stream};
use parquet::arrow::ProjectionMask;
use pin_project_lite::pin_project;
use record_batch::RecordBatchEntry;

use crate::{
    inmem::{immutable::ImmutableScan, mutable::MutableScan},
    ondisk::scan::SsTableScan,
    record::{Key, Record, RecordRef, Schema},
    stream::{level::LevelStream, mem_projection::MemProjectionStream},
    timestamp::Timestamped,
    transaction::TransactionScan,
};

/// A single item yielded by a [`ScanStream`], tagged by the kind of source it was read from.
///
/// Every variant can report its timestamped key and its optional value through the
/// accessor methods implemented on this type.
pub enum Entry<'entry, R>
where
    R: Record,
{
    /// Item produced by a transaction scan: a timestamped key reference paired with a
    /// borrowed `Option<R>`.
    // NOTE(review): a `None` value presumably marks a deletion (tombstone) — confirm.
    Transaction(
        (
            Timestamped<<<R::Schema as Schema>::Key as Key>::Ref<'entry>>,
            &'entry Option<R>,
        ),
    ),
    /// Item borrowed from a `crossbeam_skiplist` map whose keys are timestamped record keys
    /// and whose values are `Option<R>`.
    Mutable(
        crossbeam_skiplist::map::Entry<'entry, Timestamped<<R::Schema as Schema>::Key>, Option<R>>,
    ),
    /// Another entry bundled with the projection mask that is applied to its value when the
    /// value is read.
    Projection((Box<Entry<'entry, R>>, Arc<ProjectionMask>)),
    /// Item backed by a [`RecordBatchEntry`]; the SSTable, immutable and level streams all
    /// yield this variant.
    RecordBatch(RecordBatchEntry<R>),
}

impl<R> Entry<'_, R>
where
    R: Record,
{
    /// Returns the timestamped key of this entry as a borrowed key reference.
    ///
    /// The lifetime of the returned reference is tied to the borrow of `self` rather than to
    /// the entry's own lifetime parameter.
    pub(crate) fn key(&self) -> Timestamped<<<R::Schema as Schema>::Key as Key>::Ref<'_>> {
        match self {
            Entry::Transaction((key, _)) => {
                // SAFETY: source and target types differ only in the lifetime of the key
                // reference; the stored key is cloned and re-typed to the lifetime of `&self`.
                // NOTE(review): soundness relies on the new lifetime being no longer than the
                // entry's own one and on `Key::Ref` tolerating the shortening — confirm.
                unsafe {
                    transmute::<
                        Timestamped<<<R::Schema as Schema>::Key as Key>::Ref<'_>>,
                        Timestamped<<<R::Schema as Schema>::Key as Key>::Ref<'_>>,
                    >(key.clone())
                }
            }
            // The skiplist stores owned keys; convert the owned key into its reference form.
            Entry::Mutable(entry) => entry.key().map(|key| {
                // SAFETY: same lifetime-only re-typing as above, applied to the reference
                // obtained from `as_key_ref`.
                // NOTE(review): the target type is inferred from the return type — confirm it
                // can only differ from the source type in its lifetime.
                unsafe { transmute(key.as_key_ref()) }
            }),
            Entry::RecordBatch(entry) => entry.internal_key(),
            // A projection never changes the key, so defer to the wrapped entry.
            Entry::Projection((entry, _)) => entry.key(),
        }
    }

    /// Returns the value of this entry as a record reference, or `None` when the entry
    /// carries no value.
    ///
    /// For [`Entry::Projection`] the wrapped entry's value is fetched first and the stored
    /// projection mask is then applied to it before it is returned.
    pub fn value(&self) -> Option<R::Ref<'_>> {
        match self {
            Entry::Transaction((_, value)) => value.as_ref().map(R::as_record_ref),
            Entry::Mutable(entry) => entry.value().as_ref().map(R::as_record_ref),
            Entry::RecordBatch(entry) => entry.get(),
            Entry::Projection((entry, projection_mask)) => entry.value().map(|mut val_ref| {
                // Restrict the record reference in place using the mask carried by the entry.
                val_ref.projection(projection_mask);
                val_ref
            }),
        }
    }
}

/// Debug output for [`Entry`]: each variant is printed as `Entry::<Variant>(..)` together
/// with the debug representation of its payload.
impl<R> fmt::Debug for Entry<'_, R>
where
    R: Record + Debug,
    <R::Schema as Schema>::Key: Debug,
{
    /// Writes the variant name followed by its key/value (or wrapped payload) to `f`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Entry::Transaction((key, value)) => {
                write!(f, "Entry::Transaction({:?} -> {:?})", key, value)
            }
            Entry::Mutable(mutable) => write!(
                f,
                "Entry::Mutable({:?} -> {:?})",
                mutable.key(),
                mutable.value()
            ),
            // Label with the actual variant name: record-batch entries are not exclusive to
            // SSTables (the immutable and level streams yield them too), and no
            // `Entry::SsTable` variant exists.
            Entry::RecordBatch(record_batch) => {
                write!(f, "Entry::RecordBatch({:?})", record_batch)
            }
            Entry::Projection((entry, projection_mask)) => {
                write!(f, "Entry::Projection({:?} -> {:?})", entry, projection_mask)
            }
        }
    }
}

// `ScanStream` is one enum over every scan source that can be polled for entries.
// Each variant holds exactly one pinned field, `inner`, which is the underlying source.
// `#[project = ScanStreamProject]` names the projection enum generated by `pin_project!`;
// the `Stream` implementation matches on it to reach the pinned `inner` of each variant.
pin_project! {
    #[project = ScanStreamProject]
    pub enum ScanStream<'scan, R>
    where
        R: Record,
    {
        // Transaction scan iterator wrapped in `stream::Iter`.
        Transaction {
            #[pin]
            inner: stream::Iter<TransactionScan<'scan, R>>,
        },
        // Mutable in-memory scan iterator wrapped in `stream::Iter`.
        Mutable {
            #[pin]
            inner: stream::Iter<MutableScan<'scan, R>>,
        },
        // Immutable in-memory scan iterator wrapped in `stream::Iter`.
        Immutable {
            #[pin]
            inner: stream::Iter<ImmutableScan<'scan, R>>,
        },
        // On-disk SSTable scan, stored directly (not wrapped in `stream::Iter`).
        SsTable {
            #[pin]
            inner: SsTableScan<'scan, R>,
        },
        // Level stream, stored directly.
        Level {
            #[pin]
            inner: LevelStream<'scan, R>,
        },
        // In-memory projection stream, stored directly; its items are forwarded unchanged
        // by the `Stream` implementation.
        MemProjection {
            #[pin]
            inner: MemProjectionStream<'scan, R>,
        }
    }
}

/// Wraps a transaction scan iterator into [`ScanStream::Transaction`].
impl<'scan, R: Record> From<TransactionScan<'scan, R>> for ScanStream<'scan, R> {
    fn from(scan: TransactionScan<'scan, R>) -> Self {
        // The scan is a plain iterator; adapt it so it can be polled as a stream.
        let inner = stream::iter(scan);
        Self::Transaction { inner }
    }
}

/// Wraps a mutable in-memory scan iterator into [`ScanStream::Mutable`].
impl<'scan, R: Record> From<MutableScan<'scan, R>> for ScanStream<'scan, R> {
    fn from(scan: MutableScan<'scan, R>) -> Self {
        // The scan is a plain iterator; adapt it so it can be polled as a stream.
        let inner = stream::iter(scan);
        Self::Mutable { inner }
    }
}

/// Wraps an immutable in-memory scan iterator into [`ScanStream::Immutable`].
impl<'scan, R: Record> From<ImmutableScan<'scan, R>> for ScanStream<'scan, R> {
    fn from(scan: ImmutableScan<'scan, R>) -> Self {
        // The scan is a plain iterator; adapt it so it can be polled as a stream.
        let inner = stream::iter(scan);
        Self::Immutable { inner }
    }
}

/// Stores an SSTable scan as-is inside [`ScanStream::SsTable`].
impl<'scan, R: Record> From<SsTableScan<'scan, R>> for ScanStream<'scan, R> {
    fn from(scan: SsTableScan<'scan, R>) -> Self {
        Self::SsTable { inner: scan }
    }
}

/// Stores a memory projection stream as-is inside [`ScanStream::MemProjection`].
impl<'scan, R: Record> From<MemProjectionStream<'scan, R>> for ScanStream<'scan, R> {
    fn from(projection: MemProjectionStream<'scan, R>) -> Self {
        Self::MemProjection { inner: projection }
    }
}

/// Debug output for [`ScanStream`]: only the variant name is printed, never the inner source.
impl<R: Record> fmt::Debug for ScanStream<'_, R> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match self {
            Self::Transaction { .. } => "ScanStream::Transaction",
            Self::Mutable { .. } => "ScanStream::Mutable",
            Self::Immutable { .. } => "ScanStream::Immutable",
            Self::SsTable { .. } => "ScanStream::SsTable",
            Self::Level { .. } => "ScanStream::Level",
            Self::MemProjection { .. } => "ScanStream::MemProjection",
        };
        f.write_str(label)
    }
}

/// Polls whichever source this [`ScanStream`] wraps and converts its items into [`Entry`]
/// values.
impl<'scan, R: Record> Stream for ScanStream<'scan, R> {
    type Item = Result<Entry<'scan, R>, parquet::errors::ParquetError>;

    /// Polls the wrapped source once.
    ///
    /// Returns `Poll::Pending` as soon as the source is pending (via `ready!`); otherwise the
    /// source's item is wrapped into the matching [`Entry`] variant. Sources that yield plain
    /// items are wrapped in `Ok`; sources that already yield `Result`s keep their errors.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let next: Option<Self::Item> = match self.project() {
            // Iterator-backed sources yield plain items, so each one is wrapped in `Ok`.
            ScanStreamProject::Transaction { inner } => {
                ready!(inner.poll_next(cx)).map(|entry| Ok(Entry::Transaction(entry)))
            }
            ScanStreamProject::Mutable { inner } => {
                ready!(inner.poll_next(cx)).map(|entry| Ok(Entry::Mutable(entry)))
            }
            ScanStreamProject::Immutable { inner } => {
                ready!(inner.poll_next(cx)).map(|entry| Ok(Entry::RecordBatch(entry)))
            }
            // These sources yield `Result`s; only the success value is re-tagged.
            ScanStreamProject::SsTable { inner } => {
                ready!(inner.poll_next(cx)).map(|batch| batch.map(Entry::RecordBatch))
            }
            ScanStreamProject::Level { inner } => {
                ready!(inner.poll_next(cx)).map(|batch| batch.map(Entry::RecordBatch))
            }
            // The projection stream's items are forwarded untouched.
            ScanStreamProject::MemProjection { inner } => ready!(inner.poll_next(cx)),
        };
        Poll::Ready(next)
    }
}