bottle 0.1.0

Actor model framework for Rust.
use std::sync::{Weak, Arc, RwLock};
use std::any::Any;
use crate::{Signal, Handler, Remote, Future};

mod query;
pub use query::*;

#[thread_local]
/// The current thread data.
/// It is initialized when `start` is called.
pub(crate) static mut LOCAL_THREAD: Option<Weak<Thread>> = None;

#[thread_local]
pub(crate) static mut USER_DATA: Option<Vec<Box<Any>>> = None;

/// Thread statistics
pub struct Stats {
	/// Number of actors handled by the thread
	pub count: usize
}

pub struct Thread {
	/// Thread's mailbox.
	mx: crossbeam_channel::Sender<Box<Query>>,

	/// Thread input messages.
	input: crossbeam_channel::Receiver<Box<Query>>,

	/// Thread stats.
	stats: RwLock<Stats>,

	/// Current signal receiver.
	receiver: RwLock<Option<Receiver>>,
}

// impl Send for Thread {}

impl Thread {
	pub fn new() -> Arc<Thread> {
		let (mx, input) = crossbeam_channel::unbounded();
		Arc::new(Thread {
			mx: mx,
			input: input,
			stats: RwLock::new(Stats {
				count: 0
			}),
			receiver: RwLock::new(None)
		})
	}

	pub fn ping(&self) {
		println!("ping thread: {:?}", self as *const Thread);
	}

	pub fn query<Q: 'static + Query>(&self, query: Q) {
		self.mx.send(Box::new(query)).unwrap_or(())
	}

	pub fn len(&self) -> usize {
		self.input.len()
	}

	pub fn count(&self) -> usize {
		self.stats.read().unwrap().count
	}

	pub(crate) fn grab(&self) {
		self.stats.write().unwrap().count += 1;
	}

	pub(crate) fn release(&self) {
		self.stats.write().unwrap().count -= 1;
	}

	fn process_query(&self, mut query: Box<Query>) {
		{
			let mut current_receiver = self.receiver.write().unwrap();
			*current_receiver = query.receiver();
		}
		query.process()
	}

	pub fn current_receiver<T>(&self, t: &T) -> Option<Remote<T>> {
		let current_receiver = self.receiver.read().unwrap();
		match &*current_receiver {
			Some(ref receiver) => unsafe { receiver.upgrade(t) },
			None => None
		}
	}

	pub fn r#yield(&self) {
		if let Ok(mut query) = self.input.try_recv() {
			let saved_receiver: Option<Receiver> = *self.receiver.read().unwrap();
			self.process_query(query);
			{
				let mut current_receiver = self.receiver.write().unwrap();
				*current_receiver = saved_receiver;
			}
		}
	}

	pub fn r#await<R>(&self, p: Future<R>) -> crate::future::Result<R> {
		let saved_receiver: Option<Receiver> = *self.receiver.read().unwrap();
		loop {
			select! {
				recv(p.receiver) -> t => {
					return match t {
						Ok(v) => Ok(v),
						Err(_) => Err(crate::future::Error::Broken)
					}
				},
				recv(self.input) -> query => {
					self.process_query(query.unwrap());
					{
						let mut current_receiver = self.receiver.write().unwrap();
						*current_receiver = saved_receiver;
					}
				}
			}
		}
	}

	fn process_all(&self) {
		//println!("thread: {:?}", self as *const Self);
		//println!("started.");
        loop {
			///println!("waiting for query.");
			let mut query = self.input.recv().unwrap();
			//println!("new query.");
			//println!("process query.");
			self.process_query(query)
		}
    }

	pub fn current() -> Option<Arc<Thread>> {
		match unsafe { &LOCAL_THREAD } {
			Some(weak) => weak.upgrade(),
			None => None
		}
	}

	/// Save data in the current thread.
	pub fn save_data<T: Any>(t: T) {
		match unsafe { &mut USER_DATA } {
			Some(user_data) => {
				user_data.push(Box::new(t));
			},
			None => ()
		}
	}

	/// Retreive user data in the current thread.
	pub fn find_user_data<F, T>(f: F) -> Option<T> where F: Fn(&Any) -> Option<T> {
		match unsafe { &USER_DATA } {
			Some(user_data) => {
				for data in user_data.iter() {
					match f(data) {
						Some(t) => return Some(t),
						None => ()
					}
				}
				None
			},
			None => None
		}
	}

	pub fn start(this: &Arc<Thread>) {
		unsafe {
			LOCAL_THREAD = Some(Arc::downgrade(this));
		}
		this.process_all()
	}
}