use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
fn simulate_execution_polling(
thread_handle: JoinHandle<Result<i64, &'static str>>,
max_execution_seconds: f64,
) -> Result<i64, TestError> {
let timeout = Duration::from_secs_f64(max_execution_seconds);
let start = Instant::now();
loop {
if thread_handle.is_finished() {
break;
}
if start.elapsed() >= timeout {
return Err(TestError::Timeout);
}
thread::sleep(Duration::from_millis(10));
}
thread_handle
.join()
.map_err(|_| TestError::Panic)?
.map_err(|_| TestError::ExecutionFailed)
}
#[derive(Debug, PartialEq)]
enum TestError {
Timeout,
Panic,
ExecutionFailed,
}
#[test]
fn test_timeout_triggers_correctly() {
let handle = thread::spawn(|| -> Result<i64, &'static str> {
loop {
thread::sleep(Duration::from_millis(100));
}
});
let start = Instant::now();
let result = simulate_execution_polling(handle, 0.1); let elapsed = start.elapsed();
assert_eq!(result, Err(TestError::Timeout));
assert!(
elapsed >= Duration::from_millis(100) && elapsed < Duration::from_millis(200),
"Elapsed: {:?}",
elapsed
);
}
#[test]
fn test_very_short_timeout() {
let handle = thread::spawn(|| -> Result<i64, &'static str> {
thread::sleep(Duration::from_secs(10));
Ok(42)
});
let result = simulate_execution_polling(handle, 0.02); assert_eq!(result, Err(TestError::Timeout));
}
#[test]
fn test_completion_just_before_timeout() {
let handle = thread::spawn(|| -> Result<i64, &'static str> {
thread::sleep(Duration::from_millis(40));
Ok(42)
});
let result = simulate_execution_polling(handle, 0.1); assert_eq!(result, Ok(42));
}
#[test]
fn test_immediate_completion() {
let handle = thread::spawn(|| -> Result<i64, &'static str> { Ok(123) });
let result = simulate_execution_polling(handle, 1.0);
assert_eq!(result, Ok(123));
}
#[test]
fn test_thread_panic_detected() {
let handle = thread::spawn(|| -> Result<i64, &'static str> { panic!("intentional panic") });
thread::sleep(Duration::from_millis(50));
let result = simulate_execution_polling(handle, 1.0);
assert_eq!(result, Err(TestError::Panic));
}
#[test]
fn test_thread_panic_with_string_message() {
let handle = thread::spawn(|| -> Result<i64, &'static str> {
panic!("panic with String message: {}", 42)
});
thread::sleep(Duration::from_millis(50));
let result = simulate_execution_polling(handle, 1.0);
assert_eq!(result, Err(TestError::Panic));
}
#[test]
fn test_thread_panic_with_custom_type() {
struct CustomPanic;
let handle = thread::spawn(|| -> Result<i64, &'static str> {
std::panic::panic_any(CustomPanic);
});
thread::sleep(Duration::from_millis(50));
let result = simulate_execution_polling(handle, 1.0);
assert_eq!(result, Err(TestError::Panic));
}
#[test]
fn test_multiple_concurrent_executions() {
let completed = Arc::new(AtomicU32::new(0));
let handles: Vec<_> = (0..4)
.map(|i| {
let completed = completed.clone();
thread::spawn(move || {
let handle = thread::spawn(move || -> Result<i64, &'static str> {
thread::sleep(Duration::from_millis(50 + i * 10));
Ok(i as i64)
});
let result = simulate_execution_polling(handle, 1.0);
if result.is_ok() {
completed.fetch_add(1, Ordering::SeqCst);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert_eq!(completed.load(Ordering::SeqCst), 4);
}
#[test]
fn test_rapid_successive_executions() {
for i in 0..10 {
let handle = thread::spawn(move || -> Result<i64, &'static str> {
thread::sleep(Duration::from_millis(5));
Ok(i)
});
let result = simulate_execution_polling(handle, 0.5);
assert_eq!(result, Ok(i));
}
}
#[test]
fn test_zero_timeout() {
let handle = thread::spawn(|| -> Result<i64, &'static str> { Ok(42) });
let result = simulate_execution_polling(handle, 0.0);
assert!(result == Err(TestError::Timeout) || result == Ok(42));
}
#[test]
fn test_thread_finishes_exactly_at_timeout_boundary() {
let finished = Arc::new(AtomicBool::new(false));
let finished_clone = finished.clone();
let handle = thread::spawn(move || -> Result<i64, &'static str> {
thread::sleep(Duration::from_millis(100));
finished_clone.store(true, Ordering::SeqCst);
Ok(42)
});
let result = simulate_execution_polling(handle, 0.1);
match result {
Ok(42) => assert!(finished.load(Ordering::SeqCst)),
Err(TestError::Timeout) => {
}
other => panic!("Unexpected result: {:?}", other),
}
}
#[test]
fn test_store_returned_on_success() {
let _store_returned = Arc::new(AtomicBool::new(false));
let handle = thread::spawn(move || -> (Result<i64, &'static str>, bool) {
thread::sleep(Duration::from_millis(10));
(Ok(42), true) });
let timeout = Duration::from_millis(500);
let start = Instant::now();
loop {
if handle.is_finished() {
break;
}
if start.elapsed() >= timeout {
panic!("Should not timeout");
}
thread::sleep(Duration::from_millis(10));
}
let (result, store) = handle.join().unwrap();
assert!(store); assert_eq!(result, Ok(42));
}
#[test]
fn test_store_lost_on_timeout() {
let store_lost = Arc::new(AtomicBool::new(true));
let store_lost_clone = store_lost.clone();
let handle = thread::spawn(move || -> (Result<i64, &'static str>, bool) {
thread::sleep(Duration::from_secs(10)); store_lost_clone.store(false, Ordering::SeqCst);
(Ok(42), true)
});
let timeout = Duration::from_millis(50);
let start = Instant::now();
loop {
if handle.is_finished() {
break;
}
if start.elapsed() >= timeout {
break;
}
thread::sleep(Duration::from_millis(10));
}
assert!(store_lost.load(Ordering::SeqCst));
}
#[test]
fn test_polling_does_not_spin() {
let poll_count = Arc::new(AtomicU32::new(0));
let poll_count_clone = poll_count.clone();
let handle = thread::spawn(move || -> Result<i64, &'static str> {
thread::sleep(Duration::from_millis(100));
Ok(42)
});
let timeout = Duration::from_millis(500);
let start = Instant::now();
loop {
poll_count_clone.fetch_add(1, Ordering::SeqCst);
if handle.is_finished() {
break;
}
if start.elapsed() >= timeout {
panic!("Should not timeout");
}
thread::sleep(Duration::from_millis(10)); }
let _join = handle.join();
let polls = poll_count.load(Ordering::SeqCst);
assert!(
(5..=20).contains(&polls),
"Expected 5-20 polls, got {}",
polls
);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_block_in_place_from_async_context() {
let handle = tokio::runtime::Handle::current();
let task_handle = tokio::task::spawn_blocking(|| {
thread::sleep(Duration::from_millis(20));
42i64
});
let result = tokio::task::block_in_place(|| handle.block_on(task_handle));
assert_eq!(result.unwrap(), 42);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_block_in_place_with_async_task() {
let handle = tokio::runtime::Handle::current();
let task_handle = tokio::task::spawn_blocking(|| {
let mut sum = 0i64;
for i in 0..1000 {
sum += i;
}
sum
});
let result = tokio::task::block_in_place(|| handle.block_on(task_handle));
assert_eq!(result.unwrap(), 499500);
}
#[test]
fn test_orphaned_threads_eventually_complete() {
let completed = Arc::new(AtomicU32::new(0));
let handles: Vec<_> = (0..3)
.map(|i| {
let completed = completed.clone();
thread::spawn(move || -> Result<i64, &'static str> {
thread::sleep(Duration::from_millis(200 + i * 100));
completed.fetch_add(1, Ordering::SeqCst);
Ok(i as i64)
})
})
.collect();
for handle in handles {
let _result = simulate_execution_polling(handle, 0.05);
}
thread::sleep(Duration::from_millis(600));
assert_eq!(completed.load(Ordering::SeqCst), 3);
}