1use std::borrow::Cow;
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::FutureExt;
6use futures::future::BoxFuture;
7use tokio::sync::{Semaphore, mpsc};
8use tokio::task::JoinSet;
9use tokio::time::timeout;
10use tokio_util::sync::CancellationToken;
11
12#[derive(Clone, Debug)]
/// Tuning knobs for the background task runtime.
#[derive(Clone, Debug)]
pub struct BackgroundTaskConfig {
    /// Maximum number of jobs that may sit in the submission queue.
    pub max_queue_size: usize,
    /// Upper bound on jobs executing at the same time.
    pub max_concurrent_tasks: usize,
    /// Seconds `shutdown` waits for queued/running jobs to finish.
    pub drain_timeout_secs: u64,
}

impl Default for BackgroundTaskConfig {
    /// Defaults sized for a typical service: 1024 queued, 128 concurrent, 30s drain.
    fn default() -> Self {
        BackgroundTaskConfig {
            drain_timeout_secs: 30,
            max_concurrent_tasks: 128,
            max_queue_size: 1024,
        }
    }
}
29
30#[derive(Clone, Debug)]
31pub struct BackgroundJobMetadata {
32 pub name: Cow<'static, str>,
33 pub request_id: Option<String>,
34}
35
36impl Default for BackgroundJobMetadata {
37 fn default() -> Self {
38 Self {
39 name: Cow::Borrowed("background_task"),
40 request_id: None,
41 }
42 }
43}
44
/// Boxed future type every background job resolves to.
pub type BackgroundJobFuture = BoxFuture<'static, Result<(), BackgroundJobError>>;

/// A queued unit of work: the job's future plus metadata used for logging.
struct BackgroundJob {
    // The boxed work to run once the executor grants a permit.
    pub future: BackgroundJobFuture,
    // Name/request-id reported if the job fails.
    pub metadata: BackgroundJobMetadata,
}
51
52impl BackgroundJob {
53 fn new<F>(future: F, metadata: BackgroundJobMetadata) -> Self
54 where
55 F: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
56 {
57 Self {
58 future: future.boxed(),
59 metadata,
60 }
61 }
62}
63
/// Error produced by a background job's future.
///
/// Carries the human-readable message that the executor logs when a job fails.
#[derive(Debug, Clone)]
pub struct BackgroundJobError {
    /// Human-readable description of the failure.
    pub message: String,
}

// Implement `Display` and `std::error::Error` so this type composes with
// standard error handling (`?`, `Box<dyn Error>`), consistent with
// `BackgroundSpawnError`, which already implements both.
impl std::fmt::Display for BackgroundJobError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.message)
    }
}

impl std::error::Error for BackgroundJobError {}

impl From<String> for BackgroundJobError {
    /// Wraps an owned message without copying.
    fn from(message: String) -> Self {
        Self { message }
    }
}

impl From<&str> for BackgroundJobError {
    /// Copies a borrowed message into an owned error.
    fn from(message: &str) -> Self {
        Self {
            message: message.to_string(),
        }
    }
}
82
/// Error returned when a job cannot be enqueued.
#[derive(Debug, Clone)]
pub enum BackgroundSpawnError {
    /// The bounded submission queue had no free capacity.
    QueueFull,
}

impl std::fmt::Display for BackgroundSpawnError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::QueueFull => f.write_str("background task queue is full"),
        }
    }
}

impl std::error::Error for BackgroundSpawnError {}
97
/// Error returned by `BackgroundRuntime::shutdown` when the executor does not
/// finish draining within the configured timeout (or its task panicked).
///
/// `Display`/`Error` are implemented for consistency with the other error
/// types in this module (`BackgroundSpawnError` already implements both).
#[derive(Debug)]
pub struct BackgroundShutdownError;

impl std::fmt::Display for BackgroundShutdownError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "background runtime did not shut down within the drain timeout")
    }
}

impl std::error::Error for BackgroundShutdownError {}
100
/// Lightweight counters describing executor state.
///
/// All operations use `Relaxed` ordering: these are monitoring-only values
/// and never synchronize other memory accesses.
#[derive(Default)]
struct BackgroundMetrics {
    // Jobs accepted into the channel but not yet picked up by the executor.
    queued: std::sync::atomic::AtomicU64,
    // Jobs currently holding a semaphore permit and executing.
    running: std::sync::atomic::AtomicU64,
    // Jobs whose future returned an error, or whose permit acquisition failed.
    failed: std::sync::atomic::AtomicU64,
}

