use std::{sync::Arc, time::Duration};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use crate::schedule::{
config::{
DEFAULT_STEALER_INTERVAL_TIME, DEFAULT_TASK_POOL_SIZE, DEFAULT_TICK_TIME, MAX_BACKOFF,
},
task_actor::ITaskHandler,
};
pub(super) struct TasksManager {
global_queue: Arc<Injector<Arc<Box<dyn ITaskHandler>>>>,
}
impl TasksManager {
pub(crate) fn new() -> Self {
let sender = Arc::new(Injector::new());
Self {
global_queue: sender,
}
}
pub(crate) fn get_global_queue(&self) -> Arc<Injector<Arc<Box<dyn ITaskHandler>>>> {
self.global_queue.clone()
}
pub(crate) fn start(&self) {
let mut local_queues: Vec<Worker<Arc<Box<dyn ITaskHandler>>>> =
Vec::with_capacity(DEFAULT_TASK_POOL_SIZE);
let mut stealers: Vec<Stealer<Arc<Box<dyn ITaskHandler>>>> =
Vec::with_capacity(DEFAULT_TASK_POOL_SIZE);
for _ in 0..DEFAULT_TASK_POOL_SIZE {
let worker = Worker::new_lifo();
stealers.push(worker.stealer());
local_queues.push(worker);
}
let global_queue = self.global_queue.clone();
let stealers_arc = Arc::new(stealers);
for local_queue in local_queues {
let local_queue = local_queue;
let global_queue = global_queue.clone();
let stealers_arc = stealers_arc.clone();
tokio::spawn(async move {
let mut failed_steal_attempts = 0;
loop {
if let Some(task) = get_task(&local_queue, &global_queue, &stealers_arc) {
task.run().await;
} else {
if failed_steal_attempts < MAX_BACKOFF {
failed_steal_attempts += 1;
}
let sleep_duration = Duration::from_millis(
DEFAULT_STEALER_INTERVAL_TIME * (1 << failed_steal_attempts),
)
.min(Duration::from_millis(100));
tokio::time::sleep(sleep_duration).await;
}
}
});
}
}
}
#[inline(always)]
fn get_task(
local_queue: &Worker<Arc<Box<dyn ITaskHandler>>>,
global_queue: &Arc<Injector<Arc<Box<dyn ITaskHandler>>>>,
stealers_arc: &Arc<Vec<Stealer<Arc<Box<dyn ITaskHandler>>>>>,
) -> Option<Arc<Box<dyn ITaskHandler>>> {
if let Some(event) = local_queue.pop() {
return Some(event);
}
if let Steal::Success(event) = global_queue.steal_batch_and_pop(local_queue) {
return Some(event);
}
for i in 0..stealers_arc.len() {
if let Steal::Success(event) = stealers_arc[i].steal() {
return Some(event);
}
}
None
}