dwctl 8.40.0

The Doubleword Control Layer - A self-hostable observability and analytics platform for LLM applications
/// Body transform that injects streaming flags for generative requests.
///
/// For `/completions` and `/chat/completions`, it injects `stream_options.include_usage = true`
/// into streaming requests so providers report token usage in the final SSE chunk.
///
/// Also handles the `X-Fusillade-Stream: true` header:
/// - for `/completions` and `/chat/completions`, sets `stream: true` and
///   `stream_options.include_usage = true`
/// - for `/responses`, sets `stream: true` so the provider returns SSE events that include
///   the final `response.completed` usage payload.
///
/// # Arguments
///
/// * `path` - request path; matched by suffix (`/completions` or `/responses`).
/// * `headers` - request headers; only `x-fusillade-stream` is consulted, and its value must be
///   exactly `true`.
/// * `body_bytes` - raw request body, expected to be a JSON object.
///
/// # Returns
///
/// `Some(bytes)` holding the re-serialized JSON body when a rewrite was applied. `None` when no
/// rewrite is produced:
/// - the path matches neither suffix,
/// - a `/completions` request is not streaming (`stream` is not `true` and the header is absent),
/// - a `/responses` request arrives without the header,
/// - the body is not valid JSON or not a JSON object,
/// - an existing `stream_options` value is not an object (for example `null`).
pub fn stream_usage_transform(path: &str, headers: &axum::http::HeaderMap, body_bytes: &[u8]) -> Option<axum::body::Bytes> {
    // Decide from the path first so bodies of unrelated endpoints are never deserialized.
    let is_completions = path.ends_with("/completions");
    let is_responses = path.ends_with("/responses");
    if !is_completions && !is_responses {
        return None;
    }

    let fusillade_stream = headers.get("x-fusillade-stream").and_then(|v| v.to_str().ok()) == Some("true");

    // `/responses` is only ever rewritten on behalf of the header; skip the parse otherwise.
    if is_responses && !fusillade_stream {
        return None;
    }

    let mut json_body: serde_json::Value = serde_json::from_slice(body_bytes).ok()?;

    if is_completions {
        let request_streaming =
            fusillade_stream || json_body.get("stream").and_then(serde_json::Value::as_bool) == Some(true);
        if !request_streaming {
            return None;
        }

        let obj = json_body.as_object_mut()?;
        if fusillade_stream {
            // The header forces streaming even when the client sent `stream: false` or omitted it.
            obj.insert("stream".to_string(), serde_json::Value::Bool(true));
        }

        // Keep any sibling options the client supplied; only `include_usage` is overwritten.
        // A non-object `stream_options` (e.g. `null`) aborts the rewrite via `?`.
        obj.entry("stream_options")
            .or_insert_with(|| serde_json::json!({}))
            .as_object_mut()?
            .insert("include_usage".to_string(), serde_json::json!(true));
    } else {
        // `/responses` with the header: only the `stream` flag is set, no `stream_options`.
        json_body
            .as_object_mut()?
            .insert("stream".to_string(), serde_json::Value::Bool(true));
    }

    serde_json::to_vec(&json_body).ok().map(axum::body::Bytes::from)
}

#[cfg(test)]
mod tests {
    use super::stream_usage_transform;
    use axum::http::HeaderMap;

    /// Runs the transform with the given headers and decodes the rewritten body, if any.
    fn call_with_headers(path: &str, headers: &HeaderMap, body: &serde_json::Value) -> Option<serde_json::Value> {
        let bytes = serde_json::to_vec(body).unwrap();
        stream_usage_transform(path, headers, &bytes).map(|b| serde_json::from_slice(&b).unwrap())
    }

    /// Runs the transform with no request headers.
    fn call(path: &str, body: &serde_json::Value) -> Option<serde_json::Value> {
        call_with_headers(path, &HeaderMap::new(), body)
    }

