apalis_core/backend/
custom.rs

1//! # Custom Backend
2//!
3//! A highly customizable backend for task processing that allows integration with any persistence engine by providing custom fetcher and sink functions.
4//!
5//! ## Overview
6//!
7//! The [`CustomBackend`] struct enables you to define how tasks are fetched from and persisted to
8//! your storage engine.
9//!
//! You can use the [`BackendBuilder`] to construct a [`CustomBackend`] by
//! providing the required database, fetcher, and sink, plus an optional configuration.
12//!
13//! ## Usage
14//!
15//! Use [`BackendBuilder`] to configure and build your custom backend:
16//!
17//! ## Example: CustomBackend with Worker
18//!
19//! ```rust
20//! # use std::collections::VecDeque;
21//! # use std::sync::Arc;
22//! # use futures_util::{lock::Mutex, sink, stream, sink::SinkExt};
23//! # use apalis_core::backend::custom::{BackendBuilder, CustomBackend};
24//! # use apalis_core::task::Task;
25//! # use apalis_core::worker::builder::WorkerBuilder;
26//! # use apalis_core::worker::context::WorkerContext;
27//! # use apalis_core::error::BoxDynError;
28//! # use std::time::Duration;
29//! # use futures_util::StreamExt;
30//! # use futures_util::FutureExt;
31//! # use apalis_core::backend::TaskSink;
32//! # use apalis_core::task::task_id::RandomId;
33//! #[tokio::main]
34//! async fn main() {
35//!     // Create a memory-backed VecDeque
36//!     let memory = Arc::new(Mutex::new(VecDeque::<Task<u32, (), RandomId>>::new()));
37//!
38//!     // Build the custom backend
39//!     let mut backend = BackendBuilder::new()
40//!         .database(memory)
41//!         .fetcher(|memory, _, _| {
42//!             stream::unfold(memory.clone(), |p| async move {
43//!                 let mut memory = p.lock().await;
44//!                 let item = memory.pop_front();
45//!                 drop(memory);
46//!                 match item {
47//!                     Some(item) => Some((Ok::<_, BoxDynError>(Some(item)), p)),
48//!                     None => Some((Ok::<_, BoxDynError>(None), p)),
49//!                 }
50//!             })
51//!             .boxed()
52//!         })
53//!         .sink(|memory, _| {
54//!             sink::unfold(memory.clone(), move |p, item| {
55//!                 async move {
56//!                     let mut memory = p.lock().await;
57//!                     memory.push_back(item);
58//!                     drop(memory);
59//!                     Ok::<_, BoxDynError>(p)
60//!                 }
61//!                 .boxed()
62//!             })
63//!         })
64//!         .build()
65//!         .unwrap();
66//!
67//!     // Add a task to the backend
68//!     backend.send(Task::new(42)).await.unwrap();
69//!
70//!     // Define the task handler
71//!     async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
72//!         tokio::time::sleep(Duration::from_secs(1)).await;
73//! #       ctx.stop().unwrap();
74//!         Ok(())
75//!     }
76//!
77//!     // Build and run the worker
78//!     let worker = WorkerBuilder::new("custom-worker")
79//!         .backend(backend)
80//!         .build(task);
81//!     worker.run().await.unwrap();
82//! }
83//! ```
84//!
85//! ## Features
86//!
87//! - **Custom Fetcher**: Define how jobs are fetched from your storage.
88//! - **Custom Sink**: Define how jobs are persisted to your storage.
89//! - **Configurable**: Pass custom configuration to your backend.
90//!
91use futures_core::stream::BoxStream;
92use futures_sink::Sink;
93use futures_util::SinkExt;
94use futures_util::{Stream, StreamExt};
95use std::pin::Pin;
96use std::sync::Arc;
97use std::task::{Context, Poll};
98use std::{fmt, marker::PhantomData};
99use thiserror::Error;
100use tower_layer::Identity;
101
102use crate::backend::TaskStream;
103use crate::error::BoxDynError;
104use crate::features_table;
105use crate::{backend::Backend, task::Task, worker::context::WorkerContext};
106
107type Fetcher<DB, Config, Fetch> =
108    Arc<Box<dyn Fn(&mut DB, &Config, &WorkerContext) -> Fetch + Send + Sync>>;
109
/// Shared, type-erased factory that builds the sink `S` from the database handle `D`
/// and the configuration `C`.
type Sinker<D, C, S> = Arc<Box<dyn Fn(&mut D, &C) -> S + Send + Sync>>;
111
/// A highly customizable backend for integration with any persistence engine
///
/// This backend allows you to define how tasks are fetched from and persisted to your storage,
/// meaning you can use it to integrate with existing systems.
///
/// The backend stores the database handle, the configuration, the two user-supplied
/// functions (fetcher and sink function) and the sink that the sink function produced.
///
/// # Example
/// ```rust,ignore
/// let backend = BackendBuilder::new()
///     .database(my_db)
///     .fetcher(my_fetcher_fn)
///     .sink(my_sink_fn)
///     .build()
///     .unwrap();
/// ```
#[doc = features_table! {
    setup = "{ unreachable!() }",
    TaskSink => supported("Ability to push new tasks", false),
    Serialization => supported("Serialization support for arguments", false),
    FetchById => not_supported("Allow fetching a task by its ID"),
    RegisterWorker => not_implemented("Allow registering a worker with the backend"),
    PipeExt => limited("Allow other backends to pipe to this backend", false), // Would require Clone,
    MakeShared => not_implemented("Share the same [`CustomBackend`] across multiple workers", false),
    Workflow => not_implemented("Flexible enough to support workflows"),
    WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"), // Would require Clone
    ResumeById => not_supported("Resume a task by its ID"),
    ResumeAbandoned => not_supported("Resume abandoned tasks"),
    ListWorkers => not_implemented("List all workers registered with the backend"),
    ListTasks => not_implemented("List all tasks in the backend"),
}]
#[pin_project::pin_project]
#[must_use = "Custom backends must be polled or used as a sink"]
pub struct CustomBackend<Args, DB, Fetch, Sink, IdType, Config = ()> {
    // Ties the otherwise unused `Args` and `IdType` parameters to the type.
    _marker: PhantomData<(Args, IdType)>,
    // Storage handle; passed mutably to both the fetcher and the sink function.
    db: DB,
    // User function that creates the fetch stream when the backend is polled.
    fetcher: Fetcher<DB, Config, Fetch>,
    // User function that creates a sink; kept so a clone can build its own sink.
    sinker: Sinker<DB, Config, Sink>,
    // Sink created by `sinker`; structurally pinned so it can be driven in place.
    #[pin]
    current_sink: Sink,
    // User configuration forwarded to the fetcher and the sink function.
    config: Config,
}
152
153impl<Args, DB, Fetch, Sink, IdType, Config> Clone
154    for CustomBackend<Args, DB, Fetch, Sink, IdType, Config>
155where
156    DB: Clone,
157    Config: Clone,
158{
159    fn clone(&self) -> Self {
160        let mut db = self.db.clone();
161        let current_sink = (self.sinker)(&mut db, &self.config);
162        Self {
163            _marker: PhantomData,
164            db,
165            fetcher: Arc::clone(&self.fetcher),
166            sinker: Arc::clone(&self.sinker),
167            current_sink,
168            config: self.config.clone(),
169        }
170    }
171}
172
173impl<Args, DB, Fetch, Sink, IdType, Config> fmt::Debug
174    for CustomBackend<Args, DB, Fetch, Sink, IdType, Config>
175where
176    DB: fmt::Debug,
177    Config: fmt::Debug,
178{
179    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180        f.debug_struct("CustomBackend")
181            .field(
182                "_marker",
183                &format_args!(
184                    "PhantomData<({}, {})>",
185                    std::any::type_name::<Args>(),
186                    std::any::type_name::<IdType>()
187                ),
188            )
189            .field("db", &self.db)
190            .field("fetcher", &"Fn(&mut DB, &Config, &WorkerContext) -> Fetch")
191            .field("sink", &"Fn(&mut DB, &Config) -> Sink")
192            .field("config", &self.config)
193            .finish()
194    }
195}
196
197type FetcherBuilder<DB, Config, Fetch> =
198    Box<dyn Fn(&mut DB, &Config, &WorkerContext) -> Fetch + Send + Sync + 'static>;
199
/// Boxed sink function as stored by the builder before it is wrapped in an `Arc`.
type SinkerBuilder<D, C, S> = Box<dyn Fn(&mut D, &C) -> S + Send + Sync + 'static>;
202
/// Builder for [`CustomBackend`]
///
/// Lets you set the database, fetcher, and sink; the configuration is supplied when
/// the builder is created. Every part is stored as an `Option` and checked when the
/// backend is built.
pub struct BackendBuilder<Args, DB, Fetch, Sink, IdType, Config = ()> {
    // Ties the otherwise unused `Args` and `IdType` parameters to the type.
    _marker: PhantomData<(Args, IdType)>,
    // Storage handle that will be passed to the fetcher and the sink function.
    database: Option<DB>,
    // Function that creates the fetch stream.
    fetcher: Option<FetcherBuilder<DB, Config, Fetch>>,
    // Function that creates the sink.
    sink: Option<SinkerBuilder<DB, Config, Sink>>,
    // User configuration forwarded to the fetcher and the sink function.
    config: Option<Config>,
}
213
214impl<Args, DB, Fetch, Sink, IdType, Config> fmt::Debug
215    for BackendBuilder<Args, DB, Fetch, Sink, IdType, Config>
216where
217    DB: fmt::Debug,
218    Config: fmt::Debug,
219{
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        f.debug_struct("BackendBuilder")
222            .field(
223                "_marker",
224                &format_args!(
225                    "PhantomData<({}, {})>",
226                    std::any::type_name::<Args>(),
227                    std::any::type_name::<IdType>()
228                ),
229            )
230            .field("database", &self.database)
231            .field("fetcher", &self.fetcher.as_ref().map(|_| "Some(fn)"))
232            .field("sink", &self.sink.as_ref().map(|_| "Some(fn)"))
233            .field("config", &self.config)
234            .finish()
235    }
236}
237
238impl<Args, DB, Fetch, Sink, IdType, Config> Default
239    for BackendBuilder<Args, DB, Fetch, Sink, IdType, Config>
240{
241    fn default() -> Self {
242        Self {
243            _marker: PhantomData,
244            database: None,
245            fetcher: None,
246            sink: None,
247            config: None,
248        }
249    }
250}
251
252impl<Args, DB, Fetch, Sink, IdType> BackendBuilder<Args, DB, Fetch, Sink, IdType, ()> {
253    /// Create a new `BackendBuilder` instance
254    #[must_use]
255    pub fn new() -> Self {
256        Self::new_with_cfg(())
257    }
258
259    /// Create a new `BackendBuilder` instance with custom configuration
260    pub fn new_with_cfg<Config>(
261        config: Config,
262    ) -> BackendBuilder<Args, DB, Fetch, Sink, IdType, Config> {
263        BackendBuilder {
264            config: Some(config),
265            ..Default::default()
266        }
267    }
268}
269
270impl<Args, DB, Fetch, Sink, IdType, Config> BackendBuilder<Args, DB, Fetch, Sink, IdType, Config> {
271    /// The custom backend persistence engine
272    #[must_use]
273    pub fn database(mut self, db: DB) -> Self {
274        self.database = Some(db);
275        self
276    }
277
278    /// The fetcher function to retrieve tasks from the database
279    #[must_use]
280    pub fn fetcher<F: Fn(&mut DB, &Config, &WorkerContext) -> Fetch + Send + Sync + 'static>(
281        mut self,
282        fetcher: F,
283    ) -> Self {
284        self.fetcher = Some(Box::new(fetcher));
285        self
286    }
287
288    /// The sink function to persist tasks to the database
289    #[must_use]
290    pub fn sink<F: Fn(&mut DB, &Config) -> Sink + Send + Sync + 'static>(
291        mut self,
292        sink: F,
293    ) -> Self {
294        self.sink = Some(Box::new(sink));
295        self
296    }
297
298    #[allow(clippy::type_complexity)]
299    /// Build the `CustomBackend` instance
300    pub fn build(self) -> Result<CustomBackend<Args, DB, Fetch, Sink, IdType, Config>, BuildError> {
301        let mut db = self.database.ok_or(BuildError::MissingDb)?;
302        let config = self.config.ok_or(BuildError::MissingConfig)?;
303        let sink_fn = self.sink.ok_or(BuildError::MissingSink)?;
304        let sink = sink_fn(&mut db, &config);
305
306        Ok(CustomBackend {
307            _marker: PhantomData,
308            db,
309            fetcher: self
310                .fetcher
311                .map(Arc::new)
312                .ok_or(BuildError::MissingFetcher)?,
313            current_sink: sink,
314            sinker: Arc::new(sink_fn),
315            config,
316        })
317    }
318}
319
/// Errors encountered building a `CustomBackend`
///
/// Each variant names a part of the builder that was still unset when the backend
/// was built.
#[derive(Debug, Error)]
pub enum BuildError {
    /// No database handle was provided
    #[error("Database db is required")]
    MissingDb,
    /// Missing fetcher function
    #[error("Fetcher is required")]
    MissingFetcher,
    /// Missing sink function
    #[error("Sink is required")]
    MissingSink,
    /// Missing configuration
    #[error("Config is required")]
    MissingConfig,
}
336
/// Errors encountered while using the `CustomBackend`
#[derive(Debug, Error)]
pub enum CustomBackendError {
    /// An underlying error, boxed as a [`BoxDynError`]; errors from the fetch stream
    /// and from the sink are wrapped in this variant
    #[error("Inner error: {0}")]
    Inner(#[from] BoxDynError),
}
344
345impl<Args, DB, Fetch, Sink, IdType: Clone, E, Ctx: Default, Config> Backend
346    for CustomBackend<Args, DB, Fetch, Sink, IdType, Config>
347where
348    Fetch: Stream<Item = Result<Option<Task<Args, Ctx, IdType>>, E>> + Send + 'static,
349    E: Into<BoxDynError>,
350{
351    type Args = Args;
352    type IdType = IdType;
353
354    type Context = Ctx;
355
356    type Error = CustomBackendError;
357
358    type Stream = TaskStream<Task<Args, Ctx, IdType>, CustomBackendError>;
359
360    type Beat = BoxStream<'static, Result<(), Self::Error>>;
361
362    type Layer = Identity;
363
364    fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
365        futures_util::stream::once(async { Ok(()) }).boxed()
366    }
367
368    fn middleware(&self) -> Self::Layer {
369        Identity::new()
370    }
371
372    fn poll(mut self, worker: &WorkerContext) -> Self::Stream {
373        (self.fetcher)(&mut self.db, &self.config, worker)
374            .map(|task| match task {
375                Ok(Some(t)) => Ok(Some(t)),
376                Ok(None) => Ok(None),
377                Err(e) => Err(e.into().into()),
378            })
379            .boxed()
380    }
381}
382
383impl<Args, Ctx, IdType, DB, Fetch, S, Config> Sink<Task<Args, Ctx, IdType>>
384    for CustomBackend<Args, DB, Fetch, S, IdType, Config>
385where
386    S: Sink<Task<Args, Ctx, IdType>>,
387    S::Error: Into<BoxDynError>,
388{
389    type Error = CustomBackendError;
390
391    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
392        self.project()
393            .current_sink
394            .poll_ready_unpin(cx)
395            .map_err(|e| CustomBackendError::Inner(e.into()))
396    }
397
398    fn start_send(self: Pin<&mut Self>, item: Task<Args, Ctx, IdType>) -> Result<(), Self::Error> {
399        self.project()
400            .current_sink
401            .start_send_unpin(item)
402            .map_err(|e| CustomBackendError::Inner(e.into()))
403    }
404
405    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
406        self.project()
407            .current_sink
408            .poll_flush_unpin(cx)
409            .map_err(|e| CustomBackendError::Inner(e.into()))
410    }
411
412    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
413        self.project()
414            .current_sink
415            .poll_close_unpin(cx)
416            .map_err(|e| CustomBackendError::Inner(e.into()))
417    }
418}
419
#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures_util::{FutureExt, lock::Mutex, sink, stream};

    use crate::{
        error::BoxDynError,
        task::task_id::RandomId,
        worker::{builder::WorkerBuilder, ext::event_listener::EventListenerExt},
    };

    use super::*;

    const ITEMS: u32 = 10;

    /// Pushes `ITEMS` tasks through an in-memory queue backend and runs a worker
    /// whose handler stops it once the last task has been seen.
    #[tokio::test]
    async fn basic_custom_backend() {
        type Queue = VecDeque<Task<u32, (), RandomId>>;
        let queue: Arc<Mutex<Queue>> = Arc::new(Mutex::new(Queue::new()));

        let mut backend = BackendBuilder::new()
            .database(queue)
            .fetcher(|queue, _, _| {
                stream::unfold(queue.clone(), |handle| async move {
                    tokio::time::sleep(Duration::from_millis(100)).await; // Debounce
                    // The lock guard is a temporary and is released at the end of
                    // this statement.
                    let next = handle.lock().await.pop_front();
                    Some((Ok::<_, CustomBackendError>(next), handle))
                })
                .boxed()
            })
            .sink(|queue, _| {
                sink::unfold(queue.clone(), move |handle, incoming| {
                    async move {
                        handle.lock().await.push_back(incoming);
                        Ok::<_, CustomBackendError>(handle)
                    }
                    .boxed()
                })
            })
            .build()
            .unwrap();

        for value in 0..ITEMS {
            backend.send(Task::new(value)).await.unwrap();
        }

        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if task != ITEMS - 1 {
                return Ok(());
            }
            // Last item: stop the worker and report an error for this task.
            ctx.stop().unwrap();
            Err("Worker stopped!".into())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|ctx, ev| println!("On Event = {ev:?} from {}", ctx.name()))
            .build(task);
        worker.run().await.unwrap();
    }
}