systemprompt-api 0.9.2

Axum-based HTTP server and API gateway for systemprompt.io AI governance infrastructure. Exposes governed agents, MCP, A2A, and admin endpoints with rate limiting and RBAC.
Documentation
use futures_util::StreamExt;
// JSON: protocol boundary — Anthropic Messages outbound wire format is dynamic
// JSON.
use serde_json::Value;

use super::super::super::canonical_response::{
    CanonicalEvent, CanonicalStopReason, CanonicalUsage, ContentBlockKind,
};

/// Adapts an upstream SSE byte stream into a stream of [`CanonicalEvent`]s.
///
/// Incoming chunks are appended to an internal buffer. Every complete frame
/// (a run of bytes terminated by a blank line, `\n\n`) is split into lines,
/// and each `data:` line is parsed as JSON and mapped through
/// `anthropic_event_to_canonical`.
///
/// Behaviour worth knowing:
/// - The id carried by the most recent `MessageStart` is remembered and
///   handed to the conversion of every later payload.
/// - `[DONE]` sentinels, payloads that are not valid JSON, and payloads that
///   map to no canonical event are skipped without producing an item.
/// - Transport errors are forwarded as `Err` items holding the error's
///   display string.
/// - Only `\n\n` is recognised as a frame boundary; bytes still buffered when
///   the upstream ends (an unterminated final frame) are discarded.
pub(super) fn sse_to_canonical_events<S>(
    stream: S,
) -> futures_util::stream::BoxStream<'static, Result<CanonicalEvent, String>>
where
    S: futures_util::Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
{
    use futures_util::stream;
    let s = stream
        .map(|chunk| chunk.map_err(|e| e.to_string()))
        // State: (bytes not yet forming a complete frame, current message id).
        .scan((Vec::<u8>::new(), String::new()), |state, item| {
            let (buf, msg_id) = state;
            let events = match item {
                Ok(bytes) => {
                    buf.extend_from_slice(&bytes);
                    drain_complete_frames(buf, msg_id)
                },
                Err(e) => vec![Err(e)],
            };
            // Always `Some`: the adapter itself never ends the stream early.
            futures_util::future::ready(Some(events))
        })
        .flat_map(stream::iter);
    s.boxed()
}

/// Removes every complete frame from the front of `buf` and returns the
/// canonical events decoded from them, updating `msg_id` whenever a
/// `MessageStart` event is seen.
fn drain_complete_frames(
    buf: &mut Vec<u8>,
    msg_id: &mut String,
) -> Vec<Result<CanonicalEvent, String>> {
    let mut events = Vec::new();
    // Offset of the first byte not yet consumed. Frames are read in place and
    // the buffer is drained once at the end, so a chunk holding many frames
    // does not shift the remaining bytes once per frame.
    let mut consumed = 0;
    while let Some(pos) = find_double_newline(&buf[consumed..]) {
        // `+ 2` swallows the two-byte `\n\n` delimiter itself.
        let frame_end = consumed + pos + 2;
        let frame = String::from_utf8_lossy(&buf[consumed..frame_end]);
        for line in frame.lines() {
            let Some(rest) = line.strip_prefix("data:") else {
                continue;
            };
            // The SSE format makes the single space after the colon optional.
            let data = rest.strip_prefix(' ').unwrap_or(rest);
            if data.trim() == "[DONE]" {
                continue;
            }
            // Best effort: a payload that is not JSON is dropped, not surfaced.
            let Ok(value) = serde_json::from_str::<Value>(data) else {
                continue;
            };
            let Some(ev) = anthropic_event_to_canonical(&value, msg_id) else {
                continue;
            };
            if let CanonicalEvent::MessageStart { id, .. } = &ev {
                msg_id.clone_from(id);
            }
            events.push(Ok(ev));
        }
        consumed = frame_end;
    }
    buf.drain(..consumed);
    events
}

/// Returns the index of the first `\n` that is immediately followed by
/// another `\n`, or `None` when `buf` contains no such pair.
fn find_double_newline(buf: &[u8]) -> Option<usize> {
    // Pair every byte with its successor and look for two line feeds in a row.
    buf.iter()
        .zip(buf.iter().skip(1))
        .position(|(&first, &second)| first == b'\n' && second == b'\n')
}

fn anthropic_event_to_canonical(value: &Value, msg_id: &str) -> Option<CanonicalEvent> {
    let kind = value.get("type").and_then(Value::as_str)?;
    match kind {
        "message_start" => convert_message_start(value),
        "content_block_start" => convert_content_block_start(value),
        "content_block_delta" => convert_content_block_delta(value),
        "content_block_stop" => {
            let index = value.get("index").and_then(Value::as_u64).unwrap_or(0) as u32;
            Some(CanonicalEvent::ContentBlockStop { index })
        },
        "message_delta" => convert_message_delta(value, msg_id),
        "message_stop" => Some(CanonicalEvent::MessageStop {
            id: msg_id.to_string(),
            stop_reason: None,
        }),
        "error" => {
            let msg = value
                .get("error")
                .and_then(|e| e.get("message"))
                .and_then(Value::as_str)
                .unwrap_or("upstream error")
                .to_string();
            Some(CanonicalEvent::Error(msg))
        },
        _ => None,
    }
}

