use std::collections::HashMap;
use crate::engine_error::EngineError;
use crate::status::{WorkflowRunStatus, WorkflowStepStatus};
use crate::types::{WorkflowRun, WorkflowRunStep};
pub use crate::traits::gate_approval_store::{
gate_approval_state_from_fields, GateApprovalState, GateApprovalStore,
};
pub use crate::types::FanOutItemRow;
pub struct NewRun {
pub workflow_name: String,
pub parent_run_id: String,
pub dry_run: bool,
pub trigger: String,
pub definition_snapshot: Option<String>,
pub parent_workflow_run_id: Option<String>,
}
pub struct NewStep {
pub workflow_run_id: String,
pub step_name: String,
pub role: String,
pub can_commit: bool,
pub position: i64,
pub iteration: i64,
pub retry_count: Option<i64>,
}
pub struct StepUpdate {
pub generation: i64,
pub status: WorkflowStepStatus,
pub child_run_id: Option<String>,
pub result_text: Option<String>,
pub context_out: Option<String>,
pub markers_out: Option<String>,
pub retry_count: Option<i64>,
pub structured_output: Option<String>,
pub step_error: Option<String>,
}
impl StepUpdate {
pub fn completed(
generation: i64,
child_run_id: Option<String>,
result_text: Option<String>,
context_out: Option<String>,
markers_out: Option<String>,
attempt: u32,
structured_output: Option<String>,
) -> Self {
Self {
generation,
status: WorkflowStepStatus::Completed,
child_run_id,
result_text,
context_out,
markers_out,
retry_count: Some(attempt as i64),
structured_output,
step_error: None,
}
}
pub fn failed(generation: i64, err_msg: impl Into<String>, attempt: u32) -> Self {
Self::failed_with_child(generation, err_msg, attempt, None)
}
pub fn failed_with_child(
generation: i64,
err_msg: impl Into<String>,
attempt: u32,
child_run_id: Option<String>,
) -> Self {
let err_msg = err_msg.into();
Self {
generation,
status: WorkflowStepStatus::Failed,
child_run_id,
result_text: Some(err_msg.clone()),
context_out: None,
markers_out: None,
retry_count: Some(attempt as i64),
structured_output: None,
step_error: Some(err_msg),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FanOutItemStatus {
Pending,
Running,
Completed,
Failed,
Skipped,
}
impl FanOutItemStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Pending => "pending",
Self::Running => "running",
Self::Completed => "completed",
Self::Failed => "failed",
Self::Skipped => "skipped",
}
}
}
impl TryFrom<&str> for FanOutItemStatus {
type Error = String;
fn try_from(s: &str) -> std::result::Result<Self, Self::Error> {
match s {
"pending" => Ok(Self::Pending),
"running" => Ok(Self::Running),
"completed" => Ok(Self::Completed),
"failed" => Ok(Self::Failed),
"skipped" => Ok(Self::Skipped),
other => Err(format!("unknown FanOutItemStatus: {other}")),
}
}
}
#[derive(Clone)]
pub enum FanOutItemUpdate {
Running { child_run_id: String },
Terminal { status: FanOutItemStatus },
}
pub trait WorkflowPersistence: GateApprovalStore + Send + Sync {
fn create_run(&self, new_run: NewRun) -> Result<WorkflowRun, EngineError>;
fn get_run(&self, run_id: &str) -> Result<Option<WorkflowRun>, EngineError>;
fn list_active_runs(
&self,
statuses: &[WorkflowRunStatus],
) -> Result<Vec<WorkflowRun>, EngineError>;
fn update_run_status(
&self,
run_id: &str,
status: WorkflowRunStatus,
result_summary: Option<&str>,
error: Option<&str>,
) -> Result<(), EngineError>;
fn insert_step(&self, new_step: NewStep) -> Result<String, EngineError>;
fn update_step(&self, step_id: &str, update: StepUpdate) -> Result<(), EngineError>;
fn get_steps(&self, run_id: &str) -> Result<Vec<WorkflowRunStep>, EngineError>;
fn insert_fan_out_item(
&self,
step_run_id: &str,
item_type: &str,
item_id: &str,
item_ref: &str,
context: &HashMap<String, String>,
) -> Result<String, EngineError>;
fn update_fan_out_item(
&self,
item_id: &str,
update: FanOutItemUpdate,
) -> Result<(), EngineError>;
fn batch_update_fan_out_items(
&self,
updates: &[(String, FanOutItemUpdate)],
) -> Result<(), EngineError> {
for (id, update) in updates {
self.update_fan_out_item(id, update.clone())?;
}
Ok(())
}
fn get_fan_out_items(
&self,
step_run_id: &str,
status_filter: Option<FanOutItemStatus>,
) -> Result<Vec<FanOutItemRow>, EngineError>;
fn acquire_lease(
&self,
run_id: &str,
token: &str,
ttl_seconds: i64,
) -> Result<Option<i64>, EngineError>;
fn is_run_cancelled(&self, run_id: &str) -> Result<bool, EngineError>;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::status::WorkflowStepStatus;
#[test]
fn pending_when_status_is_waiting_and_no_approved_at() {
let state = gate_approval_state_from_fields(None, WorkflowStepStatus::Waiting, None, None);
assert!(matches!(state, GateApprovalState::Pending));
}
#[test]
fn approved_when_approved_at_is_set_regardless_of_status() {
let state = gate_approval_state_from_fields(
Some("2025-01-01T00:00:00Z"),
WorkflowStepStatus::Waiting,
Some("lgtm".into()),
None,
);
match state {
GateApprovalState::Approved { feedback, .. } => {
assert_eq!(feedback.as_deref(), Some("lgtm"));
}
other => panic!("expected Approved, got {other:?}"),
}
}
#[test]
fn approved_when_status_completed_and_no_approved_at() {
let selections = vec!["option-a".to_string()];
let state = gate_approval_state_from_fields(
None,
WorkflowStepStatus::Completed,
None,
Some(selections.clone()),
);
match state {
GateApprovalState::Approved {
feedback,
selections: s,
} => {
assert!(feedback.is_none());
assert_eq!(s, Some(selections));
}
other => panic!("expected Approved, got {other:?}"),
}
}
#[test]
fn rejected_when_status_failed_surfaces_feedback() {
let state = gate_approval_state_from_fields(
None,
WorkflowStepStatus::Failed,
Some("not ready".into()),
None,
);
match state {
GateApprovalState::Rejected { feedback } => {
assert_eq!(feedback.as_deref(), Some("not ready"));
}
other => panic!("expected Rejected, got {other:?}"),
}
}
#[test]
fn rejected_with_no_feedback_when_none_stored() {
let state = gate_approval_state_from_fields(None, WorkflowStepStatus::Failed, None, None);
match state {
GateApprovalState::Rejected { feedback } => {
assert!(feedback.is_none());
}
other => panic!("expected Rejected, got {other:?}"),
}
}
#[test]
fn step_update_completed_sets_correct_fields() {
let update = StepUpdate::completed(
7,
Some("child-123".into()),
Some("result".into()),
Some("ctx".into()),
Some("markers".into()),
3,
Some("{\"key\": \"val\"}".into()),
);
assert_eq!(update.generation, 7);
assert_eq!(update.status, WorkflowStepStatus::Completed);
assert_eq!(update.child_run_id, Some("child-123".into()));
assert_eq!(update.result_text, Some("result".into()));
assert_eq!(update.context_out, Some("ctx".into()));
assert_eq!(update.markers_out, Some("markers".into()));
assert_eq!(update.retry_count, Some(3));
assert_eq!(update.structured_output, Some("{\"key\": \"val\"}".into()));
assert!(update.step_error.is_none());
}
#[test]
fn step_update_failed_sets_correct_fields() {
let update = StepUpdate::failed(5, "oops", 2);
assert_eq!(update.generation, 5);
assert_eq!(update.status, WorkflowStepStatus::Failed);
assert_eq!(update.result_text, Some("oops".into()));
assert_eq!(update.step_error, Some("oops".into()));
assert!(update.child_run_id.is_none());
assert!(update.context_out.is_none());
assert!(update.markers_out.is_none());
assert!(update.structured_output.is_none());
assert_eq!(update.retry_count, Some(2));
}
#[test]
fn step_update_failed_with_child_sets_child_run_id() {
let update = StepUpdate::failed_with_child(3, "child err", 1, Some("child-run-42".into()));
assert_eq!(update.generation, 3);
assert_eq!(update.status, WorkflowStepStatus::Failed);
assert_eq!(update.result_text, Some("child err".into()));
assert_eq!(update.step_error, Some("child err".into()));
assert_eq!(update.child_run_id, Some("child-run-42".into()));
assert_eq!(update.retry_count, Some(1));
assert!(update.context_out.is_none());
assert!(update.markers_out.is_none());
assert!(update.structured_output.is_none());
}
}