apalis_core/worker/ext/parallelize/
mod.rs1use std::future::ready;
30
31use futures_core::future::BoxFuture;
32use futures_util::FutureExt;
33use futures_util::TryFutureExt;
34use tower_layer::{Layer, Stack};
35use tower_service::Service;
36
37use crate::{backend::Backend, error::BoxDynError, task::Task, worker::builder::WorkerBuilder};
38
39pub trait ParallelizeExt<Args, Ctx, Source, Middleware, Executor>: Sized {
41 fn parallelize(
43 self,
44 f: Executor,
45 ) -> WorkerBuilder<Args, Ctx, Source, Stack<ParallelizeLayer<Executor>, Middleware>>;
46}
47
48#[derive(Debug, Clone, Default)]
50pub struct ParallelizeLayer<Executor> {
51 executor: Executor,
52}
53
54impl<Executor> ParallelizeLayer<Executor> {
55 pub fn new(executor: Executor) -> Self {
57 Self { executor: executor }
58 }
59}
60
61impl<S, Executor: Clone> Layer<S> for ParallelizeLayer<Executor> {
62 type Service = ParallelizeService<S, Executor>;
63
64 fn layer(&self, service: S) -> Self::Service {
65 ParallelizeService {
66 service,
67 executor: self.executor.clone(),
68 }
69 }
70}
71
72#[derive(Debug, Clone)]
74pub struct ParallelizeService<S, Executor> {
75 service: S,
76 executor: Executor,
77}
78
79impl<S, Args, Ctx, IdType, Fut, T, Executor, ExecErr> Service<Task<Args, Ctx, IdType>>
80 for ParallelizeService<S, Executor>
81where
82 S: Service<Task<Args, Ctx, IdType>, Future = Fut>,
83 Executor: Fn(Fut) -> T + Send + 'static,
84 Fut: Future<Output = Result<S::Response, S::Error>> + Send + 'static,
85 T: Future<Output = Result<Result<S::Response, S::Error>, ExecErr>> + Send + 'static,
86 S::Error: Into<BoxDynError> + Send + 'static,
87 ExecErr: Into<BoxDynError>,
88 S::Response: Send + 'static,
89{
90 type Response = S::Response;
91 type Error = BoxDynError;
92 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
93
94 fn poll_ready(
95 &mut self,
96 cx: &mut std::task::Context<'_>,
97 ) -> std::task::Poll<Result<(), Self::Error>> {
98 self.service.poll_ready(cx).map_err(|e| e.into())
99 }
100
101 fn call(&mut self, request: Task<Args, Ctx, IdType>) -> Self::Future {
102 (self.executor)(self.service.call(request))
103 .map_err(|e| e.into())
104 .and_then(|s| ready(s.map_err(|e| e.into())))
105 .boxed()
106 }
107}
108
109impl<Args, P, M, Ctx, Executor> ParallelizeExt<Args, Ctx, P, M, Executor>
110 for WorkerBuilder<Args, Ctx, P, M>
111where
112 P: Backend<Args = Args, Context = Ctx>,
113 M: Layer<ParallelizeLayer<Executor>>,
114{
115 fn parallelize(
116 self,
117 f: Executor,
118 ) -> WorkerBuilder<Args, Ctx, P, Stack<ParallelizeLayer<Executor>, M>> {
119 self.layer(ParallelizeLayer::new(f))
120 }
121}