use once_cell::sync::Lazy;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;
use std::thread;
use std::time::Duration;
use super::{DispatchError, DispatchGuard, Dispatcher};
use crossbeam_channel::RecvTimeoutError;
/// Maximum number of tasks the global dispatcher's pre-init queue will hold
/// before further tasks are discarded with a `QueueFull` error.
pub const GLOBAL_DISPATCHER_LIMIT: usize = 1000000;

// The one global dispatcher. `None` only transiently: `reset_dispatcher`
// replaces the contained dispatcher under the write lock.
static GLOBAL_DISPATCHER: Lazy<RwLock<Option<Dispatcher>>> =
    Lazy::new(|| RwLock::new(Some(Dispatcher::new(GLOBAL_DISPATCHER_LIMIT))));

/// Whether test mode is active. When set (and pre-init queueing is off),
/// `launch` blocks until the submitted task has been processed.
pub static TESTING_MODE: AtomicBool = AtomicBool::new(false);

/// Whether tasks are still being queued for later processing (pre-init phase);
/// reset back to `true` by `reset_dispatcher`.
pub static QUEUE_TASKS: AtomicBool = AtomicBool::new(true);
/// Returns `true` if the global dispatcher is in test mode.
///
/// In test mode, once pre-init queueing is disabled,
/// `launch` waits for each task to be processed before returning.
pub fn is_test_mode() -> bool {
    TESTING_MODE.load(Ordering::SeqCst)
}
/// Gets a dispatch guard for the global dispatcher.
///
/// The guard can be used to submit tasks without holding the global lock.
///
/// # Panics
///
/// Panics if the global dispatcher has gone missing
/// (it is only ever `None` transiently during `reset_dispatcher`).
fn guard() -> DispatchGuard {
    GLOBAL_DISPATCHER
        .read()
        .unwrap()
        .as_ref()
        // Consistent with `join_dispatcher_thread`: fail loudly with a
        // message instead of a bare `unwrap` panic.
        .expect("Global dispatcher has gone missing")
        .guard()
}
/// Launches a new task on the global dispatch queue.
///
/// This does not block: the task is enqueued immediately and failures to
/// enqueue (e.g. a full pre-init queue) are logged and the task is dropped.
/// In test mode, once pre-init queueing is disabled, this additionally waits
/// for the queue to drain so tests observe the task's effects synchronously.
pub fn launch(task: impl FnOnce() + Send + 'static) {
    // Launching from the shutdown thread is forbidden; log loudly,
    // but otherwise continue as usual.
    if thread::current().name() == Some("glean.shutdown") {
        log::error!("Tried to launch a task from the shutdown thread. That is forbidden.");
    }

    let guard = guard();
    if let Err(err) = guard.launch(task) {
        match err {
            DispatchError::QueueFull => {
                log::info!("Exceeded maximum queue size, discarding task");
            }
            _ => {
                log::debug!("Failed to launch a task on the queue. Discarding task.");
            }
        }
    }

    // Read the flags only after attempting the launch,
    // mirroring the submit-then-maybe-block behavior.
    let queueing = QUEUE_TASKS.load(Ordering::SeqCst);
    let testing = TESTING_MODE.load(Ordering::SeqCst);
    if testing && !queueing {
        guard.block_on_queue();
    }
}
/// Blocks until all tasks launched prior to this call have been processed.
pub fn block_on_queue() {
    let dispatch_guard = guard();
    dispatch_guard.block_on_queue();
}
/// Blocks until all prior tasks are processed or the timeout elapses.
///
/// Returns an error if the queue did not drain within `timeout`.
pub fn block_on_queue_timeout(timeout: Duration) -> Result<(), RecvTimeoutError> {
    let dispatch_guard = guard();
    dispatch_guard.block_on_queue_timeout(timeout)
}
/// Starts processing queued tasks in the global dispatch queue.
///
/// Returns the result of the underlying dispatcher's flush
/// (on success, the number of tasks that overflowed the pre-init queue).
pub fn flush_init() -> Result<usize, DispatchError> {
    let dispatch_guard = guard();
    dispatch_guard.flush_init()
}
/// Joins the global dispatcher's worker thread, if one is still running.
///
/// Takes the worker handle out of the dispatcher under the write lock;
/// a missing worker (already joined) is not an error.
fn join_dispatcher_thread() -> Result<(), DispatchError> {
    let mut lock = GLOBAL_DISPATCHER.write().unwrap();
    let dispatcher = lock.as_mut().expect("Global dispatcher has gone missing");
    match dispatcher.worker.take() {
        Some(worker) => worker.join().map_err(|_| DispatchError::WorkerPanic),
        None => Ok(()),
    }
}
/// Kills the blocked dispatcher without processing the remaining queue,
/// then waits for its worker thread to finish.
pub fn kill() -> Result<(), DispatchError> {
    guard().kill().and_then(|()| join_dispatcher_thread())
}
/// Shuts down the dispatch queue, processing all remaining tasks first,
/// then waits for its worker thread to finish.
pub fn shutdown() -> Result<(), DispatchError> {
    guard().shutdown().and_then(|()| join_dispatcher_thread())
}
/// Tears down the current global dispatcher and installs a fresh one.
///
/// Any shutdown failure is ignored, since the old dispatcher is
/// replaced regardless.
pub fn reset_dispatcher() {
    // Best-effort shutdown of the old dispatcher.
    let _ = shutdown();

    // Re-enable pre-init queueing for the new dispatcher.
    QUEUE_TASKS.store(true, Ordering::SeqCst);

    // Swap in a fresh dispatcher under the write lock.
    *GLOBAL_DISPATCHER.write().unwrap() = Some(Dispatcher::new(GLOBAL_DISPATCHER_LIMIT));
}
#[cfg(test)]
mod test {
    use std::sync::{Arc, Mutex};

