use chrono::Duration;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::time::{interval, sleep};
use tracing::{debug, error, info};

use super::{
    RetryPolicy, WorkerRegistry, processor::JobProcessor, scheduler::JobScheduler,
    worker::WorkerConfig,
};
use crate::error::{QmlError, Result};
use crate::storage::Storage;

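/// Configuration for the background job server.
///
/// The example below is an illustrative sketch of the builder-style setters
/// defined further down in this file; the server name, queue names, and
/// durations are arbitrary, so the block is marked `ignore` rather than
/// compiled as a doctest.
///
/// ```ignore
/// let config = ServerConfig::new("email-server")
///     .worker_count(8)
///     .polling_interval(Duration::seconds(2))
///     .queues(vec!["email".to_string(), "default".to_string()])
///     .enable_scheduler(false);
/// ```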
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Human-readable name identifying this server instance.
    pub server_name: String,
    /// Number of worker tasks to spawn.
    pub worker_count: usize,
    /// How often each worker polls storage for available jobs.
    pub polling_interval: Duration,
    /// Maximum time a single job is allowed to run.
    pub job_timeout: Duration,
    /// Queues this server processes; jobs in other queues are skipped.
    pub queues: Vec<String>,
    /// Whether the server should start processing automatically.
    pub auto_start: bool,
    /// Maximum number of jobs fetched from storage per poll.
    pub fetch_batch_size: usize,
    /// Whether to run the job scheduler alongside the workers.
    pub enable_scheduler: bool,
    /// How often the scheduler checks for due jobs.
    pub scheduler_poll_interval: Duration,
}

impl Default for ServerConfig {
    fn default() -> Self {
        Self {
            server_name: "qml-server".to_string(),
            worker_count: 5,
            polling_interval: Duration::seconds(1),
            job_timeout: Duration::minutes(5),
            queues: vec!["default".to_string()],
            auto_start: true,
            fetch_batch_size: 10,
            enable_scheduler: true,
            scheduler_poll_interval: Duration::seconds(30),
        }
    }
}

impl ServerConfig {
    /// Create a configuration with the given server name and default settings.
    pub fn new(server_name: impl Into<String>) -> Self {
        Self {
            server_name: server_name.into(),
            ..Default::default()
        }
    }

    /// Set the number of worker tasks.
    pub fn worker_count(mut self, count: usize) -> Self {
        self.worker_count = count;
        self
    }

    /// Set how often workers poll storage for new jobs.
    pub fn polling_interval(mut self, interval: Duration) -> Self {
        self.polling_interval = interval;
        self
    }

    /// Set the maximum time a single job may run.
    pub fn job_timeout(mut self, timeout: Duration) -> Self {
        self.job_timeout = timeout;
        self
    }

    /// Set the queues this server should process.
    pub fn queues(mut self, queues: Vec<String>) -> Self {
        self.queues = queues;
        self
    }

    /// Set the maximum number of jobs fetched per poll.
    pub fn fetch_batch_size(mut self, size: usize) -> Self {
        self.fetch_batch_size = size;
        self
    }

    /// Enable or disable the job scheduler.
    pub fn enable_scheduler(mut self, enable: bool) -> Self {
        self.enable_scheduler = enable;
        self
    }
}

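/// Background job server that coordinates a pool of worker tasks, an optional
/// scheduler, and a shared storage backend.
///
/// The example below is an illustrative sketch mirroring the tests at the end
/// of this file. `MyWorker` is a placeholder for any registered `Worker`
/// implementation, and the surrounding paths are assumptions, so the block is
/// marked `ignore` rather than compiled as a doctest.
///
/// ```ignore
/// let storage = Arc::new(MemoryStorage::new());
/// let mut registry = WorkerRegistry::new();
/// registry.register(MyWorker::new());
/// let registry = Arc::new(registry);
///
/// let config = ServerConfig::new("my-server").worker_count(4);
/// let server = BackgroundJobServer::new(config, storage, registry);
///
/// server.start().await?;
/// // ... enqueue jobs and let the workers process them ...
/// server.stop().await?;
/// ```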
pub struct BackgroundJobServer {
    /// Server configuration.
    config: ServerConfig,
    /// Shared storage backend used to fetch and update jobs.
    storage: Arc<dyn Storage>,
    /// Registry of workers that can execute jobs.
    worker_registry: Arc<WorkerRegistry>,
    /// Retry policy passed to each job processor.
    retry_policy: RetryPolicy,
    /// Scheduler created at construction time; currently unused because
    /// `start()` builds its own instance.
    #[allow(dead_code)]
    scheduler: Option<JobScheduler>,
    /// Flag shared with the spawned tasks to signal shutdown.
    is_running: Arc<tokio::sync::RwLock<bool>>,
    /// Handles of the spawned scheduler and worker tasks.
    worker_handles: Arc<tokio::sync::RwLock<Vec<JoinHandle<()>>>>,
}

impl BackgroundJobServer {
    /// Create a new server with the default retry policy.
    pub fn new(
        config: ServerConfig,
        storage: Arc<dyn Storage>,
        worker_registry: Arc<WorkerRegistry>,
    ) -> Self {
        let scheduler = if config.enable_scheduler {
            Some(JobScheduler::with_poll_interval(
                storage.clone(),
                config.scheduler_poll_interval,
            ))
        } else {
            None
        };

        Self {
            config,
            storage,
            worker_registry,
            retry_policy: RetryPolicy::default(),
            scheduler,
            is_running: Arc::new(tokio::sync::RwLock::new(false)),
            worker_handles: Arc::new(tokio::sync::RwLock::new(Vec::new())),
        }
    }

    /// Create a new server with a custom retry policy.
    pub fn with_retry_policy(
        config: ServerConfig,
        storage: Arc<dyn Storage>,
        worker_registry: Arc<WorkerRegistry>,
        retry_policy: RetryPolicy,
    ) -> Self {
        let mut server = Self::new(config, storage, worker_registry);
        server.retry_policy = retry_policy;
        server
    }

    /// Start the server: spawn the scheduler task (if enabled) and the worker tasks.
    ///
    /// Returns an error if the server is already running.
    pub async fn start(&self) -> Result<()> {
        let mut is_running = self.is_running.write().await;
        if *is_running {
            return Err(QmlError::ConfigurationError {
                message: "Server is already running".to_string(),
            });
        }

        info!(
            "Starting background job server '{}' with {} workers",
            self.config.server_name, self.config.worker_count
        );

        *is_running = true;
        drop(is_running);

        // Run the scheduler in its own task for as long as the server is running.
        if self.config.enable_scheduler {
            let scheduler = JobScheduler::with_poll_interval(
                self.storage.clone(),
                self.config.scheduler_poll_interval,
            );
            let is_running_clone = self.is_running.clone();

            let scheduler_handle = tokio::spawn(async move {
                while *is_running_clone.read().await {
                    if let Err(e) = scheduler.run().await {
                        error!("Scheduler error: {}", e);
                        // Back off briefly before retrying after a scheduler failure.
                        sleep(std::time::Duration::from_secs(5)).await;
                    }
                }
            });

            self.worker_handles.write().await.push(scheduler_handle);
        }

        self.start_workers().await?;

        info!("Background job server started successfully");
        Ok(())
    }

    /// Stop the server, aborting the scheduler and worker tasks.
    pub async fn stop(&self) -> Result<()> {
        let mut is_running = self.is_running.write().await;
        if !*is_running {
            return Ok(());
        }

        info!(
            "Stopping background job server '{}'",
            self.config.server_name
        );

        *is_running = false;
        drop(is_running);

        // Abort all spawned tasks; any in-flight work is cancelled at its next await point.
        let mut handles = self.worker_handles.write().await;
        for handle in handles.drain(..) {
            handle.abort();
        }

        info!("Background job server stopped");
        Ok(())
    }

    /// Returns whether the server is currently running.
    pub async fn is_running(&self) -> bool {
        *self.is_running.read().await
    }

    /// Returns the server's configuration.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }

    /// Spawn the configured number of worker tasks, each running its own polling loop.
    async fn start_workers(&self) -> Result<()> {
        let mut handles = self.worker_handles.write().await;

        for worker_id in 0..self.config.worker_count {
            let worker_config =
                WorkerConfig::new(format!("{}:worker:{}", self.config.server_name, worker_id))
                    .server_name(&self.config.server_name)
                    .queues(self.config.queues.clone())
                    .job_timeout(self.config.job_timeout)
                    .polling_interval(self.config.polling_interval);

            let processor = JobProcessor::with_retry_policy(
                self.worker_registry.clone(),
                self.storage.clone(),
                worker_config,
                self.retry_policy.clone(),
            );

            let storage_clone = self.storage.clone();
            let config_clone = self.config.clone();
            let is_running_clone = self.is_running.clone();

            let handle = tokio::spawn(async move {
                Self::worker_loop(processor, storage_clone, config_clone, is_running_clone).await;
            });

            handles.push(handle);
        }

        info!("Started {} worker tasks", self.config.worker_count);
        Ok(())
    }

    /// Polling loop executed by each worker task.
    ///
    /// Fetches batches of available jobs from storage, skips jobs whose queue is
    /// not handled by this server, and hands the rest to the job processor.
    async fn worker_loop(
        processor: JobProcessor,
        storage: Arc<dyn Storage>,
        config: ServerConfig,
        is_running: Arc<tokio::sync::RwLock<bool>>,
    ) {
        debug!("Worker task started");

        let mut interval = interval(
            config
                .polling_interval
                .to_std()
                .unwrap_or(std::time::Duration::from_secs(1)),
        );

        while *is_running.read().await {
            interval.tick().await;

            match storage
                .get_available_jobs(Some(config.fetch_batch_size))
                .await
            {
                Ok(jobs) => {
                    if !jobs.is_empty() {
                        debug!("Fetched {} jobs for processing", jobs.len());
                    }

                    for job in jobs {
                        // Skip jobs belonging to queues this server does not handle.
                        if !config.queues.is_empty() && !config.queues.contains(&job.queue) {
                            continue;
                        }

                        if let Err(e) = processor.process_job(job).await {
                            error!("Error processing job: {}", e);
                        }

                        // Stop mid-batch if the server has been asked to shut down.
                        if !*is_running.read().await {
                            break;
                        }
                    }
                }
                Err(e) => {
                    error!("Error fetching jobs: {}", e);
                    // Back off before the next poll after a storage error.
                    sleep(std::time::Duration::from_secs(5)).await;
                }
            }
        }

        debug!("Worker task stopped");
    }
}

// Cloning produces an independent server that shares the storage backend and
// worker registry, but starts with its own stopped run state and no spawned tasks.
impl Clone for BackgroundJobServer {
    fn clone(&self) -> Self {
        let scheduler = if self.config.enable_scheduler {
            Some(JobScheduler::with_poll_interval(
                self.storage.clone(),
                self.config.scheduler_poll_interval,
            ))
        } else {
            None
        };

        Self {
            config: self.config.clone(),
            storage: self.storage.clone(),
            worker_registry: self.worker_registry.clone(),
            retry_policy: self.retry_policy.clone(),
            scheduler,
            is_running: Arc::new(tokio::sync::RwLock::new(false)),
            worker_handles: Arc::new(tokio::sync::RwLock::new(Vec::new())),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::processing::{Worker, WorkerContext, WorkerResult};
    use crate::storage::MemoryStorage;
    use async_trait::async_trait;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test worker that records how many times it has been executed.
    struct TestWorker {
        method: String,
        call_count: Arc<AtomicUsize>,
    }

    impl TestWorker {
        fn new(method: &str) -> Self {
            Self {
                method: method.to_string(),
                call_count: Arc::new(AtomicUsize::new(0)),
            }
        }

        #[allow(dead_code)]
        fn call_count(&self) -> usize {
            self.call_count.load(Ordering::Relaxed)
        }
    }

    #[async_trait]
    impl Worker for TestWorker {
        async fn execute(
            &self,
            _job: &crate::core::Job,
            _context: &WorkerContext,
        ) -> Result<WorkerResult> {
            self.call_count.fetch_add(1, Ordering::Relaxed);
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            Ok(WorkerResult::success(
                Some("Test completed".to_string()),
                10,
            ))
        }

        fn method_name(&self) -> &str {
            &self.method
        }
    }

    #[tokio::test]
    async fn test_server_start_stop() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method"));
        let registry = Arc::new(registry);

        let config = ServerConfig::new("test-server")
            .worker_count(2)
            .polling_interval(Duration::milliseconds(100))
            .enable_scheduler(false);

        let server = BackgroundJobServer::new(config, storage, registry);

        server.start().await.unwrap();
        assert!(server.is_running().await);

        server.stop().await.unwrap();
        assert!(!server.is_running().await);
    }

    #[tokio::test]
    async fn test_job_processing() {
        let storage = Arc::new(MemoryStorage::new());
        let worker = TestWorker::new("test_method");
        let call_count = worker.call_count.clone();

        let mut registry = WorkerRegistry::new();
        registry.register(worker);
        let registry = Arc::new(registry);

        let config = ServerConfig::new("test-server")
            .worker_count(1)
            .polling_interval(Duration::milliseconds(10))
            .fetch_batch_size(1)
            .enable_scheduler(false);

        let server = BackgroundJobServer::new(config, storage.clone(), registry);

        let job = crate::core::Job::new("test_method", vec!["arg1".to_string()]);
        storage.enqueue(&job).await.unwrap();

        server.start().await.unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        server.stop().await.unwrap();

        assert!(call_count.load(Ordering::Relaxed) > 0);
    }
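
    // Illustrative sketch of an additional test: starting an already-running
    // server should be rejected, mirroring the guard at the top of `start()`.
    #[tokio::test]
    async fn test_start_twice_fails() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method"));
        let registry = Arc::new(registry);

        let config = ServerConfig::new("test-server")
            .worker_count(1)
            .enable_scheduler(false);

        let server = BackgroundJobServer::new(config, storage, registry);

        server.start().await.unwrap();
        // A second start while the server is running returns a configuration error.
        assert!(server.start().await.is_err());

        server.stop().await.unwrap();
    }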
}