// dscale 0.5.2
//
// A fast & deterministic simulation framework for benchmarking and testing distributed systems.
// Documentation
use std::cmp::Reverse;

use log::debug;

use crate::{
    Jiffies,
    destination::Destination,
    events::{
        Event, EventQueue, FaultEvents, InjestableEvents, NetworkFsm, PidHandlerEvent, TimedEvent,
    },
    global::{self, local_access::InjestableEventBatch},
    list_pool,
    message::MessagePtr,
    network::{Network, NetworkDecision},
    runners::fault::FaultState,
};

use super::clock::Clock;

/// Core state of a simulation runner: the clock, the network model, the queue
/// of pending timed events, and the current fault (broken-link / isolation)
/// state.
pub(crate) struct RunnerCore {
    /// Simulation clock; read with `now()` and moved forward with
    /// `fast_forward()`.
    pub(super) clock: Clock,
    /// Network model consulted for per-message latency and for delivery
    /// decisions on network events.
    network: Network,
    /// Pending events, stored as `Reverse<TimedEvent>`.
    // NOTE(review): the `Reverse` wrapper suggests a min-ordered heap keyed by
    // invocation time — confirm against `EventQueue`.
    pub event_queue: EventQueue,
    /// Time budget supplied at construction; it is only stored here, the code
    /// that enforces it lives outside this type.
    pub time_budget: Jiffies,
    /// Set to `true` by the first call to `mark_started`.
    started: bool,
    /// Link / isolation fault state updated by `handle_fault_event` and
    /// queried before network events are scheduled or delivered.
    fault: FaultState,
}

impl RunnerCore {
    /// Creates a core with a default clock, an empty event queue and a fresh
    /// fault state.
    ///
    /// * `network` - network model used for latency and delivery decisions.
    /// * `time_budget` - stored as-is in [`RunnerCore::time_budget`].
    /// * `size` - forwarded to `FaultState::new`.
    // NOTE(review): `size` is presumably the number of simulated processes —
    // confirm against `FaultState::new`.
    pub fn new(network: Network, time_budget: Jiffies, size: usize) -> Self {
        Self {
            clock: Clock::default(),
            network,
            event_queue: EventQueue::new(),
            time_budget,
            started: false,
            fault: FaultState::new(size),
        }
    }

    /// Pushes every event of `events` onto the event queue, in iteration order.
    pub fn seed_events(&mut self, events: Vec<Reverse<TimedEvent>>) {
        for event in events {
            self.event_queue.push(event);
        }
    }

    /// Marks the core as started.
    ///
    /// Returns `true` on the first call and `false` on every later call, so a
    /// caller can use the result to run start-up logic exactly once.
    pub fn mark_started(&mut self) -> bool {
        if self.started {
            return false;
        }
        self.started = true;
        true
    }

    /// Decides what to do with a handler event popped at `invocation_time`.
    ///
    /// * `Start` and `Timer` events are returned unchanged for immediate
    ///   handling.
    /// * `Network` events are dropped (`None`) when fault injection is compiled
    ///   in and the link is broken. Otherwise the network model decides: a
    ///   `Keep` decision re-queues the event and yields `None`, a `Ready`
    ///   decision yields `Some(event)`.
    pub fn handle_pid_handler_event(
        &mut self,
        invocation_time: Jiffies,
        handler_event: PidHandlerEvent,
    ) -> Option<PidHandlerEvent> {
        let decision = match handler_event {
            // Neither kind goes through the network model.
            PidHandlerEvent::Start { .. } | PidHandlerEvent::Timer { .. } => {
                return Some(handler_event);
            }
            PidHandlerEvent::Network { target, source, .. } => {
                // `cfg!` expands to a compile-time boolean, so the fault lookup
                // is skipped entirely when `fault_enabled` is not set.
                // NOTE(review): the arguments are (target, source) here but
                // (source, target) in `resolve_network_event`; harmless only if
                // `is_link_broken` is symmetric — confirm.
                if cfg!(fault_enabled) && self.fault.is_link_broken(target, source) {
                    debug!("Discard event, source: P{source}, target: P{target}");
                    return None;
                }
                self.network.decide(invocation_time, handler_event)
            }
        };
        match decision {
            NetworkDecision::Keep(timed_event) => {
                self.event_queue.push(Reverse(timed_event));
                None
            }
            NetworkDecision::Ready(event) => Some(event),
        }
    }

