1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/// Represents a worker that is ready to consume jobs
pub mod ready;
use crate::executor::Executor;
use async_trait::async_trait;
use futures::Future;
use graceful_shutdown::Shutdown;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::fmt::{self, Display};
use std::time::Duration;
use thiserror::Error;

/// A worker name wrapper usually used by Worker builder
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkerId {
    name: String,
}

impl Display for WorkerId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.name())
    }
}

impl WorkerId {
    /// Build a new worker ref
    pub fn new<T: AsRef<str>>(name: T) -> Self {
        Self {
            name: name.as_ref().to_string(),
        }
    }
    /// Get the name of the worker
    pub fn name(&self) -> &str {
        &self.name
    }
}

/// Possible errors that can occur when starting a worker.
#[derive(Error, Debug)]
pub enum WorkerError {
    /// An error occurred while processing a job.
    #[error("Failed to process job: {0}")]
    JobProcessingError(String),
    /// An error occurred in the worker's service.
    #[error("Service error: {0}")]
    ServiceError(String),
    /// An error occurred while trying to start the worker.
    #[error("Failed to start worker: {0}")]
    StartError(String),
}
/// The `Worker` trait represents a type that can execute jobs. It is used
/// to define workers that can be managed by the `Monitor`.
///
/// Each `Worker` implementation must define a `start` method that takes a
/// `WorkerContext` and returns a `Result` indicating whether the worker
/// was able to execute its jobs successfully or not.
#[async_trait]
pub trait Worker<Job>: Sized {
    /// The [tower] service type that this worker will use to execute jobs.
    type Service;

    /// The source type that this worker will use to receive jobs.
    type Source;

    /// A worker must be identifiable and unique
    fn id(&self) -> WorkerId;

    /// Starts the worker, taking ownership of `self` and the provided `ctx`.
    ///
    /// This method should run indefinitely or until it returns an error.
    /// If an error occurs, it should return a `WorkerError` describing
    /// the reason for the failure.
    async fn start<E: Executor + Send + Sync + 'static>(
        self,
        ctx: WorkerContext<E>,
    ) -> Result<(), WorkerError>;
}

/// Stores the Workers context
#[derive(Clone)]
pub struct WorkerContext<E: Executor> {
    pub(crate) shutdown: Shutdown,
    pub(crate) executor: E,
    pub(crate) worker_id: WorkerId,
}

impl<E: Executor> fmt::Debug for WorkerContext<E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WorkerContext")
            .field("shutdown", &["Shutdown handle"])
            .field("worker_id", &self.worker_id)
            .finish()
    }
}

impl<E: Executor + Send> WorkerContext<E> {
    /// Get the Worker ID
    pub fn id(&self) -> WorkerId {
        self.worker_id.clone()
    }
    /// Allows spawning of futures that will be gracefully shutdown by the worker
    pub fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
        self.executor.spawn(self.shutdown.graceful(future));
    }

    /// Calling this function triggers shutting down the worker
    pub fn shutdown(&self) {}
}

/// A worker can have heartbeats to keep alive or enqueue new jobs
#[async_trait::async_trait]
pub trait HeartBeat {
    /// The future of a single beat
    async fn heart_beat(&mut self);
    /// The interval for each beat to be called
    fn interval(&self) -> Duration;
}