// entelix_runnable/stream.rs

//! Streaming surface for `Runnable`.
//!
//! Adds a five-mode streaming protocol on top of the composition contract,
//! mirroring `LangGraph`'s `stream_mode` semantics. Every `Runnable` exposes
//! a `stream(input, mode, ctx)` method via the trait's default
//! implementation; specialized impls (`CompiledGraph<S>`,
//! `ChatModel<C, T>`) override it to emit per-step chunks.
//!
//! - [`StreamMode::Values`]   — full output snapshot after each step.
//! - [`StreamMode::Updates`]  — `(node_name, output)` per step.
//! - [`StreamMode::Messages`] — provider-level token deltas
//!   (re-uses `entelix_core::stream::StreamDelta`).
//! - [`StreamMode::Debug`]    — node lifecycle markers (start/end/final).
//! - [`StreamMode::Events`]   — runtime events (started/finished).
//!
//! The default trait method materializes a single-shot stream by calling
//! `invoke` and emitting one chunk shaped per the requested mode. Graph
//! and model implementors emit multiple chunks as work progresses.

use std::pin::Pin;

use entelix_core::stream::StreamDelta;
use futures::Stream;

25/// Boxed `Stream` alias used by every `stream()` return type.
26pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
27
/// Selects which shape of chunk a streaming call should produce.
///
/// Matches `LangGraph`'s `stream_mode`. `Values` and `Updates` are the most
/// common; `Messages` is for token-level UX; `Debug` and `Events` are for
/// observability.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum StreamMode {
    /// Emit the full output state after each step.
    Values,
    /// Emit only what changed at each step, tagged with the node name.
    Updates,
    /// Emit token-level deltas from any underlying chat model.
    Messages,
    /// Emit lifecycle markers (node start / node end / final).
    Debug,
    /// Emit runtime events (runnable started / runnable finished).
    Events,
}

48/// One chunk of a streaming `Runnable` invocation.
49///
50/// Generic over the runnable's output type `O`. Chunks not relevant to a
51/// chosen mode are simply not emitted; the variant the caller observes is
52/// determined by the requested [`StreamMode`].
53#[derive(Clone, Debug)]
54#[non_exhaustive]
55pub enum StreamChunk<O> {
56    /// Full snapshot — emitted in [`StreamMode::Values`].
57    Value(O),
58    /// Per-node update — emitted in [`StreamMode::Updates`].
59    Update {
60        /// Identifier of the node (or runnable) that produced `value`.
61        node: String,
62        /// State (or output) the node produced this step.
63        value: O,
64    },
65    /// Token-level delta — emitted in [`StreamMode::Messages`].
66    Message(StreamDelta),
67    /// Lifecycle marker — emitted in [`StreamMode::Debug`].
68    Debug(DebugEvent),
69    /// Runtime event — emitted in [`StreamMode::Events`].
70    Event(RunnableEvent),
71}
72
/// Lifecycle marker carried by chunks in [`StreamMode::Debug`].
///
/// Node markers come in start/end pairs identified by node name and step
/// counter; [`DebugEvent::Final`] marks the graph reaching a terminal state.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum DebugEvent {
    /// Step `step` began executing node `node`.
    NodeStart {
        /// Name of the node that is about to run.
        node: String,
        /// 1-based step counter within this invocation.
        step: usize,
    },
    /// Step `step` finished executing node `node`.
    NodeEnd {
        /// Name of the node that just finished.
        node: String,
        /// 1-based step counter within this invocation.
        step: usize,
    },
    /// Graph reached a terminal state.
    Final,
}

/// Runtime event carried by chunks in [`StreamMode::Events`].
///
/// Reports when a named runnable starts and when it finishes, together with
/// whether the invocation succeeded.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum RunnableEvent {
    /// Runnable named `name` started executing.
    Started {
        /// Identifier of the runnable.
        name: String,
    },
    /// Runnable named `name` finished. `ok` is true when invocation
    /// returned successfully.
    Finished {
        /// Identifier of the runnable.
        name: String,
        /// Whether the invocation succeeded.
        ok: bool,
    },
}

114impl<O> StreamChunk<O> {
115    /// Borrow the inner `O` if this chunk carries one (`Value` or
116    /// `Update`); otherwise `None`.
117    pub const fn output(&self) -> Option<&O> {
118        match self {
119            Self::Value(v) | Self::Update { value: v, .. } => Some(v),
120            _ => None,
121        }
122    }
123
124    /// Consume the chunk, returning the inner `O` for the carrier
125    /// variants.
126    pub fn into_output(self) -> Option<O> {
127        match self {
128            Self::Value(v) | Self::Update { value: v, .. } => Some(v),
129            _ => None,
130        }
131    }
132}