use std::collections::HashMap;
use std::thread;
use std::time::Duration;
use crate::dsl::{GateNode, GateOptions, OnFailAction, OnTimeout, QUALITY_GATE_TYPE};
use crate::engine::{emit_event, restore_step, should_skip, ExecutionState};
use crate::engine_error::{EngineError, Result};
use crate::events::EngineEvent;
use crate::status::{WorkflowRunStatus, WorkflowStepStatus};
use crate::traits::persistence::{GateApprovalState, StepUpdate};
fn resume_run_status(state: &ExecutionState, gate_name: &str, context: &str) {
if let Err(e) = state.persistence.update_run_status(
&state.workflow_run_id,
WorkflowRunStatus::Running,
None,
None,
) {
tracing::warn!("Gate '{gate_name}': failed to update run status {context}: {e}");
}
}
pub fn execute_gate(state: &mut ExecutionState, node: &GateNode, iteration: u32) -> Result<()> {
let pos = state.position;
state.position += 1;
if should_skip(state, &node.name, iteration) {
tracing::info!("Skipping completed gate '{}'", node.name);
restore_step(state, &node.name, iteration);
return Ok(());
}
if node.gate_type == QUALITY_GATE_TYPE {
return execute_quality_gate(state, node, pos, iteration);
}
if state.exec_config.dry_run {
tracing::info!("gate '{}': dry-run auto-approved", node.name);
super::insert_step_with_status(
state,
&node.name,
"reviewer",
pos,
iteration,
None,
WorkflowStepStatus::Completed,
Some("dry-run: auto-approved".to_string()),
)?;
return Ok(());
}
let step_id = super::insert_step_with_status(
state,
&node.name,
"gate",
pos,
iteration,
None,
WorkflowStepStatus::Waiting,
None,
)?;
emit_event(
state,
EngineEvent::GateWaiting {
gate_name: node.name.clone(),
},
);
let _resolved_options: HashMap<String, String> = if let Some(ref gate_opts) = node.options {
match gate_opts {
GateOptions::Static(map) => map.clone(),
GateOptions::StepRef(dotted) => {
let dot = dotted.find('.').ok_or_else(|| {
EngineError::Workflow(format!(
"Gate '{}': options StepRef '{dotted}' must be in 'step.field' format",
node.name
))
})?;
let step_key = &dotted[..dot];
let field_key = &dotted[dot + 1..];
let result = state.step_results.get(step_key).ok_or_else(|| {
EngineError::Workflow(format!(
"Gate '{}': options StepRef references step '{step_key}' which has no result yet",
node.name
))
})?;
let json_str = result.structured_output.as_deref().ok_or_else(|| {
EngineError::Workflow(format!(
"Gate '{}': step '{step_key}' has no structured_output to extract field '{field_key}' from",
node.name
))
})?;
let val: serde_json::Value = serde_json::from_str(json_str).map_err(|e| {
EngineError::Workflow(format!(
"Gate '{}': failed to parse structured_output of step '{step_key}': {e}",
node.name
))
})?;
let obj = val.get(field_key).and_then(|v| v.as_object()).ok_or_else(|| {
EngineError::Workflow(format!(
"Gate '{}': field '{field_key}' in step '{step_key}' structured_output is not a JSON object",
node.name
))
})?;
obj.iter()
.filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
.collect()
}
}
} else {
HashMap::new()
};
tracing::info!("Gate '{}' waiting (type = {})", node.name, node.gate_type);
let start = std::time::Instant::now();
loop {
if start.elapsed() > Duration::from_secs(node.timeout_secs) {
return handle_gate_timeout(state, &step_id, node);
}
match state.persistence.get_gate_approval(&step_id) {
Ok(GateApprovalState::Approved {
feedback,
selections,
}) => {
tracing::info!("Gate '{}' approved", node.name);
if let Some(ref fb) = feedback {
state.last_gate_feedback = Some(fb.clone());
}
if let Some(sel) = selections {
if !sel.is_empty() {
state.last_gate_feedback = Some(sel.join(", "));
}
}
resume_run_status(state, &node.name, "after approval");
emit_event(
state,
EngineEvent::GateResolved {
gate_name: node.name.clone(),
approved: true,
},
);
return Ok(());
}
Ok(GateApprovalState::Rejected { feedback }) => {
tracing::warn!("Gate '{}' rejected", node.name);
state.all_succeeded = false;
resume_run_status(state, &node.name, "after rejection");
emit_event(
state,
EngineEvent::GateResolved {
gate_name: node.name.clone(),
approved: false,
},
);
let reason = feedback.unwrap_or_else(|| format!("Gate '{}' rejected", node.name));
return Err(EngineError::Workflow(reason));
}
Ok(GateApprovalState::Pending) => {
thread::sleep(state.exec_config.poll_interval);
}
Err(e) => {
tracing::warn!("Gate '{}': error checking approval state: {e}", node.name);
thread::sleep(state.exec_config.poll_interval);
}
}
match state.persistence.is_run_cancelled(&state.workflow_run_id) {
Ok(true) => {
state
.cancellation
.cancel(crate::cancellation_reason::CancellationReason::UserRequested(None));
return Err(EngineError::Cancelled(
crate::cancellation_reason::CancellationReason::UserRequested(None),
));
}
Ok(false) => {}
Err(e) => {
tracing::warn!(
"Database error during cancellation check for gate '{}': {}",
node.name,
e
);
}
}
}
}
pub fn execute_quality_gate(
state: &mut ExecutionState,
node: &GateNode,
pos: i64,
iteration: u32,
) -> Result<()> {
let qg = node.quality_gate.as_ref().ok_or_else(|| {
EngineError::Workflow(format!(
"Quality gate '{}' is missing required quality_gate configuration (source, threshold)",
node.name
))
})?;
let source = qg.source.as_str();
let threshold = qg.threshold;
let on_fail_action = qg.on_fail_action.clone();
let step_id = super::insert_step_record(state, &node.name, "gate", pos, iteration, None)?;
let generation = state.expect_lease_generation();
let set_step_status = |status: WorkflowStepStatus, context: &str| -> Result<()> {
state.persistence.update_step(
&step_id,
StepUpdate {
generation,
status,
child_run_id: None,
result_text: Some(context.to_string()),
context_out: None,
markers_out: None,
retry_count: None,
structured_output: None,
step_error: None,
},
)
};
let (confidence, degradation_reason): (u32, Option<String>) = match state
.step_results
.get(source)
{
Some(result) => {
if let Some(ref json_str) = result.structured_output {
match serde_json::from_str::<serde_json::Value>(json_str) {
Ok(val) => {
if let Some(c) = val.get("confidence").and_then(|v| v.as_u64()) {
(c.min(100) as u32, None)
} else if let Some(f) = val.get("confidence").and_then(|v| v.as_f64()) {
((f as u64).min(100) as u32, None)
} else {
let reason = format!(
"'confidence' key missing or not a number in structured output from '{}'",
source
);
tracing::warn!("quality_gate '{}': {}", node.name, reason);
(0, Some(reason))
}
}
Err(e) => {
let reason =
format!("failed to parse structured output from '{}': {}", source, e);
tracing::warn!("quality_gate '{}': {}", node.name, reason);
(0, Some(reason))
}
}
} else {
let reason = format!("source step '{}' has no structured output", source);
tracing::warn!("quality_gate '{}': {}", node.name, reason);
(0, Some(reason))
}
}
None => {
let msg = format!(
"Quality gate '{}': source step '{}' not found in step results",
node.name, source
);
set_step_status(WorkflowStepStatus::Failed, &msg)?;
return Err(EngineError::Workflow(msg));
}
};
let passed = confidence >= threshold;
let mut context = format!(
"quality_gate: confidence={}, threshold={}, result={}",
confidence,
threshold,
if passed { "pass" } else { "fail" }
);
if let Some(ref reason) = degradation_reason {
context.push_str(&format!(" (confidence defaulted to 0: {})", reason));
}
if passed {
tracing::info!(
"quality_gate '{}': passed (confidence {} >= threshold {})",
node.name,
confidence,
threshold
);
set_step_status(WorkflowStepStatus::Completed, &context)?;
} else {
tracing::warn!(
"quality_gate '{}': failed (confidence {} < threshold {})",
node.name,
confidence,
threshold
);
match on_fail_action {
OnFailAction::Fail => {
set_step_status(WorkflowStepStatus::Failed, &context)?;
return Err(EngineError::Workflow(format!(
"Quality gate '{}' failed: confidence {} is below threshold {}",
node.name, confidence, threshold
)));
}
OnFailAction::Continue => {
set_step_status(
WorkflowStepStatus::Completed,
&format!("{} (on_fail=continue, proceeding)", context),
)?;
}
}
}
Ok(())
}
pub fn handle_gate_timeout(
state: &mut ExecutionState,
step_id: &str,
node: &GateNode,
) -> Result<()> {
tracing::warn!("Gate '{}' timed out", node.name);
let generation = state.expect_lease_generation();
match node.on_timeout {
OnTimeout::Fail => {
state.persistence.update_step(
step_id,
StepUpdate {
generation,
status: WorkflowStepStatus::Failed,
child_run_id: None,
result_text: Some("gate timed out".to_string()),
context_out: None,
markers_out: None,
retry_count: None,
structured_output: None,
step_error: Some(format!("Gate '{}' timed out", node.name)),
},
)?;
state.all_succeeded = false;
resume_run_status(state, &node.name, "after timeout (fail)");
Err(EngineError::Workflow(format!(
"Gate '{}' timed out",
node.name
)))
}
OnTimeout::Continue => {
state.persistence.update_step(
step_id,
StepUpdate {
generation,
status: WorkflowStepStatus::TimedOut,
child_run_id: None,
result_text: Some("gate timed out (continuing)".to_string()),
context_out: None,
markers_out: None,
retry_count: None,
structured_output: None,
step_error: None,
},
)?;
resume_run_status(state, &node.name, "after timeout (continue)");
Ok(())
}
}
}