1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
//! The Signals crate is an asynchronous functional-reactive-like api.
//!
//! # Installation
//!
//! Add this to cargo.toml
//! ```text
//! [dependencies]
//! signals = "*"
//! ```
//!
//! Add this to your crate
//! ```text
//! extern crate signals;
//! ```
//!
//! # Simple Example
//!
//! ```should_panic
//! use signals::{Signal, Emitter, AmEmitter};
//!
//! fn main() {
//! // create a signal that will assert when emitted
//! let signal = Signal::new_arc_mutex( |x: u32| Ok(x) );
//! let listener = Signal::new_arc_mutex( |x: u32| { assert_ne!(x, 5); Ok(()) } ); //fail!
//!
//! // when signal is emitted, listener should execute.
//! signal.lock().register_listener(&listener);
//!
//! // emit signal
//! signal.lock().emit(5);
//! }
//! ```
//!
//! # Complex Example
//!
//! ```should_panic
//! use signals::{Signal, Emitter};
//!
//! fn main() {
//! // create a bunch of signals. At the last signal, we should fail.
//! // If we do not fail, the signals did not work.
//! let root = Signal::new_arc_mutex( |x: u32| Ok(x.to_string()) ); //convert x to string.
//! let peek = Signal::new_arc_mutex( |x: String| { println!("Peek: {}", x); Ok(()) } );
//! let to_i32 = Signal::new_arc_mutex( |x: String| Ok(x.parse::<i32>()?) ); //convert to integer
//! let inc = Signal::new_arc_mutex( |x: i32| Ok(x+1) ); //increment value
//! let fail = Signal::new_arc_mutex( |x: i32| { assert_ne!(x, 8); Ok(()) } ); //fail!
//!
//! //connect all signals together.
//! root.lock().register_listener(&peek); // snoop on the value! - because we can!
//! root.lock().register_listener(&to_i32); // parse the string to an integer
//! to_i32.lock().register_listener(&inc); //increment the value
//! inc.lock().register_listener(&fail); //then finally fail if the value is 8!
//!
//! root.lock().emit(7); //7 will increment to 8 and fail!
//! }
//! ```
extern crate failure;
//external imports
use failure::Error;
//standard imports
use std::marker::PhantomData;
use std::sync::{Arc, Weak, Mutex, MutexGuard};
use std::thread;
/// This is a polymorphic trait allowing multiple generic signals to be stored in a list.
pub trait Emitter: Send {
type input;
/// Start running the built in callback functionality with the signal. Pass on the values to children.
fn emit(&mut self, Self::input);
}
// because Arc<Mutex<T>> is sloppy
#[derive(Clone)]
pub struct Am<T: Sized>(Arc<Mutex<T>>);
pub type Wm<T> = Weak<Mutex<T>>;
pub type AmEmitter<T> = Am<Emitter<input=T>>;
type WmEmitter<T> = Wm<Emitter<input=T>>;
impl<T: Sized> Am<T> {
pub fn new(data: T) -> Self {
Am(Arc::new(Mutex::new(data)))
}
pub fn lock(&self) -> MutexGuard<'_, T> {
self.0.lock().unwrap()
}
pub fn clone(&self) -> Self {
Am(self.0.clone())
}
}
/// When creating a Signal, This trait represents the closure Fn allowed.
pub trait SigFn<I, O>: Fn(I) -> Result<O, Error> {}
impl<F, I, O> SigFn<I, O> for F where F: Fn(I) -> Result<O, Error> {}
impl<I,O,F> Emitter for Signal<I,O,F>
where F: 'static + SigFn<I, O> + Send + Sync,
O: 'static + PartialEq + Send + Sync + Clone,
I: 'static + Send + Sync
{
type input = I;
/// Run closure implemented for signal and pass on the results to children.
fn emit(&mut self, data: Self::input) {
let output = (self.func)(data);
match output {
Ok(output) => {
// Exit out of loop if the output didn't change.
if let Some(ref out) = self.output {
if *out == output {
return;
}
}
// There are no no errors, emit signals for all children.
self.output = Some(output);
self.emit_children();
},
Err(e) => println!("Error: {:?}", e),
};
}
}
/// Signals are the bread and butter of the crate. A signal can trigger other signals whose input is the same output as the original signal. Signals support both threaded and non-threaded children.
pub struct Signal<I, O, F>
where F: SigFn<I, O>
{
input: PhantomData<I>,
output: Option<O>,
func: F,
listeners: Vec< WmEmitter<O> >,
threaded_listeners: Vec< WmEmitter<O> >,
}
impl<I,O,F> Signal<I,O,F>
where F: 'static + SigFn<I, O> + Send + Sync,
O: 'static + Send + Sync + Clone,
I: 'static + Send + Sync,
{
fn new_signal(f: F) -> Signal<I, O, impl SigFn<I, O>> {
Signal {
input: PhantomData,
output: None,
func: move |i: _| f(i),
listeners: Vec::new(),
threaded_listeners: Vec::new(),
}
}
/// Create a thread-safe parent signal. Note that the return function is Arc<Mutex<Signal<>>>
pub fn new_arc_mutex(f: F) -> Am<Signal<I, O, impl SigFn<I, O>>> {
Am::new(Signal::new_signal(f))
}
/// Upgrade all weak emitters, and call emit(...)
fn emit_children(&mut self) {
//emit instant listeners
for signal in self.listeners.iter() {
let output = self.output.clone();
Self::emit_child(signal.clone(), output);
}
//emit threaded listeners
for signal in self.threaded_listeners.iter() {
let signal = signal.clone();
let output = self.output.clone();
thread::spawn( move || Self::emit_child(signal, output) );
}
}
fn emit_child(signal: WmEmitter<O>, output: Option<O>) {
if let Some(signal) = signal.upgrade() {
if let Some(ref output) = output {
let output = output.clone();
if let Ok(mut signal) = signal.lock() {
signal.emit(output);
}
}
}
}
/// This method is a helper for Signal::new(f) and register_listener(...)
pub fn create_listener<Q, G>(&mut self, f: G) -> Am<Signal<O, Q, impl SigFn<O,Q>>>
where G: 'static + SigFn<O,Q> + Send + Sync,
Q: 'static + PartialEq + Send + Sync + Clone,
O: 'static
{
let ret = Signal::new_arc_mutex(f);
self.register_listener(&ret);
ret
}
/// This method is a helper for Signal::new(f) and register_threaded_listener(...)
pub fn create_threaded_listener<Q, G>(&mut self, f: G) -> Am<Signal<O, Q, impl SigFn<O,Q>>>
where G: 'static + SigFn<O,Q> + Send + Sync,
Q: 'static + PartialEq + Send + Sync + Clone,
O: 'static
{
let ret = Signal::new_arc_mutex(f);
self.register_threaded_listener(&ret);
ret
}
/// Register a child listener that will execute in the same thread as Self.
pub fn register_listener<E>(&mut self, strong: &Am<E>)
where E: 'static + Emitter<input=O>
{
let weak = Arc::downgrade(&strong.0);
self.listeners.push(weak);
}
/// Register a child listener that will run on its own thread.
pub fn register_threaded_listener<E>(&mut self, strong: &Am<E>)
where E: 'static + Emitter<input=O>
{
let weak = Arc::downgrade(&strong.0);
self.threaded_listeners.push(weak);
}
/// Get the last result of this signal... if it exists
pub fn get(&self) -> &Option<O> { &self.output }
}