impl BackgroundMetrics {
    // Record one more job waiting in the queue.
    fn inc_queued(&self) {
        self.queued.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    // Record a dequeue (or roll back a failed enqueue).
    // NOTE(review): `fetch_sub` wraps on underflow; callers must keep
    // inc/dec strictly paired.
    fn dec_queued(&self) {
        self.queued.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
    }

    // Record that a job started executing.
    fn inc_running(&self) {
        self.running.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    // Record that a job finished executing (success or failure).
    fn dec_running(&self) {
        self.running.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
    }

    // Record a job failure; never decremented.
    fn inc_failed(&self) {
        self.failed.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
}
129
/// Cloneable handle used to submit jobs to the background runtime.
///
/// When every handle (including the runtime's own) is dropped, the job
/// channel closes and the executor can drain and exit.
#[derive(Clone)]
pub struct BackgroundHandle {
    // Producer side of the bounded job queue.
    sender: mpsc::Sender<BackgroundJob>,
    // Shared counters, updated on enqueue and rolled back on rejection.
    metrics: Arc<BackgroundMetrics>,
}
135
136impl BackgroundHandle {
137 pub fn spawn<F, Fut>(&self, f: F) -> Result<(), BackgroundSpawnError>
138 where
139 F: FnOnce() -> Fut,
140 Fut: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
141 {
142 let future = f();
143 self.spawn_with_metadata(future, BackgroundJobMetadata::default())
144 }
145
146 pub fn spawn_with_metadata<Fut>(
147 &self,
148 future: Fut,
149 metadata: BackgroundJobMetadata,
150 ) -> Result<(), BackgroundSpawnError>
151 where
152 Fut: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
153 {
154 self.metrics.inc_queued();
155 let job = BackgroundJob::new(future, metadata);
156 self.sender.try_send(job).map_err(|_| {
157 self.metrics.dec_queued();
158 BackgroundSpawnError::QueueFull
159 })
160 }
161}
162
/// Owns the background executor task and coordinates its shutdown.
pub struct BackgroundRuntime {
    // Prototype handle cloned out to callers via `handle()`.
    handle: BackgroundHandle,
    // How long `shutdown` waits for the executor to drain.
    drain_timeout: Duration,
    // Cancelled by `shutdown` to stop the executor's accept loop.
    shutdown_token: CancellationToken,
    // Join handle for the spawned executor task.
    join_handle: tokio::task::JoinHandle<()>,
}
169
170impl BackgroundRuntime {
171 pub async fn start(config: BackgroundTaskConfig) -> Self {
172 let (tx, rx) = mpsc::channel(config.max_queue_size);
173 let metrics = Arc::new(BackgroundMetrics::default());
174 let handle = BackgroundHandle {
175 sender: tx.clone(),
176 metrics: metrics.clone(),
177 };
178 let shutdown_token = CancellationToken::new();
179 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tasks));
180 let driver_token = shutdown_token.clone();
181
182 let join_handle = tokio::spawn(run_executor(rx, semaphore, metrics.clone(), driver_token));
183
184 Self {
185 handle,
186 drain_timeout: Duration::from_secs(config.drain_timeout_secs),
187 shutdown_token,
188 join_handle,
189 }
190 }
191
192 pub fn handle(&self) -> BackgroundHandle {
193 self.handle.clone()
194 }
195
196 pub async fn shutdown(self) -> Result<(), BackgroundShutdownError> {
197 self.shutdown_token.cancel();
198 drop(self.handle);
199 match timeout(self.drain_timeout, self.join_handle).await {
200 Ok(Ok(_)) => Ok(()),
201 _ => Err(BackgroundShutdownError),
202 }
203 }
204}
205
206async fn run_executor(
207 mut rx: mpsc::Receiver<BackgroundJob>,
208 semaphore: Arc<Semaphore>,
209 metrics: Arc<BackgroundMetrics>,
210 token: CancellationToken,
211) {
212 let mut join_set = JoinSet::new();
213 let token_clone = token.clone();
214
215 loop {
216 tokio::select! {
217 maybe_job = rx.recv() => {
218 match maybe_job {
219 Some(job) => {
220 metrics.dec_queued();
221 let semaphore = semaphore.clone();
222 let metrics_clone = metrics.clone();
223 join_set.spawn(async move {
224 let BackgroundJob { future, metadata } = job;
225 match semaphore.acquire_owned().await {
226 Ok(_permit) => {
227 metrics_clone.inc_running();
228 if let Err(err) = future.await {
229 metrics_clone.inc_failed();
230 tracing::error!(target = "spikard::background", task = %metadata.name, error = %err.message, "background task failed");
231 }
232 metrics_clone.dec_running();
233 }
234 Err(_) => {
235 metrics_clone.inc_failed();
236 tracing::warn!(target = "spikard::background", "failed to acquire semaphore permit for background task");
237 }
238 }
239 });
240 }
241 None => break,
242 }
243 }
244 _ = token_clone.cancelled() => {
245 break;
246 }
247 }
248 }
249
250 let mut drain_attempts = 0;
251 loop {
252 match rx.try_recv() {
253 Ok(job) => {
254 metrics.dec_queued();
255 let semaphore = semaphore.clone();
256 let metrics_clone = metrics.clone();
257 join_set.spawn(async move {
258 let BackgroundJob { future, metadata } = job;
259 match semaphore.acquire_owned().await {
260 Ok(_permit) => {
261 metrics_clone.inc_running();
262 if let Err(err) = future.await {
263 metrics_clone.inc_failed();
264 tracing::error!(target = "spikard::background", task = %metadata.name, error = %err.message, "background task failed");
265 }
266 metrics_clone.dec_running();
267 }
268 Err(_) => {
269 metrics_clone.inc_failed();
270 tracing::warn!(target = "spikard::background", "failed to acquire semaphore permit for background task");
271 }
272 }
273 });
274 drain_attempts = 0;
275 }
276 Err(mpsc::error::TryRecvError::Empty) => {
277 drain_attempts += 1;
278 if drain_attempts > 100 {
279 break;
280 }
281 tokio::time::sleep(Duration::from_millis(10)).await;
282 }
283 Err(mpsc::error::TryRecvError::Disconnected) => {
284 break;
285 }
286 }
287 }
288
289 while join_set.join_next().await.is_some() {}
290}
291
292#[cfg(test)]
293mod tests {
294 use super::*;
295 use std::sync::atomic::{AtomicU64, Ordering};
296
    // Smoke test: a single spawned job runs and its side effect is observed.
    #[tokio::test]
    async fn test_basic_spawn_and_execution() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        handle
            .spawn(move || {
                let c = counter_clone.clone();
                async move {
                    c.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .expect("spawn failed");

        // Grace period for the executor to pick up and run the job.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);

        runtime.shutdown().await.expect("shutdown failed");
    }

    // Ten independent jobs all execute exactly once.
    #[tokio::test]
    async fn test_multiple_tasks() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let counter = Arc::new(AtomicU64::new(0));

        for _ in 0..10 {
            let counter_clone = counter.clone();
            handle
                .spawn(move || {
                    let c = counter_clone.clone();
                    async move {
                        c.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .expect("spawn failed");
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 10);

        runtime.shutdown().await.expect("shutdown failed");
    }

    // spawn_with_metadata accepts a custom name/request id and still runs the job.
    #[tokio::test]
    async fn test_task_with_metadata() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let metadata = BackgroundJobMetadata {
            name: Cow::Owned("test_task".to_string()),
            request_id: Some("req-123".to_string()),
        };

        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        let future = async move {
            counter_clone.fetch_add(1, Ordering::SeqCst);
            Ok(())
        };

        handle.spawn_with_metadata(future, metadata).expect("spawn failed");

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);

        runtime.shutdown().await.expect("shutdown failed");
    }

    // With a queue of 2 already holding two jobs, the third spawn is rejected.
    // NOTE(review): this relies on the executor task not having been polled yet
    // (no await between start() and the spawns on the single-threaded test
    // runtime), so the queue genuinely fills — confirm if test flavor changes.
    #[tokio::test]
    async fn test_queue_full_error() {
        let config = BackgroundTaskConfig {
            max_queue_size: 2,
            max_concurrent_tasks: 10,
            drain_timeout_secs: 5,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(3));

        for _ in 0..2 {
            let barrier = blocking_barrier.clone();
            handle
                .spawn(move || {
                    let b = barrier.clone();
                    async move {
                        b.wait().await;
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        Ok(())
                    }
                })
                .expect("spawn failed");
        }

        let result = handle.spawn(move || async { Ok(()) });
        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));

        // Release the two blocked jobs so shutdown can drain promptly.
        blocking_barrier.wait().await;
        tokio::time::sleep(Duration::from_millis(100)).await;

        runtime.shutdown().await.expect("shutdown failed");
    }

    // A job returning Err is absorbed by the executor (logged/counted) without
    // disturbing the runtime.
    #[tokio::test]
    async fn test_task_failure_handling() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let success_count = Arc::new(AtomicU64::new(0));
        let success_count_clone = success_count.clone();

        handle
            .spawn(move || {
                let s = success_count_clone.clone();
                async move {
                    s.fetch_add(1, Ordering::SeqCst);
                    Err(BackgroundJobError::from("test error"))
                }
            })
            .expect("spawn failed");

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(success_count.load(Ordering::SeqCst), 1);

        runtime.shutdown().await.expect("shutdown failed");
    }
432
433 #[tokio::test(flavor = "multi_thread")]
434 async fn test_concurrency_limit_with_proper_synchronization() {
435 let config = BackgroundTaskConfig {
436 max_queue_size: 100,
437 max_concurrent_tasks: 2,
438 drain_timeout_secs: 30,
439 };
440
441 let runtime = BackgroundRuntime::start(config).await;
442 let handle = runtime.handle();
443
444 let running_count = Arc::new(AtomicU64::new(0));
445 let max_concurrent = Arc::new(AtomicU64::new(0));
446
447 for _ in 0..5 {
448 let running = running_count.clone();
449 let max = max_concurrent.clone();
450
451 handle
452 .spawn(move || {
453 let r = running.clone();
454 let m = max.clone();
455 async move {
456 r.fetch_add(1, Ordering::SeqCst);
457 let current_running = r.load(Ordering::SeqCst);
458 let mut current_max = m.load(Ordering::SeqCst);
459 while current_running > current_max {
460 m.store(current_running, Ordering::SeqCst);
461 current_max = current_running;
462 }
463
464 tokio::time::sleep(Duration::from_millis(100)).await;
465 r.fetch_sub(1, Ordering::SeqCst);
466 Ok(())
467 }
468 })
469 .expect("spawn failed");
470 }
471
472 tokio::time::sleep(Duration::from_millis(700)).await;
473 let max_concurrent_observed = max_concurrent.load(Ordering::SeqCst);
474 assert!(
475 max_concurrent_observed <= 2,
476 "Max concurrent should be <= 2, but was {}",
477 max_concurrent_observed
478 );
479
480 runtime.shutdown().await.expect("shutdown failed");
481 }
482
    // Shutdown succeeds once the sole job has already completed.
    #[tokio::test]
    async fn test_graceful_shutdown() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 5,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        handle
            .spawn(move || {
                let c = counter_clone.clone();
                async move {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    c.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .expect("spawn failed");

        // Longer than the job's 50ms sleep, so it completes before shutdown.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let result = runtime.shutdown().await;
        assert!(result.is_ok());
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    // A 5s job against a 1s drain timeout makes shutdown report failure.
    #[tokio::test]
    async fn test_shutdown_timeout() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 1,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        handle
            .spawn(|| async {
                tokio::time::sleep(Duration::from_secs(5)).await;
                Ok(())
            })
            .expect("spawn failed");

        tokio::time::sleep(Duration::from_millis(100)).await;

        let result = runtime.shutdown().await;
        assert!(result.is_err());
    }

    // Two jobs rendezvous on a barrier and then finish; shutdown drains cleanly.
    // (Only exercises the happy path; counter values are not asserted here.)
    #[tokio::test]
    async fn test_metrics_tracking() {
        let config = BackgroundTaskConfig::default();
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let barrier = Arc::new(tokio::sync::Barrier::new(2));

        for _ in 0..2 {
            let b = barrier.clone();
            let _ = handle.spawn(move || {
                let barrier = b.clone();
                async move {
                    barrier.wait().await;
                    Ok(())
                }
            });
        }

        tokio::time::sleep(Duration::from_millis(150)).await;

        runtime.shutdown().await.expect("shutdown failed");
    }

    // A started-but-unfinishable job forces a shutdown timeout; the timeout
    // must trip near drain_timeout (1s), not wait for the 10s job.
    #[tokio::test]
    async fn test_task_cancellation_on_shutdown() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 1,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let started_count = Arc::new(AtomicU64::new(0));
        let _completed_count = Arc::new(AtomicU64::new(0));

        let started = started_count.clone();

        handle
            .spawn(move || {
                let s = started.clone();
                async move {
                    s.fetch_add(1, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_secs(10)).await;
                    Ok(())
                }
            })
            .expect("spawn failed");

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(started_count.load(Ordering::SeqCst), 1);

        let shutdown_start = std::time::Instant::now();
        let result = runtime.shutdown().await;
        let shutdown_elapsed = shutdown_start.elapsed();

        assert!(result.is_err());
        assert!(shutdown_elapsed < Duration::from_secs(3));
    }

    // Queue overflows while blocked, then accepts again once jobs drain.
    #[tokio::test]
    async fn test_queue_overflow_multiple_spawns() {
        let config = BackgroundTaskConfig {
            max_queue_size: 3,
            max_concurrent_tasks: 10,
            drain_timeout_secs: 5,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(4));

        for _ in 0..3 {
            let b = blocking_barrier.clone();
            handle
                .spawn(move || {
                    let barrier = b.clone();
                    async move {
                        barrier.wait().await;
                        tokio::time::sleep(Duration::from_millis(100)).await;
                        Ok(())
                    }
                })
                .expect("spawn failed");
        }

        let result = handle.spawn(|| async { Ok(()) });
        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));

