use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
/// User-facing settings for the deadlock detector.
///
/// The values are copied into a `DeadlockState` when one is constructed.
#[derive(Debug, Clone)]
pub struct DeadlockConfig {
    /// Seconds without recorded queue activity before a deadlock is reported.
    /// A value of `0` switches detection off entirely.
    pub timeout_secs: u64,
    /// Whether a detected deadlock should trigger automatic memory-limit recovery.
    pub recover_enabled: bool,
}

impl Default for DeadlockConfig {
    /// Detection enabled with a 10 second timeout; automatic recovery off.
    fn default() -> Self {
        Self::new(10, false)
    }
}

impl DeadlockConfig {
    /// Builds a configuration from an explicit timeout and recovery flag.
    #[must_use]
    pub fn new(timeout_secs: u64, recover_enabled: bool) -> Self {
        Self {
            timeout_secs,
            recover_enabled,
        }
    }

    /// Builds a configuration with detection (and therefore recovery) turned off.
    #[must_use]
    pub fn disabled() -> Self {
        Self::new(0, false)
    }

    /// Returns `true` when the timeout is non-zero, i.e. detection is active.
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        self.timeout_secs != 0
    }
}
/// Shared bookkeeping for deadlock detection and memory-limit recovery.
///
/// Timestamps are whole seconds since the Unix epoch as returned by `now_secs`.
/// The per-queue timestamps are written by the matching `record_*` method and
/// are only read back when a deadlock report is logged; the detection decision
/// itself is based on `last_progress_time`.
#[derive(Debug)]
pub struct DeadlockState {
/// Last recorded push onto Q1.
pub q1_last_push: AtomicU64,
/// Last recorded pop from Q1.
pub q1_last_pop: AtomicU64,
/// Last recorded push onto Q2.
pub q2_last_push: AtomicU64,
/// Last recorded pop from Q2.
pub q2_last_pop: AtomicU64,
/// Last recorded push onto Q2b.
pub q2b_last_push: AtomicU64,
/// Last recorded pop from Q2b.
pub q2b_last_pop: AtomicU64,
/// Last recorded push onto Q2.5.
pub q2_5_last_push: AtomicU64,
/// Last recorded pop from Q2.5.
pub q2_5_last_pop: AtomicU64,
/// Last recorded push onto Q3.
pub q3_last_push: AtomicU64,
/// Last recorded pop from Q3.
pub q3_last_pop: AtomicU64,
/// Last recorded push onto Q4.
pub q4_last_push: AtomicU64,
/// Last recorded pop from Q4.
pub q4_last_pop: AtomicU64,
/// Last recorded push onto Q5.
pub q5_last_push: AtomicU64,
/// Last recorded pop from Q5.
pub q5_last_pop: AtomicU64,
/// Last recorded push onto Q6.
pub q6_last_push: AtomicU64,
/// Last recorded pop from Q6.
pub q6_last_pop: AtomicU64,
/// Last recorded push onto Q7.
pub q7_last_push: AtomicU64,
/// Last recorded pop from Q7.
pub q7_last_pop: AtomicU64,
/// Time of the most recent push or pop on any queue. The detector also resets
/// it to "now" after reporting a deadlock or after classifying an idle period
/// as starvation, so the next report needs a fresh timeout to elapse.
pub last_progress_time: AtomicU64,
/// Time of the last limit change made by recovery or restoration.
/// `0` means no recovery has happened yet, which disables restoration.
pub last_recovery_time: AtomicU64,
/// Memory limit currently in force, in bytes; `0` means unlimited.
pub current_memory_limit: AtomicU64,
/// Memory limit supplied at construction. Recovery never raises the limit
/// beyond 8x this value (it goes unlimited instead) and restoration never
/// lowers the limit below it.
pub original_memory_limit: u64,
/// Floor used by restoration: raised to twice the limit that was in force
/// when a deadlock was handled, and never lowered. `0` means not yet set.
pub stable_memory_limit: AtomicU64,
/// Set when recovery handles a deadlock; cleared when restoration lowers the limit.
pub deadlock_at_current_limit: AtomicBool,
/// Copy of `DeadlockConfig::timeout_secs`; `0` disables tracking and detection.
pub timeout_secs: u64,
/// Copy of `DeadlockConfig::recover_enabled`.
pub recover_enabled: bool,
}
impl DeadlockState {
    /// Creates tracking state from `config` with `original_memory_limit` (bytes)
    /// as the starting memory limit.
    ///
    /// Every per-queue timestamp and `last_progress_time` start at the current
    /// time, so a freshly created state never looks stalled. `last_recovery_time`
    /// and `stable_memory_limit` start at `0` ("never" / "unset").
    #[must_use]
    pub fn new(config: &DeadlockConfig, original_memory_limit: u64) -> Self {
        let now = now_secs();
        // All activity timestamps share the same starting instant.
        let stamp = || AtomicU64::new(now);
        Self {
            q1_last_push: stamp(),
            q1_last_pop: stamp(),
            q2_last_push: stamp(),
            q2_last_pop: stamp(),
            q2b_last_push: stamp(),
            q2b_last_pop: stamp(),
            q2_5_last_push: stamp(),
            q2_5_last_pop: stamp(),
            q3_last_push: stamp(),
            q3_last_pop: stamp(),
            q4_last_push: stamp(),
            q4_last_pop: stamp(),
            q5_last_push: stamp(),
            q5_last_pop: stamp(),
            q6_last_push: stamp(),
            q6_last_pop: stamp(),
            q7_last_push: stamp(),
            q7_last_pop: stamp(),
            last_progress_time: stamp(),
            last_recovery_time: AtomicU64::new(0),
            current_memory_limit: AtomicU64::new(original_memory_limit),
            original_memory_limit,
            stable_memory_limit: AtomicU64::new(0),
            deadlock_at_current_limit: AtomicBool::new(false),
            timeout_secs: config.timeout_secs,
            recover_enabled: config.recover_enabled,
        }
    }

