use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::types::JsonValue;
use crate::db::DbPool;
use crate::error::AppResult;
/// One row of `noetl.outbox`, as returned to publishers by `claim_batch`.
///
/// `Option` fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboxRow {
/// Primary key of the outbox row; the id accepted by `mark_published_batch`
/// and `mark_failed_row`.
pub outbox_id: i64,
/// Value of the row's `event_id` column.
/// NOTE(review): presumably references `noetl.event.event_id` — confirm.
pub event_id: i64,
/// Optional execution id copied from the row; skipped when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub execution_id: Option<i64>,
/// Optional subject from the row; skipped when `None`.
/// NOTE(review): presumably the bus subject to publish on — confirm with the publisher.
#[serde(skip_serializing_if = "Option::is_none")]
pub subject: Option<String>,
/// Message payload as stored in the `payload` column (JSON value).
pub payload: JsonValue,
/// Codec label for the payload; when absent during deserialization it
/// falls back to `default_payload_codec()` (`"arrow-feather"`).
/// NOTE(review): `payload` is JSON yet the default label is Arrow/Feather — confirm intent.
#[serde(default = "default_payload_codec")]
pub payload_codec: String,
/// Delivery attempt counter; `0` when absent during deserialization.
/// Rows returned by `claim_batch` already include the current attempt,
/// because the claim statement increments it.
#[serde(default)]
pub attempts: i32,
}
/// Serde default for `OutboxRow::payload_codec` when the field is missing.
fn default_payload_codec() -> String {
    String::from("arrow-feather")
}
/// Atomically claims up to `limit` ready outbox rows for publishing.
///
/// A row is ready when its `status` is `PENDING` or `FAILED` and its
/// `available_at` is not in the future. In a single statement, the claimed rows are:
/// - switched to `IN_FLIGHT`,
/// - given an incremented `attempts` counter (so the returned value already
///   counts this attempt),
/// - stamped with fresh `locked_at`/`updated_at` timestamps.
///
/// `FOR UPDATE SKIP LOCKED` lets concurrent callers each claim a disjoint
/// set of rows instead of blocking on one another.
///
/// `limit` is clamped into `1..=1000`.
///
/// The returned rows are sorted by ascending `outbox_id`.
///
/// NOTE(review): rows left `IN_FLIGHT` (e.g. the publisher dies before
/// calling `mark_published_batch`/`mark_failed_row`) are never re-selected
/// by this query — confirm a separate reaper resets them.
///
/// # Errors
/// Propagates any database error from executing the statement.
pub async fn claim_batch(pool: &DbPool, limit: i64) -> AppResult<Vec<OutboxRow>> {
    // Reject zero/negative limits and cap oversized batches.
    let limit = limit.clamp(1, 1000);
    let rows = sqlx::query_as::<_, (i64, i64, Option<i64>, Option<String>, JsonValue, String, i32)>(
r#"
WITH ready AS (
SELECT outbox_id
FROM noetl.outbox
WHERE status IN ('PENDING', 'FAILED')
AND available_at <= now()
ORDER BY outbox_id
LIMIT $1
FOR UPDATE SKIP LOCKED
)
UPDATE noetl.outbox o
SET status = 'IN_FLIGHT',
attempts = attempts + 1,
locked_at = now(),
updated_at = now()
FROM ready
WHERE o.outbox_id = ready.outbox_id
RETURNING o.outbox_id, o.event_id, o.execution_id, o.subject,
o.payload, o.payload_codec, o.attempts
"#,
    )
    .bind(limit)
    .fetch_all(pool)
    .await?;

    let mut claimed: Vec<OutboxRow> = rows
        .into_iter()
        .map(
            |(outbox_id, event_id, execution_id, subject, payload, payload_codec, attempts)| {
                OutboxRow {
                    outbox_id,
                    event_id,
                    execution_id,
                    subject,
                    payload,
                    payload_codec,
                    attempts,
                }
            },
        )
        .collect();

    // The CTE's ORDER BY only decides *which* rows are claimed; PostgreSQL
    // does not promise that UPDATE ... RETURNING emits them in that order.
    // outbox_id is unique, so an unstable sort is deterministic here.
    claimed.sort_unstable_by_key(|row| row.outbox_id);
    Ok(claimed)
}
/// Marks every listed outbox row as `PUBLISHED`.
///
/// For each row whose id appears in `outbox_ids` (whatever its current
/// status), this sets `published_at` and `updated_at` to `now()` and clears
/// `last_error`. An empty slice returns `Ok(0)` without contacting the
/// database.
///
/// Returns the number of rows the `UPDATE` affected.
///
/// # Errors
/// Propagates any database error from executing the statement.
pub async fn mark_published_batch(pool: &DbPool, outbox_ids: &[i64]) -> AppResult<i64> {
    match outbox_ids {
        [] => Ok(0),
        ids => {
            let outcome = sqlx::query(
r#"
UPDATE noetl.outbox
SET status = 'PUBLISHED',
published_at = now(),
updated_at = now(),
last_error = NULL
WHERE outbox_id = ANY($1)
"#,
            )
            .bind(ids)
            .execute(pool)
            .await?;
            Ok(outcome.rows_affected() as i64)
        }
    }
}
/// Exponential retry delay, in seconds, for an outbox row that has been
/// attempted `attempts` times.
///
/// Computes `2^(attempts - 1)` with the exponent clamped to `0..=8`
/// (1 s … 256 s), then caps it at `max_delay_seconds`. A negative cap is
/// treated as `0`, so the result is never negative.
fn backoff_delay_seconds(attempts: i32, max_delay_seconds: i32) -> i64 {
    // saturating_sub: plain `attempts - 1` overflows (panicking in debug
    // builds) when attempts == i32::MIN.
    let exponent = attempts.saturating_sub(1).clamp(0, 8);
    let uncapped = 1_i64 << exponent;
    uncapped.min(i64::from(max_delay_seconds.max(0)))
}

