1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
//! Higher-level streaming projections for the harness.
//!
//! In the recursive architecture this is the typed lens an observer uses to
//! watch work unfold across a nested run tree: state snapshots, diffs, model
//! deltas, debug traces, and interrupts from a run — and, transitively, from
//! the sub-agents and sub-graphs it spawns — are projected as filtered
//! [`StreamChunk`]s so a parent (or a REPL/CodeAct loop driving the run) can
//! consume only the categories it cares about.
//!
//! This module provides:
//!
//! - [`StreamMode`] — an enum selecting which chunk categories a consumer
//! wants to receive.
//! - [`StreamChunk`] — a typed union of all chunk categories (state snapshots,
//! diffs, model deltas, debug output, interrupts, and custom extensions).
//! - [`StreamSink`] — a synchronous, mode-filtered buffer for [`StreamChunk`]s.
//! - [`stream`] — a convenience helper that filters a slice of chunks by a
//! set of modes and returns the matching subset.
//!
//! The stream module is **independent** of `crate::harness::events`: it
//! provides a higher-level projection API without importing the event bus.
//! Integration between event delivery and stream chunks is the responsibility
//! of the harness runtime.
pub use *;
use HashSet;
// ---------------------------------------------------------------------------
// StreamSink impls
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
// stream() helper
// ---------------------------------------------------------------------------
/// Filters a collection of [`StreamChunk`]s to those matching a set of
/// [`StreamMode`]s and returns the matching chunks in input order.
///
/// This is a synchronous, allocation-based helper for contexts where the full
/// chunk list is already collected (for example in tests or post-processing).
/// It does not require an async runtime.
///
/// # Example
///
/// ```
/// use tinyagents::harness::stream::{stream, StreamChunk, StreamMode};
/// use tinyagents::harness::message::MessageDelta;
///
/// let chunks = vec![
/// StreamChunk::Message(MessageDelta { text: "hi".into(), tool_call: None }),
/// StreamChunk::Debug("trace".into()),
/// StreamChunk::Updates(serde_json::json!({"key": "value"})),
/// ];
///
/// let messages = stream(&chunks, &[StreamMode::Messages]);
/// assert_eq!(messages.len(), 1);
///
/// let all = stream(&chunks, &[StreamMode::Messages, StreamMode::Debug, StreamMode::Updates]);
/// assert_eq!(all.len(), 3);
/// ```