Skip to main content

entelix_runnable/
runnable.rs

1//! The `Runnable<I, O>` trait — entelix's composition contract (invariant 7).
2//!
3//! Anything composable in the SDK implements this trait: codecs, prompts,
4//! parsers, tools (via [`crate::ToolToRunnableAdapter`]), agents, compiled
5//! state graphs. `.pipe()` (see [`crate::RunnableExt`]) is the universal
6//! connector.
7
8use std::borrow::Cow;
9
10use entelix_core::{Error, ExecutionContext, Result};
11use futures::stream;
12
13use crate::stream::{BoxStream, DebugEvent, RunnableEvent, StreamChunk, StreamMode};
14
15/// One end of a composable computation.
16///
17/// Implementors describe what they consume (`I`) and produce (`O`);
18/// orchestration code combines them via [`crate::RunnableExt::pipe`] without
19/// caring about the concrete types in between. The trait surfaces three
20/// execution shapes: `invoke` (single-shot), `batch` (sequential or
21/// parallel sequence), and `stream` (5-mode incremental output, see
22/// [`StreamMode`]).
23#[async_trait::async_trait]
24pub trait Runnable<I, O>: Send + Sync
25where
26    I: Send + 'static,
27    O: Send + 'static,
28{
29    /// Single-shot invocation.
30    async fn invoke(&self, input: I, ctx: &ExecutionContext) -> Result<O>;
31
32    /// Batch invocation. The default runs `invoke` sequentially over the input
33    /// vector. Implementors that can parallelize (e.g. independent provider
34    /// calls) override this.
35    async fn batch(&self, inputs: Vec<I>, ctx: &ExecutionContext) -> Result<Vec<O>> {
36        let mut out = Vec::with_capacity(inputs.len());
37        for input in inputs {
38            if ctx.is_cancelled() {
39                return Err(Error::Cancelled);
40            }
41            out.push(self.invoke(input, ctx).await?);
42        }
43        Ok(out)
44    }
45
46    /// Streaming invocation, shaped by `mode` (see [`StreamMode`]).
47    ///
48    /// The default implementation falls back to `invoke` and yields one
49    /// chunk shaped per mode. Implementors that have intermediate work
50    /// to expose (compiled graphs over multiple node steps, chat models
51    /// receiving SSE deltas) override this method to emit multiple
52    /// chunks. Cancellation is handled per chunk via `ctx`.
53    async fn stream(
54        &self,
55        input: I,
56        mode: StreamMode,
57        ctx: &ExecutionContext,
58    ) -> Result<BoxStream<'_, Result<StreamChunk<O>>>> {
59        let name = self.name().into_owned();
60        let result = self.invoke(input, ctx).await?;
61        let chunks: Vec<Result<StreamChunk<O>>> = match mode {
62            StreamMode::Values | StreamMode::Messages => {
63                vec![Ok(StreamChunk::Value(result))]
64            }
65            StreamMode::Updates => vec![Ok(StreamChunk::Update {
66                node: name,
67                value: result,
68            })],
69            StreamMode::Debug => vec![
70                Ok(StreamChunk::Debug(DebugEvent::NodeStart {
71                    node: name.clone(),
72                    step: 1,
73                })),
74                Ok(StreamChunk::Value(result)),
75                Ok(StreamChunk::Debug(DebugEvent::NodeEnd {
76                    node: name,
77                    step: 1,
78                })),
79                Ok(StreamChunk::Debug(DebugEvent::Final)),
80            ],
81            StreamMode::Events => vec![
82                Ok(StreamChunk::Event(RunnableEvent::Started {
83                    name: name.clone(),
84                })),
85                Ok(StreamChunk::Value(result)),
86                Ok(StreamChunk::Event(RunnableEvent::Finished {
87                    name,
88                    ok: true,
89                })),
90            ],
91        };
92        Ok(Box::pin(stream::iter(chunks)))
93    }
94
95    /// Human-readable identifier used by tracing and debug output. Default is
96    /// the Rust type name — implementors may override with a domain label
97    /// (e.g. `"anthropic-messages"`, `"json-parser"`).
98    fn name(&self) -> Cow<'_, str> {
99        Cow::Borrowed(core::any::type_name::<Self>())
100    }
101}