apalis_core/backend/
custom.rs

1//! # Custom Backend
2//!
3//! A highly customizable backend for task processing that allows integration with any persistence engine by providing custom fetcher and sink functions.
4//!
5//! ## Overview
6//!
7//! The [`CustomBackend`] struct enables you to define how tasks are fetched from and persisted to
8//! your storage engine.
9//!
10//! You can use the [`BackendBuilder`] to construct a [`CustomBackend`] by
11//! providing the required database, fetcher, sink, and optional configuration and codec.
12//!
13//! ## Usage
14//!
15//! Use [`BackendBuilder`] to configure and build your custom backend:
16//!
17//! ## Example: CustomBackend with Worker
18//!
19//! ```rust
20//! # use std::collections::VecDeque;
21//! # use std::sync::Arc;
22//! # use futures_util::{lock::Mutex, sink, stream, sink::SinkExt};
23//! # use apalis_core::backend::custom::{BackendBuilder, CustomBackend};
24//! # use apalis_core::task::Task;
25//! # use apalis_core::worker::builder::WorkerBuilder;
26//! # use apalis_core::worker::context::WorkerContext;
27//! # use apalis_core::error::BoxDynError;
28//! # use std::time::Duration;
29//! # use futures_util::StreamExt;
30//! # use futures_util::FutureExt;
31//! # use apalis_core::backend::TaskSink;
32//! # use apalis_core::task::task_id::RandomId;
33//! #[tokio::main]
34//! async fn main() {
35//!     // Create a memory-backed VecDeque
36//!     let memory = Arc::new(Mutex::new(VecDeque::<Task<u32, (), RandomId>>::new()));
37//!
38//!     // Build the custom backend
39//!     let mut backend = BackendBuilder::new()
40//!         .database(memory)
41//!         .fetcher(|memory, _, _| {
42//!             stream::unfold(memory.clone(), |p| async move {
43//!                 let mut memory = p.lock().await;
44//!                 let item = memory.pop_front();
45//!                 drop(memory);
46//!                 match item {
47//!                     Some(item) => Some((Ok::<_, BoxDynError>(Some(item)), p)),
48//!                     None => Some((Ok::<_, BoxDynError>(None), p)),
49//!                 }
50//!             })
51//!             .boxed()
52//!         })
53//!         .sink(|memory, _| {
54//!             sink::unfold(memory.clone(), move |p, item| {
55//!                 async move {
56//!                     let mut memory = p.lock().await;
57//!                     memory.push_back(item);
58//!                     drop(memory);
59//!                     Ok::<_, BoxDynError>(p)
60//!                 }
61//!                 .boxed()
62//!             })
63//!         })
64//!         .build()
65//!         .unwrap();
66//!
67//!     // Add a task to the backend
68//!     backend.send(Task::new(42)).await.unwrap();
69//!
70//!     // Define the task handler
71//!     async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
72//!         tokio::time::sleep(Duration::from_secs(1)).await;
73//! #       ctx.stop().unwrap();
74//!         Ok(())
75//!     }
76//!
77//!     // Build and run the worker
78//!     let worker = WorkerBuilder::new("custom-worker")
79//!         .backend(backend)
80//!         .build(task);
81//!     worker.run().await.unwrap();
82//! }
83//! ```
84//!
85//! ## Features
86//!
87//! - **Custom Fetcher**: Define how jobs are fetched from your storage.
88//! - **Custom Sink**: Define how jobs are persisted to your storage.
89//! - **Configurable**: Pass custom configuration to your backend.
90//!
91use futures_core::stream::BoxStream;
92use futures_sink::Sink;
93use futures_util::SinkExt;
94use futures_util::{Stream, StreamExt};
95use std::pin::Pin;
96use std::sync::Arc;
97use std::task::{Context, Poll};
98use std::{fmt, marker::PhantomData};
99use thiserror::Error;
100use tower_layer::Identity;
101
102use crate::backend::TaskStream;
103use crate::error::BoxDynError;
104use crate::features_table;
105use crate::{backend::Backend, task::Task, worker::context::WorkerContext};
106
107type Fetcher<DB, Config, Fetch> =
108    Arc<Box<dyn Fn(&mut DB, &Config, &WorkerContext) -> Fetch + Send + Sync>>;
109
110type Sinker<DB, Config, Sink> = Arc<Box<dyn Fn(&mut DB, &Config) -> Sink + Send + Sync>>;
111
112/// A highly customizable backend for integration with any persistence engine
113///
114/// This backend allows you to define how tasks are fetched from and persisted to your storage,
115/// meaning you can use it to integrate with existing systems.
116///
117/// # Example
118/// ```rust,ignore
119/// let backend = BackendBuilder::new()
120///     .database(my_db)
121///     .fetcher(my_fetcher_fn)
122///     .sink(my_sink_fn)
123///     .build()
124///     .unwrap();
125/// ```
126#[doc = features_table! {
127    setup = "{ unreachable!() }",
128    TaskSink => supported("Ability to push new tasks", false),
129    Serialization => supported("Serialization support for arguments", false),
130    FetchById => not_supported("Allow fetching a task by its ID"),
131    RegisterWorker => not_implemented("Allow registering a worker with the backend"),
132    PipeExt => limited("Allow other backends to pipe to this backend", false), // Would require Clone,
133    MakeShared => not_implemented("Share the same [`CustomBackend`] across multiple workers", false),
134    Workflow => not_implemented("Flexible enough to support workflows"),
135    WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"), // Would require Clone
136    ResumeById => not_supported("Resume a task by its ID"),
137    ResumeAbandoned => not_supported("Resume abandoned tasks"),
138    ListWorkers => not_implemented("List all workers registered with the backend"),
139    ListTasks => not_implemented("List all tasks in the backend"),
140}]
141#[pin_project::pin_project]
142#[must_use = "Custom backends must be polled or used as a sink"]
143pub struct CustomBackend<Args, DB, Fetch, Sink, IdType, Config = ()> {
144    _marker: PhantomData<(Args, IdType)>,
145    db: DB,
146    fetcher: Fetcher<DB, Config, Fetch>,
147    sinker: Sinker<DB, Config, Sink>,
148    #[pin]
149    current_sink: Sink,
150    config: Config,
151}
152
153impl<Args, DB, Fetch, Sink, IdType, Config> Clone
154    for CustomBackend<Args, DB, Fetch, Sink, IdType, Config>
155where
156    DB: Clone,
157    Config: Clone,
158{
159    fn clone(&self) -> Self {
160        let mut db = self.db.clone();
161        let current_sink = (self.sinker)(&mut db, &self.config);
162        Self {
163            _marker: PhantomData,
164            db,
165            fetcher: Arc::clone(&self.fetcher),
166            sinker: Arc::clone(&self.sinker),
167            current_sink,
168            config: self.config.clone(),
169        }
170    }
171}
172
173impl<Args, DB, Fetch, Sink, IdType, Config> fmt::Debug
174    for CustomBackend<Args, DB, Fetch, Sink, IdType, Config>
175where
176    DB: fmt::Debug,
177    Config: fmt::Debug,
178{
179    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180        f.debug_struct("CustomBackend")
181            .field(
182                "_marker",
183                &format_args!(
184                    "PhantomData<({}, {})>",
185                    std::any::type_name::<Args>(),
186                    std::any::type_name::<IdType>()
187                ),
188            )
189            .field("db", &self.db)
190            .field("fetcher", &"Fn(&mut DB, &Config, &WorkerContext) -> Fetch")
191            .field("sink", &"Fn(&mut DB, &Config) -> Sink")
192            .field("config", &self.config)
193            .finish()
194    }
195}
196
197type FetcherBuilder<DB, Config, Fetch> =
198    Box<dyn Fn(&mut DB, &Config, &WorkerContext) -> Fetch + Send + Sync + 'static>;
199
200type SinkerBuilder<DB, Config, Sink> =
201    Box<dyn Fn(&mut DB, &Config) -> Sink + Send + Sync + 'static>;
202
203/// Builder for [`CustomBackend`]
204///
205/// Lets you set the database, fetcher, sink, codec, and config
206pub struct BackendBuilder<Args, DB, Fetch, Sink, IdType, Config = ()> {
207    _marker: PhantomData<(Args, IdType)>,
208    database: Option<DB>,
209    fetcher: Option<FetcherBuilder<DB, Config, Fetch>>,
210    sink: Option<SinkerBuilder<DB, Config, Sink>>,
211    config: Option<Config>,
212}
213
214impl<Args, DB, Fetch, Sink, IdType, Config> fmt::Debug
215    for BackendBuilder<Args, DB, Fetch, Sink, IdType, Config>
216where
217    DB: fmt::Debug,
218    Config: fmt::Debug,
219{
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        f.debug_struct("BackendBuilder")
222            .field(
223                "_marker",
224                &format_args!(
225                    "PhantomData<({}, {})>",
226                    std::any::type_name::<Args>(),
227                    std::any::type_name::<IdType>()
228                ),
229            )
230            .field("database", &self.database)
231            .field("fetcher", &self.fetcher.as_ref().map(|_| "Some(fn)"))
232            .field("sink", &self.sink.as_ref().map(|_| "Some(fn)"))
233            .field("config", &self.config)
234            .finish()
235    }
236}
237
238impl<Args, DB, Fetch, Sink, IdType, Config> Default
239    for BackendBuilder<Args, DB, Fetch, Sink, IdType, Config>
240{
241    fn default() -> Self {
242        Self {
243            _marker: PhantomData,
244            database: None,
245            fetcher: None,
246            sink: None,
247            config: None,
248        }
249    }
250}
251
252impl<Args, DB, Fetch, Sink, IdType> BackendBuilder<Args, DB, Fetch, Sink, IdType, ()> {
253    /// Create a new `BackendBuilder` instance
254    #[must_use]
255    pub fn new() -> Self {
256        Self::new_with_cfg(())
257    }
258
259    /// Create a new `BackendBuilder` instance with custom configuration
260    pub fn new_with_cfg<Config>(
261        config: Config,
262    ) -> BackendBuilder<Args, DB, Fetch, Sink, IdType, Config> {
263        BackendBuilder {
264            config: Some(config),
265            ..Default::default()
266        }
267    }
268}
269
270impl<Args, DB, Fetch, Sink, IdType, Config> BackendBuilder<Args, DB, Fetch, Sink, IdType, Config> {
271    /// The custom backend persistence engine
272    #[must_use]
273    pub fn database(mut self, db: DB) -> Self {
274        self.database = Some(db);
275        self
276    }
277
278    /// The fetcher function to retrieve tasks from the database
279    #[must_use]
280    pub fn fetcher<F: Fn(&mut DB, &Config, &WorkerContext) -> Fetch + Send + Sync + 'static>(
281        mut self,
282        fetcher: F,
283    ) -> Self {
284        self.fetcher = Some(Box::new(fetcher));
285        self
286    }
287
288    /// The sink function to persist tasks to the database
289    #[must_use]
290    pub fn sink<F: Fn(&mut DB, &Config) -> Sink + Send + Sync + 'static>(
291        mut self,
292        sink: F,
293    ) -> Self {
294        self.sink = Some(Box::new(sink));
295        self
296    }
297
298    #[allow(clippy::type_complexity)]
299    /// Build the `CustomBackend` instance
300    pub fn build(self) -> Result<CustomBackend<Args, DB, Fetch, Sink, IdType, Config>, BuildError> {
301        let mut db = self.database.ok_or(BuildError::MissingPool)?;
302        let config = self.config.ok_or(BuildError::MissingConfig)?;
303        let sink_fn = self.sink.ok_or(BuildError::MissingSink)?;
304        let sink = sink_fn(&mut db, &config);
305
306        Ok(CustomBackend {
307            _marker: PhantomData,
308            db,
309            fetcher: self
310                .fetcher
311                .map(Arc::new)
312                .ok_or(BuildError::MissingFetcher)?,
313            current_sink: sink,
314            sinker: Arc::new(sink_fn),
315            config,
316        })
317    }
318}
319
320/// Errors encountered building a `CustomBackend`
321#[derive(Debug, Error)]
322pub enum BuildError {
323    /// Missing database db
324    #[error("Database db is required")]
325    MissingPool,
326    /// Missing fetcher function
327    #[error("Fetcher is required")]
328    MissingFetcher,
329    /// Missing sink function
330    #[error("Sink is required")]
331    MissingSink,
332    /// Missing configuration
333    #[error("Config is required")]
334    MissingConfig,
335}
336
337impl<Args, DB, Fetch, Sink, IdType: Clone, E, Ctx: Default, Config> Backend
338    for CustomBackend<Args, DB, Fetch, Sink, IdType, Config>
339where
340    Fetch: Stream<Item = Result<Option<Task<Args, Ctx, IdType>>, E>> + Send + 'static,
341    E: Into<BoxDynError>,
342{
343    type Args = Args;
344    type IdType = IdType;
345
346    type Context = Ctx;
347
348    type Error = BoxDynError;
349
350    type Stream = TaskStream<Task<Args, Ctx, IdType>, BoxDynError>;
351
352    type Beat = BoxStream<'static, Result<(), Self::Error>>;
353
354    type Layer = Identity;
355
356    fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
357        futures_util::stream::once(async { Ok(()) }).boxed()
358    }
359
360    fn middleware(&self) -> Self::Layer {
361        Identity::new()
362    }
363
364    fn poll(mut self, worker: &WorkerContext) -> Self::Stream {
365        (self.fetcher)(&mut self.db, &self.config, worker)
366            .map(|task| match task {
367                Ok(Some(t)) => Ok(Some(t)),
368                Ok(None) => Ok(None),
369                Err(e) => Err(e.into()),
370            })
371            .boxed()
372    }
373}
374
375impl<Args, Ctx, IdType, DB, Fetch, S, Config> Sink<Task<Args, Ctx, IdType>>
376    for CustomBackend<Args, DB, Fetch, S, IdType, Config>
377where
378    S: Sink<Task<Args, Ctx, IdType>>,
379{
380    type Error = S::Error;
381
382    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
383        self.project().current_sink.poll_ready_unpin(cx)
384    }
385
386    fn start_send(self: Pin<&mut Self>, item: Task<Args, Ctx, IdType>) -> Result<(), Self::Error> {
387        self.project().current_sink.start_send_unpin(item)
388    }
389
390    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
391        self.project().current_sink.poll_flush_unpin(cx)
392    }
393
394    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
395        self.project().current_sink.poll_close_unpin(cx)
396    }
397}
398
399#[cfg(test)]
400mod tests {
401    use std::{collections::VecDeque, time::Duration};
402
403    use futures_util::{FutureExt, lock::Mutex, sink, stream};
404
405    use crate::{
406        error::BoxDynError,
407        task::task_id::RandomId,
408        worker::{builder::WorkerBuilder, ext::event_listener::EventListenerExt},
409    };
410
411    use super::*;
412
413    const ITEMS: u32 = 10;
414
415    #[tokio::test]
416    async fn basic_custom_backend() {
417        let memory: Arc<Mutex<VecDeque<Task<u32, (), RandomId>>>> =
418            Arc::new(Mutex::new(VecDeque::new()));
419
420        let mut backend = BackendBuilder::new()
421            .database(memory)
422            .fetcher(|db, _, _| {
423                stream::unfold(db.clone(), |p| async move {
424                    tokio::time::sleep(Duration::from_millis(100)).await; // Debounce
425                    let mut db = p.lock().await;
426                    let item = db.pop_front();
427                    drop(db);
428                    match item {
429                        Some(item) => Some((Ok::<_, BoxDynError>(Some(item)), p)),
430                        None => Some((Ok::<_, BoxDynError>(None), p)),
431                    }
432                })
433                .boxed()
434            })
435            .sink(|db, _| {
436                sink::unfold(db.clone(), move |p, item| {
437                    async move {
438                        let mut db = p.lock().await;
439                        db.push_back(item);
440                        drop(db);
441                        Ok::<_, BoxDynError>(p)
442                    }
443                    .boxed()
444                })
445            })
446            .build()
447            .unwrap();
448
449        for i in 0..ITEMS {
450            backend.send(Task::new(i)).await.unwrap();
451        }
452
453        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
454            tokio::time::sleep(Duration::from_secs(1)).await;
455            if task == ITEMS - 1 {
456                ctx.stop().unwrap();
457                return Err("Worker stopped!")?;
458            }
459            Ok(())
460        }
461
462        let worker = WorkerBuilder::new("rango-tango")
463            .backend(backend)
464            .on_event(|ctx, ev| {
465                println!("On Event = {ev:?} from {}", ctx.name());
466            })
467            .build(task);
468        worker.run().await.unwrap();
469    }
470}