Skip to main content

spikard_http/
background.rs

1use std::borrow::Cow;
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::FutureExt;
6use futures::future::BoxFuture;
7use tokio::sync::{Semaphore, mpsc};
8use tokio::task::JoinSet;
9use tokio::time::timeout;
10use tokio_util::sync::CancellationToken;
11
/// Tuning knobs for the in-process background task executor.
#[derive(Debug, Clone)]
pub struct BackgroundTaskConfig {
    /// Capacity of the bounded job queue; submissions are rejected once it is full.
    pub max_queue_size: usize,
    /// Number of permits in the concurrency semaphore, i.e. jobs allowed to run at once.
    pub max_concurrent_tasks: usize,
    /// Seconds that `BackgroundRuntime::shutdown` waits for the executor before giving up.
    pub drain_timeout_secs: u64,
}

impl Default for BackgroundTaskConfig {
    /// 1024 queued jobs, 128 concurrent jobs and a 30-second drain timeout.
    fn default() -> Self {
        const QUEUE_CAPACITY: usize = 1024;
        const CONCURRENCY: usize = 128;
        const DRAIN_SECS: u64 = 30;

        BackgroundTaskConfig {
            drain_timeout_secs: DRAIN_SECS,
            max_concurrent_tasks: CONCURRENCY,
            max_queue_size: QUEUE_CAPACITY,
        }
    }
}
29
/// Descriptive information attached to a background job; the executor includes `name`
/// in the log entry it emits when the job fails.
#[derive(Debug, Clone)]
pub struct BackgroundJobMetadata {
    /// Human-readable task name.
    pub name: Cow<'static, str>,
    /// Optional request identifier (presumably of the request that scheduled the job —
    /// not read anywhere in this module).
    pub request_id: Option<String>,
}

impl Default for BackgroundJobMetadata {
    /// A borrowed `"background_task"` name and no request id.
    fn default() -> Self {
        let name: Cow<'static, str> = "background_task".into();
        BackgroundJobMetadata {
            request_id: None,
            name,
        }
    }
}
44
45pub type BackgroundJobFuture = BoxFuture<'static, Result<(), BackgroundJobError>>;
46
47struct BackgroundJob {
48    pub future: BackgroundJobFuture,
49    pub metadata: BackgroundJobMetadata,
50}
51
52impl BackgroundJob {
53    fn new<F>(future: F, metadata: BackgroundJobMetadata) -> Self
54    where
55        F: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
56    {
57        Self {
58            future: future.boxed(),
59            metadata,
60        }
61    }
62}
63
/// Error produced by a background job's future.
///
/// The executor does not propagate it anywhere; it logs `message` and counts the job as
/// failed.
#[derive(Debug, Clone)]
pub struct BackgroundJobError {
    /// Human-readable description of what went wrong.
    pub message: String,
}

// `Display` + `Error` make this usable with `?`, `Box<dyn Error>` and error-reporting
// tooling, matching what `BackgroundSpawnError` already provides.
impl std::fmt::Display for BackgroundJobError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for BackgroundJobError {}

impl From<String> for BackgroundJobError {
    /// Wraps an owned message without copying it.
    fn from(message: String) -> Self {
        Self { message }
    }
}

impl From<&str> for BackgroundJobError {
    /// Copies a borrowed message into a new error.
    fn from(message: &str) -> Self {
        Self::from(message.to_owned())
    }
}
82
/// Reasons a job could not be handed to the background runtime.
#[derive(Debug, Clone)]
pub enum BackgroundSpawnError {
    /// The queue did not accept the job. `BackgroundHandle` also reports a closed queue
    /// with this variant.
    QueueFull,
}

impl std::fmt::Display for BackgroundSpawnError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::QueueFull => "background task queue is full",
        };
        f.write_str(text)
    }
}

impl std::error::Error for BackgroundSpawnError {}
97
/// Returned by `BackgroundRuntime::shutdown` when the executor did not finish cleanly:
/// either the drain timeout elapsed or the executor task itself failed.
#[derive(Debug)]
pub struct BackgroundShutdownError;

// Implemented for parity with `BackgroundSpawnError`, so callers can use `?` /
// `Box<dyn Error>` and print a meaningful message.
impl std::fmt::Display for BackgroundShutdownError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("background task runtime did not shut down cleanly")
    }
}

impl std::error::Error for BackgroundShutdownError {}
100
/// Counters describing the executor's workload, shared between handles and the executor.
#[derive(Default)]
struct BackgroundMetrics {
    /// Incremented when a handle submits a job; decremented when the executor receives
    /// it or the submission is rejected.
    queued: std::sync::atomic::AtomicU64,
    /// Jobs that currently hold a concurrency permit and are executing.
    running: std::sync::atomic::AtomicU64,
    /// Jobs the executor has counted as failed (e.g. the job returned an error).
    failed: std::sync::atomic::AtomicU64,
}

impl BackgroundMetrics {
    fn inc_queued(&self) {
        Self::step_up(&self.queued);
    }

    fn dec_queued(&self) {
        Self::step_down(&self.queued);
    }

    fn inc_running(&self) {
        Self::step_up(&self.running);
    }

    fn dec_running(&self) {
        Self::step_down(&self.running);
    }

    fn inc_failed(&self) {
        Self::step_up(&self.failed);
    }

