apalis_workflow/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![warn(
4    missing_debug_implementations,
5    missing_docs,
6    rust_2018_idioms,
7    unreachable_pub
8)]
9
10use apalis_core::{error::BoxDynError, task::Task};
11
12use crate::sequential::router::{GoTo, StepResult};
13
14type BoxedService<Input, Output> = tower::util::BoxCloneSyncService<Input, Output, BoxDynError>;
15type SteppedService<Compact, Ctx, IdType> =
16    BoxedService<Task<Compact, Ctx, IdType>, GoTo<StepResult<Compact, IdType>>>;
17
18type DagService<Compact, Ctx, IdType> = BoxedService<Task<Compact, Ctx, IdType>, Compact>;
19
20/// combinator for chaining multiple workflows.
21pub mod composite;
22/// utilities for directed acyclic graph workflows.
23pub mod dag;
24mod id_generator;
25/// utilities for workflow steps.
26pub mod sequential;
27/// utilities for workflow sinks.
28pub mod sink;
29
30pub use {
31    dag::DagFlow, dag::executor::DagExecutor, sequential::workflow::Workflow, sink::WorkflowSink,
32};
33
34#[cfg(test)]
35mod tests {
36    use std::{collections::HashMap, time::Duration};
37
38    use apalis_core::{
39        task::{
40            builder::TaskBuilder,
41            task_id::{RandomId, TaskId},
42        },
43        task_fn::task_fn,
44        worker::{
45            builder::WorkerBuilder, context::WorkerContext, event::Event,
46            ext::event_listener::EventListenerExt,
47        },
48    };
49    use apalis_file_storage::JsonStorage;
50    use futures::SinkExt;
51    use serde_json::Value;
52
53    use crate::sequential::{AndThen, repeat_until::RepeaterState, workflow::Workflow};
54
55    use super::*;
56
57    #[tokio::test]
58    async fn basic_workflow() {
59        let workflow = Workflow::new("and-then-workflow")
60            .and_then(async |input: u32| (input) as usize)
61            .delay_for(Duration::from_secs(1))
62            .and_then(async |input: usize| (input) as usize)
63            .delay_for(Duration::from_secs(1))
64            .delay_with(|_| Duration::from_secs(1))
65            .repeat_until(|res: usize, state: RepeaterState<RandomId>| async move {
66                println!("Iteration {}: got result {}", state.iterations(), res);
67                // Repeat until we have iterated 3 times
68                // Of course, in a real-world scenario, the condition would be based on `res`
69                if state.iterations() < 3 {
70                    None
71                } else {
72                    Some(res)
73                }
74            })
75            .add_step(AndThen::new(task_fn(async |input: usize| {
76                Ok::<_, BoxDynError>(input.to_string())
77            })))
78            .and_then(async |input: String, _task_id: TaskId<_>| input.parse::<usize>())
79            .and_then(async |res: usize| {
80                Ok::<_, BoxDynError>((0..res).enumerate().collect::<HashMap<_, _>>())
81            })
82            .filter_map(async |(index, input): (usize, usize)| {
83                if input % 2 == 0 {
84                    Some(index.to_string())
85                } else {
86                    None
87                }
88            })
89            .fold(
90                async move |(acc, item): (usize, String), _wrk: WorkerContext| {
91                    println!("Folding item {item} with acc {acc}");
92                    let item = item.parse::<usize>().unwrap();
93                    let acc = acc + item;
94                    acc
95                },
96            )
97            .and_then(async |res: usize, wrk: WorkerContext| {
98                wrk.stop().unwrap();
99                println!("Completed with {res:?}");
100            });
101
102        let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
103
104        backend
105            .send(TaskBuilder::new(Value::from(17)).build())
106            .await
107            .unwrap();
108
109        let worker = WorkerBuilder::new("rango-tango")
110            .backend(backend)
111            .on_event(|ctx, ev| {
112                println!("On Event = {ev:?}");
113                if matches!(ev, Event::Error(_)) {
114                    ctx.stop().unwrap();
115                }
116            })
117            .build(workflow);
118        worker.run().await.unwrap();
119    }
120}