use std::collections::HashMap;
use axum::{extract::State, Json};
use serde::{Deserialize, Serialize};
use tracing::{debug, info};
use crate::error::{AppError, AppResult};
use crate::state::AppState;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecuteRequest {
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub catalog_id: Option<i64>,
#[serde(default, alias = "workload")]
pub payload: HashMap<String, serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_execution_id: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub execution_pool: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub trace: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub dedup: Option<DedupSpec>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DedupSpec {
pub key: String,
#[serde(default = "default_dedup_window_secs")]
pub window_secs: u64,
}
fn default_dedup_window_secs() -> u64 {
crate::db::queries::subscription_dedup::DEFAULT_WINDOW_SECS
}
#[derive(Debug, Clone, Default)]
pub(crate) struct CommandRouting {
pub pool: Option<String>,
pub trace: Option<serde_json::Value>,
}
impl CommandRouting {
pub(crate) fn from_started_meta(meta: &serde_json::Value) -> CommandRouting {
CommandRouting {
pool: meta
.get("execution_pool")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(str::to_string),
trace: meta.get("trace").filter(|v| !v.is_null()).cloned(),
}
}
}
impl ExecuteRequest {
pub fn validate(&self) -> Result<(), String> {
if self.path.is_none() && self.catalog_id.is_none() {
return Err("Either 'path' or 'catalog_id' must be provided".to_string());
}
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecuteResponse {
pub execution_id: String,
pub status: String,
pub commands_generated: i32,
}
pub async fn execute(
State(state): State<AppState>,
Json(request): Json<ExecuteRequest>,
) -> Result<Json<ExecuteResponse>, AppError> {
let outcome = execute_one(&state, request, "single").await?;
Ok(Json(outcome.into_response()))
}
const MAX_BATCH_ITEMS: usize = 1000;
#[derive(Debug, Deserialize)]
pub struct BatchExecuteRequest {
pub executions: Vec<ExecuteRequest>,
}
#[derive(Debug, Clone, Serialize)]
pub struct BatchItemResult {
pub index: usize,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub execution_id: Option<String>,
#[serde(default)]
pub commands_generated: i32,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct BatchExecuteResponse {
pub count: usize,
pub started: usize,
pub duplicates: usize,
pub failed: usize,
pub results: Vec<BatchItemResult>,
}
pub async fn execute_batch(
State(state): State<AppState>,
Json(request): Json<BatchExecuteRequest>,
) -> Result<Json<BatchExecuteResponse>, AppError> {
let n = request.executions.len();
if n == 0 {
return Err(AppError::Validation(
"execute batch requires a non-empty 'executions' array".to_string(),
));
}
if n > MAX_BATCH_ITEMS {
return Err(AppError::Validation(format!(
"execute batch size {} exceeds the {} cap",
n, MAX_BATCH_ITEMS
)));
}
let span = tracing::info_span!("execute.batch", batch_size = n);
let _guard = span.enter();
crate::metrics::record_execute_batch_size(n);
let mut results = Vec::with_capacity(n);
let mut started = 0usize;
let mut duplicates = 0usize;
let mut failed = 0usize;
for (index, req) in request.executions.into_iter().enumerate() {
match execute_one(&state, req, "batch").await {
Ok(outcome) => {
if outcome.status == "duplicate" {
duplicates += 1;
} else {
started += 1;
}
results.push(BatchItemResult {
index,
status: outcome.status.to_string(),
execution_id: Some(outcome.execution_id.to_string()),
commands_generated: outcome.commands_generated,
error: None,
});
}
Err(e) => {
failed += 1;
crate::metrics::record_execute_outcome("batch", "error");
tracing::warn!(index, error = %e, "batch item failed (continuing)");
results.push(BatchItemResult {
index,
status: "error".to_string(),
execution_id: None,
commands_generated: 0,
error: Some(e.to_string()),
});
}
}
}
info!(
batch_size = n,
started, duplicates, failed, "execute batch complete"
);
Ok(Json(BatchExecuteResponse {
count: n,
started,
duplicates,
failed,
results,
}))
}
#[derive(Debug, Clone)]
pub(crate) struct ExecuteOutcome {
pub execution_id: i64,
pub status: &'static str,
pub commands_generated: i32,
}
impl ExecuteOutcome {
fn into_response(self) -> ExecuteResponse {
ExecuteResponse {
execution_id: self.execution_id.to_string(),
status: self.status.to_string(),
commands_generated: self.commands_generated,
}
}
}
pub(crate) async fn execute_one(
state: &AppState,
request: ExecuteRequest,
entry: &'static str,
) -> Result<ExecuteOutcome, AppError> {
request.validate().map_err(AppError::Validation)?;
debug!(
"Execute request: path={:?}, catalog_id={:?}",
request.path, request.catalog_id
);
let execution_id = state.snowflake.generate()?;
if let Some(dedup) = request.dedup.as_ref() {
let scope = request.parent_execution_id.unwrap_or(0);
use crate::db::queries::subscription_dedup::{claim, DedupOutcome};
match claim(
state.pools.cluster(),
scope,
&dedup.key,
dedup.window_secs,
execution_id,
)
.await?
{
DedupOutcome::Duplicate {
existing_execution_id,
} => {
emit_deduplicated_event(state, scope, &dedup.key, existing_execution_id, execution_id)
.await;
crate::metrics::record_execute_outcome(entry, "duplicate");
info!(
subscription_id = scope,
existing_execution_id,
suppressed_execution_id = execution_id,
dedup_key = %dedup.key,
"dedup window collapsed a duplicate delivery to the existing execution"
);
return Ok(ExecuteOutcome {
execution_id: existing_execution_id,
status: "duplicate",
commands_generated: 0,
});
}
DedupOutcome::Fresh => { }
}
}
let (catalog_id, path) = resolve_catalog(state, &request).await?;
info!(
"Starting execution for path={}, catalog_id={}",
path, catalog_id
);
let playbook_yaml = get_playbook_yaml(state, catalog_id).await?;
let playbook = crate::playbook::parser::parse_playbook(&playbook_yaml)?;
let mut merged_workload = serde_json::Map::new();
if let Some(serde_json::Value::Object(map)) = &playbook.workload {
for (k, v) in map {
merged_workload.insert(k.clone(), v.clone());
}
}
for (k, v) in &request.payload {
merged_workload.insert(k.clone(), v.clone());
}
let workload = serde_json::Value::Object(merged_workload);
let trace = match (&request.trace, request.parent_execution_id) {
(Some(t), _) if !t.is_null() => Some(t.clone()),
(_, Some(parent)) => inherit_parent_trace(state, parent).await,
_ => None,
};
let routing = CommandRouting {
pool: request
.execution_pool
.clone()
.filter(|s| !s.is_empty()),
trace,
};
let start_event_id = emit_playbook_started_event(
state,
execution_id,
catalog_id,
&path,
&workload,
request.parent_execution_id,
&routing,
)
.await?;
let commands_generated = generate_initial_commands(
state,
execution_id,
catalog_id,
start_event_id,
&playbook,
&request.payload,
&routing,
)
.await?;
info!(
"Execution started: execution_id={}, commands_generated={}",
execution_id, commands_generated
);
crate::metrics::record_execute_outcome(entry, "new");
Ok(ExecuteOutcome {
execution_id,
status: "started",
commands_generated,
})
}
async fn emit_deduplicated_event(
state: &AppState,
scope: i64,
dedup_key: &str,
existing_execution_id: i64,
suppressed_execution_id: i64,
) {
let event_id = match state.snowflake.generate() {
Ok(id) => id,
Err(e) => {
tracing::warn!(subscription_id = scope, error = %e, "dedup audit: snowflake gen failed");
return;
}
};
let catalog_id: Option<i64> = sqlx::query_scalar(
"SELECT catalog_id FROM noetl.event WHERE execution_id = $1 ORDER BY event_id ASC LIMIT 1",
)
.bind(scope)
.fetch_optional(state.pools.pool_for(scope))
.await
.ok()
.flatten();
let Some(catalog_id) = catalog_id else {
tracing::debug!(subscription_id = scope, "dedup audit: no catalog_id for scope; skipping audit event");
return;
};
let context = serde_json::json!({
"subscription_id": scope.to_string(),
"dedup_key": dedup_key,
"original_execution_id": existing_execution_id.to_string(),
"suppressed_execution_id": suppressed_execution_id.to_string(),
"duplicate_suppressed": true,
});
let meta = serde_json::json!({
"emitted_at": chrono::Utc::now().to_rfc3339(),
"emitter": "control_plane",
});
let res = sqlx::query(
r#"
INSERT INTO noetl.event (
execution_id, catalog_id, event_id, event_type, node_id, node_name, node_type,
status, context, meta, created_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
"#,
)
.bind(scope)
.bind(catalog_id)
.bind(event_id)
.bind("subscription.message.deduplicated")
.bind("subscription")
.bind("ingress")
.bind("subscription")
.bind("DEDUPLICATED")
.bind(&context)
.bind(&meta)
.bind(chrono::Utc::now())
.execute(state.pools.pool_for(scope))
.await;
if let Err(e) = res {
tracing::warn!(subscription_id = scope, error = %e, "dedup audit event insert failed (non-fatal)");
}
}
async fn resolve_catalog(state: &AppState, request: &ExecuteRequest) -> AppResult<(i64, String)> {
if let Some(catalog_id) = request.catalog_id {
let entry = sqlx::query_as::<_, (i64, String)>(
"SELECT catalog_id, path FROM noetl.catalog WHERE catalog_id = $1",
)
.bind(catalog_id)
.fetch_optional(state.pools.cluster())
.await?
.ok_or_else(|| AppError::NotFound(format!("Catalog entry not found: {}", catalog_id)))?;
Ok(entry)
} else if let Some(path) = &request.path {
let entry = sqlx::query_as::<_, (i64, String)>(
"SELECT catalog_id, path FROM noetl.catalog WHERE path = $1 ORDER BY version DESC LIMIT 1",
)
.bind(path)
.fetch_optional(state.pools.cluster())
.await?
.ok_or_else(|| AppError::NotFound(format!("Playbook not found: {}", path)))?;
Ok(entry)
} else {
Err(AppError::Validation(
"Either path or catalog_id must be provided".to_string(),
))
}
}
async fn get_playbook_yaml(state: &AppState, catalog_id: i64) -> AppResult<String> {
let row: (Option<String>, Option<serde_json::Value>) =
sqlx::query_as::<_, (Option<String>, Option<serde_json::Value>)>(
"SELECT content, payload FROM noetl.catalog WHERE catalog_id = $1",
)
.bind(catalog_id)
.fetch_optional(state.pools.cluster())
.await?
.ok_or_else(|| AppError::NotFound(format!("Catalog entry not found: {}", catalog_id)))?;
match row {
(Some(content), _) if !content.is_empty() => Ok(content),
(_, Some(payload)) => {
serde_yaml::to_string(&payload).map_err(|e| {
AppError::Internal(format!("Failed to convert payload to YAML: {}", e))
})
}
_ => Err(AppError::NotFound(format!(
"No playbook content found for catalog_id: {}",
catalog_id
))),
}
}
async fn emit_playbook_started_event(
state: &AppState,
execution_id: i64,
catalog_id: i64,
path: &str,
workload: &serde_json::Value,
parent_execution_id: Option<i64>,
routing: &CommandRouting,
) -> AppResult<i64> {
let event_id = state.snowflake.generate()?;
let context = serde_json::json!({
"catalog_id": catalog_id.to_string(),
"execution_id": execution_id.to_string(),
"path": path,
"workload": workload,
});
let mut meta = serde_json::json!({
"emitted_at": chrono::Utc::now().to_rfc3339(),
"emitter": "control_plane",
});
if let serde_json::Value::Object(ref mut m) = meta {
if let Some(pool) = routing.pool.as_ref() {
m.insert("execution_pool".to_string(), serde_json::json!(pool));
}
if let Some(trace) = routing.trace.as_ref() {
m.insert("trace".to_string(), trace.clone());
}
}
sqlx::query(
r#"
INSERT INTO noetl.event (
execution_id, catalog_id, event_id, parent_execution_id,
event_type, node_id, node_name, node_type, status,
context, meta, created_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12
)
"#,
)
.bind(execution_id)
.bind(catalog_id)
.bind(event_id)
.bind(parent_execution_id)
.bind("playbook_started")
.bind("playbook")
.bind(path)
.bind("execution")
.bind("STARTED")
.bind(&context)
.bind(&meta)
.bind(chrono::Utc::now())
.execute(state.pools.pool_for(execution_id))
.await?;
Ok(event_id)
}
async fn inherit_parent_trace(state: &AppState, parent_execution_id: i64) -> Option<serde_json::Value> {
let meta: Option<serde_json::Value> = sqlx::query_scalar(
"SELECT meta FROM noetl.event WHERE execution_id = $1 AND event_type = 'playbook_started' \
ORDER BY event_id ASC LIMIT 1",
)
.bind(parent_execution_id)
.fetch_optional(state.pools.pool_for(parent_execution_id))
.await
.ok()
.flatten();
meta.and_then(|m| m.get("trace").filter(|v| !v.is_null()).cloned())
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn persist_engine_command(
state: &AppState,
execution_id: i64,
catalog_id: i64,
parent_event_id: i64,
step: &crate::playbook::types::Step,
command: &crate::engine::commands::Command,
render_context: &HashMap<String, serde_json::Value>,
playbook: &crate::playbook::types::Playbook,
routing: &CommandRouting,
) -> AppResult<i64> {
let event_id = state.snowflake.generate()?;
let command_id = if let Some(iter) = command.iterator.as_ref() {
format!(
"{}:{}:{}:i{}",
execution_id, step.step, event_id, iter.index
)
} else {
format!("{}:{}:{}", execution_id, step.step, event_id)
};
let cmd_args = match &step.args {
Some(map) => serde_json::to_value(map).unwrap_or_else(|_| serde_json::json!({})),
None => serde_json::json!({}),
};
let cmd_context = serde_json::json!({
"tool_config": command.tool.config,
"args": cmd_args,
"render_context": render_context,
});
let mut cmd_meta = serde_json::json!({
"command_id": command_id,
"step": step.step,
"tool_kind": command.tool.kind,
"max_attempts": 3,
"attempt": 1,
"execution_id": execution_id.to_string(),
"catalog_id": catalog_id.to_string(),
"actionable": true,
});
if let Some(iter) = command.iterator.as_ref() {
if let serde_json::Value::Object(ref mut map) = cmd_meta {
map.insert("iteration_index".to_string(), serde_json::json!(iter.index));
map.insert("iteration_total".to_string(), serde_json::json!(iter.total));
map.insert(
"iterator_step".to_string(),
serde_json::json!(iter.iterator_step.clone()),
);
map.insert(
"item_var".to_string(),
serde_json::json!(iter.item_var.clone()),
);
}
}
if let Some(trace) = routing.trace.as_ref() {
if let serde_json::Value::Object(ref mut map) = cmd_meta {
map.insert("trace".to_string(), trace.clone());
}
}
if let Some(serde_json::Value::Object(extra)) = command.metadata.as_ref() {
if let serde_json::Value::Object(ref mut map) = cmd_meta {
for (k, v) in extra {
map.insert(k.clone(), v.clone());
}
}
}
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, node_type, status,
context, meta, parent_event_id, created_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12
)
"#,
)
.bind(event_id)
.bind(execution_id)
.bind(catalog_id)
.bind("command.issued")
.bind(&step.step)
.bind(&step.step)
.bind(command.tool.kind.as_str())
.bind("PENDING")
.bind(&cmd_context)
.bind(&cmd_meta)
.bind(parent_event_id)
.bind(chrono::Utc::now())
.execute(state.pools.pool_for(execution_id))
.await?;
if let Err(e) = insert_command_row(
state,
execution_id,
event_id,
catalog_id,
parent_event_id,
&step.step,
command.tool.kind.as_str(),
&cmd_context,
&cmd_meta,
)
.await
{
tracing::warn!(
error = %e,
execution_id,
event_id,
"Failed to insert noetl.command row (non-fatal — event log is source of truth)"
);
}
publish_command_notification(
state,
execution_id,
event_id,
&command_id,
&step.step,
command.tool.kind.as_str(),
playbook,
routing,
)
.await?;
Ok(event_id)
}
#[allow(clippy::too_many_arguments)]
async fn generate_initial_commands(
state: &AppState,
execution_id: i64,
catalog_id: i64,
parent_event_id: i64,
playbook: &crate::playbook::types::Playbook,
payload: &HashMap<String, serde_json::Value>,
routing: &CommandRouting,
) -> AppResult<i32> {
let start_step = playbook
.get_step("start")
.ok_or_else(|| AppError::Validation("Start step 'start' not found".to_string()))?;
let command_builder = crate::engine::commands::CommandBuilder::new();
let mut context = HashMap::new();
if let Some(serde_json::Value::Object(map)) = &playbook.workload {
for (k, v) in map {
context.insert(k.clone(), v.clone());
}
}
for (k, v) in payload {
context.insert(k.clone(), v.clone());
}
context.insert(
"workload".to_string(),
serde_json::to_value(&context).unwrap_or_default(),
);
if let Some(loop_cfg) = start_step.r#loop.as_ref() {
let renderer = crate::template::jinja::TemplateRenderer::new();
let raw_value = renderer.render_to_value(loop_cfg.in_expr.as_deref().unwrap_or(""), &context)?;
let items: Vec<serde_json::Value> = match raw_value {
serde_json::Value::Array(arr) => arr,
other => {
let type_name = match &other {
serde_json::Value::Null => "null",
serde_json::Value::Bool(_) => "bool",
serde_json::Value::Number(_) => "number",
serde_json::Value::String(_) => "string",
serde_json::Value::Object(_) => "object",
serde_json::Value::Array(_) => unreachable!(),
};
return Err(AppError::Validation(format!(
"start step loop.in must resolve to a JSON array, got: {}",
type_name
)));
}
};
let total = items.len();
let is_parallel = loop_cfg
.spec
.as_ref()
.map(|s| s.mode == crate::playbook::types::LoopMode::Parallel)
.unwrap_or(false);
info!(
execution_id,
total,
iterator = %loop_cfg.iterator,
mode = if is_parallel { "parallel" } else { "sequential" },
"Fanning out {} iterations for start step (iterator='{}', mode={})",
total,
loop_cfg.iterator,
if is_parallel { "parallel" } else { "sequential" },
);
let enter_event_id = state.snowflake.generate()?;
let enter_result = serde_json::json!({
"status": "ENTERED",
"context": {
"iterations_expected": total,
"iterator_var": loop_cfg.iterator,
},
});
sqlx::query(
r#"
INSERT INTO noetl.event (
event_id, execution_id, catalog_id, event_type,
node_id, node_name, status, result, meta, created_at, parent_event_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
"#,
)
.bind(enter_event_id)
.bind(execution_id)
.bind(catalog_id)
.bind("step.enter")
.bind(&start_step.step)
.bind(&start_step.step)
.bind("ENTERED")
.bind(&enter_result)
.bind(serde_json::json!({"emitted_by": "execute_handler"}))
.bind(chrono::Utc::now())
.bind(parent_event_id)
.execute(state.pools.pool_for(execution_id))
.await?;
let dispatch_count = if is_parallel { total } else { 1 };
for (idx, item) in items.into_iter().take(dispatch_count).enumerate() {
let iter_meta = crate::engine::commands::IteratorMetadata {
parent_execution_id: execution_id,
iterator_step: start_step.step.clone(),
item_var: loop_cfg.iterator.clone(),
item,
index: idx,
total,
};
let command = command_builder.build_iteration_command(
0,
execution_id,
catalog_id,
parent_event_id,
start_step,
&context,
iter_meta,
)?;
let cmd_render_ctx = command.context.clone().unwrap_or_default();
persist_engine_command(
state,
execution_id,
catalog_id,
parent_event_id,
start_step,
&command,
&cmd_render_ctx,
playbook,
routing,
)
.await?;
}
return Ok(total as i32);
}
let command = command_builder.build_command(
0, execution_id,
catalog_id,
parent_event_id,
start_step,
&context,
None,
)?;
let cmd_render_ctx = command.context.clone().unwrap_or_default();
persist_engine_command(
state,
execution_id,
catalog_id,
parent_event_id,
start_step,
&command,
&cmd_render_ctx,
playbook,
routing,
)
.await?;
Ok(1)
}
#[allow(clippy::too_many_arguments)]
async fn insert_command_row(
state: &AppState,
execution_id: i64,
event_id: i64,
catalog_id: i64,
parent_event_id: i64,
step_name: &str,
tool_kind: &str,
context: &serde_json::Value,
meta: &serde_json::Value,
) -> AppResult<()> {
let command_id = state.snowflake.generate()?;
sqlx::query(
r#"
INSERT INTO noetl.command (
command_id, event_id, execution_id, catalog_id,
step_name, tool_kind, status, attempt,
context, meta, latest_event_id
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
)
"#,
)
.bind(command_id)
.bind(event_id)
.bind(execution_id)
.bind(catalog_id)
.bind(step_name)
.bind(tool_kind)
.bind("PENDING")
.bind(1_i32)
.bind(context)
.bind(meta)
.bind(parent_event_id)
.execute(state.pools.pool_for(execution_id))
.await?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn publish_command_notification(
state: &AppState,
execution_id: i64,
event_id: i64,
command_id: &str,
step: &str,
_tool_kind: &str,
playbook: &crate::playbook::types::Playbook,
routing: &CommandRouting,
) -> AppResult<()> {
let Some(nats_client) = state.nats.as_ref() else {
tracing::warn!(
execution_id,
event_id,
"NATS not configured; command notification skipped — worker won't claim this command"
);
return Ok(());
};
let pool_segment = match routing.pool.as_deref() {
Some(p) if !p.is_empty() => p,
_ => match playbook.metadata.path.as_deref() {
Some(p) if p.starts_with("system/") => "system",
Some(p) if p.starts_with("subscription/") => "subscription",
_ => "shared",
},
};
let subject = format!("noetl.commands.{}.{}", pool_segment, execution_id);
let server_url = state
.config
.public_server_url
.clone()
.unwrap_or_else(|| "http://localhost:8082".to_string());
let mut notification = serde_json::json!({
"execution_id": execution_id,
"event_id": event_id,
"command_id": command_id,
"step": step,
"server_url": server_url,
});
if let Some(trace) = routing.trace.as_ref() {
if let serde_json::Value::Object(ref mut m) = notification {
m.insert("trace".to_string(), trace.clone());
}
}
let payload = serde_json::to_vec(¬ification)
.map_err(|e| AppError::Internal(format!("Serialize command notification: {e}")))?;
let js = async_nats::jetstream::new((**nats_client).clone());
js.publish(subject.clone(), payload.into())
.await
.map_err(|e| AppError::Internal(format!("NATS publish failed: {e}")))?
.await
.map_err(|e| AppError::Internal(format!("NATS publish ack failed: {e}")))?;
tracing::info!(
execution_id,
event_id,
%subject,
command_id = %command_id,
"Published command notification to NATS"
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::commands::{CommandBuilder, IteratorMetadata};
use crate::playbook::types::{Loop, Step, ToolDefinition, ToolKind, ToolSpec};
use crate::template::jinja::TemplateRenderer;
fn make_python_step(name: &str, loop_cfg: Option<Loop>) -> Step {
Step {
step: name.to_string(),
desc: None,
spec: None,
when: None,
args: None,
vars: None,
set_vars: None,
r#loop: loop_cfg,
tool: ToolDefinition::Single(Box::new(ToolSpec {
kind: ToolKind::Python,
auth: None,
libs: None,
args: None,
code: Some("num = input_data.get('num'); return {'number': num}".to_string()),
url: None,
method: None,
query: None,
command: None,
connection: None,
params: None,
headers: None,
eval: None,
output_select: None,
extra: HashMap::new(),
})),
next: None,
}
}
fn run_initial_fanout(
step: &Step,
context: &HashMap<String, serde_json::Value>,
) -> Result<Vec<crate::engine::commands::Command>, AppError> {
let command_builder = CommandBuilder::new();
let execution_id = 1_i64;
let catalog_id = 2_i64;
let parent_event_id = 0_i64;
if let Some(loop_cfg) = step.r#loop.as_ref() {
let renderer = TemplateRenderer::new();
let raw_value = renderer
.render_to_value(loop_cfg.in_expr.as_deref().unwrap_or(""), context)
.map_err(|e| AppError::Internal(e.to_string()))?;
let items: Vec<serde_json::Value> = match raw_value {
serde_json::Value::Array(arr) => arr,
other => {
let type_name = match &other {
serde_json::Value::Null => "null",
serde_json::Value::Bool(_) => "bool",
serde_json::Value::Number(_) => "number",
serde_json::Value::String(_) => "string",
serde_json::Value::Object(_) => "object",
serde_json::Value::Array(_) => unreachable!(),
};
return Err(AppError::Validation(format!(
"start step loop.in must resolve to a JSON array, got: {}",
type_name
)));
}
};
let total = items.len();
let mut commands = Vec::with_capacity(total);
for (idx, item) in items.into_iter().enumerate() {
let iter_meta = IteratorMetadata {
parent_execution_id: execution_id,
iterator_step: step.step.clone(),
item_var: loop_cfg.iterator.clone(),
item,
index: idx,
total,
};
let cmd = command_builder
.build_iteration_command(
0,
execution_id,
catalog_id,
parent_event_id,
step,
context,
iter_meta,
)
.map_err(|e| AppError::Internal(e.to_string()))?;
commands.push(cmd);
}
return Ok(commands);
}
let cmd = command_builder
.build_command(
0,
execution_id,
catalog_id,
parent_event_id,
step,
context,
None,
)
.map_err(|e| AppError::Internal(e.to_string()))?;
Ok(vec![cmd])
}
#[test]
fn test_generate_initial_commands_fans_out_when_start_has_loop() {
let loop_cfg = Loop {
in_expr: Some("{{ items }}".to_string()),
cursor: None,
iterator: "item".to_string(),
spec: None,
};
let step = make_python_step("start", Some(loop_cfg));
let mut context = HashMap::new();
context.insert("items".to_string(), serde_json::json!([1, 2, 3]));
let commands = run_initial_fanout(&step, &context).expect("should fan out");
assert_eq!(commands.len(), 3, "expected 3 commands for 3-element list");
for (expected_idx, cmd) in commands.iter().enumerate() {
let iter = cmd.iterator.as_ref().expect("iterator metadata present");
assert_eq!(
iter.item_var, "item",
"item_var must be the declared iterator name"
);
assert_eq!(
iter.index, expected_idx,
"index must match enumeration order"
);
assert_eq!(iter.total, 3, "total must be the collection length");
}
}
#[test]
fn test_generate_initial_commands_single_command_when_no_loop() {
let step = make_python_step("start", None);
let context = HashMap::new();
let commands = run_initial_fanout(&step, &context).expect("should produce one command");
assert_eq!(
commands.len(),
1,
"expected exactly 1 command for non-loop start step"
);
assert!(
commands[0].iterator.is_none(),
"non-loop command must not carry iterator metadata"
);
}
#[test]
fn test_generate_initial_commands_rejects_non_array_loop_in() {
let loop_cfg = Loop {
in_expr: Some("{{ count }}".to_string()),
cursor: None,
iterator: "item".to_string(),
spec: None,
};
let step = make_python_step("start", Some(loop_cfg));
let mut context = HashMap::new();
context.insert("count".to_string(), serde_json::json!(42));
let err =
run_initial_fanout(&step, &context).expect_err("scalar loop.in should return Err");
match err {
AppError::Validation(msg) => {
assert!(
msg.contains("start step loop.in must resolve to a JSON array"),
"unexpected validation message: {msg}"
);
assert!(
msg.contains("number"),
"message should name the actual type: {msg}"
);
}
other => panic!("expected Validation error, got: {other:?}"),
}
}
#[test]
fn test_execute_request_validation() {
let request = ExecuteRequest {
path: None,
catalog_id: None,
payload: HashMap::new(),
parent_execution_id: None,
execution_pool: None,
trace: None,
dedup: None,
};
assert!(request.validate().is_err());
let request = ExecuteRequest {
path: Some("test/playbook".to_string()),
catalog_id: None,
payload: HashMap::new(),
parent_execution_id: None,
execution_pool: None,
trace: None,
dedup: None,
};
assert!(request.validate().is_ok());
let request = ExecuteRequest {
path: None,
catalog_id: Some(12345),
payload: HashMap::new(),
parent_execution_id: None,
execution_pool: None,
trace: None,
dedup: None,
};
assert!(request.validate().is_ok());
}
#[test]
fn test_execute_response_serialization() {
let response = ExecuteResponse {
execution_id: "12345".to_string(),
status: "started".to_string(),
commands_generated: 1,
};
let json = serde_json::to_string(&response).unwrap();
assert!(json.contains("12345"));
assert!(json.contains("started"));
}
#[test]
fn test_dedup_spec_window_default() {
let d: DedupSpec = serde_json::from_str(r#"{"key":"abc"}"#).unwrap();
assert_eq!(d.key, "abc");
assert_eq!(
d.window_secs,
crate::db::queries::subscription_dedup::DEFAULT_WINDOW_SECS
);
let d: DedupSpec = serde_json::from_str(r#"{"key":"abc","window_secs":42}"#).unwrap();
assert_eq!(d.window_secs, 42);
}
#[test]
fn test_execute_request_dedup_optional() {
let r: ExecuteRequest = serde_json::from_str(r#"{"path":"p"}"#).unwrap();
assert!(r.dedup.is_none());
let r: ExecuteRequest = serde_json::from_str(
r#"{"path":"p","parent_execution_id":7,"dedup":{"key":"k1","window_secs":60}}"#,
)
.unwrap();
let d = r.dedup.expect("dedup present");
assert_eq!(d.key, "k1");
assert_eq!(d.window_secs, 60);
}
#[test]
fn test_batch_request_preserves_per_item_routing() {
let body = r#"{
"executions": [
{"path":"domain/a","execution_pool":"iot","payload":{"x":1}},
{"path":"domain/b","parent_execution_id":99,"dedup":{"key":"k"}},
{"catalog_id":5,"trace":{"traceparent":"00-abc-def-01"}}
]
}"#;
let req: BatchExecuteRequest = serde_json::from_str(body).unwrap();
assert_eq!(req.executions.len(), 3);
assert_eq!(req.executions[0].path.as_deref(), Some("domain/a"));
assert_eq!(req.executions[0].execution_pool.as_deref(), Some("iot"));
assert_eq!(req.executions[1].parent_execution_id, Some(99));
assert_eq!(req.executions[1].dedup.as_ref().unwrap().key, "k");
assert_eq!(req.executions[2].catalog_id, Some(5));
assert!(req.executions[2].trace.is_some());
}
#[test]
fn test_batch_response_serialization() {
let resp = BatchExecuteResponse {
count: 2,
started: 1,
duplicates: 0,
failed: 1,
results: vec![
BatchItemResult {
index: 0,
status: "started".to_string(),
execution_id: Some("777".to_string()),
commands_generated: 1,
error: None,
},
BatchItemResult {
index: 1,
status: "error".to_string(),
execution_id: None,
commands_generated: 0,
error: Some("Playbook not found: nope".to_string()),
},
],
};
let json = serde_json::to_string(&resp).unwrap();
assert!(json.contains("\"count\":2"));
assert!(json.contains("\"started\":1"));
assert!(json.contains("\"failed\":1"));
assert!(json.contains("777"));
assert!(json.contains("Playbook not found"));
let v: serde_json::Value = serde_json::from_str(&json).unwrap();
assert!(v["results"][0].get("error").is_none());
assert!(v["results"][1].get("execution_id").is_none());
}
#[test]
fn test_execute_outcome_into_response() {
let outcome = ExecuteOutcome {
execution_id: 42,
status: "duplicate",
commands_generated: 0,
};
let resp = outcome.into_response();
assert_eq!(resp.execution_id, "42");
assert_eq!(resp.status, "duplicate");
assert_eq!(resp.commands_generated, 0);
}
}