hypershell-tokio-components 0.1.0-alpha

Modular DSL for shellscripting in Rust
Documentation
use core::marker::PhantomData;

use cgp::extra::handler::{Handler, HandlerComponent};
use cgp::prelude::*;
use futures::AsyncRead as FuturesAsyncRead;
use futures::io::Cursor;
use tokio::io::{AsyncRead as TokioAsyncRead, AsyncReadExt as _};
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};

use crate::types::{FuturesAsyncReadStream, TokioAsyncReadStream};

#[cgp_new_provider]
impl<Context, Code, Input> Handler<Context, Code, Input> for ConvertStreamToBytes
where
    Context: CanRaiseAsyncError<std::io::Error>,
    Input: Send + TokioAsyncRead + Unpin,
    Code: Send,
{
    type Output = Vec<u8>;

    async fn handle(
        _context: &Context,
        _tag: PhantomData<Code>,
        mut input: Input,
    ) -> Result<Vec<u8>, Context::Error> {
        let mut output = Vec::new();

        input
            .read_to_end(&mut output)
            .await
            .map_err(Context::raise_error)?;

        Ok(output)
    }
}

#[cgp_new_provider]
impl<Context, Code, Input> Handler<Context, Code, Input> for ConvertStreamToString
where
    Context: CanRaiseAsyncError<std::io::Error>,
    Input: Send + TokioAsyncRead + Unpin,
    Code: Send,
{
    type Output = String;

    async fn handle(
        _context: &Context,
        _tag: PhantomData<Code>,
        mut input: Input,
    ) -> Result<String, Context::Error> {
        let mut output = String::new();

        input
            .read_to_string(&mut output)
            .await
            .map_err(Context::raise_error)?;

        Ok(output)
    }
}

#[cgp_new_provider]
impl<Context, Code, Input> Handler<Context, Code, Input> for ConvertBytesToStream
where
    Context: CanRaiseAsyncError<std::io::Error>,
    Input: Send + AsRef<[u8]> + Unpin + 'static,
    Code: Send,
{
    type Output = TokioAsyncReadStream<Compat<Cursor<Input>>>;

    async fn handle(
        _context: &Context,
        _tag: PhantomData<Code>,
        input: Input,
    ) -> Result<TokioAsyncReadStream<Compat<Cursor<Input>>>, Context::Error> {
        Ok(Cursor::new(input).compat().into())
    }
}

#[cgp_new_provider]
impl<Context, Code, Input> Handler<Context, Code, Input> for FuturesToTokioStream
where
    Context: HasAsyncErrorType,
    Input: Send + FuturesAsyncRead + Unpin + 'static,
    Code: Send,
{
    type Output = TokioAsyncReadStream<Compat<Input>>;

    async fn handle(
        _context: &Context,
        _tag: PhantomData<Code>,
        input: Input,
    ) -> Result<TokioAsyncReadStream<Compat<Input>>, Context::Error> {
        Ok(input.compat().into())
    }
}

#[cgp_new_provider]
impl<Context, Code, Input> Handler<Context, Code, Input> for TokioToFuturesStream
where
    Context: HasAsyncErrorType,
    Input: Send + TokioAsyncRead + Unpin + 'static,
    Code: Send,
{
    type Output = FuturesAsyncReadStream<Compat<Input>>;

    async fn handle(
        _context: &Context,
        _tag: PhantomData<Code>,
        input: Input,
    ) -> Result<FuturesAsyncReadStream<Compat<Input>>, Context::Error> {
        Ok(input.compat().into())
    }
}

#[cgp_new_provider]
impl<Context, Code, Input> Handler<Context, Code, Input> for WrapTokioAsyncReadStream
where
    Context: HasAsyncErrorType,
    Input: Send + TokioAsyncRead + Unpin + 'static,
    Code: Send,
{
    type Output = TokioAsyncReadStream<Input>;

    async fn handle(
        _context: &Context,
        _tag: PhantomData<Code>,
        input: Input,
    ) -> Result<TokioAsyncReadStream<Input>, Context::Error> {
        Ok(input.into())
    }
}

#[cgp_new_provider]
impl<Context, Code, Input> Handler<Context, Code, Input> for WrapFuturesAsyncReadStream
where
    Context: HasAsyncErrorType,
    Input: Send + FuturesAsyncRead + Unpin + 'static,
    Code: Send,
{
    type Output = FuturesAsyncReadStream<Input>;

    async fn handle(
        _context: &Context,
        _tag: PhantomData<Code>,
        input: Input,
    ) -> Result<FuturesAsyncReadStream<Input>, Context::Error> {
        Ok(input.into())
    }
}