fabric_sdk/sse.rs
1//! Server-Sent Events (SSE) streaming support.
2//!
3//! The Fabric API exposes several SSE endpoints:
4//! - `GET /v1/events/stream` — global event firehose
5//! - `GET /v1/workflow-runs/{id}/events` — per-run events
6//! - `GET /v1/jobs/{id}/events` — per-job events
7//! - `POST /v1/providers/execute/stream` — provider execution stream
8//!
9//! We deliberately do **not** use `web_sys::EventSource` in the wasm build:
10//! browsers refuse to attach `Authorization` headers to `EventSource` and it
11//! cannot issue POST requests, which would kill every authenticated stream.
12//! Instead we use `reqwest`'s streaming response body (backed by `fetch`'s
13//! `ReadableStream` on wasm and by `hyper` on native) and parse the SSE
14//! framing ourselves.
15
16use crate::{FabricClient, FabricError, Result};
17use bytes::Bytes;
18use futures_util::stream::{Stream, StreamExt};
19use serde::{Deserialize, Serialize};
20
21/// A single SSE event.
22#[derive(Debug, Clone, Default, Serialize, Deserialize)]
23pub struct SseEvent {
24 /// The `event:` field, if present.
25 #[serde(skip_serializing_if = "Option::is_none")]
26 pub event: Option<String>,
27 /// The `id:` field, if present. Used for `Last-Event-ID` reconnection.
28 #[serde(skip_serializing_if = "Option::is_none")]
29 pub id: Option<String>,
30 /// The concatenated `data:` field(s). SSE allows multi-line data; we
31 /// join them with `\n` per the spec.
32 pub data: String,
33}
34
35/// Parse an SSE byte stream into a stream of [`SseEvent`]s.
36///
37/// Handles the SSE wire format described at
38/// <https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream>:
39/// lines are delimited by `\n`, `\r`, or `\r\n`; a blank line dispatches the
40/// accumulated event. Comments (lines starting with `:`) and unknown fields
41/// are ignored.
42pub fn parse_sse_stream<S>(stream: S) -> impl Stream<Item = Result<SseEvent>>
43where
44 S: Stream<Item = std::result::Result<Bytes, reqwest::Error>>,
45{
46 let mut buffer: Vec<u8> = Vec::with_capacity(4096);
47 let mut current = SseEvent::default();
48 let mut data_buf = String::new();
49 let mut has_content = false;
50
51 async_stream::stream! {
52 let mut stream = Box::pin(stream);
53 while let Some(chunk) = stream.next().await {
54 let chunk = match chunk {
55 Ok(b) => b,
56 Err(e) => {
57 yield Err(FabricError::Http(e));
58 return;
59 }
60 };
61 buffer.extend_from_slice(&chunk);
62
63 // Pull out complete lines from the buffer. A line ends at \n,
64 // with a possible leading \r that we strip.
65 while let Some(nl) = buffer.iter().position(|&b| b == b'\n') {
66 let mut line = buffer.drain(..=nl).collect::<Vec<u8>>();
67 line.pop(); // drop '\n'
68 if line.last() == Some(&b'\r') {
69 line.pop();
70 }
71 let line = match std::str::from_utf8(&line) {
72 Ok(s) => s,
73 Err(e) => {
74 yield Err(FabricError::Other(format!("invalid UTF-8 in SSE stream: {e}")));
75 return;
76 }
77 };
78
79 if line.is_empty() {
80 // Blank line → dispatch the event, if any.
81 if has_content {
82 current.data = std::mem::take(&mut data_buf);
83 yield Ok(std::mem::take(&mut current));
84 has_content = false;
85 }
86 continue;
87 }
88 if line.starts_with(':') {
89 // Comment line — ignore.
90 continue;
91 }
92
93 // Split "field: value" — per spec, the first `:` separates
94 // field from value, and a leading space on the value is
95 // stripped.
96 let (field, value) = match line.find(':') {
97 Some(i) => {
98 let v = &line[i + 1..];
99 let v = v.strip_prefix(' ').unwrap_or(v);
100 (&line[..i], v)
101 }
102 None => (line, ""),
103 };
104
105 match field {
106 "event" => {
107 current.event = Some(value.to_string());
108 has_content = true;
109 }
110 "id" => {
111 current.id = Some(value.to_string());
112 has_content = true;
113 }
114 "data" => {
115 if !data_buf.is_empty() {
116 data_buf.push('\n');
117 }
118 data_buf.push_str(value);
119 has_content = true;
120 }
121 // `retry:` and unknown fields — ignore for now.
122 _ => {}
123 }
124 }
125 }
126
127 // Stream ended — dispatch any trailing event.
128 if has_content {
129 current.data = data_buf;
130 yield Ok(current);
131 }
132 }
133}
134
/// Return `path` with an `include_internal=true` query parameter added when
/// `include_internal` is set, and an unchanged copy of `path` otherwise.
///
/// The parameter is joined with `&` if `path` already contains a `?`, and
/// with `?` if it does not.
///
/// Centralised so the three SSE entry points (`stream_workflow_run`,
/// `stream_job`, `stream_events`) and their `_with_internal` variants stay
/// in sync. The Fabric API hides internal SDK shim node events by default;
/// this opt-in is only useful when debugging the finalizer itself.
fn with_internal_query(path: &str, include_internal: bool) -> String {
    let mut out = String::from(path);
    if include_internal {
        let joiner = if path.contains('?') { '&' } else { '?' };
        out.push(joiner);
        out.push_str("include_internal=true");
    }
    out
}
149
150impl FabricClient {
151 /// Stream events for a single workflow run as they happen.
152 ///
153 /// Returns a stream of [`SseEvent`]s. The stream terminates when the
154 /// server closes the connection (typically when the run reaches a
155 /// terminal state).
156 ///
157 /// Internal SDK shim node events (`_fabric_capture_input`,
158 /// `_fabric_finalize_output`) are hidden by default. Use
159 /// [`Self::stream_workflow_run_with_internal`] when debugging the
160 /// finalizer to see them.
161 pub async fn stream_workflow_run(
162 &self,
163 run_id: &str,
164 ) -> Result<impl Stream<Item = Result<SseEvent>>> {
165 self.sse_get(&format!("/v1/workflow-runs/{run_id}/events"))
166 .await
167 }
168
169 /// Same as [`Self::stream_workflow_run`] but explicitly opts in to
170 /// (or out of) internal SDK shim events.
171 pub async fn stream_workflow_run_with_internal(
172 &self,
173 run_id: &str,
174 include_internal: bool,
175 ) -> Result<impl Stream<Item = Result<SseEvent>>> {
176 let path = with_internal_query(
177 &format!("/v1/workflow-runs/{run_id}/events"),
178 include_internal,
179 );
180 self.sse_get(&path).await
181 }
182
183 /// Stream events for a single job as they happen.
184 ///
185 /// Hides internal SDK shim node events by default; use
186 /// [`Self::stream_job_with_internal`] to opt back in.
187 pub async fn stream_job(&self, job_id: &str) -> Result<impl Stream<Item = Result<SseEvent>>> {
188 self.sse_get(&format!("/v1/jobs/{job_id}/events")).await
189 }
190
191 /// Same as [`Self::stream_job`] with explicit internal-shim opt-in.
192 pub async fn stream_job_with_internal(
193 &self,
194 job_id: &str,
195 include_internal: bool,
196 ) -> Result<impl Stream<Item = Result<SseEvent>>> {
197 let path = with_internal_query(&format!("/v1/jobs/{job_id}/events"), include_internal);
198 self.sse_get(&path).await
199 }
200
201 /// Subscribe to the global event firehose.
202 ///
203 /// Hides internal SDK shim node events by default; use
204 /// [`Self::stream_events_with_internal`] to opt back in.
205 pub async fn stream_events(&self) -> Result<impl Stream<Item = Result<SseEvent>>> {
206 self.sse_get("/v1/events/stream").await
207 }
208
209 /// Same as [`Self::stream_events`] with explicit internal-shim opt-in.
210 pub async fn stream_events_with_internal(
211 &self,
212 include_internal: bool,
213 ) -> Result<impl Stream<Item = Result<SseEvent>>> {
214 let path = with_internal_query("/v1/events/stream", include_internal);
215 self.sse_get(&path).await
216 }
217
218 /// Execute a provider and stream partial results (e.g. token-by-token
219 /// LLM output).
220 pub async fn stream_provider_execute(
221 &self,
222 body: serde_json::Value,
223 ) -> Result<impl Stream<Item = Result<SseEvent>>> {
224 self.sse_post("/v1/providers/execute/stream", body).await
225 }
226
227 async fn sse_get(&self, path: &str) -> Result<impl Stream<Item = Result<SseEvent>>> {
228 let url = format!("{}{path}", self.base_url);
229 let resp = self
230 .client
231 .get(&url)
232 .header(reqwest::header::ACCEPT, "text/event-stream")
233 .send()
234 .await?;
235 check_sse_status(&resp)?;
236 Ok(parse_sse_stream(resp.bytes_stream()))
237 }
238
239 async fn sse_post(
240 &self,
241 path: &str,
242 body: serde_json::Value,
243 ) -> Result<impl Stream<Item = Result<SseEvent>>> {
244 let url = format!("{}{path}", self.base_url);
245 let resp = self
246 .client
247 .post(&url)
248 .header(reqwest::header::ACCEPT, "text/event-stream")
249 .json(&body)
250 .send()
251 .await?;
252 check_sse_status(&resp)?;
253 Ok(parse_sse_stream(resp.bytes_stream()))
254 }
255}
256
257fn check_sse_status(resp: &reqwest::Response) -> Result<()> {
258 let status = resp.status();
259 if !status.is_success() {
260 return Err(FabricError::Api {
261 code: status.as_u16().to_string(),
262 message: format!("SSE connection failed with HTTP {status}"),
263 });
264 }
265 Ok(())
266}