Skip to main content

enki_next/workflow/
runtime.rs

1use crate::tooling::types::WorkflowToolContext;
2use crate::workflow::persistence::WorkflowWorkspace;
3use crate::workflow::types::{
4    InterventionRequest, InterventionStatus, NodeRunState, NodeStatus, RetryPolicy, TaskDefinition,
5    TaskTarget, TransformRegistry, WorkflowContext, WorkflowDefinition, WorkflowEdgeDefinition,
6    WorkflowEdgeTransition, WorkflowEvent, WorkflowEventListener, WorkflowFailurePolicy,
7    WorkflowNodeDefinition, WorkflowNodeKind, WorkflowRequest, WorkflowResponse, WorkflowRunState,
8    WorkflowStatus, WorkflowTaskRunner, WorkflowTransform,
9};
10use serde_json::{Map, Value, json};
11use std::collections::{BTreeMap, BTreeSet};
12use std::path::PathBuf;
13use std::sync::Arc;
14
/// Builder that gathers task definitions, workflow definitions, transforms and
/// collaborators before they are assembled into a [`WorkflowRuntime`].
pub struct WorkflowRuntimeBuilder {
    /// Registered tasks, keyed by each task's `id`.
    tasks: BTreeMap<String, TaskDefinition>,
    /// Registered workflows, keyed by each workflow's `id`.
    workflows: BTreeMap<String, WorkflowDefinition>,
    /// Transform registry keyed by transform id; `new` seeds it with the
    /// `identity` and `extract_content` transforms.
    transforms: TransformRegistry,
    /// Root directory handed to the workflow workspace; `build` falls back to
    /// `"."` when this is unset.
    workspace_home: Option<PathBuf>,
    /// Runner used for task nodes; `build` returns an error when it is missing.
    task_runner: Option<Arc<dyn WorkflowTaskRunner>>,
    /// Optional event listener that is passed through to the runtime.
    event_listener: Option<Arc<dyn WorkflowEventListener>>,
}
23
24impl WorkflowRuntimeBuilder {
25    pub fn new() -> Self {
26        let mut transforms: TransformRegistry = BTreeMap::new();
27        transforms.insert("identity".to_string(), Arc::new(IdentityTransform));
28        transforms.insert(
29            "extract_content".to_string(),
30            Arc::new(ExtractContentTransform),
31        );
32        Self {
33            tasks: BTreeMap::new(),
34            workflows: BTreeMap::new(),
35            transforms,
36            workspace_home: None,
37            task_runner: None,
38            event_listener: None,
39        }
40    }
41
42    pub fn with_workspace_home(mut self, home: impl Into<PathBuf>) -> Self {
43        self.workspace_home = Some(home.into());
44        self
45    }
46
47    pub fn with_task_runner(mut self, runner: Arc<dyn WorkflowTaskRunner>) -> Self {
48        self.task_runner = Some(runner);
49        self
50    }
51
52    pub fn with_event_listener(mut self, listener: Arc<dyn WorkflowEventListener>) -> Self {
53        self.event_listener = Some(listener);
54        self
55    }
56
57    pub fn add_task(mut self, task: TaskDefinition) -> Self {
58        self.tasks.insert(task.id.clone(), task);
59        self
60    }
61
62    pub fn add_workflow(mut self, workflow: WorkflowDefinition) -> Self {
63        self.workflows.insert(workflow.id.clone(), workflow);
64        self
65    }
66
67    pub fn register_transform(
68        mut self,
69        transform_id: impl Into<String>,
70        transform: Arc<dyn WorkflowTransform>,
71    ) -> Self {
72        self.transforms.insert(transform_id.into(), transform);
73        self
74    }
75
76    pub async fn build(self) -> Result<WorkflowRuntime, String> {
77        let task_runner = self
78            .task_runner
79            .ok_or_else(|| "WorkflowRuntimeBuilder requires a task runner.".to_string())?;
80        let workspace =
81            WorkflowWorkspace::new(self.workspace_home.unwrap_or_else(|| PathBuf::from(".")));
82        workspace.ensure_dirs().await?;
83
84        let runtime = WorkflowRuntime {
85            tasks: self.tasks,
86            workflows: self.workflows,
87            transforms: self.transforms,
88            workspace,
89            task_runner,
90            event_listener: self.event_listener,
91        };
92        runtime.validate_all()?;
93        Ok(runtime)
94    }
95}
96
97impl Default for WorkflowRuntimeBuilder {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
/// Workflow engine assembled by [`WorkflowRuntimeBuilder`]: holds the registered
/// definitions together with the workspace, task runner and optional listener
/// used while driving runs.
pub struct WorkflowRuntime {
    /// Registered tasks, keyed by task id; task nodes reference them via `task_id`.
    tasks: BTreeMap<String, TaskDefinition>,
    /// Registered workflows, keyed by workflow id.
    workflows: BTreeMap<String, WorkflowDefinition>,
    /// Transform registry keyed by transform id.
    transforms: TransformRegistry,
    /// Workspace used to load and save run definitions and run state.
    workspace: WorkflowWorkspace,
    /// Runner invoked for task nodes.
    task_runner: Arc<dyn WorkflowTaskRunner>,
    /// Optional workflow event listener.
    event_listener: Option<Arc<dyn WorkflowEventListener>>,
}
111
112impl WorkflowRuntime {
113    pub fn builder() -> WorkflowRuntimeBuilder {
114        WorkflowRuntimeBuilder::new()
115    }
116
117    pub fn list_workflows(&self) -> Vec<&WorkflowDefinition> {
118        self.workflows.values().collect()
119    }
120
121    pub async fn list_runs(&self) -> Result<Vec<WorkflowRunState>, String> {
122        let mut runs = Vec::new();
123        for run_id in self.workspace.list_run_ids().await? {
124            if let Ok(state) = self.workspace.load_state(&run_id).await {
125                runs.push(state);
126            }
127        }
128        runs.sort_by(|left, right| left.run_id.cmp(&right.run_id));
129        Ok(runs)
130    }
131
132    pub async fn inspect(&self, run_id: &str) -> Result<WorkflowRunState, String> {
133        self.workspace.load_state(run_id).await
134    }
135
136    pub async fn start(&self, request: WorkflowRequest) -> Result<WorkflowResponse, String> {
137        let workflow = self
138            .workflows
139            .get(&request.workflow_id)
140            .cloned()
141            .ok_or_else(|| format!("Workflow '{}' not found.", request.workflow_id))?;
142        self.validate_workflow(&workflow)?;
143
144        let run_id = format!("wf-{}", current_timestamp_nanos());
145        self.workspace.initialize_run_dir(&run_id).await?;
146        self.workspace.save_definition(&run_id, &workflow).await?;
147
148        let mut context = WorkflowContext::default();
149        context.insert("input", request.input.clone());
150
151        let now = current_timestamp_nanos();
152        let mut node_states = BTreeMap::new();
153        for node in &workflow.nodes {
154            let task = self.resolve_task(node)?;
155            let output_key = output_key_for(node, task.as_ref());
156            node_states.insert(
157                node.id.clone(),
158                NodeRunState {
159                    node_id: node.id.clone(),
160                    status: NodeStatus::Pending,
161                    attempts: 0,
162                    started_at: None,
163                    completed_at: None,
164                    last_error: None,
165                    output_key,
166                    output: None,
167                    activated_incoming: Vec::new(),
168                    session_id: None,
169                },
170            );
171        }
172
173        let state = WorkflowRunState {
174            workflow_id: workflow.id.clone(),
175            run_id: run_id.clone(),
176            status: WorkflowStatus::Pending,
177            created_at: now,
178            updated_at: now,
179            input: request.input,
180            context,
181            node_states,
182            pending_interventions: Vec::new(),
183            failed_nodes: Vec::new(),
184        };
185        self.workspace.save_state(&run_id, &state).await?;
186        self.drive_run(workflow, state).await
187    }
188
189    pub async fn resume(&self, run_id: &str) -> Result<WorkflowResponse, String> {
190        let workflow = self.workspace.load_definition(run_id).await?;
191        self.validate_workflow(&workflow)?;
192        let state = self.workspace.load_state(run_id).await?;
193        self.drive_run(workflow, state).await
194    }
195
196    pub async fn submit_intervention(
197        &self,
198        run_id: &str,
199        intervention_id: &str,
200        response: Option<String>,
201    ) -> Result<WorkflowRunState, String> {
202        let mut state = self.workspace.load_state(run_id).await?;
203        let request = state
204            .pending_interventions
205            .iter_mut()
206            .find(|entry| entry.id == intervention_id)
207            .ok_or_else(|| {
208                format!(
209                    "Intervention '{}' not found for run '{}'.",
210                    intervention_id, run_id
211                )
212            })?;
213        request.response = response;
214        request.status = InterventionStatus::Resolved;
215        request.resolved_at = Some(current_timestamp_nanos());
216        state.updated_at = current_timestamp_nanos();
217        self.workspace.save_state(run_id, &state).await?;
218        Ok(state)
219    }
220
221    pub async fn list_pending_interventions(
222        &self,
223        run_id: &str,
224    ) -> Result<Vec<InterventionRequest>, String> {
225        let state = self.workspace.load_state(run_id).await?;
226        Ok(state
227            .pending_interventions
228            .into_iter()
229            .filter(|entry| entry.status == InterventionStatus::Pending)
230            .collect())
231    }
232
    /// Drives a run forward until it reaches a terminal status, is paused on a
    /// still-pending intervention, or has nothing left to execute.
    ///
    /// The final state is saved before returning, and the response carries the
    /// events collected during this call.
    ///
    /// # Errors
    ///
    /// Propagates any error from event recording, intervention handling, node
    /// scheduling, node execution or saving the state.
    async fn drive_run(
        &self,
        workflow: WorkflowDefinition,
        mut state: WorkflowRunState,
    ) -> Result<WorkflowResponse, String> {
        let mut events = Vec::new();
        // A run that has never been driven moves to Running and announces itself.
        if state.status == WorkflowStatus::Pending {
            state.status = WorkflowStatus::Running;
            let event = WorkflowEvent::WorkflowStarted {
                workflow_id: workflow.id.clone(),
                run_id: state.run_id.clone(),
            };
            self.record_event(&state.run_id, &mut events, event).await?;
        }

        loop {
            // NOTE(review): helper is defined outside this view; presumably it
            // folds resolved interventions back into the run state — confirm.
            self.apply_resolved_interventions(&workflow, &mut state, &mut events)
                .await?;

            // Terminal statuses end the loop.
            if matches!(
                state.status,
                WorkflowStatus::Failed
                    | WorkflowStatus::Completed
                    | WorkflowStatus::CompletedWithFailures
            ) {
                break;
            }

            if state.status == WorkflowStatus::Paused {
                // Stay paused while any intervention is still unanswered.
                if state
                    .pending_interventions
                    .iter()
                    .any(|entry| entry.status == InterventionStatus::Pending)
                {
                    break;
                }
                // Nothing is pending any more, so the run continues.
                state.status = WorkflowStatus::Running;
            }

            let skipped = self
                .reconcile_skipped_nodes(&workflow, &mut state, &mut events)
                .await?;
            let ready = self.collect_ready_nodes(&workflow, &state)?;
            if ready.is_empty() {
                if !skipped {
                    // No node ran and no node was skipped in this pass: settle
                    // the final (or paused) status and stop.
                    self.finalize_if_quiescent(&mut state, &mut events).await?;
                    break;
                }
                // Nodes were skipped in this pass; re-evaluate, since that can
                // change what is ready or skippable downstream.
                continue;
            }

            for node_id in ready {
                // A node executed earlier in this batch may have paused or
                // failed the run; stop launching further nodes in that case.
                if state.status == WorkflowStatus::Paused || state.status == WorkflowStatus::Failed
                {
                    break;
                }
                let event = WorkflowEvent::NodeReady {
                    node_id: node_id.clone(),
                };
                self.record_event(&state.run_id, &mut events, event).await?;
                self.execute_node(&workflow, &node_id, &mut state, &mut events)
                    .await?;
            }
        }

        // Save the final state of this drive before reporting back.
        state.updated_at = current_timestamp_nanos();
        self.workspace.save_state(&state.run_id, &state).await?;
        Ok(WorkflowResponse {
            workflow_id: state.workflow_id.clone(),
            run_id: state.run_id.clone(),
            status: state.status.clone(),
            context: state.context.clone(),
            events,
        })
    }
308
309    fn validate_all(&self) -> Result<(), String> {
310        for workflow in self.workflows.values() {
311            self.validate_workflow(workflow)?;
312        }
313        Ok(())
314    }
315
316    fn validate_workflow(&self, workflow: &WorkflowDefinition) -> Result<(), String> {
317        let mut node_ids = BTreeSet::new();
318        for node in &workflow.nodes {
319            if !node_ids.insert(node.id.clone()) {
320                return Err(format!(
321                    "Workflow '{}' has duplicate node '{}'.",
322                    workflow.id, node.id
323                ));
324            }
325            match &node.kind {
326                WorkflowNodeKind::Task { task_id, task } => match (task_id, task) {
327                    (Some(task_id), None) => {
328                        if !self.tasks.contains_key(task_id) {
329                            return Err(format!(
330                                "Workflow '{}' references unknown task '{}'.",
331                                workflow.id, task_id
332                            ));
333                        }
334                    }
335                    (None, Some(task)) => self.validate_task(task)?,
336                    (Some(_), Some(_)) => {
337                        return Err(format!(
338                            "Workflow '{}' node '{}' cannot define both task_id and inline task.",
339                            workflow.id, node.id
340                        ));
341                    }
342                    (None, None) => {
343                        return Err(format!(
344                            "Workflow '{}' node '{}' must define a task_id or inline task.",
345                            workflow.id, node.id
346                        ));
347                    }
348                },
349                WorkflowNodeKind::Decision { condition } => {
350                    if condition.trim().is_empty() {
351                        return Err(format!(
352                            "Workflow '{}' decision node '{}' must define a condition.",
353                            workflow.id, node.id
354                        ));
355                    }
356                }
357                WorkflowNodeKind::HumanGate { prompt } => {
358                    if prompt.trim().is_empty() {
359                        return Err(format!(
360                            "Workflow '{}' human_gate node '{}' must define a prompt.",
361                            workflow.id, node.id
362                        ));
363                    }
364                }
365                WorkflowNodeKind::Transform { transform_id, .. } => {
366                    if !self.transforms.contains_key(transform_id) {
367                        return Err(format!(
368                            "Workflow '{}' transform node '{}' references unknown transform '{}'.",
369                            workflow.id, node.id, transform_id
370                        ));
371                    }
372                }
373                WorkflowNodeKind::Join => {}
374            }
375        }
376        let mut adjacency: BTreeMap<String, Vec<String>> = BTreeMap::new();
377        let mut incoming_counts: BTreeMap<String, usize> = workflow
378            .nodes
379            .iter()
380            .map(|node| (node.id.clone(), 0usize))
381            .collect();
382
383        for edge in &workflow.edges {
384            if !node_ids.contains(&edge.from) || !node_ids.contains(&edge.to) {
385                return Err(format!(
386                    "Workflow '{}' edge '{}' -> '{}' references an unknown node.",
387                    workflow.id, edge.from, edge.to
388                ));
389            }
390            adjacency
391                .entry(edge.from.clone())
392                .or_default()
393                .push(edge.to.clone());
394            *incoming_counts.entry(edge.to.clone()).or_default() += 1;
395        }
396
397        for node in &workflow.nodes {
398            if matches!(node.kind, WorkflowNodeKind::Join)
399                && incoming_counts.get(&node.id).copied().unwrap_or_default() == 0
400            {
401                return Err(format!(
402                    "Workflow '{}' join node '{}' must have at least one incoming edge.",
403                    workflow.id, node.id
404                ));
405            }
406        }
407
408        let mut visiting = BTreeSet::new();
409        let mut visited = BTreeSet::new();
410        for node in &workflow.nodes {
411            self.visit_for_cycle(&node.id, &adjacency, &mut visiting, &mut visited)?;
412        }
413        Ok(())
414    }
415
416    fn visit_for_cycle(
417        &self,
418        node_id: &str,
419        adjacency: &BTreeMap<String, Vec<String>>,
420        visiting: &mut BTreeSet<String>,
421        visited: &mut BTreeSet<String>,
422    ) -> Result<(), String> {
423        if visited.contains(node_id) {
424            return Ok(());
425        }
426        if !visiting.insert(node_id.to_string()) {
427            return Err(format!(
428                "Workflow graph contains a cycle at node '{}'.",
429                node_id
430            ));
431        }
432        if let Some(children) = adjacency.get(node_id) {
433            for child in children {
434                self.visit_for_cycle(child, adjacency, visiting, visited)?;
435            }
436        }
437        visiting.remove(node_id);
438        visited.insert(node_id.to_string());
439        Ok(())
440    }
441
442    fn validate_task(&self, task: &TaskDefinition) -> Result<(), String> {
443        match &task.target {
444            TaskTarget::AgentId(agent_id) if agent_id.trim().is_empty() => {
445                Err("Task target agent_id cannot be empty.".to_string())
446            }
447            TaskTarget::Capabilities(capabilities) if capabilities.is_empty() => {
448                Err("Task target capabilities cannot be empty.".to_string())
449            }
450            _ => Ok(()),
451        }
452    }
453
454    fn resolve_task(
455        &self,
456        node: &WorkflowNodeDefinition,
457    ) -> Result<Option<TaskDefinition>, String> {
458        match &node.kind {
459            WorkflowNodeKind::Task { task_id, task } => match (task_id, task) {
460                (Some(task_id), None) => self
461                    .tasks
462                    .get(task_id)
463                    .cloned()
464                    .map(Some)
465                    .ok_or_else(|| format!("Unknown task '{}'.", task_id)),
466                (None, Some(task)) => Ok(Some(task.clone())),
467                _ => Ok(None),
468            },
469            _ => Ok(None),
470        }
471    }
472
473    fn collect_ready_nodes(
474        &self,
475        workflow: &WorkflowDefinition,
476        state: &WorkflowRunState,
477    ) -> Result<Vec<String>, String> {
478        let mut ready = Vec::new();
479        for node in &workflow.nodes {
480            let node_state = state
481                .node_states
482                .get(&node.id)
483                .ok_or_else(|| format!("Missing node state for '{}'.", node.id))?;
484            if node_state.status != NodeStatus::Pending {
485                continue;
486            }
487            let incoming: Vec<&WorkflowEdgeDefinition> = workflow
488                .edges
489                .iter()
490                .filter(|edge| edge.to == node.id)
491                .collect();
492            if incoming.is_empty() {
493                ready.push(node.id.clone());
494                continue;
495            }
496
497            let mut all_terminal = true;
498            let mut activated = false;
499            for edge in incoming {
500                let from_state = state
501                    .node_states
502                    .get(&edge.from)
503                    .ok_or_else(|| format!("Missing node state for '{}'.", edge.from))?;
504                if !from_state.status.is_terminal() {
505                    all_terminal = false;
506                    break;
507                }
508                if edge_is_active(edge, from_state, &state.context) {
509                    activated = true;
510                }
511            }
512
513            if all_terminal && activated {
514                ready.push(node.id.clone());
515            }
516        }
517        Ok(ready)
518    }
519
520    async fn reconcile_skipped_nodes(
521        &self,
522        workflow: &WorkflowDefinition,
523        state: &mut WorkflowRunState,
524        events: &mut Vec<WorkflowEvent>,
525    ) -> Result<bool, String> {
526        let mut any_skipped = false;
527        for node in &workflow.nodes {
528            let current = state
529                .node_states
530                .get(&node.id)
531                .ok_or_else(|| format!("Missing node state for '{}'.", node.id))?
532                .status
533                .clone();
534            if current != NodeStatus::Pending {
535                continue;
536            }
537            let incoming: Vec<&WorkflowEdgeDefinition> = workflow
538                .edges
539                .iter()
540                .filter(|edge| edge.to == node.id)
541                .collect();
542            if incoming.is_empty() {
543                continue;
544            }
545            let mut all_terminal = true;
546            let mut activated = false;
547            for edge in incoming {
548                let from_state = state
549                    .node_states
550                    .get(&edge.from)
551                    .ok_or_else(|| format!("Missing node state for '{}'.", edge.from))?;
552                if !from_state.status.is_terminal() {
553                    all_terminal = false;
554                    break;
555                }
556                if edge_is_active(edge, from_state, &state.context) {
557                    activated = true;
558                }
559            }
560            if all_terminal && !activated {
561                if let Some(node_state) = state.node_states.get_mut(&node.id) {
562                    node_state.status = NodeStatus::Skipped;
563                    node_state.completed_at = Some(current_timestamp_nanos());
564                }
565                any_skipped = true;
566                self.record_event(
567                    &state.run_id,
568                    events,
569                    WorkflowEvent::NodeSkipped {
570                        node_id: node.id.clone(),
571                    },
572                )
573                .await?;
574            }
575        }
576        Ok(any_skipped)
577    }
578
579    async fn finalize_if_quiescent(
580        &self,
581        state: &mut WorkflowRunState,
582        events: &mut Vec<WorkflowEvent>,
583    ) -> Result<(), String> {
584        if state
585            .pending_interventions
586            .iter()
587            .any(|entry| entry.status == InterventionStatus::Pending)
588        {
589            state.status = WorkflowStatus::Paused;
590            self.record_event(
591                &state.run_id,
592                events,
593                WorkflowEvent::WorkflowPaused {
594                    run_id: state.run_id.clone(),
595                    reason: "Waiting for intervention".to_string(),
596                },
597            )
598            .await?;
599            return Ok(());
600        }
601
602        state.status = if state.failed_nodes.is_empty() {
603            WorkflowStatus::Completed
604        } else {
605            WorkflowStatus::CompletedWithFailures
606        };
607        self.record_event(
608            &state.run_id,
609            events,
610            WorkflowEvent::WorkflowCompleted {
611                run_id: state.run_id.clone(),
612                status: state.status.clone(),
613            },
614        )
615        .await
616    }
617
    /// Executes a single node of a run and folds the outcome into `state`.
    ///
    /// The node is marked `Running`, a `NodeStarted` event is recorded, and the
    /// node is dispatched according to its kind. On success the output is
    /// stored in the context under the node's output key and the node becomes
    /// `Completed`. On error the outcome depends on whether the run was paused
    /// while the node ran (pause events are recorded) or not (the failure is
    /// handed to `handle_node_failure`). The state is saved at the end.
    ///
    /// # Errors
    ///
    /// Returns an error when the node or its state is missing, a task node has
    /// no resolvable task, recording an event fails, failure handling fails, or
    /// the state cannot be saved.
    async fn execute_node(
        &self,
        workflow: &WorkflowDefinition,
        node_id: &str,
        state: &mut WorkflowRunState,
        events: &mut Vec<WorkflowEvent>,
    ) -> Result<(), String> {
        let node = workflow
            .nodes
            .iter()
            .find(|candidate| candidate.id == node_id)
            .ok_or_else(|| format!("Workflow node '{}' not found.", node_id))?;
        let task = self.resolve_task(node)?;
        // Attempt numbers are 1-based: previous attempts + 1.
        let attempt = state
            .node_states
            .get(node_id)
            .map(|entry| entry.attempts + 1)
            .unwrap_or(1);
        let output_key = output_key_for(node, task.as_ref());
        let session_id = format!("wf-{}-{}-attempt-{}", state.run_id, node_id, attempt);

        // Source nodes of the incoming edges that are active right now.
        let activated_incoming = workflow
            .edges
            .iter()
            .filter(|edge| edge.to == node_id)
            .filter_map(|edge| {
                let from_state = state.node_states.get(&edge.from)?;
                if edge_is_active(edge, from_state, &state.context) {
                    Some(edge.from.clone())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        // Scoped so the mutable borrow of the node state ends before `state`
        // is used again below.
        {
            let node_state = state
                .node_states
                .get_mut(node_id)
                .ok_or_else(|| format!("Missing node state for '{}'.", node_id))?;
            node_state.status = NodeStatus::Running;
            node_state.attempts = attempt;
            node_state.started_at = Some(current_timestamp_nanos());
            node_state.session_id = Some(session_id.clone());
            node_state.output_key = output_key.clone();
            node_state.activated_incoming = activated_incoming;
        }

        self.record_event(
            &state.run_id,
            events,
            WorkflowEvent::NodeStarted {
                node_id: node_id.to_string(),
                attempt,
            },
        )
        .await?;

        // Dispatch on the node kind; every branch yields the node's output value
        // or an error message.
        let result = match &node.kind {
            WorkflowNodeKind::Task { .. } => {
                let task =
                    task.ok_or_else(|| format!("Workflow node '{}' is missing a task.", node_id))?;
                self.execute_task_node(state, node, &task, attempt, &session_id)
                    .await
            }
            WorkflowNodeKind::Decision { condition } => {
                self.execute_decision_node(state, condition)
            }
            WorkflowNodeKind::HumanGate { prompt } => {
                self.execute_human_gate_node(state, node, prompt).await
            }
            WorkflowNodeKind::Transform {
                transform_id,
                input_key,
            } => {
                self.execute_transform_node(state, transform_id, input_key.as_deref())
                    .await
            }
            // A join produces no data of its own; it reports which incoming
            // nodes activated it.
            WorkflowNodeKind::Join => Ok(json!({
                "joined": state
                    .node_states
                    .get(node_id)
                    .map(|entry| entry.activated_incoming.clone())
                    .unwrap_or_default()
            })),
        };

        match result {
            Ok(output) => {
                // Publish the output to the shared context and to the node state.
                state.context.insert(output_key.clone(), output.clone());
                if let Some(node_state) = state.node_states.get_mut(node_id) {
                    node_state.status = NodeStatus::Completed;
                    node_state.completed_at = Some(current_timestamp_nanos());
                    node_state.output = Some(output);
                    node_state.last_error = None;
                }
                self.record_event(
                    &state.run_id,
                    events,
                    WorkflowEvent::NodeCompleted {
                        node_id: node_id.to_string(),
                        output_key,
                    },
                )
                .await?;
            }
            Err(error) => {
                // A paused run means the node asked to wait (the human gate sets
                // this status before returning its error), so this is not a failure.
                if state.status == WorkflowStatus::Paused {
                    if let Some(intervention) = state
                        .pending_interventions
                        .iter()
                        .find(|entry| {
                            entry.node_id == node.id && entry.status == InterventionStatus::Pending
                        })
                        .cloned()
                    {
                        self.record_event(
                            &state.run_id,
                            events,
                            WorkflowEvent::InterventionRequested {
                                intervention_id: intervention.id,
                                node_id: node.id.clone(),
                                reason: intervention.reason,
                            },
                        )
                        .await?;
                    }
                    self.record_event(
                        &state.run_id,
                        events,
                        WorkflowEvent::WorkflowPaused {
                            run_id: state.run_id.clone(),
                            reason: error,
                        },
                    )
                    .await?;
                } else {
                    // NOTE(review): helper is defined outside this view;
                    // presumably it applies retry / failure policy — confirm.
                    self.handle_node_failure(workflow, state, node, attempt, error, events)
                        .await?;
                }
            }
        }

        // Save after every node, whatever the outcome.
        state.updated_at = current_timestamp_nanos();
        self.workspace.save_state(&state.run_id, state).await
    }
764    async fn execute_task_node(
765        &self,
766        state: &WorkflowRunState,
767        node: &WorkflowNodeDefinition,
768        task: &TaskDefinition,
769        attempt: usize,
770        session_id: &str,
771    ) -> Result<Value, String> {
772        let mut input = if task.input_bindings.is_empty() {
773            state
774                .context
775                .get("input")
776                .cloned()
777                .unwrap_or_else(|| json!({}))
778        } else {
779            let mut map = Map::new();
780            for (alias, path) in &task.input_bindings {
781                map.insert(
782                    alias.clone(),
783                    state.context.lookup_path(path).unwrap_or(Value::Null),
784                );
785            }
786            Value::Object(map)
787        };
788
789        if let Some(transform_id) = &task.input_transform {
790            input = self
791                .apply_transform(transform_id, &input, &state.context)
792                .await?;
793        }
794
795        let prompt = render_prompt(&task.prompt, &input, &state.context);
796        let metadata = WorkflowToolContext {
797            workflow_id: state.workflow_id.clone(),
798            run_id: state.run_id.clone(),
799            node_id: node.id.clone(),
800            attempt,
801        };
802        let workspace_dir = self.workspace.task_workspace(&state.run_id, &node.id);
803        #[cfg(not(target_arch = "wasm32"))]
804        tokio::fs::create_dir_all(&workspace_dir)
805            .await
806            .map_err(|e| format!("Failed to create workflow task workspace: {e}"))?;
807
808        let result = self
809            .task_runner
810            .run_task(&task.target, &metadata, &workspace_dir, &prompt)
811            .await?;
812
813        let mut output = result.value;
814        if let Some(transform_id) = &task.output_transform {
815            output = self
816                .apply_transform(transform_id, &output, &state.context)
817                .await?;
818        }
819        if output.is_null() {
820            output = json!({
821                "content": result.content,
822                "agent_id": result.agent_id,
823                "session_id": session_id,
824                "attempt": attempt,
825            });
826        }
827        Ok(output)
828    }
829
830    fn execute_decision_node(
831        &self,
832        state: &WorkflowRunState,
833        condition: &str,
834    ) -> Result<Value, String> {
835        Ok(json!({
836            "matched": evaluate_condition(condition, &state.context)
837        }))
838    }
839
840    async fn execute_human_gate_node(
841        &self,
842        state: &mut WorkflowRunState,
843        node: &WorkflowNodeDefinition,
844        prompt: &str,
845    ) -> Result<Value, String> {
846        if let Some(existing) = state
847            .pending_interventions
848            .iter()
849            .find(|entry| entry.node_id == node.id && entry.status == InterventionStatus::Resolved)
850        {
851            let response = existing.response.clone().unwrap_or_default();
852            return Ok(json!({
853                "response": response.clone(),
854                "approved": is_truthy(&Value::String(response)),
855            }));
856        }
857
858        let intervention_id = format!("int-{}", current_timestamp_nanos());
859        state.pending_interventions.push(InterventionRequest {
860            id: intervention_id,
861            workflow_id: state.workflow_id.clone(),
862            run_id: state.run_id.clone(),
863            node_id: node.id.clone(),
864            prompt: prompt.to_string(),
865            reason: "human_gate".to_string(),
866            response: None,
867            status: InterventionStatus::Pending,
868            created_at: current_timestamp_nanos(),
869            resolved_at: None,
870        });
871        if let Some(node_state) = state.node_states.get_mut(&node.id) {
872            node_state.status = NodeStatus::Paused;
873        }
874        state.status = WorkflowStatus::Paused;
875        Err("Human intervention required".to_string())
876    }
877
878    async fn execute_transform_node(
879        &self,
880        state: &WorkflowRunState,
881        transform_id: &str,
882        input_key: Option<&str>,
883    ) -> Result<Value, String> {
884        let input = match input_key {
885            Some(key) => state.context.lookup_path(key).unwrap_or(Value::Null),
886            None => state.context.to_value(),
887        };
888        self.apply_transform(transform_id, &input, &state.context)
889            .await
890    }
891
892    async fn handle_node_failure(
893        &self,
894        workflow: &WorkflowDefinition,
895        state: &mut WorkflowRunState,
896        node: &WorkflowNodeDefinition,
897        attempt: usize,
898        error: String,
899        events: &mut Vec<WorkflowEvent>,
900    ) -> Result<(), String> {
901        let retry_policy =
902            effective_retry_policy(workflow, node, self.resolve_task(node)?.as_ref());
903        if attempt < retry_policy.max_attempts {
904            if let Some(node_state) = state.node_states.get_mut(&node.id) {
905                node_state.status = NodeStatus::Pending;
906                node_state.last_error = Some(error.clone());
907            }
908            self.record_event(
909                &state.run_id,
910                events,
911                WorkflowEvent::NodeRetryScheduled {
912                    node_id: node.id.clone(),
913                    attempt,
914                    error,
915                },
916            )
917            .await?;
918            return Ok(());
919        }
920
921        let failure_policy =
922            effective_failure_policy(workflow, node, self.resolve_task(node)?.as_ref());
923        if let Some(node_state) = state.node_states.get_mut(&node.id) {
924            node_state.last_error = Some(error.clone());
925            node_state.completed_at = Some(current_timestamp_nanos());
926        }
927
928        match failure_policy {
929            WorkflowFailurePolicy::ContinueBestEffort => {
930                if let Some(node_state) = state.node_states.get_mut(&node.id) {
931                    node_state.status = NodeStatus::Failed;
932                }
933                if !state.failed_nodes.contains(&node.id) {
934                    state.failed_nodes.push(node.id.clone());
935                }
936                self.record_event(
937                    &state.run_id,
938                    events,
939                    WorkflowEvent::NodeFailed {
940                        node_id: node.id.clone(),
941                        error,
942                    },
943                )
944                .await?;
945            }
946            WorkflowFailurePolicy::FailWorkflow => {
947                if let Some(node_state) = state.node_states.get_mut(&node.id) {
948                    node_state.status = NodeStatus::Failed;
949                }
950                if !state.failed_nodes.contains(&node.id) {
951                    state.failed_nodes.push(node.id.clone());
952                }
953                state.status = WorkflowStatus::Failed;
954                self.record_event(
955                    &state.run_id,
956                    events,
957                    WorkflowEvent::NodeFailed {
958                        node_id: node.id.clone(),
959                        error: error.clone(),
960                    },
961                )
962                .await?;
963                self.record_event(
964                    &state.run_id,
965                    events,
966                    WorkflowEvent::WorkflowCompleted {
967                        run_id: state.run_id.clone(),
968                        status: WorkflowStatus::Failed,
969                    },
970                )
971                .await?;
972            }
973            WorkflowFailurePolicy::PauseForIntervention => {
974                if let Some(node_state) = state.node_states.get_mut(&node.id) {
975                    node_state.status = NodeStatus::Paused;
976                }
977                let intervention_id = format!("int-{}", current_timestamp_nanos());
978                state.pending_interventions.push(InterventionRequest {
979                    id: intervention_id.clone(),
980                    workflow_id: state.workflow_id.clone(),
981                    run_id: state.run_id.clone(),
982                    node_id: node.id.clone(),
983                    prompt: format!(
984                        "Node '{}' failed after {} attempt(s). Reply with retry, skip, continue, or fail.",
985                        node.id, attempt
986                    ),
987                    reason: format!("step_failure:{error}"),
988                    response: None,
989                    status: InterventionStatus::Pending,
990                    created_at: current_timestamp_nanos(),
991                    resolved_at: None,
992                });
993                state.status = WorkflowStatus::Paused;
994                self.record_event(
995                    &state.run_id,
996                    events,
997                    WorkflowEvent::InterventionRequested {
998                        intervention_id,
999                        node_id: node.id.clone(),
1000                        reason: error,
1001                    },
1002                )
1003                .await?;
1004            }
1005        }
1006        Ok(())
1007    }
1008
1009    async fn apply_resolved_interventions(
1010        &self,
1011        workflow: &WorkflowDefinition,
1012        state: &mut WorkflowRunState,
1013        events: &mut Vec<WorkflowEvent>,
1014    ) -> Result<(), String> {
1015        let mut remaining = Vec::new();
1016        let interventions = std::mem::take(&mut state.pending_interventions);
1017        for intervention in interventions {
1018            if intervention.status == InterventionStatus::Pending {
1019                remaining.push(intervention);
1020                continue;
1021            }
1022
1023            self.record_event(
1024                &state.run_id,
1025                events,
1026                WorkflowEvent::InterventionResolved {
1027                    intervention_id: intervention.id.clone(),
1028                    node_id: intervention.node_id.clone(),
1029                },
1030            )
1031            .await?;
1032
1033            let node = workflow
1034                .nodes
1035                .iter()
1036                .find(|candidate| candidate.id == intervention.node_id)
1037                .ok_or_else(|| format!("Workflow node '{}' not found.", intervention.node_id))?;
1038
1039            if intervention.reason == "human_gate" {
1040                let response = intervention.response.clone().unwrap_or_default();
1041                let output = json!({
1042                    "response": response.clone(),
1043                    "approved": is_truthy(&Value::String(response)),
1044                });
1045                if let Some(node_state) = state.node_states.get_mut(&node.id) {
1046                    node_state.status = NodeStatus::Completed;
1047                    node_state.completed_at = Some(current_timestamp_nanos());
1048                    node_state.output = Some(output.clone());
1049                    node_state.last_error = None;
1050                }
1051                let output_key = state
1052                    .node_states
1053                    .get(&node.id)
1054                    .map(|entry| entry.output_key.clone())
1055                    .unwrap_or_else(|| node.id.clone());
1056                state.context.insert(output_key.clone(), output);
1057                self.record_event(
1058                    &state.run_id,
1059                    events,
1060                    WorkflowEvent::NodeCompleted {
1061                        node_id: node.id.clone(),
1062                        output_key,
1063                    },
1064                )
1065                .await?;
1066                continue;
1067            }
1068
1069            match intervention
1070                .response
1071                .clone()
1072                .unwrap_or_default()
1073                .trim()
1074                .to_ascii_lowercase()
1075                .as_str()
1076            {
1077                "retry" => {
1078                    if let Some(node_state) = state.node_states.get_mut(&node.id) {
1079                        node_state.status = NodeStatus::Pending;
1080                    }
1081                    state.status = WorkflowStatus::Running;
1082                }
1083                "skip" => {
1084                    if let Some(node_state) = state.node_states.get_mut(&node.id) {
1085                        node_state.status = NodeStatus::Skipped;
1086                        node_state.completed_at = Some(current_timestamp_nanos());
1087                    }
1088                    state.status = WorkflowStatus::Running;
1089                    self.record_event(
1090                        &state.run_id,
1091                        events,
1092                        WorkflowEvent::NodeSkipped {
1093                            node_id: node.id.clone(),
1094                        },
1095                    )
1096                    .await?;
1097                }
1098                "continue" => {
1099                    if let Some(node_state) = state.node_states.get_mut(&node.id) {
1100                        node_state.status = NodeStatus::Failed;
1101                        node_state.completed_at = Some(current_timestamp_nanos());
1102                    }
1103                    if !state.failed_nodes.contains(&node.id) {
1104                        state.failed_nodes.push(node.id.clone());
1105                    }
1106                    state.status = WorkflowStatus::Running;
1107                }
1108                _ => {
1109                    if let Some(node_state) = state.node_states.get_mut(&node.id) {
1110                        node_state.status = NodeStatus::Failed;
1111                        node_state.completed_at = Some(current_timestamp_nanos());
1112                    }
1113                    if !state.failed_nodes.contains(&node.id) {
1114                        state.failed_nodes.push(node.id.clone());
1115                    }
1116                    state.status = WorkflowStatus::Failed;
1117                }
1118            }
1119        }
1120        state.pending_interventions = remaining;
1121        Ok(())
1122    }
1123
1124    async fn apply_transform(
1125        &self,
1126        transform_id: &str,
1127        input: &Value,
1128        context: &WorkflowContext,
1129    ) -> Result<Value, String> {
1130        let transform = self
1131            .transforms
1132            .get(transform_id)
1133            .ok_or_else(|| format!("Unknown transform '{}'.", transform_id))?;
1134        transform.apply(input, context).await
1135    }
1136
1137    async fn record_event(
1138        &self,
1139        run_id: &str,
1140        events: &mut Vec<WorkflowEvent>,
1141        event: WorkflowEvent,
1142    ) -> Result<(), String> {
1143        self.workspace.append_event(run_id, &event).await?;
1144        events.push(event.clone());
1145        if let Some(listener) = &self.event_listener {
1146            listener.on_event(&event).await?;
1147        }
1148        Ok(())
1149    }
1150}
1151struct IdentityTransform;
1152
1153#[async_trait::async_trait(?Send)]
1154impl WorkflowTransform for IdentityTransform {
1155    async fn apply(&self, input: &Value, _context: &WorkflowContext) -> Result<Value, String> {
1156        Ok(input.clone())
1157    }
1158}
1159
1160struct ExtractContentTransform;
1161
1162#[async_trait::async_trait(?Send)]
1163impl WorkflowTransform for ExtractContentTransform {
1164    async fn apply(&self, input: &Value, _context: &WorkflowContext) -> Result<Value, String> {
1165        match input {
1166            Value::Object(map) => Ok(map.get("content").cloned().unwrap_or_else(|| input.clone())),
1167            _ => Ok(input.clone()),
1168        }
1169    }
1170}
1171
1172fn output_key_for(node: &WorkflowNodeDefinition, task: Option<&TaskDefinition>) -> String {
1173    node.output_key
1174        .clone()
1175        .or_else(|| task.and_then(|task| task.output_key.clone()))
1176        .unwrap_or_else(|| node.id.clone())
1177}
1178
1179fn effective_retry_policy(
1180    workflow: &WorkflowDefinition,
1181    node: &WorkflowNodeDefinition,
1182    task: Option<&TaskDefinition>,
1183) -> RetryPolicy {
1184    node.retry_policy
1185        .clone()
1186        .or_else(|| task.and_then(|task| task.retry_policy.clone()))
1187        .or_else(|| workflow.retry_policy.clone())
1188        .unwrap_or_default()
1189}
1190
1191fn effective_failure_policy(
1192    workflow: &WorkflowDefinition,
1193    node: &WorkflowNodeDefinition,
1194    task: Option<&TaskDefinition>,
1195) -> WorkflowFailurePolicy {
1196    node.failure_policy
1197        .clone()
1198        .or_else(|| task.and_then(|task| task.failure_policy.clone()))
1199        .or_else(|| workflow.failure_policy.clone())
1200        .unwrap_or_default()
1201}
1202
1203fn edge_is_active(
1204    edge: &WorkflowEdgeDefinition,
1205    from_state: &NodeRunState,
1206    context: &WorkflowContext,
1207) -> bool {
1208    match &edge.transition {
1209        WorkflowEdgeTransition::Always => from_state.status.is_terminal(),
1210        WorkflowEdgeTransition::OnSuccess => from_state.status == NodeStatus::Completed,
1211        WorkflowEdgeTransition::OnFailure => from_state.status == NodeStatus::Failed,
1212        WorkflowEdgeTransition::Condition(condition) => {
1213            from_state.status == NodeStatus::Completed && evaluate_condition(condition, context)
1214        }
1215    }
1216}
1217
1218fn evaluate_condition(condition: &str, context: &WorkflowContext) -> bool {
1219    let condition = condition.trim();
1220    if let Some((left, right)) = condition.split_once("==") {
1221        return normalize_condition_value(context.lookup_path(normalize_path(left.trim())))
1222            == parse_literal(right.trim());
1223    }
1224    if let Some((left, right)) = condition.split_once("!=") {
1225        return normalize_condition_value(context.lookup_path(normalize_path(left.trim())))
1226            != parse_literal(right.trim());
1227    }
1228    if let Some(path) = condition.strip_prefix('!') {
1229        return !is_truthy(
1230            &context
1231                .lookup_path(normalize_path(path.trim()))
1232                .unwrap_or(Value::Null),
1233        );
1234    }
1235    is_truthy(
1236        &context
1237            .lookup_path(normalize_path(condition))
1238            .unwrap_or(Value::Null),
1239    )
1240}
1241
/// Drops a single leading `context.` from `path`, if present, so that
/// `context.foo` and `foo` address the same entry.
fn normalize_path(path: &str) -> &str {
    match path.strip_prefix("context.") {
        Some(relative) => relative,
        None => path,
    }
}
1245
1246fn normalize_condition_value(value: Option<Value>) -> Value {
1247    value.unwrap_or(Value::Null)
1248}
1249
1250fn parse_literal(raw: &str) -> Value {
1251    let raw = raw.trim();
1252    if (raw.starts_with('"') && raw.ends_with('"'))
1253        || (raw.starts_with('\'') && raw.ends_with('\''))
1254    {
1255        return Value::String(raw[1..raw.len().saturating_sub(1)].to_string());
1256    }
1257    if raw.eq_ignore_ascii_case("true") {
1258        return Value::Bool(true);
1259    }
1260    if raw.eq_ignore_ascii_case("false") {
1261        return Value::Bool(false);
1262    }
1263    if raw.eq_ignore_ascii_case("null") {
1264        return Value::Null;
1265    }
1266    if let Ok(number) = raw.parse::<i64>() {
1267        return json!(number);
1268    }
1269    if let Ok(number) = raw.parse::<f64>() {
1270        return json!(number);
1271    }
1272    Value::String(raw.to_string())
1273}
1274fn render_prompt(template: &str, input: &Value, context: &WorkflowContext) -> String {
1275    let mut rendered = String::new();
1276    let mut remaining = template;
1277    while let Some(start) = remaining.find("{{") {
1278        rendered.push_str(&remaining[..start]);
1279        let after_start = &remaining[start + 2..];
1280        if let Some(end) = after_start.find("}}") {
1281            let key = after_start[..end].trim();
1282            let replacement = resolve_template_value(key, input, context);
1283            rendered.push_str(&replacement);
1284            remaining = &after_start[end + 2..];
1285        } else {
1286            rendered.push_str(&remaining[start..]);
1287            remaining = "";
1288            break;
1289        }
1290    }
1291    rendered.push_str(remaining);
1292
1293    if !matches!(input, Value::Null)
1294        && !(input.is_object() && input.as_object().map(|map| map.is_empty()).unwrap_or(false))
1295        && !template.contains("{{")
1296    {
1297        if !rendered.trim().is_empty() {
1298            rendered.push_str("\n\n");
1299        }
1300        rendered.push_str("Workflow input:\n");
1301        rendered
1302            .push_str(&serde_json::to_string_pretty(input).unwrap_or_else(|_| input.to_string()));
1303    }
1304
1305    rendered
1306}
1307
1308fn resolve_template_value(key: &str, input: &Value, context: &WorkflowContext) -> String {
1309    if let Some(path) = key.strip_prefix("input.") {
1310        if let Some(value) = lookup_value(input, path) {
1311            return value_to_template_string(&value);
1312        }
1313    }
1314    if let Some(value) = context.lookup_path(normalize_path(key)) {
1315        return value_to_template_string(&value);
1316    }
1317    String::new()
1318}
1319
1320fn lookup_value(root: &Value, path: &str) -> Option<Value> {
1321    let mut current = root.clone();
1322    for segment in path.split('.') {
1323        current = match current {
1324            Value::Object(map) => map.get(segment)?.clone(),
1325            _ => return None,
1326        };
1327    }
1328    Some(current)
1329}
1330
1331fn value_to_template_string(value: &Value) -> String {
1332    match value {
1333        Value::Null => String::new(),
1334        Value::String(value) => value.clone(),
1335        _ => value.to_string(),
1336    }
1337}
1338
1339fn is_truthy(value: &Value) -> bool {
1340    match value {
1341        Value::Null => false,
1342        Value::Bool(value) => *value,
1343        Value::Number(number) => number.as_f64().map(|value| value != 0.0).unwrap_or(false),
1344        Value::String(value) => {
1345            let normalized = value.trim().to_ascii_lowercase();
1346            !(normalized.is_empty()
1347                || normalized == "false"
1348                || normalized == "0"
1349                || normalized == "no")
1350        }
1351        Value::Array(values) => !values.is_empty(),
1352        Value::Object(values) => !values.is_empty(),
1353    }
1354}
1355
/// Returns the current system time as nanoseconds since the Unix epoch, or
/// `0` if the system clock reports a time before the epoch.
// NOTE(review): `SystemTime::now()` may be unavailable on some wasm32 targets,
// which this file otherwise accommodates — confirm how wasm builds obtain time.
fn current_timestamp_nanos() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    }
}