use chrono::{DateTime, NaiveDateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::types::JsonValue;
use crate::db::DbPool;
use crate::error::AppResult;
/// Retention settings for one cleanup pass run by [`purge_stale`].
///
/// Every field is optional when deserializing, so an empty JSON body (`{}`)
/// yields the same values as `CleanupPolicy::default()`. Any value `<= 0`
/// disables the corresponding step (`purge_stale` only acts on values `> 0`).
#[derive(Debug, Clone, Deserialize)]
pub struct CleanupPolicy {
/// Completed `noetl.command` rows older than this many days are deleted
/// (default 7).
#[serde(default = "default_command_days")]
pub command_retention_days: i64,
/// `worker_pool` rows in `noetl.runtime` whose heartbeat is older than this
/// many minutes are deleted (default 60).
#[serde(default = "default_runtime_minutes")]
pub runtime_stale_minutes: i64,
/// `noetl.event` partitions whose rows are all older than this many days are
/// dropped. Defaults to 0, i.e. event-log retention is strictly opt-in.
#[serde(default)]
pub event_retention_days: i64,
}
/// Serde default for `CleanupPolicy::command_retention_days`: keep one week
/// of completed commands.
fn default_command_days() -> i64 {
    const ONE_WEEK_IN_DAYS: i64 = 7;
    ONE_WEEK_IN_DAYS
}
/// Serde default for `CleanupPolicy::runtime_stale_minutes`: a worker pool
/// silent for a full hour is considered stale.
fn default_runtime_minutes() -> i64 {
    const ONE_HOUR_IN_MINUTES: i64 = 60;
    ONE_HOUR_IN_MINUTES
}
impl Default for CleanupPolicy {
fn default() -> Self {
Self {
command_retention_days: default_command_days(),
runtime_stale_minutes: default_runtime_minutes(),
event_retention_days: 0,
}
}
}
/// Serializable summary of what one [`purge_stale`] pass removed.
#[derive(Debug, Clone, Serialize)]
pub struct CleanupResult {
/// Number of completed `noetl.command` rows deleted.
pub commands_purged: u64,
/// Number of stale `worker_pool` rows deleted from `noetl.runtime`.
pub runtime_purged: u64,
/// Number of `noetl.event` *partitions* dropped (not individual rows):
/// `purge_stale` fills it with `event_partitions_dropped.len()`.
pub events_purged: u64,
/// Names of the dropped `noetl.event` partitions; omitted from the
/// serialized output when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub event_partitions_dropped: Vec<String>,
}
/// Custom epoch used when mapping wall-clock time onto event ids:
/// 2024-01-01T00:00:00Z expressed in Unix milliseconds.
const NOETL_EPOCH_MS: i64 = 1_704_067_200 * 1_000;
/// Left shift applied to `(now_ms - NOETL_EPOCH_MS)` so a wall-clock cutoff
/// can be compared against `noetl.event` partition bounds. This assumes
/// snowflake-style ids whose millisecond timestamp sits above the low 22 bits.
const SNOWFLAKE_TS_SHIFT: i64 = 22;
/// Drops `noetl.event` range partitions whose rows are all older than
/// `retention_days` days, returning the (unqualified) names of the
/// partitions that were dropped.
///
/// The age cutoff is computed in event-id space by the database:
/// `(now_ms - retention_days * 86_400_000 - NOETL_EPOCH_MS) << SNOWFLAKE_TS_SHIFT`.
/// A partition is dropped only when its exclusive upper bound is `<= cutoff`,
/// so it cannot contain any id newer than the cutoff. The following are never
/// dropped:
/// - the `event_default` partition;
/// - partitions whose upper bound is not a single integer (e.g. `MAXVALUE`).
///
/// # Errors
/// Returns an error if the cutoff query or the catalog query fails. A failed
/// `DROP` of an individual partition is logged and skipped, so one
/// un-droppable partition does not abort the pass.
async fn drop_old_event_partitions(
    pool: &DbPool,
    retention_days: i64,
) -> AppResult<Vec<String>> {
    let cutoff: i64 = sqlx::query_scalar(
        r#"
        SELECT ((((extract(epoch from now()) * 1000)::bigint)
                 - ($1::bigint * 86400000) - $2::bigint) << $3::int)::bigint
        "#,
    )
    .bind(retention_days)
    .bind(NOETL_EPOCH_MS)
    .bind(SNOWFLAKE_TS_SHIFT as i32)
    .fetch_one(pool)
    .await?;

    // Columns: bare partition name (reported back to the caller), a
    // schema-qualified name quoted server-side with %I (safe to splice into
    // DDL, and correct even if a partition lives outside `noetl`), and the
    // textual partition bound. `relispartition` excludes plain inheritance
    // children, whose NULL `relpartbound` would otherwise fail to decode.
    let parts: Vec<(String, String, String)> = sqlx::query_as(
        r#"
        SELECT c.relname,
               format('%I.%I', cn.nspname, c.relname),
               pg_get_expr(c.relpartbound, c.oid)
        FROM pg_inherits i
        JOIN pg_class c ON c.oid = i.inhrelid
        JOIN pg_namespace cn ON cn.oid = c.relnamespace
        JOIN pg_class p ON p.oid = i.inhparent
        JOIN pg_namespace n ON n.oid = p.relnamespace
        WHERE n.nspname = 'noetl' AND p.relname = 'event'
          AND c.relispartition
          AND c.relname <> 'event_default'
        "#,
    )
    .fetch_all(pool)
    .await?;

    let mut dropped = Vec::new();
    for (name, qualified_name, bound) in parts {
        // Skip partitions without a single numeric upper bound, and those
        // that may still hold ids newer than the cutoff.
        let Some(upper) = parse_partition_upper(&bound) else {
            continue;
        };
        if upper > cutoff {
            continue;
        }
        let ddl = format!("DROP TABLE IF EXISTS {qualified_name}");
        match sqlx::query(&ddl).execute(pool).await {
            Ok(_) => dropped.push(name),
            Err(e) => tracing::warn!(
                partition = %name,
                error = %e,
                "cleanup: could not DROP event partition — the server's \
                 DB role must OWN the noetl.event partitions for \
                 drop-based retention (ALTER TABLE ... OWNER TO <app user>)",
            ),
        }
    }
    Ok(dropped)
}
/// Extracts the exclusive upper bound of a single-column range partition
/// from its `pg_get_expr(relpartbound, ...)` text.
///
/// For example, `FOR VALUES FROM (1) TO (100)` yields `Some(100)`. Single
/// quotes around the value are tolerated.
///
/// Returns `None` when:
/// - there is no `TO (` clause (list, hash or default partitions);
/// - the clause is not closed by `)`;
/// - the bound is not a single integer (`MAXVALUE`, multi-column bounds).
fn parse_partition_upper(bound: &str) -> Option<i64> {
    // The last "TO (" is the upper-bound clause; "FROM (...)" precedes it.
    let (_, after_to) = bound.rsplit_once("TO (")?;
    let (value, _) = after_to.split_once(')')?;
    value.trim().trim_matches('\'').parse().ok()
}
/// Runs one cleanup pass according to `policy` and reports what was removed.
///
/// Steps, each skipped when its setting is `<= 0`:
/// 1. delete completed `noetl.command` rows older than `command_retention_days`;
/// 2. delete `worker_pool` rows in `noetl.runtime` whose heartbeat is older
///    than `runtime_stale_minutes`;
/// 3. drop whole `noetl.event` partitions older than `event_retention_days`.
///
/// `events_purged` in the result counts dropped partitions, not event rows.
///
/// # Errors
/// Returns the first database error encountered. The steps run as separate
/// statements, so work done by earlier steps is not rolled back.
pub async fn purge_stale(pool: &DbPool, policy: &CleanupPolicy) -> AppResult<CleanupResult> {
    /// Narrows an `i64` setting to the `int4` that `make_interval` accepts,
    /// saturating instead of wrapping. A plain `as i32` would, for example,
    /// turn 2^32 days into 0 days and delete every completed command.
    /// Postgres may still reject an absurdly large interval as out of range,
    /// which fails safe.
    fn saturate_to_i32(value: i64) -> i32 {
        // After the clamp the value fits in i32, so the cast is lossless.
        value.clamp(i64::from(i32::MIN), i64::from(i32::MAX)) as i32
    }

    let commands_purged = if policy.command_retention_days > 0 {
        sqlx::query(
            r#"
            DELETE FROM noetl.command
            WHERE completed_at IS NOT NULL
              AND completed_at < now() - make_interval(days => $1::int)
            "#,
        )
        .bind(saturate_to_i32(policy.command_retention_days))
        .execute(pool)
        .await?
        .rows_affected()
    } else {
        0
    };

    let runtime_purged = if policy.runtime_stale_minutes > 0 {
        sqlx::query(
            r#"
            DELETE FROM noetl.runtime
            WHERE kind = 'worker_pool'
              AND heartbeat < now() - make_interval(mins => $1::int)
            "#,
        )
        .bind(saturate_to_i32(policy.runtime_stale_minutes))
        .execute(pool)
        .await?
        .rows_affected()
    } else {
        0
    };

    let event_partitions_dropped = if policy.event_retention_days > 0 {
        drop_old_event_partitions(pool, policy.event_retention_days).await?
    } else {
        Vec::new()
    };

    Ok(CleanupResult {
        commands_purged,
        runtime_purged,
        events_purged: event_partitions_dropped.len() as u64,
        event_partitions_dropped,
    })
}
/// One `noetl.outbox` row, as returned by [`claim_batch`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboxRow {
/// Identifier of the outbox row; the key used by `mark_published_batch`
/// and `mark_failed_row`.
pub outbox_id: i64,
/// Id of the event this outbox entry refers to.
pub event_id: i64,
/// Owning execution, if recorded; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub execution_id: Option<i64>,
/// Optional subject stored with the row (presumably the publish
/// subject/topic — confirm with the publisher); omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub subject: Option<String>,
/// Payload as stored in `noetl.outbox.payload`.
pub payload: JsonValue,
/// Codec label for `payload`; defaults to `"arrow-feather"` when the key is
/// absent from incoming JSON.
#[serde(default = "default_payload_codec")]
pub payload_codec: String,
/// Attempt counter. `claim_batch` returns it already incremented for the
/// current claim; defaults to 0 when absent from incoming JSON.
#[serde(default)]
pub attempts: i32,
}
/// Serde default for `OutboxRow::payload_codec` when the key is missing.
fn default_payload_codec() -> String {
    String::from("arrow-feather")
}
/// Claims up to `limit` ready outbox rows for publishing in a single statement.
///
/// A row is ready when its status is `PENDING` or `FAILED` and its
/// `available_at` has passed. Each claimed row is updated as follows:
/// - status becomes `IN_FLIGHT`;
/// - `attempts` is incremented (the returned rows carry the new value);
/// - `locked_at` and `updated_at` are stamped.
///
/// `FOR UPDATE SKIP LOCKED` lets concurrent callers claim disjoint sets of
/// rows without waiting on each other.
///
/// `limit` is clamped to `1..=1000`. Rows are returned in ascending
/// `outbox_id` order.
pub async fn claim_batch(pool: &DbPool, limit: i64) -> AppResult<Vec<OutboxRow>> {
    type ClaimedTuple = (i64, i64, Option<i64>, Option<String>, JsonValue, String, i32);

    let limit = limit.clamp(1, 1000);
    let rows = sqlx::query_as::<_, ClaimedTuple>(
        r#"
        WITH ready AS (
            SELECT outbox_id
            FROM noetl.outbox
            WHERE status IN ('PENDING', 'FAILED')
              AND available_at <= now()
            ORDER BY outbox_id
            LIMIT $1
            FOR UPDATE SKIP LOCKED
        )
        UPDATE noetl.outbox o
        SET status = 'IN_FLIGHT',
            attempts = attempts + 1,
            locked_at = now(),
            updated_at = now()
        FROM ready
        WHERE o.outbox_id = ready.outbox_id
        RETURNING o.outbox_id, o.event_id, o.execution_id, o.subject,
                  o.payload, o.payload_codec, o.attempts
        "#,
    )
    .bind(limit)
    .fetch_all(pool)
    .await?;

    let mut claimed: Vec<OutboxRow> = rows
        .into_iter()
        .map(
            |(outbox_id, event_id, execution_id, subject, payload, payload_codec, attempts)| {
                OutboxRow {
                    outbox_id,
                    event_id,
                    execution_id,
                    subject,
                    payload,
                    payload_codec,
                    attempts,
                }
            },
        )
        .collect();
    // UPDATE ... RETURNING promises no row order (the join against `ready`
    // may be hashed), so restore the oldest-first order the CTE selected.
    claimed.sort_unstable_by_key(|row| row.outbox_id);
    Ok(claimed)
}
/// Marks every outbox row in `outbox_ids` as `PUBLISHED`.
///
/// Each row also gets `published_at`/`updated_at` stamped and any previous
/// `last_error` cleared. Returns the number of rows updated. An empty slice
/// returns `0` without issuing a query.
pub async fn mark_published_batch(pool: &DbPool, outbox_ids: &[i64]) -> AppResult<i64> {
    const MARK_PUBLISHED_SQL: &str = r#"
UPDATE noetl.outbox
SET status = 'PUBLISHED',
published_at = now(),
updated_at = now(),
last_error = NULL
WHERE outbox_id = ANY($1)
"#;

    if outbox_ids.is_empty() {
        return Ok(0);
    }
    let outcome = sqlx::query(MARK_PUBLISHED_SQL)
        .bind(outbox_ids)
        .execute(pool)
        .await?;
    let updated = outcome.rows_affected() as i64;
    Ok(updated)
}
/// Exponential-backoff delay, in seconds, before retrying after `attempts`
/// delivery attempts.
///
/// - Attempt 1 waits 1 s, and each further attempt doubles the wait.
/// - The exponent is capped at 8 (256 s).
/// - The result is then capped at `max_delay_seconds`.
/// - Any `attempts <= 1` (including nonsensical negatives) maps to 1 s.
fn backoff_delay_seconds(attempts: i32, max_delay_seconds: i32) -> i64 {
    // `saturating_sub` keeps `attempts == i32::MIN` from overflowing, which
    // panics in debug builds and wraps to the 256 s cap in release builds.
    let exponent = attempts.saturating_sub(1).clamp(0, 8);
    (1i64 << exponent).min(i64::from(max_delay_seconds))
}

