// spikard_http/background.rs

1use std::borrow::Cow;
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::FutureExt;
6use futures::future::BoxFuture;
7use tokio::sync::{Semaphore, mpsc};
8use tokio::task::JoinSet;
9use tokio::time::timeout;
10use tokio_util::sync::CancellationToken;
11
/// Tuning knobs for the in-process background task executor.
///
/// The [`Default`] implementation yields a 1024-slot queue, at most 128
/// concurrently running tasks, and a 30 second drain window on shutdown.
#[derive(Clone, Debug)]
pub struct BackgroundTaskConfig {
    /// Capacity of the bounded submission queue; submissions fail once it is full.
    pub max_queue_size: usize,
    /// Upper bound on how many background tasks may run at the same time.
    pub max_concurrent_tasks: usize,
    /// How long, in seconds, shutdown waits for outstanding work.
    pub drain_timeout_secs: u64,
}

impl Default for BackgroundTaskConfig {
    fn default() -> Self {
        const QUEUE_CAPACITY: usize = 1024;
        const CONCURRENCY_LIMIT: usize = 128;
        const DRAIN_SECS: u64 = 30;

        BackgroundTaskConfig {
            max_queue_size: QUEUE_CAPACITY,
            max_concurrent_tasks: CONCURRENCY_LIMIT,
            drain_timeout_secs: DRAIN_SECS,
        }
    }
}
29
/// Descriptive information attached to a background job.
///
/// `name` is included in the executor's failure log line.
#[derive(Clone, Debug)]
pub struct BackgroundJobMetadata {
    /// Human-readable task name; the default is `"background_task"`.
    pub name: Cow<'static, str>,
    /// Optional request identifier. NOTE(review): presumably the id of the
    /// request that scheduled the job; it is not read by the executor here.
    pub request_id: Option<String>,
}

impl Default for BackgroundJobMetadata {
    fn default() -> Self {
        let name = Cow::Borrowed("background_task");
        BackgroundJobMetadata {
            name,
            request_id: None,
        }
    }
}
44
45pub type BackgroundJobFuture = BoxFuture<'static, Result<(), BackgroundJobError>>;
46
47struct BackgroundJob {
48    pub future: BackgroundJobFuture,
49    pub metadata: BackgroundJobMetadata,
50}
51
52impl BackgroundJob {
53    fn new<F>(future: F, metadata: BackgroundJobMetadata) -> Self
54    where
55        F: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
56    {
57        Self {
58            future: future.boxed(),
59            metadata,
60        }
61    }
62}
63
/// Error a background job's future returns to signal that the job failed.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] (matching
/// `BackgroundSpawnError`) so it can be formatted directly, boxed as
/// `dyn Error`, or propagated with `?` into error-erasing result types.
#[derive(Debug, Clone)]
pub struct BackgroundJobError {
    /// Human-readable description of the failure.
    pub message: String,
}

impl std::fmt::Display for BackgroundJobError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for BackgroundJobError {}

impl From<String> for BackgroundJobError {
    fn from(message: String) -> Self {
        Self { message }
    }
}

impl From<&str> for BackgroundJobError {
    fn from(message: &str) -> Self {
        Self {
            message: message.to_string(),
        }
    }
}
82
/// Reasons a job could not be submitted to the background runtime.
#[derive(Debug, Clone)]
pub enum BackgroundSpawnError {
    /// The submission queue did not accept the job. `BackgroundHandle` also
    /// reports this variant when the runtime is no longer receiving jobs.
    QueueFull,
}

impl std::fmt::Display for BackgroundSpawnError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::QueueFull => "background task queue is full",
        };
        f.write_str(text)
    }
}

impl std::error::Error for BackgroundSpawnError {}
97
/// Returned by `BackgroundRuntime::shutdown` when the executor did not finish
/// within the drain timeout or terminated abnormally.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it composes
/// with `?` and boxed error types like the module's other error types.
#[derive(Debug)]
pub struct BackgroundShutdownError;

impl std::fmt::Display for BackgroundShutdownError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("background runtime did not shut down cleanly")
    }
}

impl std::error::Error for BackgroundShutdownError {}
100
/// Lock-free counters describing the executor's load.
///
/// `queued` and `running` go up and down as jobs move through the runtime;
/// `failed` only ever increases.
#[derive(Default)]
struct BackgroundMetrics {
    queued: std::sync::atomic::AtomicU64,
    running: std::sync::atomic::AtomicU64,
    failed: std::sync::atomic::AtomicU64,
}

impl BackgroundMetrics {
    /// Each counter is independent and only needs atomic updates, not
    /// ordering relative to other memory, so relaxed ordering suffices.
    const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::Relaxed;

    fn inc_queued(&self) {
        self.queued.fetch_add(1, Self::ORDERING);
    }

    fn dec_queued(&self) {
        self.queued.fetch_sub(1, Self::ORDERING);
    }

    fn inc_running(&self) {
        self.running.fetch_add(1, Self::ORDERING);
    }

    fn dec_running(&self) {
        self.running.fetch_sub(1, Self::ORDERING);
    }

