systemprompt-api 0.9.2

Axum-based HTTP server and API gateway for systemprompt.io AI governance infrastructure. Exposes governed agents, MCP, A2A, and admin endpoints with rate limiting and RBAC.
Documentation
use futures_util::StreamExt;
// JSON: protocol boundary — the OpenAI Responses outbound wire format is
// dynamic JSON, so events are decoded as untyped `serde_json::Value`.
use serde_json::Value;

use super::super::super::canonical_response::{
    CanonicalEvent, CanonicalStopReason, CanonicalUsage, ContentBlockKind,
};

/// Adapts a raw OpenAI Responses SSE byte stream into canonical events.
///
/// Incoming chunks are appended to a per-stream buffer (see `drain_buffer`),
/// so SSE frames split across network chunks are still decoded. Each chunk
/// may yield zero or more events. A transport error becomes a single
/// `Err(String)` item; the adapter itself does not end the stream on error.
///
/// `fallback_model` is used as the model name until (and unless) a
/// `response.created` event supplies one.
///
/// NOTE(review): bytes still buffered when the input stream ends are never
/// flushed, so a final frame lacking a trailing blank line is dropped.
pub(super) fn sse_to_canonical_events<S>(
    stream: S,
    fallback_model: String,
) -> futures_util::stream::BoxStream<'static, Result<CanonicalEvent, String>>
where
    S: futures_util::Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
{
    let initial_state = ResponsesStreamState {
        items: Vec::new(),
        started: false,
        response_id: String::new(),
        model: fallback_model,
        buf: Vec::new(),
    };
    stream
        .scan(initial_state, |st, chunk| {
            let batch = match chunk {
                Ok(bytes) => drain_buffer(st, &bytes),
                Err(err) => vec![Err(err.to_string())],
            };
            futures_util::future::ready(Some(batch))
        })
        .flat_map(futures_util::stream::iter)
        .boxed()
}

/// Appends `bytes` to the pending SSE buffer and decodes every complete frame.
///
/// A frame ends at a blank line (`\n\n`). The `data` field lines of a frame
/// are joined with `\n`, as the SSE spec requires, and parsed as JSON. Frames
/// with no data, or whose data is not valid JSON, are skipped. A trailing
/// partial frame stays in `state.buf` until a later chunk completes it.
///
/// Carriage returns are dropped on ingest so CRLF-delimited streams
/// (`\r\n\r\n`) frame exactly like LF-delimited ones. A raw `\r` can only be
/// JSON whitespace; inside string values it must be escaped. Dropping it
/// therefore never changes a payload. `0x0D` also never occurs within a
/// multi-byte UTF-8 sequence, so decoding is unaffected.
///
/// `data:` lines are accepted with or without the single optional space after
/// the colon, as the SSE spec allows.
fn drain_buffer(
    state: &mut ResponsesStreamState,
    bytes: &bytes::Bytes,
) -> Vec<Result<CanonicalEvent, String>> {
    state
        .buf
        .extend(bytes.iter().copied().filter(|&b| b != b'\r'));
    let mut events: Vec<Result<CanonicalEvent, String>> = Vec::new();
    while let Some(pos) = find_double_newline(&state.buf) {
        let frame: Vec<u8> = state.buf.drain(..pos + 2).collect();
        let frame_str = String::from_utf8_lossy(&frame);
        let joined = frame_str
            .lines()
            .filter_map(|line| line.strip_prefix("data:"))
            // SSE: strip exactly one leading space from the field value, if any.
            .map(|d| d.strip_prefix(' ').unwrap_or(d))
            .collect::<Vec<_>>()
            .join("\n");
        if joined.trim().is_empty() {
            continue;
        }
        // Best-effort: a frame whose data is not JSON (e.g. a sentinel) is
        // skipped rather than failing the whole stream.
        if let Ok(value) = serde_json::from_str::<Value>(&joined) {
            handle_responses_event(state, &value, &mut events);
        }
    }
    events
}

/// Returns the byte offset of the first `\n\n` (the SSE blank-line frame
/// terminator) in `buf`, or `None` if no complete frame is buffered yet.
fn find_double_newline(buf: &[u8]) -> Option<usize> {
    buf.iter()
        .zip(buf.iter().skip(1))
        .position(|(&first, &second)| first == b'\n' && second == b'\n')
}

/// Per-stream decoder state threaded through `scan` while translating an
/// OpenAI Responses SSE stream into canonical events.
struct ResponsesStreamState {
    /// Bytes received but not yet consumed as a complete SSE frame.
    buf: Vec<u8>,
    /// Upstream response id captured from `response.created`; used as the
    /// `MessageStop` id when `response.completed` does not carry one.
    response_id: String,
    /// Model name: starts as the caller-supplied fallback and is replaced by
    /// the model reported in `response.created`, when present.
    model: String,
    /// Set once `response.created` has been handled.
    /// NOTE(review): not read anywhere in the code visible here — confirm it
    /// is still needed.
    started: bool,
    /// One slot per opened output item, in the order the items were opened;
    /// a slot's position equals its canonical block index.
    items: Vec<ItemSlot>,
}

