use actix_web::{Responder, get, post, web};
use serde_json::{Value, json};
use sqlx::Row;
use std::sync::atomic::{AtomicBool, Ordering};
use tracing::{error, info, warn};
use crate::AppState;
pub use crate::api::gateway::contracts::{
GATEWAY_DEFERRED_KIND_DELETE as DEFERRED_KIND_GATEWAY_DELETE,
GATEWAY_DEFERRED_KIND_FETCH as DEFERRED_KIND_GATEWAY_FETCH,
GATEWAY_DEFERRED_KIND_INSERT as DEFERRED_KIND_GATEWAY_INSERT,
GATEWAY_DEFERRED_KIND_QUERY as DEFERRED_KIND_GATEWAY_QUERY,
GATEWAY_DEFERRED_KIND_UPDATE as DEFERRED_KIND_GATEWAY_UPDATE,
};
use crate::api::gateway::contracts::{
GatewayDeferredRequest, GatewayDeleteRequest, GatewayFetchRequest, GatewayInsertRequest,
extract_update_payload, parse_conditions_from_body,
};
use crate::api::gateway::GatewayOperationKind;
use crate::api::gateway::update::table_id_map::get_resource_id_key;
use crate::api::provision::ensure_deferred_queue_table_initialized;
use crate::api::response::{api_success, conflict, not_found, service_unavailable};
use crate::config_validation::runtime_env_settings;
use crate::drivers::postgresql::column_resolver::resolve_information_schema_targets;
use crate::drivers::postgresql::raw_sql::{PostgresSqlExecutionResult, execute_postgres_sql};
use crate::drivers::postgresql::sqlx_driver::{
PostgresInsertError, delete_rows, fetch_rows_with_columns, insert_row, update_rows,
};
use crate::parser::query_builder::Condition;
use crate::utils::format::normalize_column_name;
/// Baseline retry budget for a deferred job. `max_deferred_attempts` combines it with the
/// configured value via `max`, so it acts as the lower bound for the effective attempt limit.
const DEFAULT_MAX_DEFERRED_ATTEMPTS: i32 = 3;
/// Picks the sleep duration for the worker loop after a failed iteration.
///
/// When the error text (compared case-insensitively) contains one of the known
/// pool-exhaustion or dropped-connection phrases, the result is `poll_interval`
/// raised to at least five seconds. Any other error keeps `poll_interval` as is.
fn deferred_worker_sleep_after_error(
    err: &str,
    poll_interval: std::time::Duration,
) -> std::time::Duration {
    // Lowercase fragments that indicate the connection pool or the link is under pressure.
    const PRESSURE_MARKERS: [&str; 5] = [
        "pool timed out",
        "waiting for an open connection",
        "forcibly closed by the remote host",
        "broken pipe",
        "connection reset",
    ];
    const POOL_EXHAUST_BACKOFF: std::time::Duration = std::time::Duration::from_secs(5);

    let normalized = err.to_ascii_lowercase();
    let under_pressure = PRESSURE_MARKERS
        .iter()
        .any(|marker| normalized.contains(*marker));
    if !under_pressure {
        return poll_interval;
    }
    std::cmp::max(poll_interval, POOL_EXHAUST_BACKOFF)
}
/// Returns the effective attempt limit for deferred jobs: the configured
/// `deferred_max_attempts`, but never less than `DEFAULT_MAX_DEFERRED_ATTEMPTS`.
fn max_deferred_attempts() -> i32 {
    let configured = runtime_env_settings().deferred_max_attempts;
    std::cmp::max(configured, DEFAULT_MAX_DEFERRED_ATTEMPTS)
}
/// Reports whether `kind` maps to a known `GatewayOperationKind` deferred kind.
fn is_supported_deferred_kind(kind: &str) -> bool {
    let recognized = GatewayOperationKind::from_deferred_kind(kind);
    recognized.is_some()
}
fn deferred_request_should_invalidate_cache(value: &Value) -> bool {
match value {
Value::Null => false,
Value::Object(map) => !map.is_empty(),
_ => true,
}
}
/// Invalidates cached gateway entries whose key matches `table_name` (trimmed), using
/// `gateway_cache_entry_matches_table_invalidation` as the predicate.
///
/// Does nothing when the cache reports zero entries. `_client_name` is currently unused.
/// The result of registering the predicate is ignored (best effort).
async fn invalidate_deferred_gateway_cache(state: &AppState, _client_name: &str, table_name: &str) {
    let cache = &state.cache;
    if cache.entry_count() > 0 {
        // Owned copy so the predicate closure can be moved into the cache.
        let target_table = table_name.trim().to_string();
        let _ = cache.invalidate_entries_if(move |cache_key, _entry| {
            crate::api::cache::invalidation::gateway_cache_entry_matches_table_invalidation(
                cache_key.as_str(),
                &target_table,
            )
        });
        cache.run_pending_tasks().await;
    }
}
/// Extracts the sort column and direction from a fetch request body.
///
/// Reads the `sort_by` key (falling back to `sortBy` only when `sort_by` is absent), which
/// must be an object holding a string `field` (or `column` when `field` is absent). The
/// column name is normalized with `normalize_column_name`.
///
/// Returns `Some((column, ascending))`, where `ascending` is `true` unless `direction` is a
/// string other than `asc`/`ascending` (case-insensitive). Returns `None` when the sort
/// object or field is missing, has the wrong JSON type, or normalizes to an empty name.
fn parse_fetch_sort_options_from_body(
    body: &Value,
    force_snake_case: bool,
) -> Option<(String, bool)> {
    let sort_spec = match body.get("sort_by") {
        Some(found) => found,
        None => body.get("sortBy")?,
    }
    .as_object()?;

    let raw_field = match sort_spec.get("field") {
        Some(found) => found,
        None => sort_spec.get("column")?,
    }
    .as_str()?;

    let column: String = normalize_column_name(raw_field, force_snake_case);
    if column.is_empty() {
        return None;
    }

    let ascending = match sort_spec.get("direction").and_then(Value::as_str) {
        Some(direction) => {
            let lowered = direction.to_ascii_lowercase();
            lowered == "asc" || lowered == "ascending"
        }
        // A missing or non-string direction defaults to ascending order.
        None => true,
    };
    Some((column, ascending))
}
/// Produces the table name used to look up a table's resource-id key.
///
/// The input is trimmed. When schema-prefixed names are allowed and
/// `resolve_information_schema_targets` resolves the name into the `public` schema
/// (case-insensitive), only the bare table name is returned; in every other case the
/// trimmed input is returned unchanged.
fn normalize_table_name_for_resource_id_lookup(
    table_name: &str,
    allow_schema_names_prefixed_as_table_name: bool,
) -> String {
    let candidate = table_name.trim();
    if candidate.is_empty() || !allow_schema_names_prefixed_as_table_name {
        return candidate.to_string();
    }
    if let Ok((schema, table)) = resolve_information_schema_targets(candidate, true) {
        if schema.eq_ignore_ascii_case("public") {
            return table;
        }
    }
    candidate.to_string()
}
/// Converts a `PostgresInsertError` into the message stored on a failed deferred job.
///
/// For SQL execution errors the SQLSTATE code, when present, is appended as
/// `(sql_state=...)`.
fn postgres_insert_error_message(err: PostgresInsertError) -> String {
    match err {
        PostgresInsertError::InvalidTableName => String::from("invalid table name"),
        PostgresInsertError::InvalidPayload(reason) => format!("invalid payload: {reason}"),
        PostgresInsertError::InvalidJsonString { column, reason } => {
            format!("invalid JSON string for JSON column '{column}': {reason}")
        }
        PostgresInsertError::NoValidColumns => String::from("no valid columns provided"),
        PostgresInsertError::MissingReturnColumn => {
            String::from("insert succeeded without returned row")
        }
        PostgresInsertError::SqlExecution { message, sql_state } => match sql_state {
            Some(code) => format!("{message} (sql_state={code})"),
            None => message,
        },
    }
}
/// Returns the trimmed client name of a deferred request.
///
/// # Errors
/// Fails when the name is empty after trimming.
fn deferred_client_name(request: &GatewayDeferredRequest) -> Result<String, String> {
    let trimmed = request.client_name.trim();
    if trimmed.is_empty() {
        return Err("deferred payload missing client_name".to_string());
    }
    Ok(trimmed.to_string())
}
/// Returns an owned copy of the deferred request's `request_body`.
///
/// # Errors
/// Fails when the request carries no body.
fn deferred_request_body(request: &GatewayDeferredRequest) -> Result<Value, String> {
    match request.request_body.as_ref() {
        Some(body) => Ok(body.clone()),
        None => Err("deferred payload missing request_body".to_string()),
    }
}
/// Latch used by the worker loop so the "queue storage unavailable" warning is emitted only
/// once per outage: it is set when the warning is logged and cleared again as soon as the
/// logging client and its pool are both available.
static DEFERRED_WORKER_WARNED_NO_STORAGE: AtomicBool = AtomicBool::new(false);
/// Returns, in milliseconds, how long the worker sleeps while queue storage is unavailable:
/// the configured `deferred_worker_no_storage_poll_ms`, but never less than one second.
fn deferred_worker_no_storage_poll_ms() -> u64 {
    let configured_ms = runtime_env_settings().deferred_worker_no_storage_poll_ms;
    std::cmp::max(configured_ms, 1_000)
}
/// Reports whether deferred queue storage is usable: a logging client name is configured
/// and the Postgres registry currently has a pool for it.
pub fn deferred_logging_storage_ready(state: &AppState) -> bool {
    match state.logging_client_name.as_ref() {
        Some(name) => state.pg_registry.get_pool(name).is_some(),
        None => false,
    }
}
/// Makes sure the deferred queue table exists on the logging client's database.
///
/// When `ensure_deferred_queue_table_initialized` reports that initialization happened
/// during this call, a `storage_ready` metric event is recorded for each of the query,
/// fetch, insert, update and delete kinds and an info line is logged.
///
/// # Errors
/// Fails when no logging client is configured, when it has no pool, or when table
/// initialization fails.
async fn ensure_deferred_queue_table(state: &AppState) -> Result<(), String> {
    let logging_client_name = state
        .logging_client_name
        .as_ref()
        .ok_or_else(|| "logging client is not configured".to_string())?;
    let pool = state
        .pg_registry
        .get_pool(logging_client_name)
        .ok_or_else(|| {
            format!(
                "logging client '{}' is not connected",
                logging_client_name
            )
        })?;

    let freshly_initialized: bool = ensure_deferred_queue_table_initialized(&pool).await?;
    if !freshly_initialized {
        return Ok(());
    }

    let announced_kinds = [
        GatewayOperationKind::Query,
        GatewayOperationKind::Fetch,
        GatewayOperationKind::Insert,
        GatewayOperationKind::Update,
        GatewayOperationKind::Delete,
    ];
    for kind in announced_kinds {
        state
            .metrics_state
            .record_deferred_event(kind.deferred_kind(), "storage_ready");
    }
    info!(
        client = %logging_client_name,
        "Deferred queue table initialization completed"
    );
    Ok(())
}
/// Persists a deferred request in the Postgres-backed queue with status `queued`.
///
/// The payload's `kind` field must be a supported deferred kind. Rows are keyed by
/// `request_id`; when a row with the same id already exists only its payload, deferred
/// reason, request size and `updated_at` are refreshed. A `queued` metric event is recorded
/// on success.
///
/// # Errors
/// Fails when the kind is unsupported, when the logging client is not configured or has no
/// pool, when the queue table cannot be prepared, or when the insert itself fails.
pub async fn enqueue_deferred_request(
    state: &AppState,
    request_id: &str,
    method: &str,
    route: &str,
    client_name: Option<&str>,
    request_bytes: Option<u64>,
    payload: &Value,
) -> Result<(), String> {
    let deferred_kind: &str = payload
        .get("kind")
        .and_then(Value::as_str)
        .unwrap_or("unknown");
    if !is_supported_deferred_kind(deferred_kind) {
        return Err(format!(
            "unsupported deferred payload kind '{}'",
            deferred_kind
        ));
    }
    let Some(logging_client_name) = state.logging_client_name.as_ref() else {
        return Err("logging client is not configured".to_string());
    };
    let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
        return Err(format!(
            "logging client '{}' is not connected",
            logging_client_name
        ));
    };
    ensure_deferred_queue_table(state).await?;
    // A plain `as i64` would wrap values above i64::MAX to a negative size; saturate instead.
    let request_bytes_i64: Option<i64> =
        request_bytes.map(|value| i64::try_from(value).unwrap_or(i64::MAX));
    let deferred_reason: Option<&str> = payload.get("reason").and_then(Value::as_str);
    sqlx::query(
        r#"
INSERT INTO public.gateway_deferred_request_queue (
request_id,
method,
route,
client_name,
request_bytes,
payload,
deferred_reason,
status,
created_at,
updated_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, 'queued', now(), now())
ON CONFLICT (request_id)
DO UPDATE SET
payload = EXCLUDED.payload,
deferred_reason = EXCLUDED.deferred_reason,
request_bytes = EXCLUDED.request_bytes,
updated_at = now()
"#,
    )
    .bind(request_id)
    .bind(method)
    .bind(route)
    .bind(client_name)
    .bind(request_bytes_i64)
    .bind(payload)
    .bind(deferred_reason)
    .execute(&pool)
    .await
    .map_err(|err| format!("failed to persist deferred request: {err}"))?;
    state
        .metrics_state
        .record_deferred_event(deferred_kind, "queued");
    Ok(())
}
/// Serializes a typed `GatewayDeferredRequest` to JSON and enqueues it through
/// `enqueue_deferred_request`, using the request's own id and client name.
///
/// # Errors
/// Fails when serialization fails or when `enqueue_deferred_request` fails.
pub async fn enqueue_gateway_deferred_request(
    state: &AppState,
    method: &str,
    route: &str,
    request_bytes: Option<u64>,
    deferred_request: &GatewayDeferredRequest,
) -> Result<(), String> {
    let payload: Value = match serde_json::to_value(deferred_request) {
        Ok(serialized) => serialized,
        Err(err) => {
            return Err(format!(
                "failed to serialize deferred request payload: {err}"
            ));
        }
    };
    let outcome = enqueue_deferred_request(
        state,
        &deferred_request.request_id,
        method,
        route,
        Some(&deferred_request.client_name),
        request_bytes,
        &payload,
    );
    outcome.await
}
/// Deserializes a stored queue payload into a `GatewayDeferredRequest` and verifies that
/// `kind`, `request_id` and `client_name` are non-blank (checked in that order).
///
/// # Errors
/// Fails when the payload does not match the request contract or a required field is blank.
fn parse_deferred_request(payload: &Value) -> Result<GatewayDeferredRequest, String> {
    let parsed: GatewayDeferredRequest = match serde_json::from_value(payload.clone()) {
        Ok(parsed) => parsed,
        Err(err) => return Err(format!("invalid deferred payload contract: {err}")),
    };

    // (trimmed field value, error reported when it is blank)
    let required_fields = [
        (parsed.kind.trim(), "deferred payload missing kind"),
        (parsed.request_id.trim(), "deferred payload missing request_id"),
        (
            parsed.client_name.trim(),
            "deferred payload missing client_name",
        ),
    ];
    for (value, message) in required_fields {
        if value.is_empty() {
            return Err(message.to_string());
        }
    }
    Ok(parsed)
}
/// Claims the oldest runnable queued job and returns its `(request_id, payload)`.
///
/// A job is runnable when its status is `queued` and `next_attempt_at` is unset or in the
/// past. The claim happens in a single statement that selects with `FOR UPDATE SKIP LOCKED`,
/// switches the row to `running`, increments `attempts` and clears `error_message`.
///
/// Returns `Ok(None)` when queue storage is not available or no job is runnable. A column
/// that cannot be decoded falls back to an empty id / empty JSON object.
///
/// # Errors
/// Fails when the queue table cannot be prepared or the claim statement fails.
async fn claim_next_deferred_job(state: &AppState) -> Result<Option<(String, Value)>, String> {
    let storage_pool = state
        .logging_client_name
        .as_ref()
        .and_then(|name| state.pg_registry.get_pool(name));
    let pool = match storage_pool {
        Some(pool) => pool,
        None => return Ok(None),
    };
    ensure_deferred_queue_table(state).await?;

    let claimed: Option<sqlx::postgres::PgRow> = sqlx::query(
        r#"
WITH candidate AS (
SELECT request_id
FROM public.gateway_deferred_request_queue
WHERE status = 'queued'
AND (next_attempt_at IS NULL OR next_attempt_at <= now())
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE public.gateway_deferred_request_queue q
SET status = 'running',
updated_at = now(),
attempts = q.attempts + 1,
error_message = NULL
FROM candidate
WHERE q.request_id = candidate.request_id
RETURNING q.request_id, q.payload
"#,
    )
    .fetch_optional(&pool)
    .await
    .map_err(|err| format!("failed to claim deferred job: {err}"))?;

    let Some(claimed) = claimed else {
        return Ok(None);
    };
    let request_id: String = claimed.try_get("request_id").unwrap_or_default();
    let payload: Value = claimed.try_get("payload").unwrap_or_else(|_| json!({}));
    Ok(Some((request_id, payload)))
}
/// Records a failed attempt for `request_id` and schedules a retry or dead-letters the job.
///
/// When the row's `attempts` has reached `max_deferred_attempts()` the job becomes
/// `dead_letter` and is stamped as completed. Otherwise it returns to `queued` with
/// `next_attempt_at` set to now plus `2^(attempts - 1)` seconds, capped at 300 seconds.
/// A `dead_letter` or `retry_scheduled` metric event is recorded according to the status
/// the update actually wrote; no event is recorded when no row matched.
///
/// Returns `Ok(())` without touching anything when queue storage is not available.
///
/// # Errors
/// Fails when the update statement fails.
async fn mark_deferred_job_failed(
    state: &AppState,
    request_id: &str,
    deferred_kind: &str,
    message: &str,
) -> Result<(), String> {
    let Some(logging_client_name) = state.logging_client_name.as_ref() else {
        return Ok(());
    };
    let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
        return Ok(());
    };
    let max_attempts: i32 = max_deferred_attempts();
    // The exponent is clamped to 9 (2^9 = 512 already exceeds the 300 s cap) so the cast to
    // int cannot raise "integer out of range" for rows with a large attempt count.
    // RETURNING reports the status written by this very statement, which avoids a second
    // round trip and a read that could observe a later state change.
    let updated: Option<sqlx::postgres::PgRow> = sqlx::query(
        r#"
        UPDATE public.gateway_deferred_request_queue
        SET status = CASE WHEN attempts >= $3 THEN 'dead_letter' ELSE 'queued' END,
            error_message = $2,
            updated_at = now(),
            completed_at = CASE WHEN attempts >= $3 THEN now() ELSE NULL END,
            next_attempt_at = CASE
                WHEN attempts >= $3 THEN NULL
                ELSE now() + make_interval(secs => LEAST(300, CAST(power(2, LEAST(GREATEST(0, attempts - 1), 9)) AS int)))
            END
        WHERE request_id = $1
        RETURNING status
        "#,
    )
    .bind(request_id)
    .bind(message)
    .bind(max_attempts)
    .fetch_optional(&pool)
    .await
    .map_err(|err| format!("failed to mark deferred job failed: {err}"))?;

    if let Some(row) = updated {
        let status: String = row.try_get("status").unwrap_or_default();
        let event = if status == "dead_letter" {
            "dead_letter"
        } else {
            "retry_scheduled"
        };
        state
            .metrics_state
            .record_deferred_event(deferred_kind, event);
    }
    Ok(())
}
/// Marks `request_id` as `completed`, stores `summary` as its result and records a
/// `completed` metric event.
///
/// Returns `Ok(())` without touching anything when queue storage is not available.
///
/// # Errors
/// Fails when the update statement fails.
async fn mark_deferred_job_completed(
    state: &AppState,
    request_id: &str,
    deferred_kind: &str,
    summary: Value,
) -> Result<(), String> {
    let storage_pool = state
        .logging_client_name
        .as_ref()
        .and_then(|name| state.pg_registry.get_pool(name));
    let Some(pool) = storage_pool else {
        return Ok(());
    };

    let update = sqlx::query(
        r#"
UPDATE public.gateway_deferred_request_queue
SET status = 'completed',
result_summary = $2,
updated_at = now(),
completed_at = now()
WHERE request_id = $1
"#,
    )
    .bind(request_id)
    .bind(summary);
    if let Err(err) = update.execute(&pool).await {
        return Err(format!("failed to mark deferred job completed: {err}"));
    }

    state
        .metrics_state
        .record_deferred_event(deferred_kind, "completed");
    Ok(())
}
/// Runs a deferred raw SQL request against the request's client pool.
///
/// Returns a JSON summary with the statement count, affected rows, returned row count and
/// a preview of at most 25 rows.
///
/// # Errors
/// Fails when the client name or query text is blank, when the client has no pool, or when
/// execution fails.
async fn execute_deferred_gateway_query(
    state: &AppState,
    request: &GatewayDeferredRequest,
) -> Result<Value, String> {
    let client_name: String = deferred_client_name(request)?;
    let sql_text: String = request.query_text().unwrap_or_default().trim().to_string();
    if sql_text.is_empty() {
        return Err("deferred payload missing query".to_string());
    }
    let pool = state.pg_registry.get_pool(&client_name).ok_or_else(|| {
        format!(
            "postgres client '{}' unavailable for deferred execution",
            client_name
        )
    })?;

    let outcome: PostgresSqlExecutionResult = match execute_postgres_sql(&pool, &sql_text).await {
        Ok(outcome) => outcome,
        Err(err) => return Err(format!("deferred query execution failed: {err}")),
    };
    // Only a bounded preview is stored on the queue row.
    let rows_preview: Vec<_> = outcome.rows.into_iter().take(25).collect();
    Ok(json!({
        "kind": DEFERRED_KIND_GATEWAY_QUERY,
        "client_name": client_name,
        "statement_count": outcome.summary.statement_count,
        "rows_affected": outcome.summary.rows_affected,
        "returned_row_count": outcome.summary.returned_row_count,
        "rows_preview": rows_preview
    }))
}
/// Runs a deferred table fetch against the request's client pool.
///
/// The stored request body supplies the table, columns (`*` when none are given), equality
/// conditions, paging and optional sort. The limit is `limit`, else `page_size`, else 100.
/// The row offset is `(current_page - 1) * page_size + offset`, with `current_page` floored
/// at 1 and `page_size` defaulting to 100.
///
/// Returns a JSON summary with the returned row count and a preview of at most 25 rows.
///
/// # Errors
/// Fails when the client name, request body or table name is missing, when the client has
/// no pool, or when the fetch fails.
async fn execute_deferred_gateway_fetch(
    state: &AppState,
    request: &GatewayDeferredRequest,
) -> Result<Value, String> {
    let client_name: String = deferred_client_name(request)?;
    let request_body: Value = deferred_request_body(request)?;
    let request: GatewayFetchRequest =
        GatewayFetchRequest::from_body(&request_body, state.gateway_force_camel_case_to_snake_case);
    if request.table_name.trim().is_empty() {
        return Err("deferred fetch payload missing table_name".to_string());
    }
    let Some(pool) = state.pg_registry.get_pool(&client_name) else {
        return Err(format!(
            "postgres client '{}' unavailable for deferred execution",
            client_name
        ));
    };
    let mut columns: Vec<String> = request.columns.clone();
    if columns.is_empty() {
        columns.push("*".to_string());
    }
    let columns_refs: Vec<&str> = columns.iter().map(|column| column.as_str()).collect();
    let conditions: Vec<super::GatewayRequestCondition> = parse_conditions_from_body(&request_body);
    let pg_conditions: Vec<Condition> = conditions
        .iter()
        .map(|condition| {
            let column_name = normalize_column_name(
                &condition.eq_column,
                state.gateway_force_camel_case_to_snake_case,
            );
            Condition::eq(column_name, condition.eq_value.clone())
                .with_uuid_value_text_cast(state.gateway_auto_cast_uuid_filter_values_to_text)
        })
        .collect();
    let limit: i64 = request.limit.or(request.page_size).unwrap_or(100);
    let current_page: i64 = request.current_page.unwrap_or(1).max(1);
    let page_size: i64 = request.page_size.unwrap_or(100);
    let offset: i64 = request.offset.unwrap_or(0);
    // Paging values come from the stored request body; saturating arithmetic keeps extreme
    // values from overflowing (a panic in debug builds, a silent wrap in release builds).
    // `current_page` is at least 1, so the subtraction itself cannot overflow.
    let calculated_offset: i64 = (current_page - 1)
        .saturating_mul(page_size)
        .saturating_add(offset);
    let sort_options: Option<(String, bool)> = parse_fetch_sort_options_from_body(
        &request_body,
        state.gateway_force_camel_case_to_snake_case,
    );
    let order_by: Option<(&str, bool)> = sort_options
        .as_ref()
        .map(|(column, ascending)| (column.as_str(), *ascending));
    let rows: Vec<Value> = fetch_rows_with_columns(
        &pool,
        &request.table_name,
        &columns_refs,
        &pg_conditions,
        limit,
        calculated_offset,
        order_by,
        state.gateway_allow_schema_names_prefixed_as_table_name,
    )
    .await
    .map_err(|err| format!("deferred fetch execution failed: {err}"))?;
    Ok(json!({
        "kind": DEFERRED_KIND_GATEWAY_FETCH,
        "client_name": client_name,
        "table_name": request.table_name,
        "returned_row_count": rows.len(),
        "rows_preview": rows.into_iter().take(25).collect::<Vec<_>>()
    }))
}
async fn execute_deferred_gateway_insert(
state: &AppState,
request: &GatewayDeferredRequest,
) -> Result<Value, String> {
let client_name: String = deferred_client_name(request)?;
let request_body: Value = deferred_request_body(request)?;
let request: GatewayInsertRequest = GatewayInsertRequest::from_body(&request_body)
.ok_or_else(|| "deferred insert payload missing table_name or insert_body".to_string())?;
let Some(pool) = state.pg_registry.get_pool(&client_name) else {
return Err(format!(
"postgres client '{}' unavailable for deferred execution",
client_name
));
};
let inserted_row: Value = insert_row(&pool, &request.table_name, &request.insert_body)
.await
.map_err(postgres_insert_error_message)?;
if deferred_request_should_invalidate_cache(&inserted_row) {
invalidate_deferred_gateway_cache(state, &client_name, &request.table_name).await;
}
Ok(json!({
"kind": DEFERRED_KIND_GATEWAY_INSERT,
"client_name": client_name,
"table_name": request.table_name,
"inserted_row": inserted_row
}))
}
/// Runs a deferred conditional update against the request's client pool.
///
/// At least one equality condition is required. The gateway cache for the table is
/// invalidated when at least one row was updated. Returns a JSON summary with the updated
/// row count and a preview of at most 25 rows.
///
/// # Errors
/// Fails when the client name, request body, table name, update values or conditions are
/// missing, when the client has no pool, or when the update fails.
async fn execute_deferred_gateway_update(
    state: &AppState,
    request: &GatewayDeferredRequest,
) -> Result<Value, String> {
    let client_name: String = deferred_client_name(request)?;
    let request_body: Value = deferred_request_body(request)?;

    let raw_table_name: &str = request_body
        .get("table_name")
        .and_then(Value::as_str)
        .unwrap_or_default();
    let table_name: String = raw_table_name.trim().to_string();
    if table_name.is_empty() {
        return Err("deferred update payload missing table_name".to_string());
    }

    let force_snake_case = state.gateway_force_camel_case_to_snake_case;
    let set_payload: serde_json::Map<String, Value> =
        match extract_update_payload(&request_body, force_snake_case) {
            Some(values) => values,
            None => {
                return Err("deferred update payload missing data/set/columns values".to_string());
            }
        };

    let conditions = parse_conditions_from_body(&request_body);
    if conditions.is_empty() {
        return Err("deferred update requires at least one condition".to_string());
    }
    let mut pg_conditions: Vec<Condition> = Vec::with_capacity(conditions.len());
    for condition in &conditions {
        let column_name = normalize_column_name(&condition.eq_column, force_snake_case);
        pg_conditions.push(
            Condition::eq(column_name, condition.eq_value.clone())
                .with_uuid_value_text_cast(state.gateway_auto_cast_uuid_filter_values_to_text),
        );
    }

    let pool = state.pg_registry.get_pool(&client_name).ok_or_else(|| {
        format!(
            "postgres client '{}' unavailable for deferred execution",
            client_name
        )
    })?;
    let new_values = Value::Object(set_payload);
    let rows: Vec<Value> = match update_rows(&pool, &table_name, &pg_conditions, &new_values).await
    {
        Ok(rows) => rows,
        Err(err) => return Err(format!("deferred update execution failed: {err}")),
    };

    let updated_count = rows.len();
    if updated_count > 0 {
        invalidate_deferred_gateway_cache(state, &client_name, &table_name).await;
    }
    let rows_preview: Vec<Value> = rows.into_iter().take(25).collect();
    Ok(json!({
        "kind": DEFERRED_KIND_GATEWAY_UPDATE,
        "client_name": client_name,
        "table_name": table_name,
        "updated_count": updated_count,
        "rows_preview": rows_preview
    }))
}
/// Runs a deferred delete-by-resource-id against the request's client pool.
///
/// The id column is obtained from `get_resource_id_key` for the (optionally
/// schema-stripped) table name. The gateway cache for the table is invalidated when at
/// least one row was deleted. Returns a JSON summary with the deleted row count and a
/// preview of at most 25 rows.
///
/// # Errors
/// Fails when the client name or request body is missing, when the body does not
/// deserialize or lacks a table name / resource id, when the client has no pool, or when
/// the delete fails.
async fn execute_deferred_gateway_delete(
    state: &AppState,
    request: &GatewayDeferredRequest,
) -> Result<Value, String> {
    let client_name: String = deferred_client_name(request)?;
    let request_body: Value = deferred_request_body(request)?;
    let delete_request: GatewayDeleteRequest = match serde_json::from_value(request_body) {
        Ok(parsed) => parsed,
        Err(err) => return Err(format!("invalid deferred delete payload: {err}")),
    };
    let missing_table = delete_request.table_name.trim().is_empty();
    if missing_table || delete_request.resource_id.trim().is_empty() {
        return Err("deferred delete payload missing table_name or resource_id".to_string());
    }
    let pool = state.pg_registry.get_pool(&client_name).ok_or_else(|| {
        format!(
            "postgres client '{}' unavailable for deferred execution",
            client_name
        )
    })?;

    let id_lookup_table: String = normalize_table_name_for_resource_id_lookup(
        &delete_request.table_name,
        state.gateway_allow_schema_names_prefixed_as_table_name,
    );
    let resource_id_key: String = get_resource_id_key(&id_lookup_table).await;
    let id_filter = Condition::eq(resource_id_key, delete_request.resource_id.clone())
        .with_uuid_value_text_cast(state.gateway_auto_cast_uuid_filter_values_to_text);
    let conditions: Vec<Condition> = vec![id_filter];

    let rows: Vec<Value> = match delete_rows(&pool, &delete_request.table_name, &conditions).await {
        Ok(rows) => rows,
        Err(err) => return Err(format!("deferred delete execution failed: {err}")),
    };

    let deleted_count = rows.len();
    if deleted_count > 0 {
        invalidate_deferred_gateway_cache(state, &client_name, &delete_request.table_name).await;
    }
    let rows_preview: Vec<Value> = rows.into_iter().take(25).collect();
    Ok(json!({
        "kind": DEFERRED_KIND_GATEWAY_DELETE,
        "client_name": client_name,
        "table_name": delete_request.table_name,
        "resource_id": delete_request.resource_id,
        "deleted_count": deleted_count,
        "rows_preview": rows_preview
    }))
}
async fn process_one_deferred_job(state: &AppState) -> Result<bool, String> {
let Some((request_id, payload)) = claim_next_deferred_job(state).await? else {
return Ok(false);
};
let deferred_request: GatewayDeferredRequest = match parse_deferred_request(&payload) {
Ok(value) => value,
Err(reason) => {
let _ = mark_deferred_job_failed(state, &request_id, "unknown", &reason).await;
return Ok(true);
}
};
if deferred_request.request_id != request_id {
let reason = format!(
"deferred payload request_id mismatch: row='{}' payload='{}'",
request_id, deferred_request.request_id
);
let _ = mark_deferred_job_failed(state, &request_id, "unknown", &reason).await;
return Ok(true);
}
let deferred_kind: String = deferred_request.kind.clone();
let Some(operation_kind) = GatewayOperationKind::from_deferred_kind(&deferred_kind) else {
let reason = format!("unsupported deferred payload kind '{}'", deferred_kind);
let _ = mark_deferred_job_failed(state, &request_id, "unknown", &reason).await;
return Ok(true);
};
state
.metrics_state
.record_deferred_event(operation_kind.deferred_kind(), "running");
let execution_result: Result<Value, String> = match operation_kind {
GatewayOperationKind::Query => {
execute_deferred_gateway_query(state, &deferred_request).await
}
GatewayOperationKind::Fetch => {
execute_deferred_gateway_fetch(state, &deferred_request).await
}
GatewayOperationKind::Insert => {
execute_deferred_gateway_insert(state, &deferred_request).await
}
GatewayOperationKind::Update => {
execute_deferred_gateway_update(state, &deferred_request).await
}
GatewayOperationKind::Delete => {
execute_deferred_gateway_delete(state, &deferred_request).await
}
GatewayOperationKind::Rpc => Err(format!(
"unsupported deferred payload kind '{}'",
deferred_kind
)),
};
match execution_result {
Ok(summary) => {
let _ = mark_deferred_job_completed(
state,
&request_id,
operation_kind.deferred_kind(),
summary,
)
.await;
Ok(true)
}
Err(reason) => {
let _ = mark_deferred_job_failed(
state,
&request_id,
operation_kind.deferred_kind(),
&reason,
)
.await;
Ok(true)
}
}
}
/// Runs a single worker iteration by delegating to `process_one_deferred_job`.
///
/// The result is passed through unchanged: `Ok(true)` when a job was claimed, `Ok(false)`
/// when none was available.
pub async fn run_deferred_gateway_worker_once(state: &AppState) -> Result<bool, String> {
process_one_deferred_job(state).await
}
/// Alias for `run_deferred_gateway_worker_once`; forwards the call and returns its result.
pub async fn run_deferred_query_worker_once(state: &AppState) -> Result<bool, String> {
run_deferred_gateway_worker_once(state).await
}
/// Spawns the background task that drains the deferred request queue.
///
/// Does nothing (apart from an info log) when `enabled` is false. Otherwise the task loops
/// forever:
/// - while the logging client is not configured or has no pool, it warns once per outage
///   and sleeps for the no-storage poll interval;
/// - otherwise it runs one worker iteration, sleeping `poll_interval` when no job was
///   available and continuing immediately after a job was processed;
/// - after an error it sleeps for the duration chosen by `deferred_worker_sleep_after_error`.
pub fn spawn_deferred_query_worker(
    app_state: web::Data<AppState>,
    enabled: bool,
    poll_interval: std::time::Duration,
) {
    if !enabled {
        info!("Deferred query worker disabled");
        return;
    }
    tokio::spawn(async move {
        info!(
            poll_ms = poll_interval.as_millis() as u64,
            "Deferred query worker started"
        );
        let no_storage_poll: u64 = deferred_worker_no_storage_poll_ms();
        let no_storage_sleep = std::time::Duration::from_millis(no_storage_poll);
        loop {
            let st: &AppState = app_state.get_ref();

            let Some(logging_client) = st.logging_client_name.as_deref() else {
                // `swap` returns the previous value, so only the first pass of an outage warns.
                let already_warned =
                    DEFERRED_WORKER_WARNED_NO_STORAGE.swap(true, Ordering::Relaxed);
                if !already_warned {
                    warn!(
                        poll_ms = no_storage_poll,
                        "Deferred query worker idle: gateway.logging_client is not set in the \
                         loaded configuration; deferred queue storage is unavailable (retrying on interval)"
                    );
                }
                tokio::time::sleep(no_storage_sleep).await;
                continue;
            };

            if st.pg_registry.get_pool(logging_client).is_none() {
                let already_warned =
                    DEFERRED_WORKER_WARNED_NO_STORAGE.swap(true, Ordering::Relaxed);
                if !already_warned {
                    warn!(
                        client = %logging_client,
                        poll_ms = no_storage_poll,
                        "Deferred query worker idle: logging Postgres client has no connected pool; \
                         deferred queue storage is unavailable (check POSTGRES_* env vars, \
                         gateway.logging_pg_uri, and startup connection errors; retrying on interval)"
                    );
                }
                tokio::time::sleep(no_storage_sleep).await;
                continue;
            }

            // Storage is reachable again: re-arm the one-shot warning for the next outage.
            DEFERRED_WORKER_WARNED_NO_STORAGE.store(false, Ordering::Relaxed);

            let pause = match run_deferred_gateway_worker_once(st).await {
                Ok(true) => None,
                Ok(false) => Some(poll_interval),
                Err(err) => {
                    error!(error = %err, "Deferred query worker loop failed");
                    Some(deferred_worker_sleep_after_error(&err, poll_interval))
                }
            };
            if let Some(pause) = pause {
                tokio::time::sleep(pause).await;
            }
        }
    });
}
/// `POST /gateway/deferred/{request_id}/requeue` — puts a finished deferred request back
/// into the queue.
///
/// Responses:
/// - 503-style `service_unavailable` when queue storage is missing, cannot be prepared, or
///   a statement fails;
/// - `not_found` when no row exists for the id;
/// - `conflict` when the row is currently `queued` or `running` (an undecodable status is
///   treated as `queued`);
/// - success after the row was reset to `queued` with its completion time, error message
///   and result summary cleared; a `requeued` metric event is recorded.
#[post("/gateway/deferred/{request_id}/requeue")]
pub async fn gateway_deferred_requeue(
    request_id: web::Path<String>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    let request_id: String = request_id.into_inner();

    let storage_pool = app_state
        .logging_client_name
        .as_ref()
        .and_then(|name| app_state.pg_registry.get_pool(name));
    let Some(pool) = storage_pool else {
        return service_unavailable(
            "Deferred queue unavailable",
            "Logging client is not configured or not connected",
        );
    };

    if let Err(err) = ensure_deferred_queue_table(app_state.get_ref()).await {
        return service_unavailable(
            "Deferred queue unavailable",
            format!("Failed to prepare deferred queue storage: {err}"),
        );
    }

    let lookup = sqlx::query(
        r#"
SELECT status, payload
FROM public.gateway_deferred_request_queue
WHERE request_id = $1
"#,
    )
    .bind(&request_id)
    .fetch_optional(&pool)
    .await;
    let row = match lookup {
        Ok(Some(row)) => row,
        Ok(None) => {
            return not_found(
                "Deferred request not found",
                format!("No deferred request exists for '{}'.", request_id),
            );
        }
        Err(err) => {
            return service_unavailable(
                "Deferred queue unavailable",
                format!("Failed to read deferred request: {err}"),
            );
        }
    };

    let status: String = row
        .try_get("status")
        .unwrap_or_else(|_| "queued".to_string());
    if matches!(status.as_str(), "queued" | "running") {
        return conflict(
            "Deferred request is not requeueable",
            format!(
                "Request '{}' is currently '{}' and cannot be requeued.",
                request_id, status
            ),
        );
    }

    let payload: Value = row.try_get("payload").unwrap_or_else(|_| json!({}));
    let deferred_kind = payload
        .get("kind")
        .and_then(Value::as_str)
        .unwrap_or("unknown");

    let reset = sqlx::query(
        r#"
UPDATE public.gateway_deferred_request_queue
SET status = 'queued',
updated_at = now(),
completed_at = NULL,
error_message = NULL,
result_summary = NULL
WHERE request_id = $1
"#,
    )
    .bind(&request_id)
    .execute(&pool)
    .await;
    if let Err(err) = reset {
        return service_unavailable(
            "Deferred queue unavailable",
            format!("Failed to requeue deferred request: {err}"),
        );
    }

    app_state
        .metrics_state
        .record_deferred_event(deferred_kind, "requeued");
    api_success(
        "Deferred request requeued",
        json!({
            "request_id": request_id,
            "status": "queued"
        }),
    )
}
/// `GET /gateway/deferred/{request_id}` — reports the stored state of a deferred request.
///
/// Responds with the queue row's fields on success, `not_found` when the row does not exist
/// or queue storage is not available, and `service_unavailable` when the lookup fails.
/// Columns that are NULL or cannot be decoded are reported as `null`; an undecodable status
/// is reported as `queued`.
#[get("/gateway/deferred/{request_id}")]
pub async fn gateway_deferred_status(
    request_id: web::Path<String>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    let request_id: String = request_id.into_inner();

    let storage_pool = app_state
        .logging_client_name
        .as_ref()
        .and_then(|name| app_state.pg_registry.get_pool(name));
    // Without storage there is nothing to look up, which reads the same as "no such row".
    let lookup = match storage_pool {
        Some(pool) => {
            sqlx::query(
                r#"
SELECT
request_id,
method,
route,
client_name,
deferred_reason,
attempts,
status,
created_at,
updated_at,
completed_at,
next_attempt_at,
error_message,
result_summary
FROM public.gateway_deferred_request_queue
WHERE request_id = $1
"#,
            )
            .bind(&request_id)
            .fetch_optional(&pool)
            .await
        }
        None => Ok(None),
    };

    let row = match lookup {
        Ok(Some(row)) => row,
        Ok(None) => {
            return not_found(
                "Deferred request not found",
                format!("No deferred request exists for '{}'.", request_id),
            );
        }
        Err(err) => {
            warn!(error = %err, request_id = %request_id, "Deferred status query failed");
            return service_unavailable(
                "Deferred status unavailable",
                format!("failed to query deferred queue: {err}"),
            );
        }
    };

    let status: String = row.try_get("status").unwrap_or_else(|_| "queued".into());
    let method: Option<String> = row.try_get("method").ok();
    let route: Option<String> = row.try_get("route").ok();
    let client_name: Option<String> = row.try_get("client_name").ok();
    let deferred_reason: Option<String> = row.try_get("deferred_reason").ok();
    let attempts: Option<i32> = row.try_get("attempts").ok();
    let created_at: Option<chrono::DateTime<chrono::Utc>> = row.try_get("created_at").ok();
    let updated_at: Option<chrono::DateTime<chrono::Utc>> = row.try_get("updated_at").ok();
    let completed_at: Option<chrono::DateTime<chrono::Utc>> = row.try_get("completed_at").ok();
    let next_attempt_at: Option<chrono::DateTime<chrono::Utc>> =
        row.try_get("next_attempt_at").ok();
    let error_message: Option<String> = row.try_get("error_message").ok();
    let result_summary: Option<Value> = row.try_get("result_summary").ok();

    let body = json!({
        "request_id": request_id,
        "status": status,
        "method": method,
        "route": route,
        "client": client_name,
        "deferred_reason": deferred_reason,
        "attempts": attempts,
        "created_at": created_at,
        "updated_at": updated_at,
        "completed_at": completed_at,
        "next_attempt_at": next_attempt_at,
        "error_message": error_message,
        "result_summary": result_summary,
        "storage": "postgres"
    });
    api_success("Deferred request status", body)
}
/// Unit tests for the worker's post-error sleep selection.
#[cfg(test)]
mod deferred_worker_backoff_tests {
    use super::deferred_worker_sleep_after_error;
    use std::time::Duration;

    /// Pool-exhaustion text must stretch a short poll interval to at least five seconds.
    #[test]
    fn pool_timeout_triggers_minimum_backoff() {
        let short_poll = Duration::from_millis(100);
        let message = "pool timed out while waiting for an open connection";
        let backoff = deferred_worker_sleep_after_error(message, short_poll);
        assert!(backoff >= Duration::from_secs(5));
    }

    /// Errors unrelated to connection pressure keep the configured poll interval.
    #[test]
    fn unrelated_errors_use_poll_interval() {
        let configured_poll = Duration::from_millis(250);
        let message = "syntax error at or near \"foo\"";
        let backoff = deferred_worker_sleep_after_error(message, configured_poll);
        assert_eq!(backoff, configured_poll);
    }
}