use std::borrow::Cow;
use entelix_core::{Error, ExecutionContext, Result};
use futures::stream;
use crate::stream::{BoxStream, DebugEvent, RunnableEvent, StreamChunk, StreamMode};
#[async_trait::async_trait]
pub trait Runnable<I, O>: Send + Sync
where
I: Send + 'static,
O: Send + 'static,
{
async fn invoke(&self, input: I, ctx: &ExecutionContext) -> Result<O>;
async fn batch(&self, inputs: Vec<I>, ctx: &ExecutionContext) -> Result<Vec<O>> {
let mut out = Vec::with_capacity(inputs.len());
for input in inputs {
if ctx.is_cancelled() {
return Err(Error::Cancelled);
}
out.push(self.invoke(input, ctx).await?);
}
Ok(out)
}
async fn stream(
&self,
input: I,
mode: StreamMode,
ctx: &ExecutionContext,
) -> Result<BoxStream<'_, Result<StreamChunk<O>>>> {
let name = self.name().into_owned();
let result = self.invoke(input, ctx).await?;
let chunks: Vec<Result<StreamChunk<O>>> = match mode {
StreamMode::Values | StreamMode::Messages => {
vec![Ok(StreamChunk::Value(result))]
}
StreamMode::Updates => vec![Ok(StreamChunk::Update {
node: name,
value: result,
})],
StreamMode::Debug => vec![
Ok(StreamChunk::Debug(DebugEvent::NodeStart {
node: name.clone(),
step: 1,
})),
Ok(StreamChunk::Value(result)),
Ok(StreamChunk::Debug(DebugEvent::NodeEnd {
node: name,
step: 1,
})),
Ok(StreamChunk::Debug(DebugEvent::Final)),
],
StreamMode::Events => vec![
Ok(StreamChunk::Event(RunnableEvent::Started {
name: name.clone(),
})),
Ok(StreamChunk::Value(result)),
Ok(StreamChunk::Event(RunnableEvent::Finished {
name,
ok: true,
})),
],
};
Ok(Box::pin(stream::iter(chunks)))
}
fn name(&self) -> Cow<'_, str> {
Cow::Borrowed(core::any::type_name::<Self>())
}
}