1use apalis_core::backend::Backend;
2use apalis_core::error::BoxDynError;
3use apalis_core::request::BoxStream;
4use apalis_core::{poller::Poller, request::Request, worker::Context, worker::Worker};
5use futures::StreamExt;
6use std::{error, fmt};
7
8pub struct CronPipe<Inner> {
10 pub(crate) stream: BoxStream<'static, Result<(), BoxDynError>>,
11 pub(crate) inner: Inner,
12}
13
14impl<Inner: fmt::Debug> fmt::Debug for CronPipe<Inner> {
15 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16 f.debug_struct("Pipe")
17 .field("stream", &"<RequestStream<()>>") .field("inner", &self.inner)
19 .finish()
20 }
21}
22
23impl<T, Ctx, Inner> Backend<Request<T, Ctx>> for CronPipe<Inner>
24where
25 Inner: Backend<Request<T, Ctx>>,
26{
27 type Stream = Inner::Stream;
28
29 type Layer = Inner::Layer;
30
31 type Codec = Inner::Codec;
32
33 fn poll(mut self, worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
34 let pipe_heartbeat = async move { while (self.stream.next().await).is_some() {} };
35 let inner = self.inner.poll(worker);
36 let heartbeat = inner.heartbeat;
37
38 Poller::new_with_layer(
39 inner.stream,
40 async {
41 futures::join!(heartbeat, pipe_heartbeat);
42 },
43 inner.layer,
44 )
45 }
46}
47
48#[derive(Debug)]
50pub struct PipeError {
51 kind: PipeErrorKind,
52}
53
/// The causes a `PipeError` can carry.
#[derive(Debug)]
pub enum PipeErrorKind {
    /// Rendered by `PipeError`'s `Display` as "The cron stream provided a None".
    EmptyStream,
}
60
61impl fmt::Display for PipeError {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 match self.kind {
64 PipeErrorKind::EmptyStream => write!(f, "The cron stream provided a None",),
65 }
66 }
67}
68
69impl error::Error for PipeError {}
70
71impl From<PipeErrorKind> for PipeError {
72 fn from(kind: PipeErrorKind) -> PipeError {
73 PipeError { kind }
74 }
75}