        // Release the blocked jobs, then the queue has capacity again.
        blocking_barrier.wait().await;
        tokio::time::sleep(Duration::from_millis(200)).await;

        let result = handle.spawn(|| async { Ok(()) });
        assert!(result.is_ok());

        runtime.shutdown().await.expect("shutdown failed");
    }

    // All five jobs run; ordering is not asserted, only completeness.
    #[tokio::test]
    async fn test_concurrent_task_execution_order() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let execution_order = Arc::new(tokio::sync::Mutex::new(Vec::new()));

        for i in 0..5 {
            let order = execution_order.clone();
            handle
                .spawn(move || {
                    let o = order.clone();
                    async move {
                        o.lock().await.push(i);
                        Ok(())
                    }
                })
                .expect("spawn failed");
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        let order = execution_order.lock().await;
        assert_eq!(order.len(), 5);
        for i in 0..5 {
            assert!(order.contains(&i));
        }

        runtime.shutdown().await.expect("shutdown failed");
    }

    // Both From impls preserve the message text.
    #[tokio::test]
    async fn test_error_from_string_conversion() {
        let error = BackgroundJobError::from("test message");
        assert_eq!(error.message, "test message");

        let error2 = BackgroundJobError::from("test".to_string());
        assert_eq!(error2.message, "test");
    }

    // Default metadata: static name, no request id.
    #[tokio::test]
    async fn test_background_job_metadata_default() {
        let metadata = BackgroundJobMetadata::default();
        assert_eq!(metadata.name, "background_task");
        assert_eq!(metadata.request_id, None);
    }

    // Custom metadata round-trips its fields.
    #[tokio::test]
    async fn test_background_job_metadata_custom() {
        let metadata = BackgroundJobMetadata {
            name: Cow::Borrowed("custom_task"),
            request_id: Some("req-456".to_string()),
        };
        assert_eq!(metadata.name, "custom_task");
        assert_eq!(metadata.request_id, Some("req-456".to_string()));
    }

    // Direct unit test of the atomic counter helpers.
    #[tokio::test]
    async fn test_metrics_inc_dec_operations() {
        let metrics = BackgroundMetrics::default();

        metrics.inc_queued();
        assert_eq!(metrics.queued.load(Ordering::Relaxed), 1);

        metrics.inc_queued();
        assert_eq!(metrics.queued.load(Ordering::Relaxed), 2);

        metrics.dec_queued();
        assert_eq!(metrics.queued.load(Ordering::Relaxed), 1);

        metrics.inc_running();
        assert_eq!(metrics.running.load(Ordering::Relaxed), 1);

        metrics.dec_running();
        assert_eq!(metrics.running.load(Ordering::Relaxed), 0);

        metrics.inc_failed();
        assert_eq!(metrics.failed.load(Ordering::Relaxed), 1);

        metrics.inc_failed();
        assert_eq!(metrics.failed.load(Ordering::Relaxed), 2);
    }
