use outlet::{RequestData, RequestHandler, ResponseData};
use std::sync::Arc;
use underway::Job;
use uuid::Uuid;
use super::jobs::CompleteResponseInput;
use super::store::ONWARDS_RESPONSE_ID_HEADER;
use crate::tasks::TaskState;
/// `outlet` request handler that enqueues a complete-response job for
/// captured request/response pairs.
///
/// The only state is a shared handle to the job, so the derived `Clone`
/// copies the `Arc` pointer rather than the job itself.
#[derive(Clone)]
pub struct FusilladeOutletHandler {
// Shared job handle; `CompleteResponseInput` values are enqueued onto it.
job: Arc<Job<CompleteResponseInput, TaskState>>,
}
impl FusilladeOutletHandler {
    /// Creates a handler that enqueues onto the given shared job.
    pub fn new(job: Arc<Job<CompleteResponseInput, TaskState>>) -> Self {
        Self { job }
    }

    /// Returns the value of the `ONWARDS_RESPONSE_ID_HEADER` request header,
    /// or `None` when the header is absent or not valid UTF-8.
    fn extract_response_id(request: &RequestData) -> Option<String> {
        Self::header_str(request, ONWARDS_RESPONSE_ID_HEADER).map(String::from)
    }

    /// Parses the `x-fusillade-request-id` header as a UUID.
    ///
    /// Returns `None` when the header is absent, not valid UTF-8, or not a
    /// parseable UUID.
    fn extract_request_id(request: &RequestData) -> Option<Uuid> {
        Self::header_uuid(request, "x-fusillade-request-id")
    }

    /// Parses the `x-fusillade-batch-id` header as a UUID.
    ///
    /// Returns `None` when the header is absent, not valid UTF-8, or not a
    /// parseable UUID.
    fn extract_batch_id(request: &RequestData) -> Option<Uuid> {
        Self::header_uuid(request, "x-fusillade-batch-id")
    }

    /// Extracts the bearer token from the `authorization` header.
    ///
    /// The scheme is compared case-insensitively (`Bearer`, `bearer`, ...),
    /// since HTTP authentication schemes are case-insensitive. Everything
    /// after the first space is returned verbatim. Returns `None` when the
    /// header is absent, not valid UTF-8, has no space-separated credentials,
    /// or uses a different scheme.
    fn extract_api_key(request: &RequestData) -> Option<String> {
        let value = Self::header_str(request, "authorization")?;
        // Split at the first space only, so the token is exactly what
        // follows `"<scheme> "`.
        let (scheme, token) = value.split_once(' ')?;
        scheme
            .eq_ignore_ascii_case("Bearer")
            .then(|| token.to_string())
    }

    /// Looks up `name` and parses its value as a UUID; any lookup or parse
    /// failure yields `None`.
    fn header_uuid(request: &RequestData, name: &str) -> Option<Uuid> {
        Self::header_str(request, name).and_then(|raw| Uuid::parse_str(raw).ok())
    }

