1use crate::backend::Backend;
89use crate::error::{BoxDynError, WorkerError};
90use crate::monitor::shutdown::Shutdown;
91use crate::task::Task;
92use crate::task::attempt::Attempt;
93use crate::task::data::Data;
94use crate::worker::call_all::{CallAllError, CallAllUnordered};
95use crate::worker::context::{Tracked, WorkerContext};
96use crate::worker::event::{Event, RawEventListener};
97use futures_core::stream::BoxStream;
98use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt};
99use std::fmt::Debug;
100use std::fmt::{self};
101use std::marker::PhantomData;
102use std::pin::Pin;
103use std::sync::atomic::Ordering;
104use std::task::{Context, Poll};
105use tower_layer::{Layer, Stack};
106use tower_service::Service;
107
108pub mod builder;
109pub mod call_all;
110pub mod context;
111pub mod event;
112pub mod ext;
113mod state;
114pub mod test_worker;
115
116#[must_use = "Workers must be run or streamed to execute tasks"]
147pub struct Worker<Args, Ctx, Backend, Svc, Middleware> {
148 pub(crate) name: String,
149 pub(crate) backend: Backend,
150 pub(crate) service: Svc,
151 pub(crate) middleware: Middleware,
152 pub(crate) task_marker: PhantomData<(Args, Ctx)>,
153 pub(crate) shutdown: Option<Shutdown>,
154 pub(crate) event_handler: RawEventListener,
155}
156
157impl<Args, Ctx, B, Svc, Middleware> fmt::Debug for Worker<Args, Ctx, B, Svc, Middleware>
158where
159 Svc: fmt::Debug,
160 B: fmt::Debug,
161{
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 f.debug_struct("Worker")
164 .field("service", &self.service)
165 .field("backend", &self.backend)
166 .finish()
167 }
168}
169
170impl<Args, Ctx, B, Svc, M> Worker<Args, Ctx, B, Svc, M> {
171 pub fn new(name: String, backend: B, service: Svc, layers: M) -> Self {
173 Worker {
174 name,
175 backend,
176 service,
177 middleware: layers,
178 task_marker: PhantomData,
179 shutdown: None,
180 event_handler: Box::new(|_, _| {}),
181 }
182 }
183}
184
185impl<Args, S, B, M> Worker<Args, B::Context, B, S, M>
186where
187 B: Backend<Args = Args>,
188 S: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
189 B::Stream: Unpin + Send + 'static,
190 B::Beat: Unpin + Send + 'static,
191 Args: Send + 'static,
192 B::Context: Send + 'static,
193 B::Error: Into<BoxDynError> + Send + 'static,
194 M: Layer<ReadinessService<TrackerService<S>>>,
195 B::Layer: Layer<M::Service>,
196 <B::Layer as Layer<M::Service>>::Service: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
197 <<B::Layer as Layer<M::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Error:
198 Into<BoxDynError> + Send + Sync + 'static,
199 <<B::Layer as Layer<M::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Future: Send,
200 B::IdType: Send + 'static,
201{
202 pub async fn run(self) -> Result<(), WorkerError> {
231 let mut ctx = WorkerContext::new::<M::Service>(&self.name);
232 self.run_with_ctx(&mut ctx).await
233 }
234
235 pub async fn run_with_ctx(self, ctx: &mut WorkerContext) -> Result<(), WorkerError> {
239 let mut stream = self.stream_with_ctx(ctx);
240 while let Some(res) = stream.next().await {
241 match res {
242 Ok(_) => continue,
243 Err(WorkerError::GracefulExit) => return Ok(()),
244 Err(e) => return Err(e),
245 }
246 }
247 Ok(())
248 }
249
250 pub async fn run_until<Fut, Err>(mut self, signal: Fut) -> Result<(), WorkerError>
252 where
253 Fut: Future<Output = Result<(), Err>> + Send + 'static,
254 B: Send,
255 M: Send,
256 Err: Into<WorkerError> + Send + 'static,
257 {
258 let shutdown = self.shutdown.take().unwrap_or_default();
259 let terminator = shutdown.shutdown_after(signal);
260 let mut ctx = WorkerContext::new::<M::Service>(&self.name);
261 let c = ctx.clone();
262 let worker = self.run_with_ctx(&mut ctx).boxed();
263 futures_util::try_join!(
264 terminator.map_ok(|_| c.stop()).map_err(|e| e.into()),
265 worker
266 )
267 .map(|_| ())
268 }
269
270 pub async fn run_until_ctx<F, Fut>(mut self, fut: F) -> Result<(), WorkerError>
308 where
309 F: FnOnce(WorkerContext) -> Fut + Send + 'static,
310 Fut: Future<Output = Result<(), WorkerError>> + Send + 'static,
311 B: Send,
312 M: Send,
313 {
314 let shutdown = self.shutdown.take().unwrap_or_default();
315 let mut ctx = WorkerContext::new::<M::Service>(&self.name);
316 let c = ctx.clone();
317 let terminator = shutdown.shutdown_after(fut(c));
318 let worker = self.run_with_ctx(&mut ctx).boxed();
319 futures_util::try_join!(terminator.map_ok(|_| ()), worker).map(|_| ())
320 }
321
322 pub fn stream(self) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
352 let mut ctx = WorkerContext::new::<M::Service>(&self.name);
353 self.stream_with_ctx(&mut ctx)
354 }
355
356 pub fn stream_with_ctx(
361 self,
362 ctx: &mut WorkerContext,
363 ) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
364 let backend = self.backend;
365 let event_handler = self.event_handler;
366 ctx.wrap_listener(event_handler);
367 let worker = ctx.clone();
368 let backend_middleware = backend.middleware();
369 struct ServiceBuilder<L> {
370 layer: L,
371 }
372
373 impl<L> ServiceBuilder<L> {
374 fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
375 ServiceBuilder {
376 layer: Stack::new(layer, self.layer),
377 }
378 }
379 fn service<S>(&self, service: S) -> L::Service
380 where
381 L: Layer<S>,
382 {
383 self.layer.layer(service)
384 }
385 }
386 let svc = ServiceBuilder {
387 layer: Data::new(worker.clone()),
388 };
389 let service = svc
390 .layer(backend_middleware)
392 .layer(self.middleware)
394 .layer(ReadinessLayer::new(worker.clone()))
396 .layer(TrackerLayer::new(worker.clone()))
400 .service(self.service);
401 let heartbeat = backend.heartbeat(&worker).map(|res| match res {
402 Ok(_) => Ok(Event::HeartBeat),
403 Err(e) => Err(WorkerError::HeartbeatError(e.into())),
404 });
405
406 let stream = backend.poll(&worker);
407
408 let tasks = Self::poll_tasks(service, stream);
409 let mut w = worker.clone();
410 let mut ww = w.clone();
411 let starter: BoxStream<'static, _> = futures_util::stream::once(async move {
412 if !ww.is_running() {
413 ww.start()?;
414 }
415 Ok(None)
416 })
417 .filter_map(|res: Result<Option<Event>, WorkerError>| async move {
418 match res {
419 Ok(_) => None,
420 Err(e) => Some(Err(e)),
421 }
422 })
423 .boxed();
424 let wait_for_exit: BoxStream<'static, _> = futures_util::stream::once(async move {
425 match worker.await {
426 Ok(_) => Err(WorkerError::GracefulExit),
427 Err(e) => Err(e),
428 }
429 })
430 .boxed();
431 let work_stream =
432 futures_util::stream_select!(wait_for_exit, heartbeat, tasks).map(move |res| {
433 if let Ok(e) = &res {
434 w.emit(e);
435 }
436 res
437 });
438 starter.chain(work_stream)
439 }
440 fn poll_tasks<Svc, Stm, E, Ctx>(
441 service: Svc,
442 stream: Stm,
443 ) -> BoxStream<'static, Result<Event, WorkerError>>
444 where
445 Svc: Service<Task<Args, Ctx, B::IdType>> + Send + 'static,
446 Stm: Stream<Item = Result<Option<Task<Args, Ctx, B::IdType>>, E>> + Send + Unpin + 'static,
447 Args: Send + 'static,
448 Svc::Future: Send,
449 Ctx: Send + 'static,
450 Svc::Error: Into<BoxDynError> + Sync + Send,
451 E: Into<BoxDynError> + Send + 'static,
452 {
453 let stream = CallAllUnordered::new(service, stream).map(|r| match r {
454 Ok(Some(_)) => Ok(Event::Success),
455 Ok(None) => Ok(Event::Idle),
456 Err(CallAllError::ServiceError(err)) => Ok(Event::Error(err.into().into())),
457 Err(CallAllError::StreamError(err)) => Err(WorkerError::StreamError(err)),
458 });
459 stream.boxed()
460 }
461}
462
463#[derive(Debug, Clone)]
464struct TrackerLayer {
465 ctx: WorkerContext,
466}
467
468impl TrackerLayer {
469 fn new(ctx: WorkerContext) -> Self {
470 Self { ctx }
471 }
472}
473
474impl<S> Layer<S> for TrackerLayer {
475 type Service = TrackerService<S>;
476
477 fn layer(&self, service: S) -> Self::Service {
478 TrackerService {
479 ctx: self.ctx.clone(),
480 service,
481 }
482 }
483}
484#[derive(Debug, Clone)]
486pub struct TrackerService<S> {
487 ctx: WorkerContext,
488 service: S,
489}
490
491impl<S, Args, Ctx, IdType> Service<Task<Args, Ctx, IdType>> for TrackerService<S>
492where
493 S: Service<Task<Args, Ctx, IdType>>,
494{
495 type Response = S::Response;
496 type Error = S::Error;
497 type Future = Tracked<AttemptOnPollFuture<S::Future>>;
498
499 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
500 self.service.poll_ready(cx)
501 }
502
503 fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
504 let attempt = task.parts.attempt.clone();
505 self.ctx.track(AttemptOnPollFuture {
506 attempt,
507 fut: self.service.call(task),
508 polled: false,
509 })
510 }
511}
512
513#[pin_project::pin_project]
515#[derive(Debug)]
516pub struct AttemptOnPollFuture<Fut> {
517 attempt: Attempt,
518 #[pin]
519 fut: Fut,
520 polled: bool,
521}
522
523impl<Fut> AttemptOnPollFuture<Fut> {
524 pub fn new(attempt: Attempt, fut: Fut) -> Self {
526 Self {
527 attempt,
528 fut,
529 polled: false,
530 }
531 }
532}
533
534impl<Fut: Future> Future for AttemptOnPollFuture<Fut> {
535 type Output = Fut::Output;
536
537 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
538 let mut this = self.project();
539 if !(*this.polled) {
540 *this.polled = true;
541 this.attempt.increment();
542 }
543 this.fut.poll_unpin(cx)
544 }
545}
546
547#[derive(Debug, Clone)]
549struct ReadinessLayer {
550 ctx: WorkerContext,
551}
552
553impl ReadinessLayer {
554 fn new(ctx: WorkerContext) -> Self {
555 Self { ctx }
556 }
557}
558
559impl<S> Layer<S> for ReadinessLayer {
560 type Service = ReadinessService<S>;
561
562 fn layer(&self, inner: S) -> Self::Service {
563 ReadinessService {
564 inner,
565 ctx: self.ctx.clone(),
566 }
567 }
568}
569#[derive(Debug, Clone)]
573pub struct ReadinessService<S> {
574 inner: S,
575 ctx: WorkerContext,
576}
577
578impl<S, Request> Service<Request> for ReadinessService<S>
579where
580 S: Service<Request>,
581{
582 type Response = S::Response;
583 type Error = S::Error;
584 type Future = S::Future;
585
586 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
587 if self.ctx.is_shutting_down() {
588 self.ctx.is_ready.store(false, Ordering::SeqCst);
589 return Poll::Pending;
590 }
591 let result = self.inner.poll_ready(cx);
593 match &result {
595 Poll::Ready(Ok(_)) => self.ctx.is_ready.store(true, Ordering::SeqCst),
596 Poll::Pending | Poll::Ready(Err(_)) => self.ctx.is_ready.store(false, Ordering::SeqCst),
597 }
598
599 result
600 }
601
602 fn call(&mut self, req: Request) -> Self::Future {
603 self.inner.call(req)
604 }
605}
606
607#[cfg(test)]
608mod tests {
609 use std::{
610 future::ready,
611 ops::Deref,
612 sync::{Arc, atomic::AtomicUsize},
613 time::Duration,
614 };
615
616 use futures_channel::mpsc::SendError;
617 use futures_core::future::BoxFuture;
618
619 use crate::{
620 backend::{TaskSink, json::JsonStorage, memory::MemoryStorage},
621 task::Parts,
622 worker::{
623 builder::WorkerBuilder,
624 ext::{
625 ack::{Acknowledge, AcknowledgementExt},
626 circuit_breaker::CircuitBreaker,
627 event_listener::EventListenerExt,
628 long_running::LongRunningExt,
629 },
630 },
631 };
632
633 use super::*;
634
635 const ITEMS: u32 = 100;
636
637 #[tokio::test]
638 async fn basic_worker_run() {
639 let mut json_store = JsonStorage::new_temp().unwrap();
640 for i in 0..ITEMS {
641 json_store.push(i).await.unwrap();
642 }
643
644 #[derive(Clone, Debug, Default)]
645 struct Count(Arc<AtomicUsize>);
646
647 impl Deref for Count {
648 type Target = Arc<AtomicUsize>;
649 fn deref(&self) -> &Self::Target {
650 &self.0
651 }
652 }
653
654 async fn task(
655 task: u32,
656 count: Data<Count>,
657 ctx: WorkerContext,
658 ) -> Result<(), BoxDynError> {
659 tokio::time::sleep(Duration::from_secs(1)).await;
660 count.fetch_add(1, Ordering::Relaxed);
661 if task == ITEMS - 1 {
662 ctx.stop().unwrap();
663 return Err("Worker stopped!")?;
664 }
665 Ok(())
666 }
667
668 #[derive(Debug, Clone)]
669 struct MyAcknowledger;
670
671 impl<Ctx: Debug, IdType: Debug> Acknowledge<(), Ctx, IdType> for MyAcknowledger {
672 type Error = SendError;
673 type Future = BoxFuture<'static, Result<(), SendError>>;
674 fn ack(
675 &mut self,
676 res: &Result<(), BoxDynError>,
677 parts: &Parts<Ctx, IdType>,
678 ) -> Self::Future {
679 println!("{res:?}, {parts:?}");
680 ready(Ok(())).boxed()
682 }
683 }
684
685 let worker = WorkerBuilder::new("rango-tango")
686 .backend(json_store)
687 .data(Count::default())
688 .break_circuit()
689 .long_running()
690 .ack_with(MyAcknowledger)
691 .on_event(|ctx, ev| {
692 println!("On Event = {:?} from {}", ev, ctx.name());
693 })
694 .build(task);
695 worker.run().await.unwrap();
696 }
697
698 #[tokio::test]
699 async fn basic_worker_stream() {
700 let mut in_memory = MemoryStorage::new();
701
702 for i in 0..ITEMS {
703 in_memory.push(i).await.unwrap();
704 }
705
706 #[derive(Clone, Debug, Default)]
707 struct Count(Arc<AtomicUsize>);
708
709 impl Deref for Count {
710 type Target = Arc<AtomicUsize>;
711 fn deref(&self) -> &Self::Target {
712 &self.0
713 }
714 }
715
716 async fn task(task: u32, count: Data<Count>, worker: WorkerContext) {
717 tokio::time::sleep(Duration::from_secs(1)).await;
718 count.fetch_add(1, Ordering::Relaxed);
719 if task == ITEMS - 1 {
720 worker.stop().unwrap();
721 }
722 }
723 let worker = WorkerBuilder::new("rango-tango")
724 .backend(in_memory)
725 .data(Count::default())
726 .break_circuit()
727 .long_running()
728 .on_event(|ctx, ev| {
729 println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
730 })
731 .build(task);
732 let mut event_stream = worker.stream();
733 while let Some(Ok(ev)) = event_stream.next().await {
734 println!("On Event = {:?}", ev);
735 }
736 }
737
738 #[tokio::test]
739 async fn with_shutdown_signal() {
740 let mut in_memory = MemoryStorage::new();
741 for i in 0..ITEMS {
742 in_memory.push(i).await.unwrap();
743 }
744
745 async fn task(_: u32) -> Result<(), BoxDynError> {
746 Ok(())
747 }
748
749 let worker = WorkerBuilder::new("rango-tango")
750 .backend(in_memory)
751 .on_event(|ctx, ev| {
752 println!("On Event = {:?} from {}", ev, ctx.name());
753 })
754 .build(task);
755 let signal = async {
756 let ctrl_c = tokio::signal::ctrl_c().map_err(|e| e.into());
757 let timeout = tokio::time::sleep(Duration::from_secs(5))
758 .map(|_| Err::<(), WorkerError>(WorkerError::GracefulExit));
759 let _ = futures_util::try_join!(ctrl_c, timeout)?;
760 Ok::<(), WorkerError>(())
761 };
762 let res = worker.run_until(signal).await;
763 match res {
764 Err(WorkerError::GracefulExit) => {
765 println!("Worker exited gracefully");
766 }
767 _ => panic!("Expected graceful exit error"),
768 }
769 }
770}