Skip to main content

simple_agents_workflow/yaml_runner/
runner.rs

1use std::path::Path;
2
3use serde_json::Value;
4use simple_agents_core::SimpleAgentsClient;
5
6use super::typed_contracts::YamlWorkflowEventSinkFanout;
7use super::{
8    load_workflow_yaml_file,
9    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options,
10    run_workflow_yaml_with_custom_worker_and_events_and_options, YamlWorkflow,
11    YamlWorkflowCustomWorkerExecutor, YamlWorkflowEventSink, YamlWorkflowLlmExecutor,
12    YamlWorkflowRunError, YamlWorkflowRunOptions, YamlWorkflowRunOutput,
13    YamlWorkflowRunTypedOutput, YamlWorkflowTypedEventSink, YamlWorkflowTypedEventSinkAdapter,
14};
15
16#[derive(Clone, Copy)]
17enum WorkflowRunnerExecutor<'a> {
18    Llm(&'a dyn YamlWorkflowLlmExecutor),
19    Client(&'a SimpleAgentsClient),
20}
21
22#[derive(Clone, Copy)]
23enum WorkflowRunnerSource<'a> {
24    File(&'a Path),
25    Inline(&'a YamlWorkflow),
26}
27
28/// Builder-style runner for YAML workflow execution.
29///
30/// This is the preferred additive API for configuring workflow runs while keeping
31/// legacy `run_*` helpers as compatibility adapters.
32pub struct WorkflowRunner<'a> {
33    source: WorkflowRunnerSource<'a>,
34    workflow_input: Option<&'a Value>,
35    executor: Option<WorkflowRunnerExecutor<'a>>,
36    custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
37    event_sink: Option<&'a dyn YamlWorkflowEventSink>,
38    typed_event_sink: Option<&'a dyn YamlWorkflowTypedEventSink>,
39    options: Option<&'a YamlWorkflowRunOptions>,
40}
41
42impl<'a> WorkflowRunner<'a> {
43    pub fn from_file(workflow_path: &'a Path) -> Self {
44        Self {
45            source: WorkflowRunnerSource::File(workflow_path),
46            workflow_input: None,
47            executor: None,
48            custom_worker: None,
49            event_sink: None,
50            typed_event_sink: None,
51            options: None,
52        }
53    }
54
55    pub fn from_workflow(workflow: &'a YamlWorkflow) -> Self {
56        Self {
57            source: WorkflowRunnerSource::Inline(workflow),
58            workflow_input: None,
59            executor: None,
60            custom_worker: None,
61            event_sink: None,
62            typed_event_sink: None,
63            options: None,
64        }
65    }
66
67    pub fn with_input(mut self, workflow_input: &'a Value) -> Self {
68        self.workflow_input = Some(workflow_input);
69        self
70    }
71
72    pub fn with_executor(mut self, executor: &'a dyn YamlWorkflowLlmExecutor) -> Self {
73        self.executor = Some(WorkflowRunnerExecutor::Llm(executor));
74        self
75    }
76
77    pub fn with_client(mut self, client: &'a SimpleAgentsClient) -> Self {
78        self.executor = Some(WorkflowRunnerExecutor::Client(client));
79        self
80    }
81
82    pub fn with_custom_worker(
83        mut self,
84        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
85    ) -> Self {
86        self.custom_worker = custom_worker;
87        self
88    }
89
90    pub fn with_event_sink(mut self, event_sink: Option<&'a dyn YamlWorkflowEventSink>) -> Self {
91        self.event_sink = event_sink;
92        self
93    }
94
95    pub fn with_typed_event_sink(
96        mut self,
97        typed_event_sink: Option<&'a dyn YamlWorkflowTypedEventSink>,
98    ) -> Self {
99        self.typed_event_sink = typed_event_sink;
100        self
101    }
102
103    pub fn with_options(mut self, options: &'a YamlWorkflowRunOptions) -> Self {
104        self.options = Some(options);
105        self
106    }
107
108    pub async fn run(self) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
109        let WorkflowRunner {
110            source,
111            workflow_input,
112            executor,
113            custom_worker,
114            event_sink,
115            typed_event_sink,
116            options,
117        } = self;
118
119        let workflow_input = if let Some(workflow_input) = workflow_input {
120            workflow_input
121        } else {
122            return Err(YamlWorkflowRunError::InvalidInput {
123                message: "workflow input is required; call with_input(...)".to_string(),
124            });
125        };
126
127        let default_options;
128        let options = if let Some(options) = options {
129            options
130        } else {
131            default_options = YamlWorkflowRunOptions::default();
132            &default_options
133        };
134
135        let executor = executor.ok_or_else(|| YamlWorkflowRunError::InvalidInput {
136            message: "workflow executor is required; call with_executor(...) or with_client(...)"
137                .to_string(),
138        })?;
139
140        let typed_event_adapter = typed_event_sink.map(YamlWorkflowTypedEventSinkAdapter::new);
141        let fanout_sink = match (event_sink, typed_event_adapter.as_ref()) {
142            (Some(legacy_sink), Some(typed_sink_adapter)) => Some(
143                YamlWorkflowEventSinkFanout::new(legacy_sink, typed_sink_adapter),
144            ),
145            _ => None,
146        };
147        let resolved_event_sink: Option<&dyn YamlWorkflowEventSink> =
148            if let Some(fanout_sink) = fanout_sink.as_ref() {
149                Some(fanout_sink)
150            } else if let Some(typed_sink_adapter) = typed_event_adapter.as_ref() {
151                Some(typed_sink_adapter)
152            } else {
153                event_sink
154            };
155
156        match (source, executor) {
157            (WorkflowRunnerSource::File(path), WorkflowRunnerExecutor::Llm(executor)) => {
158                let (_, workflow) = load_workflow_yaml_file(path)?;
159                run_workflow_yaml_with_custom_worker_and_events_and_options(
160                    &workflow,
161                    workflow_input,
162                    executor,
163                    custom_worker,
164                    resolved_event_sink,
165                    options,
166                )
167                .await
168            }
169            (WorkflowRunnerSource::File(path), WorkflowRunnerExecutor::Client(client)) => {
170                let (_, workflow) = load_workflow_yaml_file(path)?;
171                run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
172                    &workflow,
173                    workflow_input,
174                    client,
175                    custom_worker,
176                    resolved_event_sink,
177                    options,
178                )
179                .await
180            }
181            (WorkflowRunnerSource::Inline(workflow), WorkflowRunnerExecutor::Llm(executor)) => {
182                run_workflow_yaml_with_custom_worker_and_events_and_options(
183                    workflow,
184                    workflow_input,
185                    executor,
186                    custom_worker,
187                    resolved_event_sink,
188                    options,
189                )
190                .await
191            }
192            (WorkflowRunnerSource::Inline(workflow), WorkflowRunnerExecutor::Client(client)) => {
193                run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
194                    workflow,
195                    workflow_input,
196                    client,
197                    custom_worker,
198                    resolved_event_sink,
199                    options,
200                )
201                .await
202            }
203        }
204    }
205
206    /// Execute the workflow and return the typed projection.
207    ///
208    /// The typed output keeps only workflow identity, traversal, and node
209    /// outputs. This compatibility bridge intentionally omits observability
210    /// fields from `YamlWorkflowRunOutput`, including `step_timings`,
211    /// `llm_node_metrics`, token counters, `total_elapsed_ms`, `trace_id`,
212    /// and `metadata`.
213    /// Call `run()` when those diagnostics are required.
214    pub async fn run_typed(self) -> Result<YamlWorkflowRunTypedOutput, YamlWorkflowRunError> {
215        let WorkflowRunner {
216            source,
217            workflow_input,
218            executor,
219            custom_worker,
220            event_sink,
221            typed_event_sink,
222            options,
223        } = self;
224
225        match source {
226            WorkflowRunnerSource::Inline(workflow) => {
227                let output = WorkflowRunner {
228                    source: WorkflowRunnerSource::Inline(workflow),
229                    workflow_input,
230                    executor,
231                    custom_worker,
232                    event_sink,
233                    typed_event_sink,
234                    options,
235                }
236                .run()
237                .await?;
238                Ok(output.to_typed_output(workflow))
239            }
240            WorkflowRunnerSource::File(path) => {
241                let (_, workflow) = load_workflow_yaml_file(path)?;
242                let output = WorkflowRunner::from_workflow(&workflow)
243                    .with_optional_input(workflow_input)
244                    .with_optional_executor(executor)
245                    .with_custom_worker(custom_worker)
246                    .with_event_sink(event_sink)
247                    .with_typed_event_sink(typed_event_sink)
248                    .with_optional_options(options)
249                    .run()
250                    .await?;
251                Ok(output.to_typed_output(&workflow))
252            }
253        }
254    }
255
256    fn with_optional_input(mut self, workflow_input: Option<&'a Value>) -> Self {
257        self.workflow_input = workflow_input;
258        self
259    }
260
261    fn with_optional_executor(mut self, executor: Option<WorkflowRunnerExecutor<'a>>) -> Self {
262        self.executor = executor;
263        self
264    }
265
266    fn with_optional_options(mut self, options: Option<&'a YamlWorkflowRunOptions>) -> Self {
267        self.options = options;
268        self
269    }
270}