1use chrono::Duration;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use tokio::task::JoinHandle;
10use tokio::time::{MissedTickBehavior, interval, sleep};
11use tokio_util::sync::CancellationToken;
12use tracing::{debug, error, info, warn};
13
14use super::{
15 RetryPolicy, WorkerRegistry,
16 cleanup::{CleanupWorker, DEFAULT_CLEANUP_INTERVAL, DEFAULT_FAILED_TTL, DEFAULT_SUCCEEDED_TTL},
17 heartbeat::{DEFAULT_DEAD_SERVER_TIMEOUT, DEFAULT_HEARTBEAT_INTERVAL, HeartbeatWorker},
18 middleware::{JobMiddleware, TracingMiddleware},
19 processor::{JobProcessor, StateChangeHook},
20 recurring::RecurringJobPoller,
21 scheduler::JobScheduler,
22 worker::WorkerConfig,
23};
24use crate::core::{RecurringJob, ServerInfo};
25use crate::error::{QmlError, Result};
26use crate::storage::Storage;
27use crate::storage::prelude::*;
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct ServerConfig {
32 pub server_name: String,
34 pub worker_count: usize,
36 pub polling_interval: Duration,
38 pub job_timeout: Duration,
40 pub queues: Vec<String>,
42 pub auto_start: bool,
44 pub fetch_batch_size: usize,
46 pub enable_scheduler: bool,
48 pub scheduler_poll_interval: Duration,
50 pub shutdown_timeout: Duration,
55 pub stale_processing_after: Duration,
61 pub enable_recurring: bool,
64 pub recurring_poll_interval: Duration,
67 pub enable_cleanup: bool,
70 pub cleanup_interval: Duration,
72 pub succeeded_ttl: Duration,
74 pub failed_ttl: Duration,
76 pub enable_heartbeat: bool,
87 pub heartbeat_interval: Duration,
90 pub dead_server_timeout: Duration,
94}
95
96impl Default for ServerConfig {
97 fn default() -> Self {
98 Self {
99 server_name: "qml-server".to_string(),
100 worker_count: 5,
101 polling_interval: Duration::seconds(1),
102 job_timeout: Duration::minutes(5),
103 queues: vec!["default".to_string()],
104 auto_start: true,
105 fetch_batch_size: 10,
106 enable_scheduler: true,
107 scheduler_poll_interval: Duration::seconds(30),
108 shutdown_timeout: Duration::seconds(30),
109 stale_processing_after: Duration::minutes(5),
110 enable_recurring: true,
111 recurring_poll_interval: Duration::seconds(5),
112 enable_cleanup: true,
113 cleanup_interval: DEFAULT_CLEANUP_INTERVAL,
114 succeeded_ttl: DEFAULT_SUCCEEDED_TTL,
115 failed_ttl: DEFAULT_FAILED_TTL,
116 enable_heartbeat: false,
117 heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
118 dead_server_timeout: DEFAULT_DEAD_SERVER_TIMEOUT,
119 }
120 }
121}
122
123impl ServerConfig {
124 pub fn new(server_name: impl Into<String>) -> Self {
126 Self {
127 server_name: server_name.into(),
128 ..Default::default()
129 }
130 }
131
132 pub fn worker_count(mut self, count: usize) -> Self {
134 self.worker_count = count;
135 self
136 }
137
138 pub fn polling_interval(mut self, interval: Duration) -> Self {
140 self.polling_interval = interval;
141 self
142 }
143
144 pub fn job_timeout(mut self, timeout: Duration) -> Self {
146 self.job_timeout = timeout;
147 self
148 }
149
150 pub fn queues(mut self, queues: Vec<String>) -> Self {
152 self.queues = queues;
153 self
154 }
155
156 pub fn fetch_batch_size(mut self, size: usize) -> Self {
158 self.fetch_batch_size = size;
159 self
160 }
161
162 pub fn enable_scheduler(mut self, enable: bool) -> Self {
164 self.enable_scheduler = enable;
165 self
166 }
167
168 pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
170 self.shutdown_timeout = timeout;
171 self
172 }
173
174 pub fn stale_processing_after(mut self, threshold: Duration) -> Self {
177 self.stale_processing_after = threshold;
178 self
179 }
180
181 pub fn enable_recurring(mut self, enable: bool) -> Self {
183 self.enable_recurring = enable;
184 self
185 }
186
187 pub fn recurring_poll_interval(mut self, interval: Duration) -> Self {
189 self.recurring_poll_interval = interval;
190 self
191 }
192
193 pub fn enable_cleanup(mut self, enable: bool) -> Self {
195 self.enable_cleanup = enable;
196 self
197 }
198
199 pub fn cleanup_interval(mut self, interval: Duration) -> Self {
201 self.cleanup_interval = interval;
202 self
203 }
204
205 pub fn succeeded_ttl(mut self, ttl: Duration) -> Self {
207 self.succeeded_ttl = ttl;
208 self
209 }
210
211 pub fn failed_ttl(mut self, ttl: Duration) -> Self {
213 self.failed_ttl = ttl;
214 self
215 }
216
217 pub fn enable_heartbeat(mut self, enable: bool) -> Self {
219 self.enable_heartbeat = enable;
220 self
221 }
222
223 pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
225 self.heartbeat_interval = interval;
226 self
227 }
228
229 pub fn dead_server_timeout(mut self, timeout: Duration) -> Self {
231 self.dead_server_timeout = timeout;
232 self
233 }
234}
235
236pub struct BackgroundJobServer {
238 config: ServerConfig,
239 storage: Arc<dyn Storage>,
240 worker_registry: Arc<WorkerRegistry>,
241 retry_policy: RetryPolicy,
242 middleware: Vec<Arc<dyn JobMiddleware>>,
250 on_state_change: Option<StateChangeHook>,
256 is_running: Arc<tokio::sync::RwLock<bool>>,
257 shutdown_token: Arc<tokio::sync::Mutex<CancellationToken>>,
262 worker_handles: Arc<tokio::sync::Mutex<Vec<JoinHandle<()>>>>,
263 server_id: Arc<tokio::sync::Mutex<Option<String>>>,
268}
269
270impl BackgroundJobServer {
271 pub fn new(
273 config: ServerConfig,
274 storage: Arc<dyn Storage>,
275 worker_registry: Arc<WorkerRegistry>,
276 ) -> Self {
277 Self {
278 config,
279 storage,
280 worker_registry,
281 retry_policy: RetryPolicy::default(),
282 middleware: vec![Arc::new(TracingMiddleware)],
283 on_state_change: None,
284 is_running: Arc::new(tokio::sync::RwLock::new(false)),
285 shutdown_token: Arc::new(tokio::sync::Mutex::new(CancellationToken::new())),
286 worker_handles: Arc::new(tokio::sync::Mutex::new(Vec::new())),
287 server_id: Arc::new(tokio::sync::Mutex::new(None)),
288 }
289 }
290
291 pub fn with_middleware(mut self, middleware: Vec<Arc<dyn JobMiddleware>>) -> Self {
303 self.middleware = middleware;
304 self
305 }
306
307 pub fn with_state_change_hook(mut self, hook: StateChangeHook) -> Self {
316 self.on_state_change = Some(hook);
317 self
318 }
319
320 pub fn with_retry_policy(
322 config: ServerConfig,
323 storage: Arc<dyn Storage>,
324 worker_registry: Arc<WorkerRegistry>,
325 retry_policy: RetryPolicy,
326 ) -> Self {
327 let mut server = Self::new(config, storage, worker_registry);
328 server.retry_policy = retry_policy;
329 server
330 }
331
332 pub async fn start(&self) -> Result<()> {
334 let mut is_running = self.is_running.write().await;
335 if *is_running {
336 return Err(QmlError::ConfigurationError {
337 message: "Server is already running".to_string(),
338 });
339 }
340
341 info!(
342 "Starting background job server '{}' with {} workers",
343 self.config.server_name, self.config.worker_count
344 );
345
346 let stale_before = chrono::Utc::now() - self.config.stale_processing_after;
350 match self.storage.requeue_stranded_jobs(stale_before).await {
351 Ok(0) => {}
352 Ok(n) => info!("Recovered {} stranded Processing job(s) on startup", n),
353 Err(e) => warn!(
354 "Failed to recover stranded Processing jobs on startup: {}",
355 e
356 ),
357 }
358
359 let shutdown_token = CancellationToken::new();
361 *self.shutdown_token.lock().await = shutdown_token.clone();
362
363 let server_identity = if self.config.enable_heartbeat {
369 let id = format!("{}#{}", self.config.server_name, uuid::Uuid::new_v4());
370 let info = ServerInfo::new(
371 id.clone(),
372 &self.config.server_name,
373 self.config.worker_count as u32,
374 self.config.queues.clone(),
375 );
376 self.storage
377 .register_server(&info)
378 .await
379 .map_err(|e| QmlError::StorageError {
380 message: format!("Failed to register server heartbeat: {}", e),
381 })?;
382 *self.server_id.lock().await = Some(id.clone());
383 Some(id)
384 } else {
385 *self.server_id.lock().await = None;
386 None
387 };
388
389 *is_running = true;
390 drop(is_running);
391
392 if self.config.enable_scheduler {
394 let scheduler = JobScheduler::with_poll_interval(
395 self.storage.clone(),
396 self.config.scheduler_poll_interval,
397 );
398 let scheduler_cancel = shutdown_token.clone();
399
400 let scheduler_handle = tokio::spawn(async move {
401 if let Err(e) = scheduler.run_until_cancelled(scheduler_cancel).await {
402 error!("Scheduler error: {}", e);
403 }
404 });
405
406 self.worker_handles.lock().await.push(scheduler_handle);
407 }
408
409 if self.config.enable_recurring {
411 let poller =
412 RecurringJobPoller::new(self.storage.clone(), self.config.recurring_poll_interval);
413 let cancel = shutdown_token.clone();
414 let handle = tokio::spawn(async move {
415 if let Err(e) = poller.run_until_cancelled(cancel).await {
416 error!("Recurring poller error: {}", e);
417 }
418 });
419 self.worker_handles.lock().await.push(handle);
420 }
421
422 if self.config.enable_cleanup {
424 let cleanup = CleanupWorker::new(self.storage.clone(), self.config.cleanup_interval);
425 let cancel = shutdown_token.clone();
426 let handle = tokio::spawn(async move {
427 if let Err(e) = cleanup.run_until_cancelled(cancel).await {
428 error!("Cleanup worker error: {}", e);
429 }
430 });
431 self.worker_handles.lock().await.push(handle);
432 }
433
434 if let Some(ref id) = server_identity {
438 let heartbeat = HeartbeatWorker::new(
439 self.storage.clone(),
440 id.clone(),
441 self.config.heartbeat_interval,
442 self.config.dead_server_timeout,
443 );
444 let cancel = shutdown_token.clone();
445 let handle = tokio::spawn(async move {
446 if let Err(e) = heartbeat.run_until_cancelled(cancel).await {
447 error!("Heartbeat worker error: {}", e);
448 }
449 });
450 self.worker_handles.lock().await.push(handle);
451 }
452
453 let stamped_name = server_identity
458 .clone()
459 .unwrap_or_else(|| self.config.server_name.clone());
460 self.start_workers(shutdown_token, stamped_name).await?;
461
462 info!("Background job server started successfully");
463 Ok(())
464 }
465
466 pub async fn stop(&self) -> Result<()> {
474 let mut is_running = self.is_running.write().await;
475 if !*is_running {
476 return Ok(());
477 }
478
479 info!(
480 "Stopping background job server '{}'",
481 self.config.server_name
482 );
483
484 self.shutdown_token.lock().await.cancel();
485 *is_running = false;
486 drop(is_running);
487
488 if let Some(id) = self.server_id.lock().await.take()
492 && let Err(e) = self.storage.deregister_server(&id).await
493 {
494 warn!("Failed to deregister server '{}' on stop: {}", id, e);
495 }
496
497 let handles = {
498 let mut guard = self.worker_handles.lock().await;
499 std::mem::take(&mut *guard)
500 };
501 let abort_handles: Vec<_> = handles.iter().map(|h| h.abort_handle()).collect();
502
503 let shutdown_timeout = self
504 .config
505 .shutdown_timeout
506 .to_std()
507 .unwrap_or(std::time::Duration::from_secs(30));
508
509 let join_all = async {
510 for handle in handles {
511 let _ = handle.await;
512 }
513 };
514
515 match tokio::time::timeout(shutdown_timeout, join_all).await {
516 Ok(()) => info!("Background job server stopped cleanly"),
517 Err(_) => {
518 warn!(
519 "Shutdown grace period of {:?} elapsed; aborting {} remaining task(s)",
520 shutdown_timeout,
521 abort_handles.len()
522 );
523 for handle in &abort_handles {
524 handle.abort();
525 }
526 }
527 }
528
529 Ok(())
530 }
531
532 pub async fn is_running(&self) -> bool {
534 *self.is_running.read().await
535 }
536
537 pub fn config(&self) -> &ServerConfig {
539 &self.config
540 }
541
542 pub async fn schedule_recurring(
552 &self,
553 id: impl Into<String>,
554 cron: impl Into<String>,
555 method: impl Into<String>,
556 payload: serde_json::Value,
557 queue: impl Into<String>,
558 ) -> Result<()> {
559 let recurring = RecurringJob::new(id, cron, method, payload, queue)?;
560 self.storage
561 .upsert_recurring_job(&recurring)
562 .await
563 .map_err(|e| QmlError::StorageError {
564 message: format!("Failed to upsert recurring job: {}", e),
565 })
566 }
567
568 pub async fn remove_recurring(&self, id: &str) -> Result<bool> {
571 self.storage
572 .remove_recurring_job(id)
573 .await
574 .map_err(|e| QmlError::StorageError {
575 message: format!("Failed to remove recurring job: {}", e),
576 })
577 }
578
579 async fn start_workers(
587 &self,
588 shutdown_token: CancellationToken,
589 stamped_server_name: String,
590 ) -> Result<()> {
591 let mut handles = self.worker_handles.lock().await;
592
593 for worker_id in 0..self.config.worker_count {
594 let worker_config =
595 WorkerConfig::new(format!("{}:worker:{}", self.config.server_name, worker_id))
596 .server_name(&stamped_server_name)
597 .queues(self.config.queues.clone())
598 .job_timeout(self.config.job_timeout)
599 .polling_interval(self.config.polling_interval);
600
601 let worker_cancel = shutdown_token.child_token();
605
606 let mut processor = JobProcessor::with_retry_policy(
607 self.worker_registry.clone(),
608 self.storage.clone(),
609 worker_config,
610 self.retry_policy.clone(),
611 )
612 .with_cancellation(worker_cancel.clone())
613 .with_ttls(self.config.succeeded_ttl, self.config.failed_ttl)
614 .with_middleware(self.middleware.clone());
615
616 if let Some(hook) = &self.on_state_change {
617 processor = processor.with_state_change_hook(hook.clone());
618 }
619
620 let storage_clone = self.storage.clone();
621 let config_clone = self.config.clone();
622
623 let handle = tokio::spawn(async move {
624 Self::worker_loop(processor, storage_clone, config_clone, worker_cancel).await;
625 });
626
627 handles.push(handle);
628 }
629
630 info!("Started {} worker threads", self.config.worker_count);
631 Ok(())
632 }
633
634 async fn worker_loop(
642 processor: JobProcessor,
643 storage: Arc<dyn Storage>,
644 config: ServerConfig,
645 cancel: CancellationToken,
646 ) {
647 debug!("Worker thread started");
648
649 let mut interval = interval(
650 config
651 .polling_interval
652 .to_std()
653 .unwrap_or(std::time::Duration::from_secs(1)),
654 );
655 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
661
662 loop {
663 tokio::select! {
664 biased;
665 _ = cancel.cancelled() => break,
666 _ = interval.tick() => {}
667 }
668
669 let queue_filter = if config.queues.is_empty() {
671 None
672 } else {
673 Some(config.queues.as_slice())
674 };
675
676 match storage
677 .fetch_and_lock_job(processor.get_worker_id(), queue_filter)
678 .await
679 {
680 Ok(Some(job)) => {
681 debug!("Fetched job {} for processing", job.id);
682
683 if let Err(e) = processor.process_job(job).await {
687 error!("Error processing job: {}", e);
688 }
689 }
690 Ok(None) => {
691 }
693 Err(e) => {
694 error!("Error fetching jobs: {}", e);
695 let jitter_ms = fastrand::u64(0..1500);
700 let backoff = std::time::Duration::from_millis(5_000 + jitter_ms);
701 tokio::select! {
702 _ = cancel.cancelled() => break,
703 _ = sleep(backoff) => {}
704 }
705 }
706 }
707 }
708
709 debug!("Worker thread stopped");
710 }
711}
712
#[cfg(test)]
mod tests {
    use super::*;
    use crate::processing::{Worker, WorkerContext, WorkerResult};
    use crate::storage::{MemoryStorage, MonitoringApi};
    use async_trait::async_trait;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Shorthand for the tokio durations used to pace the tests.
    fn millis(ms: u64) -> tokio::time::Duration {
        tokio::time::Duration::from_millis(ms)
    }

