use std::collections::HashMap;
use std::time::Instant;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::db::models::Event;
/// Returns `true` when the referenced integer is exactly zero.
///
/// Takes `&i32` rather than `i32` because it is used as a serde
/// `skip_serializing_if` predicate, which is handed a reference to the field.
pub(crate) fn is_zero(v: &i32) -> bool {
    matches!(*v, 0)
}
/// Lifecycle state of a whole playbook execution.
///
/// Serialized and deserialized by serde using snake_case variant names
/// (for example `in_progress`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ExecutionState {
/// State of a freshly constructed `WorkflowState`; also the fallback that
/// `From<&str>` returns for unrecognized input.
Initial,
/// Set when a `playbook_started` event is applied.
InProgress,
/// Set when a `playbook_completed` / `playbook.completed` event is applied.
Completed,
/// Set when a `playbook_failed` / `playbook.failed` event is applied.
Failed,
/// Set when a `playbook.cancelled` event is applied.
Cancelled,
}
impl std::fmt::Display for ExecutionState {
    /// Writes the lowercase snake_case label of the state
    /// (`initial`, `in_progress`, `completed`, `failed`, `cancelled`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Initial => "initial",
            Self::InProgress => "in_progress",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Cancelled => "cancelled",
        };
        f.write_str(label)
    }
}
impl From<&str> for ExecutionState {
fn from(s: &str) -> Self {
match s.to_lowercase().as_str() {
"initial" | "pending" => Self::Initial,
"in_progress" | "running" => Self::InProgress,
"completed" | "success" => Self::Completed,
"failed" | "error" => Self::Failed,
"cancelled" | "canceled" => Self::Cancelled,
_ => Self::Initial,
}
}
}
/// Lifecycle state of a single workflow step, as tracked by `WorkflowState::apply_event`.
///
/// Serialized and deserialized by serde using snake_case variant names
/// (for example `command_issued`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepState {
/// State assigned by `StepInfo::new`, before any event has updated the step.
Pending,
/// Set by `step.enter` / `step_enter` / `step_started` events.
Entered,
/// Set by `command.issued` events.
CommandIssued,
/// Set by `command.claimed` events.
CommandClaimed,
/// Set by `command.started` / `action_started` events.
CommandStarted,
/// Set by completion events (`command.completed`, `action_completed`,
/// `step.exit`, `step_completed`); iterator steps reach it only once enough
/// distinct iterations have completed.
Completed,
/// Set by `command.failed` / `action_failed` / `step_failed` events.
Failed,
/// Set by `step.skipped` / `step_skipped` events.
Skipped,
}
impl std::fmt::Display for StepState {
    /// Writes the lowercase snake_case label of the step state
    /// (for example `command_issued`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Pending => "pending",
            Self::Entered => "entered",
            Self::CommandIssued => "command_issued",
            Self::CommandClaimed => "command_claimed",
            Self::CommandStarted => "command_started",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Skipped => "skipped",
        };
        f.write_str(label)
    }
}
/// Per-step bookkeeping accumulated while events are applied.
///
/// Optional and empty fields are omitted from the serialized form via the
/// `skip_serializing_if` attributes below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepInfo {
/// Step name (the key under which the step is stored in `WorkflowState::steps`).
pub name: String,
/// Current lifecycle state of the step.
pub state: StepState,
/// Step result; for iterator steps this becomes an array of the
/// per-iteration results once the step completes.
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<serde_json::Value>,
/// Error message taken from a failure event's result, when one is present as a string.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// Timestamp of the most recent enter (or skip) event.
#[serde(skip_serializing_if = "Option::is_none")]
pub entered_at: Option<DateTime<Utc>>,
/// Timestamp of the completion, failure, or skip event.
#[serde(skip_serializing_if = "Option::is_none")]
pub completed_at: Option<DateTime<Utc>>,
/// Attempt number copied from `command.started` / `action_started` events; starts at 0.
pub attempt: i32,
/// Number of iterations an iterator step must complete; `None` for plain steps.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub iterations_expected: Option<i32>,
/// Distinct command ids of iterations seen as completed; used to de-duplicate
/// repeated completion events for the same command.
#[serde(default, skip_serializing_if = "std::collections::HashSet::is_empty")]
pub iteration_command_ids: std::collections::HashSet<String>,
/// Results of completed iterations, in the order their first completion event was applied.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub iteration_results: Vec<serde_json::Value>,
/// Count of `command.issued` events applied while the step was known to be an iterator.
#[serde(default, skip_serializing_if = "crate::engine::state::is_zero")]
pub iterations_dispatched: i32,
}
impl StepInfo {
    /// Creates a step record named `name` in the `Pending` state with no
    /// result, error, timestamps, or iteration bookkeeping.
    pub fn new(name: &str) -> Self {
        Self {
            name: name.to_owned(),
            state: StepState::Pending,
            result: None,
            error: None,
            entered_at: None,
            completed_at: None,
            attempt: 0,
            iterations_expected: None,
            iteration_command_ids: Default::default(),
            iteration_results: Vec::new(),
            iterations_dispatched: 0,
        }
    }

