1use std::{f32::consts::E, fmt::Debug, marker::PhantomData, time::Duration};
2
3use apalis_core::{
4 backend::{TaskSink, codec::Codec},
5 error::{BoxDynError, DeferredError, RetryAfterError},
6 task::{self, Task, builder::TaskBuilder, metadata::MetadataExt},
7 task_fn::{TaskFn, task_fn},
8};
9use tower::Service;
10
11use crate::{Step, WorkFlow, WorkflowError, WorkflowRequest, context::StepContext};
12
/// A workflow step that carries a delay value.
///
/// `S` is the type of the stored delay and `T` is a marker for the payload
/// type associated with the step; no value of `T` is ever stored.
#[derive(Debug)]
pub struct DelayStep<S, T> {
    /// The delay value supplied when the step was built.
    duration: S,
    /// Zero-sized marker tying the step to `T` without owning one.
    _marker: PhantomData<T>,
}
18
19impl<S: Clone, T> Clone for DelayStep<S, T> {
20 fn clone(&self) -> Self {
21 DelayStep {
22 duration: self.duration.clone(),
23 _marker: PhantomData,
24 }
25 }
26}
27
28impl<S, T> DelayStep<S, T> {
29 pub fn new(inner: S) -> Self {
30 Self {
31 duration: inner,
32 _marker: std::marker::PhantomData,
33 }
34 }
35}
36
37impl<Current, FlowSink, Encode, Compact> Step<Current, FlowSink, Encode>
38 for DelayStep<Duration, Current>
39where
40 Current: Sync + Send + 'static,
41 FlowSink: Sync + Unpin + TaskSink<Compact> + Send,
42 Current: Send,
43 FlowSink::Context: Send + Sync + Default + MetadataExt<WorkflowRequest>,
44 FlowSink::Error: Into<BoxDynError> + Send + 'static,
45 FlowSink::IdType: Default + Send,
46 Compact: Sync + Send,
47 Encode: Codec<Current, Compact = Compact> + Sync + Send + 'static,
48 Encode::Error: std::error::Error + Sync + Send + 'static,
49 <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
50 std::error::Error + Sync + Send + 'static,
51{
52 type Response = Current;
53 type Error = RetryAfterError;
54 async fn pre(
55 &self,
56 ctx: &mut StepContext<FlowSink, Encode>,
57 step: &Current,
58 ) -> Result<(), Self::Error> {
59 ctx.push_next_step(step).await.unwrap();
60 Ok(())
61 }
62
63 async fn run(
64 &mut self,
65 ctx: &StepContext<FlowSink, Encode>,
66 task: Task<Current, FlowSink::Context, FlowSink::IdType>,
67 ) -> Result<Self::Response, Self::Error> {
68 Err(RetryAfterError::new(
69 format!(
70 "Delaying for {:?} before continuing workflow",
71 self.duration
72 ),
73 self.duration,
74 ))
75 }
76}
77
78impl<Input, Current, FlowSink, Encode, Compact> WorkFlow<Input, Current, FlowSink, Encode, Compact>
79where
80 Current: Send + 'static,
81 FlowSink: Send + Clone + Sync + 'static + Unpin + TaskSink<Compact>,
82{
83 pub fn delay_for<CodecError>(
84 self,
85 duration: Duration,
86 ) -> WorkFlow<Input, Current, FlowSink, Encode, Compact>
87 where
88 Current: std::marker::Send + 'static + Sync,
89 FlowSink::Context: Send + Sync + Default + 'static + MetadataExt<WorkflowRequest>,
90 FlowSink::Error: Into<BoxDynError> + Send + 'static,
91 FlowSink::IdType: Send + Default,
92 Compact: Sync + Send + 'static,
93 Encode: Codec<Current, Compact = Compact, Error = CodecError> + Send + Sync + 'static,
94 CodecError: Send + Sync + std::error::Error + 'static,
95 <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
96 std::error::Error + Sync + Send + 'static,
97 {
98 self.add_step::<_, Current, _, _>(DelayStep {
99 duration,
100 _marker: PhantomData,
101 })
102 }
103}