Skip to main content

fabric_sdk/
sse.rs

1//! Server-Sent Events (SSE) streaming support.
2//!
3//! The Fabric API exposes several SSE endpoints:
4//! - `GET /v1/events/stream` — global event firehose
5//! - `GET /v1/workflow-runs/{id}/events` — per-run events
6//! - `GET /v1/jobs/{id}/events` — per-job events
7//! - `POST /v1/providers/execute/stream` — provider execution stream
8//!
9//! We deliberately do **not** use `web_sys::EventSource` in the wasm build:
10//! browsers refuse to attach `Authorization` headers to `EventSource` and it
11//! cannot issue POST requests, which would kill every authenticated stream.
12//! Instead we use `reqwest`'s streaming response body (backed by `fetch`'s
13//! `ReadableStream` on wasm and by `hyper` on native) and parse the SSE
14//! framing ourselves.
15
16use crate::{FabricClient, FabricError, Result};
17use bytes::Bytes;
18use futures_util::stream::{Stream, StreamExt};
19use serde::{Deserialize, Serialize};
20
/// A single SSE event.
///
/// Produced by [`parse_sse_stream`], which builds every event starting from
/// `SseEvent::default()`: a field that did not appear on the wire is `None`
/// (or the empty string for `data`), and nothing is carried over from the
/// previously dispatched event.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SseEvent {
    /// The `event:` field, if present.
    // Left out of the serialized form entirely when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    /// The `id:` field, if present. Used for `Last-Event-ID` reconnection.
    // Left out of the serialized form entirely when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    /// The concatenated `data:` field(s). SSE allows multi-line data; we
    /// join them with `\n` per the spec.
    ///
    /// Empty when the event carried only `event:` and/or `id:` lines.
    pub data: String,
}
34
35/// Parse an SSE byte stream into a stream of [`SseEvent`]s.
36///
37/// Handles the SSE wire format described at
38/// <https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream>:
39/// lines are delimited by `\n`, `\r`, or `\r\n`; a blank line dispatches the
40/// accumulated event. Comments (lines starting with `:`) and unknown fields
41/// are ignored.
42pub fn parse_sse_stream<S>(stream: S) -> impl Stream<Item = Result<SseEvent>>
43where
44    S: Stream<Item = std::result::Result<Bytes, reqwest::Error>>,
45{
46    let mut buffer: Vec<u8> = Vec::with_capacity(4096);
47    let mut current = SseEvent::default();
48    let mut data_buf = String::new();
49    let mut has_content = false;
50
51    async_stream::stream! {
52        let mut stream = Box::pin(stream);
53        while let Some(chunk) = stream.next().await {
54            let chunk = match chunk {
55                Ok(b) => b,
56                Err(e) => {
57                    yield Err(FabricError::Http(e));
58                    return;
59                }
60            };
61            buffer.extend_from_slice(&chunk);
62
63            // Pull out complete lines from the buffer. A line ends at \n,
64            // with a possible leading \r that we strip.
65            while let Some(nl) = buffer.iter().position(|&b| b == b'\n') {
66                let mut line = buffer.drain(..=nl).collect::<Vec<u8>>();
67                line.pop(); // drop '\n'
68                if line.last() == Some(&b'\r') {
69                    line.pop();
70                }
71                let line = match std::str::from_utf8(&line) {
72                    Ok(s) => s,
73                    Err(e) => {
74                        yield Err(FabricError::Other(format!("invalid UTF-8 in SSE stream: {e}")));
75                        return;
76                    }
77                };
78
79                if line.is_empty() {
80                    // Blank line → dispatch the event, if any.
81                    if has_content {
82                        current.data = std::mem::take(&mut data_buf);
83                        yield Ok(std::mem::take(&mut current));
84                        has_content = false;
85                    }
86                    continue;
87                }
88                if line.starts_with(':') {
89                    // Comment line — ignore.
90                    continue;
91                }
92
93                // Split "field: value" — per spec, the first `:` separates
94                // field from value, and a leading space on the value is
95                // stripped.
96                let (field, value) = match line.find(':') {
97                    Some(i) => {
98                        let v = &line[i + 1..];
99                        let v = v.strip_prefix(' ').unwrap_or(v);
100                        (&line[..i], v)
101                    }
102                    None => (line, ""),
103                };
104
105                match field {
106                    "event" => {
107                        current.event = Some(value.to_string());
108                        has_content = true;
109                    }
110                    "id" => {
111                        current.id = Some(value.to_string());
112                        has_content = true;
113                    }
114                    "data" => {
115                        if !data_buf.is_empty() {
116                            data_buf.push('\n');
117                        }
118                        data_buf.push_str(value);
119                        has_content = true;
120                    }
121                    // `retry:` and unknown fields — ignore for now.
122                    _ => {}
123                }
124            }
125        }
126
127        // Stream ended — dispatch any trailing event.
128        if has_content {
129            current.data = data_buf;
130            yield Ok(current);
131        }
132    }
133}
134
/// Return `path` with `include_internal=true` added to its query string when
/// `include_internal` is set; otherwise return `path` unchanged.
///
/// The parameter is joined with `&` if `path` already contains a `?`, and
/// with `?` if it does not.
///
/// Shared by the `_with_internal` SSE entry points so they all build the
/// query the same way. The Fabric API hides internal SDK shim node events
/// by default; this opt-in is only useful when debugging the finalizer
/// itself.
fn with_internal_query(path: &str, include_internal: bool) -> String {
    let mut full_path = String::from(path);
    if include_internal {
        let joiner = match path.contains('?') {
            true => '&',
            false => '?',
        };
        full_path.push(joiner);
        full_path.push_str("include_internal=true");
    }
    full_path
}
149
150impl FabricClient {
151    /// Stream events for a single workflow run as they happen.
152    ///
153    /// Returns a stream of [`SseEvent`]s. The stream terminates when the
154    /// server closes the connection (typically when the run reaches a
155    /// terminal state).
156    ///
157    /// Internal SDK shim node events (`_fabric_capture_input`,
158    /// `_fabric_finalize_output`) are hidden by default. Use
159    /// [`Self::stream_workflow_run_with_internal`] when debugging the
160    /// finalizer to see them.
161    pub async fn stream_workflow_run(
162        &self,
163        run_id: &str,
164    ) -> Result<impl Stream<Item = Result<SseEvent>>> {
165        self.sse_get(&format!("/v1/workflow-runs/{run_id}/events"))
166            .await
167    }
168
169    /// Same as [`Self::stream_workflow_run`] but explicitly opts in to
170    /// (or out of) internal SDK shim events.
171    pub async fn stream_workflow_run_with_internal(
172        &self,
173        run_id: &str,
174        include_internal: bool,
175    ) -> Result<impl Stream<Item = Result<SseEvent>>> {
176        let path = with_internal_query(
177            &format!("/v1/workflow-runs/{run_id}/events"),
178            include_internal,
179        );
180        self.sse_get(&path).await
181    }
182
183    /// Stream events for a single job as they happen.
184    ///
185    /// Hides internal SDK shim node events by default; use
186    /// [`Self::stream_job_with_internal`] to opt back in.
187    pub async fn stream_job(&self, job_id: &str) -> Result<impl Stream<Item = Result<SseEvent>>> {
188        self.sse_get(&format!("/v1/jobs/{job_id}/events")).await
189    }
190
191    /// Same as [`Self::stream_job`] with explicit internal-shim opt-in.
192    pub async fn stream_job_with_internal(
193        &self,
194        job_id: &str,
195        include_internal: bool,
196    ) -> Result<impl Stream<Item = Result<SseEvent>>> {
197        let path = with_internal_query(&format!("/v1/jobs/{job_id}/events"), include_internal);
198        self.sse_get(&path).await
199    }
200
201    /// Subscribe to the global event firehose.
202    ///
203    /// Hides internal SDK shim node events by default; use
204    /// [`Self::stream_events_with_internal`] to opt back in.
205    pub async fn stream_events(&self) -> Result<impl Stream<Item = Result<SseEvent>>> {
206        self.sse_get("/v1/events/stream").await
207    }
208
209    /// Same as [`Self::stream_events`] with explicit internal-shim opt-in.
210    pub async fn stream_events_with_internal(
211        &self,
212        include_internal: bool,
213    ) -> Result<impl Stream<Item = Result<SseEvent>>> {
214        let path = with_internal_query("/v1/events/stream", include_internal);
215        self.sse_get(&path).await
216    }
217
218    /// Execute a provider and stream partial results (e.g. token-by-token
219    /// LLM output).
220    pub async fn stream_provider_execute(
221        &self,
222        body: serde_json::Value,
223    ) -> Result<impl Stream<Item = Result<SseEvent>>> {
224        self.sse_post("/v1/providers/execute/stream", body).await
225    }
226
227    async fn sse_get(&self, path: &str) -> Result<impl Stream<Item = Result<SseEvent>>> {
228        let url = format!("{}{path}", self.base_url);
229        let resp = self
230            .client
231            .get(&url)
232            .header(reqwest::header::ACCEPT, "text/event-stream")
233            .send()
234            .await?;
235        check_sse_status(&resp)?;
236        Ok(parse_sse_stream(resp.bytes_stream()))
237    }
238
239    async fn sse_post(
240        &self,
241        path: &str,
242        body: serde_json::Value,
243    ) -> Result<impl Stream<Item = Result<SseEvent>>> {
244        let url = format!("{}{path}", self.base_url);
245        let resp = self
246            .client
247            .post(&url)
248            .header(reqwest::header::ACCEPT, "text/event-stream")
249            .json(&body)
250            .send()
251            .await?;
252        check_sse_status(&resp)?;
253        Ok(parse_sse_stream(resp.bytes_stream()))
254    }
255}
256
257fn check_sse_status(resp: &reqwest::Response) -> Result<()> {
258    let status = resp.status();
259    if !status.is_success() {
260        return Err(FabricError::Api {
261            code: status.as_u16().to_string(),
262            message: format!("SSE connection failed with HTTP {status}"),
263        });
264    }
265    Ok(())
266}