1use std::{convert::Infallible, fmt::Debug, marker::PhantomData, time::Duration};
2
3use apalis_core::{
4 backend::{Backend, WeakTaskSink, codec::Codec},
5 error::BoxDynError,
6 task::{Task, metadata::MetadataExt},
7};
8use futures::Sink;
9
10use crate::{GenerateId, GoTo, Step, Workflow, WorkflowRequest, context::StepContext};
11
/// Workflow step that carries a delay value together with a zero-sized marker
/// for the payload type `T`.
///
/// `S` is the representation of the delay; the `Step` implementation in this
/// file is provided for `S = Duration`. No value of type `T` is ever stored —
/// the parameter only ties the step to a payload type.
pub struct DelayStep<S, T> {
    /// Delay value; the `Step` impl returns it inside `GoTo::DelayFor`.
    duration: S,
    /// Zero-sized marker binding the step to `T`.
    _marker: PhantomData<T>,
}

// Written by hand instead of `#[derive(Debug)]` so that only `S` has to be
// `Debug`. The derive would also require `T: Debug`, although `T` appears only
// inside `PhantomData`, which is `Debug` for every `T`. The rendered output is
// the same as the one the derive produces.
impl<S: Debug, T> Debug for DelayStep<S, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DelayStep")
            .field("duration", &self.duration)
            .field("_marker", &self._marker)
            .finish()
    }
}
17
18impl<S: Clone, T> Clone for DelayStep<S, T> {
19 fn clone(&self) -> Self {
20 DelayStep {
21 duration: self.duration.clone(),
22 _marker: PhantomData,
23 }
24 }
25}
26
27impl<S, T> DelayStep<S, T> {
28 pub fn new(inner: S) -> Self {
29 Self {
30 duration: inner,
31 _marker: std::marker::PhantomData,
32 }
33 }
34}
35
36impl<Current, FlowSink, Encode, Compact> Step<Current, FlowSink, Encode>
37 for DelayStep<Duration, Current>
38where
39 Current: Sync + Send + 'static,
40 FlowSink: Sync + Unpin + WeakTaskSink<Current> + Send,
41 Current: Send,
42 FlowSink::Context: Send + Sync + Default + MetadataExt<WorkflowRequest>,
43 FlowSink::Error: Into<BoxDynError> + Send + 'static,
44 FlowSink::IdType: GenerateId + Send,
45 Compact: Sync + Send,
46 Encode: Codec<Current, Compact = Compact> + Sync + Send + 'static,
47 Encode::Error: std::error::Error + Sync + Send + 'static,
48 <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
49 std::error::Error + Sync + Send + 'static,
50{
51 type Response = Current;
52 type Error = Infallible;
53
54 async fn run(
55 &mut self,
56 _: &StepContext<FlowSink, Encode>,
57 task: Task<Current, FlowSink::Context, FlowSink::IdType>,
58 ) -> Result<GoTo<Current>, Self::Error> {
59 Ok(GoTo::DelayFor(self.duration, task.args))
60 }
61}
62
63impl<Input, Current, FlowSink, Encode, Compact>
64 Workflow<Input, Current, FlowSink, Encode, Compact, FlowSink::Context, FlowSink::IdType>
65where
66 Current: Send + 'static,
67 FlowSink: Send + Clone + Sync + 'static + Unpin + WeakTaskSink<Current, Codec = Encode>,
68{
69 pub fn delay_for<CodecError, DbError>(
70 self,
71 duration: Duration,
72 ) -> Workflow<Input, Current, FlowSink, Encode, Compact, FlowSink::Context, FlowSink::IdType>
73 where
74 Current: std::marker::Send + 'static + Sync,
75 FlowSink::Context: Send + Sync + Default + 'static + MetadataExt<WorkflowRequest>,
76 FlowSink: Sink<Task<Compact, FlowSink::Context, FlowSink::IdType>, Error = DbError>
77 + Backend<Error = DbError>,
78 DbError: std::error::Error + Send + Sync + 'static,
79 FlowSink::IdType: Send + GenerateId,
80 Compact: Sync + Send + 'static,
81 Encode: Codec<Current, Compact = Compact, Error = CodecError> + Send + Sync + 'static,
82 Encode: Codec<GoTo<Current>, Compact = Compact, Error = CodecError> + Send + Sync + 'static,
83 CodecError: Send + Sync + std::error::Error + 'static,
84 <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
85 std::error::Error + Sync + Send + 'static,
86 {
87 self.add_step::<_, Current, _, _, DbError>(DelayStep {
88 duration,
89 _marker: PhantomData,
90 })
91 }
92}