apalis_cron/
pipe.rs

1use apalis_core::backend::Backend;
2use apalis_core::error::BoxDynError;
3use apalis_core::request::BoxStream;
4use apalis_core::{poller::Poller, request::Request, worker::Context, worker::Worker};
5use futures::StreamExt;
6use std::{error, fmt};
7
8/// A generic Pipe that wraps an inner type along with a `RequestStream`.
9pub struct CronPipe<Inner> {
10    pub(crate) stream: BoxStream<'static, Result<(), BoxDynError>>,
11    pub(crate) inner: Inner,
12}
13
14impl<Inner: fmt::Debug> fmt::Debug for CronPipe<Inner> {
15    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16        f.debug_struct("Pipe")
17            .field("stream", &"<RequestStream<()>>") // Placeholder as `RequestStream` might not implement Debug
18            .field("inner", &self.inner)
19            .finish()
20    }
21}
22
23impl<T, Ctx, Inner> Backend<Request<T, Ctx>> for CronPipe<Inner>
24where
25    Inner: Backend<Request<T, Ctx>>,
26{
27    type Stream = Inner::Stream;
28
29    type Layer = Inner::Layer;
30
31    type Codec = Inner::Codec;
32
33    fn poll(mut self, worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
34        let pipe_heartbeat = async move { while (self.stream.next().await).is_some() {} };
35        let inner = self.inner.poll(worker);
36        let heartbeat = inner.heartbeat;
37
38        Poller::new_with_layer(
39            inner.stream,
40            async {
41                futures::join!(heartbeat, pipe_heartbeat);
42            },
43            inner.layer,
44        )
45    }
46}
47
48/// A cron error
49#[derive(Debug)]
50pub struct PipeError {
51    kind: PipeErrorKind,
52}
53
54/// The kind of pipe error that occurred
55#[derive(Debug)]
56pub enum PipeErrorKind {
57    /// The cron stream provided a None
58    EmptyStream,
59}
60
61impl fmt::Display for PipeError {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        match self.kind {
64            PipeErrorKind::EmptyStream => write!(f, "The cron stream provided a None",),
65        }
66    }
67}
68
69impl error::Error for PipeError {}
70
71impl From<PipeErrorKind> for PipeError {
72    fn from(kind: PipeErrorKind) -> PipeError {
73        PipeError { kind }
74    }
75}