1use crate::tooling::types::WorkflowToolContext;
2use crate::workflow::persistence::WorkflowWorkspace;
3use crate::workflow::types::{
4 InterventionRequest, InterventionStatus, NodeRunState, NodeStatus, RetryPolicy, TaskDefinition,
5 TaskTarget, TransformRegistry, WorkflowContext, WorkflowDefinition, WorkflowEdgeDefinition,
6 WorkflowEdgeTransition, WorkflowEvent, WorkflowEventListener, WorkflowFailurePolicy,
7 WorkflowNodeDefinition, WorkflowNodeKind, WorkflowRequest, WorkflowResponse, WorkflowRunState,
8 WorkflowStatus, WorkflowTaskRunner, WorkflowTransform,
9};
10use serde_json::{Map, Value, json};
11use std::collections::{BTreeMap, BTreeSet};
12use std::path::PathBuf;
13use std::sync::Arc;
14
15pub struct WorkflowRuntimeBuilder {
16 tasks: BTreeMap<String, TaskDefinition>,
17 workflows: BTreeMap<String, WorkflowDefinition>,
18 transforms: TransformRegistry,
19 workspace_home: Option<PathBuf>,
20 task_runner: Option<Arc<dyn WorkflowTaskRunner>>,
21 event_listener: Option<Arc<dyn WorkflowEventListener>>,
22}
23
24impl WorkflowRuntimeBuilder {
25 pub fn new() -> Self {
26 let mut transforms: TransformRegistry = BTreeMap::new();
27 transforms.insert("identity".to_string(), Arc::new(IdentityTransform));
28 transforms.insert(
29 "extract_content".to_string(),
30 Arc::new(ExtractContentTransform),
31 );
32 Self {
33 tasks: BTreeMap::new(),
34 workflows: BTreeMap::new(),
35 transforms,
36 workspace_home: None,
37 task_runner: None,
38 event_listener: None,
39 }
40 }
41
42 pub fn with_workspace_home(mut self, home: impl Into<PathBuf>) -> Self {
43 self.workspace_home = Some(home.into());
44 self
45 }
46
47 pub fn with_task_runner(mut self, runner: Arc<dyn WorkflowTaskRunner>) -> Self {
48 self.task_runner = Some(runner);
49 self
50 }
51
52 pub fn with_event_listener(mut self, listener: Arc<dyn WorkflowEventListener>) -> Self {
53 self.event_listener = Some(listener);
54 self
55 }
56
57 pub fn add_task(mut self, task: TaskDefinition) -> Self {
58 self.tasks.insert(task.id.clone(), task);
59 self
60 }
61
62 pub fn add_workflow(mut self, workflow: WorkflowDefinition) -> Self {
63 self.workflows.insert(workflow.id.clone(), workflow);
64 self
65 }
66
67 pub fn register_transform(
68 mut self,
69 transform_id: impl Into<String>,
70 transform: Arc<dyn WorkflowTransform>,
71 ) -> Self {
72 self.transforms.insert(transform_id.into(), transform);
73 self
74 }
75
76 pub async fn build(self) -> Result<WorkflowRuntime, String> {
77 let task_runner = self
78 .task_runner
79 .ok_or_else(|| "WorkflowRuntimeBuilder requires a task runner.".to_string())?;
80 let workspace =
81 WorkflowWorkspace::new(self.workspace_home.unwrap_or_else(|| PathBuf::from(".")));
82 workspace.ensure_dirs().await?;
83
84 let runtime = WorkflowRuntime {
85 tasks: self.tasks,
86 workflows: self.workflows,
87 transforms: self.transforms,
88 workspace,
89 task_runner,
90 event_listener: self.event_listener,
91 };
92 runtime.validate_all()?;
93 Ok(runtime)
94 }
95}
96
97impl Default for WorkflowRuntimeBuilder {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103pub struct WorkflowRuntime {
104 tasks: BTreeMap<String, TaskDefinition>,
105 workflows: BTreeMap<String, WorkflowDefinition>,
106 transforms: TransformRegistry,
107 workspace: WorkflowWorkspace,
108 task_runner: Arc<dyn WorkflowTaskRunner>,
109 event_listener: Option<Arc<dyn WorkflowEventListener>>,
110}
111
112impl WorkflowRuntime {
113 pub fn builder() -> WorkflowRuntimeBuilder {
114 WorkflowRuntimeBuilder::new()
115 }
116
117 pub fn list_workflows(&self) -> Vec<&WorkflowDefinition> {
118 self.workflows.values().collect()
119 }
120
121 pub async fn list_runs(&self) -> Result<Vec<WorkflowRunState>, String> {
122 let mut runs = Vec::new();
123 for run_id in self.workspace.list_run_ids().await? {
124 if let Ok(state) = self.workspace.load_state(&run_id).await {
125 runs.push(state);
126 }
127 }
128 runs.sort_by(|left, right| left.run_id.cmp(&right.run_id));
129 Ok(runs)
130 }
131
132 pub async fn inspect(&self, run_id: &str) -> Result<WorkflowRunState, String> {
133 self.workspace.load_state(run_id).await
134 }
135
136 pub async fn start(&self, request: WorkflowRequest) -> Result<WorkflowResponse, String> {
137 let workflow = self
138 .workflows
139 .get(&request.workflow_id)
140 .cloned()
141 .ok_or_else(|| format!("Workflow '{}' not found.", request.workflow_id))?;
142 self.validate_workflow(&workflow)?;
143
144 let run_id = format!("wf-{}", current_timestamp_nanos());
145 self.workspace.initialize_run_dir(&run_id).await?;
146 self.workspace.save_definition(&run_id, &workflow).await?;
147
148 let mut context = WorkflowContext::default();
149 context.insert("input", request.input.clone());
150
151 let now = current_timestamp_nanos();
152 let mut node_states = BTreeMap::new();
153 for node in &workflow.nodes {
154 let task = self.resolve_task(node)?;
155 let output_key = output_key_for(node, task.as_ref());
156 node_states.insert(
157 node.id.clone(),
158 NodeRunState {
159 node_id: node.id.clone(),
160 status: NodeStatus::Pending,
161 attempts: 0,
162 started_at: None,
163 completed_at: None,
164 last_error: None,
165 output_key,
166 output: None,
167 activated_incoming: Vec::new(),
168 session_id: None,
169 },
170 );
171 }
172
173 let state = WorkflowRunState {
174 workflow_id: workflow.id.clone(),
175 run_id: run_id.clone(),
176 status: WorkflowStatus::Pending,
177 created_at: now,
178 updated_at: now,
179 input: request.input,
180 context,
181 node_states,
182 pending_interventions: Vec::new(),
183 failed_nodes: Vec::new(),
184 };
185 self.workspace.save_state(&run_id, &state).await?;
186 self.drive_run(workflow, state).await
187 }
188
189 pub async fn resume(&self, run_id: &str) -> Result<WorkflowResponse, String> {
190 let workflow = self.workspace.load_definition(run_id).await?;
191 self.validate_workflow(&workflow)?;
192 let state = self.workspace.load_state(run_id).await?;
193 self.drive_run(workflow, state).await
194 }
195
196 pub async fn submit_intervention(
197 &self,
198 run_id: &str,
199 intervention_id: &str,
200 response: Option<String>,
201 ) -> Result<WorkflowRunState, String> {
202 let mut state = self.workspace.load_state(run_id).await?;
203 let request = state
204 .pending_interventions
205 .iter_mut()
206 .find(|entry| entry.id == intervention_id)
207 .ok_or_else(|| {
208 format!(
209 "Intervention '{}' not found for run '{}'.",
210 intervention_id, run_id
211 )
212 })?;
213 request.response = response;
214 request.status = InterventionStatus::Resolved;
215 request.resolved_at = Some(current_timestamp_nanos());
216 state.updated_at = current_timestamp_nanos();
217 self.workspace.save_state(run_id, &state).await?;
218 Ok(state)
219 }
220
221 pub async fn list_pending_interventions(
222 &self,
223 run_id: &str,
224 ) -> Result<Vec<InterventionRequest>, String> {
225 let state = self.workspace.load_state(run_id).await?;
226 Ok(state
227 .pending_interventions
228 .into_iter()
229 .filter(|entry| entry.status == InterventionStatus::Pending)
230 .collect())
231 }
232
    /// Advances a run from its current state until it stops, then persists the
    /// final state and returns it with the events recorded during this call.
    ///
    /// The run stops when it reaches a terminal status (`Failed`, `Completed`
    /// or `CompletedWithFailures`) or is paused with an intervention still
    /// pending.
    ///
    /// Each loop iteration does the following:
    /// 1. Applies interventions that have been resolved.
    /// 2. Marks as skipped the pending nodes whose upstream nodes are all
    ///    terminal but none of whose incoming edges is active.
    /// 3. Executes every node that is ready.
    async fn drive_run(
        &self,
        workflow: WorkflowDefinition,
        mut state: WorkflowRunState,
    ) -> Result<WorkflowResponse, String> {
        // Only events recorded during this call are returned to the caller.
        let mut events = Vec::new();
        // `WorkflowStarted` is emitted only on the Pending -> Running transition.
        if state.status == WorkflowStatus::Pending {
            state.status = WorkflowStatus::Running;
            let event = WorkflowEvent::WorkflowStarted {
                workflow_id: workflow.id.clone(),
                run_id: state.run_id.clone(),
            };
            self.record_event(&state.run_id, &mut events, event).await?;
        }

        loop {
            // Fold in operator responses first; they may re-queue, skip or fail nodes.
            self.apply_resolved_interventions(&workflow, &mut state, &mut events)
                .await?;

            if matches!(
                state.status,
                WorkflowStatus::Failed
                    | WorkflowStatus::Completed
                    | WorkflowStatus::CompletedWithFailures
            ) {
                break;
            }

            // A paused run stays paused while any intervention is unanswered;
            // otherwise it resumes running.
            if state.status == WorkflowStatus::Paused {
                if state
                    .pending_interventions
                    .iter()
                    .any(|entry| entry.status == InterventionStatus::Pending)
                {
                    break;
                }
                state.status = WorkflowStatus::Running;
            }

            let skipped = self
                .reconcile_skipped_nodes(&workflow, &mut state, &mut events)
                .await?;
            let ready = self.collect_ready_nodes(&workflow, &state)?;
            if ready.is_empty() {
                // Nothing runnable. Newly skipped nodes may make downstream
                // nodes ready or skippable, so loop again in that case.
                // Otherwise the run is quiescent and can be finalized.
                if !skipped {
                    self.finalize_if_quiescent(&mut state, &mut events).await?;
                    break;
                }
                continue;
            }

            for node_id in ready {
                // Stop the batch once a node has paused or failed the run.
                if state.status == WorkflowStatus::Paused || state.status == WorkflowStatus::Failed
                {
                    break;
                }
                let event = WorkflowEvent::NodeReady {
                    node_id: node_id.clone(),
                };
                self.record_event(&state.run_id, &mut events, event).await?;
                self.execute_node(&workflow, &node_id, &mut state, &mut events)
                    .await?;
            }
        }

        state.updated_at = current_timestamp_nanos();
        self.workspace.save_state(&state.run_id, &state).await?;
        Ok(WorkflowResponse {
            workflow_id: state.workflow_id.clone(),
            run_id: state.run_id.clone(),
            status: state.status.clone(),
            context: state.context.clone(),
            events,
        })
    }
308
309 fn validate_all(&self) -> Result<(), String> {
310 for workflow in self.workflows.values() {
311 self.validate_workflow(workflow)?;
312 }
313 Ok(())
314 }
315
316 fn validate_workflow(&self, workflow: &WorkflowDefinition) -> Result<(), String> {
317 let mut node_ids = BTreeSet::new();
318 for node in &workflow.nodes {
319 if !node_ids.insert(node.id.clone()) {
320 return Err(format!(
321 "Workflow '{}' has duplicate node '{}'.",
322 workflow.id, node.id
323 ));
324 }
325 match &node.kind {
326 WorkflowNodeKind::Task { task_id, task } => match (task_id, task) {
327 (Some(task_id), None) => {
328 if !self.tasks.contains_key(task_id) {
329 return Err(format!(
330 "Workflow '{}' references unknown task '{}'.",
331 workflow.id, task_id
332 ));
333 }
334 }
335 (None, Some(task)) => self.validate_task(task)?,
336 (Some(_), Some(_)) => {
337 return Err(format!(
338 "Workflow '{}' node '{}' cannot define both task_id and inline task.",
339 workflow.id, node.id
340 ));
341 }
342 (None, None) => {
343 return Err(format!(
344 "Workflow '{}' node '{}' must define a task_id or inline task.",
345 workflow.id, node.id
346 ));
347 }
348 },
349 WorkflowNodeKind::Decision { condition } => {
350 if condition.trim().is_empty() {
351 return Err(format!(
352 "Workflow '{}' decision node '{}' must define a condition.",
353 workflow.id, node.id
354 ));
355 }
356 }
357 WorkflowNodeKind::HumanGate { prompt } => {
358 if prompt.trim().is_empty() {
359 return Err(format!(
360 "Workflow '{}' human_gate node '{}' must define a prompt.",
361 workflow.id, node.id
362 ));
363 }
364 }
365 WorkflowNodeKind::Transform { transform_id, .. } => {
366 if !self.transforms.contains_key(transform_id) {
367 return Err(format!(
368 "Workflow '{}' transform node '{}' references unknown transform '{}'.",
369 workflow.id, node.id, transform_id
370 ));
371 }
372 }
373 WorkflowNodeKind::Join => {}
374 }
375 }
376 let mut adjacency: BTreeMap<String, Vec<String>> = BTreeMap::new();
377 let mut incoming_counts: BTreeMap<String, usize> = workflow
378 .nodes
379 .iter()
380 .map(|node| (node.id.clone(), 0usize))
381 .collect();
382
383 for edge in &workflow.edges {
384 if !node_ids.contains(&edge.from) || !node_ids.contains(&edge.to) {
385 return Err(format!(
386 "Workflow '{}' edge '{}' -> '{}' references an unknown node.",
387 workflow.id, edge.from, edge.to
388 ));
389 }
390 adjacency
391 .entry(edge.from.clone())
392 .or_default()
393 .push(edge.to.clone());
394 *incoming_counts.entry(edge.to.clone()).or_default() += 1;
395 }
396
397 for node in &workflow.nodes {
398 if matches!(node.kind, WorkflowNodeKind::Join)
399 && incoming_counts.get(&node.id).copied().unwrap_or_default() == 0
400 {
401 return Err(format!(
402 "Workflow '{}' join node '{}' must have at least one incoming edge.",
403 workflow.id, node.id
404 ));
405 }
406 }
407
408 let mut visiting = BTreeSet::new();
409 let mut visited = BTreeSet::new();
410 for node in &workflow.nodes {
411 self.visit_for_cycle(&node.id, &adjacency, &mut visiting, &mut visited)?;
412 }
413 Ok(())
414 }
415
416 fn visit_for_cycle(
417 &self,
418 node_id: &str,
419 adjacency: &BTreeMap<String, Vec<String>>,
420 visiting: &mut BTreeSet<String>,
421 visited: &mut BTreeSet<String>,
422 ) -> Result<(), String> {
423 if visited.contains(node_id) {
424 return Ok(());
425 }
426 if !visiting.insert(node_id.to_string()) {
427 return Err(format!(
428 "Workflow graph contains a cycle at node '{}'.",
429 node_id
430 ));
431 }
432 if let Some(children) = adjacency.get(node_id) {
433 for child in children {
434 self.visit_for_cycle(child, adjacency, visiting, visited)?;
435 }
436 }
437 visiting.remove(node_id);
438 visited.insert(node_id.to_string());
439 Ok(())
440 }
441
442 fn validate_task(&self, task: &TaskDefinition) -> Result<(), String> {
443 match &task.target {
444 TaskTarget::AgentId(agent_id) if agent_id.trim().is_empty() => {
445 Err("Task target agent_id cannot be empty.".to_string())
446 }
447 TaskTarget::Capabilities(capabilities) if capabilities.is_empty() => {
448 Err("Task target capabilities cannot be empty.".to_string())
449 }
450 _ => Ok(()),
451 }
452 }
453
454 fn resolve_task(
455 &self,
456 node: &WorkflowNodeDefinition,
457 ) -> Result<Option<TaskDefinition>, String> {
458 match &node.kind {
459 WorkflowNodeKind::Task { task_id, task } => match (task_id, task) {
460 (Some(task_id), None) => self
461 .tasks
462 .get(task_id)
463 .cloned()
464 .map(Some)
465 .ok_or_else(|| format!("Unknown task '{}'.", task_id)),
466 (None, Some(task)) => Ok(Some(task.clone())),
467 _ => Ok(None),
468 },
469 _ => Ok(None),
470 }
471 }
472
473 fn collect_ready_nodes(
474 &self,
475 workflow: &WorkflowDefinition,
476 state: &WorkflowRunState,
477 ) -> Result<Vec<String>, String> {
478 let mut ready = Vec::new();
479 for node in &workflow.nodes {
480 let node_state = state
481 .node_states
482 .get(&node.id)
483 .ok_or_else(|| format!("Missing node state for '{}'.", node.id))?;
484 if node_state.status != NodeStatus::Pending {
485 continue;
486 }
487 let incoming: Vec<&WorkflowEdgeDefinition> = workflow
488 .edges
489 .iter()
490 .filter(|edge| edge.to == node.id)
491 .collect();
492 if incoming.is_empty() {
493 ready.push(node.id.clone());
494 continue;
495 }
496
497 let mut all_terminal = true;
498 let mut activated = false;
499 for edge in incoming {
500 let from_state = state
501 .node_states
502 .get(&edge.from)
503 .ok_or_else(|| format!("Missing node state for '{}'.", edge.from))?;
504 if !from_state.status.is_terminal() {
505 all_terminal = false;
506 break;
507 }
508 if edge_is_active(edge, from_state, &state.context) {
509 activated = true;
510 }
511 }
512
513 if all_terminal && activated {
514 ready.push(node.id.clone());
515 }
516 }
517 Ok(ready)
518 }
519
    /// Marks as `Skipped` every pending node that can no longer be activated,
    /// emitting `NodeSkipped` for each one.
    ///
    /// Such a node has incoming edges, all its upstream nodes are terminal, and
    /// none of its incoming edges is active. Nodes without incoming edges are
    /// never skipped here.
    ///
    /// Nodes are visited in `workflow.nodes` order, and each skip is written to
    /// `state` immediately. If `Skipped` counts as terminal (`is_terminal` is
    /// defined elsewhere — TODO confirm), a skip can cascade to later nodes
    /// within the same pass.
    ///
    /// Returns `true` when at least one node was skipped.
    async fn reconcile_skipped_nodes(
        &self,
        workflow: &WorkflowDefinition,
        state: &mut WorkflowRunState,
        events: &mut Vec<WorkflowEvent>,
    ) -> Result<bool, String> {
        let mut any_skipped = false;
        for node in &workflow.nodes {
            let current = state
                .node_states
                .get(&node.id)
                .ok_or_else(|| format!("Missing node state for '{}'.", node.id))?
                .status
                .clone();
            if current != NodeStatus::Pending {
                continue;
            }
            let incoming: Vec<&WorkflowEdgeDefinition> = workflow
                .edges
                .iter()
                .filter(|edge| edge.to == node.id)
                .collect();
            // Entry nodes are always runnable, never skippable.
            if incoming.is_empty() {
                continue;
            }
            let mut all_terminal = true;
            let mut activated = false;
            for edge in incoming {
                let from_state = state
                    .node_states
                    .get(&edge.from)
                    .ok_or_else(|| format!("Missing node state for '{}'.", edge.from))?;
                // An unsettled upstream means the decision must wait.
                if !from_state.status.is_terminal() {
                    all_terminal = false;
                    break;
                }
                if edge_is_active(edge, from_state, &state.context) {
                    activated = true;
                }
            }
            if all_terminal && !activated {
                if let Some(node_state) = state.node_states.get_mut(&node.id) {
                    node_state.status = NodeStatus::Skipped;
                    node_state.completed_at = Some(current_timestamp_nanos());
                }
                any_skipped = true;
                self.record_event(
                    &state.run_id,
                    events,
                    WorkflowEvent::NodeSkipped {
                        node_id: node.id.clone(),
                    },
                )
                .await?;
            }
        }
        Ok(any_skipped)
    }
578
579 async fn finalize_if_quiescent(
580 &self,
581 state: &mut WorkflowRunState,
582 events: &mut Vec<WorkflowEvent>,
583 ) -> Result<(), String> {
584 if state
585 .pending_interventions
586 .iter()
587 .any(|entry| entry.status == InterventionStatus::Pending)
588 {
589 state.status = WorkflowStatus::Paused;
590 self.record_event(
591 &state.run_id,
592 events,
593 WorkflowEvent::WorkflowPaused {
594 run_id: state.run_id.clone(),
595 reason: "Waiting for intervention".to_string(),
596 },
597 )
598 .await?;
599 return Ok(());
600 }
601
602 state.status = if state.failed_nodes.is_empty() {
603 WorkflowStatus::Completed
604 } else {
605 WorkflowStatus::CompletedWithFailures
606 };
607 self.record_event(
608 &state.run_id,
609 events,
610 WorkflowEvent::WorkflowCompleted {
611 run_id: state.run_id.clone(),
612 status: state.status.clone(),
613 },
614 )
615 .await
616 }
617
    /// Executes one attempt of node `node_id` and persists the resulting state.
    ///
    /// The node is marked `Running` and its attempt counter is incremented.
    /// Execution is then dispatched on the node kind:
    /// - On success, the output is stored in the context under the node's
    ///   output key and the node is marked `Completed`.
    /// - On an error while the run is `Paused` (a node such as a human gate
    ///   queued an intervention), an intervention request and a pause are
    ///   recorded instead of a failure.
    /// - On any other error, control goes to `handle_node_failure`.
    ///
    /// # Errors
    /// Errors returned directly (node not found, missing node state, a task
    /// node without a resolvable task, persistence/event failures) abort the
    /// drive. The node may then be left in `Running`.
    async fn execute_node(
        &self,
        workflow: &WorkflowDefinition,
        node_id: &str,
        state: &mut WorkflowRunState,
        events: &mut Vec<WorkflowEvent>,
    ) -> Result<(), String> {
        let node = workflow
            .nodes
            .iter()
            .find(|candidate| candidate.id == node_id)
            .ok_or_else(|| format!("Workflow node '{}' not found.", node_id))?;
        let task = self.resolve_task(node)?;
        // Attempts are 1-based: the first execution is attempt 1.
        let attempt = state
            .node_states
            .get(node_id)
            .map(|entry| entry.attempts + 1)
            .unwrap_or(1);
        let output_key = output_key_for(node, task.as_ref());
        // Unique per run, node and attempt.
        let session_id = format!("wf-{}-{}-attempt-{}", state.run_id, node_id, attempt);

        // Sources of the incoming edges that are active right now; a join node
        // reports these as its output.
        let activated_incoming = workflow
            .edges
            .iter()
            .filter(|edge| edge.to == node_id)
            .filter_map(|edge| {
                let from_state = state.node_states.get(&edge.from)?;
                if edge_is_active(edge, from_state, &state.context) {
                    Some(edge.from.clone())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        // Scoped so the mutable borrow of the node state ends before the
        // executors below borrow `state` again.
        {
            let node_state = state
                .node_states
                .get_mut(node_id)
                .ok_or_else(|| format!("Missing node state for '{}'.", node_id))?;
            node_state.status = NodeStatus::Running;
            node_state.attempts = attempt;
            node_state.started_at = Some(current_timestamp_nanos());
            node_state.session_id = Some(session_id.clone());
            node_state.output_key = output_key.clone();
            node_state.activated_incoming = activated_incoming;
        }

        self.record_event(
            &state.run_id,
            events,
            WorkflowEvent::NodeStarted {
                node_id: node_id.to_string(),
                attempt,
            },
        )
        .await?;

        let result = match &node.kind {
            WorkflowNodeKind::Task { .. } => {
                let task =
                    task.ok_or_else(|| format!("Workflow node '{}' is missing a task.", node_id))?;
                self.execute_task_node(state, node, &task, attempt, &session_id)
                    .await
            }
            WorkflowNodeKind::Decision { condition } => {
                self.execute_decision_node(state, condition)
            }
            WorkflowNodeKind::HumanGate { prompt } => {
                self.execute_human_gate_node(state, node, prompt).await
            }
            WorkflowNodeKind::Transform {
                transform_id,
                input_key,
            } => {
                self.execute_transform_node(state, transform_id, input_key.as_deref())
                    .await
            }
            WorkflowNodeKind::Join => Ok(json!({
                "joined": state
                    .node_states
                    .get(node_id)
                    .map(|entry| entry.activated_incoming.clone())
                    .unwrap_or_default()
            })),
        };

        match result {
            Ok(output) => {
                state.context.insert(output_key.clone(), output.clone());
                if let Some(node_state) = state.node_states.get_mut(node_id) {
                    node_state.status = NodeStatus::Completed;
                    node_state.completed_at = Some(current_timestamp_nanos());
                    node_state.output = Some(output);
                    node_state.last_error = None;
                }
                self.record_event(
                    &state.run_id,
                    events,
                    WorkflowEvent::NodeCompleted {
                        node_id: node_id.to_string(),
                        output_key,
                    },
                )
                .await?;
            }
            Err(error) => {
                // A paused run means the node requested input (e.g. a human
                // gate queued an intervention); this is not a failure.
                if state.status == WorkflowStatus::Paused {
                    if let Some(intervention) = state
                        .pending_interventions
                        .iter()
                        .find(|entry| {
                            entry.node_id == node.id && entry.status == InterventionStatus::Pending
                        })
                        .cloned()
                    {
                        self.record_event(
                            &state.run_id,
                            events,
                            WorkflowEvent::InterventionRequested {
                                intervention_id: intervention.id,
                                node_id: node.id.clone(),
                                reason: intervention.reason,
                            },
                        )
                        .await?;
                    }
                    self.record_event(
                        &state.run_id,
                        events,
                        WorkflowEvent::WorkflowPaused {
                            run_id: state.run_id.clone(),
                            reason: error,
                        },
                    )
                    .await?;
                } else {
                    self.handle_node_failure(workflow, state, node, attempt, error, events)
                        .await?;
                }
            }
        }

        state.updated_at = current_timestamp_nanos();
        self.workspace.save_state(&state.run_id, state).await
    }
764 async fn execute_task_node(
765 &self,
766 state: &WorkflowRunState,
767 node: &WorkflowNodeDefinition,
768 task: &TaskDefinition,
769 attempt: usize,
770 session_id: &str,
771 ) -> Result<Value, String> {
772 let mut input = if task.input_bindings.is_empty() {
773 state
774 .context
775 .get("input")
776 .cloned()
777 .unwrap_or_else(|| json!({}))
778 } else {
779 let mut map = Map::new();
780 for (alias, path) in &task.input_bindings {
781 map.insert(
782 alias.clone(),
783 state.context.lookup_path(path).unwrap_or(Value::Null),
784 );
785 }
786 Value::Object(map)
787 };
788
789 if let Some(transform_id) = &task.input_transform {
790 input = self
791 .apply_transform(transform_id, &input, &state.context)
792 .await?;
793 }
794
795 let prompt = render_prompt(&task.prompt, &input, &state.context);
796 let metadata = WorkflowToolContext {
797 workflow_id: state.workflow_id.clone(),
798 run_id: state.run_id.clone(),
799 node_id: node.id.clone(),
800 attempt,
801 };
802 let workspace_dir = self.workspace.task_workspace(&state.run_id, &node.id);
803 #[cfg(not(target_arch = "wasm32"))]
804 tokio::fs::create_dir_all(&workspace_dir)
805 .await
806 .map_err(|e| format!("Failed to create workflow task workspace: {e}"))?;
807
808 let result = self
809 .task_runner
810 .run_task(&task.target, &metadata, &workspace_dir, &prompt)
811 .await?;
812
813 let mut output = result.value;
814 if let Some(transform_id) = &task.output_transform {
815 output = self
816 .apply_transform(transform_id, &output, &state.context)
817 .await?;
818 }
819 if output.is_null() {
820 output = json!({
821 "content": result.content,
822 "agent_id": result.agent_id,
823 "session_id": session_id,
824 "attempt": attempt,
825 });
826 }
827 Ok(output)
828 }
829
830 fn execute_decision_node(
831 &self,
832 state: &WorkflowRunState,
833 condition: &str,
834 ) -> Result<Value, String> {
835 Ok(json!({
836 "matched": evaluate_condition(condition, &state.context)
837 }))
838 }
839
840 async fn execute_human_gate_node(
841 &self,
842 state: &mut WorkflowRunState,
843 node: &WorkflowNodeDefinition,
844 prompt: &str,
845 ) -> Result<Value, String> {
846 if let Some(existing) = state
847 .pending_interventions
848 .iter()
849 .find(|entry| entry.node_id == node.id && entry.status == InterventionStatus::Resolved)
850 {
851 let response = existing.response.clone().unwrap_or_default();
852 return Ok(json!({
853 "response": response.clone(),
854 "approved": is_truthy(&Value::String(response)),
855 }));
856 }
857
858 let intervention_id = format!("int-{}", current_timestamp_nanos());
859 state.pending_interventions.push(InterventionRequest {
860 id: intervention_id,
861 workflow_id: state.workflow_id.clone(),
862 run_id: state.run_id.clone(),
863 node_id: node.id.clone(),
864 prompt: prompt.to_string(),
865 reason: "human_gate".to_string(),
866 response: None,
867 status: InterventionStatus::Pending,
868 created_at: current_timestamp_nanos(),
869 resolved_at: None,
870 });
871 if let Some(node_state) = state.node_states.get_mut(&node.id) {
872 node_state.status = NodeStatus::Paused;
873 }
874 state.status = WorkflowStatus::Paused;
875 Err("Human intervention required".to_string())
876 }
877
878 async fn execute_transform_node(
879 &self,
880 state: &WorkflowRunState,
881 transform_id: &str,
882 input_key: Option<&str>,
883 ) -> Result<Value, String> {
884 let input = match input_key {
885 Some(key) => state.context.lookup_path(key).unwrap_or(Value::Null),
886 None => state.context.to_value(),
887 };
888 self.apply_transform(transform_id, &input, &state.context)
889 .await
890 }
891
892 async fn handle_node_failure(
893 &self,
894 workflow: &WorkflowDefinition,
895 state: &mut WorkflowRunState,
896 node: &WorkflowNodeDefinition,
897 attempt: usize,
898 error: String,
899 events: &mut Vec<WorkflowEvent>,
900 ) -> Result<(), String> {
901 let retry_policy =
902 effective_retry_policy(workflow, node, self.resolve_task(node)?.as_ref());
903 if attempt < retry_policy.max_attempts {
904 if let Some(node_state) = state.node_states.get_mut(&node.id) {
905 node_state.status = NodeStatus::Pending;
906 node_state.last_error = Some(error.clone());
907 }
908 self.record_event(
909 &state.run_id,
910 events,
911 WorkflowEvent::NodeRetryScheduled {
912 node_id: node.id.clone(),
913 attempt,
914 error,
915 },
916 )
917 .await?;
918 return Ok(());
919 }
920
921 let failure_policy =
922 effective_failure_policy(workflow, node, self.resolve_task(node)?.as_ref());
923 if let Some(node_state) = state.node_states.get_mut(&node.id) {
924 node_state.last_error = Some(error.clone());
925 node_state.completed_at = Some(current_timestamp_nanos());
926 }
927
928 match failure_policy {
929 WorkflowFailurePolicy::ContinueBestEffort => {
930 if let Some(node_state) = state.node_states.get_mut(&node.id) {
931 node_state.status = NodeStatus::Failed;
932 }
933 if !state.failed_nodes.contains(&node.id) {
934 state.failed_nodes.push(node.id.clone());
935 }
936 self.record_event(
937 &state.run_id,
938 events,
939 WorkflowEvent::NodeFailed {
940 node_id: node.id.clone(),
941 error,
942 },
943 )
944 .await?;
945 }
946 WorkflowFailurePolicy::FailWorkflow => {
947 if let Some(node_state) = state.node_states.get_mut(&node.id) {
948 node_state.status = NodeStatus::Failed;
949 }
950 if !state.failed_nodes.contains(&node.id) {
951 state.failed_nodes.push(node.id.clone());
952 }
953 state.status = WorkflowStatus::Failed;
954 self.record_event(
955 &state.run_id,
956 events,
957 WorkflowEvent::NodeFailed {
958 node_id: node.id.clone(),
959 error: error.clone(),
960 },
961 )
962 .await?;
963 self.record_event(
964 &state.run_id,
965 events,
966 WorkflowEvent::WorkflowCompleted {
967 run_id: state.run_id.clone(),
968 status: WorkflowStatus::Failed,
969 },
970 )
971 .await?;
972 }
973 WorkflowFailurePolicy::PauseForIntervention => {
974 if let Some(node_state) = state.node_states.get_mut(&node.id) {
975 node_state.status = NodeStatus::Paused;
976 }
977 let intervention_id = format!("int-{}", current_timestamp_nanos());
978 state.pending_interventions.push(InterventionRequest {
979 id: intervention_id.clone(),
980 workflow_id: state.workflow_id.clone(),
981 run_id: state.run_id.clone(),
982 node_id: node.id.clone(),
983 prompt: format!(
984 "Node '{}' failed after {} attempt(s). Reply with retry, skip, continue, or fail.",
985 node.id, attempt
986 ),
987 reason: format!("step_failure:{error}"),
988 response: None,
989 status: InterventionStatus::Pending,
990 created_at: current_timestamp_nanos(),
991 resolved_at: None,
992 });
993 state.status = WorkflowStatus::Paused;
994 self.record_event(
995 &state.run_id,
996 events,
997 WorkflowEvent::InterventionRequested {
998 intervention_id,
999 node_id: node.id.clone(),
1000 reason: error,
1001 },
1002 )
1003 .await?;
1004 }
1005 }
1006 Ok(())
1007 }
1008
1009 async fn apply_resolved_interventions(
1010 &self,
1011 workflow: &WorkflowDefinition,
1012 state: &mut WorkflowRunState,
1013 events: &mut Vec<WorkflowEvent>,
1014 ) -> Result<(), String> {
1015 let mut remaining = Vec::new();
1016 let interventions = std::mem::take(&mut state.pending_interventions);
1017 for intervention in interventions {
1018 if intervention.status == InterventionStatus::Pending {
1019 remaining.push(intervention);
1020 continue;
1021 }
1022
1023 self.record_event(
1024 &state.run_id,
1025 events,
1026 WorkflowEvent::InterventionResolved {
1027 intervention_id: intervention.id.clone(),
1028 node_id: intervention.node_id.clone(),
1029 },
1030 )
1031 .await?;
1032
1033 let node = workflow
1034 .nodes
1035 .iter()
1036 .find(|candidate| candidate.id == intervention.node_id)
1037 .ok_or_else(|| format!("Workflow node '{}' not found.", intervention.node_id))?;
1038
1039 if intervention.reason == "human_gate" {
1040 let response = intervention.response.clone().unwrap_or_default();
1041 let output = json!({
1042 "response": response.clone(),
1043 "approved": is_truthy(&Value::String(response)),
1044 });
1045 if let Some(node_state) = state.node_states.get_mut(&node.id) {
1046 node_state.status = NodeStatus::Completed;
1047 node_state.completed_at = Some(current_timestamp_nanos());
1048 node_state.output = Some(output.clone());
1049 node_state.last_error = None;
1050 }
1051 let output_key = state
1052 .node_states
1053 .get(&node.id)
1054 .map(|entry| entry.output_key.clone())
1055 .unwrap_or_else(|| node.id.clone());
1056 state.context.insert(output_key.clone(), output);
1057 self.record_event(
1058 &state.run_id,
1059 events,
1060 WorkflowEvent::NodeCompleted {
1061 node_id: node.id.clone(),
1062 output_key,
1063 },
1064 )
1065 .await?;
1066 continue;
1067 }
1068
1069 match intervention
1070 .response
1071 .clone()
1072 .unwrap_or_default()
1073 .trim()
1074 .to_ascii_lowercase()
1075 .as_str()
1076 {
1077 "retry" => {
1078 if let Some(node_state) = state.node_states.get_mut(&node.id) {
1079 node_state.status = NodeStatus::Pending;
1080 }
1081 state.status = WorkflowStatus::Running;
1082 }
1083 "skip" => {
1084 if let Some(node_state) = state.node_states.get_mut(&node.id) {
1085 node_state.status = NodeStatus::Skipped;
1086 node_state.completed_at = Some(current_timestamp_nanos());
1087 }
1088 state.status = WorkflowStatus::Running;
1089 self.record_event(
1090 &state.run_id,
1091 events,
1092 WorkflowEvent::NodeSkipped {
1093 node_id: node.id.clone(),
1094 },
1095 )
1096 .await?;
1097 }
1098 "continue" => {
1099 if let Some(node_state) = state.node_states.get_mut(&node.id) {
1100 node_state.status = NodeStatus::Failed;
1101 node_state.completed_at = Some(current_timestamp_nanos());
1102 }
1103 if !state.failed_nodes.contains(&node.id) {
1104 state.failed_nodes.push(node.id.clone());
1105 }
1106 state.status = WorkflowStatus::Running;
1107 }
1108 _ => {
1109 if let Some(node_state) = state.node_states.get_mut(&node.id) {
1110 node_state.status = NodeStatus::Failed;
1111 node_state.completed_at = Some(current_timestamp_nanos());
1112 }
1113 if !state.failed_nodes.contains(&node.id) {
1114 state.failed_nodes.push(node.id.clone());
1115 }
1116 state.status = WorkflowStatus::Failed;
1117 }
1118 }
1119 }
1120 state.pending_interventions = remaining;
1121 Ok(())
1122 }
1123
1124 async fn apply_transform(
1125 &self,
1126 transform_id: &str,
1127 input: &Value,
1128 context: &WorkflowContext,
1129 ) -> Result<Value, String> {
1130 let transform = self
1131 .transforms
1132 .get(transform_id)
1133 .ok_or_else(|| format!("Unknown transform '{}'.", transform_id))?;
1134 transform.apply(input, context).await
1135 }
1136
1137 async fn record_event(
1138 &self,
1139 run_id: &str,
1140 events: &mut Vec<WorkflowEvent>,
1141 event: WorkflowEvent,
1142 ) -> Result<(), String> {
1143 self.workspace.append_event(run_id, &event).await?;
1144 events.push(event.clone());
1145 if let Some(listener) = &self.event_listener {
1146 listener.on_event(&event).await?;
1147 }
1148 Ok(())
1149 }
1150}
1151struct IdentityTransform;
1152
1153#[async_trait::async_trait(?Send)]
1154impl WorkflowTransform for IdentityTransform {
1155 async fn apply(&self, input: &Value, _context: &WorkflowContext) -> Result<Value, String> {
1156 Ok(input.clone())
1157 }
1158}
1159
1160struct ExtractContentTransform;
1161
1162#[async_trait::async_trait(?Send)]
1163impl WorkflowTransform for ExtractContentTransform {
1164 async fn apply(&self, input: &Value, _context: &WorkflowContext) -> Result<Value, String> {
1165 match input {
1166 Value::Object(map) => Ok(map.get("content").cloned().unwrap_or_else(|| input.clone())),
1167 _ => Ok(input.clone()),
1168 }
1169 }
1170}
1171
1172fn output_key_for(node: &WorkflowNodeDefinition, task: Option<&TaskDefinition>) -> String {
1173 node.output_key
1174 .clone()
1175 .or_else(|| task.and_then(|task| task.output_key.clone()))
1176 .unwrap_or_else(|| node.id.clone())
1177}
1178
1179fn effective_retry_policy(
1180 workflow: &WorkflowDefinition,
1181 node: &WorkflowNodeDefinition,
1182 task: Option<&TaskDefinition>,
1183) -> RetryPolicy {
1184 node.retry_policy
1185 .clone()
1186 .or_else(|| task.and_then(|task| task.retry_policy.clone()))
1187 .or_else(|| workflow.retry_policy.clone())
1188 .unwrap_or_default()
1189}
1190
1191fn effective_failure_policy(
1192 workflow: &WorkflowDefinition,
1193 node: &WorkflowNodeDefinition,
1194 task: Option<&TaskDefinition>,
1195) -> WorkflowFailurePolicy {
1196 node.failure_policy
1197 .clone()
1198 .or_else(|| task.and_then(|task| task.failure_policy.clone()))
1199 .or_else(|| workflow.failure_policy.clone())
1200 .unwrap_or_default()
1201}
1202
1203fn edge_is_active(
1204 edge: &WorkflowEdgeDefinition,
1205 from_state: &NodeRunState,
1206 context: &WorkflowContext,
1207) -> bool {
1208 match &edge.transition {
1209 WorkflowEdgeTransition::Always => from_state.status.is_terminal(),
1210 WorkflowEdgeTransition::OnSuccess => from_state.status == NodeStatus::Completed,
1211 WorkflowEdgeTransition::OnFailure => from_state.status == NodeStatus::Failed,
1212 WorkflowEdgeTransition::Condition(condition) => {
1213 from_state.status == NodeStatus::Completed && evaluate_condition(condition, context)
1214 }
1215 }
1216}
1217
1218fn evaluate_condition(condition: &str, context: &WorkflowContext) -> bool {
1219 let condition = condition.trim();
1220 if let Some((left, right)) = condition.split_once("==") {
1221 return normalize_condition_value(context.lookup_path(normalize_path(left.trim())))
1222 == parse_literal(right.trim());
1223 }
1224 if let Some((left, right)) = condition.split_once("!=") {
1225 return normalize_condition_value(context.lookup_path(normalize_path(left.trim())))
1226 != parse_literal(right.trim());
1227 }
1228 if let Some(path) = condition.strip_prefix('!') {
1229 return !is_truthy(
1230 &context
1231 .lookup_path(normalize_path(path.trim()))
1232 .unwrap_or(Value::Null),
1233 );
1234 }
1235 is_truthy(
1236 &context
1237 .lookup_path(normalize_path(condition))
1238 .unwrap_or(Value::Null),
1239 )
1240}
1241
/// Strips an optional leading `context.` so `context.a.b` and `a.b` address the same value.
fn normalize_path(path: &str) -> &str {
    const CONTEXT_PREFIX: &str = "context.";
    match path.strip_prefix(CONTEXT_PREFIX) {
        Some(rest) => rest,
        None => path,
    }
}
1245
1246fn normalize_condition_value(value: Option<Value>) -> Value {
1247 value.unwrap_or(Value::Null)
1248}
1249
1250fn parse_literal(raw: &str) -> Value {
1251 let raw = raw.trim();
1252 if (raw.starts_with('"') && raw.ends_with('"'))
1253 || (raw.starts_with('\'') && raw.ends_with('\''))
1254 {
1255 return Value::String(raw[1..raw.len().saturating_sub(1)].to_string());
1256 }
1257 if raw.eq_ignore_ascii_case("true") {
1258 return Value::Bool(true);
1259 }
1260 if raw.eq_ignore_ascii_case("false") {
1261 return Value::Bool(false);
1262 }
1263 if raw.eq_ignore_ascii_case("null") {
1264 return Value::Null;
1265 }
1266 if let Ok(number) = raw.parse::<i64>() {
1267 return json!(number);
1268 }
1269 if let Ok(number) = raw.parse::<f64>() {
1270 return json!(number);
1271 }
1272 Value::String(raw.to_string())
1273}
1274fn render_prompt(template: &str, input: &Value, context: &WorkflowContext) -> String {
1275 let mut rendered = String::new();
1276 let mut remaining = template;
1277 while let Some(start) = remaining.find("{{") {
1278 rendered.push_str(&remaining[..start]);
1279 let after_start = &remaining[start + 2..];
1280 if let Some(end) = after_start.find("}}") {
1281 let key = after_start[..end].trim();
1282 let replacement = resolve_template_value(key, input, context);
1283 rendered.push_str(&replacement);
1284 remaining = &after_start[end + 2..];
1285 } else {
1286 rendered.push_str(&remaining[start..]);
1287 remaining = "";
1288 break;
1289 }
1290 }
1291 rendered.push_str(remaining);
1292
1293 if !matches!(input, Value::Null)
1294 && !(input.is_object() && input.as_object().map(|map| map.is_empty()).unwrap_or(false))
1295 && !template.contains("{{")
1296 {
1297 if !rendered.trim().is_empty() {
1298 rendered.push_str("\n\n");
1299 }
1300 rendered.push_str("Workflow input:\n");
1301 rendered
1302 .push_str(&serde_json::to_string_pretty(input).unwrap_or_else(|_| input.to_string()));
1303 }
1304
1305 rendered
1306}
1307
1308fn resolve_template_value(key: &str, input: &Value, context: &WorkflowContext) -> String {
1309 if let Some(path) = key.strip_prefix("input.") {
1310 if let Some(value) = lookup_value(input, path) {
1311 return value_to_template_string(&value);
1312 }
1313 }
1314 if let Some(value) = context.lookup_path(normalize_path(key)) {
1315 return value_to_template_string(&value);
1316 }
1317 String::new()
1318}
1319
1320fn lookup_value(root: &Value, path: &str) -> Option<Value> {
1321 let mut current = root.clone();
1322 for segment in path.split('.') {
1323 current = match current {
1324 Value::Object(map) => map.get(segment)?.clone(),
1325 _ => return None,
1326 };
1327 }
1328 Some(current)
1329}
1330
1331fn value_to_template_string(value: &Value) -> String {
1332 match value {
1333 Value::Null => String::new(),
1334 Value::String(value) => value.clone(),
1335 _ => value.to_string(),
1336 }
1337}
1338
1339fn is_truthy(value: &Value) -> bool {
1340 match value {
1341 Value::Null => false,
1342 Value::Bool(value) => *value,
1343 Value::Number(number) => number.as_f64().map(|value| value != 0.0).unwrap_or(false),
1344 Value::String(value) => {
1345 let normalized = value.trim().to_ascii_lowercase();
1346 !(normalized.is_empty()
1347 || normalized == "false"
1348 || normalized == "0"
1349 || normalized == "no")
1350 }
1351 Value::Array(values) => !values.is_empty(),
1352 Value::Object(values) => !values.is_empty(),
1353 }
1354}
1355
/// Wall-clock time as nanoseconds since the Unix epoch; `0` if the clock reads earlier
/// than the epoch.
fn current_timestamp_nanos() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    }
}