use std::time::Duration;
use magnetar_proto::{Backoff, ConnectionConfig, SupervisorConfig};
use magnetar_runtime_moonpool::ConnectionShared;
/// Builds the [`SupervisorConfig`] shared by the tests in this file.
///
/// The backoff window is pinned to 100 ms initial / 60 s maximum and
/// `drop_grace` is set to `grace`; every other field keeps whatever
/// `SupervisorConfig::default()` provides.
fn supervisor_with_grace(grace: Duration) -> SupervisorConfig {
    let initial_backoff = Duration::from_millis(100);
    let max_backoff = Duration::from_secs(60);

    SupervisorConfig {
        initial_backoff,
        max_backoff,
        drop_grace: grace,
        ..SupervisorConfig::default()
    }
}
/// A supervisor config handed to `ConnectionShared::new` must be readable
/// back from the locked connection with its `drop_grace` intact.
///
/// Also pins the boundary this test expects from `should_reset_backoff`:
/// a lifetime of exactly `drop_grace` does not reset, one millisecond more does.
#[test]
fn supervisor_config_roundtrips_through_connection_shared() {
    let drop_grace = Duration::from_millis(250);
    let shared = ConnectionShared::new(ConnectionConfig {
        supervisor: Some(supervisor_with_grace(drop_grace)),
        ..ConnectionConfig::default()
    });

    // Keep the guard alive for as long as the stored config is inspected.
    let guard = shared.inner.lock();
    let stored = guard
        .supervisor_config()
        .expect("supervisor config must be present");

    assert_eq!(stored.drop_grace, drop_grace);

    let just_past_grace = drop_grace + Duration::from_millis(1);
    assert!(stored.should_reset_backoff(just_past_grace));
    assert!(!stored.should_reset_backoff(drop_grace));
}
/// Simulates eight consecutive reconnects where each socket lives only 5 ms
/// (well under the 500 ms `drop_grace`) and checks that the delays handed out
/// by one long-lived `Backoff` keep growing instead of restarting.
#[test]
fn persisted_backoff_grows_under_storm_pattern() {
    let cfg = supervisor_with_grace(Duration::from_millis(500));
    let mut backoff: Backoff = cfg.build_backoff(1);

    // Every simulated socket dies after the same short lifetime.
    let socket_alive = Duration::from_millis(5);
    let delays: Vec<_> = (0..8)
        .map(|_| {
            // Same decision the supervisor makes per cycle: only a socket that
            // outlived the grace period is expected to clear the schedule.
            if cfg.should_reset_backoff(socket_alive) {
                backoff.reset();
            }
            backoff.next()
        })
        .collect();

    let first = delays[0];
    assert!(
        first <= Duration::from_millis(100),
        "first delay starts at initial (with jitter), got {first:?}"
    );

    let third = delays[2];
    assert!(
        third >= Duration::from_millis(320),
        "by the 3rd reconnect the schedule must reflect ≥ 4x growth (got {third:?})"
    );

    let last = delays.last().copied().expect("delays not empty");
    assert!(
        last >= Duration::from_secs(10),
        "by the 8th reconnect the schedule must approach max_backoff (got {last:?})"
    );
}
/// After six short-lived (5 ms) sockets have advanced the schedule, a single
/// socket that stayed up for 2 s (longer than the 500 ms `drop_grace`) must
/// bring the next delay back down to at most the 100 ms initial backoff.
#[test]
fn stable_socket_resets_persisted_backoff_to_initial() {
    let cfg = supervisor_with_grace(Duration::from_millis(500));
    let mut backoff = cfg.build_backoff(1);

    // One reconnect cycle: reset if the previous socket counts as stable,
    // then take the next delay from the schedule.
    let mut next_delay_after = |socket_alive: Duration| {
        if cfg.should_reset_backoff(socket_alive) {
            backoff.reset();
        }
        backoff.next()
    };

    let flapping = Duration::from_millis(5);
    for _ in 0..6 {
        let _ = next_delay_after(flapping);
    }

    let stable = Duration::from_secs(2);
    let post_reset = next_delay_after(stable);
    assert!(
        post_reset <= Duration::from_millis(100),
        "schedule must collapse back to initial after a stable socket, got {post_reset:?}"
    );
}
/// Replays the supervisor's give-up bookkeeping for an endpoint whose sockets
/// only ever live 5 ms, and checks that a `max_attempts` of 3 ends the loop.
///
/// The expected outcome pinned here is: the loop stops on cycle index 3 with
/// the attempt counter at 4.
#[test]
fn give_up_budget_fires_behind_tcp_accepting_endpoint() {
    let cfg = SupervisorConfig {
        max_attempts: Some(3),
        ..supervisor_with_grace(Duration::from_millis(500))
    };

    // The budget must survive the trip through `ConnectionShared`.
    let connection_cfg = ConnectionConfig {
        supervisor: Some(cfg.clone()),
        ..ConnectionConfig::default()
    };
    let shared = ConnectionShared::new(connection_cfg);
    let stored_max_attempts = shared
        .inner
        .lock()
        .supervisor_config()
        .expect("supervisor config present")
        .max_attempts;
    assert_eq!(stored_max_attempts, Some(3));

    let prev_socket_alive = Duration::from_millis(5);
    let mut give_up_attempts: u32 = 0;

    // `find` stops at the first cycle on which the budget is exhausted and
    // yields `None` if that never happens within 20 cycles.
    let gave_up_after = (0..20).find(|_| {
        // The counter is only cleared when the previous socket counts as
        // stable; 5 ms is far below the 500 ms grace configured above.
        if cfg.should_reset_backoff(prev_socket_alive) {
            give_up_attempts = 0;
        }
        give_up_attempts = give_up_attempts.saturating_add(1);
        cfg.should_give_up(give_up_attempts)
    });

    assert_eq!(
        gave_up_after,
        Some(3),
        "the supervisor must give up at max_attempts behind a TCP-accept endpoint \
         (previously it looped forever)"
    );
    assert_eq!(
        give_up_attempts, 4,
        "counter spans the full dial+handshake cycle"
    );
}