sayiir_runtime/runner/
in_process.rs1use super::WorkflowRunner;
2use crate::execution::execute_continuation_async;
3use sayiir_core::codec::Codec;
4use sayiir_core::codec::sealed;
5use sayiir_core::context::with_context;
6use sayiir_core::workflow::{Workflow, WorkflowStatus};
7
8#[derive(Default)]
29pub struct InProcessRunner;
30
31impl WorkflowRunner for InProcessRunner {
32 fn run<'w, C, Input, M>(
33 &self,
34 workflow: &'w Workflow<C, Input, M>,
35 input: Input,
36 ) -> impl std::future::Future<Output = Result<WorkflowStatus, crate::error::RuntimeError>> + Send + 'w
37 where
38 Input: Send + 'static,
39 M: Send + Sync + 'static,
40 C: Codec + sealed::EncodeValue<Input>,
41 {
42 let context = workflow.context().clone();
43 let continuation = workflow.continuation();
44 let codec = context.codec.clone();
45 async move {
46 with_context(context, || async move {
47 let input_bytes = codec.encode(&input)?;
48 match execute_continuation_async(continuation, input_bytes).await {
49 Ok(_) => Ok(WorkflowStatus::Completed),
50 Err(e) => Ok(WorkflowStatus::Failed(e.to_string())),
51 }
52 })
53 .await
54 }
55 }
56}
57
58#[cfg(test)]
59#[allow(
60 clippy::unwrap_used,
61 clippy::expect_used,
62 clippy::panic,
63 clippy::indexing_slicing
64)]
65mod tests {
66 use super::*;
67 use crate::serialization::JsonCodec;
68 use sayiir_core::context::WorkflowContext;
69 use sayiir_core::task::BranchOutputs;
70 use sayiir_core::workflow::WorkflowBuilder;
71 use std::sync::Arc;
72
73 fn ctx() -> WorkflowContext<JsonCodec, ()> {
74 WorkflowContext::new("test-workflow", Arc::new(JsonCodec), Arc::new(()))
75 }
76
77 #[tokio::test]
78 async fn test_single_task() {
79 let workflow = WorkflowBuilder::new(ctx())
80 .then("add_one", |i: u32| async move { Ok(i + 1) })
81 .build()
82 .unwrap();
83
84 let runner = InProcessRunner;
85 let status = runner.run(&workflow, 5u32).await.unwrap();
86 assert!(matches!(status, WorkflowStatus::Completed));
87 }
88
89 #[tokio::test]
90 async fn test_chained_tasks() {
91 let workflow = WorkflowBuilder::new(ctx())
92 .then("add_one", |i: u32| async move { Ok(i + 1) })
93 .then("double", |i: u32| async move { Ok(i * 2) })
94 .then("to_string", |i: u32| async move { Ok(i.to_string()) })
95 .build()
96 .unwrap();
97
98 let runner = InProcessRunner;
99 let status = runner.run(&workflow, 10u32).await.unwrap();
100 assert!(matches!(status, WorkflowStatus::Completed));
102 }
103
104 #[tokio::test]
105 async fn test_task_failure_returns_failed_status() {
106 let workflow = WorkflowBuilder::new(ctx())
107 .then("fail", |_i: u32| async move {
108 Err::<u32, sayiir_core::error::BoxError>("intentional failure".into())
109 })
110 .build()
111 .unwrap();
112
113 let runner = InProcessRunner;
114 let status = runner.run(&workflow, 1u32).await.unwrap();
115 match status {
116 WorkflowStatus::Failed(e) => {
117 assert!(e.contains("intentional failure"));
118 }
119 _ => panic!("Expected Failed status"),
120 }
121 }
122
123 #[tokio::test]
124 async fn test_fork_join() {
125 let workflow = WorkflowBuilder::new(ctx())
126 .then("prepare", |i: u32| async move { Ok(i) })
127 .branches(|b| {
128 b.add("double", |i: u32| async move { Ok(i * 2) });
129 b.add("add_ten", |i: u32| async move { Ok(i + 10) });
130 })
131 .join("combine", |outputs: BranchOutputs<JsonCodec>| async move {
132 let doubled: u32 = outputs.get("double")?;
133 let added: u32 = outputs.get("add_ten")?;
134 Ok(doubled + added)
135 })
136 .build()
137 .unwrap();
138
139 let runner = InProcessRunner;
140 let status = runner.run(&workflow, 5u32).await.unwrap();
141 assert!(matches!(status, WorkflowStatus::Completed));
143 }
144
145 #[tokio::test]
146 async fn test_failure_in_chain_propagates() {
147 let workflow = WorkflowBuilder::new(ctx())
148 .then("step1", |i: u32| async move { Ok(i + 1) })
149 .then("fail_step", |_i: u32| async move {
150 Err::<u32, sayiir_core::error::BoxError>("step2 failed".into())
151 })
152 .then("step3", |i: u32| async move { Ok(i * 2) })
153 .build()
154 .unwrap();
155
156 let runner = InProcessRunner;
157 let status = runner.run(&workflow, 1u32).await.unwrap();
158 match status {
159 WorkflowStatus::Failed(e) => {
160 assert!(e.contains("step2 failed"));
161 }
162 _ => panic!("Expected Failed status"),
163 }
164 }
165
166 #[tokio::test]
167 async fn test_with_custom_metadata() {
168 let ctx = WorkflowContext::new(
169 "meta-workflow",
170 Arc::new(JsonCodec),
171 Arc::new("my-metadata".to_string()),
172 );
173 let workflow = WorkflowBuilder::new(ctx)
174 .then("task", |i: u32| async move { Ok(i + 1) })
175 .build()
176 .unwrap();
177
178 assert_eq!(workflow.workflow_id(), "meta-workflow");
179 assert_eq!(**workflow.metadata(), "my-metadata");
180
181 let runner = InProcessRunner;
182 let status = runner.run(&workflow, 1u32).await.unwrap();
183 assert!(matches!(status, WorkflowStatus::Completed));
184 }
185
186 #[tokio::test]
191 async fn test_delay_short_completes() {
192 let workflow = WorkflowBuilder::new(ctx())
193 .then("step1", |i: u32| async move { Ok(i + 1) })
194 .delay("short_wait", std::time::Duration::from_millis(1))
195 .then("step2", |i: u32| async move { Ok(i * 2) })
196 .build()
197 .unwrap();
198
199 let runner = InProcessRunner;
200 let status = runner.run(&workflow, 10u32).await.unwrap();
201 assert!(matches!(status, WorkflowStatus::Completed));
203 }
204
205 #[tokio::test]
206 async fn test_delay_only_completes() {
207 let workflow = WorkflowBuilder::new(ctx())
208 .delay("only_wait", std::time::Duration::from_millis(1))
209 .build()
210 .unwrap();
211
212 let runner = InProcessRunner;
213 let status = runner.run(&workflow, 42u32).await.unwrap();
214 assert!(matches!(status, WorkflowStatus::Completed));
215 }
216}