dscale 0.7.1

A fast & deterministic simulation framework for benchmarking and testing distributed systems
Documentation
// DScale: deterministic distributed systems simulator
// Copyright (C) 2026  Konstantin Shprenger

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.

use std::{
    cmp::Reverse,
    hint::{self},
    sync::Arc,
};

use log::{debug, trace};

use crate::{
    CompleteStatus, Jiffies, Pid,
    destination::Destination,
    events::{Event, EventQueue, FaultEvent, IngestableEvents, MessageState, PidEvent, TimedEvent},
    message::MessagePtr,
    network::{Network, NetworkPoll},
    random::Seed,
    runners::fault::FaultState,
    services::{self, Services, process_access::IngestableEventBatch, setup_services},
    topology::Topology,
};

use super::clock::Clock;

/// State shared by the simulation runners: the clock, the network model,
/// the pending event queue, the fault state and the completion limits.
pub(crate) struct RunnerCore {
    /// Seed passed to `new`; stored as-is and not read by the `RunnerCore`
    /// methods themselves.
    pub(super) seed: Seed,
    /// Simulation clock; moved forward by `advance_time` and read by
    /// `check_deadline`.
    pub(super) clock: Clock,
    /// Network model polled for every message event.
    network: Network,
    /// Pending timed events, pushed wrapped in `Reverse`.
    pub(super) event_queue: EventQueue,
    /// Once the clock reaches this value, `check_deadline` reports
    /// `TimeBudgetExhausted` instead of `Completed`.
    pub(super) time_budget: Jiffies,
    /// Set by the first call to `mark_started`.
    started: bool,
    /// Broken-link / isolation state consulted before messages are queued
    /// and before they are handed back for delivery.
    fault: FaultState,
    /// Used to expand a `Destination::Pool` into its member pids.
    pub(super) topology: Arc<Topology>,

    // CompleteStatus checks
    /// Optional step cap tested by `check_steps`; `None` disables the check.
    pub(super) max_steps: Option<usize>,
    /// Clock value at which `check_deadline` starts returning a status.
    /// Initialised to `Jiffies(0)`; presumably set by the owning runner
    /// before each run (confirm against callers).
    pub(super) deadline: Jiffies,
    /// Step counter reported inside `CompleteStatus`. Nothing in this impl
    /// increments it; presumably the owning runner does (confirm).
    pub(super) steps_made: usize,
}

impl RunnerCore {
    /// Installs `services` through `setup_services` and builds a core that
    /// has not been started yet.
    ///
    /// The clock starts at its default value, the event queue is empty, no
    /// step cap is set, the deadline is `Jiffies(0)` and the step counter is
    /// zero. `size` is forwarded to `FaultState::new` (presumably the number
    /// of simulated processes; confirm against `FaultState`).
    pub fn new(
        services: Arc<Services>,
        seed: Seed,
        network: Network,
        time_budget: Jiffies,
        size: usize,
        topology: Arc<Topology>,
    ) -> Self {
        setup_services(services);
        Self {
            seed,
            clock: Clock::default(),
            network,
            event_queue: EventQueue::new(),
            time_budget,
            started: false,
            fault: FaultState::new(size),
            topology,
            max_steps: None,
            deadline: Jiffies(0),
            steps_made: 0,
        }
    }

    /// Pushes every event of `events` onto the event queue, in the order
    /// given, then traces the resulting queue state once.
    pub fn seed_events(&mut self, events: Vec<Reverse<TimedEvent>>) {
        for event in events {
            self.event_queue.push(event);
        }
        trace!("new event queue state:\n{:#?}", self.event_queue);
    }

    /// Marks the core as started.
    ///
    /// Returns `true` on the first call and `false` on every later call.
    pub fn mark_started(&mut self) -> bool {
        if self.started {
            return false;
        }
        self.started = true;
        true
    }

    /// Decides what to do with an event popped for process `pid`.
    ///
    /// * `Start` and `Timer` events are handed back unchanged.
    /// * `Message` events are polled through the network:
    ///   * `NetworkPoll::Await` — the returned timed event is pushed back
    ///     onto the queue and `None` is returned;
    ///   * `NetworkPoll::Ready` — the event is handed back, unless the link
    ///     between `pid` and the message source is currently broken, in
    ///     which case the message is logged as discarded and `None` is
    ///     returned.
    pub fn handle_pid_event(
        &mut self,
        invocation_time: Jiffies,
        pid: Pid,
        pid_event: PidEvent,
    ) -> Option<PidEvent> {
        match pid_event {
            PidEvent::Start { .. } => {
                hint::cold_path();
                Some(pid_event)
            }
            PidEvent::Message { source, .. } => {
                match self.network.poll(invocation_time, pid, pid_event) {
                    NetworkPoll::Await(timed_event) => {
                        self.event_queue.push(Reverse(timed_event));
                        trace!("new event queue state:\n{:#?}", self.event_queue);
                        None
                    }
                    NetworkPoll::Ready(event) => {
                        // The link is re-checked here because it may have
                        // been broken after the message was queued.
                        if self.fault.is_link_broken(pid, source) {
                            hint::cold_path();
                            debug!("Discarded network message, source: P{source}, target: P{pid}");
                            None
                        } else {
                            Some(event)
                        }
                    }
                }
            }
            PidEvent::Timer { .. } => Some(pid_event),
        }
    }

