use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use crate::channel::mpsc::{SendError, Sender};
use crate::cx::Cx;
use crate::evidence_sink::EvidenceSink;
use crate::lab::chaos::ChaosRng;
use franken_evidence::EvidenceLedger;
/// Settings that decide when a crash-injecting sender simulates a crash.
///
/// Start from [`CrashConfig::new`] (everything disabled) and chain the
/// `with_*` builders to switch individual triggers on.
#[derive(Debug, Clone)]
pub struct CrashConfig {
    /// Chance, in `[0.0, 1.0]`, that any single send injects a crash.
    /// `0.0` turns the probabilistic trigger off.
    pub crash_probability: f64,
    /// When `Some(n)`, a crash is injected on the first send attempted after
    /// `n` sends have already succeeded.
    pub crash_after_sends: Option<u64>,
    /// Upper bound on successful restarts; `None` means unlimited.
    pub max_restarts: Option<u32>,
    /// Restart flavour reported by the controller.
    pub restart_mode: RestartMode,
    /// Seed for the random source behind the probabilistic trigger.
    pub seed: u64,
}

impl CrashConfig {
    /// Returns a configuration with every crash trigger disabled, unlimited
    /// restarts and [`RestartMode::Cold`].
    #[must_use]
    pub const fn new(seed: u64) -> Self {
        Self {
            seed,
            restart_mode: RestartMode::Cold,
            max_restarts: None,
            crash_after_sends: None,
            crash_probability: 0.0,
        }
    }

    /// Sets the per-send crash probability.
    ///
    /// # Panics
    ///
    /// Panics when `probability` lies outside `[0.0, 1.0]` (NaN included).
    #[must_use]
    pub fn with_crash_probability(self, probability: f64) -> Self {
        assert!(
            (0.0..=1.0).contains(&probability),
            "crash probability must be in [0.0, 1.0], got {probability}"
        );
        Self {
            crash_probability: probability,
            ..self
        }
    }

    /// Arms the deterministic trigger: crash once `count` sends have succeeded.
    #[must_use]
    pub const fn with_crash_after_sends(self, count: u64) -> Self {
        Self {
            crash_after_sends: Some(count),
            ..self
        }
    }

    /// Caps the number of restarts at `max`.
    #[must_use]
    pub const fn with_max_restarts(self, max: u32) -> Self {
        Self {
            max_restarts: Some(max),
            ..self
        }
    }

    /// Selects the restart mode.
    #[must_use]
    pub const fn with_restart_mode(self, mode: RestartMode) -> Self {
        Self {
            restart_mode: mode,
            ..self
        }
    }

    /// Reports whether at least one crash trigger is armed, i.e. the
    /// probability is positive or a send limit is set.
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        let probabilistic = self.crash_probability > 0.0;
        let deterministic = self.crash_after_sends.is_some();
        probabilistic || deterministic
    }
}

/// Flavour of restart a controller is configured with.
///
/// NOTE(review): nothing in this file branches on the variant; presumably
/// callers use it to decide whether to reset sender state (`Cold`) or keep it
/// (`Warm`) after a restart — confirm against callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartMode {
    /// Cold restart.
    Cold,
    /// Warm restart.
    Warm,
}
/// Live, atomically updated counters for a crash-injecting channel.
///
/// Use [`CrashStats::snapshot`] to obtain plain integer values.
#[derive(Debug)]
pub struct CrashStats {
    /// Every call to `send`, whatever its outcome.
    pub sends_attempted: AtomicU64,
    /// Sends that were forwarded to the inner channel successfully.
    pub sends_succeeded: AtomicU64,
    /// Sends refused because of an injected or pre-existing crash.
    pub sends_rejected: AtomicU64,
    /// Transitions into the crashed state.
    pub crashes: AtomicU64,
    /// Successful restarts.
    pub restarts: AtomicU64,
}

impl CrashStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            sends_attempted: zero(),
            sends_succeeded: zero(),
            sends_rejected: zero(),
            crashes: zero(),
            restarts: zero(),
        }
    }

    /// Copies the current counter values into a plain struct.
    ///
    /// Each counter is read with a separate relaxed load, so the snapshot is
    /// not an atomic view across counters.
    #[must_use]
    pub fn snapshot(&self) -> CrashStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        CrashStatsSnapshot {
            sends_attempted: read(&self.sends_attempted),
            sends_succeeded: read(&self.sends_succeeded),
            sends_rejected: read(&self.sends_rejected),
            crashes: read(&self.crashes),
            restarts: read(&self.restarts),
        }
    }
}

