runkon-flow 0.2.0-alpha

Portable workflow execution engine — DSL, traits, and in-memory reference implementations
Documentation
use std::collections::HashMap;
use std::thread;
use std::time::Duration;

use crate::dsl::{GateNode, GateOptions, OnFailAction, OnTimeout, QUALITY_GATE_TYPE};
use crate::engine::{emit_event, restore_step, should_skip, ExecutionState};
use crate::engine_error::{EngineError, Result};
use crate::events::EngineEvent;
use crate::status::{WorkflowRunStatus, WorkflowStepStatus};
use crate::traits::persistence::{GateApprovalState, StepUpdate};

/// Best-effort: flip the workflow run back to `Running` once a gate has been resolved.
///
/// `gate_name` and `context` are only used to label the warning that is logged when the
/// persistence call fails; the failure itself is deliberately not propagated to the caller.
fn resume_run_status(state: &ExecutionState, gate_name: &str, context: &str) {
    let outcome = state.persistence.update_run_status(
        &state.workflow_run_id,
        WorkflowRunStatus::Running,
        None,
        None,
    );

    match outcome {
        Ok(_) => {}
        Err(e) => {
            tracing::warn!("Gate '{gate_name}': failed to update run status {context}: {e}");
        }
    }
}

pub fn execute_gate(state: &mut ExecutionState, node: &GateNode, iteration: u32) -> Result<()> {
    let pos = state.position;
    state.position += 1;

    // Skip completed gates on resume — restore feedback for downstream steps
    if should_skip(state, &node.name, iteration) {
        tracing::info!("Skipping completed gate '{}'", node.name);
        restore_step(state, &node.name, iteration);
        return Ok(());
    }

    // Quality gates evaluate immediately — no blocking/waiting.
    if node.gate_type == QUALITY_GATE_TYPE {
        return execute_quality_gate(state, node, pos, iteration);
    }

    // Dry-run: auto-approve all gates
    if state.exec_config.dry_run {
        tracing::info!("gate '{}': dry-run auto-approved", node.name);
        super::insert_step_with_status(
            state,
            &node.name,
            "reviewer",
            pos,
            iteration,
            None,
            WorkflowStepStatus::Completed,
            Some("dry-run: auto-approved".to_string()),
        )?;
        return Ok(());
    }

    // Insert step and mark as waiting
    let step_id = super::insert_step_with_status(
        state,
        &node.name,
        "gate",
        pos,
        iteration,
        None,
        WorkflowStepStatus::Waiting,
        None,
    )?;

    emit_event(
        state,
        EngineEvent::GateWaiting {
            gate_name: node.name.clone(),
        },
    );

    // Resolve gate options (if any) — stored for future use by gate resolvers
    let _resolved_options: HashMap<String, String> = if let Some(ref gate_opts) = node.options {
        match gate_opts {
            GateOptions::Static(map) => map.clone(),
            GateOptions::StepRef(dotted) => {
                let dot = dotted.find('.').ok_or_else(|| {
                    EngineError::Workflow(format!(
                        "Gate '{}': options StepRef '{dotted}' must be in 'step.field' format",
                        node.name
                    ))
                })?;
                let step_key = &dotted[..dot];
                let field_key = &dotted[dot + 1..];
                let result = state.step_results.get(step_key).ok_or_else(|| {
                    EngineError::Workflow(format!(
                        "Gate '{}': options StepRef references step '{step_key}' which has no result yet",
                        node.name
                    ))
                })?;
                let json_str = result.structured_output.as_deref().ok_or_else(|| {
                    EngineError::Workflow(format!(
                        "Gate '{}': step '{step_key}' has no structured_output to extract field '{field_key}' from",
                        node.name
                    ))
                })?;
                let val: serde_json::Value = serde_json::from_str(json_str).map_err(|e| {
                    EngineError::Workflow(format!(
                        "Gate '{}': failed to parse structured_output of step '{step_key}': {e}",
                        node.name
                    ))
                })?;
                let obj = val.get(field_key).and_then(|v| v.as_object()).ok_or_else(|| {
                    EngineError::Workflow(format!(
                        "Gate '{}': field '{field_key}' in step '{step_key}' structured_output is not a JSON object",
                        node.name
                    ))
                })?;
                obj.iter()
                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
                    .collect()
            }
        }
    } else {
        HashMap::new()
    };

    tracing::info!("Gate '{}' waiting (type = {})", node.name, node.gate_type);

    // Poll/timeout loop — poll via persistence.get_gate_approval()
    let start = std::time::Instant::now();
    loop {
        if start.elapsed() > Duration::from_secs(node.timeout_secs) {
            return handle_gate_timeout(state, &step_id, node);
        }

        match state.persistence.get_gate_approval(&step_id) {
            Ok(GateApprovalState::Approved {
                feedback,
                selections,
            }) => {
                tracing::info!("Gate '{}' approved", node.name);
                if let Some(ref fb) = feedback {
                    state.last_gate_feedback = Some(fb.clone());
                }
                if let Some(sel) = selections {
                    if !sel.is_empty() {
                        // Store gate selection as feedback
                        state.last_gate_feedback = Some(sel.join(", "));
                    }
                }
                resume_run_status(state, &node.name, "after approval");
                emit_event(
                    state,
                    EngineEvent::GateResolved {
                        gate_name: node.name.clone(),
                        approved: true,
                    },
                );
                return Ok(());
            }
            Ok(GateApprovalState::Rejected { feedback }) => {
                tracing::warn!("Gate '{}' rejected", node.name);
                state.all_succeeded = false;
                resume_run_status(state, &node.name, "after rejection");
                emit_event(
                    state,
                    EngineEvent::GateResolved {
                        gate_name: node.name.clone(),
                        approved: false,
                    },
                );
                let reason = feedback.unwrap_or_else(|| format!("Gate '{}' rejected", node.name));
                return Err(EngineError::Workflow(reason));
            }
            Ok(GateApprovalState::Pending) => {
                thread::sleep(state.exec_config.poll_interval);
            }
            Err(e) => {
                tracing::warn!("Gate '{}': error checking approval state: {e}", node.name);
                thread::sleep(state.exec_config.poll_interval);
            }
        }

        // Check cancellation
        match state.persistence.is_run_cancelled(&state.workflow_run_id) {
            Ok(true) => {
                state
                    .cancellation
                    .cancel(crate::cancellation_reason::CancellationReason::UserRequested(None));
                return Err(EngineError::Cancelled(
                    crate::cancellation_reason::CancellationReason::UserRequested(None),
                ));
            }
            Ok(false) => {}
            Err(e) => {
                tracing::warn!(
                    "Database error during cancellation check for gate '{}': {}",
                    node.name,
                    e
                );
            }
        }
    }
}

