apalis_core/backend/
custom.rs

1//! # Custom Backend
2//!
3//! A highly customizable backend for task processing that allows integration with any persistence engine by providing custom fetcher and sink functions.
4//!
5//! ## Overview
6//!
7//! The [`CustomBackend`] struct enables you to define how tasks are fetched from and persisted to
8//! your storage engine.
9//!
10//! You can use the [`BackendBuilder`] to construct a [`CustomBackend`] by
11//! providing the required database, fetcher, and sink, plus an optional configuration.
12//!
13//! ## Usage
14//!
15//! Use [`BackendBuilder`] to configure and build your custom backend:
16//!
17//! ## Example: CustomBackend with Worker
18//!
19//! ```rust
20//! # use std::collections::VecDeque;
21//! # use std::sync::Arc;
22//! # use futures_util::{lock::Mutex, sink, stream, sink::SinkExt};
23//! # use apalis_core::backend::custom::{BackendBuilder, CustomBackend};
24//! # use apalis_core::task::Task;
25//! # use apalis_core::worker::builder::WorkerBuilder;
26//! # use apalis_core::worker::context::WorkerContext;
27//! # use apalis_core::error::BoxDynError;
28//! # use std::time::Duration;
29//! # use futures_util::StreamExt;
30//! # use futures_util::FutureExt;
31//! # use apalis_core::backend::TaskSink;
32//! #[tokio::main]
33//! async fn main() {
34//!     // Create a memory-backed VecDeque
35//!     let memory = Arc::new(Mutex::new(VecDeque::<Task<u32, ()>>::new()));
36//!
37//!     // Build the custom backend
38//!     let mut backend = BackendBuilder::new()
39//!         .database(memory)
40//!         .fetcher(|memory, _, _| {
41//!             stream::unfold(memory.clone(), |p| async move {
42//!                 let mut memory = p.lock().await;
43//!                 let item = memory.pop_front();
44//!                 drop(memory);
45//!                 match item {
46//!                     Some(item) => Some((Ok::<_, BoxDynError>(Some(item)), p)),
47//!                     None => Some((Ok::<_, BoxDynError>(None), p)),
48//!                 }
49//!             })
50//!             .boxed()
51//!         })
52//!         .sink(|memory, _| {
53//!             sink::unfold(memory.clone(), move |p, item| {
54//!                 async move {
55//!                     let mut memory = p.lock().await;
56//!                     memory.push_back(item);
57//!                     drop(memory);
58//!                     Ok::<_, BoxDynError>(p)
59//!                 }
60//!                 .boxed()
61//!             })
62//!         })
63//!         .build()
64//!         .unwrap();
65//!
66//!     // Add a task to the backend
67//!     backend.send(Task::new(42)).await.unwrap();
68//!
69//!     // Define the task handler
70//!     async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
71//!         tokio::time::sleep(Duration::from_secs(1)).await;
72//! #       ctx.stop().unwrap();
73//!         Ok(())
74//!     }
75//!
76//!     // Build and run the worker
77//!     let worker = WorkerBuilder::new("custom-worker")
78//!         .backend(backend)
79//!         .build(task);
80//!     worker.run().await.unwrap();
81//! }
82//! ```
83//!
84//! ## Features
85//!
86//! - **Custom Fetcher**: Define how jobs are fetched from your storage.
87//! - **Custom Sink**: Define how jobs are persisted to your storage.
88//! - **Configurable**: Pass custom configuration to your backend.
89//!
90use futures_core::stream::BoxStream;
91use futures_sink::Sink;
92use futures_util::SinkExt;
93use futures_util::{Stream, StreamExt};
94use std::pin::Pin;
95use std::sync::Arc;
96use std::task::{Context, Poll};
97use std::{fmt, marker::PhantomData};
98use thiserror::Error;
99use tower_layer::Identity;
100
101use crate::backend::TaskStream;
102use crate::error::BoxDynError;
103use crate::features_table;
104use crate::{backend::Backend, task::Task, worker::context::WorkerContext};
105
106type Fetcher<DB, Config, Fetch> =
107    Arc<Box<dyn Fn(&mut DB, &Config, &WorkerContext) -> Fetch + Send + Sync>>;
108
/// Shared, type-erased factory that produces the sink used to push tasks.
///
/// The factory receives mutable access to the database handle and the backend
/// configuration.
type Sinker<DB, Config, Sink> = Arc<
    Box<dyn Fn(&mut DB, &Config) -> Sink + Send + Sync>,
