// fabric-platform 0.8.1
//
// Rust client SDK for the Fabric platform
// Documentation
//! Server-Sent Events (SSE) streaming support.
//!
//! The Fabric API exposes several SSE endpoints:
//! - `GET /v1/events/stream` — global event firehose
//! - `GET /v1/workflow-runs/{id}/events` — per-run events
//! - `GET /v1/jobs/{id}/events` — per-job events
//! - `POST /v1/providers/execute/stream` — provider execution stream
//!
//! We deliberately do **not** use `web_sys::EventSource` in the wasm build:
//! browsers refuse to attach `Authorization` headers to `EventSource` and it
//! cannot issue POST requests, which would kill every authenticated stream.
//! Instead we use `reqwest`'s streaming response body (backed by `fetch`'s
//! `ReadableStream` on wasm and by `hyper` on native) and parse the SSE
//! framing ourselves.

use crate::{FabricClient, FabricError, Result};
use bytes::Bytes;
use futures_util::stream::{Stream, StreamExt};
use serde::{Deserialize, Serialize};

/// A single SSE event.
///
/// This is the item type produced by [`parse_sse_stream`]. The derived
/// `Default` value has no `event`, no `id`, and an empty `data` string.
/// When serialized, `event` and `id` are omitted entirely if they are `None`;
/// `data` is always written.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SseEvent {
    /// The `event:` field, if present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    /// The `id:` field, if present. Used for `Last-Event-ID` reconnection.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    /// The concatenated `data:` field(s). SSE allows multi-line data; we
    /// join them with `\n` per the spec.
    pub data: String,
}

/// Parse an SSE byte stream into a stream of [`SseEvent`]s.
///
/// Handles the SSE wire format described at
/// <https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream>:
/// lines are delimited by `\n`, `\r`, or `\r\n` (a `\r\n` pair may be split
/// across two chunks); a blank line dispatches the accumulated event.
/// Comments (lines starting with `:`) and unknown fields, including
/// `retry:`, are ignored.
///
/// Deliberate leniencies relative to the spec:
/// - an event is dispatched as soon as it has any `event`, `id`, or `data`
///   field, even if its data is empty;
/// - when the byte stream ends, a final line without a terminator is still
///   parsed, and any event accumulated so far is dispatched without needing
///   a closing blank line.
///
/// # Errors
///
/// The returned stream yields `Err(FabricError::Http)` if the underlying
/// byte stream fails, and `Err(FabricError::Other)` if a line is not valid
/// UTF-8. In both cases the stream ends immediately after the error item.
pub fn parse_sse_stream<S>(stream: S) -> impl Stream<Item = Result<SseEvent>>
where
    S: Stream<Item = std::result::Result<Bytes, reqwest::Error>>,
{
    // Bytes received but not yet consumed as complete lines.
    let mut buffer: Vec<u8> = Vec::with_capacity(4096);
    let mut pending = PendingEvent::default();
    // Set after a line was terminated by `\r`: if the very next byte is
    // `\n` it belongs to the same `\r\n` terminator and must be skipped.
    // The flag survives chunk boundaries so a split `\r` | `\n` is handled.
    let mut skip_lf = false;

    async_stream::stream! {
        let mut stream = Box::pin(stream);
        while let Some(chunk) = stream.next().await {
            let chunk = match chunk {
                Ok(b) => b,
                Err(e) => {
                    yield Err(FabricError::Http(e));
                    return;
                }
            };
            buffer.extend_from_slice(&chunk);

            // Walk the buffer with an offset and drain once per chunk,
            // instead of shifting the whole buffer after every line.
            let mut start = 0;
            loop {
                if skip_lf {
                    match buffer.get(start).copied() {
                        // Need the next chunk to know whether a `\n` follows.
                        None => break,
                        Some(b'\n') => start += 1,
                        Some(_) => {}
                    }
                    skip_lf = false;
                }

                let end = match buffer[start..]
                    .iter()
                    .position(|&b| b == b'\n' || b == b'\r')
                {
                    Some(offset) => start + offset,
                    // No complete line yet; keep the remainder for later.
                    None => break,
                };
                skip_lf = buffer[end] == b'\r';

                let dispatched = match std::str::from_utf8(&buffer[start..end]) {
                    Ok(line) => pending.feed_line(line),
                    Err(e) => {
                        yield Err(FabricError::Other(format!("invalid UTF-8 in SSE stream: {e}")));
                        return;
                    }
                };
                start = end + 1;
                if let Some(event) = dispatched {
                    yield Ok(event);
                }
            }
            buffer.drain(..start);
        }

        // Stream ended. Whatever is left in the buffer is a final line that
        // never received a terminator; parse it so its field is not lost.
        if !buffer.is_empty() {
            match std::str::from_utf8(&buffer) {
                Ok(line) => {
                    // A non-empty line can only add a field, never dispatch.
                    pending.feed_line(line);
                }
                Err(e) => {
                    yield Err(FabricError::Other(format!("invalid UTF-8 in SSE stream: {e}")));
                    return;
                }
            }
        }

        // Dispatch any trailing event that was not closed by a blank line.
        if let Some(event) = pending.dispatch() {
            yield Ok(event);
        }
    }
}

