use std::cmp::Reverse;
use log::debug;
use crate::{
Jiffies, Pid,
destination::Destination,
events::{Event, EventQueue, FaultEvent, InjestableEvents, NetworkFsm, PidEvent, TimedEvent},
global::{self, local_access::InjestableEventBatch},
list_pool,
message::MessagePtr,
network::{Network, NetworkDecision},
random::Seed,
runners::fault::FaultState,
};
use super::clock::Clock;
pub(crate) struct RunnerCore {
pub(super) seed: Seed,
pub(super) clock: Clock,
network: Network,
pub event_queue: EventQueue,
pub time_budget: Jiffies,
started: bool,
fault: FaultState,
}
impl RunnerCore {
pub fn new(seed: Seed, network: Network, time_budget: Jiffies, size: usize) -> Self {
Self {
seed,
clock: Clock::default(),
network,
event_queue: EventQueue::new(),
time_budget,
started: false,
fault: FaultState::new(size),
}
}
pub fn seed_events(&mut self, events: Vec<Reverse<TimedEvent>>) {
for event in events {
self.event_queue.push(event);
}
}
pub fn mark_started(&mut self) -> bool {
if self.started {
return false;
}
self.started = true;
true
}
pub fn handle_pid_event(
&mut self,
invocation_time: Jiffies,
pid: Pid,
pid_event: PidEvent,
) -> Option<PidEvent> {
let result = match pid_event {
PidEvent::Start { .. } => return Some(pid_event),
PidEvent::Network { source, .. } => {
if cfg!(fault_enabled) {
if self.fault.is_link_broken(pid, source) {
debug!("Discard event, source: P{source}, target: P{pid}");
return None;
}
}
self.network.decide(invocation_time, pid, pid_event)
}
PidEvent::Timer { .. } => return Some(pid_event),
};
match result {
NetworkDecision::Keep(timed_event) => {
self.event_queue.push(Reverse(timed_event));
None
}
NetworkDecision::Ready(event) => Some(event),
}
}
pub fn handle_fault_event(&mut self, event: FaultEvent) {
match event {
FaultEvent::BreakLink { pid1, pid2 } => self.fault.break_link(pid1, pid2),
FaultEvent::RestoreLink { pid1, pid2 } => self.fault.restore_link(pid1, pid2),
FaultEvent::Isolate { pid } => self.fault.isolate(pid),
FaultEvent::FinishIsolation { pid } => self.fault.restore(pid),
}
}
pub fn resolve_events(&mut self, events: InjestableEventBatch) {
let now = self.clock.now();
for event in events {
match event {
InjestableEvents::NetworkEvent {
source,
destination,
message,
} => self.resolve_network_event(now, source, destination, message),
InjestableEvents::TimerEvent {
pid,
id,
fire_after,
} => self.resolve_timer_event(now, pid, id, fire_after),
}
}
}
pub fn advance_time(&mut self, time: Jiffies) {
self.clock.fast_forward(time);
}
fn resolve_network_event(
&mut self,
now: Jiffies,
source: usize,
destination: Destination,
message: MessagePtr,
) {
let mut enqueue = |source, target, message, fault: &FaultState| {
if cfg!(fault_enabled) {
if fault.is_link_broken(source, target) {
debug!("Discard event, source: P{source}, target: P{target}");
return;
}
}
let latency = self.network.compute_latency(source, target);
self.event_queue.push(Reverse(TimedEvent {
invocation_time: now + latency,
event: Event::Pid {
pid: target,
event: PidEvent::Network {
source,
message,
fsm: NetworkFsm::KeepLatency,
},
},
}));
};
match destination {
Destination::Target(target) => enqueue(source, target, message, &mut self.fault),
Destination::Pool(pool) => {
for &target in list_pool(pool) {
enqueue(source, target, message.clone(), &mut self.fault);
}
}
}
}
fn resolve_timer_event(&mut self, now: Jiffies, pid: usize, id: usize, fire_after: Jiffies) {
self.event_queue.push(Reverse(TimedEvent {
invocation_time: now + fire_after,
event: Event::Pid {
pid,
event: PidEvent::Timer { id },
},
}));
}
}
impl Drop for RunnerCore {
fn drop(&mut self) {
global::reset();
}
}