1use std::{fmt::Debug, marker::PhantomData};
2
3use apalis_core::{
4 backend::{Backend, WeakTaskSink, codec::Codec},
5 error::BoxDynError,
6 task::{Task, metadata::MetadataExt},
7 task_fn::{TaskFn, task_fn},
8};
9use futures::Sink;
10use tower::Service;
11
12use crate::{GenerateId, GoTo, Step, Workflow, WorkflowRequest, context::StepContext};
13
/// A workflow step that wraps an inner service `S`.
///
/// `T` records the input type the step is associated with. No value of `T`
/// is stored; the parameter exists only at the type level.
#[derive(Debug)]
pub struct ThenStep<S, T> {
    /// The wrapped service that is invoked when the step runs.
    inner: S,
    /// Zero-sized marker tying the step to `T` without owning a `T`.
    _marker: PhantomData<T>,
}
19
20impl<S: Clone, T> Clone for ThenStep<S, T> {
21 fn clone(&self) -> Self {
22 ThenStep {
23 inner: self.inner.clone(),
24 _marker: PhantomData,
25 }
26 }
27}
28
29impl<S, T> ThenStep<S, T> {
30 pub fn new(inner: S) -> Self {
31 Self {
32 inner,
33 _marker: std::marker::PhantomData,
34 }
35 }
36}
37
38impl<S, Current, O, E, FlowSink, Encode, Compact> Step<Current, FlowSink, Encode>
39 for ThenStep<S, Current>
40where
41 S: Service<Task<Current, FlowSink::Context, FlowSink::IdType>, Response = O, Error = E>
42 + Sync
43 + Send,
44 Current: Sync + Send + 'static,
45 S::Future: Send + 'static,
46 S::Error: Into<BoxDynError>,
47 E: Into<BoxDynError> + Send + Sync + 'static,
48 O: Sync,
49 FlowSink: Sync + Unpin + WeakTaskSink<O> + Send,
50 Current: Send,
51 FlowSink::Context: Send + Sync + Default + MetadataExt<WorkflowRequest>,
52 FlowSink::Error: Into<BoxDynError> + Send + 'static,
53 FlowSink::IdType: GenerateId + Send,
54 Compact: Sync + Send,
55 Encode: Codec<Current, Compact = Compact> + Sync + Send + 'static,
56 Encode::Error: std::error::Error + Sync + Send + 'static,
57 <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
58 std::error::Error + Sync + Send + 'static,
59{
60 type Response = S::Response;
61 type Error = S::Error;
62
63 async fn run(
64 &mut self,
65 _: &StepContext<FlowSink, Encode>,
66 args: Task<Current, FlowSink::Context, FlowSink::IdType>,
67 ) -> Result<GoTo<S::Response>, Self::Error> {
68 let res = self.inner.call(args).await?;
69 Ok(GoTo::Next(res))
70 }
71}
72
73impl<Input, Current, FlowSink, Encode, Compact>
74 Workflow<Input, Current, FlowSink, Encode, Compact, FlowSink::Context, FlowSink::IdType>
75where
76 Current: Send + 'static,
77 FlowSink: Send + Clone + Sync + 'static + Unpin + Backend,
78{
79 pub fn then<F, O, E, FnArgs, CodecError, DbError>(
80 self,
81 then: F,
82 ) -> Workflow<Input, O, FlowSink, Encode, Compact, FlowSink::Context, FlowSink::IdType>
83 where
84 O: Sync + Send + 'static,
85 E: Into<BoxDynError> + Send + Sync + 'static,
86 F: Send + 'static + Sync + Clone,
87 TaskFn<F, Current, FlowSink::Context, FnArgs>:
88 Service<Task<Current, FlowSink::Context, FlowSink::IdType>, Response = O, Error = E>,
89 FnArgs: std::marker::Send + 'static + Sync,
90 Current: std::marker::Send + 'static + Sync,
91 FlowSink::Context: Send + Sync + Default + 'static + MetadataExt<WorkflowRequest>,
92 DbError: std::error::Error + Send + Sync + 'static,
93 <TaskFn<F, Current, FlowSink::Context, FnArgs> as Service<
94 Task<Current, FlowSink::Context, FlowSink::IdType>,
95 >>::Future: Send + 'static,
96 <TaskFn<F, Current, FlowSink::Context, FnArgs> as Service<
97 Task<Current, FlowSink::Context, FlowSink::IdType>,
98 >>::Error: Into<BoxDynError>,
99 FlowSink::IdType: Send + GenerateId,
100 Compact: Sync + Send + 'static,
101 Encode: Codec<Current, Compact = Compact, Error = CodecError> + Send + Sync,
102 CodecError: Send + Sync + std::error::Error + 'static,
103 E: Into<BoxDynError>,
104 Encode: Codec<O, Compact = Compact, Error = CodecError> + 'static,
105 Encode: Codec<GoTo<O>, Compact = Compact, Error = CodecError> + 'static,
106 <FlowSink::Context as MetadataExt<WorkflowRequest>>::Error:
107 std::error::Error + Sync + Send + 'static,
108 FlowSink: WeakTaskSink<O, Codec = Encode, Error = DbError>
109 + Sink<Task<Compact, FlowSink::Context, FlowSink::IdType>, Error = DbError>,
110 {
111 self.add_step::<_, O, _, _, _>(ThenStep {
112 inner: task_fn::<F, Current, FlowSink::Context, FnArgs>(then),
113 _marker: PhantomData,
114 })
115 }
116}