use anymap3::Entry;
use nice_plug_core::nice_trace;
use parking_lot::Mutex;
use std::sync::{Arc, LazyLock, Weak};
use std::thread::{self, JoinHandle};
use super::MainThreadExecutor;
use crate::util::permit_alloc;
pub(crate) struct BackgroundThread<T, E> {
executor: Weak<E>,
worker_thread: Arc<WorkerThread<T, E>>,
}
struct WorkerThread<T, E> {
tasks_sender: crossbeam::channel::Sender<Message<T, E>>,
join_handle: Option<JoinHandle<()>>,
}
enum Message<T, E> {
Task((T, Weak<E>)),
Shutdown,
}
impl<T, E> BackgroundThread<T, E>
where
T: Send + 'static,
E: MainThreadExecutor<T> + 'static,
{
pub fn get_or_create(executor: Weak<E>) -> Self {
Self {
executor,
worker_thread: get_or_create_worker_thread(),
}
}
pub fn schedule(&self, task: T) -> bool {
permit_alloc(|| {
self.worker_thread
.tasks_sender
.try_send(Message::Task((task, self.executor.clone())))
.is_ok()
})
}
}
static HANDLE_MAP: LazyLock<Mutex<anymap3::Map<dyn std::any::Any + Send>>> =
LazyLock::new(|| Mutex::new(anymap3::Map::new()));
impl<T: Send + 'static, E: MainThreadExecutor<T> + 'static> WorkerThread<T, E> {
fn spawn() -> Self {
let (tasks_sender, tasks_receiver) =
crossbeam::channel::bounded(super::TASK_QUEUE_CAPACITY);
let join_handle = thread::Builder::new()
.name(String::from("bg-worker"))
.spawn(move || worker_thread(tasks_receiver))
.expect("Could not spawn background worker thread");
Self {
join_handle: Some(join_handle),
tasks_sender,
}
}
}
impl<T, E> Drop for WorkerThread<T, E> {
fn drop(&mut self) {
self.tasks_sender
.send(Message::Shutdown)
.expect("Failed while sending worker thread shutdown request");
self.join_handle
.take()
.expect("Missing Worker thread JoinHandle")
.join()
.expect("Worker thread panicked");
}
}
fn get_or_create_worker_thread<T, E>() -> Arc<WorkerThread<T, E>>
where
T: Send + 'static,
E: MainThreadExecutor<T> + 'static,
{
let mut handle_map = HANDLE_MAP.lock();
match handle_map.entry::<Weak<WorkerThread<T, E>>>() {
Entry::Occupied(mut entry) => {
let weak = entry.get_mut();
if let Some(arc) = weak.upgrade() {
arc
} else {
let arc = Arc::new(WorkerThread::spawn());
*weak = Arc::downgrade(&arc);
arc
}
}
Entry::Vacant(entry) => {
let arc = Arc::new(WorkerThread::spawn());
entry.insert(Arc::downgrade(&arc));
arc
}
}
}
fn worker_thread<T, E>(tasks_receiver: crossbeam::channel::Receiver<Message<T, E>>)
where
T: Send,
E: MainThreadExecutor<T> + 'static,
{
loop {
match tasks_receiver.recv() {
Ok(Message::Task((task, executor))) => match executor.upgrade() {
Some(e) => e.execute(task, true),
None => {
nice_trace!(
"Received a new task but the executor is no longer alive, shutting down \
worker"
);
return;
}
},
Ok(Message::Shutdown) => return,
Err(err) => {
nice_trace!(
"Worker thread got disconnected unexpectedly, shutting down: {}",
err
);
return;
}
}
}
}