apalis_core/step/
mod.rs

1use crate::backend::Backend;
2use crate::builder::WorkerBuilder;
3use crate::codec::Codec;
4use crate::error::{BoxDynError, Error};
5use crate::request::{Parts, Request};
6use crate::service_fn::{service_fn, ServiceFn};
7use crate::storage::Storage;
8use crate::worker::{Ready, Worker};
9use futures::future::BoxFuture;
10use futures::FutureExt;
11use serde::de::DeserializeOwned;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::fmt::Debug;
15use std::future::Future;
16use std::hash::Hash;
17use std::marker::PhantomData;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20use std::time::Duration;
21use tower::Layer;
22use tower::Service;
23
// A type-erased tower service whose error type is fixed to `crate::error::Error`.
type BoxedService<Input, Output> = tower::util::BoxService<Input, Output, crate::error::Error>;

// A boxed step: accepts a `Request` carrying a `StepRequest` in compact form
// and resolves to a `GoTo` holding the compact output.
type SteppedService<Compact, Index, Ctx> =
    BoxedService<Request<StepRequest<Compact, Index>, Ctx>, GoTo<Compact>>;
28
/// Allows control of the next step
///
/// Returned by every step service. `N` is the payload carried by the variant;
/// [`StepService`] inspects the variant to decide what (if anything) gets
/// enqueued for the following index.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum GoTo<N = ()> {
    /// Go to the next step immediately
    ///
    /// [`StepService`] pushes the payload to the storage under the next index.
    Next(N),
    /// Delay the next step for some time
    ///
    /// [`StepService`] schedules the payload under the next index instead of
    /// pushing it right away.
    Delay {
        /// The input of the next step
        next: N,
        /// The period to delay
        delay: Duration,
    },
    /// Complete execution
    ///
    /// Nothing further is enqueued by [`StepService`] for this variant.
    Done(N),
}
44
/// A type that allows building the steps order
///
/// Steps are appended through [`Step::step`] / [`StepFn::step_fn`]; each call
/// stores the service under the builder's current index and hands back a
/// builder whose `Current` parameter is the output type of the step just added.
#[derive(Debug)]
pub struct StepBuilder<Ctx, Compact, Input, Current, Encode, Index = usize> {
    // Registered steps, keyed by the index they were added under.
    steps: HashMap<Index, SteppedService<Compact, Index, Ctx>>,
    // Index under which the next added step will be stored.
    current_index: Index,
    // Marker: the type the next added step is expected to accept.
    current: PhantomData<Current>,
    // Marker: the codec used to convert between step types and `Compact`.
    codec: PhantomData<Encode>,
    // Marker: the input type of the very first step.
    input: PhantomData<Input>,
}
54
55impl<Ctx, Compact, Input, Encode, Index: Default> Default
56    for StepBuilder<Ctx, Compact, Input, Input, Encode, Index>
57{
58    fn default() -> Self {
59        Self {
60            steps: HashMap::new(),
61            current_index: Index::default(),
62            current: PhantomData,
63            codec: PhantomData,
64            input: PhantomData,
65        }
66    }
67}
68
69impl<Ctx, Compact, Input, Encode> StepBuilder<Ctx, Compact, Input, Input, Encode, usize> {
70    /// Create a new StepBuilder
71    pub fn new() -> Self {
72        Self {
73            steps: HashMap::new(),
74            current_index: usize::default(),
75            current: PhantomData,
76            codec: PhantomData,
77            input: PhantomData,
78        }
79    }
80
81    /// Build a new StepBuilder with a custom stepper
82    pub fn new_with_stepper<I: Default>() -> StepBuilder<Ctx, Compact, Input, Input, Encode, I> {
83        StepBuilder {
84            steps: HashMap::new(),
85            current_index: I::default(),
86            current: PhantomData,
87            codec: PhantomData,
88            input: PhantomData,
89        }
90    }
91}
92
93// impl<Ctx, Compact, Input, Encode, Index> StepBuilder<Ctx, Compact, Input, Input, Encode, Index> {
94//     pub fn new_with_index<I>() -> Self
95//     where
96//         Index: Default,
97//     {
98//         Self {
99//             steps: HashMap::new(),
100//             current_index: Index::default(),
101//             current: PhantomData,
102//             codec: PhantomData,
103//             input: PhantomData,
104//         }
105//     }
106// }
107
108impl<Ctx, Compact, Input, Current, Encode, Index>
109    StepBuilder<Ctx, Compact, Input, Current, Encode, Index>
110{
111    /// Finalize the step building process
112    pub fn build<S>(self, store: S) -> StepService<Ctx, Compact, Input, S, Index> {
113        StepService {
114            inner: self.steps,
115            storage: store,
116            input: PhantomData,
117        }
118    }
119}
120
/// Represents the tower service holding the different steps
///
/// Each incoming [`StepRequest`] is dispatched to the step registered under
/// its index; depending on the returned [`GoTo`], the follow-up request is
/// pushed to or scheduled on `storage`.
#[derive(Debug)]
pub struct StepService<Ctx, Compact, Input, S, Index> {
    // The registered steps, keyed by index.
    inner: HashMap<Index, SteppedService<Compact, Index, Ctx>>,
    // Storage the follow-up step requests are written to.
    storage: S,
    // Marker: the input type of the first step.
    input: PhantomData<Input>,
}
128
129impl<
130        Ctx,
131        Compact,
132        S: Storage<Job = StepRequest<Compact, Index>> + Send + Clone + 'static,
133        Input,
134        Index,
135    > Service<Request<StepRequest<Compact, Index>, Ctx>>
136    for StepService<Ctx, Compact, Input, S, Index>
137where
138    Compact: DeserializeOwned + Send + Clone + 'static,
139    S::Error: Send + Sync + std::error::Error,
140    Index: StepIndex + Send + Sync + 'static,
141{
142    type Response = GoTo<Compact>;
143    type Error = crate::error::Error;
144    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
145
146    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147        Poll::Ready(Ok(()))
148    }
149
150    fn call(&mut self, req: Request<StepRequest<Compact, Index>, Ctx>) -> Self::Future {
151        let index = &req.args.index;
152        let next_index = index.next();
153
154        let service = self
155            .inner
156            .get_mut(index)
157            .expect("Invalid index in inner services");
158        // Call the service and save the result to the store.
159        let fut = service.call(req);
160        let mut storage = self.storage.clone();
161        Box::pin(async move {
162            match fut.await {
163                Ok(response) => {
164                    match &response {
165                        GoTo::Next(resp) => {
166                            storage
167                                .push(StepRequest {
168                                    index: next_index,
169                                    step: resp.clone(),
170                                })
171                                .await
172                                .map_err(|e| Error::SourceError(Arc::new(e.into())))?;
173                        }
174                        GoTo::Delay { next, delay } => {
175                            let now = std::time::SystemTime::now();
176                            let epoch_ms: u128 = now
177                                .duration_since(std::time::UNIX_EPOCH)
178                                .unwrap_or_default()
179                                .as_millis();
180
181                            // Convert the given delay into milliseconds.
182                            let delay: u128 = delay.as_millis();
183
184                            // Calculate the target time as milliseconds since
185                            // the Unix epoch.
186                            let target_ms: u128 = epoch_ms + delay;
187
188                            // Convert the target time into seconds.
189                            let target_s: u128 = (target_ms + 999) / 1000;
190
191                            // Convert to i64 for scheduling.
192                            let target = i64::try_from(target_s).unwrap_or(i64::MAX);
193                            storage
194                                .schedule(
195                                    StepRequest {
196                                        index: next_index,
197                                        step: next.clone(),
198                                    },
199                                    target,
200                                )
201                                .await
202                                .map_err(|e| Error::SourceError(Arc::new(e.into())))?;
203                        }
204                        GoTo::Done(_) => {
205                            // Ignore
206                        }
207                    };
208                    Ok(response)
209                }
210                Err(e) => Err(e),
211            }
212        })
213    }
214}
215
// Adapter that lets a typed step service be driven with compact step requests:
// the incoming compact payload is decoded into `Current`, and the `Next`
// payload of the resulting `GoTo` is encoded back into `Compact` via `Codec`.
struct TransformingService<S, Compact, Input, Current, Next, Codec> {
    // The wrapped, typed step service.
    inner: S,
    // The remaining fields are zero-sized markers that pin the generic
    // parameters of the adapter.
    _req: PhantomData<Compact>,
    _input: PhantomData<Input>,
    _codec: PhantomData<Codec>,
    _output: PhantomData<Next>,
    _current: PhantomData<Current>,
}
224
225impl<S, Compact, Codec, Input, Current, Next>
226    TransformingService<S, Compact, Input, Current, Next, Codec>
227{
228    fn new(inner: S) -> Self {
229        TransformingService {
230            inner,
231            _req: PhantomData,
232            _input: PhantomData,
233            _output: PhantomData,
234            _codec: PhantomData,
235            _current: PhantomData,
236        }
237    }
238}
239
240impl<S, Ctx, Input, Current, Next, Compact, Encode, Index>
241    Service<Request<StepRequest<Compact, Index>, Ctx>>
242    for TransformingService<S, Compact, Input, Current, Next, Encode>
243where
244    S: Service<Request<Current, Ctx>, Response = GoTo<Next>>,
245    Ctx: Default,
246    S::Future: Send + 'static,
247    Current: DeserializeOwned,
248    Next: Serialize,
249    Encode: Codec<Compact = Compact>,
250    Encode::Error: Debug,
251{
252    type Response = GoTo<Compact>;
253    type Error = S::Error;
254    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
255
256    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
257        self.inner.poll_ready(cx)
258    }
259
260    fn call(&mut self, req: Request<StepRequest<Compact, Index>, Ctx>) -> Self::Future {
261        let transformed_req: Request<Current, Ctx> = {
262            Request::new_with_parts(
263                Encode::decode(req.args.step).expect(&format!(
264                    "Could not decode step, expecting {}",
265                    std::any::type_name::<Current>()
266                )),
267                req.parts,
268            )
269        };
270        let fut = self.inner.call(transformed_req).map(|res| match res {
271            Ok(o) => Ok(match o {
272                GoTo::Next(next) => {
273                    GoTo::Next(Encode::encode(next).expect("Could not encode the next step"))
274                }
275                GoTo::Delay { next, delay } => GoTo::Delay {
276                    next: Encode::encode(next).expect("Could not encode the next step"),
277                    delay,
278                },
279                GoTo::Done(res) => {
280                    GoTo::Done(Encode::encode(res).expect("Could not encode the next step"))
281                }
282            }),
283            Err(e) => Err(e),
284        });
285
286        Box::pin(fut)
287    }
288}
289
/// Represents a specific step
///
/// Pairs the payload of a step with the index of the step that should
/// process it.
#[derive(Debug, Serialize, Deserialize)]
pub struct StepRequest<T, Index = usize> {
    // The payload handed to the step.
    step: T,
    // Identifies which registered step handles the payload.
    index: Index,
}
296
297impl<T, Index> StepRequest<T, Index> {
298    /// Build a new step
299    pub fn new(step: T) -> Self
300    where
301        Index: Default,
302    {
303        Self {
304            step,
305            index: Index::default(),
306        }
307    }
308
309    /// Build a new step with a custom index
310    pub fn new_with_index(step: T, index: Index) -> Self {
311        Self { step, index }
312    }
313}
314
/// Helper trait for building new steps from [`StepBuilder`]
///
/// `Current` is the type the added service accepts and `Next` the payload of
/// the [`GoTo`] it produces; the returned builder expects `Next` as the input
/// of the following step.
pub trait Step<S, Ctx, Compact, Input, Current, Next, Encode, Index> {
    /// Helper function for building new steps from [`StepBuilder`]
    ///
    /// Appends `service` as the next step and returns the advanced builder.
    fn step(self, service: S) -> StepBuilder<Ctx, Compact, Input, Next, Encode, Index>;
}
320
321impl<S, Ctx, Input, Current, Next, Compact, Encode, Index>
322    Step<S, Ctx, Compact, Input, Current, Next, Encode, Index>
323    for StepBuilder<Ctx, Compact, Input, Current, Encode, Index>
324where
325    S: Service<Request<Current, Ctx>, Response = GoTo<Next>, Error = crate::error::Error>
326        + Send
327        + 'static
328        + Sync,
329    S::Future: Send + 'static,
330    Current: DeserializeOwned + Send + 'static,
331    S::Response: 'static,
332    Input: Send + 'static + Serialize,
333    Ctx: Default + Send,
334    Next: 'static + Send + Serialize,
335    Compact: Send + 'static,
336    Encode: Codec<Compact = Compact> + Send + 'static,
337    Encode::Error: Debug,
338    Index: StepIndex,
339{
340    fn step(mut self, service: S) -> StepBuilder<Ctx, Compact, Input, Next, Encode, Index> {
341        let next = self.current_index.next();
342        self.steps.insert(
343            self.current_index,
344            BoxedService::new(TransformingService::<
345                S,
346                Compact,
347                Input,
348                Current,
349                Next,
350                Encode,
351            >::new(service)),
352        );
353        StepBuilder {
354            steps: self.steps,
355            current: PhantomData,
356            codec: PhantomData,
357            input: PhantomData,
358            current_index: next,
359        }
360    }
361}
362
/// Helper trait for building new steps from [`StepBuilder`]
///
/// Variant of [`Step`] that accepts a plain function instead of a service.
pub trait StepFn<F, FnArgs, Ctx, Compact, Input, Current, Next, Codec, Index> {
    /// Helper function for building new steps from [`StepBuilder`]
    ///
    /// Appends `f` as the next step and returns the advanced builder.
    fn step_fn(self, f: F) -> StepBuilder<Ctx, Compact, Input, Next, Codec, Index>;
}
368
369impl<
370        S,
371        Ctx: Send + Sync,
372        F: Send + Sync,
373        FnArgs: Send + Sync,
374        Input,
375        Current,
376        Next,
377        Compact,
378        Encode,
379        Index,
380    > StepFn<F, FnArgs, Ctx, Compact, Input, Current, Next, Encode, Index> for S
381where
382    S: Step<ServiceFn<F, Current, Ctx, FnArgs>, Ctx, Compact, Input, Current, Next, Encode, Index>,
383{
384    fn step_fn(self, f: F) -> StepBuilder<Ctx, Compact, Input, Next, Encode, Index> {
385        self.step(service_fn(f))
386    }
387}
388
/// Helper trait for building new Workers from [`WorkerBuilder`]
pub trait StepWorkerFactory<Ctx, Compact, Input, Output, Index> {
    /// The request source for the worker
    type Source;

