apalis_core/worker/ext/ack/
mod.rs1use futures_util::future::BoxFuture;
62use futures_util::FutureExt;
63use std::{future::Future, task::Poll};
64use tower_layer::{Layer, Stack};
65use tower_service::Service;
66
67use crate::{
68 backend::Backend,
69 error::BoxDynError,
70 task::{Parts, Task},
71 worker::{builder::WorkerBuilder, context::WorkerContext},
72};
73
74pub trait AcknowledgementExt<Args, Ctx, Source, Middleware, Ack, Res>: Sized
78where
79 Source: Backend<Args>,
80 Ack: Acknowledge<Res, Ctx, Source::IdType>,
81{
82 fn ack_with(
84 self,
85 ack: Ack,
86 ) -> WorkerBuilder<Args, Ctx, Source, Stack<AcknowledgeLayer<Ack>, Middleware>>;
87}
88
89pub trait Acknowledge<Res, Ctx, IdType> {
93 type Error;
95 type Future: Future<Output = Result<(), Self::Error>>;
97 fn ack(&mut self, res: &Result<Res, BoxDynError>, ctx: &Parts<Ctx, IdType>) -> Self::Future;
99}
100
101impl<Res, Ctx, F, Fut, IdType, E> Acknowledge<Res, Ctx, IdType> for F
102where
103 F: FnMut(&Result<Res, BoxDynError>, &Parts<Ctx, IdType>) -> Fut,
104 Fut: Future<Output = Result<(), E>>,
105{
106 type Error = E;
107 type Future = Fut;
108
109 fn ack(&mut self, res: &Result<Res, BoxDynError>, ctx: &Parts<Ctx, IdType>) -> Self::Future {
110 (self)(res, ctx)
111 }
112}
113
114#[derive(Debug, Clone)]
118pub struct AcknowledgeLayer<A> {
119 acknowledger: A,
120}
121
122impl<A> AcknowledgeLayer<A> {
123 pub fn new(acknowledger: A) -> Self {
125 Self { acknowledger }
126 }
127}
128
129impl<S, A> Layer<S> for AcknowledgeLayer<A>
130where
131 A: Clone,
132{
133 type Service = AcknowledgeService<S, A>;
134
135 fn layer(&self, inner: S) -> Self::Service {
136 AcknowledgeService {
137 inner,
138 acknowledger: self.acknowledger.clone(),
139 }
140 }
141}
142
143#[derive(Debug, Clone)]
148pub struct AcknowledgeService<S, A> {
149 inner: S,
150 acknowledger: A,
151}
152
153impl<S, A, Args, Ctx, Res, IdType> Service<Task<Args, Ctx, IdType>> for AcknowledgeService<S, A>
154where
155 S: Service<Task<Args, Ctx, IdType>, Response = Res>,
156 A: Acknowledge<Res, Ctx, IdType> + Clone + Send + 'static,
157 S::Error: Into<BoxDynError>,
158 A::Error: std::error::Error + Send + Sync + 'static,
159 S::Future: Send + 'static,
160 A::Future: Send + 'static,
161 Ctx: Clone + Sync + 'static + Send,
162 Res: Send,
163 IdType: Send + Clone + 'static,
164{
165 type Response = Res;
166 type Error = BoxDynError;
167 type Future = BoxFuture<'static, Result<Res, BoxDynError>>;
168
169 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
170 self.inner.poll_ready(cx).map_err(|e| e.into())
171 }
172
173 fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
174 let parts = task.parts.clone();
175 let worker: WorkerContext = task.parts.data.get().cloned().unwrap();
176 let future = self.inner.call(task);
177 let mut acknowledger = self.acknowledger.clone();
178 Box::pin(async move {
179 let res = future.await.map_err(|e| e.into());
180 worker.track(acknowledger.ack(&res, &parts).boxed()).await?; res
182 })
183 }
184}
185
186impl<Args, B, M, Ctx, Ack, Res> AcknowledgementExt<Args, Ctx, B, M, Ack, Res>
187 for WorkerBuilder<Args, Ctx, B, M>
188where
189 M: Layer<AcknowledgeLayer<Ack>>,
190 Ack: Acknowledge<Res, Ctx, B::IdType>,
191 B: Backend<Args>,
192{
193 fn ack_with(self, ack: Ack) -> WorkerBuilder<Args, Ctx, B, Stack<AcknowledgeLayer<Ack>, M>> {
194 let this = self.layer(AcknowledgeLayer::new(ack));
195 WorkerBuilder {
196 name: this.name,
197 request: this.request,
198 layer: this.layer,
199 source: this.source,
200 shutdown: this.shutdown,
201 event_handler: this.event_handler,
202 }
203 }
204}