dscale 0.7.1

A fast & deterministic simulation framework for benchmarking and testing distributed systems
Documentation
// DScale: deterministic distributed systems simulator
// Copyright (C) 2026  Konstantin Shprenger

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.

use std::cell::RefCell;
use std::mem;
use std::sync::mpsc::SyncSender;

use smallvec::SmallVec;

use crate::destination::Destination;
use crate::events::IngestableEvents;
use crate::random::{Randomizer, Seed};
use crate::runners::scalable::task::{TaskId, TaskResult};
use crate::{MessagePtr, unique_id};

use crate::{Message, Pid, dscale_debug, jiffy::Jiffies, timer::TimerId, topology::GLOBAL_POOL};

/// Inline capacity of [`IngestableEventBatch`].
// NOTE(review): going by the name, this is the number of events a single step is expected
// to schedule — confirm before tuning.
const PREDICTION_SCHEDULED_PER_STEP: usize = 2;

/// Batch of [`IngestableEvents`] queued by a process, held in a [`SmallVec`] with inline
/// room for [`PREDICTION_SCHEDULED_PER_STEP`] events.
pub(crate) type IngestableEventBatch = SmallVec<[IngestableEvents; PREDICTION_SCHEDULED_PER_STEP]>;

// Per-thread process context; the functions in this file reach it through
// `with_process_access`.
thread_local! {
    /// Thread-local [`ProcessAccess`], initialised with `ProcessAccess::default()` and
    /// wrapped in a [`RefCell`] so it can be mutated through the shared thread-local handle.
    pub(crate) static PROCESS_ACCESS: RefCell<ProcessAccess> = RefCell::new(ProcessAccess::default());
}

/// Runs `f` with mutable access to this thread's [`ProcessAccess`] and returns whatever
/// `f` returns.
///
/// # Panics
///
/// Panics if the thread-local state is already borrowed, e.g. when called again from
/// inside `f`.
fn with_process_access<R>(f: impl FnOnce(&mut ProcessAccess) -> R) -> R {
    PROCESS_ACCESS.with(|cell| {
        let mut state = cell.borrow_mut();
        f(&mut *state)
    })
}

pub(crate) fn setup_process_access(seed: Seed, coordinator: Option<SyncSender<TaskResult>>) {
    with_process_access(|access| {
        access.random = Randomizer::new(seed);
        access.coordinator = coordinator
    });
}

/// Per-thread state behind the free functions of this file: the identity and time of the
/// current invocation, a random source, and the events queued so far.
#[derive(Default)]
pub(crate) struct ProcessAccess {
    /// Pid returned by `pid()`; written by `prepare` and `prepare_task`.
    pid: Pid,
    /// Task id written by `prepare_task` and echoed back in the `TaskResult`.
    current_task: TaskId,
    /// Time returned by `now()`; written by `prepare` and `prepare_task`.
    now: Jiffies,
    /// Random source used to pick pool members; replaced by `setup_process_access`.
    random: Randomizer,
    /// Events queued by the send/timer helpers until `take_events` or
    /// `notify_coordinator` drains them.
    scheduled_events: IngestableEventBatch,
    /// Channel used by `notify_coordinator`; `None` by default, assigned by
    /// `setup_process_access`.
    coordinator: Option<SyncSender<TaskResult>>,
}

impl ProcessAccess {
    fn broadcast_within_pool(&mut self, pool_name: &'static str, message: impl Message + 'static) {
        self.scheduled_events.push(IngestableEvents::Message {
            destination: Destination::Pool(pool_name),
            message: MessagePtr::new(message),
        });
    }

    fn send_to(&mut self, pid: Pid, message: impl Message + 'static) {
        self.scheduled_events.push(IngestableEvents::Message {
            destination: Destination::Target(pid),
            message: MessagePtr::new(message),
        });
    }

    fn send_random_from_pool(&mut self, pool: &str, message: impl Message + 'static) {
        let target = self.choose_from_pool(pool);
        self.send_to(target, message);
    }

