use std::cmp::Reverse;
use log::debug;
use crate::{
Jiffies, Pid,
destination::Destination,
events::{Event, EventQueue, FaultEvent, IngestableEvents, MessageState, PidEvent, TimedEvent},
global::{self, local_access::IngestableEventBatch},
list_pool,
message::MessagePtr,
network::{Network, NetworkPoll},
random::Seed,
runners::fault::FaultState,
};
use super::clock::Clock;
pub(crate) struct RunnerCore {
pub(super) seed: Seed,
pub(super) clock: Clock,
network: Network,
pub event_queue: EventQueue,
pub time_budget: Jiffies,
started: bool,
fault: FaultState,
}
impl RunnerCore {
pub fn new(seed: Seed, network: Network, time_budget: Jiffies, size: usize) -> Self {
Self {
seed,
clock: Clock::default(),
network,
event_queue: EventQueue::new(),
time_budget,
started: false,
fault: FaultState::new(size),
}
}
pub fn seed_events(&mut self, events: Vec<Reverse<TimedEvent>>) {
for event in events {
self.event_queue.push(event);
}
}
pub fn mark_started(&mut self) -> bool {
if self.started {
return false;
}
self.started = true;
true
}
pub fn handle_pid_event(
&mut self,
invocation_time: Jiffies,
pid: Pid,
pid_event: PidEvent,
) -> Option<PidEvent> {
match pid_event {
PidEvent::Start { .. } => return Some(pid_event),
PidEvent::Message { source, .. } => {
match self.network.poll(invocation_time, pid, pid_event) {
NetworkPoll::Await(timed_event) => {
self.event_queue.push(Reverse(timed_event));
None
}
NetworkPoll::Ready(event) => {
if self.fault.is_link_broken(pid, source) {
debug!("Discarded network message, source: P{source}, target: P{pid}");
return None;
}
Some(event)
}
}
}
PidEvent::Timer { .. } => return Some(pid_event),
}
}
pub fn handle_fault_event(&mut self, event: FaultEvent) {
match event {
FaultEvent::BreakLink { pid1, pid2 } => self.fault.break_link(pid1, pid2),
FaultEvent::RestoreLink { pid1, pid2 } => self.fault.restore_link(pid1, pid2),
FaultEvent::Isolate { pid } => self.fault.isolate(pid),
FaultEvent::FinishIsolation { pid } => self.fault.restore(pid),
}
}
pub fn resolve_events(&mut self, events: IngestableEventBatch) {
let now = self.clock.now();
for event in events {
match event {
IngestableEvents::Message {
source,
destination,
message,
} => self.resolve_network_event(now, source, destination, message),
IngestableEvents::Timer {
pid,
id,
fire_after,
} => self.resolve_timer_event(now, pid, id, fire_after),
}
}
}
pub fn advance_time(&mut self, time: Jiffies) {
self.clock.fast_forward(time);
}
fn resolve_network_event(
&mut self,
now: Jiffies,
source: usize,
destination: Destination,
message: MessagePtr,
) {
let mut enqueue = |source, target, message, fault: &FaultState| {
if fault.is_link_broken(source, target) {
debug!("Discarded network message, source: P{source}, target: P{target}");
return;
}
match self.network.poll(
now,
target,
PidEvent::Message {
source,
message,
state: MessageState::Init,
},
) {
NetworkPoll::Await(event) => self.event_queue.push(Reverse(event)),
NetworkPoll::Ready(_) => unreachable!(),
}
};
match destination {
Destination::Target(target) => enqueue(source, target, message, &mut self.fault),
Destination::Pool(pool) => {
for &target in list_pool(pool) {
enqueue(source, target, message.clone(), &mut self.fault);
}
}
}
}
fn resolve_timer_event(&mut self, now: Jiffies, pid: usize, id: usize, fire_after: Jiffies) {
self.event_queue.push(Reverse(TimedEvent {
invocation_time: now + fire_after,
event: Event::Pid {
pid,
event: PidEvent::Timer { id },
},
}));
}
}
impl Drop for RunnerCore {
fn drop(&mut self) {
global::reset();
}
}