1use crate::resolve;
8use crate::store::WorkflowStateStore;
9use crate::types::*;
10
11#[async_trait::async_trait]
13pub trait EventSink: Send + Sync {
14 async fn emit(&self, event: WorkflowEvent);
15}
16
17pub struct TracingEventSink;
19
20#[async_trait::async_trait]
21impl EventSink for TracingEventSink {
22 async fn emit(&self, event: WorkflowEvent) {
23 match &event {
24 WorkflowEvent::WorkflowStarted {
25 workflow_id,
26 total_steps,
27 } => {
28 tracing::info!(%workflow_id, total_steps, "workflow started");
29 }
30 WorkflowEvent::StepStarted {
31 step_id,
32 step_label,
33 ..
34 } => {
35 tracing::info!(%step_id, %step_label, "step started");
36 }
37 WorkflowEvent::StepCompleted {
38 step_id,
39 step_label,
40 ..
41 } => {
42 tracing::info!(%step_id, %step_label, "step completed");
43 }
44 WorkflowEvent::StepFailed {
45 step_id,
46 step_label,
47 error,
48 ..
49 } => {
50 tracing::error!(%step_id, %step_label, %error, "step failed");
51 }
52 WorkflowEvent::WorkflowCompleted {
53 workflow_id,
54 status,
55 steps_done,
56 steps_failed,
57 } => {
58 tracing::info!(%workflow_id, ?status, steps_done, steps_failed, "workflow completed");
59 }
60 WorkflowEvent::StepWaiting {
61 step_id,
62 step_label,
63 message,
64 ..
65 } => {
66 tracing::info!(%step_id, %step_label, %message, "step waiting for input");
67 }
68 }
69 }
70}
71
72pub struct NoopEventSink;
74
75#[async_trait::async_trait]
76impl EventSink for NoopEventSink {
77 async fn emit(&self, _event: WorkflowEvent) {}
78}
79
80#[async_trait::async_trait]
82pub trait StepExecutor: Send + Sync {
83 async fn execute(
85 &self,
86 step: &WorkflowStep,
87 context: &serde_json::Value,
88 ) -> Result<StepResult, String>;
89
90 fn supports(&self, _requirement: &StepRequirement) -> bool {
93 true
94 }
95
96 fn available_skills(&self) -> Vec<StepRequirement> {
99 vec![]
100 }
101}
102
103pub struct WorkflowRunner<S: WorkflowStateStore, E: StepExecutor, K: EventSink = NoopEventSink> {
105 pub store: S,
106 pub executor: E,
107 pub events: K,
108}
109
110impl<S: WorkflowStateStore, E: StepExecutor> WorkflowRunner<S, E, NoopEventSink> {
111 pub fn new(store: S, executor: E) -> Self {
112 Self {
113 store,
114 executor,
115 events: NoopEventSink,
116 }
117 }
118}
119
120impl<S: WorkflowStateStore, E: StepExecutor, K: EventSink> WorkflowRunner<S, E, K> {
121 pub fn with_events(store: S, executor: E, events: K) -> Self {
122 Self {
123 store,
124 executor,
125 events,
126 }
127 }
128
129 fn unmet_requirements<'a>(&self, step: &'a WorkflowStep) -> Vec<&'a StepRequirement> {
131 step.requires
132 .iter()
133 .filter(|r| !self.executor.supports(r))
134 .collect()
135 }
136
137 pub async fn run_next(&self, workflow_id: &str) -> Result<Vec<(String, StepResult)>, String> {
140 let mut run = self
141 .store
142 .load(workflow_id)
143 .await?
144 .ok_or("Workflow not found")?;
145
146 if run.is_complete() {
147 run.status = WorkflowStatus::Completed;
148 self.store.save(&run).await?;
149 return Ok(vec![]);
150 }
151
152 if run.has_failed() {
153 return Err("Workflow has failed steps".into());
154 }
155
156 let mut skipped_any = false;
158 for i in 0..run.definition.steps.len() {
159 if run.step_runs[i].status != StepStatus::Pending {
160 continue;
161 }
162 let skip_expr = run.definition.steps[i].skip_if.clone();
163 if let Some(skip_expr) = skip_expr {
164 if resolve::evaluate_skip_condition(&skip_expr, &run.context) {
165 let step_id = run.definition.steps[i].id.clone();
166 run.step_runs[i].status = StepStatus::Skipped;
167 run.step_runs[i].completed_at = Some(chrono::Utc::now());
168 run.add_note(&step_id, "Skipped by skip_if condition");
169 skipped_any = true;
170 }
171 }
172 }
173 if skipped_any {
174 self.store.save(&run).await?;
175 }
176
177 let runnable: Vec<(usize, String, StepExecution, WorkflowStep)> = run
179 .runnable_steps()
180 .into_iter()
181 .map(|(i, s)| (i, s.id.clone(), s.execution, s.clone()))
182 .collect();
183
184 if runnable.is_empty() {
185 if run.is_stuck() {
187 run.status = WorkflowStatus::Blocked;
188 self.store.save(&run).await?;
189 return Ok(vec![]);
190 }
191 return Err("No runnable steps (all blocked by dependencies)".into());
192 }
193
194 let mut blocked_indices = vec![];
196 let mut executable = vec![];
197
198 for (idx, step_id, exec, step) in runnable {
199 let unmet = self.unmet_requirements(&step);
200 if !unmet.is_empty() {
201 let missing: Vec<String> = unmet.iter().map(|r| r.skill.clone()).collect();
202 blocked_indices.push((idx, missing));
203 } else {
204 executable.push((idx, step_id, exec, step));
205 }
206 }
207
208 for (idx, missing) in &blocked_indices {
210 run.step_runs[*idx].status = StepStatus::Blocked;
211 run.step_runs[*idx].error = Some(format!("Missing skills: {}", missing.join(", ")));
212 }
213
214 if !blocked_indices.is_empty() {
215 self.store.save(&run).await?;
216 }
217
218 if executable.is_empty() {
219 if run.is_stuck() {
221 run.status = WorkflowStatus::Blocked;
222 self.store.save(&run).await?;
223 }
224 return Ok(vec![]);
225 }
226
227 let (wait_steps, non_wait): (Vec<_>, Vec<_>) = executable
229 .into_iter()
230 .partition(|(_, _, _, step)| matches!(step.kind, StepKind::WaitForInput { .. }));
231
232 if !wait_steps.is_empty() {
234 let (idx, step_id, _, step) = &wait_steps[0];
235 let (message, schema) = match &step.kind {
236 StepKind::WaitForInput { message, schema } => (message.clone(), schema.clone()),
237 _ => unreachable!(),
238 };
239 run.step_runs[*idx].status = StepStatus::WaitingForInput;
240 run.step_runs[*idx].started_at = Some(chrono::Utc::now());
241 run.status = WorkflowStatus::Paused;
242 run.current_step = *idx;
243 self.store.save(&run).await?;
244 self.events
245 .emit(WorkflowEvent::StepWaiting {
246 workflow_id: workflow_id.to_string(),
247 step_id: step_id.clone(),
248 step_label: step.label.clone(),
249 message,
250 schema,
251 })
252 .await;
253 return Ok(vec![]);
254 }
255
256 let (parallel, sequential): (Vec<_>, Vec<_>) = non_wait
257 .into_iter()
258 .partition(|(_, _, exec, _)| *exec == StepExecution::Parallel);
259
260 let mut results = vec![];
261
262 if !parallel.is_empty() {
264 for (idx, _, _, _) in ¶llel {
265 run.step_runs[*idx].status = StepStatus::Running;
266 run.step_runs[*idx].started_at = Some(chrono::Utc::now());
267 }
268 run.status = WorkflowStatus::Running;
269 self.store.save(&run).await?;
270
271 for (idx, step_id, _, step) in ¶llel {
272 self.events
273 .emit(WorkflowEvent::StepStarted {
274 workflow_id: workflow_id.to_string(),
275 step_id: step_id.clone(),
276 step_label: step.label.clone(),
277 })
278 .await;
279
280 let step_context = resolve::resolve_step_input(step.input.as_ref(), &run.context);
281 let result = self.executor.execute(step, &step_context).await;
282 match result {
283 Ok(r) if r.status == StepStatus::Failed => {
284 let error = r.error.clone().unwrap_or_else(|| "Step failed".to_string());
285 self.events
286 .emit(WorkflowEvent::StepFailed {
287 workflow_id: workflow_id.to_string(),
288 step_id: step_id.clone(),
289 step_label: step.label.clone(),
290 error: error.clone(),
291 })
292 .await;
293 self.store.commit_step(workflow_id, *idx, r.clone()).await?;
294 results.push((step_id.clone(), r));
295 }
296 Ok(r) => {
297 self.events
298 .emit(WorkflowEvent::StepCompleted {
299 workflow_id: workflow_id.to_string(),
300 step_id: step_id.clone(),
301 step_label: step.label.clone(),
302 result: r.result.clone(),
303 })
304 .await;
305 self.store.commit_step(workflow_id, *idx, r.clone()).await?;
306 results.push((step_id.clone(), r));
307 }
308 Err(e) => {
309 self.events
310 .emit(WorkflowEvent::StepFailed {
311 workflow_id: workflow_id.to_string(),
312 step_id: step_id.clone(),
313 step_label: step.label.clone(),
314 error: e.clone(),
315 })
316 .await;
317 let failed = StepResult::failed(&e);
318 self.store
319 .commit_step(workflow_id, *idx, failed.clone())
320 .await?;
321 results.push((step_id.clone(), failed));
322 }
323 }
324 }
325 }
326
327 if !sequential.is_empty() && parallel.is_empty() {
329 let (idx, step_id, _, step) = &sequential[0];
330
331 run.step_runs[*idx].status = StepStatus::Running;
332 run.step_runs[*idx].started_at = Some(chrono::Utc::now());
333 run.status = WorkflowStatus::Running;
334 run.current_step = *idx;
335 self.store.save(&run).await?;
336
337 self.events
338 .emit(WorkflowEvent::StepStarted {
339 workflow_id: workflow_id.to_string(),
340 step_id: step_id.clone(),
341 step_label: step.label.clone(),
342 })
343 .await;
344
345 let step_context = resolve::resolve_step_input(step.input.as_ref(), &run.context);
346 let result = self.executor.execute(step, &step_context).await;
347 match result {
348 Ok(r) if r.status == StepStatus::Failed => {
349 let error = r.error.clone().unwrap_or_else(|| "Step failed".to_string());
350 self.events
351 .emit(WorkflowEvent::StepFailed {
352 workflow_id: workflow_id.to_string(),
353 step_id: step_id.clone(),
354 step_label: step.label.clone(),
355 error: error.clone(),
356 })
357 .await;
358 self.store.commit_step(workflow_id, *idx, r.clone()).await?;
359 results.push((step_id.clone(), r));
360 }
361 Ok(r) => {
362 self.events
363 .emit(WorkflowEvent::StepCompleted {
364 workflow_id: workflow_id.to_string(),
365 step_id: step_id.clone(),
366 step_label: step.label.clone(),
367 result: r.result.clone(),
368 })
369 .await;
370 self.store.commit_step(workflow_id, *idx, r.clone()).await?;
371 results.push((step_id.clone(), r));
372 }
373 Err(e) => {
374 self.events
375 .emit(WorkflowEvent::StepFailed {
376 workflow_id: workflow_id.to_string(),
377 step_id: step_id.clone(),
378 step_label: step.label.clone(),
379 error: e.clone(),
380 })
381 .await;
382 let failed = StepResult::failed(&e);
383 self.store
384 .commit_step(workflow_id, *idx, failed.clone())
385 .await?;
386 results.push((step_id.clone(), failed));
387 }
388 }
389 }
390
391 let run = self.store.load(workflow_id).await?.unwrap();
393 if run.is_complete() {
394 let mut w = run;
395 if w.is_stuck() || w.step_runs.iter().any(|s| s.status == StepStatus::Blocked) {
396 w.status = WorkflowStatus::Blocked;
397 } else {
398 w.status = WorkflowStatus::Completed;
399 }
400 self.store.save(&w).await?;
401 }
402
403 Ok(results)
404 }
405
406 pub async fn run_all(&self, workflow_id: &str) -> Result<WorkflowStatus, String> {
408 let run = self
410 .store
411 .load(workflow_id)
412 .await?
413 .ok_or("Workflow not found")?;
414 run.detect_cycles()?;
415
416 self.events
418 .emit(WorkflowEvent::WorkflowStarted {
419 workflow_id: workflow_id.to_string(),
420 total_steps: run.definition.steps.len(),
421 })
422 .await;
423
424 loop {
425 let run = self
426 .store
427 .load(workflow_id)
428 .await?
429 .ok_or("Workflow not found")?;
430
431 match run.status {
432 WorkflowStatus::Completed | WorkflowStatus::Failed | WorkflowStatus::Blocked => {
433 let done = run
434 .step_runs
435 .iter()
436 .filter(|s| s.status == StepStatus::Done)
437 .count();
438 let failed = run
439 .step_runs
440 .iter()
441 .filter(|s| s.status == StepStatus::Failed)
442 .count();
443 self.events
444 .emit(WorkflowEvent::WorkflowCompleted {
445 workflow_id: workflow_id.to_string(),
446 status: run.status,
447 steps_done: done,
448 steps_failed: failed,
449 })
450 .await;
451 return Ok(run.status);
452 }
453 WorkflowStatus::Paused => {
455 return Ok(WorkflowStatus::Paused);
456 }
457 _ => {}
458 }
459
460 if run.is_complete() {
461 self.events
462 .emit(WorkflowEvent::WorkflowCompleted {
463 workflow_id: workflow_id.to_string(),
464 status: WorkflowStatus::Completed,
465 steps_done: run.definition.steps.len(),
466 steps_failed: 0,
467 })
468 .await;
469 return Ok(WorkflowStatus::Completed);
470 }
471
472 let results = self.run_next(workflow_id).await?;
473
474 if results.iter().any(|(_, r)| r.status == StepStatus::Failed) {
475 let mut w = self.store.load(workflow_id).await?.unwrap();
476 w.status = WorkflowStatus::Failed;
477 self.store.save(&w).await?;
478 let done = w
479 .step_runs
480 .iter()
481 .filter(|s| s.status == StepStatus::Done)
482 .count();
483 let failed = w
484 .step_runs
485 .iter()
486 .filter(|s| s.status == StepStatus::Failed)
487 .count();
488 self.events
489 .emit(WorkflowEvent::WorkflowCompleted {
490 workflow_id: workflow_id.to_string(),
491 status: WorkflowStatus::Failed,
492 steps_done: done,
493 steps_failed: failed,
494 })
495 .await;
496 return Ok(WorkflowStatus::Failed);
497 }
498
499 if results.is_empty() {
500 let w = self.store.load(workflow_id).await?.unwrap();
501 return Ok(w.status);
502 }
503 }
504 }
505
506 pub async fn resume(
509 &self,
510 workflow_id: &str,
511 step_id: &str,
512 input: serde_json::Value,
513 ) -> Result<WorkflowStatus, String> {
514 let mut run = self
515 .store
516 .load(workflow_id)
517 .await?
518 .ok_or("Workflow not found")?;
519
520 if run.status != WorkflowStatus::Paused {
521 return Err(format!("Workflow is not paused (status: {:?})", run.status));
522 }
523
524 run.resume_step(step_id, input)?;
525 self.store.save(&run).await?;
526
527 self.run_all(workflow_id).await
529 }
530
531 pub async fn get_state(&self, workflow_id: &str) -> Result<Option<WorkflowRun>, String> {
533 self.store.load(workflow_id).await
534 }
535}