runtime_loop/
lib.rs

1//! The runtime-loop crate provides a common runtime loop that continuously attempts to find changes in your system, and act upon them accordingly.
2//!
3//! # Installation
4//!
//! Add this to your `Cargo.toml`:
6//! ```
7//! [dependencies]
8//! runtime-loop = "0.0.3"
9//! ```
10//!
11//! Add this to your crate
12//! ```
13//! extern crate runtime_loop;
14//! ```
15//!
16//! # Examples
17//! 
18//! ```
19// //! extern crate runtime_loop;
20//! 
21//! use runtime_loop::{RuntimeLoop, Pollable, Signal};
22//! use std::thread;
23//! use std::sync::{Arc, Mutex};
24//! use std::sync::atomic::Ordering;
25//! use std::time;
26//!
27//! // Create memory for the runtime loop. Grab the cancel flag to exit the loop based on time.
28//! let mut rloop = RuntimeLoop::new();
29//! let cancel_flag = rloop.get_cancel_flag();
30//!    
31//! // When polled, increment the value.
32//! // This should normally be your poll function. We increment x as a test.
33//! let target = Arc::new(Mutex::new(Pollable::new(0, Box::new(|x| x+1) ) ) );
34//!
35//! // When the polled value changes, print a message.
36//! if let Ok(mut target) = target.lock() {
37//!     target.add_trigger(Signal::new(
38//!         Box::new(|x| println!("Increment: {:?}", x)) 
39//!     ));
40//! }
41//!
42//! // Add the pollable to the runtime loop.
43//! rloop.add_pollable(target.clone());
44//!              
//! // Run the loop on its own thread. Calling `rloop.run()` directly also works, but it blocks; spawning a thread is what lets us cancel the loop from the outside.
46//! let runtime_thread = thread::spawn(move || {
47//!    if let Result::Err(s) = rloop.run() { 
48//!        panic!(s); }
49//! });
50//!
51//! // Run for N milliseconds before canceling and joining threads.  
52//! thread::sleep(time::Duration::from_millis(100)); //let the thread run for a bit
53//! cancel_flag.store(true, Ordering::Relaxed); //Cancel runtime loop!
54//! if let Result::Err(s) = runtime_thread.join() {
55//!    panic!(s);
56//! }
57//! ```
58
59extern crate threadpool;
60extern crate num_cpus;
61extern crate uuid;
62
63//#[allow(unused_imports)]
64use uuid::Uuid;
65use threadpool::ThreadPool;
66use std::collections::HashMap;
67use std::sync::{Arc, Mutex};
68use std::sync::atomic::{Ordering, AtomicBool};
69
/// A callback that is invoked with the new value when a polled value changes.
///
/// This is a temporary medium for creation of a signal. This will be replaced
/// with an entire crate in the future.
pub struct Signal<T> {
    /// Closure called with a reference to the value each time the signal fires.
    lambda: Box<dyn Fn(&T) + Send>,
}

impl<T> Signal<T> {
    /// Wraps `lambda` in a `Signal`.
    pub fn new(lambda: Box<dyn Fn(&T) + Send>) -> Self {
        Signal { lambda }
    }

    /// Calls the wrapped closure with `data`.
    fn trigger(&self, data: &T) {
        (self.lambda)(data);
    }
}
83
84// a pollable trait contains a closure that is called regularly
85// in it's own thread using RuntimeLoop
86pub struct Pollable<T: Send> {
87    data: T,
88    poll_func: Box<Fn(&T) -> T+Send>,
89    triggers: Vec<Signal<T>>, //should signal be a trait or a struct?
90}
91
92impl<T: Send> Pollable<T> {
93    /// Create a new Pollable<T>
94    pub fn new(data: T, poll_func: Box<Fn(&T) -> T+Send>) -> Self { 
95        Pollable{data, 
96                 poll_func, 
97                 triggers: Vec::new(),} 
98    }
99    
100    /// When a Pollable value changes, any trigger closures will execute.
101    pub fn add_trigger(&mut self, signal: Signal<T> ) {
102        self.triggers.push(signal);
103    }
104}
105
106//Lesson Learned: Any PollableTrait must also implement Send!
107pub trait PollableTrait: Send {
108    fn poll(&mut self);
109}
110
111impl<T: Send+PartialEq> PollableTrait for Pollable<T> {
112    fn poll(&mut self) { 
113        let new_data = (self.poll_func)(&self.data);
114        if self.data != new_data {
115            self.data = new_data;
116            for trigger in self.triggers.iter() {
117                trigger.trigger(&self.data);
118            }
119        }
120    } 
121}
122
123pub struct RuntimeLoop {
124    pool: ThreadPool,
125    //shared_observers: HashMap<String, Uuid>,   //TODO for version 0.2.0
126    observers: HashMap<Uuid, Arc<Mutex<PollableTrait>>>,
127    cancel_flag: Arc<AtomicBool>,
128}
129
130impl RuntimeLoop {
131    
132    /// Constructs a new `RuntimeLoop`.
133	pub fn new() -> Self { 
134        RuntimeLoop{ pool: ThreadPool::new(num_cpus::get()),
135                     //shared_observers: HashMap::new(), //TODO for version 0.2.0
136                     observers: HashMap::new(),
137                     cancel_flag: Arc::new(AtomicBool::new(false))} 
138    }
139    
140    /// Sometimes we want to be able to cancel the loop from the outside.
141    /// Before we start this loop in its own thread with run(), we can grab the cancel_flag
142    ///     and set it to true.
143    pub fn get_cancel_flag(&self) -> Arc<AtomicBool> { 
144        self.cancel_flag.clone()
145    }
146    
147    /// The runtime loop will continue to check a list of pollable closures. This is how you add such a closure.
148    pub fn add_pollable(&mut self, pollable: Arc<Mutex<PollableTrait>>) { 
149        self.observers.insert(Uuid::new_v4(), pollable); 
150    }
151    
152    
153    /// run() will cause a blocking loop and return a result.  It is recommended, but unnecessary to call run() by using thread::spawn(...)
154    /// it can be canceled using the cancel flag. See get_cancel_flag().
155	pub fn run(&self) -> Result<(), String> {
156        while self.cancel_flag.load(Ordering::Relaxed) == false {
157            //go through all observers.
158            for observer in self.observers.iter().map(|x|x.1) {
159                //this thread will execute no matter what.
160                //execute in pool ANYWAY. Don't poll if we are locked.
161                let observer = observer.clone();
162                self.pool.execute(move || {
163                    let observer = observer.try_lock();
164                    if let Ok(mut observer) = observer {
165                        observer.poll(); }
166                });
167            }
168
169            // TODO - Some resources say that Sleep(0) will prevent
170            //         the cpu from maxing out all the time... ???
171            //thread::sleep(time::Duration::from_millis(0));
172        }
173        
174        self.pool.join();
175        Ok(())
176    }
177}
178
#[cfg(test)]
mod tests {
    extern crate chrono;

