use std::cmp::Reverse;
use log::debug;
use crate::{
Jiffies,
destination::Destination,
events::{
Event, EventQueue, FaultEvents, InjestableEvents, NetworkFsm, PidHandlerEvent, TimedEvent,
},
global::{self, local_access::InjestableEventBatch},
list_pool,
message::MessagePtr,
network::{Network, NetworkDecision},
runners::fault::FaultState,
};
use super::clock::Clock;
pub(crate) struct RunnerCore {
pub(super) clock: Clock,
network: Network,
pub event_queue: EventQueue,
pub time_budget: Jiffies,
started: bool,
fault: FaultState,
}
impl RunnerCore {
pub fn new(network: Network, time_budget: Jiffies, size: usize) -> Self {
Self {
clock: Clock::default(),
network,
event_queue: EventQueue::new(),
time_budget,
started: false,
fault: FaultState::new(size),
}
}
pub fn seed_events(&mut self, events: Vec<Reverse<TimedEvent>>) {
for event in events {
self.event_queue.push(event);
}
}
pub fn mark_started(&mut self) -> bool {
if self.started {
return false;
}
self.started = true;
true
}
pub fn handle_pid_handler_event(
&mut self,
invocation_time: Jiffies,
handler_event: PidHandlerEvent,
) -> Option<PidHandlerEvent> {
let result = match handler_event {
PidHandlerEvent::Start { .. } => return Some(handler_event),
PidHandlerEvent::Network { target, source, .. } => {
if cfg!(fault_enabled) {
if self.fault.is_link_broken(target, source) {
debug!("Discard event, source: P{source}, target: P{target}");
return None;
}
}
self.network.decide(invocation_time, handler_event)
}
PidHandlerEvent::Timer { .. } => return Some(handler_event),
};
match result {
NetworkDecision::Keep(timed_event) => {
self.event_queue.push(Reverse(timed_event));
None
}
NetworkDecision::Ready(event) => Some(event),
}
}
pub fn handle_fault_event(&mut self, event: FaultEvents) {
match event {
FaultEvents::BreakLink { pid1, pid2 } => self.fault.break_link(pid1, pid2),
FaultEvents::RestoreLink { pid1, pid2 } => self.fault.restore_link(pid1, pid2),
FaultEvents::Isolate { pid } => self.fault.isolate(pid),
FaultEvents::FinishIsolation { pid } => self.fault.restore(pid),
}
}
pub fn resolve_events(&mut self, events: InjestableEventBatch) {
let now = self.clock.now();
for event in events {
match event {
InjestableEvents::NetworkEvent {
source,
destination,
message,
} => self.resolve_network_event(now, source, destination, message),
InjestableEvents::TimerEvent {
pid,
id,
fire_after,
} => self.resolve_timer_event(now, pid, id, fire_after),
}
}
}
pub fn advance_time(&mut self, time: Jiffies) {
self.clock.fast_forward(time);
}
fn resolve_network_event(
&mut self,
now: Jiffies,
source: usize,
destination: Destination,
message: MessagePtr,
) {
let mut enqueue = |source, target, message, fault: &FaultState| {
if cfg!(fault_enabled) {
if fault.is_link_broken(source, target) {
debug!("Discard event, source: P{source}, target: P{target}");
return;
}
}
let latency = self.network.compute_latency(source, target);
self.event_queue.push(Reverse(TimedEvent {
invocation_time: now + latency,
event: Event::Handler(PidHandlerEvent::Network {
source,
target,
message,
fsm: NetworkFsm::KeepLatency,
}),
}));
};
match destination {
Destination::Target(target) => enqueue(source, target, message, &mut self.fault),
Destination::BroadcastWithinPool(pool) => {
for &target in list_pool(pool) {
enqueue(source, target, message.clone(), &mut self.fault);
}
}
}
}
fn resolve_timer_event(&mut self, now: Jiffies, pid: usize, id: usize, fire_after: Jiffies) {
self.event_queue.push(Reverse(TimedEvent {
invocation_time: now + fire_after,
event: Event::Handler(PidHandlerEvent::Timer { pid, id }),
}));
}
}
impl Drop for RunnerCore {
fn drop(&mut self) {
global::reset();
}
}