use super::slot::{ItemSlot, SlotKind, SlotKindMatch, lookup_canonical};

/// Dispatches one decoded Responses API event to the matching handler.
///
/// Events without a string `type` field, and event types this adapter does
/// not translate, are ignored. The three `*.delta` events share `emit_delta`
/// and differ only in the slot kind they target and the canonical delta they
/// produce.
fn handle_responses_event(
    state: &mut ResponsesStreamState,
    value: &Value,
    events: &mut Vec<Result<CanonicalEvent, String>>,
) {
    let kind = match value.get("type").and_then(Value::as_str) {
        Some(kind) => kind,
        None => return,
    };

    let delta_route = match kind {
        "response.output_text.delta" => Some((SlotKindMatch::Message, DeltaShape::Text)),
        "response.function_call_arguments.delta" => {
            Some((SlotKindMatch::Function, DeltaShape::ToolUse))
        },
        "response.reasoning_summary_text.delta" => {
            Some((SlotKindMatch::Reasoning, DeltaShape::Thinking))
        },
        _ => None,
    };
    if let Some((want, shape)) = delta_route {
        emit_delta(state, value, want, events, shape);
        return;
    }

    match kind {
        "response.created" => handle_created(state, value, events),
        "response.output_item.added" => handle_item_added(state, value, events),
        "response.output_item.done" => handle_item_done(state, value, events),
        "response.completed" => handle_completed(state, value, events),
        "response.failed" | "error" => handle_error(value, events),
        _ => {},
    }
}

/// Handles `response.created`: records the upstream id and model, marks the
/// stream as started, and emits `MessageStart` with zeroed usage.
///
/// A missing id becomes `"resp_unknown"`; a missing model keeps the model
/// already held in `state` (initially the caller-supplied fallback).
fn handle_created(
    state: &mut ResponsesStreamState,
    value: &Value,
    events: &mut Vec<Result<CanonicalEvent, String>>,
) {
    let response = value.get("response").unwrap_or(&Value::Null);
    let field = |key: &str| response.get(key).and_then(Value::as_str);

    let id = field("id").unwrap_or("resp_unknown").to_owned();
    let model = field("model").map_or_else(|| state.model.clone(), str::to_owned);

    state.started = true;
    state.response_id.clone_from(&id);
    state.model.clone_from(&model);

    events.push(Ok(CanonicalEvent::MessageStart {
        id,
        model,
        usage: CanonicalUsage::default(),
    }));
}

/// Handles `response.output_item.added` by opening a canonical content block
/// for message, function-call and reasoning items.
///
/// Each recognised item takes the next canonical index (the number of slots
/// opened so far). Its slot maps the upstream `output_index` (or -1 when
/// absent) to that index. Function calls use `call_id` as the tool-use id,
/// falling back to the item `id`. Any other item type is ignored and
/// allocates no index.
fn handle_item_added(
    state: &mut ResponsesStreamState,
    value: &Value,
    events: &mut Vec<Result<CanonicalEvent, String>>,
) {
    let item = value.get("item").unwrap_or(&Value::Null);
    let str_field = |key: &str| item.get(key).and_then(Value::as_str);

    let (kind, block) = match str_field("type").unwrap_or("") {
        "message" => (SlotKind::Message, ContentBlockKind::Text),
        "reasoning" => (
            SlotKind::Reasoning,
            ContentBlockKind::Thinking { signature: None },
        ),
        "function_call" => {
            let id = str_field("call_id")
                .or_else(|| str_field("id"))
                .unwrap_or("")
                .to_owned();
            let name = str_field("name").unwrap_or("").to_owned();
            (SlotKind::Function, ContentBlockKind::ToolUse { id, name })
        },
        _ => return,
    };

    let output_index = value
        .get("output_index")
        .and_then(Value::as_i64)
        .unwrap_or(-1);
    let canonical_index = state.items.len() as u32;
    state.items.push(ItemSlot {
        output_index,
        canonical_index,
        kind,
    });
    events.push(Ok(CanonicalEvent::ContentBlockStart {
        index: canonical_index,
        block,
    }));
}

