// apalis_core/worker/mod.rs
1//! Utilities for building and running workers.
2//!
3//! A `Worker` polls tasks from a backend, executes them using
4//! a service, emits lifecycle events, and handles graceful shutdowns. A worker is typically
5//! constructed using a [`WorkerBuilder`](crate::worker::builder).
6//!
7//! # Features
8//! - Pluggable backends for task queues (e.g., in-memory, Redis).
9//! - Middleware support for task processing.
10//! - Stream or future-based worker execution modes.
11//! - Built-in event system for logging or metrics.
12//! - Task tracking and controlled worker readiness.
13//!
14//! # Lifecycle
15//!
16//! ```mermaid
17//! graph TD
18//!     A[Start Worker] --> B[Initialize Context & Heartbeat]
19//!     B --> C[Poll Backend for Tasks]
20//!     C --> D{Task Available?}
21//!     D -- Yes --> E[Execute Task via Service Stack]
22//!     E --> F[Emit Events]
23//!     F --> C
24//!     D -- No --> F
25//!     F --> G{Shutdown Signal?}
26//!     G -- Yes --> H[Graceful Shutdown]
27//!     G -- No --> C
28//! ```
29//! Worker lifecycle is composed of several stages:
30//! - Initialize context and heartbeat
31//! - Poll backend for tasks
32//! - Execute tasks via service stack
33//! - Emit events (Idle, Success, Error, HeartBeat)
34//! - Graceful shutdown on signal or stop
35//!
36//! # Examples
37//!
38//! ## Run as a future
39//! ```rust,no_run
40//! # use apalis_core::{worker::builder::WorkerBuilder, backend::memory::MemoryStorage};
41//! # use apalis_core::error::BoxDynError;
42//! # use apalis_core::backend::TaskSink;
43//!
44//! #[tokio::main]
45//! async fn main() -> Result<(), BoxDynError> {
46//!     let mut storage = MemoryStorage::new();
47//!     for i in 0..5 {
48//!         storage.push(i).await?;
49//!     }
50//!
51//!     async fn handler(task: u32) {
52//!         println!("Processing task: {task}");
53//!     }
54//!
55//!     let worker = WorkerBuilder::new("worker-1")
56//!         .backend(storage)
57//!         .build(handler);
58//!
59//!     worker.run().await?;
60//!     Ok(())
61//! }
62//! ```
63//!
//! ## Run as a stream
65//! The `stream` interface yields worker events (e.g., `Success`, `Error`) while running:
66//! ```rust,no_run
67//! # use apalis_core::worker::builder::WorkerBuilder;
68//! # use apalis_core::backend::memory::MemoryStorage;
69//! # use futures_util::StreamExt;
70//! # #[tokio::main]
71//! # async fn main() {
72//! #   let mut storage = MemoryStorage::new();
73//! #   async fn handler(task: u32) {
74//! #        println!("Processing task: {task}");
75//! #    }
76//! #   let worker = WorkerBuilder::new("worker-1")
77//! #        .backend(storage)
78//! #        .build(handler);
79//! let mut stream = worker.stream();
80//! while let Some(evt) = stream.next().await {
81//!     println!("Event: {:?}", evt);
82//! }
83//! # }
84//! ```
85//!
86//! # Test Utilities
87//! The [`test_worker`] module includes utilities for unit tests and validation of worker behavior.
88use crate::backend::Backend;
89use crate::error::{BoxDynError, WorkerError};
90use crate::monitor::shutdown::Shutdown;
91use crate::task::Task;
92use crate::task::attempt::Attempt;
93use crate::task::data::Data;
94use crate::worker::call_all::{CallAllError, CallAllUnordered};
95use crate::worker::context::{Tracked, WorkerContext};
96use crate::worker::event::{Event, RawEventListener};
97use futures_core::stream::BoxStream;
98use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt};
99use std::fmt::Debug;
100use std::fmt::{self};
101use std::marker::PhantomData;
102use std::pin::Pin;
103use std::sync::atomic::Ordering;
104use std::task::{Context, Poll};
105use tower_layer::{Layer, Stack};
106use tower_service::Service;
107
108pub mod builder;
109pub mod call_all;
110pub mod context;
111pub mod event;
112pub mod ext;
113mod state;
114pub mod test_worker;
115
116/// Core component responsible for task polling, execution, and lifecycle management.
117///
118/// # Example
119/// Basic example:
120/// ```rust,no_run
121/// # use apalis_core::error::BoxDynError;
122/// # use apalis_core::backend::memory::MemoryStorage;
123/// # use apalis_core::worker::builder::WorkerBuilder;
124/// # use apalis_core::backend::TaskSink;
125///
126/// #[tokio::main]
127/// async fn main() -> Result<(), BoxDynError> {
128///     let mut storage = MemoryStorage::new();
129///     for i in 0..5 {
130///         storage.push(i).await?;
131///     }
132///
133///     async fn handler(task: u32) {
134///         println!("Processing task: {task}");
135///     }
136///
137///     let worker = WorkerBuilder::new("worker-1")
138///         .backend(storage)
139///         .build(handler);
140///
141///     worker.run().await?;
142///     Ok(())
143/// }
144/// ```
145/// See [module level documentation](self) for more details.
#[must_use = "Workers must be run or streamed to execute tasks"]
pub struct Worker<Args, Ctx, Backend, Svc, Middleware> {
    /// Human-readable identifier; passed to the [`WorkerContext`] when the worker starts.
    pub(crate) name: String,
    /// Source of tasks, heartbeats and backend middleware (e.g. in-memory, Redis).
    pub(crate) backend: Backend,
    /// The innermost service that actually handles each task.
    pub(crate) service: Svc,
    /// User-supplied middleware, layered outside the backend's own layer.
    pub(crate) middleware: Middleware,
    /// Zero-sized marker tying the task argument and context types to this worker.
    pub(crate) task_marker: PhantomData<(Args, Ctx)>,
    /// Optional shutdown handle; `run_until*` create a default one if absent.
    pub(crate) shutdown: Option<Shutdown>,
    /// Listener invoked for lifecycle events; a no-op by default (see [`Worker::new`]).
    pub(crate) event_handler: RawEventListener,
}
156
157impl<Args, Ctx, B, Svc, Middleware> fmt::Debug for Worker<Args, Ctx, B, Svc, Middleware>
158where
159    Svc: fmt::Debug,
160    B: fmt::Debug,
161{
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        f.debug_struct("Worker")
164            .field("service", &self.service)
165            .field("backend", &self.backend)
166            .finish()
167    }
168}
169
170impl<Args, Ctx, B, Svc, M> Worker<Args, Ctx, B, Svc, M> {
171    /// Build a worker that is ready for execution
172    pub fn new(name: String, backend: B, service: Svc, layers: M) -> Self {
173        Self {
174            name,
175            backend,
176            service,
177            middleware: layers,
178            task_marker: PhantomData,
179            shutdown: None,
180            event_handler: Box::new(|_, _| {}),
181        }
182    }
183}
184
// The verbose bounds below spell out the final composed stack: the user
// middleware `M` wraps the backend's layer `B::Layer`, which in turn wraps
// `ReadinessService<TrackerService<S>>` around the user service `S`, and the
// result must still be a `Service` over the backend's task type.
impl<Args, S, B, M> Worker<Args, B::Context, B, S, M>
where
    B: Backend<Args = Args>,
    S: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
    B::Stream: Unpin + Send + 'static,
    B::Beat: Unpin + Send + 'static,
    Args: Send + 'static,
    B::Context: Send + 'static,
    B::Error: Into<BoxDynError> + Send + 'static,
    B::Layer: Layer<ReadinessService<TrackerService<S>>>,
    M: Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>,
    <M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service:
        Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
    <<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Future: Send,
        <<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Error: Into<BoxDynError> + Send + Sync + 'static,
    B::IdType: Send + 'static,
{
    /// Run the worker until completion
    ///
    /// # Example
    /// ```no_run
    /// # use apalis_core::error::BoxDynError;
    /// # use apalis_core::backend::memory::MemoryStorage;
    /// # use apalis_core::backend::TaskSink;
    /// # use apalis_core::worker::builder::WorkerBuilder;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), BoxDynError> {
    ///     let mut storage = MemoryStorage::new();
    ///     for i in 0..5 {
    ///         storage.push(i).await?;
    ///     }
    ///
    ///     async fn handler(task: u32) {
    ///         println!("Processing task: {task}");
    ///     }
    ///
    ///     let worker = WorkerBuilder::new("worker-1")
    ///         .backend(storage)
    ///         .build(handler);
    ///
    ///     worker.run().await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn run(self) -> Result<(), WorkerError> {
        // NOTE(review): `WorkerContext::new` takes the composed service type
        // as a type parameter — presumably for identification/diagnostics;
        // confirm in the `context` module.
        let mut ctx = WorkerContext::new::<M::Service>(&self.name);
        self.run_with_ctx(&mut ctx).await
    }

    /// Run the worker with the given context.
    ///
    /// See [`run`](Self::run) for an example.
    pub async fn run_with_ctx(self, ctx: &mut WorkerContext) -> Result<(), WorkerError> {
        // Drain the event stream to completion. A graceful exit travels
        // through the stream as an error, but surfaces here as `Ok(())`;
        // any other error aborts the worker.
        let mut stream = self.stream_with_ctx(ctx);
        while let Some(res) = stream.next().await {
            match res {
                Ok(_) => {},
                Err(WorkerError::GracefulExit) => return Ok(()),
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }

    /// Run the worker until a shutdown signal future is complete.
    pub async fn run_until<Fut, Err>(mut self, signal: Fut) -> Result<(), WorkerError>
    where
        Fut: Future<Output = Result<(), Err>> + Send + 'static,
        B: Send,
        M: Send,
        Err: Into<WorkerError> + Send + 'static,
    {
        // Reuse the configured shutdown handle if one was provided,
        // otherwise fall back to a default handle.
        let shutdown = self.shutdown.take().unwrap_or_default();
        let terminator = shutdown.shutdown_after(signal);
        let mut ctx = WorkerContext::new::<M::Service>(&self.name);
        let c = ctx.clone();
        let worker = self.run_with_ctx(&mut ctx).boxed();
        // Stop the worker once the signal resolves; if either side fails the
        // join aborts with that error.
        // NOTE(review): the `Result` returned by `c.stop()` is discarded here.
        futures_util::try_join!(
            terminator.map_ok(|_| c.stop()).map_err(|e| e.into()),
            worker
        )
        .map(|_| ())
    }

    /// Run the worker until a shutdown signal future is complete.
    ///
    /// *Note*: Using this function requires you to call `ctx.stop()` in the future to completely stop the worker.
    ///
    /// This can also be very powerful with pausing and resuming the worker using the context.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    ///
    /// # use apalis_core::{worker::builder::WorkerBuilder, backend::memory::MemoryStorage};
    /// # use apalis_core::error::BoxDynError;
    /// # use apalis_core::backend::TaskSink;
    /// # use std::time::Duration;
    /// # use tokio::time::sleep;
    /// # use apalis_core::error::WorkerError;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), BoxDynError> {
    ///     let mut storage = MemoryStorage::new();
    ///     for i in 0..5 {
    ///         storage.push(i).await?;
    ///     }
    ///     async fn handler(task: u32) {
    ///         println!("Processing task: {task}");
    ///     }
    ///     let worker = WorkerBuilder::new("worker-1")
    ///         .backend(storage)
    ///         .build(handler);
    ///     worker.run_until_ctx(|ctx| async move {
    ///         sleep(Duration::from_secs(1)).await;
    ///         ctx.stop()?;
    ///         Ok(())
    ///     }).await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn run_until_ctx<F, Fut>(mut self, fut: F) -> Result<(), WorkerError>
    where
        F: FnOnce(WorkerContext) -> Fut + Send + 'static,
        Fut: Future<Output = Result<(), WorkerError>> + Send + 'static,
        B: Send,
        M: Send,
    {
        let shutdown = self.shutdown.take().unwrap_or_default();
        let mut ctx = WorkerContext::new::<M::Service>(&self.name);
        let c = ctx.clone();
        // Hand a clone of the context to the caller's future; unlike
        // `run_until`, stopping the worker is the caller's responsibility.
        let terminator = shutdown.shutdown_after(fut(c));
        let worker = self.run_with_ctx(&mut ctx).boxed();
        futures_util::try_join!(terminator.map_ok(|_| ()), worker).map(|_| ())
    }

    /// Returns a stream that will yield events as they occur within the worker's lifecycle
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use apalis_core::error::BoxDynError;
    /// # use apalis_core::backend::memory::MemoryStorage;
    /// # use apalis_core::worker::builder::WorkerBuilder;
    /// # use apalis_core::backend::TaskSink;
    /// # use futures_util::StreamExt;
    /// #[tokio::main]
    /// async fn main() -> Result<(), BoxDynError> {
    ///     let mut storage = MemoryStorage::new();
    ///     for i in 0..5 {
    ///         storage.push(i).await?;
    ///     }
    ///     async fn handler(task: u32) {
    ///         println!("Processing task: {task}");
    ///     }
    ///     let worker = WorkerBuilder::new("worker-1")
    ///         .backend(storage)
    ///         .build(handler);
    ///     let mut stream = worker.stream();
    ///     while let Some(evt) = stream.next().await {
    ///         println!("Event: {:?}", evt);
    ///     }
    ///     Ok(())
    /// }
    /// ```
    pub fn stream(self) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
        let mut ctx = WorkerContext::new::<M::Service>(&self.name);
        self.stream_with_ctx(&mut ctx)
    }

    /// Returns a stream that will yield events as they occur within the worker's lifecycle when provided
    /// with a [`WorkerContext`].
    ///
    /// See [`stream`](Self::stream) for an example.
    pub fn stream_with_ctx(
        self,
        ctx: &mut WorkerContext,
    ) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
        let backend = self.backend;
        let event_handler = self.event_handler;
        ctx.wrap_listener(event_handler);
        let worker = ctx.clone();
        let backend_middleware = backend.middleware();
        // Minimal local builder (akin to `tower::ServiceBuilder`) so the
        // layer stack can be composed with `tower_layer::Stack` alone.
        // Each `.layer(..)` call adds a layer *inside* the previous ones.
        struct ServiceBuilder<L> {
            layer: L,
        }

        impl<L> ServiceBuilder<L> {
            fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
                ServiceBuilder {
                    layer: Stack::new(layer, self.layer),
                }
            }
            fn service<S>(&self, service: S) -> L::Service
            where
                L: Layer<S>,
            {
                self.layer.layer(service)
            }
        }
        // Outermost layer: expose the worker context to handlers via `Data`.
        let svc = ServiceBuilder {
            layer: Data::new(worker.clone()),
        };
        let service = svc
            // pass the user defined middleware first
            .layer(self.middleware)
            // backend middleware should be the next layer so it can observe all requests released by user middleware
            .layer(backend_middleware)
            // when all layers are ready, inform the worker is ready to accept tasks
            .layer(ReadinessLayer::new(worker.clone()))
            // Track all tasks to allow graceful shutdowns
            // we also increment the attempt count on the first poll
            // this ensures that the attempt count is accurate even if the task fails before completion
            .layer(TrackerLayer::new(worker.clone()))
            .service(self.service);
        // Map heartbeat ticks into events; a heartbeat failure is fatal.
        let heartbeat = backend.heartbeat(&worker).map(|res| match res {
            Ok(_) => Ok(Event::HeartBeat),
            Err(e) => Err(WorkerError::HeartbeatError(e.into())),
        });

        let stream = backend.poll(&worker);

        let tasks = Self::poll_tasks(service, stream);
        let mut w = worker.clone();
        let mut ww = w.clone();
        // One-shot stream that transitions the worker to the running state
        // before any other item is produced. On success it yields nothing
        // (filtered out below); if starting fails it yields a single error.
        let starter: BoxStream<'static, _> = futures_util::stream::once(async move {
            if !ww.is_running() {
                ww.start()?;
            }
            Ok(None)
        })
        .filter_map(|res: Result<Option<Event>, WorkerError>| async move {
            match res {
                Ok(_) => None,
                Err(e) => Some(Err(e)),
            }
        })
        .boxed();
        // The context is itself a future; when it resolves the worker is done,
        // surfaced as `GracefulExit` (or the underlying worker error).
        let wait_for_exit: BoxStream<'static, _> = futures_util::stream::once(async move {
            match worker.await {
                Ok(_) => Err(WorkerError::GracefulExit),
                Err(e) => Err(e),
            }
        })
        .boxed();

        #[allow(clippy::needless_continue)]
        let work_stream =
            futures_util::stream_select!(wait_for_exit, heartbeat, tasks).map(move |res| {
                // Forward every successful event to the registered listener.
                if let Ok(e) = &res {
                    w.emit(e);
                }
                res
            });
        starter.chain(work_stream)
    }
    // Drives the composed service with every task polled from the backend.
    // `None` items (no task available) become `Idle` events; individual task
    // failures are reported as `Event::Error` without ending the stream,
    // while a backend stream error is fatal.
    fn poll_tasks<Svc, Stm, E, Ctx>(
        service: Svc,
        stream: Stm,
    ) -> BoxStream<'static, Result<Event, WorkerError>>
    where
        Svc: Service<Task<Args, Ctx, B::IdType>> + Send + 'static,
        Stm: Stream<Item = Result<Option<Task<Args, Ctx, B::IdType>>, E>> + Send + Unpin + 'static,
        Args: Send + 'static,
        Svc::Future: Send,
        Ctx: Send + 'static,
        Svc::Error: Into<BoxDynError> + Sync + Send,
        E: Into<BoxDynError> + Send + 'static,
    {
        let stream = CallAllUnordered::new(service, stream).map(|r| match r {
            Ok(Some(_)) => Ok(Event::Success),
            Ok(None) => Ok(Event::Idle),
            Err(CallAllError::ServiceError(err)) => Ok(Event::Error(err.into().into())),
            Err(CallAllError::StreamError(err)) => Err(WorkerError::StreamError(err)),
        });
        stream.boxed()
    }
}
464
/// Layer that wraps the innermost service in a [`TrackerService`].
#[derive(Debug, Clone)]
struct TrackerLayer {
    // Worker context with which tracked task futures are registered.
    ctx: WorkerContext,
}

