apalis_core/backend/
pipe.rs

1//! # Pipe streams to backends
2//!
3//! This backend allows you to pipe tasks from any stream into another backend.
4//! It is useful for connecting different backends together, such as piping tasks
5//! from a cron stream into a database backend, or transforming and forwarding tasks
6//! between systems.
7//!
8//! ## Example
9//!
10//! ```rust
11//! # use futures_util::stream;
12//! # use apalis_core::backend::{pipe::PipeExt, dequeue};
13//! # use apalis_core::worker::{builder::WorkerBuilder, context::WorkerContext};
14//! # use apalis_core::error::BoxDynError;
15//! # use std::time::Duration;
16//! # use futures_util::StreamExt;
17//! # use crate::apalis_core::worker::ext::event_listener::EventListenerExt;
18//! #[tokio::main]
19//! async fn main() {
20//!     let stm = stream::iter(0..10).map(|s| Ok::<_, std::io::Error>(s));
21//!
22//!     let in_memory = dequeue::backend::<u32>(Duration::from_secs(1));
23//!     let backend = stm.pipe_to(in_memory);
24//!
25//!     async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
26//!         tokio::time::sleep(Duration::from_secs(1)).await;
27//! #        if task == 9 {
28//! #            ctx.stop().unwrap();
29//! #        }
30//!         Ok(())
31//!     }
32//!
33//!     let worker = WorkerBuilder::new("rango-tango")
34//!         .backend(backend)
35//!         .on_event(|_ctx, ev| {
36//!             println!("On Event = {:?}", ev);
37//!         })
38//!         .build(task);
39//!     worker.run().await.unwrap();
40//! }
41//! ```
42//!
43//! This example pipes a stream of numbers into an in-memory backend and processes them with a worker.
44//!
45//! See also:
46//! - [`apalis-cron`](https://docs.rs/apalis-cron)
47
48use crate::backend::{BackendExt, TaskSink};
49use crate::error::BoxDynError;
50use crate::features_table;
51use crate::task::Task;
52use crate::{backend::Backend, backend::codec::Codec, worker::context::WorkerContext};
53use futures_sink::Sink;
54use futures_util::stream::{once, select};
55use futures_util::{SinkExt, Stream, TryStreamExt};
56use futures_util::{StreamExt, stream::BoxStream};
57use std::fmt;
58use std::fmt::Debug;
59use std::marker::PhantomData;
60use std::ops::{Deref, DerefMut};
61
62/// A generic pipe that wraps a [`Stream`] and passes it to a backend
63#[doc = features_table! {
64    setup = "{ unreachable!() }",
65    TaskSink => supported("Ability to push new tasks", false),
66    InheritsFeatures => limited("Inherits features from the underlying backend", false),
67}]
68pub struct Pipe<S, Into, Args, Ctx> {
69    pub(crate) from: S,
70    pub(crate) into: Into,
71    pub(crate) _req: PhantomData<(Args, Ctx)>,
72}
73
74impl<S: Clone, Into: Clone, Args, Ctx> Clone for Pipe<S, Into, Args, Ctx> {
75    fn clone(&self) -> Self {
76        Self {
77            from: self.from.clone(),
78            into: self.into.clone(),
79            _req: PhantomData,
80        }
81    }
82}
83
84impl<S, Into, Args, Ctx> Deref for Pipe<S, Into, Args, Ctx> {
85    type Target = Into;
86
87    fn deref(&self) -> &Self::Target {
88        &self.into
89    }
90}
91
92impl<S, Into, Args, Ctx> DerefMut for Pipe<S, Into, Args, Ctx> {
93    fn deref_mut(&mut self) -> &mut Self::Target {
94        &mut self.into
95    }
96}
97
98impl<S, Into, Args, Ctx> Pipe<S, Into, Args, Ctx> {
99    /// Create a new Pipe instance
100    pub fn new(stream: S, backend: Into) -> Self {
101        Self {
102            from: stream,
103            into: backend,
104            _req: PhantomData,
105        }
106    }
107}
108
109impl<S: fmt::Debug, Into: fmt::Debug, Args, Ctx> fmt::Debug for Pipe<S, Into, Args, Ctx> {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        f.debug_struct("Pipe")
112            .field("inner", &self.from)
113            .field("into", &self.into)
114            .finish()
115    }
116}
117
118impl<Args, Ctx, S, TSink, Err> Backend for Pipe<S, TSink, Args, Ctx>
119where
120    S: Stream<Item = Result<Args, Err>> + Send + 'static,
121    TSink: Backend<Args = Args, Context = Ctx>
122        + BackendExt
123        + TaskSink<Args>
124        + Clone
125        + Unpin
126        + Send
127        + 'static
128        + Sink<Task<TSink::Compact, Ctx, TSink::IdType>>,
129    <TSink as Backend>::Error: std::error::Error + Send + Sync + 'static,
130    TSink::Beat: Send + 'static,
131    TSink::IdType: Send + Clone + 'static,
132    TSink::Stream: Send + 'static,
133    Args: Send + 'static,
134    Ctx: Send + 'static + Default,
135    Err: std::error::Error + Send + Sync + 'static,
136    <TSink as Sink<Task<TSink::Compact, Ctx, TSink::IdType>>>::Error:
137        std::error::Error + Send + Sync + 'static,
138    <<TSink as BackendExt>::Codec as Codec<Args>>::Error: std::error::Error + Send + Sync + 'static,
139    TSink::Compact: Send,
140{
141    type Args = Args;
142
143    type IdType = TSink::IdType;
144
145    type Context = Ctx;
146
147    type Stream = BoxStream<'static, Result<Option<Task<Args, Ctx, Self::IdType>>, PipeError>>;
148
149    type Layer = TSink::Layer;
150
151    type Beat = BoxStream<'static, Result<(), PipeError>>;
152
153    type Error = PipeError;
154
155    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
156        self.into
157            .heartbeat(worker)
158            .map_err(|e| PipeError::Inner(e.into()))
159            .boxed()
160    }
161
162    fn middleware(&self) -> Self::Layer {
163        self.into.middleware()
164    }
165
166    fn poll(self, worker: &WorkerContext) -> Self::Stream {
167        let mut sink = self
168            .into
169            .clone()
170            .sink_map_err(|e| PipeError::Inner(e.into()));
171
172        let mut sink_stream = self
173            .from
174            .map_err(|e| PipeError::Inner(e.into()))
175            .map_ok(|s| {
176                Task::new(s)
177                    .try_map(|s| TSink::Codec::encode(&s).map_err(|e| PipeError::Inner(e.into())))
178            })
179            .map(|t| t.and_then(|t| t))
180            .boxed();
181
182        let sender_stream = self.into.poll(worker);
183        select(
184            once(async move {
185                let fut = sink.send_all(&mut sink_stream);
186                fut.await.map_err(|e| PipeError::Inner(e.into()))?;
187                Ok(None)
188            }),
189            sender_stream.map_err(|e| PipeError::Inner(e.into())),
190        )
191        .boxed()
192    }
193}
194
195/// Represents utility for piping streams into a backend
196pub trait PipeExt<B, Args, Ctx>
197where
198    Self: Sized,
199{
200    /// Pipe the current stream into the provided backend
201    fn pipe_to(self, backend: B) -> Pipe<Self, B, Args, Ctx>;
202}
203
204impl<B, Args, Ctx, Err, S> PipeExt<B, Args, Ctx> for S
205where
206    S: Stream<Item = Result<Args, Err>> + Send + 'static,
207    <B as Backend>::Error: Into<BoxDynError> + Send + Sync + 'static,
208    B: Backend<Args = Args> + TaskSink<Args>,
209{
210    fn pipe_to(self, backend: B) -> Pipe<Self, B, Args, Ctx> {
211        Pipe::new(self, backend)
212    }
213}
214
215/// Error encountered while piping streams
216#[derive(Debug, thiserror::Error)]
217pub enum PipeError {
218    /// The cron stream provided a None
219    #[error("The inner stream provided a None")]
220    EmptyStream,
221    /// An inner stream error occurred
222    #[error("The inner stream error: {0}")]
223    Inner(BoxDynError),
224}
225
226#[cfg(test)]
227mod tests {
228    use std::{io, time::Duration};
229
230    use futures_util::stream;
231
232    use crate::{
233        backend::dequeue,
234        error::BoxDynError,
235        worker::{
236            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
237        },
238    };
239
240    use super::*;
241
242    const ITEMS: u32 = 10;
243
244    #[tokio::test]
245    async fn basic_worker() {
246        let stm = stream::iter(0..ITEMS).map(Ok::<_, io::Error>);
247        let in_memory = dequeue::backend::<u32>(Duration::from_secs(1));
248
249        let backend = Pipe::new(stm, in_memory);
250
251        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
252            tokio::time::sleep(Duration::from_secs(1)).await;
253            if task == ITEMS - 1 {
254                ctx.stop().unwrap();
255                return Err("Graceful Exit".into());
256            }
257            Ok(())
258        }
259
260        let worker = WorkerBuilder::new("rango-tango")
261            .backend(backend)
262            .on_event(|_ctx, ev| {
263                println!("On Event = {ev:?}");
264            })
265            .build(task);
266        worker.run().await.unwrap();
267    }
268}