Skip to main content

simple_agents_workflow/yaml_runner/
runner.rs

1use std::path::Path;
2
3use serde_json::{json, Value};
4use simple_agents_core::SimpleAgentsClient;
5
6use super::typed_contracts::YamlWorkflowEventSinkFanout;
7use super::{
8    load_workflow_yaml_file,
9    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options,
10    run_workflow_yaml_with_custom_worker_and_events_and_options, YamlWorkflow,
11    YamlWorkflowCustomWorkerExecutor, YamlWorkflowEventSink, YamlWorkflowLlmExecutor,
12    YamlWorkflowRunError, YamlWorkflowRunOptions, YamlWorkflowRunOutput,
13    YamlWorkflowRunTypedOutput, YamlWorkflowTypedEventSink, YamlWorkflowTypedEventSinkAdapter,
14};
15
16#[derive(Clone, Copy)]
17enum WorkflowRunnerExecutor<'a> {
18    Llm(&'a dyn YamlWorkflowLlmExecutor),
19    Client(&'a SimpleAgentsClient),
20}
21
22#[derive(Clone, Copy)]
23enum WorkflowRunnerSource<'a> {
24    File(&'a Path),
25    Inline(&'a YamlWorkflow),
26}
27
28/// Builder-style runner for YAML workflow execution.
29///
30/// This is the preferred additive API for configuring workflow runs while keeping
31/// legacy `run_*` helpers as compatibility adapters.
32pub struct WorkflowRunner<'a> {
33    source: WorkflowRunnerSource<'a>,
34    workflow_input: Option<&'a Value>,
35    email_text: Option<&'a str>,
36    executor: Option<WorkflowRunnerExecutor<'a>>,
37    custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
38    event_sink: Option<&'a dyn YamlWorkflowEventSink>,
39    typed_event_sink: Option<&'a dyn YamlWorkflowTypedEventSink>,
40    options: Option<&'a YamlWorkflowRunOptions>,
41}
42
43impl<'a> WorkflowRunner<'a> {
44    pub fn from_file(workflow_path: &'a Path) -> Self {
45        Self {
46            source: WorkflowRunnerSource::File(workflow_path),
47            workflow_input: None,
48            email_text: None,
49            executor: None,
50            custom_worker: None,
51            event_sink: None,
52            typed_event_sink: None,
53            options: None,
54        }
55    }
56
57    pub fn from_workflow(workflow: &'a YamlWorkflow) -> Self {
58        Self {
59            source: WorkflowRunnerSource::Inline(workflow),
60            workflow_input: None,
61            email_text: None,
62            executor: None,
63            custom_worker: None,
64            event_sink: None,
65            typed_event_sink: None,
66            options: None,
67        }
68    }
69
70    pub fn with_input(mut self, workflow_input: &'a Value) -> Self {
71        self.workflow_input = Some(workflow_input);
72        self
73    }
74
75    pub fn with_email_text(mut self, email_text: &'a str) -> Self {
76        self.email_text = Some(email_text);
77        self
78    }
79
80    pub fn with_executor(mut self, executor: &'a dyn YamlWorkflowLlmExecutor) -> Self {
81        self.executor = Some(WorkflowRunnerExecutor::Llm(executor));
82        self
83    }
84
85    pub fn with_client(mut self, client: &'a SimpleAgentsClient) -> Self {
86        self.executor = Some(WorkflowRunnerExecutor::Client(client));
87        self
88    }
89
90    pub fn with_custom_worker(
91        mut self,
92        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
93    ) -> Self {
94        self.custom_worker = custom_worker;
95        self
96    }
97
98    pub fn with_event_sink(mut self, event_sink: Option<&'a dyn YamlWorkflowEventSink>) -> Self {
99        self.event_sink = event_sink;
100        self
101    }
102
103    pub fn with_typed_event_sink(
104        mut self,
105        typed_event_sink: Option<&'a dyn YamlWorkflowTypedEventSink>,
106    ) -> Self {
107        self.typed_event_sink = typed_event_sink;
108        self
109    }
110
111    pub fn with_options(mut self, options: &'a YamlWorkflowRunOptions) -> Self {
112        self.options = Some(options);
113        self
114    }
115
116    pub async fn run(self) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
117        let WorkflowRunner {
118            source,
119            workflow_input,
120            email_text,
121            executor,
122            custom_worker,
123            event_sink,
124            typed_event_sink,
125            options,
126        } = self;
127
128        let fallback_input;
129        let workflow_input = if let Some(workflow_input) = workflow_input {
130            workflow_input
131        } else if let Some(email_text) = email_text {
132            fallback_input = json!({ "email_text": email_text });
133            &fallback_input
134        } else {
135            return Err(YamlWorkflowRunError::InvalidInput {
136                message: "workflow input is required; call with_input(...) or with_email_text(...)"
137                    .to_string(),
138            });
139        };
140
141        let default_options;
142        let options = if let Some(options) = options {
143            options
144        } else {
145            default_options = YamlWorkflowRunOptions::default();
146            &default_options
147        };
148
149        let executor = executor.ok_or_else(|| YamlWorkflowRunError::InvalidInput {
150            message: "workflow executor is required; call with_executor(...) or with_client(...)"
151                .to_string(),
152        })?;
153
154        let typed_event_adapter = typed_event_sink.map(YamlWorkflowTypedEventSinkAdapter::new);
155        let fanout_sink = match (event_sink, typed_event_adapter.as_ref()) {
156            (Some(legacy_sink), Some(typed_sink_adapter)) => Some(
157                YamlWorkflowEventSinkFanout::new(legacy_sink, typed_sink_adapter),
158            ),
159            _ => None,
160        };
161        let resolved_event_sink: Option<&dyn YamlWorkflowEventSink> =
162            if let Some(fanout_sink) = fanout_sink.as_ref() {
163                Some(fanout_sink)
164            } else if let Some(typed_sink_adapter) = typed_event_adapter.as_ref() {
165                Some(typed_sink_adapter)
166            } else {
167                event_sink
168            };
169
170        match (source, executor) {
171            (WorkflowRunnerSource::File(path), WorkflowRunnerExecutor::Llm(executor)) => {
172                let (_, workflow) = load_workflow_yaml_file(path)?;
173                run_workflow_yaml_with_custom_worker_and_events_and_options(
174                    &workflow,
175                    workflow_input,
176                    executor,
177                    custom_worker,
178                    resolved_event_sink,
179                    options,
180                )
181                .await
182            }
183            (WorkflowRunnerSource::File(path), WorkflowRunnerExecutor::Client(client)) => {
184                let (_, workflow) = load_workflow_yaml_file(path)?;
185                run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
186                    &workflow,
187                    workflow_input,
188                    client,
189                    custom_worker,
190                    resolved_event_sink,
191                    options,
192                )
193                .await
194            }
195            (WorkflowRunnerSource::Inline(workflow), WorkflowRunnerExecutor::Llm(executor)) => {
196                run_workflow_yaml_with_custom_worker_and_events_and_options(
197                    workflow,
198                    workflow_input,
199                    executor,
200                    custom_worker,
201                    resolved_event_sink,
202                    options,
203                )
204                .await
205            }
206            (WorkflowRunnerSource::Inline(workflow), WorkflowRunnerExecutor::Client(client)) => {
207                run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
208                    workflow,
209                    workflow_input,
210                    client,
211                    custom_worker,
212                    resolved_event_sink,
213                    options,
214                )
215                .await
216            }
217        }
218    }
219
220    /// Execute the workflow and return the typed projection.
221    ///
222    /// The typed output keeps only workflow identity, traversal, and node
223    /// outputs. This compatibility bridge intentionally omits observability
224    /// fields from `YamlWorkflowRunOutput`, including `step_timings`,
225    /// `llm_node_metrics`, token counters, `total_elapsed_ms`, `trace_id`,
226    /// and `metadata` (plus legacy compatibility fields such as `email_text`).
227    /// Call `run()` when those diagnostics are required.
228    pub async fn run_typed(self) -> Result<YamlWorkflowRunTypedOutput, YamlWorkflowRunError> {
229        let WorkflowRunner {
230            source,
231            workflow_input,
232            email_text,
233            executor,
234            custom_worker,
235            event_sink,
236            typed_event_sink,
237            options,
238        } = self;
239
240        match source {
241            WorkflowRunnerSource::Inline(workflow) => {
242                let output = WorkflowRunner {
243                    source: WorkflowRunnerSource::Inline(workflow),
244                    workflow_input,
245                    email_text,
246                    executor,
247                    custom_worker,
248                    event_sink,
249                    typed_event_sink,
250                    options,
251                }
252                .run()
253                .await?;
254                Ok(output.to_typed_output(workflow))
255            }
256            WorkflowRunnerSource::File(path) => {
257                let (_, workflow) = load_workflow_yaml_file(path)?;
258                let output = WorkflowRunner::from_workflow(&workflow)
259                    .with_optional_input(workflow_input)
260                    .with_optional_email_text(email_text)
261                    .with_optional_executor(executor)
262                    .with_custom_worker(custom_worker)
263                    .with_event_sink(event_sink)
264                    .with_typed_event_sink(typed_event_sink)
265                    .with_optional_options(options)
266                    .run()
267                    .await?;
268                Ok(output.to_typed_output(&workflow))
269            }
270        }
271    }
272
273    fn with_optional_input(mut self, workflow_input: Option<&'a Value>) -> Self {
274        self.workflow_input = workflow_input;
275        self
276    }
277
278    fn with_optional_email_text(mut self, email_text: Option<&'a str>) -> Self {
279        self.email_text = email_text;
280        self
281    }
282
283    fn with_optional_executor(mut self, executor: Option<WorkflowRunnerExecutor<'a>>) -> Self {
284        self.executor = executor;
285        self
286    }
287
288    fn with_optional_options(mut self, options: Option<&'a YamlWorkflowRunOptions>) -> Self {
289        self.options = options;
290        self
291    }
292}