use super::*;
use std::{
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
thread,
time::Duration,
};
use crate::ipc::EventScope;
/// Submits three values through `submit` and verifies, after a 50 ms pause,
/// that the result callback was invoked exactly three times.
#[test]
fn test_saturator_submit() {
    let processed = Arc::new(AtomicUsize::new(0));
    let processed_in_callback = Arc::clone(&processed);

    let handle = spawn_saturator(
        |value: i32| value * 2,
        move |_doubled| {
            processed_in_callback.fetch_add(1, Ordering::SeqCst);
        },
    );

    for value in 1..=3 {
        handle.submit(value, None);
    }

    // Give the saturator time to handle the submissions before reading the counter.
    thread::sleep(Duration::from_millis(50));
    let seen = processed.load(Ordering::SeqCst);
    assert_eq!(seen, 3);
}
/// Submits two values through `submit_background` and verifies, after a 50 ms
/// pause, that the result callback was invoked exactly twice.
#[test]
fn test_saturator_submit_background() {
    let processed = Arc::new(AtomicUsize::new(0));
    let processed_in_callback = Arc::clone(&processed);

    let handle = spawn_saturator(
        |value: i32| value,
        move |_processed| {
            processed_in_callback.fetch_add(1, Ordering::SeqCst);
        },
    );

    for value in 1..=2 {
        handle.submit_background(value, None);
    }

    // Give the saturator time to handle the submissions before reading the counter.
    thread::sleep(Duration::from_millis(50));
    let seen = processed.load(Ordering::SeqCst);
    assert_eq!(seen, 2);
}
/// Submits two items tagged with a clone of an `EventScope` and verifies that
/// waiting on the original scope reports completion within 100 ms.
#[test]
fn test_saturator_scope_tracking() {
    let scope = EventScope::new();
    let tracked = scope.clone();
    let handle = spawn_saturator(|value: i32| value, |_processed| {});

    for value in 1..=2 {
        handle.submit(value, Some(&tracked));
    }

    let finished = scope.wait_timeout(Duration::from_millis(100));
    assert!(finished, "Scope should complete when all items processed");
}
/// Submits one background item followed by two regular items and verifies that
/// each of the three results is delivered to the callback exactly once.
///
/// The results are compared after sorting, so this test pins *which* results
/// arrive (no loss, no duplicates) but not the order in which they arrive.
/// NOTE(review): asserting the delivery order deterministically would require
/// holding the worker back until all three items are queued — confirm against
/// the saturator implementation before adding such a check.
#[test]
fn test_saturator_priority_ordering() {
    let results = Arc::new(std::sync::Mutex::new(Vec::new()));
    let results_clone = Arc::clone(&results);
    let handle = spawn_saturator(
        |x: i32| x,
        move |result| {
            results_clone.lock().unwrap().push(result);
        },
    );

    handle.submit_background(3, None);
    handle.submit(1, None);
    handle.submit(2, None);
    thread::sleep(Duration::from_millis(50));

    // Snapshot the collected results; the lock guard is a temporary that is
    // released at the end of this statement.
    let mut delivered = results.lock().unwrap().clone();
    delivered.sort_unstable();

    // Exact comparison catches missing *and* duplicated deliveries, which the
    // previous membership-only checks would have let through.
    assert_eq!(delivered, vec![1, 2, 3]);
}
/// Calls `shutdown` on a fresh handle and verifies that `is_shutting_down`
/// subsequently reports `true`.
#[test]
fn test_saturator_shutdown() {
    let saturator = spawn_saturator(|value: i32| value, |_processed| {});

    saturator.shutdown();

    let shutting_down = saturator.is_shutting_down();
    assert!(shutting_down);
}
/// Submits one item through the original handle and one through a clone of
/// it, then verifies after a 50 ms pause that the callback ran twice.
#[test]
fn test_saturator_clone() {
    let processed = Arc::new(AtomicUsize::new(0));
    let processed_in_callback = Arc::clone(&processed);

    let original = spawn_saturator(
        |value: i32| value,
        move |_processed| {
            processed_in_callback.fetch_add(1, Ordering::SeqCst);
        },
    );
    let duplicate = original.clone();

    original.submit(1, None);
    duplicate.submit(2, None);

    // Give the saturator time to handle both submissions.
    thread::sleep(Duration::from_millis(50));
    let seen = processed.load(Ordering::SeqCst);
    assert_eq!(seen, 2);
}
/// Verifies that `RequestPriority::default()` yields `RequestPriority::High`.
#[test]
fn test_request_priority_default() {
    let default_priority = RequestPriority::default();
    assert_eq!(default_priority, RequestPriority::High);
}
/// Submits a single explicitly built `SaturationRequest` with `High` priority
/// and no scope, then verifies after a 50 ms pause that the callback ran once.
#[test]
fn test_submit_request() {
    let processed = Arc::new(AtomicUsize::new(0));
    let processed_in_callback = Arc::clone(&processed);

    let handle = spawn_saturator(
        |value: i32| value,
        move |_processed| {
            processed_in_callback.fetch_add(1, Ordering::SeqCst);
        },
    );

    let request = SaturationRequest {
        data: 42,
        priority: RequestPriority::High,
        scope: None,
    };
    handle.submit_request(request);

    // Give the saturator time to handle the request.
    thread::sleep(Duration::from_millis(50));
    let seen = processed.load(Ordering::SeqCst);
    assert_eq!(seen, 1);
}
/// Submits a single explicitly built `SaturationRequest` with `Low` priority
/// and no scope, then verifies after a 50 ms pause that the callback ran once.
#[test]
fn test_submit_request_low_priority() {
    let processed = Arc::new(AtomicUsize::new(0));
    let processed_in_callback = Arc::clone(&processed);

    let handle = spawn_saturator(
        |value: i32| value,
        move |_processed| {
            processed_in_callback.fetch_add(1, Ordering::SeqCst);
        },
    );

    let request = SaturationRequest {
        data: 99,
        priority: RequestPriority::Low,
        scope: None,
    };
    handle.submit_request(request);

    // Give the saturator time to handle the request.
    thread::sleep(Duration::from_millis(50));
    let seen = processed.load(Ordering::SeqCst);
    assert_eq!(seen, 1);
}
/// Verifies that the default `SaturatorConfig` has `drain_on_shutdown` enabled.
#[test]
fn test_saturator_config_default() {
    let SaturatorConfig { drain_on_shutdown } = SaturatorConfig::default();
    assert!(drain_on_shutdown);
}
/// Exercises the `Clone` and `Debug` implementations of `SaturatorConfig`:
/// the clone must keep `drain_on_shutdown == false`, and the debug output
/// must mention both the type name and the field value.
#[test]
#[cfg_attr(coverage_nightly, coverage(off))]
fn test_saturator_config_debug_clone() {
    let config = SaturatorConfig {
        drain_on_shutdown: false,
    };

    // Clone: the copy carries the same flag value.
    let duplicate = config.clone();
    assert!(!duplicate.drain_on_shutdown);

    // Debug: rendered text contains the type name and the `false` field value.
    let rendered = format!("{config:?}");
    for expected in ["SaturatorConfig", "false"].iter() {
        assert!(rendered.contains(expected));
    }
}
/// Submits two background items tagged with a clone of an `EventScope` and
/// verifies that waiting on the original scope reports completion within
/// 100 ms.
#[test]
fn test_submit_background_with_scope() {
    let scope = EventScope::new();
    let tracked = scope.clone();
    let handle = spawn_saturator(|value: i32| value, |_processed| {});

    for value in 1..=2 {
        handle.submit_background(value, Some(&tracked));
    }

    let finished = scope.wait_timeout(Duration::from_millis(100));
    assert!(finished, "Scope should complete when all items processed");
}
/// Verifies that the `Debug` output of a `SaturationRequest` mentions the
/// type name and the payload value.
#[test]
#[cfg_attr(coverage_nightly, coverage(off))]
fn test_saturation_request_debug() {
    let request = SaturationRequest {
        data: 42i32,
        priority: RequestPriority::High,
        scope: None,
    };

    let rendered = format!("{request:?}");

    let names_type = rendered.contains("SaturationRequest");
    assert!(names_type);
    let shows_payload = rendered.contains("42");
    assert!(shows_payload);
}
/// Exercises the derived-style traits of `RequestPriority`: copying a value,
/// `Debug` rendering of both variants, and hashing/equality via a `HashSet`
/// (the two variants must be stored as distinct entries).
#[test]
#[cfg_attr(coverage_nightly, coverage(off))]
fn test_request_priority_debug_clone_hash() {
    use std::collections::HashSet;

    let high = RequestPriority::High;
    let low = RequestPriority::Low;

    // Copy: `high` stays usable afterwards, and the copy compares equal.
    let copied = high;
    assert_eq!(copied, RequestPriority::High);

    // Debug: each variant renders with its own name.
    assert!(format!("{high:?}").contains("High"));
    assert!(format!("{low:?}").contains("Low"));

    // Hash + Eq: distinct variants occupy distinct set slots.
    let distinct = HashSet::from([high, low]);
    assert_eq!(distinct.len(), 2);
}
/// Submits one item, lets the handle go out of scope, and verifies that the
/// result callback ran exactly once.
///
/// Only a single item is submitted, so the counter is compared for equality
/// (matching the exact-count checks in the sibling tests); a lower-bound
/// check would let duplicate callback invocations go unnoticed.
/// NOTE(review): per the test name, dropping the handle is expected to join
/// the worker; this test only observes the counter, not the join itself.
#[test]
fn test_saturator_drop_joins_worker() {
    let counter = Arc::new(AtomicUsize::new(0));
    let counter_clone = Arc::clone(&counter);
    {
        let handle = spawn_saturator(
            |x: i32| x,
            move |_result| {
                counter_clone.fetch_add(1, Ordering::SeqCst);
            },
        );
        handle.submit(1, None);
        thread::sleep(Duration::from_millis(50));
        // `handle` is dropped here, at the end of the inner scope.
    }
    assert_eq!(counter.load(Ordering::SeqCst), 1);
}
/// Submits an explicitly built `SaturationRequest` that owns a clone of an
/// `EventScope` and verifies that waiting on the original scope reports
/// completion within 100 ms.
#[test]
fn test_submit_request_with_scope() {
    let scope = EventScope::new();
    let tracked = scope.clone();
    let handle = spawn_saturator(|value: i32| value, |_processed| {});

    let request = SaturationRequest {
        data: 7,
        priority: RequestPriority::High,
        scope: Some(tracked),
    };
    handle.submit_request(request);

    let finished = scope.wait_timeout(Duration::from_millis(100));
    assert!(finished);
}
/// Checks the scope's in-flight count around a scoped `submit`: it is zero
/// before submitting, the scope completes within 200 ms, and the count is
/// zero again afterwards.
#[test]
fn test_submit_with_scope_increments_and_decrements() {
    let scope = EventScope::new();
    let handle = spawn_saturator(|value: i32| value, |_processed| {});

    let before = scope.in_flight();
    assert_eq!(before, 0);

    handle.submit(42, Some(&scope));

    let finished = scope.wait_timeout(Duration::from_millis(200));
    assert!(finished, "Scope should reach 0 after worker processes the item");

    let after = scope.in_flight();
    assert_eq!(after, 0);
}
/// Checks the scope's in-flight count around a scoped `submit_background`:
/// it is zero before submitting, the scope completes within 200 ms, and the
/// count is zero again afterwards.
#[test]
fn test_submit_background_scope_increments_and_decrements() {
    let scope = EventScope::new();
    let handle = spawn_saturator(|value: i32| value, |_processed| {});

    let before = scope.in_flight();
    assert_eq!(before, 0);

    handle.submit_background(10, Some(&scope));

    let finished = scope.wait_timeout(Duration::from_millis(200));
    assert!(finished, "Background scope should complete after worker processes");

    let after = scope.in_flight();
    assert_eq!(after, 0);
}
/// Submits one item whose processing function raises a flag, lets the
/// original handle go out of scope after a 30 ms pause, and verifies that the
/// flag was set (i.e. the processing function ran).
#[test]
fn test_drop_original_handle_joins_worker_thread() {
    use std::sync::atomic::AtomicBool;

    let ran = Arc::new(AtomicBool::new(false));
    let ran_in_worker = Arc::clone(&ran);

    {
        let handle = spawn_saturator(
            move |value: i32| {
                // Record that the processing function was actually invoked.
                ran_in_worker.store(true, Ordering::SeqCst);
                value
            },
            |_processed| {},
        );
        handle.submit(1, None);
        thread::sleep(Duration::from_millis(30));
        // `handle` is dropped here, at the end of the inner scope.
    }

    let observed = ran.load(Ordering::SeqCst);
    assert!(observed);
}
/// Creates and immediately drops a clone of the handle, then submits an item
/// through the original handle and drops it after a 30 ms pause.
///
/// There are no explicit assertions; the test passes if this sequence
/// completes without panicking.
#[test]
fn test_drop_cloned_handle_does_not_join_worker() {
    let original = spawn_saturator(|value: i32| value, |_processed| {});

    // The clone is a temporary that is dropped right away.
    drop(original.clone());

    // The original handle must still accept work after the clone is gone.
    original.submit(1, None);
    thread::sleep(Duration::from_millis(30));

    drop(original);
}