    /// Worker that counts its invocations and succeeds after a short pause.
    struct TestWorker {
        method: String,
        call_count: Arc<AtomicUsize>,
    }

    impl TestWorker {
        fn new(method: &str) -> Self {
            let method = method.to_string();
            let call_count = Arc::new(AtomicUsize::new(0));
            Self { method, call_count }
        }

        // Kept as a convenience accessor; the tests read the shared counter
        // directly, hence the lint exemption.
        #[allow(dead_code)]
        fn call_count(&self) -> usize {
            self.call_count.load(Ordering::Relaxed)
        }
    }

    #[async_trait]
    impl Worker for TestWorker {
        async fn execute(
            &self,
            _job: &crate::core::Job,
            _context: &WorkerContext,
        ) -> Result<WorkerResult> {
            self.call_count.fetch_add(1, Ordering::Relaxed);
            tokio::time::sleep(millis(10)).await;
            let message = Some("Test completed".to_string());
            Ok(WorkerResult::success(message, 10))
        }

        fn method_name(&self) -> &str {
            &self.method
        }
    }

    /// The running flag follows `start` and `stop`.
    #[tokio::test]
    async fn test_server_start_stop() {
        let mut workers = WorkerRegistry::new();
        workers.register(TestWorker::new("test_method"));

        let storage = Arc::new(MemoryStorage::new());
        let settings = ServerConfig::new("test-server")
            .enable_scheduler(false)
            .polling_interval(Duration::milliseconds(100))
            .worker_count(2);
        let server = BackgroundJobServer::new(settings, storage, Arc::new(workers));

        server.start().await.unwrap();
        assert!(server.is_running().await);

        server.stop().await.unwrap();
        assert!(!server.is_running().await);
    }

