use axum::{
extract::{Path, State},
http::StatusCode,
Json,
};
use serde::{Deserialize, Serialize};
use crate::error::AppError;
use crate::handlers::internal::RequireInternalApiToken;
use crate::state::AppState;
/// Terminal outcome carried by a container-step callback.
///
/// On the wire each variant is its snake_case name (`succeeded`, `failed`,
/// `failed_image_pull`, `failed_oom`, `failed_node_lost`, `failed_timeout`);
/// any other string is rejected during deserialisation.
/// [`TerminalState::as_str`] yields exactly the same labels.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TerminalState {
    /// The only success outcome (`succeeded`).
    Succeeded,
    /// Failure with no more specific classification (`failed`).
    Failed,
    /// Failure reported as `failed_image_pull`.
    FailedImagePull,
    /// Failure reported as `failed_oom`.
    FailedOom,
    /// Failure reported as `failed_node_lost`.
    FailedNodeLost,
    /// Failure reported as `failed_timeout`.
    FailedTimeout,
}

impl TerminalState {
    /// Snake_case label of this state; identical to its serde representation.
    pub fn as_str(&self) -> &'static str {
        use TerminalState as T;
        match *self {
            T::Succeeded => "succeeded",
            T::Failed => "failed",
            T::FailedImagePull => "failed_image_pull",
            T::FailedOom => "failed_oom",
            T::FailedNodeLost => "failed_node_lost",
            T::FailedTimeout => "failed_timeout",
        }
    }

    /// `true` for [`TerminalState::Succeeded`]; every `Failed*` variant counts as failure.
    fn is_success(&self) -> bool {
        *self == Self::Succeeded
    }
}
/// JSON body posted to [`container_callback`] when a container step reaches a
/// terminal state.
///
/// Only `state` and `job_name` are required. Every optional field becomes
/// `None` when absent and is left out again when the struct is serialised.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ContainerCallbackRequest {
    /// Terminal outcome of the step.
    pub state: TerminalState,
    /// Name of the job that ran the step (presumably a Kubernetes Job — confirm with the reporter).
    pub job_name: String,
    /// Unique id of that job, when the reporter supplies one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub job_uid: Option<String>,
    /// Completion time; the handler substitutes the current UTC time when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Process exit code, if known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub exit_code: Option<i32>,
    /// Free-form explanation of the outcome.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Location of captured stdout (NOTE(review): tests use `noetl://execution/...` URIs — confirm scheme).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stdout_uri: Option<String>,
    /// Location of captured stderr; same URI convention as `stdout_uri`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stderr_uri: Option<String>,
}
/// Body returned by [`container_callback`] alongside `202 Accepted`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ContainerCallbackResponse {
    /// Outcome label set by the handler: `accepted_stale` or `accepted_in_flight`.
    pub status: String,
    /// Stringified id of the inserted `call.done` event; omitted when nothing was written.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event_id: Option<String>,
}
/// Handles the terminal-state callback for a container step and records it as a
/// `call.done` event for that step.
///
/// Route parameters are `(execution_id, step)` and the body is a
/// [`ContainerCallbackRequest`]. Access is gated by the [`RequireInternalApiToken`]
/// extractor (presumably rejecting callers without the internal API token).
///
/// Both outcomes answer `202 Accepted`:
/// - `accepted_stale`: `noetl.event` has no rows for `execution_id`. Nothing is
///   written and `event_id` is omitted.
/// - `accepted_in_flight`: a `call.done` row was inserted with `node_id` and
///   `node_name` set to `step`, and `status` set to `COMPLETED` or `FAILED`. Its
///   snowflake id is returned as `event_id`.
///
/// Callbacks are not deduplicated here; a repeated callback inserts another
/// `call.done` row.
///
/// # Errors
///
/// - [`AppError::Validation`] if `execution_id` is not an `i64` or `step` is blank.
/// - [`AppError::Internal`] if snowflake id generation fails.
/// - Any database error from the lookups or the insert, converted through `?`.
// `skip_all`: record only the explicit fields below. With a plain `skip` list the
// destructured `execution_id_raw` path segment was also captured automatically,
// duplicating `execution_id` under a second name.
#[tracing::instrument(
    skip_all,
    fields(
        execution_id = %execution_id_raw,
        step = %step,
        state = ?request.state,
        job_name = %request.job_name,
    ),
)]
pub async fn container_callback(
    State(state): State<AppState>,
    _token: RequireInternalApiToken,
    Path((execution_id_raw, step)): Path<(String, String)>,
    Json(request): Json<ContainerCallbackRequest>,
) -> Result<(StatusCode, Json<ContainerCallbackResponse>), AppError> {
    let execution_id = parse_execution_id(&execution_id_raw)?;
    require_non_empty_step(&step)?;
    // Reporter-supplied completion time wins; fall back to "now" when omitted.
    let completed_at = request.completed_at.unwrap_or_else(chrono::Utc::now);
    let pool = state.pools.pool_for(execution_id);

    // Stale guard: an execution with no event rows at all gets no call.done.
    let row: Option<(i64,)> = sqlx::query_as(
        "SELECT 1::bigint FROM noetl.event WHERE execution_id = $1 LIMIT 1",
    )
    .bind(execution_id)
    .fetch_optional(pool)
    .await?;
    if row.is_none() {
        tracing::info!(
            execution_id,
            step = %step,
            state = request.state.as_str(),
            job_name = %request.job_name,
            "container-callback: stale — no events for execution_id; not emitting call.done"
        );
        crate::metrics::record_container_callback_stale(request.state.as_str());
        return Ok((
            StatusCode::ACCEPTED,
            Json(ContainerCallbackResponse {
                status: "accepted_stale".to_string(),
                event_id: None,
            }),
        ));
    }

    let event_id = state.snowflake.generate().map_err(|e| {
        AppError::Internal(format!("container-callback: snowflake generate failed: {e}"))
    })?;
    let (status_label, result_obj) = build_call_done_result(&request, completed_at);

    // Inherit the catalog id from the execution's start event.
    // NOTE(review): 0 is written as a sentinel when no start event exists; confirm
    // downstream consumers treat it as "unknown".
    let catalog_id: i64 = sqlx::query_as::<_, (i64,)>(
        "SELECT catalog_id FROM noetl.event WHERE execution_id = $1 \
         AND event_type IN ('playbook.initialized', 'playbook_started') \
         LIMIT 1",
    )
    .bind(execution_id)
    .fetch_optional(pool)
    .await?
    .map(|(c,)| c)
    .unwrap_or(0);

    sqlx::query(
        r#"
        INSERT INTO noetl.event (
            event_id, execution_id, catalog_id, event_type,
            node_id, node_name, status, result, meta, created_at
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
        "#,
    )
    .bind(event_id)
    .bind(execution_id)
    .bind(catalog_id)
    .bind("call.done")
    .bind(&step)
    .bind(&step)
    .bind(status_label)
    .bind(&result_obj)
    .bind(serde_json::json!({ "node_type": "container" }))
    .bind(chrono::Utc::now())
    .execute(pool)
    .await?;

    crate::metrics::record_container_callback(request.state.as_str());
    tracing::info!(
        execution_id,
        step = %step,
        state = request.state.as_str(),
        event_id,
        "container-callback: emitted call.done"
    );
    Ok((
        StatusCode::ACCEPTED,
        Json(ContainerCallbackResponse {
            status: "accepted_in_flight".to_string(),
            event_id: Some(event_id.to_string()),
        }),
    ))
}

/// Parses the `execution_id` path segment as an `i64`, mapping failure to a
/// validation error that echoes the raw value.
fn parse_execution_id(execution_id_raw: &str) -> Result<i64, AppError> {
    execution_id_raw.parse().map_err(|_| {
        AppError::Validation(format!(
            "container-callback: execution_id '{execution_id_raw}' is not parseable as i64"
        ))
    })
}

/// Rejects a `step` path segment that is empty or whitespace-only.
fn require_non_empty_step(step: &str) -> Result<(), AppError> {
    if step.trim().is_empty() {
        return Err(AppError::Validation(
            "container-callback: step path param is empty".to_string(),
        ));
    }
    Ok(())
}

/// Builds the `call.done` event payload for a callback.
///
/// Returns the event `status` label, which is `COMPLETED` only for
/// [`TerminalState::Succeeded`] and `FAILED` otherwise. It also returns the
/// `result` JSON `{ "status", "context" }`, where `context` echoes the callback
/// fields together with the resolved `completed_at`.
fn build_call_done_result(
    request: &ContainerCallbackRequest,
    completed_at: chrono::DateTime<chrono::Utc>,
) -> (&'static str, serde_json::Value) {
    let status_label = if request.state.is_success() {
        "COMPLETED"
    } else {
        "FAILED"
    };
    let result_obj = serde_json::json!({
        "status": status_label,
        "context": {
            "terminal_state": request.state.as_str(),
            "job_name": request.job_name,
            "job_uid": request.job_uid,
            "completed_at": completed_at,
            "exit_code": request.exit_code,
            "reason": request.reason,
            "stdout_uri": request.stdout_uri,
            "stderr_uri": request.stderr_uri,
        },
    });
    (status_label, result_obj)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant, so table-driven tests cover new variants once they are added here.
    const ALL_STATES: [TerminalState; 6] = [
        TerminalState::Succeeded,
        TerminalState::Failed,
        TerminalState::FailedImagePull,
        TerminalState::FailedOom,
        TerminalState::FailedNodeLost,
        TerminalState::FailedTimeout,
    ];

    #[test]
    fn terminal_state_as_str_round_trip() {
        let cases = [
            (TerminalState::Succeeded, "succeeded"),
            (TerminalState::Failed, "failed"),
            (TerminalState::FailedImagePull, "failed_image_pull"),
            (TerminalState::FailedOom, "failed_oom"),
            (TerminalState::FailedNodeLost, "failed_node_lost"),
            (TerminalState::FailedTimeout, "failed_timeout"),
        ];
        for (state, label) in cases {
            assert_eq!(state.as_str(), label);
        }
    }

    /// `as_str` is hand-written while the wire names come from `rename_all`;
    /// this guards against the two drifting apart.
    #[test]
    fn terminal_state_serde_name_matches_as_str() {
        for state in ALL_STATES {
            let encoded = serde_json::to_value(state).unwrap();
            assert_eq!(
                encoded.as_str(),
                Some(state.as_str()),
                "{state:?} serde name differs from as_str"
            );
            let decoded: TerminalState = serde_json::from_value(encoded).unwrap();
            assert_eq!(decoded, state);
        }
    }

    #[test]
    fn terminal_state_is_success_only_succeeded() {
        assert!(TerminalState::Succeeded.is_success());
        for st in [
            TerminalState::Failed,
            TerminalState::FailedImagePull,
            TerminalState::FailedOom,
            TerminalState::FailedNodeLost,
            TerminalState::FailedTimeout,
        ] {
            assert!(!st.is_success(), "{:?} should not count as success", st);
        }
    }

    #[test]
    fn request_deserialises_minimal_body() {
        let raw = r#"{
            "state": "succeeded",
            "job_name": "noetl-container-step1-abcd-xyz"
        }"#;
        let parsed: ContainerCallbackRequest = serde_json::from_str(raw).unwrap();
        assert_eq!(parsed.state, TerminalState::Succeeded);
        assert_eq!(parsed.job_name, "noetl-container-step1-abcd-xyz");
        assert!(parsed.job_uid.is_none());
        assert!(parsed.completed_at.is_none());
        assert!(parsed.exit_code.is_none());
        assert!(parsed.reason.is_none());
        assert!(parsed.stdout_uri.is_none());
        assert!(parsed.stderr_uri.is_none());
    }

    #[test]
    fn request_deserialises_full_body() {
        let raw = r#"{
            "state": "failed_oom",
            "job_name": "noetl-container-train-42-q1",
            "job_uid": "01234567-89ab-cdef-0123-456789abcdef",
            "completed_at": "2026-06-07T04:00:00Z",
            "exit_code": 137,
            "reason": "Memory limit exceeded (256Mi)",
            "stdout_uri": "noetl://execution/42/result/train/1/stdout",
            "stderr_uri": "noetl://execution/42/result/train/1/stderr"
        }"#;
        let parsed: ContainerCallbackRequest = serde_json::from_str(raw).unwrap();
        assert_eq!(parsed.state, TerminalState::FailedOom);
        assert_eq!(parsed.job_name, "noetl-container-train-42-q1");
        assert_eq!(
            parsed.job_uid.as_deref(),
            Some("01234567-89ab-cdef-0123-456789abcdef")
        );
        assert_eq!(parsed.exit_code, Some(137));
        assert_eq!(parsed.reason.as_deref(), Some("Memory limit exceeded (256Mi)"));
        assert_eq!(
            parsed.stdout_uri.as_deref(),
            Some("noetl://execution/42/result/train/1/stdout")
        );
        assert_eq!(
            parsed.stderr_uri.as_deref(),
            Some("noetl://execution/42/result/train/1/stderr")
        );
        assert_eq!(
            parsed.completed_at,
            Some(
                chrono::DateTime::parse_from_rfc3339("2026-06-07T04:00:00Z")
                    .unwrap()
                    .with_timezone(&chrono::Utc)
            )
        );
    }

    #[test]
    fn request_rejects_unknown_state() {
        let raw = r#"{
            "state": "in_progress",
            "job_name": "j"
        }"#;
        let err = serde_json::from_str::<ContainerCallbackRequest>(raw).err();
        assert!(err.is_some(), "unknown state should fail deserialisation");
    }

    #[test]
    fn request_rejects_missing_job_name() {
        let raw = r#"{ "state": "failed" }"#;
        assert!(
            serde_json::from_str::<ContainerCallbackRequest>(raw).is_err(),
            "job_name is required"
        );
    }

    #[test]
    fn request_serialises_without_absent_optionals() {
        let r = ContainerCallbackRequest {
            state: TerminalState::Failed,
            job_name: "j".to_string(),
            job_uid: None,
            completed_at: None,
            exit_code: None,
            reason: None,
            stdout_uri: None,
            stderr_uri: None,
        };
        let body = serde_json::to_value(&r).unwrap();
        let obj = body.as_object().unwrap();
        assert_eq!(obj.len(), 2, "only required fields should be emitted: {obj:?}");
        assert_eq!(obj.get("state").and_then(|v| v.as_str()), Some("failed"));
        assert_eq!(obj.get("job_name").and_then(|v| v.as_str()), Some("j"));
    }

    #[test]
    fn response_serialises_in_flight_with_event_id() {
        let r = ContainerCallbackResponse {
            status: "accepted_in_flight".to_string(),
            event_id: Some("1234567890".to_string()),
        };
        let body = serde_json::to_value(&r).unwrap();
        assert_eq!(body.get("status").and_then(|v| v.as_str()), Some("accepted_in_flight"));
        assert_eq!(body.get("event_id").and_then(|v| v.as_str()), Some("1234567890"));
    }

    #[test]
    fn response_serialises_stale_without_event_id() {
        let r = ContainerCallbackResponse {
            status: "accepted_stale".to_string(),
            event_id: None,
        };
        let body = serde_json::to_value(&r).unwrap();
        assert_eq!(body.get("status").and_then(|v| v.as_str()), Some("accepted_stale"));
        assert!(body.get("event_id").is_none());
    }
}