Skip to main content

sayiir_runtime/runner/
in_process.rs

1use super::WorkflowRunner;
2use crate::execution::execute_continuation_async;
3use sayiir_core::codec::Codec;
4use sayiir_core::codec::sealed;
5use sayiir_core::context::with_context;
6use sayiir_core::workflow::{Workflow, WorkflowStatus};
7
8/// A workflow runner that executes workflows in-process.
9///
10/// This is an in-process implementation that executes workflows synchronously
11/// by following the continuation chain.
12///
13/// # Example
14///
15/// ```rust,no_run
16/// # use sayiir_runtime::prelude::*;
17/// # use std::sync::Arc;
18/// # async fn example() -> Result<(), sayiir_core::error::BoxError> {
19/// let ctx = WorkflowContext::new("my-workflow", Arc::new(RkyvCodec), Arc::new(()));
20/// let workflow = WorkflowBuilder::new(ctx)
21///     .then("test", |i: u32| async move { Ok(i + 1) })
22///     .build()?;
23/// let runner = InProcessRunner::default();
24/// let status = runner.run(&workflow, 1).await?;
25/// # Ok(())
26/// # }
27/// ```
28#[derive(Default)]
29pub struct InProcessRunner;
30
31impl WorkflowRunner for InProcessRunner {
32    fn run<'w, C, Input, M>(
33        &self,
34        workflow: &'w Workflow<C, Input, M>,
35        input: Input,
36    ) -> impl std::future::Future<Output = Result<WorkflowStatus, crate::error::RuntimeError>> + Send + 'w
37    where
38        Input: Send + 'static,
39        M: Send + Sync + 'static,
40        C: Codec + sealed::EncodeValue<Input>,
41    {
42        let context = workflow.context().clone();
43        let continuation = workflow.continuation();
44        let codec = context.codec.clone();
45        async move {
46            with_context(context, || async move {
47                let input_bytes = codec.encode(&input)?;
48                match execute_continuation_async(continuation, input_bytes).await {
49                    Ok(_) => Ok(WorkflowStatus::Completed),
50                    Err(e) => Ok(WorkflowStatus::Failed(e.to_string())),
51                }
52            })
53            .await
54        }
55    }
56}
57
58#[cfg(test)]
59#[allow(
60    clippy::unwrap_used,
61    clippy::expect_used,
62    clippy::panic,
63    clippy::indexing_slicing
64)]
65mod tests {
66    use super::*;
67    use crate::serialization::JsonCodec;
68    use sayiir_core::context::WorkflowContext;
69    use sayiir_core::task::BranchOutputs;
70    use sayiir_core::workflow::WorkflowBuilder;
71    use std::sync::Arc;
72
73    fn ctx() -> WorkflowContext<JsonCodec, ()> {
74        WorkflowContext::new("test-workflow", Arc::new(JsonCodec), Arc::new(()))
75    }
76
77    #[tokio::test]
78    async fn test_single_task() {
79        let workflow = WorkflowBuilder::new(ctx())
80            .then("add_one", |i: u32| async move { Ok(i + 1) })
81            .build()
82            .unwrap();
83
84        let runner = InProcessRunner;
85        let status = runner.run(&workflow, 5u32).await.unwrap();
86        assert!(matches!(status, WorkflowStatus::Completed));
87    }
88
89    #[tokio::test]
90    async fn test_chained_tasks() {
91        let workflow = WorkflowBuilder::new(ctx())
92            .then("add_one", |i: u32| async move { Ok(i + 1) })
93            .then("double", |i: u32| async move { Ok(i * 2) })
94            .then("to_string", |i: u32| async move { Ok(i.to_string()) })
95            .build()
96            .unwrap();
97
98        let runner = InProcessRunner;
99        let status = runner.run(&workflow, 10u32).await.unwrap();
100        // 10 + 1 = 11, 11 * 2 = 22, "22"
101        assert!(matches!(status, WorkflowStatus::Completed));
102    }
103
104    #[tokio::test]
105    async fn test_task_failure_returns_failed_status() {
106        let workflow = WorkflowBuilder::new(ctx())
107            .then("fail", |_i: u32| async move {
108                Err::<u32, sayiir_core::error::BoxError>("intentional failure".into())
109            })
110            .build()
111            .unwrap();
112
113        let runner = InProcessRunner;
114        let status = runner.run(&workflow, 1u32).await.unwrap();
115        match status {
116            WorkflowStatus::Failed(e) => {
117                assert!(e.contains("intentional failure"));
118            }
119            _ => panic!("Expected Failed status"),
120        }
121    }
122
123    #[tokio::test]
124    async fn test_fork_join() {
125        let workflow = WorkflowBuilder::new(ctx())
126            .then("prepare", |i: u32| async move { Ok(i) })
127            .branches(|b| {
128                b.add("double", |i: u32| async move { Ok(i * 2) });
129                b.add("add_ten", |i: u32| async move { Ok(i + 10) });
130            })
131            .join("combine", |outputs: BranchOutputs<JsonCodec>| async move {
132                let doubled: u32 = outputs.get("double")?;
133                let added: u32 = outputs.get("add_ten")?;
134                Ok(doubled + added)
135            })
136            .build()
137            .unwrap();
138
139        let runner = InProcessRunner;
140        let status = runner.run(&workflow, 5u32).await.unwrap();
141        // prepare: 5, double: 10, add_ten: 15, combine: 10+15=25
142        assert!(matches!(status, WorkflowStatus::Completed));
143    }
144
145    #[tokio::test]
146    async fn test_failure_in_chain_propagates() {
147        let workflow = WorkflowBuilder::new(ctx())
148            .then("step1", |i: u32| async move { Ok(i + 1) })
149            .then("fail_step", |_i: u32| async move {
150                Err::<u32, sayiir_core::error::BoxError>("step2 failed".into())
151            })
152            .then("step3", |i: u32| async move { Ok(i * 2) })
153            .build()
154            .unwrap();
155
156        let runner = InProcessRunner;
157        let status = runner.run(&workflow, 1u32).await.unwrap();
158        match status {
159            WorkflowStatus::Failed(e) => {
160                assert!(e.contains("step2 failed"));
161            }
162            _ => panic!("Expected Failed status"),
163        }
164    }
165
166    #[tokio::test]
167    async fn test_with_custom_metadata() {
168        let ctx = WorkflowContext::new(
169            "meta-workflow",
170            Arc::new(JsonCodec),
171            Arc::new("my-metadata".to_string()),
172        );
173        let workflow = WorkflowBuilder::new(ctx)
174            .then("task", |i: u32| async move { Ok(i + 1) })
175            .build()
176            .unwrap();
177
178        assert_eq!(workflow.workflow_id(), "meta-workflow");
179        assert_eq!(**workflow.metadata(), "my-metadata");
180
181        let runner = InProcessRunner;
182        let status = runner.run(&workflow, 1u32).await.unwrap();
183        assert!(matches!(status, WorkflowStatus::Completed));
184    }
185
186    // ========================================================================
187    // Delay tests
188    // ========================================================================
189
190    #[tokio::test]
191    async fn test_delay_short_completes() {
192        let workflow = WorkflowBuilder::new(ctx())
193            .then("step1", |i: u32| async move { Ok(i + 1) })
194            .delay("short_wait", std::time::Duration::from_millis(1))
195            .then("step2", |i: u32| async move { Ok(i * 2) })
196            .build()
197            .unwrap();
198
199        let runner = InProcessRunner;
200        let status = runner.run(&workflow, 10u32).await.unwrap();
201        // 10 + 1 = 11, delay (passthrough 11), 11 * 2 = 22
202        assert!(matches!(status, WorkflowStatus::Completed));
203    }
204
205    #[tokio::test]
206    async fn test_delay_only_completes() {
207        let workflow = WorkflowBuilder::new(ctx())
208            .delay("only_wait", std::time::Duration::from_millis(1))
209            .build()
210            .unwrap();
211
212        let runner = InProcessRunner;
213        let status = runner.run(&workflow, 42u32).await.unwrap();
214        assert!(matches!(status, WorkflowStatus::Completed));
215    }
216}