>;
110
111/// A highly customizable backend for integration with any persistence engine
112///
113/// This backend allows you to define how tasks are fetched from and persisted to your storage,
114/// meaning you can use it to integrate with existing systems.
115///
116/// # Example
117/// ```rust,ignore
118/// let backend = BackendBuilder::new()
119///     .database(my_db)
120///     .fetcher(my_fetcher_fn)
121///     .sink(my_sink_fn)
122///     .build()
123///     .unwrap();
124/// ```
125#[doc = features_table! {
126    setup = "{ unreachable!() }",
127    TaskSink => supported("Ability to push new tasks", false),
128    Serialization => supported("Serialization support for arguments", false),
129    FetchById => not_supported("Allow fetching a task by its ID"),
130    RegisterWorker => not_implemented("Allow registering a worker with the backend"),
131    PipeExt => limited("Allow other backends to pipe to this backend", false), // Would require Clone,
132    MakeShared => not_implemented("Share the same [`CustomBackend`] across multiple workers", false),
133    Workflow => not_implemented("Flexible enough to support workflows"),
134    WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"), // Would require Clone
135    ResumeById => not_supported("Resume a task by its ID"),
136    ResumeAbandoned => not_supported("Resume abandoned tasks"),
137    ListWorkers => not_implemented("List all workers registered with the backend"),
138    ListTasks => not_implemented("List all tasks in the backend"),
139}]
140#[pin_project::pin_project]
141#[must_use = "Custom backends must be polled or used as a sink"]
142pub struct CustomBackend<Args, DB, Fetch, Sink, IdType, Config = ()> {
143    _marker: PhantomData<(Args, IdType)>,
144    db: DB,
145    fetcher: Fetcher<DB, Config, Fetch>,
146    sinker: Sinker<DB, Config, Sink>,
147    #[pin]
148    current_sink: Sink,
149    config: Config,
150}
151
152impl<Args, DB, Fetch, Sink, IdType, Config> Clone
153    for CustomBackend<Args, DB, Fetch, Sink, IdType, Config>
154where
155    DB: Clone,
156    Config: Clone,
157{
158    fn clone(&self) -> Self {
159        let mut db = self.db.clone();
160        let current_sink = (self.sinker)(&mut db, &self.config);
161        Self {
162            _marker: PhantomData,
163            db,
164            fetcher: Arc::clone(&self.fetcher),
165            sinker: Arc::clone(&self.sinker),
166            current_sink,
167            config: self.config.clone(),
168        }
169    }
170}
171
172impl<Args, DB, Fetch, Sink, IdType, Config> fmt::Debug
173    for CustomBackend<Args, DB, Fetch, Sink, IdType, Config>
174where
175    DB: fmt::Debug,
176    Config: fmt::Debug,
177{
178    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179        f.debug_struct("CustomBackend")
180            .field(
181                "_marker",
182                &format_args!(
183                    "PhantomData<({}, {})>",
184                    std::any::type_name::<Args>(),
185                    std::any::type_name::<IdType>()
186                ),
187            )
188            .field("db", &self.db)
189            .field("fetcher", &"Fn(&mut DB, &Config, &WorkerContext) -> Fetch")
190            .field("sink", &"Fn(&mut DB, &Config) -> Sink")
191            .field("config", &self.config)
192            .finish()
193    }
194}
195
196type FetcherBuilder<DB, Config, Fetch> =
197    Box<dyn Fn(&mut DB, &Config, &WorkerContext) -> Fetch + Send + Sync + 'static>;
198
/// Boxed sink factory as stored by the builder before it is wrapped in an
/// `Arc` by `build`.
type SinkerBuilder<DB, Config, Sink> = Box<
    dyn Fn(&mut DB, &Config) -> Sink + Send + Sync + 'static,
