1use std::borrow::Cow;
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::FutureExt;
6use futures::future::BoxFuture;
7use tokio::sync::{Semaphore, mpsc};
8use tokio::task::JoinSet;
9use tokio::time::timeout;
10use tokio_util::sync::CancellationToken;
11
12#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
14#[serde(default)]
15pub struct BackgroundTaskConfig {
16 pub max_queue_size: usize,
17 pub max_concurrent_tasks: usize,
18 pub drain_timeout_secs: u64,
19}
20
21impl Default for BackgroundTaskConfig {
22 fn default() -> Self {
23 Self {
24 max_queue_size: 1024,
25 max_concurrent_tasks: 128,
26 drain_timeout_secs: 30,
27 }
28 }
29}
30
31#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
32pub struct BackgroundJobMetadata {
33 pub name: Cow<'static, str>,
34 pub request_id: Option<String>,
35}
36
37impl Default for BackgroundJobMetadata {
38 fn default() -> Self {
39 Self {
40 name: Cow::Borrowed("background_task"),
41 request_id: None,
42 }
43 }
44}
45
46pub(crate) type BackgroundJobFuture = BoxFuture<'static, Result<(), BackgroundJobError>>;
47
48struct BackgroundJob {
49 pub future: BackgroundJobFuture,
50 pub metadata: BackgroundJobMetadata,
51}
52
53impl BackgroundJob {
54 fn new<F>(future: F, metadata: BackgroundJobMetadata) -> Self
55 where
56 F: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
57 {
58 Self {
59 future: future.boxed(),
60 metadata,
61 }
62 }
63}
64
65#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
66pub struct BackgroundJobError {
67 pub message: String,
68}
69
70impl From<String> for BackgroundJobError {
71 fn from(message: String) -> Self {
72 Self { message }
73 }
74}
75
76impl From<&str> for BackgroundJobError {
77 fn from(message: &str) -> Self {
78 Self {
79 message: message.to_string(),
80 }
81 }
82}
83
84#[derive(Debug, Clone)]
85pub enum BackgroundSpawnError {
86 QueueFull,
87}
88
89impl std::fmt::Display for BackgroundSpawnError {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match self {
92 BackgroundSpawnError::QueueFull => write!(f, "background task queue is full"),
93 }
94 }
95}
96
97impl std::error::Error for BackgroundSpawnError {}
98
99#[derive(Debug)]
100pub struct BackgroundShutdownError;
101
102#[derive(Default, Debug)]
103struct BackgroundMetrics {
104 queued: std::sync::atomic::AtomicU64,
105 running: std::sync::atomic::AtomicU64,
106 failed: std::sync::atomic::AtomicU64,
107}
108
109impl BackgroundMetrics {
110 fn inc_queued(&self) {
111 self.queued.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
112 }
113
114 fn dec_queued(&self) {
115 self.queued.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
116 }
117
118 fn inc_running(&self) {
119 self.running.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
120 }
121
122 fn dec_running(&self) {
123 self.running.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
124 }
125
126 fn inc_failed(&self) {
127 self.failed.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
128 }
129}
130
131#[derive(Clone, Debug)]
132pub struct BackgroundHandle {
133 sender: mpsc::Sender<BackgroundJob>,
134 metrics: Arc<BackgroundMetrics>,
135}
136
137impl BackgroundHandle {
138 pub fn spawn<F, Fut>(&self, f: F) -> Result<(), BackgroundSpawnError>
139 where
140 F: FnOnce() -> Fut,
141 Fut: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
142 {
143 let future = f();
144 self.spawn_with_metadata(future, BackgroundJobMetadata::default())
145 }
146
147 pub fn spawn_with_metadata<Fut>(
148 &self,
149 future: Fut,
150 metadata: BackgroundJobMetadata,
151 ) -> Result<(), BackgroundSpawnError>
152 where
153 Fut: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
154 {
155 self.metrics.inc_queued();
156 let job = BackgroundJob::new(future, metadata);
157 self.sender.try_send(job).map_err(|_| {
158 self.metrics.dec_queued();
159 BackgroundSpawnError::QueueFull
160 })
161 }
162}
163
164pub struct BackgroundRuntime {
165 handle: BackgroundHandle,
166 drain_timeout: Duration,
167 shutdown_token: CancellationToken,
168 join_handle: tokio::task::JoinHandle<()>,
169}
170
171impl BackgroundRuntime {
172 pub async fn start(config: BackgroundTaskConfig) -> Self {
173 let (tx, rx) = mpsc::channel(config.max_queue_size);
174 let metrics = Arc::new(BackgroundMetrics::default());
175 let handle = BackgroundHandle {
176 sender: tx.clone(),
177 metrics: metrics.clone(),
178 };
179 let shutdown_token = CancellationToken::new();
180 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tasks));
181 let driver_token = shutdown_token.clone();
182
183 let join_handle = tokio::spawn(run_executor(rx, semaphore, metrics.clone(), driver_token));
184
185 Self {
186 handle,
187 drain_timeout: Duration::from_secs(config.drain_timeout_secs),
188 shutdown_token,
189 join_handle,
190 }
191 }
192
193 pub fn handle(&self) -> BackgroundHandle {
194 self.handle.clone()
195 }
196
197 pub async fn shutdown(self) -> Result<(), BackgroundShutdownError> {
198 self.shutdown_token.cancel();
199 drop(self.handle);
200 match timeout(self.drain_timeout, self.join_handle).await {
201 Ok(Ok(_)) => Ok(()),
202 _ => Err(BackgroundShutdownError),
203 }
204 }
205}
206
207async fn run_executor(
208 mut rx: mpsc::Receiver<BackgroundJob>,
209 semaphore: Arc<Semaphore>,
210 metrics: Arc<BackgroundMetrics>,
211 token: CancellationToken,
212) {
213 let mut join_set = JoinSet::new();
214 let token_clone = token.clone();
215
216 loop {
217 tokio::select! {
218 maybe_job = rx.recv() => {
219 match maybe_job {
220 Some(job) => {
221 metrics.dec_queued();
222 let semaphore = semaphore.clone();
223 let metrics_clone = metrics.clone();
224 join_set.spawn(async move {
225 let BackgroundJob { future, metadata } = job;
226 match semaphore.acquire_owned().await {
227 Ok(_permit) => {
228 metrics_clone.inc_running();
229 if let Err(err) = future.await {
230 metrics_clone.inc_failed();
231 tracing::error!(target = "spikard::background", task = %metadata.name, error = %err.message, "background task failed");
232 }
233 metrics_clone.dec_running();
234 }
235 Err(_) => {
236 metrics_clone.inc_failed();
237 tracing::warn!(target = "spikard::background", "failed to acquire semaphore permit for background task");
238 }
239 }
240 });
241 }
242 None => break,
243 }
244 }
245 _ = token_clone.cancelled() => {
246 break;
247 }
248 }
249 }
250
251 let mut drain_attempts = 0;
252 loop {
253 match rx.try_recv() {
254 Ok(job) => {
255 metrics.dec_queued();
256 let semaphore = semaphore.clone();
257 let metrics_clone = metrics.clone();
258 join_set.spawn(async move {
259 let BackgroundJob { future, metadata } = job;
260 match semaphore.acquire_owned().await {
261 Ok(_permit) => {
262 metrics_clone.inc_running();
263 if let Err(err) = future.await {
264 metrics_clone.inc_failed();
265 tracing::error!(target = "spikard::background", task = %metadata.name, error = %err.message, "background task failed");
266 }
267 metrics_clone.dec_running();
268 }
269 Err(_) => {
270 metrics_clone.inc_failed();
271 tracing::warn!(target = "spikard::background", "failed to acquire semaphore permit for background task");
272 }
273 }
274 });
275 drain_attempts = 0;
276 }
277 Err(mpsc::error::TryRecvError::Empty) => {
278 drain_attempts += 1;
279 if drain_attempts > 100 {
280 break;
281 }
282 tokio::time::sleep(Duration::from_millis(10)).await;
283 }
284 Err(mpsc::error::TryRecvError::Disconnected) => {
285 break;
286 }
287 }
288 }
289
290 while join_set.join_next().await.is_some() {}
291}
292
293#[cfg(test)]
294mod tests {
295 use super::*;
296 use std::sync::atomic::{AtomicU64, Ordering};
297
298 #[tokio::test]
299 async fn test_basic_spawn_and_execution() {
300 let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
301 let handle = runtime.handle();
302
303 let counter = Arc::new(AtomicU64::new(0));
304 let counter_clone = counter.clone();
305
306 handle
307 .spawn(move || {
308 let c = counter_clone.clone();
309 async move {
310 c.fetch_add(1, Ordering::SeqCst);
311 Ok(())
312 }
313 })
314 .expect("spawn failed");
315
316 tokio::time::sleep(Duration::from_millis(100)).await;
317 assert_eq!(counter.load(Ordering::SeqCst), 1);
318
319 runtime.shutdown().await.expect("shutdown failed");
320 }
321
322 #[tokio::test]
323 async fn test_multiple_tasks() {
324 let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
325 let handle = runtime.handle();
326
327 let counter = Arc::new(AtomicU64::new(0));
328
329 for _ in 0..10 {
330 let counter_clone = counter.clone();
331 handle
332 .spawn(move || {
333 let c = counter_clone.clone();
334 async move {
335 c.fetch_add(1, Ordering::SeqCst);
336 Ok(())
337 }
338 })
339 .expect("spawn failed");
340 }
341
342 tokio::time::sleep(Duration::from_millis(200)).await;
343 assert_eq!(counter.load(Ordering::SeqCst), 10);
344
345 runtime.shutdown().await.expect("shutdown failed");
346 }
347
348 #[tokio::test]
349 async fn test_task_with_metadata() {
350 let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
351 let handle = runtime.handle();
352
353 let metadata = BackgroundJobMetadata {
354 name: Cow::Owned("test_task".to_string()),
355 request_id: Some("req-123".to_string()),
356 };
357
358 let counter = Arc::new(AtomicU64::new(0));
359 let counter_clone = counter.clone();
360
361 let future = async move {
362 counter_clone.fetch_add(1, Ordering::SeqCst);
363 Ok(())
364 };
365
366 handle.spawn_with_metadata(future, metadata).expect("spawn failed");
367
368 tokio::time::sleep(Duration::from_millis(100)).await;
369 assert_eq!(counter.load(Ordering::SeqCst), 1);
370
371 runtime.shutdown().await.expect("shutdown failed");
372 }
373
374 #[tokio::test]
375 async fn test_queue_full_error() {
376 let config = BackgroundTaskConfig {
377 max_queue_size: 2,
378 max_concurrent_tasks: 10,
379 drain_timeout_secs: 5,
380 };
381
382 let runtime = BackgroundRuntime::start(config).await;
383 let handle = runtime.handle();
384
385 let blocking_barrier = Arc::new(tokio::sync::Barrier::new(3));
386
387 for _ in 0..2 {
388 let barrier = blocking_barrier.clone();
389 handle
390 .spawn(move || {
391 let b = barrier.clone();
392 async move {
393 b.wait().await;
394 tokio::time::sleep(Duration::from_secs(1)).await;
395 Ok(())
396 }
397 })
398 .expect("spawn failed");
399 }
400
401 let result = handle.spawn(move || async { Ok(()) });
402 assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));
403
404 blocking_barrier.wait().await;
405 tokio::time::sleep(Duration::from_millis(100)).await;
406
407 runtime.shutdown().await.expect("shutdown failed");
408 }
409
410 #[tokio::test]
411 async fn test_task_failure_handling() {
412 let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
413 let handle = runtime.handle();
414
415 let success_count = Arc::new(AtomicU64::new(0));
416 let success_count_clone = success_count.clone();
417
418 handle
419 .spawn(move || {
420 let s = success_count_clone.clone();
421 async move {
422 s.fetch_add(1, Ordering::SeqCst);
423 Err(BackgroundJobError::from("test error"))
424 }
425 })
426 .expect("spawn failed");
427
428 tokio::time::sleep(Duration::from_millis(100)).await;
429 assert_eq!(success_count.load(Ordering::SeqCst), 1);
430
431 runtime.shutdown().await.expect("shutdown failed");
432 }
433
434 #[tokio::test(flavor = "multi_thread")]
435 async fn test_concurrency_limit_with_proper_synchronization() {
436 let config = BackgroundTaskConfig {
437 max_queue_size: 100,
438 max_concurrent_tasks: 2,
439 drain_timeout_secs: 30,
440 };
441
442 let runtime = BackgroundRuntime::start(config).await;
443 let handle = runtime.handle();
444
445 let running_count = Arc::new(AtomicU64::new(0));
446 let max_concurrent = Arc::new(AtomicU64::new(0));
447
448 for _ in 0..5 {
449 let running = running_count.clone();
450 let max = max_concurrent.clone();
451
452 handle
453 .spawn(move || {
454 let r = running.clone();
455 let m = max.clone();
456 async move {
457 r.fetch_add(1, Ordering::SeqCst);
458 let current_running = r.load(Ordering::SeqCst);
459 let mut current_max = m.load(Ordering::SeqCst);
460 while current_running > current_max {
461 m.store(current_running, Ordering::SeqCst);
462 current_max = current_running;
463 }
464
465 tokio::time::sleep(Duration::from_millis(100)).await;
466 r.fetch_sub(1, Ordering::SeqCst);
467 Ok(())
468 }
469 })
470 .expect("spawn failed");
471 }
472
473 tokio::time::sleep(Duration::from_millis(700)).await;
474 let max_concurrent_observed = max_concurrent.load(Ordering::SeqCst);
475 assert!(
476 max_concurrent_observed <= 2,
477 "Max concurrent should be <= 2, but was {}",
478 max_concurrent_observed
479 );
480
481 runtime.shutdown().await.expect("shutdown failed");
482 }
483
484 #[tokio::test]
485 async fn test_graceful_shutdown() {
486 let config = BackgroundTaskConfig {
487 max_queue_size: 10,
488 max_concurrent_tasks: 2,
489 drain_timeout_secs: 5,
490 };
491
492 let runtime = BackgroundRuntime::start(config).await;
493 let handle = runtime.handle();
494
495 let counter = Arc::new(AtomicU64::new(0));
496 let counter_clone = counter.clone();
497
498 handle
499 .spawn(move || {
500 let c = counter_clone.clone();
501 async move {
502 tokio::time::sleep(Duration::from_millis(50)).await;
503 c.fetch_add(1, Ordering::SeqCst);
504 Ok(())
505 }
506 })
507 .expect("spawn failed");
508
509 tokio::time::sleep(Duration::from_millis(200)).await;
510
511 let result = runtime.shutdown().await;
512 assert!(result.is_ok());
513 assert_eq!(counter.load(Ordering::SeqCst), 1);
514 }
515
516 #[tokio::test]
517 async fn test_shutdown_timeout() {
518 let config = BackgroundTaskConfig {
519 max_queue_size: 10,
520 max_concurrent_tasks: 2,
521 drain_timeout_secs: 1,
522 };
523
524 let runtime = BackgroundRuntime::start(config).await;
525 let handle = runtime.handle();
526
527 handle
528 .spawn(|| async {
529 tokio::time::sleep(Duration::from_secs(5)).await;
530 Ok(())
531 })
532 .expect("spawn failed");
533
534 tokio::time::sleep(Duration::from_millis(100)).await;
535
536 let result = runtime.shutdown().await;
537 assert!(result.is_err());
538 }
539
540 #[tokio::test]
541 async fn test_metrics_tracking() {
542 let config = BackgroundTaskConfig::default();
543 let runtime = BackgroundRuntime::start(config).await;
544 let handle = runtime.handle();
545
546 let barrier = Arc::new(tokio::sync::Barrier::new(2));
547
548 for _ in 0..2 {
549 let b = barrier.clone();
550 let _ = handle.spawn(move || {
551 let barrier = b.clone();
552 async move {
553 barrier.wait().await;
554 Ok(())
555 }
556 });
557 }
558
559 tokio::time::sleep(Duration::from_millis(150)).await;
560
561 runtime.shutdown().await.expect("shutdown failed");
562 }
563
564 #[tokio::test]
565 async fn test_task_cancellation_on_shutdown() {
566 let config = BackgroundTaskConfig {
567 max_queue_size: 10,
568 max_concurrent_tasks: 2,
569 drain_timeout_secs: 1,
570 };
571
572 let runtime = BackgroundRuntime::start(config).await;
573 let handle = runtime.handle();
574
575 let started_count = Arc::new(AtomicU64::new(0));
576 let _completed_count = Arc::new(AtomicU64::new(0));
577
578 let started = started_count.clone();
579
580 handle
581 .spawn(move || {
582 let s = started.clone();
583 async move {
584 s.fetch_add(1, Ordering::SeqCst);
585 tokio::time::sleep(Duration::from_secs(10)).await;
586 Ok(())
587 }
588 })
589 .expect("spawn failed");
590
591 tokio::time::sleep(Duration::from_millis(100)).await;
592 assert_eq!(started_count.load(Ordering::SeqCst), 1);
593
594 let shutdown_start = std::time::Instant::now();
595 let result = runtime.shutdown().await;
596 let shutdown_elapsed = shutdown_start.elapsed();
597
598 assert!(result.is_err());
599 assert!(shutdown_elapsed < Duration::from_secs(3));
600 }
601
602 #[tokio::test]
603 async fn test_queue_overflow_multiple_spawns() {
604 let config = BackgroundTaskConfig {
605 max_queue_size: 3,
606 max_concurrent_tasks: 10,
607 drain_timeout_secs: 5,
608 };
609
610 let runtime = BackgroundRuntime::start(config).await;
611 let handle = runtime.handle();
612
613 let blocking_barrier = Arc::new(tokio::sync::Barrier::new(4));
614
615 for _ in 0..3 {
616 let b = blocking_barrier.clone();
617 handle
618 .spawn(move || {
619 let barrier = b.clone();
620 async move {
621 barrier.wait().await;
622 tokio::time::sleep(Duration::from_millis(100)).await;
623 Ok(())
624 }
625 })
626 .expect("spawn failed");
627 }
628
629 let result = handle.spawn(|| async { Ok(()) });
630 assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));
631
632 blocking_barrier.wait().await;
633 tokio::time::sleep(Duration::from_millis(200)).await;
634
635 let result = handle.spawn(|| async { Ok(()) });
636 assert!(result.is_ok());
637
638 runtime.shutdown().await.expect("shutdown failed");
639 }
640
641 #[tokio::test]
642 async fn test_concurrent_task_execution_order() {
643 let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
644 let handle = runtime.handle();
645
646 let execution_order = Arc::new(tokio::sync::Mutex::new(Vec::new()));
647
648 for i in 0..5 {
649 let order = execution_order.clone();
650 handle
651 .spawn(move || {
652 let o = order.clone();
653 async move {
654 o.lock().await.push(i);
655 Ok(())
656 }
657 })
658 .expect("spawn failed");
659 }
660
661 tokio::time::sleep(Duration::from_millis(200)).await;
662
663 let order = execution_order.lock().await;
664 assert_eq!(order.len(), 5);
665 for i in 0..5 {
666 assert!(order.contains(&i));
667 }
668
669 runtime.shutdown().await.expect("shutdown failed");
670 }
671
672 #[tokio::test]
673 async fn test_error_from_string_conversion() {
674 let error = BackgroundJobError::from("test message");
675 assert_eq!(error.message, "test message");
676
677 let error2 = BackgroundJobError::from("test".to_string());
678 assert_eq!(error2.message, "test");
679 }
680
681 #[tokio::test]
682 async fn test_background_job_metadata_default() {
683 let metadata = BackgroundJobMetadata::default();
684 assert_eq!(metadata.name, "background_task");
685 assert_eq!(metadata.request_id, None);
686 }
687
688 #[tokio::test]
689 async fn test_background_job_metadata_custom() {
690 let metadata = BackgroundJobMetadata {
691 name: Cow::Borrowed("custom_task"),
692 request_id: Some("req-456".to_string()),
693 };
694 assert_eq!(metadata.name, "custom_task");
695 assert_eq!(metadata.request_id, Some("req-456".to_string()));
696 }
697
698 #[tokio::test]
699 async fn test_metrics_inc_dec_operations() {
700 let metrics = BackgroundMetrics::default();
701
702 metrics.inc_queued();
703 assert_eq!(metrics.queued.load(Ordering::Relaxed), 1);
704
705 metrics.inc_queued();
706 assert_eq!(metrics.queued.load(Ordering::Relaxed), 2);
707
708 metrics.dec_queued();
709 assert_eq!(metrics.queued.load(Ordering::Relaxed), 1);
710
711 metrics.inc_running();
712 assert_eq!(metrics.running.load(Ordering::Relaxed), 1);
713
714 metrics.dec_running();
715 assert_eq!(metrics.running.load(Ordering::Relaxed), 0);
716
717 metrics.inc_failed();
718 assert_eq!(metrics.failed.load(Ordering::Relaxed), 1);
719
720 metrics.inc_failed();
721 assert_eq!(metrics.failed.load(Ordering::Relaxed), 2);
722 }
723
724 #[tokio::test]
725 async fn test_spawn_error_display() {
726 let error = BackgroundSpawnError::QueueFull;
727 assert_eq!(error.to_string(), "background task queue is full");
728 }
729
730 #[tokio::test]
731 async fn test_background_config_default() {
732 let config = BackgroundTaskConfig::default();
733 assert_eq!(config.max_queue_size, 1024);
734 assert_eq!(config.max_concurrent_tasks, 128);
735 assert_eq!(config.drain_timeout_secs, 30);
736 }
737
738 #[tokio::test]
739 async fn test_shutdown_with_zero_pending_tasks() {
740 let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
741
742 let result = runtime.shutdown().await;
743 assert!(result.is_ok(), "shutdown should succeed with no tasks");
744 }
745
746 #[tokio::test]
747 async fn test_shutdown_with_only_running_tasks() {
748 let config = BackgroundTaskConfig {
749 max_queue_size: 10,
750 max_concurrent_tasks: 2,
751 drain_timeout_secs: 5,
752 };
753 let runtime = BackgroundRuntime::start(config).await;
754 let handle = runtime.handle();
755
756 let execution_started: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
757 let execution_completed: Arc<std::sync::atomic::AtomicBool> =
758 Arc::new(std::sync::atomic::AtomicBool::new(false));
759
760 let started = execution_started.clone();
761 let completed = execution_completed.clone();
762
763 handle
764 .spawn(move || {
765 let s = started.clone();
766 let c = completed.clone();
767 async move {
768 s.store(true, std::sync::atomic::Ordering::SeqCst);
769 tokio::time::sleep(Duration::from_millis(100)).await;
770 c.store(true, std::sync::atomic::Ordering::SeqCst);
771 Ok(())
772 }
773 })
774 .unwrap();
775
776 tokio::time::sleep(Duration::from_millis(20)).await;
777
778 let result = runtime.shutdown().await;
779 assert!(result.is_ok(), "shutdown should succeed and wait for running tasks");
780 assert!(
781 execution_completed.load(std::sync::atomic::Ordering::SeqCst),
782 "task should have completed"
783 );
784 }
785
786 #[tokio::test]
787 async fn test_shutdown_drains_queued_tasks() {
788 let config = BackgroundTaskConfig {
789 max_queue_size: 100,
790 max_concurrent_tasks: 1,
791 drain_timeout_secs: 5,
792 };
793 let runtime = BackgroundRuntime::start(config).await;
794 let handle = runtime.handle();
795
796 let execution_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
797
798 for _ in 0..10 {
799 let count = execution_count.clone();
800 handle
801 .spawn(move || {
802 let c = count.clone();
803 async move {
804 c.fetch_add(1, Ordering::SeqCst);
805 tokio::time::sleep(Duration::from_millis(10)).await;
806 Ok(())
807 }
808 })
809 .unwrap();
810 }
811
812 let result = runtime.shutdown().await;
813 assert!(result.is_ok());
814 assert_eq!(
815 execution_count.load(Ordering::SeqCst),
816 10,
817 "all queued tasks should execute"
818 );
819 }
820
821 #[tokio::test]
822 async fn test_shutdown_timeout_force_stops_long_tasks() {
823 let config = BackgroundTaskConfig {
824 max_queue_size: 10,
825 max_concurrent_tasks: 2,
826 drain_timeout_secs: 1,
827 };
828 let runtime = BackgroundRuntime::start(config).await;
829 let handle = runtime.handle();
830
831 let completed: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
832 let completed_clone = completed.clone();
833
834 handle
835 .spawn(move || {
836 let c = completed_clone.clone();
837 async move {
838 tokio::time::sleep(Duration::from_secs(10)).await;
839 c.store(true, std::sync::atomic::Ordering::SeqCst);
840 Ok(())
841 }
842 })
843 .unwrap();
844
845 tokio::time::sleep(Duration::from_millis(50)).await;
846
847 let shutdown_start = std::time::Instant::now();
848 let result = runtime.shutdown().await;
849 let elapsed = shutdown_start.elapsed();
850
851 assert!(result.is_err(), "shutdown should timeout");
852 assert!(
853 elapsed < Duration::from_secs(3),
854 "shutdown should timeout near drain_timeout"
855 );
856 assert!(
857 !completed.load(std::sync::atomic::Ordering::SeqCst),
858 "long-running task should not complete"
859 );
860 }
861
862 #[tokio::test]
863 async fn test_multiple_shutdown_calls_idempotent() {
864 let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
865
866 let result1 = runtime.shutdown().await;
867 assert!(result1.is_ok(), "first shutdown should succeed");
868 }
869
870 #[tokio::test]
871 async fn test_spawn_after_all_senders_dropped_fails() {
872 let config = BackgroundTaskConfig::default();
873 let runtime = BackgroundRuntime::start(config).await;
874 let handle = runtime.handle();
875
876 runtime.shutdown().await.expect("shutdown should succeed");
877
878 tokio::time::sleep(Duration::from_millis(50)).await;
879
880 let result = handle.spawn(|| async { Ok(()) });
881 assert!(result.is_err(), "spawn should fail after all senders are dropped");
882 }
883
884 #[tokio::test]
885 async fn test_concurrent_spawns_hit_semaphore_limit() {
886 let config = BackgroundTaskConfig {
887 max_queue_size: 100,
888 max_concurrent_tasks: 3,
889 drain_timeout_secs: 10,
890 };
891 let runtime = BackgroundRuntime::start(config).await;
892 let handle = runtime.handle();
893
894 let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
895 let running_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
896 let peak_concurrent: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
897
898 for _ in 0..5 {
899 let b = barrier.clone();
900 let running = running_count.clone();
901 let peak = peak_concurrent.clone();
902
903 handle
904 .spawn(move || {
905 let barrier = b.clone();
906 let r = running.clone();
907 let p = peak.clone();
908 async move {
909 let current = r.fetch_add(1, Ordering::SeqCst) + 1;
910 let mut peak_val = p.load(Ordering::SeqCst);
911 while current > peak_val {
912 if p.compare_exchange(peak_val, current, Ordering::SeqCst, Ordering::SeqCst)
913 .is_ok()
914 {
915 break;
916 }
917 peak_val = p.load(Ordering::SeqCst);
918 }
919
920 barrier.wait().await;
921 tokio::time::sleep(Duration::from_millis(200)).await;
922 r.fetch_sub(1, Ordering::SeqCst);
923 Ok(())
924 }
925 })
926 .unwrap();
927 }
928
929 barrier.wait().await;
930 tokio::time::sleep(Duration::from_millis(100)).await;
931
932 let peak = peak_concurrent.load(Ordering::SeqCst);
933 assert!(
934 peak <= 3,
935 "concurrent execution should not exceed semaphore limit of 3, got {}",
936 peak
937 );
938
939 runtime.shutdown().await.unwrap();
940 }
941
942 #[tokio::test]
943 async fn test_task_panic_cleanup_still_occurs() {
944 let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
945 let handle = runtime.handle();
946
947 let mut spawned_count: u32 = 0;
948 let panic_task_executed: Arc<std::sync::atomic::AtomicBool> =
949 Arc::new(std::sync::atomic::AtomicBool::new(false));
950 let after_panic_executed: Arc<std::sync::atomic::AtomicBool> =
951 Arc::new(std::sync::atomic::AtomicBool::new(false));
952
953 let panic_flag = panic_task_executed.clone();
954 handle
955 .spawn(move || {
956 let p = panic_flag.clone();
957 async move {
958 p.store(true, std::sync::atomic::Ordering::SeqCst);
959 Err(BackgroundJobError::from("simulated task failure"))
960 }
961 })
962 .unwrap();
963 spawned_count += 1;
964
965 let after_flag = after_panic_executed.clone();
966 handle
967 .spawn(move || {
968 let a = after_flag.clone();
969 async move {
970 tokio::time::sleep(Duration::from_millis(50)).await;
971 a.store(true, std::sync::atomic::Ordering::SeqCst);
972 Ok(())
973 }
974 })
975 .unwrap();
976 spawned_count += 1;
977
978 tokio::time::sleep(Duration::from_millis(200)).await;
979
980 assert!(panic_task_executed.load(std::sync::atomic::Ordering::SeqCst));
981 assert!(after_panic_executed.load(std::sync::atomic::Ordering::SeqCst));
982 assert_eq!(spawned_count, 2);
983
984 runtime.shutdown().await.unwrap();
985 }
986
987 #[tokio::test]
988 async fn test_queue_overflow_with_immediate_rejection() {
989 let config = BackgroundTaskConfig {
990 max_queue_size: 2,
991 max_concurrent_tasks: 100,
992 drain_timeout_secs: 5,
993 };
994 let runtime = BackgroundRuntime::start(config).await;
995 let handle = runtime.handle();
996
997 let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
998
999 for _ in 0..2 {
1000 let b = barrier.clone();
1001 handle
1002 .spawn(move || {
1003 let barrier = b.clone();
1004 async move {
1005 barrier.wait().await;
1006 tokio::time::sleep(Duration::from_millis(500)).await;
1007 Ok(())
1008 }
1009 })
1010 .unwrap();
1011 }
1012
1013 let overflow_result = handle.spawn(|| async { Ok(()) });
1014 assert!(matches!(overflow_result, Err(BackgroundSpawnError::QueueFull)));
1015
1016 barrier.wait().await;
1017 runtime.shutdown().await.unwrap();
1018 }
1019
1020 #[tokio::test]
1021 async fn test_metrics_accuracy_under_concurrent_load() {
1022 let config = BackgroundTaskConfig {
1023 max_queue_size: 50,
1024 max_concurrent_tasks: 5,
1025 drain_timeout_secs: 10,
1026 };
1027 let runtime = BackgroundRuntime::start(config).await;
1028 let handle = runtime.handle();
1029
1030 let completed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1031
1032 for _ in 0..20 {
1033 let c = completed.clone();
1034 handle
1035 .spawn(move || {
1036 let count = c.clone();
1037 async move {
1038 tokio::time::sleep(Duration::from_millis(50)).await;
1039 count.fetch_add(1, Ordering::SeqCst);
1040 Ok(())
1041 }
1042 })
1043 .unwrap();
1044 }
1045
1046 runtime.shutdown().await.unwrap();
1047 assert_eq!(completed.load(Ordering::SeqCst), 20, "all tasks should complete");
1048 }
1049
1050 #[tokio::test]
1051 async fn test_drain_with_slowly_completing_tasks() {
1052 let config = BackgroundTaskConfig {
1053 max_queue_size: 50,
1054 max_concurrent_tasks: 2,
1055 drain_timeout_secs: 10,
1056 };
1057 let runtime = BackgroundRuntime::start(config).await;
1058 let handle = runtime.handle();
1059
1060 let completed_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1061
1062 for i in 0..5 {
1063 let count = completed_count.clone();
1064 handle
1065 .spawn(move || {
1066 let c = count.clone();
1067 async move {
1068 let sleep_ms = 100 + (i as u64 * 50);
1069 tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
1070 c.fetch_add(1, Ordering::SeqCst);
1071 Ok(())
1072 }
1073 })
1074 .unwrap();
1075 }
1076
1077 let result = runtime.shutdown().await;
1078 assert!(result.is_ok());
1079 assert_eq!(completed_count.load(Ordering::SeqCst), 5);
1080 }
1081
1082 #[tokio::test]
1083 async fn test_semaphore_starvation_doesnt_deadlock() {
1084 let config = BackgroundTaskConfig {
1085 max_queue_size: 100,
1086 max_concurrent_tasks: 1,
1087 drain_timeout_secs: 10,
1088 };
1089 let runtime = BackgroundRuntime::start(config).await;
1090 let handle = runtime.handle();
1091
1092 let completion_order: Arc<tokio::sync::Mutex<Vec<u32>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1093
1094 for i in 0..10 {
1095 let order = completion_order.clone();
1096 handle
1097 .spawn(move || {
1098 let o = order.clone();
1099 async move {
1100 tokio::time::sleep(Duration::from_millis(5)).await;
1101 let mut guard = o.lock().await;
1102 guard.push(i);
1103 Ok(())
1104 }
1105 })
1106 .unwrap();
1107 }
1108
1109 let result = runtime.shutdown().await;
1110 assert!(result.is_ok());
1111
1112 let order = completion_order.lock().await;
1113 assert_eq!(order.len(), 10);
1114 }
1115
1116 #[tokio::test]
1117 async fn test_cancel_task_mid_execution() {
1118 let config = BackgroundTaskConfig {
1119 max_queue_size: 10,
1120 max_concurrent_tasks: 2,
1121 drain_timeout_secs: 1,
1122 };
1123 let runtime = BackgroundRuntime::start(config).await;
1124 let handle = runtime.handle();
1125
1126 let started: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1127 let ended: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1128
1129 let start_flag = started.clone();
1130 let end_flag = ended.clone();
1131
1132 handle
1133 .spawn(move || {
1134 let s = start_flag.clone();
1135 let e = end_flag.clone();
1136 async move {
1137 s.store(true, std::sync::atomic::Ordering::SeqCst);
1138 tokio::time::sleep(Duration::from_secs(10)).await;
1139 e.store(true, std::sync::atomic::Ordering::SeqCst);
1140 Ok(())
1141 }
1142 })
1143 .unwrap();
1144
1145 tokio::time::sleep(Duration::from_millis(50)).await;
1146 assert!(started.load(std::sync::atomic::Ordering::SeqCst));
1147
1148 let result = runtime.shutdown().await;
1149 assert!(result.is_err(), "shutdown should timeout due to long task");
1150 assert!(
1151 !ended.load(std::sync::atomic::Ordering::SeqCst),
1152 "task should not complete"
1153 );
1154 }
1155
1156 #[tokio::test]
1157 async fn test_rapid_spawn_and_shutdown() {
1158 let config = BackgroundTaskConfig {
1159 max_queue_size: 1000,
1160 max_concurrent_tasks: 10,
1161 drain_timeout_secs: 5,
1162 };
1163 let runtime = BackgroundRuntime::start(config).await;
1164 let handle = runtime.handle();
1165
1166 let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1167
1168 for _ in 0..100 {
1169 let c = count.clone();
1170 let _ = handle.spawn(move || {
1171 let counter = c.clone();
1172 async move {
1173 counter.fetch_add(1, Ordering::SeqCst);
1174 Ok(())
1175 }
1176 });
1177 }
1178
1179 let result = runtime.shutdown().await;
1180 assert!(result.is_ok());
1181
1182 let final_count = count.load(Ordering::SeqCst);
1183 assert!(final_count > 0, "at least some tasks should execute");
1184 assert!(final_count <= 100, "no more than spawned count should execute");
1185 }
1186
1187 #[tokio::test]
1188 async fn test_shutdown_with_mixed_success_and_failure_tasks() {
1189 let config = BackgroundTaskConfig::default();
1190 let runtime = BackgroundRuntime::start(config).await;
1191 let handle = runtime.handle();
1192
1193 let success_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1194 let failure_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1195
1196 for i in 0..10 {
1197 if i % 2 == 0 {
1198 let s = success_count.clone();
1199 handle
1200 .spawn(move || {
1201 let counter = s.clone();
1202 async move {
1203 counter.fetch_add(1, Ordering::SeqCst);
1204 Ok(())
1205 }
1206 })
1207 .unwrap();
1208 } else {
1209 let f = failure_count.clone();
1210 handle
1211 .spawn(move || {
1212 let counter = f.clone();
1213 async move {
1214 counter.fetch_add(1, Ordering::SeqCst);
1215 Err(BackgroundJobError::from("intentional failure"))
1216 }
1217 })
1218 .unwrap();
1219 }
1220 }
1221
1222 tokio::time::sleep(Duration::from_millis(200)).await;
1223
1224 let result = runtime.shutdown().await;
1225 assert!(result.is_ok());
1226 assert_eq!(success_count.load(Ordering::SeqCst), 5);
1227 assert_eq!(failure_count.load(Ordering::SeqCst), 5);
1228 }
1229
1230 #[tokio::test]
1231 async fn test_concurrent_handle_clones_spawn_independently() {
1232 let config = BackgroundTaskConfig::default();
1233 let runtime = BackgroundRuntime::start(config).await;
1234 let handle = runtime.handle();
1235
1236 let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1237
1238 let mut join_handles = vec![];
1239
1240 for _ in 0..3 {
1241 let h = handle.clone();
1242 let c = count.clone();
1243
1244 let jh = tokio::spawn(async move {
1245 for _ in 0..5 {
1246 let counter = c.clone();
1247 let _ = h.spawn(move || {
1248 let cnt = counter.clone();
1249 async move {
1250 cnt.fetch_add(1, Ordering::SeqCst);
1251 Ok(())
1252 }
1253 });
1254 }
1255 });
1256 join_handles.push(jh);
1257 }
1258
1259 for jh in join_handles {
1260 let _ = jh.await;
1261 }
1262
1263 tokio::time::sleep(Duration::from_millis(200)).await;
1264
1265 let result = runtime.shutdown().await;
1266 assert!(result.is_ok());
1267 assert_eq!(count.load(Ordering::SeqCst), 15);
1268 }
1269
1270 #[tokio::test]
1271 async fn test_queue_full_metrics_updated() {
1272 let config = BackgroundTaskConfig {
1273 max_queue_size: 2,
1274 max_concurrent_tasks: 100,
1275 drain_timeout_secs: 5,
1276 };
1277 let runtime = BackgroundRuntime::start(config).await;
1278 let handle = runtime.handle();
1279
1280 let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
1281
1282 for _ in 0..2 {
1283 let b = barrier.clone();
1284 handle
1285 .spawn(move || {
1286 let barrier = b.clone();
1287 async move {
1288 barrier.wait().await;
1289 tokio::time::sleep(Duration::from_secs(1)).await;
1290 Ok(())
1291 }
1292 })
1293 .unwrap();
1294 }
1295
1296 let result = handle.spawn(|| async { Ok(()) });
1297 assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));
1298
1299 barrier.wait().await;
1300 tokio::time::sleep(Duration::from_millis(100)).await;
1301
1302 runtime.shutdown().await.unwrap();
1303 }
1304
1305 #[tokio::test]
1306 async fn test_handle_persistence_across_spawns() {
1307 let config = BackgroundTaskConfig::default();
1308 let runtime = BackgroundRuntime::start(config).await;
1309 let handle = runtime.handle();
1310
1311 let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1312
1313 for _ in 0..5 {
1314 let c = count.clone();
1315 handle
1316 .spawn(move || {
1317 let counter = c.clone();
1318 async move {
1319 counter.fetch_add(1, Ordering::SeqCst);
1320 Ok(())
1321 }
1322 })
1323 .unwrap();
1324 }
1325
1326 tokio::time::sleep(Duration::from_millis(150)).await;
1327 assert_eq!(count.load(Ordering::SeqCst), 5);
1328
1329 runtime.shutdown().await.unwrap();
1330 }
1331
1332 #[tokio::test]
1333 async fn test_shutdown_with_queue_at_capacity() {
1334 let config = BackgroundTaskConfig {
1335 max_queue_size: 5,
1336 max_concurrent_tasks: 1,
1337 drain_timeout_secs: 10,
1338 };
1339 let runtime = BackgroundRuntime::start(config).await;
1340 let handle = runtime.handle();
1341
1342 let completion_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1343
1344 for _ in 0..5 {
1345 let c = completion_count.clone();
1346 handle
1347 .spawn(move || {
1348 let counter = c.clone();
1349 async move {
1350 tokio::time::sleep(Duration::from_millis(20)).await;
1351 counter.fetch_add(1, Ordering::SeqCst);
1352 Ok(())
1353 }
1354 })
1355 .unwrap();
1356 }
1357
1358 let result = runtime.shutdown().await;
1359 assert!(result.is_ok());
1360 assert_eq!(completion_count.load(Ordering::SeqCst), 5);
1361 }
1362
1363 #[tokio::test]
1364 async fn test_metadata_preserved_through_execution() {
1365 let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
1366 let handle = runtime.handle();
1367
1368 let metadata = BackgroundJobMetadata {
1369 name: Cow::Owned("test_metadata_task".to_string()),
1370 request_id: Some("req-metadata-123".to_string()),
1371 };
1372
1373 let executed: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1374 let executed_clone = executed.clone();
1375
1376 let future = async move {
1377 executed_clone.store(true, std::sync::atomic::Ordering::SeqCst);
1378 Ok(())
1379 };
1380
1381 handle.spawn_with_metadata(future, metadata).unwrap();
1382
1383 tokio::time::sleep(Duration::from_millis(100)).await;
1384
1385 assert!(executed.load(std::sync::atomic::Ordering::SeqCst));
1386 runtime.shutdown().await.unwrap();
1387 }
1388
1389 #[tokio::test]
1390 async fn test_very_short_drain_timeout_forces_stop() {
1391 let config = BackgroundTaskConfig {
1392 max_queue_size: 10,
1393 max_concurrent_tasks: 2,
1394 drain_timeout_secs: 0,
1395 };
1396 let runtime = BackgroundRuntime::start(config).await;
1397 let handle = runtime.handle();
1398
1399 handle
1400 .spawn(|| async {
1401 tokio::time::sleep(Duration::from_secs(1)).await;
1402 Ok(())
1403 })
1404 .unwrap();
1405
1406 tokio::time::sleep(Duration::from_millis(10)).await;
1407
1408 let result = runtime.shutdown().await;
1409 assert!(result.is_err());
1410 }
1411
1412 #[tokio::test]
1413 async fn test_spawn_many_tasks_sequential_drain() {
1414 let config = BackgroundTaskConfig {
1415 max_queue_size: 200,
1416 max_concurrent_tasks: 2,
1417 drain_timeout_secs: 15,
1418 };
1419 let runtime = BackgroundRuntime::start(config).await;
1420 let handle = runtime.handle();
1421
1422 let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1423
1424 for _ in 0..50 {
1425 let c = count.clone();
1426 handle
1427 .spawn(move || {
1428 let counter = c.clone();
1429 async move {
1430 tokio::time::sleep(Duration::from_millis(1)).await;
1431 counter.fetch_add(1, Ordering::SeqCst);
1432 Ok(())
1433 }
1434 })
1435 .unwrap();
1436 }
1437
1438 let result = runtime.shutdown().await;
1439 assert!(result.is_ok());
1440 assert_eq!(count.load(Ordering::SeqCst), 50);
1441 }
1442
1443 #[tokio::test]
1444 async fn test_no_deadlock_with_max_concurrency_barrier() {
1445 let config = BackgroundTaskConfig {
1446 max_queue_size: 100,
1447 max_concurrent_tasks: 3,
1448 drain_timeout_secs: 10,
1449 };
1450 let runtime = BackgroundRuntime::start(config).await;
1451 let handle = runtime.handle();
1452
1453 let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(4));
1454
1455 for _ in 0..3 {
1456 let b = barrier.clone();
1457 handle
1458 .spawn(move || {
1459 let barrier = b.clone();
1460 async move {
1461 barrier.wait().await;
1462 tokio::time::sleep(Duration::from_millis(50)).await;
1463 Ok(())
1464 }
1465 })
1466 .unwrap();
1467 }
1468
1469 barrier.wait().await;
1470 tokio::time::sleep(Duration::from_millis(100)).await;
1471
1472 let result = runtime.shutdown().await;
1473 assert!(result.is_ok());
1474 }
1475
1476 #[tokio::test]
1477 async fn test_error_from_owned_string() {
1478 let message = String::from("error message");
1479 let error = BackgroundJobError::from(message);
1480 assert_eq!(error.message, "error message");
1481 }
1482
1483 #[tokio::test]
1484 async fn test_borrowed_str_conversion() {
1485 let error = BackgroundJobError::from("borrowed message");
1486 assert_eq!(error.message, "borrowed message");
1487 }
1488
1489 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1490 async fn test_phase1_semaphore_acquisition_with_concurrent_load() {
1491 let config = BackgroundTaskConfig {
1492 max_queue_size: 50,
1493 max_concurrent_tasks: 1,
1494 drain_timeout_secs: 10,
1495 };
1496
1497 let runtime = BackgroundRuntime::start(config).await;
1498 let handle = runtime.handle();
1499
1500 let execution_count = Arc::new(AtomicU64::new(0));
1501
1502 let blocking_barrier = Arc::new(tokio::sync::Barrier::new(2));
1503 let barrier_clone = blocking_barrier.clone();
1504
1505 handle
1506 .spawn(move || {
1507 let b = barrier_clone.clone();
1508 async move {
1509 b.wait().await;
1510 tokio::time::sleep(Duration::from_millis(100)).await;
1511 Ok(())
1512 }
1513 })
1514 .unwrap();
1515
1516 tokio::time::sleep(Duration::from_millis(50)).await;
1517
1518 for _ in 0..3 {
1519 let exec_clone = execution_count.clone();
1520 handle
1521 .spawn(move || {
1522 let e = exec_clone.clone();
1523 async move {
1524 e.fetch_add(1, Ordering::SeqCst);
1525 Ok(())
1526 }
1527 })
1528 .unwrap();
1529 }
1530
1531 blocking_barrier.wait().await;
1532 tokio::time::sleep(Duration::from_millis(250)).await;
1533
1534 assert_eq!(execution_count.load(Ordering::SeqCst), 3);
1535
1536 runtime.shutdown().await.unwrap();
1537 }
1538
1539 #[tokio::test(flavor = "multi_thread")]
1540 async fn test_concurrent_task_completion_race_conditions() {
1541 let config = BackgroundTaskConfig {
1542 max_queue_size: 100,
1543 max_concurrent_tasks: 8,
1544 drain_timeout_secs: 10,
1545 };
1546
1547 let runtime = BackgroundRuntime::start(config).await;
1548 let handle = runtime.handle();
1549
1550 let completion_counter = Arc::new(AtomicU64::new(0));
1551 let task_count = 50;
1552
1553 for i in 0..task_count {
1554 let counter = completion_counter.clone();
1555 handle
1556 .spawn(move || {
1557 let c = counter.clone();
1558 async move {
1559 let sleep_ms = (i as u64 * 11) % 100;
1560 tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
1561 c.fetch_add(1, Ordering::SeqCst);
1562 Ok(())
1563 }
1564 })
1565 .unwrap();
1566 }
1567
1568 runtime.shutdown().await.unwrap();
1569 assert_eq!(
1570 completion_counter.load(Ordering::SeqCst),
1571 task_count,
1572 "all tasks should complete despite race conditions"
1573 );
1574 }
1575
1576 #[tokio::test(flavor = "multi_thread")]
1577 async fn test_failure_metric_tracking_under_concurrent_errors() {
1578 let config = BackgroundTaskConfig {
1579 max_queue_size: 50,
1580 max_concurrent_tasks: 5,
1581 drain_timeout_secs: 10,
1582 };
1583
1584 let runtime = BackgroundRuntime::start(config).await;
1585 let handle = runtime.handle();
1586
1587 let success_count = Arc::new(AtomicU64::new(0));
1588 let failure_count = Arc::new(AtomicU64::new(0));
1589
1590 for i in 0..20 {
1591 if i % 3 == 0 {
1592 let fail_clone = failure_count.clone();
1593 handle
1594 .spawn(move || {
1595 let f = fail_clone.clone();
1596 async move {
1597 f.fetch_add(1, Ordering::SeqCst);
1598 Err(BackgroundJobError::from("intentional failure"))
1599 }
1600 })
1601 .unwrap();
1602 } else {
1603 let succ_clone = success_count.clone();
1604 handle
1605 .spawn(move || {
1606 let s = succ_clone.clone();
1607 async move {
1608 s.fetch_add(1, Ordering::SeqCst);
1609 Ok(())
1610 }
1611 })
1612 .unwrap();
1613 }
1614 }
1615
1616 runtime.shutdown().await.unwrap();
1617
1618 let final_success = success_count.load(Ordering::SeqCst);
1619 let final_failure = failure_count.load(Ordering::SeqCst);
1620
1621 assert_eq!(final_success + final_failure, 20);
1622 assert_eq!(final_failure, 7);
1623 assert_eq!(final_success, 13);
1624 }
1625
1626 #[tokio::test(flavor = "multi_thread")]
1627 async fn test_handle_clone_isolation_concurrent_spawns() {
1628 let config = BackgroundTaskConfig {
1629 max_queue_size: 100,
1630 max_concurrent_tasks: 4,
1631 drain_timeout_secs: 10,
1632 };
1633
1634 let runtime = BackgroundRuntime::start(config).await;
1635 let handle = runtime.handle();
1636
1637 let completion_counters: Vec<Arc<AtomicU64>> = (0..5).map(|_| Arc::new(AtomicU64::new(0))).collect();
1638
1639 let mut join_handles = vec![];
1640
1641 for worker_id in 0..5 {
1642 let h = handle.clone();
1643 let counter = completion_counters[worker_id].clone();
1644
1645 let jh = tokio::spawn(async move {
1646 for task_id in 0..10 {
1647 let c = counter.clone();
1648 let _ = h.spawn(move || {
1649 let cnt = c.clone();
1650 async move {
1651 let delay = (worker_id as u64 * 10 + task_id as u64) % 50;
1652 tokio::time::sleep(Duration::from_millis(delay)).await;
1653 cnt.fetch_add(1, Ordering::SeqCst);
1654 Ok(())
1655 }
1656 });
1657 }
1658 });
1659 join_handles.push(jh);
1660 }
1661
1662 for jh in join_handles {
1663 let _ = jh.await;
1664 }
1665
1666 runtime.shutdown().await.unwrap();
1667
1668 for (i, counter) in completion_counters.iter().enumerate() {
1669 assert_eq!(
1670 counter.load(Ordering::SeqCst),
1671 10,
1672 "worker {} should have exactly 10 task completions",
1673 i
1674 );
1675 }
1676 }
1677
1678 #[tokio::test]
1679 async fn test_background_job_error_with_string_slice() {
1680 let errors = vec![
1681 BackgroundJobError::from("simple error"),
1682 BackgroundJobError::from(String::from("owned string error")),
1683 ];
1684
1685 for error in errors {
1686 assert!(!error.message.is_empty());
1687 }
1688 }
1689
1690 #[tokio::test]
1691 async fn test_spawn_error_display_formatting() {
1692 let error = BackgroundSpawnError::QueueFull;
1693 let formatted = format!("{}", error);
1694 assert_eq!(formatted, "background task queue is full");
1695
1696 let result: Result<(), BackgroundSpawnError> = Err(error);
1697 assert!(result.is_err());
1698 }
1699
1700 #[tokio::test]
1701 async fn test_background_metrics_concurrent_increments() {
1702 let metrics = Arc::new(BackgroundMetrics::default());
1703 let mut handles = vec![];
1704
1705 for _ in 0..10 {
1706 let m = metrics.clone();
1707 let h = tokio::spawn(async move {
1708 for _ in 0..10 {
1709 m.inc_queued();
1710 m.inc_running();
1711 m.inc_failed();
1712 m.dec_queued();
1713 m.dec_running();
1714 }
1715 });
1716 handles.push(h);
1717 }
1718
1719 for h in handles {
1720 let _ = h.await;
1721 }
1722
1723 assert_eq!(metrics.queued.load(Ordering::Relaxed), 0);
1724 assert_eq!(metrics.running.load(Ordering::Relaxed), 0);
1725 assert_eq!(metrics.failed.load(Ordering::Relaxed), 100);
1726 }
1727
1728 #[tokio::test]
1729 async fn test_drain_phase_execution_with_lingering_senders() {
1730 let config = BackgroundTaskConfig {
1731 max_queue_size: 20,
1732 max_concurrent_tasks: 2,
1733 drain_timeout_secs: 10,
1734 };
1735
1736 let runtime = BackgroundRuntime::start(config).await;
1737 let handle = runtime.handle();
1738
1739 let executed = Arc::new(AtomicU64::new(0));
1740
1741 for _ in 0..5 {
1742 let e = executed.clone();
1743 handle
1744 .spawn(move || {
1745 let ex = e.clone();
1746 async move {
1747 tokio::time::sleep(Duration::from_millis(10)).await;
1748 ex.fetch_add(1, Ordering::SeqCst);
1749 Ok(())
1750 }
1751 })
1752 .unwrap();
1753 }
1754
1755 let result = runtime.shutdown().await;
1756 assert!(result.is_ok());
1757
1758 assert_eq!(executed.load(Ordering::SeqCst), 5);
1759 }
1760
1761 #[tokio::test]
1762 async fn test_concurrent_queue_status_transitions() {
1763 let config = BackgroundTaskConfig {
1764 max_queue_size: 10,
1765 max_concurrent_tasks: 2,
1766 drain_timeout_secs: 10,
1767 };
1768
1769 let runtime = BackgroundRuntime::start(config).await;
1770 let handle = runtime.handle();
1771
1772 let state_transitions = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1773
1774 for i in 0..10 {
1775 let st = state_transitions.clone();
1776 handle
1777 .spawn(move || {
1778 let s = st.clone();
1779 async move {
1780 let mut transitions = s.lock().await;
1781 transitions.push(("spawned", i));
1782 drop(transitions);
1783
1784 tokio::time::sleep(Duration::from_millis(50)).await;
1785
1786 let mut transitions = s.lock().await;
1787 transitions.push(("completed", i));
1788 Ok(())
1789 }
1790 })
1791 .unwrap();
1792 }
1793
1794 tokio::time::sleep(Duration::from_millis(300)).await;
1795
1796 runtime.shutdown().await.unwrap();
1797
1798 let final_transitions = state_transitions.lock().await;
1799 let completed_count = final_transitions
1800 .iter()
1801 .filter(|(action, _)| *action == "completed")
1802 .count();
1803
1804 assert_eq!(completed_count, 10, "all tasks should complete");
1805 }
1806
1807 #[tokio::test(flavor = "multi_thread")]
1808 async fn test_semaphore_no_starvation_with_uneven_task_duration() {
1809 let config = BackgroundTaskConfig {
1810 max_queue_size: 100,
1811 max_concurrent_tasks: 2,
1812 drain_timeout_secs: 10,
1813 };
1814
1815 let runtime = BackgroundRuntime::start(config).await;
1816 let handle = runtime.handle();
1817
1818 let fast_executed = Arc::new(AtomicU64::new(0));
1819 let slow_executed = Arc::new(AtomicU64::new(0));
1820
1821 for _ in 0..5 {
1822 let s = slow_executed.clone();
1823 handle
1824 .spawn(move || {
1825 let slow = s.clone();
1826 async move {
1827 tokio::time::sleep(Duration::from_millis(100)).await;
1828 slow.fetch_add(1, Ordering::SeqCst);
1829 Ok(())
1830 }
1831 })
1832 .unwrap();
1833 }
1834
1835 tokio::time::sleep(Duration::from_millis(10)).await;
1836
1837 for _ in 0..5 {
1838 let f = fast_executed.clone();
1839 handle
1840 .spawn(move || {
1841 let fast = f.clone();
1842 async move {
1843 tokio::time::sleep(Duration::from_millis(10)).await;
1844 fast.fetch_add(1, Ordering::SeqCst);
1845 Ok(())
1846 }
1847 })
1848 .unwrap();
1849 }
1850
1851 runtime.shutdown().await.unwrap();
1852
1853 assert_eq!(
1854 fast_executed.load(Ordering::SeqCst),
1855 5,
1856 "fast tasks should not be starved"
1857 );
1858 assert_eq!(slow_executed.load(Ordering::SeqCst), 5, "slow tasks should execute");
1859 }
1860}