//! Shared SSE (Server-Sent Events) parsing utilities.
//!
//! Factored out from the Anthropic provider so all providers can reuse
//! the same streaming infrastructure.
/*
ARCHITECTURE: sse.rs — the shared HTTP streaming layer

Server-Sent Events (SSE) is the wire protocol the major LLM providers (Anthropic,
OpenAI, Google) use for streaming. It is an HTTP response with
Content-Type: text/event-stream in which the server pushes events as
`field: value` lines, with a blank line terminating each event:

  event: content_block_delta
  data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

  event: message_stop
  data: {"type":"message_stop"}
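For reference, the framing that `reqwest_eventsource` performs before `drive_sse` ever sees an event can be sketched with std only. This is a simplified reading of the WHATWG event-stream rules, not the crate's code. `parse_sse` is a hypothetical name. It takes a complete body rather than a byte stream, and it ignores `id:` and `retry:` fields. It does show the rules that matter here:
- a blank line dispatches an event;
- lines starting with `:` are comments;
- repeated `data:` lines are joined with `\n`;
- an event with no `event:` field gets the type "message".

```rust
/// Mirrors the `SseEvent` struct in this file (redeclared to stay self-contained).
#[derive(Debug, Clone, PartialEq)]
pub struct SseEvent {
    pub event: String,
    pub data: String,
}

/// Hypothetical helper: split a complete `text/event-stream` body into events.
/// Simplified: `str::lines` handles LF and CRLF endings; id and retry fields are ignored.
pub fn parse_sse(body: &str) -> Vec<SseEvent> {
    let mut events = Vec::new();
    let mut event_type = String::new();
    let mut data_lines: Vec<String> = Vec::new();

    for line in body.lines() {
        if line.is_empty() {
            // A blank line dispatches the event collected so far (if it has data).
            if !data_lines.is_empty() {
                let event = if event_type.is_empty() {
                    "message".to_string() // default type when no `event:` field was sent
                } else {
                    event_type.clone()
                };
                events.push(SseEvent { event, data: data_lines.join("\n") });
            }
            event_type.clear();
            data_lines.clear();
            continue;
        }
        if line.starts_with(':') {
            continue; // comment line, often used as a keep-alive
        }
        // Split "field: value"; one space after the colon is dropped.
        let (field, value) = match line.find(':') {
            Some(i) => {
                let rest = &line[i + 1..];
                (&line[..i], rest.strip_prefix(' ').unwrap_or(rest))
            }
            None => (line, ""),
        };
        match field {
            "event" => event_type = value.to_string(),
            "data" => data_lines.push(value.to_string()),
            _ => {} // other fields are ignored in this sketch
        }
    }
    // An event without a terminating blank line is discarded.
    events
}

fn main() {
    let body = "event: content_block_delta\n\
                data: {\"type\":\"content_block_delta\"}\n\
                \n\
                : keep-alive\n\
                event: message_stop\n\
                data: {\"type\":\"message_stop\"}\n\
                \n";
    for ev in parse_sse(body) {
        println!("{} => {}", ev.event, ev.data);
    }
}
```

The keep-alive comment line produces no event, which is why providers never have to filter comments themselves.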

`drive_sse()` handles the generic SSE layer. It pulls events from an `EventSource`
(which does the HTTP chunking and event framing), handles cancellation, and
forwards each message as an `SseEvent` struct to a channel. Each provider then
reads from that channel and interprets the `data` JSON according to its own
format (Anthropic, OpenAI and Google differ in event names and JSON shapes).

The resulting separation of concerns:
  - `sse.rs`           → knows about SSE wire format (events, data, open, close)
  - `anthropic.rs`     → knows about Anthropic's specific JSON event schema
  - `openai_compat.rs` → knows about OpenAI's specific JSON event schema
  - ...
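Here is a hedged sketch of the provider half of this split. It assumes a hypothetical `ProviderAction` enum and `interpret_anthropic` function; neither exists in this file. The raw JSON stays a string because the sketch is std-only; a real provider would deserialize `data` first.

Other providers key off different fields. OpenAI-compatible chat-completion streams, for example, send unnamed events (type "message") and finish with a `data: [DONE]` line. Their dispatch therefore has to look at `data` rather than `event`.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct SseEvent {
    pub event: String,
    pub data: String,
}

/// Hypothetical provider-level meaning of one SSE event.
#[derive(Debug, PartialEq)]
pub enum ProviderAction {
    /// A content delta; the payload is kept as raw JSON in this sketch.
    Delta(String),
    /// The model finished its message.
    Stop,
    /// An event this provider does not act on (for example "ping").
    Ignore,
}

/// Anthropic-style dispatch on the SSE `event` name. Another provider would
/// apply different rules to the very same `SseEvent` values.
pub fn interpret_anthropic(ev: &SseEvent) -> ProviderAction {
    match ev.event.as_str() {
        "content_block_delta" => ProviderAction::Delta(ev.data.clone()),
        "message_stop" => ProviderAction::Stop,
        _ => ProviderAction::Ignore,
    }
}

fn main() {
    let events = vec![
        SseEvent { event: "ping".into(), data: "{}".into() },
        SseEvent { event: "content_block_delta".into(), data: r#"{"text":"Hi"}"#.into() },
        SseEvent { event: "message_stop".into(), data: r#"{"type":"message_stop"}"#.into() },
    ];
    for ev in &events {
        println!("{:?}", interpret_anthropic(ev));
    }
}
```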

RUST QUIRK: `reqwest_eventsource` crate — async SSE over HTTP

`reqwest` is Rust's most popular async HTTP client (like Python's `httpx`).
`reqwest_eventsource` extends it with SSE stream handling: it reconnects on
dropped connections, handles SSE framing (multi-line fields, comment lines, etc.),
and presents a clean `Stream` of `Event` items. (`drive_sse` closes the source on
the first error it sees, so that reconnect logic never gets a chance to run here.)

`EventSource` implements `futures::Stream<Item = Result<Event, ...>>` — a lazy,
async sequence of values. Think of it as an async generator in Python.
*/

use futures::StreamExt;
use reqwest_eventsource::{Event, EventSource};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::debug;

/// A parsed SSE event with event type and data.
/*
RUST QUIRK: `#[derive(Debug, Clone)]` on a simple named-field struct
  Both fields are `String` (heap-allocated, owned). `Clone` deep-copies both strings.
  `Debug` enables `{:?}` formatting for logging/debugging.
  No `Copy` here because `String` is not `Copy` (it owns heap memory).
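A small check of what the two derives provide. The struct is redeclared so the snippet compiles on its own. `clone()` yields an independent copy (editing it leaves the original untouched), and `{:?}` prints every field.

```rust
#[derive(Debug, Clone)]
pub struct SseEvent {
    pub event: String,
    pub data: String,
}

fn main() {
    let original = SseEvent {
        event: "message_stop".to_string(),
        data: r#"{"type":"message_stop"}"#.to_string(),
    };
    // Clone allocates new Strings; the two values share no heap memory.
    let mut copy = original.clone();
    copy.data.push_str(" (edited)");

    // Debug formatting, as used in log lines.
    println!("{:?}", original);
    println!("{:?}", copy);

    // Without Clone (and with no Copy), assignment moves the value:
    let moved = original;
    // println!("{:?}", original); // error[E0382]: borrow of moved value
    println!("{}", moved.event);
}
```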
*/
#[derive(Debug, Clone)]
pub struct SseEvent {
    /// The SSE event type (e.g. "content_block_delta", "message_stop").
    pub event: String,
    /// The raw JSON data string for this event.
    pub data: String,
}

/// Drives an EventSource, sending parsed events through a channel.
/// Returns when the stream ends, errors, is cancelled, or the receiver is dropped.
///
/// The caller receives `SseEvent`s and can parse them according to
/// provider-specific formats.
/*
ARCHITECTURE: drive_sse — the event pump

`drive_sse()` is a "pump" that pulls events from the HTTP response stream and
pushes them into an mpsc channel. It runs as a concurrent task alongside the
provider's event-processing logic.

Why async? The HTTP response arrives incrementally, and the pump spends most of
its time waiting for the next chunk. `async` lets the tokio runtime run other
work (such as the provider's parsing task) while this one waits, instead of
blocking a thread per open stream.

Return value: `Result<(), String>`
  `Ok(())`  — stream ended cleanly (HTTP connection closed by server), or the
              receiver was dropped and nobody is listening any more
  `Err(s)`  — either cancelled ("cancelled") or an HTTP/network error
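The pump contract can be sketched without tokio or HTTP. In this std-only analogue (all names hypothetical):
- an iterator stands in for the `EventSource`;
- `std::sync::mpsc` stands in for the tokio channel;
- an `AtomicBool` checked before each event stands in for the `CancellationToken`.

Unlike `tokio::select!`, a flag check cannot interrupt a blocked read, so cancellation is only noticed between events. The return values follow the same contract as `drive_sse`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;

#[derive(Debug, Clone, PartialEq)]
pub struct SseEvent {
    pub event: String,
    pub data: String,
}

/// Hypothetical blocking analogue of `drive_sse`.
/// `source` yields `Ok(event)` or `Err(message)`, like the EventSource stream.
pub fn pump<I>(source: I, tx: mpsc::Sender<SseEvent>, cancel: &AtomicBool) -> Result<(), String>
where
    I: IntoIterator<Item = Result<SseEvent, String>>,
{
    let mut source = source.into_iter();
    loop {
        if cancel.load(Ordering::SeqCst) {
            return Err("cancelled".into()); // abort requested
        }
        match source.next() {
            None => return Ok(()), // source exhausted: clean end
            Some(Ok(ev)) => {
                if tx.send(ev).is_err() {
                    return Ok(()); // receiver dropped: stop quietly
                }
            }
            Some(Err(e)) => return Err(e), // transport error: surface it
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let cancel = AtomicBool::new(false);
    let source = vec![
        Ok(SseEvent { event: "content_block_delta".into(), data: "{}".into() }),
        Ok(SseEvent { event: "message_stop".into(), data: "{}".into() }),
    ];
    let result = pump(source, tx, &cancel);
    // `tx` was moved into `pump` and dropped on return, so this iteration ends.
    let received: Vec<SseEvent> = rx.iter().collect();
    println!("{:?}, {} events", result, received.len());
}
```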
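
RUST QUIRK: `tokio::select!` — race multiple async operations

`tokio::select!` runs multiple futures concurrently and returns when the FIRST
one completes. All other futures are DROPPED (cancelled). If several branches
are ready at the same time, tokio picks one at random unless the macro starts
with `biased;`. So when an event and an already-fired cancellation are both
ready, the cancellation may lose for an iteration or more before it is handled.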

Here we race two operations:
  1. `cancel.cancelled()` — waits until the cancellation token is triggered
  2. `es.next()`          — waits for the next SSE event from the HTTP stream

If cancellation fires first: we close the SSE connection and return Err("cancelled").
If a new event arrives first: we process it and loop.

Python analogy (asyncio.wait does not cancel the losers for you, select! does):
  done, pending = await asyncio.wait(
      {cancel_task, es_next_task},
      return_when=asyncio.FIRST_COMPLETED
  )
  for task in pending:
      task.cancel()

The `_` in `_ = cancel.cancelled()` means "I don't care about the return value
of this future — I only care that it completed."
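The standard library has no `select!` over channels, but the first-completed idea can be sketched with threads: funnel both sources into one channel and act on whichever message arrives first. In this hedged analogue, the `Input` enum and `first_completed` are hypothetical, and sleeping threads stand in for the token and for `es.next()`.

One difference matters. `tokio::select!` drops the losing future, while here the losing thread keeps running until it finishes. Its late send then fails harmlessly because the receiver is gone.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
pub enum Input {
    Cancelled,
    Event(String),
}

/// Hypothetical: start both "branches" and return whichever finishes first.
pub fn first_completed(cancel_after: Duration, event_after: Duration) -> Input {
    let (tx, rx) = mpsc::channel();

    let cancel_tx = tx.clone();
    thread::spawn(move || {
        thread::sleep(cancel_after);
        let _ = cancel_tx.send(Input::Cancelled); // receiver may already be gone
    });
    thread::spawn(move || {
        thread::sleep(event_after);
        let _ = tx.send(Input::Event("content_block_delta".into()));
    });

    // Blocks until the first message arrives; `rx` is dropped on return,
    // so the losing thread's send returns Err and is ignored.
    rx.recv().expect("both senders dropped without sending")
}

fn main() {
    println!("{:?}", first_completed(Duration::from_millis(10), Duration::from_millis(500)));
    println!("{:?}", first_completed(Duration::from_millis(500), Duration::from_millis(10)));
}
```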

RUST QUIRK: `loop { ... }` with `return` — Rust's infinite loop + early exits

`loop` creates an infinite loop (no condition). The only exits are:
  - `return Ok(())`  when the stream ends or the receiver has been dropped
  - `return Err(...)` on cancellation or error
  - `break` (not used here)

Unlike `while True` in Python, `loop` in Rust communicates intent: "this loop
runs until an explicit exit, not until some condition becomes false."

RUST QUIRK: `es.next()` on a Stream — `futures::StreamExt::next()`
  `EventSource` implements `futures::Stream`. The `StreamExt` trait (from the
  `futures` crate) extends Stream with ergonomic methods:
    `.next()` → future that resolves to `Option<Item>`:
      `Some(Ok(event))` — got an event
      `Some(Err(e))`    — got an error
      `None`            — stream ended
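`futures::StreamExt::next()` mirrors `Iterator::next`, which also returns `Option<Item>`. The three-way match can therefore be tried synchronously on a plain iterator. In this std-only sketch, `drain` is a hypothetical name and items are plain strings. The async version differs only in that `next()` returns a future that has to be awaited (or raced inside `select!`).

```rust
/// Hypothetical: consume items until the source ends or yields an error.
/// Returns the collected items, or the first error encountered.
pub fn drain<I>(mut source: I) -> Result<Vec<String>, String>
where
    I: Iterator<Item = Result<String, String>>,
{
    let mut out = Vec::new();
    loop {
        match source.next() {
            Some(Ok(item)) => out.push(item), // got an event
            Some(Err(e)) => return Err(e),    // got an error: stop here
            None => return Ok(out),           // stream ended
        }
    }
}

fn main() {
    let clean = vec![Ok("a".to_string()), Ok("b".to_string())];
    println!("{:?}", drain(clean.into_iter()));

    // Items after the error are never pulled.
    let broken = vec![Ok("a".to_string()), Err("connection reset".to_string()), Ok("never".to_string())];
    println!("{:?}", drain(broken.into_iter()));
}
```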

RUST QUIRK: `tx.send(...).is_err()` — checking if the receiver is gone
  `mpsc::UnboundedSender::send()` returns `Err(SendError(value))`, handing the
  unsent value back, if the receiver has been dropped (or closed). When the
  receiver (the provider's event-processing task) is gone, there's no point
  continuing — we close the SSE connection and return Ok.
  `.is_err()` returns `true` if the Result is an `Err` variant.
  Python analogy: catching a BrokenPipeError when writing to a pipe whose read
  end has been closed.
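`std::sync::mpsc::Sender::send` behaves the same way as tokio's `UnboundedSender::send` here. Once the receiver is dropped, it returns `Err(SendError(value))` with the value inside. A minimal std-only sketch follows; `try_forward` is a hypothetical helper.

```rust
use std::sync::mpsc;

/// Hypothetical helper: forward one message, returning it if nobody is listening.
pub fn try_forward(tx: &mpsc::Sender<String>, msg: String) -> Result<(), String> {
    // SendError is a tuple struct; `.0` is the value that could not be delivered.
    tx.send(msg).map_err(|err| err.0)
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    println!("{:?}", try_forward(&tx, "first".to_string())); // receiver alive
    println!("{:?}", rx.recv());
    drop(rx); // like the provider task exiting early
    println!("{:?}", try_forward(&tx, "second".to_string())); // receiver gone
}
```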

RUST QUIRK: `e.to_string()` on an error type
  Error types implement `Display` (the `{}` formatter); it is a supertrait of
  `std::error::Error`, so every type implementing that trait has one. `.to_string()`
  calls `Display` to get a human-readable description. Equivalent to `str(e)` in Python.
  Here we convert the `reqwest_eventsource` error to a plain `String` for our
  simpler `Result<(), String>` return type.
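Any type that implements `Display` gets `.to_string()` through the blanket `impl<T: Display + ?Sized> ToString for T`. The sketch below uses a hypothetical `TransportError` in place of the `reqwest_eventsource` error type; its messages are made up for illustration. It shows the same `map_err(|e| e.to_string())` conversion feeding a `Result<(), String>`.

```rust
use std::fmt;

/// Hypothetical error type standing in for the reqwest_eventsource error.
#[derive(Debug)]
pub enum TransportError {
    InvalidStatus(u16),
    Disconnected,
}

impl fmt::Display for TransportError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TransportError::InvalidStatus(code) => write!(f, "invalid status code: {}", code),
            TransportError::Disconnected => write!(f, "connection closed unexpectedly"),
        }
    }
}

/// Collapse a typed error into the plain `String` error that `drive_sse` returns.
pub fn to_plain(result: Result<(), TransportError>) -> Result<(), String> {
    result.map_err(|e| e.to_string())
}

fn main() {
    println!("{:?}", to_plain(Err(TransportError::InvalidStatus(503))));
    println!("{:?}", to_plain(Ok(())));
}
```

The cost of the plain `String` is that callers can no longer match on the error kind; `drive_sse` accepts that in exchange for a simpler signature.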
*/
/*
DESIGN: Why `drive_sse` takes `es` by value AND uses a channel `tx`

Same dual-audience pattern as `execute_single_tool`:
  `es`     = HTTP SOURCE — drive_sse owns the connection; it is the only place that calls
             `.close()` and `.next()`. Passing by value (not `&mut`) means this function
             is exclusively responsible for the connection's lifecycle.
  `tx`     = OBSERVER CHANNEL — provider tasks await on the rx end to process events.
             Separation keeps HTTP pumping out of provider-specific JSON parsing logic.
  `cancel` = ABORT — if triggered mid-stream, `.close()` shuts down the HTTP connection
             and the function returns so the caller can clean up.

Why run drive_sse as a concurrent task?
  Providers spawn drive_sse as a tokio task and await the rx channel separately.
  This separates "reading bytes off the wire" from "interpreting the JSON events".
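  If interpretation is slow, the pump keeps reading the HTTP response into the
  unbounded channel instead of stalling the connection; the trade-off is that
  unread events queue up in memory with no upper bound.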
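The producer/consumer split can be sketched with a std thread standing in for the tokio task and `std::sync::mpsc` for the unbounded tokio channel. All names are hypothetical and the events are plain strings. The pump thread is free to run ahead of the deliberately slow consumer, queuing events in the channel. The join handle then carries the pump's `Result` back to the caller, much as awaiting a tokio `JoinHandle` would.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical: run a pump on its own thread and process events on this one.
pub fn run(events: Vec<String>) -> (Vec<String>, Result<(), String>) {
    let (tx, rx) = mpsc::channel::<String>(); // unbounded, like tokio's unbounded_channel

    // Producer: plays the role of the spawned drive_sse task.
    let pump = thread::spawn(move || -> Result<(), String> {
        for ev in events {
            if tx.send(ev).is_err() {
                return Ok(()); // consumer gone
            }
        }
        Ok(()) // source ended; `tx` is dropped here, which ends the consumer loop
    });

    // Consumer: plays the role of the provider's parsing loop.
    let mut processed = Vec::new();
    for ev in rx {
        thread::sleep(Duration::from_millis(1)); // simulate slow interpretation
        processed.push(ev.to_uppercase());
    }

    let pump_result = pump.join().expect("pump thread panicked");
    (processed, pump_result)
}

fn main() {
    let (out, status) = run(vec!["hello".into(), "world".into()]);
    println!("{:?} {:?}", out, status);
}
```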
*/
pub async fn drive_sse(
    mut es: EventSource, // HTTP SOURCE — owns the open SSE connection; .close() shuts it down
    tx: mpsc::UnboundedSender<SseEvent>, // OBSERVER CHANNEL — forward parsed events to the provider's processing task
    cancel: CancellationToken, // ABORT — if triggered, closes connection and returns Err("cancelled")
) -> Result<(), String> {
    loop {
        tokio::select! {
            // Branch 1: cancellation requested — close SSE and abort
            _ = cancel.cancelled() => {
                es.close(); // signal the HTTP connection to close
                return Err("cancelled".into());
            }
            // Branch 2: next SSE event arrived (or stream ended/errored)
            event = es.next() => {
                match event {
                    // Stream ended cleanly (server closed the connection)
                    None => return Ok(()),
                    // Connection opened — just log it, no action needed
                    Some(Ok(Event::Open)) => {
                        debug!("SSE connection opened");
                    }
                    // A real event with data — forward to the provider's channel
                    Some(Ok(Event::Message(msg))) => {
                        if tx.send(SseEvent {
                            event: msg.event, // e.g. "content_block_delta"
                            data: msg.data,   // the raw JSON payload
                        }).is_err() {
                            // Receiver dropped — no one is listening, clean up
                            es.close();
                            return Ok(());
                        }
                    }
                    // HTTP/SSE error (network issue, bad status code, etc.)
                    Some(Err(e)) => {
                        es.close();
                        return Err(e.to_string());
                    }
                }
            }
        }
    }
}