systemprompt-api 0.7.0

Axum-based HTTP server and API gateway for systemprompt.io AI governance infrastructure. Exposes governed agents, MCP, A2A, and admin endpoints with rate limiting and RBAC.
Documentation
use serde_json::Value;

use super::super::captures::CapturedToolUse;
use super::{PartialToolUse, TapState};

pub(super) fn drain_sse(state: &mut TapState) {
    loop {
        let Some(pos) = find_double_newline(&state.sse_buffer) else {
            return;
        };
        let frame_bytes: Vec<u8> = state.sse_buffer.drain(..pos + 2).collect();
        let frame = String::from_utf8_lossy(&frame_bytes);
        for line in frame.lines() {
            if let Some(data) = line.strip_prefix("data: ") {
                if data.trim() == "[DONE]" {
                    continue;
                }
                let Ok(json) = serde_json::from_str::<Value>(data) else {
                    continue;
                };
                handle_sse_event(state, &json);
            }
        }
    }
}

fn find_double_newline(buf: &[u8]) -> Option<usize> {
    buf.windows(2).position(|w| w == b"\n\n")
}

fn handle_sse_event(state: &mut TapState, event: &Value) {
    let Some(kind) = event.get("type").and_then(Value::as_str) else {
        return;
    };
    match kind {
        "message_start" => handle_message_start(state, event),
        "message_delta" => handle_message_delta(state, event),
        "content_block_start" => handle_content_block_start(state, event),
        "content_block_delta" => handle_content_block_delta(state, event),
        "content_block_stop" => handle_content_block_stop(state, event),
        _ => {},
    }
}

fn handle_message_start(state: &mut TapState, event: &Value) {
    let Some(message) = event.get("message") else {
        return;
    };
    if let Some(model) = message.get("model").and_then(Value::as_str) {
        if !model.is_empty() {
            state.served_model = Some(model.to_string());
        }
    }
    if let Some(usage) = message.get("usage") {
        if let Some(v) = usage.get("input_tokens").and_then(Value::as_u64) {
            state.input_tokens = v as u32;
        }
        if let Some(v) = usage.get("output_tokens").and_then(Value::as_u64) {
            state.output_tokens = v as u32;
        }
    }
}

fn handle_message_delta(state: &mut TapState, event: &Value) {
    let Some(usage) = event.get("usage") else {
        return;
    };
    if let Some(v) = usage.get("output_tokens").and_then(Value::as_u64) {
        state.output_tokens = v as u32;
    }
    if let Some(v) = usage.get("input_tokens").and_then(Value::as_u64) {
        state.input_tokens = v as u32;
    }
}

fn handle_content_block_start(state: &mut TapState, event: &Value) {
    let index = event.get("index").and_then(Value::as_i64).unwrap_or(-1);
    let Some(block) = event.get("content_block") else {
        return;
    };
    if block.get("type").and_then(Value::as_str) != Some("tool_use") {
        return;
    }
    let id = block
        .get("id")
        .and_then(Value::as_str)
        .unwrap_or("")
        .to_string();
    let name = block
        .get("name")
        .and_then(Value::as_str)
        .unwrap_or("")
        .to_string();
    state.tool_uses_in_progress.push(PartialToolUse {
        index,
        id,
        name,
        input_json: String::new(),
    });
}

fn handle_content_block_delta(state: &mut TapState, event: &Value) {
    let index = event.get("index").and_then(Value::as_i64).unwrap_or(-1);
    let Some(delta) = event.get("delta") else {
        return;
    };
    if delta.get("type").and_then(Value::as_str) != Some("input_json_delta") {
        return;
    }
    let Some(partial) = delta.get("partial_json").and_then(Value::as_str) else {
        return;
    };
    if let Some(pt) = state
        .tool_uses_in_progress
        .iter_mut()
        .find(|p| p.index == index)
    {
        pt.input_json.push_str(partial);
    }
}

fn handle_content_block_stop(state: &mut TapState, event: &Value) {
    let index = event.get("index").and_then(Value::as_i64).unwrap_or(-1);
    if let Some(pos) = state
        .tool_uses_in_progress
        .iter()
        .position(|p| p.index == index)
    {
        let done = state.tool_uses_in_progress.remove(pos);
        state.tool_uses_done.push(CapturedToolUse {
            ai_tool_call_id: done.id,
            tool_name: done.name,
            tool_input: if done.input_json.is_empty() {
                "{}".to_string()
            } else {
                done.input_json
            },
        });
    }
}

pub(super) fn finalize_partials(state: &mut TapState) {
    let leftover: Vec<PartialToolUse> = std::mem::take(&mut state.tool_uses_in_progress);
    for p in leftover {
        state.tool_uses_done.push(CapturedToolUse {
            ai_tool_call_id: p.id,
            tool_name: p.name,
            tool_input: if p.input_json.is_empty() {
                "{}".to_string()
            } else {
                p.input_json
            },
        });
    }
}