use actix_web::{Responder, get, post, web};
use serde_json::{Value, json};
use sqlx::Row;
use tokio::sync::OnceCell;
use tracing::{error, info, warn};
use crate::AppState;
use crate::api::response::{api_success, conflict, not_found, service_unavailable};
use crate::drivers::postgresql::raw_sql::execute_postgres_sql;
const DEFERRED_QUEUE_TABLE_INIT_SQL: &[&str] = &[
r#"
CREATE TABLE IF NOT EXISTS public.gateway_deferred_request_queue (
request_id text PRIMARY KEY,
method text NOT NULL,
route text NOT NULL,
client_name text,
request_bytes bigint,
payload jsonb NOT NULL,
status text NOT NULL DEFAULT 'queued',
deferred_reason text,
attempts integer NOT NULL DEFAULT 0,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
completed_at timestamptz,
error_message text,
result_summary jsonb
)
"#,
r#"
CREATE INDEX IF NOT EXISTS idx_gateway_deferred_request_queue_status
ON public.gateway_deferred_request_queue(status)
"#,
r#"
CREATE INDEX IF NOT EXISTS idx_gateway_deferred_request_queue_created_at
ON public.gateway_deferred_request_queue(created_at DESC)
"#,
r#"
CREATE INDEX IF NOT EXISTS idx_gateway_deferred_request_queue_status_created
ON public.gateway_deferred_request_queue(status, created_at ASC)
"#,
r#"
ALTER TABLE public.gateway_deferred_request_queue
ADD COLUMN IF NOT EXISTS result_summary jsonb
"#,
r#"
ALTER TABLE public.gateway_deferred_request_queue
ADD COLUMN IF NOT EXISTS deferred_reason text
"#,
r#"
ALTER TABLE public.gateway_deferred_request_queue
ADD COLUMN IF NOT EXISTS attempts integer NOT NULL DEFAULT 0
"#,
];
static DEFERRED_QUEUE_TABLE_INIT: OnceCell<()> = OnceCell::const_new();
async fn ensure_deferred_queue_table(state: &AppState) -> Result<(), String> {
DEFERRED_QUEUE_TABLE_INIT
.get_or_try_init(|| async {
let Some(logging_client_name) = state.logging_client_name.as_ref() else {
return Err("logging client is not configured".to_string());
};
let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
return Err(format!(
"logging client '{}' is not connected",
logging_client_name
));
};
for statement in DEFERRED_QUEUE_TABLE_INIT_SQL {
sqlx::query(statement)
.execute(&pool)
.await
.map_err(|err| format!("failed to initialize deferred queue table: {err}"))?;
}
state
.metrics_state
.record_deferred_event("gateway_query", "storage_ready");
info!(
client = %logging_client_name,
"Deferred queue table initialization completed"
);
Ok(())
})
.await
.map(|_| ())
}
pub async fn enqueue_deferred_request(
state: &AppState,
request_id: &str,
method: &str,
route: &str,
client_name: Option<&str>,
request_bytes: Option<u64>,
payload: &Value,
) -> Result<(), String> {
ensure_deferred_queue_table(state).await?;
let Some(logging_client_name) = state.logging_client_name.as_ref() else {
return Err("logging client is not configured".to_string());
};
let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
return Err(format!(
"logging client '{}' is not connected",
logging_client_name
));
};
let request_bytes_i64: Option<i64> = request_bytes.map(|value| value as i64);
let deferred_reason: Option<&str> = payload.get("reason").and_then(Value::as_str);
sqlx::query(
r#"
INSERT INTO public.gateway_deferred_request_queue (
request_id,
method,
route,
client_name,
request_bytes,
payload,
deferred_reason,
status,
created_at,
updated_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, 'queued', now(), now())
ON CONFLICT (request_id)
DO UPDATE SET
payload = EXCLUDED.payload,
deferred_reason = EXCLUDED.deferred_reason,
request_bytes = EXCLUDED.request_bytes,
updated_at = now()
"#,
)
.bind(request_id)
.bind(method)
.bind(route)
.bind(client_name)
.bind(request_bytes_i64)
.bind(payload)
.bind(deferred_reason)
.execute(&pool)
.await
.map_err(|err| format!("failed to persist deferred request: {err}"))?;
let deferred_kind = payload
.get("kind")
.and_then(Value::as_str)
.unwrap_or("unknown");
state
.metrics_state
.record_deferred_event(deferred_kind, "queued");
Ok(())
}
async fn claim_next_deferred_query_job(
state: &AppState,
) -> Result<Option<(String, Value)>, String> {
ensure_deferred_queue_table(state).await?;
let Some(logging_client_name) = state.logging_client_name.as_ref() else {
return Ok(None);
};
let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
return Ok(None);
};
let row = sqlx::query(
r#"
WITH candidate AS (
SELECT request_id
FROM public.gateway_deferred_request_queue
WHERE status = 'queued'
AND payload->>'kind' = 'gateway_query'
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE public.gateway_deferred_request_queue q
SET status = 'running',
updated_at = now(),
attempts = q.attempts + 1,
error_message = NULL
FROM candidate
WHERE q.request_id = candidate.request_id
RETURNING q.request_id, q.payload
"#,
)
.fetch_optional(&pool)
.await
.map_err(|err| format!("failed to claim deferred query job: {err}"))?;
Ok(row.map(|row| {
let request_id: String = row.try_get("request_id").unwrap_or_default();
let payload: Value = row.try_get("payload").unwrap_or_else(|_| json!({}));
(request_id, payload)
}))
}
async fn mark_deferred_job_failed(
state: &AppState,
request_id: &str,
message: &str,
) -> Result<(), String> {
let Some(logging_client_name) = state.logging_client_name.as_ref() else {
return Ok(());
};
let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
return Ok(());
};
sqlx::query(
r#"
UPDATE public.gateway_deferred_request_queue
SET status = 'failed',
error_message = $2,
updated_at = now(),
completed_at = now()
WHERE request_id = $1
"#,
)
.bind(request_id)
.bind(message)
.execute(&pool)
.await
.map_err(|err| format!("failed to mark deferred job failed: {err}"))?;
state
.metrics_state
.record_deferred_event("gateway_query", "failed");
Ok(())
}
async fn mark_deferred_job_completed(
state: &AppState,
request_id: &str,
summary: Value,
) -> Result<(), String> {
let Some(logging_client_name) = state.logging_client_name.as_ref() else {
return Ok(());
};
let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
return Ok(());
};
sqlx::query(
r#"
UPDATE public.gateway_deferred_request_queue
SET status = 'completed',
result_summary = $2,
updated_at = now(),
completed_at = now()
WHERE request_id = $1
"#,
)
.bind(request_id)
.bind(summary)
.execute(&pool)
.await
.map_err(|err| format!("failed to mark deferred job completed: {err}"))?;
state
.metrics_state
.record_deferred_event("gateway_query", "completed");
Ok(())
}
async fn process_one_deferred_query(state: &AppState) -> Result<bool, String> {
let Some((request_id, payload)) = claim_next_deferred_query_job(state).await? else {
return Ok(false);
};
let client_name = payload
.get("client_name")
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
let query = payload
.get("query")
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
state
.metrics_state
.record_deferred_event("gateway_query", "running");
if client_name.is_empty() || query.trim().is_empty() {
let reason = "deferred payload missing client_name or query";
let _ = mark_deferred_job_failed(state, &request_id, reason).await;
return Ok(true);
}
let Some(pool) = state.pg_registry.get_pool(&client_name) else {
let reason = format!(
"postgres client '{}' unavailable for deferred execution",
client_name
);
let _ = mark_deferred_job_failed(state, &request_id, &reason).await;
return Ok(true);
};
match execute_postgres_sql(&pool, &query).await {
Ok(result) => {
let summary = json!({
"statement_count": result.summary.statement_count,
"rows_affected": result.summary.rows_affected,
"returned_row_count": result.summary.returned_row_count,
"rows_preview": result.rows.into_iter().take(25).collect::<Vec<_>>()
});
let _ = mark_deferred_job_completed(state, &request_id, summary).await;
Ok(true)
}
Err(err) => {
let reason = format!("deferred query execution failed: {err}");
let _ = mark_deferred_job_failed(state, &request_id, &reason).await;
Ok(true)
}
}
}
pub async fn run_deferred_query_worker_once(state: &AppState) -> Result<bool, String> {
process_one_deferred_query(state).await
}
pub fn spawn_deferred_query_worker(
app_state: web::Data<AppState>,
enabled: bool,
poll_interval: std::time::Duration,
) {
if !enabled {
info!("Deferred query worker disabled");
return;
}
let state = app_state.clone();
tokio::spawn(async move {
info!(
poll_ms = poll_interval.as_millis() as u64,
"Deferred query worker started"
);
loop {
match run_deferred_query_worker_once(state.get_ref()).await {
Ok(processed) => {
if !processed {
tokio::time::sleep(poll_interval).await;
}
}
Err(err) => {
error!(error = %err, "Deferred query worker loop failed");
tokio::time::sleep(poll_interval).await;
}
}
}
});
}
#[post("/gateway/deferred/{request_id}/requeue")]
pub async fn gateway_deferred_requeue(
request_id: web::Path<String>,
app_state: web::Data<AppState>,
) -> impl Responder {
let request_id = request_id.into_inner();
if let Some(logging_client_name) = app_state.logging_client_name.as_ref()
&& let Some(pool) = app_state.pg_registry.get_pool(logging_client_name)
{
if let Err(err) = ensure_deferred_queue_table(app_state.get_ref()).await {
return service_unavailable(
"Deferred queue unavailable",
format!("Failed to prepare deferred queue storage: {err}"),
);
}
let current = sqlx::query(
r#"
SELECT status, payload
FROM public.gateway_deferred_request_queue
WHERE request_id = $1
"#,
)
.bind(&request_id)
.fetch_optional(&pool)
.await;
let Some(row) = (match current {
Ok(row) => row,
Err(err) => {
return service_unavailable(
"Deferred queue unavailable",
format!("Failed to read deferred request: {err}"),
);
}
}) else {
return not_found(
"Deferred request not found",
format!("No deferred request exists for '{}'.", request_id),
);
};
let status: String = row
.try_get("status")
.unwrap_or_else(|_| "queued".to_string());
if status == "queued" || status == "running" {
return conflict(
"Deferred request is not requeueable",
format!(
"Request '{}' is currently '{}' and cannot be requeued.",
request_id, status
),
);
}
let payload: Value = row.try_get("payload").unwrap_or_else(|_| json!({}));
let deferred_kind = payload
.get("kind")
.and_then(Value::as_str)
.unwrap_or("unknown");
if let Err(err) = sqlx::query(
r#"
UPDATE public.gateway_deferred_request_queue
SET status = 'queued',
updated_at = now(),
completed_at = NULL,
error_message = NULL,
result_summary = NULL
WHERE request_id = $1
"#,
)
.bind(&request_id)
.execute(&pool)
.await
{
return service_unavailable(
"Deferred queue unavailable",
format!("Failed to requeue deferred request: {err}"),
);
}
app_state
.metrics_state
.record_deferred_event(deferred_kind, "requeued");
return api_success(
"Deferred request requeued",
json!({
"request_id": request_id,
"status": "queued"
}),
);
}
service_unavailable(
"Deferred queue unavailable",
"Logging client is not configured or not connected",
)
}
#[get("/gateway/deferred/{request_id}")]
pub async fn gateway_deferred_status(
request_id: web::Path<String>,
app_state: web::Data<AppState>,
) -> impl Responder {
let request_id = request_id.into_inner();
if let Some(logging_client_name) = app_state.logging_client_name.as_ref()
&& let Some(pool) = app_state.pg_registry.get_pool(logging_client_name)
{
let row = sqlx::query(
r#"
SELECT
request_id,
method,
route,
client_name,
deferred_reason,
attempts,
status,
created_at,
updated_at,
completed_at,
error_message,
result_summary
FROM public.gateway_deferred_request_queue
WHERE request_id = $1
"#,
)
.bind(&request_id)
.fetch_optional(&pool)
.await;
match row {
Ok(Some(row)) => {
let status: String = row.try_get("status").unwrap_or_else(|_| "queued".into());
let method: Option<String> = row.try_get("method").ok();
let route: Option<String> = row.try_get("route").ok();
let client_name: Option<String> = row.try_get("client_name").ok();
let deferred_reason: Option<String> = row.try_get("deferred_reason").ok();
let attempts: Option<i32> = row.try_get("attempts").ok();
let created_at: Option<chrono::DateTime<chrono::Utc>> =
row.try_get("created_at").ok();
let updated_at: Option<chrono::DateTime<chrono::Utc>> =
row.try_get("updated_at").ok();
let completed_at: Option<chrono::DateTime<chrono::Utc>> =
row.try_get("completed_at").ok();
let error_message: Option<String> = row.try_get("error_message").ok();
let result_summary: Option<Value> = row.try_get("result_summary").ok();
return api_success(
"Deferred request status",
json!({
"request_id": request_id,
"status": status,
"method": method,
"route": route,
"client": client_name,
"deferred_reason": deferred_reason,
"attempts": attempts,
"created_at": created_at,
"updated_at": updated_at,
"completed_at": completed_at,
"error_message": error_message,
"result_summary": result_summary,
"storage": "postgres"
}),
);
}
Ok(None) => {}
Err(err) => {
warn!(error = %err, request_id = %request_id, "Deferred status query failed");
return service_unavailable(
"Deferred status unavailable",
format!("failed to query deferred queue: {err}"),
);
}
}
}
let cache_key = format!("gateway:deferred:{}", request_id);
if let Some(payload) = app_state.immortal_cache.get(&cache_key).await {
return api_success(
"Deferred request status",
json!({
"request_id": request_id,
"status": "queued",
"payload": payload,
"storage": "cache"
}),
);
}
not_found(
"Deferred request not found",
format!("No deferred request exists for '{}'.", request_id),
)
}