    use super::*;

    #[test]
    // Ignored: it fills the global (process-wide) queue to its limit,
    // which would interfere with other tests sharing the dispatcher.
    #[ignore] fn global_fills_up_in_order_and_works() {
        let _ = env_logger::builder().is_test(true).try_init();

        let result = Arc::new(Mutex::new(vec![]));

        // Fill the pre-init queue exactly to its limit.
        for i in 1..=GLOBAL_DISPATCHER_LIMIT {
            let result = Arc::clone(&result);
            launch(move || {
                result.lock().unwrap().push(i);
            });
        }

        // One task over the limit: should be discarded (150 never appears).
        {
            let result = Arc::clone(&result);
            launch(move || {
                result.lock().unwrap().push(150);
            });
        }

        // Start processing the queued tasks.
        flush_init().unwrap();

        // Post-flush launches are accepted again.
        {
            let result = Arc::clone(&result);
            launch(move || {
                result.lock().unwrap().push(200);
            });
        }

        block_on_queue();

        // Expect 1..=LIMIT in submission order, then 200; the overflow
        // task (150) must have been dropped.
        let mut expected = (1..=GLOBAL_DISPATCHER_LIMIT).collect::<Vec<_>>();
        expected.push(200);
        assert_eq!(&*result.lock().unwrap(), &expected);
    }

    #[test]
    // Ignored: relies on the shared global dispatcher's state.
    #[ignore] fn global_nested_calls() {
        let _ = env_logger::builder().is_test(true).try_init();

        let result = Arc::new(Mutex::new(vec![]));

        // Queued before flush.
        {
            let result = Arc::clone(&result);
            launch(move || {
                result.lock().unwrap().push(1);
            });
        }
        flush_init().unwrap();

        // A task that launches another task from within the worker:
        // the nested task is appended to the queue, so it runs after
        // the outer task completes (order: 21, 22, then 3).
        {
            let result = Arc::clone(&result);
            launch(move || {
                result.lock().unwrap().push(21);
                {
                    let result = Arc::clone(&result);
                    launch(move || {
                        result.lock().unwrap().push(3);
                    });
                }
                result.lock().unwrap().push(22);
            });
        }
        block_on_queue();

        let expected = vec![1, 21, 22, 3];
        assert_eq!(&*result.lock().unwrap(), &expected);
    }
}