apalis_workflow/and_then/
mod.rs1use std::marker::PhantomData;
2
3use apalis_core::{
4 backend::{BackendExt, codec::Codec},
5 error::BoxDynError,
6 task::{Task, metadata::MetadataExt},
7 task_fn::{TaskFn, task_fn},
8};
9use futures::{
10 FutureExt, Sink,
11 future::{BoxFuture, ready},
12};
13use tower::{Service, ServiceBuilder, layer::layer_fn};
14
15use crate::{
16 SteppedService,
17 context::{StepContext, WorkflowContext},
18 id_generator::GenerateId,
19 router::{GoTo, StepResult, WorkflowRouter},
20 service::handle_step_result,
21 step::{Layer, Stack, Step},
22 workflow::Workflow,
23};
24
25#[derive(Clone, Debug)]
27pub struct AndThen<F> {
28 then_fn: F,
29}
30
31impl<F> AndThen<F> {
32 pub fn new(then_fn: F) -> Self {
34 Self { then_fn }
35 }
36}
37
38#[derive(Clone, Debug)]
40pub struct AndThenStep<F, S> {
41 then_fn: F,
42 step: S,
43}
44
45impl<S, F> Layer<S> for AndThen<F>
46where
47 F: Clone,
48{
49 type Step = AndThenStep<F, S>;
50
51 fn layer(&self, step: S) -> Self::Step {
52 AndThenStep {
53 then_fn: self.then_fn.clone(),
54 step,
55 }
56 }
57}
58
59impl<F, Input, S, B, CodecError, SinkError> Step<Input, B> for AndThenStep<F, S>
60where
61 B: BackendExt<Error = SinkError>
62 + Send
63 + Sync
64 + 'static
65 + Clone
66 + Sink<Task<B::Compact, B::Context, B::IdType>, Error = SinkError>
67 + Unpin,
68 F: Service<Task<Input, B::Context, B::IdType>, Error = BoxDynError> + Send + 'static + Clone,
69 S: Step<F::Response, B>,
70 Input: Send + 'static,
71 F::Future: Send + 'static,
72 F::Error: Into<BoxDynError> + Send + 'static,
73 B::Codec: Codec<F::Response, Error = CodecError, Compact = B::Compact>
74 + Codec<Input, Error = CodecError, Compact = B::Compact>
75 + Codec<S::Response, Error = CodecError, Compact = B::Compact>
76 + 'static,
77 CodecError: std::error::Error + Send + Sync + 'static,
78 B::IdType: GenerateId + Send + 'static,
79 S::Response: Send + 'static,
80 B::Compact: Send + 'static,
81 B::Context: Send + MetadataExt<WorkflowContext> + 'static,
82 SinkError: std::error::Error + Send + Sync + 'static,
83 F::Response: Send + 'static,
84{
85 type Response = F::Response;
86 type Error = F::Error;
87 fn register(&mut self, ctx: &mut WorkflowRouter<B>) -> Result<(), BoxDynError> {
88 let svc = ServiceBuilder::new()
89 .layer(layer_fn(|s| AndThenService {
90 service: s,
91 _marker: PhantomData::<(B, Input)>,
92 }))
93 .map_response(|res: F::Response| GoTo::Next(res))
94 .service(self.then_fn.clone());
95 let svc = SteppedService::<B::Compact, B::Context, B::IdType>::new(svc);
96 let count = ctx.steps.len();
97 ctx.steps.insert(count, svc);
98 self.step.register(ctx)
99 }
100}
101
102#[derive(Debug)]
104pub struct AndThenService<Svc, Backend, Cur> {
105 service: Svc,
106 _marker: PhantomData<(Backend, Cur)>,
107}
108
109impl<Svc, Backend, Cur> AndThenService<Svc, Backend, Cur> {
110 pub fn new(service: Svc) -> Self {
112 Self {
113 service,
114 _marker: PhantomData,
115 }
116 }
117}
118
119impl<S, B, Cur, Res, CodecErr, SinkError> Service<Task<B::Compact, B::Context, B::IdType>>
120 for AndThenService<S, B, Cur>
121where
122 S: Service<Task<Cur, B::Context, B::IdType>, Response = GoTo<Res>>,
123 S::Future: Send + 'static,
124 B: BackendExt<Error = SinkError>
125 + Sync
126 + Send
127 + 'static
128 + Clone
129 + Sink<Task<B::Compact, B::Context, B::IdType>, Error = SinkError>
130 + Unpin,
131 B::Codec: Codec<Cur, Compact = B::Compact, Error = CodecErr>
132 + Codec<Res, Compact = B::Compact, Error = CodecErr>,
133 S::Error: Into<BoxDynError> + Send + 'static,
134 CodecErr: Into<BoxDynError> + Send + 'static,
135 Cur: Send + 'static,
136 B::IdType: GenerateId + Send + 'static,
137 SinkError: std::error::Error + Send + Sync + 'static,
138 Res: Send + 'static,
139 B::Compact: Send + 'static,
140 B::Context: Send + MetadataExt<WorkflowContext> + 'static,
141{
142 type Response = GoTo<StepResult<B::Compact, B::IdType>>;
143 type Error = BoxDynError;
144 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
145
146 fn poll_ready(
147 &mut self,
148 cx: &mut std::task::Context<'_>,
149 ) -> std::task::Poll<Result<(), Self::Error>> {
150 self.service.poll_ready(cx).map_err(|e| e.into())
151 }
152
153 fn call(&mut self, request: Task<B::Compact, B::Context, B::IdType>) -> Self::Future {
154 let mut ctx = request.parts.data.get::<StepContext<B>>().cloned().unwrap();
155 let compacted = request.try_map(|t| B::Codec::decode(&t));
156 match compacted {
157 Ok(task) => {
158 let fut = self.service.call(task);
159 async move {
160 let res = fut.await.map_err(|e| e.into())?;
161 Ok(handle_step_result(&mut ctx, res).await?)
162 }
163 .boxed()
164 }
165 Err(e) => ready(Err(e.into())).boxed(),
166 }
167 }
168}
169
170impl<Start, Cur, B, L> Workflow<Start, Cur, B, L>
171where
172 B: BackendExt,
173{
174 pub fn and_then<F, O, FnArgs>(
188 self,
189 and_then: F,
190 ) -> Workflow<Start, O, B, Stack<AndThen<TaskFn<F, Cur, B::Context, FnArgs>>, L>>
191 where
192 TaskFn<F, Cur, B::Context, FnArgs>: Service<Task<Cur, B::Context, B::IdType>, Response = O>,
193 {
194 self.add_step(AndThen {
195 then_fn: task_fn(and_then),
196 })
197 }
198}