    /// Returns `true` when an expected iteration count has been recorded for
    /// this step, i.e. the step is treated as an iterator.
    pub fn is_iterator(&self) -> bool {
        matches!(self.iterations_expected, Some(_))
    }

    /// Number of distinct command ids recorded as completed iterations.
    pub fn iterations_completed(&self) -> i32 {
        let distinct = self.iteration_command_ids.len();
        distinct as i32
    }
}
/// State of one playbook execution, built by applying events in order.
///
/// Optional fields are omitted from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowState {
/// Identifier of the execution this state describes.
pub execution_id: i64,
/// Identifier of the catalog entry associated with the execution.
pub catalog_id: i64,
/// Overall execution lifecycle state.
pub state: ExecutionState,
/// Per-step bookkeeping, keyed by step (node) name.
pub steps: HashMap<String, StepInfo>,
/// `workload` value taken from the `playbook_started` event context.
#[serde(skip_serializing_if = "Option::is_none")]
pub workload: Option<serde_json::Value>,
/// `path` string taken from the `playbook_started` event context.
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
/// `version` string taken from the `playbook_started` event context.
#[serde(skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
/// Timestamp of the `playbook_started` event.
#[serde(skip_serializing_if = "Option::is_none")]
pub started_at: Option<DateTime<Utc>>,
/// Timestamp of the terminal (completed / failed / cancelled) playbook event.
#[serde(skip_serializing_if = "Option::is_none")]
pub completed_at: Option<DateTime<Utc>>,
/// Parent execution id copied from the `playbook_started` event, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_execution_id: Option<i64>,
}
fn extract_command_id(event: &Event) -> Option<String> {
if let Some(meta) = &event.meta {
if let Some(s) = meta.get("command_id").and_then(|v| v.as_str()) {
return Some(s.to_string());
}
}
if let Some(result) = &event.result {
if let Some(s) = result
.get("context")
.and_then(|c| c.get("command_id"))
.and_then(|v| v.as_str())
{
return Some(s.to_string());
}
if let Some(s) = result
.get("data")
.and_then(|d| d.get("command_id"))
.and_then(|v| v.as_str())
{
return Some(s.to_string());
}
}
None
}
impl WorkflowState {
pub fn new(execution_id: i64, catalog_id: i64) -> Self {
Self {
execution_id,
catalog_id,
state: ExecutionState::Initial,
steps: HashMap::new(),
workload: None,
path: None,
version: None,
started_at: None,
completed_at: None,
parent_execution_id: None,
}
}
pub fn from_events(events: &[Event]) -> Option<Self> {
let start = Instant::now();
if events.is_empty() {
return None;
}
let first = &events[0];
let mut state = Self::new(first.execution_id, first.catalog_id);
for event in events {
state.apply_event(event);
}
let duration = start.elapsed();
let event_count = events.len();
tracing::info!(
target: "noetl.performance",
execution_id = %first.execution_id,
phase = "state_reconstruction",
event_count = %event_count,
step_count = %state.steps.len(),
duration_ms = %duration.as_millis(),
"State reconstructed from events"
);
if duration.as_millis() > 100 || event_count > 50 {
tracing::warn!(
target: "noetl.performance",
execution_id = %first.execution_id,
event_count = %event_count,
duration_ms = %duration.as_millis(),
"Slow state reconstruction detected - consider optimizing event loading"
);
}
Some(state)
}
pub fn apply_event(&mut self, event: &Event) {
match event.event_type.as_str() {
"playbook_started" => {
self.state = ExecutionState::InProgress;
self.started_at = Some(event.created_at);
self.parent_execution_id = event.parent_execution_id;
if let Some(context) = &event.context {
if let Some(workload) = context.get("workload") {
self.workload = Some(workload.clone());
}
if let Some(path) = context.get("path").and_then(|v| v.as_str()) {
self.path = Some(path.to_string());
}
if let Some(version) = context.get("version").and_then(|v| v.as_str()) {
self.version = Some(version.to_string());
}
}
}
"playbook_completed" | "playbook.completed" => {
self.state = ExecutionState::Completed;
self.completed_at = Some(event.created_at);
}
"playbook_failed" | "playbook.failed" => {
self.state = ExecutionState::Failed;
self.completed_at = Some(event.created_at);
}
"playbook.cancelled" => {
self.state = ExecutionState::Cancelled;
self.completed_at = Some(event.created_at);
}
"step.enter" | "step_enter" | "step_started" => {
if let Some(name) = &event.node_name {
let step = self
.steps
.entry(name.clone())
.or_insert_with(|| StepInfo::new(name));
step.state = StepState::Entered;
step.entered_at = Some(event.created_at);
let total = event
.result
.as_ref()
.and_then(|r| r.get("context"))
.and_then(|c| c.get("iterations_expected"))
.and_then(|v| v.as_i64())
.or_else(|| {
event
.context
.as_ref()
.and_then(|c| c.get("iterations_expected"))
.and_then(|v| v.as_i64())
});
if let Some(total) = total {
step.iterations_expected = Some(total as i32);
}
}
}
"step.skipped" | "step_skipped" => {
if let Some(name) = &event.node_name {
let step = self
.steps
.entry(name.clone())
.or_insert_with(|| StepInfo::new(name));
step.state = StepState::Skipped;
step.entered_at = Some(event.created_at);
step.completed_at = Some(event.created_at);
}
}
"command.issued" => {
if let Some(name) = &event.node_name {
let step = self
.steps
.entry(name.clone())
.or_insert_with(|| StepInfo::new(name));
step.state = StepState::CommandIssued;
if step.is_iterator() {
step.iterations_dispatched += 1;
}
}
}
"command.claimed" => {
if let Some(name) = &event.node_name {
let step = self
.steps
.entry(name.clone())
.or_insert_with(|| StepInfo::new(name));
step.state = StepState::CommandClaimed;
}
}
"command.started" | "action_started" => {
if let Some(name) = &event.node_name {
let step = self
.steps
.entry(name.clone())
.or_insert_with(|| StepInfo::new(name));
step.state = StepState::CommandStarted;
if let Some(attempt) = event.attempt {
step.attempt = attempt;
}
}
}
"command.completed" | "action_completed" | "step.exit" | "step_completed" => {
if let Some(name) = &event.node_name {
let step = self
.steps
.entry(name.clone())
.or_insert_with(|| StepInfo::new(name));
if let Some(expected) = step.iterations_expected {
let command_id = extract_command_id(event);
if let Some(cid) = command_id {
if step.iteration_command_ids.insert(cid) {
if let Some(result) = event.result.clone() {
step.iteration_results.push(result);
}
}
}
if step.iterations_completed() >= expected {
step.state = StepState::Completed;
step.completed_at = Some(event.created_at);
step.result =
Some(serde_json::Value::Array(step.iteration_results.clone()));
}
} else {
step.state = StepState::Completed;
step.completed_at = Some(event.created_at);
if step.result.is_none() {
step.result = event.result.clone();
}
}
}
}
"call.done" | "action_done" => {
if let Some(name) = &event.node_name {
let step = self
.steps
.entry(name.clone())
.or_insert_with(|| StepInfo::new(name));
if step.iterations_expected.is_none() {
if let Some(result) = event.result.clone() {
step.result = Some(result);
}
}
}
}
"command.failed" | "action_failed" | "step_failed" => {
if let Some(name) = &event.node_name {
let step = self
.steps
.entry(name.clone())
.or_insert_with(|| StepInfo::new(name));
step.state = StepState::Failed;
step.completed_at = Some(event.created_at);
if let Some(result) = &event.result {
let err_value =
result.get("error").and_then(|v| v.as_str()).or_else(|| {
result
.get("context")
.and_then(|c| c.get("error"))
.and_then(|v| v.as_str())
});
if let Some(error) = err_value {
step.error = Some(error.to_string());
}
}
}
}
_ => {}
}
}
pub fn get_step_result(&self, step_name: &str) -> Option<&serde_json::Value> {
self.steps.get(step_name).and_then(|s| s.result.as_ref())
}
pub fn get_all_results(&self) -> HashMap<String, serde_json::Value> {
self.steps
.iter()
.filter_map(|(name, info)| info.result.clone().map(|r| (name.clone(), r)))
.collect()
}
pub fn is_step_done(&self, step_name: &str) -> bool {
self.steps
.get(step_name)
.map(|s| {
matches!(
s.state,
StepState::Completed | StepState::Failed | StepState::Skipped
)
})
.unwrap_or(false)
}
pub fn is_step_completed(&self, step_name: &str) -> bool {
self.steps
.get(step_name)
.map(|s| matches!(s.state, StepState::Completed))
.unwrap_or(false)
}
pub fn is_step_failed(&self, step_name: &str) -> bool {
self.steps
.get(step_name)
.map(|s| matches!(s.state, StepState::Failed))
.unwrap_or(false)
}
pub fn completed_steps(&self) -> Vec<&str> {
self.steps
.iter()
.filter(|(_, info)| matches!(info.state, StepState::Completed))
.map(|(name, _)| name.as_str())
.collect()
}
pub fn running_steps(&self) -> Vec<&str> {
self.steps
.iter()
.filter(|(_, info)| {
matches!(
info.state,
StepState::Entered
| StepState::CommandIssued
| StepState::CommandClaimed
| StepState::CommandStarted
)
})
.map(|(name, _)| name.as_str())
.collect()
}
pub fn has_running_steps(&self) -> bool {
!self.running_steps().is_empty()
}
pub fn build_context(&self) -> serde_json::Value {
let mut context = serde_json::Map::new();
if let Some(serde_json::Value::Object(wl)) = &self.workload {
for (k, v) in wl {
context.insert(k.clone(), v.clone());
}
context.insert(
"workload".to_string(),
serde_json::Value::Object(wl.clone()),
);
}
let mut steps = serde_json::Map::new();
for (name, info) in &self.steps {
if let Some(result) = &info.result {
steps.insert(name.clone(), result.clone());
if let Some(user_data) = extract_user_data(result) {
let with_data = match &user_data {
serde_json::Value::Object(map) if !map.contains_key("data") => {
let mut m = map.clone();
m.insert("data".to_string(), user_data.clone());
serde_json::Value::Object(m)
}
_ => user_data,
};
context.insert(name.clone(), with_data);
}
}
}
context.insert("steps".to_string(), serde_json::Value::Object(steps));
context.insert(
"execution_id".to_string(),
serde_json::json!(self.execution_id.to_string()),
);
context.insert(
"catalog_id".to_string(),
serde_json::json!(self.catalog_id.to_string()),
);
if let Some(path) = &self.path {
context.insert("path".to_string(), serde_json::json!(path));
}
if let Some(version) = &self.version {
context.insert("version".to_string(), serde_json::json!(version));
}
serde_json::Value::Object(context)
}
}
/// Copies `mutations` into `variables`, dropping a leading `ctx.`, `iter.`
/// or `step.` scope from each key.
///
/// Only the text before the first `.` is treated as a scope; keys with any
/// other prefix (for example `app.config`) and keys without a dot are stored
/// unchanged. Existing entries with the same target key are overwritten.
pub fn apply_set_mutations(
    variables: &mut HashMap<String, serde_json::Value>,
    mutations: &HashMap<String, serde_json::Value>,
) {
    const SCOPE_PREFIXES: [&str; 3] = ["ctx.", "iter.", "step."];

    for (key, value) in mutations {
        let target = SCOPE_PREFIXES
            .iter()
            .find_map(|prefix| key.strip_prefix(*prefix))
            .unwrap_or(key.as_str());
        variables.insert(target.to_string(), value.clone());
    }
}
/// Unwraps the user-visible payload from a step result envelope.
///
/// Resolution order:
/// 1. `context.result.context.data` (flattened as a task sequence);
/// 2. `context.data` (flattened as a task sequence);
/// 3. `context` itself;
/// 4. the whole `result` when it has no `context` key.
///
/// Returns `None` only when `result` is JSON `null`.
pub(crate) fn extract_user_data(result: &serde_json::Value) -> Option<serde_json::Value> {
    if result.is_null() {
        return None;
    }

    let envelope = result.get("context");
    let nested_data = envelope
        .and_then(|ctx| ctx.get("result"))
        .and_then(|inner| inner.get("context"))
        .and_then(|inner_ctx| inner_ctx.get("data"));

    let unwrapped = match (nested_data, envelope) {
        (Some(data), _) => flatten_task_sequence_data(data),
        (None, Some(ctx)) => match ctx.get("data") {
            Some(data) => flatten_task_sequence_data(data),
            None => ctx.clone(),
        },
        (None, None) => result.clone(),
    };
    Some(unwrapped)
}
/// Merges the fields of every nested object into a copy of `data`.
///
/// Flattening applies only when `data` is a non-empty object whose values
/// are all objects; any other input is returned as an unchanged clone. The
/// original top-level entries are kept, and each nested object's entries are
/// then inserted at the top level, so a later insertion overwrites an
/// earlier entry with the same key.
fn flatten_task_sequence_data(data: &serde_json::Value) -> serde_json::Value {
    let labeled = match data {
        serde_json::Value::Object(entries) if !entries.is_empty() => entries,
        _ => return data.clone(),
    };

    // Collect the nested objects up front; a single non-object value means
    // the data is left untouched.
    let mut task_outputs = Vec::with_capacity(labeled.len());
    for output in labeled.values() {
        match output.as_object() {
            Some(fields) => task_outputs.push(fields),
            None => return data.clone(),
        }
    }

    let mut merged = labeled.clone();
    for fields in task_outputs {
        merged.extend(fields.iter().map(|(k, v)| (k.clone(), v.clone())));
    }
    serde_json::Value::Object(merged)
}
// Unit tests for state reconstruction, context building, user-data
// unwrapping, and scoped variable mutations.
#[cfg(test)]
mod tests {
use super::*;
// Builds a baseline event for execution 12345 / catalog 67890 with the given
// event type and optional node name; every optional payload field is `None`.
fn make_event(event_type: &str, node_name: Option<&str>) -> Event {
Event {
id: 1,
execution_id: 12345,
catalog_id: 67890,
event_id: 1,
parent_event_id: None,
parent_execution_id: None,
event_type: event_type.to_string(),
node_id: None,
node_name: node_name.map(|s| s.to_string()),
node_type: None,
status: "".to_string(),
context: None,
meta: None,
result: None,
worker_id: None,
attempt: None,
created_at: Utc::now(),
}
}
// Display renders execution states as snake_case labels.
#[test]
fn test_execution_state_display() {
assert_eq!(ExecutionState::Initial.to_string(), "initial");
assert_eq!(ExecutionState::InProgress.to_string(), "in_progress");
assert_eq!(ExecutionState::Completed.to_string(), "completed");
}
// Parsing is case-insensitive and accepts aliases such as "RUNNING".
#[test]
fn test_execution_state_from_str() {
assert_eq!(ExecutionState::from("initial"), ExecutionState::Initial);
assert_eq!(ExecutionState::from("RUNNING"), ExecutionState::InProgress);
assert_eq!(ExecutionState::from("completed"), ExecutionState::Completed);
assert_eq!(ExecutionState::from("FAILED"), ExecutionState::Failed);
}
// Replaying start -> enter -> issued -> completed yields an in-progress
// execution with one completed step carrying the event result.
#[test]
fn test_workflow_state_from_events() {
let events = vec![
{
let mut e = make_event("playbook_started", None);
e.context = Some(serde_json::json!({
"workload": {"key": "value"},
"path": "test/playbook",
"version": "1"
}));
e
},
make_event("step.enter", Some("step1")),
make_event("command.issued", Some("step1")),
{
let mut e = make_event("command.completed", Some("step1"));
e.result = Some(serde_json::json!({"output": "success"}));
e
},
];
let state = WorkflowState::from_events(&events).unwrap();
assert_eq!(state.execution_id, 12345);
assert_eq!(state.state, ExecutionState::InProgress);
assert!(state.is_step_completed("step1"));
assert_eq!(
state.get_step_result("step1"),
Some(&serde_json::json!({"output": "success"}))
);
}
// Workload keys and the path are exposed at the top level of the context,
// and the `steps` namespace is present.
#[test]
fn test_workflow_state_build_context() {
let mut state = WorkflowState::new(12345, 67890);
state.workload = Some(serde_json::json!({"var1": "value1"}));
state.path = Some("test/path".to_string());
let mut step_info = StepInfo::new("step1");
step_info.result = Some(serde_json::json!({"output": "result1"}));
state.steps.insert("step1".to_string(), step_info);
let context = state.build_context();
assert_eq!(context.get("var1").and_then(|v| v.as_str()), Some("value1"));
assert_eq!(
context.get("path").and_then(|v| v.as_str()),
Some("test/path")
);
assert!(context.get("steps").is_some());
}
// A plain step moves Entered -> CommandIssued -> Completed.
#[test]
fn test_step_state_transitions() {
let mut state = WorkflowState::new(1, 1);
state.apply_event(&make_event("step.enter", Some("step1")));
assert_eq!(state.steps.get("step1").unwrap().state, StepState::Entered);
state.apply_event(&make_event("command.issued", Some("step1")));
assert_eq!(
state.steps.get("step1").unwrap().state,
StepState::CommandIssued
);
state.apply_event(&make_event("command.completed", Some("step1")));
assert_eq!(
state.steps.get("step1").unwrap().state,
StepState::Completed
);
}
// `step.skipped` records the step as Skipped with both timestamps set;
// a skipped step is "done" but not "completed".
#[test]
fn step_skipped_event_marks_state_skipped() {
let mut state = WorkflowState::new(1, 1);
state.apply_event(&make_event("step.skipped", Some("guarded_step")));
let step = state
.steps
.get("guarded_step")
.expect("apply_event should record the skipped step");
assert_eq!(step.state, StepState::Skipped);
assert!(step.entered_at.is_some());
assert!(step.completed_at.is_some());
assert!(state.is_step_done("guarded_step"));
assert!(!state.is_step_completed("guarded_step"));
}
// The underscore spelling `step_skipped` is handled the same way.
#[test]
fn step_skipped_underscore_alias_also_marks_skipped() {
let mut state = WorkflowState::new(1, 1);
state.apply_event(&make_event("step_skipped", Some("guarded_step")));
assert_eq!(
state.steps.get("guarded_step").unwrap().state,
StepState::Skipped
);
}
// An iterator step completes after all expected iterations report, and its
// result is the array of per-iteration results in arrival order.
#[test]
fn test_iterator_step_aggregates_completion() {
let mut state = WorkflowState::new(1, 1);
let mut enter = make_event("step.enter", Some("looped"));
enter.context = Some(serde_json::json!({
"iterations_expected": 3,
"iterator_var": "item",
}));
state.apply_event(&enter);
let after_enter = state.steps.get("looped").unwrap();
assert_eq!(after_enter.state, StepState::Entered);
assert_eq!(after_enter.iterations_expected, Some(3));
assert_eq!(after_enter.iterations_completed(), 0);
for (idx, payload) in [(0, "a"), (1, "b"), (2, "c")] {
let mut ev = make_event("command.completed", Some("looped"));
ev.meta = Some(serde_json::json!({
"command_id": format!("e:looped:0:i{}", idx),
"iteration_index": idx,
"iteration_total": 3,
}));
ev.result = Some(serde_json::json!({ "value": payload }));
state.apply_event(&ev);
}
let info = state.steps.get("looped").unwrap();
assert_eq!(info.state, StepState::Completed);
assert_eq!(info.iterations_completed(), 3);
let agg = info.result.as_ref().unwrap();
assert_eq!(agg.as_array().map(|a| a.len()), Some(3));
let values: Vec<String> = agg
.as_array()
.unwrap()
.iter()
.map(|v| v.get("value").unwrap().as_str().unwrap().to_string())
.collect();
assert_eq!(values, vec!["a", "b", "c"]);
}
// Two completion events with the same command id count as one iteration.
#[test]
fn test_iterator_step_dedupes_duplicate_command_completed() {
let mut state = WorkflowState::new(1, 1);
let mut enter = make_event("step.enter", Some("looped"));
enter.context = Some(serde_json::json!({
"iterations_expected": 2,
}));
state.apply_event(&enter);
for _ in 0..2 {
let mut ev = make_event("command.completed", Some("looped"));
ev.meta = Some(serde_json::json!({
"command_id": "e:looped:0:i0",
}));
ev.result = Some(serde_json::json!({"i": 0}));
state.apply_event(&ev);
}
let info = state.steps.get("looped").unwrap();
assert_eq!(info.iterations_completed(), 1);
assert_ne!(info.state, StepState::Completed);
let mut ev = make_event("command.completed", Some("looped"));
ev.meta = Some(serde_json::json!({
"command_id": "e:looped:0:i1",
}));
ev.result = Some(serde_json::json!({"i": 1}));
state.apply_event(&ev);
let info = state.steps.get("looped").unwrap();
assert_eq!(info.iterations_completed(), 2);
assert_eq!(info.state, StepState::Completed);
}
// With only 2 of 3 iterations reported, the step is not yet Completed.
#[test]
fn test_iterator_step_partial_completion_stays_running() {
let mut state = WorkflowState::new(1, 1);
let mut enter = make_event("step.enter", Some("looped"));
enter.context = Some(serde_json::json!({
"iterations_expected": 3,
}));
state.apply_event(&enter);
for idx in 0..2 {
let mut ev = make_event("command.completed", Some("looped"));
ev.meta = Some(serde_json::json!({
"command_id": format!("e:looped:0:i{}", idx),
}));
ev.result = Some(serde_json::json!({"i": idx}));
state.apply_event(&ev);
}
let info = state.steps.get("looped").unwrap();
assert_ne!(info.state, StepState::Completed);
assert_eq!(info.iterations_completed(), 2);
assert!(!state.is_step_completed("looped"));
}
// A single iteration's full event sequence (claimed, started, worker
// step.enter, call.done, step.exit, command.completed) counts once: step.exit
// and command.completed share one command id, so 1 of 3 does not complete.
#[test]
fn test_iterator_partial_with_worker_step_exit_does_not_complete() {
let mut state = WorkflowState::new(1, 1);
let mut enter = make_event("step.enter", Some("looped"));
enter.result = Some(serde_json::json!({
"status": "ENTERED",
"context": {
"iterations_expected": 3,
"iterator_var": "item",
},
}));
state.apply_event(&enter);
for idx in 0..3 {
let mut ev = make_event("command.issued", Some("looped"));
ev.meta = Some(serde_json::json!({
"command_id": format!("exec:looped:e0:i{}", idx),
"iteration_index": idx,
"iteration_total": 3,
}));
state.apply_event(&ev);
}
let cid = "exec:looped:e0:i2".to_string();
let mut claimed = make_event("command.claimed", Some("looped"));
claimed.meta = Some(serde_json::json!({"command_id": cid}));
state.apply_event(&claimed);
let mut started = make_event("command.started", Some("looped"));
started.meta = Some(serde_json::json!({"command_id": cid}));
state.apply_event(&started);
let mut worker_enter = make_event("step.enter", Some("looped"));
worker_enter.context = Some(serde_json::json!({"status": "started"}));
state.apply_event(&worker_enter);
let call_done = make_event("call.done", Some("looped"));
state.apply_event(&call_done);
let mut step_exit = make_event("step.exit", Some("looped"));
step_exit.result = Some(serde_json::json!({
"status": "COMPLETED",
"context": { "command_id": cid.clone(), "status": "COMPLETED" }
}));
state.apply_event(&step_exit);
let mut completed = make_event("command.completed", Some("looped"));
completed.result = Some(serde_json::json!({
"status": "COMPLETED",
"context": { "command_id": cid.clone(), "worker_id": "w" }
}));
state.apply_event(&completed);
let info = state.steps.get("looped").unwrap();
assert_eq!(info.iterations_expected, Some(3));
assert_eq!(
info.iterations_completed(),
1,
"only ONE distinct command_id observed across step.exit + command.completed; \
iteration_command_ids = {:?}",
info.iteration_command_ids
);
assert_ne!(
info.state,
StepState::Completed,
"looped must NOT be Completed after only 1 of 3 iterations; state = {:?}",
info.state
);
assert!(!state.is_step_completed("looped"));
}
// A step without `iterations_expected` completes on its first completion
// event and stores the event result directly.
#[test]
fn test_plain_step_unaffected_by_iterator_logic() {
let mut state = WorkflowState::new(1, 1);
state.apply_event(&make_event("step.enter", Some("plain")));
let mut ev = make_event("command.completed", Some("plain"));
ev.result = Some(serde_json::json!({"ok": true}));
state.apply_event(&ev);
let info = state.steps.get("plain").unwrap();
assert_eq!(info.state, StepState::Completed);
assert_eq!(info.iterations_expected, None);
assert_eq!(info.iterations_completed(), 0);
assert_eq!(info.result, Some(serde_json::json!({"ok": true})));
}
// `context.result.context.data` is unwrapped from the standard envelope.
#[test]
fn test_extract_user_data_unwraps_standard_envelope() {
let envelope = serde_json::json!({
"status": "COMPLETED",
"context": {
"result": {
"status": "success",
"context": {
"data": {"is_hot": true, "message": "hot"},
"status": "success",
"stdout": "",
"stderr": "",
},
},
"call_index": 0,
},
});
let data = extract_user_data(&envelope).expect("unwrap should succeed");
assert_eq!(data.get("is_hot").and_then(|v| v.as_bool()), Some(true));
assert_eq!(data.get("message").and_then(|v| v.as_str()), Some("hot"));
}
// A result without a `context` key is returned unchanged.
#[test]
fn test_extract_user_data_handles_flat_result() {
let flat = serde_json::json!({"is_hot": false});
let data = extract_user_data(&flat).expect("flat result preserved");
assert_eq!(data, flat);
}
// JSON null yields `None`.
#[test]
fn test_extract_user_data_null_returns_none() {
let null = serde_json::Value::Null;
assert!(extract_user_data(&null).is_none());
}
// Unwrapped step data is exposed under the step name at the top level, next
// to the raw `steps` namespace and the workload keys.
#[test]
fn test_build_context_exposes_step_data_at_top_level() {
let mut state = WorkflowState::new(1, 1);
state.workload = Some(serde_json::json!({"temp": 30}));
let mut info = StepInfo::new("eval_flag");
info.result = Some(serde_json::json!({
"status": "COMPLETED",
"context": {
"result": {
"status": "success",
"context": {
"data": {"is_hot": true, "message": "hot"},
},
},
},
}));
state.steps.insert("eval_flag".to_string(), info);
let ctx = state.build_context();
let eval_flag = ctx.get("eval_flag").expect("top-level step data exposed");
assert_eq!(
eval_flag.get("is_hot").and_then(|v| v.as_bool()),
Some(true)
);
let steps = ctx.get("steps").expect("steps namespace present");
assert!(
steps.get("eval_flag").is_some(),
"back-compat steps namespace populated"
);
assert_eq!(ctx.get("temp").and_then(|v| v.as_i64()), Some(30));
}
// For a flat user dict, both `step.<field>` and `step.data.<field>` resolve.
#[test]
fn test_build_context_exposes_step_data_accessor_for_flat_user_dict() {
let mut state = WorkflowState::new(1, 1);
let mut info = StepInfo::new("run_from_file");
info.result = Some(serde_json::json!({
"status": "COMPLETED",
"context": {
"result": {
"status": "success",
"context": {
"data": {
"status": "success",
"messages": ["Hello, NoETL! (#1)", "Hello, NoETL! (#2)", "Hello, NoETL! (#3)"],
"total_greetings": 3,
"script_source": "file"
}
}
}
}
}));
state.steps.insert("run_from_file".to_string(), info);
let ctx = state.build_context();
let step = ctx
.get("run_from_file")
.expect("top-level step entry exposed");
assert_eq!(
step.get("status").and_then(|v| v.as_str()),
Some("success"),
"flat `run_from_file.status` must still resolve"
);
assert_eq!(
step.get("total_greetings").and_then(|v| v.as_i64()),
Some(3),
"flat `run_from_file.total_greetings` must still resolve"
);
let data = step
.get("data")
.expect("`.data` accessor populated for flat user dict");
assert_eq!(
data.get("status").and_then(|v| v.as_str()),
Some("success"),
"`run_from_file.data.status` must resolve"
);
assert_eq!(
data.get("total_greetings").and_then(|v| v.as_i64()),
Some(3),
"`run_from_file.data.total_greetings` must resolve"
);
assert_eq!(
data.get("messages")
.and_then(|v| v.as_array())
.map(|a| a.len()),
Some(3),
"`run_from_file.data.messages` must resolve"
);
}
// When flattened task-sequence data already has a `data` key, the accessor
// does not overwrite it and the labeled path keeps working.
#[test]
fn test_build_context_data_accessor_does_not_clobber_existing_data_field() {
let mut state = WorkflowState::new(1, 1);
let mut info = StepInfo::new("multi_step");
info.result = Some(serde_json::json!({
"status": "COMPLETED",
"context": {
"result": {
"status": "success",
"context": {
"data": {
"init_action": {
"data": {"executed": true, "value": 42},
"status": "success"
}
}
}
}
}
}));
state.steps.insert("multi_step".to_string(), info);
let ctx = state.build_context();
let step = ctx.get("multi_step").expect("step entry exposed");
let labeled = step
.get("init_action")
.and_then(|v| v.get("data"))
.and_then(|v| v.get("executed"))
.and_then(|v| v.as_bool());
assert_eq!(
labeled,
Some(true),
"labeled task_sequence path stays intact"
);
let flat = step
.get("data")
.and_then(|v| v.get("executed"))
.and_then(|v| v.as_bool());
assert_eq!(
flat,
Some(true),
"flattened `multi_step.data.executed` must still resolve (#66 fix preserves task_sequence flatten)"
);
}
// A task-sequence wrapper (`{label: {...}}`) is flattened so the task's
// fields resolve at the top level while the labeled path remains.
#[test]
fn test_extract_user_data_flattens_task_sequence_wrap() {
let envelope = serde_json::json!({
"status": "COMPLETED",
"context": {
"call_index": 0,
"command_id": "321180039523602432:start:321180039552962560",
"result": {
"status": "success",
"context": {
"data": {
"init_action": {
"data": {
"executed": true,
"input": {"test_value": "hello"}
},
"message": "Start step executed with action type",
"status": "success"
}
},
"duration_ms": 79,
"exit_code": 0,
"status": "success",
"stderr": "",
"stdout": ""
}
}
}
});
let unwrapped = extract_user_data(&envelope).expect("envelope unwraps");
assert_eq!(
unwrapped
.get("data")
.and_then(|v| v.get("executed"))
.and_then(|v| v.as_bool()),
Some(true),
"start.data.executed must resolve after flatten"
);
assert_eq!(
unwrapped.get("status").and_then(|v| v.as_str()),
Some("success"),
"start.status must resolve after flatten"
);
assert_eq!(
unwrapped
.get("init_action")
.and_then(|v| v.get("data"))
.and_then(|v| v.get("executed"))
.and_then(|v| v.as_bool()),
Some(true),
"start.init_action.data.executed must still resolve"
);
}
// `ctx.` scope is stripped from mutation keys.
#[test]
fn test_apply_set_mutations_strips_ctx_prefix() {
let mut vars: HashMap<String, serde_json::Value> = HashMap::new();
let mutations = [("ctx.foo".to_string(), serde_json::json!(1))]
.into_iter()
.collect();
apply_set_mutations(&mut vars, &mutations);
assert_eq!(vars.get("foo"), Some(&serde_json::json!(1)));
assert!(
!vars.contains_key("ctx.foo"),
"scoped key must not be present"
);
}
// `iter.` scope is stripped from mutation keys.
#[test]
fn test_apply_set_mutations_strips_iter_prefix() {
let mut vars: HashMap<String, serde_json::Value> = HashMap::new();
let mutations = [("iter.bar".to_string(), serde_json::json!(2))]
.into_iter()
.collect();
apply_set_mutations(&mut vars, &mutations);
assert_eq!(vars.get("bar"), Some(&serde_json::json!(2)));
assert!(!vars.contains_key("iter.bar"));
}
// `step.` scope is stripped from mutation keys.
#[test]
fn test_apply_set_mutations_strips_step_prefix() {
let mut vars: HashMap<String, serde_json::Value> = HashMap::new();
let mutations = [("step.baz".to_string(), serde_json::json!(3))]
.into_iter()
.collect();
apply_set_mutations(&mut vars, &mutations);
assert_eq!(vars.get("baz"), Some(&serde_json::json!(3)));
assert!(!vars.contains_key("step.baz"));
}
// Keys without a dot are stored unchanged.
#[test]
fn test_apply_set_mutations_keeps_bare_keys() {
let mut vars: HashMap<String, serde_json::Value> = HashMap::new();
let mutations = [("qux".to_string(), serde_json::json!(4))]
.into_iter()
.collect();
apply_set_mutations(&mut vars, &mutations);
assert_eq!(vars.get("qux"), Some(&serde_json::json!(4)));
}
// Dotted keys whose prefix is not a known scope are stored verbatim.
#[test]
fn test_apply_set_mutations_keeps_unknown_scope_dot_keys() {
let mut vars: HashMap<String, serde_json::Value> = HashMap::new();
let mutations = [("app.config".to_string(), serde_json::json!({"level": 5}))]
.into_iter()
.collect();
apply_set_mutations(&mut vars, &mutations);
assert_eq!(
vars.get("app.config"),
Some(&serde_json::json!({"level": 5}))
);
assert!(
!vars.contains_key("config"),
"bare key must NOT be present for unknown scope"
);
}
// All key shapes applied in one call behave as in the individual tests.
#[test]
fn test_apply_set_mutations_all_cases_together() {
let mut vars: HashMap<String, serde_json::Value> = HashMap::new();
let mutations: HashMap<String, serde_json::Value> = [
("ctx.foo".to_string(), serde_json::json!(1)),
("iter.bar".to_string(), serde_json::json!(2)),
("step.baz".to_string(), serde_json::json!(3)),
("qux".to_string(), serde_json::json!(4)),
("app.config".to_string(), serde_json::json!({"level": 5})),
]
.into_iter()
.collect();
apply_set_mutations(&mut vars, &mutations);
assert_eq!(vars.get("foo"), Some(&serde_json::json!(1)));
assert_eq!(vars.get("bar"), Some(&serde_json::json!(2)));
assert_eq!(vars.get("baz"), Some(&serde_json::json!(3)));
assert_eq!(vars.get("qux"), Some(&serde_json::json!(4)));
assert_eq!(
vars.get("app.config"),
Some(&serde_json::json!({"level": 5}))
);
assert!(!vars.contains_key("ctx.foo"));
assert!(!vars.contains_key("iter.bar"));
assert!(!vars.contains_key("step.baz"));
}
}