simple_agents_workflow/yaml_runner/
runner.rs1use std::path::Path;
2
3use serde_json::{json, Value};
4use simple_agents_core::SimpleAgentsClient;
5
6use super::typed_contracts::YamlWorkflowEventSinkFanout;
7use super::{
8 load_workflow_yaml_file,
9 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options,
10 run_workflow_yaml_with_custom_worker_and_events_and_options, YamlWorkflow,
11 YamlWorkflowCustomWorkerExecutor, YamlWorkflowEventSink, YamlWorkflowLlmExecutor,
12 YamlWorkflowRunError, YamlWorkflowRunOptions, YamlWorkflowRunOutput,
13 YamlWorkflowRunTypedOutput, YamlWorkflowTypedEventSink, YamlWorkflowTypedEventSinkAdapter,
14};
15
16#[derive(Clone, Copy)]
17enum WorkflowRunnerExecutor<'a> {
18 Llm(&'a dyn YamlWorkflowLlmExecutor),
19 Client(&'a SimpleAgentsClient),
20}
21
22#[derive(Clone, Copy)]
23enum WorkflowRunnerSource<'a> {
24 File(&'a Path),
25 Inline(&'a YamlWorkflow),
26}
27
28pub struct WorkflowRunner<'a> {
33 source: WorkflowRunnerSource<'a>,
34 workflow_input: Option<&'a Value>,
35 email_text: Option<&'a str>,
36 executor: Option<WorkflowRunnerExecutor<'a>>,
37 custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
38 event_sink: Option<&'a dyn YamlWorkflowEventSink>,
39 typed_event_sink: Option<&'a dyn YamlWorkflowTypedEventSink>,
40 options: Option<&'a YamlWorkflowRunOptions>,
41}
42
43impl<'a> WorkflowRunner<'a> {
44 pub fn from_file(workflow_path: &'a Path) -> Self {
45 Self {
46 source: WorkflowRunnerSource::File(workflow_path),
47 workflow_input: None,
48 email_text: None,
49 executor: None,
50 custom_worker: None,
51 event_sink: None,
52 typed_event_sink: None,
53 options: None,
54 }
55 }
56
57 pub fn from_workflow(workflow: &'a YamlWorkflow) -> Self {
58 Self {
59 source: WorkflowRunnerSource::Inline(workflow),
60 workflow_input: None,
61 email_text: None,
62 executor: None,
63 custom_worker: None,
64 event_sink: None,
65 typed_event_sink: None,
66 options: None,
67 }
68 }
69
70 pub fn with_input(mut self, workflow_input: &'a Value) -> Self {
71 self.workflow_input = Some(workflow_input);
72 self
73 }
74
75 pub fn with_email_text(mut self, email_text: &'a str) -> Self {
76 self.email_text = Some(email_text);
77 self
78 }
79
80 pub fn with_executor(mut self, executor: &'a dyn YamlWorkflowLlmExecutor) -> Self {
81 self.executor = Some(WorkflowRunnerExecutor::Llm(executor));
82 self
83 }
84
85 pub fn with_client(mut self, client: &'a SimpleAgentsClient) -> Self {
86 self.executor = Some(WorkflowRunnerExecutor::Client(client));
87 self
88 }
89
90 pub fn with_custom_worker(
91 mut self,
92 custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
93 ) -> Self {
94 self.custom_worker = custom_worker;
95 self
96 }
97
98 pub fn with_event_sink(mut self, event_sink: Option<&'a dyn YamlWorkflowEventSink>) -> Self {
99 self.event_sink = event_sink;
100 self
101 }
102
103 pub fn with_typed_event_sink(
104 mut self,
105 typed_event_sink: Option<&'a dyn YamlWorkflowTypedEventSink>,
106 ) -> Self {
107 self.typed_event_sink = typed_event_sink;
108 self
109 }
110
111 pub fn with_options(mut self, options: &'a YamlWorkflowRunOptions) -> Self {
112 self.options = Some(options);
113 self
114 }
115
116 pub async fn run(self) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
117 let WorkflowRunner {
118 source,
119 workflow_input,
120 email_text,
121 executor,
122 custom_worker,
123 event_sink,
124 typed_event_sink,
125 options,
126 } = self;
127
128 let fallback_input;
129 let workflow_input = if let Some(workflow_input) = workflow_input {
130 workflow_input
131 } else if let Some(email_text) = email_text {
132 fallback_input = json!({ "email_text": email_text });
133 &fallback_input
134 } else {
135 return Err(YamlWorkflowRunError::InvalidInput {
136 message: "workflow input is required; call with_input(...) or with_email_text(...)"
137 .to_string(),
138 });
139 };
140
141 let default_options;
142 let options = if let Some(options) = options {
143 options
144 } else {
145 default_options = YamlWorkflowRunOptions::default();
146 &default_options
147 };
148
149 let executor = executor.ok_or_else(|| YamlWorkflowRunError::InvalidInput {
150 message: "workflow executor is required; call with_executor(...) or with_client(...)"
151 .to_string(),
152 })?;
153
154 let typed_event_adapter = typed_event_sink.map(YamlWorkflowTypedEventSinkAdapter::new);
155 let fanout_sink = match (event_sink, typed_event_adapter.as_ref()) {
156 (Some(legacy_sink), Some(typed_sink_adapter)) => Some(
157 YamlWorkflowEventSinkFanout::new(legacy_sink, typed_sink_adapter),
158 ),
159 _ => None,
160 };
161 let resolved_event_sink: Option<&dyn YamlWorkflowEventSink> =
162 if let Some(fanout_sink) = fanout_sink.as_ref() {
163 Some(fanout_sink)
164 } else if let Some(typed_sink_adapter) = typed_event_adapter.as_ref() {
165 Some(typed_sink_adapter)
166 } else {
167 event_sink
168 };
169
170 match (source, executor) {
171 (WorkflowRunnerSource::File(path), WorkflowRunnerExecutor::Llm(executor)) => {
172 let (_, workflow) = load_workflow_yaml_file(path)?;
173 run_workflow_yaml_with_custom_worker_and_events_and_options(
174 &workflow,
175 workflow_input,
176 executor,
177 custom_worker,
178 resolved_event_sink,
179 options,
180 )
181 .await
182 }
183 (WorkflowRunnerSource::File(path), WorkflowRunnerExecutor::Client(client)) => {
184 let (_, workflow) = load_workflow_yaml_file(path)?;
185 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
186 &workflow,
187 workflow_input,
188 client,
189 custom_worker,
190 resolved_event_sink,
191 options,
192 )
193 .await
194 }
195 (WorkflowRunnerSource::Inline(workflow), WorkflowRunnerExecutor::Llm(executor)) => {
196 run_workflow_yaml_with_custom_worker_and_events_and_options(
197 workflow,
198 workflow_input,
199 executor,
200 custom_worker,
201 resolved_event_sink,
202 options,
203 )
204 .await
205 }
206 (WorkflowRunnerSource::Inline(workflow), WorkflowRunnerExecutor::Client(client)) => {
207 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
208 workflow,
209 workflow_input,
210 client,
211 custom_worker,
212 resolved_event_sink,
213 options,
214 )
215 .await
216 }
217 }
218 }
219
220 pub async fn run_typed(self) -> Result<YamlWorkflowRunTypedOutput, YamlWorkflowRunError> {
229 let WorkflowRunner {
230 source,
231 workflow_input,
232 email_text,
233 executor,
234 custom_worker,
235 event_sink,
236 typed_event_sink,
237 options,
238 } = self;
239
240 match source {
241 WorkflowRunnerSource::Inline(workflow) => {
242 let output = WorkflowRunner {
243 source: WorkflowRunnerSource::Inline(workflow),
244 workflow_input,
245 email_text,
246 executor,
247 custom_worker,
248 event_sink,
249 typed_event_sink,
250 options,
251 }
252 .run()
253 .await?;
254 Ok(output.to_typed_output(workflow))
255 }
256 WorkflowRunnerSource::File(path) => {
257 let (_, workflow) = load_workflow_yaml_file(path)?;
258 let output = WorkflowRunner::from_workflow(&workflow)
259 .with_optional_input(workflow_input)
260 .with_optional_email_text(email_text)
261 .with_optional_executor(executor)
262 .with_custom_worker(custom_worker)
263 .with_event_sink(event_sink)
264 .with_typed_event_sink(typed_event_sink)
265 .with_optional_options(options)
266 .run()
267 .await?;
268 Ok(output.to_typed_output(&workflow))
269 }
270 }
271 }
272
273 fn with_optional_input(mut self, workflow_input: Option<&'a Value>) -> Self {
274 self.workflow_input = workflow_input;
275 self
276 }
277
278 fn with_optional_email_text(mut self, email_text: Option<&'a str>) -> Self {
279 self.email_text = email_text;
280 self
281 }
282
283 fn with_optional_executor(mut self, executor: Option<WorkflowRunnerExecutor<'a>>) -> Self {
284 self.executor = executor;
285 self
286 }
287
288 fn with_optional_options(mut self, options: Option<&'a YamlWorkflowRunOptions>) -> Self {
289 self.options = options;
290 self
291 }
292}