impl TrackerLayer {
    fn new(ctx: WorkerContext) -> Self {
        Self { ctx }
    }
}
475
476impl<S> Layer<S> for TrackerLayer {
477    type Service = TrackerService<S>;
478
479    fn layer(&self, service: S) -> Self::Service {
480        TrackerService {
481            ctx: self.ctx.clone(),
482            service,
483        }
484    }
485}
/// Service that tracks a task's future, allowing graceful shutdowns
#[derive(Debug, Clone)]
pub struct TrackerService<S> {
    // Worker context that records in-flight task futures.
    ctx: WorkerContext,
    // The wrapped inner service that actually executes tasks.
    service: S,
}
492
493impl<S, Args, Ctx, IdType> Service<Task<Args, Ctx, IdType>> for TrackerService<S>
494where
495    S: Service<Task<Args, Ctx, IdType>>,
496{
497    type Response = S::Response;
498    type Error = S::Error;
499    type Future = Tracked<AttemptOnPollFuture<S::Future>>;
500
501    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
502        self.service.poll_ready(cx)
503    }
504
505    fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
506        let attempt = task.parts.attempt.clone();
507        self.ctx.track(AttemptOnPollFuture {
508            attempt,
509            fut: self.service.call(task),
510            polled: false,
511        })
512    }
513}
514
/// A future that increments the attempt count on the first poll
#[pin_project::pin_project]
#[derive(Debug)]
pub struct AttemptOnPollFuture<Fut> {
    // Attempt handle bumped exactly once, on the first poll.
    attempt: Attempt,
    #[pin]
    fut: Fut,
    // Whether the first poll has already happened.
    polled: bool,
}

