kafru/
manager.rs

1
2use crate::scheduler::Scheduler;
3use crate::task::TaskRegistry;
4use std::sync::Arc;
5use crate::worker::Worker;
6use tokio::task::JoinSet;
7use crate::database::Db;
8
9#[derive(Debug)]
10pub struct Manager {
11    pub join_set: JoinSet<()>,
12    pub server: String,
13    pub author: String
14}
15
16impl Manager {
17
18    /// Initializes the `Manager` struct, enabling the use of worker and scheduler functionalities for task management.
19    pub async fn new(server: String, author: String) -> Self {
20        Self {
21            join_set: JoinSet::new(),
22            server,
23            author
24        }
25    }
26
27    /// Launches a worker that polls the task queue and executes tasks asynchronously.
28    /// 
29    /// This function spawns a worker in the background using the provided parameters.
30    /// The worker monitors the task queue, executes tasks using the handlers in the task registry,
31    /// and continues polling based on the specified interval.
32    /// 
33    /// # Parameters
34    /// 
35    /// - `queue_name`:
36    ///   The name of the queue/worker to monitor and process tasks.
37    /// - `num_threads`:
38    ///   The number of threads to spawn for concurrent task execution.
39    /// - `task_registry`:
40    ///   An `Arc`-wrapped [`TaskRegistry`] containing the handlers for task execution.
41    /// - `poll_interval`:
42    ///   The duration, in seconds, to pause between polling cycles.
43    /// - `db`:
44    ///   An optional `Arc`-wrapped database instance for use by the worker.
45    /// 
46    /// # Returns
47    /// 
48    /// This function returns a `Result`:
49    /// 
50    /// - `Ok(())` if the worker was successfully launched.
51    /// - `Err(String)` if an error occurs during the worker setup.
52    pub async fn worker(&mut self, queue_name: String, num_threads: usize, task_registry: Arc<TaskRegistry>, poll_interval: u64, db: Option<Arc<Db>>) -> Result<(),String> {
53        let server: String = self.server.clone();
54        let author: String = self.author.clone();
55        self.join_set.spawn( async move {
56            let worker  = Worker::new(db.clone(),server,author).await;
57            let result = worker.watch(task_registry,num_threads, Some(queue_name),Some(poll_interval)).await;
58            assert!(result.is_ok(),"{:?}",result.unwrap_err());
59        });
60        Ok(())
61    }
62
63    /// Launches the scheduler that polls for scheduled tasks and pushes them to the queue for execution.
64    ///
65    /// This function runs a scheduler in the background, periodically polling for tasks
66    /// that are due for execution. The scheduler identifies tasks based on the provided
67    /// scheduler name and then enqueues them for processing. The polling interval determines
68    /// how often the scheduler checks for tasks.
69    ///
70    /// # Parameters
71    ///
72    /// - `scheduler_name`:
73    ///   A string representing the name of the scheduler. This is used to identify the
74    ///   scheduler instance and may correspond to a specific set of tasks.
75    /// - `poll_interval`:
76    ///   The duration, in seconds, that the scheduler will pause between polling cycles.
77    ///   A lower value increases polling frequency but may use more resources.
78    /// - `db`:
79    ///   An optional `Arc`-wrapped database instance used for retrieving scheduled tasks
80    ///   and their associated data.
81    ///
82    /// # Returns
83    ///
84    /// This function returns a `Result`:
85    ///
86    /// - `Ok(())` if the scheduler was successfully launched and is polling as expected.
87    /// - `Err(String)` if an error occurs during the setup or execution of the scheduler.
88    pub async fn scheduler(&mut self, scheduler_name: String, poll_interval: u64, db: Option<Arc<Db>>) -> Result<(),String> {
89        let server: String = self.server.clone();
90        let author: String = self.author.clone();
91        self.join_set.spawn( async move {
92            let scheduler  = Scheduler::new(db.clone(),server,author).await;
93            let _ = scheduler.watch(Some(scheduler_name), Some(poll_interval)).await;
94        });
95        Ok(())
96    }
97
98    /// Waits for the scheduler and worker to complete their tasks.
99    pub async fn wait(self) -> Vec<()> {
100        self.join_set.join_all().await
101    }
102
103}