#![allow(dead_code)]
#![allow(clippy::redundant_pub_crate)]
use core::sync::atomic::{AtomicU64, Ordering};
use core::time::Duration;
use std::time::Instant;
/// Fault status of a monitored unit: either running normally or latched
/// into a fault with a recorded cause.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FaultState {
    /// No fault is recorded.
    Running,
    /// A fault has been recorded.
    Faulted {
        /// Why the fault was raised.
        reason: FaultReason,
        /// Millisecond timestamp attached to the fault.
        ///
        /// NOTE(review): presumably milliseconds elapsed since a reference
        /// start instant, as produced by `instant_to_since_ms` — confirm
        /// against callers.
        since_ms: u32,
    },
}
/// Cause recorded alongside a [`FaultState::Faulted`] state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FaultReason {
    /// The measured time exceeded the configured budget.
    BudgetExceeded {
        /// Measured time in milliseconds. Values above `0x3FFF_FFFF` are not
        /// preserved by `FaultAtomic::pack`, which keeps only 30 bits.
        took_ms: u32,
        /// Budget in milliseconds. `FaultAtomic` does not store this value;
        /// it is supplied again by the caller when decoding.
        budget_ms: u32,
    },
    /// The fault originates from the executor rather than from this unit's
    /// own budget (NOTE(review): inferred from the variant name — confirm).
    ExecutorFaulted,
}
/// Fault status of an executor: either running normally or latched into a
/// fault with a recorded cause.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutorFaultState {
    /// No fault is recorded.
    Running,
    /// A fault has been recorded.
    Faulted {
        /// Why the executor fault was raised.
        reason: ExecutorFaultReason,
        /// Millisecond timestamp attached to the fault.
        ///
        /// NOTE(review): presumably milliseconds elapsed since a reference
        /// start instant, as produced by `instant_to_since_ms` — confirm
        /// against callers.
        since_ms: u32,
    },
}
/// Cause recorded alongside an [`ExecutorFaultState::Faulted`] state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutorFaultReason {
    /// One iteration took longer than its budget.
    IterationBudgetExceeded {
        /// Index of the task involved. `ExecutorFaultAtomic` does not store
        /// this value; the caller supplies it again when decoding.
        task_idx: u32,
        /// Measured time in milliseconds. Values above `0x3FFF_FFFF` are not
        /// preserved by `ExecutorFaultAtomic::pack`, which keeps only 30 bits.
        took_ms: u32,
        /// Budget in milliseconds. Not stored by `ExecutorFaultAtomic`;
        /// supplied again by the caller when decoding.
        budget_ms: u32,
    },
}
/// Atomic cell holding a [`FaultState`] packed into a single `AtomicU64`.
///
/// Layout of the packed word:
///
/// | bits   | content                                                            |
/// |--------|--------------------------------------------------------------------|
/// | 63..62 | discriminant: 0 = running, 1 = budget exceeded, 2 = executor fault |
/// | 61..32 | `took_ms`, saturated to 30 bits                                    |
/// | 31..0  | `since_ms`                                                         |
///
/// `budget_ms` is not part of the packed word; callers pass it back in
/// whenever a word is decoded.
#[derive(Debug, Default)]
pub(crate) struct FaultAtomic(AtomicU64);

impl FaultAtomic {
    /// Bit position of the 2-bit discriminant.
    const DISC_SHIFT: u32 = 62;
    /// Mask applied to the discriminant after shifting it down.
    const DISC_MASK: u64 = 0x3;
    /// Bit position of the `took_ms` field.
    const TOOK_SHIFT: u32 = 32;
    /// Largest `took_ms` the 30-bit field can hold; doubles as its mask.
    const TOOK_MS_MASK: u64 = 0x3FFF_FFFF;
    /// Mask selecting the 32-bit `since_ms` field.
    const SINCE_MASK: u64 = 0xFFFF_FFFF;

    /// Creates a cell whose initial state is [`FaultState::Running`].
    pub(crate) const fn new() -> Self {
        Self(AtomicU64::new(0))
    }

