use axum::{
extract::{Path, State},
Json,
};
use serde::{Deserialize, Serialize};
use sqlx::Row;
use tracing::{debug, info, warn};
use crate::error::{AppError, AppResult};
use crate::sanitize::sanitize_sensitive_data;
use crate::state::AppState;
/// Request body for submitting a single event to `handle_event`.
///
/// `event_type` also accepts the legacy key `name`, and `payload` also accepts
/// the key `context`, via serde aliases.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventRequest {
/// Execution identifier sent as a string; `handle_event_inner` parses it as
/// `i64` and returns a validation error when parsing fails.
pub execution_id: String,
/// Step name; persisted as both `node_id` and `node_name` of the event row.
pub step: String,
/// Event type such as `command.completed`. Also deserializes from `name`.
#[serde(alias = "name")]
pub event_type: String,
/// Event payload. Also deserializes from `context`; falls back to the
/// `serde_json::Value` default when absent. Only object payloads are copied
/// into the stored result (under the `context` key).
#[serde(default, alias = "context")]
pub payload: serde_json::Value,
/// Optional caller-supplied metadata. When it is a JSON object, the handler
/// adds `actionable`, `informative` and (if present) `worker_id` to it.
#[serde(skip_serializing_if = "Option::is_none")]
pub meta: Option<serde_json::Value>,
/// Identifier of the worker reporting the event, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub worker_id: Option<String>,
/// Selects how `build_result_object` shapes the result: `"ref"` and `"refs"`
/// produce a `reference` entry, anything else stores the payload. Defaults
/// to `"data"`.
#[serde(default = "default_result_kind")]
pub result_kind: String,
/// URI of an externally stored result; used when `result_kind` is `"ref"`.
#[serde(skip_serializing_if = "Option::is_none")]
pub result_uri: Option<String>,
/// Event ids making up a multi-part result; used when `result_kind` is `"refs"`.
#[serde(skip_serializing_if = "Option::is_none")]
pub event_ids: Option<Vec<i64>>,
/// Copied into the stored meta object under `actionable`. Defaults to `true`.
#[serde(default = "default_true")]
pub actionable: bool,
/// Copied into the stored meta object under `informative`. Defaults to `true`.
#[serde(default = "default_true")]
pub informative: bool,
/// Optional caller-supplied event id (string form of an `i64`). When absent
/// the handler generates one via `noetl.snowflake_id()`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub event_id: Option<String>,
/// Optional explicit status. When absent the status is derived from the
/// event type by `event_status_from_name`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub status: Option<String>,
/// Optional event timestamp; the handler uses the current UTC time when absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub created_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// Serde default for `result_kind`: the inline `"data"` result kind.
fn default_result_kind() -> String {
    String::from("data")
}