    /// Adds one. Relaxed ordering: the counters are statistics, not synchronization points.
    fn step_up(counter: &std::sync::atomic::AtomicU64) {
        counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    /// Subtracts one (wrapping on underflow, as `fetch_sub` does).
    fn step_down(counter: &std::sync::atomic::AtomicU64) {
        counter.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
    }
}
129
130#[derive(Clone)]
131pub struct BackgroundHandle {
132    sender: mpsc::Sender<BackgroundJob>,
133    metrics: Arc<BackgroundMetrics>,
134}
135
136impl BackgroundHandle {
137    pub fn spawn<F, Fut>(&self, f: F) -> Result<(), BackgroundSpawnError>
138    where
139        F: FnOnce() -> Fut,
140        Fut: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
141    {
142        let future = f();
143        self.spawn_with_metadata(future, BackgroundJobMetadata::default())
144    }
145
146    pub fn spawn_with_metadata<Fut>(
147        &self,
148        future: Fut,
149        metadata: BackgroundJobMetadata,
150    ) -> Result<(), BackgroundSpawnError>
151    where
152        Fut: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
153    {
154        self.metrics.inc_queued();
155        let job = BackgroundJob::new(future, metadata);
156        self.sender.try_send(job).map_err(|_| {
157            self.metrics.dec_queued();
158            BackgroundSpawnError::QueueFull
159        })
160    }
161}
162
163pub struct BackgroundRuntime {
164    handle: BackgroundHandle,
165    drain_timeout: Duration,
166    shutdown_token: CancellationToken,
167    join_handle: tokio::task::JoinHandle<()>,
168}
169
170impl BackgroundRuntime {
171    pub async fn start(config: BackgroundTaskConfig) -> Self {
172        let (tx, rx) = mpsc::channel(config.max_queue_size);
173        let metrics = Arc::new(BackgroundMetrics::default());
174        let handle = BackgroundHandle {
175            sender: tx.clone(),
176            metrics: metrics.clone(),
177        };
178        let shutdown_token = CancellationToken::new();
179        let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tasks));
180        let driver_token = shutdown_token.clone();
181
182        let join_handle = tokio::spawn(run_executor(rx, semaphore, metrics.clone(), driver_token));
183
184        Self {
185            handle,
186            drain_timeout: Duration::from_secs(config.drain_timeout_secs),
187            shutdown_token,
188            join_handle,
189        }
190    }
191
192    pub fn handle(&self) -> BackgroundHandle {
193        self.handle.clone()
194    }
195
196    pub async fn shutdown(self) -> Result<(), BackgroundShutdownError> {
197        self.shutdown_token.cancel();
198        drop(self.handle);
199        match timeout(self.drain_timeout, self.join_handle).await {
200            Ok(Ok(_)) => Ok(()),
201            _ => Err(BackgroundShutdownError),
202        }
203    }
204}
205
206async fn run_executor(
207    mut rx: mpsc::Receiver<BackgroundJob>,
208    semaphore: Arc<Semaphore>,
209    metrics: Arc<BackgroundMetrics>,
210    token: CancellationToken,
211) {
212    let mut join_set = JoinSet::new();
213    let token_clone = token.clone();
214
215    loop {
216        tokio::select! {
217            maybe_job = rx.recv() => {
218                match maybe_job {
219                    Some(job) => {
220                        metrics.dec_queued();
221                        let semaphore = semaphore.clone();
222                        let metrics_clone = metrics.clone();
223                        join_set.spawn(async move {
224                            let BackgroundJob { future, metadata } = job;
225                            match semaphore.acquire_owned().await {
226                                Ok(_permit) => {
227                                    metrics_clone.inc_running();
228                                    if let Err(err) = future.await {
229                                        metrics_clone.inc_failed();
230                                        tracing::error!(target = "spikard::background", task = %metadata.name, error = %err.message, "background task failed");
231                                    }
232                                    metrics_clone.dec_running();
233                                }
234                                Err(_) => {
235                                    metrics_clone.inc_failed();
236                                    tracing::warn!(target = "spikard::background", "failed to acquire semaphore permit for background task");
237                                }
238                            }
239                        });
240                    }
241                    None => break,
242                }
243            }
244            _ = token_clone.cancelled() => {
245                break;
246            }
247        }
248    }
249
250    let mut drain_attempts = 0;
251    loop {
252        match rx.try_recv() {
253            Ok(job) => {
254                metrics.dec_queued();
255                let semaphore = semaphore.clone();
256                let metrics_clone = metrics.clone();
257                join_set.spawn(async move {
258                    let BackgroundJob { future, metadata } = job;
259                    match semaphore.acquire_owned().await {
260                        Ok(_permit) => {
261                            metrics_clone.inc_running();
262                            if let Err(err) = future.await {
263                                metrics_clone.inc_failed();
264                                tracing::error!(target = "spikard::background", task = %metadata.name, error = %err.message, "background task failed");
265                            }
266                            metrics_clone.dec_running();
267                        }
268                        Err(_) => {
269                            metrics_clone.inc_failed();
270                            tracing::warn!(target = "spikard::background", "failed to acquire semaphore permit for background task");
271                        }
272                    }
273                });
274                drain_attempts = 0;
275            }
276            Err(mpsc::error::TryRecvError::Empty) => {
277                drain_attempts += 1;
278                if drain_attempts > 100 {
279                    break;
280                }
281                tokio::time::sleep(Duration::from_millis(10)).await;
282            }
283            Err(mpsc::error::TryRecvError::Disconnected) => {
284                break;
285            }
286        }
287    }
288
289    while join_set.join_next().await.is_some() {}
290}
291
292#[cfg(test)]
293mod tests {
294    use super::*;
295    use std::sync::atomic::{AtomicU64, Ordering};
296
297    #[tokio::test]
298    async fn test_basic_spawn_and_execution() {
299        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
300        let handle = runtime.handle();
301
302        let counter = Arc::new(AtomicU64::new(0));
303        let counter_clone = counter.clone();
304
305        handle
306            .spawn(move || {
307                let c = counter_clone.clone();
308                async move {
309                    c.fetch_add(1, Ordering::SeqCst);
310                    Ok(())
311                }
312            })
313            .expect("spawn failed");
314
315        tokio::time::sleep(Duration::from_millis(100)).await;
316        assert_eq!(counter.load(Ordering::SeqCst), 1);
317
318        runtime.shutdown().await.expect("shutdown failed");
319    }
320
321    #[tokio::test]
322    async fn test_multiple_tasks() {
323        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
324        let handle = runtime.handle();
325
326        let counter = Arc::new(AtomicU64::new(0));
327
328        for _ in 0..10 {
329            let counter_clone = counter.clone();
330            handle
331                .spawn(move || {
332                    let c = counter_clone.clone();
333                    async move {
334                        c.fetch_add(1, Ordering::SeqCst);
335                        Ok(())
336                    }
337                })
338                .expect("spawn failed");
339        }
340
341        tokio::time::sleep(Duration::from_millis(200)).await;
342        assert_eq!(counter.load(Ordering::SeqCst), 10);
343
344        runtime.shutdown().await.expect("shutdown failed");
345    }
346
347    #[tokio::test]
348    async fn test_task_with_metadata() {
349        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
350        let handle = runtime.handle();
351
352        let metadata = BackgroundJobMetadata {
353            name: Cow::Owned("test_task".to_string()),
354            request_id: Some("req-123".to_string()),
355        };
356
357        let counter = Arc::new(AtomicU64::new(0));
358        let counter_clone = counter.clone();
359
360        let future = async move {
361            counter_clone.fetch_add(1, Ordering::SeqCst);
362            Ok(())
363        };
364
365        handle.spawn_with_metadata(future, metadata).expect("spawn failed");
366
367        tokio::time::sleep(Duration::from_millis(100)).await;
368        assert_eq!(counter.load(Ordering::SeqCst), 1);
369
370        runtime.shutdown().await.expect("shutdown failed");
371    }
372
373    #[tokio::test]
374    async fn test_queue_full_error() {
375        let config = BackgroundTaskConfig {
376            max_queue_size: 2,
377            max_concurrent_tasks: 10,
378            drain_timeout_secs: 5,
379        };
380
381        let runtime = BackgroundRuntime::start(config).await;
382        let handle = runtime.handle();
383
384        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(3));
385
386        for _ in 0..2 {
387            let barrier = blocking_barrier.clone();
388            handle
389                .spawn(move || {
390                    let b = barrier.clone();
391                    async move {
392                        b.wait().await;
393                        tokio::time::sleep(Duration::from_secs(1)).await;
394                        Ok(())
395                    }
396                })
397                .expect("spawn failed");
398        }
399
400        let result = handle.spawn(move || async { Ok(()) });
401        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));
402
403        blocking_barrier.wait().await;
404        tokio::time::sleep(Duration::from_millis(100)).await;
405
406        runtime.shutdown().await.expect("shutdown failed");
407    }
408
409    #[tokio::test]
410    async fn test_task_failure_handling() {
411        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
412        let handle = runtime.handle();
413
414        let success_count = Arc::new(AtomicU64::new(0));
415        let success_count_clone = success_count.clone();
416
417        handle
418            .spawn(move || {
419                let s = success_count_clone.clone();
420                async move {
421                    s.fetch_add(1, Ordering::SeqCst);
422                    Err(BackgroundJobError::from("test error"))
423                }
424            })
425            .expect("spawn failed");
426
427        tokio::time::sleep(Duration::from_millis(100)).await;
428        assert_eq!(success_count.load(Ordering::SeqCst), 1);
429
430        runtime.shutdown().await.expect("shutdown failed");
431    }
432
433    #[tokio::test(flavor = "multi_thread")]
434    async fn test_concurrency_limit_with_proper_synchronization() {
435        let config = BackgroundTaskConfig {
436            max_queue_size: 100,
437            max_concurrent_tasks: 2,
438            drain_timeout_secs: 30,
439        };
440
441        let runtime = BackgroundRuntime::start(config).await;
442        let handle = runtime.handle();
443
444        let running_count = Arc::new(AtomicU64::new(0));
445        let max_concurrent = Arc::new(AtomicU64::new(0));
446
447        for _ in 0..5 {
448            let running = running_count.clone();
449            let max = max_concurrent.clone();
450
451            handle
452                .spawn(move || {
453                    let r = running.clone();
454                    let m = max.clone();
455                    async move {
456                        r.fetch_add(1, Ordering::SeqCst);
457                        let current_running = r.load(Ordering::SeqCst);
458                        let mut current_max = m.load(Ordering::SeqCst);
459                        while current_running > current_max {
460                            m.store(current_running, Ordering::SeqCst);
461                            current_max = current_running;
462                        }
463
464                        tokio::time::sleep(Duration::from_millis(100)).await;
465                        r.fetch_sub(1, Ordering::SeqCst);
466                        Ok(())
467                    }
468                })
469                .expect("spawn failed");
470        }
471
472        tokio::time::sleep(Duration::from_millis(700)).await;
473        let max_concurrent_observed = max_concurrent.load(Ordering::SeqCst);
474        assert!(
475            max_concurrent_observed <= 2,
476            "Max concurrent should be <= 2, but was {}",
477            max_concurrent_observed
478        );
479
480        runtime.shutdown().await.expect("shutdown failed");
481    }
482
483    #[tokio::test]
484    async fn test_graceful_shutdown() {
485        let config = BackgroundTaskConfig {
486            max_queue_size: 10,
487            max_concurrent_tasks: 2,
488            drain_timeout_secs: 5,
489        };
490
491        let runtime = BackgroundRuntime::start(config).await;
492        let handle = runtime.handle();
493
494        let counter = Arc::new(AtomicU64::new(0));
495        let counter_clone = counter.clone();
496
497        handle
498            .spawn(move || {
499                let c = counter_clone.clone();
500                async move {
501                    tokio::time::sleep(Duration::from_millis(50)).await;
502                    c.fetch_add(1, Ordering::SeqCst);
503                    Ok(())
504                }
505            })
506            .expect("spawn failed");
507
508        tokio::time::sleep(Duration::from_millis(200)).await;
509
510        let result = runtime.shutdown().await;
511        assert!(result.is_ok());
512        assert_eq!(counter.load(Ordering::SeqCst), 1);
513    }
514
515    #[tokio::test]
516    async fn test_shutdown_timeout() {
517        let config = BackgroundTaskConfig {
518            max_queue_size: 10,
519            max_concurrent_tasks: 2,
520            drain_timeout_secs: 1,
521        };
522
523        let runtime = BackgroundRuntime::start(config).await;
524        let handle = runtime.handle();
525
526        handle
527            .spawn(|| async {
528                tokio::time::sleep(Duration::from_secs(5)).await;
529                Ok(())
530            })
531            .expect("spawn failed");
532
533        tokio::time::sleep(Duration::from_millis(100)).await;
534
535        let result = runtime.shutdown().await;
536        assert!(result.is_err());
537    }
538
539    #[tokio::test]
540    async fn test_metrics_tracking() {
541        let config = BackgroundTaskConfig::default();
542        let runtime = BackgroundRuntime::start(config).await;
543        let handle = runtime.handle();
544
545        let barrier = Arc::new(tokio::sync::Barrier::new(2));
546
547        for _ in 0..2 {
548            let b = barrier.clone();
549            let _ = handle.spawn(move || {
550                let barrier = b.clone();
551                async move {
552                    barrier.wait().await;
553                    Ok(())
554                }
555            });
556        }
557
558        tokio::time::sleep(Duration::from_millis(150)).await;
559
560        runtime.shutdown().await.expect("shutdown failed");
561    }
562
563    #[tokio::test]
564    async fn test_task_cancellation_on_shutdown() {
565        let config = BackgroundTaskConfig {
566            max_queue_size: 10,
567            max_concurrent_tasks: 2,
568            drain_timeout_secs: 1,
569        };
570
571        let runtime = BackgroundRuntime::start(config).await;
572        let handle = runtime.handle();
573
574        let started_count = Arc::new(AtomicU64::new(0));
575        let _completed_count = Arc::new(AtomicU64::new(0));
576
577        let started = started_count.clone();
578
579        handle
580            .spawn(move || {
581                let s = started.clone();
582                async move {
583                    s.fetch_add(1, Ordering::SeqCst);
584                    tokio::time::sleep(Duration::from_secs(10)).await;
585                    Ok(())
586                }
587            })
588            .expect("spawn failed");
589
590        tokio::time::sleep(Duration::from_millis(100)).await;
591        assert_eq!(started_count.load(Ordering::SeqCst), 1);
592
593        let shutdown_start = std::time::Instant::now();
594        let result = runtime.shutdown().await;
595        let shutdown_elapsed = shutdown_start.elapsed();
596
597        assert!(result.is_err());
598        assert!(shutdown_elapsed < Duration::from_secs(3));
599    }
600
601    #[tokio::test]
602    async fn test_queue_overflow_multiple_spawns() {
603        let config = BackgroundTaskConfig {
604            max_queue_size: 3,
605            max_concurrent_tasks: 10,
606            drain_timeout_secs: 5,
607        };
608
609        let runtime = BackgroundRuntime::start(config).await;
610        let handle = runtime.handle();
611
612        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(4));
613
614        for _ in 0..3 {
615            let b = blocking_barrier.clone();
616            handle
617                .spawn(move || {
618                    let barrier = b.clone();
619                    async move {
620                        barrier.wait().await;
621                        tokio::time::sleep(Duration::from_millis(100)).await;
622                        Ok(())
623                    }
624                })
625                .expect("spawn failed");
626        }
627
628        let result = handle.spawn(|| async { Ok(()) });
629        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));
630
631        blocking_barrier.wait().await;
632        tokio::time::sleep(Duration::from_millis(200)).await;
633
634        let result = handle.spawn(|| async { Ok(()) });
635        assert!(result.is_ok());
636
637        runtime.shutdown().await.expect("shutdown failed");
638    }
639
640    #[tokio::test]
641    async fn test_concurrent_task_execution_order() {
642        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
643        let handle = runtime.handle();
644
645        let execution_order = Arc::new(tokio::sync::Mutex::new(Vec::new()));
646
647        for i in 0..5 {
648            let order = execution_order.clone();
649            handle
650                .spawn(move || {
651                    let o = order.clone();
652                    async move {
653                        o.lock().await.push(i);
654                        Ok(())
655                    }
656                })
657                .expect("spawn failed");
658        }
659
660        tokio::time::sleep(Duration::from_millis(200)).await;
661
662        let order = execution_order.lock().await;
663        assert_eq!(order.len(), 5);
664        for i in 0..5 {
665            assert!(order.contains(&i));
666        }
667
668        runtime.shutdown().await.expect("shutdown failed");
669    }
670
#[test]
fn test_error_from_string_conversion() {
    // Purely synchronous, so a plain `#[test]` suffices (no Tokio runtime is needed).
    let error = BackgroundJobError::from("test message");
    assert_eq!(error.message, "test message");

    let error2 = BackgroundJobError::from("test".to_string());
    assert_eq!(error2.message, "test");
}
679
#[test]
fn test_background_job_metadata_default() {
    // Synchronous check; no async runtime required.
    let metadata = BackgroundJobMetadata::default();
    assert_eq!(metadata.name, "background_task");
    assert_eq!(metadata.request_id, None);
}
686
#[test]
fn test_background_job_metadata_custom() {
    // Synchronous check; no async runtime required.
    let metadata = BackgroundJobMetadata {
        name: Cow::Borrowed("custom_task"),
        request_id: Some("req-456".to_string()),
    };
    assert_eq!(metadata.name, "custom_task");
    assert_eq!(metadata.request_id, Some("req-456".to_string()));
}
696
#[test]
fn test_metrics_inc_dec_operations() {
    // Pure counter arithmetic; a plain `#[test]` avoids spinning up a Tokio runtime.
    let metrics = BackgroundMetrics::default();

    metrics.inc_queued();
    assert_eq!(metrics.queued.load(Ordering::Relaxed), 1);

    metrics.inc_queued();
    assert_eq!(metrics.queued.load(Ordering::Relaxed), 2);

    metrics.dec_queued();
    assert_eq!(metrics.queued.load(Ordering::Relaxed), 1);

    metrics.inc_running();
    assert_eq!(metrics.running.load(Ordering::Relaxed), 1);

    metrics.dec_running();
    assert_eq!(metrics.running.load(Ordering::Relaxed), 0);

    metrics.inc_failed();
    assert_eq!(metrics.failed.load(Ordering::Relaxed), 1);

    metrics.inc_failed();
    assert_eq!(metrics.failed.load(Ordering::Relaxed), 2);
}
722
#[test]
fn test_spawn_error_display() {
    // Synchronous `Display` check; no async runtime required.
    let error = BackgroundSpawnError::QueueFull;
    assert_eq!(error.to_string(), "background task queue is full");
}
728
#[test]
fn test_background_config_default() {
    // Synchronous check of the documented defaults; no async runtime required.
    let config = BackgroundTaskConfig::default();
    assert_eq!(config.max_queue_size, 1024);
    assert_eq!(config.max_concurrent_tasks, 128);
    assert_eq!(config.drain_timeout_secs, 30);
}
736
737    #[tokio::test]
738    async fn test_shutdown_with_zero_pending_tasks() {
739        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
740
741        let result = runtime.shutdown().await;
742        assert!(result.is_ok(), "shutdown should succeed with no tasks");
743    }
744
745    #[tokio::test]
746    async fn test_shutdown_with_only_running_tasks() {
747        let config = BackgroundTaskConfig {
748            max_queue_size: 10,
749            max_concurrent_tasks: 2,
750            drain_timeout_secs: 5,
751        };
752        let runtime = BackgroundRuntime::start(config).await;
753        let handle = runtime.handle();
754
755        let execution_started: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
756        let execution_completed: Arc<std::sync::atomic::AtomicBool> =
757            Arc::new(std::sync::atomic::AtomicBool::new(false));
758
759        let started = execution_started.clone();
760        let completed = execution_completed.clone();
761
762        handle
763            .spawn(move || {
764                let s = started.clone();
765                let c = completed.clone();
766                async move {
767                    s.store(true, std::sync::atomic::Ordering::SeqCst);
768                    tokio::time::sleep(Duration::from_millis(100)).await;
769                    c.store(true, std::sync::atomic::Ordering::SeqCst);
770                    Ok(())
771                }
772            })
773            .unwrap();
774
775        tokio::time::sleep(Duration::from_millis(20)).await;
776
777        let result = runtime.shutdown().await;
778        assert!(result.is_ok(), "shutdown should succeed and wait for running tasks");
779        assert!(
780            execution_completed.load(std::sync::atomic::Ordering::SeqCst),
781            "task should have completed"
782        );
783    }
784
785    #[tokio::test]
786    async fn test_shutdown_drains_queued_tasks() {
787        let config = BackgroundTaskConfig {
788            max_queue_size: 100,
789            max_concurrent_tasks: 1,
790            drain_timeout_secs: 5,
791        };
792        let runtime = BackgroundRuntime::start(config).await;
793        let handle = runtime.handle();
794
795        let execution_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
796
797        for _ in 0..10 {
798            let count = execution_count.clone();
799            handle
800                .spawn(move || {
801                    let c = count.clone();
802                    async move {
803                        c.fetch_add(1, Ordering::SeqCst);
804                        tokio::time::sleep(Duration::from_millis(10)).await;
805                        Ok(())
806                    }
807                })
808                .unwrap();
809        }
810
811        let result = runtime.shutdown().await;
812        assert!(result.is_ok());
813        assert_eq!(
814            execution_count.load(Ordering::SeqCst),
815            10,
816            "all queued tasks should execute"
817        );
818    }
819
820    #[tokio::test]
821    async fn test_shutdown_timeout_force_stops_long_tasks() {
822        let config = BackgroundTaskConfig {
823            max_queue_size: 10,
824            max_concurrent_tasks: 2,
825            drain_timeout_secs: 1,
826        };
827        let runtime = BackgroundRuntime::start(config).await;
828        let handle = runtime.handle();
829
830        let completed: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
831        let completed_clone = completed.clone();
832
833        handle
834            .spawn(move || {
835                let c = completed_clone.clone();
836                async move {
837                    tokio::time::sleep(Duration::from_secs(10)).await;
838                    c.store(true, std::sync::atomic::Ordering::SeqCst);
839                    Ok(())
840                }
841            })
842            .unwrap();
843
844        tokio::time::sleep(Duration::from_millis(50)).await;
845
846        let shutdown_start = std::time::Instant::now();
847        let result = runtime.shutdown().await;
848        let elapsed = shutdown_start.elapsed();
849
850        assert!(result.is_err(), "shutdown should timeout");
851        assert!(
852            elapsed < Duration::from_secs(3),
853            "shutdown should timeout near drain_timeout"
854        );
855        assert!(
856            !completed.load(std::sync::atomic::Ordering::SeqCst),
857            "long-running task should not complete"
858        );
859    }
860
861    #[tokio::test]
862    async fn test_multiple_shutdown_calls_idempotent() {
863        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
864
865        let result1 = runtime.shutdown().await;
866        assert!(result1.is_ok(), "first shutdown should succeed");
867    }
868
869    #[tokio::test]
870    async fn test_spawn_after_all_senders_dropped_fails() {
871        let config = BackgroundTaskConfig::default();
872        let runtime = BackgroundRuntime::start(config).await;
873        let handle = runtime.handle();
874
875        runtime.shutdown().await.expect("shutdown should succeed");
876
877        tokio::time::sleep(Duration::from_millis(50)).await;
878
879        let result = handle.spawn(|| async { Ok(()) });
880        assert!(result.is_err(), "spawn should fail after all senders are dropped");
881    }
882
883    #[tokio::test]
884    async fn test_concurrent_spawns_hit_semaphore_limit() {
885        let config = BackgroundTaskConfig {
886            max_queue_size: 100,
887            max_concurrent_tasks: 3,
888            drain_timeout_secs: 10,
889        };
890        let runtime = BackgroundRuntime::start(config).await;
891        let handle = runtime.handle();
892
893        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
894        let running_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
895        let peak_concurrent: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
896
897        for _ in 0..5 {
898            let b = barrier.clone();
899            let running = running_count.clone();
900            let peak = peak_concurrent.clone();
901
902            handle
903                .spawn(move || {
904                    let barrier = b.clone();
905                    let r = running.clone();
906                    let p = peak.clone();
907                    async move {
908                        let current = r.fetch_add(1, Ordering::SeqCst) + 1;
909                        let mut peak_val = p.load(Ordering::SeqCst);
910                        while current > peak_val {
911                            if p.compare_exchange(peak_val, current, Ordering::SeqCst, Ordering::SeqCst)
912                                .is_ok()
913                            {
914                                break;
915                            }
916                            peak_val = p.load(Ordering::SeqCst);
917                        }
918
919                        barrier.wait().await;
920                        tokio::time::sleep(Duration::from_millis(200)).await;
921                        r.fetch_sub(1, Ordering::SeqCst);
922                        Ok(())
923                    }
924                })
925                .unwrap();
926        }
927
928        barrier.wait().await;
929        tokio::time::sleep(Duration::from_millis(100)).await;
930
931        let peak = peak_concurrent.load(Ordering::SeqCst);
932        assert!(
933            peak <= 3,
934            "concurrent execution should not exceed semaphore limit of 3, got {}",
935            peak
936        );
937
938        runtime.shutdown().await.unwrap();
939    }
940
941    #[tokio::test]
942    async fn test_task_panic_cleanup_still_occurs() {
943        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
944        let handle = runtime.handle();
945
946        let mut spawned_count: u32 = 0;
947        let panic_task_executed: Arc<std::sync::atomic::AtomicBool> =
948            Arc::new(std::sync::atomic::AtomicBool::new(false));
949        let after_panic_executed: Arc<std::sync::atomic::AtomicBool> =
950            Arc::new(std::sync::atomic::AtomicBool::new(false));
951
952        let panic_flag = panic_task_executed.clone();
953        handle
954            .spawn(move || {
955                let p = panic_flag.clone();
956                async move {
957                    p.store(true, std::sync::atomic::Ordering::SeqCst);
958                    Err(BackgroundJobError::from("simulated task failure"))
959                }
960            })
961            .unwrap();
962        spawned_count += 1;
963
964        let after_flag = after_panic_executed.clone();
965        handle
966            .spawn(move || {
967                let a = after_flag.clone();
968                async move {
969                    tokio::time::sleep(Duration::from_millis(50)).await;
970                    a.store(true, std::sync::atomic::Ordering::SeqCst);
971                    Ok(())
972                }
973            })
974            .unwrap();
975        spawned_count += 1;
976
977        tokio::time::sleep(Duration::from_millis(200)).await;
978
979        assert!(panic_task_executed.load(std::sync::atomic::Ordering::SeqCst));
980        assert!(after_panic_executed.load(std::sync::atomic::Ordering::SeqCst));
981        assert_eq!(spawned_count, 2);
982
983        runtime.shutdown().await.unwrap();
984    }
985
986    #[tokio::test]
987    async fn test_queue_overflow_with_immediate_rejection() {
988        let config = BackgroundTaskConfig {
989            max_queue_size: 2,
990            max_concurrent_tasks: 100,
991            drain_timeout_secs: 5,
992        };
993        let runtime = BackgroundRuntime::start(config).await;
994        let handle = runtime.handle();
995
996        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
997
998        for _ in 0..2 {
999            let b = barrier.clone();
1000            handle
1001                .spawn(move || {
1002                    let barrier = b.clone();
1003                    async move {
1004                        barrier.wait().await;
1005                        tokio::time::sleep(Duration::from_millis(500)).await;
1006                        Ok(())
1007                    }
1008                })
1009                .unwrap();
1010        }
1011
1012        let overflow_result = handle.spawn(|| async { Ok(()) });
1013        assert!(matches!(overflow_result, Err(BackgroundSpawnError::QueueFull)));
1014
1015        barrier.wait().await;
1016        runtime.shutdown().await.unwrap();
1017    }
1018
1019    #[tokio::test]
1020    async fn test_metrics_accuracy_under_concurrent_load() {
1021        let config = BackgroundTaskConfig {
1022            max_queue_size: 50,
1023            max_concurrent_tasks: 5,
1024            drain_timeout_secs: 10,
1025        };
1026        let runtime = BackgroundRuntime::start(config).await;
1027        let handle = runtime.handle();
1028
1029        let completed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1030
1031        for _ in 0..20 {
1032            let c = completed.clone();
1033            handle
1034                .spawn(move || {
1035                    let count = c.clone();
1036                    async move {
1037                        tokio::time::sleep(Duration::from_millis(50)).await;
1038                        count.fetch_add(1, Ordering::SeqCst);
1039                        Ok(())
1040                    }
1041                })
1042                .unwrap();
1043        }
1044
1045        runtime.shutdown().await.unwrap();
1046        assert_eq!(completed.load(Ordering::SeqCst), 20, "all tasks should complete");
1047    }
1048
1049    #[tokio::test]
1050    async fn test_drain_with_slowly_completing_tasks() {
1051        let config = BackgroundTaskConfig {
1052            max_queue_size: 50,
1053            max_concurrent_tasks: 2,
1054            drain_timeout_secs: 10,
1055        };
1056        let runtime = BackgroundRuntime::start(config).await;
1057        let handle = runtime.handle();
1058
1059        let completed_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1060
1061        for i in 0..5 {
1062            let count = completed_count.clone();
1063            handle
1064                .spawn(move || {
1065                    let c = count.clone();
1066                    async move {
1067                        let sleep_ms = 100 + (i as u64 * 50);
1068                        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
1069                        c.fetch_add(1, Ordering::SeqCst);
1070                        Ok(())
1071                    }
1072                })
1073                .unwrap();
1074        }
1075
1076        let result = runtime.shutdown().await;
1077        assert!(result.is_ok());
1078        assert_eq!(completed_count.load(Ordering::SeqCst), 5);
1079    }
1080
1081    #[tokio::test]
1082    async fn test_semaphore_starvation_doesnt_deadlock() {
1083        let config = BackgroundTaskConfig {
1084            max_queue_size: 100,
1085            max_concurrent_tasks: 1,
1086            drain_timeout_secs: 10,
1087        };
1088        let runtime = BackgroundRuntime::start(config).await;
1089        let handle = runtime.handle();
1090
1091        let completion_order: Arc<tokio::sync::Mutex<Vec<u32>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1092
1093        for i in 0..10 {
1094            let order = completion_order.clone();
1095            handle
1096                .spawn(move || {
1097                    let o = order.clone();
1098                    async move {
1099                        tokio::time::sleep(Duration::from_millis(5)).await;
1100                        let mut guard = o.lock().await;
1101                        guard.push(i);
1102                        Ok(())
1103                    }
1104                })
1105                .unwrap();
1106        }
1107
1108        let result = runtime.shutdown().await;
1109        assert!(result.is_ok());
1110
1111        let order = completion_order.lock().await;
1112        assert_eq!(order.len(), 10);
1113    }
1114
1115    #[tokio::test]
1116    async fn test_cancel_task_mid_execution() {
1117        let config = BackgroundTaskConfig {
1118            max_queue_size: 10,
1119            max_concurrent_tasks: 2,
1120            drain_timeout_secs: 1,
1121        };
1122        let runtime = BackgroundRuntime::start(config).await;
1123        let handle = runtime.handle();
1124
1125        let started: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1126        let ended: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1127
1128        let start_flag = started.clone();
1129        let end_flag = ended.clone();
1130
1131        handle
1132            .spawn(move || {
1133                let s = start_flag.clone();
1134                let e = end_flag.clone();
1135                async move {
1136                    s.store(true, std::sync::atomic::Ordering::SeqCst);
1137                    tokio::time::sleep(Duration::from_secs(10)).await;
1138                    e.store(true, std::sync::atomic::Ordering::SeqCst);
1139                    Ok(())
1140                }
1141            })
1142            .unwrap();
1143
1144        tokio::time::sleep(Duration::from_millis(50)).await;
1145        assert!(started.load(std::sync::atomic::Ordering::SeqCst));
1146
1147        let result = runtime.shutdown().await;
1148        assert!(result.is_err(), "shutdown should timeout due to long task");
1149        assert!(
1150            !ended.load(std::sync::atomic::Ordering::SeqCst),
1151            "task should not complete"
1152        );
1153    }
1154
1155    #[tokio::test]
1156    async fn test_rapid_spawn_and_shutdown() {
1157        let config = BackgroundTaskConfig {
1158            max_queue_size: 1000,
1159            max_concurrent_tasks: 10,
1160            drain_timeout_secs: 5,
1161        };
1162        let runtime = BackgroundRuntime::start(config).await;
1163        let handle = runtime.handle();
1164
1165        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1166
1167        for _ in 0..100 {
1168            let c = count.clone();
1169            let _ = handle.spawn(move || {
1170                let counter = c.clone();
1171                async move {
1172                    counter.fetch_add(1, Ordering::SeqCst);
1173                    Ok(())
1174                }
1175            });
1176        }
1177
1178        let result = runtime.shutdown().await;
1179        assert!(result.is_ok());
1180
1181        let final_count = count.load(Ordering::SeqCst);
1182        assert!(final_count > 0, "at least some tasks should execute");
1183        assert!(final_count <= 100, "no more than spawned count should execute");
1184    }
1185
1186    #[tokio::test]
1187    async fn test_shutdown_with_mixed_success_and_failure_tasks() {
1188        let config = BackgroundTaskConfig::default();
1189        let runtime = BackgroundRuntime::start(config).await;
1190        let handle = runtime.handle();
1191
1192        let success_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1193        let failure_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1194
1195        for i in 0..10 {
1196            if i % 2 == 0 {
1197                let s = success_count.clone();
1198                handle
1199                    .spawn(move || {
1200                        let counter = s.clone();
1201                        async move {
1202                            counter.fetch_add(1, Ordering::SeqCst);
1203                            Ok(())
1204                        }
1205                    })
1206                    .unwrap();
1207            } else {
1208                let f = failure_count.clone();
1209                handle
1210                    .spawn(move || {
1211                        let counter = f.clone();
1212                        async move {
1213                            counter.fetch_add(1, Ordering::SeqCst);
1214                            Err(BackgroundJobError::from("intentional failure"))
1215                        }
1216                    })
1217                    .unwrap();
1218            }
1219        }
1220
1221        tokio::time::sleep(Duration::from_millis(200)).await;
1222
1223        let result = runtime.shutdown().await;
1224        assert!(result.is_ok());
1225        assert_eq!(success_count.load(Ordering::SeqCst), 5);
1226        assert_eq!(failure_count.load(Ordering::SeqCst), 5);
1227    }
1228
1229    #[tokio::test]
1230    async fn test_concurrent_handle_clones_spawn_independently() {
1231        let config = BackgroundTaskConfig::default();
1232        let runtime = BackgroundRuntime::start(config).await;
1233        let handle = runtime.handle();
1234
1235        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1236
1237        let mut join_handles = vec![];
1238
1239        for _ in 0..3 {
1240            let h = handle.clone();
1241            let c = count.clone();
1242
1243            let jh = tokio::spawn(async move {
1244                for _ in 0..5 {
1245                    let counter = c.clone();
1246                    let _ = h.spawn(move || {
1247                        let cnt = counter.clone();
1248                        async move {
1249                            cnt.fetch_add(1, Ordering::SeqCst);
1250                            Ok(())
1251                        }
1252                    });
1253                }
1254            });
1255            join_handles.push(jh);
1256        }
1257
1258        for jh in join_handles {
1259            let _ = jh.await;
1260        }
1261
1262        tokio::time::sleep(Duration::from_millis(200)).await;
1263
1264        let result = runtime.shutdown().await;
1265        assert!(result.is_ok());
1266        assert_eq!(count.load(Ordering::SeqCst), 15);
1267    }
1268
1269    #[tokio::test]
1270    async fn test_queue_full_metrics_updated() {
1271        let config = BackgroundTaskConfig {
1272            max_queue_size: 2,
1273            max_concurrent_tasks: 100,
1274            drain_timeout_secs: 5,
1275        };
1276        let runtime = BackgroundRuntime::start(config).await;
1277        let handle = runtime.handle();
1278
1279        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
1280
1281        for _ in 0..2 {
1282            let b = barrier.clone();
1283            handle
1284                .spawn(move || {
1285                    let barrier = b.clone();
1286                    async move {
1287                        barrier.wait().await;
1288                        tokio::time::sleep(Duration::from_secs(1)).await;
1289                        Ok(())
1290                    }
1291                })
1292                .unwrap();
1293        }
1294
1295        let result = handle.spawn(|| async { Ok(()) });
1296        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));
1297
1298        barrier.wait().await;
1299        tokio::time::sleep(Duration::from_millis(100)).await;
1300
1301        runtime.shutdown().await.unwrap();
1302    }
1303
1304    #[tokio::test]
1305    async fn test_handle_persistence_across_spawns() {
1306        let config = BackgroundTaskConfig::default();
1307        let runtime = BackgroundRuntime::start(config).await;
1308        let handle = runtime.handle();
1309
1310        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1311
1312        for _ in 0..5 {
1313            let c = count.clone();
1314            handle
1315                .spawn(move || {
1316                    let counter = c.clone();
1317                    async move {
1318                        counter.fetch_add(1, Ordering::SeqCst);
1319                        Ok(())
1320                    }
1321                })
1322                .unwrap();
1323        }
1324
1325        tokio::time::sleep(Duration::from_millis(150)).await;
1326        assert_eq!(count.load(Ordering::SeqCst), 5);
1327
1328        runtime.shutdown().await.unwrap();
1329    }
1330
1331    #[tokio::test]
1332    async fn test_shutdown_with_queue_at_capacity() {
1333        let config = BackgroundTaskConfig {
1334            max_queue_size: 5,
1335            max_concurrent_tasks: 1,
1336            drain_timeout_secs: 10,
1337        };
1338        let runtime = BackgroundRuntime::start(config).await;
1339        let handle = runtime.handle();
1340
1341        let completion_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1342
1343        for _ in 0..5 {
1344            let c = completion_count.clone();
1345            handle
1346                .spawn(move || {
1347                    let counter = c.clone();
1348                    async move {
1349                        tokio::time::sleep(Duration::from_millis(20)).await;
1350                        counter.fetch_add(1, Ordering::SeqCst);
1351                        Ok(())
1352                    }
1353                })
1354                .unwrap();
1355        }
1356
1357        let result = runtime.shutdown().await;
1358        assert!(result.is_ok());
1359        assert_eq!(completion_count.load(Ordering::SeqCst), 5);
1360    }
1361
1362    #[tokio::test]
1363    async fn test_metadata_preserved_through_execution() {
1364        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
1365        let handle = runtime.handle();
1366
1367        let metadata = BackgroundJobMetadata {
1368            name: Cow::Owned("test_metadata_task".to_string()),
1369            request_id: Some("req-metadata-123".to_string()),
1370        };
1371
1372        let executed: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1373        let executed_clone = executed.clone();
1374
1375        let future = async move {
1376            executed_clone.store(true, std::sync::atomic::Ordering::SeqCst);
1377            Ok(())
1378        };
1379
1380        handle.spawn_with_metadata(future, metadata).unwrap();
1381
1382        tokio::time::sleep(Duration::from_millis(100)).await;
1383
1384        assert!(executed.load(std::sync::atomic::Ordering::SeqCst));
1385        runtime.shutdown().await.unwrap();
1386    }
1387
1388    #[tokio::test]
1389    async fn test_very_short_drain_timeout_forces_stop() {
1390        let config = BackgroundTaskConfig {
1391            max_queue_size: 10,
1392            max_concurrent_tasks: 2,
1393            drain_timeout_secs: 0,
1394        };
1395        let runtime = BackgroundRuntime::start(config).await;
1396        let handle = runtime.handle();
1397
1398        handle
1399            .spawn(|| async {
1400                tokio::time::sleep(Duration::from_secs(1)).await;
1401                Ok(())
1402            })
1403            .unwrap();
1404
1405        tokio::time::sleep(Duration::from_millis(10)).await;
1406
1407        let result = runtime.shutdown().await;
1408        assert!(result.is_err());
1409    }
1410
1411    #[tokio::test]
1412    async fn test_spawn_many_tasks_sequential_drain() {
1413        let config = BackgroundTaskConfig {
1414            max_queue_size: 200,
1415            max_concurrent_tasks: 2,
1416            drain_timeout_secs: 15,
1417        };
1418        let runtime = BackgroundRuntime::start(config).await;
1419        let handle = runtime.handle();
1420
1421        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1422
1423        for _ in 0..50 {
1424            let c = count.clone();
1425            handle
1426                .spawn(move || {
1427                    let counter = c.clone();
1428                    async move {
1429                        tokio::time::sleep(Duration::from_millis(1)).await;
1430                        counter.fetch_add(1, Ordering::SeqCst);
1431                        Ok(())
1432                    }
1433                })
1434                .unwrap();
1435        }
1436
1437        let result = runtime.shutdown().await;
1438        assert!(result.is_ok());
1439        assert_eq!(count.load(Ordering::SeqCst), 50);
1440    }
1441
1442    #[tokio::test]
1443    async fn test_no_deadlock_with_max_concurrency_barrier() {
1444        let config = BackgroundTaskConfig {
1445            max_queue_size: 100,
1446            max_concurrent_tasks: 3,
1447            drain_timeout_secs: 10,
1448        };
1449        let runtime = BackgroundRuntime::start(config).await;
1450        let handle = runtime.handle();
1451
1452        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(4));
1453
1454        for _ in 0..3 {
1455            let b = barrier.clone();
1456            handle
1457                .spawn(move || {
1458                    let barrier = b.clone();
1459                    async move {
1460                        barrier.wait().await;
1461                        tokio::time::sleep(Duration::from_millis(50)).await;
1462                        Ok(())
1463                    }
1464                })
1465                .unwrap();
1466        }
1467
1468        barrier.wait().await;
1469        tokio::time::sleep(Duration::from_millis(100)).await;
1470
1471        let result = runtime.shutdown().await;
1472        assert!(result.is_ok());
1473    }
1474
1475    #[tokio::test]
1476    async fn test_error_from_owned_string() {
1477        let message = String::from("error message");
1478        let error = BackgroundJobError::from(message);
1479        assert_eq!(error.message, "error message");
1480    }
1481
1482    #[tokio::test]
1483    async fn test_borrowed_str_conversion() {
1484        let error = BackgroundJobError::from("borrowed message");
1485        assert_eq!(error.message, "borrowed message");
1486    }
1487
1488    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1489    async fn test_phase1_semaphore_acquisition_with_concurrent_load() {
1490        let config = BackgroundTaskConfig {
1491            max_queue_size: 50,
1492            max_concurrent_tasks: 1,
1493            drain_timeout_secs: 10,
1494        };
1495
1496        let runtime = BackgroundRuntime::start(config).await;
1497        let handle = runtime.handle();
1498
1499        let execution_count = Arc::new(AtomicU64::new(0));
1500
1501        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(2));
1502        let barrier_clone = blocking_barrier.clone();
1503
1504        handle
1505            .spawn(move || {
1506                let b = barrier_clone.clone();
1507                async move {
1508                    b.wait().await;
1509                    tokio::time::sleep(Duration::from_millis(100)).await;
1510                    Ok(())
1511                }
1512            })
1513            .unwrap();
1514
1515        tokio::time::sleep(Duration::from_millis(50)).await;
1516
1517        for _ in 0..3 {
1518            let exec_clone = execution_count.clone();
1519            handle
1520                .spawn(move || {
1521                    let e = exec_clone.clone();
1522                    async move {
1523                        e.fetch_add(1, Ordering::SeqCst);
1524                        Ok(())
1525                    }
1526                })
1527                .unwrap();
1528        }
1529
1530        blocking_barrier.wait().await;
1531        tokio::time::sleep(Duration::from_millis(250)).await;
1532
1533        assert_eq!(execution_count.load(Ordering::SeqCst), 3);
1534
1535        runtime.shutdown().await.unwrap();
1536    }
1537
1538    #[tokio::test(flavor = "multi_thread")]
1539    async fn test_concurrent_task_completion_race_conditions() {
1540        let config = BackgroundTaskConfig {
1541            max_queue_size: 100,
1542            max_concurrent_tasks: 8,
1543            drain_timeout_secs: 10,
1544        };
1545
1546        let runtime = BackgroundRuntime::start(config).await;
1547        let handle = runtime.handle();
1548
1549        let completion_counter = Arc::new(AtomicU64::new(0));
1550        let task_count = 50;
1551
1552        for i in 0..task_count {
1553            let counter = completion_counter.clone();
1554            handle
1555                .spawn(move || {
1556                    let c = counter.clone();
1557                    async move {
1558                        let sleep_ms = (i as u64 * 11) % 100;
1559                        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
1560                        c.fetch_add(1, Ordering::SeqCst);
1561                        Ok(())
1562                    }
1563                })
1564                .unwrap();
1565        }
1566
1567        runtime.shutdown().await.unwrap();
1568        assert_eq!(
1569            completion_counter.load(Ordering::SeqCst),
1570            task_count,
1571            "all tasks should complete despite race conditions"
1572        );
1573    }
1574
1575    #[tokio::test(flavor = "multi_thread")]
1576    async fn test_failure_metric_tracking_under_concurrent_errors() {
1577        let config = BackgroundTaskConfig {
1578            max_queue_size: 50,
1579            max_concurrent_tasks: 5,
1580            drain_timeout_secs: 10,
1581        };
1582
1583        let runtime = BackgroundRuntime::start(config).await;
1584        let handle = runtime.handle();
1585
1586        let success_count = Arc::new(AtomicU64::new(0));
1587        let failure_count = Arc::new(AtomicU64::new(0));
1588
1589        for i in 0..20 {
1590            if i % 3 == 0 {
1591                let fail_clone = failure_count.clone();
1592                handle
1593                    .spawn(move || {
1594                        let f = fail_clone.clone();
1595                        async move {
1596                            f.fetch_add(1, Ordering::SeqCst);
1597                            Err(BackgroundJobError::from("intentional failure"))
1598                        }
1599                    })
1600                    .unwrap();
1601            } else {
1602                let succ_clone = success_count.clone();
1603                handle
1604                    .spawn(move || {
1605                        let s = succ_clone.clone();
1606                        async move {
1607                            s.fetch_add(1, Ordering::SeqCst);
1608                            Ok(())
1609                        }
1610                    })
1611                    .unwrap();
1612            }
1613        }
1614
1615        runtime.shutdown().await.unwrap();
1616
1617        let final_success = success_count.load(Ordering::SeqCst);
1618        let final_failure = failure_count.load(Ordering::SeqCst);
1619
1620        assert_eq!(final_success + final_failure, 20);
1621        assert_eq!(final_failure, 7);
1622        assert_eq!(final_success, 13);
1623    }
1624
1625    #[tokio::test(flavor = "multi_thread")]
1626    async fn test_handle_clone_isolation_concurrent_spawns() {
1627        let config = BackgroundTaskConfig {
1628            max_queue_size: 100,
1629            max_concurrent_tasks: 4,
1630            drain_timeout_secs: 10,
1631        };
1632
1633        let runtime = BackgroundRuntime::start(config).await;
1634        let handle = runtime.handle();
1635
1636        let completion_counters: Vec<Arc<AtomicU64>> = (0..5).map(|_| Arc::new(AtomicU64::new(0))).collect();
1637
1638        let mut join_handles = vec![];
1639
1640        for worker_id in 0..5 {
1641            let h = handle.clone();
1642            let counter = completion_counters[worker_id].clone();
1643
1644            let jh = tokio::spawn(async move {
1645                for task_id in 0..10 {
1646                    let c = counter.clone();
1647                    let _ = h.spawn(move || {
1648                        let cnt = c.clone();
1649                        async move {
1650                            let delay = (worker_id as u64 * 10 + task_id as u64) % 50;
1651                            tokio::time::sleep(Duration::from_millis(delay)).await;
1652                            cnt.fetch_add(1, Ordering::SeqCst);
1653                            Ok(())
1654                        }
1655                    });
1656                }
1657            });
1658            join_handles.push(jh);
1659        }
1660
1661        for jh in join_handles {
1662            let _ = jh.await;
1663        }
1664
1665        runtime.shutdown().await.unwrap();
1666
1667        for (i, counter) in completion_counters.iter().enumerate() {
1668            assert_eq!(
1669                counter.load(Ordering::SeqCst),
1670                10,
1671                "worker {} should have exactly 10 task completions",
1672                i
1673            );
1674        }
1675    }
1676
1677    #[tokio::test]
1678    async fn test_background_job_error_with_string_slice() {
1679        let errors = vec![
1680            BackgroundJobError::from("simple error"),
1681            BackgroundJobError::from(String::from("owned string error")),
1682        ];
1683
1684        for error in errors {
1685            assert!(!error.message.is_empty());
1686        }
1687    }
1688
1689    #[tokio::test]
1690    async fn test_spawn_error_display_formatting() {
1691        let error = BackgroundSpawnError::QueueFull;
1692        let formatted = format!("{}", error);
1693        assert_eq!(formatted, "background task queue is full");
1694
1695        let result: Result<(), BackgroundSpawnError> = Err(error);
1696        assert!(result.is_err());
1697    }
1698
1699    #[tokio::test]
1700    async fn test_background_metrics_concurrent_increments() {
1701        let metrics = Arc::new(BackgroundMetrics::default());
1702        let mut handles = vec![];
1703
1704        for _ in 0..10 {
1705            let m = metrics.clone();
1706            let h = tokio::spawn(async move {
1707                for _ in 0..10 {
1708                    m.inc_queued();
1709                    m.inc_running();
1710                    m.inc_failed();
1711                    m.dec_queued();
1712                    m.dec_running();
1713                }
1714            });
1715            handles.push(h);
1716        }
1717
1718        for h in handles {
1719            let _ = h.await;
1720        }
1721
1722        assert_eq!(metrics.queued.load(Ordering::Relaxed), 0);
1723        assert_eq!(metrics.running.load(Ordering::Relaxed), 0);
1724        assert_eq!(metrics.failed.load(Ordering::Relaxed), 100);
1725    }
1726
1727    #[tokio::test]
1728    async fn test_drain_phase_execution_with_lingering_senders() {
1729        let config = BackgroundTaskConfig {
1730            max_queue_size: 20,
1731            max_concurrent_tasks: 2,
1732            drain_timeout_secs: 10,
1733        };
1734
1735        let runtime = BackgroundRuntime::start(config).await;
1736        let handle = runtime.handle();
1737
1738        let executed = Arc::new(AtomicU64::new(0));
1739
1740        for _ in 0..5 {
1741            let e = executed.clone();
1742            handle
1743                .spawn(move || {
1744                    let ex = e.clone();
1745                    async move {
1746                        tokio::time::sleep(Duration::from_millis(10)).await;
1747                        ex.fetch_add(1, Ordering::SeqCst);
1748                        Ok(())
1749                    }
1750                })
1751                .unwrap();
1752        }
1753
1754        let result = runtime.shutdown().await;
1755        assert!(result.is_ok());
1756
1757        assert_eq!(executed.load(Ordering::SeqCst), 5);
1758    }
1759
1760    #[tokio::test]
1761    async fn test_concurrent_queue_status_transitions() {
1762        let config = BackgroundTaskConfig {
1763            max_queue_size: 10,
1764            max_concurrent_tasks: 2,
1765            drain_timeout_secs: 10,
1766        };
1767
1768        let runtime = BackgroundRuntime::start(config).await;
1769        let handle = runtime.handle();
1770
1771        let state_transitions = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1772
1773        for i in 0..10 {
1774            let st = state_transitions.clone();
1775            handle
1776                .spawn(move || {
1777                    let s = st.clone();
1778                    async move {
1779                        let mut transitions = s.lock().await;
1780                        transitions.push(("spawned", i));
1781                        drop(transitions);
1782
1783                        tokio::time::sleep(Duration::from_millis(50)).await;
1784
1785                        let mut transitions = s.lock().await;
1786                        transitions.push(("completed", i));
1787                        Ok(())
1788                    }
1789                })
1790                .unwrap();
1791        }
1792
1793        tokio::time::sleep(Duration::from_millis(300)).await;
1794
1795        runtime.shutdown().await.unwrap();
1796
1797        let final_transitions = state_transitions.lock().await;
1798        let completed_count = final_transitions
1799            .iter()
1800            .filter(|(action, _)| *action == "completed")
1801            .count();
1802
1803        assert_eq!(completed_count, 10, "all tasks should complete");
1804    }
1805
1806    #[tokio::test(flavor = "multi_thread")]
1807    async fn test_semaphore_no_starvation_with_uneven_task_duration() {
1808        let config = BackgroundTaskConfig {
1809            max_queue_size: 100,
1810            max_concurrent_tasks: 2,
1811            drain_timeout_secs: 10,
1812        };
1813
1814        let runtime = BackgroundRuntime::start(config).await;
1815        let handle = runtime.handle();
1816
1817        let fast_executed = Arc::new(AtomicU64::new(0));
1818        let slow_executed = Arc::new(AtomicU64::new(0));
1819
1820        for _ in 0..5 {
1821            let s = slow_executed.clone();
1822            handle
1823                .spawn(move || {
1824                    let slow = s.clone();
1825                    async move {
1826                        tokio::time::sleep(Duration::from_millis(100)).await;
1827                        slow.fetch_add(1, Ordering::SeqCst);
1828                        Ok(())
1829                    }
1830                })
1831                .unwrap();
1832        }
1833
1834        tokio::time::sleep(Duration::from_millis(10)).await;
1835
1836        for _ in 0..5 {
1837            let f = fast_executed.clone();
1838            handle
1839                .spawn(move || {
1840                    let fast = f.clone();
1841                    async move {
1842                        tokio::time::sleep(Duration::from_millis(10)).await;
1843                        fast.fetch_add(1, Ordering::SeqCst);
1844                        Ok(())
1845                    }
1846                })
1847                .unwrap();
1848        }
1849
1850        runtime.shutdown().await.unwrap();
1851
1852        assert_eq!(
1853            fast_executed.load(Ordering::SeqCst),
1854            5,
1855            "fast tasks should not be starved"
1856        );
1857        assert_eq!(slow_executed.load(Ordering::SeqCst), 5, "slow tasks should execute");
1858    }
1859}