1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![warn(
4 missing_debug_implementations,
5 missing_docs,
6 rust_2018_idioms,
7 unreachable_pub
8)]
9
10use apalis_core::{error::BoxDynError, task::Task};
11
12use crate::router::{GoTo, StepResult};
13
14type BoxedService<Input, Output> = tower::util::BoxService<Input, Output, BoxDynError>;
15type SteppedService<Compact, Ctx, IdType> =
16 BoxedService<Task<Compact, Ctx, IdType>, GoTo<StepResult<Compact, IdType>>>;
17
18pub mod and_then;
20pub mod chain;
22pub mod context;
24#[allow(unused)]
26pub mod dag;
27pub mod delay;
29pub mod filter_map;
31pub mod fold;
33mod id_generator;
34pub mod router;
36pub mod service;
38pub mod sink;
40pub mod step;
42pub mod workflow;
44
45pub use {dag::DagExecutor, dag::DagFlow, sink::WorkflowSink, workflow::Workflow};
46
47#[cfg(test)]
48mod tests {
49 use std::{collections::HashMap, time::Duration};
50
51 use apalis_core::{
52 backend::json::JsonStorage,
53 task::{builder::TaskBuilder, task_id::TaskId},
54 task_fn::task_fn,
55 worker::{
56 builder::WorkerBuilder, context::WorkerContext, event::Event,
57 ext::event_listener::EventListenerExt,
58 },
59 };
60 use futures::SinkExt;
61 use serde_json::Value;
62
63 use crate::{and_then::AndThen, workflow::Workflow};
64
65 use super::*;
66
67 #[tokio::test]
68 async fn basic_workflow() {
69 let workflow = Workflow::new("and-then-workflow")
70 .and_then(async |input: u32| (input) as usize)
71 .delay_for(Duration::from_secs(1))
72 .and_then(async |input: usize| (input) as usize)
73 .delay_for(Duration::from_secs(1))
74 .add_step(AndThen::new(task_fn(async |input: usize| {
76 Ok::<_, BoxDynError>(input.to_string())
77 })))
78 .and_then(async |input: String, _task_id: TaskId| input.parse::<usize>())
79 .and_then(async |res: usize| {
80 Ok::<_, BoxDynError>((0..res).enumerate().collect::<HashMap<_, _>>())
81 })
82 .filter_map(async |(index, input): (usize, usize)| {
83 if input % 2 == 0 {
84 Some(index.to_string())
85 } else {
86 None
87 }
88 })
89 .fold(
90 async move |(acc, item): (usize, String), _wrk: WorkerContext| {
91 println!("Folding item {item} with acc {acc}");
92 let item = item.parse::<usize>().unwrap();
93 let acc = acc + item;
94 acc
95 },
96 )
97 .and_then(async |res: usize, wrk: WorkerContext| {
98 wrk.stop().unwrap();
99 println!("Completed with {res:?}");
100 });
101
102 let mut in_memory: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
103
104 in_memory
105 .send(TaskBuilder::new(Value::from(17)).build())
106 .await
107 .unwrap();
108
109 let worker = WorkerBuilder::new("rango-tango")
110 .backend(in_memory)
111 .on_event(|ctx, ev| {
112 println!("On Event = {ev:?}");
113 if matches!(ev, Event::Error(_)) {
114 ctx.stop().unwrap();
115 }
116 })
117 .build(workflow);
118 worker.run().await.unwrap();
119 }
120}