kafru/manager.rs
1
2use crate::scheduler::Scheduler;
3use crate::task::TaskRegistry;
4use std::sync::Arc;
5use crate::worker::Worker;
6use tokio::task::JoinSet;
7use crate::database::Db;
8
9#[derive(Debug)]
10pub struct Manager {
11 pub join_set: JoinSet<()>,
12 pub server: String,
13 pub author: String
14}
15
16impl Manager {
17
18 /// Initializes the `Manager` struct, enabling the use of worker and scheduler functionalities for task management.
19 pub async fn new(server: String, author: String) -> Self {
20 Self {
21 join_set: JoinSet::new(),
22 server,
23 author
24 }
25 }
26
27 /// Launches a worker that polls the task queue and executes tasks asynchronously.
28 ///
29 /// This function spawns a worker in the background using the provided parameters.
30 /// The worker monitors the task queue, executes tasks using the handlers in the task registry,
31 /// and continues polling based on the specified interval.
32 ///
33 /// # Parameters
34 ///
35 /// - `queue_name`:
36 /// The name of the queue/worker to monitor and process tasks.
37 /// - `num_threads`:
38 /// The number of threads to spawn for concurrent task execution.
39 /// - `task_registry`:
40 /// An `Arc`-wrapped [`TaskRegistry`] containing the handlers for task execution.
41 /// - `poll_interval`:
42 /// The duration, in seconds, to pause between polling cycles.
43 /// - `db`:
44 /// An optional `Arc`-wrapped database instance for use by the worker.
45 ///
46 /// # Returns
47 ///
48 /// This function returns a `Result`:
49 ///
50 /// - `Ok(())` if the worker was successfully launched.
51 /// - `Err(String)` if an error occurs during the worker setup.
52 pub async fn worker(&mut self, queue_name: String, num_threads: usize, task_registry: Arc<TaskRegistry>, poll_interval: u64, db: Option<Arc<Db>>) -> Result<(),String> {
53 let server: String = self.server.clone();
54 let author: String = self.author.clone();
55 self.join_set.spawn( async move {
56 let worker = Worker::new(db.clone(),server,author).await;
57 let result = worker.watch(task_registry,num_threads, Some(queue_name),Some(poll_interval)).await;
58 assert!(result.is_ok(),"{:?}",result.unwrap_err());
59 });
60 Ok(())
61 }
62
63 /// Launches the scheduler that polls for scheduled tasks and pushes them to the queue for execution.
64 ///
65 /// This function runs a scheduler in the background, periodically polling for tasks
66 /// that are due for execution. The scheduler identifies tasks based on the provided
67 /// scheduler name and then enqueues them for processing. The polling interval determines
68 /// how often the scheduler checks for tasks.
69 ///
70 /// # Parameters
71 ///
72 /// - `scheduler_name`:
73 /// A string representing the name of the scheduler. This is used to identify the
74 /// scheduler instance and may correspond to a specific set of tasks.
75 /// - `poll_interval`:
76 /// The duration, in seconds, that the scheduler will pause between polling cycles.
77 /// A lower value increases polling frequency but may use more resources.
78 /// - `db`:
79 /// An optional `Arc`-wrapped database instance used for retrieving scheduled tasks
80 /// and their associated data.
81 ///
82 /// # Returns
83 ///
84 /// This function returns a `Result`:
85 ///
86 /// - `Ok(())` if the scheduler was successfully launched and is polling as expected.
87 /// - `Err(String)` if an error occurs during the setup or execution of the scheduler.
88 pub async fn scheduler(&mut self, scheduler_name: String, poll_interval: u64, db: Option<Arc<Db>>) -> Result<(),String> {
89 let server: String = self.server.clone();
90 let author: String = self.author.clone();
91 self.join_set.spawn( async move {
92 let scheduler = Scheduler::new(db.clone(),server,author).await;
93 let _ = scheduler.watch(Some(scheduler_name), Some(poll_interval)).await;
94 });
95 Ok(())
96 }
97
98 /// Waits for the scheduler and worker to complete their tasks.
99 pub async fn wait(self) -> Vec<()> {
100 self.join_set.join_all().await
101 }
102
103}