signals/
lib.rs

1//! The Signals crate is an asynchronous functional-reactive-like api.
2//!
3//! # Installation
4//!
5//! Add this to cargo.toml
6//! ```text
7//! [dependencies]
8//! signals = "*"
9//! ```
10//!
11//! Add this to your crate
12//! ```text
13//! extern crate signals; 
14//! ```
15//!
16//! # Simple Example
17//!
18//! ```should_panic
19//! use signals::{Signal, Emitter, AmEmitter};
20//!
21//! fn main() {
22//!     // create a signal that will assert when emitted
23//!     let signal = Signal::new_arc_mutex( |x: u32| Ok(x) );
24//!     let listener = Signal::new_arc_mutex( |x: u32| { assert_ne!(x, 5); Ok(()) } ); //fail!
25//!     
26//!     // when signal is emitted, listener should execute.
27//!     signal.lock().register_listener(&listener);
28//!
29//!     // emit signal
30//!     signal.lock().emit(5);
31//! }
32//! ```
33//!
34//! # Complex Example
35//!
36//! ```should_panic
37//! use signals::{Signal, Emitter};
38//!
39//! fn main() {
40//!     // create a bunch of signals. At the last signal, we should fail. 
41//!     // If we do not fail, the signals did not work.
42//!     let root = Signal::new_arc_mutex( |x: u32| Ok(x.to_string()) ); //convert x to string.
43//!     let peek = Signal::new_arc_mutex( |x: String| { println!("Peek: {}", x); Ok(()) } );
44//!     let to_i32 = Signal::new_arc_mutex( |x: String| Ok(x.parse::<i32>()?) ); //convert to integer
45//!     let inc = Signal::new_arc_mutex( |x: i32| Ok(x+1) ); //increment value
46//!     let fail = Signal::new_arc_mutex( |x: i32| { assert_ne!(x, 8); Ok(()) } ); //fail!
47//!     
48//!     //connect all signals together.
49//!     root.lock().register_listener(&peek); // snoop on the value! - because we can!
50//!     root.lock().register_listener(&to_i32); // parse the string to an integer
51//!     to_i32.lock().register_listener(&inc); //increment the value
52//!     inc.lock().register_listener(&fail); //then finally fail if the value is 8!
53//!     
54//!     root.lock().emit(7); //7 will increment to 8 and fail!
55//! }
56//! ```
57
58extern crate failure;
59
60//external imports
61use failure::Error;
62
63//standard imports
64use std::marker::PhantomData;
65use std::sync::{Arc, Weak, Mutex, MutexGuard};
66use std::thread;
67
68/// This is a polymorphic trait allowing multiple generic signals to be stored in a list.
69pub trait Emitter: Send {
70    type input;
71    
72    /// Start running the built in callback functionality with the signal. Pass on the values to children.
73    fn emit(&mut self, Self::input);
74}
75
76// because Arc<Mutex<T>> is sloppy
77#[derive(Clone)]
78pub struct Am<T: Sized>(Arc<Mutex<T>>);
79
80pub type Wm<T> = Weak<Mutex<T>>;
81pub type AmEmitter<T> = Am<Emitter<input=T>>;
82type WmEmitter<T> = Wm<Emitter<input=T>>;
83
84impl<T: Sized> Am<T> {
85    pub fn new(data: T) -> Self {
86        Am(Arc::new(Mutex::new(data)))
87    }
88
89    pub fn lock(&self) -> MutexGuard<'_, T> {
90        self.0.lock().unwrap()
91    }
92
93    pub fn clone(&self) -> Self {
94        Am(self.0.clone())
95    }
96}
97
98/// When creating a Signal, This trait represents the closure Fn allowed.
99pub trait SigFn<I, O>: Fn(I) -> Result<O, Error> {}
100impl<F, I, O> SigFn<I, O> for F where F: Fn(I) -> Result<O, Error> {}
101
102impl<I,O,F> Emitter for Signal<I,O,F> 
103    where F: 'static + SigFn<I, O> + Send + Sync,
104          O: 'static + PartialEq + Send + Sync + Clone,
105          I: 'static + Send + Sync
106{
107    type input = I;
108    
109    /// Run closure implemented for signal and pass on the results to children.
110    fn emit(&mut self, data: Self::input) {
111        let output = (self.func)(data);
112
113        match output {
114            Ok(output) => {
115                // Exit out of loop if the output didn't change.
116                if let Some(ref out) = self.output {
117                    if *out == output {
118                        return;
119                    }
120                }
121                
122                // There are no no errors, emit signals for all children.
123                self.output = Some(output);
124                self.emit_children();
125            },
126            Err(e) => println!("Error: {:?}", e),
127        };
128    }
129}
130
131/// Signals are the bread and butter of the crate.  A signal can trigger other signals whose input is the same output as the original signal. Signals support both threaded and non-threaded children.
132pub struct Signal<I, O, F> 
133    where F: SigFn<I, O> 
134{
135    input: PhantomData<I>,
136    output: Option<O>,
137    func: F,
138
139    listeners: Vec< WmEmitter<O> >,
140    threaded_listeners: Vec< WmEmitter<O> >,
141}
142
143impl<I,O,F> Signal<I,O,F> 
144    where F: 'static + SigFn<I, O> + Send + Sync,
145          O: 'static + Send + Sync + Clone,
146          I: 'static + Send + Sync,
147{
148    fn new_signal(f: F) -> Signal<I, O, impl SigFn<I, O>> {
149        Signal {
150            input: PhantomData,
151            output: None,
152            func: move |i: _| f(i),
153            listeners: Vec::new(),
154            threaded_listeners: Vec::new(),
155        }
156    }
157
158    /// Create a thread-safe parent signal. Note that the return function is Arc<Mutex<Signal<>>>
159    pub fn new_arc_mutex(f: F) -> Am<Signal<I, O, impl SigFn<I, O>>> {
160        Am::new(Signal::new_signal(f))
161    }
162
163    /// Upgrade all weak emitters, and call emit(...)
164    fn emit_children(&mut self) {
165        //emit instant listeners
166        for signal in self.listeners.iter() {
167            let output = self.output.clone();
168            Self::emit_child(signal.clone(), output);
169        }
170        
171        //emit threaded listeners
172        for signal in self.threaded_listeners.iter() {
173            let signal = signal.clone();
174            let output = self.output.clone();
175            thread::spawn( move || Self::emit_child(signal, output) );
176        }
177    }
178
179    fn emit_child(signal: WmEmitter<O>, output: Option<O>) {
180        if let Some(signal) = signal.upgrade() {
181            if let Some(ref output) = output {
182                let output = output.clone();
183                if let Ok(mut signal) = signal.lock() {
184                    signal.emit(output);
185                }
186            }
187        }
188    }
189
190    /// This method is a helper for Signal::new(f) and register_listener(...)
191    pub fn create_listener<Q, G>(&mut self, f: G) -> Am<Signal<O, Q, impl SigFn<O,Q>>>
192        where G: 'static + SigFn<O,Q> + Send + Sync,
193              Q: 'static + PartialEq + Send + Sync + Clone,
194              O: 'static
195    {
196        let ret = Signal::new_arc_mutex(f);
197        self.register_listener(&ret);
198        ret
199    }
200    
201    /// This method is a helper for Signal::new(f) and register_threaded_listener(...)
202    pub fn create_threaded_listener<Q, G>(&mut self, f: G) -> Am<Signal<O, Q, impl SigFn<O,Q>>>
203        where G: 'static + SigFn<O,Q> + Send + Sync,
204              Q: 'static + PartialEq + Send + Sync + Clone,
205              O: 'static
206    {
207        let ret = Signal::new_arc_mutex(f);
208        self.register_threaded_listener(&ret);
209        ret
210    }
211    
212    /// Register a child listener that will execute in the same thread as Self.
213    pub fn register_listener<E>(&mut self, strong: &Am<E>) 
214        where E: 'static + Emitter<input=O>
215    {
216        let weak = Arc::downgrade(&strong.0);
217        self.listeners.push(weak);
218    }
219    
220    /// Register a child listener that will run on its own thread.
221    pub fn register_threaded_listener<E>(&mut self, strong: &Am<E>) 
222        where E: 'static + Emitter<input=O>
223    {
224        let weak = Arc::downgrade(&strong.0);
225        self.threaded_listeners.push(weak);
226    }
227
228    /// Get the last result of this signal... if it exists
229    pub fn get(&self) -> &Option<O> { &self.output }
230}