use crate::config::Config;
use crate::request_logging::AiResponse;
use crate::request_logging::batcher::{AnalyticsSender, RawAnalyticsRecord};
use crate::request_logging::serializers::{Auth, UsageMetrics, parse_ai_response};
use crate::request_logging::utils::{extract_header_as_string, extract_header_as_uuid};
use axum::http::Uri;
use metrics::counter;
use outlet::{RequestData, RequestHandler, ResponseData};
use serde_json::Value;
use tracing::{Instrument, error, info_span, warn};
use uuid::Uuid;
pub struct AnalyticsHandler {
sender: AnalyticsSender,
instance_id: Uuid,
config: Config,
}
impl AnalyticsHandler {
pub fn new(sender: AnalyticsSender, instance_id: Uuid, config: Config) -> Self {
Self {
sender,
instance_id,
config,
}
}
fn usage_required_endpoint(uri: &Uri) -> Option<&'static str> {
match uri.path() {
path if path.ends_with("/v1/chat/completions") || path.ends_with("/chat/completions") => Some("/v1/chat/completions"),
path if path.ends_with("/v1/completions") || path.ends_with("/completions") => Some("/v1/completions"),
path if path.ends_with("/v1/responses") || path.ends_with("/responses") => Some("/v1/responses"),
_ => None,
}
}
fn is_fusillade_stream(request_data: &RequestData) -> bool {
request_data
.headers
.get("x-fusillade-stream")
.and_then(|values| values.first())
.and_then(|bytes| std::str::from_utf8(bytes).ok())
== Some("true")
}
}
impl RequestHandler for AnalyticsHandler {
async fn handle_request(&self, _data: RequestData) {
}
async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
let correlation_id = request_data.correlation_id;
let span = info_span!(
"dwctl.analytics_handler",
correlation_id = correlation_id,
status = %response_data.status
);
async {
let usage_required_endpoint = Self::usage_required_endpoint(&request_data.uri);
let fusillade_stream = Self::is_fusillade_stream(&request_data);
let parse_result = parse_ai_response(&request_data, &response_data);
let metrics_response = match &parse_result {
Ok(response) => response.clone(),
Err(e) => {
if response_data.status.is_success() {
tracing::warn!(
correlation_id = correlation_id,
uri = %request_data.uri,
error = %e.error,
"Failed to parse successful AI response — tokens will be zero"
);
if let Some(endpoint) = usage_required_endpoint {
counter!(
"dwctl_usage_extraction_failures_total",
"endpoint" => endpoint,
"reason" => "parse_error"
)
.increment(1);
error!(
correlation_id = correlation_id,
uri = %request_data.uri,
endpoint,
fusillade_stream,
error = %e.error,
"CRITICAL: failed to serialize usage for successful generative response"
);
}
}
AiResponse::Other(Value::Null)
}
};
let metrics = UsageMetrics::extract(self.instance_id, &request_data, &response_data, &metrics_response, &self.config);
if response_data.status.is_success()
&& metrics.total_tokens == 0
&& let Some(endpoint) = usage_required_endpoint
{
counter!(
"dwctl_usage_extraction_failures_total",
"endpoint" => endpoint,
"reason" => "missing_usage"
)
.increment(1);
error!(
correlation_id = correlation_id,
uri = %request_data.uri,
endpoint,
response_type = %metrics.response_type,
fusillade_stream,
request_model = ?metrics.request_model,
response_model = ?metrics.response_model,
"CRITICAL: successful generative response recorded zero tokens"
);
}
let auth = Auth::from_request(&request_data, &self.config);
let fusillade_batch_id = extract_header_as_uuid(&request_data, "x-fusillade-batch-id");
let fusillade_request_id = extract_header_as_uuid(&request_data, "x-fusillade-request-id");
let custom_id = extract_header_as_string(&request_data, "x-fusillade-custom-id");
let batch_completion_window = extract_header_as_string(&request_data, "x-fusillade-batch-completion-window");
let batch_request_source = extract_header_as_string(&request_data, "x-fusillade-batch-request-source").unwrap_or_default();
let batch_created_at = extract_header_as_string(&request_data, "x-fusillade-batch-created-at")
.and_then(|s| s.parse::<chrono::DateTime<chrono::Utc>>().ok());
let bearer_token = match &auth {
Auth::ApiKey { bearer_token } => Some(bearer_token.clone()),
Auth::None => None,
};
let record = RawAnalyticsRecord {
instance_id: metrics.instance_id,
correlation_id: metrics.correlation_id,
timestamp: metrics.timestamp,
method: metrics.method,
uri: metrics.uri,
request_model: metrics.request_model,
response_model: metrics.response_model,
status_code: metrics.status_code,
duration_ms: metrics.duration_ms,
duration_to_first_byte_ms: metrics.duration_to_first_byte_ms,
prompt_tokens: metrics.prompt_tokens,
completion_tokens: metrics.completion_tokens,
reasoning_tokens: metrics.reasoning_tokens,
total_tokens: metrics.total_tokens,
response_type: metrics.response_type,
server_address: metrics.server_address,
server_port: metrics.server_port,
bearer_token,
fusillade_batch_id,
fusillade_request_id,
custom_id,
batch_completion_window,
batch_created_at,
batch_request_source,
trace_id: request_data.trace_id.clone(),
};
if let Err(e) = self.sender.send(record).await {
counter!("dwctl_analytics_send_errors_total").increment(1);
warn!(
correlation_id = correlation_id,
error = %e,
"Failed to send analytics record to batcher - channel may be full or closed"
);
}
}
.instrument(span)
.await;
}
}
#[cfg(test)]
mod tests {
use super::*;
use axum::http::{Method, StatusCode, Uri};
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc;
fn create_test_request_data() -> RequestData {
RequestData {
correlation_id: 123,
timestamp: SystemTime::now(),
method: Method::POST,
uri: Uri::from_static("/ai/v1/chat/completions"),
headers: HashMap::new(),
body: None,
trace_id: None,
span_id: None,
}
}
fn create_test_response_data() -> ResponseData {
ResponseData {
correlation_id: 123,
timestamp: SystemTime::now(),
status: StatusCode::OK,
headers: HashMap::new(),
body: None,
duration_to_first_byte: Duration::from_millis(10),
duration: Duration::from_millis(100),
}
}
#[test]
fn test_analytics_handler_creation() {
let (tx, _rx) = mpsc::channel::<RawAnalyticsRecord>(100);
let config = Config::default();
let _handler = AnalyticsHandler::new(tx, Uuid::new_v4(), config);
}
#[test]
fn test_request_data_creation() {
let data = create_test_request_data();
assert_eq!(data.correlation_id, 123);
assert_eq!(data.method, Method::POST);
}
#[test]
fn test_response_data_creation() {
let data = create_test_response_data();
assert_eq!(data.correlation_id, 123);
assert_eq!(data.status, StatusCode::OK);
}
#[test]
fn test_usage_required_endpoint_matches_proxied_and_v1_paths() {
assert_eq!(
AnalyticsHandler::usage_required_endpoint(&Uri::from_static("/v1/chat/completions")),
Some("/v1/chat/completions")
);
assert_eq!(
AnalyticsHandler::usage_required_endpoint(&Uri::from_static("/chat/completions")),
Some("/v1/chat/completions")
);
assert_eq!(
AnalyticsHandler::usage_required_endpoint(&Uri::from_static("/v1/completions")),
Some("/v1/completions")
);
assert_eq!(
AnalyticsHandler::usage_required_endpoint(&Uri::from_static("/completions")),
Some("/v1/completions")
);
assert_eq!(
AnalyticsHandler::usage_required_endpoint(&Uri::from_static("/v1/responses")),
Some("/v1/responses")
);
assert_eq!(
AnalyticsHandler::usage_required_endpoint(&Uri::from_static("/responses")),
Some("/v1/responses")
);
assert_eq!(AnalyticsHandler::usage_required_endpoint(&Uri::from_static("/embeddings")), None);
}
#[tokio::test]
async fn test_handler_sends_to_channel() {
let (tx, mut rx) = mpsc::channel::<RawAnalyticsRecord>(100);
let config = Config::default();
let handler = AnalyticsHandler::new(tx, Uuid::new_v4(), config);
let request_data = create_test_request_data();
let response_data = create_test_response_data();
handler.handle_response(request_data, response_data).await;
let record = rx.try_recv().expect("Should have received a record");
assert_eq!(record.correlation_id, 123);
assert_eq!(record.method, "POST");
assert!(record.uri.contains("chat/completions"));
}
}