fs2db 7.3.0

migration helper
Documentation
use std::path::Path;

use futures::StreamExt;

use tokio::io::{AsyncBufReadExt, BufReader};

use tokio_stream::wrappers::{ReceiverStream, SplitStream};

use tonic::Status;

use crate::input::source::BucketSource;

/// A trait which gets a readable object by bucket
#[tonic::async_trait]
pub trait ReadSource: Send + Sync + 'static {
    type Bucket: Send + Sync;
    type R: Send + Sync + tokio::io::AsyncRead + Unpin;

    async fn get_src_read_by_bucket(&self, b: Self::Bucket) -> Result<Self::R, Status>;
}

pub struct ReadSrc<R> {
    rsrc: R,
}

#[tonic::async_trait]
impl<R> BucketSource for ReadSrc<R>
where
    R: ReadSource,
{
    type Bucket = R::Bucket;
    type K = usize;
    type V = Vec<u8>;
    type All = ReceiverStream<Result<(Self::K, Self::V), Status>>;

    async fn get_all_by_bucket(&self, b: Self::Bucket) -> Result<Self::All, Status> {
        let r: R::R = self.rsrc.get_src_read_by_bucket(b).await?;
        let br = BufReader::new(r);
        let splited = br.split(b'\n');
        let ss = SplitStream::new(splited);
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        tokio::spawn(async move {
            let mapd = ss
                .map(|rslt| {
                    rslt.map_err(|e| Status::internal(format!("unable to get a line: {e}")))
                })
                .enumerate()
                .map(|pair: (usize, _)| {
                    let (ix, item) = pair;
                    item.map(|v: Self::V| (ix, v))
                });
            let rt = &tx;
            let _cnt: u64 = mapd
                .fold(0, |tot, rslt| async move {
                    rt.send(rslt).await.map(|_| 1 + tot).unwrap_or(tot)
                })
                .await;
        });
        Ok(ReceiverStream::new(rx))
    }
}

pub fn bytes_src_new<R>(rsrc: R) -> impl BucketSource<Bucket = R::Bucket, K = usize, V = Vec<u8>>
where
    R: ReadSource,
{
    ReadSrc { rsrc }
}

#[tonic::async_trait]
pub trait FsSource: Send + Sync + 'static {
    type Bucket: Send + Sync;
    type P: AsRef<Path> + Send + Sync;

    fn bucket2path(&self, b: Self::Bucket) -> Result<Self::P, Status>;

    async fn get_file_by_bucket(&self, b: Self::Bucket) -> Result<tokio::fs::File, Status> {
        let p: Self::P = self.bucket2path(b)?;
        tokio::fs::File::open(p)
            .await
            .map_err(|e| Status::internal(format!("unable to open a file: {e}")))
    }
}

#[tonic::async_trait]
impl<F> ReadSource for F
where
    F: FsSource,
{
    type Bucket = F::Bucket;
    type R = tokio::fs::File;

    async fn get_src_read_by_bucket(&self, b: Self::Bucket) -> Result<Self::R, Status> {
        self.get_file_by_bucket(b).await
    }
}