Skip to main content

runkon_flow/
types.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6
7use crate::extensions::Extensions;
8use crate::status::{WorkflowRunStatus, WorkflowStepStatus};
9
10/// A step key is a `(name, iteration)` pair used for skip-set and step-map lookups.
11#[allow(dead_code)]
12pub(crate) type StepKey = (String, u32);
13
14/// Describes what a workflow run is currently blocked on when in `Waiting` status.
15#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
16#[derive(Debug, Clone, Serialize, Deserialize)]
17#[serde(tag = "type", rename_all = "snake_case")]
18pub enum BlockedOn {
19    HumanApproval {
20        gate_name: String,
21        prompt: Option<String>,
22        #[serde(default)]
23        options: Vec<String>,
24    },
25    HumanReview {
26        gate_name: String,
27        prompt: Option<String>,
28        #[serde(default)]
29        options: Vec<String>,
30    },
31    PrApproval {
32        gate_name: String,
33        approvals_needed: u32,
34    },
35    PrChecks {
36        gate_name: String,
37    },
38}
39
40/// A workflow run record from the database.
41#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
42#[derive(Debug, Clone, Serialize)]
43pub struct WorkflowRun {
44    pub id: String,
45    pub workflow_name: String,
46    pub parent_run_id: String,
47    pub status: WorkflowRunStatus,
48    pub dry_run: bool,
49    pub trigger: String,
50    pub started_at: String,
51    pub ended_at: Option<String>,
52    pub result_summary: Option<String>,
53    pub error: Option<String>,
54    pub definition_snapshot: Option<String>,
55    pub inputs: HashMap<String, String>,
56    pub parent_workflow_run_id: Option<String>,
57    pub iteration: i64,
58    pub blocked_on: Option<BlockedOn>,
59    pub workflow_title: Option<String>,
60    pub total_duration_ms: Option<i64>,
61    pub dismissed: bool,
62    #[serde(skip)]
63    pub extensions: Extensions,
64    #[serde(skip)]
65    pub owner_token: Option<String>,
66    #[serde(skip)]
67    pub lease_until: Option<String>,
68    #[serde(skip)]
69    pub generation: i64,
70}
71
72/// Extract the human-readable title from a workflow definition snapshot JSON string.
73pub fn extract_workflow_title(snapshot: Option<&str>) -> Option<String> {
74    let s = snapshot?;
75    match serde_json::from_str::<serde_json::Value>(s) {
76        Ok(v) => v["title"].as_str().map(String::from),
77        Err(e) => {
78            tracing::warn!(
79                "Malformed definition_snapshot JSON — could not extract workflow title: {e}"
80            );
81            None
82        }
83    }
84}
85
86impl WorkflowRun {
87    /// Whether this run was triggered by a workflow hook (prevents infinite chains).
88    pub fn is_triggered_by_hook(&self) -> bool {
89        self.trigger == "hook"
90    }
91
92    /// Returns the human-readable display name for this run.
93    pub fn display_name(&self) -> &str {
94        self.workflow_title
95            .as_deref()
96            .unwrap_or(&self.workflow_name)
97    }
98}
99
100/// A workflow step execution record from the database.
101#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
102#[derive(Debug, Clone, Default, Serialize)]
103pub struct WorkflowRunStep {
104    pub id: String,
105    pub workflow_run_id: String,
106    pub step_name: String,
107    pub role: String,
108    pub can_commit: bool,
109    pub condition_expr: Option<String>,
110    pub status: WorkflowStepStatus,
111    pub child_run_id: Option<String>,
112    pub position: i64,
113    pub started_at: Option<String>,
114    pub ended_at: Option<String>,
115    pub result_text: Option<String>,
116    pub condition_met: Option<bool>,
117    pub iteration: i64,
118    pub parallel_group_id: Option<String>,
119    pub context_out: Option<String>,
120    pub markers_out: Option<String>,
121    pub retry_count: i64,
122    pub gate_type: Option<String>,
123    pub gate_prompt: Option<String>,
124    pub gate_timeout: Option<String>,
125    pub gate_approved_by: Option<String>,
126    pub gate_approved_at: Option<String>,
127    pub gate_feedback: Option<String>,
128    pub structured_output: Option<String>,
129    pub output_file: Option<String>,
130    pub gate_options: Option<String>,
131    pub gate_selections: Option<String>,
132    pub fan_out_total: Option<i64>,
133    pub fan_out_completed: i64,
134    pub fan_out_failed: i64,
135    pub fan_out_skipped: i64,
136    pub step_error: Option<String>,
137}
138
139/// Lightweight summary of the currently-running step for a workflow run.
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct WorkflowStepSummary {
142    pub step_name: String,
143    pub iteration: i64,
144    pub workflow_chain: Vec<String>,
145}
146
147/// Configuration for workflow execution.
148#[derive(Clone)]
149pub struct WorkflowExecConfig {
150    pub poll_interval: Duration,
151    pub step_timeout: Duration,
152    pub fail_fast: bool,
153    pub dry_run: bool,
154    pub shutdown: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
155    /// Event sinks that receive observability events after each state transition.
156    /// Defaults to empty (no sinks). Use `..WorkflowExecConfig::default()` spread
157    /// syntax to leave this unset when you don't need events.
158    pub event_sinks: Vec<Arc<dyn crate::events::EventSink>>,
159    /// Lease TTL passed to `acquire_lease`. Defaults to 30s (LEASE_TTL_SECONDS).
160    /// Override in tests to use a shorter TTL.
161    pub lease_ttl_secs: i64,
162    /// How often the background refresh thread renews the lease. Defaults to 10s.
163    /// Override in tests to exercise refresh behaviour without long waits.
164    pub lease_refresh_interval: Duration,
165}
166
167impl std::fmt::Debug for WorkflowExecConfig {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        f.debug_struct("WorkflowExecConfig")
170            .field("poll_interval", &self.poll_interval)
171            .field("step_timeout", &self.step_timeout)
172            .field("fail_fast", &self.fail_fast)
173            .field("dry_run", &self.dry_run)
174            .field("shutdown", &self.shutdown)
175            .field(
176                "event_sinks",
177                &format_args!("[{} sink(s)]", self.event_sinks.len()),
178            )
179            .field("lease_ttl_secs", &self.lease_ttl_secs)
180            .field("lease_refresh_interval", &self.lease_refresh_interval)
181            .finish()
182    }
183}
184
185impl Default for WorkflowExecConfig {
186    fn default() -> Self {
187        Self {
188            poll_interval: Duration::from_secs(5),
189            step_timeout: Duration::from_secs(12 * 60 * 60),
190            fail_fast: true,
191            dry_run: false,
192            shutdown: None,
193            event_sinks: vec![],
194            lease_ttl_secs: 30,
195            lease_refresh_interval: Duration::from_secs(10),
196        }
197    }
198}
199
200/// Result of executing a workflow.
201#[derive(Debug, Clone)]
202pub struct WorkflowResult {
203    pub workflow_run_id: String,
204    pub workflow_name: String,
205    pub all_succeeded: bool,
206    pub total_duration_ms: i64,
207    pub extensions: Extensions,
208}
209
210/// Input describing a successfully completed step, passed to `record_step_success`.
211///
212/// Groups the step output data that previously made call sites unwieldy.
213/// Does not include `step_key` — that is an execution bookkeeping concern kept
214/// as a separate parameter. `iteration` is included because it is needed to
215/// populate `ContextEntry`.
216#[derive(Debug, Clone, Default)]
217pub struct StepSuccess {
218    pub step_name: String,
219    pub result_text: Option<String>,
220    /// Executor-provided metric metadata (keys from `runkon_flow::constants::metadata_keys`).
221    pub metadata: HashMap<String, String>,
222    pub markers: Vec<String>,
223    pub context: String,
224    pub child_run_id: Option<String>,
225    pub iteration: u32,
226    pub structured_output: Option<String>,
227    pub output_file: Option<String>,
228}
229
230impl StepSuccess {
231    /// Build a [`StepSuccess`] from an [`ActionOutput`] plus execution bookkeeping.
232    pub fn from_action_output(
233        output: &crate::traits::action_executor::ActionOutput,
234        step_name: String,
235        context: String,
236        iteration: u32,
237        output_file: Option<String>,
238    ) -> Self {
239        Self {
240            step_name,
241            result_text: output.result_text.clone(),
242            metadata: output.metadata.clone(),
243            markers: output.markers.clone(),
244            context,
245            child_run_id: output.child_run_id.clone(),
246            iteration,
247            structured_output: output.structured_output.clone(),
248            output_file,
249        }
250    }
251
252    /// Build a [`StepSuccess`] from a database [`WorkflowRunStep`] record.
253    pub fn from_workflow_run_step(
254        step_name: String,
255        step: &WorkflowRunStep,
256        markers: Vec<String>,
257        context: String,
258        iteration: u32,
259    ) -> Self {
260        Self {
261            step_name,
262            result_text: step.result_text.clone(),
263            markers,
264            context,
265            child_run_id: step.child_run_id.clone(),
266            structured_output: step.structured_output.clone(),
267            output_file: step.output_file.clone(),
268            iteration,
269            metadata: HashMap::new(),
270        }
271    }
272}
273
274/// Result of a single step execution (kept in memory during execution).
275#[derive(Debug, Clone, Default)]
276pub struct StepResult {
277    pub step_name: String,
278    pub status: WorkflowStepStatus,
279    pub result_text: Option<String>,
280    pub markers: Vec<String>,
281    pub context: String,
282    pub child_run_id: Option<String>,
283    pub structured_output: Option<String>,
284    pub output_file: Option<String>,
285}
286
287impl StepResult {
288    /// Create a failed StepResult with the given error text.
289    pub fn failed(step_name: &str, result_text: String) -> Self {
290        Self {
291            step_name: step_name.to_string(),
292            status: WorkflowStepStatus::Failed,
293            result_text: Some(result_text),
294            ..Self::default()
295        }
296    }
297
298    /// Create a skipped StepResult.
299    pub fn skipped(step_name: &str) -> Self {
300        Self {
301            step_name: step_name.to_string(),
302            status: WorkflowStepStatus::Skipped,
303            ..Self::default()
304        }
305    }
306
307    /// Create a completed StepResult without per-step metrics (for resume paths).
308    pub fn completed_without_metrics(success: &StepSuccess) -> Self {
309        Self::completed(success)
310    }
311
312    /// Create a completed StepResult from a [`StepSuccess`] description.
313    pub fn completed(success: &StepSuccess) -> Self {
314        Self {
315            step_name: success.step_name.clone(),
316            status: WorkflowStepStatus::Completed,
317            result_text: success.result_text.clone(),
318            markers: success.markers.clone(),
319            context: success.context.clone(),
320            child_run_id: success.child_run_id.clone(),
321            structured_output: success.structured_output.clone(),
322            output_file: success.output_file.clone(),
323        }
324    }
325}
326
327/// An entry in the accumulated context history.
328#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct ContextEntry {
330    pub step: String,
331    pub iteration: u32,
332    pub context: String,
333    #[serde(default)]
334    pub markers: Vec<String>,
335    #[serde(default)]
336    pub structured_output: Option<String>,
337    #[serde(default)]
338    pub output_file: Option<String>,
339}
340
341impl From<StepSuccess> for ContextEntry {
342    fn from(success: StepSuccess) -> Self {
343        Self {
344            step: success.step_name,
345            iteration: success.iteration,
346            context: success.context,
347            markers: success.markers,
348            structured_output: success.structured_output,
349            output_file: success.output_file,
350        }
351    }
352}
353
354/// A single row in the `workflow_run_step_fan_out_items` table.
355#[derive(Debug, Clone, Serialize, Deserialize)]
356#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
357pub struct FanOutItemRow {
358    pub id: String,
359    pub step_run_id: String,
360    pub item_type: String,
361    pub item_id: String,
362    pub item_ref: String,
363    pub child_run_id: Option<String>,
364    pub status: String,
365    pub dispatched_at: Option<String>,
366    pub completed_at: Option<String>,
367    /// Per-item context map, merged into child workflow inputs as `item.<key>`.
368    #[serde(default)]
369    pub context: std::collections::HashMap<String, String>,
370}
371
372#[cfg(test)]
373mod tests {
374    use std::collections::HashMap;
375
376    use super::{StepResult, StepSuccess, WorkflowRunStep};
377    use crate::status::WorkflowStepStatus;
378
379    #[test]
380    fn step_result_failed_sets_status_and_text() {
381        let r = StepResult::failed("plan", "out of tokens".to_string());
382        assert_eq!(r.step_name, "plan");
383        assert_eq!(r.status, WorkflowStepStatus::Failed);
384        assert_eq!(r.result_text, Some("out of tokens".to_string()));
385        assert!(r.markers.is_empty());
386        assert_eq!(r.context, "");
387    }
388
389    #[test]
390    fn step_result_skipped_sets_status_and_defaults() {
391        let r = StepResult::skipped("lint");
392        assert_eq!(r.step_name, "lint");
393        assert_eq!(r.status, WorkflowStepStatus::Skipped);
394        assert!(r.result_text.is_none());
395        assert!(r.markers.is_empty());
396        assert_eq!(r.context, "");
397    }
398
399    #[test]
400    fn step_result_completed_sets_all_fields() {
401        let success = StepSuccess {
402            step_name: "review".to_string(),
403            result_text: Some("looks good".to_string()),
404            markers: vec!["approved".to_string()],
405            context: "ctx".to_string(),
406            child_run_id: Some("child-1".to_string()),
407            structured_output: Some(r#"{"ok":true}"#.to_string()),
408            output_file: Some("/tmp/out".to_string()),
409            ..StepSuccess::default()
410        };
411        let r = StepResult::completed(&success);
412        assert_eq!(r.step_name, "review");
413        assert_eq!(r.status, WorkflowStepStatus::Completed);
414        assert_eq!(r.result_text, Some("looks good".to_string()));
415        assert_eq!(r.markers, vec!["approved"]);
416        assert_eq!(r.context, "ctx");
417        assert_eq!(r.child_run_id, Some("child-1".to_string()));
418        assert_eq!(r.structured_output, Some(r#"{"ok":true}"#.to_string()));
419        assert_eq!(r.output_file, Some("/tmp/out".to_string()));
420    }
421
422    #[test]
423    fn completed_without_metrics_delegates_to_completed() {
424        let success = StepSuccess {
425            step_name: "restore".to_string(),
426            result_text: Some("ok".to_string()),
427            markers: vec!["done".to_string()],
428            context: "restored".to_string(),
429            ..StepSuccess::default()
430        };
431        let r = StepResult::completed_without_metrics(&success);
432        assert_eq!(r.step_name, "restore");
433        assert_eq!(r.status, WorkflowStepStatus::Completed);
434        assert_eq!(r.result_text, Some("ok".to_string()));
435        assert_eq!(r.markers, vec!["done"]);
436        assert_eq!(r.context, "restored");
437    }
438
439    #[test]
440    fn step_success_into_context_entry_maps_all_fields() {
441        let success = StepSuccess {
442            step_name: "my-step".to_string(),
443            iteration: 7,
444            context: "ctx-body".to_string(),
445            markers: vec!["m1".to_string(), "m2".to_string()],
446            structured_output: Some(r#"{"k":"v"}"#.to_string()),
447            output_file: Some("/tmp/out".to_string()),
448            result_text: Some("rt".to_string()),
449            child_run_id: Some("child-1".to_string()),
450            ..StepSuccess::default()
451        };
452        let entry: super::ContextEntry = success.into();
453        assert_eq!(entry.step, "my-step", "step should come from step_name");
454        assert_eq!(entry.iteration, 7);
455        assert_eq!(entry.context, "ctx-body");
456        assert_eq!(entry.markers, vec!["m1", "m2"]);
457        assert_eq!(entry.structured_output, Some(r#"{"k":"v"}"#.to_string()));
458        assert_eq!(entry.output_file, Some("/tmp/out".to_string()));
459    }
460
461    #[test]
462    fn from_workflow_run_step_maps_fields() {
463        let step = WorkflowRunStep {
464            result_text: Some("all good".to_string()),
465            child_run_id: Some("child-1".to_string()),
466            structured_output: Some(r#"{"ok":true}"#.to_string()),
467            output_file: Some("/tmp/out".to_string()),
468            ..WorkflowRunStep::default()
469        };
470        let success = StepSuccess::from_workflow_run_step(
471            "child-step".to_string(),
472            &step,
473            vec!["m1".to_string(), "m2".to_string()],
474            "ctx-body".to_string(),
475            7,
476        );
477        assert_eq!(success.step_name, "child-step");
478        assert_eq!(success.result_text, Some("all good".to_string()));
479        assert_eq!(success.markers, vec!["m1", "m2"]);
480        assert_eq!(success.context, "ctx-body");
481        assert_eq!(success.child_run_id, Some("child-1".to_string()));
482        assert_eq!(
483            success.structured_output,
484            Some(r#"{"ok":true}"#.to_string())
485        );
486        assert_eq!(success.output_file, Some("/tmp/out".to_string()));
487        assert_eq!(success.iteration, 7);
488        assert!(success.metadata.is_empty());
489    }
490
491    #[test]
492    fn from_action_output_maps_all_fields() {
493        use crate::constants::metadata_keys;
494        let mut metadata = HashMap::new();
495        metadata.insert(metadata_keys::COST_USD.to_string(), "0.05".to_string());
496        metadata.insert(metadata_keys::NUM_TURNS.to_string(), "3".to_string());
497        metadata.insert(metadata_keys::DURATION_MS.to_string(), "1200".to_string());
498        metadata.insert(metadata_keys::INPUT_TOKENS.to_string(), "100".to_string());
499        metadata.insert(metadata_keys::OUTPUT_TOKENS.to_string(), "200".to_string());
500        metadata.insert(
501            metadata_keys::CACHE_READ_INPUT_TOKENS.to_string(),
502            "50".to_string(),
503        );
504        metadata.insert(
505            metadata_keys::CACHE_CREATION_INPUT_TOKENS.to_string(),
506            "25".to_string(),
507        );
508        let output = crate::traits::action_executor::ActionOutput {
509            markers: vec!["m1".to_string()],
510            context: Some("ctx".to_string()),
511            result_text: Some("rt".to_string()),
512            metadata: metadata.clone(),
513            child_run_id: Some("child-1".to_string()),
514            structured_output: Some(r#"{"ok":true}"#.to_string()),
515        };
516        let success = StepSuccess::from_action_output(
517            &output,
518            "review".to_string(),
519            "ctx".to_string(),
520            5,
521            Some("/tmp/out".to_string()),
522        );
523        assert_eq!(success.step_name, "review");
524        assert_eq!(success.result_text, Some("rt".to_string()));
525        assert_eq!(
526            success.metadata.get(metadata_keys::COST_USD),
527            Some(&"0.05".to_string())
528        );
529        assert_eq!(
530            success.metadata.get(metadata_keys::NUM_TURNS),
531            Some(&"3".to_string())
532        );
533        assert_eq!(
534            success.metadata.get(metadata_keys::DURATION_MS),
535            Some(&"1200".to_string())
536        );
537        assert_eq!(success.markers, vec!["m1"]);
538        assert_eq!(success.context, "ctx");
539        assert_eq!(success.child_run_id, Some("child-1".to_string()));
540        assert_eq!(success.iteration, 5);
541        assert_eq!(
542            success.structured_output,
543            Some(r#"{"ok":true}"#.to_string())
544        );
545        assert_eq!(success.output_file, Some("/tmp/out".to_string()));
546    }
547}