Skip to main content

opendev_http/
streaming.rs

1//! SSE streaming support for LLM provider responses.
2//!
3//! Provides types and parsing for Server-Sent Events (SSE) used by
4//! streaming LLM APIs (OpenAI Responses API, Anthropic, etc.).
5
6use serde_json::Value;
7
8/// Events emitted during a streaming LLM response.
9#[derive(Debug, Clone)]
10pub enum StreamEvent {
11    /// A chunk of text content from the assistant.
12    TextDelta(String),
13    /// A chunk of reasoning/thinking content.
14    ReasoningDelta(String),
15    /// A new reasoning/thinking block is starting (used to insert separators
16    /// between multiple interleaved thinking blocks in a single response).
17    ReasoningBlockStart,
18    /// A new function/tool call is starting.
19    /// Fields: `(index, call_id, function_name)`
20    FunctionCallStart {
21        index: usize,
22        call_id: String,
23        name: String,
24    },
25    /// A chunk of function call arguments.
26    FunctionCallDelta { index: usize, delta: String },
27    /// Function call arguments are complete.
28    FunctionCallDone { index: usize, arguments: String },
29    /// Usage/metadata update (input_tokens, output_tokens, stop_reason).
30    UsageUpdate {
31        usage: Option<Value>,
32        stop_reason: Option<String>,
33    },
34    /// The complete response is available (streaming finished).
35    /// Contains the full response body for final processing.
36    Done(Value),
37    /// An error occurred during streaming.
38    Error(String),
39}
40
41/// Callback for stream events. Implementations should be cheap/non-blocking.
42pub trait StreamCallback: Send + Sync {
43    fn on_event(&self, event: &StreamEvent);
44}
45
46/// A closure-based StreamCallback.
47pub struct FnStreamCallback<F: Fn(&StreamEvent) + Send + Sync>(pub F);
48
49impl<F: Fn(&StreamEvent) + Send + Sync> StreamCallback for FnStreamCallback<F> {
50    fn on_event(&self, event: &StreamEvent) {
51        (self.0)(event);
52    }
53}
54
55/// Parse a single SSE data line (after "data: " prefix) as JSON.
56pub fn parse_sse_data(line: &str) -> Option<Value> {
57    let data = line.strip_prefix("data: ")?;
58    if data == "[DONE]" {
59        return None;
60    }
61    serde_json::from_str(data).ok()
62}
63
64/// Parse an SSE event block (event type + data) from raw lines.
65///
66/// Returns `(event_type, data_json)` if both are present.
67pub fn parse_sse_block(event_line: Option<&str>, data_line: &str) -> Option<(String, Value)> {
68    let event_type = event_line
69        .and_then(|l| l.strip_prefix("event: "))
70        .map(|s| s.to_string())
71        .unwrap_or_default();
72
73    let data = parse_sse_data(data_line)?;
74    Some((event_type, data))
75}