apalis_workflow/sequential/
workflow.rs

1use std::marker::PhantomData;
2
3use apalis_core::{
4    backend::{Backend, BackendExt, codec::RawDataBackend},
5    error::BoxDynError,
6    task::{Task, metadata::MetadataExt},
7    worker::builder::{IntoWorkerService, WorkerService},
8};
9use futures::Sink;
10
11use crate::{
12    id_generator::GenerateId,
13    sequential::context::WorkflowContext,
14    sequential::router::WorkflowRouter,
15    sequential::service::WorkflowService,
16    sequential::step::{Identity, Layer, Stack, Step},
17};
18
19/// A workflow represents a sequence of steps to be executed in order.
20#[derive(Debug)]
21pub struct Workflow<Start, Current, Backend, T = Identity> {
22    pub(crate) inner: T,
23    pub(crate) name: String,
24    _marker: PhantomData<(Start, Current, Backend)>,
25}
26
27impl<Start, Backend> Workflow<Start, Start, Backend> {
28    #[allow(missing_docs)]
29    #[must_use]
30    pub fn new(name: &str) -> Self {
31        Self {
32            inner: Identity,
33            name: name.to_owned(),
34            _marker: PhantomData,
35        }
36    }
37}
38
39impl<Start, Cur, B, L> Workflow<Start, Cur, B, L> {
40    /// Adds a new step to the workflow pipeline.
41    ///
42    /// This method should be used with caution, as it allows adding arbitrary steps
43    /// and manipulating types. It is recommended to use higher-level abstractions for
44    /// common workflow patterns.
45    #[must_use]
46    pub fn add_step<S, Output>(self, step: S) -> Workflow<Start, Output, B, Stack<S, L>> {
47        Workflow {
48            inner: Stack::new(step, self.inner),
49            name: self.name,
50            _marker: PhantomData,
51        }
52    }
53
54    /// Finalizes the workflow by attaching a root step.
55    pub fn finalize<S>(self, root: S) -> Workflow<Start, Cur, B, L::Step>
56    where
57        S: Step<Cur, B>,
58        L: Layer<S>,
59        B: BackendExt,
60    {
61        Workflow {
62            inner: self.inner.layer(root),
63            name: self.name,
64            _marker: PhantomData,
65        }
66    }
67}
68
69impl<Start, Cur, B, L> Workflow<Start, Cur, B, L>
70where
71    B: BackendExt,
72{
73    /// Builds the workflow by layering the root step.
74    pub fn build<N>(self) -> L::Step
75    where
76        L: Layer<RootStep<N>>,
77    {
78        let root = RootStep(std::marker::PhantomData);
79        self.inner.layer(root)
80    }
81}
82
/// The root step of a workflow.
///
/// This is a zero-sized marker: `Res` is only recorded at the type level and
/// no value of type `Res` is ever stored.
pub struct RootStep<Res>(std::marker::PhantomData<Res>);

// `Clone` and `Debug` are written by hand rather than derived: a derive would
// add `Res: Clone` / `Res: Debug` bounds even though only a `PhantomData<Res>`
// is held. This keeps them as unconstrained as the hand-written `Default` impl.
impl<Res> Clone for RootStep<Res> {
    fn clone(&self) -> Self {
        Self(std::marker::PhantomData)
    }
}

impl<Res> std::fmt::Debug for RootStep<Res> {
    /// Formats as a tuple struct, exactly as `#[derive(Debug)]` would.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("RootStep").field(&self.0).finish()
    }
}
86
87impl<Res> Default for RootStep<Res> {
88    fn default() -> Self {
89        Self(std::marker::PhantomData)
90    }
91}
92
93impl<Input, Current, B: BackendExt> Step<Input, B> for RootStep<Current> {
94    type Response = Current;
95    type Error = BoxDynError;
96    fn register(&mut self, _ctx: &mut WorkflowRouter<B>) -> Result<(), BoxDynError> {
97        // TODO: Implement runtime checks to ensure Inputs and Outputs are compatible
98        Ok(())
99    }
100}
101
102impl<Input, Output, Current, B, Compact, Err, L>
103    IntoWorkerService<B, WorkflowService<B, Output>, Compact, B::Context>
104    for Workflow<Input, Current, B, L>
105where
106    B: BackendExt<Compact = Compact>
107        + Send
108        + Sync
109        + 'static
110        + Sink<Task<Compact, B::Context, B::IdType>, Error = Err>
111        + Unpin
112        + Clone,
113    Err: std::error::Error + Send + Sync + 'static,
114    B::Context: MetadataExt<WorkflowContext> + Send + Sync + 'static,
115    B::IdType: Send + 'static + Default + GenerateId,
116    B: Sync + Backend<Args = Compact, Error = Err>,
117    B::Compact: Send + Sync + 'static,
118    <B::Context as MetadataExt<WorkflowContext>>::Error: Into<BoxDynError>,
119    L: Layer<RootStep<Current>>,
120    L::Step: Step<Output, B>,
121{
122    type Backend = RawDataBackend<B>;
123    fn into_service(self, b: B) -> WorkerService<RawDataBackend<B>, WorkflowService<B, Output>> {
124        let mut ctx = WorkflowRouter::<B>::new();
125
126        let mut root = self.finalize(RootStep(std::marker::PhantomData));
127
128        root.inner
129            .register(&mut ctx)
130            .expect("Failed to register workflow steps");
131        WorkerService {
132            backend: RawDataBackend::new(b.clone()),
133            service: WorkflowService::new(ctx.steps, b),
134        }
135    }
136}