zirv_queue/worker/mod.rs
1use crate::config::Config;
2use crate::models::job::Job;
3use crate::utils::deserialize;
4use crate::traits::Queueable;
5
6use std::error::Error;
7use std::thread;
8use std::time::Duration;
9
10/// Starts an autoscaling worker system based on the configured number of concurrent jobs.
11///
12/// This function spawns multiple worker threads—one per configured worker—and continuously
13/// polls the database for pending jobs. When a worker retrieves a job, it attempts to execute
14/// that job’s logic by deserializing the job’s payload into a type implementing the [`JobTrait`].
15///
16/// # Behavior
17///
18/// - **Thread Spawning**: Spawns `max_concurrent_jobs` threads as defined in [`Config::get_config`].
19/// - **Polling**: Each thread periodically fetches the next pending job via [`Job::fetch_pending_job`].
20/// - **Execution**: If a job is found, [`execute_job_logic`] is called to handle it. If `execute_job_logic`
21/// returns an error, it's logged, and the worker moves on.
22/// - **Idle Wait**: If no job is found or an error occurs, the thread sleeps for the configured `tick_rate_ms`
23/// duration before polling again.
24///
25/// This function does not return; the spawned threads run indefinitely.
26///
27/// # Example
28///
29/// ```rust,ignore
30/// fn main() {
31/// // Configure your DB and environment
32/// // ...
33///
34/// // Start the worker system
35/// start_autoscaling_worker();
36///
37/// // The workers will now run in the background
38/// // ...
39/// }
40/// ```
41pub fn start_autoscaling_worker() {
42 let config = Config::get_config();
43
44 let worker_count = config.max_concurrent_jobs;
45 println!(
46 "Starting {} worker threads, poll interval: {} ms",
47 worker_count, config.tick_rate_ms
48 );
49
50 for thread_id in 0..worker_count {
51 let poll_interval = config.tick_rate_ms;
52
53 thread::spawn(move || {
54 println!("[Worker {}] started.", thread_id);
55 loop {
56 // Try to fetch a pending job from DB
57 match Job::fetch_pending_job() {
58 Ok(Some(mut job)) => {
59 // If we got a job, execute its payload
60 match execute_job_logic(&job) {
61 Ok(return_value) => {
62 // Job completed successfully
63 match job.complete(format!("[Worker {}] {}", thread_id, return_value)) {
64 Ok(_) => {
65 println!("[Worker {}] Job {} completed.", thread_id, job.id);
66 }
67 Err(e) => {
68 eprintln!("[Worker {}] Error completing job {}: {:?}", thread_id, job.id, e);
69 }
70 };
71 }
72 Err(e) => {
73 eprintln!("[Worker {}] Job {} failed: {}", thread_id, job.id, e);
74
75 match job.fail(format!("[Worker {}] {}", thread_id, e).as_str()) {
76 Ok(_) => {
77 println!("[Worker {}] Job {} failed.", thread_id, job.id);
78 }
79 Err(e) => {
80 eprintln!("[Worker {}] Error failing job {}: {:?}", thread_id, job.id, e);
81 }
82 };
83 }
84 }
85 }
86 Ok(None) => {
87 // No job found, so sleep briefly
88 thread::sleep(Duration::from_millis(poll_interval));
89 }
90 Err(e) => {
91 eprintln!("[Worker {}] Error fetching job: {:?}", thread_id, e);
92 thread::sleep(Duration::from_millis(poll_interval));
93 }
94 }
95 }
96 });
97 }
98}
99
100/// Executes job logic by deserializing the job’s payload into a type implementing
101/// the [`Queueable`] and calling its [`Queueable::handle`] method.
102///
103/// # Parameters
104///
105/// - `job`: A reference to the [`Job`] record pulled from the database, containing
106/// the payload to deserialize.
107///
108/// # Returns
109///
110/// - `Ok(())` if deserialization and job handling both succeed.
111/// - `Err(Box<dyn Error>)` if an error occurs during deserialization or in `job_trait.handle()`.
112///
113/// # Errors
114///
115/// - If the payload cannot be deserialized into a valid [`Queueable`] object, an error is returned.
116/// - If [`Queueable::handle`] fails, its error is propagated.
117///
118/// # Example
119///
120/// ```rust,ignore
121/// let job = Job {
122/// id: 1,
123/// payload: r#"{"type":"MyConcreteJob","field":"value"}"#.to_string(),
124/// // ...
125/// };
126///
127/// if let Err(e) = execute_job_logic(&job) {
128/// eprintln!("Failed to execute job logic: {}", e);
129/// }
130/// ```
131fn execute_job_logic(job: &Job) -> Result<String, Box<dyn Error>> {
132 let job_trait: Box<dyn Queueable> = match deserialize::deserialize(&job.payload) {
133 Ok(j) => j,
134 Err(e) => return Err(Box::new(e)),
135 };
136
137 match job_trait.handle() {
138 Ok(return_value) => {
139 match return_value {
140 Some(value) => Ok(value),
141 None => Ok("Job completed successfully.".to_string()),
142 }
143 },
144 Err(e) => Err(e),
145 }
146}