qml_rs/processing/
server.rs

1//! Background job server for managing job processing
2//!
3//! This module contains the BackgroundJobServer that coordinates job processing,
4//! manages worker threads, and handles the overall job processing lifecycle.
5
6use chrono::Duration;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use tokio::task::JoinHandle;
10use tokio::time::{interval, sleep};
11use tracing::{debug, error, info};
12
13use super::{
14    RetryPolicy, WorkerRegistry, processor::JobProcessor, scheduler::JobScheduler,
15    worker::WorkerConfig,
16};
17use crate::error::{QmlError, Result};
18use crate::storage::Storage;
19
/// Configuration for the background job server.
///
/// All fields are public, so a value can be written as a struct literal,
/// derived from [`Default`], or assembled with the chained setters on this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Server name identifier; also used as the prefix of the generated worker ids
    /// (`"<server_name>:worker:<index>"`)
    pub server_name: String,
    /// Number of worker tasks spawned when the server starts
    pub worker_count: usize,
    /// Polling interval for checking new jobs; each worker ticks at this period.
    /// A value that cannot be converted to a std duration falls back to one second.
    pub polling_interval: Duration,
    /// Timeout for job execution, forwarded to each worker's configuration
    pub job_timeout: Duration,
    /// Queues to process (empty means all queues)
    pub queues: Vec<String>,
    /// Whether the server should start automatically.
    /// NOTE(review): nothing in this file reads this flag — presumably it is
    /// consumed by the code that owns the server; confirm.
    pub auto_start: bool,
    /// Maximum number of jobs to fetch per polling cycle (per worker)
    pub fetch_batch_size: usize,
    /// Enable the job scheduler task alongside the workers
    pub enable_scheduler: bool,
    /// Scheduler polling interval, passed to the scheduler when it is created
    pub scheduler_poll_interval: Duration,
}
42
43impl Default for ServerConfig {
44    fn default() -> Self {
45        Self {
46            server_name: "qml-server".to_string(),
47            worker_count: 5,
48            polling_interval: Duration::seconds(1),
49            job_timeout: Duration::minutes(5),
50            queues: vec!["default".to_string()],
51            auto_start: true,
52            fetch_batch_size: 10,
53            enable_scheduler: true,
54            scheduler_poll_interval: Duration::seconds(30),
55        }
56    }
57}
58
59impl ServerConfig {
60    /// Create a new server configuration
61    pub fn new(server_name: impl Into<String>) -> Self {
62        Self {
63            server_name: server_name.into(),
64            ..Default::default()
65        }
66    }
67
68    /// Set the number of workers
69    pub fn worker_count(mut self, count: usize) -> Self {
70        self.worker_count = count;
71        self
72    }
73
74    /// Set the polling interval
75    pub fn polling_interval(mut self, interval: Duration) -> Self {
76        self.polling_interval = interval;
77        self
78    }
79
80    /// Set the job timeout
81    pub fn job_timeout(mut self, timeout: Duration) -> Self {
82        self.job_timeout = timeout;
83        self
84    }
85
86    /// Set the queues to process
87    pub fn queues(mut self, queues: Vec<String>) -> Self {
88        self.queues = queues;
89        self
90    }
91
92    /// Set the fetch batch size
93    pub fn fetch_batch_size(mut self, size: usize) -> Self {
94        self.fetch_batch_size = size;
95        self
96    }
97
98    /// Enable or disable the scheduler
99    pub fn enable_scheduler(mut self, enable: bool) -> Self {
100        self.enable_scheduler = enable;
101        self
102    }
103}
104
/// Background job server that manages job processing.
///
/// Owns the configuration and shared collaborators, plus the run flag and the
/// join handles of every task spawned by `start`.
pub struct BackgroundJobServer {
    /// Settings this server was created with
    config: ServerConfig,
    /// Job storage shared with every worker task and the scheduler
    storage: Arc<dyn Storage>,
    /// Registry handed to each worker's job processor
    worker_registry: Arc<WorkerRegistry>,
    /// Retry policy handed to each worker's job processor
    retry_policy: RetryPolicy,
    // Built by the constructors but never read in this file: `start` creates a
    // separate scheduler instance for the spawned task, hence the lint allowance.
    #[allow(dead_code)]
    scheduler: Option<JobScheduler>,
    /// Run flag polled by the spawned tasks; set by `start`, cleared by `stop`
    is_running: Arc<tokio::sync::RwLock<bool>>,
    /// Join handles of the scheduler task (if any) and all worker tasks
    worker_handles: Arc<tokio::sync::RwLock<Vec<JoinHandle<()>>>>,
}
116
117impl BackgroundJobServer {
118    /// Create a new background job server
119    pub fn new(
120        config: ServerConfig,
121        storage: Arc<dyn Storage>,
122        worker_registry: Arc<WorkerRegistry>,
123    ) -> Self {
124        let scheduler = if config.enable_scheduler {
125            Some(JobScheduler::with_poll_interval(
126                storage.clone(),
127                config.scheduler_poll_interval,
128            ))
129        } else {
130            None
131        };
132
133        Self {
134            config,
135            storage,
136            worker_registry,
137            retry_policy: RetryPolicy::default(),
138            scheduler,
139            is_running: Arc::new(tokio::sync::RwLock::new(false)),
140            worker_handles: Arc::new(tokio::sync::RwLock::new(Vec::new())),
141        }
142    }
143
144    /// Create a new background job server with custom retry policy
145    pub fn with_retry_policy(
146        config: ServerConfig,
147        storage: Arc<dyn Storage>,
148        worker_registry: Arc<WorkerRegistry>,
149        retry_policy: RetryPolicy,
150    ) -> Self {
151        let mut server = Self::new(config, storage, worker_registry);
152        server.retry_policy = retry_policy;
153        server
154    }
155
156    /// Start the background job server
157    pub async fn start(&self) -> Result<()> {
158        let mut is_running = self.is_running.write().await;
159        if *is_running {
160            return Err(QmlError::ConfigurationError {
161                message: "Server is already running".to_string(),
162            });
163        }
164
165        info!(
166            "Starting background job server '{}' with {} workers",
167            self.config.server_name, self.config.worker_count
168        );
169
170        *is_running = true;
171        drop(is_running);
172
173        // Start scheduler if enabled
174        if self.config.enable_scheduler {
175            let scheduler = JobScheduler::with_poll_interval(
176                self.storage.clone(),
177                self.config.scheduler_poll_interval,
178            );
179            let is_running_clone = self.is_running.clone();
180
181            let scheduler_handle = tokio::spawn(async move {
182                while *is_running_clone.read().await {
183                    if let Err(e) = scheduler.run().await {
184                        error!("Scheduler error: {}", e);
185                        sleep(std::time::Duration::from_secs(5)).await;
186                    }
187                }
188            });
189
190            self.worker_handles.write().await.push(scheduler_handle);
191        }
192
193        // Start worker threads
194        self.start_workers().await?;
195
196        info!("Background job server started successfully");
197        Ok(())
198    }
199
200    /// Stop the background job server
201    pub async fn stop(&self) -> Result<()> {
202        let mut is_running = self.is_running.write().await;
203        if !*is_running {
204            return Ok(());
205        }
206
207        info!(
208            "Stopping background job server '{}'",
209            self.config.server_name
210        );
211
212        *is_running = false;
213        drop(is_running);
214
215        // Wait for all workers to complete
216        let mut handles = self.worker_handles.write().await;
217        for handle in handles.drain(..) {
218            handle.abort();
219        }
220
221        info!("Background job server stopped");
222        Ok(())
223    }
224
225    /// Check if the server is running
226    pub async fn is_running(&self) -> bool {
227        *self.is_running.read().await
228    }
229
230    /// Get server configuration
231    pub fn config(&self) -> &ServerConfig {
232        &self.config
233    }
234
235    /// Start worker threads
236    async fn start_workers(&self) -> Result<()> {
237        let mut handles = self.worker_handles.write().await;
238
239        for worker_id in 0..self.config.worker_count {
240            let worker_config =
241                WorkerConfig::new(format!("{}:worker:{}", self.config.server_name, worker_id))
242                    .server_name(&self.config.server_name)
243                    .queues(self.config.queues.clone())
244                    .job_timeout(self.config.job_timeout)
245                    .polling_interval(self.config.polling_interval);
246
247            let processor = JobProcessor::with_retry_policy(
248                self.worker_registry.clone(),
249                self.storage.clone(),
250                worker_config,
251                self.retry_policy.clone(),
252            );
253
254            let storage_clone = self.storage.clone();
255            let config_clone = self.config.clone();
256            let is_running_clone = self.is_running.clone();
257
258            let handle = tokio::spawn(async move {
259                Self::worker_loop(processor, storage_clone, config_clone, is_running_clone).await;
260            });
261
262            handles.push(handle);
263        }
264
265        info!("Started {} worker threads", self.config.worker_count);
266        Ok(())
267    }
268
269    /// Main worker loop for processing jobs
270    async fn worker_loop(
271        processor: JobProcessor,
272        storage: Arc<dyn Storage>,
273        config: ServerConfig,
274        is_running: Arc<tokio::sync::RwLock<bool>>,
275    ) {
276        debug!("Worker thread started");
277
278        let mut interval = interval(
279            config
280                .polling_interval
281                .to_std()
282                .unwrap_or(std::time::Duration::from_secs(1)),
283        );
284
285        while *is_running.read().await {
286            interval.tick().await;
287
288            // Fetch available jobs
289            match storage
290                .get_available_jobs(Some(config.fetch_batch_size))
291                .await
292            {
293                Ok(jobs) => {
294                    if !jobs.is_empty() {
295                        debug!("Fetched {} jobs for processing", jobs.len());
296                    }
297
298                    for job in jobs {
299                        // Check if we should process this job based on queue filter
300                        if !config.queues.is_empty() && !config.queues.contains(&job.queue) {
301                            continue;
302                        }
303
304                        // Process the job
305                        if let Err(e) = processor.process_job(job).await {
306                            error!("Error processing job: {}", e);
307                        }
308
309                        // Check if we should stop
310                        if !*is_running.read().await {
311                            break;
312                        }
313                    }
314                }
315                Err(e) => {
316                    error!("Error fetching jobs: {}", e);
317                    // Back off on error
318                    sleep(std::time::Duration::from_secs(5)).await;
319                }
320            }
321        }
322
323        debug!("Worker thread stopped");
324    }
325}
326
327impl Clone for BackgroundJobServer {
328    fn clone(&self) -> Self {
329        let scheduler = if self.config.enable_scheduler {
330            Some(JobScheduler::with_poll_interval(
331                self.storage.clone(),
332                self.config.scheduler_poll_interval,
333            ))
334        } else {
335            None
336        };
337
338        Self {
339            config: self.config.clone(),
340            storage: self.storage.clone(),
341            worker_registry: self.worker_registry.clone(),
342            retry_policy: self.retry_policy.clone(),
343            scheduler,
344            is_running: Arc::new(tokio::sync::RwLock::new(false)),
345            worker_handles: Arc::new(tokio::sync::RwLock::new(Vec::new())),
346        }
347    }
348}
349
350#[cfg(test)]
351mod tests {
352    use super::*;
353    use crate::processing::{Worker, WorkerContext, WorkerResult};
354    use crate::storage::MemoryStorage;
355    use async_trait::async_trait;
356    use std::sync::atomic::{AtomicUsize, Ordering};
357
    /// Minimal `Worker` implementation for the tests in this module; it only
    /// counts how many times it has been executed.
    struct TestWorker {
        // Name returned from `method_name`.
        method: String,
        // Incremented on every `execute` call; shared so a test can keep a
        // handle to the counter after the worker has been moved into the registry.
        call_count: Arc<AtomicUsize>,
    }
362
363    impl TestWorker {
364        fn new(method: &str) -> Self {
365            Self {
366                method: method.to_string(),
367                call_count: Arc::new(AtomicUsize::new(0)),
368            }
369        }
370
371        #[allow(dead_code)]
372        fn call_count(&self) -> usize {
373            self.call_count.load(Ordering::Relaxed)
374        }
375    }
376
377    #[async_trait]
378    impl Worker for TestWorker {
379        async fn execute(
380            &self,
381            _job: &crate::core::Job,
382            _context: &WorkerContext,
383        ) -> Result<WorkerResult> {
384            self.call_count.fetch_add(1, Ordering::Relaxed);
385            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
386            Ok(WorkerResult::success(
387                Some("Test completed".to_string()),
388                10,
389            ))
390        }
391
392        fn method_name(&self) -> &str {
393            &self.method
394        }
395    }
396
397    #[tokio::test]
398    async fn test_server_start_stop() {
399        let storage = Arc::new(MemoryStorage::new());
400        let mut registry = WorkerRegistry::new();
401        registry.register(TestWorker::new("test_method"));
402        let registry = Arc::new(registry);
403
404        let config = ServerConfig::new("test-server")
405            .worker_count(2)
406            .polling_interval(Duration::milliseconds(100))
407            .enable_scheduler(false);
408
409        let server = BackgroundJobServer::new(config, storage, registry);
410
411        // Start server
412        server.start().await.unwrap();
413        assert!(server.is_running().await);
414
415        // Stop server
416        server.stop().await.unwrap();
417        assert!(!server.is_running().await);
418    }
419
420    #[tokio::test]
421    async fn test_job_processing() {
422        let storage = Arc::new(MemoryStorage::new());
423        let worker = TestWorker::new("test_method");
424        let call_count = worker.call_count.clone();
425
426        let mut registry = WorkerRegistry::new();
427        registry.register(worker);
428        let registry = Arc::new(registry);
429
430        let config = ServerConfig::new("test-server")
431            .worker_count(1)
432            .polling_interval(Duration::milliseconds(10))
433            .fetch_batch_size(1)
434            .enable_scheduler(false);
435
436        let server = BackgroundJobServer::new(config, storage.clone(), registry);
437
438        // Enqueue a test job
439        let job = crate::core::Job::new("test_method", vec!["arg1".to_string()]);
440        storage.enqueue(&job).await.unwrap();
441
442        // Start server
443        server.start().await.unwrap();
444
445        // Wait for job to be processed
446        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
447
448        // Stop server
449        server.stop().await.unwrap();
450
451        // Check that the job was processed
452        assert!(call_count.load(Ordering::Relaxed) > 0);
453    }
454}