    use super::*;
    use std::sync::atomic::Ordering;
    use std::sync::{Arc, Mutex};
    use std::thread;

    /// A pollable that increments itself and cancels the loop once it has
    /// passed 5 must end up holding a value greater than 5.
    //Note: Stable code should not use unwrap()
    #[test]
    fn increment_test() {
        let mut rloop = RuntimeLoop::new();
        let cancel_flag = rloop.get_cancel_flag();

        // When polled, increment the value; ask the loop to stop once it exceeds 5.
        let target = Arc::new(Mutex::new(Pollable::new(
            0,
            Box::new(move |x| {
                if *x > 5 {
                    cancel_flag.store(true, Ordering::Relaxed);
                }
                x + 1
            }),
        )));

        // Push a signaled function.
        target
            .lock()
            .unwrap()
            .add_trigger(Signal::new(Box::new(|x| println!("Increment: {:?}", x))));

        // We want to poll for target.
        rloop.add_pollable(target.clone());

        // Blocks until the poll function above sets the cancel flag.
        if let Err(s) = rloop.run() {
            panic!("{}", s);
        }

        println!("Num: {}", target.lock().unwrap().data);
        assert!(target.lock().unwrap().data > 5);
    }

    /// Polls the wall clock's seconds field from a background loop and checks
    /// that every change was reported through the trigger.
    //Note: Stable code should not use unwrap()
    #[test]
    fn clock_test() {
        use self::chrono::prelude::*;
        use std::time;

        let mut rloop = RuntimeLoop::new();
        let cancel_flag = rloop.get_cancel_flag();

        // When polled, return the current seconds on the clock.
        // Start from a value the clock can never report, so the very first
        // poll always counts as a change (starting at 0 would miss it whenever
        // the test happens to begin at second 0).
        let target = Arc::new(Mutex::new(Pollable::new(
            u32::max_value(),
            Box::new(|_x| Local::now().second()),
        )));

        // Push a signaled function: every time it executes, push x to the list.
        let my_vec = Arc::new(Mutex::new(Vec::new()));
        let shared_vec = my_vec.clone();
        target
            .lock()
            .unwrap()
            .add_trigger(Signal::new(Box::new(move |&x| {
                println!("Clock: {:?}", x);
                shared_vec.lock().unwrap().push(x);
            })));

        // We want to poll for target.
        rloop.add_pollable(target.clone());

        // Run the loop on its own thread so it can be cancelled from here.
        let runtime_thread = thread::spawn(move || {
            if let Err(s) = rloop.run() {
                panic!("{}", s);
            }
        });

        // Let the loop run for 2.5 s: provided the first poll happens within
        // the first half second, at least two second-boundaries pass after it.
        thread::sleep(time::Duration::from_millis(2500));
        cancel_flag.store(true, Ordering::Relaxed); // Cancel runtime loop!
        runtime_thread
            .join()
            .expect("runtime loop thread panicked");

        println!("Num: {}", target.lock().unwrap().data);
        println!("Count: {}", my_vec.lock().unwrap().len());
        // First reading plus at least 2 updates (3 nums).
        assert!(my_vec.lock().unwrap().len() >= 3);
    }
}