apalis_core/worker/ext/parallelize/
mod.rs

1//! Worker extension for parallel execution and spawning child tasks.
2//!
3//! This module provides functionality to parallelize task execution within a worker
4//! and spawn child tasks. It includes the [`ParallelizeLayer`] middleware for parallel execution
5//! and the [`ParallelizeService`] service.
6//!
7//! # Example
8//! ```rust,no_run
9//! # use apalis_core::worker::builder::WorkerBuilder;
10//! # use apalis_core::backend::memory::MemoryStorage;
11//! # use apalis_core::worker::ext::parallelize::ParallelizeExt;
12//! # use apalis_core::worker::ext::event_listener::EventListenerExt;
13//! #[tokio::main]
14//! async fn main() {
15//!     async fn task(task: u32) {
16//!         println!("Processing task: {task}");
17//!     }
18//!     let in_memory = MemoryStorage::new();
19//!     let worker = WorkerBuilder::new("rango-tango")
20//!         .backend(in_memory)
21//!         .parallelize(tokio::spawn)
22//!         .on_event(|ctx, ev| {
23//!             println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
24//!         })
25//!         .build(task);
26//!     worker.run().await.unwrap();
27//! }
28//! ```
29use std::future::ready;
30
31use futures_core::future::BoxFuture;
32use futures_util::FutureExt;
33use futures_util::TryFutureExt;
34use tower_layer::{Layer, Stack};
35use tower_service::Service;
36
37use crate::{backend::Backend, error::BoxDynError, task::Task, worker::builder::WorkerBuilder};
38
39/// Worker extension for parallel execution
40pub trait ParallelizeExt<Args, Ctx, Source, Middleware, Executor>: Sized {
41    /// Register the executor for parallel task execution.
42    fn parallelize(
43        self,
44        f: Executor,
45    ) -> WorkerBuilder<Args, Ctx, Source, Stack<ParallelizeLayer<Executor>, Middleware>>;
46}
47
/// Middleware layer that wraps services in a [`ParallelizeService`].
///
/// The layer only stores the executor; every service it wraps receives its own
/// clone of that executor.
#[derive(Debug, Clone, Default)]
pub struct ParallelizeLayer<Executor> {
    /// Executor cloned into each wrapped service.
    executor: Executor,
}

impl<Executor> ParallelizeLayer<Executor> {
    /// Creates a new parallelize layer that stores `executor`.
    pub fn new(executor: Executor) -> Self {
        Self { executor }
    }
}
60
61impl<S, Executor: Clone> Layer<S> for ParallelizeLayer<Executor> {
62    type Service = ParallelizeService<S, Executor>;
63
64    fn layer(&self, service: S) -> Self::Service {
65        ParallelizeService {
66            service,
67            executor: self.executor.clone(),
68        }
69    }
70}
71
/// Service that hands the inner service's future to an executor.
///
/// Produced by [`ParallelizeLayer`]. Each call forwards the request to the inner
/// service and passes the resulting future to the stored executor.
#[derive(Debug, Clone)]
pub struct ParallelizeService<S, Executor> {
    /// Inner service that actually handles the request.
    service: S,
    /// Callable applied to every future returned by the inner service.
    executor: Executor,
}
78
79impl<S, Args, Ctx, IdType, Fut, T, Executor, ExecErr> Service<Task<Args, Ctx, IdType>>
80    for ParallelizeService<S, Executor>
81where
82    S: Service<Task<Args, Ctx, IdType>, Future = Fut>,
83    Executor: Fn(Fut) -> T + Send + 'static,
84    Fut: Future<Output = Result<S::Response, S::Error>> + Send + 'static,
85    T: Future<Output = Result<Result<S::Response, S::Error>, ExecErr>> + Send + 'static,
86    S::Error: Into<BoxDynError> + Send + 'static,
87    ExecErr: Into<BoxDynError>,
88    S::Response: Send + 'static,
89{
90    type Response = S::Response;
91    type Error = BoxDynError;
92    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
93
94    fn poll_ready(
95        &mut self,
96        cx: &mut std::task::Context<'_>,
97    ) -> std::task::Poll<Result<(), Self::Error>> {
98        self.service.poll_ready(cx).map_err(|e| e.into())
99    }
100
101    fn call(&mut self, request: Task<Args, Ctx, IdType>) -> Self::Future {
102        (self.executor)(self.service.call(request))
103            .map_err(|e| e.into())
104            .and_then(|s| ready(s.map_err(|e| e.into())))
105            .boxed()
106    }
107}
108
109impl<Args, P, M, Ctx, Executor> ParallelizeExt<Args, Ctx, P, M, Executor>
110    for WorkerBuilder<Args, Ctx, P, M>
111where
112    P: Backend<Args = Args, Context = Ctx>,
113    M: Layer<ParallelizeLayer<Executor>>,
114{
115    fn parallelize(
116        self,
117        f: Executor,
118    ) -> WorkerBuilder<Args, Ctx, P, Stack<ParallelizeLayer<Executor>, M>> {
119        self.layer(ParallelizeLayer::new(f))
120    }
121}