1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use crate::scheduler::Scheduler;
use crate::task::TaskRegistry;
use std::sync::Arc;
use crate::worker::Worker;
use tokio::task::JoinSet;
use crate::database::Db;
/// Owns the background tasks started through [`Manager::worker`] and
/// [`Manager::scheduler`], plus the two strings forwarded to each of them.
#[derive(Debug)]
pub struct Manager {
    /// Set of spawned worker and scheduler tasks; consumed by [`Manager::wait`].
    pub join_set: JoinSet<()>,
    /// Value cloned and passed as the `server` argument of `Worker::new` and
    /// `Scheduler::new` for every task this manager spawns.
    pub server: String,
    /// Value cloned and passed as the `author` argument of `Worker::new` and
    /// `Scheduler::new` for every task this manager spawns.
    pub author: String,
}
impl Manager {
    /// Creates a `Manager` with an empty task set.
    ///
    /// `server` and `author` are stored as-is and later cloned into every worker
    /// and scheduler started through this manager. No asynchronous work is done
    /// here; the function is `async` only because that is its established signature.
    pub async fn new(server: String, author: String) -> Self {
        Self {
            join_set: JoinSet::new(),
            server,
            author,
        }
    }

    /// Spawns a background worker that polls a task queue and executes tasks.
    ///
    /// The worker is built with `Worker::new` and driven by `Worker::watch`
    /// inside a task added to this manager's `join_set`. This method returns as
    /// soon as the task has been spawned; it does not wait for the worker.
    ///
    /// # Parameters
    ///
    /// - `queue_name`:
    ///   The name of the queue/worker to monitor and process tasks.
    /// - `num_threads`:
    ///   The number of threads to spawn for concurrent task execution.
    /// - `task_registry`:
    ///   An `Arc`-wrapped [`TaskRegistry`] containing the handlers for task execution.
    /// - `poll_interval`:
    ///   The duration, in seconds, to pause between polling cycles.
    /// - `db`:
    ///   An optional `Arc`-wrapped database instance for use by the worker.
    ///
    /// # Errors
    ///
    /// Currently never returns `Err`: spawning itself cannot fail here, and any
    /// worker failure happens later inside the spawned task. The `Result` return
    /// type is kept for API compatibility.
    ///
    /// # Panics
    ///
    /// The spawned task panics (with the error's `Debug` output as message) if
    /// `Worker::watch` returns an error. That panic surfaces when the task is
    /// joined, e.g. through [`Manager::wait`].
    pub async fn worker(
        &mut self,
        queue_name: String,
        num_threads: usize,
        task_registry: Arc<TaskRegistry>,
        poll_interval: u64,
        db: Option<Arc<Db>>,
    ) -> Result<(), String> {
        let server = self.server.clone();
        let author = self.author.clone();
        self.join_set.spawn(async move {
            // `db` is already owned by this task, so it is moved instead of cloned.
            let worker = Worker::new(db, server, author).await;
            let outcome = worker
                .watch(task_registry, num_threads, Some(queue_name), Some(poll_interval))
                .await;
            // A failed worker aborts its task loudly; the message is the error's Debug form.
            if let Err(err) = outcome {
                panic!("{:?}", err);
            }
        });
        Ok(())
    }

    /// Spawns a background scheduler that polls for scheduled tasks.
    ///
    /// The scheduler is built with `Scheduler::new` and driven by
    /// `Scheduler::watch` inside a task added to this manager's `join_set`.
    /// This method returns as soon as the task has been spawned; it does not
    /// wait for the scheduler.
    ///
    /// # Parameters
    ///
    /// - `scheduler_name`:
    ///   A string representing the name of the scheduler. This is used to identify the
    ///   scheduler instance and may correspond to a specific set of tasks.
    /// - `poll_interval`:
    ///   The duration, in seconds, that the scheduler will pause between polling cycles.
    ///   A lower value increases polling frequency but may use more resources.
    /// - `db`:
    ///   An optional `Arc`-wrapped database instance used for retrieving scheduled tasks
    ///   and their associated data.
    ///
    /// # Errors
    ///
    /// Currently never returns `Err`: the scheduler is only spawned here and the
    /// outcome of `Scheduler::watch` is discarded inside the task. The `Result`
    /// return type is kept for API compatibility.
    pub async fn scheduler(
        &mut self,
        scheduler_name: String,
        poll_interval: u64,
        db: Option<Arc<Db>>,
    ) -> Result<(), String> {
        let server = self.server.clone();
        let author = self.author.clone();
        self.join_set.spawn(async move {
            // `db` is already owned by this task, so it is moved instead of cloned.
            let scheduler = Scheduler::new(db, server, author).await;
            // NOTE(review): the outcome of `watch` is dropped, so a scheduler failure
            // ends this task silently, whereas a worker failure panics. Confirm that
            // this asymmetry is intended.
            let _ = scheduler
                .watch(Some(scheduler_name), Some(poll_interval))
                .await;
        });
        Ok(())
    }

    /// Waits for every spawned worker and scheduler task to finish.
    ///
    /// Consumes the manager and returns one `()` per completed task.
    ///
    /// # Panics
    ///
    /// Per tokio's documentation of `JoinSet::join_all`, this panics if any task
    /// in the set failed (for example a worker task that panicked), and the
    /// remaining tasks are cancelled.
    pub async fn wait(self) -> Vec<()> {
        self.join_set.join_all().await
    }
}