/// Serde default for boolean flags that are enabled unless the caller opts out.
fn default_true() -> bool {
    // Kept as a function because `#[serde(default = "...")]` needs a path.
    true
}
/// Response body returned by `handle_event`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventResponse {
/// Outcome label; the handler sets it to `"ok"` on success.
pub status: String,
/// Id of the persisted event. The handler returns `0` when a
/// `command.claimed` event is short-circuited as already claimed.
pub event_id: i64,
/// Number of commands generated as a result of this event.
pub commands_generated: i32,
}
/// Request body for `claim_command`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClaimRequest {
/// Identifier of the worker attempting to claim the command.
pub worker_id: String,
}
/// Response body returned by `claim_command` on a successful claim.
///
/// All fields other than `status` are taken from the `command.issued` event
/// row that was claimed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClaimResponse {
/// Outcome label; `claim_command` sets it to `"ok"`.
pub status: String,
/// Id of the `command.issued` event that was claimed (the path parameter).
pub event_id: i64,
/// Execution the command belongs to.
pub execution_id: i64,
/// Set to the same value as `node_name` by `claim_command`.
pub node_id: String,
/// The command row's `node_name` column (the step name).
pub node_name: String,
/// The command row's `node_type` column.
pub action: String,
/// The command row's `context` column, or `{}` when it cannot be read.
pub context: serde_json::Value,
/// The command row's `meta` column, or `{}` when it cannot be read.
pub meta: serde_json::Value,
}
/// One event inside a `BatchEventRequest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchEventItem {
/// Step name; persisted as both `node_id` and `node_name` of the event row.
pub step: String,
/// Event type. Also deserializes from the legacy key `name`.
#[serde(alias = "name")]
pub event_type: String,
/// Event payload. Also deserializes from `context`; only object payloads
/// are copied into the stored result.
#[serde(default, alias = "context")]
pub payload: serde_json::Value,
/// Copied into the stored meta object under `actionable`.
// NOTE(review): this defaults to `false`, whereas `EventRequest::actionable`
// defaults to `true` — confirm the difference is intentional.
#[serde(default)]
pub actionable: bool,
/// Copied into the stored meta object under `informative`. Defaults to `true`.
#[serde(default = "default_true")]
pub informative: bool,
}
/// Request body for `handle_batch_events`: several events for one execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchEventRequest {
/// Execution identifier sent as a string; the handler parses it as `i64`.
pub execution_id: String,
/// Worker reporting the batch, if any; applied to every event in the batch.
#[serde(skip_serializing_if = "Option::is_none")]
pub worker_id: Option<String>,
/// Events to persist, in order. An empty list yields an empty `"ok"` response.
pub events: Vec<BatchEventItem>,
}
/// Response body returned by `handle_batch_events`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchEventResponse {
/// Outcome label; the handler sets it to `"ok"` on success.
pub status: String,
/// Ids assigned to the persisted events, in the same order as the request.
pub event_ids: Vec<i64>,
/// Number of commands generated as a result of the batch.
pub commands_generated: i32,
}
/// Response body returned by `get_command`, built from a `command.issued` event row.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandResponse {
/// Execution the command belongs to.
pub execution_id: i64,
/// Set to the same value as `node_name` by `get_command`.
pub node_id: String,
/// The command row's `node_name` column.
pub node_name: String,
/// The command row's `node_type` column.
pub action: String,
/// The command row's `context` column.
pub context: serde_json::Value,
/// The command row's `meta` column.
pub meta: serde_json::Value,
}
/// HTTP entry point for single-event ingestion.
///
/// Delegates all work to `handle_event_inner` and records an ingest metric
/// (event type, `"ok"`/`"error"` label, elapsed seconds) around the call,
/// regardless of the outcome. The inner result is returned unchanged.
pub async fn handle_event(
    state: State<AppState>,
    request: Json<EventRequest>,
) -> Result<Json<EventResponse>, AppError> {
    // The request is moved into the inner handler, so keep a copy of the
    // event type for the metric label.
    let metric_event_type = request.0.event_type.clone();
    let timer = std::time::Instant::now();

    let outcome = handle_event_inner(state, request).await;

    let outcome_label = match &outcome {
        Ok(_) => "ok",
        Err(_) => "error",
    };
    let elapsed_seconds = timer.elapsed().as_secs_f64();
    crate::metrics::record_event_ingest(&metric_event_type, outcome_label, elapsed_seconds);

    outcome
}
async fn handle_event_inner(
State(state): State<AppState>,
Json(request): Json<EventRequest>,
) -> Result<Json<EventResponse>, AppError> {
debug!(
"Event received: execution_id={}, step={}, event_type={}",
request.execution_id, request.step, request.event_type
);
let execution_id: i64 = request
.execution_id
.parse()
.map_err(|_| AppError::Validation("Invalid execution_id".to_string()))?;
let skip_engine_events = [
"command.claimed",
"command.started",
"command.completed",
"command.failed",
"step.enter",
];
if request.event_type == "command.claimed" {
if let Some(command_id) = get_command_id(&request) {
if check_already_claimed(&state, execution_id, &command_id, &request.worker_id).await? {
return Ok(Json(EventResponse {
status: "ok".to_string(),
event_id: 0,
commands_generated: 0,
}));
}
}
}
let derived_status: String = request
.status
.clone()
.unwrap_or_else(|| event_status_from_name(&request.event_type).to_string());
let result_obj_raw = build_result_object(&request, &derived_status);
let result_obj = sanitize_sensitive_data(&result_obj_raw);
let event_id: i64 = match request.event_id.as_deref() {
Some(raw) => raw.parse().map_err(|_| {
AppError::Validation(format!("Invalid event_id: {raw}"))
})?,
None => generate_snowflake_id(&state).await?,
};
let catalog_id = get_catalog_id(&state, execution_id).await?;
let mut meta_obj = request.meta.clone().unwrap_or(serde_json::json!({}));
if let serde_json::Value::Object(ref mut map) = meta_obj {
map.insert(
"actionable".to_string(),
serde_json::json!(request.actionable),
);
map.insert(
"informative".to_string(),
serde_json::json!(request.informative),
);
if let Some(ref worker_id) = request.worker_id {
map.insert("worker_id".to_string(), serde_json::json!(worker_id));
}
}
let meta_obj = sanitize_sensitive_data(&meta_obj);
let created_at = request.created_at.unwrap_or_else(chrono::Utc::now);
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, status, result, meta, created_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
"#,
)
.bind(event_id)
.bind(execution_id)
.bind(catalog_id)
.bind(&request.event_type)
.bind(&request.step)
.bind(&request.step)
.bind(&derived_status)
.bind(&result_obj)
.bind(&meta_obj)
.bind(created_at)
.execute(&state.db)
.await?;
info!(
"Event persisted: event_id={}, execution_id={}, event_type={}",
event_id, execution_id, request.event_type
);
let commands_generated = if !skip_engine_events.contains(&request.event_type.as_str()) {
debug!("Would process through engine: event_type={}", request.event_type);
0
} else {
debug!("Skipped engine for administrative event: {}", request.event_type);
0
};
if request.event_type == "command.completed" && request.step.to_lowercase() != "end" {
match trigger_orchestrator(&state, execution_id, event_id).await {
Ok(cmds) => {
info!(
"Orchestrator generated {} commands for execution {}",
cmds, execution_id
);
}
Err(e) => {
warn!("Orchestrator error: {}", e);
}
}
}
Ok(Json(EventResponse {
status: "ok".to_string(),
event_id,
commands_generated,
}))
}
/// Returns the command described by a `command.issued` event.
///
/// Looks the event up by `event_id` and maps its columns onto a
/// `CommandResponse` (`node_name` doubles as `node_id`, `node_type` becomes
/// `action`).
///
/// # Errors
/// `AppError::NotFound` when no `command.issued` row has that id; database
/// errors are propagated.
pub async fn get_command(
    State(state): State<AppState>,
    Path(event_id): Path<i64>,
) -> Result<Json<CommandResponse>, AppError> {
    debug!("Getting command for event_id={}", event_id);

    let lookup = sqlx::query_as::<_, (i64, String, String, serde_json::Value, serde_json::Value)>(
        r#"
SELECT execution_id, node_name, node_type, context, meta
FROM noetl.event
WHERE event_id = $1 AND event_type = 'command.issued'
"#,
    )
    .bind(event_id)
    .fetch_optional(&state.db)
    .await?;

    let Some((execution_id, node_name, action, context, meta)) = lookup else {
        return Err(AppError::NotFound(format!(
            "command.issued event not found: {}",
            event_id
        )));
    };

    Ok(Json(CommandResponse {
        execution_id,
        node_id: node_name.clone(),
        node_name,
        action,
        context,
        meta,
    }))
}
/// Claims a `command.issued` event on behalf of a worker.
///
/// All reads and the final insert run inside one database transaction.
/// The sequence is:
/// 1. Load the `command.issued` row for `event_id` (404 if missing).
/// 2. Resolve the command id from `meta.command_id`, falling back to
///    `"{execution_id}:{step}:{event_id}"`.
/// 3. Reject with a conflict if the command already has a
///    `command.completed` / `command.failed` event, or if the execution has
///    an `execution.cancelled` event.
/// 4. Try to take a transaction-level advisory lock keyed on the command id;
///    reject with a conflict if another session holds it.
/// 5. If a `command.claimed` event already exists: conflict when it belongs
///    to a different worker, otherwise commit and return the same response
///    again (idempotent re-claim).
/// 6. Otherwise insert a new `command.claimed` event with status `RUNNING`
///    and commit.
///
/// # Errors
/// `AppError::NotFound` for an unknown command, `AppError::Conflict` for the
/// rejections listed above; database errors are propagated. Every early
/// return before `commit` leaves the transaction uncommitted.
pub async fn claim_command(
State(state): State<AppState>,
Path(event_id): Path<i64>,
Json(request): Json<ClaimRequest>,
) -> Result<Json<ClaimResponse>, AppError> {
debug!(
"Claim request received: event_id={}, worker_id={}",
event_id, request.worker_id
);
let mut tx = state.db.begin().await?;
// Step 1: the command being claimed must exist as a `command.issued` event.
let cmd_row = sqlx::query(
r#"
SELECT execution_id, catalog_id, node_name, node_type, context, meta
FROM noetl.event
WHERE event_id = $1 AND event_type = 'command.issued'
"#,
)
.bind(event_id)
.fetch_optional(&mut *tx)
.await?;
let Some(row) = cmd_row else {
return Err(AppError::NotFound(format!(
"command.issued event not found: {}",
event_id
)));
};
let execution_id: i64 = row.try_get("execution_id")?;
let catalog_id: Option<i64> = row.try_get("catalog_id")?;
let step: String = row.try_get("node_name")?;
let tool_kind: String = row.try_get("node_type")?;
// `context` and `meta` are treated as optional: any read failure yields `{}`.
let context: serde_json::Value = row
.try_get("context")
.unwrap_or_else(|_| serde_json::json!({}));
let meta: serde_json::Value = row
.try_get("meta")
.unwrap_or_else(|_| serde_json::json!({}));
// Step 2: prefer the explicit command id; otherwise synthesize one.
let command_id = meta
.get("command_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| format!("{}:{}:{}", execution_id, step, event_id));
// Step 3a: a command that already completed or failed cannot be claimed.
let terminal_row = sqlx::query(
r#"
SELECT event_type
FROM noetl.event
WHERE execution_id = $1
AND event_type IN ('command.completed', 'command.failed')
AND (meta->>'command_id' = $2 OR result->'data'->>'command_id' = $2)
ORDER BY event_id DESC
LIMIT 1
"#,
)
.bind(execution_id)
.bind(&command_id)
.fetch_optional(&mut *tx)
.await?;
if terminal_row.is_some() {
return Err(AppError::Conflict(
"Command already reached terminal state".to_string(),
));
}
// Step 3b: no claims are accepted once the execution has been cancelled.
let cancelled_row = sqlx::query(
r#"
SELECT 1
FROM noetl.event
WHERE execution_id = $1
AND event_type = 'execution.cancelled'
LIMIT 1
"#,
)
.bind(execution_id)
.fetch_optional(&mut *tx)
.await?;
if cancelled_row.is_some() {
return Err(AppError::Conflict(
"Execution has been cancelled".to_string(),
));
}
// Step 4: non-blocking, transaction-level advisory lock keyed on the
// command id, so two concurrent claims of the same command cannot both
// reach the existing-claim check and insert below.
let lock_row =
sqlx::query("SELECT pg_try_advisory_xact_lock(hashtext($1)::bigint) AS lock_acquired")
.bind(&command_id)
.fetch_one(&mut *tx)
.await?;
let lock_acquired: bool = lock_row.try_get("lock_acquired")?;
if !lock_acquired {
return Err(AppError::Conflict(
"Command is being claimed by another worker".to_string(),
));
}
// Step 5: look for the most recent existing claim of this command.
let existing_claim = sqlx::query(
r#"
SELECT worker_id, meta
FROM noetl.event
WHERE execution_id = $1
AND event_type = 'command.claimed'
AND (meta->>'command_id' = $2 OR result->'data'->>'command_id' = $2)
ORDER BY event_id DESC
LIMIT 1
"#,
)
.bind(execution_id)
.bind(&command_id)
.fetch_optional(&mut *tx)
.await?;
if let Some(existing) = existing_claim {
// The claiming worker may be recorded in the `worker_id` column or only
// inside `meta.worker_id`; the column takes precedence.
let worker_id_db: Option<String> = existing.try_get("worker_id").ok();
let worker_id_meta = existing
.try_get::<serde_json::Value, _>("meta")
.ok()
.and_then(|value| {
value
.get("worker_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
});
let existing_worker = worker_id_db.or(worker_id_meta);
// When no worker can be determined for the existing claim, control falls
// through and a new claim event is inserted below.
if let Some(existing_worker_id) = existing_worker {
if existing_worker_id != request.worker_id {
return Err(AppError::Conflict(format!(
"Command already claimed by {}",
existing_worker_id
)));
}
// Same worker claiming again: acknowledge without a second claim event.
tx.commit().await?;
return Ok(Json(ClaimResponse {
status: "ok".to_string(),
event_id,
execution_id,
node_id: step.clone(),
node_name: step,
action: tool_kind,
context,
meta,
}));
}
}
// Step 6: record the claim as a new `command.claimed` event.
let claim_event_id = generate_snowflake_id_with_tx(&mut tx).await?;
let claim_result = serde_json::json!({
"status": "RUNNING",
"context": {
"command_id": command_id,
"worker_id": request.worker_id,
}
});
// `command_id` is stored in meta because the lookups above match on
// `meta->>'command_id'`.
let claim_meta = serde_json::json!({
"command_id": command_id,
"worker_id": request.worker_id,
"actionable": false,
"informative": true,
});
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, status, result, meta, worker_id, created_at
) VALUES (
$1, $2, $3, $4,
$5, $6, $7, $8, $9, $10, $11
)
"#,
)
.bind(claim_event_id)
.bind(execution_id)
.bind(catalog_id)
.bind("command.claimed")
.bind(&step)
.bind(&step)
.bind("RUNNING")
.bind(claim_result)
.bind(claim_meta)
.bind(&request.worker_id)
.bind(chrono::Utc::now())
.execute(&mut *tx)
.await?;
tx.commit().await?;
// The response echoes the id of the `command.issued` event, not the id of
// the newly inserted claim event.
Ok(Json(ClaimResponse {
status: "ok".to_string(),
event_id,
execution_id,
node_id: step.clone(),
node_name: step,
action: tool_kind,
context,
meta,
}))
}
pub async fn handle_batch_events(
State(state): State<AppState>,
Json(request): Json<BatchEventRequest>,
) -> Result<Json<BatchEventResponse>, AppError> {
if request.events.is_empty() {
return Ok(Json(BatchEventResponse {
status: "ok".to_string(),
event_ids: Vec::new(),
commands_generated: 0,
}));
}
let execution_id: i64 = request
.execution_id
.parse()
.map_err(|_| AppError::Validation("Invalid execution_id".to_string()))?;
let catalog_id = get_catalog_id(&state, execution_id).await?;
let mut tx = state.db.begin().await?;
let mut event_ids = Vec::with_capacity(request.events.len());
for item in &request.events {
let event_id = generate_snowflake_id_with_tx(&mut tx).await?;
let status = event_status_from_name(&item.event_type);
let mut result_map = serde_json::Map::new();
result_map.insert(
"status".to_string(),
serde_json::Value::String(status.to_string()),
);
if let serde_json::Value::Object(_) = item.payload {
result_map.insert("context".to_string(), item.payload.clone());
}
let result_obj_raw = serde_json::Value::Object(result_map);
let result_obj = sanitize_sensitive_data(&result_obj_raw);
let mut meta_obj = serde_json::json!({
"actionable": item.actionable,
"informative": item.informative,
});
if let Some(worker_id) = &request.worker_id {
if let serde_json::Value::Object(ref mut map) = meta_obj {
map.insert("worker_id".to_string(), serde_json::json!(worker_id));
}
}
let meta_obj = sanitize_sensitive_data(&meta_obj);
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, status, result, meta, worker_id, created_at
) VALUES (
$1, $2, $3, $4,
$5, $6, $7, $8, $9, $10, $11
)
"#,
)
.bind(event_id)
.bind(execution_id)
.bind(catalog_id)
.bind(&item.event_type)
.bind(&item.step)
.bind(&item.step)
.bind(status)
.bind(result_obj)
.bind(meta_obj)
.bind(&request.worker_id)
.bind(chrono::Utc::now())
.execute(&mut *tx)
.await?;
event_ids.push(event_id);
}
tx.commit().await?;
for (idx, item) in request.events.iter().enumerate() {
if item.event_type == "command.completed" && item.step.to_lowercase() != "end" {
let trigger_event_id = event_ids[idx];
match trigger_orchestrator(&state, execution_id, trigger_event_id).await {
Ok(cmds) => {
info!(
"Orchestrator (batch) generated {} commands for execution {} step {}",
cmds, execution_id, item.step
);
}
Err(e) => {
warn!(
"Orchestrator error in batch for execution {} step {}: {}",
execution_id, item.step, e
);
}
}
}
}
Ok(Json(BatchEventResponse {
status: "ok".to_string(),
event_ids,
commands_generated: 0,
}))
}
/// Extracts the command id attached to an event, if any.
///
/// The payload's `command_id` string wins; otherwise the `command_id` string
/// in `meta` is used. Non-string values are ignored in both places.
fn get_command_id(request: &EventRequest) -> Option<String> {
    request
        .payload
        .get("command_id")
        .and_then(|value| value.as_str())
        .or_else(|| {
            request
                .meta
                .as_ref()
                .and_then(|meta| meta.get("command_id"))
                .and_then(|value| value.as_str())
        })
        .map(str::to_owned)
}
/// Checks whether a command already has a `command.claimed` event.
///
/// The most recent claim (highest `event_id`) is inspected; its worker is
/// taken from the `worker_id` column, falling back to `meta.worker_id`.
///
/// Returns `Ok(true)` when that claim belongs to `worker_id`, and `Ok(false)`
/// when there is no claim or when either side's worker id is unknown.
///
/// # Errors
/// `AppError::Conflict` when the claim belongs to a different worker;
/// database errors are propagated.
async fn check_already_claimed(
    state: &AppState,
    execution_id: i64,
    command_id: &str,
    worker_id: &Option<String>,
) -> AppResult<bool> {
    // ORDER BY makes the row choice deterministic: LIMIT 1 alone may return
    // any matching claim. This matches the equivalent query in `claim_command`.
    let latest_claim = sqlx::query_as::<_, (Option<String>, Option<serde_json::Value>)>(
        r#"
SELECT worker_id, meta FROM noetl.event
WHERE execution_id = $1
AND event_type = 'command.claimed'
AND (meta->>'command_id' = $2 OR result->'data'->>'command_id' = $2)
ORDER BY event_id DESC
LIMIT 1
"#,
    )
    .bind(execution_id)
    .bind(command_id)
    .fetch_optional(&state.db)
    .await?;

    let Some((column_worker, meta)) = latest_claim else {
        return Ok(false);
    };

    let claimed_by = column_worker.or_else(|| {
        meta.as_ref()
            .and_then(|m| m.get("worker_id"))
            .and_then(|v| v.as_str())
            .map(String::from)
    });

    match (claimed_by, worker_id) {
        (Some(existing), Some(current)) if existing != *current => Err(AppError::Conflict(
            format!("Command already claimed by {}", existing),
        )),
        (Some(_), Some(_)) => Ok(true),
        // Without both worker ids there is nothing to compare.
        _ => Ok(false),
    }
}
/// Builds the JSON object stored in an event's `result` column.
///
/// The object always contains `status`. Depending on the request it also
/// contains at most one of:
/// - `reference` with `store_tier` / `logical_uri`, when `result_kind` is
///   `"ref"` and a `result_uri` is present (`gs://` maps to `gcs`, `s3://`
///   to `s3`, anything else to `artifact`);
/// - `reference` with `event_ids` / `total_parts`, when `result_kind` is
///   `"refs"` and `event_ids` is present;
/// - `context` holding the payload, in every other case, provided the
///   payload is a JSON object.
fn build_result_object(request: &EventRequest, status: &str) -> serde_json::Value {
    let mut fields = serde_json::Map::new();
    fields.insert(
        "status".to_string(),
        serde_json::Value::String(status.to_string()),
    );

    let kind = request.result_kind.as_str();
    match (kind, request.result_uri.as_deref(), request.event_ids.as_deref()) {
        ("ref", Some(uri), _) => {
            let store_tier = match uri {
                u if u.starts_with("gs://") => "gcs",
                u if u.starts_with("s3://") => "s3",
                _ => "artifact",
            };
            fields.insert(
                "reference".to_string(),
                serde_json::json!({
                    "store_tier": store_tier,
                    "logical_uri": uri,
                }),
            );
        }
        ("refs", _, Some(part_ids)) => {
            fields.insert(
                "reference".to_string(),
                serde_json::json!({
                    "event_ids": part_ids,
                    "total_parts": part_ids.len(),
                }),
            );
        }
        // Inline data, or a ref/refs kind that is missing its locator.
        _ => {
            if request.payload.is_object() {
                fields.insert("context".to_string(), request.payload.clone());
            }
        }
    }

    serde_json::Value::Object(fields)
}
/// Looks up the catalog id associated with an execution.
///
/// Returns the `catalog_id` of any existing event of the execution that has
/// one, or `None` when the execution has no such event.
///
/// Rows with a NULL `catalog_id` are skipped. The handlers in this file bind
/// an `Option<i64>` catalog id when inserting events, so NULL rows can exist;
/// decoding one into `i64` would make the lookup fail with a decode error
/// instead of returning `None`.
///
/// # Errors
/// Database errors are propagated.
async fn get_catalog_id(state: &AppState, execution_id: i64) -> AppResult<Option<i64>> {
    let row: Option<(i64,)> = sqlx::query_as::<_, (i64,)>(
        "SELECT catalog_id FROM noetl.event WHERE execution_id = $1 AND catalog_id IS NOT NULL LIMIT 1",
    )
    .bind(execution_id)
    .fetch_optional(&state.db)
    .await?;
    Ok(row.map(|(id,)| id))
}
/// Obtains a new id from the database function `noetl.snowflake_id()`,
/// running the query directly against `state.db`.
///
/// # Errors
/// Database errors are propagated.
async fn generate_snowflake_id(state: &AppState) -> AppResult<i64> {
    let (new_id,): (i64,) = sqlx::query_as::<_, (i64,)>("SELECT noetl.snowflake_id()")
        .fetch_one(&state.db)
        .await?;
    Ok(new_id)
}
/// Obtains a new id from the database function `noetl.snowflake_id()`,
/// running the query on the caller's open transaction.
///
/// # Errors
/// Database errors are propagated.
async fn generate_snowflake_id_with_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> AppResult<i64> {
    let (new_id,): (i64,) = sqlx::query_as::<_, (i64,)>("SELECT noetl.snowflake_id()")
        .fetch_one(&mut **tx)
        .await?;
    Ok(new_id)
}
/// Derives an event status from substrings of the event name.
///
/// Names containing `done`, `exit` or `completed` map to `"COMPLETED"`;
/// otherwise names containing `error` or `failed` map to `"FAILED"`;
/// everything else maps to `"RUNNING"`. The completed markers are checked
/// first, so a name matching both groups is reported as `"COMPLETED"`.
fn event_status_from_name(event_name: &str) -> &'static str {
    const COMPLETED_MARKERS: [&str; 3] = ["done", "exit", "completed"];
    const FAILED_MARKERS: [&str; 2] = ["error", "failed"];

    let mentions_any =
        |markers: &[&str]| markers.iter().any(|marker| event_name.contains(*marker));

    if mentions_any(&COMPLETED_MARKERS) {
        return "COMPLETED";
    }
    if mentions_any(&FAILED_MARKERS) {
        return "FAILED";
    }
    "RUNNING"
}
/// Re-evaluates an execution's workflow after a triggering event and persists
/// whatever the orchestrator decides.
///
/// The function:
/// 1. Loads every event of the execution, ordered by `event_id`.
/// 2. Picks the first positive `catalog_id` among them and loads and parses
///    the playbook stored under that catalog id.
/// 3. Runs `WorkflowOrchestrator::evaluate` over the events.
/// 4. Inserts each event the orchestrator asks to emit, persists each new
///    command via `persist_engine_command`, and, if the orchestrator reports
///    completion, inserts a terminal `playbook.completed` / `playbook.failed`
///    event. All inserted events use `trigger_event_id` as `parent_event_id`.
///
/// Returns the number of commands persisted (`0` when the execution has no
/// events).
///
/// Every statement here runs directly against `state.db`; no transaction is
/// opened in this function, so an error part-way through leaves the earlier
/// writes in place.
///
/// # Errors
/// `AppError::Internal` when no catalog id can be found, the playbook cannot
/// be loaded, evaluation fails, or a command refers to an unknown step;
/// playbook parse errors and database errors are propagated.
async fn trigger_orchestrator(
state: &AppState,
execution_id: i64,
trigger_event_id: i64,
) -> AppResult<i32> {
use crate::engine::WorkflowOrchestrator;
use sqlx::Row;
debug!(
execution_id,
trigger_event_id, "trigger_orchestrator: loading events"
);
// Full event history of the execution, oldest first.
let rows = sqlx::query(
r#"
SELECT event_id, execution_id, catalog_id,
parent_event_id, parent_execution_id,
event_type, node_id, node_name, node_type, status,
context, meta, result, worker_id,
NULLIF(meta->>'attempt', '')::int AS attempt,
created_at
FROM noetl.event
WHERE execution_id = $1
ORDER BY event_id ASC
"#,
)
.bind(execution_id)
.fetch_all(&state.db)
.await?;
// Row decoding is lenient: a column that cannot be read becomes a default
// value or `None` instead of failing the whole evaluation.
let events: Vec<crate::db::models::Event> = rows
.into_iter()
.map(|r| crate::db::models::Event {
id: r.try_get("event_id").unwrap_or(0),
execution_id: r.try_get("execution_id").unwrap_or(0),
catalog_id: r.try_get("catalog_id").unwrap_or(0),
event_id: r.try_get("event_id").unwrap_or(0),
parent_event_id: r.try_get("parent_event_id").ok(),
parent_execution_id: r.try_get("parent_execution_id").ok(),
event_type: r.try_get("event_type").unwrap_or_default(),
node_id: r.try_get("node_id").ok(),
node_name: r.try_get("node_name").ok(),
node_type: r.try_get("node_type").ok(),
status: r.try_get("status").unwrap_or_default(),
context: r.try_get("context").ok(),
meta: r.try_get("meta").ok(),
result: r.try_get("result").ok(),
worker_id: r.try_get("worker_id").ok(),
attempt: r.try_get("attempt").ok(),
created_at: r.try_get("created_at").unwrap_or_else(|_| chrono::Utc::now()),
})
.collect();
if events.is_empty() {
debug!(execution_id, "No events to evaluate — orchestrator exit early");
return Ok(0);
}
// A `catalog_id` of 0 is the decode fallback used above, so only positive
// ids count as real.
let catalog_id = events
.iter()
.find_map(|e| {
if e.catalog_id > 0 {
Some(e.catalog_id)
} else {
None
}
})
.ok_or_else(|| {
AppError::Internal(format!(
"No catalog_id found in events for execution {execution_id}"
))
})?;
let playbook_yaml: String = sqlx::query_scalar(
"SELECT content FROM noetl.catalog WHERE catalog_id = $1",
)
.bind(catalog_id)
.fetch_one(&state.db)
.await
.map_err(|e| {
AppError::Internal(format!(
"Failed to load playbook for catalog_id {catalog_id}: {e}"
))
})?;
let playbook = crate::playbook::parser::parse_playbook(&playbook_yaml)?;
let orchestrator = WorkflowOrchestrator::new();
// NOTE(review): the third argument is always "command.completed" —
// presumably the trigger event type; confirm against `evaluate`.
let result = orchestrator
.evaluate(&events, &playbook, Some("command.completed"))
.map_err(|e| AppError::Internal(format!("Orchestrator evaluate failed: {e}")))?;
info!(
execution_id,
trigger_event_id,
new_commands = result.commands.len(),
new_events = result.events_to_emit.len(),
should_complete = result.should_complete,
"Orchestrator evaluate complete"
);
// Persist the events the orchestrator wants emitted.
for emit in &result.events_to_emit {
let event_id = generate_snowflake_id(state).await?;
// An empty status from the orchestrator is stored as "STARTED".
let event_status = if emit.status.is_empty() {
"STARTED".to_string()
} else {
emit.status.clone()
};
// Only an object-valued context is copied into the stored result.
let result_obj = match &emit.context {
Some(serde_json::Value::Object(_)) => serde_json::json!({
"status": event_status,
"context": emit.context.clone().unwrap(),
}),
_ => serde_json::json!({"status": event_status}),
};
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, status, result, meta, created_at, parent_event_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
"#,
)
.bind(event_id)
.bind(execution_id)
.bind(catalog_id)
.bind(&emit.event_type)
.bind(emit.node_name.as_deref())
.bind(emit.node_name.as_deref())
.bind(&event_status)
.bind(&result_obj)
.bind(serde_json::json!({"emitted_by": "orchestrator"}))
.bind(chrono::Utc::now())
.bind(trigger_event_id)
.execute(&state.db)
.await?;
}
// Persist the new commands; each must refer to a step of the playbook.
let mut commands_generated = 0i32;
for command in &result.commands {
let step = playbook.get_step(&command.step_name).ok_or_else(|| {
AppError::Internal(format!(
"Orchestrator returned command for unknown step '{}'",
command.step_name
))
})?;
let render_context: std::collections::HashMap<String, serde_json::Value> =
command.context.clone().unwrap_or_default();
crate::handlers::execute::persist_engine_command(
state,
execution_id,
catalog_id,
trigger_event_id,
step,
command,
&render_context,
&playbook,
)
.await?;
commands_generated += 1;
}
// Record the terminal event; anything other than an explicit "FAILED"
// completion status is recorded as completed.
if result.should_complete {
let (event_type, status) = match &result.completion_status {
Some(cs) if cs.status == "FAILED" => ("playbook.failed", "FAILED"),
_ => ("playbook.completed", "COMPLETED"),
};
let event_id = generate_snowflake_id(state).await?;
// The completion status is stored as the event's meta; a serialization
// failure falls back to the default JSON value.
let terminal_meta = serde_json::to_value(&result.completion_status).unwrap_or_default();
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, status, result, meta, created_at, parent_event_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
"#,
)
.bind(event_id)
.bind(execution_id)
.bind(catalog_id)
.bind(event_type)
.bind("playbook")
.bind("playbook")
.bind(status)
.bind(serde_json::json!({"status": status}))
.bind(terminal_meta)
.bind(chrono::Utc::now())
.bind(trigger_event_id)
.execute(&state.db)
.await?;
info!(
execution_id,
terminal_event = %event_type,
"Orchestrator marked execution as terminal"
);
}
Ok(commands_generated)
}
// Unit tests for request deserialization, `build_result_object` and response
// serialization. None of them touch the database.
#[cfg(test)]
mod tests {
use super::*;
// Baseline request with no optional fields set; tests override single
// fields with struct-update syntax.
fn test_request_skeleton() -> EventRequest {
EventRequest {
execution_id: "123".to_string(),
step: "step1".to_string(),
event_type: "step.exit".to_string(),
payload: serde_json::json!({}),
meta: None,
worker_id: None,
result_kind: "data".to_string(),
result_uri: None,
event_ids: None,
actionable: true,
informative: true,
event_id: None,
status: None,
created_at: None,
}
}
// A minimal request gets the documented serde defaults.
#[test]
fn test_event_request_defaults() {
let json = r#"{"execution_id": "123", "step": "step1", "event_type": "step.enter"}"#;
let request: EventRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.event_type, "step.enter");
assert_eq!(request.result_kind, "data");
assert!(request.actionable);
assert!(request.informative);
assert!(request.event_id.is_none());
assert!(request.status.is_none());
assert!(request.created_at.is_none());
}
// The legacy `name` key is accepted in place of `event_type`.
#[test]
fn test_legacy_name_alias_deserializes_into_event_type() {
let json = r#"{"execution_id": "123", "step": "step1", "name": "step.exit"}"#;
let request: EventRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.event_type, "step.exit");
}
// The `context` key is accepted in place of `payload`.
#[test]
fn test_context_alias_deserializes_into_payload() {
let json = r#"{
"execution_id": "123",
"step": "step1",
"event_type": "step.exit",
"context": {"result": 42}
}"#;
let request: EventRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.payload["result"], 42);
}
// A request carrying the optional `event_id`, `status`, `created_at` and
// `worker_id` fields deserializes with all of them populated.
#[test]
fn test_new_optional_fields_accept_executor_event_shape() {
let json = r#"{
"execution_id": "478775660589088776",
"event_type": "command.completed",
"step": "fetch_calendar",
"status": "COMPLETED",
"created_at": "2026-05-31T03:14:15Z",
"context": {"items": 42},
"event_id": "478775660589088777",
"worker_id": "worker-prod-7",
"meta": {"attempts": 2}
}"#;
let request: EventRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.event_type, "command.completed");
assert_eq!(request.event_id.as_deref(), Some("478775660589088777"));
assert_eq!(request.status.as_deref(), Some("COMPLETED"));
assert_eq!(request.worker_id.as_deref(), Some("worker-prod-7"));
assert!(request.created_at.is_some());
}
// Inline data: the object payload lands under `context`, with no other keys.
#[test]
fn test_build_result_object_data() {
let request = EventRequest {
payload: serde_json::json!({"output": "success"}),
..test_request_skeleton()
};
let result = build_result_object(&request, "COMPLETED");
assert_eq!(result["status"], "COMPLETED");
assert_eq!(result["context"]["output"], "success");
assert!(result.get("kind").is_none());
assert!(result.get("data").is_none());
assert!(result.get("reference").is_none());
}
// A null payload is not an object, so no `context` key is written.
#[test]
fn test_build_result_object_data_with_null_payload_omits_context() {
let request = EventRequest {
payload: serde_json::Value::Null,
..test_request_skeleton()
};
let result = build_result_object(&request, "STARTED");
assert_eq!(result["status"], "STARTED");
assert!(result.get("context").is_none(),
"context must not be set when payload is non-object: {result}");
}
// A primitive payload is likewise omitted.
#[test]
fn test_build_result_object_data_with_primitive_payload_omits_context() {
let request = EventRequest {
payload: serde_json::json!("just a string"),
..test_request_skeleton()
};
let result = build_result_object(&request, "RUNNING");
assert!(result.get("context").is_none());
}
// Single reference: the URI and its store tier are nested under `reference`.
#[test]
fn test_build_result_object_ref() {
let request = EventRequest {
result_kind: "ref".to_string(),
result_uri: Some("gs://bucket/path/to/result".to_string()),
..test_request_skeleton()
};
let result = build_result_object(&request, "COMPLETED");
assert_eq!(result["status"], "COMPLETED");
let reference = &result["reference"];
assert_eq!(reference["store_tier"], "gcs");
assert_eq!(reference["logical_uri"], "gs://bucket/path/to/result");
assert!(result.get("kind").is_none());
assert!(result.get("store_tier").is_none());
assert!(result.get("logical_uri").is_none());
}
// Multi-part reference: the event ids and their count are nested under `reference`.
#[test]
fn test_build_result_object_refs() {
let request = EventRequest {
result_kind: "refs".to_string(),
event_ids: Some(vec![100, 200, 300]),
..test_request_skeleton()
};
let result = build_result_object(&request, "COMPLETED");
assert_eq!(result["status"], "COMPLETED");
let reference = &result["reference"];
assert_eq!(reference["event_ids"][0], 100);
assert_eq!(reference["total_parts"], 3);
assert!(result.get("event_ids").is_none(),
"event_ids should be nested under reference, not top-level");
}
// Across every result kind, the only top-level keys allowed are `status`,
// `reference` and `context`, and `status` is always present.
#[test]
fn test_build_result_object_constraint_top_level_keys_only() {
let allowed: std::collections::HashSet<&str> =
["status", "reference", "context"].iter().copied().collect();
let cases: Vec<(&str, EventRequest)> = vec![
(
"data with object payload",
EventRequest {
payload: serde_json::json!({"k": "v"}),
..test_request_skeleton()
},
),
(
"data with null payload",
EventRequest {
payload: serde_json::Value::Null,
..test_request_skeleton()
},
),
(
"ref",
EventRequest {
result_kind: "ref".to_string(),
result_uri: Some("gs://foo".to_string()),
..test_request_skeleton()
},
),
(
"refs",
EventRequest {
result_kind: "refs".to_string(),
event_ids: Some(vec![1, 2]),
..test_request_skeleton()
},
),
];
for (label, req) in cases {
let r = build_result_object(&req, "OK");
let obj = r.as_object().expect("result must be object");
for k in obj.keys() {
assert!(
allowed.contains(k.as_str()),
"[{label}] disallowed top-level key: {k} (full result: {r})"
);
}
assert_eq!(r["status"], "OK", "[{label}] status must be present");
}
}
// Batch items accept the legacy `name` key as well.
#[test]
fn test_batch_event_item_legacy_name_alias() {
let json = r#"{"step": "s", "name": "call.done", "payload": {}}"#;
let item: BatchEventItem = serde_json::from_str(json).unwrap();
assert_eq!(item.event_type, "call.done");
}
// Smoke test: the serialized response contains its field values.
#[test]
fn test_event_response_serialization() {
let response = EventResponse {
status: "ok".to_string(),
event_id: 12345,
commands_generated: 2,
};
let json = serde_json::to_string(&response).unwrap();
assert!(json.contains("ok"));
assert!(json.contains("12345"));
}
// Smoke test: the serialized command response contains its field values.
#[test]
fn test_command_response_serialization() {
let response = CommandResponse {
execution_id: 12345,
node_id: "step1".to_string(),
node_name: "step1".to_string(),
action: "python".to_string(),
context: serde_json::json!({"tool_config": {}}),
meta: serde_json::json!({"attempt": 1}),
};
let json = serde_json::to_string(&response).unwrap();
assert!(json.contains("step1"));
assert!(json.contains("python"));
}
}