    /// Encodes `state` into the packed representation.
    ///
    /// [`FaultState::Running`] encodes as `0`. For a budget fault, `took_ms`
    /// saturates at `0x3FFF_FFFF` when it does not fit in 30 bits, and
    /// `budget_ms` is dropped. An executor fault stores `took_ms` as zero.
    pub(crate) fn pack(state: FaultState) -> u64 {
        match state {
            FaultState::Running => 0,
            FaultState::Faulted { reason, since_ms } => {
                let (disc, took_ms) = match reason {
                    FaultReason::BudgetExceeded { took_ms, .. } => (1_u64, took_ms),
                    FaultReason::ExecutorFaulted => (2_u64, 0_u32),
                };
                // Clamp rather than mask: masking would wrap large values
                // (e.g. 0x4000_0000 would be stored as 0).
                let took_bits = u64::from(took_ms).min(Self::TOOK_MS_MASK);
                (disc << Self::DISC_SHIFT)
                    | (took_bits << Self::TOOK_SHIFT)
                    | u64::from(since_ms)
            }
        }
    }

    /// Decodes a packed word back into a [`FaultState`].
    ///
    /// `budget_ms` is copied into the result for a budget fault because the
    /// packed word does not carry it. Discriminants other than 1 and 2
    /// decode as [`FaultState::Running`].
    // Both fields are masked to at most 32 bits before the cast, so the
    // narrowing conversions cannot lose information.
    #[allow(clippy::cast_possible_truncation)]
    pub(crate) const fn unpack(packed: u64, budget_ms: u32) -> FaultState {
        let disc = (packed >> Self::DISC_SHIFT) & Self::DISC_MASK;
        let took_ms = ((packed >> Self::TOOK_SHIFT) & Self::TOOK_MS_MASK) as u32;
        let since_ms = (packed & Self::SINCE_MASK) as u32;
        match disc {
            1 => FaultState::Faulted {
                reason: FaultReason::BudgetExceeded { took_ms, budget_ms },
                since_ms,
            },
            2 => FaultState::Faulted {
                reason: FaultReason::ExecutorFaulted,
                since_ms,
            },
            _ => FaultState::Running,
        }
    }

    /// Reads the current state with `Acquire` ordering and decodes it,
    /// filling in `budget_ms` as described on [`Self::unpack`].
    pub(crate) fn load(&self, budget_ms: u32) -> FaultState {
        Self::unpack(self.0.load(Ordering::Acquire), budget_ms)
    }

    /// Atomically replaces the stored state with `state` (`AcqRel` ordering)
    /// and returns the previous state, decoded with `budget_ms`.
    pub(crate) fn swap(&self, state: FaultState, budget_ms: u32) -> FaultState {
        Self::unpack(self.0.swap(Self::pack(state), Ordering::AcqRel), budget_ms)
    }
}
/// Atomic cell holding an [`ExecutorFaultState`] packed into a single
/// `AtomicU64`.
///
/// Layout of the packed word:
///
/// | bits   | content                                                  |
/// |--------|----------------------------------------------------------|
/// | 63..62 | discriminant: 0 = running, 1 = iteration budget exceeded |
/// | 61..32 | `took_ms`, saturated to 30 bits                          |
/// | 31..0  | `since_ms`                                               |
///
/// `task_idx` and `budget_ms` are not part of the packed word; callers pass
/// them back in whenever a word is decoded.
#[derive(Debug, Default)]
pub(crate) struct ExecutorFaultAtomic(AtomicU64);

impl ExecutorFaultAtomic {
    /// Bit position of the 2-bit discriminant.
    const DISC_SHIFT: u32 = 62;
    /// Mask applied to the discriminant after shifting it down.
    const DISC_MASK: u64 = 0x3;
    /// Bit position of the `took_ms` field.
    const TOOK_SHIFT: u32 = 32;
    /// Largest `took_ms` the 30-bit field can hold; doubles as its mask.
    const TOOK_MS_MASK: u64 = 0x3FFF_FFFF;
    /// Mask selecting the 32-bit `since_ms` field.
    const SINCE_MASK: u64 = 0xFFFF_FFFF;

    /// Creates a cell whose initial state is [`ExecutorFaultState::Running`].
    pub(crate) const fn new() -> Self {
        Self(AtomicU64::new(0))
    }