    /// Applies a fault event to the fault state and logs it at debug level.
    pub fn handle_fault_event(&mut self, event: FaultEvent) {
        match event {
            FaultEvent::BreakLink { pid1, pid2 } => {
                self.fault.break_link(pid1, pid2);
                debug!("Break link P{pid1}<->P{pid2}");
            }
            FaultEvent::RestoreLink { pid1, pid2 } => {
                self.fault.restore_link(pid1, pid2);
                debug!("Restored link P{pid1}<->P{pid2}");
            }
            FaultEvent::Isolate { pid } => {
                self.fault.isolate(pid);
                debug!("Isolated P{pid}");
            }
            FaultEvent::FinishIsolation { pid } => {
                self.fault.restore(pid);
                debug!("Restored P{pid}");
            }
        }
    }

    /// Turns the events emitted by process `source` at time `now` into
    /// queued timed events: messages go through `resolve_network_event`,
    /// timers through `resolve_timer_event`.
    pub fn resolve_events(&mut self, now: Jiffies, source: Pid, events: IngestableEventBatch) {
        for event in events {
            match event {
                IngestableEvents::Message {
                    destination,
                    message,
                } => self.resolve_network_event(now, source, destination, message),
                IngestableEvents::Timer { id, fire_after } => {
                    self.resolve_timer_event(now, source, id, fire_after)
                }
            }
        }
    }

    /// Fast-forwards the simulation clock by passing `time` to
    /// `Clock::fast_forward`.
    pub fn advance_time(&mut self, time: Jiffies) {
        self.clock.fast_forward(time);
    }

    /// Queues `message` from `source` for one target or for every member of
    /// a pool.
    ///
    /// A message whose link is currently broken is logged and dropped
    /// without being queued. For a pool the message is cloned once per
    /// member.
    fn resolve_network_event(
        &mut self,
        now: Jiffies,
        source: usize,
        destination: Destination,
        message: MessagePtr,
    ) {
        // `fault` is passed in rather than captured, so the closure only
        // holds the mutable borrows of `network` and `event_queue`.
        let mut enqueue = |source, target, message, fault: &FaultState| {
            if fault.is_link_broken(source, target) {
                hint::cold_path();
                debug!("Discarded network message, source: P{source}, target: P{target}");
                return;
            }
            match self.network.poll(
                now,
                target,
                PidEvent::Message {
                    source,
                    message,
                    state: MessageState::Init,
                },
            ) {
                NetworkPoll::Await(event) => {
                    self.event_queue.push(Reverse(event));
                    trace!("new event queue state:\n{:#?}", self.event_queue);
                }
                // SAFETY: relies on `Network::poll` never answering `Ready`
                // for a message that is still in `MessageState::Init`. That
                // contract lives in `Network` and is not visible here.
                // NOTE(review): confirm it holds for every network
                // configuration; reaching this arm is undefined behaviour.
                NetworkPoll::Ready(_) => unsafe { hint::unreachable_unchecked() },
            }
        };
        match destination {
            Destination::Target(target) => enqueue(source, target, message, &self.fault),
            Destination::Pool(pool) => {
                for &target in self.topology.list_pool(pool) {
                    enqueue(source, target, message.clone(), &self.fault);
                }
            }
        }
    }

    /// Queues timer `id` for process `pid` to fire at `now + fire_after`.
    fn resolve_timer_event(&mut self, now: Jiffies, pid: usize, id: usize, fire_after: Jiffies) {
        self.event_queue.push(Reverse(TimedEvent {
            invocation_time: now + fire_after,
            event: Event::Pid {
                pid,
                event: PidEvent::Timer { id },
            },
        }));
        trace!("new event queue state:\n{:#?}", self.event_queue);
    }

    /// Returns `CompleteStatus::Completed` once `steps_made` has reached the
    /// configured `max_steps`; returns `None` otherwise, or when no cap is
    /// set.
    pub(super) fn check_steps(&self) -> Option<CompleteStatus> {
        match self.max_steps {
            Some(limit) if self.steps_made >= limit => {
                hint::cold_path();
                Some(CompleteStatus::Completed {
                    steps: self.steps_made,
                })
            }
            _ => None,
        }
    }

    /// Returns a status once the clock has reached `deadline`, and `None`
    /// before that.
    ///
    /// At or past the deadline the result is `TimeBudgetExhausted` when the
    /// clock has also reached `time_budget`, and `Completed` otherwise.
    pub(super) fn check_deadline(&self) -> Option<CompleteStatus> {
        let now = self.clock.now();
        if now >= self.deadline {
            hint::cold_path();
            let steps = self.steps_made;
            let status = if now >= self.time_budget {
                CompleteStatus::TimeBudgetExhausted { steps }
            } else {
                CompleteStatus::Completed { steps }
            };
            return Some(status);
        }
        None
    }
}

impl Drop for RunnerCore {
    /// Calls `services::reset()` when the core is dropped.
    ///
    /// NOTE(review): presumably this undoes the `setup_services` call made
    /// in `RunnerCore::new`; confirm against the `services` module.
    fn drop(&mut self) {
        services::reset();
    }
}