1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![warn(
4 missing_debug_implementations,
5 missing_docs,
6 rust_2018_idioms,
7 unreachable_pub
8)]
9
10use apalis_core::{error::BoxDynError, task::Task};
11
12use crate::sequential::router::{GoTo, StepResult};
13
14type BoxedService<Input, Output> = tower::util::BoxCloneSyncService<Input, Output, BoxDynError>;
15type SteppedService<Compact, Ctx, IdType> =
16 BoxedService<Task<Compact, Ctx, IdType>, GoTo<StepResult<Compact, IdType>>>;
17
18type DagService<Compact, Ctx, IdType> = BoxedService<Task<Compact, Ctx, IdType>, Compact>;
19
20pub mod composite;
22pub mod dag;
24mod id_generator;
25pub mod sequential;
27pub mod sink;
29
30pub use {
31 dag::DagFlow, dag::executor::DagExecutor, sequential::workflow::Workflow, sink::WorkflowSink,
32};
33
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, time::Duration};

    use apalis_core::{
        task::{
            builder::TaskBuilder,
            task_id::{RandomId, TaskId},
        },
        task_fn::task_fn,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, event::Event,
            ext::event_listener::EventListenerExt,
        },
    };
    use apalis_file_storage::JsonStorage;
    use futures::SinkExt;
    use serde_json::Value;

    use crate::sequential::{AndThen, repeat_until::RepeaterState, workflow::Workflow};

    use super::*;

    /// Builds a sequential workflow that chains most step kinds (`and_then`,
    /// the delay steps, `repeat_until`, `add_step`, `filter_map`, `fold`),
    /// seeds a temporary JSON storage with the value `17`, and runs a worker
    /// over it.
    ///
    /// The final step stops the worker; the event listener also stops it on
    /// any `Event::Error`.
    #[tokio::test]
    async fn basic_workflow() {
        let workflow = Workflow::new("and-then-workflow")
            .and_then(async |input: u32| input as usize)
            .delay_for(Duration::from_secs(1))
            // Identity step: the value is already a `usize`, so no cast is needed.
            .and_then(async |input: usize| input)
            .delay_for(Duration::from_secs(1))
            .delay_with(|_| Duration::from_secs(1))
            .repeat_until(|res: usize, state: RepeaterState<RandomId>| async move {
                println!("Iteration {}: got result {}", state.iterations(), res);
                // Yields `None` for the first iterations and `Some(res)` after;
                // presumably `None` asks the step to run again — confirm
                // against `repeat_until`.
                if state.iterations() < 3 {
                    None
                } else {
                    Some(res)
                }
            })
            .add_step(AndThen::new(task_fn(async |input: usize| {
                Ok::<_, BoxDynError>(input.to_string())
            })))
            .and_then(async |input: String, _task_id: TaskId<_>| input.parse::<usize>())
            .and_then(async |res: usize| {
                // Map each index to its value for `0..res`.
                Ok::<_, BoxDynError>((0..res).enumerate().collect::<HashMap<_, _>>())
            })
            .filter_map(async |(index, input): (usize, usize)| {
                // Keep only entries whose value is even, emitting the index as a string.
                if input % 2 == 0 {
                    Some(index.to_string())
                } else {
                    None
                }
            })
            .fold(
                async move |(acc, item): (usize, String), _wrk: WorkerContext| {
                    println!("Folding item {item} with acc {acc}");
                    let item = item.parse::<usize>().unwrap();
                    acc + item
                },
            )
            .and_then(async |res: usize, wrk: WorkerContext| {
                // Last step: stop the worker, then report the result.
                wrk.stop().unwrap();
                println!("Completed with {res:?}");
            });

        let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();

        backend
            .send(TaskBuilder::new(Value::from(17)).build())
            .await
            .unwrap();

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|ctx, ev| {
                println!("On Event = {ev:?}");
                // Stop the worker as soon as an error event is observed.
                if matches!(ev, Event::Error(_)) {
                    ctx.stop().unwrap();
                }
            })
            .build(workflow);
        worker.run().await.unwrap();
    }
}