Skip to main content

spikard_http/
background.rs

1use std::borrow::Cow;
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::FutureExt;
6use futures::future::BoxFuture;
7use tokio::sync::{Semaphore, mpsc};
8use tokio::task::JoinSet;
9use tokio::time::timeout;
10use tokio_util::sync::CancellationToken;
11
12/// Configuration for in-process background task execution.
13#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
14#[serde(default)]
15pub struct BackgroundTaskConfig {
16    pub max_queue_size: usize,
17    pub max_concurrent_tasks: usize,
18    pub drain_timeout_secs: u64,
19}
20
21impl Default for BackgroundTaskConfig {
22    fn default() -> Self {
23        Self {
24            max_queue_size: 1024,
25            max_concurrent_tasks: 128,
26            drain_timeout_secs: 30,
27        }
28    }
29}
30
31#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
32pub struct BackgroundJobMetadata {
33    pub name: Cow<'static, str>,
34    pub request_id: Option<String>,
35}
36
37impl Default for BackgroundJobMetadata {
38    fn default() -> Self {
39        Self {
40            name: Cow::Borrowed("background_task"),
41            request_id: None,
42        }
43    }
44}
45
46pub(crate) type BackgroundJobFuture = BoxFuture<'static, Result<(), BackgroundJobError>>;
47
48struct BackgroundJob {
49    pub future: BackgroundJobFuture,
50    pub metadata: BackgroundJobMetadata,
51}
52
53impl BackgroundJob {
54    fn new<F>(future: F, metadata: BackgroundJobMetadata) -> Self
55    where
56        F: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
57    {
58        Self {
59            future: future.boxed(),
60            metadata,
61        }
62    }
63}
64
65#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
66pub struct BackgroundJobError {
67    pub message: String,
68}
69
70impl From<String> for BackgroundJobError {
71    fn from(message: String) -> Self {
72        Self { message }
73    }
74}
75
76impl From<&str> for BackgroundJobError {
77    fn from(message: &str) -> Self {
78        Self {
79            message: message.to_string(),
80        }
81    }
82}
83
/// Reasons a job could not be handed to the background executor.
#[derive(Debug, Clone)]
pub enum BackgroundSpawnError {
    /// The submission channel rejected the job.
    QueueFull,
}

impl std::fmt::Display for BackgroundSpawnError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::QueueFull => "background task queue is full",
        };
        f.write_str(text)
    }
}

impl std::error::Error for BackgroundSpawnError {}
98
/// Returned by `BackgroundRuntime::shutdown` when the executor did not finish
/// cleanly: either the drain timeout elapsed or the executor task itself failed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackgroundShutdownError;

impl std::fmt::Display for BackgroundShutdownError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("background task runtime did not shut down cleanly")
    }
}

// Lets callers propagate the failure with `?` into `Box<dyn Error>` and friends.
impl std::error::Error for BackgroundShutdownError {}
101
/// Lock-free counters describing the executor's workload.
#[derive(Default, Debug)]
struct BackgroundMetrics {
    /// Jobs accepted by a handle but not yet received by the executor.
    queued: std::sync::atomic::AtomicU64,
    /// Jobs currently executing.
    running: std::sync::atomic::AtomicU64,
    /// Jobs the executor has recorded as failed.
    failed: std::sync::atomic::AtomicU64,
}

impl BackgroundMetrics {
    /// Each counter is an independent statistic, so no cross-variable
    /// ordering guarantees are needed.
    const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::Relaxed;

    fn inc_queued(&self) {
        self.queued.fetch_add(1, Self::ORDERING);
    }

    fn dec_queued(&self) {
        self.queued.fetch_sub(1, Self::ORDERING);
    }

    fn inc_running(&self) {
        self.running.fetch_add(1, Self::ORDERING);
    }

    fn dec_running(&self) {
        self.running.fetch_sub(1, Self::ORDERING);
    }

