dscale 0.5.2

A fast & deterministic simulation framework for benchmarking and testing distributed systems
Documentation
use std::cell::RefCell;
use std::mem;

use crossbeam_channel::Sender;
use smallvec::SmallVec;

use crate::destination::Destination;
use crate::events::InjestableEvents;
use crate::random::{Randomizer, Seed};
use crate::runners::task::{TaskId, TaskResult};
use crate::{MessagePtr, global_unique_id};

use crate::{Message, Pid, dscale_debug, jiffy::Jiffies, timer::TimerId, topology::GLOBAL_POOL};

/// Inline capacity of an [`InjestableEventBatch`]: up to this many events are
/// held without spilling to the heap.
// NOTE(review): presumably the number of events a process is expected to
// schedule per step — confirm against profiling data.
const PREDICTION_SCHEDULED_PER_STEP: usize = 2;

/// Batch of scheduled events, stored in a `SmallVec` with
/// [`PREDICTION_SCHEDULED_PER_STEP`] inline slots.
pub(crate) type InjestableEventBatch = SmallVec<[InjestableEvents; PREDICTION_SCHEDULED_PER_STEP]>;

// Per-thread `LocalAccess` state, initialised from `LocalAccess::default()`.
// The `RefCell` allows the state to be mutated through the shared thread-local
// handle; overlapping mutable borrows panic at runtime.
thread_local! {
    pub(crate) static LOCAL_ACCESS: RefCell<LocalAccess> = RefCell::new(LocalAccess::default());
}

/// Runs `f` with mutable access to this thread's [`LocalAccess`] and returns
/// whatever `f` returns.
///
/// # Panics
/// Panics if the state is already borrowed, e.g. when called re-entrantly
/// from inside `f`.
fn with_local_access<R>(f: impl FnOnce(&mut LocalAccess) -> R) -> R {
    LOCAL_ACCESS.with(|cell| {
        // Hold the borrow guard only for the duration of the callback.
        let mut guard = cell.borrow_mut();
        f(&mut guard)
    })
}

/// Initialises this thread's state: replaces the randomizer with one built
/// from `seed` and stores the `coordinator` channel used to report task results.
pub(crate) fn setup_local_access(seed: Seed, coordinator: Sender<TaskResult>) {
    with_local_access(|state| {
        state.random = Randomizer::new(seed);
        state.coordinator = Some(coordinator);
    });
}

/// Per-thread execution context through which the currently running process
/// schedules events and queries its identity and time.
#[derive(Default)]
pub(crate) struct LocalAccess {
    /// Pid of the current process; used as the source of queued events.
    pid: Pid,
    /// Id of the task being executed; echoed back in the `TaskResult` sent by `done`.
    current_task: TaskId,
    /// Current time as reported by `now()`.
    now: Jiffies,
    /// Random source used to pick pool members; replaced by `setup_local_access`.
    random: Randomizer,
    /// Events queued so far; drained by `done` or `take_events`.
    scheduled_events: InjestableEventBatch,
    /// Channel on which `done` reports results; `None` until `setup_local_access` runs.
    coordinator: Option<Sender<TaskResult>>,
}

impl LocalAccess {
    /// Queues a network event from the current process addressed to the whole
    /// pool named `pool_name`.
    fn broadcast_within_pool(&mut self, pool_name: &'static str, message: impl Message + 'static) {
        let event = InjestableEvents::NetworkEvent {
            source: self.pid,
            destination: Destination::BroadcastWithinPool(pool_name),
            message: MessagePtr::new(message),
        };
        self.scheduled_events.push(event);
    }

    /// Queues a network event from the current process addressed to `pid`.
    fn send_to(&mut self, pid: Pid, message: impl Message + 'static) {
        let event = InjestableEvents::NetworkEvent {
            source: self.pid,
            destination: Destination::Target(pid),
            message: MessagePtr::new(message),
        };
        self.scheduled_events.push(event);
    }

    /// Picks a random member of `pool` and queues `message` for it.
    fn send_random_from_pool(&mut self, pool: &str, message: impl Message + 'static) {
        let chosen = self.choose_from_pool(pool);
        self.send_to(chosen, message);
    }

    /// Returns a pid chosen by this thread's randomizer from the members of
    /// `pool_name`, as listed by `shared_access::list_pool`.
    fn choose_from_pool(&mut self, pool_name: &str) -> Pid {
        let members = super::shared_access::list_pool(pool_name);
        self.random.choose_from_slice(members)
    }

    /// Queues a timer event for the current process with delay `after` and
    /// returns the freshly allocated timer id.
    fn schedule_timer_after(&mut self, after: Jiffies) -> TimerId {
        let id = global_unique_id();
        let event = InjestableEvents::TimerEvent {
            pid: self.pid,
            id,
            fire_after: after,
        };
        self.scheduled_events.push(event);
        id
    }

