1use crate::backend::Backend;
89use crate::error::{BoxDynError, WorkerError};
90use crate::monitor::shutdown::Shutdown;
91use crate::task::Task;
92use crate::task::attempt::Attempt;
93use crate::task::data::Data;
94use crate::worker::call_all::{CallAllError, CallAllUnordered};
95use crate::worker::context::{Tracked, WorkerContext};
96use crate::worker::event::{Event, RawEventListener};
97use futures_core::stream::BoxStream;
98use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt};
99use std::fmt::Debug;
100use std::fmt::{self};
101use std::marker::PhantomData;
102use std::pin::Pin;
103use std::sync::atomic::Ordering;
104use std::task::{Context, Poll};
105use tower_layer::{Layer, Stack};
106use tower_service::Service;
107
108pub mod builder;
109pub mod call_all;
110pub mod context;
111pub mod event;
112pub mod ext;
113mod state;
114pub mod test_worker;
115
116#[must_use = "Workers must be run or streamed to execute tasks"]
147pub struct Worker<Args, Ctx, Backend, Svc, Middleware> {
148 pub(crate) name: String,
149 pub(crate) backend: Backend,
150 pub(crate) service: Svc,
151 pub(crate) middleware: Middleware,
152 pub(crate) task_marker: PhantomData<(Args, Ctx)>,
153 pub(crate) shutdown: Option<Shutdown>,
154 pub(crate) event_handler: RawEventListener,
155}
156
157impl<Args, Ctx, B, Svc, Middleware> fmt::Debug for Worker<Args, Ctx, B, Svc, Middleware>
158where
159 Svc: fmt::Debug,
160 B: fmt::Debug,
161{
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 f.debug_struct("Worker")
164 .field("service", &self.service)
165 .field("backend", &self.backend)
166 .finish()
167 }
168}
169
170impl<Args, Ctx, B, Svc, M> Worker<Args, Ctx, B, Svc, M> {
171 pub fn new(name: String, backend: B, service: Svc, layers: M) -> Self {
173 Self {
174 name,
175 backend,
176 service,
177 middleware: layers,
178 task_marker: PhantomData,
179 shutdown: None,
180 event_handler: Box::new(|_, _| {}),
181 }
182 }
183}
184
185impl<Args, S, B, M> Worker<Args, B::Context, B, S, M>
186where
187 B: Backend<Args = Args>,
188 S: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
189 B::Stream: Unpin + Send + 'static,
190 B::Beat: Unpin + Send + 'static,
191 Args: Send + 'static,
192 B::Context: Send + 'static,
193 B::Error: Into<BoxDynError> + Send + 'static,
194 B::Layer: Layer<ReadinessService<TrackerService<S>>>,
195 M: Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>,
196 <M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service:
197 Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
198 <<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Future: Send,
199 <<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Error: Into<BoxDynError> + Send + Sync + 'static,
200 B::IdType: Send + 'static,
201{
202 pub async fn run(self) -> Result<(), WorkerError> {
231 let mut ctx = WorkerContext::new::<M::Service>(&self.name);
232 self.run_with_ctx(&mut ctx).await
233 }
234
235 pub async fn run_with_ctx(self, ctx: &mut WorkerContext) -> Result<(), WorkerError> {
239 let mut stream = self.stream_with_ctx(ctx);
240 while let Some(res) = stream.next().await {
241 match res {
242 Ok(_) => {},
243 Err(WorkerError::GracefulExit) => return Ok(()),
244 Err(e) => return Err(e),
245 }
246 }
247 Ok(())
248 }
249
250 pub async fn run_until<Fut, Err>(mut self, signal: Fut) -> Result<(), WorkerError>
252 where
253 Fut: Future<Output = Result<(), Err>> + Send + 'static,
254 B: Send,
255 M: Send,
256 Err: Into<WorkerError> + Send + 'static,
257 {
258 let shutdown = self.shutdown.take().unwrap_or_default();
259 let terminator = shutdown.shutdown_after(signal);
260 let mut ctx = WorkerContext::new::<M::Service>(&self.name);
261 let c = ctx.clone();
262 let worker = self.run_with_ctx(&mut ctx).boxed();
263 futures_util::try_join!(
264 terminator.map_ok(|_| c.stop()).map_err(|e| e.into()),
265 worker
266 )
267 .map(|_| ())
268 }
269
270 pub async fn run_until_ctx<F, Fut>(mut self, fut: F) -> Result<(), WorkerError>
308 where
309 F: FnOnce(WorkerContext) -> Fut + Send + 'static,
310 Fut: Future<Output = Result<(), WorkerError>> + Send + 'static,
311 B: Send,
312 M: Send,
313 {
314 let shutdown = self.shutdown.take().unwrap_or_default();
315 let mut ctx = WorkerContext::new::<M::Service>(&self.name);
316 let c = ctx.clone();
317 let terminator = shutdown.shutdown_after(fut(c));
318 let worker = self.run_with_ctx(&mut ctx).boxed();
319 futures_util::try_join!(terminator.map_ok(|_| ()), worker).map(|_| ())
320 }
321
322 pub fn stream(self) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
352 let mut ctx = WorkerContext::new::<M::Service>(&self.name);
353 self.stream_with_ctx(&mut ctx)
354 }
355
356 pub fn stream_with_ctx(
361 self,
362 ctx: &mut WorkerContext,
363 ) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
364 let backend = self.backend;
365 let event_handler = self.event_handler;
366 ctx.wrap_listener(event_handler);
367 let worker = ctx.clone();
368 let backend_middleware = backend.middleware();
369 struct ServiceBuilder<L> {
370 layer: L,
371 }
372
373 impl<L> ServiceBuilder<L> {
374 fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
375 ServiceBuilder {
376 layer: Stack::new(layer, self.layer),
377 }
378 }
379 fn service<S>(&self, service: S) -> L::Service
380 where
381 L: Layer<S>,
382 {
383 self.layer.layer(service)
384 }
385 }
386 let svc = ServiceBuilder {
387 layer: Data::new(worker.clone()),
388 };
389 let service = svc
390 .layer(self.middleware)
392 .layer(backend_middleware)
394 .layer(ReadinessLayer::new(worker.clone()))
396 .layer(TrackerLayer::new(worker.clone()))
400 .service(self.service);
401 let heartbeat = backend.heartbeat(&worker).map(|res| match res {
402 Ok(_) => Ok(Event::HeartBeat),
403 Err(e) => Err(WorkerError::HeartbeatError(e.into())),
404 });
405
406 let stream = backend.poll(&worker);
407
408 let tasks = Self::poll_tasks(service, stream);
409 let mut w = worker.clone();
410 let mut ww = w.clone();
411 let starter: BoxStream<'static, _> = futures_util::stream::once(async move {
412 if !ww.is_running() {
413 ww.start()?;
414 }
415 Ok(None)
416 })
417 .filter_map(|res: Result<Option<Event>, WorkerError>| async move {
418 match res {
419 Ok(_) => None,
420 Err(e) => Some(Err(e)),
421 }
422 })
423 .boxed();
424 let wait_for_exit: BoxStream<'static, _> = futures_util::stream::once(async move {
425 match worker.await {
426 Ok(_) => Err(WorkerError::GracefulExit),
427 Err(e) => Err(e),
428 }
429 })
430 .boxed();
431
432 #[allow(clippy::needless_continue)]
433 let work_stream =
434 futures_util::stream_select!(wait_for_exit, heartbeat, tasks).map(move |res| {
435 if let Ok(e) = &res {
436 w.emit(e);
437 }
438 res
439 });
440 starter.chain(work_stream)
441 }
442 fn poll_tasks<Svc, Stm, E, Ctx>(
443 service: Svc,
444 stream: Stm,
445 ) -> BoxStream<'static, Result<Event, WorkerError>>
446 where
447 Svc: Service<Task<Args, Ctx, B::IdType>> + Send + 'static,
448 Stm: Stream<Item = Result<Option<Task<Args, Ctx, B::IdType>>, E>> + Send + Unpin + 'static,
449 Args: Send + 'static,
450 Svc::Future: Send,
451 Ctx: Send + 'static,
452 Svc::Error: Into<BoxDynError> + Sync + Send,
453 E: Into<BoxDynError> + Send + 'static,
454 {
455 let stream = CallAllUnordered::new(service, stream).map(|r| match r {
456 Ok(Some(_)) => Ok(Event::Success),
457 Ok(None) => Ok(Event::Idle),
458 Err(CallAllError::ServiceError(err)) => Ok(Event::Error(err.into().into())),
459 Err(CallAllError::StreamError(err)) => Err(WorkerError::StreamError(err)),
460 });
461 stream.boxed()
462 }
463}
464
465#[derive(Debug, Clone)]
466struct TrackerLayer {
467 ctx: WorkerContext,
468}
469
470impl TrackerLayer {
471 fn new(ctx: WorkerContext) -> Self {
472 Self { ctx }
473 }
474}
475
476impl<S> Layer<S> for TrackerLayer {
477 type Service = TrackerService<S>;
478
479 fn layer(&self, service: S) -> Self::Service {
480 TrackerService {
481 ctx: self.ctx.clone(),
482 service,
483 }
484 }
485}
486#[derive(Debug, Clone)]
488pub struct TrackerService<S> {
489 ctx: WorkerContext,
490 service: S,
491}
492
493impl<S, Args, Ctx, IdType> Service<Task<Args, Ctx, IdType>> for TrackerService<S>
494where
495 S: Service<Task<Args, Ctx, IdType>>,
496{
497 type Response = S::Response;
498 type Error = S::Error;
499 type Future = Tracked<AttemptOnPollFuture<S::Future>>;
500
501 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
502 self.service.poll_ready(cx)
503 }
504
505 fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
506 let attempt = task.parts.attempt.clone();
507 self.ctx.track(AttemptOnPollFuture {
508 attempt,
509 fut: self.service.call(task),
510 polled: false,
511 })
512 }
513}
514
515#[pin_project::pin_project]
517#[derive(Debug)]
518pub struct AttemptOnPollFuture<Fut> {
519 attempt: Attempt,
520 #[pin]
521 fut: Fut,
522 polled: bool,
523}
524
525impl<Fut> AttemptOnPollFuture<Fut> {
526 pub fn new(attempt: Attempt, fut: Fut) -> Self {
528 Self {
529 attempt,
530 fut,
531 polled: false,
532 }
533 }
534}
535
536impl<Fut: Future> Future for AttemptOnPollFuture<Fut> {
537 type Output = Fut::Output;
538
539 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
540 let mut this = self.project();
541 if !(*this.polled) {
542 *this.polled = true;
543 let _ = this.attempt.increment();
544 }
545 this.fut.poll_unpin(cx)
546 }
547}
548
549#[derive(Debug, Clone)]
551struct ReadinessLayer {
552 ctx: WorkerContext,
553}
554
555impl ReadinessLayer {
556 fn new(ctx: WorkerContext) -> Self {
557 Self { ctx }
558 }
559}
560
561impl<S> Layer<S> for ReadinessLayer {
562 type Service = ReadinessService<S>;
563
564 fn layer(&self, inner: S) -> Self::Service {
565 ReadinessService {
566 inner,
567 ctx: self.ctx.clone(),
568 }
569 }
570}
571#[derive(Debug, Clone)]
575pub struct ReadinessService<S> {
576 inner: S,
577 ctx: WorkerContext,
578}
579
580impl<S, Request> Service<Request> for ReadinessService<S>
581where
582 S: Service<Request>,
583{
584 type Response = S::Response;
585 type Error = S::Error;
586 type Future = S::Future;
587
588 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
589 if self.ctx.is_shutting_down() || self.ctx.is_paused() {
590 self.ctx.is_ready.store(false, Ordering::SeqCst);
591 return Poll::Pending;
592 }
593 let result = self.inner.poll_ready(cx);
595 match &result {
597 Poll::Ready(Ok(_)) => self.ctx.is_ready.store(true, Ordering::SeqCst),
598 Poll::Pending | Poll::Ready(Err(_)) => self.ctx.is_ready.store(false, Ordering::SeqCst),
599 }
600
601 result
602 }
603
604 fn call(&mut self, req: Request) -> Self::Future {
605 self.inner.call(req)
606 }
607}
608
609#[cfg(test)]
610mod tests {
611 use std::{
612 future::ready,
613 ops::Deref,
614 sync::{Arc, atomic::AtomicUsize},
615 time::Duration,
616 };
617
618 use futures_channel::mpsc::SendError;
619 use futures_core::future::BoxFuture;
620
621 use crate::{
622 backend::{TaskSink, memory::MemoryStorage},
623 task::Parts,
624 worker::{
625 builder::WorkerBuilder,
626 ext::{
627 ack::{Acknowledge, AcknowledgementExt},
628 circuit_breaker::CircuitBreaker,
629 event_listener::EventListenerExt,
630 long_running::LongRunningExt,
631 },
632 },
633 };
634
635 use super::*;
636
637 const ITEMS: u32 = 100;
638
639 #[tokio::test]
640 async fn basic_worker_run() {
641 let mut in_memory = MemoryStorage::new();
642 for i in 0..ITEMS {
643 in_memory.push(i.into()).await.unwrap();
644 }
645
646 #[derive(Clone, Debug, Default)]
647 struct Count(Arc<AtomicUsize>);
648
649 impl Deref for Count {
650 type Target = Arc<AtomicUsize>;
651 fn deref(&self) -> &Self::Target {
652 &self.0
653 }
654 }
655
656 async fn task(
657 task: u32,
658 count: Data<Count>,
659 ctx: WorkerContext,
660 ) -> Result<(), BoxDynError> {
661 tokio::time::sleep(Duration::from_secs(1)).await;
662 count.fetch_add(1, Ordering::Relaxed);
663 if task == ITEMS - 1 {
664 ctx.stop().unwrap();
665 return Err("Worker stopped!")?;
666 }
667 Ok(())
668 }
669
670 #[derive(Debug, Clone)]
671 struct MyAcknowledger;
672
673 impl<Ctx: Debug, IdType: Debug> Acknowledge<(), Ctx, IdType> for MyAcknowledger {
674 type Error = SendError;
675 type Future = BoxFuture<'static, Result<(), SendError>>;
676 fn ack(
677 &mut self,
678 res: &Result<(), BoxDynError>,
679 parts: &Parts<Ctx, IdType>,
680 ) -> Self::Future {
681 println!("{res:?}, {parts:?}");
682 ready(Ok(())).boxed()
684 }
685 }
686
687 let worker = WorkerBuilder::new("rango-tango")
688 .backend(in_memory)
689 .data(Count::default())
690 .break_circuit()
691 .long_running()
692 .ack_with(MyAcknowledger)
693 .on_event(|ctx, ev| {
694 println!("On Event = {ev:?} from {}", ctx.name());
695 })
696 .build(task);
697 worker.run().await.unwrap();
698 }
699
700 #[tokio::test]
701 async fn basic_worker_stream() {
702 let mut in_memory = MemoryStorage::new();
703
704 for i in 0..ITEMS {
705 in_memory.push(i).await.unwrap();
706 }
707
708 #[derive(Clone, Debug, Default)]
709 struct Count(Arc<AtomicUsize>);
710
711 impl Deref for Count {
712 type Target = Arc<AtomicUsize>;
713 fn deref(&self) -> &Self::Target {
714 &self.0
715 }
716 }
717
718 async fn task(task: u32, count: Data<Count>, worker: WorkerContext) {
719 tokio::time::sleep(Duration::from_secs(1)).await;
720 count.fetch_add(1, Ordering::Relaxed);
721 if task == ITEMS - 1 {
722 worker.stop().unwrap();
723 }
724 }
725 let worker = WorkerBuilder::new("rango-tango")
726 .backend(in_memory)
727 .data(Count::default())
728 .break_circuit()
729 .long_running()
730 .on_event(|ctx, ev| {
731 println!("CTX {:?}, On Event = {ev:?}", ctx.name());
732 })
733 .build(task);
734 let mut event_stream = worker.stream();
735 while let Some(Ok(ev)) = event_stream.next().await {
736 println!("On Event = {ev:?}");
737 }
738 }
739
740 #[tokio::test]
741 async fn with_shutdown_signal() {
742 let mut in_memory = MemoryStorage::new();
743 for i in 0..ITEMS {
744 in_memory.push(i).await.unwrap();
745 }
746
747 async fn task(_: u32) -> Result<(), BoxDynError> {
748 Ok(())
749 }
750
751 let worker = WorkerBuilder::new("rango-tango")
752 .backend(in_memory)
753 .on_event(|ctx, ev| {
754 println!("On Event = {ev:?} from {}", ctx.name());
755 })
756 .build(task);
757 let signal = async {
758 let ctrl_c = tokio::signal::ctrl_c().map_err(|e| e.into());
759 let timeout = tokio::time::sleep(Duration::from_secs(5))
760 .map(|_| Err::<(), WorkerError>(WorkerError::GracefulExit));
761 let _ = futures_util::try_join!(ctrl_c, timeout)?;
762 Ok::<(), WorkerError>(())
763 };
764 let res = worker.run_until(signal).await;
765 match res {
766 Err(WorkerError::GracefulExit) => {
767 println!("Worker exited gracefully");
768 }
769 _ => panic!("Expected graceful exit error"),
770 }
771 }
772}