    fn choose_from_pool(&mut self, pool_name: &str) -> Pid {
        let pool = super::topology::list_pool(pool_name);
        self.random.choose_from_slice(&pool)
    }

    fn schedule_timer_after(&mut self, after: Jiffies) -> TimerId {
        let timer_id = unique_id();
        self.scheduled_events.push(IngestableEvents::Timer {
            id: timer_id,
            fire_after: after,
        });
        timer_id
    }

    fn notify_coordinator(&mut self) {
        self.coordinator
            .as_ref()
            .expect("coordinator not found")
            .send(TaskResult {
                id: self.current_task,
                pid: self.pid,
                invocation_time: self.now,
                events: mem::take(&mut self.scheduled_events),
            })
            .expect("coordinator disconnected");
    }

    fn take_events(&mut self) -> IngestableEventBatch {
        mem::take(&mut self.scheduled_events)
    }

    fn pid(&self) -> Pid {
        self.pid
    }
}

/// Records which process is about to run on this thread and at what time.
pub(crate) fn prepare(pid: Pid, now: Jiffies) {
    with_process_access(|state| {
        state.now = now;
        state.pid = pid;
    });
}

/// Records the task and process about to run on this thread.
///
/// The context's time is set to the first component of `task_id`.
pub(crate) fn prepare_task(task_id: TaskId, pid: Pid) {
    let invoked_at = task_id.0;
    with_process_access(|state| {
        state.pid = pid;
        state.now = invoked_at;
        state.current_task = task_id;
    });
}

pub(crate) fn notify_coordinator() {
    with_process_access(|access| access.notify_coordinator());
}

pub(crate) fn take_events() -> IngestableEventBatch {
    with_process_access(|access| access.take_events())
}

/// Queues a timer for the current process that fires once `after` has elapsed.
///
/// The returned [`TimerId`] is the one later handed to [`crate::Process::on_timer`].
pub fn schedule_timer_after(after: Jiffies) -> TimerId {
    dscale_debug!("scheduling timer after {after}");
    with_process_access(move |state| state.schedule_timer_after(after))
}

/// Sends a message to all processes in [`GLOBAL_POOL`] (i.e. every process).
pub fn broadcast(message: impl Message + 'static) {
    with_process_access(|access| access.broadcast_within_pool(GLOBAL_POOL, message));
}

/// Queues `message` for delivery to every process in the pool called `pool`.
pub fn broadcast_within_pool(pool: &'static str, message: impl Message + 'static) {
    dscale_debug!("broadcasting within: {pool}");
    with_process_access(move |state| state.broadcast_within_pool(pool, message));
}

/// Queues `message` for delivery to the process whose pid is `pid`.
pub fn send_to(pid: Pid, message: impl Message + 'static) {
    dscale_debug!("sending to: P{pid}");
    with_process_access(move |state| state.send_to(pid, message));
}

/// Queues `message` for one process picked at random out of [`GLOBAL_POOL`].
pub fn send_random(message: impl Message + 'static) {
    dscale_debug!("sending random P from GLOBAL_POOL");
    with_process_access(move |state| state.send_random_from_pool(GLOBAL_POOL, message));
}

/// Sends a message to a randomly chosen process from the named pool.
///
/// `pool` only needs to live for the duration of the call: the name is used to look up
/// the pool's members and is not stored, so any `&str` is accepted (matching
/// [`choose_from_pool`]).
pub fn send_random_from_pool(pool: &str, message: impl Message + 'static) {
    dscale_debug!("sending random from pool {pool}");
    with_process_access(|access| access.send_random_from_pool(pool, message));
}

/// Returns the pid recorded for the process currently executing on this thread.
pub fn pid() -> Pid {
    with_process_access(|state| state.pid())
}

/// Returns the pid of a process picked at random from the pool called `pool_name`.
///
/// The choice is drawn from this thread's randomizer.
pub fn choose_from_pool(pool_name: &str) -> Pid {
    with_process_access(move |state| state.choose_from_pool(pool_name))
}

/// Returns the current time, i.e. the time recorded in this thread's process context
/// for the running invocation.
pub fn now() -> Jiffies {
    with_process_access(|state| state.now)
}