    /// Encodes `state` into the packed representation.
    ///
    /// [`ExecutorFaultState::Running`] encodes as `0`. For a fault,
    /// `took_ms` saturates at `0x3FFF_FFFF` when it does not fit in 30 bits;
    /// `task_idx` and `budget_ms` are dropped.
    pub(crate) fn pack(state: ExecutorFaultState) -> u64 {
        match state {
            ExecutorFaultState::Running => 0,
            ExecutorFaultState::Faulted { reason, since_ms } => {
                let took_ms = match reason {
                    ExecutorFaultReason::IterationBudgetExceeded { took_ms, .. } => took_ms,
                };
                // Clamp rather than mask: masking would wrap large values
                // (e.g. 0x4000_0000 would be stored as 0).
                let took_bits = u64::from(took_ms).min(Self::TOOK_MS_MASK);
                (1_u64 << Self::DISC_SHIFT)
                    | (took_bits << Self::TOOK_SHIFT)
                    | u64::from(since_ms)
            }
        }
    }

    /// Decodes a packed word back into an [`ExecutorFaultState`].
    ///
    /// `task_idx` and `budget_ms` are copied into the result for a fault
    /// because the packed word does not carry them. Any discriminant other
    /// than 1 decodes as [`ExecutorFaultState::Running`].
    // Both fields are masked to at most 32 bits before the cast, so the
    // narrowing conversions cannot lose information.
    #[allow(clippy::cast_possible_truncation)]
    pub(crate) const fn unpack(packed: u64, task_idx: u32, budget_ms: u32) -> ExecutorFaultState {
        let disc = (packed >> Self::DISC_SHIFT) & Self::DISC_MASK;
        let took_ms = ((packed >> Self::TOOK_SHIFT) & Self::TOOK_MS_MASK) as u32;
        let since_ms = (packed & Self::SINCE_MASK) as u32;
        match disc {
            1 => ExecutorFaultState::Faulted {
                reason: ExecutorFaultReason::IterationBudgetExceeded {
                    task_idx,
                    took_ms,
                    budget_ms,
                },
                since_ms,
            },
            _ => ExecutorFaultState::Running,
        }
    }

    /// Reads the current state with `Acquire` ordering and decodes it,
    /// filling in `task_idx` and `budget_ms` as described on
    /// [`Self::unpack`].
    pub(crate) fn load(&self, task_idx: u32, budget_ms: u32) -> ExecutorFaultState {
        Self::unpack(self.0.load(Ordering::Acquire), task_idx, budget_ms)
    }