impl<Fut> AttemptOnPollFuture<Fut> {
    /// Create a new attempt on poll future
    pub fn new(attempt: Attempt, fut: Fut) -> Self {
        Self {
            attempt,
            fut,
            polled: false,
        }
    }
}
535
536impl<Fut: Future> Future for AttemptOnPollFuture<Fut> {
537    type Output = Fut::Output;
538
539    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
540        let mut this = self.project();
541        if !(*this.polled) {
542            *this.polled = true;
543            let _ = this.attempt.increment();
544        }
545        this.fut.poll_unpin(cx)
546    }
547}
548
/// Injects the [`ReadinessService`] to track when workers are ready to accept new tasks
#[derive(Debug, Clone)]
struct ReadinessLayer {
    // Worker context holding the shared `is_ready` flag.
    ctx: WorkerContext,
}

impl ReadinessLayer {
    fn new(ctx: WorkerContext) -> Self {
        Self { ctx }
    }
}
560
561impl<S> Layer<S> for ReadinessLayer {
562    type Service = ReadinessService<S>;
563
564    fn layer(&self, inner: S) -> Self::Service {
565        ReadinessService {
566            inner,
567            ctx: self.ctx.clone(),
568        }
569    }
570}
/// Service that tracks the readiness of underlying services
///
/// Should be the innermost service
#[derive(Debug, Clone)]
pub struct ReadinessService<S> {
    // The wrapped service whose `poll_ready` result is observed.
    inner: S,
    // Worker context whose `is_ready` flag mirrors the inner readiness.
    ctx: WorkerContext,
}
579
580impl<S, Request> Service<Request> for ReadinessService<S>
581where
582    S: Service<Request>,
583{
584    type Response = S::Response;
585    type Error = S::Error;
586    type Future = S::Future;
587
588    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
589        if self.ctx.is_shutting_down() || self.ctx.is_paused() {
590            self.ctx.is_ready.store(false, Ordering::SeqCst);
591            return Poll::Pending;
592        }
593        // Delegate poll_ready to the inner service
594        let result = self.inner.poll_ready(cx);
595        // Update the readiness state based on the result
596        match &result {
597            Poll::Ready(Ok(_)) => self.ctx.is_ready.store(true, Ordering::SeqCst),
598            Poll::Pending | Poll::Ready(Err(_)) => self.ctx.is_ready.store(false, Ordering::SeqCst),
599        }
600
601        result
602    }
603
604    fn call(&mut self, req: Request) -> Self::Future {
605        self.inner.call(req)
606    }
607}
608
#[cfg(test)]
mod tests {
    use std::{
        future::ready,
        ops::Deref,
        sync::{Arc, atomic::AtomicUsize},
        time::Duration,
    };

