1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//! Type definitions for the harness streaming surface.
//!
//! All structs, enums, and traits in this module form the public API of
//! `crate::harness::stream`. Implementations, free functions, and tests live
//! in the sibling `mod.rs` and `test.rs` files.
//!
//! The stream module is intentionally independent of `crate::harness::events`
//! so it can be used without the full observability stack.
use HashSet;
use ;
use crateMessageDelta;
// ---------------------------------------------------------------------------
// StreamMode
// ---------------------------------------------------------------------------
/// Selects which categories of [`StreamChunk`]s a consumer wants to receive.
///
/// Consumers subscribe to one or more modes; a [`StreamSink`] only buffers
/// chunks that match the active mode set. Using a narrow mode set reduces
/// allocation and processing in tight loops.
// ---------------------------------------------------------------------------
// StreamChunk
// ---------------------------------------------------------------------------
/// A single item produced by a streaming harness or graph run.
///
/// Each variant corresponds to a [`StreamMode`]: a chunk is buffered by a
/// [`StreamSink`] only when its matching mode is active.
///
/// All variants derive `Clone` so chunks can be fanned out to multiple
/// consumers.
// ---------------------------------------------------------------------------
// StreamSink
// ---------------------------------------------------------------------------
/// An in-process buffer for [`StreamChunk`]s filtered by an active set of
/// [`StreamMode`]s.
///
/// Producers call [`StreamSink::push`] to submit chunks; the sink silently
/// discards chunks whose mode is not in the active set. Consumers call
/// [`StreamSink::drain`] to retrieve and clear all buffered chunks.
///
/// `StreamSink` is deliberately synchronous and single-threaded. Wrap it in
/// `Arc<Mutex<…>>` when sharing across threads.
///
/// # Example
///
/// ```
/// use tinyagents::harness::stream::{StreamChunk, StreamMode, StreamSink};
/// use tinyagents::harness::message::MessageDelta;
///
/// let sink = StreamSink::new(vec![StreamMode::Messages]);
/// sink.push(StreamChunk::Message(MessageDelta { text: "hello".into(), tool_call: None }));
/// sink.push(StreamChunk::Debug("ignored".into()));
///
/// let chunks = sink.drain();
/// assert_eq!(chunks.len(), 1);
/// ```