zirv_queue/worker/
mod.rs

1use crate::config::Config;
2use crate::models::job::Job;
3use crate::utils::deserialize;
4use crate::traits::Queueable;
5
6use std::error::Error;
7use std::thread;
8use std::time::Duration;
9
10/// Starts an autoscaling worker system based on the configured number of concurrent jobs.
11///
12/// This function spawns multiple worker threads—one per configured worker—and continuously
13/// polls the database for pending jobs. When a worker retrieves a job, it attempts to execute
14/// that job’s logic by deserializing the job’s payload into a type implementing the [`JobTrait`].
15///
16/// # Behavior
17///
18/// - **Thread Spawning**: Spawns `max_concurrent_jobs` threads as defined in [`Config::get_config`].
19/// - **Polling**: Each thread periodically fetches the next pending job via [`Job::fetch_pending_job`].
20/// - **Execution**: If a job is found, [`execute_job_logic`] is called to handle it. If `execute_job_logic`
21///   returns an error, it's logged, and the worker moves on.
22/// - **Idle Wait**: If no job is found or an error occurs, the thread sleeps for the configured `tick_rate_ms`
23///   duration before polling again.
24///
25/// This function does not return; the spawned threads run indefinitely.
26///
27/// # Example
28///
29/// ```rust,ignore
30/// fn main() {
31///     // Configure your DB and environment
32///     // ...
33///
34///     // Start the worker system
35///     start_autoscaling_worker();
36///
37///     // The workers will now run in the background
38///     // ...
39/// }
40/// ```
41pub fn start_autoscaling_worker() {
42    let config = Config::get_config();
43
44    let worker_count = config.max_concurrent_jobs;
45    println!(
46        "Starting {} worker threads, poll interval: {} ms",
47        worker_count, config.tick_rate_ms
48    );
49
50    for thread_id in 0..worker_count {
51        let poll_interval = config.tick_rate_ms;
52
53        thread::spawn(move || {
54            println!("[Worker {}] started.", thread_id);
55            loop {
56                // Try to fetch a pending job from DB
57                match Job::fetch_pending_job() {
58                    Ok(Some(mut job)) => {
59                        // If we got a job, execute its payload
60                        match execute_job_logic(&job) {
61                            Ok(return_value) => {
62                                // Job completed successfully
63                                match job.complete(format!("[Worker {}] {}", thread_id, return_value)) {
64                                    Ok(_) => {
65                                        println!("[Worker {}] Job {} completed.", thread_id, job.id);
66                                    }
67                                    Err(e) => {
68                                        eprintln!("[Worker {}] Error completing job {}: {:?}", thread_id, job.id, e);
69                                    }
70                                };
71                            }
72                            Err(e) => {
73                                eprintln!("[Worker {}] Job {} failed: {}", thread_id, job.id, e);
74
75                                match job.fail(format!("[Worker {}] {}", thread_id, e).as_str()) {
76                                    Ok(_) => {
77                                        println!("[Worker {}] Job {} failed.", thread_id, job.id);
78                                    }
79                                    Err(e) => {
80                                        eprintln!("[Worker {}] Error failing job {}: {:?}", thread_id, job.id, e);
81                                    }
82                                };
83                            }
84                        }
85                    }
86                    Ok(None) => {
87                        // No job found, so sleep briefly
88                        thread::sleep(Duration::from_millis(poll_interval));
89                    }
90                    Err(e) => {
91                        eprintln!("[Worker {}] Error fetching job: {:?}", thread_id, e);
92                        thread::sleep(Duration::from_millis(poll_interval));
93                    }
94                }
95            }
96        });
97    }
98}
99
100/// Executes job logic by deserializing the job’s payload into a type implementing
101/// the [`Queueable`] and calling its [`Queueable::handle`] method.
102///
103/// # Parameters
104///
105/// - `job`: A reference to the [`Job`] record pulled from the database, containing
106///   the payload to deserialize.
107///
108/// # Returns
109///
110/// - `Ok(())` if deserialization and job handling both succeed.
111/// - `Err(Box<dyn Error>)` if an error occurs during deserialization or in `job_trait.handle()`.
112///
113/// # Errors
114///
115/// - If the payload cannot be deserialized into a valid [`Queueable`] object, an error is returned.
116/// - If [`Queueable::handle`] fails, its error is propagated.
117///
118/// # Example
119///
120/// ```rust,ignore
121/// let job = Job {
122///     id: 1,
123///     payload: r#"{"type":"MyConcreteJob","field":"value"}"#.to_string(),
124///     // ...
125/// };
126///
127/// if let Err(e) = execute_job_logic(&job) {
128///     eprintln!("Failed to execute job logic: {}", e);
129/// }
130/// ```
131fn execute_job_logic(job: &Job) -> Result<String, Box<dyn Error>> {
132    let job_trait: Box<dyn Queueable> = match deserialize::deserialize(&job.payload) {
133        Ok(j) => j,
134        Err(e) => return Err(Box::new(e)),
135    };
136
137    match job_trait.handle() {
138        Ok(return_value) => {
139            match return_value {
140                Some(value) => Ok(value),
141                None => Ok("Job completed successfully.".to_string()),
142            }
143        },
144        Err(e) => Err(e),
145    }
146}