use crate::event::payload::{SupervisorEvent, What, Where};
use crate::event::time::{CorrelationId, EventSequence, EventSequenceSource, EventTime, When};
use crate::id::types::{ChildId, ChildStartCount, Generation, SupervisorPath};
use crate::runtime::lifecycle::{RuntimeControlPlane, RuntimeExitReport};
use crate::runtime::watchdog::RuntimeWatchdog;
use crate::{control::handle::SupervisorHandle, runtime::message::RuntimeLoopMessage};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc};
use uuid::Uuid;
/// Fixed clock readings used to build deterministic event timestamps.
///
/// The three values are forwarded unchanged to `EventTime::deterministic`
/// by [`PausedTime::event_time`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PausedTime {
/// Wall-clock component; presumably nanoseconds since the Unix epoch
/// (TODO confirm against `EventTime::deterministic`).
pub unix_nanos: u128,
/// Monotonic-clock component; presumably nanoseconds (TODO confirm).
pub monotonic_nanos: u128,
/// Uptime component; presumably milliseconds (TODO confirm).
pub uptime_ms: u64,
}
impl PausedTime {
pub fn new(unix_nanos: u128, monotonic_nanos: u128, uptime_ms: u64) -> Self {
Self {
unix_nanos,
monotonic_nanos,
uptime_ms,
}
}
pub fn event_time(
&self,
generation: Generation,
child_start_count: ChildStartCount,
) -> EventTime {
EventTime::deterministic(
self.unix_nanos,
self.monotonic_nanos,
self.uptime_ms,
generation,
child_start_count,
)
}
}
/// Fixed, signed percentage adjustment for millisecond values.
///
/// See [`DeterministicJitter::apply_ms`] for how the percentage is applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct DeterministicJitter {
/// Signed percentage of the base value that `apply_ms` adds to it;
/// negative values reduce the base.
pub percent: i64,
}
impl DeterministicJitter {
    /// Creates a jitter that adjusts values by `percent` percent.
    pub fn new(percent: i64) -> Self {
        Self { percent }
    }

    /// Returns `base_ms` adjusted by `percent` percent of itself.
    ///
    /// The adjustment is `base_ms * percent / 100`, with the division
    /// truncating toward zero. The result is clamped to the `u64` range:
    /// it never goes below `0` and saturates at `u64::MAX` rather than
    /// wrapping when the adjusted value does not fit.
    pub fn apply_ms(&self, base_ms: u64) -> u64 {
        // Compute in i128 so a u64 base times an i64 percentage cannot overflow.
        let base = i128::from(base_ms);
        let delta = base.saturating_mul(i128::from(self.percent)) / 100;
        let adjusted = base.saturating_add(delta).max(0);
        // A plain `as u64` cast would silently truncate values above
        // `u64::MAX`; convert with a check and saturate instead.
        u64::try_from(adjusted).unwrap_or(u64::MAX)
    }
}
/// In-memory list of supervisor events.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventCollector {
/// Events recorded so far; `push` appends to the end.
pub events: Vec<SupervisorEvent>,
}
impl EventCollector {
    /// Creates a collector that holds no events yet.
    pub fn new() -> Self {
        Self { events: Vec::new() }
    }

    /// Records `event` after everything collected so far.
    pub fn push(&mut self, event: SupervisorEvent) {
        self.events.push(event);
    }

