Skip to main content

cognis_graph/
stream_mode.rs

1//! Granular streaming modes for `CompiledGraph`.
2//!
3//! Compared to the default [`cognis_core::Runnable::stream_events`] (which
4//! emits the full structured `Event` taxonomy), [`StreamMode`] gives callers
5//! a coarser, name-keyed control over what they observe:
6//!
7//! | Mode          | What's emitted                                                  |
8//! |---------------|-----------------------------------------------------------------|
9//! | `Values`      | Whole state after every superstep (requires `S: Serialize`).    |
10//! | `Updates`     | `(node_name, update_json)` per node end (requires `Update: Serialize`). |
11//! | `Messages`    | Per-token / per-tool-call deltas (forwarded `OnLlmToken` etc.). |
12//! | `Tasks`       | `OnNodeStart` only — task scheduling.                           |
13//! | `Checkpoints` | One emit per persisted checkpoint.                              |
14//! | `Debug`       | Everything: every `Event` variant — equivalent to `stream_events`. |
15//! | `Custom`      | Only `Event::Custom` payloads written by nodes via `NodeCtx::write`. |
16//!
17//! Multiple modes can be requested; events are kept if they match any mode.
18
19use cognis_core::Event;
20
21/// Selectable stream-output modes.
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
23pub enum StreamMode {
24    /// Whole state at every superstep boundary.
25    Values,
26    /// Per-node delta after each node finishes.
27    Updates,
28    /// LLM token / tool-call deltas.
29    Messages,
30    /// Node-start signals (scheduling).
31    Tasks,
32    /// Each persisted checkpoint.
33    Checkpoints,
34    /// All events (no filtering).
35    Debug,
36    /// Only `Event::Custom` payloads.
37    Custom,
38}
39
40impl StreamMode {
41    /// Does this mode want this event?
42    pub fn matches(self, event: &Event) -> bool {
43        match self {
44            StreamMode::Debug => true,
45            StreamMode::Values => matches!(event, Event::OnEnd { .. }),
46            StreamMode::Updates => matches!(event, Event::OnNodeEnd { .. }),
47            StreamMode::Tasks => matches!(event, Event::OnNodeStart { .. }),
48            StreamMode::Messages => matches!(
49                event,
50                Event::OnLlmToken { .. } | Event::OnToolStart { .. } | Event::OnToolEnd { .. }
51            ),
52            StreamMode::Checkpoints => matches!(event, Event::OnCheckpoint { .. }),
53            StreamMode::Custom => matches!(event, Event::Custom { .. }),
54        }
55    }
56}
57
58/// A set of selected modes. Keeps an event if any mode matches.
59#[derive(Debug, Clone, Default)]
60pub struct StreamModes(Vec<StreamMode>);
61
62impl StreamModes {
63    /// Empty selector — emits nothing.
64    pub fn none() -> Self {
65        Self(Vec::new())
66    }
67
68    /// Select `Debug` (everything).
69    pub fn debug() -> Self {
70        Self(vec![StreamMode::Debug])
71    }
72
73    /// Select one mode.
74    pub fn only(mode: StreamMode) -> Self {
75        Self(vec![mode])
76    }
77
78    /// Build from an explicit list.
79    pub fn from_modes<I: IntoIterator<Item = StreamMode>>(modes: I) -> Self {
80        Self(modes.into_iter().collect())
81    }
82
83    /// Add another mode.
84    pub fn with(mut self, mode: StreamMode) -> Self {
85        if !self.0.contains(&mode) {
86            self.0.push(mode);
87        }
88        self
89    }
90
91    /// True if `event` matches any selected mode.
92    pub fn matches(&self, event: &Event) -> bool {
93        self.0.iter().any(|m| m.matches(event))
94    }
95
96    /// All selected modes.
97    pub fn modes(&self) -> &[StreamMode] {
98        &self.0
99    }
100}
101
102#[cfg(test)]
103mod tests {
104    use super::*;
105    use uuid::Uuid;
106
107    fn ev_node_start() -> Event {
108        Event::OnNodeStart {
109            node: "x".into(),
110            step: 0,
111            run_id: Uuid::nil(),
112        }
113    }
114    fn ev_node_end() -> Event {
115        Event::OnNodeEnd {
116            node: "x".into(),
117            step: 0,
118            output: serde_json::Value::Null,
119            run_id: Uuid::nil(),
120        }
121    }
122    fn ev_on_end() -> Event {
123        Event::OnEnd {
124            runnable: "graph".into(),
125            run_id: Uuid::nil(),
126            output: serde_json::Value::Null,
127        }
128    }
129    fn ev_token() -> Event {
130        Event::OnLlmToken {
131            token: "t".into(),
132            run_id: Uuid::nil(),
133        }
134    }
135
136    #[test]
137    fn debug_matches_everything() {
138        assert!(StreamMode::Debug.matches(&ev_node_start()));
139        assert!(StreamMode::Debug.matches(&ev_node_end()));
140        assert!(StreamMode::Debug.matches(&ev_token()));
141    }
142
143    #[test]
144    fn tasks_matches_only_starts() {
145        assert!(StreamMode::Tasks.matches(&ev_node_start()));
146        assert!(!StreamMode::Tasks.matches(&ev_node_end()));
147    }
148
149    #[test]
150    fn updates_matches_only_node_end() {
151        assert!(StreamMode::Updates.matches(&ev_node_end()));
152        assert!(!StreamMode::Updates.matches(&ev_node_start()));
153    }
154
155    #[test]
156    fn messages_matches_only_llm_or_tool() {
157        assert!(StreamMode::Messages.matches(&ev_token()));
158        assert!(!StreamMode::Messages.matches(&ev_node_end()));
159    }
160
161    #[test]
162    fn values_matches_only_graph_on_end() {
163        assert!(StreamMode::Values.matches(&ev_on_end()));
164        assert!(!StreamMode::Values.matches(&ev_node_end()));
165    }
166
167    #[test]
168    fn modes_set_unions_filters() {
169        let modes = StreamModes::only(StreamMode::Tasks).with(StreamMode::Updates);
170        assert!(modes.matches(&ev_node_start()));
171        assert!(modes.matches(&ev_node_end()));
172        assert!(!modes.matches(&ev_token()));
173    }
174}