/// Plain-value copy of [`CrashStats`] taken at one point in time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrashStatsSnapshot {
    /// Value of `sends_attempted` when the snapshot was taken.
    pub sends_attempted: u64,
    /// Value of `sends_succeeded` when the snapshot was taken.
    pub sends_succeeded: u64,
    /// Value of `sends_rejected` when the snapshot was taken.
    pub sends_rejected: u64,
    /// Value of `crashes` when the snapshot was taken.
    pub crashes: u64,
    /// Value of `restarts` when the snapshot was taken.
    pub restarts: u64,
}

impl std::fmt::Display for CrashStatsSnapshot {
    /// Renders all five counters on a single line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            sends_attempted: attempted,
            sends_succeeded: succeeded,
            sends_rejected: rejected,
            crashes,
            restarts,
        } = self;
        write!(
            f,
            "CrashStats {{ attempted: {}, succeeded: {}, rejected: {}, crashes: {}, restarts: {} }}",
            attempted, succeeded, rejected, crashes, restarts,
        )
    }
}
/// Shared crash/restart state machine for a crash-injecting channel.
///
/// `state` is the authoritative record and is only changed while its mutex is
/// held. `crashed` and `exhausted` are atomic copies that `crash`/`restart`
/// store alongside it, so `is_crashed`/`is_exhausted` can read them without
/// taking the lock.
pub struct CrashController {
/// Authoritative crash/restart bookkeeping.
state: Mutex<CrashState>,
/// Send, crash and restart counters, exposed through `stats()`.
stats: CrashStats,
/// Destination for crash, restart and exhaustion evidence entries.
evidence_sink: Arc<dyn EvidenceSink>,
/// Counter behind `next_evidence_ts`; each evidence entry receives the next
/// value as its `ts_unix_ms`, i.e. a sequence number rather than wall-clock time.
evidence_seq: AtomicU64,
/// Atomic copy of `CrashState::crashed`, read by `is_crashed`.
crashed: AtomicBool,
/// Atomic copy of `CrashState::exhausted`, read by `is_exhausted`.
exhausted: AtomicBool,
/// Restart mode copied from the config at construction; returned by `restart_mode()`.
restart_mode: RestartMode,
}
/// Mutex-protected bookkeeping owned by `CrashController`.
struct CrashState {
/// `true` between a successful `crash()` and the next successful `restart()`.
crashed: bool,
/// Set once a restart is refused because `max_restarts` was reached; never cleared.
exhausted: bool,
/// Number of successful `crash()` calls so far.
crash_count: u32,
/// Number of successful `restart()` calls so far.
restart_count: u32,
/// Restart budget copied from the config; `None` means no limit is enforced.
max_restarts: Option<u32>,
}
impl std::fmt::Debug for CrashController {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state = self.state.lock();
f.debug_struct("CrashController")
.field("crashed", &state.crashed)
.field("exhausted", &state.exhausted)
.field("crash_count", &state.crash_count)
.field("restart_count", &state.restart_count)
.finish_non_exhaustive()
}
}
impl CrashController {
    /// Creates a controller that is neither crashed nor exhausted.
    ///
    /// Only `max_restarts` and `restart_mode` are read from `config`.
    #[must_use]
    pub fn new(config: &CrashConfig, evidence_sink: Arc<dyn EvidenceSink>) -> Self {
        let initial = CrashState {
            crashed: false,
            exhausted: false,
            crash_count: 0,
            restart_count: 0,
            max_restarts: config.max_restarts,
        };
        Self {
            state: Mutex::new(initial),
            stats: CrashStats::new(),
            evidence_sink,
            evidence_seq: AtomicU64::new(0),
            crashed: AtomicBool::new(false),
            exhausted: AtomicBool::new(false),
            restart_mode: config.restart_mode,
        }
    }

