sayiir_runtime/runner/
in_process.rs1use super::WorkflowRunner;
2use crate::execution::execute_continuation_async;
3use sayiir_core::codec::sealed;
4use sayiir_core::codec::{Codec, EnvelopeCodec};
5use sayiir_core::workflow::{Workflow, WorkflowStatus};
6
/// Workflow runner that drives a workflow directly in the calling task.
///
/// The type is a field-less unit struct: it holds no state, so it is
/// zero-sized and trivially copyable. All behaviour lives in its
/// [`WorkflowRunner`] implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct InProcessRunner;
29
30impl WorkflowRunner for InProcessRunner {
31 fn run<'w, C, Input, M>(
32 &self,
33 workflow: &'w Workflow<C, Input, M>,
34 input: Input,
35 ) -> impl std::future::Future<Output = Result<WorkflowStatus, crate::error::RuntimeError>> + Send + 'w
36 where
37 Input: Send + 'static,
38 M: Send + Sync + 'static,
39 C: Codec + EnvelopeCodec + sealed::EncodeValue<Input>,
40 {
41 let continuation = workflow.continuation();
42 let codec = workflow.context().codec.clone();
43 async move {
44 let input_bytes = codec.encode(&input)?;
45 match execute_continuation_async(continuation, input_bytes, &codec).await {
46 Ok(_) => Ok(WorkflowStatus::Completed),
47 Err(e) => Ok(WorkflowStatus::Failed(e.to_string())),
48 }
49 }
50 }
51}
52
#[cfg(test)]
// Test code is allowed to unwrap and panic: a panic is how a test fails.
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::panic,
    clippy::indexing_slicing
)]
mod tests {
    use super::*;
    use crate::serialization::JsonCodec;
    use sayiir_core::context::WorkflowContext;
    use sayiir_core::task::BranchOutputs;
    use sayiir_core::workflow::WorkflowBuilder;
    use std::sync::Arc;
    use std::time::Duration;

    /// Builds the JSON-codec, unit-metadata context shared by most tests.
    fn test_context() -> WorkflowContext<JsonCodec, ()> {
        let codec = Arc::new(JsonCodec);
        let metadata = Arc::new(());
        WorkflowContext::new("test-workflow", codec, metadata)
    }

    /// A workflow with one succeeding task reports `Completed`.
    #[tokio::test]
    async fn test_single_task() {
        let workflow = WorkflowBuilder::new(test_context())
            .then("add_one", |n: u32| async move { Ok(n + 1) })
            .build()
            .unwrap();

        let outcome = InProcessRunner.run(&workflow, 5u32).await.unwrap();

        assert!(matches!(outcome, WorkflowStatus::Completed));
    }

    /// Several tasks chained with `then`, including one that changes the
    /// value type, report `Completed`.
    #[tokio::test]
    async fn test_chained_tasks() {
        let workflow = WorkflowBuilder::new(test_context())
            .then("add_one", |n: u32| async move { Ok(n + 1) })
            .then("double", |n: u32| async move { Ok(n * 2) })
            .then("to_string", |n: u32| async move { Ok(n.to_string()) })
            .build()
            .unwrap();

        let outcome = InProcessRunner.run(&workflow, 10u32).await.unwrap();

        assert!(matches!(outcome, WorkflowStatus::Completed));
    }

    /// A failing task yields `Ok(Failed(..))` carrying the task's error text,
    /// not an `Err` from the runner.
    #[tokio::test]
    async fn test_task_failure_returns_failed_status() {
        let workflow = WorkflowBuilder::new(test_context())
            .then("fail", |_n: u32| async move {
                Err::<u32, sayiir_core::error::BoxError>("intentional failure".into())
            })
            .build()
            .unwrap();

        let outcome = InProcessRunner.run(&workflow, 1u32).await.unwrap();

        let WorkflowStatus::Failed(message) = outcome else {
            panic!("Expected Failed status");
        };
        assert!(message.contains("intentional failure"));
    }

    /// Two branches followed by a join that reads both branch outputs by id
    /// report `Completed`.
    #[tokio::test]
    async fn test_fork_join() {
        let workflow = WorkflowBuilder::new(test_context())
            .then("prepare", |n: u32| async move { Ok(n) })
            .branches(|branch| {
                branch.add("double", |n: u32| async move { Ok(n * 2) });
                branch.add("add_ten", |n: u32| async move { Ok(n + 10) });
            })
            .join("combine", |outputs: BranchOutputs<JsonCodec>| async move {
                let doubled: u32 = outputs.get_by_id("double")?;
                let added: u32 = outputs.get_by_id("add_ten")?;
                Ok(doubled + added)
            })
            .build()
            .unwrap();

        let outcome = InProcessRunner.run(&workflow, 5u32).await.unwrap();

        assert!(matches!(outcome, WorkflowStatus::Completed));
    }

    /// A failure in the middle of a chain is what the final status reports.
    #[tokio::test]
    async fn test_failure_in_chain_propagates() {
        let workflow = WorkflowBuilder::new(test_context())
            .then("step1", |n: u32| async move { Ok(n + 1) })
            .then("fail_step", |_n: u32| async move {
                Err::<u32, sayiir_core::error::BoxError>("step2 failed".into())
            })
            .then("step3", |n: u32| async move { Ok(n * 2) })
            .build()
            .unwrap();

        let outcome = InProcessRunner.run(&workflow, 1u32).await.unwrap();

        let WorkflowStatus::Failed(message) = outcome else {
            panic!("Expected Failed status");
        };
        assert!(message.contains("step2 failed"));
    }

    /// A context with non-unit metadata exposes its id and metadata through
    /// the built workflow, and the workflow still runs to `Completed`.
    #[tokio::test]
    async fn test_with_custom_metadata() {
        let meta_context = WorkflowContext::new(
            "meta-workflow",
            Arc::new(JsonCodec),
            Arc::new("my-metadata".to_string()),
        );
        let workflow = WorkflowBuilder::new(meta_context)
            .then("task", |n: u32| async move { Ok(n + 1) })
            .build()
            .unwrap();

        assert_eq!(workflow.workflow_id(), "meta-workflow");
        assert_eq!(**workflow.metadata(), "my-metadata");

        let outcome = InProcessRunner.run(&workflow, 1u32).await.unwrap();

        assert!(matches!(outcome, WorkflowStatus::Completed));
    }

    /// A 1 ms delay placed between two tasks does not stop the workflow from
    /// reporting `Completed`.
    #[tokio::test]
    async fn test_delay_short_completes() {
        let workflow = WorkflowBuilder::new(test_context())
            .then("step1", |n: u32| async move { Ok(n + 1) })
            .delay("short_wait", Duration::from_millis(1))
            .then("step2", |n: u32| async move { Ok(n * 2) })
            .build()
            .unwrap();

        let outcome = InProcessRunner.run(&workflow, 10u32).await.unwrap();

        assert!(matches!(outcome, WorkflowStatus::Completed));
    }

    /// A workflow made of nothing but a 1 ms delay reports `Completed`.
    #[tokio::test]
    async fn test_delay_only_completes() {
        let workflow = WorkflowBuilder::new(test_context())
            .delay("only_wait", Duration::from_millis(1))
            .build()
            .unwrap();

        let outcome = InProcessRunner.run(&workflow, 42u32).await.unwrap();

        assert!(matches!(outcome, WorkflowStatus::Completed));
    }
}