runtime-loop 0.0.3

Runtime loop for Rust
Documentation
//! The runtime-loop crate provides a common runtime loop that continuously attempts to find changes in your system, and act upon them accordingly.
//!
//! # Installation
//!
//! Add this to cargo.toml
//! ```
//! [dependencies]
//! runtime-loop = "0.0.3"
//! ```
//!
//! Add this to your crate
//! ```
//! extern crate runtime_loop;
//! ```
//!
//! # Examples
//! 
//! ```
// //! extern crate runtime_loop;
//! 
//! use runtime_loop::{RuntimeLoop, Pollable, Signal};
//! use std::thread;
//! use std::sync::{Arc, Mutex};
//! use std::sync::atomic::Ordering;
//! use std::time;
//!
//! // Create memory for the runtime loop. Grab the cancel flag to exit the loop based on time.
//! let mut rloop = RuntimeLoop::new();
//! let cancel_flag = rloop.get_cancel_flag();
//!    
//! // When polled, increment the value.
//! // This should normally be your poll function. We increment x as a test.
//! let target = Arc::new(Mutex::new(Pollable::new(0, Box::new(|x| x+1) ) ) );
//!
//! // When the polled value changes, print a message.
//! if let Ok(mut target) = target.lock() {
//!     target.add_trigger(Signal::new(
//!         Box::new(|x| println!("Increment: {:?}", x)) 
//!     ));
//! }
//!
//! // Add the pollable to the runtime loop.
//! rloop.add_pollable(target.clone());
//!              
//! // This is an async runtime loop.  Though it is acceptable to call rloop.run() without spawning a new thread, spawning a new thread will enable external cancellation of the runtime loop.
//! let runtime_thread = thread::spawn(move || {
//!    if let Result::Err(s) = rloop.run() { 
//!        panic!(s); }
//! });
//!
//! // Run for N milliseconds before canceling and joining threads.  
//! thread::sleep(time::Duration::from_millis(100)); //let the thread run for a bit
//! cancel_flag.store(true, Ordering::Relaxed); //Cancel runtime loop!
//! if let Result::Err(s) = runtime_thread.join() {
//!    panic!(s);
//! }
//! ```

extern crate threadpool;
extern crate num_cpus;
extern crate uuid;

//#[allow(unused_imports)]
use uuid::Uuid;
use threadpool::ThreadPool;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{Ordering, AtomicBool};

pub struct Signal<T> {
    lambda: Box<Fn(&T) + Send>,
}

impl<T> Signal<T> {
    //! This is a temporary medium for creation of a signal. This will be replaced with an entire crate in the future.
    
    pub fn new(lambda: Box<Fn(&T) + Send>) -> Self { Signal{lambda} }
    
    fn trigger(&self, data: &T) { 
        (self.lambda)(data);
    }
}

// a pollable trait contains a closure that is called regularly
// in it's own thread using RuntimeLoop
pub struct Pollable<T: Send> {
    data: T,
    poll_func: Box<Fn(&T) -> T+Send>,
    triggers: Vec<Signal<T>>, //should signal be a trait or a struct?
}

impl<T: Send> Pollable<T> {
    /// Create a new Pollable<T>
    pub fn new(data: T, poll_func: Box<Fn(&T) -> T+Send>) -> Self { 
        Pollable{data, 
                 poll_func, 
                 triggers: Vec::new(),} 
    }
    
    /// When a Pollable value changes, any trigger closures will execute.
    pub fn add_trigger(&mut self, signal: Signal<T> ) {
        self.triggers.push(signal);
    }
}

//Lesson Learned: Any PollableTrait must also implement Send!
pub trait PollableTrait: Send {
    fn poll(&mut self);
}

impl<T: Send+PartialEq> PollableTrait for Pollable<T> {
    fn poll(&mut self) { 
        let new_data = (self.poll_func)(&self.data);
        if self.data != new_data {
            self.data = new_data;
            for trigger in self.triggers.iter() {
                trigger.trigger(&self.data);
            }
        }
    } 
}

pub struct RuntimeLoop {
    pool: ThreadPool,
    //shared_observers: HashMap<String, Uuid>,   //TODO for version 0.2.0
    observers: HashMap<Uuid, Arc<Mutex<PollableTrait>>>,
    cancel_flag: Arc<AtomicBool>,
}

impl RuntimeLoop {
    
    /// Constructs a new `RuntimeLoop`.
	pub fn new() -> Self { 
        RuntimeLoop{ pool: ThreadPool::new(num_cpus::get()),
                     //shared_observers: HashMap::new(), //TODO for version 0.2.0
                     observers: HashMap::new(),
                     cancel_flag: Arc::new(AtomicBool::new(false))} 
    }
    
    /// Sometimes we want to be able to cancel the loop from the outside.
    /// Before we start this loop in its own thread with run(), we can grab the cancel_flag
    ///     and set it to true.
    pub fn get_cancel_flag(&self) -> Arc<AtomicBool> { 
        self.cancel_flag.clone()
    }
    