    /// Moves the controller into the crashed state.
    ///
    /// Returns `true` when this call performed the transition, and `false`
    /// when the controller was already crashed or is exhausted. A successful
    /// call bumps the crash counters and emits a `crash` evidence entry
    /// carrying the new crash count.
    pub fn crash(&self) -> bool {
        let mut guard = self.state.lock();
        if guard.exhausted || guard.crashed {
            return false;
        }

        // Update the locked state and its lock-free mirror together.
        guard.crashed = true;
        self.crashed.store(true, Ordering::Release);
        guard.crash_count += 1;
        self.stats.crashes.fetch_add(1, Ordering::Relaxed);

        // Evidence is emitted while the lock is still held.
        let ts = self.next_evidence_ts();
        emit_crash_evidence(&self.evidence_sink, ts, "crash", guard.crash_count);
        true
    }

    /// Attempts to leave the crashed state.
    ///
    /// Returns `false` when the controller is not crashed or is already
    /// exhausted. When a restart limit is configured and has been reached,
    /// the controller becomes exhausted, a `restart_exhausted` evidence entry
    /// is emitted and `false` is returned; the controller stays crashed.
    /// Otherwise the crashed flag is cleared, the restart counters are bumped,
    /// a `restart` evidence entry is emitted and `true` is returned.
    pub fn restart(&self) -> bool {
        let mut guard = self.state.lock();
        if guard.exhausted || !guard.crashed {
            return false;
        }

        let used = guard.restart_count;
        let budget_spent = matches!(guard.max_restarts, Some(max) if used >= max);
        if budget_spent {
            guard.exhausted = true;
            self.exhausted.store(true, Ordering::Release);
            let ts = self.next_evidence_ts();
            emit_crash_evidence(&self.evidence_sink, ts, "restart_exhausted", used);
            return false;
        }

        guard.crashed = false;
        self.crashed.store(false, Ordering::Release);
        guard.restart_count += 1;
        self.stats.restarts.fetch_add(1, Ordering::Relaxed);

        let ts = self.next_evidence_ts();
        emit_crash_evidence(&self.evidence_sink, ts, "restart", guard.restart_count);
        true
    }

    /// Reports whether the controller is currently crashed (lock-free read).
    #[must_use]
    pub fn is_crashed(&self) -> bool {
        self.crashed.load(Ordering::Acquire)
    }

    /// Reports whether the restart budget has been used up (lock-free read).
    #[must_use]
    pub fn is_exhausted(&self) -> bool {
        self.exhausted.load(Ordering::Acquire)
    }

    /// Returns the restart mode taken from the configuration.
    #[must_use]
    pub fn restart_mode(&self) -> RestartMode {
        self.restart_mode
    }

    /// Gives access to the live counters.
    #[must_use]
    pub fn stats(&self) -> &CrashStats {
        &self.stats
    }

    /// Hands out the next evidence sequence number: 1 for the first call,
    /// 2 for the second, and so on. It is used as the entry's `ts_unix_ms`.
    fn next_evidence_ts(&self) -> u64 {
        let previous = self.evidence_seq.fetch_add(1, Ordering::Relaxed);
        previous.saturating_add(1)
    }
}
/// Wrapper around a channel `Sender<T>` that simulates crashes by refusing
/// sends with `SendError::Disconnected`.
pub struct CrashSender<T> {
/// The real sender that accepted values are forwarded to.
inner: Sender<T>,
/// Shared crash/restart state consulted and updated on every send.
controller: Arc<CrashController>,
/// This sender's copy of the crash configuration (`crash_after_sends` and
/// `crash_probability` are read on every send).
config: CrashConfig,
/// Random source seeded from `config.seed`, used for probabilistic crashes.
rng: Mutex<ChaosRng>,
/// Number of sends forwarded successfully since creation or the last
/// `reset_send_count`; compared against `crash_after_sends`.
send_count: AtomicU64,
/// Destination for the sender's own rejection evidence entries.
evidence_sink: Arc<dyn EvidenceSink>,
}
/// Debug output shows the sender's configuration and its controller.
///
/// No value of type `T` is ever formatted, so the impl carries no `T: Debug`
/// bound and is available for every payload type.
impl<T> std::fmt::Debug for CrashSender<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CrashSender")
            .field("config", &self.config)
            // Formatting the controller briefly takes its state lock.
            .field("controller", &self.controller)
            // The inner sender, RNG, send counter and sink are omitted.
            .finish_non_exhaustive()
    }
}
impl<T> CrashSender<T> {
    /// Wraps `sender` so that its sends are subject to crash injection.
    ///
    /// The random source is seeded from `config.seed` and the send counter
    /// starts at zero.
    #[must_use]
    pub fn new(
        sender: Sender<T>,
        controller: Arc<CrashController>,
        config: CrashConfig,
        evidence_sink: Arc<dyn EvidenceSink>,
    ) -> Self {
        let rng = Mutex::new(ChaosRng::new(config.seed));
        Self {
            inner: sender,
            controller,
            config,
            rng,
            send_count: AtomicU64::new(0),
            evidence_sink,
        }
    }

