use actix_web::{Responder, get, post, web};
use serde_json::{Value, json};
use sqlx::Row;
use tokio::sync::OnceCell;
use tracing::{error, info, warn};
use crate::AppState;
pub use crate::api::gateway::contracts::{
GATEWAY_DEFERRED_KIND_DELETE as DEFERRED_KIND_GATEWAY_DELETE,
GATEWAY_DEFERRED_KIND_FETCH as DEFERRED_KIND_GATEWAY_FETCH,
GATEWAY_DEFERRED_KIND_INSERT as DEFERRED_KIND_GATEWAY_INSERT,
GATEWAY_DEFERRED_KIND_QUERY as DEFERRED_KIND_GATEWAY_QUERY,
GATEWAY_DEFERRED_KIND_UPDATE as DEFERRED_KIND_GATEWAY_UPDATE,
};
use crate::api::gateway::contracts::{
GatewayDeferredRequest, GatewayDeleteRequest, GatewayFetchRequest, GatewayInsertRequest,
extract_update_payload, parse_conditions_from_body,
};
use crate::api::gateway::update::table_id_map::get_resource_id_key;
use crate::api::response::{api_success, conflict, not_found, service_unavailable};
use crate::drivers::postgresql::column_resolver::resolve_information_schema_targets;
use crate::drivers::postgresql::raw_sql::execute_postgres_sql;
use crate::drivers::postgresql::sqlx_driver::{
PostgresInsertError, delete_rows, fetch_rows_with_columns, insert_row, update_rows,
};
use crate::parser::query_builder::Condition;
use crate::utils::format::normalize_column_name;
const DEFAULT_MAX_DEFERRED_ATTEMPTS: i32 = 3;
fn max_deferred_attempts() -> i32 {
std::env::var("ATHENA_DEFERRED_MAX_ATTEMPTS")
.ok()
.and_then(|value| value.parse::<i32>().ok())
.filter(|value| *value >= 1)
.unwrap_or(DEFAULT_MAX_DEFERRED_ATTEMPTS)
}
fn is_supported_deferred_kind(kind: &str) -> bool {
matches!(
kind,
DEFERRED_KIND_GATEWAY_QUERY
| DEFERRED_KIND_GATEWAY_FETCH
| DEFERRED_KIND_GATEWAY_INSERT
| DEFERRED_KIND_GATEWAY_UPDATE
| DEFERRED_KIND_GATEWAY_DELETE
)
}
fn deferred_request_should_invalidate_cache(value: &Value) -> bool {
match value {
Value::Null => false,
Value::Object(map) => !map.is_empty(),
_ => true,
}
}
async fn invalidate_deferred_gateway_cache(state: &AppState, client_name: &str, table_name: &str) {
let table_prefix = format!("{}:", table_name.trim());
let normalized_client = client_name.trim().to_ascii_lowercase();
let _ = state.cache.invalidate_entries_if({
move |key, _value| {
let key = key.as_str();
key.starts_with(&table_prefix)
|| key.starts_with("query_count:")
|| key.ends_with(":__raw_json")
|| key.to_ascii_lowercase().contains(&normalized_client)
}
});
state.cache.run_pending_tasks().await;
}
fn parse_fetch_sort_options_from_body(
body: &Value,
force_snake_case: bool,
) -> Option<(String, bool)> {
let sort_object = body
.get("sort_by")
.or_else(|| body.get("sortBy"))
.and_then(Value::as_object)?;
let field = sort_object
.get("field")
.or_else(|| sort_object.get("column"))
.and_then(Value::as_str)?;
let column = normalize_column_name(field, force_snake_case);
if column.is_empty() {
return None;
}
let ascending = sort_object
.get("direction")
.and_then(Value::as_str)
.map(|value| matches!(value.to_ascii_lowercase().as_str(), "asc" | "ascending"))
.unwrap_or(true);
Some((column, ascending))
}
fn normalize_table_name_for_resource_id_lookup(
table_name: &str,
allow_schema_names_prefixed_as_table_name: bool,
) -> String {
let trimmed = table_name.trim();
if trimmed.is_empty() {
return String::new();
}
if !allow_schema_names_prefixed_as_table_name {
return trimmed.to_string();
}
match resolve_information_schema_targets(trimmed, true) {
Ok((schema, table)) if schema.eq_ignore_ascii_case("public") => table,
_ => trimmed.to_string(),
}
}
fn postgres_insert_error_message(err: PostgresInsertError) -> String {
match err {
PostgresInsertError::InvalidTableName => "invalid table name".to_string(),
PostgresInsertError::InvalidPayload(reason) => format!("invalid payload: {reason}"),
PostgresInsertError::NoValidColumns => "no valid columns provided".to_string(),
PostgresInsertError::MissingReturnColumn => {
"insert succeeded without returned row".to_string()
}
PostgresInsertError::SqlExecution { message, sql_state } => {
if let Some(sql_state) = sql_state {
format!("{message} (sql_state={sql_state})")
} else {
message
}
}
}
}
fn deferred_client_name(request: &GatewayDeferredRequest) -> Result<String, String> {
let client_name = request.client_name.trim().to_string();
if client_name.is_empty() {
Err("deferred payload missing client_name".to_string())
} else {
Ok(client_name)
}
}
fn deferred_request_body(request: &GatewayDeferredRequest) -> Result<Value, String> {
request
.request_body
.clone()
.ok_or_else(|| "deferred payload missing request_body".to_string())
}
const DEFERRED_QUEUE_TABLE_INIT_SQL: &[&str] = &[
r#"
CREATE TABLE IF NOT EXISTS public.gateway_deferred_request_queue (
request_id text PRIMARY KEY,
method text NOT NULL,
route text NOT NULL,
client_name text,
request_bytes bigint,
payload jsonb NOT NULL,
status text NOT NULL DEFAULT 'queued',
deferred_reason text,
attempts integer NOT NULL DEFAULT 0,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
completed_at timestamptz,
error_message text,
result_summary jsonb
)
"#,
r#"
CREATE INDEX IF NOT EXISTS idx_gateway_deferred_request_queue_status
ON public.gateway_deferred_request_queue(status)
"#,
r#"
CREATE INDEX IF NOT EXISTS idx_gateway_deferred_request_queue_created_at
ON public.gateway_deferred_request_queue(created_at DESC)
"#,
r#"
CREATE INDEX IF NOT EXISTS idx_gateway_deferred_request_queue_status_created
ON public.gateway_deferred_request_queue(status, created_at ASC)
"#,
r#"
ALTER TABLE public.gateway_deferred_request_queue
ADD COLUMN IF NOT EXISTS result_summary jsonb
"#,
r#"
ALTER TABLE public.gateway_deferred_request_queue
ADD COLUMN IF NOT EXISTS deferred_reason text
"#,
r#"
ALTER TABLE public.gateway_deferred_request_queue
ADD COLUMN IF NOT EXISTS attempts integer NOT NULL DEFAULT 0
"#,
r#"
ALTER TABLE public.gateway_deferred_request_queue
ADD COLUMN IF NOT EXISTS next_attempt_at timestamptz
"#,
];
static DEFERRED_QUEUE_TABLE_INIT: OnceCell<()> = OnceCell::const_new();
async fn ensure_deferred_queue_table(state: &AppState) -> Result<(), String> {
DEFERRED_QUEUE_TABLE_INIT
.get_or_try_init(|| async {
let Some(logging_client_name) = state.logging_client_name.as_ref() else {
return Err("logging client is not configured".to_string());
};
let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
return Err(format!(
"logging client '{}' is not connected",
logging_client_name
));
};
for statement in DEFERRED_QUEUE_TABLE_INIT_SQL {
sqlx::query(statement)
.execute(&pool)
.await
.map_err(|err| format!("failed to initialize deferred queue table: {err}"))?;
}
for kind in [
DEFERRED_KIND_GATEWAY_QUERY,
DEFERRED_KIND_GATEWAY_FETCH,
DEFERRED_KIND_GATEWAY_INSERT,
DEFERRED_KIND_GATEWAY_UPDATE,
DEFERRED_KIND_GATEWAY_DELETE,
] {
state
.metrics_state
.record_deferred_event(kind, "storage_ready");
}
info!(
client = %logging_client_name,
"Deferred queue table initialization completed"
);
Ok(())
})
.await
.map(|_| ())
}
pub async fn enqueue_deferred_request(
state: &AppState,
request_id: &str,
method: &str,
route: &str,
client_name: Option<&str>,
request_bytes: Option<u64>,
payload: &Value,
) -> Result<(), String> {
let deferred_kind = payload
.get("kind")
.and_then(Value::as_str)
.unwrap_or("unknown");
if !is_supported_deferred_kind(deferred_kind) {
return Err(format!(
"unsupported deferred payload kind '{}'",
deferred_kind
));
}
ensure_deferred_queue_table(state).await?;
let Some(logging_client_name) = state.logging_client_name.as_ref() else {
return Err("logging client is not configured".to_string());
};
let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
return Err(format!(
"logging client '{}' is not connected",
logging_client_name
));
};
let request_bytes_i64: Option<i64> = request_bytes.map(|value| value as i64);
let deferred_reason: Option<&str> = payload.get("reason").and_then(Value::as_str);
sqlx::query(
r#"
INSERT INTO public.gateway_deferred_request_queue (
request_id,
method,
route,
client_name,
request_bytes,
payload,
deferred_reason,
status,
created_at,
updated_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, 'queued', now(), now())
ON CONFLICT (request_id)
DO UPDATE SET
payload = EXCLUDED.payload,
deferred_reason = EXCLUDED.deferred_reason,
request_bytes = EXCLUDED.request_bytes,
updated_at = now()
"#,
)
.bind(request_id)
.bind(method)
.bind(route)
.bind(client_name)
.bind(request_bytes_i64)
.bind(payload)
.bind(deferred_reason)
.execute(&pool)
.await
.map_err(|err| format!("failed to persist deferred request: {err}"))?;
state
.metrics_state
.record_deferred_event(deferred_kind, "queued");
Ok(())
}
pub async fn enqueue_gateway_deferred_request(
state: &AppState,
method: &str,
route: &str,
request_bytes: Option<u64>,
deferred_request: &GatewayDeferredRequest,
) -> Result<(), String> {
let payload = serde_json::to_value(deferred_request)
.map_err(|err| format!("failed to serialize deferred request payload: {err}"))?;
enqueue_deferred_request(
state,
&deferred_request.request_id,
method,
route,
Some(&deferred_request.client_name),
request_bytes,
&payload,
)
.await
}
fn parse_deferred_request(payload: &Value) -> Result<GatewayDeferredRequest, String> {
let request: GatewayDeferredRequest = serde_json::from_value(payload.clone())
.map_err(|err| format!("invalid deferred payload contract: {err}"))?;
if request.kind.trim().is_empty() {
return Err("deferred payload missing kind".to_string());
}
if request.request_id.trim().is_empty() {
return Err("deferred payload missing request_id".to_string());
}
if request.client_name.trim().is_empty() {
return Err("deferred payload missing client_name".to_string());
}
Ok(request)
}
async fn claim_next_deferred_job(state: &AppState) -> Result<Option<(String, Value)>, String> {
ensure_deferred_queue_table(state).await?;
let Some(logging_client_name) = state.logging_client_name.as_ref() else {
return Ok(None);
};
let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
return Ok(None);
};
let row = sqlx::query(
r#"
WITH candidate AS (
SELECT request_id
FROM public.gateway_deferred_request_queue
WHERE status = 'queued'
AND (next_attempt_at IS NULL OR next_attempt_at <= now())
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE public.gateway_deferred_request_queue q
SET status = 'running',
updated_at = now(),
attempts = q.attempts + 1,
error_message = NULL
FROM candidate
WHERE q.request_id = candidate.request_id
RETURNING q.request_id, q.payload
"#,
)
.fetch_optional(&pool)
.await
.map_err(|err| format!("failed to claim deferred job: {err}"))?;
Ok(row.map(|row| {
let request_id: String = row.try_get("request_id").unwrap_or_default();
let payload: Value = row.try_get("payload").unwrap_or_else(|_| json!({}));
(request_id, payload)
}))
}
async fn mark_deferred_job_failed(
state: &AppState,
request_id: &str,
deferred_kind: &str,
message: &str,
) -> Result<(), String> {
let Some(logging_client_name) = state.logging_client_name.as_ref() else {
return Ok(());
};
let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
return Ok(());
};
let max_attempts = max_deferred_attempts();
sqlx::query(
r#"
UPDATE public.gateway_deferred_request_queue
SET status = CASE WHEN attempts >= $3 THEN 'dead_letter' ELSE 'queued' END,
error_message = $2,
updated_at = now(),
completed_at = CASE WHEN attempts >= $3 THEN now() ELSE NULL END,
next_attempt_at = CASE
WHEN attempts >= $3 THEN NULL
ELSE now() + make_interval(secs => LEAST(300, CAST(power(2, GREATEST(0, attempts - 1)) AS int)))
END
WHERE request_id = $1
"#,
)
.bind(request_id)
.bind(message)
.bind(max_attempts)
.execute(&pool)
.await
.map_err(|err| format!("failed to mark deferred job failed: {err}"))?;
if let Ok(status_row) = sqlx::query(
r#"
SELECT status
FROM public.gateway_deferred_request_queue
WHERE request_id = $1
"#,
)
.bind(request_id)
.fetch_one(&pool)
.await
{
let status: String = status_row.try_get("status").unwrap_or_default();
if status == "dead_letter" {
state
.metrics_state
.record_deferred_event(deferred_kind, "dead_letter");
} else {
state
.metrics_state
.record_deferred_event(deferred_kind, "retry_scheduled");
}
}
Ok(())
}
async fn mark_deferred_job_completed(
state: &AppState,
request_id: &str,
deferred_kind: &str,
summary: Value,
) -> Result<(), String> {
let Some(logging_client_name) = state.logging_client_name.as_ref() else {
return Ok(());
};
let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
return Ok(());
};
sqlx::query(
r#"
UPDATE public.gateway_deferred_request_queue
SET status = 'completed',
result_summary = $2,
updated_at = now(),
completed_at = now()
WHERE request_id = $1
"#,
)
.bind(request_id)
.bind(summary)
.execute(&pool)
.await
.map_err(|err| format!("failed to mark deferred job completed: {err}"))?;
state
.metrics_state
.record_deferred_event(deferred_kind, "completed");
Ok(())
}
async fn execute_deferred_gateway_query(
state: &AppState,
request: &GatewayDeferredRequest,
) -> Result<Value, String> {
let client_name = deferred_client_name(request)?;
let query = request.query_text().unwrap_or_default().trim().to_string();
if query.is_empty() {
return Err("deferred payload missing query".to_string());
}
let Some(pool) = state.pg_registry.get_pool(&client_name) else {
return Err(format!(
"postgres client '{}' unavailable for deferred execution",
client_name
));
};
let result = execute_postgres_sql(&pool, &query)
.await
.map_err(|err| format!("deferred query execution failed: {err}"))?;
Ok(json!({
"kind": DEFERRED_KIND_GATEWAY_QUERY,
"client_name": client_name,
"statement_count": result.summary.statement_count,
"rows_affected": result.summary.rows_affected,
"returned_row_count": result.summary.returned_row_count,
"rows_preview": result.rows.into_iter().take(25).collect::<Vec<_>>()
}))
}
async fn execute_deferred_gateway_fetch(
state: &AppState,
request: &GatewayDeferredRequest,
) -> Result<Value, String> {
let client_name = deferred_client_name(request)?;
let request_body = deferred_request_body(request)?;
let request =
GatewayFetchRequest::from_body(&request_body, state.gateway_force_camel_case_to_snake_case);
if request.table_name.trim().is_empty() {
return Err("deferred fetch payload missing table_name".to_string());
}
let Some(pool) = state.pg_registry.get_pool(&client_name) else {
return Err(format!(
"postgres client '{}' unavailable for deferred execution",
client_name
));
};
let mut columns = request.columns.clone();
if columns.is_empty() {
columns.push("*".to_string());
}
let columns_refs: Vec<&str> = columns.iter().map(|column| column.as_str()).collect();
let conditions = parse_conditions_from_body(&request_body);
let pg_conditions: Vec<Condition> = conditions
.iter()
.map(|condition| {
let column_name = normalize_column_name(
&condition.eq_column,
state.gateway_force_camel_case_to_snake_case,
);
Condition::eq(column_name, condition.eq_value.clone())
.with_uuid_value_text_cast(state.gateway_auto_cast_uuid_filter_values_to_text)
})
.collect();
let limit = request.limit.or(request.page_size).unwrap_or(100);
let current_page = request.current_page.unwrap_or(1).max(1);
let page_size = request.page_size.unwrap_or(100);
let offset = request.offset.unwrap_or(0);
let calculated_offset = (current_page - 1) * page_size + offset;
let sort_options = parse_fetch_sort_options_from_body(
&request_body,
state.gateway_force_camel_case_to_snake_case,
);
let order_by = sort_options
.as_ref()
.map(|(column, ascending)| (column.as_str(), *ascending));
let rows = fetch_rows_with_columns(
&pool,
&request.table_name,
&columns_refs,
&pg_conditions,
limit,
calculated_offset,
order_by,
state.gateway_allow_schema_names_prefixed_as_table_name,
)
.await
.map_err(|err| format!("deferred fetch execution failed: {err}"))?;
Ok(json!({
"kind": DEFERRED_KIND_GATEWAY_FETCH,
"client_name": client_name,
"table_name": request.table_name,
"returned_row_count": rows.len(),
"rows_preview": rows.into_iter().take(25).collect::<Vec<_>>()
}))
}
async fn execute_deferred_gateway_insert(
state: &AppState,
request: &GatewayDeferredRequest,
) -> Result<Value, String> {
let client_name = deferred_client_name(request)?;
let request_body = deferred_request_body(request)?;
let request = GatewayInsertRequest::from_body(&request_body)
.ok_or_else(|| "deferred insert payload missing table_name or insert_body".to_string())?;
let Some(pool) = state.pg_registry.get_pool(&client_name) else {
return Err(format!(
"postgres client '{}' unavailable for deferred execution",
client_name
));
};
let inserted_row = insert_row(&pool, &request.table_name, &request.insert_body)
.await
.map_err(postgres_insert_error_message)?;
if deferred_request_should_invalidate_cache(&inserted_row) {
invalidate_deferred_gateway_cache(state, &client_name, &request.table_name).await;
}
Ok(json!({
"kind": DEFERRED_KIND_GATEWAY_INSERT,
"client_name": client_name,
"table_name": request.table_name,
"inserted_row": inserted_row
}))
}
async fn execute_deferred_gateway_update(
state: &AppState,
request: &GatewayDeferredRequest,
) -> Result<Value, String> {
let client_name = deferred_client_name(request)?;
let request_body = deferred_request_body(request)?;
let table_name = request_body
.get("table_name")
.and_then(Value::as_str)
.unwrap_or_default()
.trim()
.to_string();
if table_name.is_empty() {
return Err("deferred update payload missing table_name".to_string());
}
let set_payload =
extract_update_payload(&request_body, state.gateway_force_camel_case_to_snake_case)
.ok_or_else(|| "deferred update payload missing data/set/columns values".to_string())?;
let conditions = parse_conditions_from_body(&request_body);
if conditions.is_empty() {
return Err("deferred update requires at least one condition".to_string());
}
let pg_conditions: Vec<Condition> = conditions
.iter()
.map(|condition| {
let column_name = normalize_column_name(
&condition.eq_column,
state.gateway_force_camel_case_to_snake_case,
);
Condition::eq(column_name, condition.eq_value.clone())
.with_uuid_value_text_cast(state.gateway_auto_cast_uuid_filter_values_to_text)
})
.collect();
let Some(pool) = state.pg_registry.get_pool(&client_name) else {
return Err(format!(
"postgres client '{}' unavailable for deferred execution",
client_name
));
};
let rows = update_rows(
&pool,
&table_name,
&pg_conditions,
&Value::Object(set_payload),
)
.await
.map_err(|err| format!("deferred update execution failed: {err}"))?;
if !rows.is_empty() {
invalidate_deferred_gateway_cache(state, &client_name, &table_name).await;
}
Ok(json!({
"kind": DEFERRED_KIND_GATEWAY_UPDATE,
"client_name": client_name,
"table_name": table_name,
"updated_count": rows.len(),
"rows_preview": rows.into_iter().take(25).collect::<Vec<_>>()
}))
}
async fn execute_deferred_gateway_delete(
state: &AppState,
request: &GatewayDeferredRequest,
) -> Result<Value, String> {
let client_name = deferred_client_name(request)?;
let request_body = deferred_request_body(request)?;
let request: GatewayDeleteRequest = serde_json::from_value(request_body)
.map_err(|err| format!("invalid deferred delete payload: {err}"))?;
if request.table_name.trim().is_empty() || request.resource_id.trim().is_empty() {
return Err("deferred delete payload missing table_name or resource_id".to_string());
}
let Some(pool) = state.pg_registry.get_pool(&client_name) else {
return Err(format!(
"postgres client '{}' unavailable for deferred execution",
client_name
));
};
let id_lookup_table = normalize_table_name_for_resource_id_lookup(
&request.table_name,
state.gateway_allow_schema_names_prefixed_as_table_name,
);
let resource_id_key = get_resource_id_key(&id_lookup_table).await;
let conditions = vec![
Condition::eq(resource_id_key, request.resource_id.clone())
.with_uuid_value_text_cast(state.gateway_auto_cast_uuid_filter_values_to_text),
];
let rows = delete_rows(&pool, &request.table_name, &conditions)
.await
.map_err(|err| format!("deferred delete execution failed: {err}"))?;
if !rows.is_empty() {
invalidate_deferred_gateway_cache(state, &client_name, &request.table_name).await;
}
Ok(json!({
"kind": DEFERRED_KIND_GATEWAY_DELETE,
"client_name": client_name,
"table_name": request.table_name,
"resource_id": request.resource_id,
"deleted_count": rows.len(),
"rows_preview": rows.into_iter().take(25).collect::<Vec<_>>()
}))
}
async fn process_one_deferred_job(state: &AppState) -> Result<bool, String> {
let Some((request_id, payload)) = claim_next_deferred_job(state).await? else {
return Ok(false);
};
let deferred_request = match parse_deferred_request(&payload) {
Ok(value) => value,
Err(reason) => {
let _ = mark_deferred_job_failed(state, &request_id, "unknown", &reason).await;
return Ok(true);
}
};
if deferred_request.request_id != request_id {
let reason = format!(
"deferred payload request_id mismatch: row='{}' payload='{}'",
request_id, deferred_request.request_id
);
let _ = mark_deferred_job_failed(state, &request_id, "unknown", &reason).await;
return Ok(true);
}
let deferred_kind = deferred_request.kind.clone();
if !is_supported_deferred_kind(&deferred_kind) {
let reason = format!("unsupported deferred payload kind '{}'", deferred_kind);
let _ = mark_deferred_job_failed(state, &request_id, "unknown", &reason).await;
return Ok(true);
}
state
.metrics_state
.record_deferred_event(&deferred_kind, "running");
let execution_result = match deferred_kind.as_str() {
DEFERRED_KIND_GATEWAY_QUERY => {
execute_deferred_gateway_query(state, &deferred_request).await
}
DEFERRED_KIND_GATEWAY_FETCH => {
execute_deferred_gateway_fetch(state, &deferred_request).await
}
DEFERRED_KIND_GATEWAY_INSERT => {
execute_deferred_gateway_insert(state, &deferred_request).await
}
DEFERRED_KIND_GATEWAY_UPDATE => {
execute_deferred_gateway_update(state, &deferred_request).await
}
DEFERRED_KIND_GATEWAY_DELETE => {
execute_deferred_gateway_delete(state, &deferred_request).await
}
_ => Err(format!(
"unsupported deferred payload kind '{}'",
deferred_kind
)),
};
match execution_result {
Ok(summary) => {
let _ = mark_deferred_job_completed(state, &request_id, &deferred_kind, summary).await;
Ok(true)
}
Err(reason) => {
let _ = mark_deferred_job_failed(state, &request_id, &deferred_kind, &reason).await;
Ok(true)
}
}
}
pub async fn run_deferred_gateway_worker_once(state: &AppState) -> Result<bool, String> {
process_one_deferred_job(state).await
}
pub async fn run_deferred_query_worker_once(state: &AppState) -> Result<bool, String> {
run_deferred_gateway_worker_once(state).await
}
pub fn spawn_deferred_query_worker(
app_state: web::Data<AppState>,
enabled: bool,
poll_interval: std::time::Duration,
) {
if !enabled {
info!("Deferred query worker disabled");
return;
}
let state = app_state.clone();
tokio::spawn(async move {
info!(
poll_ms = poll_interval.as_millis() as u64,
"Deferred query worker started"
);
loop {
match run_deferred_gateway_worker_once(state.get_ref()).await {
Ok(processed) => {
if !processed {
tokio::time::sleep(poll_interval).await;
}
}
Err(err) => {
error!(error = %err, "Deferred query worker loop failed");
tokio::time::sleep(poll_interval).await;
}
}
}
});
}
#[post("/gateway/deferred/{request_id}/requeue")]
pub async fn gateway_deferred_requeue(
request_id: web::Path<String>,
app_state: web::Data<AppState>,
) -> impl Responder {
let request_id = request_id.into_inner();
if let Some(logging_client_name) = app_state.logging_client_name.as_ref()
&& let Some(pool) = app_state.pg_registry.get_pool(logging_client_name)
{
if let Err(err) = ensure_deferred_queue_table(app_state.get_ref()).await {
return service_unavailable(
"Deferred queue unavailable",
format!("Failed to prepare deferred queue storage: {err}"),
);
}
let current = sqlx::query(
r#"
SELECT status, payload
FROM public.gateway_deferred_request_queue
WHERE request_id = $1
"#,
)
.bind(&request_id)
.fetch_optional(&pool)
.await;
let Some(row) = (match current {
Ok(row) => row,
Err(err) => {
return service_unavailable(
"Deferred queue unavailable",
format!("Failed to read deferred request: {err}"),
);
}
}) else {
return not_found(
"Deferred request not found",
format!("No deferred request exists for '{}'.", request_id),
);
};
let status: String = row
.try_get("status")
.unwrap_or_else(|_| "queued".to_string());
if status == "queued" || status == "running" {
return conflict(
"Deferred request is not requeueable",
format!(
"Request '{}' is currently '{}' and cannot be requeued.",
request_id, status
),
);
}
let payload: Value = row.try_get("payload").unwrap_or_else(|_| json!({}));
let deferred_kind = payload
.get("kind")
.and_then(Value::as_str)
.unwrap_or("unknown");
if let Err(err) = sqlx::query(
r#"
UPDATE public.gateway_deferred_request_queue
SET status = 'queued',
updated_at = now(),
completed_at = NULL,
error_message = NULL,
result_summary = NULL
WHERE request_id = $1
"#,
)
.bind(&request_id)
.execute(&pool)
.await
{
return service_unavailable(
"Deferred queue unavailable",
format!("Failed to requeue deferred request: {err}"),
);
}
app_state
.metrics_state
.record_deferred_event(deferred_kind, "requeued");
return api_success(
"Deferred request requeued",
json!({
"request_id": request_id,
"status": "queued"
}),
);
}
service_unavailable(
"Deferred queue unavailable",
"Logging client is not configured or not connected",
)
}
#[get("/gateway/deferred/{request_id}")]
pub async fn gateway_deferred_status(
request_id: web::Path<String>,
app_state: web::Data<AppState>,
) -> impl Responder {
let request_id = request_id.into_inner();
if let Some(logging_client_name) = app_state.logging_client_name.as_ref()
&& let Some(pool) = app_state.pg_registry.get_pool(logging_client_name)
{
let row = sqlx::query(
r#"
SELECT
request_id,
method,
route,
client_name,
deferred_reason,
attempts,
status,
created_at,
updated_at,
completed_at,
next_attempt_at,
error_message,
result_summary
FROM public.gateway_deferred_request_queue
WHERE request_id = $1
"#,
)
.bind(&request_id)
.fetch_optional(&pool)
.await;
match row {
Ok(Some(row)) => {
let status: String = row.try_get("status").unwrap_or_else(|_| "queued".into());
let method: Option<String> = row.try_get("method").ok();
let route: Option<String> = row.try_get("route").ok();
let client_name: Option<String> = row.try_get("client_name").ok();
let deferred_reason: Option<String> = row.try_get("deferred_reason").ok();
let attempts: Option<i32> = row.try_get("attempts").ok();
let created_at: Option<chrono::DateTime<chrono::Utc>> =
row.try_get("created_at").ok();
let updated_at: Option<chrono::DateTime<chrono::Utc>> =
row.try_get("updated_at").ok();
let completed_at: Option<chrono::DateTime<chrono::Utc>> =
row.try_get("completed_at").ok();
let next_attempt_at: Option<chrono::DateTime<chrono::Utc>> =
row.try_get("next_attempt_at").ok();
let error_message: Option<String> = row.try_get("error_message").ok();
let result_summary: Option<Value> = row.try_get("result_summary").ok();
return api_success(
"Deferred request status",
json!({
"request_id": request_id,
"status": status,
"method": method,
"route": route,
"client": client_name,
"deferred_reason": deferred_reason,
"attempts": attempts,
"created_at": created_at,
"updated_at": updated_at,
"completed_at": completed_at,
"next_attempt_at": next_attempt_at,
"error_message": error_message,
"result_summary": result_summary,
"storage": "postgres"
}),
);
}
Ok(None) => {}
Err(err) => {
warn!(error = %err, request_id = %request_id, "Deferred status query failed");
return service_unavailable(
"Deferred status unavailable",
format!("failed to query deferred queue: {err}"),
);
}
}
}
not_found(
"Deferred request not found",
format!("No deferred request exists for '{}'.", request_id),
)
}