1use std::marker::PhantomData;
2
3use apalis_core::{
4 backend::{Backend, BackendExt, codec::RawDataBackend},
5 error::BoxDynError,
6 task::{Task, metadata::MetadataExt},
7 worker::builder::{IntoWorkerService, WorkerService},
8};
9use futures::Sink;
10
11use crate::{
12 context::WorkflowContext,
13 id_generator::GenerateId,
14 router::WorkflowRouter,
15 service::WorkflowService,
16 step::{Identity, Layer, Stack, Step},
17};
18
19#[derive(Debug)]
21pub struct Workflow<Start, Current, Backend, T = Identity> {
22 pub(crate) inner: T,
23 pub(crate) name: String,
24 _marker: PhantomData<(Start, Current, Backend)>,
25}
26
27impl<Start, Backend> Workflow<Start, Start, Backend> {
28 #[allow(missing_docs)]
29 #[must_use]
30 pub fn new(name: &str) -> Self {
31 Self {
32 inner: Identity,
33 name: name.to_owned(),
34 _marker: PhantomData,
35 }
36 }
37}
38
39impl<Start, Cur, B, L> Workflow<Start, Cur, B, L> {
40 pub fn add_step<S, Output>(self, step: S) -> Workflow<Start, Output, B, Stack<S, L>> {
42 Workflow {
43 inner: Stack::new(step, self.inner),
44 name: self.name,
45 _marker: PhantomData,
46 }
47 }
48
49 pub fn finalize<S>(self, root: S) -> Workflow<Start, Cur, B, L::Step>
51 where
52 S: Step<Cur, B>,
53 L: Layer<S>,
54 B: BackendExt,
55 {
56 Workflow {
57 inner: self.inner.layer(root),
58 name: self.name,
59 _marker: PhantomData,
60 }
61 }
62}
63
64impl<Start, Cur, B, L> Workflow<Start, Cur, B, L>
65where
66 B: BackendExt,
67{
68 pub fn build<N>(self) -> L::Step
70 where
71 L: Layer<RootStep<N>>,
72 {
73 let root = RootStep(std::marker::PhantomData);
74 self.inner.layer(root)
75 }
76}
77
/// Zero-sized marker step parameterised by a result type `Res`.
///
/// The workflow uses it as the root that the collected layers are applied to.
/// It carries no data; `Res` exists only at the type level.
pub struct RootStep<Res>(std::marker::PhantomData<Res>);

// `Clone` and `Debug` are written by hand instead of derived: a derive would add
// `Res: Clone` / `Res: Debug` bounds even though no `Res` value is ever stored.
impl<Res> Clone for RootStep<Res> {
    fn clone(&self) -> Self {
        Self(std::marker::PhantomData)
    }
}

impl<Res> std::fmt::Debug for RootStep<Res> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same rendering a derived impl produces for a one-field tuple struct.
        f.debug_tuple("RootStep").field(&self.0).finish()
    }
}
81
82impl<Res> Default for RootStep<Res> {
83 fn default() -> Self {
84 Self(std::marker::PhantomData)
85 }
86}
87
88impl<Input, Current, B: BackendExt> Step<Input, B> for RootStep<Current> {
89 type Response = Current;
90 type Error = BoxDynError;
91 fn register(&mut self, _ctx: &mut WorkflowRouter<B>) -> Result<(), BoxDynError> {
92 Ok(())
94 }
95}
96
97impl<Input, Output, Current, B, Compact, Err, L>
98 IntoWorkerService<B, WorkflowService<B, Output>, Compact, B::Context>
99 for Workflow<Input, Current, B, L>
100where
101 B: BackendExt<Compact = Compact>
102 + Send
103 + Sync
104 + 'static
105 + Sink<Task<Compact, B::Context, B::IdType>, Error = Err>
106 + Unpin
107 + Clone,
108 Err: std::error::Error + Send + Sync + 'static,
109 B::Context: MetadataExt<WorkflowContext> + Send + Sync + 'static,
110 B::IdType: Send + 'static + Default + GenerateId,
111 B: Sync + Backend<Args = Compact, Error = Err>,
112 B::Compact: Send + Sync + 'static,
113 <B::Context as MetadataExt<WorkflowContext>>::Error: Into<BoxDynError>,
114 L: Layer<RootStep<Current>>,
115 L::Step: Step<Output, B>,
116{
117 type Backend = RawDataBackend<B>;
118 fn into_service(self, b: B) -> WorkerService<RawDataBackend<B>, WorkflowService<B, Output>> {
119 let mut ctx = WorkflowRouter::<B>::new();
120
121 let mut root = self.finalize(RootStep(std::marker::PhantomData));
122
123 root.inner
124 .register(&mut ctx)
125 .expect("Failed to register workflow steps");
126 WorkerService {
127 backend: RawDataBackend::new(b.clone()),
128 service: WorkflowService::new(ctx.steps, b),
129 }
130 }
131}