/// Run a quality gate: compare the `confidence` reported by an earlier step with a threshold.
///
/// The gate reads the `confidence` field from the JSON `structured_output` of the step named by
/// `quality_gate.source`. Integer values are used as-is and floating-point values are truncated;
/// both are capped at 100. If the output is absent, unparsable, or has no numeric `confidence`,
/// the confidence defaults to 0 and the reason is appended to the recorded result text.
///
/// A step record is inserted for the gate and then updated to `Completed` or `Failed`.
///
/// # Errors
/// Returns `EngineError::Workflow` when the node carries no `quality_gate` configuration, when
/// the source step has no entry in `state.step_results`, or when the confidence is below the
/// threshold and the policy is `OnFailAction::Fail`. Errors from inserting or updating the step
/// record are propagated.
pub fn execute_quality_gate(
    state: &mut ExecutionState,
    node: &GateNode,
    pos: i64,
    iteration: u32,
) -> Result<()> {
    let qg = match node.quality_gate.as_ref() {
        Some(cfg) => cfg,
        None => {
            return Err(EngineError::Workflow(format!(
                "Quality gate '{}' is missing required quality_gate configuration (source, threshold)",
                node.name
            )));
        }
    };
    let source = qg.source.as_str();
    let threshold = qg.threshold;
    let fail_policy = qg.on_fail_action.clone();

    let step_id = super::insert_step_record(state, &node.name, "gate", pos, iteration, None)?;
    let generation = state.expect_lease_generation();

    // Persists the final status of this gate's step together with a human-readable summary.
    let record_outcome = |status: WorkflowStepStatus, summary: &str| -> Result<()> {
        let update = StepUpdate {
            generation,
            status,
            child_run_id: None,
            result_text: Some(summary.to_string()),
            context_out: None,
            markers_out: None,
            retry_count: None,
            structured_output: None,
            step_error: None,
        };
        state.persistence.update_step(&step_id, update)
    };

    // A missing source step is a hard failure; everything else degrades to confidence 0.
    let source_result = match state.step_results.get(source) {
        Some(found) => found,
        None => {
            let msg = format!(
                "Quality gate '{}': source step '{}' not found in step results",
                node.name, source
            );
            record_outcome(WorkflowStepStatus::Failed, &msg)?;
            return Err(EngineError::Workflow(msg));
        }
    };

    // Ok(score) when a usable confidence was found, Err(reason) when it has to be defaulted.
    let extraction: std::result::Result<u32, String> = match source_result.structured_output {
        None => Err(format!(
            "source step '{}' has no structured output",
            source
        )),
        Some(ref raw) => match serde_json::from_str::<serde_json::Value>(raw) {
            Err(e) => Err(format!(
                "failed to parse structured output from '{}': {}",
                source, e
            )),
            Ok(doc) => {
                let field = doc.get("confidence");
                if let Some(whole) = field.and_then(|v| v.as_u64()) {
                    Ok(whole.min(100) as u32)
                } else if let Some(fractional) = field.and_then(|v| v.as_f64()) {
                    Ok((fractional as u64).min(100) as u32)
                } else {
                    Err(format!(
                        "'confidence' key missing or not a number in structured output from '{}'",
                        source
                    ))
                }
            }
        },
    };

    let (confidence, degradation_reason): (u32, Option<String>) = match extraction {
        Ok(score) => (score, None),
        Err(reason) => {
            tracing::warn!("quality_gate '{}': {}", node.name, reason);
            (0, Some(reason))
        }
    };

    let passed = confidence >= threshold;
    let verdict = if passed { "pass" } else { "fail" };
    let mut context = format!(
        "quality_gate: confidence={}, threshold={}, result={}",
        confidence, threshold, verdict
    );
    if let Some(ref reason) = degradation_reason {
        context.push_str(&format!(" (confidence defaulted to 0: {})", reason));
    }

    if passed {
        tracing::info!(
            "quality_gate '{}': passed (confidence {} >= threshold {})",
            node.name,
            confidence,
            threshold
        );
        return record_outcome(WorkflowStepStatus::Completed, &context);
    }

    tracing::warn!(
        "quality_gate '{}': failed (confidence {} < threshold {})",
        node.name,
        confidence,
        threshold
    );
    match fail_policy {
        OnFailAction::Fail => {
            record_outcome(WorkflowStepStatus::Failed, &context)?;
            Err(EngineError::Workflow(format!(
                "Quality gate '{}' failed: confidence {} is below threshold {}",
                node.name, confidence, threshold
            )))
        }
        // A failed check under `Continue` is still recorded as Completed so the run proceeds.
        OnFailAction::Continue => record_outcome(
            WorkflowStepStatus::Completed,
            &format!("{} (on_fail=continue, proceeding)", context),
        ),
    }
}

