#![cfg(feature = "threadsafe")]
use std::collections::LinkedList;
use std::error::Error;
use std::fmt::{Display, self, Formatter};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use crate::hexchat::Hexchat;
use crate::hexchat_entry_points::PHEXCHAT;
use crate::user_data::*;
use UserData::*;
const TASK_SPURT_SIZE: i32 = 5;
const TASK_REST_MSECS: i64 = 2;
type TaskQueue = LinkedList<Box<dyn Task>>;
static mut TASK_QUEUE: Option<Arc<Mutex<Option<TaskQueue>>>> = None;
pub(crate) static mut MAIN_THREAD_ID: Option<thread::ThreadId> = None;
trait Task : Send {
fn execute(&mut self, hexchat: &Hexchat);
fn set_error(&mut self, error: &str);
}
struct ConcreteTask<F, R>
where
F: FnMut(&Hexchat) -> R,
R: Clone + Send,
{
callback : F,
result : AsyncResult<R>,
}
impl<F, R> ConcreteTask<F, R>
where
F: FnMut(&Hexchat) -> R,
R: Clone + Send,
{
fn new(callback: F, result: AsyncResult<R>) -> Self {
ConcreteTask {
callback,
result,
}
}
}
impl<F, R> Task for ConcreteTask<F, R>
where
F: FnMut(&Hexchat) -> R,
R: Clone + Send,
{
fn execute(&mut self, hexchat: &Hexchat) {
self.result.set((self.callback)(hexchat));
}
fn set_error(&mut self, error: &str) {
self.result.set_error(error);
}
}
unsafe impl<F, R> Send for ConcreteTask<F, R>
where
F: FnMut(&Hexchat) -> R,
R: Clone + Send,
{}
#[derive(Debug, Clone)]
pub struct TaskError(pub(crate) String);
impl Error for TaskError {}
impl Display for TaskError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "TaskError({})", self.0)
}
}
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct AsyncResult<T: Clone + Send> {
data: Arc<(Mutex<(Option<Result<T, TaskError>>, bool)>, Condvar)>,
}
unsafe impl<T: Clone + Send> Send for AsyncResult<T> {}
unsafe impl<T: Clone + Send> Sync for AsyncResult<T> {}
impl<T: Clone + Send> AsyncResult<T> {
pub (crate)
fn new() -> Self {
AsyncResult {
data: Arc::new((Mutex::new((None, false)), Condvar::new()))
}
}
pub fn is_done(&self) -> bool {
let (mtx, _) = &*self.data;
mtx.lock().unwrap().1
}
pub fn get(&self) -> Result<T, TaskError> {
let (mtx, cvar) = &*self.data;
let mut guard = mtx.lock().unwrap();
while !guard.1 {
guard = cvar.wait(guard).unwrap();
}
guard.0.take().unwrap()
}
pub (crate)
fn set(&self, result: T) {
let (mtx, cvar) = &*self.data;
let mut guard = mtx.lock().unwrap();
*guard = (Some(Ok(result)), true);
cvar.notify_all();
}
fn set_error(&self, error: &str) {
let (mtx, cvar) = &*self.data;
let mut guard = mtx.lock().unwrap();
*guard = (Some(Err(TaskError(error.into()))), true);
cvar.notify_all();
}
}
pub fn main_thread<F, R>(mut callback: F) -> AsyncResult<R>
where
F: FnMut(&Hexchat) -> R + Sync + Send,
F: 'static + Send,
R: 'static + Clone + Send,
{
if Some(thread::current().id()) == unsafe { MAIN_THREAD_ID } {
let result = callback(unsafe { &*PHEXCHAT });
let res = AsyncResult::new();
res.set(result);
res
} else {
let res = AsyncResult::new();
let cln = res.clone();
if let Some(arc) = unsafe { TASK_QUEUE.as_ref() } {
if let Some(queue) = arc.lock().unwrap().as_mut() {
let task = Box::new(ConcreteTask::new(callback, cln));
queue.push_back(task);
}
else {
res.set_error("Task queue has been shut down.");
}
} else {
res.set_error("Task queue has been shut down.");
}
res
}
}
pub (crate)
fn main_thread_init() {
unsafe { MAIN_THREAD_ID = Some(thread::current().id()) }
if unsafe { TASK_QUEUE.is_none() } {
unsafe {
TASK_QUEUE = Some(Arc::new(Mutex::new(Some(LinkedList::new()))));
}
let hex = unsafe { &*PHEXCHAT };
hex.hook_timer(
TASK_REST_MSECS,
move |_hc, _ud| {
if let Some(arc) = unsafe { TASK_QUEUE.as_ref() } {
if arc.lock().unwrap().is_some() {
let mut count = 1;
while let Some(mut task)
= arc.lock().unwrap().as_mut()
.and_then(|q| q.pop_front()) {
task.execute(hex);
count += 1;
if count > TASK_SPURT_SIZE {
break
}
}
1 } else {
0 }
} else {
0 }
},
NoData);
}
}
pub (crate)
fn main_thread_deinit() {
if let Some(queue) = unsafe { &TASK_QUEUE } {
if let Some(mut queue ) = queue.lock().unwrap().take() {
while let Some(mut task) = queue.pop_front() {
task.set_error("Task queue is being shut down.");
}
}
}
unsafe { TASK_QUEUE = None; }
}
#[deprecated(
since = "0.2.6",
note = "This function is no longer necessary. Threadsafe features can be \
turned off by specifying `features = []` in the Cargo.toml file \
for the `hexchat-api` dependency.")]
pub unsafe fn turn_off_threadsafe_features() {
main_thread_deinit();
}