tonbo 0.3.2

An embedded persistent KV database in Rust.
Documentation
use std::{
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use futures_core::Stream;
use parquet::{arrow::ProjectionMask, errors::ParquetError};
use pin_project_lite::pin_project;

use crate::{
    record::Record,
    stream::{Entry, ScanStream},
};

pin_project! {
    pub struct MemProjectionStream<'projection, R>
    where
        R: Record,
    {
        stream: Box<ScanStream<'projection, R>>,
        projection_mask: Arc<ProjectionMask>,
    }
}

impl<'projection, R> MemProjectionStream<'projection, R>
where
    R: Record,
{
    pub(crate) fn new(stream: ScanStream<'projection, R>, projection_mask: ProjectionMask) -> Self {
        Self {
            stream: Box::new(stream),
            projection_mask: Arc::new(projection_mask),
        }
    }
}

impl<'projection, R> Stream for MemProjectionStream<'projection, R>
where
    R: Record,
{
    type Item = Result<Entry<'projection, R>, ParquetError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut project = self.project();

        return match Pin::new(&mut project.stream).poll_next(cx) {
            Poll::Ready(Some(Ok(entry))) => Poll::Ready(Some(Ok(Entry::Projection((
                Box::new(entry),
                project.projection_mask.clone(),
            ))))),
            poll => poll,
        };
    }
}

#[cfg(all(test, feature = "tokio"))]
mod tests {
    use std::{ops::Bound, sync::Arc};

    use fusio::{disk::TokioFs, path::Path, DynFs};
    use futures_util::StreamExt;
    use parquet::arrow::{ArrowSchemaConverter, ProjectionMask};

    use crate::{
        inmem::{immutable::tests::TestSchema, mutable::Mutable},
        record::Schema,
        stream::mem_projection::MemProjectionStream,
        tests::Test,
        trigger::TriggerFactory,
        wal::log::LogType,
        DbOption,
    };

    #[tokio::test]
    async fn merge_mutable() {
        let temp_dir = tempfile::tempdir().unwrap();
        let fs = Arc::new(TokioFs) as Arc<dyn DynFs>;
        let option = DbOption::new(
            Path::from_filesystem_path(temp_dir.path()).unwrap(),
            &TestSchema,
        );

        fs.create_dir_all(&option.wal_dir_path()).await.unwrap();

        let trigger = TriggerFactory::create(option.trigger_type);

        let mutable = Mutable::<Test>::new(&option, trigger, &fs, Arc::new(TestSchema {}))
            .await
            .unwrap();

        mutable
            .insert(
                LogType::Full,
                Test {
                    vstring: "0".to_string(),
                    vu32: 0,
                    vbool: Some(true),
                },
                0.into(),
            )
            .await
            .unwrap();
        mutable
            .insert(
                LogType::Full,
                Test {
                    vstring: "1".to_string(),
                    vu32: 1,
                    vbool: Some(true),
                },
                0.into(),
            )
            .await
            .unwrap();
        mutable
            .insert(
                LogType::Full,
                Test {
                    vstring: "2".to_string(),
                    vu32: 2,
                    vbool: Some(true),
                },
                0.into(),
            )
            .await
            .unwrap();

        let mask = ProjectionMask::roots(
            &ArrowSchemaConverter::new()
                .convert(TestSchema.arrow_schema())
                .unwrap(),
            vec![0, 1, 2, 4],
        );

        let mut stream = MemProjectionStream::<Test>::new(
            mutable
                .scan((Bound::Unbounded, Bound::Unbounded), 6.into())
                .into(),
            mask,
        );

        let entry_0 = stream.next().await.unwrap().unwrap();
        assert!(entry_0.value().unwrap().vu32.is_none());
        assert_eq!(entry_0.value().unwrap().vstring, "0");
        assert_eq!(entry_0.value().unwrap().vbool, Some(true));

        let entry_1 = stream.next().await.unwrap().unwrap();
        assert!(entry_1.value().unwrap().vu32.is_none());
        assert_eq!(entry_1.value().unwrap().vstring, "1");
        assert_eq!(entry_1.value().unwrap().vbool, Some(true));

        let entry_2 = stream.next().await.unwrap().unwrap();
        assert!(entry_2.value().unwrap().vu32.is_none());
        assert_eq!(entry_2.value().unwrap().vstring, "2");
        assert_eq!(entry_2.value().unwrap().vbool, Some(true));

        assert!(stream.next().await.is_none())
    }
}