use crate::config::Config;
use crate::request_logging::models::{AiRequest, AiResponse, ChatCompletionChunk, CompletionChunk, ParsedAIRequest, ResponsesRequest};
use async_openai::types::responses::ResponseStreamEvent;
use outlet::{RequestData, ResponseData};
use outlet_postgres::SerializationError;
use serde_json::Value;
use std::fmt;
use std::str;
use tracing::{error, instrument};
use uuid::Uuid;
use super::utils;
/// Authentication information extracted from an incoming request.
///
/// `Debug` is implemented by hand (below) so the bearer token is never
/// written to logs; only `Clone` is derived.
#[derive(Clone)]
pub enum Auth {
    /// An `Authorization: Bearer <token>` header was present.
    ApiKey { bearer_token: String },
    /// No recognized credentials were found on the request.
    None,
}
impl fmt::Debug for Auth {
    /// Debug output redacts the bearer token so `Auth` is safe to log.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Auth::None => f.write_str("None"),
            Auth::ApiKey { .. } => {
                // Use the struct builder (not a raw write!) so alternate
                // formatting (`{:#?}`) keeps its usual layout.
                let mut builder = f.debug_struct("ApiKey");
                builder.field("bearer_token", &"<redacted>");
                builder.finish()
            }
        }
    }
}
/// One analytics row for a proxied HTTP AI request.
///
/// A superset of [`UsageMetrics`] that adds user/auth, pricing, provider and
/// batch-attribution columns. `dead_code` is allowed — presumably not every
/// field is read from this module yet (TODO confirm against the writer side).
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct HttpAnalyticsRow {
    pub instance_id: Uuid,
    /// Ties this row to the captured request/response pair.
    pub correlation_id: i64,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub method: String,
    pub uri: String,
    /// Model named in the request body, when the body could be parsed.
    pub request_model: Option<String>,
    /// Model reported back by the provider's response.
    pub response_model: Option<String>,
    pub status_code: i32,
    pub duration_ms: i64,
    /// Time to first response byte; `None` when not measured.
    pub duration_to_first_byte_ms: Option<i64>,
    pub prompt_tokens: i64,
    pub completion_tokens: i64,
    pub reasoning_tokens: i64,
    pub total_tokens: i64,
    /// Discriminator such as "chat_completion", "completion_stream", "other".
    pub response_type: String,
    pub user_id: Option<Uuid>,
    pub access_source: String,
    // NOTE(review): per-token prices — currency/scale not defined in this
    // file; confirm against the pricing code.
    pub input_price_per_token: Option<rust_decimal::Decimal>,
    pub output_price_per_token: Option<rust_decimal::Decimal>,
    pub server_address: String,
    pub server_port: u16,
    pub provider_name: Option<String>,
    pub fusillade_batch_id: Option<Uuid>,
    pub fusillade_request_id: Option<Uuid>,
    pub custom_id: Option<String>,
    pub request_origin: String,
    pub batch_sla: String,
    pub batch_request_source: String,
}
/// Timing and token-usage metrics for a single proxied AI call.
///
/// Produced by [`UsageMetrics::extract`]; these fields are a subset of the
/// columns in [`HttpAnalyticsRow`].
#[derive(Debug, Clone)]
pub struct UsageMetrics {
    pub instance_id: Uuid,
    /// Ties the metrics to the captured request/response pair.
    pub correlation_id: i64,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub method: String,
    pub uri: String,
    /// Model named in the request body, when the body could be parsed.
    pub request_model: Option<String>,
    /// Model reported back by the provider's response.
    pub response_model: Option<String>,
    pub status_code: i32,
    pub duration_ms: i64,
    /// Time to first response byte; `None` when not measured.
    pub duration_to_first_byte_ms: Option<i64>,
    pub prompt_tokens: i64,
    pub completion_tokens: i64,
    /// Reasoning-token subset, when the provider reports it (0 otherwise).
    pub reasoning_tokens: i64,
    pub total_tokens: i64,
    /// Discriminator such as "chat_completion", "completion_stream", "other".
    pub response_type: String,
    pub server_address: String,
    pub server_port: u16,
}
#[instrument(skip_all, name = "dwctl.parse_ai_request")]
pub fn parse_ai_request(request_data: &RequestData) -> Result<ParsedAIRequest, SerializationError> {
let headers = request_data
.headers
.iter()
.map(|(k, v)| (k.clone(), v.iter().map(|b| String::from_utf8_lossy(b).to_string()).collect()))
.collect();
let bytes = match &request_data.body {
Some(body) => body.as_ref(),
None => {
return Ok(ParsedAIRequest {
headers,
request: AiRequest::Other(Value::Null),
responses_request: None,
});
}
};
let body_str = String::from_utf8_lossy(bytes);
if body_str.trim().is_empty() {
return Ok(ParsedAIRequest {
headers,
request: AiRequest::Other(Value::Null),
responses_request: None,
});
}
let is_responses_path = request_data.uri.path().ends_with("/responses");
if is_responses_path {
return match serde_json::from_str::<Value>(&body_str) {
Ok(value) => {
let model = value.get("model").and_then(|v| v.as_str()).map(|s| s.to_string());
let stream = value.get("stream").and_then(|v| v.as_bool());
Ok(ParsedAIRequest {
headers,
request: AiRequest::Other(value),
responses_request: Some(ResponsesRequest { model, stream }),
})
}
Err(e) => {
let base64_encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, bytes);
Err(SerializationError {
fallback_data: format!("base64:{base64_encoded}"),
error: Box::new(e),
})
}
};
}
match serde_json::from_str(&body_str) {
Ok(request) => Ok(ParsedAIRequest {
headers,
request,
responses_request: None,
}),
Err(e) => {
let base64_encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, bytes);
Err(SerializationError {
fallback_data: format!("base64:{base64_encoded}"),
error: Box::new(e),
})
}
}
}
/// Parse a response body into a typed [`AiResponse`].
///
/// Decompresses the body if needed, then chooses a decoder by re-parsing the
/// request: streaming requests (the request's `stream` flag, or the
/// `x-fusillade-stream: true` header) are decoded as SSE event streams,
/// everything else as a single JSON document.
///
/// # Errors
/// On decode failure, the decompressed bytes are returned base64-encoded in
/// the error's `fallback_data`.
#[instrument(skip_all, name = "dwctl.parse_ai_response")]
pub fn parse_ai_response(request_data: &RequestData, response_data: &ResponseData) -> Result<AiResponse, SerializationError> {
    // Missing/empty bodies parse to `Other(Null)` rather than erroring.
    let bytes = match &response_data.body {
        Some(body) => body.as_ref(),
        None => return Ok(AiResponse::Other(Value::Null)),
    };
    if bytes.is_empty() {
        return Ok(AiResponse::Other(Value::Null));
    }
    let final_bytes = utils::decompress_response_if_needed(bytes, &response_data.headers)?;
    let body_str = String::from_utf8_lossy(&final_bytes);
    if body_str.trim().is_empty() {
        return Ok(AiResponse::Other(Value::Null));
    }
    // NOTE(review): this header appears to mark bodies that are SSE even when
    // the request said `stream: false` — confirm with the fusillade producer.
    let fusillade_stream = request_data
        .headers
        .get("x-fusillade-stream")
        .and_then(|values| values.first())
        .and_then(|bytes| std::str::from_utf8(bytes).ok())
        == Some("true");
    let result = match parse_ai_request(request_data) {
        Ok(parsed_request) => {
            if let Some(responses_req) = &parsed_request.responses_request {
                if responses_req.stream.unwrap_or(false) || fusillade_stream {
                    utils::parse_responses_streaming_response(&body_str)
                } else {
                    // Fall back to the chat-style decoder if the Responses-API
                    // decoder rejects the body.
                    utils::parse_responses_non_streaming_response(&body_str).or_else(|_| utils::parse_non_streaming_response(&body_str))
                }
            } else {
                match parsed_request.request {
                    AiRequest::ChatCompletions(chat_req) if chat_req.stream.unwrap_or(false) || fusillade_stream => {
                        utils::parse_streaming_response(&body_str)
                    }
                    AiRequest::Completions(completion_req) if completion_req.stream.unwrap_or(false) || fusillade_stream => {
                        utils::parse_completions_streaming_response(&body_str)
                    }
                    _ => utils::parse_non_streaming_response(&body_str),
                }
            }
        }
        // Unparseable request: best effort, assume a plain JSON response.
        _ => utils::parse_non_streaming_response(&body_str),
    };
    result.map_err(|_| SerializationError {
        fallback_data: format!(
            "base64:{}",
            base64::Engine::encode(&base64::engine::general_purpose::STANDARD, &final_bytes)
        ),
        error: "Failed to parse response as JSON or SSE".into(),
    })
}
impl UsageMetrics {
    /// Assemble usage metrics for one request/response pair.
    ///
    /// The request model is recovered by re-parsing the request body (a parse
    /// failure simply yields `None`); token counts, response type and response
    /// model come from `parsed_response` via [`TokenMetrics`]. Server
    /// address/port are taken from `config`.
    #[instrument(skip_all, name = "dwctl.extract_usage_metrics")]
    pub fn extract(
        instance_id: Uuid,
        request_data: &RequestData,
        response_data: &ResponseData,
        parsed_response: &AiResponse,
        config: &Config,
    ) -> Self {
        let request_model = match parse_ai_request(request_data) {
            Ok(parsed_request) => {
                if let Some(responses_req) = parsed_request.responses_request {
                    responses_req.model
                } else {
                    match parsed_request.request {
                        AiRequest::ChatCompletions(req) => Some(req.model),
                        AiRequest::Completions(req) => Some(req.model),
                        AiRequest::Embeddings(req) => Some(req.model),
                        AiRequest::Other(ref value) => {
                            // A `model` field inside an untyped body means the
                            // typed deserializers rejected something else in
                            // it; log so unsupported shapes get noticed.
                            let model = value.get("model").and_then(|v| v.as_str()).map(String::from);
                            if model.is_some() {
                                error!(
                                    uri = %request_data.uri,
                                    "Request body has a model field but failed typed deserialization — \
                                     likely uses unsupported content types"
                                );
                            }
                            model
                        }
                    }
                }
            }
            _ => None,
        };
        let response_metrics = TokenMetrics::from(parsed_response);
        Self {
            instance_id,
            correlation_id: request_data.correlation_id as i64,
            timestamp: chrono::DateTime::<chrono::Utc>::from(request_data.timestamp),
            method: request_data.method.to_string(),
            uri: request_data.uri.to_string(),
            request_model,
            response_model: response_metrics.response_model,
            status_code: response_data.status.as_u16() as i32,
            duration_ms: response_data.duration.as_millis() as i64,
            duration_to_first_byte_ms: Some(response_data.duration_to_first_byte.as_millis() as i64),
            prompt_tokens: response_metrics.prompt_tokens,
            completion_tokens: response_metrics.completion_tokens,
            reasoning_tokens: response_metrics.reasoning_tokens,
            total_tokens: response_metrics.total_tokens,
            response_type: response_metrics.response_type,
            server_address: config.host.clone(),
            server_port: config.port,
        }
    }
}
impl Auth {
    /// Classify a request's credentials: a `Bearer` token in the
    /// `Authorization` header yields `ApiKey`, anything else yields `None`.
    #[instrument(skip_all, name = "dwctl.extract_auth")]
    pub fn from_request(request_data: &RequestData, _config: &Config) -> Self {
        let header = Self::get_header_value(request_data, "authorization");
        match header.as_deref().and_then(|value| value.strip_prefix("Bearer ")) {
            Some(token) => Auth::ApiKey {
                bearer_token: token.to_string(),
            },
            None => Auth::None,
        }
    }

