1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use crate::queue::Queueable;
use crate::runnable::COMMON_TYPE;
use crate::worker::Worker;
use crate::FangError;
use crate::RetentionMode;
use crate::SleepParams;
use log::error;
use log::info;
use std::thread;
use typed_builder::TypedBuilder;

/// Configuration for a pool of blocking workers.
///
/// Calling `start` spawns `number_of_workers` OS threads; each thread builds
/// its own `Worker` from a clone of the settings stored here.
#[derive(Clone, TypedBuilder)]
pub struct WorkerPool<BQueue>
where
    BQueue: Queueable + Clone + Sync + Send + 'static,
{
    /// The queue the `WorkerPool` uses to control the tasks that will be executed.
    /// It is cloned once per worker thread.
    #[builder(setter(into))]
    pub queue: BQueue,
    /// Controls how long a worker sleeps while waiting for tasks to execute.
    /// Falls back to `SleepParams::default()` when not set on the builder.
    #[builder(setter(into), default)]
    pub sleep_params: SleepParams,
    /// Controls whether tasks should be persisted after execution.
    /// Falls back to `RetentionMode::default()` when not set on the builder.
    #[builder(setter(into), default)]
    pub retention_mode: RetentionMode,
    /// The number of worker threads started by the `WorkerPool`.
    #[builder(setter(into))]
    pub number_of_workers: u32,
    /// The type of tasks that will be executed by the `WorkerPool`.
    /// Defaults to `COMMON_TYPE`; it is also embedded in each worker thread's name.
    #[builder(setter(into), default=COMMON_TYPE.into())]
    pub task_type: String,
}

/// Bookkeeping for a single worker thread of a [`WorkerPool`].
///
/// The value is moved into the thread it describes. Its `Drop` implementation
/// spawns a replacement thread, so dropping a `WorkerThread` restarts the worker.
#[derive(Clone, TypedBuilder)]
pub struct WorkerThread<BQueue>
where
    BQueue: Queueable + Clone + Sync + Send + 'static,
{
    /// Name given to the OS thread and used in log messages.
    pub name: String,
    /// How many times this worker has been restarted; incremented on every drop.
    pub restarts: u64,
    /// Copy of the pool settings used to build the thread's `Worker`.
    pub worker_pool: WorkerPool<BQueue>,
}

/// Optional worker settings; every field may be left as `None`.
///
/// NOTE(review): not referenced anywhere in this file — presumably consumed by
/// other modules to override worker defaults; confirm before relying on it.
#[derive(Clone)]
pub struct WorkerParams {
    /// Optional retention mode setting.
    pub retention_mode: Option<RetentionMode>,
    /// Optional sleep parameters setting.
    pub sleep_params: Option<SleepParams>,
    /// Optional task type setting.
    pub task_type: Option<String>,
}

impl<BQueue> WorkerPool<BQueue>
where
    BQueue: Queueable + Clone + Sync + Send + 'static,
{
    /// Starts the configured number of worker threads.
    ///
    /// This is necessary in order to execute tasks. Workers are numbered from
    /// `1` to `number_of_workers` and each thread is named
    /// `worker_<task_type><index>`. Every thread receives its own clone of
    /// this pool's settings.
    ///
    /// # Errors
    ///
    /// Returns a [`FangError`] as soon as one thread fails to spawn. Threads
    /// spawned before the failure are not stopped.
    pub fn start(&mut self) -> Result<(), FangError> {
        // An inclusive range avoids computing `number_of_workers + 1`, which
        // would overflow when `number_of_workers == u32::MAX`.
        for idx in 1..=self.number_of_workers {
            let name = format!("worker_{}{idx}", self.task_type);

            WorkerThread::builder()
                .name(name)
                .restarts(0)
                .worker_pool(self.clone())
                .build()
                .spawn()?;
        }
        Ok(())
    }
}

impl<BQueue> WorkerThread<BQueue>
where
    BQueue: Queueable + Clone + Sync + Send + 'static,
{
    fn spawn(self) -> Result<(), FangError> {
        info!(
            "starting a worker thread {}, number of restarts {}",
            self.name, self.restarts
        );

        let builder = thread::Builder::new().name(self.name.clone());

        builder
            .spawn(move || {
                let mut worker: Worker<BQueue> = Worker::builder()
                    .queue(self.worker_pool.queue.clone())
                    .task_type(self.worker_pool.task_type.clone())
                    .retention_mode(self.worker_pool.retention_mode.clone())
                    .sleep_params(self.worker_pool.sleep_params.clone())
                    .build();

                // Run worker
                if let Err(error) = worker.run_tasks() {
                    error!(
                        "Error executing tasks in worker '{}': {:?}",
                        self.name, error
                    );
                }
            })
            .map_err(FangError::from)?;

        Ok(())
    }
}

impl<BQueue> Drop for WorkerThread<BQueue>
where
    BQueue: Queueable + Clone + Sync + Send + 'static,
{
    /// Restarts the worker whenever a `WorkerThread` is dropped.
    ///
    /// The restart counter is incremented, the stop is logged, and a clone of
    /// this value is spawned on a new thread. If that spawn fails the failure
    /// is logged instead of panicking.
    fn drop(&mut self) {
        self.restarts += 1;

        error!(
            "Worker {} stopped. Restarting. The number of restarts {}",
            self.name, self.restarts,
        );

        // Never panic here: this destructor may run while the worker thread is
        // already unwinding from a panic, and a second panic would abort the
        // whole process.
        if let Err(spawn_error) = self.clone().spawn() {
            error!(
                "Failed to spawn a replacement thread for worker {}: {:?}",
                self.name, spawn_error
            );
        }
    }
}