    /// The runtime loop will continue to check a list of pollable closures. This is how you add such a closure.
    pub fn add_pollable(&mut self, pollable: Arc<Mutex<PollableTrait>>) { 
        self.observers.insert(Uuid::new_v4(), pollable); 
    }
    
    
    /// run() will cause a blocking loop and return a result.  It is recommended, but unnecessary to call run() by using thread::spawn(...)
    /// it can be canceled using the cancel flag. See get_cancel_flag().
	pub fn run(&self) -> Result<(), String> {
        while self.cancel_flag.load(Ordering::Relaxed) == false {
            //go through all observers.
            for observer in self.observers.iter().map(|x|x.1) {
                //this thread will execute no matter what.
                //execute in pool ANYWAY. Don't poll if we are locked.
                let observer = observer.clone();
                self.pool.execute(move || {
                    let observer = observer.try_lock();
                    if let Ok(mut observer) = observer {
                        observer.poll(); }
                });
            }

            // TODO - Some resources say that Sleep(0) will prevent
            //         the cpu from maxing out all the time... ???
            //thread::sleep(time::Duration::from_millis(0));
        }
        
        self.pool.join();
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    extern crate chrono;
    
    use super::*;
    use std::sync::Mutex;
    use std::thread;
    
    /*extern crate test;
    use self::test::Bencher;
	
    #[bench]
    fn increment_benchmark(b: &mut Bencher) {
        b.iter(|| {
            let mut rloop = RuntimeLoop::new();
            let cancel_flag = rloop.get_cancel_flag();
            
            //when polled, increment the value.
            let target = Arc::new(Mutex::new(Pollable::new(0, Box::new(move |x| {
                                                                if x > 50 { cancel_flag.store(true, Ordering::Relaxed); }
                                                                x+1  //increment x
                                                            } ) ) ) );
            
            rloop.add_pollable(target);
                          
            // This is an async runtime loop.
            if let Result::Err(s) = rloop.run() { 
                panic!(s); }
        })
    }*/

    
    //Note: Stable code should not use unwrap()
	#[test]
	fn increment_test()
	{
        let mut rloop = RuntimeLoop::new();
        let cancel_flag = rloop.get_cancel_flag();
        
        //when polled, increment the value.
        let target = Arc::new(Mutex::new(Pollable::new(0, Box::new(move |x| {
                                            if *x > 5 { 
                                                cancel_flag.store(true, Ordering::Relaxed);
                                            }
                                            x+1
                                        } ) ) ) );
        
        //push a signaled function
        target.lock().unwrap().add_trigger(Signal::new(
                                                Box::new(|x| println!("Increment: {:?}", x))
                                                ));
        
        //we want to poll for target
        rloop.add_pollable(target.clone());
                      
        // This is an async runtime loop.
		if let Result::Err(s) = rloop.run() { 
            panic!(s); }
        
        println!("Num: {}", target.lock().unwrap().data);
		assert!(target.lock().unwrap().data > 5);
    }
    
    //Note: Stable code should not use unwrap()
	#[test]
	fn clock_test()
	{
        use self::chrono::prelude::*;
        use std::time;
   
        let mut rloop = RuntimeLoop::new();
        let cancel_flag = rloop.get_cancel_flag();
        
        //when polled, return the current seconds on the clock.
        let target = Arc::new(Mutex::new(Pollable::new(0, Box::new(|_x| { 
                                                                           let dt = Local::now();
                                                                           dt.second()
                                                                       } ) ) ) );
        
        //push a signaled function
        //every time this executes, push x to list
        let my_vec = Arc::new(Mutex::new(Vec::new()));
        let shared_vec = my_vec.clone();
        target.lock().unwrap().add_trigger(Signal::new(
                                                Box::new(move |&x| {
                                                    println!("Clock: {:?}", x);
                                                    shared_vec.lock().unwrap().push(x);
                                                    })
                                                ));
        
        //we want to poll for target
        rloop.add_pollable(target.clone());
                      
        // This is an async runtime loop.
		let runtime_thread = thread::spawn(move || {
            if let Result::Err(s) = rloop.run() { 
                panic!(s); }
        });
        
        // We want the runtime loop to run for N seconds - then return and print the polled value.
		thread::sleep(time::Duration::from_millis(2000)); //let the thread run for a bit
        cancel_flag.store(true, Ordering::Relaxed); //Cancel runtime loop!
        if let Result::Err(s) = runtime_thread.join() {
            panic!(s);
        }
        println!("Num: {}", target.lock().unwrap().data);
		println!("Count: {}", my_vec.lock().unwrap().len());
        assert!(my_vec.lock().unwrap().len() >= 3); //2 updates(3 nums) in 2 seconds.
    }
    
}