dscale 0.6.3

A fast & deterministic simulation framework for benchmarking and testing distributed systems
Documentation
use crate::{
    Pid, Process,
    events::PidEvent,
    process::ProcessPtr,
    random::Seed,
    runners::scalable::{
        task::{TaskId, TaskResult},
        thread_number::ThreadNumber,
    },
    services::{
        self, Services,
        process_access::{self, setup_process_access},
        setup_services,
    },
};
use std::{
    sync::{
        Arc,
        mpsc::{self, Receiver, RecvError, SyncSender},
    },
    thread::{self, JoinHandle},
};

/// A single unit of work handed to a worker thread over its work channel.
pub(crate) struct WorkItem {
    /// Identifier of the task; forwarded to `process_access::prepare_task`
    /// before the event is delivered.
    pub task_id: TaskId,
    /// Index of the target process inside the receiving thread's local
    /// process list (not the global process id).
    pub pidx: usize,
    /// The event to deliver to the target process.
    pub event: PidEvent,
}

/// Per-thread bookkeeping kept by [`Workers`] for each spawned worker.
struct ThreadHandle {
    /// Join handle of the spawned worker thread.
    join: JoinHandle<()>,
    /// Sending half of the thread's bounded work channel.
    tx: SyncSender<WorkItem>,
}

/// A fixed set of worker threads, each owning a share of the processes.
pub(crate) struct Workers {
    /// One handle per worker thread, indexed by thread number.
    threads: Vec<ThreadHandle>,
    /// Receiving half of the result channel shared by all worker threads.
    // NOTE(review): results are presumably pushed by
    // `process_access::notify_coordinator` through the sender registered in
    // `setup_process_access` — confirm against that module.
    rx: Receiver<TaskResult>,
    /// Number of processes supplied at construction time.
    num_procs: usize,
}

impl Workers {
    /// Spawns the worker threads and distributes `procs` across them.
    ///
    /// Processes are assigned round-robin: with `k` worker threads, the
    /// process at global index `g` is owned by thread `g % k` and stored at
    /// local index `g / k`. [`Workers::spawn_event`] relies on exactly this
    /// mapping, so the two must be changed together.
    ///
    /// Every thread gets its own bounded work channel; all threads share one
    /// bounded result channel whose receiving half is kept by the returned
    /// value. Before entering its work loop, each thread calls
    /// `setup_services` and `setup_process_access`.
    ///
    /// # Panics
    ///
    /// Panics (remainder by zero) if `threads` converts to `0` while `procs`
    /// is non-empty.
    // NOTE(review): presumably `ThreadNumber` guarantees a non-zero value —
    // confirm against its definition.
    pub(crate) fn new(
        services: Arc<Services>,
        procs: Vec<ProcessPtr>,
        threads: ThreadNumber,
        seed: Seed,
    ) -> Self {
        let num_procs = procs.len();
        let k_threads: usize = threads.into();
        log::warn!("Using {k_threads} threads for simulation");

        // `next_power_of_two` maps 0 to 1, so the capacity is never zero even
        // when there are no processes.
        let (result_tx, result_rx) =
            mpsc::sync_channel::<TaskResult>((4 * num_procs).next_power_of_two());

        // Round-robin partition; each entry keeps the global id next to the
        // process so the worker can recover it from the local index.
        let mut per_thread: Vec<Vec<(usize, ProcessPtr)>> =
            (0..k_threads).map(|_| Vec::new()).collect();
        for (global_id, proc) in procs.into_iter().enumerate() {
            per_thread[global_id % k_threads].push((global_id, proc));
        }

        let thread_handles = per_thread
            .into_iter()
            .map(|owned_procs| {
                let (work_tx, work_rx) =
                    mpsc::sync_channel::<WorkItem>((4 * owned_procs.len()).next_power_of_two());
                let result_tx = result_tx.clone();
                // One `Arc` handle per thread, moved straight into the thread's
                // service setup (no second clone inside the closure).
                let thread_services = Arc::clone(&services);
                let join = thread::spawn(move || {
                    setup_services(thread_services);
                    setup_process_access(seed, Some(result_tx));
                    Self::do_work(work_rx, owned_procs)
                });
                ThreadHandle { join, tx: work_tx }
            })
            .collect();

        Self {
            threads: thread_handles,
            rx: result_rx,
            num_procs,
        }
    }

