use std::{
cmp::Reverse,
hint::{self},
sync::Arc,
};
use log::{debug, trace};
use crate::{
CompleteStatus, Jiffies, Pid,
destination::Destination,
events::{Event, EventQueue, FaultEvent, IngestableEvents, MessageState, PidEvent, TimedEvent},
message::MessagePtr,
network::{Network, NetworkPoll},
random::Seed,
runners::fault::FaultState,
services::{self, Services, process_access::IngestableEventBatch, setup_services},
topology::Topology,
};
use super::clock::Clock;
/// State shared by the simulation runners: the simulated clock, the pending
/// event queue, the network model, injected-fault bookkeeping and the limits
/// that decide when a run is over.
pub(crate) struct RunnerCore {
/// Seed handed to `new`; stored here for the sibling runner modules.
pub(super) seed: Seed,
/// Simulated clock; moved forward by `advance_time` and read by `check_deadline`.
pub(super) clock: Clock,
/// Network model polled whenever a message is sent or is about to be delivered.
network: Network,
/// Pending timed events, pushed as `Reverse<TimedEvent>`.
pub(super) event_queue: EventQueue,
/// Overall time limit: once the clock reaches it, `check_deadline` reports
/// `TimeBudgetExhausted` instead of `Completed`.
pub(super) time_budget: Jiffies,
/// Set by the first call to `mark_started`.
started: bool,
/// Broken-link / isolation state driven by `FaultEvent`s.
fault: FaultState,
/// Topology used to expand a pool destination into its member processes.
pub(super) topology: Arc<Topology>,
/// Optional cap on `steps_made`; `None` disables the step check.
pub(super) max_steps: Option<usize>,
/// Clock value at which `check_deadline` starts reporting completion.
pub(super) deadline: Jiffies,
/// Step counter reported in every `CompleteStatus`; it is not modified by
/// `RunnerCore` itself.
pub(super) steps_made: usize,
}
impl RunnerCore {
    /// Builds a core in its initial, not-yet-started state.
    ///
    /// `services` is handed to `setup_services` before the struct is assembled
    /// and `size` is forwarded to `FaultState::new`. The clock starts at its
    /// default value, the event queue is empty, no step limit is set and the
    /// deadline is `Jiffies(0)`.
    pub fn new(
        services: Arc<Services>,
        seed: Seed,
        network: Network,
        time_budget: Jiffies,
        size: usize,
        topology: Arc<Topology>,
    ) -> Self {
        setup_services(services);
        Self {
            seed,
            clock: Clock::default(),
            network,
            event_queue: EventQueue::new(),
            time_budget,
            started: false,
            fault: FaultState::new(size),
            topology,
            max_steps: None,
            deadline: Jiffies(0),
            steps_made: 0,
        }
    }

    /// Pushes every event of `events` onto the event queue, then traces the
    /// resulting queue once.
    pub fn seed_events(&mut self, events: Vec<Reverse<TimedEvent>>) {
        for event in events {
            self.event_queue.push(event);
        }
        trace!("new event queue state:\n{:#?}", self.event_queue);
    }

    /// Marks the core as started.
    ///
    /// Returns `true` only for the call that performs the transition; every
    /// later call returns `false` and changes nothing.
    pub fn mark_started(&mut self) -> bool {
        let first_call = !self.started;
        self.started = true;
        first_call
    }

    /// Decides whether `pid_event`, addressed to `pid`, is to be delivered now.
    ///
    /// * `Start` and `Timer` events are returned unchanged.
    /// * `Message` events are polled through the network. If the network
    ///   answers `Await`, the returned timed event is queued and `None` is
    ///   returned. If it answers `Ready`, the event is returned unless the
    ///   link between the two processes is currently broken, in which case
    ///   the message is discarded and `None` is returned.
    pub fn handle_pid_event(
        &mut self,
        invocation_time: Jiffies,
        pid: Pid,
        pid_event: PidEvent,
    ) -> Option<PidEvent> {
        match pid_event {
            PidEvent::Start { .. } => {
                hint::cold_path();
                Some(pid_event)
            }
            PidEvent::Timer { .. } => Some(pid_event),
            PidEvent::Message { source, .. } => {
                match self.network.poll(invocation_time, pid, pid_event) {
                    NetworkPoll::Await(timed_event) => {
                        self.event_queue.push(Reverse(timed_event));
                        trace!("new event queue state:\n{:#?}", self.event_queue);
                        None
                    }
                    NetworkPoll::Ready(event) => {
                        // The link was already checked when the message was
                        // sent; it is checked again here because it may have
                        // been broken while the message was in flight.
                        // NOTE(review): the send path queries (source, target)
                        // while this queries (target, source) - assumes
                        // `is_link_broken` is symmetric; confirm.
                        if self.fault.is_link_broken(pid, source) {
                            hint::cold_path();
                            debug!("Discarded network message, source: P{source}, target: P{pid}");
                            None
                        } else {
                            Some(event)
                        }
                    }
                }
            }
        }
    }

