apalis_workflow/steps/
then.rs

1use std::{fmt::Debug, marker::PhantomData};
2
3use apalis_core::{
4    backend::{Backend, WeakTaskSink, codec::Codec},
5    error::BoxDynError,
6    task::{Task, metadata::MetadataExt},
7    task_fn::{TaskFn, task_fn},
8};
9use futures::Sink;
10use tower::Service;
11
12use crate::{GenerateId, GoTo, Step, Workflow, WorkflowRequest, context::StepContext};
13
/// A workflow step that wraps an inner value `S` (driven as a `tower::Service`
/// by this file's `Step` implementation) and remembers the step's input type
/// `T` purely at the type level.
///
/// No value of `T` is ever stored; it only appears inside a zero-sized
/// `PhantomData<T>` marker.
pub struct ThenStep<S, T> {
    /// The wrapped service that performs the actual work of the step.
    inner: S,
    /// Zero-sized marker tying the step to `T` without owning a `T`.
    _marker: std::marker::PhantomData<T>,
}

// Implemented by hand rather than derived: `#[derive(Debug)]` would add a
// `T: Debug` bound even though `T` is never stored (`PhantomData<T>` is
// `Debug` for every `T`). Only `S: Debug` is actually needed, which matches
// the hand-written `Clone` impl that likewise places no bound on `T`.
impl<S: std::fmt::Debug, T> std::fmt::Debug for ThenStep<S, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same field layout the derive would have produced.
        f.debug_struct("ThenStep")
            .field("inner", &self.inner)
            .field("_marker", &self._marker)
            .finish()
    }
}
19
20impl<S: Clone, T> Clone for ThenStep<S, T> {
21    fn clone(&self) -> Self {
22        ThenStep {
23            inner: self.inner.clone(),
24            _marker: PhantomData,
25        }
26    }
27}
28
29impl<S, T> ThenStep<S, T> {
30    pub fn new(inner: S) -> Self {
31        Self {
32            inner,
33            _marker: std::marker::PhantomData,
34        }
35    }
36}
37
38impl<S, Current, O, E, FlowSink, Encode, Compact> Step<Current, FlowSink, Encode>
39    for ThenStep<S, Current>
40where
41    S: Service<Task<Current, FlowSink::Context, FlowSink::IdType>, Response = O, Error = E>
42        + Sync
43        + Send,
44    Current: Sync + Send + 'static,
45    S::Future: Send + 'static,
46    S::Error: Into<BoxDynError>,
47    E: Into<BoxDynError> + Send + Sync + 'static,
48    O: Sync,
49    FlowSink: Sync + Unpin + WeakTaskSink<O> + Send,
50    Current: Send,
51    FlowSink::Context: Send + Sync + Default + MetadataExt<WorkflowRequest>,
52    FlowSink::Error: Into<BoxDynError> + Send + 'static,
53    FlowSink::IdType: GenerateId + Send,
54    Compact: Sync + Send,
55    Encode: Codec<Current, Compact = Compact> + Sync + Send + 'static,
56    Encode::Error: std::error::Error + Sync + Send + 'static,
57    <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
58        std::error::Error + Sync + Send + 'static,
59{
60    type Response = S::Response;
61    type Error = S::Error;
62
63    async fn run(
64        &mut self,
65        _: &StepContext<FlowSink, Encode>,
66        args: Task<Current, FlowSink::Context, FlowSink::IdType>,
67    ) -> Result<GoTo<S::Response>, Self::Error> {
68        let res = self.inner.call(args).await?;
69        Ok(GoTo::Next(res))
70    }
71}
72
73impl<Input, Current, FlowSink, Encode, Compact>
74    Workflow<Input, Current, FlowSink, Encode, Compact, FlowSink::Context, FlowSink::IdType>
75where
76    Current: Send + 'static,
77    FlowSink: Send + Clone + Sync + 'static + Unpin + Backend,
78{
79    pub fn then<F, O, E, FnArgs, CodecError, DbError>(
80        self,
81        then: F,
82    ) -> Workflow<Input, O, FlowSink, Encode, Compact, FlowSink::Context, FlowSink::IdType>
83    where
84        O: Sync + Send + 'static,
85        E: Into<BoxDynError> + Send + Sync + 'static,
86        F: Send + 'static + Sync + Clone,
87        TaskFn<F, Current, FlowSink::Context, FnArgs>:
88            Service<Task<Current, FlowSink::Context, FlowSink::IdType>, Response = O, Error = E>,
89        FnArgs: std::marker::Send + 'static + Sync,
90        Current: std::marker::Send + 'static + Sync,
91        FlowSink::Context: Send + Sync + Default + 'static + MetadataExt<WorkflowRequest>,
92        DbError: std::error::Error + Send + Sync + 'static,
93        <TaskFn<F, Current, FlowSink::Context, FnArgs> as Service<
94            Task<Current, FlowSink::Context, FlowSink::IdType>,
95        >>::Future: Send + 'static,
96        <TaskFn<F, Current, FlowSink::Context, FnArgs> as Service<
97            Task<Current, FlowSink::Context, FlowSink::IdType>,
98        >>::Error: Into<BoxDynError>,
99        FlowSink::IdType: Send + GenerateId,
100        Compact: Sync + Send + 'static,
101        Encode: Codec<Current, Compact = Compact, Error = CodecError> + Send + Sync,
102        CodecError: Send + Sync + std::error::Error + 'static,
103        E: Into<BoxDynError>,
104        Encode: Codec<O, Compact = Compact, Error = CodecError> + 'static,
105        Encode: Codec<GoTo<O>, Compact = Compact, Error = CodecError> + 'static,
106        <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
107            std::error::Error + Sync + Send + 'static,
108        FlowSink: WeakTaskSink<O, Codec = Encode, Error = DbError>
109            + Sink<Task<Compact, FlowSink::Context, FlowSink::IdType>, Error = DbError>,
110    {
111        self.add_step::<_, O, _, _, _>(ThenStep {
112            inner: task_fn::<F, Current, FlowSink::Context, FnArgs>(then),
113            _marker: PhantomData,
114        })
115    }
116}