    /// `stop` must let a job that is already executing run to completion.
    #[tokio::test]
    async fn stop_waits_for_inflight_job_to_complete() {
        /// Takes 500 ms per job and counts the jobs it finished.
        struct SlowWorker {
            done: Arc<AtomicUsize>,
        }

        #[async_trait]
        impl Worker for SlowWorker {
            async fn execute(
                &self,
                _job: &crate::core::Job,
                _ctx: &WorkerContext,
            ) -> Result<WorkerResult> {
                tokio::time::sleep(millis(500)).await;
                self.done.fetch_add(1, Ordering::Relaxed);
                Ok(WorkerResult::success(None, 500))
            }

            fn method_name(&self) -> &str {
                "slow_method"
            }
        }

        let finished = Arc::new(AtomicUsize::new(0));
        let mut workers = WorkerRegistry::new();
        workers.register(SlowWorker {
            done: finished.clone(),
        });

        let storage = Arc::new(MemoryStorage::new());
        let settings = ServerConfig::new("s1-test")
            .enable_scheduler(false)
            .shutdown_timeout(Duration::seconds(5))
            .polling_interval(Duration::milliseconds(10))
            .worker_count(1);
        let server = BackgroundJobServer::new(settings, storage.clone(), Arc::new(workers));

        let slow_job = crate::core::Job::new("slow_method", serde_json::Value::Null);
        let slow_job_id = slow_job.id.clone();
        storage.enqueue(&slow_job).await.unwrap();

        server.start().await.unwrap();

        // Long enough for the worker to pick the job up, far shorter than the
        // 500 ms the job needs to finish.
        tokio::time::sleep(millis(50)).await;
        server.stop().await.unwrap();

        assert_eq!(
            finished.load(Ordering::Relaxed),
            1,
            "worker should complete"
        );
        let stored = storage.get(&slow_job_id).await.unwrap().unwrap();
        assert!(
            matches!(stored.state, crate::core::JobState::Succeeded { .. }),
            "job should be Succeeded after graceful stop, got {:?}",
            stored.state
        );
    }

