entelix_runnable/stream.rs
1//! Streaming surface for `Runnable`.
2//!
3//! Adds a 5-mode streaming protocol on top of the composition contract,
4//! mirroring `LangGraph`'s `stream_mode` semantics. Every `Runnable` exposes
5//! a `stream(input, mode, ctx)` method via the trait's default
6//! implementation; specialized impls (`CompiledGraph<S>`,
7//! `ChatModel<C, T>`) override to emit per-step chunks.
8//!
9//! - [`StreamMode::Values`] — full output snapshot after each step.
10//! - [`StreamMode::Updates`] — `(node_name, output)` per step.
11//! - [`StreamMode::Messages`] — provider-level token deltas
12//! (re-uses `entelix_core::stream::StreamDelta`).
13//! - [`StreamMode::Debug`] — node lifecycle markers (start/end).
14//! - [`StreamMode::Events`] — runtime events (started/finished).
15//!
16//! The default trait method materializes a single-shot stream by calling
17//! `invoke` and emitting one chunk shaped per the requested mode. Graph
18//! and model implementors emit multiple chunks as work progresses.
19
20use std::pin::Pin;
21
22use entelix_core::stream::StreamDelta;
23use futures::Stream;
24
/// Boxed `Stream` alias used by every `stream()` return type.
///
/// Expands to a pinned, heap-allocated `dyn Stream` trait object that
/// yields items of type `T`, is `Send`, and is bounded by the lifetime `'a`.
pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
27
/// Selects the shape of the chunks a caller receives from a stream.
///
/// Mirrors `LangGraph`'s `stream_mode`. `Values` and `Updates` cover the
/// common cases, `Messages` serves token-level UX, and `Debug` / `Events`
/// exist for observability.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum StreamMode {
    /// Full output state, emitted after each step.
    Values,
    /// Only what changed at each step, tagged with the node name.
    Updates,
    /// Token-level deltas from any underlying chat model.
    Messages,
    /// Lifecycle markers (node start / node end / final).
    Debug,
    /// Runtime events (runnable started / runnable finished).
    Events,
}
47
48/// One chunk of a streaming `Runnable` invocation.
49///
50/// Generic over the runnable's output type `O`. Chunks not relevant to a
51/// chosen mode are simply not emitted; the variant the caller observes is
52/// determined by the requested [`StreamMode`].
53#[derive(Clone, Debug)]
54#[non_exhaustive]
55pub enum StreamChunk<O> {
56 /// Full snapshot — emitted in [`StreamMode::Values`].
57 Value(O),
58 /// Per-node update — emitted in [`StreamMode::Updates`].
59 Update {
60 /// Identifier of the node (or runnable) that produced `value`.
61 node: String,
62 /// State (or output) the node produced this step.
63 value: O,
64 },
65 /// Token-level delta — emitted in [`StreamMode::Messages`].
66 Message(StreamDelta),
67 /// Lifecycle marker — emitted in [`StreamMode::Debug`].
68 Debug(DebugEvent),
69 /// Runtime event — emitted in [`StreamMode::Events`].
70 Event(RunnableEvent),
71}
72
/// Lifecycle markers carried by chunks in [`StreamMode::Debug`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum DebugEvent {
    /// Node `node` began executing as step `step`.
    NodeStart {
        /// Name of the node about to run.
        node: String,
        /// Step counter within this invocation, starting at 1.
        step: usize,
    },
    /// Node `node` finished executing as step `step`.
    NodeEnd {
        /// Name of the node that has just completed.
        node: String,
        /// Step counter within this invocation, starting at 1.
        step: usize,
    },
    /// The graph reached a terminal state.
    Final,
}
94
/// Runtime events carried by chunks in [`StreamMode::Events`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum RunnableEvent {
    /// The runnable identified by `name` began executing.
    Started {
        /// Identifier of the runnable.
        name: String,
    },
    /// The runnable identified by `name` finished; `ok` is true when the
    /// invocation returned successfully.
    Finished {
        /// Identifier of the runnable.
        name: String,
        /// True when the invocation succeeded.
        ok: bool,
    },
}
113
114impl<O> StreamChunk<O> {
115 /// Borrow the inner `O` if this chunk carries one (`Value` or
116 /// `Update`); otherwise `None`.
117 pub const fn output(&self) -> Option<&O> {
118 match self {
119 Self::Value(v) | Self::Update { value: v, .. } => Some(v),
120 _ => None,
121 }
122 }
123
124 /// Consume the chunk, returning the inner `O` for the carrier
125 /// variants.
126 pub fn into_output(self) -> Option<O> {
127 match self {
128 Self::Value(v) | Self::Update { value: v, .. } => Some(v),
129 _ => None,
130 }
131 }
132}