    /// Builds a header map carrying `x-fusillade-stream` with the given value.
    fn fusillade_headers_with(value: &str) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert("x-fusillade-stream", value.parse().unwrap());
        headers
    }

    /// Builds a header map carrying `x-fusillade-stream: true`.
    fn fusillade_stream_headers() -> HeaderMap {
        fusillade_headers_with("true")
    }

    #[test]
    fn injects_stream_options_when_missing() {
        let body = serde_json::json!({
            "model": "gpt-4",
            "messages": [{"role": "user", "content": "hi"}],
            "stream": true
        });
        let result = call("/chat/completions", &body).expect("should transform");
        assert_eq!(result["stream_options"]["include_usage"], true);
    }

    #[test]
    fn preserves_existing_stream_options_fields() {
        let body = serde_json::json!({
            "model": "gpt-4",
            "messages": [{"role": "user", "content": "hi"}],
            "stream": true,
            "stream_options": {"include_usage": false, "continuous_usage_stats": true}
        });
        let result = call("/chat/completions", &body).expect("should transform");
        // include_usage should be overwritten to true
        assert_eq!(result["stream_options"]["include_usage"], true);
        // sibling options supplied by the client must survive the rewrite
        assert_eq!(result["stream_options"]["continuous_usage_stats"], true);
        // unrelated top-level fields are left alone
        assert_eq!(result["model"], "gpt-4");
    }

    #[test]
    fn skips_non_streaming_requests() {
        let body = serde_json::json!({
            "model": "gpt-4",
            "messages": [{"role": "user", "content": "hi"}],
            "stream": false
        });
        assert!(call("/chat/completions", &body).is_none());
    }

    #[test]
    fn skips_when_stream_absent() {
        let body = serde_json::json!({
            "model": "gpt-4",
            "messages": [{"role": "user", "content": "hi"}]
        });
        assert!(call("/chat/completions", &body).is_none());
    }

    #[test]
    fn skips_non_completions_paths() {
        let body = serde_json::json!({
            "model": "gpt-4",
            "input": "hello",
            "stream": true
        });
        assert!(call("/embeddings", &body).is_none());
    }

    #[test]
    fn skips_responses_without_fusillade_header() {
        let body = serde_json::json!({
            "model": "gpt-4o",
            "input": "hello",
            "stream": true
        });
        // `/responses` is only rewritten when the header asks for it
        assert!(call("/v1/responses", &body).is_none());
    }

    #[test]
    fn matches_legacy_completions_path() {
        let body = serde_json::json!({
            "model": "gpt-4",
            "prompt": "hello",
            "stream": true
        });
        let result = call("/completions", &body).expect("should transform");
        assert_eq!(result["stream_options"]["include_usage"], true);
    }

    #[test]
    fn matches_nested_chat_completions_path() {
        let body = serde_json::json!({
            "model": "gpt-4",
            "messages": [{"role": "user", "content": "hi"}],
            "stream": true
        });
        let result = call("/v1/chat/completions", &body).expect("should transform");
        assert_eq!(result["stream_options"]["include_usage"], true);
    }

    #[test]
    fn handles_null_stream_options_gracefully() {
        let body = serde_json::json!({
            "model": "gpt-4",
            "messages": [{"role": "user", "content": "hi"}],
            "stream": true,
            "stream_options": null
        });
        // stream_options is null, as_object_mut() returns None, ? returns None
        assert!(call("/chat/completions", &body).is_none());
    }

    #[test]
    fn returns_none_for_invalid_json() {
        let headers = fusillade_stream_headers();
        assert!(stream_usage_transform("/chat/completions", &headers, b"{not json").is_none());
        assert!(stream_usage_transform("/v1/responses", &headers, b"{not json").is_none());
    }

    #[test]
    fn returns_none_for_non_object_body() {
        let body = serde_json::json!(["stream", true]);
        let headers = fusillade_stream_headers();
        assert!(call_with_headers("/chat/completions", &headers, &body).is_none());
        assert!(call_with_headers("/v1/responses", &headers, &body).is_none());
    }

    #[test]
    fn fusillade_stream_injects_stream_and_usage() {
        let body = serde_json::json!({
            "model": "gpt-4",
            "messages": [{"role": "user", "content": "hi"}]
        });
        let result = call_with_headers("/chat/completions", &fusillade_stream_headers(), &body).expect("should transform");
        assert_eq!(result["stream"], true);
        assert_eq!(result["stream_options"]["include_usage"], true);
    }

    #[test]
    fn fusillade_stream_overrides_stream_false() {
        let body = serde_json::json!({
            "model": "gpt-4",
            "messages": [{"role": "user", "content": "hi"}],
            "stream": false
        });
        let result = call_with_headers("/chat/completions", &fusillade_stream_headers(), &body).expect("should transform");
        assert_eq!(result["stream"], true);
        assert_eq!(result["stream_options"]["include_usage"], true);
    }

    #[test]
    fn fusillade_header_other_than_true_is_ignored() {
        let body = serde_json::json!({
            "model": "gpt-4",
            "messages": [{"role": "user", "content": "hi"}]
        });
        // only the literal value `true` enables the header behaviour
        assert!(call_with_headers("/chat/completions", &fusillade_headers_with("false"), &body).is_none());
        assert!(call_with_headers("/v1/responses", &fusillade_headers_with("false"), &body).is_none());
    }

    #[test]
    fn fusillade_stream_skips_non_completions() {
        let body = serde_json::json!({
            "model": "gpt-4",
            "input": "hello"
        });
        assert!(call_with_headers("/embeddings", &fusillade_stream_headers(), &body).is_none());
    }

    #[test]
    fn fusillade_stream_sets_stream_on_responses_endpoint() {
        let body = serde_json::json!({
            "model": "gpt-4o",
            "input": "hello"
        });
        let result = call_with_headers("/v1/responses", &fusillade_stream_headers(), &body).expect("should transform");
        assert_eq!(result["stream"], true);
        // `/responses` gets the stream flag only; no `stream_options` is injected
        assert!(result.get("stream_options").is_none());
    }
}