/// Marks outbox row `outbox_id` as `FAILED` and schedules its next retry.
///
/// `available_at` is pushed `backoff_delay_seconds(attempts, max_delay_seconds)`
/// seconds into the future. The error text is stored in `last_error`,
/// truncated to 2000 characters.
///
/// Returns the delay applied, in seconds.
///
/// # Errors
/// Returns an error if the UPDATE fails. A non-existent `outbox_id` is not
/// an error: it simply updates no rows.
pub async fn mark_failed_row(
    pool: &DbPool,
    outbox_id: i64,
    error: &str,
    attempts: i32,
    max_delay_seconds: i32,
) -> AppResult<i64> {
    let delay_seconds = backoff_delay_seconds(attempts, max_delay_seconds);
    // Truncate by characters rather than bytes so multi-byte UTF-8 is never split.
    let truncated_error: String = error.chars().take(2000).collect();
    sqlx::query(
        r#"
        UPDATE noetl.outbox
        SET status = 'FAILED',
            available_at = now() + ($1 || ' seconds')::interval,
            last_error = $2,
            updated_at = now()
        WHERE outbox_id = $3
        "#,
    )
    .bind(delay_seconds.to_string())
    .bind(truncated_error)
    .bind(outbox_id)
    .execute(pool)
    .await?;
    Ok(delay_seconds)
}
/// Counts outbox rows that `claim_batch` could pick up right now: `PENDING`
/// or `FAILED` rows whose `available_at` has already passed.
pub async fn pending_count(pool: &DbPool) -> AppResult<i64> {
    const READY_COUNT_SQL: &str = r#"
SELECT count(*)
FROM noetl.outbox
WHERE status IN ('PENDING', 'FAILED')
AND available_at <= now()
"#;

    let (ready,): (i64,) = sqlx::query_as(READY_COUNT_SQL).fetch_one(pool).await?;
    Ok(ready)
}
fn deserialize_flexible_timestamp<'de, D>(
deserializer: D,
) -> Result<Option<DateTime<Utc>>, D::Error>
where
D: serde::Deserializer<'de>,
{
let opt = Option::<String>::deserialize(deserializer)?;
let Some(raw) = opt else { return Ok(None) };
let s = raw.trim();
if s.is_empty() {
return Ok(None);
}
if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
return Ok(Some(dt.with_timezone(&Utc)));
}
const NAIVE_FORMATS: &[&str] = &[
"%Y-%m-%dT%H:%M:%S%.f",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%d %H:%M:%S%.f",
"%Y-%m-%d %H:%M:%S",
];
for fmt in NAIVE_FORMATS {
if let Ok(naive) = NaiveDateTime::parse_from_str(s, fmt) {
return Ok(Some(DateTime::from_naive_utc_and_offset(naive, Utc)));
}
}
Err(serde::de::Error::custom(format!(
"unrecognized timestamp format: {s}"
)))
}
/// One event record; `project_events` writes batches of these into `noetl.event`.
///
/// Only `event_id` is required. Every other named field is optional and is
/// omitted from the serialized JSON when `None`. Top-level keys without a
/// dedicated field (for example `catalog_id`, `tenant_id`, `organization_id`
/// or `created_at`, which the SQL in `project_events` reads) are captured in
/// `extra` and written back at the top level when serializing.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct EventEnvelope {
/// Event identifier (required when deserializing).
pub event_id: i64,
/// Owning execution; `project_events` stores 0 when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub execution_id: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_event_id: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub event_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<String>,
// NOTE(review): unit of `duration` is not established here — confirm with producers.
#[serde(skip_serializing_if = "Option::is_none")]
pub duration: Option<f64>,
/// Event time. Accepts RFC 3339 or offset-less strings (treated as UTC) via
/// `deserialize_flexible_timestamp`; an absent key, `null` or a blank string
/// all become `None`.
#[serde(
default,
deserialize_with = "deserialize_flexible_timestamp",
skip_serializing_if = "Option::is_none"
)]
pub timestamp: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub context: Option<JsonValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<JsonValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub meta: Option<JsonValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stack_trace: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub trace_component: Option<String>,
/// All other top-level keys, kept verbatim and re-flattened on serialization.
#[serde(flatten, default)]
pub extra: std::collections::BTreeMap<String, JsonValue>,
}
/// Idempotently inserts a batch of events into `noetl.event`.
///
/// The whole batch is sent as one JSON array parameter and expanded
/// server-side with `jsonb_array_elements`, so it costs a single round-trip.
/// Defaults applied by the SQL:
/// - a missing `execution_id` or `catalog_id` becomes `0`;
/// - a missing tenant or organization becomes `'default'`;
/// - `created_at` falls back from `timestamp` to `created_at` to `now()`.
///
/// Rows that hit any uniqueness conflict are skipped. Returns
/// `(projected, duplicates)`: rows inserted and rows skipped. An empty batch
/// returns `(0, 0)` without touching the database.
pub async fn project_events(pool: &DbPool, events: &[EventEnvelope]) -> AppResult<(i64, i64)> {
    const PROJECT_EVENTS_SQL: &str = r#"
INSERT INTO noetl.event (
event_id,
execution_id,
catalog_id,
parent_event_id,
event_type,
node_id,
node_name,
node_type,
status,
duration,
context,
result,
meta,
error,
stack_trace,
tenant_id,
organization_id,
created_at
)
SELECT
(row->>'event_id')::bigint,
COALESCE(NULLIF(row->>'execution_id', '')::bigint, 0),
COALESCE(NULLIF(row->>'catalog_id', '')::bigint, 0),
NULLIF(row->>'parent_event_id', '')::bigint,
row->>'event_type',
row->>'node_id',
row->>'node_name',
row->>'node_type',
row->>'status',
NULLIF(row->>'duration', '')::double precision,
NULLIF(row->'context', 'null'::jsonb),
NULLIF(row->'result', 'null'::jsonb),
NULLIF(row->'meta', 'null'::jsonb),
row->>'error',
row->>'stack_trace',
COALESCE(NULLIF(row->>'tenant_id', ''), 'default'),
COALESCE(NULLIF(row->>'organization_id', ''), 'default'),
COALESCE(NULLIF(row->>'timestamp', '')::timestamp,
NULLIF(row->>'created_at', '')::timestamp,
now())
FROM jsonb_array_elements($1::jsonb) AS row
-- noetl.event is partitioned (15 partitions); event_id alone
-- is not a partition-spanning unique constraint. Using
-- ``ON CONFLICT DO NOTHING`` without a target catches any
-- uniqueness violation across partitions — sufficient for
-- projector idempotency.
ON CONFLICT DO NOTHING
"#;

    if events.is_empty() {
        return Ok((0, 0));
    }
    // NOTE(review): `::timestamp` discards any offset in the serialized
    // timestamp, and COALESCE with `now()` resolves to timestamptz. If
    // `created_at` is a timestamptz column and the session TimeZone is not
    // UTC, stored instants may be shifted — confirm the column type.
    let batch = serde_json::to_value(events)?;
    let outcome = sqlx::query(PROJECT_EVENTS_SQL)
        .bind(batch)
        .execute(pool)
        .await?;
    let inserted = outcome.rows_affected() as i64;
    let submitted = events.len() as i64;
    let skipped = (submitted - inserted).max(0);
    Ok((inserted, skipped))
}
#[cfg(test)]
mod tests {
    use super::{parse_partition_upper, CleanupPolicy, EventEnvelope};