    use futures_channel::mpsc::SendError;
    use futures_core::future::BoxFuture;

    use crate::{
        backend::{TaskSink, memory::MemoryStorage},
        task::Parts,
        worker::{
            builder::WorkerBuilder,
            ext::{
                ack::{Acknowledge, AcknowledgementExt},
                circuit_breaker::CircuitBreaker,
                event_listener::EventListenerExt,
                long_running::LongRunningExt,
            },
        },
    };

    use super::*;

    // Number of tasks pushed into the in-memory backend by each test.
    const ITEMS: u32 = 100;

    // End-to-end run: the handler stops the worker through its context after
    // the last task; the handler error becomes an `Event::Error` while the
    // stop surfaces as a graceful exit, so `run()` returns `Ok`.
    #[tokio::test]
    async fn basic_worker_run() {
        let mut in_memory = MemoryStorage::new();
        for i in 0..ITEMS {
            in_memory.push(i.into()).await.unwrap();
        }

        // Shared counter injected into the handler via `Data`.
        #[derive(Clone, Debug, Default)]
        struct Count(Arc<AtomicUsize>);

        impl Deref for Count {
            type Target = Arc<AtomicUsize>;
            fn deref(&self) -> &Self::Target {
                &self.0
            }
        }

        async fn task(
            task: u32,
            count: Data<Count>,
            ctx: WorkerContext,
        ) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            count.fetch_add(1, Ordering::Relaxed);
            if task == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok(())
        }

