1use std::{fmt::Debug, marker::PhantomData};
2
3use apalis_core::{
4 backend::{codec::Codec, TaskSink},
5 error::BoxDynError,
6 task::{metadata::MetadataExt, Task},
7 task_fn::{task_fn, TaskFn},
8};
9use tower::Service;
10
11use crate::{context::StepContext, Step, WorkFlow, WorkflowError, WorkflowRequest};
12
/// A workflow step that wraps an inner value of type `S`.
///
/// `T` is a phantom type parameter: no value of `T` is stored. The `Step`
/// implementation in this file uses it as the step's input (`Current`) type.
#[derive(Debug)]
pub struct ThenStep<S, T> {
    /// The wrapped value the step delegates to.
    inner: S,
    /// Zero-sized marker carrying `T` in the type.
    _marker: PhantomData<T>,
}
18
19impl<S: Clone, T> Clone for ThenStep<S, T> {
20 fn clone(&self) -> Self {
21 ThenStep {
22 inner: self.inner.clone(),
23 _marker: PhantomData,
24 }
25 }
26}
27
28impl<S, T> ThenStep<S, T> {
29 pub fn new(inner: S) -> Self {
30 Self {
31 inner,
32 _marker: std::marker::PhantomData,
33 }
34 }
35}
36
37impl<S, Current, O, E, FlowSink, Encode, Compact> Step<Current, FlowSink, Encode>
38 for ThenStep<S, Current>
39where
40 S: Service<Task<Current, FlowSink::Context, FlowSink::IdType>, Response = O, Error = E>
41 + Sync
42 + Send,
43 Current: Sync + Send + 'static,
44 S::Future: Send + 'static,
45 S::Error: Into<BoxDynError>,
46 E: Into<BoxDynError>,
47 O: Sync,
48 FlowSink: Sync + Unpin + TaskSink<Compact> + Send,
49 Current: Send,
50 FlowSink::Context: Send + Sync + Default + MetadataExt<WorkflowRequest>,
51 FlowSink::Error: Into<BoxDynError> + Send + 'static,
52 FlowSink::IdType: Default + Send,
53 Compact: Sync + Send,
54 Encode: Codec<Current, Compact = Compact> + Sync + Send + 'static,
55 Encode::Error: std::error::Error + Sync + Send + 'static,
56 <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
57 std::error::Error + Sync + Send + 'static,
58{
59 type Response = S::Response;
60 type Error = WorkflowError;
61 async fn pre(
62 &self,
63 ctx: &mut StepContext<FlowSink, Encode>,
64 step: &Current,
65 ) -> Result<(), Self::Error> {
66 Ok(())
67 }
68
69 async fn run(
70 &mut self,
71 _: &StepContext<FlowSink, Encode>,
72 args: Task<Current, FlowSink::Context, FlowSink::IdType>,
73 ) -> Result<Self::Response, Self::Error> {
74 let res = self
75 .inner
76 .call(args)
77 .await
78 .map_err(|e| WorkflowError::SingleStepError(e.into()))?;
79 Ok(res)
80 }
81}
82
83impl<Input, Current, FlowSink, Encode, Compact> WorkFlow<Input, Current, FlowSink, Encode, Compact>
84where
85 Current: Send + 'static,
86 FlowSink: Send + Clone + Sync + 'static + Unpin + TaskSink<Compact>,
87{
88 pub fn then<F, O, E, FnArgs, CodecError>(
89 self,
90 then: F,
91 ) -> WorkFlow<Input, O, FlowSink, Encode, Compact>
92 where
93 O: Sync + Send + 'static,
94 E: Into<BoxDynError> + Send + Sync + 'static,
95 F: Send + 'static + Sync + Clone,
96 TaskFn<F, Current, FlowSink::Context, FnArgs>:
97 Service<Task<Current, FlowSink::Context, FlowSink::IdType>, Response = O, Error = E>,
98 FnArgs: std::marker::Send + 'static + Sync,
99 Current: std::marker::Send + 'static + Sync,
100 FlowSink::Context: Send + Sync + Default + 'static + MetadataExt<WorkflowRequest>,
101 FlowSink::Error: Into<BoxDynError> + Send + 'static,
102 <TaskFn<F, Current, FlowSink::Context, FnArgs> as Service<
103 Task<Current, FlowSink::Context, FlowSink::IdType>,
104 >>::Future: Send + 'static,
105 <TaskFn<F, Current, FlowSink::Context, FnArgs> as Service<
106 Task<Current, FlowSink::Context, FlowSink::IdType>,
107 >>::Error: Into<BoxDynError>,
108 FlowSink::IdType: Send + Default,
109 Compact: Sync + Send + 'static,
110 Encode: Codec<Current, Compact = Compact, Error = CodecError> + Send + Sync,
111 CodecError: Send + Sync + std::error::Error + 'static,
112 E: Into<BoxDynError>,
113 Encode: Codec<O, Compact = Compact, Error = CodecError> + 'static,
114 <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
115 std::error::Error + Sync + Send + 'static,
116 {
117 self.add_step::<_, O, _, _>(ThenStep {
118 inner: task_fn::<F, Current, FlowSink::Context, FnArgs>(then),
119 _marker: PhantomData,
120 })
121 }
122}