/// Accumulator for the SSE event currently being assembled.
///
/// Holds the field-level parsing rules as plain synchronous code so the
/// async byte handling in `parse_sse_stream` only deals with line splitting.
#[derive(Default)]
struct PendingEvent {
    /// Fields collected so far; `data` holds the `data:` lines joined by `\n`.
    event: SseEvent,
    /// Whether a `data:` line has already been seen for this event. Tracked
    /// separately from `event.data.is_empty()` so that an empty first data
    /// line still gets its `\n` separator when another data line follows.
    saw_data: bool,
    /// Whether any recognised field (`event`, `id`, `data`) has been seen
    /// since the last dispatch.
    has_content: bool,
}

impl PendingEvent {
    /// Process one line whose terminator has already been stripped.
    ///
    /// Returns the completed event when `line` is blank and at least one
    /// recognised field was accumulated; returns `None` otherwise.
    fn feed_line(&mut self, line: &str) -> Option<SseEvent> {
        if line.is_empty() {
            // Blank line → dispatch the event, if any.
            return self.dispatch();
        }
        if line.starts_with(':') {
            // Comment line — ignore.
            return None;
        }

        // The first `:` separates field from value and a single leading
        // space on the value is stripped. A line without `:` is a field
        // name with an empty value.
        let (field, value) = match line.split_once(':') {
            Some((name, rest)) => (name, rest.strip_prefix(' ').unwrap_or(rest)),
            None => (line, ""),
        };

        match field {
            "event" => self.event.event = Some(value.to_string()),
            "id" => self.event.id = Some(value.to_string()),
            "data" => {
                if self.saw_data {
                    self.event.data.push('\n');
                }
                self.event.data.push_str(value);
                self.saw_data = true;
            }
            // `retry:` and unknown fields — ignored; they do not count as
            // event content.
            _ => return None,
        }
        self.has_content = true;
        None
    }

    /// Take the accumulated event, resetting the accumulator, or return
    /// `None` if no recognised field has been seen since the last dispatch.
    fn dispatch(&mut self) -> Option<SseEvent> {
        if !self.has_content {
            return None;
        }
        Some(std::mem::take(self).event)
    }
}

/// Build the request path, optionally opting in to internal events.
///
/// When `include_internal` is `false` the path is returned unchanged (as an
/// owned `String`). When it is `true`, `include_internal=true` is appended
/// as a query parameter, using `&` if the path already contains a `?` and
/// `?` otherwise.
///
/// Shared by the `_with_internal` SSE entry points so they all spell the
/// query parameter the same way.
fn with_internal_query(path: &str, include_internal: bool) -> String {
    let mut full_path = String::from(path);
    if include_internal {
        let separator = match path.contains('?') {
            true => '&',
            false => '?',
        };
        full_path.push(separator);
        full_path.push_str("include_internal=true");
    }
    full_path
}

impl FabricClient {
    /// Stream the events of one workflow run as the server emits them.
    ///
    /// Yields [`SseEvent`]s until the server closes the connection
    /// (typically once the run reaches a terminal state).
    ///
    /// This requests the default view, without the `include_internal`
    /// query parameter, so internal SDK shim node events
    /// (`_fabric_capture_input`, `_fabric_finalize_output`) stay hidden.
    /// Call [`Self::stream_workflow_run_with_internal`] to see them when
    /// debugging the finalizer.
    pub async fn stream_workflow_run(
        &self,
        run_id: &str,
    ) -> Result<impl Stream<Item = Result<SseEvent>>> {
        // `false` leaves the path untouched, so this is the plain endpoint.
        self.stream_workflow_run_with_internal(run_id, false).await
    }

