apalis_workflow/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![warn(
4    missing_debug_implementations,
5    missing_docs,
6    rust_2018_idioms,
7    unreachable_pub
8)]
9
10use apalis_core::{error::BoxDynError, task::Task};
11
12use crate::router::{GoTo, StepResult};
13
14type BoxedService<Input, Output> = tower::util::BoxService<Input, Output, BoxDynError>;
15type SteppedService<Compact, Ctx, IdType> =
16    BoxedService<Task<Compact, Ctx, IdType>, GoTo<StepResult<Compact, IdType>>>;
17
18/// combinator for sequential workflow execution.
19pub mod and_then;
20/// combinator for chaining multiple workflows.
21pub mod chain;
22/// utilities for workflow context management.
23pub mod context;
24/// utilities for directed acyclic graph workflows.
25#[allow(unused)]
26pub mod dag;
27/// utilities for introducing delays in workflow execution.
28pub mod delay;
29/// combinator for filtering and mapping workflow items.
30pub mod filter_map;
31/// combinator for folding over workflow items.
32pub mod fold;
33mod id_generator;
34/// utilities for workflow routing.
35pub mod router;
36/// utilities for workflow service orchestration.
37pub mod service;
38/// utilities for workflow sinks.
39pub mod sink;
40/// utilities for workflow steps.
41pub mod step;
42/// workflow definitions.
43pub mod workflow;
44
45pub use {dag::DagExecutor, dag::DagFlow, sink::WorkflowSink, workflow::Workflow};
46
47#[cfg(test)]
48mod tests {
49    use std::{collections::HashMap, time::Duration};
50
51    use apalis_core::{
52        backend::json::JsonStorage,
53        task::{builder::TaskBuilder, task_id::TaskId},
54        task_fn::task_fn,
55        worker::{
56            builder::WorkerBuilder, context::WorkerContext, event::Event,
57            ext::event_listener::EventListenerExt,
58        },
59    };
60    use futures::SinkExt;
61    use serde_json::Value;
62
63    use crate::{and_then::AndThen, workflow::Workflow};
64
65    use super::*;
66
67    #[tokio::test]
68    async fn basic_workflow() {
69        let workflow = Workflow::new("and-then-workflow")
70            .and_then(async |input: u32| (input) as usize)
71            .delay_for(Duration::from_secs(1))
72            .and_then(async |input: usize| (input) as usize)
73            .delay_for(Duration::from_secs(1))
74            // .delay_with(|_: Task<usize, _, _>| Duration::from_secs(1))
75            .add_step(AndThen::new(task_fn(async |input: usize| {
76                Ok::<_, BoxDynError>(input.to_string())
77            })))
78            .and_then(async |input: String, _task_id: TaskId| input.parse::<usize>())
79            .and_then(async |res: usize| {
80                Ok::<_, BoxDynError>((0..res).enumerate().collect::<HashMap<_, _>>())
81            })
82            .filter_map(async |(index, input): (usize, usize)| {
83                if input % 2 == 0 {
84                    Some(index.to_string())
85                } else {
86                    None
87                }
88            })
89            .fold(
90                async move |(acc, item): (usize, String), _wrk: WorkerContext| {
91                    println!("Folding item {item} with acc {acc}");
92                    let item = item.parse::<usize>().unwrap();
93                    let acc = acc + item;
94                    acc
95                },
96            )
97            .and_then(async |res: usize, wrk: WorkerContext| {
98                wrk.stop().unwrap();
99                println!("Completed with {res:?}");
100            });
101
102        let mut in_memory: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
103
104        in_memory
105            .send(TaskBuilder::new(Value::from(17)).build())
106            .await
107            .unwrap();
108
109        let worker = WorkerBuilder::new("rango-tango")
110            .backend(in_memory)
111            .on_event(|ctx, ev| {
112                println!("On Event = {ev:?}");
113                if matches!(ev, Event::Error(_)) {
114                    ctx.stop().unwrap();
115                }
116            })
117            .build(workflow);
118        worker.run().await.unwrap();
119    }
120}