/// Builds a `MessageStart` event from a `message_start` payload.
///
/// Reads `id`, `model` and `usage` from the nested `"message"` object and
/// returns `None` when that object is absent. Missing or non-string `id` /
/// `model` values become empty strings.
fn convert_message_start(value: &Value) -> Option<CanonicalEvent> {
    let message = value.get("message")?;
    // Owned copy of a string field, or "" when it is absent or not a string.
    let string_field = |key: &str| -> String {
        message
            .get(key)
            .and_then(Value::as_str)
            .unwrap_or_default()
            .to_owned()
    };
    let id = string_field("id");
    let model = string_field("model");
    let usage = usage_from_value(message.get("usage"));
    Some(CanonicalEvent::MessageStart { id, model, usage })
}

/// Builds a `ContentBlockStart` event from a `content_block_start` payload.
///
/// Returns `None` when `"content_block"` or its `"type"` string is missing,
/// or when the block type is not `text`, `thinking` or `tool_use`. A missing
/// `"index"` is treated as 0.
fn convert_content_block_start(value: &Value) -> Option<CanonicalEvent> {
    // The cast keeps only the low 32 bits of an index wider than `u32`.
    let index = value
        .get("index")
        .and_then(Value::as_u64)
        .map_or(0, |raw| raw as u32);
    let content_block = value.get("content_block")?;
    let block = match content_block.get("type")?.as_str()? {
        "text" => ContentBlockKind::Text,
        "thinking" => {
            // The signature is optional and stays `None` when absent.
            let signature = content_block
                .get("signature")
                .and_then(Value::as_str)
                .map(str::to_owned);
            ContentBlockKind::Thinking { signature }
        },
        "tool_use" => {
            // Missing or non-string `id` / `name` become empty strings.
            let owned_or_empty = |key: &str| -> String {
                content_block
                    .get(key)
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_owned()
            };
            ContentBlockKind::ToolUse {
                id: owned_or_empty("id"),
                name: owned_or_empty("name"),
            }
        },
        _ => return None,
    };
    Some(CanonicalEvent::ContentBlockStart { index, block })
}

/// Builds a delta event from a `content_block_delta` payload.
///
/// The nested `"delta"` object's `"type"` selects the variant:
/// `text_delta` → `TextDelta`, `thinking_delta` → `ThinkingDelta`,
/// `input_json_delta` → `ToolUseDelta`. Returns `None` when `"delta"` or its
/// `"type"` string is missing, or for any other delta type. A missing
/// `"index"` is treated as 0 and a missing payload string as `""`.
fn convert_content_block_delta(value: &Value) -> Option<CanonicalEvent> {
    // The cast keeps only the low 32 bits of an index wider than `u32`.
    let index = value
        .get("index")
        .and_then(Value::as_u64)
        .map_or(0, |raw| raw as u32);
    let delta = value.get("delta")?;
    let delta_type = delta.get("type")?.as_str()?;
    // Owned copy of one string member of `delta`, empty when unavailable.
    let read = |key: &str| -> String {
        delta
            .get(key)
            .and_then(Value::as_str)
            .map(str::to_owned)
            .unwrap_or_default()
    };
    let event = match delta_type {
        "text_delta" => CanonicalEvent::TextDelta {
            index,
            text: read("text"),
        },
        "thinking_delta" => CanonicalEvent::ThinkingDelta {
            index,
            text: read("thinking"),
        },
        "input_json_delta" => CanonicalEvent::ToolUseDelta {
            index,
            partial_json: read("partial_json"),
        },
        _ => return None,
    };
    Some(event)
}

/// Builds the event for a `message_delta` payload.
///
/// When `delta.stop_reason` is a string, a `MessageStop` carrying `msg_id`
/// and the translated stop reason is returned; any `"usage"` object in the
/// same payload is not emitted in that case. Otherwise a `UsageDelta` is
/// returned if `"usage"` is present, and `None` if it is not.
fn convert_message_delta(value: &Value, msg_id: &str) -> Option<CanonicalEvent> {
    let raw_reason = value
        .get("delta")
        .and_then(|delta| delta.get("stop_reason"))
        .and_then(Value::as_str);
    match raw_reason {
        // A stop reason takes precedence over usage.
        Some(reason) => Some(CanonicalEvent::MessageStop {
            id: msg_id.to_owned(),
            stop_reason: Some(CanonicalStopReason::from_anthropic(reason)),
        }),
        None => value
            .get("usage")
            .map(|usage| CanonicalEvent::UsageDelta(usage_from_value(Some(usage)))),
    }
}

fn usage_from_value(v: Option<&Value>) -> CanonicalUsage {
    let Some(u) = v else {
        return CanonicalUsage::default();
    };
    CanonicalUsage {
        input_tokens: u.get("input_tokens").and_then(Value::as_u64).unwrap_or(0) as u32,
        output_tokens: u.get("output_tokens").and_then(Value::as_u64).unwrap_or(0) as u32,
    }
}