entelix_runnable/runnable.rs
1//! The `Runnable<I, O>` trait — entelix's composition contract (invariant 7).
2//!
3//! Anything composable in the SDK implements this trait: codecs, prompts,
4//! parsers, tools (via [`crate::ToolToRunnableAdapter`]), agents, compiled
5//! state graphs. `.pipe()` (see [`crate::RunnableExt`]) is the universal
6//! connector.
7
8use std::borrow::Cow;
9
10use entelix_core::{Error, ExecutionContext, Result};
11use futures::stream;
12
13use crate::stream::{BoxStream, DebugEvent, RunnableEvent, StreamChunk, StreamMode};
14
15/// One end of a composable computation.
16///
17/// Implementors describe what they consume (`I`) and produce (`O`);
18/// orchestration code combines them via [`crate::RunnableExt::pipe`] without
19/// caring about the concrete types in between. The trait surfaces three
20/// execution shapes: `invoke` (single-shot), `batch` (sequential or
21/// parallel sequence), and `stream` (5-mode incremental output, see
22/// [`StreamMode`]).
23#[async_trait::async_trait]
24pub trait Runnable<I, O>: Send + Sync
25where
26 I: Send + 'static,
27 O: Send + 'static,
28{
29 /// Single-shot invocation.
30 async fn invoke(&self, input: I, ctx: &ExecutionContext) -> Result<O>;
31
32 /// Batch invocation. The default runs `invoke` sequentially over the input
33 /// vector. Implementors that can parallelize (e.g. independent provider
34 /// calls) override this.
35 async fn batch(&self, inputs: Vec<I>, ctx: &ExecutionContext) -> Result<Vec<O>> {
36 let mut out = Vec::with_capacity(inputs.len());
37 for input in inputs {
38 if ctx.is_cancelled() {
39 return Err(Error::Cancelled);
40 }
41 out.push(self.invoke(input, ctx).await?);
42 }
43 Ok(out)
44 }
45
46 /// Streaming invocation, shaped by `mode` (see [`StreamMode`]).
47 ///
48 /// The default implementation falls back to `invoke` and yields one
49 /// chunk shaped per mode. Implementors that have intermediate work
50 /// to expose (compiled graphs over multiple node steps, chat models
51 /// receiving SSE deltas) override this method to emit multiple
52 /// chunks. Cancellation is handled per chunk via `ctx`.
53 async fn stream(
54 &self,
55 input: I,
56 mode: StreamMode,
57 ctx: &ExecutionContext,
58 ) -> Result<BoxStream<'_, Result<StreamChunk<O>>>> {
59 let name = self.name().into_owned();
60 let result = self.invoke(input, ctx).await?;
61 let chunks: Vec<Result<StreamChunk<O>>> = match mode {
62 StreamMode::Values | StreamMode::Messages => {
63 vec![Ok(StreamChunk::Value(result))]
64 }
65 StreamMode::Updates => vec![Ok(StreamChunk::Update {
66 node: name,
67 value: result,
68 })],
69 StreamMode::Debug => vec![
70 Ok(StreamChunk::Debug(DebugEvent::NodeStart {
71 node: name.clone(),
72 step: 1,
73 })),
74 Ok(StreamChunk::Value(result)),
75 Ok(StreamChunk::Debug(DebugEvent::NodeEnd {
76 node: name,
77 step: 1,
78 })),
79 Ok(StreamChunk::Debug(DebugEvent::Final)),
80 ],
81 StreamMode::Events => vec![
82 Ok(StreamChunk::Event(RunnableEvent::Started {
83 name: name.clone(),
84 })),
85 Ok(StreamChunk::Value(result)),
86 Ok(StreamChunk::Event(RunnableEvent::Finished {
87 name,
88 ok: true,
89 })),
90 ],
91 };
92 Ok(Box::pin(stream::iter(chunks)))
93 }
94
95 /// Human-readable identifier used by tracing and debug output. Default is
96 /// the Rust type name — implementors may override with a domain label
97 /// (e.g. `"anthropic-messages"`, `"json-parser"`).
98 fn name(&self) -> Cow<'_, str> {
99 Cow::Borrowed(core::any::type_name::<Self>())
100 }
101}