//! Shared SSE (Server-Sent Events) parsing utilities.
//!
//! Factored out from the Anthropic provider so all providers can reuse
//! the same streaming infrastructure.
/*
ARCHITECTURE: sse.rs — the shared HTTP streaming layer
Server-Sent Events (SSE) is the wire protocol that the major LLM provider APIs
(Anthropic, OpenAI, Google) use for streaming. It's an HTTP response with
Content-Type: text/event-stream in which the server pushes events as
`field: value` lines, each event terminated by a blank line:
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: message_stop
data: {"type":"message_stop"}

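EXAMPLE (std-only sketch): the framing rules in miniature. Each event is a run of
`field: value` lines ended by a blank line. `data:` lines accumulate and are joined
with a newline. Lines starting with `:` are comments. An event with no `event:` line
defaults to type `message`. `Frame` and `parse_sse` are hypothetical names, not this
module's API or `reqwest_eventsource`'s. The sketch skips `id:`/`retry:` and assumes
the whole buffer is already in memory.

```rust
/// A dispatched event (hypothetical stand-in for this module's `SseEvent`).
#[derive(Debug, Clone, PartialEq)]
pub struct Frame {
    pub event: String,
    pub data: String,
}

/// Splits a complete text/event-stream buffer into events.
/// Sketch only: ignores `id:`/`retry:`, does not handle bare-CR line endings,
/// and drops a trailing event that is not terminated by a blank line.
pub fn parse_sse(input: &str) -> Vec<Frame> {
    let mut frames = Vec::new();
    let mut event = String::new();
    let mut data = String::new();
    // `str::lines` splits on '\n' and strips a trailing '\r', so CRLF input works too.
    for line in input.lines() {
        if line.is_empty() {
            // Blank line: dispatch the accumulated event if any `data:` line was seen.
            if !data.is_empty() {
                data.pop(); // remove the '\n' appended after the last data line
                let name = if event.is_empty() { "message".to_string() } else { event.clone() };
                frames.push(Frame { event: name, data: std::mem::take(&mut data) });
            }
            event.clear();
            data.clear();
            continue;
        }
        if line.starts_with(':') {
            continue; // comment line, often used as a keep-alive
        }
        // Field name is everything before the first ':'; one leading space is dropped.
        let (field, value) = match line.find(':') {
            Some(i) => {
                let v = &line[i + 1..];
                (&line[..i], v.strip_prefix(' ').unwrap_or(v))
            }
            None => (line, ""),
        };
        match field {
            "event" => event = value.to_string(),
            "data" => {
                data.push_str(value);
                data.push('\n'); // multiple data lines are joined with '\n'
            }
            _ => {} // id, retry and unknown fields are ignored in this sketch
        }
    }
    frames
}
```
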
`drive_sse()` handles the generic SSE layer (HTTP chunking, event framing,
reconnect, cancellation) and forwards parsed `SseEvent` structs to a channel.
Each provider then reads from that channel and interprets the `data` JSON
according to its own format (Anthropic vs OpenAI vs Google differ in event names
and JSON shapes).
This gives a clean separation of concerns:
- `sse.rs` → knows about the SSE wire format (events, data, open, close)
- `anthropic.rs` → knows about Anthropic's specific JSON event schema
- `openai_compat.rs` → knows about OpenAI's specific JSON event schema
- ...
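EXAMPLE (sketch): one way the provider-side split can look. The `StreamEnd` trait,
the `Anthropic`/`OpenAiCompat` structs and the `event`/`data` field names are all
hypothetical. The sketch relies on only two wire facts. Anthropic ends a stream with
a `message_stop` event. OpenAI-style chat-completion streams end with a
`data: [DONE]` line instead of a named event.

```rust
/// Hypothetical stand-in for `SseEvent`: both fields are owned strings.
#[derive(Debug, Clone)]
pub struct SseEvent {
    pub event: String, // assumed field name
    pub data: String,  // assumed field name
}

/// Hypothetical per-provider hook: decides from its own schema whether the stream is done.
pub trait StreamEnd {
    fn is_end(&self, ev: &SseEvent) -> bool;
}

pub struct Anthropic;
pub struct OpenAiCompat;

impl StreamEnd for Anthropic {
    // Anthropic names its terminal event.
    fn is_end(&self, ev: &SseEvent) -> bool {
        ev.event == "message_stop"
    }
}

impl StreamEnd for OpenAiCompat {
    // OpenAI-style streams finish with a `[DONE]` sentinel in the data field.
    fn is_end(&self, ev: &SseEvent) -> bool {
        ev.data.trim() == "[DONE]"
    }
}

/// Counts events up to and including the terminal one, using any provider's rule.
pub fn events_until_end(parser: &dyn StreamEnd, events: &[SseEvent]) -> usize {
    match events.iter().position(|ev| parser.is_end(ev)) {
        Some(i) => i + 1,
        None => events.len(),
    }
}
```

The SSE layer stays identical for both providers; only the `StreamEnd` impl changes.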
RUST QUIRK: `reqwest_eventsource` crate — async SSE over HTTP
`reqwest` is a widely used async HTTP client for Rust (comparable to Python's `httpx`).
`reqwest_eventsource` extends it with SSE stream handling: it reconnects on
dropped connections, handles SSE framing (multi-line fields, comment lines, etc.),
and presents a clean `Stream` of `Event` items.
`EventSource` implements `futures::Stream<Item = Result<Event, ...>>` — a lazy,
async sequence of values. Think of it as an async generator in Python.
*/
use StreamExt;
use ;
use mpsc;
use CancellationToken;
use debug;
/// A parsed SSE event with event type and data.
/*
RUST QUIRK: `#[derive(Debug, Clone)]` on a simple named-field struct
Both fields are `String` (heap-allocated, owned). `Clone` deep-copies both strings.
`Debug` enables `{:?}` formatting for logging/debugging.
No `Copy` here because `String` is not `Copy` (it owns heap memory).
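EXAMPLE (sketch): what the two derives buy. The field names `event` and `data` are
assumptions, since the comment above only says both fields are `String`. The `{:?}`
output checked here is the standard derived `Debug` format.

```rust
#[derive(Debug, Clone)]
pub struct SseEvent {
    pub event: String, // assumed field name
    pub data: String,  // assumed field name
}

/// Returns an event and an edited clone of it, to show that `clone()`
/// deep-copies both `String`s: editing the copy leaves the original intact.
pub fn clone_demo() -> (SseEvent, SseEvent) {
    let original = SseEvent { event: "ping".to_string(), data: "hi".to_string() };
    let mut copy = original.clone(); // new heap buffers for both fields
    copy.data.push_str(" there");
    // Without `clone()`, `let copy = original;` would MOVE the value (no `Copy`),
    // and `original` could not be used afterwards.
    (original, copy)
}
```
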
*/
/// Drives an EventSource, sending parsed events through a channel.
/// Returns when the stream ends, errors, or is cancelled.
///
/// The caller receives `SseEvent`s and can parse them according to
/// provider-specific formats.
/*
ARCHITECTURE: drive_sse — the event pump
`drive_sse()` is a "pump" that pulls events from the HTTP response stream and
pushes them into an mpsc channel. It runs as a concurrent task alongside the
provider's event-processing logic.
Why async? The HTTP response arrives incrementally. If we waited synchronously
for the entire response, we'd have no streaming at all. `async` lets the tokio
runtime interleave the HTTP reads with other work.
Return value: `Result<(), String>`
`Ok(())`: the stream ended cleanly (HTTP connection closed by the server), or the
receiver was dropped and there is no one left to forward events to
`Err(s)`: either cancellation (`s` is "cancelled") or an HTTP/network error
RUST QUIRK: `tokio::select!` — race multiple async operations
`tokio::select!` runs multiple futures concurrently and returns when the FIRST
one completes. All other futures are DROPPED (cancelled).
Here we race two operations:
1. `cancel.cancelled()` — waits until the cancellation token is triggered
2. `es.next()` — waits for the next SSE event from the HTTP stream
If cancellation fires first: we close the SSE connection and return Err("cancelled").
If a new event arrives first: we process it and loop.
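Python analogy:
    done, pending = await asyncio.wait(
        {cancel_task, es_next_task},
        return_when=asyncio.FIRST_COMPLETED,
    )
One difference: `asyncio.wait` leaves the tasks in `pending` running, so Python code
must cancel them explicitly. `select!` drops the losing futures for you.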
The `_` in `_ = cancel.cancelled()` means "I don't care about the return value
of this future — I only care that it completed."
RUST QUIRK: `loop { ... }` with `return` — Rust's infinite loop + early exits
`loop` creates an infinite loop (no condition). The only exits are:
- `return Ok(())` when stream ends
- `return Err(...)` on cancellation or error
- `break` (not used here)
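Where Python would write `while True:`, Rust's `loop` states the intent directly: "this
loop runs until an explicit exit, not until some condition becomes false." The compiler
also knows a `loop` can only be left via `break` or `return`, so a function may end with
a `loop` that only `return`s, with no dummy value after it.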
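EXAMPLE (sketch): both ways out of a `loop`. `first_blank` and `count_data_lines` are
hypothetical helpers. A `loop` with no `break` has type `!`, so `first_blank` needs no
value after it. With `break value`, the loop itself becomes an expression that yields
that value.

```rust
/// Index of the first blank line, exiting the loop only via `return`.
pub fn first_blank(lines: &[&str]) -> Option<usize> {
    let mut i = 0;
    loop {
        if i == lines.len() {
            return None; // ran out of lines
        }
        if lines[i].is_empty() {
            return Some(i); // found the end-of-event marker
        }
        i += 1;
    }
}

/// Counts leading `data:` lines, using `break value` so the loop yields the count.
pub fn count_data_lines(lines: &[&str]) -> usize {
    let mut n = 0;
    let count = loop {
        match lines.get(n) {
            Some(line) if line.starts_with("data:") => n += 1,
            _ => break n, // the loop expression evaluates to `n`
        }
    };
    count
}
```
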
RUST QUIRK: `es.next()` on a Stream — `futures::StreamExt::next()`
`EventSource` implements `futures::Stream`. The `StreamExt` trait (from the
`futures` crate) extends Stream with ergonomic methods:
`.next()` → future that resolves to `Option<Item>`:
`Some(Ok(event))` — got an event
`Some(Err(e))` — got an error
`None` — stream ended
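EXAMPLE (sketch): the same three-way match, written against a plain synchronous
`Iterator` so it runs without an async runtime. In the real code the item comes from
`es.next().await`. Here `collect_until_end` is a hypothetical helper, and
`Result<String, String>` stands in for the stream's `Result<Event, Error>` items.

```rust
/// Pulls items until the source ends or yields an error.
pub fn collect_until_end<I>(mut source: I) -> Result<Vec<String>, String>
where
    I: Iterator<Item = Result<String, String>>,
{
    let mut seen = Vec::new();
    loop {
        match source.next() {
            Some(Ok(event)) => seen.push(event), // got an event: keep going
            Some(Err(e)) => return Err(e),        // got an error: stop and report it
            None => return Ok(seen),              // source exhausted: clean end
        }
    }
}
```
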
RUST QUIRK: `tx.send(...).is_err()` — checking if the receiver is gone
`mpsc::UnboundedSender::send()` returns `Err(SendError(value))`, handing the unsent
value back, if the receiver has been dropped. When the receiver (the provider's
event-processing task) is gone, there's no point continuing: we close the SSE
connection and return Ok.
`.is_err()` returns `true` if the Result is an `Err` variant.
Python analogy: getting a BrokenPipeError when writing to a pipe whose reader has exited.
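EXAMPLE (sketch): the same receiver-gone signal, shown with `std::sync::mpsc` so it
runs without tokio. Like tokio's `UnboundedSender::send`, std's `Sender::send` fails
once the receiver is dropped and hands the value back inside `SendError`.
`forward_all` and `undelivered` are hypothetical helpers.

```rust
use std::sync::mpsc::{self, SendError, Sender};

/// Sends items until the receiver disappears; returns how many were delivered.
/// Mirrors the pump's rule: a failed send means nobody is listening, so stop quietly.
pub fn forward_all(tx: &Sender<String>, items: &[&str]) -> usize {
    let mut delivered = 0;
    for item in items {
        if tx.send(item.to_string()).is_err() {
            break; // receiver dropped: no point sending more
        }
        delivered += 1;
    }
    delivered
}

/// Recovers the value from a failed send: `SendError` wraps the unsent item.
pub fn undelivered(value: String) -> Option<String> {
    let (tx, rx) = mpsc::channel::<String>();
    drop(rx); // simulate the consumer task going away
    match tx.send(value) {
        Ok(()) => None,
        Err(SendError(v)) => Some(v),
    }
}
```
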
RUST QUIRK: `e.to_string()` on an error type
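Error types implement `Display` (the `{}` formatter), because the `std::error::Error`
trait requires it. `.to_string()` comes from the standard library's blanket `ToString`
impl for every `Display` type, so it yields the human-readable description. Equivalent to `str(e)` in Python.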
Here we convert the `reqwest_eventsource` error to a plain `String` for our
simpler `Result<(), String>` return type.
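EXAMPLE (sketch): flattening typed errors into the `String` error this module returns.
`StreamError` and `parse_retry_ms` are hypothetical. `str::parse::<u64>` and its
`ParseIntError` are real std APIs. The custom type shows the `Display` impl that
`.to_string()` relies on.

```rust
use std::fmt;

#[derive(Debug)]
pub struct StreamError {
    pub status: u16, // hypothetical field
}

impl fmt::Display for StreamError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "HTTP status {}", self.status)
    }
}

// `Error` requires `Debug + Display`; both are provided above.
impl std::error::Error for StreamError {}

/// Parses a retry delay, converting `ParseIntError` to `String` via `Display`.
pub fn parse_retry_ms(raw: &str) -> Result<u64, String> {
    raw.trim().parse::<u64>().map_err(|e| e.to_string())
}
```
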
*/
/*
DESIGN: Why `drive_sse` takes `es` by value AND uses a channel `tx`
Same dual-audience pattern as `execute_single_tool`:
`es` = HTTP SOURCE — drive_sse owns the connection; it is the only place that calls
`.close()` and `.next()`. Passing by value (not `&mut`) means this function
is exclusively responsible for the connection's lifecycle.
`tx` = OBSERVER CHANNEL — provider tasks await on the rx end to process events.
Separation keeps HTTP pumping out of provider-specific JSON parsing logic.
`cancel` = ABORT — if triggered mid-stream, `.close()` shuts down the HTTP connection
and the function returns so the caller can clean up.
Why run drive_sse as a concurrent task?
Providers spawn drive_sse as a tokio task and await the rx channel separately.
This separates "reading bytes off the wire" from "interpreting the JSON events".
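If interpretation is slow, the pump keeps draining the HTTP stream into the unbounded
channel, so events queue in memory instead of stalling the connection (the trade-off:
nothing bounds that queue).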
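EXAMPLE (std-only sketch): the pump/consumer split described above, with threads in
place of tokio tasks. Every stand-in here is an assumption:
- an `Iterator` of `Result<String, String>` plays the `EventSource`;
- an `Arc<AtomicBool>` plays the `CancellationToken`. It is checked between items
  rather than raced with `select!`, so a cancel only takes effect once the current
  `next()` returns;
- `drive` is a hypothetical name.
The return contract follows the comments in this file: `Ok(())` on a clean end or when
the receiver is gone, `Err("cancelled")` on cancellation, and `Err(e)` on a source error.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;

/// Pumps items from `source` into `tx` until end, error, cancellation, or receiver drop.
pub fn drive<I>(mut source: I, tx: Sender<String>, cancel: Arc<AtomicBool>) -> Result<(), String>
where
    I: Iterator<Item = Result<String, String>>,
{
    loop {
        // Polled between items; the async version races this against `es.next()`.
        if cancel.load(Ordering::SeqCst) {
            return Err("cancelled".to_string());
        }
        match source.next() {
            Some(Ok(event)) => {
                if tx.send(event).is_err() {
                    return Ok(()); // consumer gone: stop quietly
                }
            }
            Some(Err(e)) => return Err(e), // transport error surfaces to the caller
            None => return Ok(()),         // source ended cleanly
        }
    }
}
```

A caller would spawn `drive` on its own thread, read from the receiver on another, and
call `cancel.store(true, Ordering::SeqCst)` to abort.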
*/
pub async