use std::sync::Arc;
use crate::metrics::MetricsCollector;
use crate::pool::task::IPriority;
use crate::queue::PriorityQueue;
/// Outcome of an attempt to take work out of a queue.
///
/// The common traits are derived so results can be logged, copied and compared directly
/// (each derive only applies when `T` implements the same trait).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Steal<T> {
    /// The queue reported that it held no items.
    Empty,
    /// The attempt succeeded; carries whatever was obtained.
    Success(T),
    /// The queue reported items but none could be taken; the caller may try again.
    Retry,
}
/// Task injector that wraps a single reference-counted [`PriorityQueue`].
pub struct PriorityInjector<T> {
/// Queue that receives every injected item; kept in an `Arc` so the handle can be shared.
global_queue: Arc<PriorityQueue<T>>,
}
// NOTE(review): `dead_code` is presumably allowed because `pop` and `arc_clone` have no
// callers in the visible code — confirm before removing the attribute.
#[allow(dead_code)]
impl<T: std::cmp::Ord + IPriority> PriorityInjector<T> {
    /// Creates an injector backed by a newly constructed queue.
    pub fn new() -> Self {
        PriorityInjector {
            global_queue: Arc::new(PriorityQueue::new()),
        }
    }

    /// Adds `item` to the global queue.
    pub fn push(&self, item: T) {
        self.global_queue.push(item);
    }

    /// Removes and returns one item from the global queue, or `None` if the queue yields nothing.
    pub(self) fn pop(&self) -> Option<T> {
        self.global_queue.pop()
    }

    /// Moves up to half (rounded up) of the global queue's items into `dest`.
    ///
    /// Returns:
    /// * `Steal::Empty` when the queue reports a length of zero;
    /// * `Steal::Success(())` when at least one item was transferred to `dest`;
    /// * `Steal::Retry` when the queue reported items but none could be popped, so the
    ///   caller is not told that work arrived when nothing was actually moved.
    pub fn steal_batch(&self, dest: &PriorityWorker<T>) -> Steal<()> {
        let count = self.global_queue.len();
        if count == 0 {
            return Steal::Empty;
        }

        // Ceiling of `count / 2`, written so the arithmetic cannot overflow.
        let target = count - count / 2;
        let mut stolen = Vec::with_capacity(target);
        for _ in 0..target {
            match self.global_queue.pop() {
                Some(task) => stolen.push(task),
                // The queue ran dry after `len()` was read; further polling is pointless.
                None => break,
            }
        }

        if stolen.is_empty() {
            // Mirrors `PriorityStealer::steal`: a non-zero length followed by a failed pop
            // is reported as `Retry`, not as a successful (but empty) transfer.
            return Steal::Retry;
        }

        dest.push_batch(stolen);
        Steal::Success(())
    }

    /// Returns another `Arc` handle to the same global queue.
    fn arc_clone(&self) -> Arc<PriorityQueue<T>> {
        Arc::clone(&self.global_queue)
    }
}
/// Per-worker handle around a reference-counted [`PriorityQueue`].
///
/// Cloning a worker, or creating a stealer from it, shares this same queue rather than copying it.
pub struct PriorityWorker<T> {
/// The worker's own queue; the `Arc` is what clones and stealers share.
local: Arc<PriorityQueue<T>>,
}
// NOTE(review): `dead_code` is presumably allowed because not every method here is used
// yet — confirm before removing the attribute.
#[allow(dead_code)]
impl<T: std::cmp::Ord + IPriority> PriorityWorker<T> {
    /// Creates a worker that owns a newly constructed local queue.
    pub fn new() -> Self {
        let local = Arc::new(PriorityQueue::new());
        Self { local }
    }

    /// Adds a single `task` to this worker's local queue.
    pub fn push(&self, task: T) {
        self.local.push(task);
    }

    /// Adds every element of `items` to the local queue, one at a time, in vector order.
    pub(self) fn push_batch(&self, items: Vec<T>) {
        items.into_iter().for_each(|item| {
            self.local.push(item);
        });
    }

    /// Removes and returns one item from the local queue, or `None` if the queue yields nothing.
    pub fn pop(&self) -> Option<T> {
        self.local.pop()
    }

    /// Creates a stealer that shares this worker's local queue.
    pub fn stealer(&self) -> PriorityStealer<T> {
        let local = Arc::clone(&self.local);
        PriorityStealer { local }
    }
}
impl<T> Clone for PriorityWorker<T> {
fn clone(&self) -> Self {
PriorityWorker {
local: Arc::clone(&self.local),
}
}
}
/// Handle for taking items out of a queue that it shares with a [`PriorityWorker`].
pub struct PriorityStealer<T> {
/// Shared handle to the queue items are taken from.
local: Arc<PriorityQueue<T>>,
}
impl<T: std::cmp::Ord + IPriority> PriorityStealer<T> {
    /// Attempts to take one item from the shared queue.
    ///
    /// Returns `Steal::Empty` when the queue reports a length of zero, `Steal::Success` with
    /// the popped item when the pop succeeds, and `Steal::Retry` when the pop yields nothing
    /// even though the length was non-zero a moment earlier.
    pub fn steal(&self) -> Steal<T> {
        match self.local.len() {
            0 => Steal::Empty,
            _ => match self.local.pop() {
                Some(task) => Steal::Success(task),
                None => Steal::Retry,
            },
        }
    }
}
/// Builds the set of queues for a pool of `num_workers` workers.
///
/// Returns, in order: one injector wrapped in an `Arc`, one stealer per worker, and the
/// workers themselves. `stealers[i]` shares its queue with `workers[i]`.
///
/// When `metrics_collector` is `Some`, its `on_worker_started` hook is called once for each
/// worker created; with `None` no hook is called.
pub fn prioritized_work_stealing_queues<T: std::cmp::Ord + IPriority>(
    num_workers: usize,
    metrics_collector: Option<Arc<dyn MetricsCollector>>,
) -> (
    Arc<PriorityInjector<T>>,
    Vec<PriorityStealer<T>>,
    Vec<PriorityWorker<T>>,
) {
    let injector = Arc::new(PriorityInjector::new());
    let mut workers = Vec::with_capacity(num_workers);
    let mut stealers = Vec::with_capacity(num_workers);

    for _ in 0..num_workers {
        let worker = PriorityWorker::new();

        // `if let` rather than `Option::map`: the hook is called purely for its side effect.
        if let Some(collector) = &metrics_collector {
            collector.on_worker_started();
        }

        // The stealer is created before the worker is moved into `workers`.
        stealers.push(worker.stealer());
        workers.push(worker);
    }

    (injector, stealers, workers)
}