apalis_core/backend/
pipe.rs

1//! # Pipe streams to backends
2//!
3//! This backend allows you to pipe tasks from any stream into another backend.
4//! It is useful for connecting different backends together, such as piping tasks
5//! from a cron stream into a database backend, or transforming and forwarding tasks
6//! between systems.
7//!
8//! ## Example
9//!
10//! ```rust
11//! # use futures_util::stream;
12//! # use apalis_core::backend::pipe::PipeExt;
13//! # use apalis_core::backend::json::JsonStorage;
14//! # use apalis_core::worker::{builder::WorkerBuilder, context::WorkerContext};
15//! # use apalis_core::error::BoxDynError;
16//! # use std::time::Duration;
17//! # use futures_util::StreamExt;
18//! # use crate::apalis_core::worker::ext::event_listener::EventListenerExt;
19//! #[tokio::main]
20//! async fn main() {
21//!     let stm = stream::iter(0..10).map(|s| Ok::<_, std::io::Error>(s));
22//!
23//!     let in_memory = JsonStorage::new_temp().unwrap();
24//!     let backend = stm.pipe_to(in_memory);
25//!
26//!     async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
27//!         tokio::time::sleep(Duration::from_secs(1)).await;
28//! #        if task == 9 {
29//! #            ctx.stop().unwrap();
30//! #        }
31//!         Ok(())
32//!     }
33//!
34//!     let worker = WorkerBuilder::new("rango-tango")
35//!         .backend(backend)
36//!         .on_event(|_ctx, ev| {
37//!             println!("On Event = {:?}", ev);
38//!         })
39//!         .build(task);
40//!     worker.run().await.unwrap();
41//! }
42//! ```
43//!
44//! This example pipes a stream of numbers into an in-memory backend and processes them with a worker.
45//!
46//! See also:
47//! - [`apalis-cron`](https://docs.rs/apalis-cron)
48
49use crate::error::BoxDynError;
50use crate::features_table;
51use crate::task::Task;
52use crate::{backend::Backend, worker::context::WorkerContext};
53use futures_sink::Sink;
54use futures_util::stream::{once, select};
55use futures_util::{stream::BoxStream, StreamExt};
56use futures_util::{SinkExt, Stream, TryStreamExt};
57use std::fmt;
58use std::fmt::Debug;
59use std::marker::PhantomData;
60use std::ops::{Deref, DerefMut};
61
62/// A generic pipe that wraps a [`Stream`] and passes it to a backend
63#[doc = features_table! {
64    setup = unreachable!();,
65    TaskSink => supported("Ability to push new tasks", false),
66    InheritsFeatures => limited("Inherits features from the underlying backend", false),
67}]
68pub struct Pipe<S, Into, Args, Ctx> {
69    pub(crate) from: S,
70    pub(crate) into: Into,
71    pub(crate) _req: PhantomData<(Args, Ctx)>,
72}
73
74impl<S: Clone, Into: Clone, Args, Ctx> Clone for Pipe<S, Into, Args, Ctx> {
75    fn clone(&self) -> Self {
76        Pipe {
77            from: self.from.clone(),
78            into: self.into.clone(),
79            _req: PhantomData,
80        }
81    }
82}
83
84impl<S, Into, Args, Ctx> Deref for Pipe<S, Into, Args, Ctx> {
85    type Target = Into;
86
87    fn deref(&self) -> &Self::Target {
88        &self.into
89    }
90}
91
92impl<S, Into, Args, Ctx> DerefMut for Pipe<S, Into, Args, Ctx> {
93    fn deref_mut(&mut self) -> &mut Self::Target {
94        &mut self.into
95    }
96}
97
98impl<S, Into, Args, Ctx> Pipe<S, Into, Args, Ctx> {
99    /// Create a new Pipe instance
100    pub fn new(stream: S, backend: Into) -> Self {
101        Pipe {
102            from: stream,
103            into: backend,
104            _req: PhantomData,
105        }
106    }
107}
108
109impl<S: fmt::Debug, Into: fmt::Debug, Args, Ctx> fmt::Debug for Pipe<S, Into, Args, Ctx> {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        f.debug_struct("Pipe")
112            .field("inner", &self.from)
113            .field("into", &self.into)
114            .finish()
115    }
116}
117
118impl<Args, Ctx, S, TSink, Err> Backend<Args> for Pipe<S, TSink, Args, Ctx>
119where
120    S: Stream<Item = Result<Args, Err>> + Send + 'static,
121    TSink: Backend<Args, Context = Ctx>
122        + Sink<Task<Args, Ctx, TSink::IdType>>
123        + Clone
124        + Unpin
125        + Send
126        + 'static,
127    <TSink as Backend<Args>>::Error: Into<BoxDynError> + Send + Sync + 'static,
128    TSink::Beat: Send + 'static,
129    TSink::IdType: Send + Clone + 'static,
130    TSink::Stream: Send + 'static,
131    Args: Send + 'static,
132    Ctx: Send + 'static + Default,
133    Err: Into<BoxDynError> + Send + Sync + 'static,
134    <TSink as Sink<Task<Args, Ctx, TSink::IdType>>>::Error:
135        Into<BoxDynError> + Send + Sync + 'static,
136{
137    type IdType = TSink::IdType;
138
139    type Context = Ctx;
140
141    type Stream = BoxStream<'static, Result<Option<Task<Args, Ctx, Self::IdType>>, PipeError>>;
142
143    type Layer = TSink::Layer;
144
145    type Beat = BoxStream<'static, Result<(), PipeError>>;
146
147    type Error = PipeError;
148
149    type Codec = TSink::Codec;
150
151    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
152        self.into
153            .heartbeat(worker)
154            .map_err(|e| PipeError::Inner(e.into()))
155            .boxed()
156    }
157
158    fn middleware(&self) -> Self::Layer {
159        self.into.middleware()
160    }
161
162    fn poll(self, worker: &WorkerContext) -> Self::Stream {
163        let mut sink = self.into.clone().sink_map_err(|e| e.into());
164
165        let mut sink_stream = self
166            .from
167            .map_ok(|s| Task::new(s))
168            .map_err(|e| e.into())
169            .boxed();
170
171        let sender_stream = self.into.poll(worker);
172        select(
173            once(async move {
174                let fut = sink.send_all(&mut sink_stream);
175                fut.await.map_err(|e| PipeError::Inner(e.into()))?;
176                Ok(None)
177            }),
178            sender_stream.map_err(|e| PipeError::Inner(e.into())),
179        )
180        .boxed()
181    }
182}
183
184/// Represents utility for piping streams into a backend
185pub trait PipeExt<B, Args, Ctx>
186where
187    Self: Sized,
188{
189    /// Pipe the current stream into the provided backend
190    fn pipe_to(self, backend: B) -> Pipe<Self, B, Args, Ctx>;
191}
192
193impl<B, Args, Ctx, Err, S> PipeExt<B, Args, Ctx> for S
194where
195    S: Stream<Item = Result<Args, Err>> + Send + 'static,
196    <B as Backend<Args>>::Error: Into<BoxDynError> + Send + Sync + 'static,
197    B: Backend<Args> + Sink<Task<Args, Ctx, B::IdType>>,
198{
199    fn pipe_to(self, backend: B) -> Pipe<Self, B, Args, Ctx> {
200        Pipe::new(self, backend)
201    }
202}
203
204/// Error encountered while piping streams
205#[derive(Debug, thiserror::Error)]
206pub enum PipeError {
207    /// The cron stream provided a None
208    #[error("The inner stream provided a None")]
209    EmptyStream,
210    /// An inner stream error occurred
211    #[error("The inner stream error: {0}")]
212    Inner(BoxDynError),
213}
214
#[cfg(test)]
mod tests {
    use super::*;

    use std::{io, time::Duration};

    use futures_util::stream;

    use crate::{
        backend::json::JsonStorage,
        error::BoxDynError,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };

    /// Number of items emitted by the source stream.
    const ITEMS: u32 = 10;

    /// Worker handler: sleeps for a second per item and stops the worker once
    /// the last item (`ITEMS - 1`) has been received.
    async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
        tokio::time::sleep(Duration::from_secs(1)).await;
        if task != ITEMS - 1 {
            return Ok(());
        }
        ctx.stop().unwrap();
        Err("Graceful Exit".into())
    }

    /// Pipes `0..ITEMS` into a temporary `JsonStorage` and runs a worker over it
    /// until the handler requests a stop.
    #[tokio::test]
    async fn basic_worker() {
        let numbers = stream::iter(0..ITEMS).map(Ok::<_, io::Error>);
        let storage = JsonStorage::new_temp().unwrap();
        let backend = Pipe::new(numbers, storage);

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|_ctx, ev| {
                println!("On Event = {:?}", ev);
            })
            .build(task);
        worker.run().await.unwrap();
    }
}
257}