    /// Like [`Self::stream_workflow_run`], but the caller decides whether
    /// internal SDK shim events are requested (`include_internal = true`)
    /// or not.
    pub async fn stream_workflow_run_with_internal(
        &self,
        run_id: &str,
        include_internal: bool,
    ) -> Result<impl Stream<Item = Result<SseEvent>>> {
        let base_path = format!("/v1/workflow-runs/{run_id}/events");
        let full_path = with_internal_query(&base_path, include_internal);
        self.sse_get(&full_path).await
    }

    /// Stream the events of one job as the server emits them.
    ///
    /// Internal SDK shim node events are not requested; use
    /// [`Self::stream_job_with_internal`] to opt in.
    pub async fn stream_job(&self, job_id: &str) -> Result<impl Stream<Item = Result<SseEvent>>> {
        // `false` leaves the path untouched, so this is the plain endpoint.
        self.stream_job_with_internal(job_id, false).await
    }

    /// Like [`Self::stream_job`], with an explicit internal-shim opt-in.
    pub async fn stream_job_with_internal(
        &self,
        job_id: &str,
        include_internal: bool,
    ) -> Result<impl Stream<Item = Result<SseEvent>>> {
        let base_path = format!("/v1/jobs/{job_id}/events");
        let full_path = with_internal_query(&base_path, include_internal);
        self.sse_get(&full_path).await
    }

    /// Subscribe to the global event firehose.
    ///
    /// Internal SDK shim node events are not requested; use
    /// [`Self::stream_events_with_internal`] to opt in.
    pub async fn stream_events(&self) -> Result<impl Stream<Item = Result<SseEvent>>> {
        // `false` leaves the path untouched, so this is the plain endpoint.
        self.stream_events_with_internal(false).await
    }

    /// Like [`Self::stream_events`], with an explicit internal-shim opt-in.
    pub async fn stream_events_with_internal(
        &self,
        include_internal: bool,
    ) -> Result<impl Stream<Item = Result<SseEvent>>> {
        let full_path = with_internal_query("/v1/events/stream", include_internal);
        self.sse_get(&full_path).await
    }

    /// Execute a provider and stream its partial results (for example
    /// token-by-token LLM output). `body` is sent as the JSON request body.
    pub async fn stream_provider_execute(
        &self,
        body: serde_json::Value,
    ) -> Result<impl Stream<Item = Result<SseEvent>>> {
        self.sse_post("/v1/providers/execute/stream", body).await
    }

    /// Open an SSE stream with a `GET` to `base_url` + `path`.
    ///
    /// Sends `Accept: text/event-stream`, fails if the request cannot be
    /// sent or the response status is not a success, and otherwise hands
    /// the response body to [`parse_sse_stream`].
    async fn sse_get(&self, path: &str) -> Result<impl Stream<Item = Result<SseEvent>>> {
        let endpoint = format!("{}{}", self.base_url, path);
        let request = self
            .client
            .get(&endpoint)
            .header(reqwest::header::ACCEPT, "text/event-stream");
        let response = request.send().await?;
        check_sse_status(&response)?;
        Ok(parse_sse_stream(response.bytes_stream()))
    }

    /// Open an SSE stream with a `POST` of `body` (as JSON) to
    /// `base_url` + `path`.
    ///
    /// Sends `Accept: text/event-stream`, fails if the request cannot be
    /// sent or the response status is not a success, and otherwise hands
    /// the response body to [`parse_sse_stream`].
    async fn sse_post(
        &self,
        path: &str,
        body: serde_json::Value,
    ) -> Result<impl Stream<Item = Result<SseEvent>>> {
        let endpoint = format!("{}{}", self.base_url, path);
        let request = self
            .client
            .post(&endpoint)
            .header(reqwest::header::ACCEPT, "text/event-stream")
            .json(&body);
        let response = request.send().await?;
        check_sse_status(&response)?;
        Ok(parse_sse_stream(response.bytes_stream()))
    }
}

/// Reject an SSE response whose HTTP status is not a success.
///
/// Returns `Ok(())` for success statuses. Otherwise returns
/// `FabricError::Api` whose `code` is the numeric status as a string and
/// whose `message` names the status.
fn check_sse_status(resp: &reqwest::Response) -> Result<()> {
    let status = resp.status();
    if status.is_success() {
        return Ok(());
    }

    let code = status.as_u16().to_string();
    let message = format!("SSE connection failed with HTTP {status}");
    Err(FabricError::Api { code, message })
}