signals/lib.rs
1//! The Signals crate is an asynchronous functional-reactive-like api.
2//!
3//! # Installation
4//!
5//! Add this to cargo.toml
6//! ```text
7//! [dependencies]
8//! signals = "*"
9//! ```
10//!
11//! Add this to your crate
12//! ```text
13//! extern crate signals;
14//! ```
15//!
16//! # Simple Example
17//!
18//! ```should_panic
19//! use signals::{Signal, Emitter, AmEmitter};
20//!
21//! fn main() {
22//! // create a signal that will assert when emitted
23//! let signal = Signal::new_arc_mutex( |x: u32| Ok(x) );
24//! let listener = Signal::new_arc_mutex( |x: u32| { assert_ne!(x, 5); Ok(()) } ); //fail!
25//!
26//! // when signal is emitted, listener should execute.
27//! signal.lock().register_listener(&listener);
28//!
29//! // emit signal
30//! signal.lock().emit(5);
31//! }
32//! ```
33//!
34//! # Complex Example
35//!
36//! ```should_panic
37//! use signals::{Signal, Emitter};
38//!
39//! fn main() {
40//! // create a bunch of signals. At the last signal, we should fail.
41//! // If we do not fail, the signals did not work.
42//! let root = Signal::new_arc_mutex( |x: u32| Ok(x.to_string()) ); //convert x to string.
43//! let peek = Signal::new_arc_mutex( |x: String| { println!("Peek: {}", x); Ok(()) } );
44//! let to_i32 = Signal::new_arc_mutex( |x: String| Ok(x.parse::<i32>()?) ); //convert to integer
45//! let inc = Signal::new_arc_mutex( |x: i32| Ok(x+1) ); //increment value
46//! let fail = Signal::new_arc_mutex( |x: i32| { assert_ne!(x, 8); Ok(()) } ); //fail!
47//!
48//! //connect all signals together.
49//! root.lock().register_listener(&peek); // snoop on the value! - because we can!
50//! root.lock().register_listener(&to_i32); // parse the string to an integer
51//! to_i32.lock().register_listener(&inc); //increment the value
52//! inc.lock().register_listener(&fail); //then finally fail if the value is 8!
53//!
54//! root.lock().emit(7); //7 will increment to 8 and fail!
55//! }
56//! ```
57
58extern crate failure;
59
60//external imports
61use failure::Error;
62
63//standard imports
64use std::marker::PhantomData;
65use std::sync::{Arc, Weak, Mutex, MutexGuard};
66use std::thread;
67
68/// This is a polymorphic trait allowing multiple generic signals to be stored in a list.
69pub trait Emitter: Send {
70 type input;
71
72 /// Start running the built in callback functionality with the signal. Pass on the values to children.
73 fn emit(&mut self, Self::input);
74}
75
76// because Arc<Mutex<T>> is sloppy
77#[derive(Clone)]
78pub struct Am<T: Sized>(Arc<Mutex<T>>);
79
80pub type Wm<T> = Weak<Mutex<T>>;
81pub type AmEmitter<T> = Am<Emitter<input=T>>;
82type WmEmitter<T> = Wm<Emitter<input=T>>;
83
84impl<T: Sized> Am<T> {
85 pub fn new(data: T) -> Self {
86 Am(Arc::new(Mutex::new(data)))
87 }
88
89 pub fn lock(&self) -> MutexGuard<'_, T> {
90 self.0.lock().unwrap()
91 }
92
93 pub fn clone(&self) -> Self {
94 Am(self.0.clone())
95 }
96}
97
98/// When creating a Signal, This trait represents the closure Fn allowed.
99pub trait SigFn<I, O>: Fn(I) -> Result<O, Error> {}
100impl<F, I, O> SigFn<I, O> for F where F: Fn(I) -> Result<O, Error> {}
101
102impl<I,O,F> Emitter for Signal<I,O,F>
103 where F: 'static + SigFn<I, O> + Send + Sync,
104 O: 'static + PartialEq + Send + Sync + Clone,
105 I: 'static + Send + Sync
106{
107 type input = I;
108
109 /// Run closure implemented for signal and pass on the results to children.
110 fn emit(&mut self, data: Self::input) {
111 let output = (self.func)(data);
112
113 match output {
114 Ok(output) => {
115 // Exit out of loop if the output didn't change.
116 if let Some(ref out) = self.output {
117 if *out == output {
118 return;
119 }
120 }
121
122 // There are no no errors, emit signals for all children.
123 self.output = Some(output);
124 self.emit_children();
125 },
126 Err(e) => println!("Error: {:?}", e),
127 };
128 }
129}
130
131/// Signals are the bread and butter of the crate. A signal can trigger other signals whose input is the same output as the original signal. Signals support both threaded and non-threaded children.
132pub struct Signal<I, O, F>
133 where F: SigFn<I, O>
134{
135 input: PhantomData<I>,
136 output: Option<O>,
137 func: F,
138
139 listeners: Vec< WmEmitter<O> >,
140 threaded_listeners: Vec< WmEmitter<O> >,
141}
142
143impl<I,O,F> Signal<I,O,F>
144 where F: 'static + SigFn<I, O> + Send + Sync,
145 O: 'static + Send + Sync + Clone,
146 I: 'static + Send + Sync,
147{
148 fn new_signal(f: F) -> Signal<I, O, impl SigFn<I, O>> {
149 Signal {
150 input: PhantomData,
151 output: None,
152 func: move |i: _| f(i),
153 listeners: Vec::new(),
154 threaded_listeners: Vec::new(),
155 }
156 }
157
158 /// Create a thread-safe parent signal. Note that the return function is Arc<Mutex<Signal<>>>
159 pub fn new_arc_mutex(f: F) -> Am<Signal<I, O, impl SigFn<I, O>>> {
160 Am::new(Signal::new_signal(f))
161 }
162
163 /// Upgrade all weak emitters, and call emit(...)
164 fn emit_children(&mut self) {
165 //emit instant listeners
166 for signal in self.listeners.iter() {
167 let output = self.output.clone();
168 Self::emit_child(signal.clone(), output);
169 }
170
171 //emit threaded listeners
172 for signal in self.threaded_listeners.iter() {
173 let signal = signal.clone();
174 let output = self.output.clone();
175 thread::spawn( move || Self::emit_child(signal, output) );
176 }
177 }
178
179 fn emit_child(signal: WmEmitter<O>, output: Option<O>) {
180 if let Some(signal) = signal.upgrade() {
181 if let Some(ref output) = output {
182 let output = output.clone();
183 if let Ok(mut signal) = signal.lock() {
184 signal.emit(output);
185 }
186 }
187 }
188 }
189
190 /// This method is a helper for Signal::new(f) and register_listener(...)
191 pub fn create_listener<Q, G>(&mut self, f: G) -> Am<Signal<O, Q, impl SigFn<O,Q>>>
192 where G: 'static + SigFn<O,Q> + Send + Sync,
193 Q: 'static + PartialEq + Send + Sync + Clone,
194 O: 'static
195 {
196 let ret = Signal::new_arc_mutex(f);
197 self.register_listener(&ret);
198 ret
199 }
200
201 /// This method is a helper for Signal::new(f) and register_threaded_listener(...)
202 pub fn create_threaded_listener<Q, G>(&mut self, f: G) -> Am<Signal<O, Q, impl SigFn<O,Q>>>
203 where G: 'static + SigFn<O,Q> + Send + Sync,
204 Q: 'static + PartialEq + Send + Sync + Clone,
205 O: 'static
206 {
207 let ret = Signal::new_arc_mutex(f);
208 self.register_threaded_listener(&ret);
209 ret
210 }
211
212 /// Register a child listener that will execute in the same thread as Self.
213 pub fn register_listener<E>(&mut self, strong: &Am<E>)
214 where E: 'static + Emitter<input=O>
215 {
216 let weak = Arc::downgrade(&strong.0);
217 self.listeners.push(weak);
218 }
219
220 /// Register a child listener that will run on its own thread.
221 pub fn register_threaded_listener<E>(&mut self, strong: &Am<E>)
222 where E: 'static + Emitter<input=O>
223 {
224 let weak = Arc::downgrade(&strong.0);
225 self.threaded_listeners.push(weak);
226 }
227
228 /// Get the last result of this signal... if it exists
229 pub fn get(&self) -> &Option<O> { &self.output }
230}