    fn inc_failed(&self) {
        self.failed.fetch_add(1, Self::ORDERING);
    }
}
130
131#[derive(Clone, Debug)]
132pub struct BackgroundHandle {
133    sender: mpsc::Sender<BackgroundJob>,
134    metrics: Arc<BackgroundMetrics>,
135}
136
137impl BackgroundHandle {
138    pub fn spawn<F, Fut>(&self, f: F) -> Result<(), BackgroundSpawnError>
139    where
140        F: FnOnce() -> Fut,
141        Fut: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
142    {
143        let future = f();
144        self.spawn_with_metadata(future, BackgroundJobMetadata::default())
145    }
146
147    pub fn spawn_with_metadata<Fut>(
148        &self,
149        future: Fut,
150        metadata: BackgroundJobMetadata,
151    ) -> Result<(), BackgroundSpawnError>
152    where
153        Fut: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
154    {
155        self.metrics.inc_queued();
156        let job = BackgroundJob::new(future, metadata);
157        self.sender.try_send(job).map_err(|_| {
158            self.metrics.dec_queued();
159            BackgroundSpawnError::QueueFull
160        })
161    }
162}
163
164pub struct BackgroundRuntime {
165    handle: BackgroundHandle,
166    drain_timeout: Duration,
167    shutdown_token: CancellationToken,
168    join_handle: tokio::task::JoinHandle<()>,
169}
170
171impl BackgroundRuntime {
172    pub async fn start(config: BackgroundTaskConfig) -> Self {
173        let (tx, rx) = mpsc::channel(config.max_queue_size);
174        let metrics = Arc::new(BackgroundMetrics::default());
175        let handle = BackgroundHandle {
176            sender: tx.clone(),
177            metrics: metrics.clone(),
178        };
179        let shutdown_token = CancellationToken::new();
180        let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tasks));
181        let driver_token = shutdown_token.clone();
182
183        let join_handle = tokio::spawn(run_executor(rx, semaphore, metrics.clone(), driver_token));
184
185        Self {
186            handle,
187            drain_timeout: Duration::from_secs(config.drain_timeout_secs),
188            shutdown_token,
189            join_handle,
190        }
191    }
192
193    pub fn handle(&self) -> BackgroundHandle {
194        self.handle.clone()
195    }
196
197    pub async fn shutdown(self) -> Result<(), BackgroundShutdownError> {
198        self.shutdown_token.cancel();
199        drop(self.handle);
200        match timeout(self.drain_timeout, self.join_handle).await {
201            Ok(Ok(_)) => Ok(()),
202            _ => Err(BackgroundShutdownError),
203        }
204    }
205}
206
207async fn run_executor(
208    mut rx: mpsc::Receiver<BackgroundJob>,
209    semaphore: Arc<Semaphore>,
210    metrics: Arc<BackgroundMetrics>,
211    token: CancellationToken,
212) {
213    let mut join_set = JoinSet::new();
214    let token_clone = token.clone();
215
216    loop {
217        tokio::select! {
218            maybe_job = rx.recv() => {
219                match maybe_job {
220                    Some(job) => {
221                        metrics.dec_queued();
222                        let semaphore = semaphore.clone();
223                        let metrics_clone = metrics.clone();
224                        join_set.spawn(async move {
225                            let BackgroundJob { future, metadata } = job;
226                            match semaphore.acquire_owned().await {
227                                Ok(_permit) => {
228                                    metrics_clone.inc_running();
229                                    if let Err(err) = future.await {
230                                        metrics_clone.inc_failed();
231                                        tracing::error!(target = "spikard::background", task = %metadata.name, error = %err.message, "background task failed");
232                                    }
233                                    metrics_clone.dec_running();
234                                }
235                                Err(_) => {
236                                    metrics_clone.inc_failed();
237                                    tracing::warn!(target = "spikard::background", "failed to acquire semaphore permit for background task");
238                                }
239                            }
240                        });
241                    }
242                    None => break,
243                }
244            }
245            _ = token_clone.cancelled() => {
246                break;
247            }
248        }
249    }
250
251    let mut drain_attempts = 0;
252    loop {
253        match rx.try_recv() {
254            Ok(job) => {
255                metrics.dec_queued();
256                let semaphore = semaphore.clone();
257                let metrics_clone = metrics.clone();
258                join_set.spawn(async move {
259                    let BackgroundJob { future, metadata } = job;
260                    match semaphore.acquire_owned().await {
261                        Ok(_permit) => {
262                            metrics_clone.inc_running();
263                            if let Err(err) = future.await {
264                                metrics_clone.inc_failed();
265                                tracing::error!(target = "spikard::background", task = %metadata.name, error = %err.message, "background task failed");
266                            }
267                            metrics_clone.dec_running();
268                        }
269                        Err(_) => {
270                            metrics_clone.inc_failed();
271                            tracing::warn!(target = "spikard::background", "failed to acquire semaphore permit for background task");
272                        }
273                    }
274                });
275                drain_attempts = 0;
276            }
277            Err(mpsc::error::TryRecvError::Empty) => {
278                drain_attempts += 1;
279                if drain_attempts > 100 {
280                    break;
281                }
282                tokio::time::sleep(Duration::from_millis(10)).await;
283            }
284            Err(mpsc::error::TryRecvError::Disconnected) => {
285                break;
286            }
287        }
288    }
289
290    while join_set.join_next().await.is_some() {}
291}
292
293#[cfg(test)]
294mod tests {
295    use super::*;
296    use std::sync::atomic::{AtomicU64, Ordering};
297
298    #[tokio::test]
299    async fn test_basic_spawn_and_execution() {
300        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
301        let handle = runtime.handle();
302
303        let counter = Arc::new(AtomicU64::new(0));
304        let counter_clone = counter.clone();
305
306        handle
307            .spawn(move || {
308                let c = counter_clone.clone();
309                async move {
310                    c.fetch_add(1, Ordering::SeqCst);
311                    Ok(())
312                }
313            })
314            .expect("spawn failed");
315
316        tokio::time::sleep(Duration::from_millis(100)).await;
317        assert_eq!(counter.load(Ordering::SeqCst), 1);
318
319        runtime.shutdown().await.expect("shutdown failed");
320    }
321
322    #[tokio::test]
323    async fn test_multiple_tasks() {
324        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
325        let handle = runtime.handle();
326
327        let counter = Arc::new(AtomicU64::new(0));
328
329        for _ in 0..10 {
330            let counter_clone = counter.clone();
331            handle
332                .spawn(move || {
333                    let c = counter_clone.clone();
334                    async move {
335                        c.fetch_add(1, Ordering::SeqCst);
336                        Ok(())
337                    }
338                })
339                .expect("spawn failed");
340        }
341
342        tokio::time::sleep(Duration::from_millis(200)).await;
343        assert_eq!(counter.load(Ordering::SeqCst), 10);
344
345        runtime.shutdown().await.expect("shutdown failed");
346    }
347
348    #[tokio::test]
349    async fn test_task_with_metadata() {
350        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
351        let handle = runtime.handle();
352
353        let metadata = BackgroundJobMetadata {
354            name: Cow::Owned("test_task".to_string()),
355            request_id: Some("req-123".to_string()),
356        };
357
358        let counter = Arc::new(AtomicU64::new(0));
359        let counter_clone = counter.clone();
360
361        let future = async move {
362            counter_clone.fetch_add(1, Ordering::SeqCst);
363            Ok(())
364        };
365
366        handle.spawn_with_metadata(future, metadata).expect("spawn failed");
367
368        tokio::time::sleep(Duration::from_millis(100)).await;
369        assert_eq!(counter.load(Ordering::SeqCst), 1);
370
371        runtime.shutdown().await.expect("shutdown failed");
372    }
373
374    #[tokio::test]
375    async fn test_queue_full_error() {
376        let config = BackgroundTaskConfig {
377            max_queue_size: 2,
378            max_concurrent_tasks: 10,
379            drain_timeout_secs: 5,
380        };
381
382        let runtime = BackgroundRuntime::start(config).await;
383        let handle = runtime.handle();
384
385        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(3));
386
387        for _ in 0..2 {
388            let barrier = blocking_barrier.clone();
389            handle
390                .spawn(move || {
391                    let b = barrier.clone();
392                    async move {
393                        b.wait().await;
394                        tokio::time::sleep(Duration::from_secs(1)).await;
395                        Ok(())
396                    }
397                })
398                .expect("spawn failed");
399        }
400
401        let result = handle.spawn(move || async { Ok(()) });
402        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));
403
404        blocking_barrier.wait().await;
405        tokio::time::sleep(Duration::from_millis(100)).await;
406
407        runtime.shutdown().await.expect("shutdown failed");
408    }
409
410    #[tokio::test]
411    async fn test_task_failure_handling() {
412        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
413        let handle = runtime.handle();
414
415        let success_count = Arc::new(AtomicU64::new(0));
416        let success_count_clone = success_count.clone();
417
418        handle
419            .spawn(move || {
420                let s = success_count_clone.clone();
421                async move {
422                    s.fetch_add(1, Ordering::SeqCst);
423                    Err(BackgroundJobError::from("test error"))
424                }
425            })
426            .expect("spawn failed");
427
428        tokio::time::sleep(Duration::from_millis(100)).await;
429        assert_eq!(success_count.load(Ordering::SeqCst), 1);
430
431        runtime.shutdown().await.expect("shutdown failed");
432    }
433
434    #[tokio::test(flavor = "multi_thread")]
435    async fn test_concurrency_limit_with_proper_synchronization() {
436        let config = BackgroundTaskConfig {
437            max_queue_size: 100,
438            max_concurrent_tasks: 2,
439            drain_timeout_secs: 30,
440        };
441
442        let runtime = BackgroundRuntime::start(config).await;
443        let handle = runtime.handle();
444
445        let running_count = Arc::new(AtomicU64::new(0));
446        let max_concurrent = Arc::new(AtomicU64::new(0));
447
448        for _ in 0..5 {
449            let running = running_count.clone();
450            let max = max_concurrent.clone();
451
452            handle
453                .spawn(move || {
454                    let r = running.clone();
455                    let m = max.clone();
456                    async move {
457                        r.fetch_add(1, Ordering::SeqCst);
458                        let current_running = r.load(Ordering::SeqCst);
459                        let mut current_max = m.load(Ordering::SeqCst);
460                        while current_running > current_max {
461                            m.store(current_running, Ordering::SeqCst);
462                            current_max = current_running;
463                        }
464
465                        tokio::time::sleep(Duration::from_millis(100)).await;
466                        r.fetch_sub(1, Ordering::SeqCst);
467                        Ok(())
468                    }
469                })
470                .expect("spawn failed");
471        }
472
473        tokio::time::sleep(Duration::from_millis(700)).await;
474        let max_concurrent_observed = max_concurrent.load(Ordering::SeqCst);
475        assert!(
476            max_concurrent_observed <= 2,
477            "Max concurrent should be <= 2, but was {}",
478            max_concurrent_observed
479        );
480
481        runtime.shutdown().await.expect("shutdown failed");
482    }
483
484    #[tokio::test]
485    async fn test_graceful_shutdown() {
486        let config = BackgroundTaskConfig {
487            max_queue_size: 10,
488            max_concurrent_tasks: 2,
489            drain_timeout_secs: 5,
490        };
491
492        let runtime = BackgroundRuntime::start(config).await;
493        let handle = runtime.handle();
494
495        let counter = Arc::new(AtomicU64::new(0));
496        let counter_clone = counter.clone();
497
498        handle
499            .spawn(move || {
500                let c = counter_clone.clone();
501                async move {
502                    tokio::time::sleep(Duration::from_millis(50)).await;
503                    c.fetch_add(1, Ordering::SeqCst);
504                    Ok(())
505                }
506            })
507            .expect("spawn failed");
508
509        tokio::time::sleep(Duration::from_millis(200)).await;
510
511        let result = runtime.shutdown().await;
512        assert!(result.is_ok());
513        assert_eq!(counter.load(Ordering::SeqCst), 1);
514    }
515
516    #[tokio::test]
517    async fn test_shutdown_timeout() {
518        let config = BackgroundTaskConfig {
519            max_queue_size: 10,
520            max_concurrent_tasks: 2,
521            drain_timeout_secs: 1,
522        };
523
524        let runtime = BackgroundRuntime::start(config).await;
525        let handle = runtime.handle();
526
527        handle
528            .spawn(|| async {
529                tokio::time::sleep(Duration::from_secs(5)).await;
530                Ok(())
531            })
532            .expect("spawn failed");
533
534        tokio::time::sleep(Duration::from_millis(100)).await;
535
536        let result = runtime.shutdown().await;
537        assert!(result.is_err());
538    }
539
540    #[tokio::test]
541    async fn test_metrics_tracking() {
542        let config = BackgroundTaskConfig::default();
543        let runtime = BackgroundRuntime::start(config).await;
544        let handle = runtime.handle();
545
546        let barrier = Arc::new(tokio::sync::Barrier::new(2));
547
548        for _ in 0..2 {
549            let b = barrier.clone();
550            let _ = handle.spawn(move || {
551                let barrier = b.clone();
552                async move {
553                    barrier.wait().await;
554                    Ok(())
555                }
556            });
557        }
558
559        tokio::time::sleep(Duration::from_millis(150)).await;
560
561        runtime.shutdown().await.expect("shutdown failed");
562    }
563
564    #[tokio::test]
565    async fn test_task_cancellation_on_shutdown() {
566        let config = BackgroundTaskConfig {
567            max_queue_size: 10,
568            max_concurrent_tasks: 2,
569            drain_timeout_secs: 1,
570        };
571
572        let runtime = BackgroundRuntime::start(config).await;
573        let handle = runtime.handle();
574
575        let started_count = Arc::new(AtomicU64::new(0));
576        let _completed_count = Arc::new(AtomicU64::new(0));
577
578        let started = started_count.clone();
579
580        handle
581            .spawn(move || {
582                let s = started.clone();
583                async move {
584                    s.fetch_add(1, Ordering::SeqCst);
585                    tokio::time::sleep(Duration::from_secs(10)).await;
586                    Ok(())
587                }
588            })
589            .expect("spawn failed");
590
591        tokio::time::sleep(Duration::from_millis(100)).await;
592        assert_eq!(started_count.load(Ordering::SeqCst), 1);
593
594        let shutdown_start = std::time::Instant::now();
595        let result = runtime.shutdown().await;
596        let shutdown_elapsed = shutdown_start.elapsed();
597
598        assert!(result.is_err());
599        assert!(shutdown_elapsed < Duration::from_secs(3));
600    }
601
602    #[tokio::test]
603    async fn test_queue_overflow_multiple_spawns() {
604        let config = BackgroundTaskConfig {
605            max_queue_size: 3,
606            max_concurrent_tasks: 10,
607            drain_timeout_secs: 5,
608        };
609
610        let runtime = BackgroundRuntime::start(config).await;
611        let handle = runtime.handle();
612
613        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(4));
614
615        for _ in 0..3 {
616            let b = blocking_barrier.clone();
617            handle
618                .spawn(move || {
619                    let barrier = b.clone();
620                    async move {
621                        barrier.wait().await;
622                        tokio::time::sleep(Duration::from_millis(100)).await;
623                        Ok(())
624                    }
625                })
626                .expect("spawn failed");
627        }
628
629        let result = handle.spawn(|| async { Ok(()) });
630        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));
631
632        blocking_barrier.wait().await;
633        tokio::time::sleep(Duration::from_millis(200)).await;
634
635        let result = handle.spawn(|| async { Ok(()) });
636        assert!(result.is_ok());
637
638        runtime.shutdown().await.expect("shutdown failed");
639    }
640
641    #[tokio::test]
642    async fn test_concurrent_task_execution_order() {
643        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
644        let handle = runtime.handle();
645
646        let execution_order = Arc::new(tokio::sync::Mutex::new(Vec::new()));
647
648        for i in 0..5 {
649            let order = execution_order.clone();
650            handle
651                .spawn(move || {
652                    let o = order.clone();
653                    async move {
654                        o.lock().await.push(i);
655                        Ok(())
656                    }
657                })
658                .expect("spawn failed");
659        }
660
661        tokio::time::sleep(Duration::from_millis(200)).await;
662
663        let order = execution_order.lock().await;
664        assert_eq!(order.len(), 5);
665        for i in 0..5 {
666            assert!(order.contains(&i));
667        }
668
669        runtime.shutdown().await.expect("shutdown failed");
670    }
671
672    #[tokio::test]
673    async fn test_error_from_string_conversion() {
674        let error = BackgroundJobError::from("test message");
675        assert_eq!(error.message, "test message");
676
677        let error2 = BackgroundJobError::from("test".to_string());
678        assert_eq!(error2.message, "test");
679    }
680
681    #[tokio::test]
682    async fn test_background_job_metadata_default() {
683        let metadata = BackgroundJobMetadata::default();
684        assert_eq!(metadata.name, "background_task");
685        assert_eq!(metadata.request_id, None);
686    }
687
688    #[tokio::test]
689    async fn test_background_job_metadata_custom() {
690        let metadata = BackgroundJobMetadata {
691            name: Cow::Borrowed("custom_task"),
692            request_id: Some("req-456".to_string()),
693        };
694        assert_eq!(metadata.name, "custom_task");
695        assert_eq!(metadata.request_id, Some("req-456".to_string()));
696    }
697
698    #[tokio::test]
699    async fn test_metrics_inc_dec_operations() {
700        let metrics = BackgroundMetrics::default();
701
702        metrics.inc_queued();
703        assert_eq!(metrics.queued.load(Ordering::Relaxed), 1);
704
705        metrics.inc_queued();
706        assert_eq!(metrics.queued.load(Ordering::Relaxed), 2);
707
708        metrics.dec_queued();
709        assert_eq!(metrics.queued.load(Ordering::Relaxed), 1);
710
711        metrics.inc_running();
712        assert_eq!(metrics.running.load(Ordering::Relaxed), 1);
713
714        metrics.dec_running();
715        assert_eq!(metrics.running.load(Ordering::Relaxed), 0);
716
717        metrics.inc_failed();
718        assert_eq!(metrics.failed.load(Ordering::Relaxed), 1);
719
720        metrics.inc_failed();
721        assert_eq!(metrics.failed.load(Ordering::Relaxed), 2);
722    }
723
724    #[tokio::test]
725    async fn test_spawn_error_display() {
726        let error = BackgroundSpawnError::QueueFull;
727        assert_eq!(error.to_string(), "background task queue is full");
728    }
729
730    #[tokio::test]
731    async fn test_background_config_default() {
732        let config = BackgroundTaskConfig::default();
733        assert_eq!(config.max_queue_size, 1024);
734        assert_eq!(config.max_concurrent_tasks, 128);
735        assert_eq!(config.drain_timeout_secs, 30);
736    }
737
738    #[tokio::test]
739    async fn test_shutdown_with_zero_pending_tasks() {
740        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
741
742        let result = runtime.shutdown().await;
743        assert!(result.is_ok(), "shutdown should succeed with no tasks");
744    }
745
746    #[tokio::test]
747    async fn test_shutdown_with_only_running_tasks() {
748        let config = BackgroundTaskConfig {
749            max_queue_size: 10,
750            max_concurrent_tasks: 2,
751            drain_timeout_secs: 5,
752        };
753        let runtime = BackgroundRuntime::start(config).await;
754        let handle = runtime.handle();
755
756        let execution_started: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
757        let execution_completed: Arc<std::sync::atomic::AtomicBool> =
758            Arc::new(std::sync::atomic::AtomicBool::new(false));
759
760        let started = execution_started.clone();
761        let completed = execution_completed.clone();
762
763        handle
764            .spawn(move || {
765                let s = started.clone();
766                let c = completed.clone();
767                async move {
768                    s.store(true, std::sync::atomic::Ordering::SeqCst);
769                    tokio::time::sleep(Duration::from_millis(100)).await;
770                    c.store(true, std::sync::atomic::Ordering::SeqCst);
771                    Ok(())
772                }
773            })
774            .unwrap();
775
776        tokio::time::sleep(Duration::from_millis(20)).await;
777
778        let result = runtime.shutdown().await;
779        assert!(result.is_ok(), "shutdown should succeed and wait for running tasks");
780        assert!(
781            execution_completed.load(std::sync::atomic::Ordering::SeqCst),
782            "task should have completed"
783        );
784    }
785
786    #[tokio::test]
787    async fn test_shutdown_drains_queued_tasks() {
788        let config = BackgroundTaskConfig {
789            max_queue_size: 100,
790            max_concurrent_tasks: 1,
791            drain_timeout_secs: 5,
792        };
793        let runtime = BackgroundRuntime::start(config).await;
794        let handle = runtime.handle();
795
796        let execution_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
797
798        for _ in 0..10 {
799            let count = execution_count.clone();
800            handle
801                .spawn(move || {
802                    let c = count.clone();
803                    async move {
804                        c.fetch_add(1, Ordering::SeqCst);
805                        tokio::time::sleep(Duration::from_millis(10)).await;
806                        Ok(())
807                    }
808                })
809                .unwrap();
810        }
811
812        let result = runtime.shutdown().await;
813        assert!(result.is_ok());
814        assert_eq!(
815            execution_count.load(Ordering::SeqCst),
816            10,
817            "all queued tasks should execute"
818        );
819    }
820
821    #[tokio::test]
822    async fn test_shutdown_timeout_force_stops_long_tasks() {
823        let config = BackgroundTaskConfig {
824            max_queue_size: 10,
825            max_concurrent_tasks: 2,
826            drain_timeout_secs: 1,
827        };
828        let runtime = BackgroundRuntime::start(config).await;
829        let handle = runtime.handle();
830
831        let completed: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
832        let completed_clone = completed.clone();
833
834        handle
835            .spawn(move || {
836                let c = completed_clone.clone();
837                async move {
838                    tokio::time::sleep(Duration::from_secs(10)).await;
839                    c.store(true, std::sync::atomic::Ordering::SeqCst);
840                    Ok(())
841                }
842            })
843            .unwrap();
844
845        tokio::time::sleep(Duration::from_millis(50)).await;
846
847        let shutdown_start = std::time::Instant::now();
848        let result = runtime.shutdown().await;
849        let elapsed = shutdown_start.elapsed();
850
851        assert!(result.is_err(), "shutdown should timeout");
852        assert!(
853            elapsed < Duration::from_secs(3),
854            "shutdown should timeout near drain_timeout"
855        );
856        assert!(
857            !completed.load(std::sync::atomic::Ordering::SeqCst),
858            "long-running task should not complete"
859        );
860    }
861
862    #[tokio::test]
863    async fn test_multiple_shutdown_calls_idempotent() {
864        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
865
866        let result1 = runtime.shutdown().await;
867        assert!(result1.is_ok(), "first shutdown should succeed");
868    }
869
870    #[tokio::test]
871    async fn test_spawn_after_all_senders_dropped_fails() {
872        let config = BackgroundTaskConfig::default();
873        let runtime = BackgroundRuntime::start(config).await;
874        let handle = runtime.handle();
875
876        runtime.shutdown().await.expect("shutdown should succeed");
877
878        tokio::time::sleep(Duration::from_millis(50)).await;
879
880        let result = handle.spawn(|| async { Ok(()) });
881        assert!(result.is_err(), "spawn should fail after all senders are dropped");
882    }
883
884    #[tokio::test]
885    async fn test_concurrent_spawns_hit_semaphore_limit() {
886        let config = BackgroundTaskConfig {
887            max_queue_size: 100,
888            max_concurrent_tasks: 3,
889            drain_timeout_secs: 10,
890        };
891        let runtime = BackgroundRuntime::start(config).await;
892        let handle = runtime.handle();
893
894        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
895        let running_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
896        let peak_concurrent: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
897
898        for _ in 0..5 {
899            let b = barrier.clone();
900            let running = running_count.clone();
901            let peak = peak_concurrent.clone();
902
903            handle
904                .spawn(move || {
905                    let barrier = b.clone();
906                    let r = running.clone();
907                    let p = peak.clone();
908                    async move {
909                        let current = r.fetch_add(1, Ordering::SeqCst) + 1;
910                        let mut peak_val = p.load(Ordering::SeqCst);
911                        while current > peak_val {
912                            if p.compare_exchange(peak_val, current, Ordering::SeqCst, Ordering::SeqCst)
913                                .is_ok()
914                            {
915                                break;
916                            }
917                            peak_val = p.load(Ordering::SeqCst);
918                        }
919
920                        barrier.wait().await;
921                        tokio::time::sleep(Duration::from_millis(200)).await;
922                        r.fetch_sub(1, Ordering::SeqCst);
923                        Ok(())
924                    }
925                })
926                .unwrap();
927        }
928
929        barrier.wait().await;
930        tokio::time::sleep(Duration::from_millis(100)).await;
931
932        let peak = peak_concurrent.load(Ordering::SeqCst);
933        assert!(
934            peak <= 3,
935            "concurrent execution should not exceed semaphore limit of 3, got {}",
936            peak
937        );
938
939        runtime.shutdown().await.unwrap();
940    }
941
942    #[tokio::test]
943    async fn test_task_panic_cleanup_still_occurs() {
944        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
945        let handle = runtime.handle();
946
947        let mut spawned_count: u32 = 0;
948        let panic_task_executed: Arc<std::sync::atomic::AtomicBool> =
949            Arc::new(std::sync::atomic::AtomicBool::new(false));
950        let after_panic_executed: Arc<std::sync::atomic::AtomicBool> =
951            Arc::new(std::sync::atomic::AtomicBool::new(false));
952
953        let panic_flag = panic_task_executed.clone();
954        handle
955            .spawn(move || {
956                let p = panic_flag.clone();
957                async move {
958                    p.store(true, std::sync::atomic::Ordering::SeqCst);
959                    Err(BackgroundJobError::from("simulated task failure"))
960                }
961            })
962            .unwrap();
963        spawned_count += 1;
964
965        let after_flag = after_panic_executed.clone();
966        handle
967            .spawn(move || {
968                let a = after_flag.clone();
969                async move {
970                    tokio::time::sleep(Duration::from_millis(50)).await;
971                    a.store(true, std::sync::atomic::Ordering::SeqCst);
972                    Ok(())
973                }
974            })
975            .unwrap();
976        spawned_count += 1;
977
978        tokio::time::sleep(Duration::from_millis(200)).await;
979
980        assert!(panic_task_executed.load(std::sync::atomic::Ordering::SeqCst));
981        assert!(after_panic_executed.load(std::sync::atomic::Ordering::SeqCst));
982        assert_eq!(spawned_count, 2);
983
984        runtime.shutdown().await.unwrap();
985    }
986
987    #[tokio::test]
988    async fn test_queue_overflow_with_immediate_rejection() {
989        let config = BackgroundTaskConfig {
990            max_queue_size: 2,
991            max_concurrent_tasks: 100,
992            drain_timeout_secs: 5,
993        };
994        let runtime = BackgroundRuntime::start(config).await;
995        let handle = runtime.handle();
996
997        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
998
999        for _ in 0..2 {
1000            let b = barrier.clone();
1001            handle
1002                .spawn(move || {
1003                    let barrier = b.clone();
1004                    async move {
1005                        barrier.wait().await;
1006                        tokio::time::sleep(Duration::from_millis(500)).await;
1007                        Ok(())
1008                    }
1009                })
1010                .unwrap();
1011        }
1012
1013        let overflow_result = handle.spawn(|| async { Ok(()) });
1014        assert!(matches!(overflow_result, Err(BackgroundSpawnError::QueueFull)));
1015
1016        barrier.wait().await;
1017        runtime.shutdown().await.unwrap();
1018    }
1019
1020    #[tokio::test]
1021    async fn test_metrics_accuracy_under_concurrent_load() {
1022        let config = BackgroundTaskConfig {
1023            max_queue_size: 50,
1024            max_concurrent_tasks: 5,
1025            drain_timeout_secs: 10,
1026        };
1027        let runtime = BackgroundRuntime::start(config).await;
1028        let handle = runtime.handle();
1029
1030        let completed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1031
1032        for _ in 0..20 {
1033            let c = completed.clone();
1034            handle
1035                .spawn(move || {
1036                    let count = c.clone();
1037                    async move {
1038                        tokio::time::sleep(Duration::from_millis(50)).await;
1039                        count.fetch_add(1, Ordering::SeqCst);
1040                        Ok(())
1041                    }
1042                })
1043                .unwrap();
1044        }
1045
1046        runtime.shutdown().await.unwrap();
1047        assert_eq!(completed.load(Ordering::SeqCst), 20, "all tasks should complete");
1048    }
1049
1050    #[tokio::test]
1051    async fn test_drain_with_slowly_completing_tasks() {
1052        let config = BackgroundTaskConfig {
1053            max_queue_size: 50,
1054            max_concurrent_tasks: 2,
1055            drain_timeout_secs: 10,
1056        };
1057        let runtime = BackgroundRuntime::start(config).await;
1058        let handle = runtime.handle();
1059
1060        let completed_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1061
1062        for i in 0..5 {
1063            let count = completed_count.clone();
1064            handle
1065                .spawn(move || {
1066                    let c = count.clone();
1067                    async move {
1068                        let sleep_ms = 100 + (i as u64 * 50);
1069                        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
1070                        c.fetch_add(1, Ordering::SeqCst);
1071                        Ok(())
1072                    }
1073                })
1074                .unwrap();
1075        }
1076
1077        let result = runtime.shutdown().await;
1078        assert!(result.is_ok());
1079        assert_eq!(completed_count.load(Ordering::SeqCst), 5);
1080    }
1081
1082    #[tokio::test]
1083    async fn test_semaphore_starvation_doesnt_deadlock() {
1084        let config = BackgroundTaskConfig {
1085            max_queue_size: 100,
1086            max_concurrent_tasks: 1,
1087            drain_timeout_secs: 10,
1088        };
1089        let runtime = BackgroundRuntime::start(config).await;
1090        let handle = runtime.handle();
1091
1092        let completion_order: Arc<tokio::sync::Mutex<Vec<u32>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1093
1094        for i in 0..10 {
1095            let order = completion_order.clone();
1096            handle
1097                .spawn(move || {
1098                    let o = order.clone();
1099                    async move {
1100                        tokio::time::sleep(Duration::from_millis(5)).await;
1101                        let mut guard = o.lock().await;
1102                        guard.push(i);
1103                        Ok(())
1104                    }
1105                })
1106                .unwrap();
1107        }
1108
1109        let result = runtime.shutdown().await;
1110        assert!(result.is_ok());
1111
1112        let order = completion_order.lock().await;
1113        assert_eq!(order.len(), 10);
1114    }
1115
1116    #[tokio::test]
1117    async fn test_cancel_task_mid_execution() {
1118        let config = BackgroundTaskConfig {
1119            max_queue_size: 10,
1120            max_concurrent_tasks: 2,
1121            drain_timeout_secs: 1,
1122        };
1123        let runtime = BackgroundRuntime::start(config).await;
1124        let handle = runtime.handle();
1125
1126        let started: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1127        let ended: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1128
1129        let start_flag = started.clone();
1130        let end_flag = ended.clone();
1131
1132        handle
1133            .spawn(move || {
1134                let s = start_flag.clone();
1135                let e = end_flag.clone();
1136                async move {
1137                    s.store(true, std::sync::atomic::Ordering::SeqCst);
1138                    tokio::time::sleep(Duration::from_secs(10)).await;
1139                    e.store(true, std::sync::atomic::Ordering::SeqCst);
1140                    Ok(())
1141                }
1142            })
1143            .unwrap();
1144
1145        tokio::time::sleep(Duration::from_millis(50)).await;
1146        assert!(started.load(std::sync::atomic::Ordering::SeqCst));
1147
1148        let result = runtime.shutdown().await;
1149        assert!(result.is_err(), "shutdown should timeout due to long task");
1150        assert!(
1151            !ended.load(std::sync::atomic::Ordering::SeqCst),
1152            "task should not complete"
1153        );
1154    }
1155
1156    #[tokio::test]
1157    async fn test_rapid_spawn_and_shutdown() {
1158        let config = BackgroundTaskConfig {
1159            max_queue_size: 1000,
1160            max_concurrent_tasks: 10,
1161            drain_timeout_secs: 5,
1162        };
1163        let runtime = BackgroundRuntime::start(config).await;
1164        let handle = runtime.handle();
1165
1166        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1167
1168        for _ in 0..100 {
1169            let c = count.clone();
1170            let _ = handle.spawn(move || {
1171                let counter = c.clone();
1172                async move {
1173                    counter.fetch_add(1, Ordering::SeqCst);
1174                    Ok(())
1175                }
1176            });
1177        }
1178
1179        let result = runtime.shutdown().await;
1180        assert!(result.is_ok());
1181
1182        let final_count = count.load(Ordering::SeqCst);
1183        assert!(final_count > 0, "at least some tasks should execute");
1184        assert!(final_count <= 100, "no more than spawned count should execute");
1185    }
1186
1187    #[tokio::test]
1188    async fn test_shutdown_with_mixed_success_and_failure_tasks() {
1189        let config = BackgroundTaskConfig::default();
1190        let runtime = BackgroundRuntime::start(config).await;
1191        let handle = runtime.handle();
1192
1193        let success_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1194        let failure_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1195
1196        for i in 0..10 {
1197            if i % 2 == 0 {
1198                let s = success_count.clone();
1199                handle
1200                    .spawn(move || {
1201                        let counter = s.clone();
1202                        async move {
1203                            counter.fetch_add(1, Ordering::SeqCst);
1204                            Ok(())
1205                        }
1206                    })
1207                    .unwrap();
1208            } else {
1209                let f = failure_count.clone();
1210                handle
1211                    .spawn(move || {
1212                        let counter = f.clone();
1213                        async move {
1214                            counter.fetch_add(1, Ordering::SeqCst);
1215                            Err(BackgroundJobError::from("intentional failure"))
1216                        }
1217                    })
1218                    .unwrap();
1219            }
1220        }
1221
1222        tokio::time::sleep(Duration::from_millis(200)).await;
1223
1224        let result = runtime.shutdown().await;
1225        assert!(result.is_ok());
1226        assert_eq!(success_count.load(Ordering::SeqCst), 5);
1227        assert_eq!(failure_count.load(Ordering::SeqCst), 5);
1228    }
1229
1230    #[tokio::test]
1231    async fn test_concurrent_handle_clones_spawn_independently() {
1232        let config = BackgroundTaskConfig::default();
1233        let runtime = BackgroundRuntime::start(config).await;
1234        let handle = runtime.handle();
1235
1236        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1237
1238        let mut join_handles = vec![];
1239
1240        for _ in 0..3 {
1241            let h = handle.clone();
1242            let c = count.clone();
1243
1244            let jh = tokio::spawn(async move {
1245                for _ in 0..5 {
1246                    let counter = c.clone();
1247                    let _ = h.spawn(move || {
1248                        let cnt = counter.clone();
1249                        async move {
1250                            cnt.fetch_add(1, Ordering::SeqCst);
1251                            Ok(())
1252                        }
1253                    });
1254                }
1255            });
1256            join_handles.push(jh);
1257        }
1258
1259        for jh in join_handles {
1260            let _ = jh.await;
1261        }
1262
1263        tokio::time::sleep(Duration::from_millis(200)).await;
1264
1265        let result = runtime.shutdown().await;
1266        assert!(result.is_ok());
1267        assert_eq!(count.load(Ordering::SeqCst), 15);
1268    }
1269
1270    #[tokio::test]
1271    async fn test_queue_full_metrics_updated() {
1272        let config = BackgroundTaskConfig {
1273            max_queue_size: 2,
1274            max_concurrent_tasks: 100,
1275            drain_timeout_secs: 5,
1276        };
1277        let runtime = BackgroundRuntime::start(config).await;
1278        let handle = runtime.handle();
1279
1280        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
1281
1282        for _ in 0..2 {
1283            let b = barrier.clone();
1284            handle
1285                .spawn(move || {
1286                    let barrier = b.clone();
1287                    async move {
1288                        barrier.wait().await;
1289                        tokio::time::sleep(Duration::from_secs(1)).await;
1290                        Ok(())
1291                    }
1292                })
1293                .unwrap();
1294        }
1295
1296        let result = handle.spawn(|| async { Ok(()) });
1297        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));
1298
1299        barrier.wait().await;
1300        tokio::time::sleep(Duration::from_millis(100)).await;
1301
1302        runtime.shutdown().await.unwrap();
1303    }
1304
1305    #[tokio::test]
1306    async fn test_handle_persistence_across_spawns() {
1307        let config = BackgroundTaskConfig::default();
1308        let runtime = BackgroundRuntime::start(config).await;
1309        let handle = runtime.handle();
1310
1311        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1312
1313        for _ in 0..5 {
1314            let c = count.clone();
1315            handle
1316                .spawn(move || {
1317                    let counter = c.clone();
1318                    async move {
1319                        counter.fetch_add(1, Ordering::SeqCst);
1320                        Ok(())
1321                    }
1322                })
1323                .unwrap();
1324        }
1325
1326        tokio::time::sleep(Duration::from_millis(150)).await;
1327        assert_eq!(count.load(Ordering::SeqCst), 5);
1328
1329        runtime.shutdown().await.unwrap();
1330    }
1331
1332    #[tokio::test]
1333    async fn test_shutdown_with_queue_at_capacity() {
1334        let config = BackgroundTaskConfig {
1335            max_queue_size: 5,
1336            max_concurrent_tasks: 1,
1337            drain_timeout_secs: 10,
1338        };
1339        let runtime = BackgroundRuntime::start(config).await;
1340        let handle = runtime.handle();
1341
1342        let completion_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1343
1344        for _ in 0..5 {
1345            let c = completion_count.clone();
1346            handle
1347                .spawn(move || {
1348                    let counter = c.clone();
1349                    async move {
1350                        tokio::time::sleep(Duration::from_millis(20)).await;
1351                        counter.fetch_add(1, Ordering::SeqCst);
1352                        Ok(())
1353                    }
1354                })
1355                .unwrap();
1356        }
1357
1358        let result = runtime.shutdown().await;
1359        assert!(result.is_ok());
1360        assert_eq!(completion_count.load(Ordering::SeqCst), 5);
1361    }
1362
1363    #[tokio::test]
1364    async fn test_metadata_preserved_through_execution() {
1365        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
1366        let handle = runtime.handle();
1367
1368        let metadata = BackgroundJobMetadata {
1369            name: Cow::Owned("test_metadata_task".to_string()),
1370            request_id: Some("req-metadata-123".to_string()),
1371        };
1372
1373        let executed: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1374        let executed_clone = executed.clone();
1375
1376        let future = async move {
1377            executed_clone.store(true, std::sync::atomic::Ordering::SeqCst);
1378            Ok(())
1379        };
1380
1381        handle.spawn_with_metadata(future, metadata).unwrap();
1382
1383        tokio::time::sleep(Duration::from_millis(100)).await;
1384
1385        assert!(executed.load(std::sync::atomic::Ordering::SeqCst));
1386        runtime.shutdown().await.unwrap();
1387    }
1388
1389    #[tokio::test]
1390    async fn test_very_short_drain_timeout_forces_stop() {
1391        let config = BackgroundTaskConfig {
1392            max_queue_size: 10,
1393            max_concurrent_tasks: 2,
1394            drain_timeout_secs: 0,
1395        };
1396        let runtime = BackgroundRuntime::start(config).await;
1397        let handle = runtime.handle();
1398
1399        handle
1400            .spawn(|| async {
1401                tokio::time::sleep(Duration::from_secs(1)).await;
1402                Ok(())
1403            })
1404            .unwrap();
1405
1406        tokio::time::sleep(Duration::from_millis(10)).await;
1407
1408        let result = runtime.shutdown().await;
1409        assert!(result.is_err());
1410    }
1411
1412    #[tokio::test]
1413    async fn test_spawn_many_tasks_sequential_drain() {
1414        let config = BackgroundTaskConfig {
1415            max_queue_size: 200,
1416            max_concurrent_tasks: 2,
1417            drain_timeout_secs: 15,
1418        };
1419        let runtime = BackgroundRuntime::start(config).await;
1420        let handle = runtime.handle();
1421
1422        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1423
1424        for _ in 0..50 {
1425            let c = count.clone();
1426            handle
1427                .spawn(move || {
1428                    let counter = c.clone();
1429                    async move {
1430                        tokio::time::sleep(Duration::from_millis(1)).await;
1431                        counter.fetch_add(1, Ordering::SeqCst);
1432                        Ok(())
1433                    }
1434                })
1435                .unwrap();
1436        }
1437
1438        let result = runtime.shutdown().await;
1439        assert!(result.is_ok());
1440        assert_eq!(count.load(Ordering::SeqCst), 50);
1441    }
1442
1443    #[tokio::test]
1444    async fn test_no_deadlock_with_max_concurrency_barrier() {
1445        let config = BackgroundTaskConfig {
1446            max_queue_size: 100,
1447            max_concurrent_tasks: 3,
1448            drain_timeout_secs: 10,
1449        };
1450        let runtime = BackgroundRuntime::start(config).await;
1451        let handle = runtime.handle();
1452
1453        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(4));
1454
1455        for _ in 0..3 {
1456            let b = barrier.clone();
1457            handle
1458                .spawn(move || {
1459                    let barrier = b.clone();
1460                    async move {
1461                        barrier.wait().await;
1462                        tokio::time::sleep(Duration::from_millis(50)).await;
1463                        Ok(())
1464                    }
1465                })
1466                .unwrap();
1467        }
1468
1469        barrier.wait().await;
1470        tokio::time::sleep(Duration::from_millis(100)).await;
1471
1472        let result = runtime.shutdown().await;
1473        assert!(result.is_ok());
1474    }
1475
1476    #[tokio::test]
1477    async fn test_error_from_owned_string() {
1478        let message = String::from("error message");
1479        let error = BackgroundJobError::from(message);
1480        assert_eq!(error.message, "error message");
1481    }
1482
1483    #[tokio::test]
1484    async fn test_borrowed_str_conversion() {
1485        let error = BackgroundJobError::from("borrowed message");
1486        assert_eq!(error.message, "borrowed message");
1487    }
1488
1489    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1490    async fn test_phase1_semaphore_acquisition_with_concurrent_load() {
1491        let config = BackgroundTaskConfig {
1492            max_queue_size: 50,
1493            max_concurrent_tasks: 1,
1494            drain_timeout_secs: 10,
1495        };
1496
1497        let runtime = BackgroundRuntime::start(config).await;
1498        let handle = runtime.handle();
1499
1500        let execution_count = Arc::new(AtomicU64::new(0));
1501
1502        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(2));
1503        let barrier_clone = blocking_barrier.clone();
1504
1505        handle
1506            .spawn(move || {
1507                let b = barrier_clone.clone();
1508                async move {
1509                    b.wait().await;
1510                    tokio::time::sleep(Duration::from_millis(100)).await;
1511                    Ok(())
1512                }
1513            })
1514            .unwrap();
1515
1516        tokio::time::sleep(Duration::from_millis(50)).await;
1517
1518        for _ in 0..3 {
1519            let exec_clone = execution_count.clone();
1520            handle
1521                .spawn(move || {
1522                    let e = exec_clone.clone();
1523                    async move {
1524                        e.fetch_add(1, Ordering::SeqCst);
1525                        Ok(())
1526                    }
1527                })
1528                .unwrap();
1529        }
1530
1531        blocking_barrier.wait().await;
1532        tokio::time::sleep(Duration::from_millis(250)).await;
1533
1534        assert_eq!(execution_count.load(Ordering::SeqCst), 3);
1535
1536        runtime.shutdown().await.unwrap();
1537    }
1538
1539    #[tokio::test(flavor = "multi_thread")]
1540    async fn test_concurrent_task_completion_race_conditions() {
1541        let config = BackgroundTaskConfig {
1542            max_queue_size: 100,
1543            max_concurrent_tasks: 8,
1544            drain_timeout_secs: 10,
1545        };
1546
1547        let runtime = BackgroundRuntime::start(config).await;
1548        let handle = runtime.handle();
1549
1550        let completion_counter = Arc::new(AtomicU64::new(0));
1551        let task_count = 50;
1552
1553        for i in 0..task_count {
1554            let counter = completion_counter.clone();
1555            handle
1556                .spawn(move || {
1557                    let c = counter.clone();
1558                    async move {
1559                        let sleep_ms = (i as u64 * 11) % 100;
1560                        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
1561                        c.fetch_add(1, Ordering::SeqCst);
1562                        Ok(())
1563                    }
1564                })
1565                .unwrap();
1566        }
1567
1568        runtime.shutdown().await.unwrap();
1569        assert_eq!(
1570            completion_counter.load(Ordering::SeqCst),
1571            task_count,
1572            "all tasks should complete despite race conditions"
1573        );
1574    }
1575
1576    #[tokio::test(flavor = "multi_thread")]
1577    async fn test_failure_metric_tracking_under_concurrent_errors() {
1578        let config = BackgroundTaskConfig {
1579            max_queue_size: 50,
1580            max_concurrent_tasks: 5,
1581            drain_timeout_secs: 10,
1582        };
1583
1584        let runtime = BackgroundRuntime::start(config).await;
1585        let handle = runtime.handle();
1586
1587        let success_count = Arc::new(AtomicU64::new(0));
1588        let failure_count = Arc::new(AtomicU64::new(0));
1589
1590        for i in 0..20 {
1591            if i % 3 == 0 {
1592                let fail_clone = failure_count.clone();
1593                handle
1594                    .spawn(move || {
1595                        let f = fail_clone.clone();
1596                        async move {
1597                            f.fetch_add(1, Ordering::SeqCst);
1598                            Err(BackgroundJobError::from("intentional failure"))
1599                        }
1600                    })
1601                    .unwrap();
1602            } else {
1603                let succ_clone = success_count.clone();
1604                handle
1605                    .spawn(move || {
1606                        let s = succ_clone.clone();
1607                        async move {
1608                            s.fetch_add(1, Ordering::SeqCst);
1609                            Ok(())
1610                        }
1611                    })
1612                    .unwrap();
1613            }
1614        }
1615
1616        runtime.shutdown().await.unwrap();
1617
1618        let final_success = success_count.load(Ordering::SeqCst);
1619        let final_failure = failure_count.load(Ordering::SeqCst);
1620
1621        assert_eq!(final_success + final_failure, 20);
1622        assert_eq!(final_failure, 7);
1623        assert_eq!(final_success, 13);
1624    }
1625
1626    #[tokio::test(flavor = "multi_thread")]
1627    async fn test_handle_clone_isolation_concurrent_spawns() {
1628        let config = BackgroundTaskConfig {
1629            max_queue_size: 100,
1630            max_concurrent_tasks: 4,
1631            drain_timeout_secs: 10,
1632        };
1633
1634        let runtime = BackgroundRuntime::start(config).await;
1635        let handle = runtime.handle();
1636
1637        let completion_counters: Vec<Arc<AtomicU64>> = (0..5).map(|_| Arc::new(AtomicU64::new(0))).collect();
1638
1639        let mut join_handles = vec![];
1640
1641        for worker_id in 0..5 {
1642            let h = handle.clone();
1643            let counter = completion_counters[worker_id].clone();
1644
1645            let jh = tokio::spawn(async move {
1646                for task_id in 0..10 {
1647                    let c = counter.clone();
1648                    let _ = h.spawn(move || {
1649                        let cnt = c.clone();
1650                        async move {
1651                            let delay = (worker_id as u64 * 10 + task_id as u64) % 50;
1652                            tokio::time::sleep(Duration::from_millis(delay)).await;
1653                            cnt.fetch_add(1, Ordering::SeqCst);
1654                            Ok(())
1655                        }
1656                    });
1657                }
1658            });
1659            join_handles.push(jh);
1660        }
1661
1662        for jh in join_handles {
1663            let _ = jh.await;
1664        }
1665
1666        runtime.shutdown().await.unwrap();
1667
1668        for (i, counter) in completion_counters.iter().enumerate() {
1669            assert_eq!(
1670                counter.load(Ordering::SeqCst),
1671                10,
1672                "worker {} should have exactly 10 task completions",
1673                i
1674            );
1675        }
1676    }
1677
1678    #[tokio::test]
1679    async fn test_background_job_error_with_string_slice() {
1680        let errors = vec![
1681            BackgroundJobError::from("simple error"),
1682            BackgroundJobError::from(String::from("owned string error")),
1683        ];
1684
1685        for error in errors {
1686            assert!(!error.message.is_empty());
1687        }
1688    }
1689
1690    #[tokio::test]
1691    async fn test_spawn_error_display_formatting() {
1692        let error = BackgroundSpawnError::QueueFull;
1693        let formatted = format!("{}", error);
1694        assert_eq!(formatted, "background task queue is full");
1695
1696        let result: Result<(), BackgroundSpawnError> = Err(error);
1697        assert!(result.is_err());
1698    }
1699
1700    #[tokio::test]
1701    async fn test_background_metrics_concurrent_increments() {
1702        let metrics = Arc::new(BackgroundMetrics::default());
1703        let mut handles = vec![];
1704
1705        for _ in 0..10 {
1706            let m = metrics.clone();
1707            let h = tokio::spawn(async move {
1708                for _ in 0..10 {
1709                    m.inc_queued();
1710                    m.inc_running();
1711                    m.inc_failed();
1712                    m.dec_queued();
1713                    m.dec_running();
1714                }
1715            });
1716            handles.push(h);
1717        }
1718
1719        for h in handles {
1720            let _ = h.await;
1721        }
1722
1723        assert_eq!(metrics.queued.load(Ordering::Relaxed), 0);
1724        assert_eq!(metrics.running.load(Ordering::Relaxed), 0);
1725        assert_eq!(metrics.failed.load(Ordering::Relaxed), 100);
1726    }
1727
1728    #[tokio::test]
1729    async fn test_drain_phase_execution_with_lingering_senders() {
1730        let config = BackgroundTaskConfig {
1731            max_queue_size: 20,
1732            max_concurrent_tasks: 2,
1733            drain_timeout_secs: 10,
1734        };
1735
1736        let runtime = BackgroundRuntime::start(config).await;
1737        let handle = runtime.handle();
1738
1739        let executed = Arc::new(AtomicU64::new(0));
1740
1741        for _ in 0..5 {
1742            let e = executed.clone();
1743            handle
1744                .spawn(move || {
1745                    let ex = e.clone();
1746                    async move {
1747                        tokio::time::sleep(Duration::from_millis(10)).await;
1748                        ex.fetch_add(1, Ordering::SeqCst);
1749                        Ok(())
1750                    }
1751                })
1752                .unwrap();
1753        }
1754
1755        let result = runtime.shutdown().await;
1756        assert!(result.is_ok());
1757
1758        assert_eq!(executed.load(Ordering::SeqCst), 5);
1759    }
1760
1761    #[tokio::test]
1762    async fn test_concurrent_queue_status_transitions() {
1763        let config = BackgroundTaskConfig {
1764            max_queue_size: 10,
1765            max_concurrent_tasks: 2,
1766            drain_timeout_secs: 10,
1767        };
1768
1769        let runtime = BackgroundRuntime::start(config).await;
1770        let handle = runtime.handle();
1771
1772        let state_transitions = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1773
1774        for i in 0..10 {
1775            let st = state_transitions.clone();
1776            handle
1777                .spawn(move || {
1778                    let s = st.clone();
1779                    async move {
1780                        let mut transitions = s.lock().await;
1781                        transitions.push(("spawned", i));
1782                        drop(transitions);
1783
1784                        tokio::time::sleep(Duration::from_millis(50)).await;
1785
1786                        let mut transitions = s.lock().await;
1787                        transitions.push(("completed", i));
1788                        Ok(())
1789                    }
1790                })
1791                .unwrap();
1792        }
1793
1794        tokio::time::sleep(Duration::from_millis(300)).await;
1795
1796        runtime.shutdown().await.unwrap();
1797
1798        let final_transitions = state_transitions.lock().await;
1799        let completed_count = final_transitions
1800            .iter()
1801            .filter(|(action, _)| *action == "completed")
1802            .count();
1803
1804        assert_eq!(completed_count, 10, "all tasks should complete");
1805    }
1806
1807    #[tokio::test(flavor = "multi_thread")]
1808    async fn test_semaphore_no_starvation_with_uneven_task_duration() {
1809        let config = BackgroundTaskConfig {
1810            max_queue_size: 100,
1811            max_concurrent_tasks: 2,
1812            drain_timeout_secs: 10,
1813        };
1814
1815        let runtime = BackgroundRuntime::start(config).await;
1816        let handle = runtime.handle();
1817
1818        let fast_executed = Arc::new(AtomicU64::new(0));
1819        let slow_executed = Arc::new(AtomicU64::new(0));
1820
1821        for _ in 0..5 {
1822            let s = slow_executed.clone();
1823            handle
1824                .spawn(move || {
1825                    let slow = s.clone();
1826                    async move {
1827                        tokio::time::sleep(Duration::from_millis(100)).await;
1828                        slow.fetch_add(1, Ordering::SeqCst);
1829                        Ok(())
1830                    }
1831                })
1832                .unwrap();
1833        }
1834
1835        tokio::time::sleep(Duration::from_millis(10)).await;
1836
1837        for _ in 0..5 {
1838            let f = fast_executed.clone();
1839            handle
1840                .spawn(move || {
1841                    let fast = f.clone();
1842                    async move {
1843                        tokio::time::sleep(Duration::from_millis(10)).await;
1844                        fast.fetch_add(1, Ordering::SeqCst);
1845                        Ok(())
1846                    }
1847                })
1848                .unwrap();
1849        }
1850
1851        runtime.shutdown().await.unwrap();
1852
1853        assert_eq!(
1854            fast_executed.load(Ordering::SeqCst),
1855            5,
1856            "fast tasks should not be starved"
1857        );
1858        assert_eq!(slow_executed.load(Ordering::SeqCst), 5, "slow tasks should execute");
1859    }
1860}