apalis_core/worker/
mod.rs

1//! Utilities for building and running workers.
2//!
3//! A `Worker` polls tasks from a backend, executes them using
4//! a service, emits lifecycle events, and handles graceful shutdowns. A worker is typically
//! constructed using a [`WorkerBuilder`](crate::worker::builder::WorkerBuilder).
6//!
7//! # Features
8//! - Pluggable backends for task queues (e.g., in-memory, Redis).
9//! - Middleware support for request processing.
10//! - Stream or future-based worker execution modes.
11//! - Built-in event system for logging or metrics.
12//! - Job tracking and readiness probing.
13//!
14//! # Lifecycle
15//!
16//! ```mermaid
17//! graph TD
18//!     A[Start Worker] --> B[Initialize Context & Heartbeat]
19//!     B --> C[Poll Backend for Tasks]
20//!     C --> D{Task Available?}
21//!     D -- Yes --> E[Execute Task via Service Stack]
22//!     E --> F[Emit Events]
23//!     F --> C
24//!     D -- No --> F
25//!     F --> G{Shutdown Signal?}
26//!     G -- Yes --> H[Graceful Shutdown]
27//!     G -- No --> C
28//! ```
29//! Worker lifecycle is composed of several stages:
30//! - Initialize context and heartbeat
31//! - Poll backend for tasks
32//! - Execute tasks via service stack
33//! - Emit events (Idle, Success, Error, HeartBeat)
34//! - Graceful shutdown on signal or stop
35//!
36//! # Examples
37//!
38//! ## Run as a future
39//! ```rust,no_run
40//! # use apalis_core::{worker::builder::WorkerBuilder, backend::memory::MemoryStorage};
41//! # use apalis_core::error::BoxDynError;
42//! # use apalis_core::backend::TaskSink;
43//!
44//! #[tokio::main]
45//! async fn main() -> Result<(), BoxDynError> {
46//!     let mut storage = MemoryStorage::new();
47//!     for i in 0..5 {
48//!         storage.push(i).await?;
49//!     }
50//!
51//!     async fn handler(task: u32) {
52//!         println!("Processing task: {task}");
53//!     }
54//!
55//!     let worker = WorkerBuilder::new("worker-1")
56//!         .backend(storage)
57//!         .build(handler);
58//!
59//!     worker.run().await?;
60//!     Ok(())
61//! }
62//! ```
63//!
//! ## Run as a stream
65//! The `stream` interface yields worker events (e.g., `Success`, `Error`) while running:
66//! ```rust,no_run
67//! # use apalis_core::worker::builder::WorkerBuilder;
68//! # use apalis_core::backend::memory::MemoryStorage;
69//! # use futures_util::StreamExt;
70//! # #[tokio::main]
71//! # async fn main() {
72//! #   let mut storage = MemoryStorage::new();
73//! #   async fn handler(task: u32) {
74//! #        println!("Processing task: {task}");
75//! #    }
76//! #   let worker = WorkerBuilder::new("worker-1")
77//! #        .backend(storage)
78//! #        .build(handler);
79//! let mut stream = worker.stream();
80//! while let Some(evt) = stream.next().await {
81//!     println!("Event: {:?}", evt);
82//! }
83//! # }
84//! ```
85//!
86//! # Test Utilities
87//! The [`test_worker`] module includes utilities for unit tests and validation of worker behavior.
88use crate::backend::Backend;
89use crate::error::{BoxDynError, WorkerError};
90use crate::monitor::shutdown::Shutdown;
91use crate::task::Task;
92use crate::task::attempt::Attempt;
93use crate::task::data::Data;
94use crate::worker::call_all::{CallAllError, CallAllUnordered};
95use crate::worker::context::{Tracked, WorkerContext};
96use crate::worker::event::Event;
97use futures_core::stream::BoxStream;
98use futures_util::{Future, FutureExt, Stream, StreamExt};
99use std::fmt::Debug;
100use std::fmt::{self};
101use std::marker::PhantomData;
102use std::pin::Pin;
103use std::sync::atomic::Ordering;
104use std::task::{Context, Poll};
105use tower_layer::{Layer, Stack};
106use tower_service::Service;
107
108pub mod builder;
109pub mod call_all;
110pub mod context;
111pub mod event;
112pub mod ext;
113mod state;
114pub mod test_worker;
115
/// Core component responsible for task polling, execution, and lifecycle management.
///
/// A worker bundles a backend, a task-handling service and a middleware layer under a
/// name. It does nothing on its own: it has to be consumed by [`run`](Worker::run) or
/// [`stream`](Worker::stream) (or their `_with_ctx` variants) to start processing tasks.
///
/// # Example
/// Basic example:
/// ```rust,no_run
/// # use apalis_core::error::BoxDynError;
/// # use apalis_core::backend::memory::MemoryStorage;
/// # use apalis_core::worker::builder::WorkerBuilder;
/// # use apalis_core::backend::TaskSink;
///
/// #[tokio::main]
/// async fn main() -> Result<(), BoxDynError> {
///     let mut storage = MemoryStorage::new();
///     for i in 0..5 {
///         storage.push(i).await?;
///     }
///
///     async fn handler(task: u32) {
///         println!("Processing task: {task}");
///     }
///
///     let worker = WorkerBuilder::new("worker-1")
///         .backend(storage)
///         .build(handler);
///
///     worker.run().await?;
///     Ok(())
/// }
/// ```
/// See [module level documentation](self) for more details.
#[must_use = "Workers must be run or streamed to execute tasks"]
pub struct Worker<Args, Ctx, Backend, Svc, Middleware> {
    /// Name used to create the [`WorkerContext`] in `run` and `stream`.
    pub(crate) name: String,
    /// Backend that supplies the task stream, the heartbeat and its own middleware.
    pub(crate) backend: Backend,
    /// Service that handles each task; it is wrapped by the layers when the worker is streamed.
    pub(crate) service: Svc,
    /// User-supplied layer applied around the service when the worker is streamed.
    pub(crate) middleware: Middleware,
    /// Zero-sized marker tying the worker to its task argument and context types.
    pub(crate) task_marker: PhantomData<(Args, Ctx)>,
    /// Optional shutdown handle; initialised to `None` by `Worker::new`.
    pub(crate) shutdown: Option<Shutdown>,
    /// Callback handed to the worker context via `wrap_listener` when the worker is streamed.
    pub(crate) event_handler: Box<dyn Fn(&WorkerContext, &Event) + Send + Sync>,
}
156
157impl<Args, Ctx, B, Svc, Middleware> fmt::Debug for Worker<Args, Ctx, B, Svc, Middleware>
158where
159    Svc: fmt::Debug,
160    B: fmt::Debug,
161{
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        f.debug_struct("Worker")
164            .field("service", &self.service)
165            .field("backend", &self.backend)
166            .finish()
167    }
168}
169
170impl<Args, Ctx, B, Svc, M> Worker<Args, Ctx, B, Svc, M> {
171    /// Build a worker that is ready for execution
172    pub fn new(name: String, backend: B, service: Svc, layers: M) -> Self {
173        Worker {
174            name,
175            backend,
176            service,
177            middleware: layers,
178            task_marker: PhantomData,
179            shutdown: None,
180            event_handler: Box::new(|_, _| {}),
181        }
182    }
183}
184
185impl<Args, S, B, M> Worker<Args, B::Context, B, S, M>
186where
187    B: Backend<Args>,
188    S: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
189    B::Stream: Unpin + Send + 'static,
190    B::Beat: Unpin + Send + 'static,
191    Args: Send + 'static,
192    B::Context: Send + 'static,
193    B::Error: Into<BoxDynError> + Send + 'static,
194    M: Layer<ReadinessService<TrackerService<S>>>,
195    B::Layer: Layer<M::Service>,
196    <B::Layer as Layer<M::Service>>::Service:
197        Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
198    <<B::Layer as Layer<M::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Error:
199        Into<BoxDynError> + Send + Sync + 'static,
200    <<B::Layer as Layer<M::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Future:
201        Send,
202    M::Service: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
203    <<M as Layer<ReadinessService<TrackerService<S>>>>::Service as Service<
204        Task<Args, B::Context, B::IdType>,
205    >>::Future: Send,
206    <<M as Layer<ReadinessService<TrackerService<S>>>>::Service as Service<
207        Task<Args, B::Context, B::IdType>,
208    >>::Error: Into<BoxDynError> + Send + Sync + 'static,
209    B::IdType: Send + 'static,
210{
211    /// Run the worker until completion
212    ///
213    /// # Example
214    /// ```no_run
215    /// # use apalis_core::error::BoxDynError;
216    /// # use apalis_core::backend::memory::MemoryStorage;
217    /// # use apalis_core::backend::TaskSink;
218    /// # use apalis_core::worker::builder::WorkerBuilder;
219    ///
220    /// #[tokio::main]
221    /// async fn main() -> Result<(), BoxDynError> {
222    ///     let mut storage = MemoryStorage::new();
223    ///     for i in 0..5 {
224    ///         storage.push(i).await?;
225    ///     }
226    ///
227    ///     async fn handler(task: u32) {
228    ///         println!("Processing task: {task}");
229    ///     }
230    ///
231    ///     let worker = WorkerBuilder::new("worker-1")
232    ///         .backend(storage)
233    ///         .build(handler);
234    ///
235    ///     worker.run().await?;
236    ///     Ok(())
237    /// }
238    /// ```
239    pub async fn run(self) -> Result<(), WorkerError> {
240        let mut ctx = WorkerContext::new::<<B::Layer as Layer<M::Service>>::Service>(&self.name);
241        self.run_with_ctx(&mut ctx).await
242    }
243
244    /// Run the worker with the given context.
245    ///
246    /// See [`run`](Self::run) for an example.
247    pub async fn run_with_ctx(self, ctx: &mut WorkerContext) -> Result<(), WorkerError> {
248        let mut stream = self.stream_with_ctx(ctx);
249        while let Some(res) = stream.next().await {
250            match res {
251                Ok(_) => continue,
252                Err(WorkerError::GracefulExit) => return Ok(()),
253                Err(e) => return Err(e),
254            }
255        }
256        Ok(())
257    }
258
259    /// Returns a stream that will yield events as they occur within the worker's lifecycle
260    ///
261    /// # Example
262    ///
263    /// ```rust,no_run
264    /// # use apalis_core::error::BoxDynError;
265    /// # use apalis_core::backend::memory::MemoryStorage;
266    /// # use apalis_core::worker::builder::WorkerBuilder;
267    /// # use apalis_core::backend::TaskSink;
268    /// # use futures_util::StreamExt;
269    /// #[tokio::main]
270    /// async fn main() -> Result<(), BoxDynError> {
271    ///     let mut storage = MemoryStorage::new();
272    ///     for i in 0..5 {
273    ///         storage.push(i).await?;
274    ///     }
275    ///     async fn handler(task: u32) {
276    ///         println!("Processing task: {task}");
277    ///     }
278    ///     let worker = WorkerBuilder::new("worker-1")
279    ///         .backend(storage)
280    ///         .build(handler);
281    ///     let mut stream = worker.stream();
282    ///     while let Some(evt) = stream.next().await {
283    ///         println!("Event: {:?}", evt);
284    ///     }
285    ///     Ok(())
286    /// }
287    /// ```
288    pub fn stream(self) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
289        let mut ctx = WorkerContext::new::<<B::Layer as Layer<M::Service>>::Service>(&self.name);
290        self.stream_with_ctx(&mut ctx)
291    }
292
    /// Returns a stream that will yield events as they occur within the worker's lifecycle when provided
    /// with a [`WorkerContext`].
    ///
    /// The returned stream is built from four parts:
    /// - a one-shot "starter" that starts the context if it is not already running and only
    ///   yields an item when starting fails,
    /// - a one-shot stream that awaits the context and then yields
    ///   [`WorkerError::GracefulExit`] (or the error the context resolved with),
    /// - the backend heartbeat, mapped to [`Event::HeartBeat`] or
    ///   [`WorkerError::HeartbeatError`],
    /// - the task stream produced by `poll_tasks`.
    ///
    /// The starter is chained in front of the other three, which are merged with
    /// `stream_select!`. Every `Ok` event of the merged stream is passed to the context's
    /// `emit` before being yielded.
    ///
    /// See [`stream`](Self::stream) for an example.
    pub fn stream_with_ctx(
        self,
        ctx: &mut WorkerContext,
    ) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
        let backend = self.backend;
        let event_handler = self.event_handler;
        // Hand the worker's event handler to the context before it is cloned below.
        // NOTE(review): presumably this is what lets `emit` reach the handler — confirm in
        // `WorkerContext::wrap_listener`.
        ctx.wrap_listener(event_handler);
        let worker = ctx.clone();
        let inner_layers = backend.middleware();
        // Minimal local stand-in for a service builder: it only accumulates layers in a
        // `Stack` and applies them to a service at the end.
        struct ServiceBuilder<L> {
            layer: L,
        }

        impl<L> ServiceBuilder<L> {
            // The new layer becomes the inner half of the `Stack`, so layers added later end
            // up closer to the wrapped service.
            fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
                ServiceBuilder {
                    layer: Stack::new(layer, self.layer),
                }
            }
            // Apply the accumulated layers to `service`.
            fn service<S>(&self, service: S) -> L::Service
            where
                L: Layer<S>,
            {
                self.layer.layer(service)
            }
        }
        // Outermost layer: `Data` carrying a clone of the worker context.
        let svc = ServiceBuilder {
            layer: Data::new(worker.clone()),
        };
        // Resulting nesting, outermost to innermost:
        // Data -> backend layers -> user middleware -> readiness -> tracker -> service.
        let service = svc
            .layer(inner_layers)
            .layer(self.middleware)
            .layer(ReadinessLayer::new(worker.clone()))
            .layer(TrackerLayer::new(worker.clone()))
            .service(self.service);
        // Translate heartbeat results into worker events / worker errors.
        let heartbeat = backend.heartbeat(&worker).map(|res| match res {
            Ok(_) => Ok(Event::HeartBeat),
            Err(e) => Err(WorkerError::HeartbeatError(e.into())),
        });

        let stream = backend.poll(&worker);

        let tasks = Self::poll_tasks(service, stream);
        // `w` is moved into the emitting closure below, `ww` into the starter future.
        let mut w = worker.clone();
        let mut ww = w.clone();
        // One-shot stream that starts the context when needed. A successful start produces
        // no item (the `filter_map` drops it); a failed start surfaces as an `Err` item.
        let starter: BoxStream<'static, _> = futures_util::stream::once(async move {
            if !ww.is_running() {
                ww.start()?;
            }
            Ok(None)
        })
        .filter_map(|res: Result<Option<Event>, WorkerError>| async move {
            match res {
                Ok(_) => None,
                Err(e) => Some(Err(e)),
            }
        })
        .boxed();
        // One-shot stream that awaits the context itself. Its only item is always an error:
        // `GracefulExit` when the context resolves with `Ok`, the original error otherwise.
        let wait_for_exit: BoxStream<'static, _> = futures_util::stream::once(async move {
            match worker.await {
                Ok(_) => Err(WorkerError::GracefulExit),
                Err(e) => Err(e),
            }
        })
        .boxed();
        // Merge exit notification, heartbeat and task results; emit every successful event
        // through the context before yielding it to the caller.
        let work_stream =
            futures_util::stream_select!(wait_for_exit, heartbeat, tasks).map(move |res| {
                if let Ok(e) = &res {
                    w.emit(e);
                }
                res
            });
        // The starter is polled to completion before the merged stream.
        starter.chain(work_stream)
    }
