apalis_workflow/and_then/
mod.rs

1use std::marker::PhantomData;
2
3use apalis_core::{
4    backend::{BackendExt, codec::Codec},
5    error::BoxDynError,
6    task::{Task, metadata::MetadataExt},
7    task_fn::{TaskFn, task_fn},
8};
9use futures::{
10    FutureExt, Sink,
11    future::{BoxFuture, ready},
12};
13use tower::{Service, ServiceBuilder, layer::layer_fn};
14
15use crate::{
16    SteppedService,
17    context::{StepContext, WorkflowContext},
18    id_generator::GenerateId,
19    router::{GoTo, StepResult, WorkflowRouter},
20    service::handle_step_result,
21    step::{Layer, Stack, Step},
22    workflow::Workflow,
23};
24
/// Workflow layer describing an `and_then` step.
///
/// The layer only stores the function; its `Layer` implementation pairs a clone
/// of that function with the step it wraps.
#[derive(Debug, Clone)]
pub struct AndThen<F> {
    /// Function (or service) carried by this layer.
    then_fn: F,
}

impl<F> AndThen<F> {
    /// Builds an `AndThen` layer around `then_fn`.
    pub fn new(then_fn: F) -> Self {
        AndThen { then_fn }
    }
}
37
/// Step produced by applying an [`AndThen`] layer to another step.
///
/// It holds the layer's function together with the step it was layered over.
#[derive(Debug, Clone)]
pub struct AndThenStep<F, S> {
    /// Function (or service) cloned from the originating `AndThen` layer.
    then_fn: F,
    /// The step this one was layered on top of.
    step: S,
}
44
45impl<S, F> Layer<S> for AndThen<F>
46where
47    F: Clone,
48{
49    type Step = AndThenStep<F, S>;
50
51    fn layer(&self, step: S) -> Self::Step {
52        AndThenStep {
53            then_fn: self.then_fn.clone(),
54            step,
55        }
56    }
57}
58
59impl<F, Input, S, B, CodecError, SinkError> Step<Input, B> for AndThenStep<F, S>
60where
61    B: BackendExt<Error = SinkError>
62        + Send
63        + Sync
64        + 'static
65        + Clone
66        + Sink<Task<B::Compact, B::Context, B::IdType>, Error = SinkError>
67        + Unpin,
68    F: Service<Task<Input, B::Context, B::IdType>, Error = BoxDynError> + Send + 'static + Clone,
69    S: Step<F::Response, B>,
70    Input: Send + 'static,
71    F::Future: Send + 'static,
72    F::Error: Into<BoxDynError> + Send + 'static,
73    B::Codec: Codec<F::Response, Error = CodecError, Compact = B::Compact>
74        + Codec<Input, Error = CodecError, Compact = B::Compact>
75        + Codec<S::Response, Error = CodecError, Compact = B::Compact>
76        + 'static,
77    CodecError: std::error::Error + Send + Sync + 'static,
78    B::IdType: GenerateId + Send + 'static,
79    S::Response: Send + 'static,
80    B::Compact: Send + 'static,
81    B::Context: Send + MetadataExt<WorkflowContext> + 'static,
82    SinkError: std::error::Error + Send + Sync + 'static,
83    F::Response: Send + 'static,
84{
85    type Response = F::Response;
86    type Error = F::Error;
87    fn register(&mut self, ctx: &mut WorkflowRouter<B>) -> Result<(), BoxDynError> {
88        let svc = ServiceBuilder::new()
89            .layer(layer_fn(|s| AndThenService {
90                service: s,
91                _marker: PhantomData::<(B, Input)>,
92            }))
93            .map_response(|res: F::Response| GoTo::Next(res))
94            .service(self.then_fn.clone());
95        let svc = SteppedService::<B::Compact, B::Context, B::IdType>::new(svc);
96        let count = ctx.steps.len();
97        ctx.steps.insert(count, svc);
98        self.step.register(ctx)
99    }
100}
101
/// Service wrapper used to run an `and_then` step.
///
/// `Backend` and `Cur` are type-level markers only; no value of either type is stored.
#[derive(Debug)]
pub struct AndThenService<Svc, Backend, Cur> {
    /// The wrapped inner service.
    service: Svc,
    /// Ties the wrapper to its backend and current-input types.
    _marker: PhantomData<(Backend, Cur)>,
}

