use std::collections::{HashMap, VecDeque};
use std::marker::Send;
use std::sync::atomic;
use std::sync::atomic::AtomicI32;
use std::thread;
use std::time::Duration;
use ::app::{Context, Message, Messenger};
use app::sys::{B_PREFERRED_TOKEN, B_QUIT_REQUESTED, QUIT};
use ::kernel::ports::Port;
use ::kernel::INFINITE_TIMEOUT;
use ::support::{ErrorKind, Flattenable, HaikuError, Result};
/// A receiver of messages dispatched by a `Looper`.
///
/// The type parameter `A` is the type carried by the `Context` that is handed
/// to the handler together with every message.
pub trait Handler<A>
where
    A: Send + 'static,
{
    /// Called by the looper for each message that is dispatched to this handler.
    ///
    /// `context` is the looper's context and `message` is the message being
    /// dispatched; the handler may mutate its own state in response.
    fn message_received(&mut self, context: &Context<A>, message: &Message);
}
/// The kinds of entries a `Looper` keeps in its handler table.
pub(crate) enum HandlerType<A>
where
    A: Send + 'static,
{
    /// A handler that is owned by the looper and receives the message directly.
    OwnedHandler(Box<dyn Handler<A> + Send>),
    /// Marker entry: messages for this token are delivered to the looper's
    /// own `state` handler instead of a separately owned one.
    LooperState,
}
/// A message loop that reads messages from a port and dispatches them to
/// registered handlers.
pub struct Looper<A>
where
    A: Send + 'static,
{
    /// Name of the looper; used as a prefix in some log output.
    pub(crate) name: String,
    /// Port the looper reads incoming messages from.
    pub(crate) port: Port,
    /// Messages that were read from the port and still await dispatch.
    pub(crate) message_queue: VecDeque<Message>,
    /// Registered handlers, keyed by their handler token.
    pub(crate) handlers: HashMap<i32, HandlerType<A>>,
    /// Token substituted when a message targets `B_PREFERRED_TOKEN`.
    pub(crate) preferred_handler: i32,
    /// Context passed to every handler; the token of its `handler_messenger`
    /// is updated before each dispatch.
    pub(crate) context: Context<A>,
    /// Handler invoked for table entries of kind `HandlerType::LooperState`.
    pub(crate) state: Box<dyn Handler<A> + Send>,
    /// Set once a `QUIT` message was processed; makes the message loop stop.
    pub(crate) terminating: bool,
}
impl<A> Looper<A>
where
    A: Send + 'static,
{
    /// Returns the name of this looper.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Creates a `Messenger` from this looper's port.
    ///
    /// # Panics
    ///
    /// Panics if no messenger can be created from the looper's port.
    pub fn get_messenger(&self) -> Messenger {
        Messenger::from_port(&self.port).unwrap()
    }

    /// Consumes the looper and runs its message loop on a newly spawned thread.
    ///
    /// The join handle is dropped, so the thread keeps running detached from
    /// the caller. This method currently always returns `Ok(())`.
    pub fn run(mut self) -> Result<()> {
        let _child = thread::spawn(move || {
            self.looper_task();
        });
        Ok(())
    }

    /// Registers `handler` under a freshly allocated handler token.
    pub fn add_handler(&mut self, handler: Box<dyn Handler<A> + Send>) {
        let token = NEXT_HANDLER_TOKEN.fetch_add(1, atomic::Ordering::Relaxed);
        self.handlers.insert(token, HandlerType::OwnedHandler(handler));
    }

    /// Registers `handler` under a freshly allocated handler token and makes
    /// it the preferred handler, i.e. the one that receives messages targeted
    /// at `B_PREFERRED_TOKEN`.
    pub fn add_preferred_handler(&mut self, handler: Box<dyn Handler<A> + Send>) {
        let token = NEXT_HANDLER_TOKEN.fetch_add(1, atomic::Ordering::Relaxed);
        self.handlers.insert(token, HandlerType::OwnedHandler(handler));
        self.preferred_handler = token;
    }

    /// Runs the message loop until a `QUIT` message has been processed.
    ///
    /// Each round of the loop:
    /// 1. blocks on the port if no message is queued locally,
    /// 2. moves every message currently waiting on the port into the queue,
    /// 3. dispatches queued messages until the queue is empty, the looper is
    ///    terminating, or new messages are waiting on the port.
    ///
    /// Errors while reading from or querying the port are logged to stdout
    /// and never abort the loop.
    pub(crate) fn looper_task(&mut self) {
        loop {
            // Nothing to dispatch yet: wait for the next message to arrive.
            if self.message_queue.is_empty() {
                match self.read_message_from_port(INFINITE_TIMEOUT) {
                    Ok(message) => self.message_queue.push_back(message),
                    Err(e) => {
                        // NOTE(review): a failed read is retried immediately; if the
                        // error is persistent this loop will spin — confirm intent.
                        println!("[{}] Error getting message: {:?}", self.name(), e);
                        continue;
                    }
                }
            }

            // Pull in whatever else is waiting on the port, without blocking.
            // A failing count query is logged (as it is in the dispatch loop
            // below) instead of panicking the looper thread; messages that are
            // already queued are still dispatched.
            match self.port.get_count() {
                Ok(message_count) => {
                    for _ in 0..message_count {
                        match self.read_message_from_port(Duration::new(0, 0)) {
                            Ok(message) => self.message_queue.push_back(message),
                            Err(e) => {
                                println!("Error getting message: {:?}", e);
                                break;
                            }
                        }
                    }
                }
                Err(e) => println!("Error getting the port count: {:?}", e),
            }

            let mut dispatch_next_message = true;
            while dispatch_next_message && !self.terminating {
                match self.message_queue.pop_front() {
                    None => dispatch_next_message = false,
                    Some(message) => {
                        // Messages without a specific target go to the preferred handler.
                        let target = message.header.target;
                        let handler_token = if target == B_PREFERRED_TOKEN {
                            self.preferred_handler
                        } else {
                            target
                        };

                        // Messages for unknown tokens are dropped.
                        let handler = match self.handlers.get_mut(&handler_token) {
                            Some(handler) => handler,
                            None => continue,
                        };

                        match message.what() {
                            // Quit requests are currently consumed without any action.
                            B_QUIT_REQUESTED => {}
                            QUIT => self.terminating = true,
                            _ => {
                                // Let the context know which handler is being served.
                                self.context.handler_messenger.set_token(handler_token);
                                match handler {
                                    HandlerType::OwnedHandler(h) => {
                                        h.message_received(&self.context, &message)
                                    }
                                    HandlerType::LooperState => {
                                        self.state.message_received(&self.context, &message)
                                    }
                                }
                            }
                        }
                    }
                }

                if self.terminating {
                    break;
                }

                // Stop dispatching as soon as new messages are waiting on the
                // port, so they get queued before we continue.
                match self.port.get_count() {
                    Ok(count) => {
                        if count > 0 {
                            dispatch_next_message = false;
                        }
                    }
                    Err(e) => println!("Error getting the port count: {:?}", e),
                }
            }

            if self.terminating {
                break;
            }
        }
    }

    /// Reads one item from the port, waiting at most `timeout`, and
    /// unflattens it into a `Message`.
    ///
    /// # Errors
    ///
    /// Returns the error of the port read or of the unflattening if either
    /// fails, and an `ErrorKind::InvalidData` error if the type code of the
    /// data read does not match `Message::type_code()`.
    fn read_message_from_port(&self, timeout: Duration) -> Result<Message> {
        let (type_code, buffer) = self.port.try_read(timeout)?;
        if type_code as u32 != Message::type_code() {
            return Err(HaikuError::new(
                ErrorKind::InvalidData,
                "the data on the looper's port does not contain a Message",
            ));
        }
        let message = Message::unflatten(&buffer)?;
        Ok(message)
    }
}
/// A small handle that wraps a `Messenger` and offers convenience requests
/// that are sent through it.
pub struct LooperDelegate {
    /// Messenger through which the delegate sends its messages.
    pub messenger: Messenger,
}
impl LooperDelegate {
    /// Sends a `QUIT` message through the delegate's messenger.
    ///
    /// The messenger itself is passed as the second argument of `send`
    /// (presumably the reply target — confirm against `Messenger::send`).
    ///
    /// # Panics
    ///
    /// Panics if sending the message fails.
    pub fn quit(&self) {
        self.messenger
            .send(Message::new(QUIT), &self.messenger)
            .unwrap();
    }
}
/// Global counter that hands out handler tokens: every handler registration
/// takes the current value via `fetch_add` and uses it as the handler's token.
// NOTE(review): the counter starts at 2; presumably the lower values are
// reserved for other purposes — confirm.
pub(crate) static NEXT_HANDLER_TOKEN: AtomicI32 = AtomicI32::new(2);