pub mod call;
pub mod call_workflow;
pub mod control_flow;
pub mod foreach;
pub mod gate;
pub mod parallel;
pub mod script;
use std::collections::HashMap;
use std::sync::Arc;
use crate::engine_error::EngineError;
#[inline]
#[track_caller]
pub(super) fn p_err(e: impl std::fmt::Display) -> EngineError {
let loc = std::panic::Location::caller();
EngineError::Persistence(format!("{}:{} — {e}", loc.file(), loc.line()))
}
pub(super) fn insert_step_record(
state: &crate::engine::ExecutionState,
step_name: &str,
role: &str,
pos: i64,
iteration: u32,
retry_count: Option<i64>,
) -> crate::engine_error::Result<String> {
use crate::traits::persistence::NewStep;
state
.persistence
.insert_step(NewStep {
workflow_run_id: state.workflow_run_id.clone(),
step_name: step_name.to_string(),
role: role.to_string(),
can_commit: false,
position: pos,
iteration: iteration as i64,
retry_count,
})
.map_err(p_err)
}
#[allow(clippy::too_many_arguments)]
pub(super) fn insert_step_with_status(
state: &crate::engine::ExecutionState,
step_name: &str,
role: &str,
pos: i64,
iteration: u32,
retry_count: Option<i64>,
status: crate::status::WorkflowStepStatus,
result_text: Option<String>,
) -> crate::engine_error::Result<String> {
use crate::traits::persistence::StepUpdate;
let step_id = insert_step_record(state, step_name, role, pos, iteration, retry_count)?;
let generation = state.expect_lease_generation();
state.persistence.update_step(
&step_id,
StepUpdate {
generation,
status,
child_run_id: None,
result_text,
context_out: None,
markers_out: None,
retry_count,
structured_output: None,
step_error: None,
},
)?;
Ok(step_id)
}
pub(super) fn begin_retry_attempt(
state: &mut crate::engine::ExecutionState,
step_name: &str,
role: &str,
pos: i64,
iteration: u32,
attempt: u32,
) -> crate::engine_error::Result<String> {
use crate::engine::emit_event;
use crate::events::EngineEvent;
if attempt > 0 {
emit_event(
state,
EngineEvent::StepRetrying {
step_name: step_name.to_string(),
attempt,
},
);
}
insert_step_record(state, step_name, role, pos, iteration, Some(attempt as i64))
}
pub(super) fn build_inputs_map(
state: &crate::engine::ExecutionState,
) -> Arc<HashMap<String, String>> {
let var_map = crate::prompt_builder::build_variable_map(state);
Arc::new(
var_map
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect(),
)
}
pub(super) fn build_step_info(
state: &crate::engine::ExecutionState,
step_id: &str,
) -> crate::traits::action_executor::StepInfo {
crate::traits::action_executor::StepInfo {
step_id: step_id.to_string(),
step_timeout: state.exec_config.step_timeout,
}
}
#[allow(clippy::too_many_arguments)]
pub(super) fn persist_completed_step(
state: &crate::engine::ExecutionState,
step_id: &str,
child_run_id: Option<String>,
result_text: Option<String>,
context_out: Option<String>,
markers_out: Option<String>,
attempt: u32,
structured_output: Option<String>,
) -> crate::engine_error::Result<()> {
use crate::traits::persistence::StepUpdate;
let generation = state.expect_lease_generation();
state.persistence.update_step(
step_id,
StepUpdate::completed(
generation,
child_run_id,
result_text,
context_out,
markers_out,
attempt,
structured_output,
),
)
}
#[allow(clippy::too_many_arguments)]
pub(super) fn build_action_params(
name: &str,
inputs: Arc<HashMap<String, String>>,
snippets: Vec<String>,
dry_run: bool,
gate_feedback: Option<String>,
schema: Option<crate::output_schema::OutputSchema>,
retries_remaining: u32,
retry_error: Option<String>,
model: Option<String>,
as_identity: Option<String>,
plugin_dirs: Vec<String>,
max_turns: Option<u32>,
) -> crate::traits::action_executor::ActionParams {
let mut extensions = crate::extensions::Extensions::default();
if let Some(s) = schema {
extensions.insert(s);
}
if let Some(mt) = max_turns {
extensions.insert(crate::extensions::ClaudeActionParams {
max_turns: Some(mt),
});
}
crate::traits::action_executor::ActionParams {
name: name.to_string(),
inputs,
retries_remaining,
retry_error,
snippets,
dry_run,
gate_feedback,
extensions,
model,
as_identity,
plugin_dirs,
}
}
#[allow(clippy::too_many_arguments)]
pub(super) fn record_dispatch_success(
state: &mut crate::engine::ExecutionState,
step_id: &str,
step_key: &str,
agent_label: &str,
output: &crate::traits::action_executor::ActionOutput,
iteration: u32,
attempt: u32,
output_file: Option<String>,
) -> crate::engine_error::Result<()> {
let markers_json = crate::helpers::serialize_or_empty_array(
&output.markers,
&format!("agent '{agent_label}'"),
);
let context = output.context.clone().unwrap_or_default();
persist_completed_step(
state,
step_id,
output.child_run_id.clone(),
output.result_text.clone(),
Some(context.clone()),
Some(markers_json),
attempt,
output.structured_output.clone(),
)?;
crate::engine::record_step_success(
state,
step_key.to_string(),
crate::types::StepSuccess::from_action_output(
output,
agent_label.to_string(),
context,
iteration,
output_file,
),
);
Ok(())
}
pub(super) fn skip_if_already_completed(
state: &mut crate::engine::ExecutionState,
step_key: &str,
iteration: u32,
label: &str,
) -> bool {
use crate::engine::{restore_step, should_skip};
if should_skip(state, step_key, iteration) {
tracing::info!(
"Skipping completed step '{}' (iteration {})",
label,
iteration
);
restore_step(state, step_key, iteration);
true
} else {
false
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::extensions::ClaudeActionParams;
use crate::traits::action_executor::ActionParams;
fn make_inputs() -> Arc<HashMap<String, String>> {
Arc::new(HashMap::new())
}
fn make_test_params(max_turns: Option<u32>) -> ActionParams {
build_action_params(
"test",
make_inputs(),
vec![],
false,
None,
None,
0,
None,
None,
None,
vec![],
max_turns,
)
}
#[test]
fn build_action_params_inserts_claude_action_params_when_max_turns_some() {
let params = make_test_params(Some(50));
let cap = params
.extensions
.get::<ClaudeActionParams>()
.expect("ClaudeActionParams should be present");
assert_eq!(cap.max_turns, Some(50));
}
#[test]
fn build_action_params_no_claude_action_params_when_max_turns_none() {
let params = make_test_params(None);
assert!(params.extensions.get::<ClaudeActionParams>().is_none());
}
}