    /// Returns the first value stored for header `name` as a `&str`.
    ///
    /// Only the first value is considered. Returns `None` when the header is
    /// missing, has no values, or the first value is not valid UTF-8.
    fn header_str<'a>(request: &'a RequestData, name: &str) -> Option<&'a str> {
        request
            .headers
            .get(name)
            .and_then(|values| values.first())
            .and_then(|bytes| std::str::from_utf8(bytes).ok())
    }
}
impl RequestHandler for FusilladeOutletHandler {
async fn handle_request(&self, _data: RequestData) {}
fn handle_response(&self, request_data: RequestData, response_data: ResponseData) -> impl std::future::Future<Output = ()> + Send {
let job = self.job.clone();
async move {
let response_id = match Self::extract_response_id(&request_data) {
Some(id) => id,
None => return,
};
let request_id = match Self::extract_request_id(&request_data) {
Some(id) => id,
None => {
tracing::warn!(response_id = %response_id, "Missing x-fusillade-request-id header on response — skipping enqueue");
return;
}
};
let batch_id = match Self::extract_batch_id(&request_data) {
Some(id) => id,
None => {
tracing::warn!(response_id = %response_id, "Missing x-fusillade-batch-id header on response — skipping enqueue");
return;
}
};
let status_code = response_data.status.as_u16();
let response_body = response_data
.body
.as_ref()
.and_then(|b| std::str::from_utf8(b).ok())
.unwrap_or("")
.to_string();
let request_body = request_data
.body
.as_ref()
.and_then(|b| std::str::from_utf8(b).ok())
.unwrap_or("")
.to_string();
let model = Self::header_str(&request_data, "x-onwards-model").unwrap_or("unknown").to_string();
let endpoint = Self::header_str(&request_data, "x-onwards-endpoint").unwrap_or("").to_string();
let api_key = Self::extract_api_key(&request_data);
if endpoint.is_empty() {
tracing::warn!(
response_id = %response_id,
uri = %request_data.uri,
"Missing x-onwards-endpoint header on captured request — complete-response will fail"
);
}
if let Err(e) = job
.enqueue(&CompleteResponseInput {
response_id: response_id.clone(),
status_code,
response_body,
batch_id,
request_id,
request_body,
model,
endpoint,
base_url: String::new(),
api_key,
})
.await
{
tracing::warn!(error = %e, response_id = %response_id, "Failed to enqueue complete-response job");
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use std::collections::HashMap;
    use std::time::SystemTime;

    /// Response id used by the response-id tests.
    const RESPONSE_ID: &str = "resp_12345678-1234-1234-1234-123456789abc";
    /// Well-formed UUID used by the request-id tests.
    const REQUEST_UUID: &str = "12345678-1234-1234-1234-123456789abc";

    /// Builds a POST `/v1/responses` request with the given headers and no body.
    fn make_request_data(headers: HashMap<String, Vec<Bytes>>) -> RequestData {
        RequestData {
            correlation_id: 1,
            timestamp: SystemTime::now(),
            method: axum::http::Method::POST,
            uri: "/v1/responses".parse().unwrap(),
            headers,
            body: None,
            trace_id: None,
            span_id: None,
        }
    }

    /// Builds a request whose headers are the given single-valued `(name, value)` pairs.
    fn request_with(pairs: &[(&str, &'static str)]) -> RequestData {
        let headers = pairs
            .iter()
            .map(|&(name, value)| (name.to_string(), vec![Bytes::from(value)]))
            .collect();
        make_request_data(headers)
    }

    #[test]
    fn test_extract_response_id_present() {
        let request = request_with(&[(ONWARDS_RESPONSE_ID_HEADER, RESPONSE_ID)]);
        assert_eq!(
            FusilladeOutletHandler::extract_response_id(&request),
            Some(RESPONSE_ID.to_string())
        );
    }

    #[test]
    fn test_extract_response_id_absent() {
        let request = request_with(&[]);
        assert!(FusilladeOutletHandler::extract_response_id(&request).is_none());
    }

    #[test]
    fn test_extract_response_id_not_skipped_for_realtime_with_fusillade_header() {
        // A fusillade request-id header must not stop the response id from being read.
        let request = request_with(&[
            ("x-fusillade-request-id", "some-id"),
            (ONWARDS_RESPONSE_ID_HEADER, RESPONSE_ID),
        ]);
        assert_eq!(
            FusilladeOutletHandler::extract_response_id(&request),
            Some(RESPONSE_ID.to_string())
        );
    }

    #[test]
    fn test_extract_request_id_present() {
        let request = request_with(&[("x-fusillade-request-id", REQUEST_UUID)]);
        let expected = Uuid::parse_str(REQUEST_UUID).unwrap();
        assert_eq!(FusilladeOutletHandler::extract_request_id(&request), Some(expected));
    }

    #[test]
    fn test_extract_request_id_absent() {
        let request = request_with(&[]);
        assert!(FusilladeOutletHandler::extract_request_id(&request).is_none());
    }

    #[test]
    fn test_extract_request_id_invalid_uuid() {
        let request = request_with(&[("x-fusillade-request-id", "not-a-uuid")]);
        assert!(FusilladeOutletHandler::extract_request_id(&request).is_none());
    }

    #[test]
    fn test_extract_api_key_strips_bearer_prefix() {
        let request = request_with(&[("authorization", "Bearer sk-test-123")]);
        assert_eq!(
            FusilladeOutletHandler::extract_api_key(&request),
            Some("sk-test-123".to_string())
        );
    }

    #[test]
    fn test_extract_api_key_without_bearer_prefix_is_none() {
        let request = request_with(&[("authorization", "sk-test-123")]);
        assert!(FusilladeOutletHandler::extract_api_key(&request).is_none());
    }

    #[test]
    fn test_extract_api_key_absent() {
        let request = request_with(&[]);
        assert!(FusilladeOutletHandler::extract_api_key(&request).is_none());
    }
}