simple_agents_workflow/yaml_runner/
runner.rs1use std::path::Path;
2
3use serde_json::{json, Value};
4use simple_agents_core::SimpleAgentsClient;
5
6use super::typed_contracts::YamlWorkflowEventSinkFanout;
7use super::{
8 load_workflow_yaml_file,
9 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options,
10 run_workflow_yaml_with_custom_worker_and_events_and_options, YamlWorkflow,
11 YamlWorkflowCustomWorkerExecutor, YamlWorkflowEventSink, YamlWorkflowLlmExecutor,
12 YamlWorkflowRunError, YamlWorkflowRunOptions, YamlWorkflowRunOutput,
13 YamlWorkflowRunTypedOutput, YamlWorkflowTypedEventSink, YamlWorkflowTypedEventSinkAdapter,
14};
15
16#[derive(Clone, Copy)]
17enum WorkflowRunnerExecutor<'a> {
18 Llm(&'a dyn YamlWorkflowLlmExecutor),
19 Client(&'a SimpleAgentsClient),
20}
21
22#[derive(Clone, Copy)]
23enum WorkflowRunnerSource<'a> {
24 File(&'a Path),
25 Inline(&'a YamlWorkflow),
26}
27
28pub struct WorkflowRunner<'a> {
33 source: WorkflowRunnerSource<'a>,
34 workflow_input: Option<&'a Value>,
35 email_text: Option<&'a str>,
36 executor: Option<WorkflowRunnerExecutor<'a>>,
37 custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
38 event_sink: Option<&'a dyn YamlWorkflowEventSink>,
39 typed_event_sink: Option<&'a dyn YamlWorkflowTypedEventSink>,
40 options: Option<&'a YamlWorkflowRunOptions>,
41}
42
43impl<'a> WorkflowRunner<'a> {
44 pub fn from_file(workflow_path: &'a Path) -> Self {
45 Self {
46 source: WorkflowRunnerSource::File(workflow_path),
47 workflow_input: None,
48 email_text: None,
49 executor: None,
50 custom_worker: None,
51 event_sink: None,
52 typed_event_sink: None,
53 options: None,
54 }
55 }
56
57 pub fn from_workflow(workflow: &'a YamlWorkflow) -> Self {
58 Self {
59 source: WorkflowRunnerSource::Inline(workflow),
60 workflow_input: None,
61 email_text: None,
62 executor: None,
63 custom_worker: None,
64 event_sink: None,
65 typed_event_sink: None,
66 options: None,
67 }
68 }
69
70 pub fn with_input(mut self, workflow_input: &'a Value) -> Self {
71 self.workflow_input = Some(workflow_input);
72 self
73 }
74
75 pub fn with_email_text(mut self, email_text: &'a str) -> Self {
76 self.email_text = Some(email_text);
77 self
78 }
79
80 pub fn with_executor(mut self, executor: &'a dyn YamlWorkflowLlmExecutor) -> Self {
81 self.executor = Some(WorkflowRunnerExecutor::Llm(executor));
82 self
83 }
84
85 pub fn with_client(mut self, client: &'a SimpleAgentsClient) -> Self {
86 self.executor = Some(WorkflowRunnerExecutor::Client(client));
87 self
88 }
89
90 pub fn with_custom_worker(
91 mut self,
92 custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
93 ) -> Self {
94 self.custom_worker = custom_worker;
95 self
96 }
97
98 pub fn with_event_sink(mut self, event_sink: Option<&'a dyn YamlWorkflowEventSink>) -> Self {
99 self.event_sink = event_sink;
100 self
101 }
102
103 pub fn with_typed_event_sink(
104 mut self,
105 typed_event_sink: Option<&'a dyn YamlWorkflowTypedEventSink>,
106 ) -> Self {
107 self.typed_event_sink = typed_event_sink;
108 self
109 }
110
111 pub fn with_options(mut self, options: &'a YamlWorkflowRunOptions) -> Self {
112 self.options = Some(options);
113 self
114 }
115
116 pub async fn run(self) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
117 let WorkflowRunner {
118 source,
119 workflow_input,
120 email_text,
121 executor,
122 custom_worker,
123 event_sink,
124 typed_event_sink,
125 options,
126 } = self;
127
128 let fallback_input;
129 let workflow_input = if let Some(workflow_input) = workflow_input {
130 workflow_input
131 } else if let Some(email_text) = email_text {
132 fallback_input = json!({ "email_text": email_text });
133 &fallback_input
134 } else {
135 return Err(YamlWorkflowRunError::InvalidInput {
136 message: "workflow input is required; call with_input(...) or with_email_text(...)"
137 .to_string(),
138 });
139 };
140
141 let default_options;
142 let options = if let Some(options) = options {
143 options
144 } else {
145 default_options = YamlWorkflowRunOptions::default();
146 &default_options
147 };
148
149 let executor = executor.ok_or_else(|| YamlWorkflowRunError::InvalidInput {
150 message: "workflow executor is required; call with_executor(...) or with_client(...)"
151 .to_string(),
152 })?;
153
154 let typed_event_adapter = typed_event_sink.map(YamlWorkflowTypedEventSinkAdapter::new);
155 let fanout_sink = match (event_sink, typed_event_adapter.as_ref()) {
156 (Some(legacy_sink), Some(typed_sink_adapter)) => Some(
157 YamlWorkflowEventSinkFanout::new(legacy_sink, typed_sink_adapter),
158 ),
159 _ => None,
160 };
161 let resolved_event_sink: Option<&dyn YamlWorkflowEventSink> =
162 if let Some(fanout_sink) = fanout_sink.as_ref() {
163 Some(fanout_sink)
164 } else if let Some(typed_sink_adapter) = typed_event_adapter.as_ref() {
165 Some(typed_sink_adapter)
166 } else {
167 event_sink
168 };
169
170 match (source, executor) {
171 (WorkflowRunnerSource::File(path), WorkflowRunnerExecutor::Llm(executor)) => {
172 let (_, workflow) = load_workflow_yaml_file(path)?;
173 run_workflow_yaml_with_custom_worker_and_events_and_options(
174 &workflow,
175 workflow_input,
176 executor,
177 custom_worker,
178 resolved_event_sink,
179 options,
180 )
181 .await
182 }
183 (WorkflowRunnerSource::File(path), WorkflowRunnerExecutor::Client(client)) => {
184 let (_, workflow) = load_workflow_yaml_file(path)?;
185 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
186 &workflow,
187 workflow_input,
188 client,
189 custom_worker,
190 resolved_event_sink,
191 options,
192 )
193 .await
194 }
195 (WorkflowRunnerSource::Inline(workflow), WorkflowRunnerExecutor::Llm(executor)) => {
196 run_workflow_yaml_with_custom_worker_and_events_and_options(
197 workflow,
198 workflow_input,
199 executor,
200 custom_worker,
201 resolved_event_sink,
202 options,
203 )
204 .await
205 }
206 (WorkflowRunnerSource::Inline(workflow), WorkflowRunnerExecutor::Client(client)) => {
207 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
208 workflow,
209 workflow_input,
210 client,
211 custom_worker,
212 resolved_event_sink,
213 options,
214 )
215 .await
216 }
217 }
218 }
219
220 pub async fn run_typed(self) -> Result<YamlWorkflowRunTypedOutput, YamlWorkflowRunError> {
221 let WorkflowRunner {
222 source,
223 workflow_input,
224 email_text,
225 executor,
226 custom_worker,
227 event_sink,
228 typed_event_sink,
229 options,
230 } = self;
231
232 match source {
233 WorkflowRunnerSource::Inline(workflow) => {
234 let output = WorkflowRunner {
235 source: WorkflowRunnerSource::Inline(workflow),
236 workflow_input,
237 email_text,
238 executor,
239 custom_worker,
240 event_sink,
241 typed_event_sink,
242 options,
243 }
244 .run()
245 .await?;
246 Ok(output.to_typed_output(workflow))
247 }
248 WorkflowRunnerSource::File(path) => {
249 let (_, workflow) = load_workflow_yaml_file(path)?;
250 let output = WorkflowRunner::from_workflow(&workflow)
251 .with_optional_input(workflow_input)
252 .with_optional_email_text(email_text)
253 .with_optional_executor(executor)
254 .with_custom_worker(custom_worker)
255 .with_event_sink(event_sink)
256 .with_typed_event_sink(typed_event_sink)
257 .with_optional_options(options)
258 .run()
259 .await?;
260 Ok(output.to_typed_output(&workflow))
261 }
262 }
263 }
264
265 fn with_optional_input(mut self, workflow_input: Option<&'a Value>) -> Self {
266 self.workflow_input = workflow_input;
267 self
268 }
269
270 fn with_optional_email_text(mut self, email_text: Option<&'a str>) -> Self {
271 self.email_text = email_text;
272 self
273 }
274
275 fn with_optional_executor(mut self, executor: Option<WorkflowRunnerExecutor<'a>>) -> Self {
276 self.executor = executor;
277 self
278 }
279
280 fn with_optional_options(mut self, options: Option<&'a YamlWorkflowRunOptions>) -> Self {
281 self.options = options;
282 self
283 }
284}