    /// Creates state with detection switched off and an original limit of `0`.
    ///
    /// All `record_*` calls on such a state are no-ops.
    #[must_use]
    pub fn disabled() -> Self {
        Self::new(&DeadlockConfig::disabled(), 0)
    }

    /// Returns `true` when the timeout is non-zero, i.e. tracking and detection are active.
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        self.timeout_secs > 0
    }

    /// Returns the memory limit currently in force, in bytes (`0` = unlimited).
    #[must_use]
    pub fn get_memory_limit(&self) -> u64 {
        self.current_memory_limit.load(Ordering::Relaxed)
    }

    /// Shared body of every `record_*` method: when tracking is enabled, stamps
    /// both `queue_stamp` and the global `last_progress_time` with the same
    /// current time. Does nothing (and does not read the clock) when disabled.
    #[inline]
    fn record_activity(&self, queue_stamp: &AtomicU64) {
        if self.is_enabled() {
            let now = now_secs();
            queue_stamp.store(now, Ordering::Relaxed);
            self.last_progress_time.store(now, Ordering::Relaxed);
        }
    }

    /// Records a push onto Q1 as progress.
    #[inline]
    pub fn record_q1_push(&self) {
        self.record_activity(&self.q1_last_push);
    }

    /// Records a pop from Q1 as progress.
    #[inline]
    pub fn record_q1_pop(&self) {
        self.record_activity(&self.q1_last_pop);
    }

    /// Records a push onto Q2 as progress.
    #[inline]
    pub fn record_q2_push(&self) {
        self.record_activity(&self.q2_last_push);
    }

    /// Records a pop from Q2 as progress.
    #[inline]
    pub fn record_q2_pop(&self) {
        self.record_activity(&self.q2_last_pop);
    }

    /// Records a push onto Q2b as progress.
    #[inline]
    pub fn record_q2b_push(&self) {
        self.record_activity(&self.q2b_last_push);
    }

    /// Records a pop from Q2b as progress.
    #[inline]
    pub fn record_q2b_pop(&self) {
        self.record_activity(&self.q2b_last_pop);
    }

    /// Records a push onto Q2.5 as progress.
    #[inline]
    pub fn record_q2_5_push(&self) {
        self.record_activity(&self.q2_5_last_push);
    }

    /// Records a pop from Q2.5 as progress.
    #[inline]
    pub fn record_q2_5_pop(&self) {
        self.record_activity(&self.q2_5_last_pop);
    }

    /// Records a push onto Q3 as progress.
    #[inline]
    pub fn record_q3_push(&self) {
        self.record_activity(&self.q3_last_push);
    }

    /// Records a pop from Q3 as progress.
    #[inline]
    pub fn record_q3_pop(&self) {
        self.record_activity(&self.q3_last_pop);
    }

    /// Records a push onto Q4 as progress.
    #[inline]
    pub fn record_q4_push(&self) {
        self.record_activity(&self.q4_last_push);
    }

    /// Records a pop from Q4 as progress.
    #[inline]
    pub fn record_q4_pop(&self) {
        self.record_activity(&self.q4_last_pop);
    }

    /// Records a push onto Q5 as progress.
    #[inline]
    pub fn record_q5_push(&self) {
        self.record_activity(&self.q5_last_push);
    }

    /// Records a pop from Q5 as progress.
    #[inline]
    pub fn record_q5_pop(&self) {
        self.record_activity(&self.q5_last_pop);
    }

    /// Records a push onto Q6 as progress.
    #[inline]
    pub fn record_q6_push(&self) {
        self.record_activity(&self.q6_last_push);
    }

    /// Records a pop from Q6 as progress.
    #[inline]
    pub fn record_q6_pop(&self) {
        self.record_activity(&self.q6_last_pop);
    }

    /// Records a push onto Q7 as progress.
    #[inline]
    pub fn record_q7_push(&self) {
        self.record_activity(&self.q7_last_push);
    }

    /// Records a pop from Q7 as progress.
    #[inline]
    pub fn record_q7_pop(&self) {
        self.record_activity(&self.q7_last_pop);
    }
}
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
/// The value comes from `SystemTime`, so it follows clock adjustments; callers
/// in this file use `saturating_sub` when computing elapsed time.
#[inline]
#[must_use]
pub fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
/// Point-in-time view of the pipeline handed to `check_deadlock_and_restore`.
///
/// The detector uses the queue depths and reorder-memory figures to decide
/// whether any work is in flight; every field is written to the log when a
/// deadlock is reported.
///
/// NOTE(review): there is no `q2_5_len` even though `DeadlockState` tracks
/// Q2.5 timestamps, so Q2.5 depth does not take part in the in-flight check —
/// confirm this is intended.
#[derive(Debug, Clone)]
pub struct QueueSnapshot {
/// Depth of Q1 when the snapshot was taken.
pub q1_len: usize,
/// Depth of Q2 when the snapshot was taken.
pub q2_len: usize,
/// Depth of Q2b when the snapshot was taken.
pub q2b_len: usize,
/// Depth of Q3 when the snapshot was taken.
pub q3_len: usize,
/// Depth of Q4 when the snapshot was taken.
pub q4_len: usize,
/// Depth of Q5 when the snapshot was taken.
pub q5_len: usize,
/// Depth of Q6 when the snapshot was taken.
pub q6_len: usize,
/// Depth of Q7 when the snapshot was taken.
pub q7_len: usize,
/// Memory attributed to Q2 reordering, in bytes (logged as MB); non-zero counts as in-flight work.
pub q2_reorder_mem: u64,
/// Memory attributed to Q3 reordering, in bytes (logged as MB); non-zero counts as in-flight work.
pub q3_reorder_mem: u64,
/// Memory limit reported in the deadlock log, in bytes. It is only logged;
/// recovery decisions use `DeadlockState::current_memory_limit` instead.
pub memory_limit: u64,
/// Logged with the report and the starvation message.
/// NOTE(review): presumably set once the input reader has finished — confirm with the producer.
pub read_done: bool,
/// Logged with the report. NOTE(review): presumably the grouping stage's completion flag — confirm.
pub group_done: bool,
/// Logged with the report. NOTE(review): presumably set while the pipeline is draining — confirm.
pub draining: bool,
/// Optional free-form text appended to the deadlock report as an "Extra:" line.
pub extra_state: Option<String>,
}
/// Runs one detection pass and, when nothing is stalled, one restoration attempt.
///
/// * Detection disabled: returns [`DeadlockAction::None`] immediately.
/// * Progress recorded within the timeout: tries to lower a previously raised
///   memory limit (only when recovery is enabled) and returns `None`.
/// * Timeout elapsed but every queue is empty and no reorder memory is held:
///   treated as upstream starvation; the progress timestamp is refreshed and
///   `None` is returned.
/// * Timeout elapsed with work in flight: delegates to `handle_deadlock`, which
///   returns `Detected` or `Recovered(new_limit)`.
pub fn check_deadlock_and_restore(
    deadlock_state: &DeadlockState,
    snapshot: &QueueSnapshot,
) -> DeadlockAction {
    if !deadlock_state.is_enabled() {
        return DeadlockAction::None;
    }

    let now = now_secs();
    let idle_secs = now.saturating_sub(deadlock_state.last_progress_time.load(Ordering::Relaxed));
    let timeout = deadlock_state.timeout_secs;

    // Healthy path: progress is recent, so only consider easing the limit back down.
    if idle_secs < timeout {
        if deadlock_state.recover_enabled {
            try_restore_limits(deadlock_state, now);
        }
        return DeadlockAction::None;
    }

    let queue_depths = [
        snapshot.q1_len,
        snapshot.q2_len,
        snapshot.q2b_len,
        snapshot.q3_len,
        snapshot.q4_len,
        snapshot.q5_len,
        snapshot.q6_len,
        snapshot.q7_len,
    ];
    let queues_empty = queue_depths.iter().all(|&depth| depth == 0);
    let reorder_empty = snapshot.q2_reorder_mem == 0 && snapshot.q3_reorder_mem == 0;

    // Nothing queued and nothing buffered: the pipeline is waiting on input, not stuck.
    if queues_empty && reorder_empty {
        log::debug!(
            "Deadlock detector: no queue activity for {}s but all queues \
             are empty (read_done={}); treating as upstream starvation, \
             not deadlock",
            timeout,
            snapshot.read_done
        );
        deadlock_state.last_progress_time.store(now, Ordering::Relaxed);
        return DeadlockAction::None;
    }

    handle_deadlock(deadlock_state, snapshot, now)
}
/// Outcome of one call to `check_deadlock_and_restore`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeadlockAction {
/// Nothing to report: detection is disabled, progress is recent, or the
/// pipeline was idle with no work in flight.
None,
/// A deadlock was detected but recovery is disabled, so no limit was changed.
Detected,
/// A deadlock was detected and the memory limit was changed. Carries the new
/// limit in bytes; `0` means the limit was removed (unlimited).
Recovered(u64),
}
/// Reports a detected deadlock and, when recovery is enabled, raises the memory limit.
///
/// Always logs the deadlock banner plus the full queue state, and always resets
/// `last_progress_time` to `now` so the next report needs a fresh timeout.
///
/// With recovery enabled the limit in force is doubled; if the doubled value is
/// `0` (already unlimited) or exceeds 8x the original limit, the limit is removed
/// (`0`). The stable floor is raised to the doubled value when that is higher
/// than the floor already stored. Returns `Recovered(final_limit)`.
///
/// With recovery disabled nothing is changed and `Detected` is returned.
fn handle_deadlock(
    deadlock_state: &DeadlockState,
    snapshot: &QueueSnapshot,
    now: u64,
) -> DeadlockAction {
    let current = deadlock_state.current_memory_limit.load(Ordering::Relaxed);

    log::warn!("DEADLOCK DETECTED: No progress for {}s", deadlock_state.timeout_secs);
    log_queue_state(deadlock_state, snapshot);

    if !deadlock_state.recover_enabled {
        log::warn!(" Recovery disabled - use --deadlock-recover to enable automatic recovery");
        deadlock_state.last_progress_time.store(now, Ordering::Relaxed);
        return DeadlockAction::Detected;
    }

    deadlock_state.deadlock_at_current_limit.store(true, Ordering::Relaxed);

    // Doubling an unlimited (0) limit stays 0, which the cap check below maps to "unlimited".
    let new_limit = current.saturating_mul(2);

    if current > 0 {
        // The limit that just deadlocked is known to be too small; never restore below 2x it.
        let new_stable = new_limit;
        let old_stable = deadlock_state.stable_memory_limit.load(Ordering::Relaxed);
        if old_stable < new_stable {
            deadlock_state.stable_memory_limit.store(new_stable, Ordering::Relaxed);
            log::warn!(" Stable limit updated: {old_stable} -> {new_stable} bytes");
        }
    }

    let ceiling = deadlock_state.original_memory_limit.saturating_mul(8);
    let final_limit = if new_limit == 0 || new_limit > ceiling {
        log::warn!(" Recovery: Unbinding limits (unlimited)");
        0
    } else {
        log::warn!(" Recovery: {current} -> {new_limit} bytes (2x)");
        new_limit
    };

    deadlock_state.current_memory_limit.store(final_limit, Ordering::SeqCst);
    deadlock_state.last_recovery_time.store(now, Ordering::Relaxed);
    deadlock_state.last_progress_time.store(now, Ordering::Relaxed);
    DeadlockAction::Recovered(final_limit)
}
/// Eases a previously raised memory limit back down, one step per call.
///
/// Does nothing until at least 30 seconds have passed since the last recovery
/// or restoration, or if no recovery has ever happened (`last_recovery_time == 0`).
///
/// * Unlimited (`0`): re-imposes `max(8 * original, stable)` if that is non-zero.
/// * Otherwise, if the limit is above both the original and the stable floor:
///   halves it, clamped to no lower than either of them.
///
/// Each change refreshes `last_recovery_time` and clears `deadlock_at_current_limit`.
fn try_restore_limits(deadlock_state: &DeadlockState, now: u64) {
    /// Minimum quiet period after a limit change before the next step down.
    const COOLDOWN_SECS: u64 = 30;

    let last_recovery = deadlock_state.last_recovery_time.load(Ordering::Relaxed);
    let never_recovered = last_recovery == 0;
    if never_recovered || now.saturating_sub(last_recovery) < COOLDOWN_SECS {
        return;
    }

    let current = deadlock_state.current_memory_limit.load(Ordering::Relaxed);
    let original = deadlock_state.original_memory_limit;
    let stable = deadlock_state.stable_memory_limit.load(Ordering::Relaxed);

    // Pick the limit to apply, logging the transition; bail out when there is nothing to do.
    let restored = if current == 0 {
        let target = original.saturating_mul(8).max(stable);
        if target == 0 {
            return;
        }
        log::info!(" Restoring: unlimited -> {target} bytes");
        target
    } else {
        let at_or_below_original = current <= original;
        let at_or_below_stable = stable > 0 && current <= stable;
        if at_or_below_original || at_or_below_stable {
            return;
        }
        let new_limit = (current / 2).max(original).max(stable);
        if new_limit >= current {
            return;
        }
        log::info!(" Restoring: {current} -> {new_limit} bytes (halving)");
        new_limit
    };

    deadlock_state.current_memory_limit.store(restored, Ordering::SeqCst);
    deadlock_state.last_recovery_time.store(now, Ordering::Relaxed);
    deadlock_state.deadlock_at_current_limit.store(false, Ordering::Relaxed);
}
#[allow(clippy::cast_precision_loss)]
fn log_queue_state(deadlock_state: &DeadlockState, snapshot: &QueueSnapshot) {
let now = now_secs();
log::warn!(
" Queue depths: Q1={} Q2={} Q2b={} Q3={} Q4={} Q5={} Q6={} Q7={}",
snapshot.q1_len,
snapshot.q2_len,
snapshot.q2b_len,
snapshot.q3_len,
snapshot.q4_len,
snapshot.q5_len,
snapshot.q6_len,
snapshot.q7_len,
);
log::warn!(
" Q1: push={}s ago, pop={}s ago",
now.saturating_sub(deadlock_state.q1_last_push.load(Ordering::Relaxed)),
now.saturating_sub(deadlock_state.q1_last_pop.load(Ordering::Relaxed))
);
log::warn!(
" Q2: push={}s ago, pop={}s ago",
now.saturating_sub(deadlock_state.q2_last_push.load(Ordering::Relaxed)),
now.saturating_sub(deadlock_state.q2_last_pop.load(Ordering::Relaxed))
);
log::warn!(
" Q2b: push={}s ago, pop={}s ago",
now.saturating_sub(deadlock_state.q2b_last_push.load(Ordering::Relaxed)),
now.saturating_sub(deadlock_state.q2b_last_pop.load(Ordering::Relaxed))
);
log::warn!(
" Q2.5: push={}s ago, pop={}s ago",
now.saturating_sub(deadlock_state.q2_5_last_push.load(Ordering::Relaxed)),
now.saturating_sub(deadlock_state.q2_5_last_pop.load(Ordering::Relaxed))
);
log::warn!(
" Q3: push={}s ago, pop={}s ago",
now.saturating_sub(deadlock_state.q3_last_push.load(Ordering::Relaxed)),
now.saturating_sub(deadlock_state.q3_last_pop.load(Ordering::Relaxed))
);
log::warn!(
" Q4: push={}s ago, pop={}s ago",
now.saturating_sub(deadlock_state.q4_last_push.load(Ordering::Relaxed)),
now.saturating_sub(deadlock_state.q4_last_pop.load(Ordering::Relaxed))
);
log::warn!(
" Q5: push={}s ago, pop={}s ago",
now.saturating_sub(deadlock_state.q5_last_push.load(Ordering::Relaxed)),
now.saturating_sub(deadlock_state.q5_last_pop.load(Ordering::Relaxed))
);
log::warn!(
" Q6: push={}s ago, pop={}s ago",
now.saturating_sub(deadlock_state.q6_last_push.load(Ordering::Relaxed)),
now.saturating_sub(deadlock_state.q6_last_pop.load(Ordering::Relaxed))
);
log::warn!(
" Q7: push={}s ago, pop={}s ago",
now.saturating_sub(deadlock_state.q7_last_push.load(Ordering::Relaxed)),
now.saturating_sub(deadlock_state.q7_last_pop.load(Ordering::Relaxed))
);
log::warn!(
" Memory: Q2 reorder={:.1}MB, Q3 reorder={:.1}MB, limit={:.1}MB",
snapshot.q2_reorder_mem as f64 / 1_000_000.0,
snapshot.q3_reorder_mem as f64 / 1_000_000.0,
snapshot.memory_limit as f64 / 1_000_000.0,
);
log::warn!(
" State: read_done={}, group_done={}, draining={}",
snapshot.read_done,
snapshot.group_done,
snapshot.draining,
);
if let Some(ref extra) = snapshot.extra_state {
log::warn!(" Extra: {extra}");
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// 512 MiB: the baseline memory limit used by most tests.
    const BASE_LIMIT: u64 = 512 * 1024 * 1024;
    /// 1 GiB.
    const GIB: u64 = 1024 * 1024 * 1024;

    /// Snapshot with every queue empty, no reorder memory and all flags cleared.
    fn idle_snapshot() -> QueueSnapshot {
        QueueSnapshot {
            q1_len: 0,
            q2_len: 0,
            q2b_len: 0,
            q3_len: 0,
            q4_len: 0,
            q5_len: 0,
            q6_len: 0,
            q7_len: 0,
            q2_reorder_mem: 0,
            q3_reorder_mem: 0,
            memory_limit: 0,
            read_done: false,
            group_done: false,
            draining: false,
            extra_state: None,
        }
    }

    /// Snapshot with a single item in Q3: the minimum that counts as in-flight work.
    fn one_item_snapshot(memory_limit: u64) -> QueueSnapshot {
        QueueSnapshot { q3_len: 1, memory_limit, ..idle_snapshot() }
    }

    /// Snapshot with several backed-up queues and the given reorder memory.
    fn backed_up_snapshot(q2_mem: u64, q3_mem: u64, memory_limit: u64) -> QueueSnapshot {
        QueueSnapshot {
            q2_len: 10,
            q2b_len: 5,
            q3_len: 10,
            q2_reorder_mem: q2_mem,
            q3_reorder_mem: q3_mem,
            memory_limit,
            ..idle_snapshot()
        }
    }

    /// Backdates `last_progress_time` so the detector sees `secs` of inactivity.
    fn stall(state: &DeadlockState, secs: u64) {
        state.last_progress_time.store(now_secs().saturating_sub(secs), Ordering::Relaxed);
    }

    /// Runs `try_restore_limits` as if the last recovery happened `secs_ago` seconds ago.
    fn restore_after(state: &DeadlockState, secs_ago: u64) {
        let now = now_secs();
        state.last_recovery_time.store(now.saturating_sub(secs_ago), Ordering::Relaxed);
        try_restore_limits(state, now);
    }

    #[test]
    fn test_progress_timestamp_updates_on_push() {
        let state = DeadlockState::new(&DeadlockConfig::new(10, false), BASE_LIMIT);
        // Zero sentinels: the clock has one-second resolution, so comparing
        // against the construction-time value could not detect a missing store.
        state.q1_last_push.store(0, Ordering::Relaxed);
        state.last_progress_time.store(0, Ordering::Relaxed);

        state.record_q1_push();

        assert!(state.q1_last_push.load(Ordering::Relaxed) > 0);
        assert!(state.last_progress_time.load(Ordering::Relaxed) > 0);
    }

    #[test]
    fn test_progress_timestamp_updates_on_pop() {
        let state = DeadlockState::new(&DeadlockConfig::new(10, false), BASE_LIMIT);
        state.q1_last_pop.store(0, Ordering::Relaxed);
        state.last_progress_time.store(0, Ordering::Relaxed);

        state.record_q1_pop();

        assert!(state.q1_last_pop.load(Ordering::Relaxed) > 0);
        assert!(state.last_progress_time.load(Ordering::Relaxed) > 0);
    }

    #[test]
    fn test_per_queue_timestamps_independent() {
        let state = DeadlockState::new(&DeadlockConfig::new(10, false), BASE_LIMIT);
        state.q1_last_push.store(1, Ordering::Relaxed);
        state.q3_last_push.store(1, Ordering::Relaxed);

        state.record_q3_push();

        // Only Q3's push timestamp may move.
        assert_eq!(state.q1_last_push.load(Ordering::Relaxed), 1);
        assert!(state.q3_last_push.load(Ordering::Relaxed) > 1);
    }

    #[test]
    fn test_disabled_state_skips_tracking() {
        let state = DeadlockState::disabled();
        state.q1_last_push.store(0, Ordering::Relaxed);
        state.last_progress_time.store(0, Ordering::Relaxed);

        state.record_q1_push();

        assert_eq!(state.q1_last_push.load(Ordering::Relaxed), 0);
        assert_eq!(state.last_progress_time.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_deadlock_detected_after_timeout() {
        let state = DeadlockState::new(&DeadlockConfig::new(1, false), BASE_LIMIT);
        stall(&state, 5);
        let snapshot = backed_up_snapshot(100_000_000, 200_000_000, 512_000_000);

        let action = check_deadlock_and_restore(&state, &snapshot);

        assert_eq!(action, DeadlockAction::Detected);
    }

    #[test]
    fn test_no_detection_before_timeout() {
        let state = DeadlockState::new(&DeadlockConfig::new(10, false), BASE_LIMIT);

        let action = check_deadlock_and_restore(&state, &idle_snapshot());

        assert_eq!(action, DeadlockAction::None);
    }

    #[test]
    fn test_detection_disabled_returns_none() {
        let state = DeadlockState::disabled();
        stall(&state, 100);

        let action = check_deadlock_and_restore(&state, &idle_snapshot());

        assert_eq!(action, DeadlockAction::None);
    }

    #[test]
    fn test_recovery_doubles_limit() {
        let state = DeadlockState::new(&DeadlockConfig::new(1, true), BASE_LIMIT);
        stall(&state, 5);
        let snapshot = backed_up_snapshot(400_000_000, 500_000_000, BASE_LIMIT);

        let action = check_deadlock_and_restore(&state, &snapshot);

        assert_eq!(action, DeadlockAction::Recovered(BASE_LIMIT * 2));
        assert_eq!(state.get_memory_limit(), BASE_LIMIT * 2);
    }

    #[test]
    fn test_recovery_caps_at_8x() {
        let state = DeadlockState::new(&DeadlockConfig::new(1, true), BASE_LIMIT);
        state.current_memory_limit.store(BASE_LIMIT * 8, Ordering::Relaxed);
        stall(&state, 5);
        let snapshot = backed_up_snapshot(0, 0, BASE_LIMIT * 8);

        let action = check_deadlock_and_restore(&state, &snapshot);

        // Doubling 8x would exceed the cap, so the limit is removed instead.
        assert_eq!(action, DeadlockAction::Recovered(0));
    }

    #[test]
    fn test_recovery_updates_stable_limit() {
        let state = DeadlockState::new(&DeadlockConfig::new(1, true), BASE_LIMIT);
        assert_eq!(state.stable_memory_limit.load(Ordering::Relaxed), 0);
        stall(&state, 5);

        let action = check_deadlock_and_restore(&state, &one_item_snapshot(BASE_LIMIT));

        assert_eq!(action, DeadlockAction::Recovered(BASE_LIMIT * 2));
        assert_eq!(state.stable_memory_limit.load(Ordering::Relaxed), BASE_LIMIT * 2);
    }

    #[test]
    fn test_recovery_disabled_only_logs() {
        let state = DeadlockState::new(&DeadlockConfig::new(1, false), BASE_LIMIT);
        stall(&state, 5);

        let action = check_deadlock_and_restore(&state, &one_item_snapshot(BASE_LIMIT));

        assert_eq!(action, DeadlockAction::Detected);
        assert_eq!(state.current_memory_limit.load(Ordering::Relaxed), BASE_LIMIT);
    }

    #[test]
    fn test_restoration_after_30s_progress() {
        let state = DeadlockState::new(&DeadlockConfig::new(10, true), BASE_LIMIT);
        state.current_memory_limit.store(BASE_LIMIT * 2, Ordering::SeqCst);

        restore_after(&state, 35);

        assert_eq!(state.current_memory_limit.load(Ordering::Relaxed), BASE_LIMIT);
    }

    #[test]
    fn test_no_restoration_before_30s() {
        let state = DeadlockState::new(&DeadlockConfig::new(10, true), BASE_LIMIT);
        let elevated_limit = BASE_LIMIT * 2;
        state.current_memory_limit.store(elevated_limit, Ordering::SeqCst);

        restore_after(&state, 20);

        assert_eq!(state.current_memory_limit.load(Ordering::Relaxed), elevated_limit);
    }

    #[test]
    fn test_restoration_respects_stable_limit() {
        let state = DeadlockState::new(&DeadlockConfig::new(10, true), BASE_LIMIT);
        state.stable_memory_limit.store(GIB, Ordering::Relaxed);
        state.current_memory_limit.store(2 * GIB, Ordering::SeqCst);

        restore_after(&state, 35);

        // Halving 2 GiB gives 1 GiB, which is exactly the stable floor.
        assert_eq!(state.current_memory_limit.load(Ordering::Relaxed), GIB);
    }

    #[test]
    fn test_restoration_from_unbounded() {
        let state = DeadlockState::new(&DeadlockConfig::new(10, true), BASE_LIMIT);
        state.current_memory_limit.store(0, Ordering::SeqCst);

        restore_after(&state, 35);

        assert_eq!(state.current_memory_limit.load(Ordering::Relaxed), BASE_LIMIT * 8);
    }

    #[test]
    fn test_stable_limit_never_decreases() {
        let state = DeadlockState::new(&DeadlockConfig::new(1, true), BASE_LIMIT);
        state.stable_memory_limit.store(GIB, Ordering::Relaxed);
        // Current limit below the stable floor: doubling it (512 MiB) must not lower the floor.
        let low_limit = 256 * 1024 * 1024;
        state.current_memory_limit.store(low_limit, Ordering::Relaxed);
        stall(&state, 5);

        let action = check_deadlock_and_restore(&state, &one_item_snapshot(low_limit));

        assert!(matches!(action, DeadlockAction::Recovered(_)));
        assert_eq!(state.stable_memory_limit.load(Ordering::Relaxed), GIB);
    }

    #[test]
    fn test_config_default() {
        let config = DeadlockConfig::default();
        assert_eq!(config.timeout_secs, 10);
        assert!(!config.recover_enabled);
        assert!(config.is_enabled());
    }

    #[test]
    fn test_config_disabled() {
        let config = DeadlockConfig::disabled();
        assert_eq!(config.timeout_secs, 0);
        assert!(!config.recover_enabled);
        assert!(!config.is_enabled());
    }

    #[test]
    fn test_get_memory_limit() {
        let state = DeadlockState::new(&DeadlockConfig::new(10, true), BASE_LIMIT);
        assert_eq!(state.get_memory_limit(), BASE_LIMIT);

        state.current_memory_limit.store(GIB, Ordering::Relaxed);
        assert_eq!(state.get_memory_limit(), GIB);
    }

    #[test]
    fn test_no_deadlock_on_empty_queues_stale_timestamp() {
        let state = DeadlockState::new(&DeadlockConfig::new(1, false), 0);
        stall(&state, 5);
        let before = state.last_progress_time.load(Ordering::Relaxed);

        let action = check_deadlock_and_restore(&state, &idle_snapshot());

        assert_eq!(
            action,
            DeadlockAction::None,
            "empty queues + stale timestamp must be treated as starvation"
        );
        let after = state.last_progress_time.load(Ordering::Relaxed);
        assert!(
            after > before,
            "empty-queue starvation path should refresh last_progress_time (before={before}, after={after})"
        );
    }
}