use std::{
hint,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Condvar, Mutex, OnceLock,
},
thread::{self},
};
use core_affinity::CoreId;
use log::{debug, error, trace, warn};
use super::configuration::Configuration;
/// Boxed, sendable closure that can be called exactly once; the payload type
/// carried by `Job::NewJob`.
type Func<'a> = Box<dyn FnOnce() + Send + 'a>;
/// Minimal spin lock built on a single atomic flag.
///
/// `true` means the lock is held. There is no guard type: every call to
/// `lock` has to be paired with an explicit call to `unlock` by the caller.
struct Lock {
    lock: AtomicBool,
}

impl Lock {
    /// Creates the lock in the released state.
    fn new() -> Self {
        Self {
            lock: AtomicBool::new(false),
        }
    }

    /// Busy-waits until the flag can be flipped from `false` to `true`.
    fn lock(&self) {
        loop {
            let attempt =
                self.lock
                    .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire);
            if attempt.is_ok() {
                return;
            }
            // Still held by someone else: tell the CPU we are spinning and retry.
            hint::spin_loop();
        }
    }

    /// Releases the lock by clearing the flag.
    fn unlock(&self) {
        self.lock.store(false, Ordering::Release);
    }
}

impl Drop for Lock {
    /// Clears the flag one last time when the lock itself is destroyed.
    fn drop(&mut self) {
        self.unlock();
    }
}
/// Message delivered to a worker thread through its queue.
pub enum Job {
    /// A closure the worker should execute.
    NewJob(Func<'static>),
    /// Tells the worker to leave its run loop.
    Terminate,
}
/// Completion handle for a submitted job.
///
/// `status` starts as `false`; [`JobInfo::wait`] returns once it reads `true`.
#[derive(Debug)]
pub(crate) struct JobInfo {
    status: Arc<AtomicBool>,
}

impl JobInfo {
    /// Creates a handle whose completion flag is not yet set.
    fn new() -> JobInfo {
        JobInfo {
            status: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Spins until the completion flag becomes `true`.
    ///
    /// This is a busy wait: the calling thread does not sleep.
    pub(crate) fn wait(&self) {
        // Acquire rather than Relaxed: when the side that sets the flag
        // publishes it with Release (or stronger), observing `true` here
        // also makes every write done before that store visible to the
        // caller. A Relaxed load gives no such guarantee.
        while !self.status.load(Ordering::Acquire) {
            hint::spin_loop();
        }
    }
}
/// Partition-side handle for one worker thread.
///
/// `status`, `queue` and `cvar` are shared with the `ExecutorInfo` that runs
/// on the worker thread; `global_lock` and `available_workers` are passed in
/// by the creator and are also shared with that `ExecutorInfo`.
struct Executor {
    thread: Thread,
    global_lock: Arc<Lock>,
    status: Arc<AtomicBool>,
    available_workers: Arc<AtomicUsize>,
    queue: Arc<Mutex<Vec<Job>>>,
    cvar: Arc<Condvar>,
}

impl Executor {
    /// Spawns a worker thread for `core_id` and returns the handle to it.
    ///
    /// The worker starts with `status == false`; it only reports itself as
    /// available after finishing a job.
    fn new(
        core_id: CoreId,
        config: Arc<Configuration>,
        global_lock: Arc<Lock>,
        available_workers: Arc<AtomicUsize>,
    ) -> Executor {
        let status = Arc::new(AtomicBool::new(false));
        let cvar = Arc::new(Condvar::new());
        let queue = Arc::new(Mutex::new(Vec::with_capacity(1)));
        let worker = ExecutorInfo::new(
            status.clone(),
            global_lock.clone(),
            available_workers.clone(),
            queue.clone(),
            cvar.clone(),
        );
        let thread = Thread::new(
            core_id,
            move || {
                worker.run();
            },
            config,
        );
        Executor {
            thread,
            global_lock,
            status,
            available_workers,
            queue,
            cvar,
        }
    }

    /// Returns `true` when the worker has flagged itself as available.
    ///
    /// The flag is read while holding the shared spin lock.
    fn get_status(&self) -> bool {
        self.global_lock.lock();
        let res = self.status.load(Ordering::Relaxed);
        self.global_lock.unlock();
        res
    }

    /// Marks the worker busy, enqueues `job` and wakes the worker.
    ///
    /// A job may already be waiting in the queue, for example when
    /// `Terminate` is sent before the worker thread has dequeued its
    /// previous job. The worker takes jobs with `Vec::pop`, i.e. from the
    /// back, so the new job is inserted at the front: earlier jobs still run
    /// first and nothing is dropped. The previous implementation asserted an
    /// empty queue and panicked in that situation.
    fn push(&self, job: Job) {
        self.warn_busy();
        let mut queue = self.queue.lock().unwrap();
        queue.insert(0, job);
        self.cvar.notify_one();
    }

    /// Clears the availability flag and updates the shared free-worker count.
    ///
    /// The counter is only decremented when this worker was actually flagged
    /// as available. Both values are changed under the shared spin lock, so a
    /// worker contributes to the counter exactly while its flag is `true`;
    /// decrementing unconditionally would take away another worker's
    /// contribution (for instance on the first push to a new executor).
    fn warn_busy(&self) {
        self.global_lock.lock();
        let was_available = self.status.swap(false, Ordering::Relaxed);
        if was_available {
            // Saturating as a defensive measure; the counter is expected to
            // be at least 1 here.
            let _ = self.available_workers.fetch_update(
                Ordering::Relaxed,
                Ordering::Relaxed,
                |free| -> Option<usize> { Some(free.saturating_sub(1)) },
            );
        }
        self.global_lock.unlock();
    }

    /// Waits for the worker thread to finish.
    fn join(&mut self) {
        self.thread.join();
    }
}
struct ExecutorInfo {
status: Arc<AtomicBool>,
global_lock: Arc<Lock>,
available_workers: Arc<AtomicUsize>,
queue: Arc<Mutex<Vec<Job>>>,
cvar: Arc<Condvar>,
}
impl ExecutorInfo {
fn new(
status: Arc<AtomicBool>,
global_lock: Arc<Lock>,
available_workers: Arc<AtomicUsize>,
queue: Arc<Mutex<Vec<Job>>>,
cvar: Arc<Condvar>,
) -> ExecutorInfo {
ExecutorInfo {
status,
global_lock,
available_workers,
queue,
cvar,
}
}
fn warn_available(&self) {
self.global_lock.lock();
self.status.store(true, Ordering::Relaxed);
self.available_workers.fetch_add(1, Ordering::Relaxed);
self.global_lock.unlock();
}
fn run(&self) {
loop {
if let Some(job) = self.fetch_job() {
match job {
Job::NewJob(f) => {
f();
self.warn_available();
}
Job::Terminate => {
break;
}
}
}
}
}
fn fetch_job(&self) -> Option<Job> {
let mut queue = self.queue.lock().unwrap();
let mut job = queue.pop();
while job.is_none() {
queue = self.cvar.wait(queue).unwrap();
job = queue.pop();
}
job
}
}
/// Thin wrapper around a joinable OS thread.
///
/// The handle is kept in an `Option` so that `join` can take it out and a
/// second `join` call becomes a no-op.
struct Thread {
    thread: Option<thread::JoinHandle<()>>,
}

impl Thread {
    /// Spawns a thread that runs `f`.
    ///
    /// When `configuration.get_pinning()` is true the new thread first tries
    /// to pin itself to `core_id` and logs whether that succeeded.
    fn new<F>(core_id: CoreId, f: F, configuration: Arc<Configuration>) -> Thread
    where
        F: FnOnce() + Send + 'static,
    {
        let body = move || {
            if configuration.get_pinning() {
                if core_affinity::set_for_current(core_id) {
                    trace!("Thread pinned on core {}.", core_id.id);
                } else {
                    error!("Thread pinning on core {} failed!", core_id.id);
                }
            }
            trace!("{:?} started", thread::current().id());
            f();
            trace!("{:?} now will end.", thread::current().id());
        };
        Thread {
            thread: Some(thread::spawn(body)),
        }
    }

    /// Joins the thread if it has not been joined yet.
    ///
    /// Panics if the thread itself panicked.
    fn join(&mut self) {
        let Some(handle) = self.thread.take() else {
            return;
        };
        handle.join().unwrap();
    }
}
/// Group of worker threads that are all created for the same core id.
pub struct Partition {
    /// Core id handed to every worker thread this partition creates.
    core_id: CoreId,
    /// Executors owned by this partition; grows on demand, never shrinks
    /// until the partition is dropped.
    workers: Mutex<Vec<Executor>>,
    /// Spin lock shared with all workers of this partition.
    global_lock: Arc<Lock>,
    /// Number of workers that currently flag themselves as available.
    available_workers: Arc<AtomicUsize>,
    configuration: Arc<Configuration>,
}

impl Partition {
    /// Creates an empty partition for the `core_id`-th entry of the
    /// configured thread mapping.
    ///
    /// Panics if `core_id` is out of range for that mapping.
    fn new(core_id: usize, configuration: Arc<Configuration>) -> Partition {
        let core_id = configuration.get_thread_mapping()[core_id];
        Partition {
            core_id,
            workers: Mutex::new(Vec::new()),
            global_lock: Arc::new(Lock::new()),
            available_workers: Arc::new(AtomicUsize::new(0)),
            configuration,
        }
    }

    /// Total number of executors created so far.
    fn get_worker_count(&self) -> usize {
        self.workers.lock().unwrap().len()
    }

    /// Number of executors that are not currently flagged as available.
    fn get_busy_worker_count(&self) -> usize {
        // The two values are read one after the other, not atomically. If
        // another thread adds workers that become free in between, the free
        // count can exceed the total read a moment earlier, so a plain `-`
        // could underflow. Clamp at zero instead.
        self.get_worker_count()
            .saturating_sub(self.get_free_worker_count())
    }

    /// Current value of the shared free-worker counter.
    fn get_free_worker_count(&self) -> usize {
        self.global_lock.lock();
        let res = self.available_workers.load(Ordering::Acquire);
        self.global_lock.unlock();
        res
    }

    /// Removes and returns the first executor that reports itself available,
    /// or `None` when every executor is busy.
    fn find_executor(workers: &mut Vec<Executor>) -> Option<Executor> {
        let idle = workers.iter().position(|worker| worker.get_status())?;
        Some(workers.remove(idle))
    }

    /// Runs `f` on an available executor, creating a new one if none is
    /// available, and returns a handle that can be used to wait for it.
    fn push<F>(&self, f: F) -> JobInfo
    where
        F: FnOnce() + Send + 'static,
    {
        let job_info = JobInfo::new();
        let done = Arc::clone(&job_info.status);
        let job = Job::NewJob(Box::new(move || {
            f();
            // Release: a thread that later reads this flag with an acquire
            // load is then guaranteed to also see everything `f` wrote.
            done.store(true, Ordering::Release);
        }));
        let mut workers = self.workers.lock().unwrap();
        let executor = Self::find_executor(&mut workers).unwrap_or_else(|| {
            Executor::new(
                self.core_id,
                self.configuration.clone(),
                self.global_lock.clone(),
                self.available_workers.clone(),
            )
        });
        executor.push(job);
        // Put the executor (back) into the pool; `find_executor` took it out.
        workers.push(executor);
        job_info
    }
}
impl Drop for Partition {
    /// Shuts down every worker of the partition.
    ///
    /// `Terminate` is sent to all workers first and only then are they
    /// joined, so the workers can wind down concurrently.
    fn drop(&mut self) {
        // `&mut self` already guarantees exclusive access to the mutex.
        let workers = self.workers.get_mut().unwrap();
        workers
            .iter()
            .for_each(|worker| worker.push(Job::Terminate));
        workers.iter_mut().for_each(|worker| worker.join());
    }
}
/// Distributes jobs over a fixed set of partitions.
pub struct Orchestrator {
    /// One partition per index in `0..max_cores` (a single partition when
    /// pinning is disabled), built in `Orchestrator::new`.
    partitions: Vec<Partition>,
    /// Configuration the orchestrator was created with.
    configuration: Arc<Configuration>,
}
/// Lazily initialised process-wide orchestrator.
///
/// It is `static mut` because `Orchestrator::delete_global_orchestrator`
/// calls `OnceLock::take`, which needs mutable access; every access therefore
/// goes through an `unsafe` block.
static mut ORCHESTRATOR: OnceLock<Arc<Orchestrator>> = OnceLock::new();
/// Builds a fresh orchestrator from the default configuration.
pub(crate) fn new() -> Arc<Orchestrator> {
    let configuration = Arc::new(Configuration::new_default());
    let orchestrator = Orchestrator::new(configuration);
    Arc::new(orchestrator)
}
pub fn get_global_orchestrator() -> Arc<Orchestrator> {
unsafe {
ORCHESTRATOR
.get_or_init(|| -> Arc<Orchestrator> { new() })
.clone()
}
}
impl Orchestrator {
    /// Creates the partitions described by `configuration`.
    ///
    /// With pinning enabled there is one partition per index in
    /// `0..get_max_cores()`; otherwise a single partition is created.
    fn new(configuration: Arc<Configuration>) -> Orchestrator {
        let max_cores = if configuration.get_pinning() {
            let cores = configuration.get_max_cores();
            if cfg!(target_os = "macos") {
                warn!("Thread pinning is not currently supported for Apple Silicon.");
            }
            cores
        } else {
            1
        };
        let partitions = (0..max_cores)
            .map(|i| Partition::new(i, Arc::clone(&configuration)))
            .collect();
        Orchestrator {
            partitions,
            configuration,
        }
    }

    /// Returns the partition with the fewest busy workers, or `None` when
    /// there are no partitions.
    fn find_partition(partitions: &[Partition]) -> Option<&Partition> {
        partitions.iter().min_by_key(|p| p.get_busy_worker_count())
    }

    /// Finds `count` contiguous partitions with the lowest total number of
    /// busy workers.
    ///
    /// Returns `None` when `count` exceeds the number of partitions. The
    /// first window with no busy worker at all is returned immediately;
    /// otherwise the earliest window with the smallest total wins.
    fn find_partitions_sequence(partitions: &[Partition], count: usize) -> Option<&[Partition]> {
        if count > partitions.len() {
            return None;
        }
        // Busy counts change while we scan. Each partition is therefore
        // sampled exactly once and the recorded value is what later leaves
        // the window; re-reading the live value when a partition leaves
        // could subtract more than was added and underflow.
        let mut seen: Vec<usize> = Vec::with_capacity(partitions.len());
        seen.extend(partitions[..count].iter().map(|p| p.get_busy_worker_count()));
        let mut window: usize = seen.iter().sum();
        if window == 0 {
            return Some(&partitions[..count]);
        }
        let mut best_start = 0;
        let mut best_busy = window;
        for end in count..partitions.len() {
            let entering = partitions[end].get_busy_worker_count();
            seen.push(entering);
            // `window` is the sum of `seen[end - count..end]`, so removing the
            // oldest recorded value cannot underflow.
            window = window - seen[end - count] + entering;
            let start = end + 1 - count;
            if window == 0 {
                return Some(&partitions[start..=end]);
            }
            if window < best_busy {
                best_start = start;
                best_busy = window;
            }
        }
        Some(&partitions[best_start..best_start + count])
    }

    /// Pushes `f` to the least busy partition.
    ///
    /// Panics when `partitions` is empty.
    fn push_single<F>(partitions: &[Partition], f: F) -> JobInfo
    where
        F: FnOnce() + Send + 'static,
    {
        let Some(partition) = Self::find_partition(partitions) else {
            panic!("No partition found!")
        };
        partition.push(f)
    }

    /// Submits every closure in `f` and returns one `JobInfo` per closure, in
    /// the same order.
    ///
    /// When more than one job is submitted and enough partitions exist, the
    /// jobs are spread over a contiguous run of partitions (one job each).
    /// In every other case each job goes to whichever partition is least
    /// busy at the time it is pushed.
    pub(crate) fn push_jobs<F>(&self, f: Vec<F>) -> Vec<JobInfo>
    where
        F: FnOnce() + Send + 'static,
    {
        let sequence = if f.len() > 1 {
            Self::find_partitions_sequence(&self.partitions, f.len())
        } else {
            None
        };
        // Jobs are consumed front to back by value, avoiding the quadratic
        // cost of repeatedly removing the first element of the vector.
        match sequence {
            // The slice has exactly `f.len()` partitions, so `zip` pairs
            // every job with a partition.
            Some(run) => run
                .iter()
                .zip(f)
                .map(|(partition, job)| partition.push(job))
                .collect(),
            None => f
                .into_iter()
                .map(|job| Self::push_single(&self.partitions, job))
                .collect(),
        }
    }

    /// Returns a new handle to the configuration in use.
    pub(crate) fn get_configuration(&self) -> Arc<Configuration> {
        Arc::clone(&self.configuration)
    }

    /// Removes the global orchestrator from its static slot and drops that
    /// reference.
    ///
    /// # Safety
    ///
    /// This takes mutable access to a `static mut`. The caller must ensure
    /// that no other thread is accessing the global orchestrator slot (for
    /// example through `get_global_orchestrator`) while this runs.
    pub unsafe fn delete_global_orchestrator() {
        unsafe {
            drop(ORCHESTRATOR.take());
        }
    }
}
impl Drop for Orchestrator {
    /// Drops the partitions one by one, front to back, logging how many
    /// workers each one had created before it is torn down.
    fn drop(&mut self) {
        for partition in self.partitions.drain(..) {
            debug!(
                "Total worker for Partition[{}]: {}",
                partition.core_id.id,
                partition.get_worker_count()
            );
            // `partition` is dropped here, at the end of each iteration.
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::sync::{atomic::AtomicUsize, Arc};

    /// Submits 1000 single-job batches that each increment a shared counter,
    /// waits for all of them and checks that every increment happened.
    fn run_counting_jobs(orchestrator: &Orchestrator) {
        let counter = Arc::new(AtomicUsize::new(0));
        let mut jobs_info = Vec::with_capacity(1000);
        for _ in 0..1000 {
            let counter_clone = Arc::clone(&counter);
            let mut batch = orchestrator.push_jobs(vec![move || {
                counter_clone.fetch_add(1, Ordering::AcqRel);
            }]);
            jobs_info.push(batch.remove(0));
        }
        jobs_info.iter().for_each(|job_info| job_info.wait());
        assert_eq!(counter.load(Ordering::Acquire), 1000);
    }

    /// Exercises a locally constructed orchestrator.
    #[test]
    #[serial]
    fn test_orchestrator() {
        let configuration = Arc::new(Configuration::new_default());
        let orchestrator = Orchestrator::new(Arc::clone(&configuration));
        run_counting_jobs(&orchestrator);
    }

    /// Exercises the lazily created global orchestrator and then removes it.
    #[test]
    #[serial]
    fn test_global_orchestrator() {
        let orchestrator = get_global_orchestrator();
        run_counting_jobs(&orchestrator);
        unsafe {
            Orchestrator::delete_global_orchestrator();
        }
    }
}