    /// The service that the worker will run jobs against
    type Service;

    /// Represents the codec for the backend bound
    type Codec;
    /// Builds a [`Worker`] that runs the steps collected in a [`StepBuilder`].
    ///
    /// # Arguments
    ///
    /// * `builder` - The [`StepBuilder`] holding the steps the worker will run
    ///
    /// # Returns
    ///
    /// A [`Worker`] in the [`Ready`] state, pairing the stepped service with
    /// the request source.
    fn build_stepped(
        self,
        builder: StepBuilder<Ctx, Compact, Input, Output, Self::Codec, Index>,
    ) -> Worker<Ready<Self::Service, Self::Source>>;
}
412
413impl<Req, P, M, Ctx, Input, Compact, Output, Index>
414    StepWorkerFactory<Ctx, Compact, Input, Output, Index>
415    for WorkerBuilder<Req, Ctx, P, M, StepService<Ctx, Compact, Input, P, Index>>
416where
417    Compact: Send + 'static + Sync,
418    P: Backend<Request<StepRequest<Compact, Index>, Ctx>> + 'static,
419    P: Storage<Job = StepRequest<Compact, Index>> + Clone,
420    M: Layer<StepService<Ctx, Compact, Input, P, Index>> + 'static,
421{
422    type Source = P;
423
424    type Service = M::Service;
425
426    type Codec = <P as Backend<Request<StepRequest<Compact, Index>, Ctx>>>::Codec;
427
428    fn build_stepped(
429        self,
430        builder: StepBuilder<Ctx, Compact, Input, Output, Self::Codec, Index>,
431    ) -> Worker<Ready<M::Service, P>> {
432        let worker_id = self.id;
433        let poller = self.source;
434        let middleware = self.layer;
435        let service = builder.build(poller.clone());
436        let service = middleware.service(service);
437
438        Worker::new(worker_id, Ready::new(service, poller))
439    }
440}
441
/// Errors encountered while stepping through jobs
#[derive(Debug, thiserror::Error)]
pub enum StepError {
    /// Encountered an encoding error
    ///
    /// Wraps the error reported by the codec.
    #[error("CodecError: {0}")]
    CodecError(BoxDynError),
    /// Encountered an error while pushing to the storage
    ///
    /// Wraps the error reported by the storage.
    #[error("StorageError: {0}")]
    StorageError(BoxDynError),
}
452
453/// Helper trait that transforms a storage with stepping capability
454pub trait SteppableStorage<S: Storage, Codec, Compact, Input, Index> {
455    /// Push a step with a custom index
456    fn push_step<T: Serialize + Send>(
457        &mut self,
458        step: StepRequest<T, Index>,
459    ) -> impl Future<Output = Result<Parts<S::Context>, StepError>> + Send;
460
461    /// Push the first step
462    fn start_stepped(
463        &mut self,
464        step: Input,
465    ) -> impl Future<Output = Result<Parts<S::Context>, StepError>> + Send
466    where
467        Input: Serialize + Send,
468        Index: Default,
469        Self: Send,
470    {
471        async {
472            self.push_step(StepRequest {
473                step,
474                index: Index::default(),
475            })
476            .await
477        }
478    }
479}
480
481impl<S, Encode, Compact, Input, Index> SteppableStorage<S, Encode, Compact, Input, Index> for S
482where
483    S: Storage<Job = StepRequest<Compact, Index>, Codec = Encode>
484        + Backend<Request<StepRequest<Compact, Index>, <S as Storage>::Context>>
485        + Send,
486    Encode: Codec<Compact = Compact>,
487    Encode::Error: std::error::Error + Send + Sync + 'static,
488    S::Error: std::error::Error + Send + Sync + 'static,
489    Compact: Send,
490    Index: Send,
491{
492    async fn push_step<T: Serialize + Send>(
493        &mut self,
494        step: StepRequest<T, Index>,
495    ) -> Result<Parts<S::Context>, StepError> {
496        self.push(StepRequest {
497            index: step.index,
498            step: Encode::encode(&step.step).map_err(|e| StepError::CodecError(Box::new(e)))?,
499        })
500        .await
501        .map_err(|e| StepError::StorageError(Box::new(e)))
502    }
503}
504
/// A helper trait for planning the step index
/// TODO: This will need to be improved to offer more flexibility
pub trait StepIndex: Eq + Hash {
    /// Returns the next item in the index
    fn next(&self) -> Self;
}

impl StepIndex for usize {
    /// Returns `self + 1`.
    ///
    /// # Panics
    ///
    /// Panics if the index is already `usize::MAX`. Wrapping around would
    /// silently route the workflow back to the first step.
    fn next(&self) -> Self {
        self.checked_add(1).expect("step index overflow")
    }
}

impl StepIndex for u32 {
    /// Returns `self + 1`.
    ///
    /// # Panics
    ///
    /// Panics if the index is already `u32::MAX`. Wrapping around would
    /// silently route the workflow back to the first step.
    fn next(&self) -> Self {
        self.checked_add(1).expect("step index overflow")
    }
}