/// Selects which canonical delta event an upstream `*.delta` payload becomes.
#[derive(Copy, Clone)]
enum DeltaShape {
    /// `response.output_text.delta` becomes `CanonicalEvent::TextDelta`.
    Text,
    /// `response.function_call_arguments.delta` becomes
    /// `CanonicalEvent::ToolUseDelta` (partial JSON arguments).
    ToolUse,
    /// `response.reasoning_summary_text.delta` becomes
    /// `CanonicalEvent::ThinkingDelta`.
    Thinking,
}

/// Emits one canonical delta for an upstream `*.delta` event.
///
/// The event's `output_index` (or -1 when absent) is resolved to a canonical
/// block index via `lookup_canonical`, restricted to slots matching `want`.
/// Nothing is emitted when no slot matches or when `delta` is missing or
/// empty. `shape` picks the canonical event variant.
fn emit_delta(
    state: &ResponsesStreamState,
    value: &Value,
    want: SlotKindMatch,
    events: &mut Vec<Result<CanonicalEvent, String>>,
    shape: DeltaShape,
) {
    let output_index = value
        .get("output_index")
        .and_then(Value::as_i64)
        .unwrap_or(-1);
    let Some(index) = lookup_canonical(&state.items, output_index, want) else {
        return;
    };
    let text = match value.get("delta").and_then(Value::as_str) {
        Some(text) if !text.is_empty() => text.to_owned(),
        _ => return,
    };
    events.push(Ok(match shape {
        DeltaShape::Text => CanonicalEvent::TextDelta { index, text },
        DeltaShape::ToolUse => CanonicalEvent::ToolUseDelta {
            index,
            partial_json: text,
        },
        DeltaShape::Thinking => CanonicalEvent::ThinkingDelta { index, text },
    }));
}

/// Handles `response.output_item.done` by emitting `ContentBlockStop`.
///
/// The stop targets the first slot opened with the same upstream
/// `output_index` (or -1 when absent). Items that never opened a slot
/// (unrecognised item types) produce nothing.
fn handle_item_done(
    state: &ResponsesStreamState,
    value: &Value,
    events: &mut Vec<Result<CanonicalEvent, String>>,
) {
    let output_index = value
        .get("output_index")
        .and_then(Value::as_i64)
        .unwrap_or(-1);
    let stop = state
        .items
        .iter()
        .find(|slot| slot.output_index == output_index)
        .map(|slot| CanonicalEvent::ContentBlockStop {
            index: slot.canonical_index,
        });
    events.extend(stop.map(Ok));
}

/// Handles `response.completed` by emitting the final usage (when present)
/// followed by `MessageStop`.
///
/// The stop id is the completed response's non-empty `id`, falling back to
/// the id captured from `response.created`. Token counts missing from `usage`
/// are reported as 0. Counts above `u32::MAX` saturate instead of being
/// silently truncated, as the previous `as u32` cast did.
///
/// NOTE(review): the stop reason is always `EndTurn`, even when the response
/// ended with a function call. Confirm downstream consumers do not need a
/// tool-use stop reason here.
fn handle_completed(
    state: &ResponsesStreamState,
    value: &Value,
    events: &mut Vec<Result<CanonicalEvent, String>>,
) {
    let response = value.get("response").unwrap_or(&Value::Null);
    let id = response
        .get("id")
        .and_then(Value::as_str)
        .filter(|s| !s.is_empty())
        .map_or_else(|| state.response_id.clone(), ToString::to_string);
    if let Some(usage) = response.get("usage") {
        // Saturating conversion: `as u32` would keep only the low 32 bits.
        let pull = |key: &str| {
            usage
                .get(key)
                .and_then(Value::as_u64)
                .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX))
        };
        events.push(Ok(CanonicalEvent::UsageDelta(CanonicalUsage {
            input_tokens: pull("input_tokens"),
            output_tokens: pull("output_tokens"),
        })));
    }
    events.push(Ok(CanonicalEvent::MessageStop {
        id,
        stop_reason: Some(CanonicalStopReason::EndTurn),
    }));
}

/// Converts an upstream failure (`error` or `response.failed`) into a
/// canonical `Error` event.
///
/// The two event shapes carry the message in different places:
/// - `response.failed` nests it under `response.error.message`;
/// - the stream-level `error` event puts `message` at the top level.
///
/// A nested `error.message` object is checked first, preserving the previous
/// behaviour for payloads that use it. Falls back to `"upstream error"` when
/// no non-empty message is found in any location.
fn handle_error(value: &Value, events: &mut Vec<Result<CanonicalEvent, String>>) {
    let msg = [
        value.pointer("/error/message"),
        value.pointer("/response/error/message"),
        value.get("message"),
    ]
    .into_iter()
    .flatten()
    .filter_map(Value::as_str)
    .find(|m| !m.is_empty())
    .unwrap_or("upstream error")
    .to_string();
    events.push(Ok(CanonicalEvent::Error(msg)));
}