runtime_loop/lib.rs
1//! The runtime-loop crate provides a common runtime loop that continuously attempts to find changes in your system, and act upon them accordingly.
2//!
3//! # Installation
4//!
5//! Add this to cargo.toml
6//! ```
7//! [dependencies]
8//! runtime-loop = "0.0.3"
9//! ```
10//!
11//! Add this to your crate
12//! ```
13//! extern crate runtime_loop;
14//! ```
15//!
16//! # Examples
17//!
18//! ```
19// //! extern crate runtime_loop;
20//!
21//! use runtime_loop::{RuntimeLoop, Pollable, Signal};
22//! use std::thread;
23//! use std::sync::{Arc, Mutex};
24//! use std::sync::atomic::Ordering;
25//! use std::time;
26//!
27//! // Create memory for the runtime loop. Grab the cancel flag to exit the loop based on time.
28//! let mut rloop = RuntimeLoop::new();
29//! let cancel_flag = rloop.get_cancel_flag();
30//!
31//! // When polled, increment the value.
32//! // This should normally be your poll function. We increment x as a test.
33//! let target = Arc::new(Mutex::new(Pollable::new(0, Box::new(|x| x+1) ) ) );
34//!
35//! // When the polled value changes, print a message.
36//! if let Ok(mut target) = target.lock() {
37//! target.add_trigger(Signal::new(
38//! Box::new(|x| println!("Increment: {:?}", x))
39//! ));
40//! }
41//!
42//! // Add the pollable to the runtime loop.
43//! rloop.add_pollable(target.clone());
44//!
45//! // This is an async runtime loop. Though it is acceptable to call rloop.run() without spawning a new thread, spawning a new thread will enable external cancellation of the runtime loop.
46//! let runtime_thread = thread::spawn(move || {
47//! if let Result::Err(s) = rloop.run() {
48//! panic!(s); }
49//! });
50//!
51//! // Run for N milliseconds before canceling and joining threads.
52//! thread::sleep(time::Duration::from_millis(100)); //let the thread run for a bit
53//! cancel_flag.store(true, Ordering::Relaxed); //Cancel runtime loop!
54//! if let Result::Err(s) = runtime_thread.join() {
55//! panic!(s);
56//! }
57//! ```
58
59extern crate threadpool;
60extern crate num_cpus;
61extern crate uuid;
62
63//#[allow(unused_imports)]
64use uuid::Uuid;
65use threadpool::ThreadPool;
66use std::collections::HashMap;
67use std::sync::{Arc, Mutex};
68use std::sync::atomic::{Ordering, AtomicBool};
69
70pub struct Signal<T> {
71 lambda: Box<Fn(&T) + Send>,
72}
73
74impl<T> Signal<T> {
75 //! This is a temporary medium for creation of a signal. This will be replaced with an entire crate in the future.
76
77 pub fn new(lambda: Box<Fn(&T) + Send>) -> Self { Signal{lambda} }
78
79 fn trigger(&self, data: &T) {
80 (self.lambda)(data);
81 }
82}
83
84// a pollable trait contains a closure that is called regularly
85// in it's own thread using RuntimeLoop
86pub struct Pollable<T: Send> {
87 data: T,
88 poll_func: Box<Fn(&T) -> T+Send>,
89 triggers: Vec<Signal<T>>, //should signal be a trait or a struct?
90}
91
92impl<T: Send> Pollable<T> {
93 /// Create a new Pollable<T>
94 pub fn new(data: T, poll_func: Box<Fn(&T) -> T+Send>) -> Self {
95 Pollable{data,
96 poll_func,
97 triggers: Vec::new(),}
98 }
99
100 /// When a Pollable value changes, any trigger closures will execute.
101 pub fn add_trigger(&mut self, signal: Signal<T> ) {
102 self.triggers.push(signal);
103 }
104}
105
106//Lesson Learned: Any PollableTrait must also implement Send!
107pub trait PollableTrait: Send {
108 fn poll(&mut self);
109}
110
111impl<T: Send+PartialEq> PollableTrait for Pollable<T> {
112 fn poll(&mut self) {
113 let new_data = (self.poll_func)(&self.data);
114 if self.data != new_data {
115 self.data = new_data;
116 for trigger in self.triggers.iter() {
117 trigger.trigger(&self.data);
118 }
119 }
120 }
121}
122
123pub struct RuntimeLoop {
124 pool: ThreadPool,
125 //shared_observers: HashMap<String, Uuid>, //TODO for version 0.2.0
126 observers: HashMap<Uuid, Arc<Mutex<PollableTrait>>>,
127 cancel_flag: Arc<AtomicBool>,
128}
129
130impl RuntimeLoop {
131
132 /// Constructs a new `RuntimeLoop`.
133 pub fn new() -> Self {
134 RuntimeLoop{ pool: ThreadPool::new(num_cpus::get()),
135 //shared_observers: HashMap::new(), //TODO for version 0.2.0
136 observers: HashMap::new(),
137 cancel_flag: Arc::new(AtomicBool::new(false))}
138 }
139
140 /// Sometimes we want to be able to cancel the loop from the outside.
141 /// Before we start this loop in its own thread with run(), we can grab the cancel_flag
142 /// and set it to true.
143 pub fn get_cancel_flag(&self) -> Arc<AtomicBool> {
144 self.cancel_flag.clone()
145 }
146
147 /// The runtime loop will continue to check a list of pollable closures. This is how you add such a closure.
148 pub fn add_pollable(&mut self, pollable: Arc<Mutex<PollableTrait>>) {
149 self.observers.insert(Uuid::new_v4(), pollable);
150 }
151
152
153 /// run() will cause a blocking loop and return a result. It is recommended, but unnecessary to call run() by using thread::spawn(...)
154 /// it can be canceled using the cancel flag. See get_cancel_flag().
155 pub fn run(&self) -> Result<(), String> {
156 while self.cancel_flag.load(Ordering::Relaxed) == false {
157 //go through all observers.
158 for observer in self.observers.iter().map(|x|x.1) {
159 //this thread will execute no matter what.
160 //execute in pool ANYWAY. Don't poll if we are locked.
161 let observer = observer.clone();
162 self.pool.execute(move || {
163 let observer = observer.try_lock();
164 if let Ok(mut observer) = observer {
165 observer.poll(); }
166 });
167 }
168
169 // TODO - Some resources say that Sleep(0) will prevent
170 // the cpu from maxing out all the time... ???
171 //thread::sleep(time::Duration::from_millis(0));
172 }
173
174 self.pool.join();
175 Ok(())
176 }
177}
178
179#[cfg(test)]
180mod tests {
181 extern crate chrono;
182
183 use super::*;
184 use std::sync::Mutex;
185 use std::thread;
186
187 /*extern crate test;
188 use self::test::Bencher;
189
190 #[bench]
191 fn increment_benchmark(b: &mut Bencher) {
192 b.iter(|| {
193 let mut rloop = RuntimeLoop::new();
194 let cancel_flag = rloop.get_cancel_flag();
195
196 //when polled, increment the value.
197 let target = Arc::new(Mutex::new(Pollable::new(0, Box::new(move |x| {
198 if x > 50 { cancel_flag.store(true, Ordering::Relaxed); }
199 x+1 //increment x
200 } ) ) ) );
201
202 rloop.add_pollable(target);
203
204 // This is an async runtime loop.
205 if let Result::Err(s) = rloop.run() {
206 panic!(s); }
207 })
208 }*/
209
210
211 //Note: Stable code should not use unwrap()
212 #[test]
213 fn increment_test()
214 {
215 let mut rloop = RuntimeLoop::new();
216 let cancel_flag = rloop.get_cancel_flag();
217
218 //when polled, increment the value.
219 let target = Arc::new(Mutex::new(Pollable::new(0, Box::new(move |x| {
220 if *x > 5 {
221 cancel_flag.store(true, Ordering::Relaxed);
222 }
223 x+1
224 } ) ) ) );
225
226 //push a signaled function
227 target.lock().unwrap().add_trigger(Signal::new(
228 Box::new(|x| println!("Increment: {:?}", x))
229 ));
230
231 //we want to poll for target
232 rloop.add_pollable(target.clone());
233
234 // This is an async runtime loop.
235 if let Result::Err(s) = rloop.run() {
236 panic!(s); }
237
238 println!("Num: {}", target.lock().unwrap().data);
239 assert!(target.lock().unwrap().data > 5);
240 }
241
242 //Note: Stable code should not use unwrap()
243 #[test]
244 fn clock_test()
245 {
246 use self::chrono::prelude::*;
247 use std::time;
248
249 let mut rloop = RuntimeLoop::new();
250 let cancel_flag = rloop.get_cancel_flag();
251
252 //when polled, return the current seconds on the clock.
253 let target = Arc::new(Mutex::new(Pollable::new(0, Box::new(|_x| {
254 let dt = Local::now();
255 dt.second()
256 } ) ) ) );
257
258 //push a signaled function
259 //every time this executes, push x to list
260 let my_vec = Arc::new(Mutex::new(Vec::new()));
261 let shared_vec = my_vec.clone();
262 target.lock().unwrap().add_trigger(Signal::new(
263 Box::new(move |&x| {
264 println!("Clock: {:?}", x);
265 shared_vec.lock().unwrap().push(x);
266 })
267 ));
268
269 //we want to poll for target
270 rloop.add_pollable(target.clone());
271
272 // This is an async runtime loop.
273 let runtime_thread = thread::spawn(move || {
274 if let Result::Err(s) = rloop.run() {
275 panic!(s); }
276 });
277
278 // We want the runtime loop to run for N seconds - then return and print the polled value.
279 thread::sleep(time::Duration::from_millis(2000)); //let the thread run for a bit
280 cancel_flag.store(true, Ordering::Relaxed); //Cancel runtime loop!
281 if let Result::Err(s) = runtime_thread.join() {
282 panic!(s);
283 }
284 println!("Num: {}", target.lock().unwrap().data);
285 println!("Count: {}", my_vec.lock().unwrap().len());
286 assert!(my_vec.lock().unwrap().len() >= 3); //2 updates(3 nums) in 2 seconds.
287 }
288
289}