1use crate::backend::Backend;
89use crate::error::{BoxDynError, WorkerError};
90use crate::monitor::shutdown::Shutdown;
91use crate::task::Task;
92use crate::task::attempt::Attempt;
93use crate::task::data::Data;
94use crate::worker::call_all::{CallAllError, CallAllUnordered};
95use crate::worker::context::{Tracked, WorkerContext};
96use crate::worker::event::Event;
97use futures_core::stream::BoxStream;
98use futures_util::{Future, FutureExt, Stream, StreamExt};
99use std::fmt::Debug;
100use std::fmt::{self};
101use std::marker::PhantomData;
102use std::pin::Pin;
103use std::sync::atomic::Ordering;
104use std::task::{Context, Poll};
105use tower_layer::{Layer, Stack};
106use tower_service::Service;
107
108pub mod builder;
109pub mod call_all;
110pub mod context;
111pub mod event;
112pub mod ext;
113mod state;
114pub mod test_worker;
115
116#[must_use = "Workers must be run or streamed to execute tasks"]
147pub struct Worker<Args, Ctx, Backend, Svc, Middleware> {
148 pub(crate) name: String,
149 pub(crate) backend: Backend,
150 pub(crate) service: Svc,
151 pub(crate) middleware: Middleware,
152 pub(crate) task_marker: PhantomData<(Args, Ctx)>,
153 pub(crate) shutdown: Option<Shutdown>,
154 pub(crate) event_handler: Box<dyn Fn(&WorkerContext, &Event) + Send + Sync>,
155}
156
157impl<Args, Ctx, B, Svc, Middleware> fmt::Debug for Worker<Args, Ctx, B, Svc, Middleware>
158where
159 Svc: fmt::Debug,
160 B: fmt::Debug,
161{
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 f.debug_struct("Worker")
164 .field("service", &self.service)
165 .field("backend", &self.backend)
166 .finish()
167 }
168}
169
170impl<Args, Ctx, B, Svc, M> Worker<Args, Ctx, B, Svc, M> {
171 pub fn new(name: String, backend: B, service: Svc, layers: M) -> Self {
173 Worker {
174 name,
175 backend,
176 service,
177 middleware: layers,
178 task_marker: PhantomData,
179 shutdown: None,
180 event_handler: Box::new(|_, _| {}),
181 }
182 }
183}
184
185impl<Args, S, B, M> Worker<Args, B::Context, B, S, M>
186where
187 B: Backend<Args>,
188 S: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
189 B::Stream: Unpin + Send + 'static,
190 B::Beat: Unpin + Send + 'static,
191 Args: Send + 'static,
192 B::Context: Send + 'static,
193 B::Error: Into<BoxDynError> + Send + 'static,
194 M: Layer<ReadinessService<TrackerService<S>>>,
195 B::Layer: Layer<M::Service>,
196 <B::Layer as Layer<M::Service>>::Service:
197 Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
198 <<B::Layer as Layer<M::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Error:
199 Into<BoxDynError> + Send + Sync + 'static,
200 <<B::Layer as Layer<M::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Future:
201 Send,
202 M::Service: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
203 <<M as Layer<ReadinessService<TrackerService<S>>>>::Service as Service<
204 Task<Args, B::Context, B::IdType>,
205 >>::Future: Send,
206 <<M as Layer<ReadinessService<TrackerService<S>>>>::Service as Service<
207 Task<Args, B::Context, B::IdType>,
208 >>::Error: Into<BoxDynError> + Send + Sync + 'static,
209 B::IdType: Send + 'static,
210{
211 pub async fn run(self) -> Result<(), WorkerError> {
240 let mut ctx = WorkerContext::new::<<B::Layer as Layer<M::Service>>::Service>(&self.name);
241 self.run_with_ctx(&mut ctx).await
242 }
243
244 pub async fn run_with_ctx(self, ctx: &mut WorkerContext) -> Result<(), WorkerError> {
248 let mut stream = self.stream_with_ctx(ctx);
249 while let Some(res) = stream.next().await {
250 match res {
251 Ok(_) => continue,
252 Err(WorkerError::GracefulExit) => return Ok(()),
253 Err(e) => return Err(e),
254 }
255 }
256 Ok(())
257 }
258
259 pub fn stream(self) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
289 let mut ctx = WorkerContext::new::<<B::Layer as Layer<M::Service>>::Service>(&self.name);
290 self.stream_with_ctx(&mut ctx)
291 }
292
293 pub fn stream_with_ctx(
298 self,
299 ctx: &mut WorkerContext,
300 ) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M> {
301 let backend = self.backend;
302 let event_handler = self.event_handler;
303 ctx.wrap_listener(event_handler);
304 let worker = ctx.clone();
305 let inner_layers = backend.middleware();
306 struct ServiceBuilder<L> {
307 layer: L,
308 }
309
310 impl<L> ServiceBuilder<L> {
311 fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
312 ServiceBuilder {
313 layer: Stack::new(layer, self.layer),
314 }
315 }
316 fn service<S>(&self, service: S) -> L::Service
317 where
318 L: Layer<S>,
319 {
320 self.layer.layer(service)
321 }
322 }
323 let svc = ServiceBuilder {
324 layer: Data::new(worker.clone()),
325 };
326 let service = svc
327 .layer(inner_layers)
328 .layer(self.middleware)
329 .layer(ReadinessLayer::new(worker.clone()))
330 .layer(TrackerLayer::new(worker.clone()))
331 .service(self.service);
332 let heartbeat = backend.heartbeat(&worker).map(|res| match res {
333 Ok(_) => Ok(Event::HeartBeat),
334 Err(e) => Err(WorkerError::HeartbeatError(e.into())),
335 });
336
337 let stream = backend.poll(&worker);
338
339 let tasks = Self::poll_tasks(service, stream);
340 let mut w = worker.clone();
341 let mut ww = w.clone();
342 let starter: BoxStream<'static, _> = futures_util::stream::once(async move {
343 if !ww.is_running() {
344 ww.start()?;
345 }
346 Ok(None)
347 })
348 .filter_map(|res: Result<Option<Event>, WorkerError>| async move {
349 match res {
350 Ok(_) => None,
351 Err(e) => Some(Err(e)),
352 }
353 })
354 .boxed();
355 let wait_for_exit: BoxStream<'static, _> = futures_util::stream::once(async move {
356 match worker.await {
357 Ok(_) => Err(WorkerError::GracefulExit),
358 Err(e) => Err(e),
359 }
360 })
361 .boxed();
362 let work_stream =
363 futures_util::stream_select!(wait_for_exit, heartbeat, tasks).map(move |res| {
364 if let Ok(e) = &res {
365 w.emit(e);
366 }
367 res
368 });
369 starter.chain(work_stream)
370 }
371 fn poll_tasks<Svc, Stm, E, Ctx>(
372 service: Svc,
373 stream: Stm,
374 ) -> BoxStream<'static, Result<Event, WorkerError>>
375 where
376 Svc: Service<Task<Args, Ctx, B::IdType>> + Send + 'static,
377 Stm: Stream<Item = Result<Option<Task<Args, Ctx, B::IdType>>, E>> + Send + Unpin + 'static,
378 Args: Send + 'static,
379 Svc::Future: Send,
380 Ctx: Send + 'static,
381 Svc::Error: Into<BoxDynError> + Sync + Send,
382 E: Into<BoxDynError> + Send + 'static,
383 {
384 let stream = CallAllUnordered::new(service, stream).map(|r| match r {
385 Ok(Some(_)) => Ok(Event::Success),
386 Ok(None) => Ok(Event::Idle),
387 Err(CallAllError::ServiceError(err)) => Ok(Event::Error(err.into().into())),
388 Err(CallAllError::StreamError(err)) => Err(WorkerError::StreamError(err)),
389 });
390 stream.boxed()
391 }
392}
393
394#[derive(Debug, Clone)]
395struct TrackerLayer {
396 ctx: WorkerContext,
397}
398
399impl TrackerLayer {
400 fn new(ctx: WorkerContext) -> Self {
401 Self { ctx }
402 }
403}
404
405impl<S> Layer<S> for TrackerLayer {
406 type Service = TrackerService<S>;
407
408 fn layer(&self, service: S) -> Self::Service {
409 TrackerService {
410 ctx: self.ctx.clone(),
411 service,
412 }
413 }
414}
415#[derive(Debug, Clone)]
417pub struct TrackerService<S> {
418 ctx: WorkerContext,
419 service: S,
420}
421
422impl<S, Args, Ctx, IdType> Service<Task<Args, Ctx, IdType>> for TrackerService<S>
423where
424 S: Service<Task<Args, Ctx, IdType>>,
425{
426 type Response = S::Response;
427 type Error = S::Error;
428 type Future = Tracked<AttemptOnPollFuture<S::Future>>;
429
430 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
431 self.service.poll_ready(cx)
432 }
433
434 fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
435 let attempt = task.parts.attempt.clone();
436 self.ctx.track(AttemptOnPollFuture {
437 attempt,
438 fut: self.service.call(task),
439 polled: false,
440 })
441 }
442}
443
444#[pin_project::pin_project]
446#[derive(Debug)]
447pub struct AttemptOnPollFuture<Fut> {
448 attempt: Attempt,
449 #[pin]
450 fut: Fut,
451 polled: bool,
452}
453
454impl<Fut> AttemptOnPollFuture<Fut> {
455 pub fn new(attempt: Attempt, fut: Fut) -> Self {
457 Self {
458 attempt,
459 fut,
460 polled: false,
461 }
462 }
463}
464
465impl<Fut: Future> Future for AttemptOnPollFuture<Fut> {
466 type Output = Fut::Output;
467
468 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
469 let mut this = self.project();
470 if *this.polled == false {
471 *this.polled = true;
472 this.attempt.increment();
473 }
474 this.fut.poll_unpin(cx)
475 }
476}
477
478#[derive(Debug, Clone)]
480struct ReadinessLayer {
481 ctx: WorkerContext,
482}
483
484impl ReadinessLayer {
485 fn new(ctx: WorkerContext) -> Self {
486 Self { ctx }
487 }
488}
489
490impl<S> Layer<S> for ReadinessLayer {
491 type Service = ReadinessService<S>;
492
493 fn layer(&self, inner: S) -> Self::Service {
494 ReadinessService {
495 inner,
496 ctx: self.ctx.clone(),
497 }
498 }
499}
500#[derive(Debug, Clone)]
504pub struct ReadinessService<S> {
505 inner: S,
506 ctx: WorkerContext,
507}
508
509impl<S, Request> Service<Request> for ReadinessService<S>
510where
511 S: Service<Request>,
512{
513 type Response = S::Response;
514 type Error = S::Error;
515 type Future = S::Future;
516
517 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
518 let result = self.inner.poll_ready(cx);
520 match &result {
522 Poll::Ready(Ok(_)) => self.ctx.is_ready.store(true, Ordering::SeqCst),
523 Poll::Pending | Poll::Ready(Err(_)) => self.ctx.is_ready.store(false, Ordering::SeqCst),
524 }
525
526 result
527 }
528
529 fn call(&mut self, req: Request) -> Self::Future {
530 self.inner.call(req)
531 }
532}
533
534#[cfg(test)]
535mod tests {
536 use std::{
537 future::ready,
538 ops::Deref,
539 sync::{Arc, atomic::AtomicUsize},
540 time::Duration,
541 };
542
543 use futures_channel::mpsc::SendError;
544 use futures_core::future::BoxFuture;
545
546 use crate::{
547 backend::{TaskSink, json::JsonStorage, memory::MemoryStorage},
548 task::Parts,
549 worker::{
550 builder::WorkerBuilder,
551 ext::{
552 ack::{Acknowledge, AcknowledgementExt},
553 circuit_breaker::CircuitBreaker,
554 event_listener::EventListenerExt,
555 long_running::LongRunningExt,
556 },
557 },
558 };
559
560 use super::*;
561
562 const ITEMS: u32 = 100;
563
564 #[tokio::test]
565 async fn basic_worker_run() {
566 let mut json_store = JsonStorage::new_temp().unwrap();
567 for i in 0..ITEMS {
568 json_store.push(i).await.unwrap();
569 }
570
571 #[derive(Clone, Debug, Default)]
572 struct Count(Arc<AtomicUsize>);
573
574 impl Deref for Count {
575 type Target = Arc<AtomicUsize>;
576 fn deref(&self) -> &Self::Target {
577 &self.0
578 }
579 }
580
581 async fn task(
582 task: u32,
583 count: Data<Count>,
584 ctx: WorkerContext,
585 ) -> Result<(), BoxDynError> {
586 tokio::time::sleep(Duration::from_secs(1)).await;
587 count.fetch_add(1, Ordering::Relaxed);
588 if task == ITEMS - 1 {
589 ctx.stop().unwrap();
590 return Err("Worker stopped!")?;
591 }
592 Ok(())
593 }
594
595 #[derive(Debug, Clone)]
596 struct MyAcknowledger;
597
598 impl<Ctx: Debug, IdType: Debug> Acknowledge<(), Ctx, IdType> for MyAcknowledger {
599 type Error = SendError;
600 type Future = BoxFuture<'static, Result<(), SendError>>;
601 fn ack(
602 &mut self,
603 res: &Result<(), BoxDynError>,
604 parts: &Parts<Ctx, IdType>,
605 ) -> Self::Future {
606 println!("{res:?}, {parts:?}");
607 ready(Ok(())).boxed()
609 }
610 }
611
612 let worker = WorkerBuilder::new("rango-tango")
613 .backend(json_store)
614 .data(Count::default())
615 .break_circuit()
616 .long_running()
617 .ack_with(MyAcknowledger)
618 .on_event(|ctx, ev| {
619 println!("On Event = {:?} from {}", ev, ctx.name());
620 })
621 .build(task);
622 worker.run().await.unwrap();
623 }
624
625 #[tokio::test]
626 async fn basic_worker_stream() {
627 let mut in_memory = MemoryStorage::new();
628
629 for i in 0..ITEMS {
630 in_memory.push(i).await.unwrap();
631 }
632
633 #[derive(Clone, Debug, Default)]
634 struct Count(Arc<AtomicUsize>);
635
636 impl Deref for Count {
637 type Target = Arc<AtomicUsize>;
638 fn deref(&self) -> &Self::Target {
639 &self.0
640 }
641 }
642
643 async fn task(task: u32, count: Data<Count>, worker: WorkerContext) {
644 tokio::time::sleep(Duration::from_secs(1)).await;
645 count.fetch_add(1, Ordering::Relaxed);
646 if task == ITEMS - 1 {
647 worker.stop().unwrap();
648 }
649 }
650 let worker = WorkerBuilder::new("rango-tango")
651 .backend(in_memory)
652 .data(Count::default())
653 .break_circuit()
654 .long_running()
655 .on_event(|ctx, ev| {
656 println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
657 })
658 .build(task);
659 let mut event_stream = worker.stream();
660 while let Some(Ok(ev)) = event_stream.next().await {
661 println!("On Event = {:?}", ev);
662 }
663 }
664}