systemprompt-api 0.14.6

Axum-based HTTP server and API gateway for systemprompt.io AI governance infrastructure. Exposes governed agents, MCP, A2A, and admin endpoints with rate limiting and RBAC.
Documentation
use futures_util::StreamExt;
// JSON: protocol boundary — event shapes are owned by the models::wire
// Anthropic codec.
use serde_json::Value;
use systemprompt_models::wire::anthropic;

use super::super::super::canonical_response::CanonicalEvent;

#[cfg_attr(
    not(feature = "test-api"),
    expect(
        unreachable_pub,
        reason = "items are re-exported via `test_api` only when the feature is on"
    )
)]
pub fn sse_to_canonical_events<S>(
    stream: S,
) -> futures_util::stream::BoxStream<'static, Result<CanonicalEvent, String>>
where
    S: futures_util::Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
{
    use futures_util::stream;
    let s = stream
        .map(|chunk| chunk.map_err(|e| e.to_string()))
        .scan((Vec::<u8>::new(), String::new()), |state, item| {
            let (buf, msg_id) = state;
            let res = match item {
                Ok(bytes) => {
                    buf.extend_from_slice(&bytes);
                    Some(drain_frames(buf, msg_id))
                },
                Err(e) => Some(vec![Err(e)]),
            };
            futures_util::future::ready(res)
        })
        .flat_map(stream::iter);
    s.boxed()
}

fn drain_frames(buf: &mut Vec<u8>, msg_id: &mut String) -> Vec<Result<CanonicalEvent, String>> {
    let mut events: Vec<Result<CanonicalEvent, String>> = Vec::new();
    while let Some(pos) = find_double_newline(buf) {
        let frame: Vec<u8> = buf.drain(..pos + 2).collect();
        let frame_str = String::from_utf8_lossy(&frame);
        for line in frame_str.lines() {
            if let Some(data) = line.strip_prefix("data: ") {
                if data.trim() == "[DONE]" {
                    continue;
                }
                if let Ok(value) = serde_json::from_str::<Value>(data) {
                    if let Some(ev) = anthropic::event_from_sse(&value, msg_id) {
                        if let CanonicalEvent::MessageStart { id, .. } = &ev {
                            msg_id.clone_from(id);
                        }
                        events.push(Ok(ev));
                    }
                }
            }
        }
    }
    events
}

fn find_double_newline(buf: &[u8]) -> Option<usize> {
    buf.windows(2).position(|w| w == b"\n\n")
}