apalis_workflow/steps/
delay.rs

1use std::{convert::Infallible, fmt::Debug, marker::PhantomData, time::Duration};
2
3use apalis_core::{
4    backend::{Backend, WeakTaskSink, codec::Codec},
5    error::BoxDynError,
6    task::{Task, metadata::MetadataExt},
7};
8use futures::Sink;
9
10use crate::{GenerateId, GoTo, Step, Workflow, WorkflowRequest, context::StepContext};
11
/// A workflow step that carries a delay value of type `S`.
///
/// `T` is purely a type-level marker held through
/// [`std::marker::PhantomData`]; no value of type `T` is ever stored.
pub struct DelayStep<S, T> {
    /// The delay value carried by this step.
    duration: S,
    /// Zero-sized marker tying the step to `T`.
    _marker: std::marker::PhantomData<T>,
}

// `Debug` is written by hand instead of derived: `#[derive(Debug)]` would add
// a `T: Debug` bound even though `T` is never stored, which would make the
// step un-printable for marker types that do not implement `Debug`.
impl<S: Debug, T> Debug for DelayStep<S, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same field layout the derive would emit, so output is unchanged.
        f.debug_struct("DelayStep")
            .field("duration", &self.duration)
            .field("_marker", &self._marker)
            .finish()
    }
}
17
18impl<S: Clone, T> Clone for DelayStep<S, T> {
19    fn clone(&self) -> Self {
20        DelayStep {
21            duration: self.duration.clone(),
22            _marker: PhantomData,
23        }
24    }
25}
26
27impl<S, T> DelayStep<S, T> {
28    pub fn new(inner: S) -> Self {
29        Self {
30            duration: inner,
31            _marker: std::marker::PhantomData,
32        }
33    }
34}
35
36impl<Current, FlowSink, Encode, Compact> Step<Current, FlowSink, Encode>
37    for DelayStep<Duration, Current>
38where
39    Current: Sync + Send + 'static,
40    FlowSink: Sync + Unpin + WeakTaskSink<Current> + Send,
41    Current: Send,
42    FlowSink::Context: Send + Sync + Default + MetadataExt<WorkflowRequest>,
43    FlowSink::Error: Into<BoxDynError> + Send + 'static,
44    FlowSink::IdType: GenerateId + Send,
45    Compact: Sync + Send,
46    Encode: Codec<Current, Compact = Compact> + Sync + Send + 'static,
47    Encode::Error: std::error::Error + Sync + Send + 'static,
48    <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
49        std::error::Error + Sync + Send + 'static,
50{
51    type Response = Current;
52    type Error = Infallible;
53
54    async fn run(
55        &mut self,
56        _: &StepContext<FlowSink, Encode>,
57        task: Task<Current, FlowSink::Context, FlowSink::IdType>,
58    ) -> Result<GoTo<Current>, Self::Error> {
59        Ok(GoTo::DelayFor(self.duration, task.args))
60    }
61}
62
63impl<Input, Current, FlowSink, Encode, Compact>
64    Workflow<Input, Current, FlowSink, Encode, Compact, FlowSink::Context, FlowSink::IdType>
65where
66    Current: Send + 'static,
67    FlowSink: Send + Clone + Sync + 'static + Unpin + WeakTaskSink<Current, Codec = Encode>,
68{
69    pub fn delay_for<CodecError, DbError>(
70        self,
71        duration: Duration,
72    ) -> Workflow<Input, Current, FlowSink, Encode, Compact, FlowSink::Context, FlowSink::IdType>
73    where
74        Current: std::marker::Send + 'static + Sync,
75        FlowSink::Context: Send + Sync + Default + 'static + MetadataExt<WorkflowRequest>,
76        FlowSink: Sink<Task<Compact, FlowSink::Context, FlowSink::IdType>, Error = DbError>
77            + Backend<Error = DbError>,
78        DbError: std::error::Error + Send + Sync + 'static,
79        FlowSink::IdType: Send + GenerateId,
80        Compact: Sync + Send + 'static,
81        Encode: Codec<Current, Compact = Compact, Error = CodecError> + Send + Sync + 'static,
82        Encode: Codec<GoTo<Current>, Compact = Compact, Error = CodecError> + Send + Sync + 'static,
83        CodecError: Send + Sync + std::error::Error + 'static,
84        <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
85            std::error::Error + Sync + Send + 'static,
86    {
87        self.add_step::<_, Current, _, _, DbError>(DelayStep {
88            duration,
89            _marker: PhantomData,
90        })
91    }
92}