simple_agents_workflow/yaml_runner/
runner.rs1use std::path::Path;
2
3use serde_json::Value;
4use simple_agents_core::SimpleAgentsClient;
5
6use super::typed_contracts::YamlWorkflowEventSinkFanout;
7use super::{
8 load_workflow_yaml_file,
9 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options,
10 run_workflow_yaml_with_custom_worker_and_events_and_options, YamlWorkflow,
11 YamlWorkflowCustomWorkerExecutor, YamlWorkflowEventSink, YamlWorkflowLlmExecutor,
12 YamlWorkflowRunError, YamlWorkflowRunOptions, YamlWorkflowRunOutput,
13 YamlWorkflowRunTypedOutput, YamlWorkflowTypedEventSink, YamlWorkflowTypedEventSinkAdapter,
14};
15
16#[derive(Clone, Copy)]
17enum WorkflowRunnerExecutor<'a> {
18 Llm(&'a dyn YamlWorkflowLlmExecutor),
19 Client(&'a SimpleAgentsClient),
20}
21
22#[derive(Clone, Copy)]
23enum WorkflowRunnerSource<'a> {
24 File(&'a Path),
25 Inline(&'a YamlWorkflow),
26}
27
28pub struct WorkflowRunner<'a> {
33 source: WorkflowRunnerSource<'a>,
34 workflow_input: Option<&'a Value>,
35 executor: Option<WorkflowRunnerExecutor<'a>>,
36 custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
37 event_sink: Option<&'a dyn YamlWorkflowEventSink>,
38 typed_event_sink: Option<&'a dyn YamlWorkflowTypedEventSink>,
39 options: Option<&'a YamlWorkflowRunOptions>,
40}
41
42impl<'a> WorkflowRunner<'a> {
43 pub fn from_file(workflow_path: &'a Path) -> Self {
44 Self {
45 source: WorkflowRunnerSource::File(workflow_path),
46 workflow_input: None,
47 executor: None,
48 custom_worker: None,
49 event_sink: None,
50 typed_event_sink: None,
51 options: None,
52 }
53 }
54
55 pub fn from_workflow(workflow: &'a YamlWorkflow) -> Self {
56 Self {
57 source: WorkflowRunnerSource::Inline(workflow),
58 workflow_input: None,
59 executor: None,
60 custom_worker: None,
61 event_sink: None,
62 typed_event_sink: None,
63 options: None,
64 }
65 }
66
67 pub fn with_input(mut self, workflow_input: &'a Value) -> Self {
68 self.workflow_input = Some(workflow_input);
69 self
70 }
71
72 pub fn with_executor(mut self, executor: &'a dyn YamlWorkflowLlmExecutor) -> Self {
73 self.executor = Some(WorkflowRunnerExecutor::Llm(executor));
74 self
75 }
76
77 pub fn with_client(mut self, client: &'a SimpleAgentsClient) -> Self {
78 self.executor = Some(WorkflowRunnerExecutor::Client(client));
79 self
80 }
81
82 pub fn with_custom_worker(
83 mut self,
84 custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
85 ) -> Self {
86 self.custom_worker = custom_worker;
87 self
88 }
89
90 pub fn with_event_sink(mut self, event_sink: Option<&'a dyn YamlWorkflowEventSink>) -> Self {
91 self.event_sink = event_sink;
92 self
93 }
94
95 pub fn with_typed_event_sink(
96 mut self,
97 typed_event_sink: Option<&'a dyn YamlWorkflowTypedEventSink>,
98 ) -> Self {
99 self.typed_event_sink = typed_event_sink;
100 self
101 }
102
103 pub fn with_options(mut self, options: &'a YamlWorkflowRunOptions) -> Self {
104 self.options = Some(options);
105 self
106 }
107
108 pub async fn run(self) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
109 let WorkflowRunner {
110 source,
111 workflow_input,
112 executor,
113 custom_worker,
114 event_sink,
115 typed_event_sink,
116 options,
117 } = self;
118
119 let workflow_input = if let Some(workflow_input) = workflow_input {
120 workflow_input
121 } else {
122 return Err(YamlWorkflowRunError::InvalidInput {
123 message: "workflow input is required; call with_input(...)".to_string(),
124 });
125 };
126
127 let default_options;
128 let options = if let Some(options) = options {
129 options
130 } else {
131 default_options = YamlWorkflowRunOptions::default();
132 &default_options
133 };
134
135 let executor = executor.ok_or_else(|| YamlWorkflowRunError::InvalidInput {
136 message: "workflow executor is required; call with_executor(...) or with_client(...)"
137 .to_string(),
138 })?;
139
140 let typed_event_adapter = typed_event_sink.map(YamlWorkflowTypedEventSinkAdapter::new);
141 let fanout_sink = match (event_sink, typed_event_adapter.as_ref()) {
142 (Some(legacy_sink), Some(typed_sink_adapter)) => Some(
143 YamlWorkflowEventSinkFanout::new(legacy_sink, typed_sink_adapter),
144 ),
145 _ => None,
146 };
147 let resolved_event_sink: Option<&dyn YamlWorkflowEventSink> =
148 if let Some(fanout_sink) = fanout_sink.as_ref() {
149 Some(fanout_sink)
150 } else if let Some(typed_sink_adapter) = typed_event_adapter.as_ref() {
151 Some(typed_sink_adapter)
152 } else {
153 event_sink
154 };
155
156 match (source, executor) {
157 (WorkflowRunnerSource::File(path), WorkflowRunnerExecutor::Llm(executor)) => {
158 let (_, workflow) = load_workflow_yaml_file(path)?;
159 run_workflow_yaml_with_custom_worker_and_events_and_options(
160 &workflow,
161 workflow_input,
162 executor,
163 custom_worker,
164 resolved_event_sink,
165 options,
166 )
167 .await
168 }
169 (WorkflowRunnerSource::File(path), WorkflowRunnerExecutor::Client(client)) => {
170 let (_, workflow) = load_workflow_yaml_file(path)?;
171 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
172 &workflow,
173 workflow_input,
174 client,
175 custom_worker,
176 resolved_event_sink,
177 options,
178 )
179 .await
180 }
181 (WorkflowRunnerSource::Inline(workflow), WorkflowRunnerExecutor::Llm(executor)) => {
182 run_workflow_yaml_with_custom_worker_and_events_and_options(
183 workflow,
184 workflow_input,
185 executor,
186 custom_worker,
187 resolved_event_sink,
188 options,
189 )
190 .await
191 }
192 (WorkflowRunnerSource::Inline(workflow), WorkflowRunnerExecutor::Client(client)) => {
193 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
194 workflow,
195 workflow_input,
196 client,
197 custom_worker,
198 resolved_event_sink,
199 options,
200 )
201 .await
202 }
203 }
204 }
205
206 pub async fn run_typed(self) -> Result<YamlWorkflowRunTypedOutput, YamlWorkflowRunError> {
215 let WorkflowRunner {
216 source,
217 workflow_input,
218 executor,
219 custom_worker,
220 event_sink,
221 typed_event_sink,
222 options,
223 } = self;
224
225 match source {
226 WorkflowRunnerSource::Inline(workflow) => {
227 let output = WorkflowRunner {
228 source: WorkflowRunnerSource::Inline(workflow),
229 workflow_input,
230 executor,
231 custom_worker,
232 event_sink,
233 typed_event_sink,
234 options,
235 }
236 .run()
237 .await?;
238 Ok(output.to_typed_output(workflow))
239 }
240 WorkflowRunnerSource::File(path) => {
241 let (_, workflow) = load_workflow_yaml_file(path)?;
242 let output = WorkflowRunner::from_workflow(&workflow)
243 .with_optional_input(workflow_input)
244 .with_optional_executor(executor)
245 .with_custom_worker(custom_worker)
246 .with_event_sink(event_sink)
247 .with_typed_event_sink(typed_event_sink)
248 .with_optional_options(options)
249 .run()
250 .await?;
251 Ok(output.to_typed_output(&workflow))
252 }
253 }
254 }
255
256 fn with_optional_input(mut self, workflow_input: Option<&'a Value>) -> Self {
257 self.workflow_input = workflow_input;
258 self
259 }
260
261 fn with_optional_executor(mut self, executor: Option<WorkflowRunnerExecutor<'a>>) -> Self {
262 self.executor = executor;
263 self
264 }
265
266 fn with_optional_options(mut self, options: Option<&'a YamlWorkflowRunOptions>) -> Self {
267 self.options = options;
268 self
269 }
270}