    /// First value of `header_name`, decoded as UTF-8; `None` if the header
    /// is absent, empty, or not valid UTF-8.
    fn get_header_value(request_data: &RequestData, header_name: &str) -> Option<String> {
        let raw = request_data.headers.get(header_name)?.first()?;
        Some(str::from_utf8(raw).ok()?.to_string())
    }
}
/// Token counts plus response type/model pulled out of a parsed response.
/// All counts are zero when the response carried no usage information.
#[derive(Debug, Clone)]
struct TokenMetrics {
    prompt_tokens: i64,
    completion_tokens: i64,
    /// Reasoning-token subset, when the provider reports it (0 otherwise).
    reasoning_tokens: i64,
    total_tokens: i64,
    /// Discriminator such as "chat_completion", "response_stream", "other".
    response_type: String,
    response_model: Option<String>,
}
/// Reasoning-token count from a chat/completions usage block, or 0 when the
/// provider did not report completion-token details.
fn extract_completion_reasoning_tokens(usage: &async_openai::types::chat::CompletionUsage) -> i64 {
    let details = usage.completion_tokens_details.as_ref();
    match details.and_then(|d| d.reasoning_tokens) {
        Some(tokens) => tokens as i64,
        None => 0,
    }
}
/// Reasoning-token count from a Responses-API usage block (always present
/// on that type, so no fallback is needed).
fn extract_response_reasoning_tokens(usage: &async_openai::types::responses::ResponseUsage) -> i64 {
    let details = &usage.output_tokens_details;
    details.reasoning_tokens as i64
}
impl From<&AiResponse> for TokenMetrics {
fn from(response: &AiResponse) -> Self {
match response {
AiResponse::ChatCompletions(response) => {
if let Some(usage) = &response.usage {
Self {
prompt_tokens: usage.prompt_tokens as i64,
completion_tokens: usage.completion_tokens as i64,
reasoning_tokens: extract_completion_reasoning_tokens(usage),
total_tokens: usage.total_tokens as i64,
response_type: "chat_completion".to_string(),
response_model: Some(response.model.clone()),
}
} else {
Self {
prompt_tokens: 0,
completion_tokens: 0,
reasoning_tokens: 0,
total_tokens: 0,
response_type: "chat_completion".to_string(),
response_model: Some(response.model.clone()),
}
}
}
AiResponse::ChatCompletionsStream(chunks) => {
let last_normal_with_usage = chunks.iter().rev().find_map(|chunk| match chunk {
ChatCompletionChunk::Normal(normal_chunk) if normal_chunk.usage.is_some() => Some(normal_chunk),
_ => None,
});
let model = chunks.iter().find_map(|chunk| match chunk {
ChatCompletionChunk::Normal(c) => Some(c.model.clone()),
_ => None,
});
if let Some(chunk) = last_normal_with_usage {
if let Some(usage) = &chunk.usage {
Self {
prompt_tokens: usage.prompt_tokens as i64,
completion_tokens: usage.completion_tokens as i64,
reasoning_tokens: extract_completion_reasoning_tokens(usage),
total_tokens: usage.total_tokens as i64,
response_type: "chat_completion_stream".to_string(),
response_model: model,
}
} else {
Self {
prompt_tokens: 0,
completion_tokens: 0,
reasoning_tokens: 0,
total_tokens: 0,
response_type: "chat_completion_stream".to_string(),
response_model: model,
}
}
} else {
Self {
prompt_tokens: 0,
completion_tokens: 0,
reasoning_tokens: 0,
total_tokens: 0,
response_type: "chat_completion_stream".to_string(),
response_model: model,
}
}
}
AiResponse::CompletionsStream(chunks) => {
let last_normal_with_usage = chunks.iter().rev().find_map(|chunk| match chunk {
CompletionChunk::Normal(normal_chunk) if normal_chunk.usage.is_some() => Some(normal_chunk),
_ => None,
});
let model = chunks.iter().find_map(|chunk| match chunk {
CompletionChunk::Normal(c) => Some(c.model.clone()),
_ => None,
});
if let Some(chunk) = last_normal_with_usage {
if let Some(usage) = &chunk.usage {
Self {
prompt_tokens: usage.prompt_tokens as i64,
completion_tokens: usage.completion_tokens as i64,
reasoning_tokens: extract_completion_reasoning_tokens(usage),
total_tokens: usage.total_tokens as i64,
response_type: "completion_stream".to_string(),
response_model: model,
}
} else {
Self {
prompt_tokens: 0,
completion_tokens: 0,
reasoning_tokens: 0,
total_tokens: 0,
response_type: "completion_stream".to_string(),
response_model: model,
}
}
} else {
Self {
prompt_tokens: 0,
completion_tokens: 0,
reasoning_tokens: 0,
total_tokens: 0,
response_type: "completion_stream".to_string(),
response_model: model,
}
}
}
AiResponse::Completions(response) => {
if let Some(usage) = &response.usage {
Self {
prompt_tokens: usage.prompt_tokens as i64,
completion_tokens: usage.completion_tokens as i64,
reasoning_tokens: extract_completion_reasoning_tokens(usage),
total_tokens: usage.total_tokens as i64,
response_type: "completion".to_string(),
response_model: Some(response.model.clone()),
}
} else {
Self {
prompt_tokens: 0,
completion_tokens: 0,
reasoning_tokens: 0,
total_tokens: 0,
response_type: "completion".to_string(),
response_model: Some(response.model.clone()),
}
}
}
AiResponse::Embeddings(response) => {
let usage = &response.usage;
Self {
prompt_tokens: usage.prompt_tokens as i64,
completion_tokens: 0, reasoning_tokens: 0,
total_tokens: usage.total_tokens as i64,
response_type: "embeddings".to_string(),
response_model: Some(response.model.clone()),
}
}
AiResponse::Base64Embeddings(response) => {
let usage = &response.usage;
Self {
prompt_tokens: usage.prompt_tokens as i64,
completion_tokens: 0, reasoning_tokens: 0,
total_tokens: usage.total_tokens as i64,
response_type: "base64_embeddings".to_string(),
response_model: Some(response.model.clone()),
}
}
AiResponse::Responses(response) => {
if let Some(usage) = &response.usage {
Self {
prompt_tokens: usage.input_tokens as i64,
completion_tokens: usage.output_tokens as i64,
reasoning_tokens: extract_response_reasoning_tokens(usage),
total_tokens: usage.total_tokens as i64,
response_type: "response".to_string(),
response_model: Some(response.model.clone()),
}
} else {
Self {
prompt_tokens: 0,
completion_tokens: 0,
reasoning_tokens: 0,
total_tokens: 0,
response_type: "response".to_string(),
response_model: Some(response.model.clone()),
}
}
}
AiResponse::ResponsesStream(events) => {
let completed = events.iter().find_map(|e| {
if let ResponseStreamEvent::ResponseCompleted(ev) = e {
Some(&ev.response)
} else {
None
}
});
let model = completed.map(|r| r.model.clone());
if let Some(usage) = completed.and_then(|r| r.usage.as_ref()) {
Self {
prompt_tokens: usage.input_tokens as i64,
completion_tokens: usage.output_tokens as i64,
reasoning_tokens: extract_response_reasoning_tokens(usage),
total_tokens: usage.total_tokens as i64,
response_type: "response_stream".to_string(),
response_model: model,
}
} else {
Self {
prompt_tokens: 0,
completion_tokens: 0,
reasoning_tokens: 0,
total_tokens: 0,
response_type: "response_stream".to_string(),
response_model: model,
}
}
}
AiResponse::Other(_) => Self {
prompt_tokens: 0,
completion_tokens: 0,
reasoning_tokens: 0,
total_tokens: 0,
response_type: "other".to_string(),
response_model: None,
},
}
}
}
#[cfg(test)]
mod tests {
use super::{UsageMetrics, parse_ai_request, parse_ai_response};
use crate::request_logging::models::{AiRequest, AiResponse};
use async_openai::types::chat::{CreateChatCompletionResponse, CreateChatCompletionStreamResponse};
use async_openai::types::completions::CreateCompletionResponse;
use async_openai::types::embeddings::{CreateBase64EmbeddingResponse, CreateEmbeddingResponse, EmbeddingUsage};
use axum::http::{Method, StatusCode, Uri};
use bytes::Bytes;
use outlet::{RequestData, ResponseData};
use std::{
collections::HashMap,
time::{Duration, SystemTime},
};
use uuid::Uuid;
#[test]
fn test_parse_ai_request_no_body() {
    // A request with no body at all must parse to `Other(Null)`, not error.
    let req = RequestData {
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        correlation_id: 123,
        timestamp: SystemTime::now(),
        headers: HashMap::new(),
        body: None,
        trace_id: None,
        span_id: None,
    };
    let parsed = parse_ai_request(&req).unwrap();
    match parsed.request {
        AiRequest::Other(value) => assert!(value.is_null()),
        _ => panic!("Expected AiRequest::Other(null)"),
    }
}
#[test]
fn test_parse_ai_request_empty_bytes() {
    // A zero-length body behaves exactly like a missing one.
    let req = RequestData {
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        correlation_id: 123,
        timestamp: SystemTime::now(),
        headers: HashMap::new(),
        body: Some(Bytes::new()),
        trace_id: None,
        span_id: None,
    };
    let parsed = parse_ai_request(&req).unwrap();
    match parsed.request {
        AiRequest::Other(value) => assert!(value.is_null()),
        _ => panic!("Expected AiRequest::Other(null)"),
    }
}
#[test]
fn test_parse_ai_request_invalid_json() {
    // Unparseable bodies surface as an error carrying the raw bytes
    // base64-encoded in the fallback data.
    let req = RequestData {
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        correlation_id: 123,
        timestamp: SystemTime::now(),
        headers: HashMap::new(),
        body: Some(Bytes::from("invalid json")),
        trace_id: None,
        span_id: None,
    };
    let outcome = parse_ai_request(&req);
    assert!(outcome.is_err());
    assert!(outcome.unwrap_err().fallback_data.starts_with("base64:"));
}
#[test]
fn test_parse_ai_request_valid_json() {
    // A chat-completions body deserializes into the typed variant.
    let payload = r#"{"model": "gpt-4", "messages": [{"role": "user", "content": "hello"}]}"#;
    let req = RequestData {
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        correlation_id: 123,
        timestamp: SystemTime::now(),
        headers: HashMap::new(),
        body: Some(Bytes::from(payload)),
        trace_id: None,
        span_id: None,
    };
    let parsed = parse_ai_request(&req).unwrap();
    let AiRequest::ChatCompletions(chat) = parsed.request else {
        panic!("Expected AiRequest::ChatCompletions");
    };
    assert_eq!(chat.model, "gpt-4");
    assert_eq!(chat.messages.len(), 1);
}
#[test]
fn test_parse_ai_request_completions() {
    // A legacy completions body deserializes into `AiRequest::Completions`.
    let payload = r#"{"model": "gpt-3.5-turbo-instruct", "prompt": "Say hello"}"#;
    let req = RequestData {
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        correlation_id: 123,
        timestamp: SystemTime::now(),
        headers: HashMap::new(),
        body: Some(Bytes::from(payload)),
        trace_id: None,
        span_id: None,
    };
    let parsed = parse_ai_request(&req).unwrap();
    let AiRequest::Completions(completion) = parsed.request else {
        panic!("Expected AiRequest::Completions");
    };
    assert_eq!(completion.model, "gpt-3.5-turbo-instruct");
}
#[test]
fn test_parse_ai_request_embeddings() {
    // An embeddings body deserializes into `AiRequest::Embeddings`.
    let payload = r#"{"model": "text-embedding-ada-002", "input": "hello world"}"#;
    let req = RequestData {
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        correlation_id: 123,
        timestamp: SystemTime::now(),
        headers: HashMap::new(),
        body: Some(Bytes::from(payload)),
        trace_id: None,
        span_id: None,
    };
    let parsed = parse_ai_request(&req).unwrap();
    let AiRequest::Embeddings(embeddings) = parsed.request else {
        panic!("Expected AiRequest::Embeddings");
    };
    assert_eq!(embeddings.model, "text-embedding-ada-002");
}
#[test]
fn test_parse_ai_response_no_body() {
    // A response without a body parses to `Other(Null)`.
    let req = RequestData {
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        correlation_id: 123,
        timestamp: SystemTime::now(),
        headers: HashMap::new(),
        body: None,
        trace_id: None,
        span_id: None,
    };
    let resp = ResponseData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: None,
        duration: Duration::from_millis(100),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let parsed = parse_ai_response(&req, &resp).unwrap();
    match parsed {
        AiResponse::Other(value) => assert!(value.is_null()),
        _ => panic!("Expected AiResponse::Other(null)"),
    }
}
#[test]
fn test_parse_ai_response_empty_body() {
    // An empty response body behaves like a missing one: `Other(Null)`.
    let req = RequestData {
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        correlation_id: 123,
        timestamp: SystemTime::now(),
        headers: HashMap::new(),
        body: None,
        trace_id: None,
        span_id: None,
    };
    let resp = ResponseData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: Some(Bytes::new()),
        duration: Duration::from_millis(100),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let parsed = parse_ai_response(&req, &resp).unwrap();
    match parsed {
        AiResponse::Other(value) => assert!(value.is_null()),
        _ => panic!("Expected AiResponse::Other(null)"),
    }
}
#[test]
fn test_parse_ai_response_valid_json() {
    // A non-streaming chat-completion JSON body parses into
    // `AiResponse::ChatCompletions` with its fields intact.
    let request_data = RequestData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        headers: HashMap::new(),
        body: None,
        trace_id: None,
        span_id: None,
    };
    let json_response = r#"{
        "id": "chatcmpl-123",
        "object": "chat.completion",
        "created": 1677652288,
        "model": "gpt-4",
        "choices": [],
        "usage": {"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30}
    }"#;
    let response_data = ResponseData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: Some(Bytes::from(json_response)),
        duration: Duration::from_millis(100),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let result = parse_ai_response(&request_data, &response_data).unwrap();
    match result {
        AiResponse::ChatCompletions(response) => {
            assert_eq!(response.model, "gpt-4");
            assert_eq!(response.id, "chatcmpl-123");
        }
        _ => panic!("Expected AiResponse::ChatCompletions"),
    }
}
#[test]
fn test_parse_ai_response_streaming() {
    // `"stream": true` in the request routes the response body through the
    // SSE decoder, producing `ChatCompletionsStream` chunks.
    let request_json = r#"{"model": "gpt-4", "messages": [{"role": "user", "content": "hello"}], "stream": true}"#;
    let request_data = RequestData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        headers: HashMap::new(),
        body: Some(Bytes::from(request_json)),
        trace_id: None,
        span_id: None,
    };
    // One content chunk followed by the `[DONE]` sentinel.
    let sse_response = "data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chunk\",\"created\":1677652288,\"model\":\"gpt-4\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"}}]}\n\ndata: [DONE]\n\n";
    let response_data = ResponseData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: Some(Bytes::from(sse_response)),
        duration: Duration::from_millis(100),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let result = parse_ai_response(&request_data, &response_data).unwrap();
    match result {
        AiResponse::ChatCompletionsStream(chunks) => {
            assert!(!chunks.is_empty());
        }
        _ => panic!("Expected AiResponse::ChatCompletionsStream"),
    }
}
#[test]
fn test_parse_ai_response_fusillade_stream_header() {
    // The `x-fusillade-stream: true` header forces SSE decoding even though
    // the request body says `"stream": false`; token usage is taken from the
    // last chunk that carries a usage block.
    let request_json = r#"{"model": "gpt-4", "messages": [{"role": "user", "content": "hello"}], "stream": false}"#;
    let mut headers = HashMap::new();
    headers.insert("x-fusillade-stream".to_string(), vec![Bytes::from("true")]);
    let request_data = RequestData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        headers,
        body: Some(Bytes::from(request_json)),
        trace_id: None,
        span_id: None,
    };
    let sse_response = "data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chunk\",\"created\":1677652288,\"model\":\"gpt-4\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"}}],\"usage\":null}\n\ndata: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chunk\",\"created\":1677652288,\"model\":\"gpt-4\",\"choices\":[],\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":5,\"total_tokens\":15}}\n\ndata: [DONE]\n\n";
    let response_data = ResponseData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: Some(Bytes::from(sse_response)),
        duration: Duration::from_millis(100),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let result = parse_ai_response(&request_data, &response_data).unwrap();
    match &result {
        AiResponse::ChatCompletionsStream(chunks) => {
            assert!(!chunks.is_empty(), "Expected parsed SSE chunks");
            // NOTE(review): other extract tests use
            // crate::test::utils::create_test_config(); confirm
            // Config::default() is equivalent here.
            let metrics = UsageMetrics::extract(
                uuid::Uuid::nil(),
                &request_data,
                &response_data,
                &result,
                &crate::config::Config::default(),
            );
            assert_eq!(metrics.prompt_tokens, 10);
            assert_eq!(metrics.completion_tokens, 5);
            assert_eq!(metrics.total_tokens, 15);
        }
        other => panic!("Expected ChatCompletionsStream, got {:?}", std::mem::discriminant(other)),
    }
}
#[test]
fn test_parse_ai_response_fusillade_completions_stream() {
    // Same forced-SSE behavior as above, but for the legacy completions API:
    // the fusillade header overrides `"stream": false` and the usage block in
    // the final chunk is picked up by `UsageMetrics::extract`.
    let request_json = r#"{"model": "gpt-3.5-turbo-instruct", "prompt": "Hello", "stream": false}"#;
    let mut headers = HashMap::new();
    headers.insert("x-fusillade-stream".to_string(), vec![Bytes::from("true")]);
    let request_data = RequestData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        headers,
        body: Some(Bytes::from(request_json)),
        trace_id: None,
        span_id: None,
    };
    let sse_response = "data: {\"id\":\"cmpl-123\",\"object\":\"text_completion\",\"created\":1677652288,\"model\":\"gpt-3.5-turbo-instruct\",\"choices\":[{\"text\":\" world\",\"index\":0}]}\n\ndata: {\"id\":\"cmpl-123\",\"object\":\"text_completion\",\"created\":1677652288,\"model\":\"gpt-3.5-turbo-instruct\",\"choices\":[],\"usage\":{\"prompt_tokens\":8,\"completion_tokens\":12,\"total_tokens\":20}}\n\ndata: [DONE]\n\n";
    let response_data = ResponseData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: Some(Bytes::from(sse_response)),
        duration: Duration::from_millis(100),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let result = parse_ai_response(&request_data, &response_data).unwrap();
    match &result {
        AiResponse::CompletionsStream(chunks) => {
            assert!(!chunks.is_empty(), "Expected parsed SSE chunks");
            // NOTE(review): other extract tests use
            // crate::test::utils::create_test_config(); confirm
            // Config::default() is equivalent here.
            let metrics = UsageMetrics::extract(
                uuid::Uuid::nil(),
                &request_data,
                &response_data,
                &result,
                &crate::config::Config::default(),
            );
            assert_eq!(metrics.prompt_tokens, 8);
            assert_eq!(metrics.completion_tokens, 12);
            assert_eq!(metrics.total_tokens, 20);
        }
        other => panic!("Expected CompletionsStream, got {:?}", std::mem::discriminant(other)),
    }
}
#[test]
fn test_parse_ai_response_embeddings() {
    // An embeddings JSON body parses into `AiResponse::Embeddings`.
    let request_data = RequestData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        headers: HashMap::new(),
        body: None,
        trace_id: None,
        span_id: None,
    };
    let embeddings_response = r#"{
        "object": "list",
        "data": [{"object": "embedding", "embedding": [0.1, 0.2], "index": 0}],
        "model": "text-embedding-ada-002",
        "usage": {"prompt_tokens": 5, "total_tokens": 5}
    }"#;
    let response_data = ResponseData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: Some(Bytes::from(embeddings_response)),
        duration: Duration::from_millis(100),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let result = parse_ai_response(&request_data, &response_data).unwrap();
    match result {
        AiResponse::Embeddings(response) => {
            assert_eq!(response.model, "text-embedding-ada-002");
            assert_eq!(response.object, "list");
        }
        _ => panic!("Expected AiResponse::Embeddings"),
    }
}
#[test]
fn test_parse_ai_response_invalid_json() {
    // A body that is neither JSON nor SSE yields an error whose fallback
    // data carries the raw bytes base64-encoded.
    let req = RequestData {
        method: Method::POST,
        uri: "/test".parse::<Uri>().unwrap(),
        correlation_id: 123,
        timestamp: SystemTime::now(),
        headers: HashMap::new(),
        body: None,
        trace_id: None,
        span_id: None,
    };
    let resp = ResponseData {
        correlation_id: 123,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: Some(Bytes::from("invalid json response")),
        duration: Duration::from_millis(100),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let outcome = parse_ai_response(&req, &resp);
    assert!(outcome.is_err());
    assert!(outcome.unwrap_err().fallback_data.starts_with("base64:"));
}
#[test]
fn test_analytics_metrics_extract_basic() {
    // With no request body and an `Other` response, extract still fills the
    // envelope fields (ids, method, uri, status, durations) and zeroes tokens.
    let instance_id = Uuid::new_v4();
    let request_data = RequestData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/v1/chat/completions".parse::<Uri>().unwrap(),
        headers: HashMap::new(),
        body: None,
        trace_id: None,
        span_id: None,
    };
    let response_data = ResponseData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: None,
        duration: Duration::from_millis(250),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let parsed_response = AiResponse::Other(serde_json::Value::Null);
    let metrics = UsageMetrics::extract(
        instance_id,
        &request_data,
        &response_data,
        &parsed_response,
        &crate::test::utils::create_test_config(),
    );
    assert_eq!(metrics.instance_id, instance_id);
    assert_eq!(metrics.correlation_id, 12345);
    assert_eq!(metrics.method, "POST");
    assert_eq!(metrics.uri, "/v1/chat/completions");
    assert_eq!(metrics.request_model, None);
    assert_eq!(metrics.response_model, None);
    assert_eq!(metrics.status_code, 200);
    assert_eq!(metrics.duration_ms, 250);
    assert_eq!(metrics.duration_to_first_byte_ms, Some(50));
    assert_eq!(metrics.prompt_tokens, 0);
    assert_eq!(metrics.completion_tokens, 0);
    assert_eq!(metrics.total_tokens, 0);
    assert_eq!(metrics.response_type, "other");
}
#[test]
fn test_analytics_metrics_extract_with_tokens() {
    // Request model and response model are tracked independently: the body
    // says gpt-4 while the (typed) response reports gpt-5; usage comes from
    // the response's usage block.
    let instance_id = Uuid::new_v4();
    let request_json = r#"{"model": "gpt-4", "messages": [{"role": "user", "content": "hello"}]}"#;
    let request_data = RequestData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/v1/chat/completions".parse::<Uri>().unwrap(),
        headers: HashMap::new(),
        body: Some(Bytes::from(request_json)),
        trace_id: None,
        span_id: None,
    };
    let response_data = ResponseData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: None,
        duration: Duration::from_millis(500),
        duration_to_first_byte: Duration::from_millis(50),
    };
    // `allow(deprecated)`: some fields of the async-openai response struct
    // are marked deprecated but still required by the literal.
    #[allow(deprecated)]
    let chat_response = CreateChatCompletionResponse {
        id: "chatcmpl-123".to_string(),
        object: "chat.completion".to_string(),
        created: 1677652288,
        model: "gpt-5".to_string(),
        choices: vec![],
        usage: Some(async_openai::types::chat::CompletionUsage {
            prompt_tokens: 15,
            completion_tokens: 25,
            total_tokens: 40,
            prompt_tokens_details: None,
            completion_tokens_details: None,
        }),
        system_fingerprint: None,
        service_tier: None,
    };
    let parsed_response = AiResponse::ChatCompletions(chat_response);
    let metrics = UsageMetrics::extract(
        instance_id,
        &request_data,
        &response_data,
        &parsed_response,
        &crate::test::utils::create_test_config(),
    );
    assert_eq!(metrics.instance_id, instance_id);
    assert_eq!(metrics.correlation_id, 12345);
    assert_eq!(metrics.method, "POST");
    assert_eq!(metrics.uri, "/v1/chat/completions");
    assert_eq!(metrics.request_model, Some("gpt-4".to_string()));
    assert_eq!(metrics.response_model, Some("gpt-5".to_string()));
    assert_eq!(metrics.status_code, 200);
    assert_eq!(metrics.duration_ms, 500);
    assert_eq!(metrics.prompt_tokens, 15);
    assert_eq!(metrics.completion_tokens, 25);
    assert_eq!(metrics.reasoning_tokens, 0);
    assert_eq!(metrics.total_tokens, 40);
    assert_eq!(metrics.response_type, "chat_completion");
}
#[test]
fn test_analytics_metrics_extract_streaming_tokens() {
    // A streaming response (single chunk carrying a usage block) yields
    // response_type "chat_completion_stream" with that chunk's token counts.
    let instance_id = Uuid::new_v4();
    let request_data = RequestData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/v1/chat/completions".parse::<Uri>().unwrap(),
        headers: HashMap::new(),
        body: None,
        trace_id: None,
        span_id: None,
    };
    let response_data = ResponseData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: None,
        duration: Duration::from_millis(300),
        duration_to_first_byte: Duration::from_millis(50),
    };
    #[allow(deprecated)]
    let stream_chunk = CreateChatCompletionStreamResponse {
        id: "chatcmpl-123".to_string(),
        object: "chat.completion.chunk".to_string(),
        created: 1677652288,
        model: "gpt-4".to_string(),
        choices: vec![],
        usage: Some(async_openai::types::chat::CompletionUsage {
            prompt_tokens: 8,
            completion_tokens: 12,
            total_tokens: 20,
            prompt_tokens_details: None,
            completion_tokens_details: None,
        }),
        system_fingerprint: None,
        service_tier: None,
    };
    let parsed_response =
        AiResponse::ChatCompletionsStream(vec![crate::request_logging::models::ChatCompletionChunk::Normal(stream_chunk)]);
    let metrics = UsageMetrics::extract(
        instance_id,
        &request_data,
        &response_data,
        &parsed_response,
        &crate::test::utils::create_test_config(),
    );
    assert_eq!(metrics.prompt_tokens, 8);
    assert_eq!(metrics.completion_tokens, 12);
    assert_eq!(metrics.reasoning_tokens, 0);
    assert_eq!(metrics.total_tokens, 20);
    assert_eq!(metrics.response_type, "chat_completion_stream");
}
#[test]
fn test_analytics_metrics_extract_chat_reasoning_tokens() {
    // `completion_tokens_details.reasoning_tokens` is surfaced as the
    // separate `reasoning_tokens` metric (built via JSON so the nested
    // details struct gets populated).
    let instance_id = Uuid::new_v4();
    let request_json = r#"{"model": "gpt-5", "messages": [{"role": "user", "content": "hello"}]}"#;
    let request_data = RequestData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/v1/chat/completions".parse::<Uri>().unwrap(),
        headers: HashMap::new(),
        body: Some(Bytes::from(request_json)),
        trace_id: None,
        span_id: None,
    };
    let response_data = ResponseData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: None,
        duration: Duration::from_millis(500),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let chat_response: CreateChatCompletionResponse = serde_json::from_value(serde_json::json!({
        "id": "chatcmpl-123",
        "object": "chat.completion",
        "created": 1677652288,
        "model": "gpt-5",
        "choices": [],
        "usage": {
            "prompt_tokens": 15,
            "completion_tokens": 25,
            "total_tokens": 40,
            "completion_tokens_details": {
                "reasoning_tokens": 11
            }
        }
    }))
    .unwrap();
    let parsed_response = AiResponse::ChatCompletions(chat_response);
    let metrics = UsageMetrics::extract(
        instance_id,
        &request_data,
        &response_data,
        &parsed_response,
        &crate::test::utils::create_test_config(),
    );
    assert_eq!(metrics.reasoning_tokens, 11);
    assert_eq!(metrics.completion_tokens, 25);
    assert_eq!(metrics.total_tokens, 40);
}
#[test]
fn test_analytics_metrics_extract_embeddings_tokens() {
    // Embeddings responses carry only prompt/total token counts; completion
    // tokens must come out as zero and the response type as "embeddings".
    let request_data = RequestData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/v1/embeddings".parse::<Uri>().unwrap(),
        headers: HashMap::new(),
        body: None,
        trace_id: None,
        span_id: None,
    };
    let response_data = ResponseData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: None,
        duration: Duration::from_millis(150),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let parsed = AiResponse::Embeddings(CreateEmbeddingResponse {
        object: "list".to_string(),
        data: vec![],
        model: "text-embedding-ada-002".to_string(),
        usage: EmbeddingUsage {
            prompt_tokens: 6,
            total_tokens: 6,
        },
    });
    let metrics = UsageMetrics::extract(
        Uuid::new_v4(),
        &request_data,
        &response_data,
        &parsed,
        &crate::test::utils::create_test_config(),
    );
    assert_eq!(metrics.prompt_tokens, 6);
    assert_eq!(metrics.completion_tokens, 0);
    assert_eq!(metrics.total_tokens, 6);
    assert_eq!(metrics.response_type, "embeddings");
}
#[test]
fn test_analytics_metrics_extract_completions_tokens() {
    // Legacy text-completion responses must map their usage block onto the
    // prompt/completion/total counters and report the "completion" type.
    let request_data = RequestData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/v1/completions".parse::<Uri>().unwrap(),
        headers: HashMap::new(),
        body: None,
        trace_id: None,
        span_id: None,
    };
    let response_data = ResponseData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: None,
        duration: Duration::from_millis(400),
        duration_to_first_byte: Duration::from_millis(50),
    };
    // `CreateCompletionResponse` is a deprecated API surface; silence the lint
    // for the fixture only.
    #[allow(deprecated)]
    let parsed = AiResponse::Completions(CreateCompletionResponse {
        id: "cmpl-123".to_string(),
        object: "text_completion".to_string(),
        created: 1677652288,
        model: "gpt-3.5-turbo-instruct".to_string(),
        choices: vec![],
        usage: Some(async_openai::types::chat::CompletionUsage {
            prompt_tokens: 10,
            completion_tokens: 15,
            total_tokens: 25,
            prompt_tokens_details: None,
            completion_tokens_details: None,
        }),
        system_fingerprint: None,
    });
    let metrics = UsageMetrics::extract(
        Uuid::new_v4(),
        &request_data,
        &response_data,
        &parsed,
        &crate::test::utils::create_test_config(),
    );
    assert_eq!(metrics.prompt_tokens, 10);
    assert_eq!(metrics.completion_tokens, 15);
    assert_eq!(metrics.total_tokens, 25);
    assert_eq!(metrics.response_type, "completion");
}
#[test]
fn test_analytics_metrics_extract_base64_embeddings_tokens() {
    // Base64-encoded embeddings share the embeddings usage shape but must be
    // classified under the distinct "base64_embeddings" response type.
    let request_data = RequestData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/v1/embeddings".parse::<Uri>().unwrap(),
        headers: HashMap::new(),
        body: None,
        trace_id: None,
        span_id: None,
    };
    let response_data = ResponseData {
        correlation_id: 12345,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: None,
        duration: Duration::from_millis(200),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let parsed = AiResponse::Base64Embeddings(CreateBase64EmbeddingResponse {
        object: "list".to_string(),
        data: vec![],
        model: "text-embedding-3-large".to_string(),
        usage: EmbeddingUsage {
            prompt_tokens: 4,
            total_tokens: 4,
        },
    });
    let metrics = UsageMetrics::extract(
        Uuid::new_v4(),
        &request_data,
        &response_data,
        &parsed,
        &crate::test::utils::create_test_config(),
    );
    assert_eq!(metrics.prompt_tokens, 4);
    assert_eq!(metrics.completion_tokens, 0);
    assert_eq!(metrics.total_tokens, 4);
    assert_eq!(metrics.response_type, "base64_embeddings");
}
/// Builds a minimal completed Responses-API body for `resp_123` / `gpt-4o`.
/// When `usage` is true, a fixed usage object (15 in / 25 out / 40 total,
/// zero reasoning tokens) is appended before the closing brace.
fn responses_api_body(usage: bool) -> String {
    let usage_json = usage
        .then_some(r#","usage":{"input_tokens":15,"input_tokens_details":{"cached_tokens":0},"output_tokens":25,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":40}"#)
        .unwrap_or("");
    format!(
        r#"{{"id":"resp_123","object":"response","created_at":1234567890,"model":"gpt-4o","status":"completed","output":[]{usage_json}}}"#
    )
}
/// Builds a POST /v1/responses `RequestData` fixture whose JSON body asks
/// `gpt-4o` to "tell me a joke". `stream` controls the `"stream"` field:
/// `Some(b)` emits `"stream":b`, `None` omits the field entirely.
fn responses_request_data(stream: Option<bool>) -> RequestData {
    let stream_field = stream
        .map(|s| if s { r#","stream":true"# } else { r#","stream":false"# })
        .unwrap_or("");
    let body = format!(r#"{{"model":"gpt-4o","input":"tell me a joke"{stream_field}}}"#);
    RequestData {
        correlation_id: 1,
        timestamp: SystemTime::now(),
        method: Method::POST,
        uri: "/v1/responses".parse::<Uri>().unwrap(),
        headers: HashMap::new(),
        body: Some(Bytes::from(body)),
        trace_id: None,
        span_id: None,
    }
}
/// Wraps `body` in an HTTP 200 `ResponseData` fixture (100 ms total,
/// 50 ms to first byte) matching the correlation id used by
/// `responses_request_data`.
fn responses_response_data(body: String) -> ResponseData {
    let payload = Bytes::from(body);
    ResponseData {
        correlation_id: 1,
        timestamp: SystemTime::now(),
        status: StatusCode::OK,
        headers: HashMap::new(),
        body: Some(payload),
        duration: Duration::from_millis(100),
        duration_to_first_byte: Duration::from_millis(50),
    }
}
#[test]
fn test_parse_ai_request_responses_path_not_classified_as_embeddings() {
    // /v1/responses traffic must populate `responses_request` and stay in the
    // `Other` bucket rather than being misrouted to another request parser.
    let parsed = parse_ai_request(&responses_request_data(None)).unwrap();
    let rr = parsed.responses_request.expect("responses_request should be set");
    assert_eq!(rr.model, Some("gpt-4o".to_string()));
    assert_eq!(rr.stream, None);
    assert!(
        matches!(parsed.request, AiRequest::Other(_)),
        "expected AiRequest::Other for /v1/responses, not Embeddings or ChatCompletions"
    );
}
#[test]
fn test_parse_ai_request_responses_path_stream_flag() {
    // An explicit `"stream":true` in the request body must survive parsing.
    let stream = parse_ai_request(&responses_request_data(Some(true)))
        .unwrap()
        .responses_request
        .unwrap()
        .stream;
    assert_eq!(stream, Some(true));
}
#[test]
fn test_parse_ai_response_responses_non_streaming() {
    // A non-streaming /v1/responses body must parse into `AiResponse::Responses`
    // with the fixture's usage counts intact.
    let request_data = responses_request_data(None);
    let response_data = responses_response_data(responses_api_body(true));
    let result = parse_ai_response(&request_data, &response_data).unwrap();
    let AiResponse::Responses(resp) = result else {
        panic!("expected AiResponse::Responses");
    };
    assert_eq!(resp.model, "gpt-4o");
    let usage = resp.usage.expect("usage should be present");
    assert_eq!(usage.input_tokens, 15);
    assert_eq!(usage.output_tokens, 25);
    assert_eq!(usage.output_tokens_details.reasoning_tokens, 0);
    assert_eq!(usage.total_tokens, 40);
}
#[test]
fn test_analytics_metrics_extract_responses_reasoning_tokens() {
    // Reasoning tokens reported in `output_tokens_details` of a Responses-API
    // body must flow through parsing into the extracted metrics.
    let body = r#"{"id":"resp_123","object":"response","created_at":1234567890,"model":"gpt-4o","status":"completed","output":[],"usage":{"input_tokens":15,"input_tokens_details":{"cached_tokens":0},"output_tokens":25,"output_tokens_details":{"reasoning_tokens":9},"total_tokens":40}}"#;
    let request_data = responses_request_data(None);
    let response_data = responses_response_data(body.to_string());
    let parsed = parse_ai_response(&request_data, &response_data).unwrap();
    let metrics = UsageMetrics::extract(
        Uuid::new_v4(),
        &request_data,
        &response_data,
        &parsed,
        &crate::test::utils::create_test_config(),
    );
    assert_eq!(metrics.reasoning_tokens, 9);
    assert_eq!(metrics.completion_tokens, 25);
    assert_eq!(metrics.total_tokens, 40);
    assert_eq!(metrics.response_type, "response");
}
#[test]
fn test_parse_ai_response_responses_error_body_falls_back_to_other() {
    // A 400 with an OpenAI-style error envelope is not a Responses payload;
    // the parser must fall back to `AiResponse::Other` instead of failing.
    let error_json = r#"{"error":{"message":"invalid request","type":"invalid_request_error","code":"model_not_found"}}"#;
    let response_data = ResponseData {
        correlation_id: 1,
        timestamp: SystemTime::now(),
        status: StatusCode::BAD_REQUEST,
        headers: HashMap::new(),
        body: Some(Bytes::from(error_json)),
        duration: Duration::from_millis(100),
        duration_to_first_byte: Duration::from_millis(50),
    };
    let result = parse_ai_response(&responses_request_data(None), &response_data).unwrap();
    assert!(
        matches!(result, AiResponse::Other(_)),
        "expected AiResponse::Other for error body, got something else"
    );
}
#[test]
fn test_parse_ai_response_responses_streaming() {
    // An SSE body on a streaming /v1/responses request must parse into
    // `ResponsesStream` and retain the terminal `response.completed` event.
    let completed_data = responses_api_body(true);
    let sse_body = format!(
        "data: {{\"type\":\"response.output_text.delta\",\"sequence_number\":1,\"item_id\":\"item_1\",\"output_index\":0,\"content_index\":0,\"delta\":\"Hello\"}}\n\ndata: {{\"type\":\"response.completed\",\"sequence_number\":5,\"response\":{completed_data}}}\n\n"
    );
    let result = parse_ai_response(
        &responses_request_data(Some(true)),
        &responses_response_data(sse_body),
    )
    .unwrap();
    let AiResponse::ResponsesStream(events) = result else {
        panic!("expected AiResponse::ResponsesStream");
    };
    assert!(!events.is_empty(), "should have parsed at least the completed event");
    assert!(
        events
            .iter()
            .any(|e| matches!(e, ResponseStreamEvent::ResponseCompleted(_))),
        "should contain a ResponseCompleted event"
    );
}
#[test]
fn test_analytics_metrics_extract_responses_tokens() {
    // End-to-end: parse a non-streaming Responses body and check that models,
    // token counts, and the "response" type all land in the metrics row.
    let request_data = responses_request_data(None);
    let response_data = responses_response_data(responses_api_body(true));
    let parsed = parse_ai_response(&request_data, &response_data).unwrap();
    let metrics = UsageMetrics::extract(
        Uuid::new_v4(),
        &request_data,
        &response_data,
        &parsed,
        &crate::test::utils::create_test_config(),
    );
    assert_eq!(metrics.request_model, Some("gpt-4o".to_string()));
    assert_eq!(metrics.response_model, Some("gpt-4o".to_string()));
    assert_eq!(metrics.prompt_tokens, 15);
    assert_eq!(metrics.completion_tokens, 25);
    assert_eq!(metrics.total_tokens, 40);
    assert_eq!(metrics.response_type, "response");
}
#[test]
fn test_analytics_metrics_extract_responses_streaming_tokens() {
    // Usage embedded in the terminal `response.completed` SSE event must be
    // extracted, and streaming traffic classified as "response_stream".
    let completed_data = responses_api_body(true);
    let sse_body = format!("data: {{\"type\":\"response.completed\",\"sequence_number\":5,\"response\":{completed_data}}}\n\n");
    let request_data = responses_request_data(Some(true));
    let response_data = responses_response_data(sse_body);
    let parsed = parse_ai_response(&request_data, &response_data).unwrap();
    let metrics = UsageMetrics::extract(
        Uuid::new_v4(),
        &request_data,
        &response_data,
        &parsed,
        &crate::test::utils::create_test_config(),
    );
    assert_eq!(metrics.request_model, Some("gpt-4o".to_string()));
    assert_eq!(metrics.response_model, Some("gpt-4o".to_string()));
    assert_eq!(metrics.prompt_tokens, 15);
    assert_eq!(metrics.completion_tokens, 25);
    assert_eq!(metrics.total_tokens, 40);
    assert_eq!(metrics.response_type, "response_stream");
}
}