    /// Returns `what.name()` for every recorded event, in recording order.
    pub fn event_names(&self) -> Vec<&'static str> {
        let mut names = Vec::with_capacity(self.events.len());
        for recorded in &self.events {
            names.push(recorded.what.name());
        }
        names
    }
}
/// Builds `SupervisorEvent` values that share one paused time, one
/// correlation id and one configuration version.
#[derive(Debug)]
pub struct EventFixture {
/// Clock readings used for the timestamp of every event built by the fixture.
pub paused_time: PausedTime,
/// Source queried once per built event for its sequence value.
pub sequences: EventSequenceSource,
/// Correlation id copied onto every built event; `new` derives it from the nil UUID.
pub correlation_id: CorrelationId,
/// Configuration version copied onto every built event.
pub config_version: u64,
}
impl EventFixture {
pub fn new(paused_time: PausedTime, config_version: u64) -> Self {
Self {
paused_time,
sequences: EventSequenceSource::new(),
correlation_id: CorrelationId::from_uuid(Uuid::nil()),
config_version,
}
}
pub fn child_event(
&self,
child_id: ChildId,
child_name: impl Into<String>,
what: What,
) -> SupervisorEvent {
let path = SupervisorPath::root().join(child_id.to_string());
let location = Where::new(path.clone()).with_child(child_id, child_name);
SupervisorEvent::new(
When::new(
self.paused_time
.event_time(Generation::initial(), ChildStartCount::first()),
),
location,
what,
self.sequences.next(),
self.correlation_id,
self.config_version,
)
}
pub fn supervisor_event(&self, what: What) -> SupervisorEvent {
SupervisorEvent::new(
When::new(
self.paused_time
.event_time(Generation::initial(), ChildStartCount::first()),
),
Where::new(SupervisorPath::root()),
what,
self.sequences.next(),
self.correlation_id,
self.config_version,
)
}
pub fn sequence(value: u64) -> EventSequence {
EventSequence::new(value)
}
}
/// Builds a `SupervisorHandle` whose runtime task has already panicked.
///
/// In order, the fixture:
/// 1. creates the command channel and drops its receiver immediately;
/// 2. creates the event broadcast channel, keeping only the sender;
/// 3. creates a `RuntimeControlPlane` and marks it alive;
/// 4. spawns a Tokio task that panics right away;
/// 5. passes the control plane, the task's join handle and the event sender
///    to `RuntimeWatchdog::spawn`;
/// 6. builds the handle and awaits `handle.join()` before returning it.
///
/// # Panics
///
/// Panics with `"failed runtime joins"` if `handle.join()` returns an error.
///
/// NOTE(review): `tokio::spawn` requires a running Tokio runtime, so callers
/// are presumably async tests — confirm.
pub async fn runtime_control_plane_failed_handle() -> SupervisorHandle {
// The receiver is dropped before the handle exists, so nothing ever reads
// messages sent through `command_sender`.
let (command_sender, command_receiver) = mpsc::channel::<RuntimeLoopMessage>(1);
drop(command_receiver);
// `_` discards the initial broadcast receiver at the end of this statement.
let (event_sender, _) = broadcast::channel(16);
let control_plane = RuntimeControlPlane::new();
control_plane.mark_alive();
// Stand-in for the runtime loop: it panics before producing a report.
let join_handle = tokio::spawn(async move {
panic!("runtime control loop panic fixture");
// Never executed; it only gives the async block its `RuntimeExitReport` output type.
#[allow(unreachable_code)]
RuntimeExitReport::completed("unreachable", "unreachable")
});
// NOTE(review): presumably the watchdog observes the panicked task and
// records the failure on the control plane — confirm in `RuntimeWatchdog`.
RuntimeWatchdog::spawn(control_plane.clone(), join_handle, event_sender.clone());
let handle = SupervisorHandle::new(command_sender, event_sender, control_plane);
// Await the join result before handing the handle out; the report itself is discarded.
let _report = handle.join().await.expect("failed runtime joins");
handle
}
/// Builds a `BackoffPolicy` from the given settings and then applies
/// `with_deterministic_jitter(seed)` to it.
///
/// `initial`, `max`, `jitter_percent` and `reset_after` are passed to
/// `BackoffPolicy::new` unchanged and in that order.
pub fn deterministic_backoff_policy(
    initial: std::time::Duration,
    max: std::time::Duration,
    jitter_percent: u8,
    reset_after: std::time::Duration,
    seed: u64,
) -> crate::policy::backoff::BackoffPolicy {
    use crate::policy::backoff::BackoffPolicy;

    let base_policy = BackoffPolicy::new(initial, max, jitter_percent, reset_after);
    base_policy.with_deterministic_jitter(seed)
}
/// Builds a `BackoffPolicy` whose jitter mode is `JitterMode::FullJitter`
/// with the given `seed`.
///
/// The policy is created with a jitter percentage of 100 and a reset window
/// of 300 seconds; `initial` and `max` are passed through unchanged.
pub fn full_jitter_backoff_policy(
    initial: std::time::Duration,
    max: std::time::Duration,
    seed: u64,
) -> crate::policy::backoff::BackoffPolicy {
    use crate::policy::backoff::{BackoffPolicy, JitterMode};

    let jitter_percent = 100;
    let reset_after = std::time::Duration::from_secs(300);

    let mut full_jitter = BackoffPolicy::new(initial, max, jitter_percent, reset_after);
    full_jitter.jitter_mode = JitterMode::FullJitter { seed };
    full_jitter
}
/// Builds a `BackoffPolicy` whose jitter mode is
/// `JitterMode::DecorrelatedJitter` with the given `seed`.
///
/// The policy is created with a jitter percentage of 100 and a reset window
/// of 300 seconds; `initial` and `max` are passed through unchanged.
pub fn decorrelated_jitter_backoff_policy(
    initial: std::time::Duration,
    max: std::time::Duration,
    seed: u64,
) -> crate::policy::backoff::BackoffPolicy {
    use crate::policy::backoff::{BackoffPolicy, JitterMode};

    let jitter_percent = 100;
    let reset_after = std::time::Duration::from_secs(300);

    let mut decorrelated = BackoffPolicy::new(initial, max, jitter_percent, reset_after);
    decorrelated.jitter_mode = JitterMode::DecorrelatedJitter { seed };
    decorrelated
}