extern crate threadpool;
extern crate num_cpus;
extern crate uuid;
use uuid::Uuid;
use threadpool::ThreadPool;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{Ordering, AtomicBool};
/// A callback that is invoked with a reference to a value of type `T`.
///
/// `Pollable` keeps a list of signals and fires each of them whenever its
/// polled value changes.
pub struct Signal<T> {
    /// The wrapped callback. It must be `Send` so the signal can be moved to
    /// another thread together with its owner.
    // `dyn` marks this explicitly as a trait object; the bare `Box<Fn(..)>`
    // spelling is deprecated and rejected by newer editions.
    lambda: Box<dyn Fn(&T) + Send>,
}

impl<T> Signal<T> {
    /// Creates a signal that runs `lambda` every time it is triggered.
    pub fn new(lambda: Box<dyn Fn(&T) + Send>) -> Self {
        Signal { lambda }
    }

    /// Invokes the wrapped callback with `data`.
    fn trigger(&self, data: &T) {
        (self.lambda)(data);
    }
}
pub struct Pollable<T: Send> {
data: T,
poll_func: Box<Fn(&T) -> T+Send>,
triggers: Vec<Signal<T>>, }
impl<T: Send> Pollable<T> {
pub fn new(data: T, poll_func: Box<Fn(&T) -> T+Send>) -> Self {
Pollable{data,
poll_func,
triggers: Vec::new(),}
}
pub fn add_trigger(&mut self, signal: Signal<T> ) {
self.triggers.push(signal);
}
}
/// Type-erased interface for anything that can be polled.
///
/// `Send` is a supertrait, so every implementor (and therefore every
/// `PollableTrait` trait object) can be moved to another thread.
pub trait PollableTrait
where
    Self: Send,
{
    /// Performs one poll step, possibly mutating the implementor.
    fn poll(&mut self);
}
impl<T> PollableTrait for Pollable<T>
where
    T: Send + PartialEq,
{
    /// Recomputes the value with `poll_func`; if it differs from the stored
    /// one, stores it and fires every registered trigger with the new value.
    /// Nothing happens when the value is unchanged.
    fn poll(&mut self) {
        let fresh = (self.poll_func)(&self.data);
        let changed = self.data != fresh;
        if !changed {
            return;
        }

        // Store first so the triggers observe the value that is now current.
        self.data = fresh;
        let current = &self.data;
        for signal in &self.triggers {
            signal.trigger(current);
        }
    }
}
pub struct RuntimeLoop {
pool: ThreadPool,
observers: HashMap<Uuid, Arc<Mutex<PollableTrait>>>,
cancel_flag: Arc<AtomicBool>,
}
impl RuntimeLoop {
pub fn new() -> Self {
RuntimeLoop{ pool: ThreadPool::new(num_cpus::get()),
observers: HashMap::new(),
cancel_flag: Arc::new(AtomicBool::new(false))}
}
pub fn get_cancel_flag(&self) -> Arc<AtomicBool> {
self.cancel_flag.clone()
}
pub fn add_pollable(&mut self, pollable: Arc<Mutex<PollableTrait>>) {
self.observers.insert(Uuid::new_v4(), pollable);
}
pub fn run(&self) -> Result<(), String> {
while self.cancel_flag.load(Ordering::Relaxed) == false {
for observer in self.observers.iter().map(|x|x.1) {
let observer = observer.clone();
self.pool.execute(move || {
let observer = observer.try_lock();
if let Ok(mut observer) = observer {
observer.poll(); }
});
}
}
self.pool.join();
Ok(())
}
}
#[cfg(test)]
mod tests {
    extern crate chrono;
    use super::*;
    use std::sync::Mutex;
    use std::thread;

    /// Runs the loop on the current thread with a counter that increments on
    /// every poll and requests cancellation once it has passed 5.
    #[test]
    fn increment_test() {
        let mut rloop = RuntimeLoop::new();
        let cancel_flag = rloop.get_cancel_flag();
        let target = Arc::new(Mutex::new(Pollable::new(
            0,
            Box::new(move |x| {
                if *x > 5 {
                    cancel_flag.store(true, Ordering::Relaxed);
                }
                x + 1
            }),
        )));
        target
            .lock()
            .unwrap()
            .add_trigger(Signal::new(Box::new(|x| println!("Increment: {:?}", x))));
        rloop.add_pollable(target.clone());

        if let Err(s) = rloop.run() {
            // Format the message explicitly: `panic!(s)` with a non-literal
            // argument is deprecated and rejected by newer editions.
            panic!("{}", s);
        }

        // Take the lock once for both the report and the assertion.
        let final_value = target.lock().unwrap().data;
        println!("Num: {}", final_value);
        assert!(final_value > 5);
    }

    /// Runs the loop on a background thread for two seconds while polling the
    /// seconds field of the local clock, and checks that the trigger fired at
    /// least three times.
    // NOTE(review): the `>= 3` bound assumes the first poll differs from the
    // initial value 0 and that two second boundaries fall inside the sleep;
    // this is timing dependent.
    #[test]
    fn clock_test() {
        use self::chrono::prelude::*;
        use std::time;

        let mut rloop = RuntimeLoop::new();
        let cancel_flag = rloop.get_cancel_flag();
        let target = Arc::new(Mutex::new(Pollable::new(
            0,
            Box::new(|_x| {
                let dt = Local::now();
                dt.second()
            }),
        )));
        let my_vec = Arc::new(Mutex::new(Vec::new()));
        let shared_vec = my_vec.clone();
        target
            .lock()
            .unwrap()
            .add_trigger(Signal::new(Box::new(move |&x| {
                println!("Clock: {:?}", x);
                shared_vec.lock().unwrap().push(x);
            })));
        rloop.add_pollable(target.clone());

        let runtime_thread = thread::spawn(move || {
            if let Err(s) = rloop.run() {
                panic!("{}", s);
            }
        });

        thread::sleep(time::Duration::from_millis(2000));
        cancel_flag.store(true, Ordering::Relaxed);
        // `join` only fails if the runtime thread panicked; `expect` reports
        // that directly instead of re-panicking with an opaque boxed payload.
        runtime_thread
            .join()
            .expect("runtime loop thread panicked");

        println!("Num: {}", target.lock().unwrap().data);
        let fired = my_vec.lock().unwrap().len();
        println!("Count: {}", fired);
        assert!(fired >= 3);
    }
}