722
    // Display text of the spawn error is stable (callers may log it).
    #[tokio::test]
    async fn test_spawn_error_display() {
        let error = BackgroundSpawnError::QueueFull;
        assert_eq!(error.to_string(), "background task queue is full");
    }

    // Pins the documented default configuration values.
    #[tokio::test]
    async fn test_background_config_default() {
        let config = BackgroundTaskConfig::default();
        assert_eq!(config.max_queue_size, 1024);
        assert_eq!(config.max_concurrent_tasks, 128);
        assert_eq!(config.drain_timeout_secs, 30);
    }

    // Shutdown with nothing queued or running returns Ok.
    #[tokio::test]
    async fn test_shutdown_with_zero_pending_tasks() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;

        let result = runtime.shutdown().await;
        assert!(result.is_ok(), "shutdown should succeed with no tasks");
    }

    // Shutdown waits for an already-running 100ms job to finish (5s budget).
    #[tokio::test]
    async fn test_shutdown_with_only_running_tasks() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 5,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let execution_started: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let execution_completed: Arc<std::sync::atomic::AtomicBool> =
            Arc::new(std::sync::atomic::AtomicBool::new(false));

        let started = execution_started.clone();
        let completed = execution_completed.clone();

        handle
            .spawn(move || {
                let s = started.clone();
                let c = completed.clone();
                async move {
                    s.store(true, std::sync::atomic::Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    c.store(true, std::sync::atomic::Ordering::SeqCst);
                    Ok(())
                }
            })
            .unwrap();

        // Let the job start (20ms into its 100ms sleep) before shutting down.
        tokio::time::sleep(Duration::from_millis(20)).await;

        let result = runtime.shutdown().await;
        assert!(result.is_ok(), "shutdown should succeed and wait for running tasks");
        assert!(
            execution_completed.load(std::sync::atomic::Ordering::SeqCst),
            "task should have completed"
        );
    }

    // With concurrency 1, ten queued jobs are all drained by shutdown.
    #[tokio::test]
    async fn test_shutdown_drains_queued_tasks() {
        let config = BackgroundTaskConfig {
            max_queue_size: 100,
            max_concurrent_tasks: 1,
            drain_timeout_secs: 5,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let execution_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for _ in 0..10 {
            let count = execution_count.clone();
            handle
                .spawn(move || {
                    let c = count.clone();
                    async move {
                        c.fetch_add(1, Ordering::SeqCst);
                        tokio::time::sleep(Duration::from_millis(10)).await;
                        Ok(())
                    }
                })
                .unwrap();
        }

        let result = runtime.shutdown().await;
        assert!(result.is_ok());
        assert_eq!(
            execution_count.load(Ordering::SeqCst),
            10,
            "all queued tasks should execute"
        );
    }

    // A 10s job cannot complete inside a 1s drain timeout: shutdown errors
    // quickly and the job's completion flag stays false.
    #[tokio::test]
    async fn test_shutdown_timeout_force_stops_long_tasks() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 1,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let completed: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let completed_clone = completed.clone();

        handle
            .spawn(move || {
                let c = completed_clone.clone();
                async move {
                    tokio::time::sleep(Duration::from_secs(10)).await;
                    c.store(true, std::sync::atomic::Ordering::SeqCst);
                    Ok(())
                }
            })
            .unwrap();

        tokio::time::sleep(Duration::from_millis(50)).await;

        let shutdown_start = std::time::Instant::now();
        let result = runtime.shutdown().await;
        let elapsed = shutdown_start.elapsed();

        assert!(result.is_err(), "shutdown should timeout");
        assert!(
            elapsed < Duration::from_secs(3),
            "shutdown should timeout near drain_timeout"
        );
        assert!(
            !completed.load(std::sync::atomic::Ordering::SeqCst),
            "long-running task should not complete"
        );
    }

    // shutdown() consumes self, so only a single call is possible; this just
    // checks that the single call succeeds on an idle runtime.
    #[tokio::test]
    async fn test_multiple_shutdown_calls_idempotent() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;

        let result1 = runtime.shutdown().await;
        assert!(result1.is_ok(), "first shutdown should succeed");
    }
