apalis_workflow/sequential/
workflow.rs1use std::marker::PhantomData;
2
3use apalis_core::{
4 backend::{Backend, BackendExt, codec::RawDataBackend},
5 error::BoxDynError,
6 task::{Task, metadata::MetadataExt},
7 worker::builder::{IntoWorkerService, WorkerService},
8};
9use futures::Sink;
10
11use crate::{
12 id_generator::GenerateId,
13 sequential::context::WorkflowContext,
14 sequential::router::WorkflowRouter,
15 sequential::service::WorkflowService,
16 sequential::step::{Identity, Layer, Stack, Step},
17};
18
19#[derive(Debug)]
21pub struct Workflow<Start, Current, Backend, T = Identity> {
22 pub(crate) inner: T,
23 pub(crate) name: String,
24 _marker: PhantomData<(Start, Current, Backend)>,
25}
26
27impl<Start, Backend> Workflow<Start, Start, Backend> {
28 #[allow(missing_docs)]
29 #[must_use]
30 pub fn new(name: &str) -> Self {
31 Self {
32 inner: Identity,
33 name: name.to_owned(),
34 _marker: PhantomData,
35 }
36 }
37}
38
39impl<Start, Cur, B, L> Workflow<Start, Cur, B, L> {
40 #[must_use]
46 pub fn add_step<S, Output>(self, step: S) -> Workflow<Start, Output, B, Stack<S, L>> {
47 Workflow {
48 inner: Stack::new(step, self.inner),
49 name: self.name,
50 _marker: PhantomData,
51 }
52 }
53
54 pub fn finalize<S>(self, root: S) -> Workflow<Start, Cur, B, L::Step>
56 where
57 S: Step<Cur, B>,
58 L: Layer<S>,
59 B: BackendExt,
60 {
61 Workflow {
62 inner: self.inner.layer(root),
63 name: self.name,
64 _marker: PhantomData,
65 }
66 }
67}
68
69impl<Start, Cur, B, L> Workflow<Start, Cur, B, L>
70where
71 B: BackendExt,
72{
73 pub fn build<N>(self) -> L::Step
75 where
76 L: Layer<RootStep<N>>,
77 {
78 let root = RootStep(std::marker::PhantomData);
79 self.inner.layer(root)
80 }
81}
82
/// Zero-sized marker that a workflow's steps are layered over as their root.
///
/// `Res` is carried purely at the type level. `Clone` and `Debug` are
/// implemented by hand so that they are available for every `Res`; deriving
/// them would add `Res: Clone` / `Res: Debug` bounds even though no `Res`
/// value is ever stored.
pub struct RootStep<Res>(std::marker::PhantomData<Res>);

impl<Res> Clone for RootStep<Res> {
    fn clone(&self) -> Self {
        Self(std::marker::PhantomData)
    }
}

impl<Res> std::fmt::Debug for RootStep<Res> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same shape as the derived output: `RootStep(<PhantomData debug>)`.
        f.debug_tuple("RootStep").field(&self.0).finish()
    }
}
86
87impl<Res> Default for RootStep<Res> {
88 fn default() -> Self {
89 Self(std::marker::PhantomData)
90 }
91}
92
93impl<Input, Current, B: BackendExt> Step<Input, B> for RootStep<Current> {
94 type Response = Current;
95 type Error = BoxDynError;
96 fn register(&mut self, _ctx: &mut WorkflowRouter<B>) -> Result<(), BoxDynError> {
97 Ok(())
99 }
100}
101
102impl<Input, Output, Current, B, Compact, Err, L>
103 IntoWorkerService<B, WorkflowService<B, Output>, Compact, B::Context>
104 for Workflow<Input, Current, B, L>
105where
106 B: BackendExt<Compact = Compact>
107 + Send
108 + Sync
109 + 'static
110 + Sink<Task<Compact, B::Context, B::IdType>, Error = Err>
111 + Unpin
112 + Clone,
113 Err: std::error::Error + Send + Sync + 'static,
114 B::Context: MetadataExt<WorkflowContext> + Send + Sync + 'static,
115 B::IdType: Send + 'static + Default + GenerateId,
116 B: Sync + Backend<Args = Compact, Error = Err>,
117 B::Compact: Send + Sync + 'static,
118 <B::Context as MetadataExt<WorkflowContext>>::Error: Into<BoxDynError>,
119 L: Layer<RootStep<Current>>,
120 L::Step: Step<Output, B>,
121{
122 type Backend = RawDataBackend<B>;
123 fn into_service(self, b: B) -> WorkerService<RawDataBackend<B>, WorkflowService<B, Output>> {
124 let mut ctx = WorkflowRouter::<B>::new();
125
126 let mut root = self.finalize(RootStep(std::marker::PhantomData));
127
128 root.inner
129 .register(&mut ctx)
130 .expect("Failed to register workflow steps");
131 WorkerService {
132 backend: RawDataBackend::new(b.clone()),
133 service: WorkflowService::new(ctx.steps, b),
134 }
135 }
136}