    /// Atomically replaces the stored state with `state` (`AcqRel` ordering)
    /// and returns the previous state, decoded with `task_idx` and
    /// `budget_ms`.
    pub(crate) fn swap(
        &self,
        state: ExecutorFaultState,
        task_idx: u32,
        budget_ms: u32,
    ) -> ExecutorFaultState {
        let previous = self.0.swap(Self::pack(state), Ordering::AcqRel);
        Self::unpack(previous, task_idx, budget_ms)
    }
}
/// Converts `d` to whole milliseconds, saturating at `u32::MAX` when the
/// millisecond count does not fit in a `u32`. Sub-millisecond remainders are
/// discarded, as with `Duration::as_millis`.
// The value is clamped to `u32::MAX` before the cast, so it cannot truncate.
#[allow(clippy::cast_possible_truncation)]
pub(crate) fn duration_to_ms_sat(d: Duration) -> u32 {
    let ceiling = u128::from(u32::MAX);
    let clamped = d.as_millis().min(ceiling);
    clamped as u32
}
/// Returns the number of whole milliseconds from `start` to `at`.
///
/// Yields `0` when `at` is earlier than `start` (via
/// `Instant::saturating_duration_since`) and saturates at `u32::MAX` for
/// spans too long to fit (via `duration_to_ms_sat`).
pub(crate) fn instant_to_since_ms(at: Instant, start: Instant) -> u32 {
    let elapsed = at.saturating_duration_since(start);
    duration_to_ms_sat(elapsed)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Running` maps to the all-zero word and back.
    #[test]
    fn pack_unpack_running() {
        let s = FaultState::Running;
        let p = FaultAtomic::pack(s);
        assert_eq!(p, 0);
        assert_eq!(FaultAtomic::unpack(p, 100), FaultState::Running);
    }

    /// A budget fault survives a round trip when `budget_ms` is supplied
    /// again on decode.
    #[test]
    fn pack_unpack_budget_exceeded_round_trip() {
        let s = FaultState::Faulted {
            reason: FaultReason::BudgetExceeded {
                took_ms: 17,
                budget_ms: 10,
            },
            since_ms: 1_234,
        };
        let p = FaultAtomic::pack(s);
        let restored = FaultAtomic::unpack(p, 10);
        assert_eq!(restored, s);
    }

    /// An executor fault keeps the full 32-bit `since_ms` and ignores the
    /// `budget_ms` passed to `unpack`.
    #[test]
    fn pack_unpack_executor_faulted_round_trip() {
        let s = FaultState::Faulted {
            reason: FaultReason::ExecutorFaulted,
            since_ms: u32::MAX,
        };
        let p = FaultAtomic::pack(s);
        let restored = FaultAtomic::unpack(p, 999);
        assert_eq!(restored, s);
    }

    /// The largest representable `took_ms` round-trips unchanged, and an
    /// out-of-range value comes back as that maximum without disturbing the
    /// neighbouring fields.
    #[test]
    fn pack_unpack_saturates_took_ms() {
        let at_limit = FaultState::Faulted {
            reason: FaultReason::BudgetExceeded {
                took_ms: 0x3FFF_FFFF,
                budget_ms: 5,
            },
            since_ms: 42,
        };
        let p = FaultAtomic::pack(at_limit);
        assert_eq!(FaultAtomic::unpack(p, 5), at_limit);

        let over_limit = FaultState::Faulted {
            reason: FaultReason::BudgetExceeded {
                took_ms: u32::MAX,
                budget_ms: 5,
            },
            since_ms: 42,
        };
        let p = FaultAtomic::pack(over_limit);
        assert_eq!(FaultAtomic::unpack(p, 5), at_limit);
    }

    /// `swap` hands back whatever was stored before the call.
    #[test]
    fn fault_atomic_swap_returns_previous() {
        let fa = FaultAtomic::new();
        assert_eq!(fa.load(0), FaultState::Running);
        let prev = fa.swap(
            FaultState::Faulted {
                reason: FaultReason::BudgetExceeded {
                    took_ms: 5,
                    budget_ms: 3,
                },
                since_ms: 100,
            },
            3,
        );
        assert_eq!(prev, FaultState::Running);
        let prev = fa.swap(FaultState::Running, 3);
        assert!(matches!(
            prev,
            FaultState::Faulted {
                reason: FaultReason::BudgetExceeded { .. },
                ..
            }
        ));
        assert_eq!(fa.load(3), FaultState::Running);
    }

    /// An executor fault round-trips when `task_idx` and `budget_ms` are
    /// supplied again on decode.
    #[test]
    fn executor_pack_unpack_round_trip() {
        assert_eq!(ExecutorFaultAtomic::pack(ExecutorFaultState::Running), 0);
        let s = ExecutorFaultState::Faulted {
            reason: ExecutorFaultReason::IterationBudgetExceeded {
                task_idx: 7,
                took_ms: 20,
                budget_ms: 10,
            },
            since_ms: 50,
        };
        let p = ExecutorFaultAtomic::pack(s);
        assert_eq!(ExecutorFaultAtomic::unpack(p, 7, 10), s);
    }

    /// `swap` hands back the previous executor state in both directions.
    #[test]
    fn executor_fault_atomic_swap_returns_previous() {
        let efa = ExecutorFaultAtomic::new();
        assert_eq!(efa.load(0, 0), ExecutorFaultState::Running);
        let faulted = ExecutorFaultState::Faulted {
            reason: ExecutorFaultReason::IterationBudgetExceeded {
                task_idx: 3,
                took_ms: 20,
                budget_ms: 10,
            },
            since_ms: 50,
        };
        let prev = efa.swap(faulted, 3, 10);
        assert_eq!(prev, ExecutorFaultState::Running);
        let prev = efa.swap(ExecutorFaultState::Running, 3, 10);
        assert_eq!(prev, faulted);
        assert_eq!(efa.load(3, 10), ExecutorFaultState::Running);
    }

    /// Millisecond conversion is exact for small values and saturates for
    /// durations beyond `u32::MAX` milliseconds.
    #[test]
    fn duration_helpers_saturate() {
        assert_eq!(duration_to_ms_sat(Duration::from_millis(5)), 5);
        let huge = Duration::from_secs(60 * 60 * 24 * 365 * 100);
        assert_eq!(duration_to_ms_sat(huge), u32::MAX);
    }

    /// `instant_to_since_ms` measures forward spans and clamps reversed ones
    /// to zero.
    #[test]
    fn instant_to_since_ms_measures_and_clamps() {
        let start = Instant::now();
        let later = start + Duration::from_millis(250);
        assert_eq!(instant_to_since_ms(later, start), 250);
        assert_eq!(instant_to_since_ms(start, later), 0);
    }
}