    /// Independent restatement of the outbox backoff formula:
    /// `2^(attempts - 1)` seconds with the exponent clamped to `0..=8`,
    /// then capped at `max_delay`.
    fn compute_delay(attempts: i32, max_delay: i32) -> i64 {
        let exponent = (attempts - 1).clamp(0, 8) as u32;
        2i64.pow(exponent).min(i64::from(max_delay))
    }

    #[test]
    fn envelope_accepts_timezone_less_timestamp() {
        let json = r#"{"event_id":1,"timestamp":"2026-06-17T16:48:21.379403"}"#;
        let envelope: EventEnvelope =
            serde_json::from_str(json).expect("tz-less timestamp must deserialize (#106)");
        let parsed = envelope.timestamp.expect("timestamp parsed");
        assert_eq!(parsed.to_rfc3339(), "2026-06-17T16:48:21.379403+00:00");
    }

    #[test]
    fn envelope_still_accepts_rfc3339_and_null_timestamp() {
        let parse = |json: &str| -> EventEnvelope { serde_json::from_str(json).unwrap() };
        assert!(parse(r#"{"event_id":2,"timestamp":"2026-06-17T16:48:21Z"}"#)
            .timestamp
            .is_some());
        assert!(parse(r#"{"event_id":3,"timestamp":null}"#).timestamp.is_none());
        assert!(parse(r#"{"event_id":4}"#).timestamp.is_none());
    }

    #[test]
    fn events_project_request_accepts_tz_less_batch() {
        use crate::handlers::internal::EventsProjectRequest;
        let body = r#"{"events":[
{"event_id":10,"event_type":"call.done","timestamp":"2026-06-17T16:48:21.379403"},
{"event_id":11,"event_type":"command.completed","timestamp":"2026-06-17T16:48:22.001"}
]}"#;
        let request: EventsProjectRequest =
            serde_json::from_str(body).expect("batch deserializes (#106)");
        assert_eq!(request.events.len(), 2);
        assert!(request.events[0].timestamp.is_some());
    }

    #[test]
    fn cleanup_empty_body_uses_safe_defaults_and_skips_events() {
        let from_empty_body: CleanupPolicy = serde_json::from_str("{}").unwrap();
        assert_eq!(from_empty_body.command_retention_days, 7);
        assert_eq!(from_empty_body.runtime_stale_minutes, 60);
        assert_eq!(from_empty_body.event_retention_days, 0, "event log must be opt-in");

        let from_default = CleanupPolicy::default();
        assert_eq!(from_default.command_retention_days, 7);
        assert_eq!(from_default.runtime_stale_minutes, 60);
        assert_eq!(from_default.event_retention_days, 0);
    }

    #[test]
    fn parse_partition_upper_extracts_range_bound() {
        let cases: &[(&str, Option<i64>)] = &[
            (
                "FOR VALUES FROM (264905529753600000) TO (297520437657600000)",
                Some(297520437657600000),
            ),
            (
                "FOR VALUES FROM (MINVALUE) TO (264905529753600000)",
                Some(264905529753600000),
            ),
            ("FOR VALUES FROM (569000000000000000) TO (MAXVALUE)", None),
            ("FOR VALUES FROM ('1') TO ('100')", Some(100)),
        ];
        for &(bound, expected) in cases {
            assert_eq!(parse_partition_upper(bound), expected, "bound: {bound}");
        }
    }

    #[test]
    fn cleanup_event_retention_is_explicit_opt_in() {
        let policy: CleanupPolicy =
            serde_json::from_str(r#"{"event_retention_days": 365}"#).unwrap();
        assert_eq!(policy.event_retention_days, 365);
        assert_eq!(policy.command_retention_days, 7);
        assert_eq!(policy.runtime_stale_minutes, 60);
    }

    #[test]
    fn backoff_attempts_1_is_1s() {
        assert_eq!(compute_delay(1, 300), 1);
    }

    #[test]
    fn backoff_attempts_2_is_2s() {
        assert_eq!(compute_delay(2, 300), 2);
    }

    #[test]
    fn backoff_attempts_5_is_16s() {
        assert_eq!(compute_delay(5, 300), 16);
    }

    #[test]
    fn backoff_attempts_9_clamps_to_max() {
        // Exponent cap: 2^8 = 256 s, still below the 300 s ceiling.
        assert_eq!(compute_delay(9, 300), 256);
    }

    #[test]
    fn backoff_attempts_20_clamps_to_max() {
        assert_eq!(compute_delay(20, 300), 256);
    }

    #[test]
    fn backoff_respects_max_delay_lower_than_2_pow_8() {
        assert_eq!(compute_delay(5, 10), 10);
    }

    #[test]
    fn backoff_attempts_0_is_1s() {
        assert_eq!(compute_delay(0, 300), 1);
    }
}