1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use std::{error::Error, fmt::Debug};

use futures::{Future, Stream, StreamExt};

use tower::ServiceBuilder;
use tower::{Service, ServiceExt};
use tracing::info;
use tracing::warn;

use crate::context::HasJobContext;
use crate::executor::Executor;
use crate::job::Job;
#[cfg(feature = "extensions")]
use crate::layers::extensions::Extension;
use crate::utils::Timer;

use super::HeartBeat;
use super::WorkerId;
use super::{Worker, WorkerContext, WorkerError};
use futures::future::FutureExt;
use std::fmt::Formatter;

/// A worker that is ready to consume jobs
///
/// Pairs a source of incoming requests (`stream`) with the `service` that
/// handles them, plus any heartbeats that should run while the worker is
/// started.
pub struct ReadyWorker<Stream, Service> {
    /// Identifier of this worker; returned by `Worker::id` and used in the
    /// shutdown log messages.
    pub(crate) id: WorkerId,
    /// Source of requests, polled in a loop once the worker is started.
    pub(crate) stream: Stream,
    /// Service that is called once for every request yielded by `stream`.
    pub(crate) service: Service,
    /// Heartbeats; each one is driven by its own spawned task when the worker
    /// is started.
    pub(crate) beats: Vec<Box<dyn HeartBeat + Send>>,
}

/// Debug output lists the worker id (under the `name` label) together with the
/// type names of the stream and the service; the heartbeats are not printed.
impl<Src, Svc> Debug for ReadyWorker<Src, Svc> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Neither generic parameter is required to be `Debug`, so only the
        // type names are shown.
        let source_type: &'static str = std::any::type_name::<Src>();
        let service_type: &'static str = std::any::type_name::<Svc>();

        let mut out = f.debug_struct("ReadyWorker");
        out.field("name", &self.id);
        out.field("stream", &source_type);
        out.field("service", &service_type);
        out.finish()
    }
}

#[async_trait::async_trait]
impl<
        Strm: Unpin + Send + Stream<Item = Result<Option<Req>, E>> + 'static,
        Serv: Service<Req, Future = Fut> + Send + 'static,
        J: Job + 'static,
        E: 'static + Send + Error + Sync,
        Req: Send + HasJobContext,
        Fut: Future + Send + 'static,
    > Worker<J> for ReadyWorker<Strm, Serv>
where
    <Serv as Service<Req>>::Error: Debug,
{
    type Service = Serv;
    type Source = Strm;

    /// Returns a copy of this worker's identifier.
    fn id(&self) -> WorkerId {
        self.id.clone()
    }

    /// Runs the worker until the stream ends or `ctx.shutdown` completes.
    ///
    /// Every request yielded by the stream is passed to the service and the
    /// resulting future is run as its own task via `ctx.spawn`. Stream errors
    /// are logged and skipped. Once the loop ends, this method waits for the
    /// jobs that are still in flight before reporting a successful shutdown.
    ///
    /// # Errors
    ///
    /// Returns [`WorkerError::ServiceError`] (carrying the `Debug` rendering of
    /// the service error) if the service fails to become ready.
    async fn start<Exec: Executor + Send + Sync + 'static>(
        self,
        ctx: WorkerContext<Exec>,
    ) -> Result<(), WorkerError> {
        #[cfg(feature = "extensions")]
        let mut service = ServiceBuilder::new()
            .layer(Extension(ctx.clone()))
            .service(self.service);
        #[cfg(not(feature = "extensions"))]
        let mut service = self.service;
        let mut stream = ctx.shutdown.graceful_stream(self.stream);
        // No message is ever sent on this channel. It is used purely as a
        // completion barrier: the receiver yields `None` once every sender
        // (the original plus one clone per in-flight job) has been dropped.
        let (send, mut recv) = futures::channel::mpsc::channel::<()>(1);
        // Setup any heartbeats by the worker
        for mut beat in self.beats {
            ctx.executor.spawn(async move {
                // The timer is picked at compile time; if both features are
                // enabled the tokio binding shadows the async-std one.
                #[cfg(feature = "async-std-comp")]
                #[allow(unused_variables)]
                let sleeper = crate::utils::timer::AsyncStdTimer;
                #[cfg(feature = "tokio-comp")]
                let sleeper = crate::utils::timer::TokioTimer;
                loop {
                    let interval = beat.interval();
                    beat.heart_beat().await;
                    sleeper.sleep(interval).await;
                }
            });
        }
        // Stop polling as soon as either the stream is exhausted or the
        // shutdown future resolves.
        while let Some(res) = futures::select! {
            res = stream.next().fuse() => res,
            _ = ctx.shutdown.clone().fuse() => None
        } {
            match res {
                Ok(Some(item)) => {
                    let svc = service
                        .ready()
                        .await
                        .map_err(|e| WorkerError::ServiceError(format!("{e:?}")))?;
                    let fut = svc.call(item);
                    // The task owns a sender clone for as long as the job runs,
                    // so the receiver below only closes after the job has
                    // finished (or its task has been dropped).
                    let job_guard = send.clone();
                    ctx.spawn(async move {
                        fut.await;
                        drop(job_guard);
                    });
                }
                Err(e) => {
                    warn!("Error processing stream {e}");
                }
                // The stream had nothing to hand out this time round.
                Ok(None) => {}
            }
        }
        // Release our own sender so only in-flight jobs keep the channel open.
        drop(send);
        info!("Shutting down {} worker", self.id);
        // Resolves once every spawned job has released its sender.
        let _ = recv.next().await;
        info!("Shutdown {} worker successfully", self.id);
        Ok(())
    }
}