use axum::{
extract::{Path, State},
Json,
};
use serde::{Deserialize, Serialize};
use sqlx::Row;
use tracing::{debug, info, warn};
use crate::error::{AppError, AppResult};
use crate::sanitize::sanitize_sensitive_data;
use crate::state::AppState;
fn deserialize_string_or_i64<'de, D>(deserializer: D) -> std::result::Result<String, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::{self, Visitor};
use std::fmt;
struct StringOrI64;
impl<'de> Visitor<'de> for StringOrI64 {
type Value = String;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string or signed/unsigned integer representing a snowflake id")
}
fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
where
E: de::Error,
{
Ok(v.to_string())
}
fn visit_string<E>(self, v: String) -> std::result::Result<Self::Value, E>
where
E: de::Error,
{
Ok(v)
}
fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, E>
where
E: de::Error,
{
Ok(v.to_string())
}
fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
where
E: de::Error,
{
Ok(v.to_string())
}
}
deserializer.deserialize_any(StringOrI64)
}
fn deserialize_optional_string_or_i64<'de, D>(
deserializer: D,
) -> std::result::Result<Option<String>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::{self, Visitor};
use std::fmt;
struct OptStringOrI64;
impl<'de> Visitor<'de> for OptStringOrI64 {
type Value = Option<String>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str(
"null, a string, or a signed/unsigned integer representing an optional snowflake id",
)
}
fn visit_none<E>(self) -> std::result::Result<Self::Value, E>
where
E: de::Error,
{
Ok(None)
}
fn visit_unit<E>(self) -> std::result::Result<Self::Value, E>
where
E: de::Error,
{
Ok(None)
}
fn visit_some<D2>(self, deserializer: D2) -> std::result::Result<Self::Value, D2::Error>
where
D2: serde::Deserializer<'de>,
{
deserialize_string_or_i64(deserializer).map(Some)
}
fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
where
E: de::Error,
{
Ok(Some(v.to_string()))
}
fn visit_string<E>(self, v: String) -> std::result::Result<Self::Value, E>
where
E: de::Error,
{
Ok(Some(v))
}
fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, E>
where
E: de::Error,
{
Ok(Some(v.to_string()))
}
fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
where
E: de::Error,
{
Ok(Some(v.to_string()))
}
}
deserializer.deserialize_any(OptStringOrI64)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventRequest {
#[serde(deserialize_with = "deserialize_string_or_i64")]
pub execution_id: String,
pub step: String,
#[serde(alias = "name")]
pub event_type: String,
#[serde(default, alias = "context")]
pub payload: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
pub meta: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub worker_id: Option<String>,
#[serde(default = "default_result_kind")]
pub result_kind: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub result_uri: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub event_ids: Option<Vec<i64>>,
#[serde(default = "default_true")]
pub actionable: bool,
#[serde(default = "default_true")]
pub informative: bool,
#[serde(
default,
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_optional_string_or_i64"
)]
pub event_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub status: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub created_at: Option<chrono::DateTime<chrono::Utc>>,
}
fn default_result_kind() -> String {
"data".to_string()
}
fn default_true() -> bool {
true
}
impl From<noetl_events::ExecutorEvent> for EventRequest {
fn from(ev: noetl_events::ExecutorEvent) -> Self {
Self {
execution_id: ev.execution_id.to_string(),
step: ev.step,
event_type: ev.event_type,
payload: ev.context,
meta: ev.meta,
worker_id: ev.worker_id,
result_kind: default_result_kind(),
result_uri: None,
event_ids: None,
actionable: true,
informative: true,
event_id: ev.event_id.map(|id| id.to_string()),
status: Some(ev.status),
created_at: Some(ev.created_at),
}
}
}
impl TryFrom<&EventRequest> for noetl_events::ExecutorEvent {
type Error = anyhow::Error;
fn try_from(req: &EventRequest) -> std::result::Result<Self, Self::Error> {
let execution_id: i64 = req.execution_id.parse().map_err(|e| {
anyhow::anyhow!(
"execution_id {:?} not parseable as i64: {e}",
req.execution_id
)
})?;
let event_id = req
.event_id
.as_deref()
.map(|s| s.parse::<i64>())
.transpose()
.map_err(|e| {
anyhow::anyhow!("event_id {:?} not parseable as i64: {e}", req.event_id)
})?;
let status = req
.status
.clone()
.unwrap_or_else(|| event_status_from_name(&req.event_type).to_string());
let created_at = req.created_at.unwrap_or_else(chrono::Utc::now);
Ok(Self {
execution_id,
event_type: req.event_type.clone(),
step: req.step.clone(),
status,
created_at,
context: req.payload.clone(),
event_id,
worker_id: req.worker_id.clone(),
meta: req.meta.clone(),
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventResponse {
pub status: String,
pub event_id: i64,
pub commands_generated: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClaimRequest {
pub worker_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClaimResponse {
pub status: String,
pub event_id: i64,
pub execution_id: i64,
pub node_id: String,
pub node_name: String,
pub action: String,
pub context: serde_json::Value,
pub meta: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchEventItem {
pub step: String,
#[serde(alias = "name")]
pub event_type: String,
#[serde(default, alias = "context")]
pub payload: serde_json::Value,
#[serde(default)]
pub actionable: bool,
#[serde(default = "default_true")]
pub informative: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchEventRequest {
#[serde(deserialize_with = "deserialize_string_or_i64")]
pub execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub worker_id: Option<String>,
pub events: Vec<BatchEventItem>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchEventResponse {
pub status: String,
pub event_ids: Vec<i64>,
pub commands_generated: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandResponse {
pub execution_id: i64,
pub node_id: String,
pub node_name: String,
pub action: String,
pub context: serde_json::Value,
pub meta: serde_json::Value,
}
pub async fn handle_event(
state: State<AppState>,
request: Json<EventRequest>,
) -> Result<Json<EventResponse>, AppError> {
let event_type_for_metrics = request.0.event_type.clone();
let started_at = std::time::Instant::now();
let result = handle_event_inner(state, request).await;
let status_label = if result.is_ok() { "ok" } else { "error" };
let duration_seconds = started_at.elapsed().as_secs_f64();
crate::metrics::record_event_ingest(&event_type_for_metrics, status_label, duration_seconds);
result
}
async fn handle_event_inner(
State(state): State<AppState>,
Json(request): Json<EventRequest>,
) -> Result<Json<EventResponse>, AppError> {
debug!(
"Event received: execution_id={}, step={}, event_type={}",
request.execution_id, request.step, request.event_type
);
let execution_id: i64 = request
.execution_id
.parse()
.map_err(|_| AppError::Validation("Invalid execution_id".to_string()))?;
let skip_engine_events = [
"command.claimed",
"command.started",
"command.completed",
"command.failed",
"step.enter",
];
if request.event_type == "command.claimed" {
if let Some(command_id) = get_command_id(&request) {
if check_already_claimed(&state, execution_id, &command_id, &request.worker_id).await? {
return Ok(Json(EventResponse {
status: "ok".to_string(),
event_id: 0,
commands_generated: 0,
}));
}
}
}
let derived_status: String = request
.status
.clone()
.unwrap_or_else(|| event_status_from_name(&request.event_type).to_string());
let result_obj_raw = build_result_object(&request, &derived_status);
let result_obj = sanitize_sensitive_data(&result_obj_raw);
let event_id: i64 = match request.event_id.as_deref() {
Some(raw) => raw
.parse()
.map_err(|_| AppError::Validation(format!("Invalid event_id: {raw}")))?,
None => state.snowflake.generate()?,
};
let catalog_id = get_catalog_id(&state, execution_id).await?;
let mut meta_obj = request.meta.clone().unwrap_or(serde_json::json!({}));
if let serde_json::Value::Object(ref mut map) = meta_obj {
map.insert(
"actionable".to_string(),
serde_json::json!(request.actionable),
);
map.insert(
"informative".to_string(),
serde_json::json!(request.informative),
);
if let Some(ref worker_id) = request.worker_id {
map.insert("worker_id".to_string(), serde_json::json!(worker_id));
}
}
let meta_obj = sanitize_sensitive_data(&meta_obj);
let created_at = request.created_at.unwrap_or_else(chrono::Utc::now);
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, status, result, meta, created_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
"#,
)
.bind(event_id)
.bind(execution_id)
.bind(catalog_id)
.bind(&request.event_type)
.bind(&request.step)
.bind(&request.step)
.bind(&derived_status)
.bind(&result_obj)
.bind(&meta_obj)
.bind(created_at)
.execute(state.pools.pool_for(execution_id))
.await?;
info!(
"Event persisted: event_id={}, execution_id={}, event_type={}",
event_id, execution_id, request.event_type
);
let commands_generated = if !skip_engine_events.contains(&request.event_type.as_str()) {
debug!(
"Would process through engine: event_type={}",
request.event_type
);
0
} else {
debug!(
"Skipped engine for administrative event: {}",
request.event_type
);
0
};
let should_trigger_orchestrator =
request.event_type == "command.completed" || request.event_type == "command.failed";
if should_trigger_orchestrator {
match trigger_orchestrator(&state, execution_id, event_id).await {
Ok(cmds) => {
info!(
"Orchestrator generated {} commands for execution {}",
cmds, execution_id
);
}
Err(e) => {
warn!("Orchestrator error: {}", e);
}
}
}
Ok(Json(EventResponse {
status: "ok".to_string(),
event_id,
commands_generated,
}))
}
pub async fn get_command(
State(state): State<AppState>,
Path(event_id): Path<i64>,
) -> Result<Json<CommandResponse>, AppError> {
debug!("Getting command for event_id={}", event_id);
let found = state
.pools
.find_first(|_shard_idx, pool| async move {
sqlx::query_as::<_, (i64, String, String, serde_json::Value, serde_json::Value)>(
r#"
SELECT execution_id, node_name, node_type, context, meta
FROM noetl.event
WHERE event_id = $1 AND event_type = 'command.issued'
"#,
)
.bind(event_id)
.fetch_optional(&pool)
.await
})
.await?;
let row = found.map(|(_shard_idx, r)| r);
match row {
Some((execution_id, node_name, node_type, context, meta)) => Ok(Json(CommandResponse {
execution_id,
node_id: node_name.clone(),
node_name,
action: node_type,
context,
meta,
})),
None => Err(AppError::NotFound(format!(
"command.issued event not found: {}",
event_id
))),
}
}
pub async fn claim_command(
State(state): State<AppState>,
Path(event_id): Path<i64>,
Json(request): Json<ClaimRequest>,
) -> Result<Json<ClaimResponse>, AppError> {
debug!(
"Claim request received: event_id={}, worker_id={}",
event_id, request.worker_id
);
let resolved_execution_id: Option<i64> = state
.pools
.find_first(|_shard_idx, pool| async move {
sqlx::query_scalar::<_, i64>(
r#"
SELECT execution_id
FROM noetl.event
WHERE event_id = $1 AND event_type = 'command.issued'
"#,
)
.bind(event_id)
.fetch_optional(&pool)
.await
})
.await?
.map(|(_shard_idx, eid)| eid);
let resolved_execution_id = resolved_execution_id.ok_or_else(|| {
AppError::NotFound(format!("command.issued event not found: {}", event_id))
})?;
let mut tx = state.pools.pool_for(resolved_execution_id).begin().await?;
let cmd_row = sqlx::query(
r#"
SELECT execution_id, catalog_id, node_name, node_type, context, meta
FROM noetl.event
WHERE event_id = $1 AND event_type = 'command.issued'
"#,
)
.bind(event_id)
.fetch_optional(&mut *tx)
.await?;
let Some(row) = cmd_row else {
return Err(AppError::NotFound(format!(
"command.issued event not found: {}",
event_id
)));
};
let execution_id: i64 = row.try_get("execution_id")?;
let catalog_id: Option<i64> = row.try_get("catalog_id")?;
let step: String = row.try_get("node_name")?;
let tool_kind: String = row.try_get("node_type")?;
let context: serde_json::Value = row
.try_get("context")
.unwrap_or_else(|_| serde_json::json!({}));
let meta: serde_json::Value = row
.try_get("meta")
.unwrap_or_else(|_| serde_json::json!({}));
let command_id = meta
.get("command_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| format!("{}:{}:{}", execution_id, step, event_id));
let terminal_row = sqlx::query(
r#"
SELECT event_type
FROM noetl.event
WHERE execution_id = $1
AND event_type IN ('command.completed', 'command.failed')
AND (meta->>'command_id' = $2 OR result->'data'->>'command_id' = $2)
ORDER BY event_id DESC
LIMIT 1
"#,
)
.bind(execution_id)
.bind(&command_id)
.fetch_optional(&mut *tx)
.await?;
if terminal_row.is_some() {
return Err(AppError::Conflict(
"Command already reached terminal state".to_string(),
));
}
let cancelled_row = sqlx::query(
r#"
SELECT 1
FROM noetl.event
WHERE execution_id = $1
AND event_type = 'execution.cancelled'
LIMIT 1
"#,
)
.bind(execution_id)
.fetch_optional(&mut *tx)
.await?;
if cancelled_row.is_some() {
return Err(AppError::Conflict(
"Execution has been cancelled".to_string(),
));
}
let lock_row =
sqlx::query("SELECT pg_try_advisory_xact_lock(hashtext($1)::bigint) AS lock_acquired")
.bind(&command_id)
.fetch_one(&mut *tx)
.await?;
let lock_acquired: bool = lock_row.try_get("lock_acquired")?;
if !lock_acquired {
return Err(AppError::Conflict(
"Command is being claimed by another worker".to_string(),
));
}
let existing_claim = sqlx::query(
r#"
SELECT worker_id, meta
FROM noetl.event
WHERE execution_id = $1
AND event_type = 'command.claimed'
AND (meta->>'command_id' = $2 OR result->'data'->>'command_id' = $2)
ORDER BY event_id DESC
LIMIT 1
"#,
)
.bind(execution_id)
.bind(&command_id)
.fetch_optional(&mut *tx)
.await?;
if let Some(existing) = existing_claim {
let worker_id_db: Option<String> = existing.try_get("worker_id").ok();
let worker_id_meta = existing
.try_get::<serde_json::Value, _>("meta")
.ok()
.and_then(|value| {
value
.get("worker_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
});
let existing_worker = worker_id_db.or(worker_id_meta);
if let Some(existing_worker_id) = existing_worker {
if existing_worker_id != request.worker_id {
return Err(AppError::Conflict(format!(
"Command already claimed by {}",
existing_worker_id
)));
}
tx.commit().await?;
return Ok(Json(ClaimResponse {
status: "ok".to_string(),
event_id,
execution_id,
node_id: step.clone(),
node_name: step,
action: tool_kind,
context,
meta,
}));
}
}
let claim_event_id = state.snowflake.generate()?;
let claim_result = serde_json::json!({
"status": "RUNNING",
"context": {
"command_id": command_id,
"worker_id": request.worker_id,
}
});
let claim_meta = serde_json::json!({
"command_id": command_id,
"worker_id": request.worker_id,
"actionable": false,
"informative": true,
});
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, status, result, meta, worker_id, created_at
) VALUES (
$1, $2, $3, $4,
$5, $6, $7, $8, $9, $10, $11
)
"#,
)
.bind(claim_event_id)
.bind(execution_id)
.bind(catalog_id)
.bind("command.claimed")
.bind(&step)
.bind(&step)
.bind("RUNNING")
.bind(claim_result)
.bind(claim_meta)
.bind(&request.worker_id)
.bind(chrono::Utc::now())
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(Json(ClaimResponse {
status: "ok".to_string(),
event_id,
execution_id,
node_id: step.clone(),
node_name: step,
action: tool_kind,
context,
meta,
}))
}
pub async fn handle_batch_events(
State(state): State<AppState>,
Json(request): Json<BatchEventRequest>,
) -> Result<Json<BatchEventResponse>, AppError> {
if request.events.is_empty() {
return Ok(Json(BatchEventResponse {
status: "ok".to_string(),
event_ids: Vec::new(),
commands_generated: 0,
}));
}
let execution_id: i64 = request
.execution_id
.parse()
.map_err(|_| AppError::Validation("Invalid execution_id".to_string()))?;
let catalog_id = get_catalog_id(&state, execution_id).await?;
let mut tx = state.pools.pool_for(execution_id).begin().await?;
let mut event_ids = Vec::with_capacity(request.events.len());
for item in &request.events {
let event_id = state.snowflake.generate()?;
let status = event_status_from_name(&item.event_type);
let mut result_map = serde_json::Map::new();
result_map.insert(
"status".to_string(),
serde_json::Value::String(status.to_string()),
);
if let serde_json::Value::Object(_) = item.payload {
result_map.insert("context".to_string(), item.payload.clone());
}
let result_obj_raw = serde_json::Value::Object(result_map);
let result_obj = sanitize_sensitive_data(&result_obj_raw);
let mut meta_obj = serde_json::json!({
"actionable": item.actionable,
"informative": item.informative,
});
if let Some(worker_id) = &request.worker_id {
if let serde_json::Value::Object(ref mut map) = meta_obj {
map.insert("worker_id".to_string(), serde_json::json!(worker_id));
}
}
let meta_obj = sanitize_sensitive_data(&meta_obj);
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, status, result, meta, worker_id, created_at
) VALUES (
$1, $2, $3, $4,
$5, $6, $7, $8, $9, $10, $11
)
"#,
)
.bind(event_id)
.bind(execution_id)
.bind(catalog_id)
.bind(&item.event_type)
.bind(&item.step)
.bind(&item.step)
.bind(status)
.bind(result_obj)
.bind(meta_obj)
.bind(&request.worker_id)
.bind(chrono::Utc::now())
.execute(&mut *tx)
.await?;
event_ids.push(event_id);
}
tx.commit().await?;
for (idx, item) in request.events.iter().enumerate() {
if item.event_type == "command.completed" {
let trigger_event_id = event_ids[idx];
match trigger_orchestrator(&state, execution_id, trigger_event_id).await {
Ok(cmds) => {
info!(
"Orchestrator (batch) generated {} commands for execution {} step {}",
cmds, execution_id, item.step
);
}
Err(e) => {
warn!(
"Orchestrator error in batch for execution {} step {}: {}",
execution_id, item.step, e
);
}
}
}
}
Ok(Json(BatchEventResponse {
status: "ok".to_string(),
event_ids,
commands_generated: 0,
}))
}
fn get_command_id(request: &EventRequest) -> Option<String> {
if let Some(id) = request.payload.get("command_id").and_then(|v| v.as_str()) {
return Some(id.to_string());
}
if let Some(meta) = &request.meta {
if let Some(id) = meta.get("command_id").and_then(|v| v.as_str()) {
return Some(id.to_string());
}
}
None
}
async fn check_already_claimed(
state: &AppState,
execution_id: i64,
command_id: &str,
worker_id: &Option<String>,
) -> AppResult<bool> {
let row: Option<(Option<String>, Option<serde_json::Value>)> =
sqlx::query_as::<_, (Option<String>, Option<serde_json::Value>)>(
r#"
SELECT worker_id, meta FROM noetl.event
WHERE execution_id = $1
AND event_type = 'command.claimed'
AND (meta->>'command_id' = $2 OR result->'data'->>'command_id' = $2)
LIMIT 1
"#,
)
.bind(execution_id)
.bind(command_id)
.fetch_optional(state.pools.pool_for(execution_id))
.await?;
if let Some((existing_worker, meta)) = row {
let existing_worker_id = existing_worker.or_else(|| {
meta.and_then(|m| {
m.get("worker_id")
.and_then(|v| v.as_str())
.map(String::from)
})
});
if let (Some(existing), Some(current)) = (&existing_worker_id, worker_id) {
if existing != current {
return Err(AppError::Conflict(format!(
"Command already claimed by {}",
existing
)));
}
return Ok(true);
}
}
Ok(false)
}
fn build_result_object(request: &EventRequest, status: &str) -> serde_json::Value {
let mut result = serde_json::Map::new();
result.insert(
"status".to_string(),
serde_json::Value::String(status.to_string()),
);
match request.result_kind.as_str() {
"ref" if request.result_uri.is_some() => {
let uri = request.result_uri.as_ref().unwrap();
let store_tier = if uri.starts_with("gs://") {
"gcs"
} else if uri.starts_with("s3://") {
"s3"
} else {
"artifact"
};
result.insert(
"reference".to_string(),
serde_json::json!({
"store_tier": store_tier,
"logical_uri": uri,
}),
);
}
"refs" if request.event_ids.is_some() => {
let event_ids = request.event_ids.as_ref().unwrap();
result.insert(
"reference".to_string(),
serde_json::json!({
"event_ids": event_ids,
"total_parts": event_ids.len(),
}),
);
}
_ => {
if let serde_json::Value::Object(_) = request.payload {
result.insert("context".to_string(), request.payload.clone());
}
}
}
serde_json::Value::Object(result)
}
async fn get_catalog_id(state: &AppState, execution_id: i64) -> AppResult<Option<i64>> {
let row: Option<(i64,)> = sqlx::query_as::<_, (i64,)>(
"SELECT catalog_id FROM noetl.event WHERE execution_id = $1 LIMIT 1",
)
.bind(execution_id)
.fetch_optional(state.pools.pool_for(execution_id))
.await?;
Ok(row.map(|(id,)| id))
}
fn event_status_from_name(event_name: &str) -> &'static str {
if event_name.contains("done")
|| event_name.contains("exit")
|| event_name.contains("completed")
{
"COMPLETED"
} else if event_name.contains("error") || event_name.contains("failed") {
"FAILED"
} else {
"RUNNING"
}
}
async fn hydrate_result_references(
events: &mut [crate::db::models::Event],
result_store: &crate::services::result_store::ResultStoreService,
keep_refs: bool,
) {
use crate::services::result_store::parse_noetl_ref;
enum Shape {
Nested,
TopLevel,
}
let mut hydrated = 0usize;
let mut kept = 0usize;
for ev in events.iter_mut() {
let Some(result) = ev.result.as_mut() else {
continue;
};
let (shape, ref_uri) = if let Some(u) = result
.pointer("/context/result/reference/ref")
.and_then(|v| v.as_str())
{
(Shape::Nested, u.to_string())
} else if let Some(u) = result.pointer("/reference/ref").and_then(|v| v.as_str()) {
(Shape::TopLevel, u.to_string())
} else {
continue;
};
if keep_refs {
let extracted = match shape {
Shape::Nested => result
.pointer("/context/result/reference/extracted")
.cloned(),
Shape::TopLevel => result.pointer("/reference/extracted").cloned(),
};
if let Some(extracted) = extracted {
let data = serde_json::json!({ "data": extracted });
match shape {
Shape::Nested => {
if let Some(inner) = result
.get_mut("context")
.and_then(|c| c.get_mut("result"))
.and_then(|r| r.as_object_mut())
{
inner.insert("context".to_string(), data);
kept += 1;
}
}
Shape::TopLevel => {
if let Some(obj) = result.as_object_mut() {
obj.insert("context".to_string(), data);
kept += 1;
}
}
}
continue; }
}
let parsed = match parse_noetl_ref(&ref_uri) {
Ok(p) => p,
Err(e) => {
warn!(execution_id = ev.execution_id, ref_uri, %e, "unparseable result reference; left as-is");
continue;
}
};
let resolved = match result_store.resolve(&parsed).await {
Ok(Some(data)) => data,
Ok(None) => {
warn!(execution_id = ev.execution_id, ref_uri, "result reference not found in store; left as-is");
continue;
}
Err(e) => {
warn!(execution_id = ev.execution_id, ref_uri, %e, "result reference resolution failed; left as-is");
continue;
}
};
match shape {
Shape::Nested => {
if let Some(inner) = result
.get_mut("context")
.and_then(|c| c.get_mut("result"))
.and_then(|r| r.as_object_mut())
{
inner.insert("context".to_string(), resolved);
inner.remove("reference");
hydrated += 1;
}
}
Shape::TopLevel => {
if let Some(obj) = result.as_object_mut() {
obj.insert("context".to_string(), resolved);
obj.remove("reference");
hydrated += 1;
}
}
}
}
if hydrated > 0 || kept > 0 {
debug!(
hydrated,
kept, "hydrate_result_references: resolved / kept-by-reference over-budget results"
);
}
}
const ORCH_EVENT_COLS: &str = r#"
SELECT event_id, execution_id, catalog_id,
parent_event_id, parent_execution_id,
event_type, node_id, node_name, node_type, status,
context, meta, result, worker_id,
NULLIF(meta->>'attempt', '')::int AS attempt,
created_at
FROM noetl.event "#;
fn parse_event_rows(rows: Vec<sqlx::postgres::PgRow>) -> Vec<crate::db::models::Event> {
use sqlx::Row;
rows.into_iter()
.map(|r| crate::db::models::Event {
id: r.try_get("event_id").unwrap_or(0),
execution_id: r.try_get("execution_id").unwrap_or(0),
catalog_id: r.try_get("catalog_id").unwrap_or(0),
event_id: r.try_get("event_id").unwrap_or(0),
parent_event_id: r.try_get("parent_event_id").ok(),
parent_execution_id: r.try_get("parent_execution_id").ok(),
event_type: r.try_get("event_type").unwrap_or_default(),
node_id: r.try_get("node_id").ok(),
node_name: r.try_get("node_name").ok(),
node_type: r.try_get("node_type").ok(),
status: r.try_get("status").unwrap_or_default(),
context: r.try_get("context").ok(),
meta: r.try_get("meta").ok(),
result: r.try_get("result").ok(),
worker_id: r.try_get("worker_id").ok(),
attempt: r.try_get("attempt").ok(),
created_at: r
.try_get("created_at")
.unwrap_or_else(|_| chrono::Utc::now()),
})
.collect()
}
struct RebuildResult {
state: crate::engine::state::WorkflowState,
last_event_id: i64,
snapshot_version: i64,
routing_meta: Option<serde_json::Value>,
}
const REBUILD_STRAGGLER_MARGIN_SECS: i64 = 30;
async fn resolve_cursor_claim_refs(
ws: &mut crate::engine::state::WorkflowState,
result_store: &crate::services::result_store::ResultStoreService,
) {
use crate::services::result_store::parse_noetl_ref;
for step in ws.steps.values_mut() {
if !step.is_cursor {
continue;
}
for frame in step.cursor_frames.values_mut() {
if !frame.claim_rows.is_empty() {
continue;
}
let Some(uri) = frame.claim_ref.clone() else {
continue;
};
let parsed = match parse_noetl_ref(&uri) {
Ok(p) => p,
Err(e) => {
warn!(uri, %e, "cursor claim reference unparseable; left as drain");
continue;
}
};
match result_store.resolve(&parsed).await {
Ok(Some(data)) => match extract_claim_rows(&data) {
Some(rows) => {
frame.claim_rows = rows;
frame.claim_ref = None;
}
None => warn!(uri, "cursor claim reference resolved but no rows array found"),
},
Ok(None) => warn!(uri, "cursor claim reference not found in store"),
Err(e) => warn!(uri, %e, "cursor claim reference resolve failed"),
}
}
}
}
fn extract_claim_rows(data: &serde_json::Value) -> Option<Vec<serde_json::Value>> {
for ptr in ["/rows", "/data/rows", "/result/rows", "/context/data/rows"] {
if let Some(arr) = data.pointer(ptr).and_then(|v| v.as_array()) {
return Some(arr.clone());
}
}
crate::engine::state::extract_user_data(data)
.and_then(|d| d.get("rows").and_then(|r| r.as_array()).cloned())
}
async fn rebuild_state(
pool: &crate::db::DbPool,
result_store: &crate::services::result_store::ResultStoreService,
execution_id: i64,
keep_refs: bool,
) -> AppResult<RebuildResult> {
use crate::services::orch_snapshot;
match orch_snapshot::load_latest(pool, execution_id).await? {
Some(snap) => {
let margin_floor =
snap.updated_at - chrono::Duration::seconds(REBUILD_STRAGGLER_MARGIN_SECS);
let mut events_since = parse_event_rows(
sqlx::query(&format!(
"{ORCH_EVENT_COLS} WHERE execution_id = $1 \
AND (event_id > $2 OR created_at > $3) ORDER BY event_id ASC"
))
.bind(execution_id)
.bind(snap.version)
.bind(margin_floor)
.fetch_all(pool)
.await?,
);
hydrate_result_references(&mut events_since, result_store, keep_refs).await;
let mut ws = snap.state;
for e in &events_since {
ws.apply_event(e);
}
let last_event_id = events_since
.iter()
.map(|e| e.event_id)
.max()
.unwrap_or(snap.version);
Ok(RebuildResult {
state: ws,
last_event_id,
snapshot_version: snap.version,
routing_meta: snap.routing_meta,
})
}
None => {
let mut all_events = parse_event_rows(
sqlx::query(&format!(
"{ORCH_EVENT_COLS} WHERE execution_id = $1 ORDER BY event_id ASC"
))
.bind(execution_id)
.fetch_all(pool)
.await?,
);
hydrate_result_references(&mut all_events, result_store, keep_refs).await;
let state = crate::engine::state::WorkflowState::from_events(&all_events)
.ok_or_else(|| AppError::Validation("No events found for execution".to_string()))?;
let last_event_id = all_events.last().map(|e| e.event_id).unwrap_or(0);
let routing_meta = all_events
.iter()
.find(|e| e.event_type == "playbook_started")
.and_then(|e| e.meta.clone());
Ok(RebuildResult {
state,
last_event_id,
snapshot_version: 0,
routing_meta,
})
}
}
}
async fn trigger_orchestrator(
state: &AppState,
execution_id: i64,
trigger_event_id: i64,
) -> AppResult<i32> {
trigger_orchestrator_inner(state, execution_id, trigger_event_id, false).await
}
pub fn spawn_orchestrator_reconciler(state: AppState) {
tokio::spawn(async move {
const RECONCILE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(8);
loop {
tokio::time::sleep(RECONCILE_INTERVAL).await;
for execution_id in state.orch_cache.active_executions() {
match trigger_orchestrator_inner(&state, execution_id, i64::MAX, true).await {
Ok(n) if n > 0 => {
info!(execution_id, commands = n, "reconcile poller advanced a stuck execution")
}
Ok(_) => {}
Err(e) => {
warn!(execution_id, %e, "reconcile poller: orchestrator trigger failed")
}
}
}
}
});
}
async fn trigger_orchestrator_inner(
state: &AppState,
execution_id: i64,
trigger_event_id: i64,
force_count: bool,
) -> AppResult<i32> {
use crate::engine::WorkflowOrchestrator;
debug!(
execution_id,
trigger_event_id, force_count, "trigger_orchestrator: loading events"
);
let pool = state.pools.pool_for(execution_id);
let result_store = crate::services::result_store::ResultStoreService::new(
pool.clone(),
state.snowflake.clone(),
);
let cache_slot = state.orch_cache.entry(execution_id);
let mut cache = cache_slot.lock().await;
const COUNT_THROTTLE: std::time::Duration = std::time::Duration::from_millis(1000);
let now = std::time::Instant::now();
let do_count = force_count
|| cache.state.is_none()
|| cache
.last_count_check
.is_none_or(|t| now.duration_since(t) >= COUNT_THROTTLE);
let total: Option<i64> = if do_count {
cache.last_count_check = Some(now);
Some(
sqlx::query_scalar("SELECT COUNT(*) FROM noetl.event WHERE execution_id = $1")
.bind(execution_id)
.fetch_one(pool)
.await?,
)
} else {
None
};
let mut did_rebuild = false;
if cache.state.is_none() {
let r = rebuild_state(pool, &result_store, execution_id, state.config.refs_in_state).await?;
cache.state = Some(r.state);
cache.last_event_id = r.last_event_id;
cache.applied_count = total.unwrap_or(0);
cache.snapshot_version = r.snapshot_version;
cache.routing_meta = r.routing_meta;
did_rebuild = true;
}
let mut straggler_applied = false;
if !did_rebuild && trigger_event_id <= cache.last_event_id {
let mut strag = parse_event_rows(
sqlx::query(&format!(
"{ORCH_EVENT_COLS} WHERE execution_id = $1 AND event_id = $2"
))
.bind(execution_id)
.bind(trigger_event_id)
.fetch_all(pool)
.await?,
);
hydrate_result_references(&mut strag, &result_store, state.config.refs_in_state).await;
if let Some(ws) = cache.state.as_mut() {
for e in &strag {
ws.apply_event(e);
}
}
straggler_applied = !strag.is_empty();
cache.applied_count += strag.len() as i64;
}
let mut new_events: Vec<crate::db::models::Event> = parse_event_rows(
sqlx::query(&format!(
"{ORCH_EVENT_COLS} WHERE execution_id = $1 AND event_id > $2 ORDER BY event_id ASC"
))
.bind(execution_id)
.bind(cache.last_event_id)
.fetch_all(pool)
.await?,
);
hydrate_result_references(&mut new_events, &result_store, state.config.refs_in_state).await;
let trigger_event_type = new_events
.iter()
.find(|e| e.event_id == trigger_event_id)
.map(|e| e.event_type.clone())
.unwrap_or_else(|| "command.completed".to_string());
let latest_ts = new_events.last().map(|e| e.created_at);
let mismatch = matches!(total, Some(t) if cache.applied_count + new_events.len() as i64 != t);
if mismatch {
let r = rebuild_state(pool, &result_store, execution_id, state.config.refs_in_state).await?;
cache.state = Some(r.state);
cache.last_event_id = r.last_event_id;
cache.applied_count = total.unwrap_or(cache.applied_count);
cache.snapshot_version = r.snapshot_version;
cache.routing_meta = r.routing_meta;
did_rebuild = true;
} else if !new_events.is_empty() {
let ws = cache.state.as_mut().unwrap();
for e in &new_events {
ws.apply_event(e);
}
cache.applied_count += new_events.len() as i64;
cache.last_event_id = new_events
.last()
.map(|e| e.event_id)
.unwrap_or(cache.last_event_id);
}
if new_events.is_empty() && !did_rebuild && !straggler_applied && !force_count {
debug!(execution_id, "No new events to evaluate — orchestrator exit early");
return Ok(0);
}
const SNAPSHOT_INTERVAL_EVENTS: i64 = 500;
if total == Some(cache.applied_count)
&& cache.last_event_id - cache.snapshot_version >= SNAPSHOT_INTERVAL_EVENTS
{
let version = cache.last_event_id;
let applied = cache.applied_count;
let routing = cache.routing_meta.clone();
let saved = if let Some(ws) = cache.state.as_ref() {
crate::services::orch_snapshot::save(
pool,
execution_id,
version,
applied,
routing.as_ref(),
ws,
)
.await
} else {
Ok(())
};
match saved {
Ok(()) => cache.snapshot_version = version,
Err(e) => {
warn!(execution_id, %e, "orch_snapshot.save failed; continuing without snapshot")
}
}
}
let catalog_id = cache.state.as_ref().map(|s| s.catalog_id).unwrap_or(0);
if catalog_id == 0 {
return Err(AppError::Internal(format!(
"No catalog_id found for execution {execution_id}"
)));
}
let playbook_yaml: String =
sqlx::query_scalar("SELECT content FROM noetl.catalog WHERE catalog_id = $1")
.bind(catalog_id)
.fetch_one(state.pools.cluster())
.await
.map_err(|e| {
AppError::Internal(format!(
"Failed to load playbook for catalog_id {catalog_id}: {e}"
))
})?;
let playbook = crate::playbook::parser::parse_playbook(&playbook_yaml)?;
let orchestrator = WorkflowOrchestrator::new();
let ws = cache.state.as_mut().unwrap();
resolve_cursor_claim_refs(ws, &result_store).await;
let result = match orchestrator.evaluate_state(
ws,
latest_ts,
&playbook,
Some(trigger_event_type.as_str()),
) {
Ok(r) => r,
Err(e) => {
let msg = format!("Orchestrator evaluate failed: {e}");
warn!(
execution_id,
error = %msg,
"Orchestrator evaluate error is deterministic — terminating execution as FAILED"
);
emit_playbook_failed(state, execution_id, catalog_id, trigger_event_id, &msg).await?;
return Ok(0);
}
};
info!(
execution_id,
trigger_event_id,
new_commands = result.commands.len(),
new_events = result.events_to_emit.len(),
should_complete = result.should_complete,
"Orchestrator evaluate complete"
);
for emit in &result.events_to_emit {
let event_id = state.snowflake.generate()?;
let event_status = if emit.status.is_empty() {
"STARTED".to_string()
} else {
emit.status.clone()
};
let result_obj = match &emit.context {
Some(serde_json::Value::Object(_)) => serde_json::json!({
"status": event_status,
"context": emit.context.clone().unwrap(),
}),
_ => serde_json::json!({"status": event_status}),
};
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, status, result, meta, created_at, parent_event_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
"#,
)
.bind(event_id)
.bind(execution_id)
.bind(catalog_id)
.bind(&emit.event_type)
.bind(emit.node_name.as_deref())
.bind(emit.node_name.as_deref())
.bind(&event_status)
.bind(&result_obj)
.bind(serde_json::json!({"emitted_by": "orchestrator"}))
.bind(chrono::Utc::now())
.bind(trigger_event_id)
.execute(state.pools.pool_for(execution_id))
.await?;
}
let routing = cache
.routing_meta
.as_ref()
.map(crate::handlers::execute::CommandRouting::from_started_meta)
.unwrap_or_default();
let commands_generated = crate::handlers::execute::persist_engine_commands_batch(
state,
execution_id,
catalog_id,
trigger_event_id,
&result.commands,
&playbook,
&routing,
)
.await?;
if result.should_complete {
let (event_type, status) = match &result.completion_status {
Some(cs) if cs.status == "FAILED" => ("playbook.failed", "FAILED"),
_ => ("playbook.completed", "COMPLETED"),
};
let event_id = state.snowflake.generate()?;
let terminal_meta = serde_json::to_value(&result.completion_status).unwrap_or_default();
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, status, result, meta, created_at, parent_event_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
"#,
)
.bind(event_id)
.bind(execution_id)
.bind(catalog_id)
.bind(event_type)
.bind("playbook")
.bind("playbook")
.bind(status)
.bind(serde_json::json!({"status": status}))
.bind(terminal_meta)
.bind(chrono::Utc::now())
.bind(trigger_event_id)
.execute(state.pools.pool_for(execution_id))
.await?;
info!(
execution_id,
terminal_event = %event_type,
"Orchestrator marked execution as terminal"
);
drop(cache);
state.orch_cache.evict(execution_id);
}
Ok(commands_generated)
}
async fn emit_playbook_failed(
state: &AppState,
execution_id: i64,
catalog_id: i64,
trigger_event_id: i64,
error: &str,
) -> AppResult<()> {
let event_id = state.snowflake.generate()?;
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, status, result, meta, created_at, parent_event_id
) VALUES ($1, $2, $3, 'playbook.failed', 'playbook', 'playbook', 'FAILED', $4, $5, $6, $7)
"#,
)
.bind(event_id)
.bind(execution_id)
.bind(catalog_id)
.bind(serde_json::json!({"status": "FAILED", "context": {"error": error}}))
.bind(serde_json::json!({
"emitted_by": "orchestrator",
"reason": "evaluate_error",
"error": error,
}))
.bind(chrono::Utc::now())
.bind(trigger_event_id)
.execute(state.pools.pool_for(execution_id))
.await?;
info!(
execution_id,
terminal_event = "playbook.failed",
"Orchestrator evaluate error → execution terminated as FAILED"
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn test_request_skeleton() -> EventRequest {
EventRequest {
execution_id: "123".to_string(),
step: "step1".to_string(),
event_type: "step.exit".to_string(),
payload: serde_json::json!({}),
meta: None,
worker_id: None,
result_kind: "data".to_string(),
result_uri: None,
event_ids: None,
actionable: true,
informative: true,
event_id: None,
status: None,
created_at: None,
}
}
#[test]
fn test_event_request_defaults() {
let json = r#"{"execution_id": "123", "step": "step1", "event_type": "step.enter"}"#;
let request: EventRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.event_type, "step.enter");
assert_eq!(request.result_kind, "data");
assert!(request.actionable);
assert!(request.informative);
assert!(request.event_id.is_none());
assert!(request.status.is_none());
assert!(request.created_at.is_none());
}
#[test]
fn test_legacy_name_alias_deserializes_into_event_type() {
let json = r#"{"execution_id": "123", "step": "step1", "name": "step.exit"}"#;
let request: EventRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.event_type, "step.exit");
}
#[test]
fn test_context_alias_deserializes_into_payload() {
let json = r#"{
"execution_id": "123",
"step": "step1",
"event_type": "step.exit",
"context": {"result": 42}
}"#;
let request: EventRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.payload["result"], 42);
}
#[test]
fn test_new_optional_fields_accept_executor_event_shape() {
let json = r#"{
"execution_id": "478775660589088776",
"event_type": "command.completed",
"step": "fetch_calendar",
"status": "COMPLETED",
"created_at": "2026-05-31T03:14:15Z",
"context": {"items": 42},
"event_id": "478775660589088777",
"worker_id": "worker-prod-7",
"meta": {"attempts": 2}
}"#;
let request: EventRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.event_type, "command.completed");
assert_eq!(request.event_id.as_deref(), Some("478775660589088777"));
assert_eq!(request.status.as_deref(), Some("COMPLETED"));
assert_eq!(request.worker_id.as_deref(), Some("worker-prod-7"));
assert!(request.created_at.is_some());
}
#[test]
fn test_event_request_accepts_integer_execution_id() {
let json = r#"{
"execution_id": 321079436235509760,
"step": "start",
"event_type": "step.start"
}"#;
let request: EventRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.execution_id, "321079436235509760");
assert_eq!(request.step, "start");
}
#[test]
fn test_event_request_accepts_string_execution_id() {
let json = r#"{
"execution_id": "321079436235509760",
"step": "start",
"event_type": "step.start"
}"#;
let request: EventRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.execution_id, "321079436235509760");
}
#[test]
fn test_event_request_accepts_integer_event_id() {
let json = r#"{
"execution_id": "1",
"step": "s",
"event_type": "e",
"event_id": 478775660589088777
}"#;
let request: EventRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.event_id.as_deref(), Some("478775660589088777"));
}
#[test]
fn test_event_request_event_id_null_is_none() {
let json = r#"{
"execution_id": "1",
"step": "s",
"event_type": "e",
"event_id": null
}"#;
let request: EventRequest = serde_json::from_str(json).unwrap();
assert!(request.event_id.is_none());
}
#[test]
fn test_batch_event_request_accepts_integer_execution_id() {
let json = r#"{
"execution_id": 321079436235509760,
"worker_id": "worker-rust-pool-0",
"events": [
{
"step": "start",
"event_type": "step.start"
}
]
}"#;
let request: BatchEventRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.execution_id, "321079436235509760");
assert_eq!(request.events.len(), 1);
}
#[test]
fn test_event_request_rejects_garbage_execution_id() {
let bogus_shapes = [
r#"{"execution_id": [1,2,3], "step": "s", "event_type": "e"}"#,
r#"{"execution_id": {"id": 1}, "step": "s", "event_type": "e"}"#,
];
for json in bogus_shapes {
let result: std::result::Result<EventRequest, _> = serde_json::from_str(json);
assert!(result.is_err(), "Expected reject for {}", json);
}
}
#[test]
fn test_build_result_object_data() {
let request = EventRequest {
payload: serde_json::json!({"output": "success"}),
..test_request_skeleton()
};
let result = build_result_object(&request, "COMPLETED");
assert_eq!(result["status"], "COMPLETED");
assert_eq!(result["context"]["output"], "success");
assert!(result.get("kind").is_none());
assert!(result.get("data").is_none());
assert!(result.get("reference").is_none());
}
#[test]
fn test_build_result_object_data_with_null_payload_omits_context() {
let request = EventRequest {
payload: serde_json::Value::Null,
..test_request_skeleton()
};
let result = build_result_object(&request, "STARTED");
assert_eq!(result["status"], "STARTED");
assert!(
result.get("context").is_none(),
"context must not be set when payload is non-object: {result}"
);
}
#[test]
fn test_build_result_object_data_with_primitive_payload_omits_context() {
let request = EventRequest {
payload: serde_json::json!("just a string"),
..test_request_skeleton()
};
let result = build_result_object(&request, "RUNNING");
assert!(result.get("context").is_none());
}
#[test]
fn test_build_result_object_ref() {
let request = EventRequest {
result_kind: "ref".to_string(),
result_uri: Some("gs://bucket/path/to/result".to_string()),
..test_request_skeleton()
};
let result = build_result_object(&request, "COMPLETED");
assert_eq!(result["status"], "COMPLETED");
let reference = &result["reference"];
assert_eq!(reference["store_tier"], "gcs");
assert_eq!(reference["logical_uri"], "gs://bucket/path/to/result");
assert!(result.get("kind").is_none());
assert!(result.get("store_tier").is_none());
assert!(result.get("logical_uri").is_none());
}
#[test]
fn test_build_result_object_refs() {
let request = EventRequest {
result_kind: "refs".to_string(),
event_ids: Some(vec![100, 200, 300]),
..test_request_skeleton()
};
let result = build_result_object(&request, "COMPLETED");
assert_eq!(result["status"], "COMPLETED");
let reference = &result["reference"];
assert_eq!(reference["event_ids"][0], 100);
assert_eq!(reference["total_parts"], 3);
assert!(
result.get("event_ids").is_none(),
"event_ids should be nested under reference, not top-level"
);
}
#[test]
fn test_build_result_object_constraint_top_level_keys_only() {
let allowed: std::collections::HashSet<&str> =
["status", "reference", "context"].iter().copied().collect();
let cases: Vec<(&str, EventRequest)> = vec![
(
"data with object payload",
EventRequest {
payload: serde_json::json!({"k": "v"}),
..test_request_skeleton()
},
),
(
"data with null payload",
EventRequest {
payload: serde_json::Value::Null,
..test_request_skeleton()
},
),
(
"ref",
EventRequest {
result_kind: "ref".to_string(),
result_uri: Some("gs://foo".to_string()),
..test_request_skeleton()
},
),
(
"refs",
EventRequest {
result_kind: "refs".to_string(),
event_ids: Some(vec![1, 2]),
..test_request_skeleton()
},
),
];
for (label, req) in cases {
let r = build_result_object(&req, "OK");
let obj = r.as_object().expect("result must be object");
for k in obj.keys() {
assert!(
allowed.contains(k.as_str()),
"[{label}] disallowed top-level key: {k} (full result: {r})"
);
}
assert_eq!(r["status"], "OK", "[{label}] status must be present");
}
}
#[test]
fn test_batch_event_item_legacy_name_alias() {
let json = r#"{"step": "s", "name": "call.done", "payload": {}}"#;
let item: BatchEventItem = serde_json::from_str(json).unwrap();
assert_eq!(item.event_type, "call.done");
}
#[test]
fn test_event_response_serialization() {
let response = EventResponse {
status: "ok".to_string(),
event_id: 12345,
commands_generated: 2,
};
let json = serde_json::to_string(&response).unwrap();
assert!(json.contains("ok"));
assert!(json.contains("12345"));
}
#[test]
fn test_command_response_serialization() {
let response = CommandResponse {
execution_id: 12345,
node_id: "step1".to_string(),
node_name: "step1".to_string(),
action: "python".to_string(),
context: serde_json::json!({"tool_config": {}}),
meta: serde_json::json!({"attempt": 1}),
};
let json = serde_json::to_string(&response).unwrap();
assert!(json.contains("step1"));
assert!(json.contains("python"));
}
#[test]
fn ee4_executor_event_converts_into_event_request() {
let executor_event = noetl_events::ExecutorEvent {
execution_id: 478775660589088776,
event_type: "command.completed".to_string(),
step: "fetch_calendar".to_string(),
status: "COMPLETED".to_string(),
created_at: chrono::DateTime::parse_from_rfc3339("2026-05-31T03:14:15Z")
.unwrap()
.with_timezone(&chrono::Utc),
context: serde_json::json!({"items": 42}),
event_id: Some(478775660589088777),
worker_id: Some("worker-prod-7".to_string()),
meta: Some(serde_json::json!({"attempts": 2})),
};
let req: EventRequest = executor_event.clone().into();
assert_eq!(req.execution_id, "478775660589088776");
assert_eq!(req.event_id.as_deref(), Some("478775660589088777"));
assert_eq!(req.event_type, executor_event.event_type);
assert_eq!(req.step, executor_event.step);
assert_eq!(req.status.as_deref(), Some(executor_event.status.as_str()));
assert_eq!(req.created_at, Some(executor_event.created_at));
assert_eq!(req.payload, executor_event.context);
assert_eq!(req.worker_id, executor_event.worker_id);
assert_eq!(req.meta, executor_event.meta);
assert_eq!(req.result_kind, "data");
assert!(req.result_uri.is_none());
assert!(req.event_ids.is_none());
assert!(req.actionable);
assert!(req.informative);
}
#[test]
fn ee4_event_request_converts_into_executor_event() {
let req = EventRequest {
execution_id: "478775660589088776".to_string(),
step: "fetch_calendar".to_string(),
event_type: "command.completed".to_string(),
payload: serde_json::json!({"items": 42}),
meta: Some(serde_json::json!({"attempts": 2})),
worker_id: Some("worker-prod-7".to_string()),
result_kind: "data".to_string(),
result_uri: None,
event_ids: None,
actionable: true,
informative: true,
event_id: Some("478775660589088777".to_string()),
status: Some("COMPLETED".to_string()),
created_at: Some(
chrono::DateTime::parse_from_rfc3339("2026-05-31T03:14:15Z")
.unwrap()
.with_timezone(&chrono::Utc),
),
};
let ev: noetl_events::ExecutorEvent =
(&req).try_into().expect("convert with explicit fields");
assert_eq!(ev.execution_id, 478775660589088776_i64);
assert_eq!(ev.event_id, Some(478775660589088777_i64));
assert_eq!(ev.status, "COMPLETED");
assert_eq!(ev.created_at, req.created_at.unwrap());
assert_eq!(ev.context, req.payload);
assert_eq!(ev.worker_id, req.worker_id);
assert_eq!(ev.meta, req.meta);
}
#[test]
fn ee4_try_from_event_request_fills_defaults_for_missing_status_and_created_at() {
let mut req = test_request_skeleton();
req.event_type = "command.completed".to_string();
req.status = None;
req.created_at = None;
let ev: noetl_events::ExecutorEvent = (&req).try_into().expect("convert with defaults");
assert_eq!(ev.status, "COMPLETED"); let age = chrono::Utc::now() - ev.created_at;
assert!(age.num_seconds() >= 0 && age.num_seconds() < 60);
}
#[test]
fn ee4_try_from_event_request_rejects_non_numeric_execution_id() {
let req = EventRequest {
execution_id: "not-a-number".to_string(),
..test_request_skeleton()
};
let err = noetl_events::ExecutorEvent::try_from(&req).unwrap_err();
assert!(
err.to_string().contains("execution_id"),
"error should mention the field name: {err}"
);
}
}