1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6
7use crate::extensions::Extensions;
8use crate::status::{WorkflowRunStatus, WorkflowStepStatus};
9
10#[allow(dead_code)]
12pub(crate) type StepKey = (String, u32);
13
14#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
16#[derive(Debug, Clone, Serialize, Deserialize)]
17#[serde(tag = "type", rename_all = "snake_case")]
18pub enum BlockedOn {
19 HumanApproval {
20 gate_name: String,
21 prompt: Option<String>,
22 #[serde(default)]
23 options: Vec<String>,
24 },
25 HumanReview {
26 gate_name: String,
27 prompt: Option<String>,
28 #[serde(default)]
29 options: Vec<String>,
30 },
31 PrApproval {
32 gate_name: String,
33 approvals_needed: u32,
34 },
35 PrChecks {
36 gate_name: String,
37 },
38}
39
40#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
42#[derive(Debug, Clone, Serialize)]
43pub struct WorkflowRun {
44 pub id: String,
45 pub workflow_name: String,
46 pub parent_run_id: String,
47 pub status: WorkflowRunStatus,
48 pub dry_run: bool,
49 pub trigger: String,
50 pub started_at: String,
51 pub ended_at: Option<String>,
52 pub result_summary: Option<String>,
53 pub error: Option<String>,
54 pub definition_snapshot: Option<String>,
55 pub inputs: HashMap<String, String>,
56 pub parent_workflow_run_id: Option<String>,
57 pub iteration: i64,
58 pub blocked_on: Option<BlockedOn>,
59 pub workflow_title: Option<String>,
60 pub total_duration_ms: Option<i64>,
61 pub dismissed: bool,
62 #[serde(skip)]
63 pub extensions: Extensions,
64 #[serde(skip)]
65 pub owner_token: Option<String>,
66 #[serde(skip)]
67 pub lease_until: Option<String>,
68 #[serde(skip)]
69 pub generation: i64,
70}
71
72pub fn extract_workflow_title(snapshot: Option<&str>) -> Option<String> {
74 let s = snapshot?;
75 match serde_json::from_str::<serde_json::Value>(s) {
76 Ok(v) => v["title"].as_str().map(String::from),
77 Err(e) => {
78 tracing::warn!(
79 "Malformed definition_snapshot JSON — could not extract workflow title: {e}"
80 );
81 None
82 }
83 }
84}
85
86impl WorkflowRun {
87 pub fn is_triggered_by_hook(&self) -> bool {
89 self.trigger == "hook"
90 }
91
92 pub fn display_name(&self) -> &str {
94 self.workflow_title
95 .as_deref()
96 .unwrap_or(&self.workflow_name)
97 }
98}
99
100#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
102#[derive(Debug, Clone, Default, Serialize)]
103pub struct WorkflowRunStep {
104 pub id: String,
105 pub workflow_run_id: String,
106 pub step_name: String,
107 pub role: String,
108 pub can_commit: bool,
109 pub condition_expr: Option<String>,
110 pub status: WorkflowStepStatus,
111 pub child_run_id: Option<String>,
112 pub position: i64,
113 pub started_at: Option<String>,
114 pub ended_at: Option<String>,
115 pub result_text: Option<String>,
116 pub condition_met: Option<bool>,
117 pub iteration: i64,
118 pub parallel_group_id: Option<String>,
119 pub context_out: Option<String>,
120 pub markers_out: Option<String>,
121 pub retry_count: i64,
122 pub gate_type: Option<String>,
123 pub gate_prompt: Option<String>,
124 pub gate_timeout: Option<String>,
125 pub gate_approved_by: Option<String>,
126 pub gate_approved_at: Option<String>,
127 pub gate_feedback: Option<String>,
128 pub structured_output: Option<String>,
129 pub output_file: Option<String>,
130 pub gate_options: Option<String>,
131 pub gate_selections: Option<String>,
132 pub fan_out_total: Option<i64>,
133 pub fan_out_completed: i64,
134 pub fan_out_failed: i64,
135 pub fan_out_skipped: i64,
136 pub step_error: Option<String>,
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct WorkflowStepSummary {
142 pub step_name: String,
143 pub iteration: i64,
144 pub workflow_chain: Vec<String>,
145}
146
147#[derive(Clone)]
149pub struct WorkflowExecConfig {
150 pub poll_interval: Duration,
151 pub step_timeout: Duration,
152 pub fail_fast: bool,
153 pub dry_run: bool,
154 pub shutdown: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
155 pub event_sinks: Vec<Arc<dyn crate::events::EventSink>>,
159 pub lease_ttl_secs: i64,
162 pub lease_refresh_interval: Duration,
165}
166
167impl std::fmt::Debug for WorkflowExecConfig {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 f.debug_struct("WorkflowExecConfig")
170 .field("poll_interval", &self.poll_interval)
171 .field("step_timeout", &self.step_timeout)
172 .field("fail_fast", &self.fail_fast)
173 .field("dry_run", &self.dry_run)
174 .field("shutdown", &self.shutdown)
175 .field(
176 "event_sinks",
177 &format_args!("[{} sink(s)]", self.event_sinks.len()),
178 )
179 .field("lease_ttl_secs", &self.lease_ttl_secs)
180 .field("lease_refresh_interval", &self.lease_refresh_interval)
181 .finish()
182 }
183}
184
185impl Default for WorkflowExecConfig {
186 fn default() -> Self {
187 Self {
188 poll_interval: Duration::from_secs(5),
189 step_timeout: Duration::from_secs(12 * 60 * 60),
190 fail_fast: true,
191 dry_run: false,
192 shutdown: None,
193 event_sinks: vec![],
194 lease_ttl_secs: 30,
195 lease_refresh_interval: Duration::from_secs(10),
196 }
197 }
198}
199
200#[derive(Debug, Clone)]
202pub struct WorkflowResult {
203 pub workflow_run_id: String,
204 pub workflow_name: String,
205 pub all_succeeded: bool,
206 pub total_duration_ms: i64,
207 pub extensions: Extensions,
208}
209
210#[derive(Debug, Clone, Default)]
217pub struct StepSuccess {
218 pub step_name: String,
219 pub result_text: Option<String>,
220 pub metadata: HashMap<String, String>,
222 pub markers: Vec<String>,
223 pub context: String,
224 pub child_run_id: Option<String>,
225 pub iteration: u32,
226 pub structured_output: Option<String>,
227 pub output_file: Option<String>,
228}
229
230impl StepSuccess {
231 pub fn from_action_output(
233 output: &crate::traits::action_executor::ActionOutput,
234 step_name: String,
235 context: String,
236 iteration: u32,
237 output_file: Option<String>,
238 ) -> Self {
239 Self {
240 step_name,
241 result_text: output.result_text.clone(),
242 metadata: output.metadata.clone(),
243 markers: output.markers.clone(),
244 context,
245 child_run_id: output.child_run_id.clone(),
246 iteration,
247 structured_output: output.structured_output.clone(),
248 output_file,
249 }
250 }
251
252 pub fn from_workflow_run_step(
254 step_name: String,
255 step: &WorkflowRunStep,
256 markers: Vec<String>,
257 context: String,
258 iteration: u32,
259 ) -> Self {
260 Self {
261 step_name,
262 result_text: step.result_text.clone(),
263 markers,
264 context,
265 child_run_id: step.child_run_id.clone(),
266 structured_output: step.structured_output.clone(),
267 output_file: step.output_file.clone(),
268 iteration,
269 metadata: HashMap::new(),
270 }
271 }
272}
273
274#[derive(Debug, Clone, Default)]
276pub struct StepResult {
277 pub step_name: String,
278 pub status: WorkflowStepStatus,
279 pub result_text: Option<String>,
280 pub markers: Vec<String>,
281 pub context: String,
282 pub child_run_id: Option<String>,
283 pub structured_output: Option<String>,
284 pub output_file: Option<String>,
285}
286
287impl StepResult {
288 pub fn failed(step_name: &str, result_text: String) -> Self {
290 Self {
291 step_name: step_name.to_string(),
292 status: WorkflowStepStatus::Failed,
293 result_text: Some(result_text),
294 ..Self::default()
295 }
296 }
297
298 pub fn skipped(step_name: &str) -> Self {
300 Self {
301 step_name: step_name.to_string(),
302 status: WorkflowStepStatus::Skipped,
303 ..Self::default()
304 }
305 }
306
307 pub fn completed_without_metrics(success: &StepSuccess) -> Self {
309 Self::completed(success)
310 }
311
312 pub fn completed(success: &StepSuccess) -> Self {
314 Self {
315 step_name: success.step_name.clone(),
316 status: WorkflowStepStatus::Completed,
317 result_text: success.result_text.clone(),
318 markers: success.markers.clone(),
319 context: success.context.clone(),
320 child_run_id: success.child_run_id.clone(),
321 structured_output: success.structured_output.clone(),
322 output_file: success.output_file.clone(),
323 }
324 }
325}
326
327#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct ContextEntry {
330 pub step: String,
331 pub iteration: u32,
332 pub context: String,
333 #[serde(default)]
334 pub markers: Vec<String>,
335 #[serde(default)]
336 pub structured_output: Option<String>,
337 #[serde(default)]
338 pub output_file: Option<String>,
339}
340
341impl From<StepSuccess> for ContextEntry {
342 fn from(success: StepSuccess) -> Self {
343 Self {
344 step: success.step_name,
345 iteration: success.iteration,
346 context: success.context,
347 markers: success.markers,
348 structured_output: success.structured_output,
349 output_file: success.output_file,
350 }
351 }
352}
353
354#[derive(Debug, Clone, Serialize, Deserialize)]
356#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
357pub struct FanOutItemRow {
358 pub id: String,
359 pub step_run_id: String,
360 pub item_type: String,
361 pub item_id: String,
362 pub item_ref: String,
363 pub child_run_id: Option<String>,
364 pub status: String,
365 pub dispatched_at: Option<String>,
366 pub completed_at: Option<String>,
367 #[serde(default)]
369 pub context: std::collections::HashMap<String, String>,
370}
371
372#[cfg(test)]
373mod tests {
374 use std::collections::HashMap;
375
376 use super::{StepResult, StepSuccess, WorkflowRunStep};
377 use crate::status::WorkflowStepStatus;
378
379 #[test]
380 fn step_result_failed_sets_status_and_text() {
381 let r = StepResult::failed("plan", "out of tokens".to_string());
382 assert_eq!(r.step_name, "plan");
383 assert_eq!(r.status, WorkflowStepStatus::Failed);
384 assert_eq!(r.result_text, Some("out of tokens".to_string()));
385 assert!(r.markers.is_empty());
386 assert_eq!(r.context, "");
387 }
388
389 #[test]
390 fn step_result_skipped_sets_status_and_defaults() {
391 let r = StepResult::skipped("lint");
392 assert_eq!(r.step_name, "lint");
393 assert_eq!(r.status, WorkflowStepStatus::Skipped);
394 assert!(r.result_text.is_none());
395 assert!(r.markers.is_empty());
396 assert_eq!(r.context, "");
397 }
398
399 #[test]
400 fn step_result_completed_sets_all_fields() {
401 let success = StepSuccess {
402 step_name: "review".to_string(),
403 result_text: Some("looks good".to_string()),
404 markers: vec!["approved".to_string()],
405 context: "ctx".to_string(),
406 child_run_id: Some("child-1".to_string()),
407 structured_output: Some(r#"{"ok":true}"#.to_string()),
408 output_file: Some("/tmp/out".to_string()),
409 ..StepSuccess::default()
410 };
411 let r = StepResult::completed(&success);
412 assert_eq!(r.step_name, "review");
413 assert_eq!(r.status, WorkflowStepStatus::Completed);
414 assert_eq!(r.result_text, Some("looks good".to_string()));
415 assert_eq!(r.markers, vec!["approved"]);
416 assert_eq!(r.context, "ctx");
417 assert_eq!(r.child_run_id, Some("child-1".to_string()));
418 assert_eq!(r.structured_output, Some(r#"{"ok":true}"#.to_string()));
419 assert_eq!(r.output_file, Some("/tmp/out".to_string()));
420 }
421
422 #[test]
423 fn completed_without_metrics_delegates_to_completed() {
424 let success = StepSuccess {
425 step_name: "restore".to_string(),
426 result_text: Some("ok".to_string()),
427 markers: vec!["done".to_string()],
428 context: "restored".to_string(),
429 ..StepSuccess::default()
430 };
431 let r = StepResult::completed_without_metrics(&success);
432 assert_eq!(r.step_name, "restore");
433 assert_eq!(r.status, WorkflowStepStatus::Completed);
434 assert_eq!(r.result_text, Some("ok".to_string()));
435 assert_eq!(r.markers, vec!["done"]);
436 assert_eq!(r.context, "restored");
437 }
438
439 #[test]
440 fn step_success_into_context_entry_maps_all_fields() {
441 let success = StepSuccess {
442 step_name: "my-step".to_string(),
443 iteration: 7,
444 context: "ctx-body".to_string(),
445 markers: vec!["m1".to_string(), "m2".to_string()],
446 structured_output: Some(r#"{"k":"v"}"#.to_string()),
447 output_file: Some("/tmp/out".to_string()),
448 result_text: Some("rt".to_string()),
449 child_run_id: Some("child-1".to_string()),
450 ..StepSuccess::default()
451 };
452 let entry: super::ContextEntry = success.into();
453 assert_eq!(entry.step, "my-step", "step should come from step_name");
454 assert_eq!(entry.iteration, 7);
455 assert_eq!(entry.context, "ctx-body");
456 assert_eq!(entry.markers, vec!["m1", "m2"]);
457 assert_eq!(entry.structured_output, Some(r#"{"k":"v"}"#.to_string()));
458 assert_eq!(entry.output_file, Some("/tmp/out".to_string()));
459 }
460
461 #[test]
462 fn from_workflow_run_step_maps_fields() {
463 let step = WorkflowRunStep {
464 result_text: Some("all good".to_string()),
465 child_run_id: Some("child-1".to_string()),
466 structured_output: Some(r#"{"ok":true}"#.to_string()),
467 output_file: Some("/tmp/out".to_string()),
468 ..WorkflowRunStep::default()
469 };
470 let success = StepSuccess::from_workflow_run_step(
471 "child-step".to_string(),
472 &step,
473 vec!["m1".to_string(), "m2".to_string()],
474 "ctx-body".to_string(),
475 7,
476 );
477 assert_eq!(success.step_name, "child-step");
478 assert_eq!(success.result_text, Some("all good".to_string()));
479 assert_eq!(success.markers, vec!["m1", "m2"]);
480 assert_eq!(success.context, "ctx-body");
481 assert_eq!(success.child_run_id, Some("child-1".to_string()));
482 assert_eq!(
483 success.structured_output,
484 Some(r#"{"ok":true}"#.to_string())
485 );
486 assert_eq!(success.output_file, Some("/tmp/out".to_string()));
487 assert_eq!(success.iteration, 7);
488 assert!(success.metadata.is_empty());
489 }
490
491 #[test]
492 fn from_action_output_maps_all_fields() {
493 use crate::constants::metadata_keys;
494 let mut metadata = HashMap::new();
495 metadata.insert(metadata_keys::COST_USD.to_string(), "0.05".to_string());
496 metadata.insert(metadata_keys::NUM_TURNS.to_string(), "3".to_string());
497 metadata.insert(metadata_keys::DURATION_MS.to_string(), "1200".to_string());
498 metadata.insert(metadata_keys::INPUT_TOKENS.to_string(), "100".to_string());
499 metadata.insert(metadata_keys::OUTPUT_TOKENS.to_string(), "200".to_string());
500 metadata.insert(
501 metadata_keys::CACHE_READ_INPUT_TOKENS.to_string(),
502 "50".to_string(),
503 );
504 metadata.insert(
505 metadata_keys::CACHE_CREATION_INPUT_TOKENS.to_string(),
506 "25".to_string(),
507 );
508 let output = crate::traits::action_executor::ActionOutput {
509 markers: vec!["m1".to_string()],
510 context: Some("ctx".to_string()),
511 result_text: Some("rt".to_string()),
512 metadata: metadata.clone(),
513 child_run_id: Some("child-1".to_string()),
514 structured_output: Some(r#"{"ok":true}"#.to_string()),
515 };
516 let success = StepSuccess::from_action_output(
517 &output,
518 "review".to_string(),
519 "ctx".to_string(),
520 5,
521 Some("/tmp/out".to_string()),
522 );
523 assert_eq!(success.step_name, "review");
524 assert_eq!(success.result_text, Some("rt".to_string()));
525 assert_eq!(
526 success.metadata.get(metadata_keys::COST_USD),
527 Some(&"0.05".to_string())
528 );
529 assert_eq!(
530 success.metadata.get(metadata_keys::NUM_TURNS),
531 Some(&"3".to_string())
532 );
533 assert_eq!(
534 success.metadata.get(metadata_keys::DURATION_MS),
535 Some(&"1200".to_string())
536 );
537 assert_eq!(success.markers, vec!["m1"]);
538 assert_eq!(success.context, "ctx");
539 assert_eq!(success.child_run_id, Some("child-1".to_string()));
540 assert_eq!(success.iteration, 5);
541 assert_eq!(
542 success.structured_output,
543 Some(r#"{"ok":true}"#.to_string())
544 );
545 assert_eq!(success.output_file, Some("/tmp/out".to_string()));
546 }
547}