apalis_core/worker/ext/ack/
mod.rs

1//! Traits and utilities for acknowledging task completion
2//!
3//! The [`Acknowledge`] trait and related types are responsible for adding custom
4//! acknowledgment logic to workers. You can use [`AcknowledgeLayer`] to wrap
5//! a worker service and invoke your acknowledgment handler after each task execution.
6//!
7//! # Example
8//!
9//! ```rust
10//! # use apalis_core::worker::{builder::WorkerBuilder, ext::ack::{Acknowledge, AcknowledgeLayer}};
11//! # use apalis_core::backend::memory::MemoryStorage;
12//! # use apalis_core::worker::context::WorkerContext;
13//! # use apalis_core::task::Parts;
14//! # use apalis_core::error::BoxDynError;
15//! # use futures_util::{future::{ready, BoxFuture}, FutureExt};
16//! # use std::fmt::Debug;
17//! # use tokio::sync::mpsc::error::SendError;
18//! # use apalis_core::worker::ext::ack::AcknowledgementExt;
19//! # use apalis_core::backend::TaskSink;
20//! # use crate::apalis_core::worker::ext::event_listener::EventListenerExt;
21//!
22//! #[tokio::main]
23//! async fn main() {
24//!     let mut in_memory = MemoryStorage::new();
25//!     in_memory.push(42).await.unwrap();
26//!
27//!     async fn task(
28//!         task: u32,
29//!         ctx: WorkerContext,
30//!     ) -> Result<(), BoxDynError> {
31//! #       ctx.stop().unwrap();
32//!         Ok(())
33//!     }
34//!
35//!     #[derive(Debug, Clone)]
36//!     struct MyAcknowledger;
37//!
38//!     impl<Ctx: Debug, IdType: Debug> Acknowledge<(), Ctx, IdType> for MyAcknowledger {
39//!         type Error = SendError<()>;
40//!         type Future = BoxFuture<'static, Result<(), Self::Error>>;
41//!         fn ack(
42//!             &mut self,
43//!             res: &Result<(), BoxDynError>,
44//!             parts: &Parts<Ctx, IdType>,
45//!         ) -> Self::Future {
46//!             println!("{res:?}, {parts:?}");
47//!             ready(Ok(())).boxed()
48//!         }
49//!     }
50//!
51//!     let worker = WorkerBuilder::new("rango-tango")
52//!         .backend(in_memory)
53//!         .ack_with(MyAcknowledger)
54//!         .on_event(|ctx, ev| {
55//!             println!("On Event = {:?}", ev);
56//!         })
57//!         .build(task);
58//!     worker.run().await.unwrap();
59//! }
60//! ```
61use futures_util::future::BoxFuture;
62use futures_util::FutureExt;
63use std::{future::Future, task::Poll};
64use tower_layer::{Layer, Stack};
65use tower_service::Service;
66
67use crate::{
68    backend::Backend,
69    error::BoxDynError,
70    task::{Parts, Task},
71    worker::{builder::WorkerBuilder, context::WorkerContext},
72};
73
74/// Extension trait for adding acknowledgment handling to workers
75///
76/// See [module level documentation](self) for more details.
77pub trait AcknowledgementExt<Args, Ctx, Source, Middleware, Ack, Res>: Sized
78where
79    Source: Backend<Args>,
80    Ack: Acknowledge<Res, Ctx, Source::IdType>,
81{
82    /// Add an acknowledgment handler to the worker
83    fn ack_with(
84        self,
85        ack: Ack,
86    ) -> WorkerBuilder<Args, Ctx, Source, Stack<AcknowledgeLayer<Ack>, Middleware>>;
87}
88
89/// Acknowledge the result of a task processing
90///
91/// See [module level documentation](self) for more details.
92pub trait Acknowledge<Res, Ctx, IdType> {
93    /// The error type returned by the acknowledgment process
94    type Error;
95    /// The future returned by the `ack` method
96    type Future: Future<Output = Result<(), Self::Error>>;
97    /// Acknowledge the result of a task processing
98    fn ack(&mut self, res: &Result<Res, BoxDynError>, ctx: &Parts<Ctx, IdType>) -> Self::Future;
99}
100
101impl<Res, Ctx, F, Fut, IdType, E> Acknowledge<Res, Ctx, IdType> for F
102where
103    F: FnMut(&Result<Res, BoxDynError>, &Parts<Ctx, IdType>) -> Fut,
104    Fut: Future<Output = Result<(), E>>,
105{
106    type Error = E;
107    type Future = Fut;
108
109    fn ack(&mut self, res: &Result<Res, BoxDynError>, ctx: &Parts<Ctx, IdType>) -> Self::Future {
110        (self)(res, ctx)
111    }
112}
113
114/// Layer that adds acknowledgment functionality to services
115///
116/// See [module level documentation](self) for more details.
117#[derive(Debug, Clone)]
118pub struct AcknowledgeLayer<A> {
119    acknowledger: A,
120}
121
122impl<A> AcknowledgeLayer<A> {
123    /// Create a new acknowledgment layer
124    pub fn new(acknowledger: A) -> Self {
125        Self { acknowledger }
126    }
127}
128
129impl<S, A> Layer<S> for AcknowledgeLayer<A>
130where
131    A: Clone,
132{
133    type Service = AcknowledgeService<S, A>;
134
135    fn layer(&self, inner: S) -> Self::Service {
136        AcknowledgeService {
137            inner,
138            acknowledger: self.acknowledger.clone(),
139        }
140    }
141}
142
143/// Service that wraps another service and acknowledges task completion
144///
145/// See [module level documentation](self) for more details.
146
147#[derive(Debug, Clone)]
148pub struct AcknowledgeService<S, A> {
149    inner: S,
150    acknowledger: A,
151}
152
153impl<S, A, Args, Ctx, Res, IdType> Service<Task<Args, Ctx, IdType>> for AcknowledgeService<S, A>
154where
155    S: Service<Task<Args, Ctx, IdType>, Response = Res>,
156    A: Acknowledge<Res, Ctx, IdType> + Clone + Send + 'static,
157    S::Error: Into<BoxDynError>,
158    A::Error: std::error::Error + Send + Sync + 'static,
159    S::Future: Send + 'static,
160    A::Future: Send + 'static,
161    Ctx: Clone + Sync + 'static + Send,
162    Res: Send,
163    IdType: Send + Clone + 'static,
164{
165    type Response = Res;
166    type Error = BoxDynError;
167    type Future = BoxFuture<'static, Result<Res, BoxDynError>>;
168
169    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
170        self.inner.poll_ready(cx).map_err(|e| e.into())
171    }
172
173    fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
174        let parts = task.parts.clone();
175        let worker: WorkerContext = task.parts.data.get().cloned().unwrap();
176        let future = self.inner.call(task);
177        let mut acknowledger = self.acknowledger.clone();
178        Box::pin(async move {
179            let res = future.await.map_err(|e| e.into());
180            worker.track(acknowledger.ack(&res, &parts).boxed()).await?; // Ensure ack is gracefully shutdown
181            res
182        })
183    }
184}
185
186impl<Args, B, M, Ctx, Ack, Res> AcknowledgementExt<Args, Ctx, B, M, Ack, Res>
187    for WorkerBuilder<Args, Ctx, B, M>
188where
189    M: Layer<AcknowledgeLayer<Ack>>,
190    Ack: Acknowledge<Res, Ctx, B::IdType>,
191    B: Backend<Args>,
192{
193    fn ack_with(self, ack: Ack) -> WorkerBuilder<Args, Ctx, B, Stack<AcknowledgeLayer<Ack>, M>> {
194        let this = self.layer(AcknowledgeLayer::new(ack));
195        WorkerBuilder {
196            name: this.name,
197            request: this.request,
198            layer: this.layer,
199            source: this.source,
200            shutdown: this.shutdown,
201            event_handler: this.event_handler,
202        }
203    }
204}