use std::sync::{Weak, Arc, RwLock};
use std::any::Any;
use crate::{Signal, Handler, Remote, Future};
mod query;
pub use query::*;
#[thread_local]
pub(crate) static mut LOCAL_THREAD: Option<Weak<Thread>> = None;
#[thread_local]
pub(crate) static mut USER_DATA: Option<Vec<Box<Any>>> = None;
pub struct Stats {
pub count: usize
}
pub struct Thread {
mx: crossbeam_channel::Sender<Box<Query>>,
input: crossbeam_channel::Receiver<Box<Query>>,
stats: RwLock<Stats>,
receiver: RwLock<Option<Receiver>>,
}
impl Thread {
pub fn new() -> Arc<Thread> {
let (mx, input) = crossbeam_channel::unbounded();
Arc::new(Thread {
mx: mx,
input: input,
stats: RwLock::new(Stats {
count: 0
}),
receiver: RwLock::new(None)
})
}
pub fn ping(&self) {
println!("ping thread: {:?}", self as *const Thread);
}
pub fn query<Q: 'static + Query>(&self, query: Q) {
self.mx.send(Box::new(query)).unwrap_or(())
}
pub fn len(&self) -> usize {
self.input.len()
}
pub fn count(&self) -> usize {
self.stats.read().unwrap().count
}
pub(crate) fn grab(&self) {
self.stats.write().unwrap().count += 1;
}
pub(crate) fn release(&self) {
self.stats.write().unwrap().count -= 1;
}
fn process_query(&self, mut query: Box<Query>) {
{
let mut current_receiver = self.receiver.write().unwrap();
*current_receiver = query.receiver();
}
query.process()
}
pub fn current_receiver<T>(&self, t: &T) -> Option<Remote<T>> {
let current_receiver = self.receiver.read().unwrap();
match &*current_receiver {
Some(ref receiver) => unsafe { receiver.upgrade(t) },
None => None
}
}
pub fn r#yield(&self) {
if let Ok(mut query) = self.input.try_recv() {
let saved_receiver: Option<Receiver> = *self.receiver.read().unwrap();
self.process_query(query);
{
let mut current_receiver = self.receiver.write().unwrap();
*current_receiver = saved_receiver;
}
}
}
pub fn r#await<R>(&self, p: Future<R>) -> crate::future::Result<R> {
let saved_receiver: Option<Receiver> = *self.receiver.read().unwrap();
loop {
select! {
recv(p.receiver) -> t => {
return match t {
Ok(v) => Ok(v),
Err(_) => Err(crate::future::Error::Broken)
}
},
recv(self.input) -> query => {
self.process_query(query.unwrap());
{
let mut current_receiver = self.receiver.write().unwrap();
*current_receiver = saved_receiver;
}
}
}
}
}
fn process_all(&self) {
loop {
let mut query = self.input.recv().unwrap();
self.process_query(query)
}
}
pub fn current() -> Option<Arc<Thread>> {
match unsafe { &LOCAL_THREAD } {
Some(weak) => weak.upgrade(),
None => None
}
}
pub fn save_data<T: Any>(t: T) {
match unsafe { &mut USER_DATA } {
Some(user_data) => {
user_data.push(Box::new(t));
},
None => ()
}
}
pub fn find_user_data<F, T>(f: F) -> Option<T> where F: Fn(&Any) -> Option<T> {
match unsafe { &USER_DATA } {
Some(user_data) => {
for data in user_data.iter() {
match f(data) {
Some(t) => return Some(t),
None => ()
}
}
None
},
None => None
}
}
pub fn start(this: &Arc<Thread>) {
unsafe {
LOCAL_THREAD = Some(Arc::downgrade(this));
}
this.process_all()
}
}