1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//!
//! Cache-affine thread pool distributor
//!
//! The distributor provides a fair distribution of threads and pins them to cores for fair execution.
//! It assigns threads in round-robin fashion to all cores.
use crate::placement::{self, CoreId};
use crate::run_queue::{Stealer, Worker};
use crate::worker;
use lightproc::prelude::*;
use std::thread;

/// Holds the set of cores for which worker threads are created.
pub(crate) struct Distributor {
    /// Core ids obtained from `placement::get_core_ids` in `new`;
    /// `assign` spawns one thread per entry.
    pub(crate) cores: Vec<CoreId>,
}

impl Distributor {
    /// Creates a distributor over the core ids reported by `placement::get_core_ids`.
    ///
    /// # Panics
    ///
    /// Panics if the core ids cannot be fetched.
    pub(crate) fn new() -> Self {
        let cores = placement::get_core_ids().expect("Core mapping couldn't be fetched");
        Self { cores }
    }

    /// Consumes the distributor and spawns one named thread per core.
    ///
    /// A FIFO run queue is created for every core. The spawned thread sets its
    /// affinity to that core, runs the stats generator once, and then enters the
    /// worker main loop, taking ownership of the queue. The threads are detached:
    /// their join handles are dropped.
    ///
    /// Returns the stealer of every queue, in the same order as `self.cores`.
    ///
    /// # Panics
    ///
    /// Panics if a thread cannot be spawned.
    pub(crate) fn assign(self) -> Vec<Stealer<LightProc>> {
        self.cores
            .into_iter()
            .map(|core| {
                let queue = Worker::new_fifo();
                // Take the stealer before the queue is moved into the thread.
                let stealer = queue.stealer();

                let run = move || {
                    // Affinity assignment.
                    placement::set_for_current(core);

                    // Initial stats generation for this core, then the actual execution.
                    worker::stats_generator(core.id, &queue);
                    worker::main_loop(core.id, queue);
                };

                thread::Builder::new()
                    .name(String::from("bastion-async-thread"))
                    .spawn(run)
                    .expect("cannot start the thread for running proc");

                stealer
            })
            .collect()
    }
}