        // No-op acknowledger that just logs the result and task parts.
        #[derive(Debug, Clone)]
        struct MyAcknowledger;

        impl<Ctx: Debug, IdType: Debug> Acknowledge<(), Ctx, IdType> for MyAcknowledger {
            type Error = SendError;
            type Future = BoxFuture<'static, Result<(), SendError>>;
            fn ack(
                &mut self,
                res: &Result<(), BoxDynError>,
                parts: &Parts<Ctx, IdType>,
            ) -> Self::Future {
                println!("{res:?}, {parts:?}");
                // Call webhook with the result and parts?
                ready(Ok(())).boxed()
            }
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(in_memory)
            .data(Count::default())
            .break_circuit()
            .long_running()
            .ack_with(MyAcknowledger)
            .on_event(|ctx, ev| {
                println!("On Event = {ev:?} from {}", ctx.name());
            })
            .build(task);
        worker.run().await.unwrap();
    }

    // Same flow as above but consumed through the event stream interface;
    // the loop ends when the stream terminates after `stop()`.
    #[tokio::test]
    async fn basic_worker_stream() {
        let mut in_memory = MemoryStorage::new();

        for i in 0..ITEMS {
            in_memory.push(i).await.unwrap();
        }

        #[derive(Clone, Debug, Default)]
        struct Count(Arc<AtomicUsize>);

        impl Deref for Count {
            type Target = Arc<AtomicUsize>;
            fn deref(&self) -> &Self::Target {
                &self.0
            }
        }

        async fn task(task: u32, count: Data<Count>, worker: WorkerContext) {
            tokio::time::sleep(Duration::from_secs(1)).await;
            count.fetch_add(1, Ordering::Relaxed);
            if task == ITEMS - 1 {
                worker.stop().unwrap();
            }
        }
        let worker = WorkerBuilder::new("rango-tango")
            .backend(in_memory)
            .data(Count::default())
            .break_circuit()
            .long_running()
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(task);
        let mut event_stream = worker.stream();
        while let Some(Ok(ev)) = event_stream.next().await {
            println!("On Event = {ev:?}");
        }
    }

    // `run_until` with an external signal: the 5s timeout resolves to
    // `GracefulExit`, which `run_until` is expected to surface as an error.
    #[tokio::test]
    async fn with_shutdown_signal() {
        let mut in_memory = MemoryStorage::new();
        for i in 0..ITEMS {
            in_memory.push(i).await.unwrap();
        }

        async fn task(_: u32) -> Result<(), BoxDynError> {
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(in_memory)
            .on_event(|ctx, ev| {
                println!("On Event = {ev:?} from {}", ctx.name());
            })
            .build(task);
        let signal = async {
            let ctrl_c = tokio::signal::ctrl_c().map_err(|e| e.into());
            let timeout = tokio::time::sleep(Duration::from_secs(5))
                .map(|_| Err::<(), WorkerError>(WorkerError::GracefulExit));
            let _ = futures_util::try_join!(ctrl_c, timeout)?;
            Ok::<(), WorkerError>(())
        };
        let res = worker.run_until(signal).await;
        match res {
            Err(WorkerError::GracefulExit) => {
                println!("Worker exited gracefully");
            }
            _ => panic!("Expected graceful exit error"),
        }
    }
}
772}