dscale 0.5.1

A fast & deterministic simulation framework for benchmarking and testing distributed systems
Documentation
use std::sync::{Arc, Mutex};

use crossbeam_channel::{Receiver, RecvError};

use crate::{
    ProcessHandle, Rank,
    events::RankHandlerEvent,
    global::{
        configuration::setup_local_configuration,
        local_access::{self, setup_local_access},
    },
    random::Seed,
    runners::{
        task::{TaskId, TaskResult},
        threads::Threads,
    },
};

pub(crate) struct Workers {
    procs: Vec<Arc<Mutex<dyn ProcessHandle + Send>>>,
    pool: rayon::ThreadPool,
    rx: Receiver<TaskResult>,
}

impl Workers {
    pub(crate) fn new(
        procs: Vec<Arc<Mutex<dyn ProcessHandle + Send>>>,
        threads: Threads,
        seed: Seed,
    ) -> Self {
        for id in 0..procs.len() {
            setup_local_configuration(id, seed);
        }
        let threads_number: usize = threads.into();
        let (tx, rx) = crossbeam_channel::unbounded::<TaskResult>();
        log::warn!("Using {threads_number} threads for simulation");
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(threads_number)
            .start_handler(move |_| {
                setup_local_access(seed, tx.clone());
            })
            .build()
            .expect("Could not build thread pool");
        Self { procs, pool, rx }
    }

    pub(crate) fn num_procs(&self) -> usize {
        self.procs.len()
    }

    pub(crate) fn spawn_event(&self, task_id: TaskId, event: RankHandlerEvent) {
        let (proc_id, work) = Self::event_into_work(event);
        let proc = self.procs[proc_id].clone();
        self.pool.spawn(Self::wrap(task_id, proc_id, proc, work));
    }

    pub(crate) fn try_next_result(&self) -> Option<TaskResult> {
        self.rx.try_recv().ok()
    }

    pub(crate) fn next_result(&self) -> Result<TaskResult, RecvError> {
        self.rx.recv()
    }

    fn event_into_work(
        event: RankHandlerEvent,
    ) -> (usize, Box<dyn FnOnce(&mut dyn ProcessHandle) + Send>) {
        match event {
            RankHandlerEvent::Start { rank } => (rank, Box::new(|proc| proc.on_start())),
            RankHandlerEvent::Network {
                source,
                target,
                message,
                ..
            } => (
                target,
                Box::new(move |proc| proc.on_message(source, message)),
            ),
            RankHandlerEvent::Timer { rank, id } => (rank, Box::new(move |proc| proc.on_timer(id))),
        }
    }

    fn wrap(
        task_id: TaskId,
        rank: Rank,
        proc: Arc<Mutex<dyn ProcessHandle + Send>>,
        work: Box<dyn FnOnce(&mut dyn ProcessHandle) + Send>,
    ) -> impl FnOnce() + Send {
        move || {
            local_access::prepare_task(task_id, rank);
            let mut guard = proc.lock().unwrap();
            work(&mut *guard);
            drop(guard);
            local_access::done();
        }
    }
}