    /// Sends `value` unless a crash is active or gets injected by this call.
    ///
    /// Checks run in this order:
    /// 1. controller already crashed: reject;
    /// 2. `crash_after_sends` limit reached: crash the controller and reject;
    /// 3. probabilistic trigger fires: crash the controller and reject;
    /// 4. otherwise forward to the inner sender.
    ///
    /// # Errors
    ///
    /// Every rejection returns `SendError::Disconnected(value)`. Errors from
    /// the inner sender are propagated unchanged; they count as attempted but
    /// neither as succeeded nor as rejected.
    pub async fn send(&self, cx: &Cx, value: T) -> Result<(), SendError<T>> {
        self.controller
            .stats
            .sends_attempted
            .fetch_add(1, Ordering::Relaxed);

        if self.controller.is_crashed() {
            return self.reject("send_rejected_crashed", value);
        }

        if let Some(limit) = self.config.crash_after_sends {
            if self.send_count.load(Ordering::Relaxed) >= limit {
                // `crash()` returns false if the controller got crashed
                // elsewhere in the meantime; the send is rejected either way.
                let action = if self.controller.crash() {
                    "crash_after_sends"
                } else {
                    "send_rejected_crashed"
                };
                return self.reject(action, value);
            }
        }

        let probability = self.config.crash_probability;
        if probability > 0.0 {
            // The RNG guard is a temporary of this statement, so it is
            // released before the controller is touched or anything awaited.
            let roll = self.rng.lock().should_inject(probability);
            if roll {
                let action = if self.controller.crash() {
                    "crash_probabilistic"
                } else {
                    "send_rejected_crashed"
                };
                return self.reject(action, value);
            }
        }

        self.inner.send(cx, value).await?;
        self.send_count.fetch_add(1, Ordering::Relaxed);
        self.controller
            .stats
            .sends_succeeded
            .fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    /// Counts one rejected send, emits `action` as evidence and returns
    /// `value` to the caller inside `SendError::Disconnected`.
    fn reject(&self, action: &str, value: T) -> Result<(), SendError<T>> {
        self.controller
            .stats
            .sends_rejected
            .fetch_add(1, Ordering::Relaxed);
        let ts = self.controller.next_evidence_ts();
        emit_crash_evidence(&self.evidence_sink, ts, action, 0);
        Err(SendError::Disconnected(value))
    }

    /// Borrows the wrapped sender.
    #[must_use]
    pub fn inner(&self) -> &Sender<T> {
        &self.inner
    }

    /// Borrows the shared controller handle.
    #[must_use]
    pub fn controller(&self) -> &Arc<CrashController> {
        &self.controller
    }

    /// Number of successful sends since creation or the last reset.
    #[must_use]
    pub fn send_count(&self) -> u64 {
        self.send_count.load(Ordering::Relaxed)
    }

    /// Sets the successful-send counter back to zero, so `crash_after_sends`
    /// starts counting afresh.
    pub fn reset_send_count(&self) {
        self.send_count.store(0, Ordering::Relaxed);
    }
}
/// Creates an mpsc channel of the given `capacity` whose sender is wrapped
/// for crash injection.
///
/// Returns the wrapped sender, the plain receiver and the controller that the
/// sender consults. The controller and the sender share the same evidence sink.
#[must_use]
pub fn crash_channel<T>(
    capacity: usize,
    config: CrashConfig,
    evidence_sink: Arc<dyn EvidenceSink>,
) -> (
    CrashSender<T>,
    crate::channel::mpsc::Receiver<T>,
    Arc<CrashController>,
) {
    let (raw_tx, receiver) = crate::channel::mpsc::channel(capacity);
    let shared = Arc::new(CrashController::new(&config, Arc::clone(&evidence_sink)));
    let sender = CrashSender::new(raw_tx, Arc::clone(&shared), config, evidence_sink);
    (sender, receiver, shared)
}
/// Emits one evidence entry for a crash-related event.
///
/// The entry's action is `action` prefixed with `inject_`, its component is
/// `channel_crash`, and `count` is attached as the single `count` feature.
/// `ts_unix_ms` is stored as given; the remaining fields are fixed constants.
fn emit_crash_evidence(sink: &Arc<dyn EvidenceSink>, ts_unix_ms: u64, action: &str, count: u32) {
    let action_str = format!("inject_{action}");

    // The only candidate action is the one taken, recorded with zero loss.
    let mut expected_loss_by_action = std::collections::BTreeMap::new();
    expected_loss_by_action.insert(action_str.clone(), 0.0);

    let count_feature = (String::from("count"), f64::from(count));

    sink.emit(&EvidenceLedger {
        ts_unix_ms,
        component: String::from("channel_crash"),
        action: action_str,
        posterior: vec![1.0],
        expected_loss_by_action,
        chosen_expected_loss: 0.0,
        calibration_score: 1.0,
        fallback_active: false,
        top_features: vec![count_feature],
    });
}
// Unit tests for the crash-injection config, controller and sender.
#[cfg(test)]
mod tests {
use super::*;
use crate::channel::mpsc;
use crate::cx::Cx;
use crate::evidence_sink::CollectorSink;
use crate::types::Budget;
use crate::util::ArenaIndex;
use crate::{RegionId, TaskId};
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll};
// Builds a `Cx` from arena index (0, 0) region/task ids and an infinite budget.
fn test_cx() -> Cx {
Cx::new(
RegionId::from_arena(ArenaIndex::new(0, 0)),
TaskId::from_arena(ArenaIndex::new(0, 0)),
Budget::INFINITE,
)
}
// Minimal executor: polls the future with a no-op waker and yields the
// thread whenever it is pending, until it completes.
fn block_on<F: Future>(f: F) -> F::Output {
let waker = std::task::Waker::noop().clone();
let mut cx = Context::from_waker(&waker);
let mut pinned = Box::pin(f);
loop {
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => std::thread::yield_now(),
}
}
}
// Creates a capacity-16 `u32` crash channel wired to a `CollectorSink`, and
// returns the sink as well so tests can inspect the emitted evidence.
fn make_crash_channel(
config: CrashConfig,
) -> (
CrashSender<u32>,
mpsc::Receiver<u32>,
Arc<CrashController>,
Arc<CollectorSink>,
) {
let collector = Arc::new(CollectorSink::new());
let sink: Arc<dyn EvidenceSink> = collector.clone();
let (tx, rx, ctrl) = crash_channel::<u32>(16, config, sink);
(tx, rx, ctrl, collector)
}
// A probability above 1.0 must trip the builder's assertion.
#[test]
#[should_panic(expected = "crash probability must be in [0.0, 1.0]")]
fn config_rejects_invalid_crash_probability() {
let _ = CrashConfig::new(42).with_crash_probability(1.5);
}
// A freshly created config has no trigger armed.
#[test]
fn config_default_is_disabled() {
let config = CrashConfig::new(42);
assert!(!config.is_enabled());
}
// A positive probability arms the probabilistic trigger.
#[test]
fn config_probabilistic_is_enabled() {
let config = CrashConfig::new(42).with_crash_probability(0.5);
assert!(config.is_enabled());
}
// A send limit arms the deterministic trigger.
#[test]
fn config_deterministic_is_enabled() {
let config = CrashConfig::new(42).with_crash_after_sends(10);
assert!(config.is_enabled());
}
// With no trigger armed, every send is forwarded in order and no crash occurs.
#[test]
fn passthrough_when_disabled() {
let config = CrashConfig::new(42);
let (tx, mut rx, ctrl, _) = make_crash_channel(config);
let cx = test_cx();
for i in 0..10 {
block_on(tx.send(&cx, i)).unwrap();
}
for i in 0..10 {
assert_eq!(rx.try_recv().unwrap(), i);
}
assert!(!ctrl.is_crashed());
}
// Crashing through the controller makes the next send fail with
// `Disconnected`, handing the value back.
#[test]
fn manual_crash_rejects_sends() {
let config = CrashConfig::new(42);
let (tx, _rx, ctrl, _) = make_crash_channel(config);
let cx = test_cx();
block_on(tx.send(&cx, 1)).unwrap();
ctrl.crash();
let err = block_on(tx.send(&cx, 2)).unwrap_err();
assert!(matches!(err, SendError::Disconnected(2)));
}
// After a restart, sends are forwarded again; the rejected value never
// reaches the receiver.
#[test]
fn restart_re_enables_sends() {
let config = CrashConfig::new(42);
let (tx, mut rx, ctrl, _) = make_crash_channel(config);
let cx = test_cx();
ctrl.crash();
assert!(block_on(tx.send(&cx, 1)).is_err());
ctrl.restart();
block_on(tx.send(&cx, 2)).unwrap();
assert_eq!(rx.try_recv().unwrap(), 2);
}
// A second `crash()` while already crashed is a no-op that reports `false`.
#[test]
fn crash_already_crashed_returns_false() {
let config = CrashConfig::new(42);
let (_, _, ctrl, _) = make_crash_channel(config);
assert!(ctrl.crash());
assert!(!ctrl.crash()); }
// `restart()` on a controller that never crashed reports `false`.
#[test]
fn restart_when_not_crashed_returns_false() {
let config = CrashConfig::new(42);
let (_, _, ctrl, _) = make_crash_channel(config);
assert!(!ctrl.restart()); }
// With a limit of 5, five sends pass and the sixth triggers the crash; the
// five accepted values are still delivered.
#[test]
fn crash_after_sends() {
let config = CrashConfig::new(42).with_crash_after_sends(5);
let (tx, mut rx, ctrl, _) = make_crash_channel(config);
let cx = test_cx();
for i in 0..5 {
block_on(tx.send(&cx, i)).unwrap();
}
let err = block_on(tx.send(&cx, 5)).unwrap_err();
assert!(matches!(err, SendError::Disconnected(5)));
assert!(ctrl.is_crashed());
for i in 0..5 {
assert_eq!(rx.try_recv().unwrap(), i);
}
}
// With `max_restarts = 2`, two restarts succeed and the third attempt fails
// and marks the controller exhausted.
#[test]
fn restart_exhaustion() {
let config = CrashConfig::new(42).with_max_restarts(2);
let (_, _, ctrl, _) = make_crash_channel(config);
ctrl.crash();
assert!(ctrl.restart()); ctrl.crash();
assert!(ctrl.restart()); ctrl.crash();
assert!(!ctrl.restart()); assert!(ctrl.is_exhausted());
}
// With `max_restarts = 0`, the very first restart attempt after the injected
// crash is refused and exhausts the controller.
#[test]
fn exhausted_controller_rejects_sends() {
let config = CrashConfig::new(42)
.with_crash_after_sends(1)
.with_max_restarts(0);
let (tx, _rx, ctrl, _) = make_crash_channel(config);
let cx = test_cx();
block_on(tx.send(&cx, 0)).unwrap(); assert!(block_on(tx.send(&cx, 1)).is_err()); assert!(ctrl.is_crashed());
assert!(!ctrl.restart());
assert!(ctrl.is_exhausted());
}
// Five attempts with a limit of 3: three succeed, the fourth injects the
// crash and the fifth is rejected because the controller is already crashed.
#[test]
fn stats_track_all_operations() {
let config = CrashConfig::new(42).with_crash_after_sends(3);
let (tx, _rx, ctrl, _) = make_crash_channel(config);
let cx = test_cx();
for i in 0..5 {
let _ = block_on(tx.send(&cx, i));
}
let snap = ctrl.stats().snapshot();
assert_eq!(snap.sends_attempted, 5);
assert_eq!(snap.sends_succeeded, 3);
assert_eq!(snap.sends_rejected, 2);
assert_eq!(snap.crashes, 1);
}
// At least one collected evidence action must mention "crash" after an
// injected crash and a restart.
#[test]
fn evidence_logged_for_crash_events() {
let config = CrashConfig::new(42).with_crash_after_sends(2);
let (tx, _rx, ctrl, collector) = make_crash_channel(config);
let cx = test_cx();
block_on(tx.send(&cx, 0)).unwrap();
block_on(tx.send(&cx, 1)).unwrap();
let _ = block_on(tx.send(&cx, 2));
ctrl.restart();
let _ = block_on(tx.send(&cx, 3));
let entries = collector.entries();
let actions: Vec<String> = entries.iter().map(|e| e.action.clone()).collect();
assert!(
actions.iter().any(|a| a.contains("crash")),
"Expected crash evidence, got: {actions:?}"
);
}
// Evidence "timestamps" are sequence numbers: the successful send emits
// nothing, the rejected send emits two entries (the controller's crash entry
// and the sender's rejection entry), and the restart emits the third.
#[test]
fn evidence_timestamps_follow_deterministic_event_sequence() {
let config = CrashConfig::new(42).with_crash_after_sends(1);
let (tx, _rx, ctrl, collector) = make_crash_channel(config);
let cx = test_cx();
block_on(tx.send(&cx, 0)).unwrap();
assert!(block_on(tx.send(&cx, 1)).is_err());
assert!(ctrl.restart());
let timestamps: Vec<u64> = collector
.entries()
.iter()
.map(|entry| entry.ts_unix_ms)
.collect();
assert_eq!(timestamps, vec![1, 2, 3]);
}
// Cold-restart scenario: the test itself resets the send counter after the
// restart, so three more sends pass before the limit trips again.
#[test]
fn cold_restart_resets_send_count() {
let config = CrashConfig::new(42)
.with_crash_after_sends(3)
.with_restart_mode(RestartMode::Cold);
let (tx, _rx, ctrl, _) = make_crash_channel(config);
let cx = test_cx();
for i in 0..3 {
block_on(tx.send(&cx, i)).unwrap();
}
assert!(block_on(tx.send(&cx, 3)).is_err());
assert!(ctrl.is_crashed());
ctrl.restart();
tx.reset_send_count();
for i in 10..13 {
block_on(tx.send(&cx, i)).unwrap();
}
assert!(block_on(tx.send(&cx, 13)).is_err());
assert!(ctrl.is_crashed());
}
// Warm-restart scenario: the send counter is left untouched, so the first
// send after the restart hits the limit again immediately.
#[test]
fn warm_restart_preserves_send_count() {
let config = CrashConfig::new(42)
.with_crash_after_sends(3)
.with_restart_mode(RestartMode::Warm);
let (tx, _rx, ctrl, _) = make_crash_channel(config);
let cx = test_cx();
for i in 0..3 {
block_on(tx.send(&cx, i)).unwrap();
}
assert!(block_on(tx.send(&cx, 3)).is_err());
ctrl.restart();
assert!(block_on(tx.send(&cx, 4)).is_err());
}
// Exercises the derived `Debug`, `Copy` and `PartialEq` impls of `RestartMode`.
#[test]
fn restart_mode_debug_clone_copy_eq() {
let cold = RestartMode::Cold;
let warm = RestartMode::Warm;
let copied = cold;
let cloned = cold;
assert_eq!(copied, cloned);
assert_eq!(copied, RestartMode::Cold);
assert_ne!(cold, warm);
assert!(format!("{cold:?}").contains("Cold"));
assert!(format!("{warm:?}").contains("Warm"));
}
// Exercises `Clone`, `PartialEq`, `Debug` and `Display` of `CrashStatsSnapshot`.
#[test]
fn crash_stats_snapshot_debug_clone_eq_display() {
let snap = CrashStatsSnapshot {
sends_attempted: 10,
sends_succeeded: 8,
sends_rejected: 2,
crashes: 1,
restarts: 1,
};
let cloned = snap.clone();
assert_eq!(cloned, snap);
let dbg = format!("{snap:?}");
assert!(dbg.contains("CrashStatsSnapshot"));
let display = format!("{snap}");
assert!(display.contains("attempted: 10"));
assert!(display.contains("crashes: 1"));
}
// A fully populated config survives `clone()` and has a `Debug` rendering.
#[test]
fn crash_config_debug_clone() {
let config = CrashConfig::new(42)
.with_crash_probability(0.5)
.with_crash_after_sends(10)
.with_max_restarts(3)
.with_restart_mode(RestartMode::Warm);
let cloned = config.clone();
assert_eq!(cloned.seed, 42);
assert_eq!(cloned.restart_mode, RestartMode::Warm);
assert_eq!(cloned.max_restarts, Some(3));
let dbg = format!("{config:?}");
assert!(dbg.contains("CrashConfig"));
}
}