#[cfg(test)]
mod deadlock_regression_tests {
use crossbeam::channel::bounded;
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
/// Verifies that a parallel producer feeding a tiny bounded channel runs to
/// completion and that the consumer terminates once every sender handle has
/// been released.
///
/// Layout of the scenario:
/// * a dedicated consumer thread drains the channel, sleeping briefly per item
///   so the capacity-2 channel fills up and producers have to block on `send`;
/// * rayon workers send 100 chunk ids, each batch of work using its own clone
///   of a template sender created inside an inner scope;
/// * the template clone is released at the end of that scope and the original
///   sender is dropped explicitly, so the consumer loop can end and `join`
///   can return.
#[test]
fn test_scoped_sender_cleanup_prevents_deadlock() {
    let channel_capacity = 2;
    let num_chunks = 100;
    let (sender, receiver) = bounded::<usize>(channel_capacity);

    // Shared counter so the main thread can cross-check the consumer's own tally.
    let processed = Arc::new(AtomicUsize::new(0));

    let consumer = {
        let seen = Arc::clone(&processed);
        thread::spawn(move || {
            let mut tally = 0;
            // `recv` only fails once the channel is empty and all senders are
            // gone, so this loop ends exactly when the producers are done and
            // every sender handle has been dropped.
            while receiver.recv().is_ok() {
                tally += 1;
                thread::sleep(Duration::from_micros(10));
                seen.fetch_add(1, Ordering::Relaxed);
            }
            tally
        })
    };

    {
        // Template handle: rayon's init closure clones it as needed, and the
        // template itself is dropped when this scope closes.
        let worker_template = sender.clone();
        let chunk_ids: Vec<usize> = (0..num_chunks).collect();
        let producer_outcome = chunk_ids.into_par_iter().try_for_each_init(
            || worker_template.clone(),
            |tx, chunk_id| -> Result<(), String> {
                thread::sleep(Duration::from_micros(5));
                tx.send(chunk_id).map_err(|e| e.to_string())
            },
        );
        producer_outcome.expect("Producer should complete without deadlock");
    }

    // Release the last sender handle so the consumer observes disconnection.
    drop(sender);

    let received_count = consumer.join().expect("Consumer should complete");
    assert_eq!(received_count, num_chunks, "All chunks should be received");
    assert_eq!(
        processed.load(Ordering::Relaxed),
        num_chunks,
        "All chunks should be processed"
    );
    println!(
        "✓ Deadlock prevention test passed: {} chunks processed with capacity {}",
        num_chunks, channel_capacity
    );
}
/// Demonstrates why the lifetime of every sender clone matters: the channel
/// stays open for as long as *any* sender handle is alive, so a receiver that
/// drains until disconnection cannot finish before the last clone is dropped.
///
/// The test first drops only the original sender and asserts that the channel
/// still reports "empty" rather than "disconnected". It then drops the
/// remaining clone and checks that the draining consumer terminates having
/// received nothing.
#[test]
fn test_without_scope_demonstrates_issue() {
    let channel_capacity = 2;
    let (sender, receiver) = bounded::<usize>(channel_capacity);
    let sender_clone = sender.clone();

    // With the original handle gone but the clone still alive, the channel
    // must look merely empty. A blocking receiver would wait here forever,
    // which is exactly the hazard a forgotten clone creates.
    drop(sender);
    assert_eq!(
        receiver.try_recv(),
        Err(crossbeam::channel::TryRecvError::Empty),
        "Channel must stay open while a sender clone is alive"
    );

    let consumer = thread::spawn(move || receiver.into_iter().count());

    // Releasing the last handle disconnects the channel and lets the
    // consumer's iteration end.
    drop(sender_clone);
    let count = consumer
        .join()
        .expect("Consumer should finish once every sender is dropped");
    assert_eq!(count, 0);
    println!("✓ Demonstrated that sender clone lifetime matters");
}
/// Repeats the producer/consumer scenario across several channel capacities.
///
/// For each capacity the test sends `capacity * 10` chunk ids from rayon
/// workers into a bounded channel of that capacity while a consumer thread
/// counts what it receives. After all sender handles are released the consumer
/// must finish, and the count must equal the number of chunks sent.
#[test]
fn test_channel_capacity_stress() {
    for capacity in [1, 2, 4, 8, 16] {
        let num_chunks = capacity * 10;
        let (sender, receiver) = bounded::<usize>(capacity);
        let received = Arc::new(AtomicUsize::new(0));

        let consumer = {
            let tally = Arc::clone(&received);
            thread::spawn(move || {
                // Drains until the channel is empty and every sender is gone.
                receiver.into_iter().for_each(|_| {
                    tally.fetch_add(1, Ordering::Relaxed);
                    thread::sleep(Duration::from_micros(1));
                });
            })
        };

        {
            // Template handle cloned by rayon's init closure as needed; it is
            // dropped when this scope closes.
            let worker_template = sender.clone();
            let chunk_ids: Vec<usize> = (0..num_chunks).collect();
            let send_outcome = chunk_ids.into_par_iter().try_for_each_init(
                || worker_template.clone(),
                |tx, chunk_id| -> Result<(), String> {
                    tx.send(chunk_id).map_err(|e| e.to_string())
                },
            );
            send_outcome.expect("Should not deadlock");
        }

        // Drop the original handle so the consumer can observe disconnection.
        drop(sender);
        consumer.join().expect("Consumer should complete");

        assert_eq!(
            received.load(Ordering::Relaxed),
            num_chunks,
            "Capacity {} should handle {} chunks",
            capacity,
            num_chunks
        );
    }
    println!("✓ Stress test passed for all capacities");
}
}