phi_core/provider/sse.rs
//! Shared SSE (Server-Sent Events) parsing utilities.
//!
//! Factored out from the Anthropic provider so all providers can reuse
//! the same streaming infrastructure.
/*
ARCHITECTURE: sse.rs — the shared HTTP streaming layer

Server-Sent Events (SSE) is the wire protocol the major LLM providers use
for streaming. It's an HTTP response with Content-Type: text/event-stream where
the server pushes events made of `field: value` lines, each event terminated by
a blank line:

    event: content_block_delta
    data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

    event: message_stop
    data: {"type":"message_stop"}
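
To make the framing above concrete, here is a minimal, std-only sketch of how a
complete SSE body splits into events. It is an illustration under simplifying
assumptions, not what `reqwest_eventsource` actually does: `Frame` and
`parse_frames` are hypothetical names, the input is a whole `&str` rather than
incremental chunks, and the `id:` and `retry:` fields are ignored.

```rust
// Hypothetical type for this sketch; the real module forwards `SseEvent`.
#[derive(Debug, PartialEq)]
struct Frame {
    event: String,
    data: String,
}

// Hypothetical helper: splits a complete SSE body into frames.
fn parse_frames(body: &str) -> Vec<Frame> {
    let mut frames = Vec::new();
    let mut event = String::new();
    let mut data: Vec<&str> = Vec::new();

    for line in body.lines() {
        if line.is_empty() {
            // A blank line terminates the event; events without data are dropped.
            if !data.is_empty() {
                frames.push(Frame {
                    // An event with no `event:` field defaults to "message".
                    event: if event.is_empty() {
                        "message".to_string()
                    } else {
                        std::mem::take(&mut event)
                    },
                    // Multiple `data:` lines are joined with newlines.
                    data: data.join("\n"),
                });
            }
            event.clear();
            data.clear();
        } else if line.starts_with(':') {
            // Comment line, often used as a keep-alive; ignored.
        } else if let Some(value) = line.strip_prefix("event:") {
            // One optional space after the colon is not part of the value.
            event = value.strip_prefix(' ').unwrap_or(value).to_string();
        } else if let Some(value) = line.strip_prefix("data:") {
            data.push(value.strip_prefix(' ').unwrap_or(value));
        }
    }
    frames
}
```

Feeding it the two events shown above would yield two frames, named
`content_block_delta` and `message_stop`, with the JSON left unparsed in `data`
for the provider to interpret.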

`drive_sse()` handles the generic SSE layer: it relies on `EventSource` for HTTP
chunking, event framing, and reconnects, adds cancellation on top, and forwards
parsed `SseEvent` structs to a channel.
Each provider then reads from that channel and interprets the `data` JSON
according to its own format (Anthropic vs OpenAI vs Google differ in event names
and JSON shapes).

This keeps the concerns separated:
  - `sse.rs` → knows about SSE wire format (events, data, open, close)
  - `anthropic.rs` → knows about Anthropic's specific JSON event schema
  - `openai_compat.rs` → knows about OpenAI's specific JSON event schema
  - ...

RUST QUIRK: `reqwest_eventsource` crate — async SSE over HTTP

`reqwest` is Rust's most popular async HTTP client (like Python's `httpx`).
`reqwest_eventsource` extends it with SSE stream handling: it reconnects on
dropped connections, handles SSE framing (multi-line fields, comment lines, etc.),
and presents a clean `Stream` of `Event` items.

`EventSource` implements `futures::Stream<Item = Result<Event, ...>>` — a lazy,
async sequence of values. Think of it as an async generator in Python.
*/

use futures::StreamExt;
use reqwest_eventsource::{Event, EventSource};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::debug;

/// A parsed SSE event with event type and data.
/*
RUST QUIRK: `#[derive(Debug, Clone)]` on a simple named-field struct
  Both fields are `String` (heap-allocated, owned). `Clone` deep-copies both strings.
  `Debug` enables `{:?}` formatting for logging/debugging.
  No `Copy` here because `String` is not `Copy` (it owns heap memory).
*/
#[derive(Debug, Clone)]
pub struct SseEvent {
    /// The SSE event type (e.g. "content_block_delta", "message_stop").
    pub event: String,
    /// The raw JSON data string for this event.
    pub data: String,
}

/// Drives an EventSource, sending parsed events through a channel.
/// Returns when the stream ends, errors, is cancelled, or the receiver is dropped.
///
/// The caller receives `SseEvent`s and can parse them according to
/// provider-specific formats.
/*
ARCHITECTURE: drive_sse — the event pump

`drive_sse()` is a "pump" that pulls events from the HTTP response stream and
pushes them into an mpsc channel. It runs as a concurrent task alongside the
provider's event-processing logic.

Why async? The HTTP response arrives incrementally. If we waited synchronously
for the entire response, we'd have no streaming at all. `async` lets the tokio
runtime interleave the HTTP reads with other work.

Return value: `Result<(), String>`
  `Ok(())` — the stream ended (`es.next()` returned `None`) or the receiver was dropped
  `Err(s)` — either cancelled ("cancelled") or an HTTP/network error

RUST QUIRK: `tokio::select!` — race multiple async operations

`tokio::select!` polls multiple futures concurrently and completes when the FIRST
one does. The futures in the losing branches are DROPPED (cancelled).

Here we race two operations:
  1. `cancel.cancelled()` — waits until the cancellation token is triggered
  2. `es.next()` — waits for the next SSE event from the HTTP stream

If cancellation fires first: we close the SSE connection and return Err("cancelled").
If a new event arrives first: we process it and loop.

Python analogy:
    done, pending = await asyncio.wait(
        {cancel_task, es_next_task},
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()  # select! drops the losers itself; asyncio.wait leaves them running

The `_` in `_ = cancel.cancelled()` means "I don't care about the return value
of this future — I only care that it completed."

RUST QUIRK: `loop { ... }` with `return` — Rust's infinite loop + early exits

`loop` creates an infinite loop (no condition). The only exits are:
  - `return Ok(())` when the stream ends or the receiver is gone
  - `return Err(...)` on cancellation or error
  - `break` (not used here)

Unlike `while True` in Python, `loop` in Rust communicates intent: "this loop
runs until an explicit exit, not until some condition becomes false."

RUST QUIRK: `es.next()` on a Stream — `futures::StreamExt::next()`
  `EventSource` implements `futures::Stream`. The `StreamExt` trait (from the
  `futures` crate) extends Stream with ergonomic methods:
    `.next()` → future that resolves to `Option<Item>`:
      `Some(Ok(event))` — got an event
      `Some(Err(e))` — got an error
      `None` — stream ended
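
  The three cases, together with the `loop` + `return` shape described earlier,
  can be exercised without an async runtime. The sketch below uses
  `Iterator::next()` as a synchronous stand-in for `es.next().await` (both return
  `Option<Item>`). `drain` is a hypothetical helper, not part of this module.

```rust
// Hypothetical helper: consumes `source` until it ends or yields an error.
fn drain<I>(mut source: I) -> Result<Vec<String>, String>
where
    I: Iterator<Item = Result<String, String>>,
{
    let mut seen = Vec::new();
    loop {
        match source.next() {
            // Got an item: keep it and go around the loop again.
            Some(Ok(event)) => seen.push(event),
            // Got an error: stop immediately; later items are never read.
            Some(Err(e)) => return Err(e),
            // Source exhausted: a clean end.
            None => return Ok(seen),
        }
    }
}
```

  As in `drive_sse`, the first error ends the loop, so anything queued behind it
  is never observed.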

RUST QUIRK: `tx.send(...).is_err()` — checking if the receiver is gone
  `mpsc::UnboundedSender::send()` returns `Err(SendError(value))`, handing the
  unsent value back, if the receiver has been dropped or closed. When the receiver
  (the provider's event-processing task) is gone, there's no point continuing,
  so we close the SSE connection and return Ok.
  `.is_err()` returns `true` if the Result is an `Err` variant.
  Python analogy: catching a BrokenPipeError when writing to a closed pipe.
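
  The same dropped-receiver behaviour can be seen with the standard library's
  channel, used here only as a stand-in for tokio's unbounded channel (an
  assumption made so the sketch needs no runtime). `forward_until_closed` is a
  hypothetical helper.

```rust
use std::sync::mpsc;

// Hypothetical helper: sends items until the receiver disappears and
// returns how many sends succeeded.
fn forward_until_closed(tx: &mpsc::Sender<String>, items: &[&str]) -> usize {
    let mut sent = 0;
    for item in items {
        // `send` fails only when the receiving half has been dropped.
        if tx.send(item.to_string()).is_err() {
            break;
        }
        sent += 1;
    }
    sent
}
```

  While the receiver is alive every send succeeds. After `drop(rx)` the first
  send returns `Err`, which is the signal `drive_sse` uses to stop pumping.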

RUST QUIRK: `e.to_string()` on an error type
  Every type that implements `std::error::Error` also implements `Display` (the `{}`
  formatter). `.to_string()` calls `Display` to get a human-readable description.
  Equivalent to `str(e)` in Python.
  Here we convert the `reqwest_eventsource` error to a plain `String` for our
  simpler `Result<(), String>` return type.
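
  A small std-only sketch of that conversion. `StreamError` and `flatten` are
  hypothetical names invented for illustration; the real error type comes from
  `reqwest_eventsource`.

```rust
use std::fmt;

// Hypothetical error type standing in for the library's error.
#[derive(Debug)]
struct StreamError {
    status: u16,
}

impl fmt::Display for StreamError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "unexpected HTTP status {}", self.status)
    }
}

impl std::error::Error for StreamError {}

// Collapses the typed error into a plain String via its Display impl.
fn flatten(result: Result<(), StreamError>) -> Result<(), String> {
    result.map_err(|e| e.to_string())
}
```

  The trade-off is that callers can no longer match on the error's variants;
  they only see the rendered message.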
*/
/*
DESIGN: Why `drive_sse` takes `es` by value AND uses a channel `tx`

Same dual-audience pattern as `execute_single_tool`:
  `es` = HTTP SOURCE — drive_sse owns the connection; it is the only place that calls
      `.close()` and `.next()`. Passing by value (not `&mut`) means this function
      is exclusively responsible for the connection's lifecycle.
  `tx` = OBSERVER CHANNEL — provider tasks await on the rx end to process events.
      Separation keeps HTTP pumping out of provider-specific JSON parsing logic.
  `cancel` = ABORT — if triggered mid-stream, `.close()` shuts down the HTTP connection
      and the function returns so the caller can clean up.

Why run drive_sse as a concurrent task?
  Providers spawn drive_sse as a tokio task and await the rx channel separately.
  This separates "reading bytes off the wire" from "interpreting the JSON events".
  Because the channel is unbounded, the pump keeps draining the HTTP stream even
  when interpretation is slow; unprocessed events queue up in memory.
*/
pub async fn drive_sse(
    mut es: EventSource, // HTTP SOURCE — owns the open SSE connection; .close() shuts it down
    tx: mpsc::UnboundedSender<SseEvent>, // OBSERVER CHANNEL — forward parsed events to the provider's processing task
    cancel: CancellationToken, // ABORT — if triggered, closes connection and returns Err("cancelled")
) -> Result<(), String> {
    loop {
        tokio::select! {
            // Branch 1: cancellation requested — close SSE and abort
            _ = cancel.cancelled() => {
                es.close(); // signal the HTTP connection to close
                return Err("cancelled".into());
            }
            // Branch 2: next SSE event arrived (or stream ended/errored)
            event = es.next() => {
                match event {
                    // Stream ended cleanly (server closed the connection)
                    None => return Ok(()),
                    // Connection opened — just log it, no action needed
                    Some(Ok(Event::Open)) => {
                        debug!("SSE connection opened");
                    }
                    // A real event with data — forward to the provider's channel
                    Some(Ok(Event::Message(msg))) => {
                        if tx.send(SseEvent {
                            event: msg.event, // e.g. "content_block_delta"
                            data: msg.data,   // the raw JSON payload
                        }).is_err() {
                            // Receiver dropped — no one is listening, clean up
                            es.close();
                            return Ok(());
                        }
                    }
                    // HTTP/SSE error (network issue, bad status code, etc.)
                    Some(Err(e)) => {
                        es.close();
                        return Err(e.to_string());
                    }
                }
            }
        }
    }
}