    /// Applies a fault event to the fault state (break/restore a link, start or
    /// finish isolating a process).
    pub fn handle_fault_event(&mut self, event: FaultEvents) {
        match event {
            FaultEvents::BreakLink { pid1, pid2 } => self.fault.break_link(pid1, pid2),
            FaultEvents::RestoreLink { pid1, pid2 } => self.fault.restore_link(pid1, pid2),
            FaultEvents::Isolate { pid } => self.fault.isolate(pid),
            FaultEvents::FinishIsolation { pid } => self.fault.restore(pid),
        }
    }

    /// Turns a batch of injected events into timed events on the queue.
    ///
    /// The clock is read once up front, so every event in the batch is
    /// scheduled relative to the same `now`.
    pub fn resolve_events(&mut self, events: InjestableEventBatch) {
        let now = self.clock.now();
        for event in events {
            match event {
                InjestableEvents::NetworkEvent {
                    source,
                    destination,
                    message,
                } => self.resolve_network_event(now, source, destination, message),
                InjestableEvents::TimerEvent {
                    pid,
                    id,
                    fire_after,
                } => self.resolve_timer_event(now, pid, id, fire_after),
            }
        }
    }

    /// Fast-forwards the clock using `time`.
    pub fn advance_time(&mut self, time: Jiffies) {
        self.clock.fast_forward(time);
    }

    /// Schedules delivery of `message` from `source` to every process named by
    /// `destination`.
    ///
    /// Each delivery is queued at `now` plus the latency the network model
    /// computes for that (source, target) pair. When fault injection is
    /// compiled in, deliveries over a broken link are discarded instead.
    fn resolve_network_event(
        &mut self,
        now: Jiffies,
        source: usize,
        destination: Destination,
        message: MessagePtr,
    ) {
        // The closure borrows only `self.network` and `self.event_queue`; the
        // fault state is passed in as a shared reference at each call site.
        let mut enqueue = |source, target, message, fault: &FaultState| {
            if cfg!(fault_enabled) && fault.is_link_broken(source, target) {
                debug!("Discard event, source: P{source}, target: P{target}");
                return;
            }
            let latency = self.network.compute_latency(source, target);
            self.event_queue.push(Reverse(TimedEvent {
                invocation_time: now + latency,
                event: Event::Handler(PidHandlerEvent::Network {
                    source,
                    target,
                    message,
                    fsm: NetworkFsm::KeepLatency,
                }),
            }));
        };
        match destination {
            Destination::Target(target) => enqueue(source, target, message, &self.fault),
            Destination::BroadcastWithinPool(pool) => {
                // Every member returned by `list_pool` gets its own clone of
                // the message.
                for &target in list_pool(pool) {
                    enqueue(source, target, message.clone(), &self.fault);
                }
            }
        }
    }

    /// Queues a timer event for process `pid` with timer `id`, due at
    /// `now + fire_after`.
    fn resolve_timer_event(&mut self, now: Jiffies, pid: usize, id: usize, fire_after: Jiffies) {
        self.event_queue.push(Reverse(TimedEvent {
            invocation_time: now + fire_after,
            event: Event::Handler(PidHandlerEvent::Timer { pid, id }),
        }));
    }
}

impl Drop for RunnerCore {
    /// Calls `global::reset()` when the core is dropped.
    // NOTE(review): presumably this clears process-wide simulation state so a
    // later run starts clean — confirm against the `global` module.
    fn drop(&mut self) {
        global::reset();
    }
}