    /// Applies a fault-injection event to the fault state and logs it.
    pub fn handle_fault_event(&mut self, event: FaultEvent) {
        match event {
            FaultEvent::BreakLink { pid1, pid2 } => {
                self.fault.break_link(pid1, pid2);
                debug!("Break link P{pid1}<->P{pid2}");
            }
            FaultEvent::RestoreLink { pid1, pid2 } => {
                self.fault.restore_link(pid1, pid2);
                debug!("Restored link P{pid1}<->P{pid2}");
            }
            FaultEvent::Isolate { pid } => {
                self.fault.isolate(pid);
                debug!("Isolated P{pid}");
            }
            FaultEvent::FinishIsolation { pid } => {
                self.fault.restore(pid);
                debug!("Restored P{pid}");
            }
        }
    }

    /// Turns the events emitted by process `source` at time `now` into queued
    /// timed events: messages go through the network, timers are scheduled
    /// directly.
    pub fn resolve_events(&mut self, now: Jiffies, source: Pid, events: IngestableEventBatch) {
        for event in events {
            match event {
                IngestableEvents::Message {
                    destination,
                    message,
                } => self.resolve_network_event(now, source, destination, message),
                IngestableEvents::Timer { id, fire_after } => {
                    self.resolve_timer_event(now, source, id, fire_after)
                }
            }
        }
    }

    /// Fast-forwards the simulated clock to `time`.
    pub fn advance_time(&mut self, time: Jiffies) {
        self.clock.fast_forward(time);
    }

    /// Sends `message` from `source` to one process or to every member of a
    /// pool.
    ///
    /// For each target whose link to `source` is not broken, the message is
    /// handed to the network in `MessageState::Init` and the resulting timed
    /// event is queued. Messages over a broken link are discarded.
    ///
    /// # Panics
    ///
    /// Panics if the network reports a freshly sent message as `Ready`.
    fn resolve_network_event(
        &mut self,
        now: Jiffies,
        source: usize,
        destination: Destination,
        message: MessagePtr,
    ) {
        // The closure borrows `network`, `event_queue` and `fault` separately,
        // which leaves `topology` free to be iterated while it is alive.
        let mut enqueue = |source, target, message| {
            if self.fault.is_link_broken(source, target) {
                hint::cold_path();
                debug!("Discarded network message, source: P{source}, target: P{target}");
                return;
            }
            match self.network.poll(
                now,
                target,
                PidEvent::Message {
                    source,
                    message,
                    state: MessageState::Init,
                },
            ) {
                NetworkPoll::Await(event) => {
                    self.event_queue.push(Reverse(event));
                    trace!("new event queue state:\n{:#?}", self.event_queue);
                }
                // This used to be `unreachable_unchecked()`. The invariant is
                // owned by `Network::poll`, so a violation must fail loudly
                // instead of being undefined behaviour.
                NetworkPoll::Ready(_) => {
                    unreachable!("network reported a freshly sent message as ready")
                }
            }
        };
        match destination {
            Destination::Target(target) => enqueue(source, target, message),
            Destination::Pool(pool) => {
                for &target in self.topology.list_pool(pool) {
                    enqueue(source, target, message.clone());
                }
            }
        }
    }

    /// Schedules timer `id` of process `pid` to fire at `now + fire_after`.
    fn resolve_timer_event(&mut self, now: Jiffies, pid: usize, id: usize, fire_after: Jiffies) {
        let timer = TimedEvent {
            invocation_time: now + fire_after,
            event: Event::Pid {
                pid,
                event: PidEvent::Timer { id },
            },
        };
        self.event_queue.push(Reverse(timer));
        trace!("new event queue state:\n{:#?}", self.event_queue);
    }

    /// Returns `Completed` once `steps_made` has reached `max_steps`.
    ///
    /// Returns `None` when no step limit is set or the limit has not been
    /// reached yet.
    pub(super) fn check_steps(&self) -> Option<CompleteStatus> {
        match self.max_steps {
            Some(limit) if self.steps_made >= limit => {
                hint::cold_path();
                Some(CompleteStatus::Completed {
                    steps: self.steps_made,
                })
            }
            _ => None,
        }
    }

    /// Returns a completion status once the clock has reached `deadline`.
    ///
    /// The status is `TimeBudgetExhausted` when the clock has also reached
    /// `time_budget`, and `Completed` otherwise. Returns `None` while the
    /// clock is still before the deadline.
    pub(super) fn check_deadline(&self) -> Option<CompleteStatus> {
        let now = self.clock.now();
        if now >= self.deadline {
            hint::cold_path();
            let steps = self.steps_made;
            return Some(if now >= self.time_budget {
                CompleteStatus::TimeBudgetExhausted { steps }
            } else {
                CompleteStatus::Completed { steps }
            });
        }
        None
    }
}
impl Drop for RunnerCore {
    /// Calls `services::reset()` when the core goes out of scope.
    ///
    /// NOTE(review): presumably this undoes the `setup_services` call made in
    /// `RunnerCore::new`; confirm against the `services` module.
    fn drop(&mut self) {
        services::reset();
    }
}