use std::collections::HashMap;
use std::pin::Pin;
use futures::stream::Stream;
use serde_json::Value;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
pub trait Runnable<I: Send + 'static, O: Send + 'static>: Send + Sync {
fn invoke(&self, input: I) -> Pin<Box<dyn std::future::Future<Output = Result<O, anyhow::Error>> + Send>>;
fn invoke_with_config(
&self,
input: I,
_config: Option<HashMap<String, Value>>
) -> Pin<Box<dyn std::future::Future<Output = Result<O, anyhow::Error>> + Send>> {
self.invoke(input)
}
fn batch(&self, inputs: Vec<I>) -> Pin<Box<dyn std::future::Future<Output = Vec<Result<O, anyhow::Error>>> + Send>> {
let self_clone = self.clone_to_owned();
Box::pin(async move {
futures::future::join_all(inputs.into_iter().map(|input| {
let self_clone_inner = self_clone.clone_to_owned();
async move {
self_clone_inner.invoke(input).await
}
})).await
})
}
fn batch_with_config(
&self,
inputs: Vec<I>,
_config: Option<HashMap<String, Value>>
) -> Pin<Box<dyn std::future::Future<Output = Vec<Result<O, anyhow::Error>>> + Send>> {
self.batch(inputs)
}
fn stream(&self, input: I) -> Box<dyn Stream<Item = Result<O, anyhow::Error>> + Send> {
let self_clone = self.clone_to_owned();
let (tx, rx) = tokio::sync::mpsc::channel::<Result<O, anyhow::Error>>(1);
tokio::spawn(async move {
let result = self_clone.invoke(input).await;
let _ = tx.send(result).await;
});
Box::new(tokio_stream::wrappers::ReceiverStream::new(rx))
}
fn astream(
&self,
_input: I
) -> Pin<Box<dyn std::future::Future<Output = Box<dyn Stream<Item = Result<O, anyhow::Error>> + Send>> + Send>> {
let _self_clone = self.clone_to_owned();
Box::pin(async move {
let (_tx, rx) = mpsc::channel(10);
let stream: Box<dyn Stream<Item = Result<O, anyhow::Error>> + Send> = Box::new(ReceiverStream::new(rx));
stream
})
}
fn clone_to_owned(&self) -> Box<dyn Runnable<I, O> + Send + Sync>;
}
pub trait RunnableExt<I: Send + 'static, O: Send + 'static> {
fn pipe<NextO: Send + 'static>(
self: Box<Self>,
next: impl Runnable<O, NextO> + Send + Sync + 'static
) -> impl Runnable<I, NextO> + Send + Sync
where
Self: Sized + 'static + Send + Sync;
}
impl<T: Runnable<I, O> + ?Sized, I: Send + 'static, O: Send + 'static> RunnableExt<I, O> for T {
fn pipe<NextO: Send + 'static>(
self: Box<Self>,
next: impl Runnable<O, NextO> + Send + Sync + 'static
) -> impl Runnable<I, NextO> + Send + Sync
where
Self: Sized + 'static + Send + Sync,
{
pipe(*self, next)
}
}
pub fn pipe<I: Send + 'static, O1: Send + 'static, O2: Send + 'static>(
first: impl Runnable<I, O1> + Send + Sync + 'static,
second: impl Runnable<O1, O2> + Send + Sync + 'static
) -> Box<dyn Runnable<I, O2> + Send + Sync> {
struct PipeImpl<I: Send + 'static, O1: Send + 'static, O2: Send + 'static> {
first: Box<dyn Runnable<I, O1> + Send + Sync>,
second: Box<dyn Runnable<O1, O2> + Send + Sync>,
}
impl<I: Send + 'static, O1: Send + 'static, O2: Send + 'static> Runnable<I, O2> for PipeImpl<I, O1, O2> {
fn invoke(&self, input: I) -> Pin<Box<dyn std::future::Future<Output = Result<O2, anyhow::Error>> + Send>> {
let first_clone = self.first.clone_to_owned();
let second_clone = self.second.clone_to_owned();
Box::pin(async move {
let intermediate = first_clone.invoke(input).await?;
second_clone.invoke(intermediate).await
})
}
fn clone_to_owned(&self) -> Box<dyn Runnable<I, O2> + Send + Sync> {
Box::new(PipeImpl {
first: self.first.clone_to_owned(),
second: self.second.clone_to_owned(),
})
}
}
Box::new(PipeImpl {
first: Box::new(first),
second: Box::new(second),
})
}
pub struct RunnableSequence<I, O> {
inner: Box<dyn Runnable<I, O> + Send + Sync>,
}
impl<I: Send + 'static, O: Send + 'static> RunnableSequence<I, O> {
pub fn new(runnable: impl Runnable<I, O> + Send + Sync + 'static) -> Self {
Self {
inner: Box::new(runnable),
}
}
}
impl<I: 'static + Send, O: 'static + Send> Runnable<I, O> for RunnableSequence<I, O> {
fn invoke(&self, input: I) -> Pin<Box<dyn std::future::Future<Output = Result<O, anyhow::Error>> + Send>> {
let inner = self.inner.clone_to_owned();
inner.invoke(input)
}
fn clone_to_owned(&self) -> Box<dyn Runnable<I, O> + Send + Sync> {
Box::new(RunnableSequence {
inner: self.inner.clone_to_owned(),
})
}
}
impl<I: Send + 'static, O: Send + 'static> Runnable<I, O> for Box<dyn Runnable<I, O> + Send + Sync> {
fn invoke(&self, input: I) -> Pin<Box<dyn std::future::Future<Output = Result<O, anyhow::Error>> + Send>> {
let self_clone = self.clone_to_owned();
Box::pin(async move {
self_clone.invoke(input).await
})
}
fn clone_to_owned(&self) -> Box<dyn Runnable<I, O> + Send + Sync> {
(**self).clone_to_owned()
}
}