/// Records a failed publish attempt and schedules the row for retry.
///
/// Sets the row's `status` to `FAILED`, pushes `available_at` forward by the
/// backoff delay, stores `error` in `last_error`, and bumps `updated_at`.
/// `error` is truncated to its first 2000 characters (chars, not bytes, so
/// the cut never splits a UTF-8 sequence). `FAILED` rows become claimable
/// again by `claim_batch` once `available_at` has passed.
///
/// * `attempts` — attempt count for the row; presumably the value returned
///   by `claim_batch`, which already includes the current attempt.
/// * `max_delay_seconds` — upper bound on the delay; negative values act as 0.
///
/// Returns the delay in seconds that was applied. The delay is returned even
/// if no row matched `outbox_id`, because the affected-row count is not checked.
///
/// # Errors
/// Propagates any database error from executing the statement.
pub async fn mark_failed_row(
    pool: &DbPool,
    outbox_id: i64,
    error: &str,
    attempts: i32,
    max_delay_seconds: i32,
) -> AppResult<i64> {
    let delay_seconds = backoff_delay_seconds(attempts, max_delay_seconds);
    let truncated_error: String = error.chars().take(2000).collect();
    sqlx::query(
r#"
UPDATE noetl.outbox
SET status = 'FAILED',
available_at = now() + ($1 || ' seconds')::interval,
last_error = $2,
updated_at = now()
WHERE outbox_id = $3
"#,
    )
    // The delay is sent as text and concatenated into an interval literal.
    .bind(delay_seconds.to_string())
    .bind(truncated_error)
    .bind(outbox_id)
    .execute(pool)
    .await?;
    Ok(delay_seconds)
}
/// Counts outbox rows that could be claimed right now.
///
/// This uses the same readiness predicate as `claim_batch`: `status` is
/// `PENDING` or `FAILED` and `available_at <= now()`. Rows with a future
/// retry time, and rows already `IN_FLIGHT`, are not counted.
///
/// # Errors
/// Propagates any database error from executing the query.
pub async fn pending_count(pool: &DbPool) -> AppResult<i64> {
    let (ready,): (i64,) = sqlx::query_as(
r#"
SELECT count(*)
FROM noetl.outbox
WHERE status IN ('PENDING', 'FAILED')
AND available_at <= now()
"#,
    )
    .fetch_one(pool)
    .await?;
    Ok(ready)
}
/// An event record as accepted by `project_events` for insertion into
/// `noetl.event`.
///
/// Every `Option` field is omitted from the serialized JSON when `None`.
/// `project_events` then reads the missing key as NULL or its column default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct EventEnvelope {
/// Event id; written to `noetl.event.event_id`.
pub event_id: i64,
/// Owning execution; `project_events` stores `0` when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub execution_id: Option<i64>,
/// Parent event id, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_event_id: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub event_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<String>,
/// Duration stored as `double precision`.
/// NOTE(review): unit (seconds vs. milliseconds) is not visible here — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub duration: Option<f64>,
/// Event time in UTC; `project_events` uses it as `created_at` when present.
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub context: Option<JsonValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<JsonValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub meta: Option<JsonValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stack_trace: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// Not inserted by `project_events`; it only travels in the serialized envelope.
#[serde(skip_serializing_if = "Option::is_none")]
pub trace_component: Option<String>,
/// Any additional keys, flattened into the top-level JSON object.
/// `project_events` reads `catalog_id`, `tenant_id`, `organization_id`
/// and `created_at` from here, since they are not struct fields.
#[serde(flatten, default)]
pub extra: std::collections::BTreeMap<String, JsonValue>,
}
/// Projects a batch of events into `noetl.event`, skipping any that conflict
/// with existing rows.
///
/// The whole slice is serialized to one JSON array and expanded server-side
/// with `jsonb_array_elements`, so the batch is written in a single `INSERT`.
/// Column fallbacks applied by the SQL:
/// - `execution_id` / `catalog_id` become `0` when missing or empty.
/// - `tenant_id` / `organization_id` become `'default'` when missing or empty.
/// - `created_at` is taken from `timestamp`, else `created_at`, else `now()`.
/// - JSON `null` in `context` / `result` / `meta` is stored as SQL NULL.
///
/// Returns `(inserted, skipped)`:
/// - `inserted` is the number of rows actually written.
/// - `skipped` is the number of submitted events not written (floored at 0),
///   i.e. those suppressed by `ON CONFLICT DO NOTHING`.
///
/// NOTE(review): the `::timestamp` casts discard any UTC offset in the text.
/// When neither `timestamp` nor `created_at` is supplied, `now()` is used, so
/// re-projecting the same event produces a different `created_at`. If
/// `created_at` is part of the partition key or unique index, such retries
/// may not be caught as conflicts — confirm against the table schema.
///
/// # Errors
/// Propagates JSON serialization errors and database errors.
pub async fn project_events(pool: &DbPool, events: &[EventEnvelope]) -> AppResult<(i64, i64)> {
    if events.is_empty() {
        return Ok((0, 0));
    }
    let batch_json = serde_json::to_value(events)?;
    let outcome = sqlx::query(
r#"
INSERT INTO noetl.event (
event_id,
execution_id,
catalog_id,
parent_event_id,
event_type,
node_id,
node_name,
node_type,
status,
duration,
context,
result,
meta,
error,
stack_trace,
tenant_id,
organization_id,
created_at
)
SELECT
(row->>'event_id')::bigint,
COALESCE(NULLIF(row->>'execution_id', '')::bigint, 0),
COALESCE(NULLIF(row->>'catalog_id', '')::bigint, 0),
NULLIF(row->>'parent_event_id', '')::bigint,
row->>'event_type',
row->>'node_id',
row->>'node_name',
row->>'node_type',
row->>'status',
NULLIF(row->>'duration', '')::double precision,
NULLIF(row->'context', 'null'::jsonb),
NULLIF(row->'result', 'null'::jsonb),
NULLIF(row->'meta', 'null'::jsonb),
row->>'error',
row->>'stack_trace',
COALESCE(NULLIF(row->>'tenant_id', ''), 'default'),
COALESCE(NULLIF(row->>'organization_id', ''), 'default'),
COALESCE(NULLIF(row->>'timestamp', '')::timestamp,
NULLIF(row->>'created_at', '')::timestamp,
now())
FROM jsonb_array_elements($1::jsonb) AS row
-- noetl.event is partitioned (15 partitions); event_id alone
-- is not a partition-spanning unique constraint. Using
-- ``ON CONFLICT DO NOTHING`` without a target catches any
-- uniqueness violation across partitions — sufficient for
-- projector idempotency.
ON CONFLICT DO NOTHING
"#,
    )
    .bind(batch_json)
    .execute(pool)
    .await?;

    let submitted = events.len() as i64;
    let inserted = outcome.rows_affected() as i64;
    let skipped = (submitted - inserted).max(0);
    Ok((inserted, skipped))
}
#[cfg(test)]
mod tests {
    /// Mirror of the backoff formula inlined in `mark_failed_row`:
    /// `2^(attempts - 1)` with the exponent clamped to `0..=8`, capped at
    /// `max_delay`.
    ///
    /// NOTE(review): this is a copy, not a call into production code, so it
    /// can silently drift from `mark_failed_row` — keep the two in sync.
    fn compute_delay(attempts: i32, max_delay: i32) -> i64 {
        let exponent = (attempts - 1).clamp(0, 8);
        let cap = i64::from(max_delay);
        (1_i64 << exponent).min(cap)
    }

    #[test]
    fn backoff_attempts_1_is_1s() {
        assert_eq!(compute_delay(1, 300), 1);
    }

    #[test]
    fn backoff_attempts_2_is_2s() {
        assert_eq!(compute_delay(2, 300), 2);
    }

    #[test]
    fn backoff_attempts_5_is_16s() {
        assert_eq!(compute_delay(5, 300), 16);
    }

    // Hits the 2^8 exponent ceiling (256 s), which is below the 300 s cap.
    #[test]
    fn backoff_attempts_9_clamps_to_max() {
        assert_eq!(compute_delay(9, 300), 256);
    }

    // Far past the exponent ceiling; still 256 s.
    #[test]
    fn backoff_attempts_20_clamps_to_max() {
        assert_eq!(compute_delay(20, 300), 256);
    }

    // Here the caller's cap (10 s) is what limits the delay.
    #[test]
    fn backoff_respects_max_delay_lower_than_2_pow_8() {
        assert_eq!(compute_delay(5, 10), 10);
    }

    // A zero attempt count clamps the exponent to 0, giving 1 s.
    #[test]
    fn backoff_attempts_0_is_1s() {
        assert_eq!(compute_delay(0, 300), 1);
    }
}