dscale 0.6.0

A fast & deterministic simulation framework for benchmarking and testing distributed systems
Documentation
use std::cmp::Reverse;

use log::debug;

use crate::{
    Jiffies, Pid,
    destination::Destination,
    events::{Event, EventQueue, FaultEvent, IngestableEvents, MessageState, PidEvent, TimedEvent},
    global::{self, local_access::IngestableEventBatch},
    list_pool,
    message::MessagePtr,
    network::{Network, NetworkPoll},
    random::Seed,
    runners::fault::FaultState,
};

use super::clock::Clock;

pub(crate) struct RunnerCore {
    pub(super) seed: Seed,
    pub(super) clock: Clock,
    network: Network,
    pub event_queue: EventQueue,
    pub time_budget: Jiffies,
    started: bool,
    fault: FaultState,
}

impl RunnerCore {
    pub fn new(seed: Seed, network: Network, time_budget: Jiffies, size: usize) -> Self {
        Self {
            seed,
            clock: Clock::default(),
            network,
            event_queue: EventQueue::new(),
            time_budget,
            started: false,
            fault: FaultState::new(size),
        }
    }

    pub fn seed_events(&mut self, events: Vec<Reverse<TimedEvent>>) {
        for event in events {
            self.event_queue.push(event);
        }
    }

    pub fn mark_started(&mut self) -> bool {
        if self.started {
            return false;
        }
        self.started = true;
        true
    }

    pub fn handle_pid_event(
        &mut self,
        invocation_time: Jiffies,
        pid: Pid,
        pid_event: PidEvent,
    ) -> Option<PidEvent> {
        match pid_event {
            PidEvent::Start { .. } => return Some(pid_event),
            PidEvent::Message { source, .. } => {
                match self.network.poll(invocation_time, pid, pid_event) {
                    NetworkPoll::Await(timed_event) => {
                        self.event_queue.push(Reverse(timed_event));
                        None
                    }
                    NetworkPoll::Ready(event) => {
                        if self.fault.is_link_broken(pid, source) {
                            debug!("Discarded network message, source: P{source}, target: P{pid}");
                            return None;
                        }
                        Some(event)
                    }
                }
            }
            PidEvent::Timer { .. } => return Some(pid_event),
        }
    }

    pub fn handle_fault_event(&mut self, event: FaultEvent) {
        match event {
            FaultEvent::BreakLink { pid1, pid2 } => self.fault.break_link(pid1, pid2),
            FaultEvent::RestoreLink { pid1, pid2 } => self.fault.restore_link(pid1, pid2),
            FaultEvent::Isolate { pid } => self.fault.isolate(pid),
            FaultEvent::FinishIsolation { pid } => self.fault.restore(pid),
        }
    }

    pub fn resolve_events(&mut self, events: IngestableEventBatch) {
        let now = self.clock.now();
        for event in events {
            match event {
                IngestableEvents::Message {
                    source,
                    destination,
                    message,
                } => self.resolve_network_event(now, source, destination, message),
                IngestableEvents::Timer {
                    pid,
                    id,
                    fire_after,
                } => self.resolve_timer_event(now, pid, id, fire_after),
            }
        }
    }

    pub fn advance_time(&mut self, time: Jiffies) {
        self.clock.fast_forward(time);
    }

    fn resolve_network_event(
        &mut self,
        now: Jiffies,
        source: usize,
        destination: Destination,
        message: MessagePtr,
    ) {
        let mut enqueue = |source, target, message, fault: &FaultState| {
            if fault.is_link_broken(source, target) {
                debug!("Discarded network message, source: P{source}, target: P{target}");
                return;
            }
            match self.network.poll(
                now,
                target,
                PidEvent::Message {
                    source,
                    message,
                    state: MessageState::Init,
                },
            ) {
                NetworkPoll::Await(event) => self.event_queue.push(Reverse(event)),
                NetworkPoll::Ready(_) => unreachable!(),
            }
        };
        match destination {
            Destination::Target(target) => enqueue(source, target, message, &mut self.fault),
            Destination::Pool(pool) => {
                for &target in list_pool(pool) {
                    enqueue(source, target, message.clone(), &mut self.fault);
                }
            }
        }
    }

    fn resolve_timer_event(&mut self, now: Jiffies, pid: usize, id: usize, fire_after: Jiffies) {
        self.event_queue.push(Reverse(TimedEvent {
            invocation_time: now + fire_after,
            event: Event::Pid {
                pid,
                event: PidEvent::Timer { id },
            },
        }));
    }
}

impl Drop for RunnerCore {
    fn drop(&mut self) {
        global::reset();
    }
}