    fn inc_failed(&self) {
        self.failed.fetch_add(1, Self::ORDERING);
    }
}
129
130#[derive(Clone)]
131pub struct BackgroundHandle {
132    sender: mpsc::Sender<BackgroundJob>,
133    metrics: Arc<BackgroundMetrics>,
134}
135
136impl BackgroundHandle {
137    pub fn spawn<F, Fut>(&self, f: F) -> Result<(), BackgroundSpawnError>
138    where
139        F: FnOnce() -> Fut,
140        Fut: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
141    {
142        let future = f();
143        self.spawn_with_metadata(future, BackgroundJobMetadata::default())
144    }
145
146    pub fn spawn_with_metadata<Fut>(
147        &self,
148        future: Fut,
149        metadata: BackgroundJobMetadata,
150    ) -> Result<(), BackgroundSpawnError>
151    where
152        Fut: futures::Future<Output = Result<(), BackgroundJobError>> + Send + 'static,
153    {
154        self.metrics.inc_queued();
155        let job = BackgroundJob::new(future, metadata);
156        self.sender.try_send(job).map_err(|_| {
157            self.metrics.dec_queued();
158            BackgroundSpawnError::QueueFull
159        })
160    }
161}
162
163pub struct BackgroundRuntime {
164    handle: BackgroundHandle,
165    drain_timeout: Duration,
166    shutdown_token: CancellationToken,
167    join_handle: tokio::task::JoinHandle<()>,
168}
169
170impl BackgroundRuntime {
171    pub async fn start(config: BackgroundTaskConfig) -> Self {
172        let (tx, rx) = mpsc::channel(config.max_queue_size);
173        let metrics = Arc::new(BackgroundMetrics::default());
174        let handle = BackgroundHandle {
175            sender: tx.clone(),
176            metrics: metrics.clone(),
177        };
178        let shutdown_token = CancellationToken::new();
179        let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tasks));
180        let driver_token = shutdown_token.clone();
181
182        let join_handle = tokio::spawn(run_executor(rx, semaphore, metrics.clone(), driver_token));
183
184        Self {
185            handle,
186            drain_timeout: Duration::from_secs(config.drain_timeout_secs),
187            shutdown_token,
188            join_handle,
189        }
190    }
191
192    pub fn handle(&self) -> BackgroundHandle {
193        self.handle.clone()
194    }
195
196    pub async fn shutdown(self) -> Result<(), BackgroundShutdownError> {
197        self.shutdown_token.cancel();
198        drop(self.handle);
199        match timeout(self.drain_timeout, self.join_handle).await {
200            Ok(Ok(_)) => Ok(()),
201            _ => Err(BackgroundShutdownError),
202        }
203    }
204}
205
206async fn run_executor(
207    mut rx: mpsc::Receiver<BackgroundJob>,
208    semaphore: Arc<Semaphore>,
209    metrics: Arc<BackgroundMetrics>,
210    token: CancellationToken,
211) {
212    let mut join_set = JoinSet::new();
213    let token_clone = token.clone();
214
215    loop {
216        tokio::select! {
217            maybe_job = rx.recv() => {
218                match maybe_job {
219                    Some(job) => {
220                        metrics.dec_queued();
221                        let semaphore = semaphore.clone();
222                        let metrics_clone = metrics.clone();
223                        join_set.spawn(async move {
224                            let BackgroundJob { future, metadata } = job;
225                            match semaphore.acquire_owned().await {
226                                Ok(_permit) => {
227                                    metrics_clone.inc_running();
228                                    if let Err(err) = future.await {
229                                        metrics_clone.inc_failed();
230                                        tracing::error!(target = "spikard::background", task = %metadata.name, error = %err.message, "background task failed");
231                                    }
232                                    metrics_clone.dec_running();
233                                }
234                                Err(_) => {
235                                    metrics_clone.inc_failed();
236                                    tracing::warn!(target = "spikard::background", "failed to acquire semaphore permit for background task");
237                                }
238                            }
239                        });
240                    }
241                    None => break,
242                }
243            }
244            _ = token_clone.cancelled() => {
245                break;
246            }
247        }
248    }
249
250    let mut drain_attempts = 0;
251    loop {
252        match rx.try_recv() {
253            Ok(job) => {
254                metrics.dec_queued();
255                let semaphore = semaphore.clone();
256                let metrics_clone = metrics.clone();
257                join_set.spawn(async move {
258                    let BackgroundJob { future, metadata } = job;
259                    match semaphore.acquire_owned().await {
260                        Ok(_permit) => {
261                            metrics_clone.inc_running();
262                            if let Err(err) = future.await {
263                                metrics_clone.inc_failed();
264                                tracing::error!(target = "spikard::background", task = %metadata.name, error = %err.message, "background task failed");
265                            }
266                            metrics_clone.dec_running();
267                        }
268                        Err(_) => {
269                            metrics_clone.inc_failed();
270                            tracing::warn!(target = "spikard::background", "failed to acquire semaphore permit for background task");
271                        }
272                    }
273                });
274                drain_attempts = 0;
275            }
276            Err(mpsc::error::TryRecvError::Empty) => {
277                drain_attempts += 1;
278                if drain_attempts > 100 {
279                    break;
280                }
281                tokio::time::sleep(Duration::from_millis(10)).await;
282            }
283            Err(mpsc::error::TryRecvError::Disconnected) => {
284                break;
285            }
286        }
287    }
288
289    while join_set.join_next().await.is_some() {}
290}
291
292#[cfg(test)]
293mod tests {
294    use super::*;
295    use std::sync::atomic::{AtomicU64, Ordering};
296
297    #[tokio::test]
298    async fn test_basic_spawn_and_execution() {
299        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
300        let handle = runtime.handle();
301
302        let counter = Arc::new(AtomicU64::new(0));
303        let counter_clone = counter.clone();
304
305        handle
306            .spawn(move || {
307                let c = counter_clone.clone();
308                async move {
309                    c.fetch_add(1, Ordering::SeqCst);
310                    Ok(())
311                }
312            })
313            .expect("spawn failed");
314
315        tokio::time::sleep(Duration::from_millis(100)).await;
316        assert_eq!(counter.load(Ordering::SeqCst), 1);
317
318        runtime.shutdown().await.expect("shutdown failed");
319    }
320
321    #[tokio::test]
322    async fn test_multiple_tasks() {
323        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
324        let handle = runtime.handle();
325
326        let counter = Arc::new(AtomicU64::new(0));
327
328        for _ in 0..10 {
329            let counter_clone = counter.clone();
330            handle
331                .spawn(move || {
332                    let c = counter_clone.clone();
333                    async move {
334                        c.fetch_add(1, Ordering::SeqCst);
335                        Ok(())
336                    }
337                })
338                .expect("spawn failed");
339        }
340
341        tokio::time::sleep(Duration::from_millis(200)).await;
342        assert_eq!(counter.load(Ordering::SeqCst), 10);
343
344        runtime.shutdown().await.expect("shutdown failed");
345    }
346
347    #[tokio::test]
348    async fn test_task_with_metadata() {
349        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
350        let handle = runtime.handle();
351
352        let metadata = BackgroundJobMetadata {
353            name: Cow::Owned("test_task".to_string()),
354            request_id: Some("req-123".to_string()),
355        };
356
357        let counter = Arc::new(AtomicU64::new(0));
358        let counter_clone = counter.clone();
359
360        let future = async move {
361            counter_clone.fetch_add(1, Ordering::SeqCst);
362            Ok(())
363        };
364
365        handle.spawn_with_metadata(future, metadata).expect("spawn failed");
366
367        tokio::time::sleep(Duration::from_millis(100)).await;
368        assert_eq!(counter.load(Ordering::SeqCst), 1);
369
370        runtime.shutdown().await.expect("shutdown failed");
371    }
372
373    #[tokio::test]
374    async fn test_queue_full_error() {
375        let config = BackgroundTaskConfig {
376            max_queue_size: 2,
377            max_concurrent_tasks: 10,
378            drain_timeout_secs: 5,
379        };
380
381        let runtime = BackgroundRuntime::start(config).await;
382        let handle = runtime.handle();
383
384        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(3));
385
386        for _ in 0..2 {
387            let barrier = blocking_barrier.clone();
388            handle
389                .spawn(move || {
390                    let b = barrier.clone();
391                    async move {
392                        b.wait().await;
393                        tokio::time::sleep(Duration::from_secs(1)).await;
394                        Ok(())
395                    }
396                })
397                .expect("spawn failed");
398        }
399
400        let result = handle.spawn(move || async { Ok(()) });
401        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));
402
403        blocking_barrier.wait().await;
404        tokio::time::sleep(Duration::from_millis(100)).await;
405
406        runtime.shutdown().await.expect("shutdown failed");
407    }
408
409    #[tokio::test]
410    async fn test_task_failure_handling() {
411        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
412        let handle = runtime.handle();
413
414        let success_count = Arc::new(AtomicU64::new(0));
415        let success_count_clone = success_count.clone();
416
417        handle
418            .spawn(move || {
419                let s = success_count_clone.clone();
420                async move {
421                    s.fetch_add(1, Ordering::SeqCst);
422                    Err(BackgroundJobError::from("test error"))
423                }
424            })
425            .expect("spawn failed");
426
427        tokio::time::sleep(Duration::from_millis(100)).await;
428        assert_eq!(success_count.load(Ordering::SeqCst), 1);
429
430        runtime.shutdown().await.expect("shutdown failed");
431    }
432
433    #[tokio::test(flavor = "multi_thread")]
434    async fn test_concurrency_limit_with_proper_synchronization() {
435        let config = BackgroundTaskConfig {
436            max_queue_size: 100,
437            max_concurrent_tasks: 2,
438            drain_timeout_secs: 30,
439        };
440
441        let runtime = BackgroundRuntime::start(config).await;
442        let handle = runtime.handle();
443
444        let running_count = Arc::new(AtomicU64::new(0));
445        let max_concurrent = Arc::new(AtomicU64::new(0));
446
447        for _ in 0..5 {
448            let running = running_count.clone();
449            let max = max_concurrent.clone();
450
451            handle
452                .spawn(move || {
453                    let r = running.clone();
454                    let m = max.clone();
455                    async move {
456                        r.fetch_add(1, Ordering::SeqCst);
457                        let current_running = r.load(Ordering::SeqCst);
458                        let mut current_max = m.load(Ordering::SeqCst);
459                        while current_running > current_max {
460                            m.store(current_running, Ordering::SeqCst);
461                            current_max = current_running;
462                        }
463
464                        tokio::time::sleep(Duration::from_millis(100)).await;
465                        r.fetch_sub(1, Ordering::SeqCst);
466                        Ok(())
467                    }
468                })
469                .expect("spawn failed");
470        }
471
472        tokio::time::sleep(Duration::from_millis(700)).await;
473        let max_concurrent_observed = max_concurrent.load(Ordering::SeqCst);
474        assert!(
475            max_concurrent_observed <= 2,
476            "Max concurrent should be <= 2, but was {}",
477            max_concurrent_observed
478        );
479
480        runtime.shutdown().await.expect("shutdown failed");
481    }
482
483    #[tokio::test]
484    async fn test_graceful_shutdown() {
485        let config = BackgroundTaskConfig {
486            max_queue_size: 10,
487            max_concurrent_tasks: 2,
488            drain_timeout_secs: 5,
489        };
490
491        let runtime = BackgroundRuntime::start(config).await;
492        let handle = runtime.handle();
493
494        let counter = Arc::new(AtomicU64::new(0));
495        let counter_clone = counter.clone();
496
497        handle
498            .spawn(move || {
499                let c = counter_clone.clone();
500                async move {
501                    tokio::time::sleep(Duration::from_millis(50)).await;
502                    c.fetch_add(1, Ordering::SeqCst);
503                    Ok(())
504                }
505            })
506            .expect("spawn failed");
507
508        tokio::time::sleep(Duration::from_millis(200)).await;
509
510        let result = runtime.shutdown().await;
511        assert!(result.is_ok());
512        assert_eq!(counter.load(Ordering::SeqCst), 1);
513    }
514
515    #[tokio::test]
516    async fn test_shutdown_timeout() {
517        let config = BackgroundTaskConfig {
518            max_queue_size: 10,
519            max_concurrent_tasks: 2,
520            drain_timeout_secs: 1,
521        };
522
523        let runtime = BackgroundRuntime::start(config).await;
524        let handle = runtime.handle();
525
526        handle
527            .spawn(|| async {
528                tokio::time::sleep(Duration::from_secs(5)).await;
529                Ok(())
530            })
531            .expect("spawn failed");
532
533        tokio::time::sleep(Duration::from_millis(100)).await;
534
535        let result = runtime.shutdown().await;
536        assert!(result.is_err());
537    }
538
539    #[tokio::test]
540    async fn test_metrics_tracking() {
541        let config = BackgroundTaskConfig::default();
542        let runtime = BackgroundRuntime::start(config).await;
543        let handle = runtime.handle();
544
545        let barrier = Arc::new(tokio::sync::Barrier::new(2));
546
547        for _ in 0..2 {
548            let b = barrier.clone();
549            let _ = handle.spawn(move || {
550                let barrier = b.clone();
551                async move {
552                    barrier.wait().await;
553                    Ok(())
554                }
555            });
556        }
557
558        tokio::time::sleep(Duration::from_millis(150)).await;
559
560        runtime.shutdown().await.expect("shutdown failed");
561    }
562
563    #[tokio::test]
564    async fn test_task_cancellation_on_shutdown() {
565        let config = BackgroundTaskConfig {
566            max_queue_size: 10,
567            max_concurrent_tasks: 2,
568            drain_timeout_secs: 1,
569        };
570
571        let runtime = BackgroundRuntime::start(config).await;
572        let handle = runtime.handle();
573
574        let started_count = Arc::new(AtomicU64::new(0));
575        let _completed_count = Arc::new(AtomicU64::new(0));
576
577        let started = started_count.clone();
578
579        handle
580            .spawn(move || {
581                let s = started.clone();
582                async move {
583                    s.fetch_add(1, Ordering::SeqCst);
584                    tokio::time::sleep(Duration::from_secs(10)).await;
585                    Ok(())
586                }
587            })
588            .expect("spawn failed");
589
590        tokio::time::sleep(Duration::from_millis(100)).await;
591        assert_eq!(started_count.load(Ordering::SeqCst), 1);
592
593        let shutdown_start = std::time::Instant::now();
594        let result = runtime.shutdown().await;
595        let shutdown_elapsed = shutdown_start.elapsed();
596
597        assert!(result.is_err());
598        assert!(shutdown_elapsed < Duration::from_secs(3));
599    }
600
601    #[tokio::test]
602    async fn test_queue_overflow_multiple_spawns() {
603        let config = BackgroundTaskConfig {
604            max_queue_size: 3,
605            max_concurrent_tasks: 10,
606            drain_timeout_secs: 5,
607        };
608
609        let runtime = BackgroundRuntime::start(config).await;
610        let handle = runtime.handle();
611
612        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(4));
613
614        for _ in 0..3 {
615            let b = blocking_barrier.clone();
616            handle
617                .spawn(move || {
618                    let barrier = b.clone();
619                    async move {
620                        barrier.wait().await;
621                        tokio::time::sleep(Duration::from_millis(100)).await;
622                        Ok(())
623                    }
624                })
625                .expect("spawn failed");
626        }
627
628        let result = handle.spawn(|| async { Ok(()) });
629        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));
630
631        blocking_barrier.wait().await;
632        tokio::time::sleep(Duration::from_millis(200)).await;
633
634        let result = handle.spawn(|| async { Ok(()) });
635        assert!(result.is_ok());
636
637        runtime.shutdown().await.expect("shutdown failed");
638    }
639
640    #[tokio::test]
641    async fn test_concurrent_task_execution_order() {
642        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
643        let handle = runtime.handle();
644
645        let execution_order = Arc::new(tokio::sync::Mutex::new(Vec::new()));
646
647        for i in 0..5 {
648            let order = execution_order.clone();
649            handle
650                .spawn(move || {
651                    let o = order.clone();
652                    async move {
653                        o.lock().await.push(i);
654                        Ok(())
655                    }
656                })
657                .expect("spawn failed");
658        }
659
660        tokio::time::sleep(Duration::from_millis(200)).await;
661
662        let order = execution_order.lock().await;
663        assert_eq!(order.len(), 5);
664        for i in 0..5 {
665            assert!(order.contains(&i));
666        }
667
668        runtime.shutdown().await.expect("shutdown failed");
669    }
670
671    #[tokio::test]
672    async fn test_error_from_string_conversion() {
673        let error = BackgroundJobError::from("test message");
674        assert_eq!(error.message, "test message");
675
676        let error2 = BackgroundJobError::from("test".to_string());
677        assert_eq!(error2.message, "test");
678    }
679
680    #[tokio::test]
681    async fn test_background_job_metadata_default() {
682        let metadata = BackgroundJobMetadata::default();
683        assert_eq!(metadata.name, "background_task");
684        assert_eq!(metadata.request_id, None);
685    }
686
687    #[tokio::test]
688    async fn test_background_job_metadata_custom() {
689        let metadata = BackgroundJobMetadata {
690            name: Cow::Borrowed("custom_task"),
691            request_id: Some("req-456".to_string()),
692        };
693        assert_eq!(metadata.name, "custom_task");
694        assert_eq!(metadata.request_id, Some("req-456".to_string()));
695    }
696
697    #[tokio::test]
698    async fn test_metrics_inc_dec_operations() {
699        let metrics = BackgroundMetrics::default();
700
701        metrics.inc_queued();
702        assert_eq!(metrics.queued.load(Ordering::Relaxed), 1);
703
704        metrics.inc_queued();
705        assert_eq!(metrics.queued.load(Ordering::Relaxed), 2);
706
707        metrics.dec_queued();
708        assert_eq!(metrics.queued.load(Ordering::Relaxed), 1);
709
710        metrics.inc_running();
711        assert_eq!(metrics.running.load(Ordering::Relaxed), 1);
712
713        metrics.dec_running();
714        assert_eq!(metrics.running.load(Ordering::Relaxed), 0);
715
716        metrics.inc_failed();
717        assert_eq!(metrics.failed.load(Ordering::Relaxed), 1);
718
719        metrics.inc_failed();
720        assert_eq!(metrics.failed.load(Ordering::Relaxed), 2);
721    }
722
723    #[tokio::test]
724    async fn test_spawn_error_display() {
725        let error = BackgroundSpawnError::QueueFull;
726        assert_eq!(error.to_string(), "background task queue is full");
727    }
728
729    #[tokio::test]
730    async fn test_background_config_default() {
731        let config = BackgroundTaskConfig::default();
732        assert_eq!(config.max_queue_size, 1024);
733        assert_eq!(config.max_concurrent_tasks, 128);
734        assert_eq!(config.drain_timeout_secs, 30);
735    }
736
737    #[tokio::test]
738    async fn test_shutdown_with_zero_pending_tasks() {
739        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
740
741        let result = runtime.shutdown().await;
742        assert!(result.is_ok(), "shutdown should succeed with no tasks");
743    }
744
745    #[tokio::test]
746    async fn test_shutdown_with_only_running_tasks() {
747        let config = BackgroundTaskConfig {
748            max_queue_size: 10,
749            max_concurrent_tasks: 2,
750            drain_timeout_secs: 5,
751        };
752        let runtime = BackgroundRuntime::start(config).await;
753        let handle = runtime.handle();
754
755        let execution_started: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
756        let execution_completed: Arc<std::sync::atomic::AtomicBool> =
757            Arc::new(std::sync::atomic::AtomicBool::new(false));
758
759        let started = execution_started.clone();
760        let completed = execution_completed.clone();
761
762        handle
763            .spawn(move || {
764                let s = started.clone();
765                let c = completed.clone();
766                async move {
767                    s.store(true, std::sync::atomic::Ordering::SeqCst);
768                    tokio::time::sleep(Duration::from_millis(100)).await;
769                    c.store(true, std::sync::atomic::Ordering::SeqCst);
770                    Ok(())
771                }
772            })
773            .unwrap();
774
775        tokio::time::sleep(Duration::from_millis(20)).await;
776
777        let result = runtime.shutdown().await;
778        assert!(result.is_ok(), "shutdown should succeed and wait for running tasks");
779        assert!(
780            execution_completed.load(std::sync::atomic::Ordering::SeqCst),
781            "task should have completed"
782        );
783    }
784
785    // TODO: FAILING TEST - Architectural Issue
786    #[tokio::test]
787    async fn test_shutdown_drains_queued_tasks() {
788        let config = BackgroundTaskConfig {
789            max_queue_size: 100,
790            max_concurrent_tasks: 1,
791            drain_timeout_secs: 5,
792        };
793        let runtime = BackgroundRuntime::start(config).await;
794        let handle = runtime.handle();
795
796        let execution_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
797
798        for _ in 0..10 {
799            let count = execution_count.clone();
800            handle
801                .spawn(move || {
802                    let c = count.clone();
803                    async move {
804                        c.fetch_add(1, Ordering::SeqCst);
805                        tokio::time::sleep(Duration::from_millis(10)).await;
806                        Ok(())
807                    }
808                })
809                .unwrap();
810        }
811
812        let result = runtime.shutdown().await;
813        assert!(result.is_ok());
814        assert_eq!(
815            execution_count.load(Ordering::SeqCst),
816            10,
817            "all queued tasks should execute"
818        );
819    }
820
821    #[tokio::test]
822    async fn test_shutdown_timeout_force_stops_long_tasks() {
823        let config = BackgroundTaskConfig {
824            max_queue_size: 10,
825            max_concurrent_tasks: 2,
826            drain_timeout_secs: 1,
827        };
828        let runtime = BackgroundRuntime::start(config).await;
829        let handle = runtime.handle();
830
831        let completed: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
832        let completed_clone = completed.clone();
833
834        handle
835            .spawn(move || {
836                let c = completed_clone.clone();
837                async move {
838                    tokio::time::sleep(Duration::from_secs(10)).await;
839                    c.store(true, std::sync::atomic::Ordering::SeqCst);
840                    Ok(())
841                }
842            })
843            .unwrap();
844
845        tokio::time::sleep(Duration::from_millis(50)).await;
846
847        let shutdown_start = std::time::Instant::now();
848        let result = runtime.shutdown().await;
849        let elapsed = shutdown_start.elapsed();
850
851        assert!(result.is_err(), "shutdown should timeout");
852        assert!(
853            elapsed < Duration::from_secs(3),
854            "shutdown should timeout near drain_timeout"
855        );
856        assert!(
857            !completed.load(std::sync::atomic::Ordering::SeqCst),
858            "long-running task should not complete"
859        );
860    }
861
862    #[tokio::test]
863    async fn test_multiple_shutdown_calls_idempotent() {
864        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
865
866        let result1 = runtime.shutdown().await;
867        assert!(result1.is_ok(), "first shutdown should succeed");
868    }
869
870    #[tokio::test]
871    async fn test_spawn_after_all_senders_dropped_fails() {
872        let config = BackgroundTaskConfig::default();
873        let runtime = BackgroundRuntime::start(config).await;
874        let handle = runtime.handle();
875
876        runtime.shutdown().await.expect("shutdown should succeed");
877
878        tokio::time::sleep(Duration::from_millis(50)).await;
879
880        let result = handle.spawn(|| async { Ok(()) });
881        assert!(result.is_err(), "spawn should fail after all senders are dropped");
882    }
883
884    #[tokio::test]
885    async fn test_concurrent_spawns_hit_semaphore_limit() {
886        let config = BackgroundTaskConfig {
887            max_queue_size: 100,
888            max_concurrent_tasks: 3,
889            drain_timeout_secs: 10,
890        };
891        let runtime = BackgroundRuntime::start(config).await;
892        let handle = runtime.handle();
893
894        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
895        let running_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
896        let peak_concurrent: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
897
898        for _ in 0..5 {
899            let b = barrier.clone();
900            let running = running_count.clone();
901            let peak = peak_concurrent.clone();
902
903            handle
904                .spawn(move || {
905                    let barrier = b.clone();
906                    let r = running.clone();
907                    let p = peak.clone();
908                    async move {
909                        let current = r.fetch_add(1, Ordering::SeqCst) + 1;
910                        let mut peak_val = p.load(Ordering::SeqCst);
911                        while current > peak_val {
912                            if p.compare_exchange(peak_val, current, Ordering::SeqCst, Ordering::SeqCst)
913                                .is_ok()
914                            {
915                                break;
916                            }
917                            peak_val = p.load(Ordering::SeqCst);
918                        }
919
920                        barrier.wait().await;
921                        tokio::time::sleep(Duration::from_millis(200)).await;
922                        r.fetch_sub(1, Ordering::SeqCst);
923                        Ok(())
924                    }
925                })
926                .unwrap();
927        }
928
929        barrier.wait().await;
930        tokio::time::sleep(Duration::from_millis(100)).await;
931
932        let peak = peak_concurrent.load(Ordering::SeqCst);
933        assert!(
934            peak <= 3,
935            "concurrent execution should not exceed semaphore limit of 3, got {}",
936            peak
937        );
938
939        runtime.shutdown().await.unwrap();
940    }
941
942    #[tokio::test]
943    async fn test_task_panic_cleanup_still_occurs() {
944        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
945        let handle = runtime.handle();
946
947        let mut spawned_count: u32 = 0;
948        let panic_task_executed: Arc<std::sync::atomic::AtomicBool> =
949            Arc::new(std::sync::atomic::AtomicBool::new(false));
950        let after_panic_executed: Arc<std::sync::atomic::AtomicBool> =
951            Arc::new(std::sync::atomic::AtomicBool::new(false));
952
953        let panic_flag = panic_task_executed.clone();
954        handle
955            .spawn(move || {
956                let p = panic_flag.clone();
957                async move {
958                    p.store(true, std::sync::atomic::Ordering::SeqCst);
959                    Err(BackgroundJobError::from("simulated task failure"))
960                }
961            })
962            .unwrap();
963        spawned_count += 1;
964
965        let after_flag = after_panic_executed.clone();
966        handle
967            .spawn(move || {
968                let a = after_flag.clone();
969                async move {
970                    tokio::time::sleep(Duration::from_millis(50)).await;
971                    a.store(true, std::sync::atomic::Ordering::SeqCst);
972                    Ok(())
973                }
974            })
975            .unwrap();
976        spawned_count += 1;
977
978        tokio::time::sleep(Duration::from_millis(200)).await;
979
980        assert!(panic_task_executed.load(std::sync::atomic::Ordering::SeqCst));
981        assert!(after_panic_executed.load(std::sync::atomic::Ordering::SeqCst));
982        assert_eq!(spawned_count, 2);
983
984        runtime.shutdown().await.unwrap();
985    }
986
987    #[tokio::test]
988    async fn test_queue_overflow_with_immediate_rejection() {
989        let config = BackgroundTaskConfig {
990            max_queue_size: 2,
991            max_concurrent_tasks: 100,
992            drain_timeout_secs: 5,
993        };
994        let runtime = BackgroundRuntime::start(config).await;
995        let handle = runtime.handle();
996
997        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
998
999        for _ in 0..2 {
1000            let b = barrier.clone();
1001            handle
1002                .spawn(move || {
1003                    let barrier = b.clone();
1004                    async move {
1005                        barrier.wait().await;
1006                        tokio::time::sleep(Duration::from_millis(500)).await;
1007                        Ok(())
1008                    }
1009                })
1010                .unwrap();
1011        }
1012
1013        let overflow_result = handle.spawn(|| async { Ok(()) });
1014        assert!(matches!(overflow_result, Err(BackgroundSpawnError::QueueFull)));
1015
1016        barrier.wait().await;
1017        runtime.shutdown().await.unwrap();
1018    }
1019
1020    #[tokio::test]
1021    async fn test_metrics_accuracy_under_concurrent_load() {
1022        let config = BackgroundTaskConfig {
1023            max_queue_size: 50,
1024            max_concurrent_tasks: 5,
1025            drain_timeout_secs: 10,
1026        };
1027        let runtime = BackgroundRuntime::start(config).await;
1028        let handle = runtime.handle();
1029
1030        let completed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1031
1032        for _ in 0..20 {
1033            let c = completed.clone();
1034            handle
1035                .spawn(move || {
1036                    let count = c.clone();
1037                    async move {
1038                        tokio::time::sleep(Duration::from_millis(50)).await;
1039                        count.fetch_add(1, Ordering::SeqCst);
1040                        Ok(())
1041                    }
1042                })
1043                .unwrap();
1044        }
1045
1046        runtime.shutdown().await.unwrap();
1047        assert_eq!(completed.load(Ordering::SeqCst), 20, "all tasks should complete");
1048    }
1049
1050    #[tokio::test]
1051    async fn test_drain_with_slowly_completing_tasks() {
1052        let config = BackgroundTaskConfig {
1053            max_queue_size: 50,
1054            max_concurrent_tasks: 2,
1055            drain_timeout_secs: 10,
1056        };
1057        let runtime = BackgroundRuntime::start(config).await;
1058        let handle = runtime.handle();
1059
1060        let completed_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1061
1062        for i in 0..5 {
1063            let count = completed_count.clone();
1064            handle
1065                .spawn(move || {
1066                    let c = count.clone();
1067                    async move {
1068                        let sleep_ms = 100 + (i as u64 * 50);
1069                        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
1070                        c.fetch_add(1, Ordering::SeqCst);
1071                        Ok(())
1072                    }
1073                })
1074                .unwrap();
1075        }
1076
1077        let result = runtime.shutdown().await;
1078        assert!(result.is_ok());
1079        assert_eq!(completed_count.load(Ordering::SeqCst), 5);
1080    }
1081
1082    #[tokio::test]
1083    async fn test_semaphore_starvation_doesnt_deadlock() {
1084        let config = BackgroundTaskConfig {
1085            max_queue_size: 100,
1086            max_concurrent_tasks: 1,
1087            drain_timeout_secs: 10,
1088        };
1089        let runtime = BackgroundRuntime::start(config).await;
1090        let handle = runtime.handle();
1091
1092        let completion_order: Arc<tokio::sync::Mutex<Vec<u32>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1093
1094        for i in 0..10 {
1095            let order = completion_order.clone();
1096            handle
1097                .spawn(move || {
1098                    let o = order.clone();
1099                    async move {
1100                        tokio::time::sleep(Duration::from_millis(5)).await;
1101                        let mut guard = o.lock().await;
1102                        guard.push(i);
1103                        Ok(())
1104                    }
1105                })
1106                .unwrap();
1107        }
1108
1109        let result = runtime.shutdown().await;
1110        assert!(result.is_ok());
1111
1112        let order = completion_order.lock().await;
1113        assert_eq!(order.len(), 10);
1114    }
1115
1116    #[tokio::test]
1117    async fn test_cancel_task_mid_execution() {
1118        let config = BackgroundTaskConfig {
1119            max_queue_size: 10,
1120            max_concurrent_tasks: 2,
1121            drain_timeout_secs: 1,
1122        };
1123        let runtime = BackgroundRuntime::start(config).await;
1124        let handle = runtime.handle();
1125
1126        let started: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1127        let ended: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1128
1129        let start_flag = started.clone();
1130        let end_flag = ended.clone();
1131
1132        handle
1133            .spawn(move || {
1134                let s = start_flag.clone();
1135                let e = end_flag.clone();
1136                async move {
1137                    s.store(true, std::sync::atomic::Ordering::SeqCst);
1138                    tokio::time::sleep(Duration::from_secs(10)).await;
1139                    e.store(true, std::sync::atomic::Ordering::SeqCst);
1140                    Ok(())
1141                }
1142            })
1143            .unwrap();
1144
1145        tokio::time::sleep(Duration::from_millis(50)).await;
1146        assert!(started.load(std::sync::atomic::Ordering::SeqCst));
1147
1148        let result = runtime.shutdown().await;
1149        assert!(result.is_err(), "shutdown should timeout due to long task");
1150        assert!(
1151            !ended.load(std::sync::atomic::Ordering::SeqCst),
1152            "task should not complete"
1153        );
1154    }
1155
1156    #[tokio::test]
1157    async fn test_rapid_spawn_and_shutdown() {
1158        let config = BackgroundTaskConfig {
1159            max_queue_size: 1000,
1160            max_concurrent_tasks: 10,
1161            drain_timeout_secs: 5,
1162        };
1163        let runtime = BackgroundRuntime::start(config).await;
1164        let handle = runtime.handle();
1165
1166        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1167
1168        for _ in 0..100 {
1169            let c = count.clone();
1170            let _ = handle.spawn(move || {
1171                let counter = c.clone();
1172                async move {
1173                    counter.fetch_add(1, Ordering::SeqCst);
1174                    Ok(())
1175                }
1176            });
1177        }
1178
1179        let result = runtime.shutdown().await;
1180        assert!(result.is_ok());
1181
1182        let final_count = count.load(Ordering::SeqCst);
1183        assert!(final_count > 0, "at least some tasks should execute");
1184        assert!(final_count <= 100, "no more than spawned count should execute");
1185    }
1186
1187    #[tokio::test]
1188    async fn test_shutdown_with_mixed_success_and_failure_tasks() {
1189        let config = BackgroundTaskConfig::default();
1190        let runtime = BackgroundRuntime::start(config).await;
1191        let handle = runtime.handle();
1192
1193        let success_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1194        let failure_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1195
1196        for i in 0..10 {
1197            if i % 2 == 0 {
1198                let s = success_count.clone();
1199                handle
1200                    .spawn(move || {
1201                        let counter = s.clone();
1202                        async move {
1203                            counter.fetch_add(1, Ordering::SeqCst);
1204                            Ok(())
1205                        }
1206                    })
1207                    .unwrap();
1208            } else {
1209                let f = failure_count.clone();
1210                handle
1211                    .spawn(move || {
1212                        let counter = f.clone();
1213                        async move {
1214                            counter.fetch_add(1, Ordering::SeqCst);
1215                            Err(BackgroundJobError::from("intentional failure"))
1216                        }
1217                    })
1218                    .unwrap();
1219            }
1220        }
1221
1222        tokio::time::sleep(Duration::from_millis(200)).await;
1223
1224        let result = runtime.shutdown().await;
1225        assert!(result.is_ok());
1226        assert_eq!(success_count.load(Ordering::SeqCst), 5);
1227        assert_eq!(failure_count.load(Ordering::SeqCst), 5);
1228    }
1229
1230    #[tokio::test]
1231    async fn test_concurrent_handle_clones_spawn_independently() {
1232        let config = BackgroundTaskConfig::default();
1233        let runtime = BackgroundRuntime::start(config).await;
1234        let handle = runtime.handle();
1235
1236        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1237
1238        let mut join_handles = vec![];
1239
1240        for _ in 0..3 {
1241            let h = handle.clone();
1242            let c = count.clone();
1243
1244            let jh = tokio::spawn(async move {
1245                for _ in 0..5 {
1246                    let counter = c.clone();
1247                    let _ = h.spawn(move || {
1248                        let cnt = counter.clone();
1249                        async move {
1250                            cnt.fetch_add(1, Ordering::SeqCst);
1251                            Ok(())
1252                        }
1253                    });
1254                }
1255            });
1256            join_handles.push(jh);
1257        }
1258
1259        for jh in join_handles {
1260            let _ = jh.await;
1261        }
1262
1263        tokio::time::sleep(Duration::from_millis(200)).await;
1264
1265        let result = runtime.shutdown().await;
1266        assert!(result.is_ok());
1267        assert_eq!(count.load(Ordering::SeqCst), 15);
1268    }
1269
1270    #[tokio::test]
1271    async fn test_queue_full_metrics_updated() {
1272        let config = BackgroundTaskConfig {
1273            max_queue_size: 2,
1274            max_concurrent_tasks: 100,
1275            drain_timeout_secs: 5,
1276        };
1277        let runtime = BackgroundRuntime::start(config).await;
1278        let handle = runtime.handle();
1279
1280        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(3));
1281
1282        for _ in 0..2 {
1283            let b = barrier.clone();
1284            handle
1285                .spawn(move || {
1286                    let barrier = b.clone();
1287                    async move {
1288                        barrier.wait().await;
1289                        tokio::time::sleep(Duration::from_secs(1)).await;
1290                        Ok(())
1291                    }
1292                })
1293                .unwrap();
1294        }
1295
1296        let result = handle.spawn(|| async { Ok(()) });
1297        assert!(matches!(result, Err(BackgroundSpawnError::QueueFull)));
1298
1299        barrier.wait().await;
1300        tokio::time::sleep(Duration::from_millis(100)).await;
1301
1302        runtime.shutdown().await.unwrap();
1303    }
1304
1305    #[tokio::test]
1306    async fn test_handle_persistence_across_spawns() {
1307        let config = BackgroundTaskConfig::default();
1308        let runtime = BackgroundRuntime::start(config).await;
1309        let handle = runtime.handle();
1310
1311        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1312
1313        for _ in 0..5 {
1314            let c = count.clone();
1315            handle
1316                .spawn(move || {
1317                    let counter = c.clone();
1318                    async move {
1319                        counter.fetch_add(1, Ordering::SeqCst);
1320                        Ok(())
1321                    }
1322                })
1323                .unwrap();
1324        }
1325
1326        tokio::time::sleep(Duration::from_millis(150)).await;
1327        assert_eq!(count.load(Ordering::SeqCst), 5);
1328
1329        runtime.shutdown().await.unwrap();
1330    }
1331
1332    #[tokio::test]
1333    async fn test_shutdown_with_queue_at_capacity() {
1334        let config = BackgroundTaskConfig {
1335            max_queue_size: 5,
1336            max_concurrent_tasks: 1,
1337            drain_timeout_secs: 10,
1338        };
1339        let runtime = BackgroundRuntime::start(config).await;
1340        let handle = runtime.handle();
1341
1342        let completion_count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1343
1344        for _ in 0..5 {
1345            let c = completion_count.clone();
1346            handle
1347                .spawn(move || {
1348                    let counter = c.clone();
1349                    async move {
1350                        tokio::time::sleep(Duration::from_millis(20)).await;
1351                        counter.fetch_add(1, Ordering::SeqCst);
1352                        Ok(())
1353                    }
1354                })
1355                .unwrap();
1356        }
1357
1358        let result = runtime.shutdown().await;
1359        assert!(result.is_ok());
1360        assert_eq!(completion_count.load(Ordering::SeqCst), 5);
1361    }
1362
1363    #[tokio::test]
1364    async fn test_metadata_preserved_through_execution() {
1365        let runtime = BackgroundRuntime::start(BackgroundTaskConfig::default()).await;
1366        let handle = runtime.handle();
1367
1368        let metadata = BackgroundJobMetadata {
1369            name: Cow::Owned("test_metadata_task".to_string()),
1370            request_id: Some("req-metadata-123".to_string()),
1371        };
1372
1373        let executed: Arc<std::sync::atomic::AtomicBool> = Arc::new(std::sync::atomic::AtomicBool::new(false));
1374        let executed_clone = executed.clone();
1375
1376        let future = async move {
1377            executed_clone.store(true, std::sync::atomic::Ordering::SeqCst);
1378            Ok(())
1379        };
1380
1381        handle.spawn_with_metadata(future, metadata).unwrap();
1382
1383        tokio::time::sleep(Duration::from_millis(100)).await;
1384
1385        assert!(executed.load(std::sync::atomic::Ordering::SeqCst));
1386        runtime.shutdown().await.unwrap();
1387    }
1388
1389    #[tokio::test]
1390    async fn test_very_short_drain_timeout_forces_stop() {
1391        let config = BackgroundTaskConfig {
1392            max_queue_size: 10,
1393            max_concurrent_tasks: 2,
1394            drain_timeout_secs: 0,
1395        };
1396        let runtime = BackgroundRuntime::start(config).await;
1397        let handle = runtime.handle();
1398
1399        handle
1400            .spawn(|| async {
1401                tokio::time::sleep(Duration::from_secs(1)).await;
1402                Ok(())
1403            })
1404            .unwrap();
1405
1406        tokio::time::sleep(Duration::from_millis(10)).await;
1407
1408        let result = runtime.shutdown().await;
1409        assert!(result.is_err());
1410    }
1411
1412    #[tokio::test]
1413    async fn test_spawn_many_tasks_sequential_drain() {
1414        let config = BackgroundTaskConfig {
1415            max_queue_size: 200,
1416            max_concurrent_tasks: 2,
1417            drain_timeout_secs: 15,
1418        };
1419        let runtime = BackgroundRuntime::start(config).await;
1420        let handle = runtime.handle();
1421
1422        let count: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
1423
1424        for _ in 0..50 {
1425            let c = count.clone();
1426            handle
1427                .spawn(move || {
1428                    let counter = c.clone();
1429                    async move {
1430                        tokio::time::sleep(Duration::from_millis(1)).await;
1431                        counter.fetch_add(1, Ordering::SeqCst);
1432                        Ok(())
1433                    }
1434                })
1435                .unwrap();
1436        }
1437
1438        let result = runtime.shutdown().await;
1439        assert!(result.is_ok());
1440        assert_eq!(count.load(Ordering::SeqCst), 50);
1441    }
1442
1443    #[tokio::test]
1444    async fn test_no_deadlock_with_max_concurrency_barrier() {
1445        let config = BackgroundTaskConfig {
1446            max_queue_size: 100,
1447            max_concurrent_tasks: 3,
1448            drain_timeout_secs: 10,
1449        };
1450        let runtime = BackgroundRuntime::start(config).await;
1451        let handle = runtime.handle();
1452
1453        let barrier: Arc<tokio::sync::Barrier> = Arc::new(tokio::sync::Barrier::new(4));
1454
1455        for _ in 0..3 {
1456            let b = barrier.clone();
1457            handle
1458                .spawn(move || {
1459                    let barrier = b.clone();
1460                    async move {
1461                        barrier.wait().await;
1462                        tokio::time::sleep(Duration::from_millis(50)).await;
1463                        Ok(())
1464                    }
1465                })
1466                .unwrap();
1467        }
1468
1469        barrier.wait().await;
1470        tokio::time::sleep(Duration::from_millis(100)).await;
1471
1472        let result = runtime.shutdown().await;
1473        assert!(result.is_ok());
1474    }
1475
1476    #[tokio::test]
1477    async fn test_error_from_owned_string() {
1478        let message = String::from("error message");
1479        let error = BackgroundJobError::from(message);
1480        assert_eq!(error.message, "error message");
1481    }
1482
1483    #[tokio::test]
1484    async fn test_borrowed_str_conversion() {
1485        let error = BackgroundJobError::from("borrowed message");
1486        assert_eq!(error.message, "borrowed message");
1487    }
1488
1489    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1490    async fn test_phase1_semaphore_acquisition_with_concurrent_load() {
1491        let config = BackgroundTaskConfig {
1492            max_queue_size: 50,
1493            max_concurrent_tasks: 1,
1494            drain_timeout_secs: 10,
1495        };
1496
1497        let runtime = BackgroundRuntime::start(config).await;
1498        let handle = runtime.handle();
1499
1500        let execution_count = Arc::new(AtomicU64::new(0));
1501
1502        let blocking_barrier = Arc::new(tokio::sync::Barrier::new(2));
1503        let barrier_clone = blocking_barrier.clone();
1504
1505        handle
1506            .spawn(move || {
1507                let b = barrier_clone.clone();
1508                async move {
1509                    b.wait().await;
1510                    tokio::time::sleep(Duration::from_millis(100)).await;
1511                    Ok(())
1512                }
1513            })
1514            .unwrap();
1515
1516        tokio::time::sleep(Duration::from_millis(50)).await;
1517
1518        for _ in 0..3 {
1519            let exec_clone = execution_count.clone();
1520            handle
1521                .spawn(move || {
1522                    let e = exec_clone.clone();
1523                    async move {
1524                        e.fetch_add(1, Ordering::SeqCst);
1525                        Ok(())
1526                    }
1527                })
1528                .unwrap();
1529        }
1530
1531        blocking_barrier.wait().await;
1532        tokio::time::sleep(Duration::from_millis(250)).await;
1533
1534        assert_eq!(execution_count.load(Ordering::SeqCst), 3);
1535
1536        runtime.shutdown().await.unwrap();
1537    }
1538
1539    #[tokio::test(flavor = "multi_thread")]
1540    async fn test_concurrent_task_completion_race_conditions() {
1541        let config = BackgroundTaskConfig {
1542            max_queue_size: 100,
1543            max_concurrent_tasks: 8,
1544            drain_timeout_secs: 10,
1545        };
1546
1547        let runtime = BackgroundRuntime::start(config).await;
1548        let handle = runtime.handle();
1549
1550        let completion_counter = Arc::new(AtomicU64::new(0));
1551        let task_count = 50;
1552
1553        for i in 0..task_count {
1554            let counter = completion_counter.clone();
1555            handle
1556                .spawn(move || {
1557                    let c = counter.clone();
1558                    async move {
1559                        let sleep_ms = (i as u64 * 11) % 100;
1560                        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
1561                        c.fetch_add(1, Ordering::SeqCst);
1562                        Ok(())
1563                    }
1564                })
1565                .unwrap();
1566        }
1567
1568        runtime.shutdown().await.unwrap();
1569        assert_eq!(
1570            completion_counter.load(Ordering::SeqCst),
1571            task_count,
1572            "all tasks should complete despite race conditions"
1573        );
1574    }
1575
1576    #[tokio::test(flavor = "multi_thread")]
1577    async fn test_failure_metric_tracking_under_concurrent_errors() {
1578        let config = BackgroundTaskConfig {
1579            max_queue_size: 50,
1580            max_concurrent_tasks: 5,
1581            drain_timeout_secs: 10,
1582        };
1583
1584        let runtime = BackgroundRuntime::start(config).await;
1585        let handle = runtime.handle();
1586
1587        let success_count = Arc::new(AtomicU64::new(0));
1588        let failure_count = Arc::new(AtomicU64::new(0));
1589
1590        for i in 0..20 {
1591            if i % 3 == 0 {
1592                let fail_clone = failure_count.clone();
1593                handle
1594                    .spawn(move || {
1595                        let f = fail_clone.clone();
1596                        async move {
1597                            f.fetch_add(1, Ordering::SeqCst);
1598                            Err(BackgroundJobError::from("intentional failure"))
1599                        }
1600                    })
1601                    .unwrap();
1602            } else {
1603                let succ_clone = success_count.clone();
1604                handle
1605                    .spawn(move || {
1606                        let s = succ_clone.clone();
1607                        async move {
1608                            s.fetch_add(1, Ordering::SeqCst);
1609                            Ok(())
1610                        }
1611                    })
1612                    .unwrap();
1613            }
1614        }
1615
1616        runtime.shutdown().await.unwrap();
1617
1618        let final_success = success_count.load(Ordering::SeqCst);
1619        let final_failure = failure_count.load(Ordering::SeqCst);
1620
1621        assert_eq!(final_success + final_failure, 20);
1622        assert_eq!(final_failure, 7);
1623        assert_eq!(final_success, 13);
1624    }
1625
1626    #[tokio::test(flavor = "multi_thread")]
1627    async fn test_handle_clone_isolation_concurrent_spawns() {
1628        let config = BackgroundTaskConfig {
1629            max_queue_size: 100,
1630            max_concurrent_tasks: 4,
1631            drain_timeout_secs: 10,
1632        };
1633
1634        let runtime = BackgroundRuntime::start(config).await;
1635        let handle = runtime.handle();
1636
1637        let completion_counters: Vec<Arc<AtomicU64>> = (0..5).map(|_| Arc::new(AtomicU64::new(0))).collect();
1638
1639        let mut join_handles = vec![];
1640
1641        for worker_id in 0..5 {
1642            let h = handle.clone();
1643            let counter = completion_counters[worker_id].clone();
1644
1645            let jh = tokio::spawn(async move {
1646                for task_id in 0..10 {
1647                    let c = counter.clone();
1648                    let _ = h.spawn(move || {
1649                        let cnt = c.clone();
1650                        async move {
1651                            let delay = (worker_id as u64 * 10 + task_id as u64) % 50;
1652                            tokio::time::sleep(Duration::from_millis(delay)).await;
1653                            cnt.fetch_add(1, Ordering::SeqCst);
1654                            Ok(())
1655                        }
1656                    });
1657                }
1658            });
1659            join_handles.push(jh);
1660        }
1661
1662        for jh in join_handles {
1663            let _ = jh.await;
1664        }
1665
1666        runtime.shutdown().await.unwrap();
1667
1668        for (i, counter) in completion_counters.iter().enumerate() {
1669            assert_eq!(
1670                counter.load(Ordering::SeqCst),
1671                10,
1672                "worker {} should have exactly 10 task completions",
1673                i
1674            );
1675        }
1676    }
1677
1678    #[tokio::test]
1679    async fn test_background_job_error_with_string_slice() {
1680        let errors = vec![
1681            BackgroundJobError::from("simple error"),
1682            BackgroundJobError::from(String::from("owned string error")),
1683        ];
1684
1685        for error in errors {
1686            assert!(!error.message.is_empty());
1687        }
1688    }
1689
1690    #[tokio::test]
1691    async fn test_spawn_error_display_formatting() {
1692        let error = BackgroundSpawnError::QueueFull;
1693        let formatted = format!("{}", error);
1694        assert_eq!(formatted, "background task queue is full");
1695
1696        let result: Result<(), BackgroundSpawnError> = Err(error);
1697        assert!(result.is_err());
1698    }
1699
1700    #[tokio::test]
1701    async fn test_background_metrics_concurrent_increments() {
1702        let metrics = Arc::new(BackgroundMetrics::default());
1703        let mut handles = vec![];
1704
1705        for _ in 0..10 {
1706            let m = metrics.clone();
1707            let h = tokio::spawn(async move {
1708                for _ in 0..10 {
1709                    m.inc_queued();
1710                    m.inc_running();
1711                    m.inc_failed();
1712                    m.dec_queued();
1713                    m.dec_running();
1714                }
1715            });
1716            handles.push(h);
1717        }
1718
1719        for h in handles {
1720            let _ = h.await;
1721        }
1722
1723        assert_eq!(metrics.queued.load(Ordering::Relaxed), 0);
1724        assert_eq!(metrics.running.load(Ordering::Relaxed), 0);
1725        assert_eq!(metrics.failed.load(Ordering::Relaxed), 100);
1726    }
1727
1728    #[tokio::test]
1729    async fn test_drain_phase_execution_with_lingering_senders() {
1730        let config = BackgroundTaskConfig {
1731            max_queue_size: 20,
1732            max_concurrent_tasks: 2,
1733            drain_timeout_secs: 10,
1734        };
1735
1736        let runtime = BackgroundRuntime::start(config).await;
1737        let handle = runtime.handle();
1738
1739        let executed = Arc::new(AtomicU64::new(0));
1740
1741        for _ in 0..5 {
1742            let e = executed.clone();
1743            handle
1744                .spawn(move || {
1745                    let ex = e.clone();
1746                    async move {
1747                        tokio::time::sleep(Duration::from_millis(10)).await;
1748                        ex.fetch_add(1, Ordering::SeqCst);
1749                        Ok(())
1750                    }
1751                })
1752                .unwrap();
1753        }
1754
1755        let result = runtime.shutdown().await;
1756        assert!(result.is_ok());
1757
1758        assert_eq!(executed.load(Ordering::SeqCst), 5);
1759    }
1760
1761    #[tokio::test]
1762    async fn test_concurrent_queue_status_transitions() {
1763        let config = BackgroundTaskConfig {
1764            max_queue_size: 10,
1765            max_concurrent_tasks: 2,
1766            drain_timeout_secs: 10,
1767        };
1768
1769        let runtime = BackgroundRuntime::start(config).await;
1770        let handle = runtime.handle();
1771
1772        let state_transitions = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1773
1774        for i in 0..10 {
1775            let st = state_transitions.clone();
1776            handle
1777                .spawn(move || {
1778                    let s = st.clone();
1779                    async move {
1780                        let mut transitions = s.lock().await;
1781                        transitions.push(("spawned", i));
1782                        drop(transitions);
1783
1784                        tokio::time::sleep(Duration::from_millis(50)).await;
1785
1786                        let mut transitions = s.lock().await;
1787                        transitions.push(("completed", i));
1788                        Ok(())
1789                    }
1790                })
1791                .unwrap();
1792        }
1793
1794        tokio::time::sleep(Duration::from_millis(300)).await;
1795
1796        runtime.shutdown().await.unwrap();
1797
1798        let final_transitions = state_transitions.lock().await;
1799        let completed_count = final_transitions
1800            .iter()
1801            .filter(|(action, _)| *action == "completed")
1802            .count();
1803
1804        assert_eq!(completed_count, 10, "all tasks should complete");
1805    }
1806
1807    #[tokio::test(flavor = "multi_thread")]
1808    async fn test_semaphore_no_starvation_with_uneven_task_duration() {
1809        let config = BackgroundTaskConfig {
1810            max_queue_size: 100,
1811            max_concurrent_tasks: 2,
1812            drain_timeout_secs: 10,
1813        };
1814
1815        let runtime = BackgroundRuntime::start(config).await;
1816        let handle = runtime.handle();
1817
1818        let fast_executed = Arc::new(AtomicU64::new(0));
1819        let slow_executed = Arc::new(AtomicU64::new(0));
1820
1821        for _ in 0..5 {
1822            let s = slow_executed.clone();
1823            handle
1824                .spawn(move || {
1825                    let slow = s.clone();
1826                    async move {
1827                        tokio::time::sleep(Duration::from_millis(100)).await;
1828                        slow.fetch_add(1, Ordering::SeqCst);
1829                        Ok(())
1830                    }
1831                })
1832                .unwrap();
1833        }
1834
1835        tokio::time::sleep(Duration::from_millis(10)).await;
1836
1837        for _ in 0..5 {
1838            let f = fast_executed.clone();
1839            handle
1840                .spawn(move || {
1841                    let fast = f.clone();
1842                    async move {
1843                        tokio::time::sleep(Duration::from_millis(10)).await;
1844                        fast.fetch_add(1, Ordering::SeqCst);
1845                        Ok(())
1846                    }
1847                })
1848                .unwrap();
1849        }
1850
1851        runtime.shutdown().await.unwrap();
1852
1853        assert_eq!(
1854            fast_executed.load(Ordering::SeqCst),
1855            5,
1856            "fast tasks should not be starved"
1857        );
1858        assert_eq!(slow_executed.load(Ordering::SeqCst), 5, "slow tasks should execute");
1859    }
1860}