    /// Sends a `TaskResult` for the current task, carrying all queued events,
    /// to the coordinator and leaves the queue empty.
    ///
    /// The send result is discarded, so a failed send is silently ignored.
    ///
    /// # Panics
    /// Panics with "No coordinator" if `setup_local_access` has not stored a
    /// coordinator channel.
    fn done(&mut self) {
        // Look up the coordinator first so a missing one panics before the
        // event queue is drained.
        let coordinator = self.coordinator.as_ref().expect("No coordinator");
        let result = TaskResult {
            id: self.current_task,
            pid: self.pid,
            events: mem::take(&mut self.scheduled_events),
        };
        let _ = coordinator.send(result);
    }

    /// Removes and returns all queued events, leaving an empty queue behind.
    fn take_events(&mut self) -> InjestableEventBatch {
        let drained = mem::take(&mut self.scheduled_events);
        drained
    }

    /// Returns the pid of the current process.
    fn pid(&self) -> Pid {
        self.pid
    }
}

/// Records which process is about to run on this thread and the current time.
pub(crate) fn prepare(pid: Pid, now: Jiffies) {
    with_local_access(|state| {
        state.pid = pid;
        state.now = now;
    });
}

/// Records the task and process about to run on this thread.
///
/// The current time is taken from the first field of `task_id`.
pub(crate) fn prepare_task(task_id: TaskId, pid: Pid) {
    with_local_access(|state| {
        state.current_task = task_id;
        state.pid = pid;
        state.now = task_id.0;
    });
}

/// Reports the current task's queued events to the coordinator.
///
/// # Panics
/// Panics if no coordinator has been set for this thread.
pub(crate) fn done() {
    with_local_access(|state| {
        state.done();
    });
}

/// Drains and returns the events queued on this thread so far.
pub(crate) fn take_events() -> InjestableEventBatch {
    with_local_access(|state| state.take_events())
}

/// Schedules a timer for the current process that fires once `after` has elapsed.
///
/// The returned [`TimerId`] is the one later passed to
/// [`crate::ProcessHandle::on_timer`].
pub fn schedule_timer_after(after: Jiffies) -> TimerId {
    dscale_debug!("[Access] scheduling timer after {after}");
    with_local_access(|state| {
        let timer = state.schedule_timer_after(after);
        timer
    })
}

/// Broadcasts `message` to every process, i.e. to all members of [`GLOBAL_POOL`].
pub fn broadcast(message: impl Message + 'static) {
    with_local_access(|state| {
        state.broadcast_within_pool(GLOBAL_POOL, message);
    });
}

/// Broadcasts `message` to all processes that belong to the pool named `pool`.
pub fn broadcast_within_pool(pool: &'static str, message: impl Message + 'static) {
    dscale_debug!("[Access] broadcasting within: {pool}");
    with_local_access(|state| {
        state.broadcast_within_pool(pool, message);
    });
}

/// Sends `message` to the process identified by `pid`.
pub fn send_to(pid: Pid, message: impl Message + 'static) {
    dscale_debug!("[Access] sending to: P{pid}");
    with_local_access(|state| {
        state.send_to(pid, message);
    });
}

/// Sends `message` to one randomly selected process out of [`GLOBAL_POOL`].
pub fn send_random(message: impl Message + 'static) {
    dscale_debug!("[Access] sending random P from GLOBAL_POOL");
    with_local_access(|state| {
        state.send_random_from_pool(GLOBAL_POOL, message);
    });
}

/// Sends `message` to one randomly selected process out of the pool named `pool`.
pub fn send_random_from_pool(pool: &'static str, message: impl Message + 'static) {
    dscale_debug!("[Access] sending random from pool {pool}");
    with_local_access(|state| {
        state.send_random_from_pool(pool, message);
    });
}

/// Returns the pid of the process currently executing on this thread.
pub fn pid() -> Pid {
    with_local_access(|state| {
        let current = state.pid();
        current
    })
}

/// Returns the pid of a randomly selected process from the pool named `pool_name`.
pub fn choose_from_pool(pool_name: &str) -> Pid {
    with_local_access(|state| {
        let chosen = state.choose_from_pool(pool_name);
        chosen
    })
}

/// Returns the current time as recorded for the running process.
pub fn now() -> Jiffies {
    with_local_access(|state| {
        let current = state.now;
        current
    })
}

/// Replaces this thread's state with a fresh `LocalAccess::default()`,
/// dropping any queued events and the stored coordinator channel.
pub(crate) fn reset() {
    LOCAL_ACCESS.with(|cell| {
        // Build the replacement before borrowing, matching assignment evaluation order.
        let fresh = LocalAccess::default();
        *cell.borrow_mut() = fresh;
    });
}