apalis_workflow/steps/
delay.rs

1use std::{f32::consts::E, fmt::Debug, marker::PhantomData, time::Duration};
2
3use apalis_core::{
4    backend::{TaskSink, codec::Codec},
5    error::{BoxDynError, DeferredError, RetryAfterError},
6    task::{self, Task, builder::TaskBuilder, metadata::MetadataExt},
7    task_fn::{TaskFn, task_fn},
8};
9use tower::Service;
10
11use crate::{Step, WorkFlow, WorkflowError, WorkflowRequest, context::StepContext};
12
13#[derive(Debug)]
14pub struct DelayStep<S, T> {
15    duration: S,
16    _marker: std::marker::PhantomData<T>,
17}
18
19impl<S: Clone, T> Clone for DelayStep<S, T> {
20    fn clone(&self) -> Self {
21        DelayStep {
22            duration: self.duration.clone(),
23            _marker: PhantomData,
24        }
25    }
26}
27
28impl<S, T> DelayStep<S, T> {
29    pub fn new(inner: S) -> Self {
30        Self {
31            duration: inner,
32            _marker: std::marker::PhantomData,
33        }
34    }
35}
36
37impl<Current, FlowSink, Encode, Compact> Step<Current, FlowSink, Encode>
38    for DelayStep<Duration, Current>
39where
40    Current: Sync + Send + 'static,
41    FlowSink: Sync + Unpin + TaskSink<Compact> + Send,
42    Current: Send,
43    FlowSink::Context: Send + Sync + Default + MetadataExt<WorkflowRequest>,
44    FlowSink::Error: Into<BoxDynError> + Send + 'static,
45    FlowSink::IdType: Default + Send,
46    Compact: Sync + Send,
47    Encode: Codec<Current, Compact = Compact> + Sync + Send + 'static,
48    Encode::Error: std::error::Error + Sync + Send + 'static,
49    <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
50        std::error::Error + Sync + Send + 'static,
51{
52    type Response = Current;
53    type Error = RetryAfterError;
54    async fn pre(
55        &self,
56        ctx: &mut StepContext<FlowSink, Encode>,
57        step: &Current,
58    ) -> Result<(), Self::Error> {
59        ctx.push_next_step(step).await.unwrap();
60        Ok(())
61    }
62
63    async fn run(
64        &mut self,
65        ctx: &StepContext<FlowSink, Encode>,
66        task: Task<Current, FlowSink::Context, FlowSink::IdType>,
67    ) -> Result<Self::Response, Self::Error> {
68        Err(RetryAfterError::new(
69            format!(
70                "Delaying for {:?} before continuing workflow",
71                self.duration
72            ),
73            self.duration,
74        ))
75    }
76}
77
78impl<Input, Current, FlowSink, Encode, Compact> WorkFlow<Input, Current, FlowSink, Encode, Compact>
79where
80    Current: Send + 'static,
81    FlowSink: Send + Clone + Sync + 'static + Unpin + TaskSink<Compact>,
82{
83    pub fn delay_for<CodecError>(
84        self,
85        duration: Duration,
86    ) -> WorkFlow<Input, Current, FlowSink, Encode, Compact>
87    where
88        Current: std::marker::Send + 'static + Sync,
89        FlowSink::Context: Send + Sync + Default + 'static + MetadataExt<WorkflowRequest>,
90        FlowSink::Error: Into<BoxDynError> + Send + 'static,
91        FlowSink::IdType: Send + Default,
92        Compact: Sync + Send + 'static,
93        Encode: Codec<Current, Compact = Compact, Error = CodecError> + Send + Sync + 'static,
94        CodecError: Send + Sync + std::error::Error + 'static,
95        <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
96            std::error::Error + Sync + Send + 'static,
97    {
98        self.add_step::<_, Current, _, _>(DelayStep {
99            duration,
100            _marker: PhantomData,
101        })
102    }
103}