    /// The cancel token exposed through `WorkerContext` fires when the server stops.
    #[tokio::test]
    async fn worker_context_cancel_token_fires_on_stop() {
        use tokio::sync::Notify;

        /// Signals that it started, then waits for cancellation or 10 s,
        /// whichever comes first.
        struct CancellableWorker {
            observed_cancel: Arc<AtomicUsize>,
            started: Arc<Notify>,
        }

        #[async_trait]
        impl Worker for CancellableWorker {
            async fn execute(
                &self,
                _job: &crate::core::Job,
                ctx: &WorkerContext,
            ) -> Result<WorkerResult> {
                self.started.notify_one();
                tokio::select! {
                    _ = ctx.cancel.cancelled() => {
                        self.observed_cancel.fetch_add(1, Ordering::Relaxed);
                        Ok(WorkerResult::success(None, 0))
                    }
                    _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
                        Ok(WorkerResult::success(None, 10_000))
                    }
                }
            }

            fn method_name(&self) -> &str {
                "cancellable"
            }
        }

        let cancel_seen = Arc::new(AtomicUsize::new(0));
        let job_started = Arc::new(Notify::new());

        let mut workers = WorkerRegistry::new();
        workers.register(CancellableWorker {
            observed_cancel: cancel_seen.clone(),
            started: job_started.clone(),
        });