>;
201
202/// Builder for [`CustomBackend`]
203///
204/// Lets you set the database, fetcher, sink, codec, and config
205pub struct BackendBuilder<Args, DB, Fetch, Sink, IdType, Config = ()> {
206    _marker: PhantomData<(Args, IdType)>,
207    database: Option<DB>,
208    fetcher: Option<FetcherBuilder<DB, Config, Fetch>>,
209    sink: Option<SinkerBuilder<DB, Config, Sink>>,
210    config: Option<Config>,
211}
212
213impl<Args, DB, Fetch, Sink, IdType, Config> fmt::Debug
214    for BackendBuilder<Args, DB, Fetch, Sink, IdType, Config>
215where
216    DB: fmt::Debug,
217    Config: fmt::Debug,
218{
219    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220        f.debug_struct("BackendBuilder")
221            .field(
222                "_marker",
223                &format_args!(
224                    "PhantomData<({}, {})>",
225                    std::any::type_name::<Args>(),
226                    std::any::type_name::<IdType>()
227                ),
228            )
229            .field("database", &self.database)
230            .field("fetcher", &self.fetcher.as_ref().map(|_| "Some(fn)"))
231            .field("sink", &self.sink.as_ref().map(|_| "Some(fn)"))
232            .field("config", &self.config)
233            .finish()
234    }
235}
236
237impl<Args, DB, Fetch, Sink, IdType, Config> Default
238    for BackendBuilder<Args, DB, Fetch, Sink, IdType, Config>
239{
240    fn default() -> Self {
241        Self {
242            _marker: PhantomData,
243            database: None,
244            fetcher: None,
245            sink: None,
246            config: None,
247        }
248    }
249}
250
251impl<Args, DB, Fetch, Sink, IdType> BackendBuilder<Args, DB, Fetch, Sink, IdType, ()> {
252    /// Create a new `BackendBuilder` instance
253    #[must_use]
254    pub fn new() -> Self {
255        Self::new_with_cfg(())
256    }
257
258    /// Create a new `BackendBuilder` instance with custom configuration
259    pub fn new_with_cfg<Config>(
260        config: Config,
261    ) -> BackendBuilder<Args, DB, Fetch, Sink, IdType, Config> {
262        BackendBuilder {
263            config: Some(config),
264            ..Default::default()
265        }
266    }
267}
268
269impl<Args, DB, Fetch, Sink, IdType, Config> BackendBuilder<Args, DB, Fetch, Sink, IdType, Config> {
270    /// The custom backend persistence engine
271    #[must_use]
272    pub fn database(mut self, db: DB) -> Self {
273        self.database = Some(db);
274        self
275    }
276
277    /// The fetcher function to retrieve tasks from the database
278    #[must_use]
279    pub fn fetcher<F: Fn(&mut DB, &Config, &WorkerContext) -> Fetch + Send + Sync + 'static>(
280        mut self,
281        fetcher: F,
282    ) -> Self {
283        self.fetcher = Some(Box::new(fetcher));
284        self
285    }
286
287    /// The sink function to persist tasks to the database
288    #[must_use]
289    pub fn sink<F: Fn(&mut DB, &Config) -> Sink + Send + Sync + 'static>(
290        mut self,
291        sink: F,
292    ) -> Self {
293        self.sink = Some(Box::new(sink));
294        self
295    }
296
297    #[allow(clippy::type_complexity)]
298    /// Build the `CustomBackend` instance
299    pub fn build(self) -> Result<CustomBackend<Args, DB, Fetch, Sink, IdType, Config>, BuildError> {
300        let mut db = self.database.ok_or(BuildError::MissingPool)?;
301        let config = self.config.ok_or(BuildError::MissingConfig)?;
302        let sink_fn = self.sink.ok_or(BuildError::MissingSink)?;
303        let sink = sink_fn(&mut db, &config);
304
305        Ok(CustomBackend {
306            _marker: PhantomData,
307            db,
308            fetcher: self
309                .fetcher
310                .map(Arc::new)
311                .ok_or(BuildError::MissingFetcher)?,
312            current_sink: sink,
313            sinker: Arc::new(sink_fn),
314            config,
315        })
316    }
317}
318
319/// Errors encountered building a `CustomBackend`
320#[derive(Debug, Error)]
321pub enum BuildError {
322    /// Missing database db
323    #[error("Database db is required")]
324    MissingPool,
325    /// Missing fetcher function
326    #[error("Fetcher is required")]
327    MissingFetcher,
328    /// Missing sink function
329    #[error("Sink is required")]
330    MissingSink,
331    /// Missing configuration
332    #[error("Config is required")]
333    MissingConfig,
334}
335
336impl<Args, DB, Fetch, Sink, IdType: Clone, E, Ctx: Default, Config> Backend
337    for CustomBackend<Args, DB, Fetch, Sink, IdType, Config>
338where
339    Fetch: Stream<Item = Result<Option<Task<Args, Ctx, IdType>>, E>> + Send + 'static,
340    E: Into<BoxDynError>,
341{
342    type Args = Args;
343    type IdType = IdType;
344
345    type Context = Ctx;
346
347    type Error = BoxDynError;
348
349    type Stream = TaskStream<Task<Args, Ctx, IdType>, BoxDynError>;
350
351    type Beat = BoxStream<'static, Result<(), Self::Error>>;
352
353    type Layer = Identity;
354
355    fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
356        futures_util::stream::once(async { Ok(()) }).boxed()
357    }
358
359    fn middleware(&self) -> Self::Layer {
360        Identity::new()
361    }
362
363    fn poll(mut self, worker: &WorkerContext) -> Self::Stream {
364        (self.fetcher)(&mut self.db, &self.config, worker)
365            .map(|task| match task {
366                Ok(Some(t)) => Ok(Some(t)),
367                Ok(None) => Ok(None),
368                Err(e) => Err(e.into()),
369            })
370            .boxed()
371    }
372}
373
374impl<Args, Ctx, IdType, DB, Fetch, S, Config> Sink<Task<Args, Ctx, IdType>>
375    for CustomBackend<Args, DB, Fetch, S, IdType, Config>
376where
377    S: Sink<Task<Args, Ctx, IdType>>,
378{
379    type Error = S::Error;
380
381    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
382        self.project().current_sink.poll_ready_unpin(cx)
383    }
384
385    fn start_send(self: Pin<&mut Self>, item: Task<Args, Ctx, IdType>) -> Result<(), Self::Error> {
386        self.project().current_sink.start_send_unpin(item)
387    }
388
389    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
390        self.project().current_sink.poll_flush_unpin(cx)
391    }
392
393    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
394        self.project().current_sink.poll_close_unpin(cx)
395    }
396}
397
#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures_util::{FutureExt, lock::Mutex, sink, stream};

    use crate::{
        error::BoxDynError,
        worker::{builder::WorkerBuilder, ext::event_listener::EventListenerExt},
    };

    use super::*;

    /// Number of tasks queued before the worker starts.
    const ITEMS: u32 = 10;

    /// End-to-end check: tasks pushed through the sink are handed to a worker
    /// by the fetcher, and the worker stops itself on the last task.
    #[tokio::test]
    async fn basic_custom_backend() {
        // An in-memory queue stands in for a real database.
        let queue: Arc<Mutex<VecDeque<Task<u32, ()>>>> = Arc::new(Mutex::new(VecDeque::new()));

        let mut backend = BackendBuilder::new()
            .database(queue)
            .fetcher(|queue, _, _| {
                stream::unfold(queue.clone(), |handle| async move {
                    tokio::time::sleep(Duration::from_millis(100)).await; // Debounce
                    // The lock guard is a temporary and is released at the end
                    // of this statement. An empty queue yields `Ok(None)`
                    // rather than ending the stream.
                    let next = handle.lock().await.pop_front();
                    Some((Ok::<_, BoxDynError>(next), handle))
                })
                .boxed()
            })
            .sink(|queue, _| {
                sink::unfold(queue.clone(), move |handle, task| {
                    async move {
                        handle.lock().await.push_back(task);
                        Ok::<_, BoxDynError>(handle)
                    }
                    .boxed()
                })
            })
            .build()
            .unwrap();

        for value in 0..ITEMS {
            backend.send(Task::new(value)).await.unwrap();
        }

        /// Handler that stops the worker once the final task has been seen.
        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if task != ITEMS - 1 {
                return Ok(());
            }
            ctx.stop().unwrap();
            Err("Worker stopped!".into())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|ctx, ev| {
                println!("On Event = {ev:?} from {}", ctx.name());
            })
            .build(task);
        worker.run().await.unwrap();
    }
}
467}