apalis_core/worker/mod.rs

//! Utilities for building and running workers.
//!
//! A `Worker` polls tasks from a backend, executes them using
//! a service, emits lifecycle events, and handles graceful shutdowns. A worker is typically
//! constructed using a [`WorkerBuilder`](crate::worker::builder).
//!
//! # Features
//! - Pluggable backends for task queues (e.g., in-memory, Redis).
//! - Middleware support for task processing.
//! - Stream or future-based worker execution modes.
//! - Built-in event system for logging or metrics.
//! - Task tracking and controlled worker readiness.
//!
//! # Lifecycle
//!
//! ```mermaid
//! graph TD
//!     A[Start Worker] --> B[Initialize Context & Heartbeat]
//!     B --> C[Poll Backend for Tasks]
//!     C --> D{Task Available?}
//!     D -- Yes --> E[Execute Task via Service Stack]
//!     E --> F[Emit Events]
//!     F --> C
//!     D -- No --> F
//!     F --> G{Shutdown Signal?}
//!     G -- Yes --> H[Graceful Shutdown]
//!     G -- No --> C
//! ```
//! The worker lifecycle is composed of several stages:
//! - Initialize context and heartbeat
//! - Poll backend for tasks
//! - Execute tasks via service stack
//! - Emit events (Idle, Success, Error, HeartBeat)
//! - Graceful shutdown on signal or stop
//!
//! # Examples
//!
//! ## Run as a future
//! ```rust,no_run
//! # use apalis_core::{worker::builder::WorkerBuilder, backend::memory::MemoryStorage};
//! # use apalis_core::error::BoxDynError;
//! # use apalis_core::backend::TaskSink;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), BoxDynError> {
//!     let mut storage = MemoryStorage::new();
//!     for i in 0..5 {
//!         storage.push(i).await?;
//!     }
//!
//!     async fn handler(task: u32) {
//!         println!("Processing task: {task}");
//!     }
//!
//!     let worker = WorkerBuilder::new("worker-1")
//!         .backend(storage)
//!         .build(handler);
//!
//!     worker.run().await?;
//!     Ok(())
//! }
//! ```
//!
//! ## Run as a stream
//! The `stream` interface yields worker events (e.g., `Success`, `Error`) while running:
//! ```rust,no_run
//! # use apalis_core::worker::builder::WorkerBuilder;
//! # use apalis_core::backend::memory::MemoryStorage;
//! # use futures_util::StreamExt;
//! # #[tokio::main]
//! # async fn main() {
//! #   let mut storage = MemoryStorage::new();
//! #   async fn handler(task: u32) {
//! #        println!("Processing task: {task}");
//! #    }
//! #   let worker = WorkerBuilder::new("worker-1")
//! #        .backend(storage)
//! #        .build(handler);
//! let mut stream = worker.stream();
//! while let Some(evt) = stream.next().await {
//!     println!("Event: {:?}", evt);
//! }
//! # }
//! ```
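//!
//! ## Listen for events
//! Instead of consuming the event stream directly, a listener can be attached when building the
//! worker. A minimal sketch, assuming the `EventListenerExt` extension trait from the
//! [`ext`](crate::worker::ext) module is in scope to provide `on_event`:
//! ```rust,no_run
//! # use apalis_core::{worker::builder::WorkerBuilder, backend::memory::MemoryStorage};
//! # use apalis_core::worker::ext::event_listener::EventListenerExt;
//! # use apalis_core::error::BoxDynError;
//! # use apalis_core::backend::TaskSink;
//! # #[tokio::main]
//! # async fn main() -> Result<(), BoxDynError> {
//! #     let mut storage = MemoryStorage::new();
//! #     storage.push(1u32).await?;
//! #     async fn handler(task: u32) {
//! #         println!("Processing task: {task}");
//! #     }
//! let worker = WorkerBuilder::new("worker-1")
//!     .backend(storage)
//!     // Print every lifecycle event (Idle, Success, Error, HeartBeat) as it is emitted
//!     .on_event(|ctx, ev| {
//!         println!("Event: {:?} from {}", ev, ctx.name());
//!     })
//!     .build(handler);
//! worker.run().await?;
//! # Ok(())
//! # }
//! ```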
//!
//! # Test Utilities
//! The [`test_worker`] module includes utilities for unit tests and validation of worker behavior.
use crate::backend::Backend;
use crate::error::{BoxDynError, WorkerError};
use crate::monitor::shutdown::Shutdown;
use crate::task::Task;
use crate::task::attempt::Attempt;
use crate::task::data::Data;
use crate::worker::call_all::{CallAllError, CallAllUnordered};
use crate::worker::context::{Tracked, WorkerContext};
use crate::worker::event::{Event, RawEventListener};
use futures_core::stream::BoxStream;
use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt};
use std::fmt::Debug;
use std::fmt::{self};
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll};
use tower_layer::{Layer, Stack};
use tower_service::Service;

pub mod builder;
pub mod call_all;
pub mod context;
pub mod event;
pub mod ext;
mod state;
pub mod test_worker;

/// Core component responsible for task polling, execution, and lifecycle management.
///
/// # Example
/// Basic example:
/// ```rust,no_run
/// # use apalis_core::error::BoxDynError;
/// # use apalis_core::backend::memory::MemoryStorage;
/// # use apalis_core::worker::builder::WorkerBuilder;
/// # use apalis_core::backend::TaskSink;
///
/// #[tokio::main]
/// async fn main() -> Result<(), BoxDynError> {
///     let mut storage = MemoryStorage::new();
///     for i in 0..5 {
///         storage.push(i).await?;
///     }
///
///     async fn handler(task: u32) {
///         println!("Processing task: {task}");
///     }
///
///     let worker = WorkerBuilder::new("worker-1")
///         .backend(storage)
///         .build(handler);
///
///     worker.run().await?;
///     Ok(())
/// }
/// ```
/// See [module level documentation](self) for more details.
#[must_use = "Workers must be run or streamed to execute tasks"]
pub struct Worker<Args, Ctx, Backend, Svc, Middleware> {
    pub(crate) name: String,
    pub(crate) backend: Backend,
    pub(crate) service: Svc,
    pub(crate) middleware: Middleware,
    pub(crate) task_marker: PhantomData<(Args, Ctx)>,
    pub(crate) shutdown: Option<Shutdown>,
    pub(crate) event_handler: RawEventListener,
}

impl<Args, Ctx, B, Svc, Middleware> fmt::Debug for Worker<Args, Ctx, B, Svc, Middleware>
where
    Svc: fmt::Debug,
    B: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Worker")
            .field("service", &self.service)
            .field("backend", &self.backend)
            .finish()
    }
}

impl<Args, Ctx, B, Svc, M> Worker<Args, Ctx, B, Svc, M> {
    /// Build a worker that is ready for execution
    pub fn new(name: String, backend: B, service: Svc, layers: M) -> Self {
        Worker {
            name,
            backend,
            service,
            middleware: layers,
            task_marker: PhantomData,
            shutdown: None,
            event_handler: Box::new(|_, _| {}),
        }
    }
}

impl<Args, S, B, M> Worker<Args, B::Context, B, S, M>
where
    B: Backend<Args = Args>,
    S: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
    B::Stream: Unpin + Send + 'static,
    B::Beat: Unpin + Send + 'static,
    Args: Send + 'static,
    B::Context: Send + 'static,
    B::Error: Into<BoxDynError> + Send + 'static,
    B::Layer: Layer<ReadinessService<TrackerService<S>>>,
    M: Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>,
    <M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service:
        Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
    <<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Future: Send,
    <<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Error: Into<BoxDynError> + Send + Sync + 'static,
    B::IdType: Send + 'static,
{
    /// Run the worker until completion
    ///
    /// # Example
    /// ```no_run
    /// # use apalis_core::error::BoxDynError;
    /// # use apalis_core::backend::memory::MemoryStorage;
    /// # use apalis_core::backend::TaskSink;
    /// # use apalis_core::worker::builder::WorkerBuilder;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), BoxDynError> {
    ///     let mut storage = MemoryStorage::new();
    ///     for i in 0..5 {
    ///         storage.push(i).await?;
    ///     }
    ///
    ///     async fn handler(task: u32) {
    ///         println!("Processing task: {task}");
    ///     }
    ///
    ///     let worker = WorkerBuilder::new("worker-1")
    ///         .backend(storage)
    ///         .build(handler);
    ///
    ///     worker.run().await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn run(self) -> Result<(), WorkerError> {
        let mut ctx = WorkerContext::new::<M::Service>(&self.name);
        self.run_with_ctx(&mut ctx).await
    }

    /// Run the worker with the given context.
    ///
    /// See [`run`](Self::run) for an example.
    pub async fn run_with_ctx(self, ctx: &mut WorkerContext) -> Result<(), WorkerError> {
        let mut stream = self.stream_with_ctx(ctx);
        while let Some(res) = stream.next().await {
            match res {
                Ok(_) => continue,
                Err(WorkerError::GracefulExit) => return Ok(()),
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }

    /// Run the worker until the given shutdown signal future completes.
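    ///
    /// # Example
    ///
    /// A minimal sketch, mirroring the examples above, that uses a simple timeout future as the
    /// shutdown signal (a real application might await `tokio::signal::ctrl_c()` instead):
    /// ```rust,no_run
    /// # use apalis_core::{worker::builder::WorkerBuilder, backend::memory::MemoryStorage};
    /// # use apalis_core::error::{BoxDynError, WorkerError};
    /// # use apalis_core::backend::TaskSink;
    /// # use std::time::Duration;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), BoxDynError> {
    ///     let mut storage = MemoryStorage::new();
    ///     storage.push(1u32).await?;
    ///
    ///     async fn handler(task: u32) {
    ///         println!("Processing task: {task}");
    ///     }
    ///
    ///     let worker = WorkerBuilder::new("worker-1")
    ///         .backend(storage)
    ///         .build(handler);
    ///
    ///     // Resolve the signal after five seconds; the worker then shuts down gracefully.
    ///     let signal = async {
    ///         tokio::time::sleep(Duration::from_secs(5)).await;
    ///         Ok::<(), WorkerError>(())
    ///     };
    ///     worker.run_until(signal).await?;
    ///     Ok(())
    /// }
    /// ```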
    pub async fn run_until<Fut, Err>(mut self, signal: Fut) -> Result<(), WorkerError>
    where
        Fut: Future<Output = Result<(), Err>> + Send + 'static,
        B: Send,
        M: Send,
        Err: Into<WorkerError> + Send + 'static,
    {
        let shutdown = self.shutdown.take().unwrap_or_default();
        let terminator = shutdown.shutdown_after(signal);
        let mut ctx = WorkerContext::new::<M::Service>(&self.name);
        let c = ctx.clone();
        let worker = self.run_with_ctx(&mut ctx).boxed();
        futures_util::try_join!(
            terminator.map_ok(|_| c.stop()).map_err(|e| e.into()),
            worker
        )
        .map(|_| ())
    }

    /// Run the worker until the future produced by the given closure completes.
    ///
    /// *Note*: The provided future must call `ctx.stop()` to fully stop the worker.
    ///
    /// The context can also be used to pause and resume the worker from within the future.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    ///
    /// # use apalis_core::{worker::builder::WorkerBuilder, backend::memory::MemoryStorage};
    /// # use apalis_core::error::BoxDynError;
    /// # use apalis_core::backend::TaskSink;
    /// # use std::time::Duration;
    /// # use tokio::time::sleep;
    /// # use apalis_core::error::WorkerError;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), BoxDynError> {
    ///     let mut storage = MemoryStorage::new();
    ///     for i in 0..5 {
    ///         storage.push(i).await?;
    ///     }
    ///     async fn handler(task: u32) {
    ///         println!("Processing task: {task}");
    ///     }
    ///     let worker = WorkerBuilder::new("worker-1")
    ///         .backend(storage)
    ///         .build(handler);
    ///     worker.run_until_ctx(|ctx| async move {
    ///         sleep(Duration::from_secs(1)).await;
    ///         ctx.stop()?;
    ///         Ok(())
    ///     }).await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn run_until_ctx<F, Fut>(mut self, fut: F) -> Result<(), WorkerError>
    where
        F: FnOnce(WorkerContext) -> Fut + Send + 'static,
        Fut: Future<Output = Result<(), WorkerError>> + Send + 'static,
        B: Send,
        M: Send,
    {
        let shutdown = self.shutdown.take().unwrap_or_default();
        let mut ctx = WorkerContext::new::<M::Service>(&self.name);
        let c = ctx.clone();
        let terminator = shutdown.shutdown_after(fut(c));
        let worker = self.run_with_ctx(&mut ctx).boxed();
        futures_util::try_join!(terminator.map_ok(|_| ()), worker).map(|_| ())
    }

    /// Returns a stream that will yield events as they occur within the worker's lifecycle
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use apalis_core::error::BoxDynError;
    /// # use apalis_core::backend::memory::MemoryStorage;
    /// # use apalis_core::worker::builder::WorkerBuilder;
    /// # use apalis_core::backend::TaskSink;
    /// # use futures_util::StreamExt;
    /// #[tokio::main]
    /// async fn main() -> Result<(), BoxDynError> {
    ///     let mut storage = MemoryStorage::new();
    ///     for i in 0..5 {
    ///         storage.push(i).await?;
    ///     }
    ///     async fn handler(task: u32) {
    ///         println!("Processing task: {task}");
    ///     }
    ///     let worker = WorkerBuilder::new("worker-1")
    ///         .backend(storage)
    ///         .build(handler);
    ///     let mut stream = worker.stream();
    ///     while let Some(evt) = stream.next().await {
    ///         println!("Event: {:?}", evt);
    ///     }
    ///     Ok(())
    /// }
    /// ```
    pub fn stream(self) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
        let mut ctx = WorkerContext::new::<M::Service>(&self.name);
        self.stream_with_ctx(&mut ctx)
    }

    /// Returns a stream that will yield events as they occur within the worker's lifecycle when provided
    /// with a [`WorkerContext`].
    ///
    /// See [`stream`](Self::stream) for an example.
    pub fn stream_with_ctx(
        self,
        ctx: &mut WorkerContext,
    ) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
        let backend = self.backend;
        let event_handler = self.event_handler;
        ctx.wrap_listener(event_handler);
        let worker = ctx.clone();
        let backend_middleware = backend.middleware();
        struct ServiceBuilder<L> {
            layer: L,
        }

        impl<L> ServiceBuilder<L> {
            fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
                ServiceBuilder {
                    layer: Stack::new(layer, self.layer),
                }
            }
            fn service<S>(&self, service: S) -> L::Service
            where
                L: Layer<S>,
            {
                self.layer.layer(service)
            }
        }
        let svc = ServiceBuilder {
            layer: Data::new(worker.clone()),
        };
        let service = svc
            // Apply the user-defined middleware first
            .layer(self.middleware)
            // Backend middleware comes next so it can observe every request after it has passed through the user middleware
            .layer(backend_middleware)
            // Once all inner layers are ready, signal that the worker is ready to accept tasks
            .layer(ReadinessLayer::new(worker.clone()))
            // Track every task to allow graceful shutdowns.
            // The attempt count is also incremented on the first poll, so it stays
            // accurate even if the task fails before completion.
            .layer(TrackerLayer::new(worker.clone()))
            .service(self.service);
        let heartbeat = backend.heartbeat(&worker).map(|res| match res {
            Ok(_) => Ok(Event::HeartBeat),
            Err(e) => Err(WorkerError::HeartbeatError(e.into())),
        });

        let stream = backend.poll(&worker);

        let tasks = Self::poll_tasks(service, stream);
        let mut w = worker.clone();
        let mut ww = w.clone();
        let starter: BoxStream<'static, _> = futures_util::stream::once(async move {
            if !ww.is_running() {
                ww.start()?;
            }
            Ok(None)
        })
        .filter_map(|res: Result<Option<Event>, WorkerError>| async move {
            match res {
                Ok(_) => None,
                Err(e) => Some(Err(e)),
            }
        })
        .boxed();
        let wait_for_exit: BoxStream<'static, _> = futures_util::stream::once(async move {
            match worker.await {
                Ok(_) => Err(WorkerError::GracefulExit),
                Err(e) => Err(e),
            }
        })
        .boxed();
        let work_stream =
            futures_util::stream_select!(wait_for_exit, heartbeat, tasks).map(move |res| {
                if let Ok(e) = &res {
                    w.emit(e);
                }
                res
            });
        starter.chain(work_stream)
    }
    fn poll_tasks<Svc, Stm, E, Ctx>(
        service: Svc,
        stream: Stm,
    ) -> BoxStream<'static, Result<Event, WorkerError>>
    where
        Svc: Service<Task<Args, Ctx, B::IdType>> + Send + 'static,
        Stm: Stream<Item = Result<Option<Task<Args, Ctx, B::IdType>>, E>> + Send + Unpin + 'static,
        Args: Send + 'static,
        Svc::Future: Send,
        Ctx: Send + 'static,
        Svc::Error: Into<BoxDynError> + Sync + Send,
        E: Into<BoxDynError> + Send + 'static,
    {
        let stream = CallAllUnordered::new(service, stream).map(|r| match r {
            Ok(Some(_)) => Ok(Event::Success),
            Ok(None) => Ok(Event::Idle),
            Err(CallAllError::ServiceError(err)) => Ok(Event::Error(err.into().into())),
            Err(CallAllError::StreamError(err)) => Err(WorkerError::StreamError(err)),
        });
        stream.boxed()
    }
}

#[derive(Debug, Clone)]
struct TrackerLayer {
    ctx: WorkerContext,
}

impl TrackerLayer {
    fn new(ctx: WorkerContext) -> Self {
        Self { ctx }
    }
}

impl<S> Layer<S> for TrackerLayer {
    type Service = TrackerService<S>;

    fn layer(&self, service: S) -> Self::Service {
        TrackerService {
            ctx: self.ctx.clone(),
            service,
        }
    }
}
/// Service that tracks a task's future, allowing graceful shutdowns
#[derive(Debug, Clone)]
pub struct TrackerService<S> {
    ctx: WorkerContext,
    service: S,
}

impl<S, Args, Ctx, IdType> Service<Task<Args, Ctx, IdType>> for TrackerService<S>
where
    S: Service<Task<Args, Ctx, IdType>>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = Tracked<AttemptOnPollFuture<S::Future>>;

    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
        let attempt = task.parts.attempt.clone();
        self.ctx.track(AttemptOnPollFuture {
            attempt,
            fut: self.service.call(task),
            polled: false,
        })
    }
}

/// A future that increments the attempt count on the first poll
#[pin_project::pin_project]
#[derive(Debug)]
pub struct AttemptOnPollFuture<Fut> {
    attempt: Attempt,
    #[pin]
    fut: Fut,
    polled: bool,
}

impl<Fut> AttemptOnPollFuture<Fut> {
    /// Create a new [`AttemptOnPollFuture`] wrapping the given future
    pub fn new(attempt: Attempt, fut: Fut) -> Self {
        Self {
            attempt,
            fut,
            polled: false,
        }
    }
}

impl<Fut: Future> Future for AttemptOnPollFuture<Fut> {
    type Output = Fut::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        if !(*this.polled) {
            *this.polled = true;
            this.attempt.increment();
        }
        this.fut.poll_unpin(cx)
    }
}

/// Injects the [`ReadinessService`] to track when workers are ready to accept new tasks
#[derive(Debug, Clone)]
struct ReadinessLayer {
    ctx: WorkerContext,
}

impl ReadinessLayer {
    fn new(ctx: WorkerContext) -> Self {
        Self { ctx }
    }
}

impl<S> Layer<S> for ReadinessLayer {
    type Service = ReadinessService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        ReadinessService {
            inner,
            ctx: self.ctx.clone(),
        }
    }
}
/// Service that tracks the readiness of underlying services
///
/// Should be the innermost service
#[derive(Debug, Clone)]
pub struct ReadinessService<S> {
    inner: S,
    ctx: WorkerContext,
}

impl<S, Request> Service<Request> for ReadinessService<S>
where
    S: Service<Request>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.ctx.is_shutting_down() {
            self.ctx.is_ready.store(false, Ordering::SeqCst);
            return Poll::Pending;
        }
        // Delegate poll_ready to the inner service
        let result = self.inner.poll_ready(cx);
        // Update the readiness state based on the result
        match &result {
            Poll::Ready(Ok(_)) => self.ctx.is_ready.store(true, Ordering::SeqCst),
            Poll::Pending | Poll::Ready(Err(_)) => self.ctx.is_ready.store(false, Ordering::SeqCst),
        }

        result
    }

    fn call(&mut self, req: Request) -> Self::Future {
        self.inner.call(req)
    }
}

#[cfg(test)]
#[cfg(feature = "json")]
mod tests {
    use std::{
        future::ready,
        ops::Deref,
        sync::{Arc, atomic::AtomicUsize},
        time::Duration,
    };

    use futures_channel::mpsc::SendError;
    use futures_core::future::BoxFuture;

    use crate::{
        backend::{TaskSink, json::JsonStorage, memory::MemoryStorage},
        task::Parts,
        worker::{
            builder::WorkerBuilder,
            ext::{
                ack::{Acknowledge, AcknowledgementExt},
                circuit_breaker::CircuitBreaker,
                event_listener::EventListenerExt,
                long_running::LongRunningExt,
            },
        },
    };

    use super::*;

    const ITEMS: u32 = 100;

    #[tokio::test]
    async fn basic_worker_run() {
        let mut json_store = JsonStorage::new_temp().unwrap();
        for i in 0..ITEMS {
            json_store.push(i).await.unwrap();
        }

        #[derive(Clone, Debug, Default)]
        struct Count(Arc<AtomicUsize>);

        impl Deref for Count {
            type Target = Arc<AtomicUsize>;
            fn deref(&self) -> &Self::Target {
                &self.0
            }
        }

        async fn task(
            task: u32,
            count: Data<Count>,
            ctx: WorkerContext,
        ) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            count.fetch_add(1, Ordering::Relaxed);
            if task == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok(())
        }

        #[derive(Debug, Clone)]
        struct MyAcknowledger;

        impl<Ctx: Debug, IdType: Debug> Acknowledge<(), Ctx, IdType> for MyAcknowledger {
            type Error = SendError;
            type Future = BoxFuture<'static, Result<(), SendError>>;
            fn ack(
                &mut self,
                res: &Result<(), BoxDynError>,
                parts: &Parts<Ctx, IdType>,
            ) -> Self::Future {
                println!("{res:?}, {parts:?}");
                // Call webhook with the result and parts?
                ready(Ok(())).boxed()
            }
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(json_store)
            .data(Count::default())
            .break_circuit()
            .long_running()
            .ack_with(MyAcknowledger)
            .on_event(|ctx, ev| {
                println!("On Event = {:?} from {}", ev, ctx.name());
            })
            .build(task);
        worker.run().await.unwrap();
    }

    #[tokio::test]
    async fn basic_worker_stream() {
        let mut in_memory = MemoryStorage::new();

        for i in 0..ITEMS {
            in_memory.push(i).await.unwrap();
        }

        #[derive(Clone, Debug, Default)]
        struct Count(Arc<AtomicUsize>);

        impl Deref for Count {
            type Target = Arc<AtomicUsize>;
            fn deref(&self) -> &Self::Target {
                &self.0
            }
        }

        async fn task(task: u32, count: Data<Count>, worker: WorkerContext) {
            tokio::time::sleep(Duration::from_secs(1)).await;
            count.fetch_add(1, Ordering::Relaxed);
            if task == ITEMS - 1 {
                worker.stop().unwrap();
            }
        }
        let worker = WorkerBuilder::new("rango-tango")
            .backend(in_memory)
            .data(Count::default())
            .break_circuit()
            .long_running()
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(task);
        let mut event_stream = worker.stream();
        while let Some(Ok(ev)) = event_stream.next().await {
            println!("On Event = {:?}", ev);
        }
    }

    #[tokio::test]
    async fn with_shutdown_signal() {
        let mut in_memory = MemoryStorage::new();
        for i in 0..ITEMS {
            in_memory.push(i).await.unwrap();
        }

        async fn task(_: u32) -> Result<(), BoxDynError> {
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(in_memory)
            .on_event(|ctx, ev| {
                println!("On Event = {:?} from {}", ev, ctx.name());
            })
            .build(task);
        let signal = async {
            let ctrl_c = tokio::signal::ctrl_c().map_err(|e| e.into());
            let timeout = tokio::time::sleep(Duration::from_secs(5))
                .map(|_| Err::<(), WorkerError>(WorkerError::GracefulExit));
            let _ = futures_util::try_join!(ctrl_c, timeout)?;
            Ok::<(), WorkerError>(())
        };
        let res = worker.run_until(signal).await;
        match res {
            Err(WorkerError::GracefulExit) => {
                println!("Worker exited gracefully");
            }
            _ => panic!("Expected graceful exit error"),
        }
    }
}