868
    // After shutdown the executor's receiver is gone, so spawning via a
    // surviving handle clone must fail.
    #[tokio::test]
    async fn test_spawn_after_all_senders_dropped_fails() {
        let config = BackgroundTaskConfig::default();
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        runtime.shutdown().await.expect("shutdown should succeed");

        tokio::time::sleep(Duration::from_millis(50)).await;

        let result = handle.spawn(|| async { Ok(()) });
        assert!(result.is_err(), "spawn should fail after all senders are dropped");
    }

    // Peak-concurrency tracking via a CAS loop; peak must respect the
    // semaphore width of 3 even with 5 jobs submitted.
    #[tokio::test]
    async fn test_concurrent_spawns_hit_semaphore_limit() {
        let config = BackgroundTaskConfig {
            max_queue_size: 100,
            max_concurrent_tasks: 3,
            drain_timeout_secs: 10,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
        let running_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
        let peak_concurrent: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for _ in 0..5 {
            let b = barrier.clone();
            let running = running_count.clone();
            let peak = peak_concurrent.clone();

            handle
                .spawn(move || {
                    let barrier = b.clone();
                    let r = running.clone();
                    let p = peak.clone();
                    async move {
                        let current = r.fetch_add(1, Ordering::SeqCst) + 1;
                        // CAS loop: lift the recorded peak without losing
                        // updates from racing tasks.
                        let mut peak_val = p.load(Ordering::SeqCst);
                        while current > peak_val {
                            if p.compare_exchange(peak_val, current, Ordering::SeqCst, Ordering::SeqCst)
                                .is_ok()
                            {
                                break;
                            }
                            peak_val = p.load(Ordering::SeqCst);
                        }

                        barrier.wait().await;
                        tokio::time::sleep(Duration::from_millis(200)).await;
                        r.fetch_sub(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        // Main thread is a barrier participant so exactly 3 jobs proceed together.
        barrier.wait().await;
        tokio::time::sleep(Duration::from_millis(100)).await;

        let peak = peak_concurrent.load(Ordering::SeqCst);
        assert!(
            peak <= 3,
            "concurrent execution should not exceed semaphore limit of 3, got {}",
            peak
        );

        runtime.shutdown().await.unwrap();
    }

    // A failing (Err-returning) job does not prevent later jobs from running.
    #[tokio::test]
    async fn test_task_panic_cleanup_still_occurs() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let mut spawned_count: u32 = 0;
        let panic_task_executed: Arc<std::sync::atomic::AtomicBool> =
            Arc::new(std::sync::atomic::AtomicBool::new(false));
        let after_panic_executed: Arc<std::sync::atomic::AtomicBool> =
            Arc::new(std::sync::atomic::AtomicBool::new(false));

        let panic_flag = panic_task_executed.clone();
        handle
            .spawn(move || {
                let p = panic_flag.clone();
                async move {
                    p.store(true, std::sync::atomic::Ordering::SeqCst);
                    Err(BackgroundJobError::from("simulated task failure"))
                }
            })
            .unwrap();
        spawned_count += 1;

        let after_flag = after_panic_executed.clone();
        handle
            .spawn(move || {
                let a = after_flag.clone();
                async move {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    a.store(true, std::sync::atomic::Ordering::SeqCst);
                    Ok(())
                }
            })
            .unwrap();
        spawned_count += 1;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert!(panic_task_executed.load(std::sync::atomic::Ordering::SeqCst));
        assert!(after_panic_executed.load(std::sync::atomic::Ordering::SeqCst));
        assert_eq!(spawned_count, 2);

        runtime.shutdown().await.unwrap();
    }

    // A full queue rejects synchronously — no waiting, no partial enqueue.
    #[tokio::test]
    async fn test_queue_overflow_with_immediate_rejection() {
        let config = BackgroundTaskConfig {
            max_queue_size: 2,
            max_concurrent_tasks: 100,
            drain_timeout_secs: 5,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));

        for _ in 0..2 {
            let b = barrier.clone();
            handle
                .spawn(move || {
                    let barrier = b.clone();
                    async move {
                        barrier.wait().await;
                        tokio::time::sleep(Duration::from_millis(500)).await;
                        Ok(())
                    }
                })
                .unwrap();
        }

        let overflow_result = handle.spawn(|| async { Ok(()) });
        assert!(matches!(overflow_result, Err(BackgroundSpawnError::QueueFull)));

        barrier.wait().await;
        runtime.shutdown().await.unwrap();
    }
1018
    // 20 jobs through a width-5 semaphore: shutdown drains them all.
    #[tokio::test]
    async fn test_metrics_accuracy_under_concurrent_load() {
        let config = BackgroundTaskConfig {
            max_queue_size: 50,
            max_concurrent_tasks: 5,
            drain_timeout_secs: 10,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let completed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for _ in 0..20 {
            let c = completed.clone();
            handle
                .spawn(move || {
                    let count = c.clone();
                    async move {
                        tokio::time::sleep(Duration::from_millis(50)).await;
                        count.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        // No sleep before shutdown: drain itself must run every queued job.
        runtime.shutdown().await.unwrap();
        assert_eq!(completed.load(Ordering::SeqCst), 20, "all tasks should complete");
    }

    // Jobs with increasing durations (100..300ms) all finish within the 10s
    // drain budget.
    #[tokio::test]
    async fn test_drain_with_slowly_completing_tasks() {
        let config = BackgroundTaskConfig {
            max_queue_size: 50,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 10,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let completed_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for i in 0..5 {
            let count = completed_count.clone();
            handle
                .spawn(move || {
                    let c = count.clone();
                    async move {
                        let sleep_ms = 100 + (i as u64 * 50);
                        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
                        c.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        let result = runtime.shutdown().await;
        assert!(result.is_ok());
        assert_eq!(completed_count.load(Ordering::SeqCst), 5);
    }

    // With a single permit, 10 serialized jobs still all complete — the
    // semaphore queue does not deadlock under contention.
    #[tokio::test]
    async fn test_semaphore_starvation_doesnt_deadlock() {
        let config = BackgroundTaskConfig {
            max_queue_size: 100,
            max_concurrent_tasks: 1,
            drain_timeout_secs: 10,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let completion_order: Arc<tokio::sync::Mutex<Vec<u32>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));

        for i in 0..10 {
            let order = completion_order.clone();
            handle
                .spawn(move || {
                    let o = order.clone();
                    async move {
                        tokio::time::sleep(Duration::from_millis(5)).await;
                        let mut guard = o.lock().await;
                        guard.push(i);
                        Ok(())
                    }
                })
                .unwrap();
        }

        let result = runtime.shutdown().await;
        assert!(result.is_ok());

        let order = completion_order.lock().await;
        assert_eq!(order.len(), 10);
    }
1114
    // A job observed mid-execution is abandoned by the timed-out shutdown:
    // its start flag is set but its end flag never becomes true.
    #[tokio::test]
    async fn test_cancel_task_mid_execution() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 1,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let started: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let ended: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));

        let start_flag = started.clone();
        let end_flag = ended.clone();

        handle
            .spawn(move || {
                let s = start_flag.clone();
                let e = end_flag.clone();
                async move {
                    s.store(true, std::sync::atomic::Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_secs(10)).await;
                    e.store(true, std::sync::atomic::Ordering::SeqCst);
                    Ok(())
                }
            })
            .unwrap();

        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(started.load(std::sync::atomic::Ordering::SeqCst));

        let result = runtime.shutdown().await;
        assert!(result.is_err(), "shutdown should timeout due to long task");
        assert!(
            !ended.load(std::sync::atomic::Ordering::SeqCst),
            "task should not complete"
        );
    }

    // Immediate shutdown after 100 rapid spawns: the drain loop runs at least
    // some of them and never more than were submitted.
    #[tokio::test]
    async fn test_rapid_spawn_and_shutdown() {
        let config = BackgroundTaskConfig {
            max_queue_size: 1000,
            max_concurrent_tasks: 10,
            drain_timeout_secs: 5,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for _ in 0..100 {
            let c = count.clone();
            let _ = handle.spawn(move || {
                let counter = c.clone();
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            });
        }

        let result = runtime.shutdown().await;
        assert!(result.is_ok());

        let final_count = count.load(Ordering::SeqCst);
        assert!(final_count > 0, "at least some tasks should execute");
        assert!(final_count <= 100, "no more than spawned count should execute");
    }

    // Alternating Ok/Err jobs: failures are isolated, both halves run fully.
    #[tokio::test]
    async fn test_shutdown_with_mixed_success_and_failure_tasks() {
        let config = BackgroundTaskConfig::default();
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let success_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
        let failure_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for i in 0..10 {
            if i % 2 == 0 {
                let s = success_count.clone();
                handle
                    .spawn(move || {
                        let counter = s.clone();
                        async move {
                            counter.fetch_add(1, Ordering::SeqCst);
                            Ok(())
                        }
                    })
                    .unwrap();
            } else {
                let f = failure_count.clone();
                handle
                    .spawn(move || {
                        let counter = f.clone();
                        async move {
                            counter.fetch_add(1, Ordering::SeqCst);
                            Err(BackgroundJobError::from("intentional failure"))
                        }
                    })
                    .unwrap();
            }
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        let result = runtime.shutdown().await;
        assert!(result.is_ok());
        assert_eq!(success_count.load(Ordering::SeqCst), 5);
        assert_eq!(failure_count.load(Ordering::SeqCst), 5);
    }
1228
1229 #[tokio::test]
1230 async fn test_concurrent_handle_clones_spawn_independently() {
1231 let config = BackgroundTaskConfig::default();
1232 let runtime = BackgroundRuntime::start(config).await;
1233 let handle = runtime.handle();
1234
1235 let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1236
1237 let mut join_handles = vec![];
1238
1239 for _ in 0..3 {
1240 let h = handle.clone();
1241 let c = count.clone();
1242
1243 let jh = tokio::spawn(async move {
1244 for _ in 0..5 {
1245 let counter = c.clone();
1246 let _ = h.spawn(move || {
1247 let cnt = counter.clone();
1248 async move {
1249 cnt.fetch_add(1, Ordering::SeqCst);
1250 Ok(())
1251 }
1252 });
1253 }
1254 });
1255 join_handles.push(jh);
1256 }
1257
1258 for jh in join_handles {
1259 let _ = jh.await;
1260 }
1261
1262 tokio::time::sleep(Duration::from_millis(200)).await;
1263
1264 let result = runtime.shutdown().await;
1265 assert!(result.is_ok());
1266 assert_eq!(count.load(Ordering::SeqCst), 15);
1267 }
1268
    // Verifies spawn is rejected with QueueFull once capacity is exhausted.
    //
    // NOTE(review): this relies on the two barrier-blocked jobs still counting
    // against max_queue_size (2) even after they begin running — i.e. capacity
    // appears to be released on completion, not on dequeue. Confirm against
    // the runtime's queue accounting.
    #[tokio::test]
    async fn test_queue_full_metrics_updated() {
        let config = BackgroundTaskConfig {
            max_queue_size: 2,
            max_concurrent_tasks: 100,
            drain_timeout_secs: 5,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        // 2 jobs + this test = 3 participants; the jobs block until this test waits.
        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));

        for _ in 0..2 {
            let b = barrier.clone();
            handle
                .spawn(move || {
                    let barrier = b.clone();
                    async move {
                        barrier.wait().await;
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        Ok(())
                    }
                })
                .unwrap();
        }

        // Third spawn must fail: both capacity slots are held by blocked jobs.
        let result = handle.spawn(|| async { Ok(()) });
        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));

        // Release the jobs, then give them a moment before shutting down.
        barrier.wait().await;
        tokio::time::sleep(Duration::from_millis(100)).await;

        runtime.shutdown().await.unwrap();
    }
1303
1304 #[tokio::test]
1305 async fn test_handle_persistence_across_spawns() {
1306 let config = BackgroundTaskConfig::default();
1307 let runtime = BackgroundRuntime::start(config).await;
1308 let handle = runtime.handle();
1309
1310 let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1311
1312 for _ in 0..5 {
1313 let c = count.clone();
1314 handle
1315 .spawn(move || {
1316 let counter = c.clone();
1317 async move {
1318 counter.fetch_add(1, Ordering::SeqCst);
1319 Ok(())
1320 }
1321 })
1322 .unwrap();
1323 }
1324
1325 tokio::time::sleep(Duration::from_millis(150)).await;
1326 assert_eq!(count.load(Ordering::SeqCst), 5);
1327
1328 runtime.shutdown().await.unwrap();
1329 }
1330
1331 #[tokio::test]
1332 async fn test_shutdown_with_queue_at_capacity() {
1333 let config = BackgroundTaskConfig {
1334 max_queue_size: 5,
1335 max_concurrent_tasks: 1,
1336 drain_timeout_secs: 10,
1337 };
1338 let runtime = BackgroundRuntime::start(config).await;
1339 let handle = runtime.handle();
1340
1341 let completion_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1342
1343 for _ in 0..5 {
1344 let c = completion_count.clone();
1345 handle
1346 .spawn(move || {
1347 let counter = c.clone();
1348 async move {
1349 tokio::time::sleep(Duration::from_millis(20)).await;
1350 counter.fetch_add(1, Ordering::SeqCst);
1351 Ok(())
1352 }
1353 })
1354 .unwrap();
1355 }
1356
1357 let result = runtime.shutdown().await;
1358 assert!(result.is_ok());
1359 assert_eq!(completion_count.load(Ordering::SeqCst), 5);
1360 }
1361
1362 #[tokio::test]
1363 async fn test_metadata_preserved_through_execution() {
1364 let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
1365 let handle = runtime.handle();
1366
1367 let metadata = BackgroundJobMetadata {
1368 name: Cow::Owned("test_metadata_task".to_string()),
1369 request_id: Some("req-metadata-123".to_string()),
1370 };
1371
1372 let executed: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1373 let executed_clone = executed.clone();
1374
1375 let future = async move {
1376 executed_clone.store(true, std::sync::atomic::Ordering::SeqCst);
1377 Ok(())
1378 };
1379
1380 handle.spawn_with_metadata(future, metadata).unwrap();
1381
1382 tokio::time::sleep(Duration::from_millis(100)).await;
1383
1384 assert!(executed.load(std::sync::atomic::Ordering::SeqCst));
1385 runtime.shutdown().await.unwrap();
1386 }
1387
1388 #[tokio::test]
1389 async fn test_very_short_drain_timeout_forces_stop() {
1390 let config = BackgroundTaskConfig {
1391 max_queue_size: 10,
1392 max_concurrent_tasks: 2,
1393 drain_timeout_secs: 0,
1394 };
1395 let runtime = BackgroundRuntime::start(config).await;
1396 let handle = runtime.handle();
1397
1398 handle
1399 .spawn(|| async {
1400 tokio::time::sleep(Duration::from_secs(1)).await;
1401 Ok(())
1402 })
1403 .unwrap();
1404
1405 tokio::time::sleep(Duration::from_millis(10)).await;
1406
1407 let result = runtime.shutdown().await;
1408 assert!(result.is_err());
1409 }
1410
1411 #[tokio::test]
1412 async fn test_spawn_many_tasks_sequential_drain() {
1413 let config = BackgroundTaskConfig {
1414 max_queue_size: 200,
1415 max_concurrent_tasks: 2,
1416 drain_timeout_secs: 15,
1417 };
1418 let runtime = BackgroundRuntime::start(config).await;
1419 let handle = runtime.handle();
1420
1421 let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1422
1423 for _ in 0..50 {
1424 let c = count.clone();
1425 handle
1426 .spawn(move || {
1427 let counter = c.clone();
1428 async move {
1429 tokio::time::sleep(Duration::from_millis(1)).await;
1430 counter.fetch_add(1, Ordering::SeqCst);
1431 Ok(())
1432 }
1433 })
1434 .unwrap();
1435 }
1436
1437 let result = runtime.shutdown().await;
1438 assert!(result.is_ok());
1439 assert_eq!(count.load(Ordering::SeqCst), 50);
1440 }
1441
1442 #[tokio::test]
1443 async fn test_no_deadlock_with_max_concurrency_barrier() {
1444 let config = BackgroundTaskConfig {
1445 max_queue_size: 100,
1446 max_concurrent_tasks: 3,
1447 drain_timeout_secs: 10,
1448 };
1449 let runtime = BackgroundRuntime::start(config).await;
1450 let handle = runtime.handle();
1451
1452 let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(4));
1453
1454 for _ in 0..3 {
1455 let b = barrier.clone();
1456 handle
1457 .spawn(move || {
1458 let barrier = b.clone();
1459 async move {
1460 barrier.wait().await;
1461 tokio::time::sleep(Duration::from_millis(50)).await;
1462 Ok(())
1463 }
1464 })
1465 .unwrap();
1466 }
1467
1468 barrier.wait().await;
1469 tokio::time::sleep(Duration::from_millis(100)).await;
1470
1471 let result = runtime.shutdown().await;
1472 assert!(result.is_ok());
1473 }
1474
1475 #[tokio::test]
1476 async fn test_error_from_owned_string() {
1477 let message = String::from("error message");
1478 let error = BackgroundJobError::from(message);
1479 assert_eq!(error.message, "error message");
1480 }
1481
1482 #[tokio::test]
1483 async fn test_borrowed_str_conversion() {
1484 let error = BackgroundJobError::from("borrowed message");
1485 assert_eq!(error.message, "borrowed message");
1486 }
1487
    // With a single concurrency permit held by a barrier-blocked job, later
    // jobs must queue up and then all execute once the permit frees.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_phase1_semaphore_acquisition_with_concurrent_load() {
        let config = BackgroundTaskConfig {
            max_queue_size: 50,
            max_concurrent_tasks: 1,
            drain_timeout_secs: 10,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let execution_count = Arc::new(AtomicU64::new(0));

        // 2 participants: the blocking job and this test.
        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(2));
        let barrier_clone = blocking_barrier.clone();

        // This job takes the sole permit and parks on the barrier.
        handle
            .spawn(move || {
                let b = barrier_clone.clone();
                async move {
                    b.wait().await;
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok(())
                }
            })
            .unwrap();

        // Let the blocking job start (and hold the permit) before queueing more.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // These three cannot run yet — the only permit is held.
        for _ in 0..3 {
            let exec_clone = execution_count.clone();
            handle
                .spawn(move || {
                    let e = exec_clone.clone();
                    async move {
                        e.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        // Unblock the first job; 250ms covers its 100ms sleep plus the rest.
        blocking_barrier.wait().await;
        tokio::time::sleep(Duration::from_millis(250)).await;

        assert_eq!(execution_count.load(Ordering::SeqCst), 3);

        runtime.shutdown().await.unwrap();
    }
1537
1538 #[tokio::test(flavor = "multi_thread")]
1539 async fn test_concurrent_task_completion_race_conditions() {
1540 let config = BackgroundTaskConfig {
1541 max_queue_size: 100,
1542 max_concurrent_tasks: 8,
1543 drain_timeout_secs: 10,
1544 };
1545
1546 let runtime = BackgroundRuntime::start(config).await;
1547 let handle = runtime.handle();
1548
1549 let completion_counter = Arc::new(AtomicU64::new(0));
1550 let task_count = 50;
1551
1552 for i in 0..task_count {
1553 let counter = completion_counter.clone();
1554 handle
1555 .spawn(move || {
1556 let c = counter.clone();
1557 async move {
1558 let sleep_ms = (i as u64 * 11) % 100;
1559 tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
1560 c.fetch_add(1, Ordering::SeqCst);
1561 Ok(())
1562 }
1563 })
1564 .unwrap();
1565 }
1566
1567 runtime.shutdown().await.unwrap();
1568 assert_eq!(
1569 completion_counter.load(Ordering::SeqCst),
1570 task_count,
1571 "all tasks should complete despite race conditions"
1572 );
1573 }
1574
1575 #[tokio::test(flavor = "multi_thread")]
1576 async fn test_failure_metric_tracking_under_concurrent_errors() {
1577 let config = BackgroundTaskConfig {
1578 max_queue_size: 50,
1579 max_concurrent_tasks: 5,
1580 drain_timeout_secs: 10,
1581 };
1582
1583 let runtime = BackgroundRuntime::start(config).await;
1584 let handle = runtime.handle();
1585
1586 let success_count = Arc::new(AtomicU64::new(0));
1587 let failure_count = Arc::new(AtomicU64::new(0));
1588
1589 for i in 0..20 {
1590 if i % 3 == 0 {
1591 let fail_clone = failure_count.clone();
1592 handle
1593 .spawn(move || {
1594 let f = fail_clone.clone();
1595 async move {
1596 f.fetch_add(1, Ordering::SeqCst);
1597 Err(BackgroundJobError::from("intentional failure"))
1598 }
1599 })
1600 .unwrap();
1601 } else {
1602 let succ_clone = success_count.clone();
1603 handle
1604 .spawn(move || {
1605 let s = succ_clone.clone();
1606 async move {
1607 s.fetch_add(1, Ordering::SeqCst);
1608 Ok(())
1609 }
1610 })
1611 .unwrap();
1612 }
1613 }
1614
1615 runtime.shutdown().await.unwrap();
1616
1617 let final_success = success_count.load(Ordering::SeqCst);
1618 let final_failure = failure_count.load(Ordering::SeqCst);
1619
1620 assert_eq!(final_success + final_failure, 20);
1621 assert_eq!(final_failure, 7);
1622 assert_eq!(final_success, 13);
1623 }
1624
1625 #[tokio::test(flavor = "multi_thread")]
1626 async fn test_handle_clone_isolation_concurrent_spawns() {
1627 let config = BackgroundTaskConfig {
1628 max_queue_size: 100,
1629 max_concurrent_tasks: 4,
1630 drain_timeout_secs: 10,
1631 };
1632
1633 let runtime = BackgroundRuntime::start(config).await;
1634 let handle = runtime.handle();
1635
1636 let completion_counters: Vec<Arc<AtomicU64>> = (0..5).map(|_| Arc::new(AtomicU64::new(0))).collect();
1637
1638 let mut join_handles = vec![];
1639
1640 for worker_id in 0..5 {
1641 let h = handle.clone();
1642 let counter = completion_counters[worker_id].clone();
1643
1644 let jh = tokio::spawn(async move {
1645 for task_id in 0..10 {
1646 let c = counter.clone();
1647 let _ = h.spawn(move || {
1648 let cnt = c.clone();
1649 async move {
1650 let delay = (worker_id as u64 * 10 + task_id as u64) % 50;
1651 tokio::time::sleep(Duration::from_millis(delay)).await;
1652 cnt.fetch_add(1, Ordering::SeqCst);
1653 Ok(())
1654 }
1655 });
1656 }
1657 });
1658 join_handles.push(jh);
1659 }
1660
1661 for jh in join_handles {
1662 let _ = jh.await;
1663 }
1664
1665 runtime.shutdown().await.unwrap();
1666
1667 for (i, counter) in completion_counters.iter().enumerate() {
1668 assert_eq!(
1669 counter.load(Ordering::SeqCst),
1670 10,
1671 "worker {} should have exactly 10 task completions",
1672 i
1673 );
1674 }
1675 }
1676
1677 #[tokio::test]
1678 async fn test_background_job_error_with_string_slice() {
1679 let errors = vec![
1680 BackgroundJobError::from("simple error"),
1681 BackgroundJobError::from(String::from("owned string error")),
1682 ];
1683
1684 for error in errors {
1685 assert!(!error.message.is_empty());
1686 }
1687 }
1688
1689 #[tokio::test]
1690 async fn test_spawn_error_display_formatting() {
1691 let error = BackgroundSpawnError::QueueFull;
1692 let formatted = format!("{}", error);
1693 assert_eq!(formatted, "background task queue is full");
1694
1695 let result: Result<(), BackgroundSpawnError> = Err(error);
1696 assert!(result.is_err());
1697 }
1698
1699 #[tokio::test]
1700 async fn test_background_metrics_concurrent_increments() {
1701 let metrics = Arc::new(BackgroundMetrics::default());
1702 let mut handles = vec![];
1703
1704 for _ in 0..10 {
1705 let m = metrics.clone();
1706 let h = tokio::spawn(async move {
1707 for _ in 0..10 {
1708 m.inc_queued();
1709 m.inc_running();
1710 m.inc_failed();
1711 m.dec_queued();
1712 m.dec_running();
1713 }
1714 });
1715 handles.push(h);
1716 }
1717
1718 for h in handles {
1719 let _ = h.await;
1720 }
1721
1722 assert_eq!(metrics.queued.load(Ordering::Relaxed), 0);
1723 assert_eq!(metrics.running.load(Ordering::Relaxed), 0);
1724 assert_eq!(metrics.failed.load(Ordering::Relaxed), 100);
1725 }
1726
1727 #[tokio::test]
1728 async fn test_drain_phase_execution_with_lingering_senders() {
1729 let config = BackgroundTaskConfig {
1730 max_queue_size: 20,
1731 max_concurrent_tasks: 2,
1732 drain_timeout_secs: 10,
1733 };
1734
1735 let runtime = BackgroundRuntime::start(config).await;
1736 let handle = runtime.handle();
1737
1738 let executed = Arc::new(AtomicU64::new(0));
1739
1740 for _ in 0..5 {
1741 let e = executed.clone();
1742 handle
1743 .spawn(move || {
1744 let ex = e.clone();
1745 async move {
1746 tokio::time::sleep(Duration::from_millis(10)).await;
1747 ex.fetch_add(1, Ordering::SeqCst);
1748 Ok(())
1749 }
1750 })
1751 .unwrap();
1752 }
1753
1754 let result = runtime.shutdown().await;
1755 assert!(result.is_ok());
1756
1757 assert_eq!(executed.load(Ordering::SeqCst), 5);
1758 }
1759
1760 #[tokio::test]
1761 async fn test_concurrent_queue_status_transitions() {
1762 let config = BackgroundTaskConfig {
1763 max_queue_size: 10,
1764 max_concurrent_tasks: 2,
1765 drain_timeout_secs: 10,
1766 };
1767
1768 let runtime = BackgroundRuntime::start(config).await;
1769 let handle = runtime.handle();
1770
1771 let state_transitions = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1772
1773 for i in 0..10 {
1774 let st = state_transitions.clone();
1775 handle
1776 .spawn(move || {
1777 let s = st.clone();
1778 async move {
1779 let mut transitions = s.lock().await;
1780 transitions.push(("spawned", i));
1781 drop(transitions);
1782
1783 tokio::time::sleep(Duration::from_millis(50)).await;
1784
1785 let mut transitions = s.lock().await;
1786 transitions.push(("completed", i));
1787 Ok(())
1788 }
1789 })
1790 .unwrap();
1791 }
1792
1793 tokio::time::sleep(Duration::from_millis(300)).await;
1794
1795 runtime.shutdown().await.unwrap();
1796
1797 let final_transitions = state_transitions.lock().await;
1798 let completed_count = final_transitions
1799 .iter()
1800 .filter(|(action, _)| *action == "completed")
1801 .count();
1802
1803 assert_eq!(completed_count, 10, "all tasks should complete");
1804 }
1805
1806 #[tokio::test(flavor = "multi_thread")]
1807 async fn test_semaphore_no_starvation_with_uneven_task_duration() {
1808 let config = BackgroundTaskConfig {
1809 max_queue_size: 100,
1810 max_concurrent_tasks: 2,
1811 drain_timeout_secs: 10,
1812 };
1813
1814 let runtime = BackgroundRuntime::start(config).await;
1815 let handle = runtime.handle();
1816
1817 let fast_executed = Arc::new(AtomicU64::new(0));
1818 let slow_executed = Arc::new(AtomicU64::new(0));
1819
1820 for _ in 0..5 {
1821 let s = slow_executed.clone();
1822 handle
1823 .spawn(move || {
1824 let slow = s.clone();
1825 async move {
1826 tokio::time::sleep(Duration::from_millis(100)).await;
1827 slow.fetch_add(1, Ordering::SeqCst);
1828 Ok(())
1829 }
1830 })
1831 .unwrap();
1832 }
1833
1834 tokio::time::sleep(Duration::from_millis(10)).await;
1835
1836 for _ in 0..5 {
1837 let f = fast_executed.clone();
1838 handle
1839 .spawn(move || {
1840 let fast = f.clone();
1841 async move {
1842 tokio::time::sleep(Duration::from_millis(10)).await;
1843 fast.fetch_add(1, Ordering::SeqCst);
1844 Ok(())
1845 }
1846 })
1847 .unwrap();
1848 }
1849
1850 runtime.shutdown().await.unwrap();
1851
1852 assert_eq!(
1853 fast_executed.load(Ordering::SeqCst),
1854 5,
1855 "fast tasks should not be starved"
1856 );
1857 assert_eq!(slow_executed.load(Ordering::SeqCst), 5, "slow tasks should execute");
1858 }
1859}