use std::borrow::Cow;
use std::sync::Arc;
use std::time::Duration;

use futures::FutureExt;
use futures::future::BoxFuture;
use tokio::sync::{Semaphore, mpsc};
use tokio::task::JoinSet;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;

#[derive(Clone, Debug)]
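/// Configuration for the background task runtime.
///
/// `max_queue_size` bounds the spawn queue, `max_concurrent_tasks` bounds the
/// semaphore that gates execution, and `drain_timeout_secs` caps how long
/// `BackgroundRuntime::shutdown` waits for queued and in-flight work.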
pub struct BackgroundTaskConfig {
    pub max_queue_size: usize,
    pub max_concurrent_tasks: usize,
    pub drain_timeout_secs: u64,
}

impl Default for BackgroundTaskConfig {
    fn default() -> Self {
        Self {
            max_queue_size: 1024,
            max_concurrent_tasks: 128,
            drain_timeout_secs: 30,
        }
    }
}

#[derive(Clone, Debug)]
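/// Metadata attached to a background job; the `name` shows up in failure logs,
/// and `request_id` can carry the identifier of the request that spawned the job.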
pub struct BackgroundJobMetadata {
    pub name: Cow<'static, str>,
    pub request_id: Option<String>,
}

impl Default for BackgroundJobMetadata {
    fn default() -> Self {
        Self {
            name: Cow::Borrowed("background_task"),
            request_id: None,
        }
    }
}

pub type BackgroundJobFuture = BoxFuture<'static, Result<(), BackgroundJobError>>;

struct BackgroundJob {
    pub future: BackgroundJobFuture,
    pub metadata: BackgroundJobMetadata,
}

impl BackgroundJob {
    fn new<F>(future: F, metadata: BackgroundJobMetadata) -> Self
    where
        F: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
    {
        Self {
            future: future.boxed(),
            metadata,
        }
    }
}

#[derive(Debug, Clone)]
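/// Error type returned by background job futures; the message is logged when a
/// job fails.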
pub struct BackgroundJobError {
    pub message: String,
}

impl From<String> for BackgroundJobError {
    fn from(message: String) -> Self {
        Self { message }
    }
}

impl From<&str> for BackgroundJobError {
    fn from(message: &str) -> Self {
        Self {
            message: message.to_string(),
        }
    }
}

#[derive(Debug, Clone)]
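/// Error returned by `BackgroundHandle::spawn` when the bounded job queue is full.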
pub enum BackgroundSpawnError {
    QueueFull,
}

impl std::fmt::Display for BackgroundSpawnError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            BackgroundSpawnError::QueueFull => write!(f, "background task queue is full"),
        }
    }
}

impl std::error::Error for BackgroundSpawnError {}

#[derive(Debug)]
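/// Error returned by `BackgroundRuntime::shutdown` when draining does not finish
/// within the configured timeout.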
pub struct BackgroundShutdownError;

#[derive(Default)]
struct BackgroundMetrics {
    queued: std::sync::atomic::AtomicU64,
    running: std::sync::atomic::AtomicU64,
    failed: std::sync::atomic::AtomicU64,
}

impl BackgroundMetrics {
    fn inc_queued(&self) {
        self.queued.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    fn dec_queued(&self) {
        self.queued.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
    }

    fn inc_running(&self) {
        self.running.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    fn dec_running(&self) {
        self.running.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
    }

    fn inc_failed(&self) {
        self.failed.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
}

#[derive(Clone)]
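/// Cloneable handle for enqueueing jobs onto the background runtime.
///
/// Spawning never blocks: jobs are `try_send`-ed onto a bounded queue and
/// `BackgroundSpawnError::QueueFull` is returned when the queue is at capacity.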
pub struct BackgroundHandle {
    sender: mpsc::Sender<BackgroundJob>,
    metrics: Arc<BackgroundMetrics>,
}

impl BackgroundHandle {
    pub fn spawn<F, Fut>(&self, f: F) -> Result<(), BackgroundSpawnError>
    where
        F: FnOnce() -> Fut,
        Fut: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
    {
        let future = f();
        self.spawn_with_metadata(future, BackgroundJobMetadata::default())
    }

    pub fn spawn_with_metadata<Fut>(
        &self,
        future: Fut,
        metadata: BackgroundJobMetadata,
    ) -> Result<(), BackgroundSpawnError>
    where
        Fut: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
    {
        self.metrics.inc_queued();
        let job = BackgroundJob::new(future, metadata);
        self.sender.try_send(job).map_err(|_| {
            self.metrics.dec_queued();
            BackgroundSpawnError::QueueFull
        })
    }
}

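/// Owns the executor task and drives graceful shutdown.
///
/// A minimal lifecycle sketch; marked `ignore` because the module path of these
/// types within the crate is assumed, not confirmed:
///
/// ```ignore
/// // Start the runtime, hand out cloneable handles, then drain on shutdown.
/// let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
/// let handle = runtime.handle();
///
/// handle
///     .spawn(|| async {
///         // fire-and-forget asynchronous work goes here
///         Ok(())
///     })
///     .expect("background queue full");
///
/// // Cancels the receive loop, drains queued jobs, and waits up to
/// // `drain_timeout_secs` for in-flight work to finish.
/// runtime.shutdown().await.expect("drain timed out");
/// ```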
pub struct BackgroundRuntime {
    handle: BackgroundHandle,
    drain_timeout: Duration,
    shutdown_token: CancellationToken,
    join_handle: tokio::task::JoinHandle<()>,
}

impl BackgroundRuntime {
    pub async fn start(config: BackgroundTaskConfig) -> Self {
        let (tx, rx) = mpsc::channel(config.max_queue_size);
        let metrics = Arc::new(BackgroundMetrics::default());
        let handle = BackgroundHandle {
            sender: tx.clone(),
            metrics: metrics.clone(),
        };
        let shutdown_token = CancellationToken::new();
        let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tasks));
        let driver_token = shutdown_token.clone();

        let join_handle = tokio::spawn(run_executor(rx, semaphore, metrics.clone(), driver_token));

        Self {
            handle,
            drain_timeout: Duration::from_secs(config.drain_timeout_secs),
            shutdown_token,
            join_handle,
        }
    }

    pub fn handle(&self) -> BackgroundHandle {
        self.handle.clone()
    }

    pub async fn shutdown(self) -> Result<(), BackgroundShutdownError> {
        self.shutdown_token.cancel();
        drop(self.handle);
        match timeout(self.drain_timeout, self.join_handle).await {
            Ok(Ok(_)) => Ok(()),
            _ => Err(BackgroundShutdownError),
        }
    }
}

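/// Executor loop: receives jobs, gates them on the concurrency semaphore, and
/// runs them on a `JoinSet`. It runs in two phases: a `select!` loop that exits
/// on cancellation or channel close, then a best-effort drain of whatever is
/// still queued before awaiting all in-flight jobs.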
async fn run_executor(
    mut rx: mpsc::Receiver<BackgroundJob>,
    semaphore: Arc<Semaphore>,
    metrics: Arc<BackgroundMetrics>,
    token: CancellationToken,
) {
    let mut join_set = JoinSet::new();
    let token_clone = token.clone();

    loop {
        tokio::select! {
            maybe_job = rx.recv() => {
                match maybe_job {
                    Some(job) => {
                        metrics.dec_queued();
                        let semaphore = semaphore.clone();
                        let metrics_clone = metrics.clone();
                        join_set.spawn(async move {
                            let BackgroundJob { future, metadata } = job;
                            match semaphore.acquire_owned().await {
                                Ok(_permit) => {
                                    metrics_clone.inc_running();
                                    if let Err(err) = future.await {
                                        metrics_clone.inc_failed();
                                        tracing::error!(target: "spikard::background", task = %metadata.name, error = %err.message, "background task failed");
                                    }
                                    metrics_clone.dec_running();
                                }
                                Err(_) => {
                                    metrics_clone.inc_failed();
                                    tracing::warn!(target: "spikard::background", "failed to acquire semaphore permit for background task");
                                }
                            }
                        });
                    }
                    None => break,
                }
            }
            _ = token_clone.cancelled() => {
                break;
            }
        }
    }

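    // Drain phase: keep pulling queued jobs until the channel has been empty
    // for ~1s (100 polls x 10ms) or all senders are gone.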
    let mut drain_attempts = 0;
    loop {
        match rx.try_recv() {
            Ok(job) => {
                metrics.dec_queued();
                let semaphore = semaphore.clone();
                let metrics_clone = metrics.clone();
                join_set.spawn(async move {
                    let BackgroundJob { future, metadata } = job;
                    match semaphore.acquire_owned().await {
                        Ok(_permit) => {
                            metrics_clone.inc_running();
                            if let Err(err) = future.await {
                                metrics_clone.inc_failed();
                                tracing::error!(target: "spikard::background", task = %metadata.name, error = %err.message, "background task failed");
                            }
                            metrics_clone.dec_running();
                        }
                        Err(_) => {
                            metrics_clone.inc_failed();
                            tracing::warn!(target: "spikard::background", "failed to acquire semaphore permit for background task");
                        }
                    }
                });
                drain_attempts = 0;
            }
            Err(mpsc::error::TryRecvError::Empty) => {
                drain_attempts += 1;
                if drain_attempts > 100 {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
            Err(mpsc::error::TryRecvError::Disconnected) => {
                break;
            }
        }
    }

    while join_set.join_next().await.is_some() {}
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    #[tokio::test]
    async fn test_basic_spawn_and_execution() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        handle
            .spawn(move || {
                let c = counter_clone.clone();
                async move {
                    c.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .expect("spawn failed");

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);

        runtime.shutdown().await.expect("shutdown failed");
    }

    #[tokio::test]
    async fn test_multiple_tasks() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let counter = Arc::new(AtomicU64::new(0));

        for _ in 0..10 {
            let counter_clone = counter.clone();
            handle
                .spawn(move || {
                    let c = counter_clone.clone();
                    async move {
                        c.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .expect("spawn failed");
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 10);

        runtime.shutdown().await.expect("shutdown failed");
    }

    #[tokio::test]
    async fn test_task_with_metadata() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let metadata = BackgroundJobMetadata {
            name: Cow::Owned("test_task".to_string()),
            request_id: Some("req-123".to_string()),
        };

        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        let future = async move {
            counter_clone.fetch_add(1, Ordering::SeqCst);
            Ok(())
        };

        handle.spawn_with_metadata(future, metadata).expect("spawn failed");

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);

        runtime.shutdown().await.expect("shutdown failed");
    }

    #[tokio::test]
    async fn test_queue_full_error() {
        let config = BackgroundTaskConfig {
            max_queue_size: 2,
            max_concurrent_tasks: 10,
            drain_timeout_secs: 5,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(3));

        for _ in 0..2 {
            let barrier = blocking_barrier.clone();
            handle
                .spawn(move || {
                    let b = barrier.clone();
                    async move {
                        b.wait().await;
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        Ok(())
                    }
                })
                .expect("spawn failed");
        }

        let result = handle.spawn(move || async { Ok(()) });
        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));

        blocking_barrier.wait().await;
        tokio::time::sleep(Duration::from_millis(100)).await;

        runtime.shutdown().await.expect("shutdown failed");
    }

    #[tokio::test]
    async fn test_task_failure_handling() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let success_count = Arc::new(AtomicU64::new(0));
        let success_count_clone = success_count.clone();

        handle
            .spawn(move || {
                let s = success_count_clone.clone();
                async move {
                    s.fetch_add(1, Ordering::SeqCst);
                    Err(BackgroundJobError::from("test error"))
                }
            })
            .expect("spawn failed");

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(success_count.load(Ordering::SeqCst), 1);

        runtime.shutdown().await.expect("shutdown failed");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_concurrency_limit_with_proper_synchronization() {
        let config = BackgroundTaskConfig {
            max_queue_size: 100,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 30,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let running_count = Arc::new(AtomicU64::new(0));
        let max_concurrent = Arc::new(AtomicU64::new(0));

        for _ in 0..5 {
            let running = running_count.clone();
            let max = max_concurrent.clone();

            handle
                .spawn(move || {
                    let r = running.clone();
                    let m = max.clone();
                    async move {
                        r.fetch_add(1, Ordering::SeqCst);
                        let current_running = r.load(Ordering::SeqCst);
                        let mut current_max = m.load(Ordering::SeqCst);
                        while current_running > current_max {
                            m.store(current_running, Ordering::SeqCst);
                            current_max = current_running;
                        }

                        tokio::time::sleep(Duration::from_millis(100)).await;
                        r.fetch_sub(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .expect("spawn failed");
        }

        tokio::time::sleep(Duration::from_millis(700)).await;
        let max_concurrent_observed = max_concurrent.load(Ordering::SeqCst);
        assert!(
            max_concurrent_observed <= 2,
            "Max concurrent should be <= 2, but was {}",
            max_concurrent_observed
        );

        runtime.shutdown().await.expect("shutdown failed");
    }

    #[tokio::test]
    async fn test_graceful_shutdown() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 5,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        handle
            .spawn(move || {
                let c = counter_clone.clone();
                async move {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    c.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .expect("spawn failed");

        tokio::time::sleep(Duration::from_millis(200)).await;

        let result = runtime.shutdown().await;
        assert!(result.is_ok());
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_shutdown_timeout() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 1,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        handle
            .spawn(|| async {
                tokio::time::sleep(Duration::from_secs(5)).await;
                Ok(())
            })
            .expect("spawn failed");

        tokio::time::sleep(Duration::from_millis(100)).await;

        let result = runtime.shutdown().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_metrics_tracking() {
        let config = BackgroundTaskConfig::default();
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let barrier = Arc::new(tokio::sync::Barrier::new(2));

        for _ in 0..2 {
            let b = barrier.clone();
            let _ = handle.spawn(move || {
                let barrier = b.clone();
                async move {
                    barrier.wait().await;
                    Ok(())
                }
            });
        }

        tokio::time::sleep(Duration::from_millis(150)).await;

        runtime.shutdown().await.expect("shutdown failed");
    }

    #[tokio::test]
    async fn test_task_cancellation_on_shutdown() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 1,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let started_count = Arc::new(AtomicU64::new(0));
        let _completed_count = Arc::new(AtomicU64::new(0));

        let started = started_count.clone();

        handle
            .spawn(move || {
                let s = started.clone();
                async move {
                    s.fetch_add(1, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_secs(10)).await;
                    Ok(())
                }
            })
            .expect("spawn failed");

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(started_count.load(Ordering::SeqCst), 1);

        let shutdown_start = std::time::Instant::now();
        let result = runtime.shutdown().await;
        let shutdown_elapsed = shutdown_start.elapsed();

        assert!(result.is_err());
        assert!(shutdown_elapsed < Duration::from_secs(3));
    }

    #[tokio::test]
    async fn test_queue_overflow_multiple_spawns() {
        let config = BackgroundTaskConfig {
            max_queue_size: 3,
            max_concurrent_tasks: 10,
            drain_timeout_secs: 5,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(4));

        for _ in 0..3 {
            let b = blocking_barrier.clone();
            handle
                .spawn(move || {
                    let barrier = b.clone();
                    async move {
                        barrier.wait().await;
                        tokio::time::sleep(Duration::from_millis(100)).await;
                        Ok(())
                    }
                })
                .expect("spawn failed");
        }

        let result = handle.spawn(|| async { Ok(()) });
        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));

        blocking_barrier.wait().await;
        tokio::time::sleep(Duration::from_millis(200)).await;

        let result = handle.spawn(|| async { Ok(()) });
        assert!(result.is_ok());

        runtime.shutdown().await.expect("shutdown failed");
    }

    #[tokio::test]
    async fn test_concurrent_task_execution_order() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let execution_order = Arc::new(tokio::sync::Mutex::new(Vec::new()));

        for i in 0..5 {
            let order = execution_order.clone();
            handle
                .spawn(move || {
                    let o = order.clone();
                    async move {
                        o.lock().await.push(i);
                        Ok(())
                    }
                })
                .expect("spawn failed");
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        let order = execution_order.lock().await;
        assert_eq!(order.len(), 5);
        for i in 0..5 {
            assert!(order.contains(&i));
        }

        runtime.shutdown().await.expect("shutdown failed");
    }

    #[tokio::test]
    async fn test_error_from_string_conversion() {
        let error = BackgroundJobError::from("test message");
        assert_eq!(error.message, "test message");

        let error2 = BackgroundJobError::from("test".to_string());
        assert_eq!(error2.message, "test");
    }

    #[tokio::test]
    async fn test_background_job_metadata_default() {
        let metadata = BackgroundJobMetadata::default();
        assert_eq!(metadata.name, "background_task");
        assert_eq!(metadata.request_id, None);
    }

    #[tokio::test]
    async fn test_background_job_metadata_custom() {
        let metadata = BackgroundJobMetadata {
            name: Cow::Borrowed("custom_task"),
            request_id: Some("req-456".to_string()),
        };
        assert_eq!(metadata.name, "custom_task");
        assert_eq!(metadata.request_id, Some("req-456".to_string()));
    }

    #[tokio::test]
    async fn test_metrics_inc_dec_operations() {
        let metrics = BackgroundMetrics::default();

        metrics.inc_queued();
        assert_eq!(metrics.queued.load(Ordering::Relaxed), 1);

        metrics.inc_queued();
        assert_eq!(metrics.queued.load(Ordering::Relaxed), 2);

        metrics.dec_queued();
        assert_eq!(metrics.queued.load(Ordering::Relaxed), 1);

        metrics.inc_running();
        assert_eq!(metrics.running.load(Ordering::Relaxed), 1);

        metrics.dec_running();
        assert_eq!(metrics.running.load(Ordering::Relaxed), 0);

        metrics.inc_failed();
        assert_eq!(metrics.failed.load(Ordering::Relaxed), 1);

        metrics.inc_failed();
        assert_eq!(metrics.failed.load(Ordering::Relaxed), 2);
    }

    #[tokio::test]
    async fn test_spawn_error_display() {
        let error = BackgroundSpawnError::QueueFull;
        assert_eq!(error.to_string(), "background task queue is full");
    }

    #[tokio::test]
    async fn test_background_config_default() {
        let config = BackgroundTaskConfig::default();
        assert_eq!(config.max_queue_size, 1024);
        assert_eq!(config.max_concurrent_tasks, 128);
        assert_eq!(config.drain_timeout_secs, 30);
    }

    #[tokio::test]
    async fn test_shutdown_with_zero_pending_tasks() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;

        let result = runtime.shutdown().await;
        assert!(result.is_ok(), "shutdown should succeed with no tasks");
    }

    #[tokio::test]
    async fn test_shutdown_with_only_running_tasks() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 5,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let execution_started: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let execution_completed: Arc<std::sync::atomic::AtomicBool> =
            Arc::new(std::sync::atomic::AtomicBool::new(false));

        let started = execution_started.clone();
        let completed = execution_completed.clone();

        handle
            .spawn(move || {
                let s = started.clone();
                let c = completed.clone();
                async move {
                    s.store(true, std::sync::atomic::Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    c.store(true, std::sync::atomic::Ordering::SeqCst);
                    Ok(())
                }
            })
            .unwrap();

        tokio::time::sleep(Duration::from_millis(20)).await;

        let result = runtime.shutdown().await;
        assert!(result.is_ok(), "shutdown should succeed and wait for running tasks");
        assert!(
            execution_completed.load(std::sync::atomic::Ordering::SeqCst),
            "task should have completed"
        );
    }

    #[tokio::test]
    async fn test_shutdown_drains_queued_tasks() {
        let config = BackgroundTaskConfig {
            max_queue_size: 100,
            max_concurrent_tasks: 1,
            drain_timeout_secs: 5,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let execution_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for _ in 0..10 {
            let count = execution_count.clone();
            handle
                .spawn(move || {
                    let c = count.clone();
                    async move {
                        c.fetch_add(1, Ordering::SeqCst);
                        tokio::time::sleep(Duration::from_millis(10)).await;
                        Ok(())
                    }
                })
                .unwrap();
        }

        let result = runtime.shutdown().await;
        assert!(result.is_ok());
        assert_eq!(
            execution_count.load(Ordering::SeqCst),
            10,
            "all queued tasks should execute"
        );
    }

    #[tokio::test]
    async fn test_shutdown_timeout_force_stops_long_tasks() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 1,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let completed: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let completed_clone = completed.clone();

        handle
            .spawn(move || {
                let c = completed_clone.clone();
                async move {
                    tokio::time::sleep(Duration::from_secs(10)).await;
                    c.store(true, std::sync::atomic::Ordering::SeqCst);
                    Ok(())
                }
            })
            .unwrap();

        tokio::time::sleep(Duration::from_millis(50)).await;

        let shutdown_start = std::time::Instant::now();
        let result = runtime.shutdown().await;
        let elapsed = shutdown_start.elapsed();

        assert!(result.is_err(), "shutdown should timeout");
        assert!(
            elapsed < Duration::from_secs(3),
            "shutdown should timeout near drain_timeout"
        );
        assert!(
            !completed.load(std::sync::atomic::Ordering::SeqCst),
            "long-running task should not complete"
        );
    }

    #[tokio::test]
    async fn test_multiple_shutdown_calls_idempotent() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;

        // `shutdown` consumes the runtime, so a second call cannot compile;
        // this test only checks that the single allowed call succeeds.
        let result1 = runtime.shutdown().await;
        assert!(result1.is_ok(), "first shutdown should succeed");
    }

    #[tokio::test]
    async fn test_spawn_after_all_senders_dropped_fails() {
        let config = BackgroundTaskConfig::default();
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        runtime.shutdown().await.expect("shutdown should succeed");

        tokio::time::sleep(Duration::from_millis(50)).await;

        let result = handle.spawn(|| async { Ok(()) });
        assert!(result.is_err(), "spawn should fail after all senders are dropped");
    }

    #[tokio::test]
    async fn test_concurrent_spawns_hit_semaphore_limit() {
        let config = BackgroundTaskConfig {
            max_queue_size: 100,
            max_concurrent_tasks: 3,
            drain_timeout_secs: 10,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
        let running_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
        let peak_concurrent: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for _ in 0..5 {
            let b = barrier.clone();
            let running = running_count.clone();
            let peak = peak_concurrent.clone();

            handle
                .spawn(move || {
                    let barrier = b.clone();
                    let r = running.clone();
                    let p = peak.clone();
                    async move {
                        let current = r.fetch_add(1, Ordering::SeqCst) + 1;
                        let mut peak_val = p.load(Ordering::SeqCst);
                        while current > peak_val {
                            if p.compare_exchange(peak_val, current, Ordering::SeqCst, Ordering::SeqCst)
                                .is_ok()
                            {
                                break;
                            }
                            peak_val = p.load(Ordering::SeqCst);
                        }

                        barrier.wait().await;
                        tokio::time::sleep(Duration::from_millis(200)).await;
                        r.fetch_sub(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        barrier.wait().await;
        tokio::time::sleep(Duration::from_millis(100)).await;

        let peak = peak_concurrent.load(Ordering::SeqCst);
        assert!(
            peak <= 3,
            "concurrent execution should not exceed semaphore limit of 3, got {}",
            peak
        );

        runtime.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_task_panic_cleanup_still_occurs() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let mut spawned_count: u32 = 0;
        let panic_task_executed: Arc<std::sync::atomic::AtomicBool> =
            Arc::new(std::sync::atomic::AtomicBool::new(false));
        let after_panic_executed: Arc<std::sync::atomic::AtomicBool> =
            Arc::new(std::sync::atomic::AtomicBool::new(false));

        // The failing task returns an error rather than panicking; the point is
        // that one failed job does not stop later jobs from running.
        let panic_flag = panic_task_executed.clone();
        handle
            .spawn(move || {
                let p = panic_flag.clone();
                async move {
                    p.store(true, std::sync::atomic::Ordering::SeqCst);
                    Err(BackgroundJobError::from("simulated task failure"))
                }
            })
            .unwrap();
        spawned_count += 1;

        let after_flag = after_panic_executed.clone();
        handle
            .spawn(move || {
                let a = after_flag.clone();
                async move {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    a.store(true, std::sync::atomic::Ordering::SeqCst);
                    Ok(())
                }
            })
            .unwrap();
        spawned_count += 1;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert!(panic_task_executed.load(std::sync::atomic::Ordering::SeqCst));
        assert!(after_panic_executed.load(std::sync::atomic::Ordering::SeqCst));
        assert_eq!(spawned_count, 2);

        runtime.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_queue_overflow_with_immediate_rejection() {
        let config = BackgroundTaskConfig {
            max_queue_size: 2,
            max_concurrent_tasks: 100,
            drain_timeout_secs: 5,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));

        for _ in 0..2 {
            let b = barrier.clone();
            handle
                .spawn(move || {
                    let barrier = b.clone();
                    async move {
                        barrier.wait().await;
                        tokio::time::sleep(Duration::from_millis(500)).await;
                        Ok(())
                    }
                })
                .unwrap();
        }

        let overflow_result = handle.spawn(|| async { Ok(()) });
        assert!(matches!(overflow_result, Err(BackgroundSpawnError::QueueFull)));

        barrier.wait().await;
        runtime.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_metrics_accuracy_under_concurrent_load() {
        let config = BackgroundTaskConfig {
            max_queue_size: 50,
            max_concurrent_tasks: 5,
            drain_timeout_secs: 10,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let completed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for _ in 0..20 {
            let c = completed.clone();
            handle
                .spawn(move || {
                    let count = c.clone();
                    async move {
                        tokio::time::sleep(Duration::from_millis(50)).await;
                        count.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        runtime.shutdown().await.unwrap();
        assert_eq!(completed.load(Ordering::SeqCst), 20, "all tasks should complete");
    }

    #[tokio::test]
    async fn test_drain_with_slowly_completing_tasks() {
        let config = BackgroundTaskConfig {
            max_queue_size: 50,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 10,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let completed_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for i in 0..5 {
            let count = completed_count.clone();
            handle
                .spawn(move || {
                    let c = count.clone();
                    async move {
                        let sleep_ms = 100 + (i as u64 * 50);
                        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
                        c.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        let result = runtime.shutdown().await;
        assert!(result.is_ok());
        assert_eq!(completed_count.load(Ordering::SeqCst), 5);
    }

    #[tokio::test]
    async fn test_semaphore_starvation_doesnt_deadlock() {
        let config = BackgroundTaskConfig {
            max_queue_size: 100,
            max_concurrent_tasks: 1,
            drain_timeout_secs: 10,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let completion_order: Arc<tokio::sync::Mutex<Vec<u32>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));

        for i in 0..10 {
            let order = completion_order.clone();
            handle
                .spawn(move || {
                    let o = order.clone();
                    async move {
                        tokio::time::sleep(Duration::from_millis(5)).await;
                        let mut guard = o.lock().await;
                        guard.push(i);
                        Ok(())
                    }
                })
                .unwrap();
        }

        let result = runtime.shutdown().await;
        assert!(result.is_ok());

        let order = completion_order.lock().await;
        assert_eq!(order.len(), 10);
    }

    #[tokio::test]
    async fn test_cancel_task_mid_execution() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 1,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let started: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let ended: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));

        let start_flag = started.clone();
        let end_flag = ended.clone();

        handle
            .spawn(move || {
                let s = start_flag.clone();
                let e = end_flag.clone();
                async move {
                    s.store(true, std::sync::atomic::Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_secs(10)).await;
                    e.store(true, std::sync::atomic::Ordering::SeqCst);
                    Ok(())
                }
            })
            .unwrap();

        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(started.load(std::sync::atomic::Ordering::SeqCst));

        let result = runtime.shutdown().await;
        assert!(result.is_err(), "shutdown should timeout due to long task");
        assert!(
            !ended.load(std::sync::atomic::Ordering::SeqCst),
            "task should not complete"
        );
    }

    #[tokio::test]
    async fn test_rapid_spawn_and_shutdown() {
        let config = BackgroundTaskConfig {
            max_queue_size: 1000,
            max_concurrent_tasks: 10,
            drain_timeout_secs: 5,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for _ in 0..100 {
            let c = count.clone();
            let _ = handle.spawn(move || {
                let counter = c.clone();
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            });
        }

        let result = runtime.shutdown().await;
        assert!(result.is_ok());

        let final_count = count.load(Ordering::SeqCst);
        assert!(final_count > 0, "at least some tasks should execute");
        assert!(final_count <= 100, "no more than spawned count should execute");
    }

    #[tokio::test]
    async fn test_shutdown_with_mixed_success_and_failure_tasks() {
        let config = BackgroundTaskConfig::default();
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let success_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
        let failure_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for i in 0..10 {
            if i % 2 == 0 {
                let s = success_count.clone();
                handle
                    .spawn(move || {
                        let counter = s.clone();
                        async move {
                            counter.fetch_add(1, Ordering::SeqCst);
                            Ok(())
                        }
                    })
                    .unwrap();
            } else {
                let f = failure_count.clone();
                handle
                    .spawn(move || {
                        let counter = f.clone();
                        async move {
                            counter.fetch_add(1, Ordering::SeqCst);
                            Err(BackgroundJobError::from("intentional failure"))
                        }
                    })
                    .unwrap();
            }
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        let result = runtime.shutdown().await;
        assert!(result.is_ok());
        assert_eq!(success_count.load(Ordering::SeqCst), 5);
        assert_eq!(failure_count.load(Ordering::SeqCst), 5);
    }

    #[tokio::test]
    async fn test_concurrent_handle_clones_spawn_independently() {
        let config = BackgroundTaskConfig::default();
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        let mut join_handles = vec![];

        for _ in 0..3 {
            let h = handle.clone();
            let c = count.clone();

            let jh = tokio::spawn(async move {
                for _ in 0..5 {
                    let counter = c.clone();
                    let _ = h.spawn(move || {
                        let cnt = counter.clone();
                        async move {
                            cnt.fetch_add(1, Ordering::SeqCst);
                            Ok(())
                        }
                    });
                }
            });
            join_handles.push(jh);
        }

        for jh in join_handles {
            let _ = jh.await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        let result = runtime.shutdown().await;
        assert!(result.is_ok());
        assert_eq!(count.load(Ordering::SeqCst), 15);
    }

    #[tokio::test]
    async fn test_queue_full_metrics_updated() {
        let config = BackgroundTaskConfig {
            max_queue_size: 2,
            max_concurrent_tasks: 100,
            drain_timeout_secs: 5,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));

        for _ in 0..2 {
            let b = barrier.clone();
            handle
                .spawn(move || {
                    let barrier = b.clone();
                    async move {
                        barrier.wait().await;
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        Ok(())
                    }
                })
                .unwrap();
        }

        let result = handle.spawn(|| async { Ok(()) });
        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));

        barrier.wait().await;
        tokio::time::sleep(Duration::from_millis(100)).await;

        runtime.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_handle_persistence_across_spawns() {
        let config = BackgroundTaskConfig::default();
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for _ in 0..5 {
            let c = count.clone();
            handle
                .spawn(move || {
                    let counter = c.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        tokio::time::sleep(Duration::from_millis(150)).await;
        assert_eq!(count.load(Ordering::SeqCst), 5);

        runtime.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_shutdown_with_queue_at_capacity() {
        let config = BackgroundTaskConfig {
            max_queue_size: 5,
            max_concurrent_tasks: 1,
            drain_timeout_secs: 10,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let completion_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for _ in 0..5 {
            let c = completion_count.clone();
            handle
                .spawn(move || {
                    let counter = c.clone();
                    async move {
                        tokio::time::sleep(Duration::from_millis(20)).await;
                        counter.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        let result = runtime.shutdown().await;
        assert!(result.is_ok());
        assert_eq!(completion_count.load(Ordering::SeqCst), 5);
    }

    #[tokio::test]
    async fn test_metadata_preserved_through_execution() {
        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
        let handle = runtime.handle();

        let metadata = BackgroundJobMetadata {
            name: Cow::Owned("test_metadata_task".to_string()),
            request_id: Some("req-metadata-123".to_string()),
        };

        let executed: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let executed_clone = executed.clone();

        let future = async move {
            executed_clone.store(true, std::sync::atomic::Ordering::SeqCst);
            Ok(())
        };

        handle.spawn_with_metadata(future, metadata).unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        assert!(executed.load(std::sync::atomic::Ordering::SeqCst));
        runtime.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_very_short_drain_timeout_forces_stop() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 0,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        handle
            .spawn(|| async {
                tokio::time::sleep(Duration::from_secs(1)).await;
                Ok(())
            })
            .unwrap();

        tokio::time::sleep(Duration::from_millis(10)).await;

        let result = runtime.shutdown().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_spawn_many_tasks_sequential_drain() {
        let config = BackgroundTaskConfig {
            max_queue_size: 200,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 15,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));

        for _ in 0..50 {
            let c = count.clone();
            handle
                .spawn(move || {
                    let counter = c.clone();
                    async move {
                        tokio::time::sleep(Duration::from_millis(1)).await;
                        counter.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        let result = runtime.shutdown().await;
        assert!(result.is_ok());
        assert_eq!(count.load(Ordering::SeqCst), 50);
    }

    #[tokio::test]
    async fn test_no_deadlock_with_max_concurrency_barrier() {
        let config = BackgroundTaskConfig {
            max_queue_size: 100,
            max_concurrent_tasks: 3,
            drain_timeout_secs: 10,
        };
        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(4));

        for _ in 0..3 {
            let b = barrier.clone();
            handle
                .spawn(move || {
                    let barrier = b.clone();
                    async move {
                        barrier.wait().await;
                        tokio::time::sleep(Duration::from_millis(50)).await;
                        Ok(())
                    }
                })
                .unwrap();
        }

        barrier.wait().await;
        tokio::time::sleep(Duration::from_millis(100)).await;

        let result = runtime.shutdown().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_error_from_owned_string() {
        let message = String::from("error message");
        let error = BackgroundJobError::from(message);
        assert_eq!(error.message, "error message");
    }

    #[tokio::test]
    async fn test_borrowed_str_conversion() {
        let error = BackgroundJobError::from("borrowed message");
        assert_eq!(error.message, "borrowed message");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_phase1_semaphore_acquisition_with_concurrent_load() {
        let config = BackgroundTaskConfig {
            max_queue_size: 50,
            max_concurrent_tasks: 1,
            drain_timeout_secs: 10,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let execution_count = Arc::new(AtomicU64::new(0));

        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(2));
        let barrier_clone = blocking_barrier.clone();

        handle
            .spawn(move || {
                let b = barrier_clone.clone();
                async move {
                    b.wait().await;
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok(())
                }
            })
            .unwrap();

        tokio::time::sleep(Duration::from_millis(50)).await;

        for _ in 0..3 {
            let exec_clone = execution_count.clone();
            handle
                .spawn(move || {
                    let e = exec_clone.clone();
                    async move {
                        e.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        blocking_barrier.wait().await;
        tokio::time::sleep(Duration::from_millis(250)).await;

        assert_eq!(execution_count.load(Ordering::SeqCst), 3);

        runtime.shutdown().await.unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_concurrent_task_completion_race_conditions() {
        let config = BackgroundTaskConfig {
            max_queue_size: 100,
            max_concurrent_tasks: 8,
            drain_timeout_secs: 10,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let completion_counter = Arc::new(AtomicU64::new(0));
        let task_count = 50;

        for i in 0..task_count {
            let counter = completion_counter.clone();
            handle
                .spawn(move || {
                    let c = counter.clone();
                    async move {
                        let sleep_ms = (i as u64 * 11) % 100;
                        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
                        c.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        runtime.shutdown().await.unwrap();
        assert_eq!(
            completion_counter.load(Ordering::SeqCst),
            task_count,
            "all tasks should complete despite race conditions"
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_failure_metric_tracking_under_concurrent_errors() {
        let config = BackgroundTaskConfig {
            max_queue_size: 50,
            max_concurrent_tasks: 5,
            drain_timeout_secs: 10,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let success_count = Arc::new(AtomicU64::new(0));
        let failure_count = Arc::new(AtomicU64::new(0));

        for i in 0..20 {
            if i % 3 == 0 {
                let fail_clone = failure_count.clone();
                handle
                    .spawn(move || {
                        let f = fail_clone.clone();
                        async move {
                            f.fetch_add(1, Ordering::SeqCst);
                            Err(BackgroundJobError::from("intentional failure"))
                        }
                    })
                    .unwrap();
            } else {
                let succ_clone = success_count.clone();
                handle
                    .spawn(move || {
                        let s = succ_clone.clone();
                        async move {
                            s.fetch_add(1, Ordering::SeqCst);
                            Ok(())
                        }
                    })
                    .unwrap();
            }
        }

        runtime.shutdown().await.unwrap();

        let final_success = success_count.load(Ordering::SeqCst);
        let final_failure = failure_count.load(Ordering::SeqCst);

        assert_eq!(final_success + final_failure, 20);
        assert_eq!(final_failure, 7);
        assert_eq!(final_success, 13);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_handle_clone_isolation_concurrent_spawns() {
        let config = BackgroundTaskConfig {
            max_queue_size: 100,
            max_concurrent_tasks: 4,
            drain_timeout_secs: 10,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let completion_counters: Vec<Arc<AtomicU64>> = (0..5).map(|_| Arc::new(AtomicU64::new(0))).collect();

        let mut join_handles = vec![];

        for worker_id in 0..5 {
            let h = handle.clone();
            let counter = completion_counters[worker_id].clone();

            let jh = tokio::spawn(async move {
                for task_id in 0..10 {
                    let c = counter.clone();
                    let _ = h.spawn(move || {
                        let cnt = c.clone();
                        async move {
                            let delay = (worker_id as u64 * 10 + task_id as u64) % 50;
                            tokio::time::sleep(Duration::from_millis(delay)).await;
                            cnt.fetch_add(1, Ordering::SeqCst);
                            Ok(())
                        }
                    });
                }
            });
            join_handles.push(jh);
        }

        for jh in join_handles {
            let _ = jh.await;
        }

        runtime.shutdown().await.unwrap();

        for (i, counter) in completion_counters.iter().enumerate() {
            assert_eq!(
                counter.load(Ordering::SeqCst),
                10,
                "worker {} should have exactly 10 task completions",
                i
            );
        }
    }

    #[tokio::test]
    async fn test_background_job_error_with_string_slice() {
        let errors = vec![
            BackgroundJobError::from("simple error"),
            BackgroundJobError::from(String::from("owned string error")),
        ];

        for error in errors {
            assert!(!error.message.is_empty());
        }
    }

    #[tokio::test]
    async fn test_spawn_error_display_formatting() {
        let error = BackgroundSpawnError::QueueFull;
        let formatted = format!("{}", error);
        assert_eq!(formatted, "background task queue is full");

        let result: Result<(), BackgroundSpawnError> = Err(error);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_background_metrics_concurrent_increments() {
        let metrics = Arc::new(BackgroundMetrics::default());
        let mut handles = vec![];

        for _ in 0..10 {
            let m = metrics.clone();
            let h = tokio::spawn(async move {
                for _ in 0..10 {
                    m.inc_queued();
                    m.inc_running();
                    m.inc_failed();
                    m.dec_queued();
                    m.dec_running();
                }
            });
            handles.push(h);
        }

        for h in handles {
            let _ = h.await;
        }

        assert_eq!(metrics.queued.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.running.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.failed.load(Ordering::Relaxed), 100);
    }

    #[tokio::test]
    async fn test_drain_phase_execution_with_lingering_senders() {
        let config = BackgroundTaskConfig {
            max_queue_size: 20,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 10,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let executed = Arc::new(AtomicU64::new(0));

        for _ in 0..5 {
            let e = executed.clone();
            handle
                .spawn(move || {
                    let ex = e.clone();
                    async move {
                        tokio::time::sleep(Duration::from_millis(10)).await;
                        ex.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        let result = runtime.shutdown().await;
        assert!(result.is_ok());

        assert_eq!(executed.load(Ordering::SeqCst), 5);
    }

    #[tokio::test]
    async fn test_concurrent_queue_status_transitions() {
        let config = BackgroundTaskConfig {
            max_queue_size: 10,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 10,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let state_transitions = Arc::new(tokio::sync::Mutex::new(Vec::new()));

        for i in 0..10 {
            let st = state_transitions.clone();
            handle
                .spawn(move || {
                    let s = st.clone();
                    async move {
                        let mut transitions = s.lock().await;
                        transitions.push(("spawned", i));
                        drop(transitions);

                        tokio::time::sleep(Duration::from_millis(50)).await;

                        let mut transitions = s.lock().await;
                        transitions.push(("completed", i));
                        Ok(())
                    }
                })
                .unwrap();
        }

        tokio::time::sleep(Duration::from_millis(300)).await;

        runtime.shutdown().await.unwrap();

        let final_transitions = state_transitions.lock().await;
        let completed_count = final_transitions
            .iter()
            .filter(|(action, _)| *action == "completed")
            .count();

        assert_eq!(completed_count, 10, "all tasks should complete");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_semaphore_no_starvation_with_uneven_task_duration() {
        let config = BackgroundTaskConfig {
            max_queue_size: 100,
            max_concurrent_tasks: 2,
            drain_timeout_secs: 10,
        };

        let runtime = BackgroundRuntime::start(config).await;
        let handle = runtime.handle();

        let fast_executed = Arc::new(AtomicU64::new(0));
        let slow_executed = Arc::new(AtomicU64::new(0));

        for _ in 0..5 {
            let s = slow_executed.clone();
            handle
                .spawn(move || {
                    let slow = s.clone();
                    async move {
                        tokio::time::sleep(Duration::from_millis(100)).await;
                        slow.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        tokio::time::sleep(Duration::from_millis(10)).await;

        for _ in 0..5 {
            let f = fast_executed.clone();
            handle
                .spawn(move || {
                    let fast = f.clone();
                    async move {
                        tokio::time::sleep(Duration::from_millis(10)).await;
                        fast.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .unwrap();
        }

        runtime.shutdown().await.unwrap();

        assert_eq!(
            fast_executed.load(Ordering::SeqCst),
            5,
            "fast tasks should not be starved"
        );
        assert_eq!(slow_executed.load(Ordering::SeqCst), 5, "slow tasks should execute");
    }
}