/// Apply the gate's `on_timeout` policy after its wait deadline has passed.
///
/// * `OnTimeout::Fail` — the step is marked `Failed` with a step error, the run is flagged as
///   not fully successful, and `EngineError::Workflow` is returned.
/// * `OnTimeout::Continue` — the step is marked `TimedOut` and `Ok(())` is returned so the
///   workflow proceeds.
///
/// In both cases the run status is set back to `Running` on a best-effort basis after the step
/// record has been updated.
///
/// # Errors
/// Besides the policy error above, any failure of `persistence.update_step` is propagated.
pub fn handle_gate_timeout(
    state: &mut ExecutionState,
    step_id: &str,
    node: &GateNode,
) -> Result<()> {
    tracing::warn!("Gate '{}' timed out", node.name);
    let generation = state.expect_lease_generation();

    // Everything that differs between the two policies, decided up front.
    let (status, result_text, step_error, aborts_run) = match node.on_timeout {
        OnTimeout::Fail => (
            WorkflowStepStatus::Failed,
            "gate timed out",
            Some(format!("Gate '{}' timed out", node.name)),
            true,
        ),
        OnTimeout::Continue => (
            WorkflowStepStatus::TimedOut,
            "gate timed out (continuing)",
            None,
            false,
        ),
    };

    state.persistence.update_step(
        step_id,
        StepUpdate {
            generation,
            status,
            child_run_id: None,
            result_text: Some(result_text.to_string()),
            context_out: None,
            markers_out: None,
            retry_count: None,
            structured_output: None,
            step_error,
        },
    )?;

    if aborts_run {
        state.all_succeeded = false;
        resume_run_status(state, &node.name, "after timeout (fail)");
        return Err(EngineError::Workflow(format!(
            "Gate '{}' timed out",
            node.name
        )));
    }

    resume_run_status(state, &node.name, "after timeout (continue)");
    Ok(())
}