Skip to main content

cognis_graph/
stream_mode.rs

1//! Granular streaming modes for `CompiledGraph`.
2//!
3//! Compared to the default [`cognis_core::Runnable::stream_events`] (which
4//! emits the full structured `Event` taxonomy), [`StreamMode`] gives callers
5//! a coarser, name-keyed control over what they observe — closer to
6//! Python LangGraph's `stream(...)` modes:
7//!
8//! | Mode          | What's emitted                                                  |
9//! |---------------|-----------------------------------------------------------------|
10//! | `Values`      | Whole state after every superstep (requires `S: Serialize`).    |
11//! | `Updates`     | `(node_name, update_json)` per node end (requires `Update: Serialize`). |
12//! | `Messages`    | Per-token / per-tool-call deltas (forwarded `OnLlmToken` etc.). |
13//! | `Tasks`       | `OnNodeStart` only — task scheduling.                           |
14//! | `Checkpoints` | One emit per persisted checkpoint.                              |
15//! | `Debug`       | Everything: every `Event` variant — equivalent to `stream_events`. |
16//! | `Custom`      | Only `Event::Custom` payloads written by nodes via `NodeCtx::write`. |
17//!
18//! Multiple modes can be requested; events are kept if they match any mode.
19
20use cognis_core::Event;
21
22/// Selectable stream-output modes.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24pub enum StreamMode {
25    /// Whole state at every superstep boundary.
26    Values,
27    /// Per-node delta after each node finishes.
28    Updates,
29    /// LLM token / tool-call deltas.
30    Messages,
31    /// Node-start signals (scheduling).
32    Tasks,
33    /// Each persisted checkpoint.
34    Checkpoints,
35    /// All events (no filtering).
36    Debug,
37    /// Only `Event::Custom` payloads.
38    Custom,
39}
40
41impl StreamMode {
42    /// Does this mode want this event?
43    pub fn matches(self, event: &Event) -> bool {
44        match self {
45            StreamMode::Debug => true,
46            StreamMode::Values => matches!(event, Event::OnEnd { .. }),
47            StreamMode::Updates => matches!(event, Event::OnNodeEnd { .. }),
48            StreamMode::Tasks => matches!(event, Event::OnNodeStart { .. }),
49            StreamMode::Messages => matches!(
50                event,
51                Event::OnLlmToken { .. } | Event::OnToolStart { .. } | Event::OnToolEnd { .. }
52            ),
53            StreamMode::Checkpoints => matches!(event, Event::OnCheckpoint { .. }),
54            StreamMode::Custom => matches!(event, Event::Custom { .. }),
55        }
56    }
57}
58
59/// A set of selected modes. Keeps an event if any mode matches.
60#[derive(Debug, Clone, Default)]
61pub struct StreamModes(Vec<StreamMode>);
62
63impl StreamModes {
64    /// Empty selector — emits nothing.
65    pub fn none() -> Self {
66        Self(Vec::new())
67    }
68
69    /// Select `Debug` (everything).
70    pub fn debug() -> Self {
71        Self(vec![StreamMode::Debug])
72    }
73
74    /// Select one mode.
75    pub fn only(mode: StreamMode) -> Self {
76        Self(vec![mode])
77    }
78
79    /// Build from an explicit list.
80    pub fn from_modes<I: IntoIterator<Item = StreamMode>>(modes: I) -> Self {
81        Self(modes.into_iter().collect())
82    }
83
84    /// Add another mode.
85    pub fn with(mut self, mode: StreamMode) -> Self {
86        if !self.0.contains(&mode) {
87            self.0.push(mode);
88        }
89        self
90    }
91
92    /// True if `event` matches any selected mode.
93    pub fn matches(&self, event: &Event) -> bool {
94        self.0.iter().any(|m| m.matches(event))
95    }
96
97    /// All selected modes.
98    pub fn modes(&self) -> &[StreamMode] {
99        &self.0
100    }
101}
102
103#[cfg(test)]
104mod tests {
105    use super::*;
106    use uuid::Uuid;
107
108    fn ev_node_start() -> Event {
109        Event::OnNodeStart {
110            node: "x".into(),
111            step: 0,
112            run_id: Uuid::nil(),
113        }
114    }
115    fn ev_node_end() -> Event {
116        Event::OnNodeEnd {
117            node: "x".into(),
118            step: 0,
119            output: serde_json::Value::Null,
120            run_id: Uuid::nil(),
121        }
122    }
123    fn ev_on_end() -> Event {
124        Event::OnEnd {
125            runnable: "graph".into(),
126            run_id: Uuid::nil(),
127            output: serde_json::Value::Null,
128        }
129    }
130    fn ev_token() -> Event {
131        Event::OnLlmToken {
132            token: "t".into(),
133            run_id: Uuid::nil(),
134        }
135    }
136
137    #[test]
138    fn debug_matches_everything() {
139        assert!(StreamMode::Debug.matches(&ev_node_start()));
140        assert!(StreamMode::Debug.matches(&ev_node_end()));
141        assert!(StreamMode::Debug.matches(&ev_token()));
142    }
143
144    #[test]
145    fn tasks_matches_only_starts() {
146        assert!(StreamMode::Tasks.matches(&ev_node_start()));
147        assert!(!StreamMode::Tasks.matches(&ev_node_end()));
148    }
149
150    #[test]
151    fn updates_matches_only_node_end() {
152        assert!(StreamMode::Updates.matches(&ev_node_end()));
153        assert!(!StreamMode::Updates.matches(&ev_node_start()));
154    }
155
156    #[test]
157    fn messages_matches_only_llm_or_tool() {
158        assert!(StreamMode::Messages.matches(&ev_token()));
159        assert!(!StreamMode::Messages.matches(&ev_node_end()));
160    }
161
162    #[test]
163    fn values_matches_only_graph_on_end() {
164        assert!(StreamMode::Values.matches(&ev_on_end()));
165        assert!(!StreamMode::Values.matches(&ev_node_end()));
166    }
167
168    #[test]
169    fn modes_set_unions_filters() {
170        let modes = StreamModes::only(StreamMode::Tasks).with(StreamMode::Updates);
171        assert!(modes.matches(&ev_node_start()));
172        assert!(modes.matches(&ev_node_end()));
173        assert!(!modes.matches(&ev_token()));
174    }
175}