apalis_workflow/
workflow.rs

1use std::marker::PhantomData;
2
3use apalis_core::{
4    backend::{Backend, BackendExt, codec::RawDataBackend},
5    error::BoxDynError,
6    task::{Task, metadata::MetadataExt},
7    worker::builder::{IntoWorkerService, WorkerService},
8};
9use futures::Sink;
10
11use crate::{
12    context::WorkflowContext,
13    id_generator::GenerateId,
14    router::WorkflowRouter,
15    service::WorkflowService,
16    step::{Identity, Layer, Stack, Step},
17};
18
19/// A workflow represents a sequence of steps to be executed in order.
20#[derive(Debug)]
21pub struct Workflow<Start, Current, Backend, T = Identity> {
22    pub(crate) inner: T,
23    pub(crate) name: String,
24    _marker: PhantomData<(Start, Current, Backend)>,
25}
26
27impl<Start, Backend> Workflow<Start, Start, Backend> {
28    #[allow(missing_docs)]
29    #[must_use]
30    pub fn new(name: &str) -> Self {
31        Self {
32            inner: Identity,
33            name: name.to_owned(),
34            _marker: PhantomData,
35        }
36    }
37}
38
39impl<Start, Cur, B, L> Workflow<Start, Cur, B, L> {
40    /// Adds a new step to the workflow pipeline.
41    pub fn add_step<S, Output>(self, step: S) -> Workflow<Start, Output, B, Stack<S, L>> {
42        Workflow {
43            inner: Stack::new(step, self.inner),
44            name: self.name,
45            _marker: PhantomData,
46        }
47    }
48
49    /// Finalizes the workflow by attaching a root step.
50    pub fn finalize<S>(self, root: S) -> Workflow<Start, Cur, B, L::Step>
51    where
52        S: Step<Cur, B>,
53        L: Layer<S>,
54        B: BackendExt,
55    {
56        Workflow {
57            inner: self.inner.layer(root),
58            name: self.name,
59            _marker: PhantomData,
60        }
61    }
62}
63
64impl<Start, Cur, B, L> Workflow<Start, Cur, B, L>
65where
66    B: BackendExt,
67{
68    /// Builds the workflow by layering the root step.
69    pub fn build<N>(self) -> L::Step
70    where
71        L: Layer<RootStep<N>>,
72    {
73        let root = RootStep(std::marker::PhantomData);
74        self.inner.layer(root)
75    }
76}
77
/// The root step of a workflow.
///
/// `Res` is a type-level marker only; the struct stores nothing but a
/// `PhantomData<Res>`. `Clone` and `Debug` are therefore implemented by hand
/// without any bound on `Res` (a derive would require `Res: Clone` and
/// `Res: Debug` even though no `Res` value is ever cloned or printed).
pub struct RootStep<Res>(std::marker::PhantomData<Res>);

impl<Res> Clone for RootStep<Res> {
    /// Returns another marker-only root step; nothing is actually copied.
    fn clone(&self) -> Self {
        Self(std::marker::PhantomData)
    }
}

impl<Res> std::fmt::Debug for RootStep<Res> {
    /// Formats as a tuple struct, matching what a derived `Debug` would print.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("RootStep").field(&self.0).finish()
    }
}
81
82impl<Res> Default for RootStep<Res> {
83    fn default() -> Self {
84        Self(std::marker::PhantomData)
85    }
86}
87
88impl<Input, Current, B: BackendExt> Step<Input, B> for RootStep<Current> {
89    type Response = Current;
90    type Error = BoxDynError;
91    fn register(&mut self, _ctx: &mut WorkflowRouter<B>) -> Result<(), BoxDynError> {
92        // TODO: Implement runtime checks to ensure Inputs and Outputs are compatible
93        Ok(())
94    }
95}
96
97impl<Input, Output, Current, B, Compact, Err, L>
98    IntoWorkerService<B, WorkflowService<B, Output>, Compact, B::Context>
99    for Workflow<Input, Current, B, L>
100where
101    B: BackendExt<Compact = Compact>
102        + Send
103        + Sync
104        + 'static
105        + Sink<Task<Compact, B::Context, B::IdType>, Error = Err>
106        + Unpin
107        + Clone,
108    Err: std::error::Error + Send + Sync + 'static,
109    B::Context: MetadataExt<WorkflowContext> + Send + Sync + 'static,
110    B::IdType: Send + 'static + Default + GenerateId,
111    B: Sync + Backend<Args = Compact, Error = Err>,
112    B::Compact: Send + Sync + 'static,
113    <B::Context as MetadataExt<WorkflowContext>>::Error: Into<BoxDynError>,
114    L: Layer<RootStep<Current>>,
115    L::Step: Step<Output, B>,
116{
117    type Backend = RawDataBackend<B>;
118    fn into_service(self, b: B) -> WorkerService<RawDataBackend<B>, WorkflowService<B, Output>> {
119        let mut ctx = WorkflowRouter::<B>::new();
120
121        let mut root = self.finalize(RootStep(std::marker::PhantomData));
122
123        root.inner
124            .register(&mut ctx)
125            .expect("Failed to register workflow steps");
126        WorkerService {
127            backend: RawDataBackend::new(b.clone()),
128            service: WorkflowService::new(ctx.steps, b),
129        }
130    }
131}