use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::store::{self as response_store};
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateResponseInput {
pub batch_id: Uuid,
pub request_id: Uuid,
pub body: String,
pub model: String,
pub base_url: String,
pub endpoint: String,
pub api_key: Option<String>,
}
pub async fn build_create_response_job<P: sqlx_pool_router::PoolProvider + Clone + Send + Sync + 'static>(
pool: sqlx::PgPool,
state: crate::tasks::TaskState<P>,
) -> anyhow::Result<underway::Job<CreateResponseInput, crate::tasks::TaskState<P>>> {
use underway::Job;
use underway::job::To;
use underway::task::Error as TaskError;
Job::<CreateResponseInput, _>::builder()
.state(state)
.step(|cx, input: CreateResponseInput| async move {
if let Some(ref key) = input.api_key {
if let Err(msg) =
crate::error_enrichment::validate_api_key_model_access(cx.state.dwctl_pool.clone(), key, &input.model).await
{
tracing::debug!(
request_id = %input.request_id,
error = %msg,
"Skipping response creation — model access denied"
);
return To::done();
}
} else {
tracing::debug!(
request_id = %input.request_id,
"Skipping response creation — no API key"
);
return To::done();
}
match response_store::request_exists(&cx.state.request_manager, input.request_id).await {
Ok(true) => {
tracing::debug!(
request_id = %input.request_id,
"Skipping response creation — row already exists (complete-response won the race)"
);
return To::done();
}
Ok(false) => {}
Err(e) => {
tracing::error!(
request_id = %input.request_id,
error = %e,
"Failed to check for existing request before create"
);
return Err(TaskError::Retryable(e.to_string()));
}
}
let created_by = response_store::lookup_created_by(&cx.state.dwctl_pool, input.api_key.as_deref()).await;
tracing::debug!(
request_id = %input.request_id,
model = %input.model,
endpoint = %input.endpoint,
"create-response inserting fusillade row"
);
let batch_input = fusillade::CreateSingleRequestBatchInput {
batch_id: Some(input.batch_id),
request_id: input.request_id,
body: input.body,
model: input.model.clone(),
base_url: input.base_url,
endpoint: input.endpoint,
completion_window: "0s".to_string(),
initial_state: "processing".to_string(),
api_key: input.api_key,
created_by,
};
if let Err(e) = fusillade::Storage::create_single_request_batch(&*cx.state.request_manager, batch_input).await {
if let Ok(true) = response_store::request_exists(&cx.state.request_manager, input.request_id).await {
tracing::debug!(
request_id = %input.request_id,
"create-response lost race after pre-check — row now exists, done"
);
return To::done();
}
tracing::error!(
request_id = %input.request_id,
error = %e,
"Failed to create realtime tracking batch"
);
return Err(TaskError::Retryable(e.to_string()));
}
tracing::debug!(
request_id = %input.request_id,
model = %input.model,
"Created realtime tracking batch in fusillade"
);
To::done()
})
.name("create-response")
.pool(pool)
.build()
.await
.map_err(Into::into)
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CompleteResponseInput {
pub response_id: String,
pub status_code: u16,
pub response_body: String,
pub batch_id: Uuid,
pub request_id: Uuid,
pub request_body: String,
pub model: String,
pub endpoint: String,
pub base_url: String,
pub api_key: Option<String>,
}
pub async fn build_complete_response_job<P: sqlx_pool_router::PoolProvider + Clone + Send + Sync + 'static>(
pool: sqlx::PgPool,
state: crate::tasks::TaskState<P>,
) -> anyhow::Result<underway::Job<CompleteResponseInput, crate::tasks::TaskState<P>>> {
use underway::Job;
use underway::job::To;
use underway::task::Error as TaskError;
Job::<CompleteResponseInput, _>::builder()
.state(state)
.step(|cx, input: CompleteResponseInput| async move {
let create_ctx = response_store::CreateContext {
batch_id: input.batch_id,
request_id: input.request_id,
request_body: &input.request_body,
model: &input.model,
endpoint: &input.endpoint,
base_url: &input.base_url,
api_key: input.api_key.as_deref(),
};
if let Err(e) = response_store::complete_response_idempotent(
&cx.state.request_manager,
&cx.state.dwctl_pool,
&input.response_id,
&input.response_body,
input.status_code,
create_ctx,
)
.await
{
tracing::error!(
response_id = %input.response_id,
error = %e,
"Failed to complete response in fusillade"
);
return Err(TaskError::Retryable(e.to_string()));
}
if input.status_code >= 400 {
tracing::debug!(
response_id = %input.response_id,
status_code = input.status_code,
body_size = input.response_body.len(),
"Upstream error response stored in fusillade"
);
} else {
tracing::debug!(
response_id = %input.response_id,
status_code = input.status_code,
body_size = input.response_body.len(),
"Response completed in fusillade"
);
}
To::done()
})
.name("complete-response")
.pool(pool)
.build()
.await
.map_err(Into::into)
}