Skip to main content

sayiir_runtime/runner/
in_process.rs

1use super::WorkflowRunner;
2use crate::execution::execute_continuation_async;
3use sayiir_core::codec::sealed;
4use sayiir_core::codec::{Codec, EnvelopeCodec};
5use sayiir_core::workflow::{Workflow, WorkflowStatus};
6
7/// A workflow runner that executes workflows in-process.
8///
9/// This is an in-process implementation that executes workflows synchronously
10/// by following the continuation chain.
11///
12/// # Example
13///
14/// ```rust,no_run
15/// # use sayiir_runtime::prelude::*;
16/// # use std::sync::Arc;
17/// # async fn example() -> Result<(), sayiir_core::error::BoxError> {
18/// let ctx = WorkflowContext::new("my-workflow", Arc::new(RkyvCodec), Arc::new(()));
19/// let workflow = WorkflowBuilder::new(ctx)
20///     .then("test", |i: u32| async move { Ok(i + 1) })
21///     .build()?;
22/// let runner = InProcessRunner::default();
23/// let status = runner.run(&workflow, 1).await?;
24/// # Ok(())
25/// # }
26/// ```
27#[derive(Default)]
28pub struct InProcessRunner;
29
30impl WorkflowRunner for InProcessRunner {
31    fn run<'w, C, Input, M>(
32        &self,
33        workflow: &'w Workflow<C, Input, M>,
34        input: Input,
35    ) -> impl std::future::Future<Output = Result<WorkflowStatus, crate::error::RuntimeError>> + Send + 'w
36    where
37        Input: Send + 'static,
38        M: Send + Sync + 'static,
39        C: Codec + EnvelopeCodec + sealed::EncodeValue<Input>,
40    {
41        let continuation = workflow.continuation();
42        let codec = workflow.context().codec.clone();
43        async move {
44            let input_bytes = codec.encode(&input)?;
45            match execute_continuation_async(continuation, input_bytes, &codec).await {
46                Ok(_) => Ok(WorkflowStatus::Completed),
47                Err(e) => Ok(WorkflowStatus::Failed(e.to_string())),
48            }
49        }
50    }
51}
52
53#[cfg(test)]
54#[allow(
55    clippy::unwrap_used,
56    clippy::expect_used,
57    clippy::panic,
58    clippy::indexing_slicing
59)]
60mod tests {
61    use super::*;
62    use crate::serialization::JsonCodec;
63    use sayiir_core::context::WorkflowContext;
64    use sayiir_core::task::BranchOutputs;
65    use sayiir_core::workflow::WorkflowBuilder;
66    use std::sync::Arc;
67
68    fn ctx() -> WorkflowContext<JsonCodec, ()> {
69        WorkflowContext::new("test-workflow", Arc::new(JsonCodec), Arc::new(()))
70    }
71
72    #[tokio::test]
73    async fn test_single_task() {
74        let workflow = WorkflowBuilder::new(ctx())
75            .then("add_one", |i: u32| async move { Ok(i + 1) })
76            .build()
77            .unwrap();
78
79        let runner = InProcessRunner;
80        let status = runner.run(&workflow, 5u32).await.unwrap();
81        assert!(matches!(status, WorkflowStatus::Completed));
82    }
83
84    #[tokio::test]
85    async fn test_chained_tasks() {
86        let workflow = WorkflowBuilder::new(ctx())
87            .then("add_one", |i: u32| async move { Ok(i + 1) })
88            .then("double", |i: u32| async move { Ok(i * 2) })
89            .then("to_string", |i: u32| async move { Ok(i.to_string()) })
90            .build()
91            .unwrap();
92
93        let runner = InProcessRunner;
94        let status = runner.run(&workflow, 10u32).await.unwrap();
95        // 10 + 1 = 11, 11 * 2 = 22, "22"
96        assert!(matches!(status, WorkflowStatus::Completed));
97    }
98
99    #[tokio::test]
100    async fn test_task_failure_returns_failed_status() {
101        let workflow = WorkflowBuilder::new(ctx())
102            .then("fail", |_i: u32| async move {
103                Err::<u32, sayiir_core::error::BoxError>("intentional failure".into())
104            })
105            .build()
106            .unwrap();
107
108        let runner = InProcessRunner;
109        let status = runner.run(&workflow, 1u32).await.unwrap();
110        match status {
111            WorkflowStatus::Failed(e) => {
112                assert!(e.contains("intentional failure"));
113            }
114            _ => panic!("Expected Failed status"),
115        }
116    }
117
118    #[tokio::test]
119    async fn test_fork_join() {
120        let workflow = WorkflowBuilder::new(ctx())
121            .then("prepare", |i: u32| async move { Ok(i) })
122            .branches(|b| {
123                b.add("double", |i: u32| async move { Ok(i * 2) });
124                b.add("add_ten", |i: u32| async move { Ok(i + 10) });
125            })
126            .join("combine", |outputs: BranchOutputs<JsonCodec>| async move {
127                let doubled: u32 = outputs.get_by_id("double")?;
128                let added: u32 = outputs.get_by_id("add_ten")?;
129                Ok(doubled + added)
130            })
131            .build()
132            .unwrap();
133
134        let runner = InProcessRunner;
135        let status = runner.run(&workflow, 5u32).await.unwrap();
136        // prepare: 5, double: 10, add_ten: 15, combine: 10+15=25
137        assert!(matches!(status, WorkflowStatus::Completed));
138    }
139
140    #[tokio::test]
141    async fn test_failure_in_chain_propagates() {
142        let workflow = WorkflowBuilder::new(ctx())
143            .then("step1", |i: u32| async move { Ok(i + 1) })
144            .then("fail_step", |_i: u32| async move {
145                Err::<u32, sayiir_core::error::BoxError>("step2 failed".into())
146            })
147            .then("step3", |i: u32| async move { Ok(i * 2) })
148            .build()
149            .unwrap();
150
151        let runner = InProcessRunner;
152        let status = runner.run(&workflow, 1u32).await.unwrap();
153        match status {
154            WorkflowStatus::Failed(e) => {
155                assert!(e.contains("step2 failed"));
156            }
157            _ => panic!("Expected Failed status"),
158        }
159    }
160
161    #[tokio::test]
162    async fn test_with_custom_metadata() {
163        let ctx = WorkflowContext::new(
164            "meta-workflow",
165            Arc::new(JsonCodec),
166            Arc::new("my-metadata".to_string()),
167        );
168        let workflow = WorkflowBuilder::new(ctx)
169            .then("task", |i: u32| async move { Ok(i + 1) })
170            .build()
171            .unwrap();
172
173        assert_eq!(workflow.workflow_id(), "meta-workflow");
174        assert_eq!(**workflow.metadata(), "my-metadata");
175
176        let runner = InProcessRunner;
177        let status = runner.run(&workflow, 1u32).await.unwrap();
178        assert!(matches!(status, WorkflowStatus::Completed));
179    }
180
181    // ========================================================================
182    // Delay tests
183    // ========================================================================
184
185    #[tokio::test]
186    async fn test_delay_short_completes() {
187        let workflow = WorkflowBuilder::new(ctx())
188            .then("step1", |i: u32| async move { Ok(i + 1) })
189            .delay("short_wait", std::time::Duration::from_millis(1))
190            .then("step2", |i: u32| async move { Ok(i * 2) })
191            .build()
192            .unwrap();
193
194        let runner = InProcessRunner;
195        let status = runner.run(&workflow, 10u32).await.unwrap();
196        // 10 + 1 = 11, delay (passthrough 11), 11 * 2 = 22
197        assert!(matches!(status, WorkflowStatus::Completed));
198    }
199
200    #[tokio::test]
201    async fn test_delay_only_completes() {
202        let workflow = WorkflowBuilder::new(ctx())
203            .delay("only_wait", std::time::Duration::from_millis(1))
204            .build()
205            .unwrap();
206
207        let runner = InProcessRunner;
208        let status = runner.run(&workflow, 42u32).await.unwrap();
209        assert!(matches!(status, WorkflowStatus::Completed));
210    }
211}