impl<Svc, Backend, Cur> AndThenService<Svc, Backend, Cur> {
    /// Wraps `service` in a new `AndThenService`.
    pub fn new(service: Svc) -> Self {
        AndThenService {
            service,
            _marker: PhantomData::<(Backend, Cur)>,
        }
    }
}
118
119impl<S, B, Cur, Res, CodecErr, SinkError> Service<Task<B::Compact, B::Context, B::IdType>>
120    for AndThenService<S, B, Cur>
121where
122    S: Service<Task<Cur, B::Context, B::IdType>, Response = GoTo<Res>>,
123    S::Future: Send + 'static,
124    B: BackendExt<Error = SinkError>
125        + Sync
126        + Send
127        + 'static
128        + Clone
129        + Sink<Task<B::Compact, B::Context, B::IdType>, Error = SinkError>
130        + Unpin,
131    B::Codec: Codec<Cur, Compact = B::Compact, Error = CodecErr>
132        + Codec<Res, Compact = B::Compact, Error = CodecErr>,
133    S::Error: Into<BoxDynError> + Send + 'static,
134    CodecErr: Into<BoxDynError> + Send + 'static,
135    Cur: Send + 'static,
136    B::IdType: GenerateId + Send + 'static,
137    SinkError: std::error::Error + Send + Sync + 'static,
138    Res: Send + 'static,
139    B::Compact: Send + 'static,
140    B::Context: Send + MetadataExt<WorkflowContext> + 'static,
141{
142    type Response = GoTo<StepResult<B::Compact, B::IdType>>;
143    type Error = BoxDynError;
144    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
145
146    fn poll_ready(
147        &mut self,
148        cx: &mut std::task::Context<'_>,
149    ) -> std::task::Poll<Result<(), Self::Error>> {
150        self.service.poll_ready(cx).map_err(|e| e.into())
151    }
152
153    fn call(&mut self, request: Task<B::Compact, B::Context, B::IdType>) -> Self::Future {
154        let mut ctx = request.parts.data.get::<StepContext<B>>().cloned().unwrap();
155        let compacted = request.try_map(|t| B::Codec::decode(&t));
156        match compacted {
157            Ok(task) => {
158                let fut = self.service.call(task);
159                async move {
160                    let res = fut.await.map_err(|e| e.into())?;
161                    Ok(handle_step_result(&mut ctx, res).await?)
162                }
163                .boxed()
164            }
165            Err(e) => ready(Err(e.into())).boxed(),
166        }
167    }
168}
169
170impl<Start, Cur, B, L> Workflow<Start, Cur, B, L>
171where
172    B: BackendExt,
173{
174    /// Adds a transformation step to the workflow that processes the output of the previous step.
175    ///
176    /// The `and_then` method allows you to chain operations by providing a function that
177    /// takes the result of the current workflow step and transforms it into the input
178    /// for the next step. This enables building complex processing pipelines with
179    /// type-safe transformations between steps.
180    /// # Example
181    /// ```rust,ignore
182    /// workflow
183    ///     .and_then(extract)
184    ///     .and_then(transform)
185    ///     .and_then(load);
186    /// ```
187    pub fn and_then<F, O, FnArgs>(
188        self,
189        and_then: F,
190    ) -> Workflow<Start, O, B, Stack<AndThen<TaskFn<F, Cur, B::Context, FnArgs>>, L>>
191    where
192        TaskFn<F, Cur, B::Context, FnArgs>: Service<Task<Cur, B::Context, B::IdType>, Response = O>,
193    {
194        self.add_step(AndThen {
195            then_fn: task_fn(and_then),
196        })
197    }
198}