        let storage = Arc::new(MemoryStorage::new());
        let settings = ServerConfig::new("s2-test")
            .enable_scheduler(false)
            .shutdown_timeout(Duration::seconds(5))
            .polling_interval(Duration::milliseconds(10))
            .worker_count(1);
        let server = BackgroundJobServer::new(settings, storage.clone(), Arc::new(workers));

        let pending = crate::core::Job::new("cancellable", serde_json::Value::Null);
        storage.enqueue(&pending).await.unwrap();

        server.start().await.unwrap();
        // Only stop once the job is known to be executing.
        job_started.notified().await;

        server.stop().await.unwrap();
        assert_eq!(
            cancel_seen.load(Ordering::Relaxed),
            1,
            "worker should have observed its cancel token firing"
        );
    }

    /// A job left in `Processing` by a dead server is requeued on start.
    #[tokio::test]
    async fn start_recovers_stranded_processing_jobs() {
        let storage = Arc::new(MemoryStorage::new());

        // Stored directly in `Processing`, started an hour ago: far beyond the
        // five-minute staleness threshold configured below.
        let mut orphan = crate::core::Job::new("noop", serde_json::Value::Null);
        orphan.state = crate::core::JobState::Processing {
            started_at: chrono::Utc::now() - Duration::hours(1),
            worker_id: "dead-worker".to_string(),
            server_name: "dead-server".to_string(),
        };
        let orphan_id = orphan.id.clone();
        storage.enqueue(&orphan).await.unwrap();

        let mut workers = WorkerRegistry::new();
        workers.register(TestWorker::new("noop"));

        // Zero workers: nothing can pick the job up again after the requeue.
        let settings = ServerConfig::new("s3-test")
            .stale_processing_after(Duration::minutes(5))
            .enable_scheduler(false)
            .worker_count(0);

        let server = BackgroundJobServer::new(settings, storage.clone(), Arc::new(workers));
        server.start().await.unwrap();
        tokio::time::sleep(millis(20)).await;
        server.stop().await.unwrap();

        let requeued = storage.get(&orphan_id).await.unwrap().unwrap();
        assert!(
            matches!(requeued.state, crate::core::JobState::Enqueued { .. }),
            "stranded job should have been requeued, got {:?}",
            requeued.state
        );
    }

    /// An enqueued job reaches the registered worker.
    #[tokio::test]
    async fn test_job_processing() {
        let counting_worker = TestWorker::new("test_method");
        let invocations = counting_worker.call_count.clone();

        let mut workers = WorkerRegistry::new();
        workers.register(counting_worker);

        let storage = Arc::new(MemoryStorage::new());
        let settings = ServerConfig::new("test-server")
            .enable_scheduler(false)
            .fetch_batch_size(1)
            .polling_interval(Duration::milliseconds(10))
            .worker_count(1);
        let server = BackgroundJobServer::new(settings, storage.clone(), Arc::new(workers));

        let payload = serde_json::json!(["arg1".to_string()]);
        let queued = crate::core::Job::new("test_method", payload);
        storage.enqueue(&queued).await.unwrap();

        server.start().await.unwrap();

        // Give the worker a few polling rounds to fetch and run the job.
        tokio::time::sleep(millis(50)).await;

        server.stop().await.unwrap();

        assert!(invocations.load(Ordering::Relaxed) > 0);
    }
}