371    fn poll_tasks<Svc, Stm, E, Ctx>(
372        service: Svc,
373        stream: Stm,
374    ) -> BoxStream<'static, Result<Event, WorkerError>>
375    where
376        Svc: Service<Task<Args, Ctx, B::IdType>> + Send + 'static,
377        Stm: Stream<Item = Result<Option<Task<Args, Ctx, B::IdType>>, E>> + Send + Unpin + 'static,
378        Args: Send + 'static,
379        Svc::Future: Send,
380        Ctx: Send + 'static,
381        Svc::Error: Into<BoxDynError> + Sync + Send,
382        E: Into<BoxDynError> + Send + 'static,
383    {
384        let stream = CallAllUnordered::new(service, stream).map(|r| match r {
385            Ok(Some(_)) => Ok(Event::Success),
386            Ok(None) => Ok(Event::Idle),
387            Err(CallAllError::ServiceError(err)) => Ok(Event::Error(err.into().into())),
388            Err(CallAllError::StreamError(err)) => Err(WorkerError::StreamError(err)),
389        });
390        stream.boxed()
391    }
392}
393
/// Layer that wraps a service in a [`TrackerService`] bound to a worker context.
#[derive(Debug, Clone)]
struct TrackerLayer {
    /// Context cloned into every [`TrackerService`] this layer produces.
    ctx: WorkerContext,
}
398
399impl TrackerLayer {
400    fn new(ctx: WorkerContext) -> Self {
401        Self { ctx }
402    }
403}
404
405impl<S> Layer<S> for TrackerLayer {
406    type Service = TrackerService<S>;
407
408    fn layer(&self, service: S) -> Self::Service {
409        TrackerService {
410            ctx: self.ctx.clone(),
411            service,
412        }
413    }
414}
/// Service that tracks each task's future, allowing graceful shutdowns
#[derive(Debug, Clone)]
pub struct TrackerService<S> {
    /// Worker context whose `track` method wraps every future returned by `call`.
    ctx: WorkerContext,
    /// The wrapped service that actually handles the task.
    service: S,
}
421
422impl<S, Args, Ctx, IdType> Service<Task<Args, Ctx, IdType>> for TrackerService<S>
423where
424    S: Service<Task<Args, Ctx, IdType>>,
425{
426    type Response = S::Response;
427    type Error = S::Error;
428    type Future = Tracked<AttemptOnPollFuture<S::Future>>;
429
430    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
431        self.service.poll_ready(cx)
432    }
433
434    fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
435        let attempt = task.parts.attempt.clone();
436        self.ctx.track(AttemptOnPollFuture {
437            attempt,
438            fut: self.service.call(task),
439            polled: false,
440        })
441    }
442}
443
/// A future that increments the attempt count on the first poll
///
/// Later polls are forwarded to the wrapped future without touching the attempt again.
#[pin_project::pin_project]
#[derive(Debug)]
pub struct AttemptOnPollFuture<Fut> {
    /// Attempt record that is incremented exactly once, on the first poll.
    attempt: Attempt,
    /// The wrapped future; structurally pinned so it can be polled in place.
    #[pin]
    fut: Fut,
    /// Whether the first poll (and therefore the increment) has already happened.
    polled: bool,
}
453
454impl<Fut> AttemptOnPollFuture<Fut> {
455    /// Create a new attempt on poll future
456    pub fn new(attempt: Attempt, fut: Fut) -> Self {
457        Self {
458            attempt,
459            fut,
460            polled: false,
461        }
462    }
463}
464
465impl<Fut: Future> Future for AttemptOnPollFuture<Fut> {
466    type Output = Fut::Output;
467
468    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
469        let mut this = self.project();
470        if *this.polled == false {
471            *this.polled = true;
472            this.attempt.increment();
473        }
474        this.fut.poll_unpin(cx)
475    }
476}
477
/// Injects the [`ReadinessService`] to track when workers are ready to accept new tasks
#[derive(Debug, Clone)]
struct ReadinessLayer {
    /// Context cloned into every [`ReadinessService`] this layer produces.
    ctx: WorkerContext,
}
483
484impl ReadinessLayer {
485    fn new(ctx: WorkerContext) -> Self {
486        Self { ctx }
487    }
488}
489
490impl<S> Layer<S> for ReadinessLayer {
491    type Service = ReadinessService<S>;
492
493    fn layer(&self, inner: S) -> Self::Service {
494        ReadinessService {
495            inner,
496            ctx: self.ctx.clone(),
497        }
498    }
499}
/// Service that tracks the readiness of underlying services
///
/// Should be the innermost service
///
/// NOTE(review): in `Worker::stream_with_ctx` this service is stacked around
/// `TrackerService`, so it is not strictly the innermost wrapper there — confirm whether
/// the sentence above should read "innermost apart from the tracker".
#[derive(Debug, Clone)]
pub struct ReadinessService<S> {
    /// The wrapped service whose `poll_ready` result is observed.
    inner: S,
    /// Worker context whose `is_ready` flag is updated on every `poll_ready`.
    ctx: WorkerContext,
}
508
509impl<S, Request> Service<Request> for ReadinessService<S>
510where
511    S: Service<Request>,
512{
513    type Response = S::Response;
514    type Error = S::Error;
515    type Future = S::Future;
516
517    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
518        // Delegate poll_ready to the inner service
519        let result = self.inner.poll_ready(cx);
520        // Update the readiness state based on the result
521        match &result {
522            Poll::Ready(Ok(_)) => self.ctx.is_ready.store(true, Ordering::SeqCst),
523            Poll::Pending | Poll::Ready(Err(_)) => self.ctx.is_ready.store(false, Ordering::SeqCst),
524        }
525
526        result
527    }
528
529    fn call(&mut self, req: Request) -> Self::Future {
530        self.inner.call(req)
531    }
532}
533
534#[cfg(test)]
535mod tests {
536    use std::{
537        future::ready,
538        ops::Deref,
539        sync::{Arc, atomic::AtomicUsize},
540        time::Duration,
541    };
542
543    use futures_channel::mpsc::SendError;
544    use futures_core::future::BoxFuture;
545
546    use crate::{
547        backend::{TaskSink, json::JsonStorage, memory::MemoryStorage},
548        task::Parts,
549        worker::{
550            builder::WorkerBuilder,
551            ext::{
552                ack::{Acknowledge, AcknowledgementExt},
553                circuit_breaker::CircuitBreaker,
554                event_listener::EventListenerExt,
555                long_running::LongRunningExt,
556            },
557        },
558    };
559
560    use super::*;
561
562    const ITEMS: u32 = 100;
563
564    #[tokio::test]
565    async fn basic_worker_run() {
566        let mut json_store = JsonStorage::new_temp().unwrap();
567        for i in 0..ITEMS {
568            json_store.push(i).await.unwrap();
569        }
570
571        #[derive(Clone, Debug, Default)]
572        struct Count(Arc<AtomicUsize>);
573
574        impl Deref for Count {
575            type Target = Arc<AtomicUsize>;
576            fn deref(&self) -> &Self::Target {
577                &self.0
578            }
579        }
580
581        async fn task(
582            task: u32,
583            count: Data<Count>,
584            ctx: WorkerContext,
585        ) -> Result<(), BoxDynError> {
586            tokio::time::sleep(Duration::from_secs(1)).await;
587            count.fetch_add(1, Ordering::Relaxed);
588            if task == ITEMS - 1 {
589                ctx.stop().unwrap();
590                return Err("Worker stopped!")?;
591            }
592            Ok(())
593        }
594
595        #[derive(Debug, Clone)]
596        struct MyAcknowledger;
597
598        impl<Ctx: Debug, IdType: Debug> Acknowledge<(), Ctx, IdType> for MyAcknowledger {
599            type Error = SendError;
600            type Future = BoxFuture<'static, Result<(), SendError>>;
601            fn ack(
602                &mut self,
603                res: &Result<(), BoxDynError>,
604                parts: &Parts<Ctx, IdType>,
605            ) -> Self::Future {
606                println!("{res:?}, {parts:?}");
607                // Call webhook with the result and parts?
608                ready(Ok(())).boxed()
609            }
610        }
611
612        let worker = WorkerBuilder::new("rango-tango")
613            .backend(json_store)
614            .data(Count::default())
615            .break_circuit()
616            .long_running()
617            .ack_with(MyAcknowledger)
618            .on_event(|ctx, ev| {
619                println!("On Event = {:?} from {}", ev, ctx.name());
620            })
621            .build(task);
622        worker.run().await.unwrap();
623    }
624
625    #[tokio::test]
626    async fn basic_worker_stream() {
627        let mut in_memory = MemoryStorage::new();
628
629        for i in 0..ITEMS {
630            in_memory.push(i).await.unwrap();
631        }
632
633        #[derive(Clone, Debug, Default)]
634        struct Count(Arc<AtomicUsize>);
635
636        impl Deref for Count {
637            type Target = Arc<AtomicUsize>;
638            fn deref(&self) -> &Self::Target {
639                &self.0
640            }
641        }
642
643        async fn task(task: u32, count: Data<Count>, worker: WorkerContext) {
644            tokio::time::sleep(Duration::from_secs(1)).await;
645            count.fetch_add(1, Ordering::Relaxed);
646            if task == ITEMS - 1 {
647                worker.stop().unwrap();
648            }
649        }
650        let worker = WorkerBuilder::new("rango-tango")
651            .backend(in_memory)
652            .data(Count::default())
653            .break_circuit()
654            .long_running()
655            .on_event(|ctx, ev| {
656                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
657            })
658            .build(task);
659        let mut event_stream = worker.stream();
660        while let Some(Ok(ev)) = event_stream.next().await {
661            println!("On Event = {:?}", ev);
662        }
663    }
664}