apalis_core/backend/
pipe.rs

1//! # Pipe streams to backends
2//!
3//! This backend allows you to pipe tasks from any stream into another backend.
4//! It is useful for connecting different backends together, such as piping tasks
5//! from a cron stream into a database backend, or transforming and forwarding tasks
6//! between systems.
7//!
8//! ## Example
9//!
10//! ```rust
11//! # use futures_util::stream;
12//! # use apalis_core::backend::pipe::PipeExt;
13//! # use apalis_core::backend::json::JsonStorage;
14//! # use apalis_core::worker::{builder::WorkerBuilder, context::WorkerContext};
15//! # use apalis_core::error::BoxDynError;
16//! # use std::time::Duration;
17//! # use futures_util::StreamExt;
18//! # use crate::apalis_core::worker::ext::event_listener::EventListenerExt;
19//! #[tokio::main]
20//! async fn main() {
21//!     let stm = stream::iter(0..10).map(|s| Ok::<_, std::io::Error>(s));
22//!
23//!     let in_memory = JsonStorage::new_temp().unwrap();
24//!     let backend = stm.pipe_to(in_memory);
25//!
26//!     async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
27//!         tokio::time::sleep(Duration::from_secs(1)).await;
28//! #        if task == 9 {
29//! #            ctx.stop().unwrap();
30//! #        }
31//!         Ok(())
32//!     }
33//!
34//!     let worker = WorkerBuilder::new("rango-tango")
35//!         .backend(backend)
36//!         .on_event(|_ctx, ev| {
37//!             println!("On Event = {:?}", ev);
38//!         })
39//!         .build(task);
40//!     worker.run().await.unwrap();
41//! }
42//! ```
43//!
44//! This example pipes a stream of numbers into an in-memory backend and processes them with a worker.
45//!
46//! See also:
47//! - [`apalis-cron`](https://docs.rs/apalis-cron)
48
49use crate::backend::TaskSink;
50use crate::error::BoxDynError;
51use crate::features_table;
52use crate::task::Task;
53use crate::{backend::Backend, backend::codec::Codec, worker::context::WorkerContext};
54use futures_sink::Sink;
55use futures_util::stream::{once, select};
56use futures_util::{SinkExt, Stream, TryStreamExt};
57use futures_util::{StreamExt, stream::BoxStream};
58use std::fmt;
59use std::fmt::Debug;
60use std::marker::PhantomData;
61use std::ops::{Deref, DerefMut};
62
63/// A generic pipe that wraps a [`Stream`] and passes it to a backend
64#[doc = features_table! {
65    setup = unreachable!();,
66    TaskSink => supported("Ability to push new tasks", false),
67    InheritsFeatures => limited("Inherits features from the underlying backend", false),
68}]
69pub struct Pipe<S, Into, Args, Ctx> {
70    pub(crate) from: S,
71    pub(crate) into: Into,
72    pub(crate) _req: PhantomData<(Args, Ctx)>,
73}
74
75impl<S: Clone, Into: Clone, Args, Ctx> Clone for Pipe<S, Into, Args, Ctx> {
76    fn clone(&self) -> Self {
77        Pipe {
78            from: self.from.clone(),
79            into: self.into.clone(),
80            _req: PhantomData,
81        }
82    }
83}
84
85impl<S, Into, Args, Ctx> Deref for Pipe<S, Into, Args, Ctx> {
86    type Target = Into;
87
88    fn deref(&self) -> &Self::Target {
89        &self.into
90    }
91}
92
93impl<S, Into, Args, Ctx> DerefMut for Pipe<S, Into, Args, Ctx> {
94    fn deref_mut(&mut self) -> &mut Self::Target {
95        &mut self.into
96    }
97}
98
99impl<S, Into, Args, Ctx> Pipe<S, Into, Args, Ctx> {
100    /// Create a new Pipe instance
101    pub fn new(stream: S, backend: Into) -> Self {
102        Pipe {
103            from: stream,
104            into: backend,
105            _req: PhantomData,
106        }
107    }
108}
109
110impl<S: fmt::Debug, Into: fmt::Debug, Args, Ctx> fmt::Debug for Pipe<S, Into, Args, Ctx> {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        f.debug_struct("Pipe")
113            .field("inner", &self.from)
114            .field("into", &self.into)
115            .finish()
116    }
117}
118
119impl<Args, Ctx, S, TSink, Err> Backend for Pipe<S, TSink, Args, Ctx>
120where
121    S: Stream<Item = Result<Args, Err>> + Send + 'static,
122    TSink: Backend<Args = Args, Context = Ctx>
123        + TaskSink<Args>
124        + Clone
125        + Unpin
126        + Send
127        + 'static
128        + Sink<Task<TSink::Compact, Ctx, TSink::IdType>>,
129    <TSink as Backend>::Error: std::error::Error + Send + Sync + 'static,
130    TSink::Beat: Send + 'static,
131    TSink::IdType: Send + Clone + 'static,
132    TSink::Stream: Send + 'static,
133    Args: Send + 'static,
134    Ctx: Send + 'static + Default,
135    Err: std::error::Error + Send + Sync + 'static,
136    <TSink as Sink<Task<TSink::Compact, Ctx, TSink::IdType>>>::Error:
137        std::error::Error + Send + Sync + 'static,
138    <<TSink as Backend>::Codec as Codec<Args>>::Error: std::error::Error + Send + Sync + 'static,
139    TSink::Compact: Send,
140{
141    type Args = Args;
142
143    type IdType = TSink::IdType;
144
145    type Context = Ctx;
146
147    type Stream = BoxStream<'static, Result<Option<Task<Args, Ctx, Self::IdType>>, PipeError>>;
148
149    type Layer = TSink::Layer;
150
151    type Beat = BoxStream<'static, Result<(), PipeError>>;
152
153    type Error = PipeError;
154
155    type Codec = TSink::Codec;
156
157    type Compact = TSink::Compact;
158
159    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
160        self.into
161            .heartbeat(worker)
162            .map_err(|e| PipeError::Inner(e.into()))
163            .boxed()
164    }
165
166    fn middleware(&self) -> Self::Layer {
167        self.into.middleware()
168    }
169
170    fn poll(self, worker: &WorkerContext) -> Self::Stream {
171        let mut sink = self
172            .into
173            .clone()
174            .sink_map_err(|e| PipeError::Inner(e.into()));
175
176        let mut sink_stream = self
177            .from
178            .map_err(|e| PipeError::Inner(e.into()))
179            .map_ok(|s| {
180                Task::new(s)
181                    .try_map(|s| TSink::Codec::encode(&s).map_err(|e| PipeError::Inner(e.into())))
182            })
183            .map(|t| t.and_then(|t| t))
184            .boxed();
185
186        let sender_stream = self.into.poll(worker);
187        select(
188            once(async move {
189                let fut = sink.send_all(&mut sink_stream);
190                fut.await.map_err(|e| PipeError::Inner(e.into()))?;
191                Ok(None)
192            }),
193            sender_stream.map_err(|e| PipeError::Inner(e.into())),
194        )
195        .boxed()
196    }
197}
198
199/// Represents utility for piping streams into a backend
200pub trait PipeExt<B, Args, Ctx>
201where
202    Self: Sized,
203{
204    /// Pipe the current stream into the provided backend
205    fn pipe_to(self, backend: B) -> Pipe<Self, B, Args, Ctx>;
206}
207
208impl<B, Args, Ctx, Err, S> PipeExt<B, Args, Ctx> for S
209where
210    S: Stream<Item = Result<Args, Err>> + Send + 'static,
211    <B as Backend>::Error: Into<BoxDynError> + Send + Sync + 'static,
212    B: Backend<Args = Args> + TaskSink<Args>,
213{
214    fn pipe_to(self, backend: B) -> Pipe<Self, B, Args, Ctx> {
215        Pipe::new(self, backend)
216    }
217}
218
219/// Error encountered while piping streams
220#[derive(Debug, thiserror::Error)]
221pub enum PipeError {
222    /// The cron stream provided a None
223    #[error("The inner stream provided a None")]
224    EmptyStream,
225    /// An inner stream error occurred
226    #[error("The inner stream error: {0}")]
227    Inner(BoxDynError),
228}
229
#[cfg(all(test, feature = "json"))]
mod tests {
    use std::{io, time::Duration};

    use futures_util::stream;

    use crate::{
        backend::json::JsonStorage,
        error::BoxDynError,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };

    use super::*;

    /// Number of items fed through the pipe
    const ITEMS: u32 = 10;

    /// Pipes `0..ITEMS` into a temporary JSON storage and runs a worker over
    /// it; the handler stops the worker when it sees the last item.
    #[tokio::test]
    async fn basic_worker() {
        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if task != ITEMS - 1 {
                return Ok(());
            }
            // Last item: ask the worker to stop and report the exit as an error.
            ctx.stop().unwrap();
            Err("Graceful Exit".into())
        }

        let source = stream::iter(0..ITEMS).map(Ok::<_, io::Error>);
        let storage = JsonStorage::new_temp().unwrap();
        let backend = Pipe::new(source, storage);

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|_ctx, ev| {
                println!("On Event = {:?}", ev);
            })
            .build(task);

        worker.run().await.unwrap();
    }
}
273}