1use crate::backend::Backend;
89use crate::error::{BoxDynError, WorkerError};
90use crate::monitor::shutdown::Shutdown;
91use crate::task::Task;
92use crate::task::attempt::Attempt;
93use crate::task::data::Data;
94use crate::worker::call_all::{CallAllError, CallAllUnordered};
95use crate::worker::context::{Tracked, WorkerContext};
96use crate::worker::event::{Event, RawEventListener};
97use futures_core::stream::BoxStream;
98use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt};
99use std::fmt::Debug;
100use std::fmt::{self};
101use std::marker::PhantomData;
102use std::pin::Pin;
103use std::sync::atomic::Ordering;
104use std::task::{Context, Poll};
105use tower_layer::{Layer, Stack};
106use tower_service::Service;
107
108pub mod builder;
109pub mod call_all;
110pub mod context;
111pub mod event;
112pub mod ext;
113mod state;
114pub mod test_worker;
115
116#[must_use = "Workers must be run or streamed to execute tasks"]
147pub struct Worker<Args, Ctx, Backend, Svc, Middleware> {
148 pub(crate) name: String,
149 pub(crate) backend: Backend,
150 pub(crate) service: Svc,
151 pub(crate) middleware: Middleware,
152 pub(crate) task_marker: PhantomData<(Args, Ctx)>,
153 pub(crate) shutdown: Option<Shutdown>,
154 pub(crate) event_handler: RawEventListener,
155}
156
157impl<Args, Ctx, B, Svc, Middleware> fmt::Debug for Worker<Args, Ctx, B, Svc, Middleware>
158where
159 Svc: fmt::Debug,
160 B: fmt::Debug,
161{
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 f.debug_struct("Worker")
164 .field("service", &self.service)
165 .field("backend", &self.backend)
166 .finish()
167 }
168}
169
170impl<Args, Ctx, B, Svc, M> Worker<Args, Ctx, B, Svc, M> {
171 pub fn new(name: String, backend: B, service: Svc, layers: M) -> Self {
173 Worker {
174 name,
175 backend,
176 service,
177 middleware: layers,
178 task_marker: PhantomData,
179 shutdown: None,
180 event_handler: Box::new(|_, _| {}),
181 }
182 }
183}
184
185impl<Args, S, B, M> Worker<Args, B::Context, B, S, M>
186where
187 B: Backend<Args = Args>,
188 S: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
189 B::Stream: Unpin + Send + 'static,
190 B::Beat: Unpin + Send + 'static,
191 Args: Send + 'static,
192 B::Context: Send + 'static,
193 B::Error: Into<BoxDynError> + Send + 'static,
194 B::Layer: Layer<ReadinessService<TrackerService<S>>>,
195 M: Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>,
196 <M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service:
197 Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
198 <<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Future: Send,
199 <<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Error: Into<BoxDynError> + Send + Sync + 'static,
200 B::IdType: Send + 'static,
201{
202 pub async fn run(self) -> Result<(), WorkerError> {
231 let mut ctx = WorkerContext::new::<M::Service>(&self.name);
232 self.run_with_ctx(&mut ctx).await
233 }
234
235 pub async fn run_with_ctx(self, ctx: &mut WorkerContext) -> Result<(), WorkerError> {
239 let mut stream = self.stream_with_ctx(ctx);
240 while let Some(res) = stream.next().await {
241 match res {
242 Ok(_) => continue,
243 Err(WorkerError::GracefulExit) => return Ok(()),
244 Err(e) => return Err(e),
245 }
246 }
247 Ok(())
248 }
249
250 pub async fn run_until<Fut, Err>(mut self, signal: Fut) -> Result<(), WorkerError>
252 where
253 Fut: Future<Output = Result<(), Err>> + Send + 'static,
254 B: Send,
255 M: Send,
256 Err: Into<WorkerError> + Send + 'static,
257 {
258 let shutdown = self.shutdown.take().unwrap_or_default();
259 let terminator = shutdown.shutdown_after(signal);
260 let mut ctx = WorkerContext::new::<M::Service>(&self.name);
261 let c = ctx.clone();
262 let worker = self.run_with_ctx(&mut ctx).boxed();
263 futures_util::try_join!(
264 terminator.map_ok(|_| c.stop()).map_err(|e| e.into()),
265 worker
266 )
267 .map(|_| ())
268 }
269
270 pub async fn run_until_ctx<F, Fut>(mut self, fut: F) -> Result<(), WorkerError>
308 where
309 F: FnOnce(WorkerContext) -> Fut + Send + 'static,
310 Fut: Future<Output = Result<(), WorkerError>> + Send + 'static,
311 B: Send,
312 M: Send,
313 {
314 let shutdown = self.shutdown.take().unwrap_or_default();
315 let mut ctx = WorkerContext::new::<M::Service>(&self.name);
316 let c = ctx.clone();
317 let terminator = shutdown.shutdown_after(fut(c));
318 let worker = self.run_with_ctx(&mut ctx).boxed();
319 futures_util::try_join!(terminator.map_ok(|_| ()), worker).map(|_| ())
320 }
321
322 pub fn stream(self) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
352 let mut ctx = WorkerContext::new::<M::Service>(&self.name);
353 self.stream_with_ctx(&mut ctx)
354 }
355
356 pub fn stream_with_ctx(
361 self,
362 ctx: &mut WorkerContext,
363 ) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
364 let backend = self.backend;
365 let event_handler = self.event_handler;
366 ctx.wrap_listener(event_handler);
367 let worker = ctx.clone();
368 let backend_middleware = backend.middleware();
369 struct ServiceBuilder<L> {
370 layer: L,
371 }
372
373 impl<L> ServiceBuilder<L> {
374 fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
375 ServiceBuilder {
376 layer: Stack::new(layer, self.layer),
377 }
378 }
379 fn service<S>(&self, service: S) -> L::Service
380 where
381 L: Layer<S>,
382 {
383 self.layer.layer(service)
384 }
385 }
386 let svc = ServiceBuilder {
387 layer: Data::new(worker.clone()),
388 };
389 let service = svc
390 .layer(self.middleware)
392 .layer(backend_middleware)
394 .layer(ReadinessLayer::new(worker.clone()))
396 .layer(TrackerLayer::new(worker.clone()))
400 .service(self.service);
401 let heartbeat = backend.heartbeat(&worker).map(|res| match res {
402 Ok(_) => Ok(Event::HeartBeat),
403 Err(e) => Err(WorkerError::HeartbeatError(e.into())),
404 });
405
406 let stream = backend.poll(&worker);
407
408 let tasks = Self::poll_tasks(service, stream);
409 let mut w = worker.clone();
410 let mut ww = w.clone();
411 let starter: BoxStream<'static, _> = futures_util::stream::once(async move {
412 if !ww.is_running() {
413 ww.start()?;
414 }
415 Ok(None)
416 })
417 .filter_map(|res: Result<Option<Event>, WorkerError>| async move {
418 match res {
419 Ok(_) => None,
420 Err(e) => Some(Err(e)),
421 }
422 })
423 .boxed();
424 let wait_for_exit: BoxStream<'static, _> = futures_util::stream::once(async move {
425 match worker.await {
426 Ok(_) => Err(WorkerError::GracefulExit),
427 Err(e) => Err(e),
428 }
429 })
430 .boxed();
431 let work_stream =
432 futures_util::stream_select!(wait_for_exit, heartbeat, tasks).map(move |res| {
433 if let Ok(e) = &res {
434 w.emit(e);
435 }
436 res
437 });
438 starter.chain(work_stream)
439 }
440 fn poll_tasks<Svc, Stm, E, Ctx>(
441 service: Svc,
442 stream: Stm,
443 ) -> BoxStream<'static, Result<Event, WorkerError>>
444 where
445 Svc: Service<Task<Args, Ctx, B::IdType>> + Send + 'static,
446 Stm: Stream<Item = Result<Option<Task<Args, Ctx, B::IdType>>, E>> + Send + Unpin + 'static,
447 Args: Send + 'static,
448 Svc::Future: Send,
449 Ctx: Send + 'static,
450 Svc::Error: Into<BoxDynError> + Sync + Send,
451 E: Into<BoxDynError> + Send + 'static,
452 {
453 let stream = CallAllUnordered::new(service, stream).map(|r| match r {
454 Ok(Some(_)) => Ok(Event::Success),
455 Ok(None) => Ok(Event::Idle),
456 Err(CallAllError::ServiceError(err)) => Ok(Event::Error(err.into().into())),
457 Err(CallAllError::StreamError(err)) => Err(WorkerError::StreamError(err)),
458 });
459 stream.boxed()
460 }
461}
462
463#[derive(Debug, Clone)]
464struct TrackerLayer {
465 ctx: WorkerContext,
466}
467
468impl TrackerLayer {
469 fn new(ctx: WorkerContext) -> Self {
470 Self { ctx }
471 }
472}
473
474impl<S> Layer<S> for TrackerLayer {
475 type Service = TrackerService<S>;
476
477 fn layer(&self, service: S) -> Self::Service {
478 TrackerService {
479 ctx: self.ctx.clone(),
480 service,
481 }
482 }
483}
484#[derive(Debug, Clone)]
486pub struct TrackerService<S> {
487 ctx: WorkerContext,
488 service: S,
489}
490
491impl<S, Args, Ctx, IdType> Service<Task<Args, Ctx, IdType>> for TrackerService<S>
492where
493 S: Service<Task<Args, Ctx, IdType>>,
494{
495 type Response = S::Response;
496 type Error = S::Error;
497 type Future = Tracked<AttemptOnPollFuture<S::Future>>;
498
499 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
500 self.service.poll_ready(cx)
501 }
502
503 fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
504 let attempt = task.parts.attempt.clone();
505 self.ctx.track(AttemptOnPollFuture {
506 attempt,
507 fut: self.service.call(task),
508 polled: false,
509 })
510 }
511}
512
513#[pin_project::pin_project]
515#[derive(Debug)]
516pub struct AttemptOnPollFuture<Fut> {
517 attempt: Attempt,
518 #[pin]
519 fut: Fut,
520 polled: bool,
521}
522
523impl<Fut> AttemptOnPollFuture<Fut> {
524 pub fn new(attempt: Attempt, fut: Fut) -> Self {
526 Self {
527 attempt,
528 fut,
529 polled: false,
530 }
531 }
532}
533
534impl<Fut: Future> Future for AttemptOnPollFuture<Fut> {
535 type Output = Fut::Output;
536
537 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
538 let mut this = self.project();
539 if !(*this.polled) {
540 *this.polled = true;
541 this.attempt.increment();
542 }
543 this.fut.poll_unpin(cx)
544 }
545}
546
547#[derive(Debug, Clone)]
549struct ReadinessLayer {
550 ctx: WorkerContext,
551}
552
553impl ReadinessLayer {
554 fn new(ctx: WorkerContext) -> Self {
555 Self { ctx }
556 }
557}
558
559impl<S> Layer<S> for ReadinessLayer {
560 type Service = ReadinessService<S>;
561
562 fn layer(&self, inner: S) -> Self::Service {
563 ReadinessService {
564 inner,
565 ctx: self.ctx.clone(),
566 }
567 }
568}
569#[derive(Debug, Clone)]
573pub struct ReadinessService<S> {
574 inner: S,
575 ctx: WorkerContext,
576}
577
578impl<S, Request> Service<Request> for ReadinessService<S>
579where
580 S: Service<Request>,
581{
582 type Response = S::Response;
583 type Error = S::Error;
584 type Future = S::Future;
585
586 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
587 if self.ctx.is_shutting_down() {
588 self.ctx.is_ready.store(false, Ordering::SeqCst);
589 return Poll::Pending;
590 }
591 let result = self.inner.poll_ready(cx);
593 match &result {
595 Poll::Ready(Ok(_)) => self.ctx.is_ready.store(true, Ordering::SeqCst),
596 Poll::Pending | Poll::Ready(Err(_)) => self.ctx.is_ready.store(false, Ordering::SeqCst),
597 }
598
599 result
600 }
601
602 fn call(&mut self, req: Request) -> Self::Future {
603 self.inner.call(req)
604 }
605}
606
607#[cfg(test)]
608#[cfg(feature = "json")]
609mod tests {
610 use std::{
611 future::ready,
612 ops::Deref,
613 sync::{Arc, atomic::AtomicUsize},
614 time::Duration,
615 };
616
617 use futures_channel::mpsc::SendError;
618 use futures_core::future::BoxFuture;
619
620 use crate::{
621 backend::{TaskSink, json::JsonStorage, memory::MemoryStorage},
622 task::Parts,
623 worker::{
624 builder::WorkerBuilder,
625 ext::{
626 ack::{Acknowledge, AcknowledgementExt},
627 circuit_breaker::CircuitBreaker,
628 event_listener::EventListenerExt,
629 long_running::LongRunningExt,
630 },
631 },
632 };
633
634 use super::*;
635
636 const ITEMS: u32 = 100;
637
638 #[tokio::test]
639 async fn basic_worker_run() {
640 let mut json_store = JsonStorage::new_temp().unwrap();
641 for i in 0..ITEMS {
642 json_store.push(i).await.unwrap();
643 }
644
645 #[derive(Clone, Debug, Default)]
646 struct Count(Arc<AtomicUsize>);
647
648 impl Deref for Count {
649 type Target = Arc<AtomicUsize>;
650 fn deref(&self) -> &Self::Target {
651 &self.0
652 }
653 }
654
655 async fn task(
656 task: u32,
657 count: Data<Count>,
658 ctx: WorkerContext,
659 ) -> Result<(), BoxDynError> {
660 tokio::time::sleep(Duration::from_secs(1)).await;
661 count.fetch_add(1, Ordering::Relaxed);
662 if task == ITEMS - 1 {
663 ctx.stop().unwrap();
664 return Err("Worker stopped!")?;
665 }
666 Ok(())
667 }
668
669 #[derive(Debug, Clone)]
670 struct MyAcknowledger;
671
672 impl<Ctx: Debug, IdType: Debug> Acknowledge<(), Ctx, IdType> for MyAcknowledger {
673 type Error = SendError;
674 type Future = BoxFuture<'static, Result<(), SendError>>;
675 fn ack(
676 &mut self,
677 res: &Result<(), BoxDynError>,
678 parts: &Parts<Ctx, IdType>,
679 ) -> Self::Future {
680 println!("{res:?}, {parts:?}");
681 ready(Ok(())).boxed()
683 }
684 }
685
686 let worker = WorkerBuilder::new("rango-tango")
687 .backend(json_store)
688 .data(Count::default())
689 .break_circuit()
690 .long_running()
691 .ack_with(MyAcknowledger)
692 .on_event(|ctx, ev| {
693 println!("On Event = {:?} from {}", ev, ctx.name());
694 })
695 .build(task);
696 worker.run().await.unwrap();
697 }
698
699 #[tokio::test]
700 async fn basic_worker_stream() {
701 let mut in_memory = MemoryStorage::new();
702
703 for i in 0..ITEMS {
704 in_memory.push(i).await.unwrap();
705 }
706
707 #[derive(Clone, Debug, Default)]
708 struct Count(Arc<AtomicUsize>);
709
710 impl Deref for Count {
711 type Target = Arc<AtomicUsize>;
712 fn deref(&self) -> &Self::Target {
713 &self.0
714 }
715 }
716
717 async fn task(task: u32, count: Data<Count>, worker: WorkerContext) {
718 tokio::time::sleep(Duration::from_secs(1)).await;
719 count.fetch_add(1, Ordering::Relaxed);
720 if task == ITEMS - 1 {
721 worker.stop().unwrap();
722 }
723 }
724 let worker = WorkerBuilder::new("rango-tango")
725 .backend(in_memory)
726 .data(Count::default())
727 .break_circuit()
728 .long_running()
729 .on_event(|ctx, ev| {
730 println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
731 })
732 .build(task);
733 let mut event_stream = worker.stream();
734 while let Some(Ok(ev)) = event_stream.next().await {
735 println!("On Event = {:?}", ev);
736 }
737 }
738
739 #[tokio::test]
740 async fn with_shutdown_signal() {
741 let mut in_memory = MemoryStorage::new();
742 for i in 0..ITEMS {
743 in_memory.push(i).await.unwrap();
744 }
745
746 async fn task(_: u32) -> Result<(), BoxDynError> {
747 Ok(())
748 }
749
750 let worker = WorkerBuilder::new("rango-tango")
751 .backend(in_memory)
752 .on_event(|ctx, ev| {
753 println!("On Event = {:?} from {}", ev, ctx.name());
754 })
755 .build(task);
756 let signal = async {
757 let ctrl_c = tokio::signal::ctrl_c().map_err(|e| e.into());
758 let timeout = tokio::time::sleep(Duration::from_secs(5))
759 .map(|_| Err::<(), WorkerError>(WorkerError::GracefulExit));
760 let _ = futures_util::try_join!(ctrl_c, timeout)?;
761 Ok::<(), WorkerError>(())
762 };
763 let res = worker.run_until(signal).await;
764 match res {
765 Err(WorkerError::GracefulExit) => {
766 println!("Worker exited gracefully");
767 }
768 _ => panic!("Expected graceful exit error"),
769 }
770 }
771}