#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
use futures::FutureExt;
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU32, Ordering},
};
use std::time::Duration;
use tokio::time::timeout;
#[tokio::test]
async fn test_panic_recovery_in_iteration() {
let panic_occurred = Arc::new(AtomicBool::new(false));
let iteration_count = Arc::new(AtomicU32::new(0));
let panic_clone = panic_occurred.clone();
let count_clone = iteration_count.clone();
let task = tokio::spawn(async move {
for i in 0..5 {
let result = std::panic::AssertUnwindSafe(async {
count_clone.fetch_add(1, Ordering::SeqCst);
if i == 2 {
panic_clone.store(true, Ordering::SeqCst);
panic!("Simulated panic in iteration");
}
tokio::time::sleep(Duration::from_millis(10)).await;
})
.catch_unwind()
.await;
if result.is_err() {
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
});
let _ = timeout(Duration::from_secs(2), task).await;
assert!(
panic_occurred.load(Ordering::SeqCst),
"Panic should have occurred"
);
assert_eq!(
iteration_count.load(Ordering::SeqCst),
5,
"Should complete all 5 iterations despite panic"
);
}
#[tokio::test]
async fn test_multiple_panics_recovery() {
let panic_count = Arc::new(AtomicU32::new(0));
let iteration_count = Arc::new(AtomicU32::new(0));
let panic_clone = panic_count.clone();
let count_clone = iteration_count.clone();
let task = tokio::spawn(async move {
for i in 0..10 {
let result = std::panic::AssertUnwindSafe(async {
count_clone.fetch_add(1, Ordering::SeqCst);
if [2, 3, 4, 7].contains(&i) {
panic_clone.fetch_add(1, Ordering::SeqCst);
panic!("Simulated panic #{i}");
}
tokio::time::sleep(Duration::from_millis(5)).await;
})
.catch_unwind()
.await;
if result.is_err() {
tokio::time::sleep(Duration::from_millis(5)).await;
}
}
});
let _ = timeout(Duration::from_secs(2), task).await;
assert_eq!(
panic_count.load(Ordering::SeqCst),
4,
"Should have 4 panics"
);
assert_eq!(
iteration_count.load(Ordering::SeqCst),
10,
"Should complete all iterations"
);
}
#[tokio::test]
async fn test_joinhandle_detects_task_exit() {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<()>();
let task = tokio::spawn(async move {
for _ in 0..3 {
tokio::time::sleep(Duration::from_millis(10)).await;
}
let _ = tx.send(());
});
tokio::select! {
result = task => {
match result {
Ok(()) => {
}
Err(e) => {
panic!("Task panicked: {e}");
}
}
}
_ = rx.recv() => {
}
}
}
#[tokio::test]
async fn test_joinhandle_detects_panic() {
let task = tokio::spawn(async {
tokio::time::sleep(Duration::from_millis(10)).await;
panic!("Simulated task panic");
});
let result = task.await;
assert!(result.is_err(), "Should detect panic in spawned task");
let e = result.unwrap_err();
assert!(e.is_panic(), "Error should be a panic");
}
#[tokio::test]
async fn test_graceful_shutdown_on_unsupported_driver() {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<()>();
let task = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
let _ = tx.send(());
});
let result = timeout(Duration::from_secs(1), rx.recv()).await;
assert!(result.is_ok(), "Should receive shutdown signal");
assert!(
result.unwrap().is_some(),
"Shutdown signal should be received"
);
let _ = timeout(Duration::from_millis(100), task).await;
}
#[tokio::test]
async fn test_concurrent_panic_and_success() {
let success_count = Arc::new(AtomicU32::new(0));
let panic_count = Arc::new(AtomicU32::new(0));
let success_clone = success_count.clone();
let panic_clone = panic_count.clone();
let task = tokio::spawn(async move {
for i in 0..20 {
let result = std::panic::AssertUnwindSafe(async {
tokio::time::sleep(Duration::from_millis(1)).await;
if i % 2 == 0 {
panic_clone.fetch_add(1, Ordering::SeqCst);
panic!("Even iteration panic");
} else {
success_clone.fetch_add(1, Ordering::SeqCst);
}
})
.catch_unwind()
.await;
if result.is_err() {
tokio::time::sleep(Duration::from_millis(1)).await;
}
}
});
let _ = timeout(Duration::from_secs(2), task).await;
assert_eq!(
success_count.load(Ordering::SeqCst),
10,
"Should have 10 successes"
);
assert_eq!(
panic_count.load(Ordering::SeqCst),
10,
"Should have 10 panics"
);
}
#[tokio::test]
async fn test_panic_in_async_context() {
let panic_caught = Arc::new(AtomicBool::new(false));
let caught_clone = panic_caught.clone();
let task = tokio::spawn(async move {
let result = std::panic::AssertUnwindSafe(async {
let nested = async {
tokio::time::sleep(Duration::from_millis(5)).await;
panic!("Nested async panic");
};
nested.await
})
.catch_unwind()
.await;
if result.is_err() {
caught_clone.store(true, Ordering::SeqCst);
}
});
let _ = timeout(Duration::from_secs(1), task).await;
assert!(
panic_caught.load(Ordering::SeqCst),
"Should catch nested async panic"
);
}
#[tokio::test]
async fn test_select_race_condition() {
let (_shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
let monitor = tokio::spawn(async {
tokio::time::sleep(Duration::from_millis(50)).await;
});
let server_running = Arc::new(AtomicBool::new(true));
let server_clone = server_running.clone();
let result = tokio::select! {
_result = monitor => {
"monitor_exited"
}
_ = shutdown_rx.recv() => {
"shutdown_received"
}
() = tokio::time::sleep(Duration::from_secs(1)) => {
server_clone.store(false, Ordering::SeqCst);
"timeout"
}
};
assert_eq!(result, "monitor_exited", "Monitor task should exit first");
}
#[tokio::test]
async fn test_stress_rapid_iterations() {
let total_iterations = Arc::new(AtomicU32::new(0));
let panic_recoveries = Arc::new(AtomicU32::new(0));
let total_clone = total_iterations.clone();
let panic_clone = panic_recoveries.clone();
let task = tokio::spawn(async move {
for i in 0..1000 {
let result = std::panic::AssertUnwindSafe(async {
total_clone.fetch_add(1, Ordering::SeqCst);
assert!(!(i % 100 == 0 && i > 0), "Stress test panic");
})
.catch_unwind()
.await;
if result.is_err() {
panic_clone.fetch_add(1, Ordering::SeqCst);
}
}
});
let result = timeout(Duration::from_secs(5), task).await;
assert!(result.is_ok(), "Stress test should complete");
assert_eq!(
total_iterations.load(Ordering::SeqCst),
1000,
"All iterations should run"
);
assert_eq!(
panic_recoveries.load(Ordering::SeqCst),
9,
"Should recover from 9 panics"
);
}
#[tokio::test]
async fn test_panic_with_state_corruption() {
let state = Arc::new(AtomicU32::new(0));
let state_clone = state.clone();
let task = tokio::spawn(async move {
for i in 0..10 {
let result = std::panic::AssertUnwindSafe(async {
let current = state_clone.load(Ordering::SeqCst);
state_clone.store(current + 1, Ordering::SeqCst);
assert!(i != 5, "State corruption test");
tokio::time::sleep(Duration::from_millis(5)).await;
})
.catch_unwind()
.await;
if result.is_err() {
let current = state_clone.load(Ordering::SeqCst);
assert_eq!(current, 6, "State should be 6 after panic on iteration 5");
}
}
});
let _ = timeout(Duration::from_secs(2), task).await;
assert_eq!(
state.load(Ordering::SeqCst),
10,
"State should survive panics"
);
}
#[tokio::test]
async fn test_timeout_on_stuck_iteration() {
let completed = Arc::new(AtomicBool::new(false));
let completed_clone = completed.clone();
let task = tokio::spawn(async move {
let result = timeout(Duration::from_millis(100), async {
tokio::time::sleep(Duration::from_secs(10)).await;
})
.await;
if result.is_err() {
completed_clone.store(true, Ordering::SeqCst);
}
});
let _ = timeout(Duration::from_secs(1), task).await;
assert!(
completed.load(Ordering::SeqCst),
"Should detect and handle timeout"
);
}
#[tokio::test]
async fn test_shutdown_signal_propagation() {
let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
let monitor_running = Arc::new(AtomicBool::new(true));
let server_running = Arc::new(AtomicBool::new(true));
let monitor_clone = monitor_running.clone();
let server_clone = server_running.clone();
let monitor_task = tokio::spawn(async move {
tokio::select! {
() = tokio::time::sleep(Duration::from_secs(10)) => {}
() = async {
while monitor_clone.load(Ordering::SeqCst) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
} => {}
}
});
let server_task = tokio::spawn(async move {
shutdown_rx.recv().await;
server_clone.store(false, Ordering::SeqCst);
});
tokio::time::sleep(Duration::from_millis(50)).await;
monitor_running.store(false, Ordering::SeqCst);
let _ = shutdown_tx.send(());
let monitor_result = timeout(Duration::from_millis(500), monitor_task).await;
let server_result = timeout(Duration::from_millis(500), server_task).await;
assert!(monitor_result.is_ok(), "Monitor should shutdown");
assert!(server_result.is_ok(), "Server should shutdown");
assert!(
!server_running.load(Ordering::SeqCst),
"Server should be stopped"
);
}