apalis_workflow/steps/
then.rs

1use std::{fmt::Debug, marker::PhantomData};
2
3use apalis_core::{
4    backend::{codec::Codec, TaskSink},
5    error::BoxDynError,
6    task::{metadata::MetadataExt, Task},
7    task_fn::{task_fn, TaskFn},
8};
9use tower::Service;
10
11use crate::{context::StepContext, Step, WorkFlow, WorkflowError, WorkflowRequest};
12
/// Workflow step that delegates its work to a wrapped service.
///
/// `S` is the wrapped service. `T` is the input type the step is tied to;
/// it exists only at the type level and no value of `T` is stored.
#[derive(Debug)]
pub struct ThenStep<S, T> {
    /// The wrapped service.
    inner: S,
    /// Zero-sized marker that binds the step to `T`.
    _marker: PhantomData<T>,
}
18
19impl<S: Clone, T> Clone for ThenStep<S, T> {
20    fn clone(&self) -> Self {
21        ThenStep {
22            inner: self.inner.clone(),
23            _marker: PhantomData,
24        }
25    }
26}
27
28impl<S, T> ThenStep<S, T> {
29    pub fn new(inner: S) -> Self {
30        Self {
31            inner,
32            _marker: std::marker::PhantomData,
33        }
34    }
35}
36
37impl<S, Current, O, E, FlowSink, Encode, Compact> Step<Current, FlowSink, Encode>
38    for ThenStep<S, Current>
39where
40    S: Service<Task<Current, FlowSink::Context, FlowSink::IdType>, Response = O, Error = E>
41        + Sync
42        + Send,
43    Current: Sync + Send + 'static,
44    S::Future: Send + 'static,
45    S::Error: Into<BoxDynError>,
46    E: Into<BoxDynError>,
47    O: Sync,
48    FlowSink: Sync + Unpin + TaskSink<Compact> + Send,
49    Current: Send,
50    FlowSink::Context: Send + Sync + Default + MetadataExt<WorkflowRequest>,
51    FlowSink::Error: Into<BoxDynError> + Send + 'static,
52    FlowSink::IdType: Default + Send,
53    Compact: Sync + Send,
54    Encode: Codec<Current, Compact = Compact> + Sync + Send + 'static,
55    Encode::Error: std::error::Error + Sync + Send + 'static,
56    <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
57        std::error::Error + Sync + Send + 'static,
58{
59    type Response = S::Response;
60    type Error = WorkflowError;
61    async fn pre(
62        &self,
63        ctx: &mut StepContext<FlowSink, Encode>,
64        step: &Current,
65    ) -> Result<(), Self::Error> {
66        Ok(())
67    }
68
69    async fn run(
70        &mut self,
71        _: &StepContext<FlowSink, Encode>,
72        args: Task<Current, FlowSink::Context, FlowSink::IdType>,
73    ) -> Result<Self::Response, Self::Error> {
74        let res = self
75            .inner
76            .call(args)
77            .await
78            .map_err(|e| WorkflowError::SingleStepError(e.into()))?;
79        Ok(res)
80    }
81}
82
83impl<Input, Current, FlowSink, Encode, Compact> WorkFlow<Input, Current, FlowSink, Encode, Compact>
84where
85    Current: Send + 'static,
86    FlowSink: Send + Clone + Sync + 'static + Unpin + TaskSink<Compact>,
87{
88    pub fn then<F, O, E, FnArgs, CodecError>(
89        self,
90        then: F,
91    ) -> WorkFlow<Input, O, FlowSink, Encode, Compact>
92    where
93        O: Sync + Send + 'static,
94        E: Into<BoxDynError> + Send + Sync + 'static,
95        F: Send + 'static + Sync + Clone,
96        TaskFn<F, Current, FlowSink::Context, FnArgs>:
97            Service<Task<Current, FlowSink::Context, FlowSink::IdType>, Response = O, Error = E>,
98        FnArgs: std::marker::Send + 'static + Sync,
99        Current: std::marker::Send + 'static + Sync,
100        FlowSink::Context: Send + Sync + Default + 'static + MetadataExt<WorkflowRequest>,
101        FlowSink::Error: Into<BoxDynError> + Send + 'static,
102        <TaskFn<F, Current, FlowSink::Context, FnArgs> as Service<
103            Task<Current, FlowSink::Context, FlowSink::IdType>,
104        >>::Future: Send + 'static,
105        <TaskFn<F, Current, FlowSink::Context, FnArgs> as Service<
106            Task<Current, FlowSink::Context, FlowSink::IdType>,
107        >>::Error: Into<BoxDynError>,
108        FlowSink::IdType: Send + Default,
109        Compact: Sync + Send + 'static,
110        Encode: Codec<Current, Compact = Compact, Error = CodecError> + Send + Sync,
111        CodecError: Send + Sync + std::error::Error + 'static,
112        E: Into<BoxDynError>,
113        Encode: Codec<O, Compact = Compact, Error = CodecError> + 'static,
114        <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
115            std::error::Error + Sync + Send + 'static,
116    {
117        self.add_step::<_, O, _, _>(ThenStep {
118            inner: task_fn::<F, Current, FlowSink::Context, FnArgs>(then),
119            _marker: PhantomData,
120        })
121    }
122}