    /// Worker thread main loop.
    ///
    /// Blocks on `work_rx` and, for every received [`WorkItem`], looks up the
    /// target process by its local index, calls
    /// `process_access::prepare_task`, dispatches the event to the matching
    /// process callback and then calls `process_access::notify_coordinator`.
    /// The loop ends once the channel is disconnected (every sender dropped),
    /// after which `services::reset` is called.
    ///
    /// # Panics
    ///
    /// Panics if a work item carries a `pidx` outside `procs`.
    fn do_work(work_rx: Receiver<WorkItem>, mut procs: Vec<(usize, ProcessPtr)>) {
        // `Receiver::iter` blocks for the next item and stops on disconnect.
        for WorkItem {
            task_id,
            pidx,
            event,
        } in work_rx.iter()
        {
            let (pid, ref mut proc) = procs[pidx];
            process_access::prepare_task(task_id, pid);
            match event {
                // Offset the seed by the process id so that processes do not
                // all start from the same seed (prevents resonance).
                // NOTE(review): a plain `+` panics on overflow in debug builds
                // when `base_seed` is close to the integer maximum; consider a
                // wrapping add once the seed type is confirmed.
                PidEvent::Start { base_seed } => proc.on_start(base_seed + pid as u64),
                PidEvent::Message {
                    source, message, ..
                } => proc.on_message(source, message),
                PidEvent::Timer { id, .. } => proc.on_timer(id),
            }
            process_access::notify_coordinator();
        }
        services::reset();
    }

    /// Returns the number of processes supplied at construction time.
    pub(crate) fn num_procs(&self) -> usize {
        self.num_procs
    }

    /// Queues `event` for process `pid` on the thread that owns it.
    ///
    /// The owning thread and the local index are derived with the same
    /// round-robin mapping used in [`Workers::new`]. The work channel is
    /// bounded, so this call blocks while that thread's queue is full.
    ///
    /// # Panics
    ///
    /// Panics if the owning worker thread has already exited (its receiver is
    /// gone), or with a division by zero if there are no worker threads.
    pub(crate) fn spawn_event(&self, task_id: TaskId, pid: Pid, event: PidEvent) {
        let k_threads = self.threads.len();

        // Inverse of the distribution in `new`: thread = pid % k, slot = pid / k.
        let thread_idx = pid % k_threads;
        let pidx = pid / k_threads;

        self.threads[thread_idx]
            .tx
            .send(WorkItem {
                task_id,
                pidx,
                event,
            })
            .expect("worker thread has exited unexpectedly");
    }

    /// Returns the next available result without blocking.
    ///
    /// Yields `None` both when no result is currently queued and when the
    /// result channel is disconnected; the two cases are not distinguished.
    pub(crate) fn try_next_result(&self) -> Option<TaskResult> {
        self.rx.try_recv().ok()
    }

    /// Blocks until the next result is available.
    ///
    /// # Errors
    ///
    /// Returns [`RecvError`] once every sender of the result channel has been
    /// dropped and no queued results remain.
    pub(crate) fn next_result(&self) -> Result<TaskResult, RecvError> {
        self.rx.recv()
    }
}

impl Drop for Workers {
    fn drop(&mut self) {
        let handles: Vec<JoinHandle<()>> = self
            .threads
            .drain(..)
            .map(|h| {
                drop(h.tx);
                h.join
            })
            .collect();

        for h in handles {
            h.join().expect("worker thread panicked");
        }
    }
}