extern crate num_cpus;
use std::thread;
use std::sync::{mpsc, Arc, Mutex, Condvar};
pub enum ControlFlow {
Series,
Parallel,
ParallelLimit(usize),
ParallelCPUS
}
pub struct Deferred<T,E> {
starter : Arc<(Mutex<bool>, Condvar)>,
receiver : mpsc::Receiver<Result<T,E>>
}
impl<T,E> Deferred<T,E> where T: Send + 'static , E: Send + 'static {
pub fn new<F>(f:F) -> Deferred<T,E> where F: Send + 'static + FnOnce() -> Result<T,E> {
let (tx,rx) = mpsc::channel();
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_c = pair.clone();
thread::spawn(move|| {
let &(ref lock, ref cvar) = &*pair_c;
let mut started = lock.lock().unwrap();
while !*started { started = cvar.wait(started).unwrap(); }
tx.send(f())
});
Deferred {
starter : pair,
receiver : rx
}
}
pub fn sync(self) -> Result<T,E> {
self.to_promise().sync()
}
pub fn then<TT,EE,FT,FE>(self,ft:FT,fe:FE) -> Deferred<TT,EE>
where TT: Send + 'static, EE: Send + 'static,
FT: Send + 'static + FnOnce(T) -> Result<TT,EE>,
FE: Send + 'static + FnOnce(E) -> Result<TT,EE> {
let (tx,rx) = mpsc::channel();
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_c = pair.clone();
thread::spawn(move|| {
let &(ref lock, ref cvar) = &*pair_c;
let mut started = lock.lock().unwrap();
while !*started { started = cvar.wait(started).unwrap(); }
self.unlock();
tx.send(match self.receiver.recv().unwrap() {
Ok(t) => ft(t),
Err(e) => fe(e),
})
});
Deferred::<TT,EE> {
starter : pair,
receiver : rx
}
}
pub fn chain<TT,EE,FF>(self, f:FF) -> Deferred<TT,EE>
where TT: Send + 'static, EE : Send + 'static,
FF: Send + 'static + FnOnce(Result<T,E>) -> Result<TT,EE> {
let (tx,rx) = mpsc::channel();
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_c = pair.clone();
thread::spawn(move|| {
let &(ref lock, ref cvar) = &*pair_c;
let mut started = lock.lock().unwrap();
while !*started { started = cvar.wait(started).unwrap(); }
self.unlock();
tx.send(f(self.receiver.recv().unwrap()))
});
Deferred::<TT,EE> {
starter : pair,
receiver : rx
}
}
pub fn success<TT,F>(self,f:F) -> Deferred<TT,E> where F: Send + 'static + FnOnce(T) -> Result<TT,E>, TT: Send + 'static {
let (tx,rx) = mpsc::channel();
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_c = pair.clone();
thread::spawn(move|| {
let &(ref lock, ref cvar) = &*pair_c;
let mut started = lock.lock().unwrap();
while !*started { started = cvar.wait(started).unwrap(); }
self.unlock();
tx.send(match self.receiver.recv().unwrap() {
Ok(t) => f(t),
Err(e) => Err(e),
})
});
Deferred::<TT,E> {
starter : pair,
receiver : rx
}
}
pub fn fail<F>(self,f:F) -> Deferred<T,E> where F: Send + 'static + FnOnce(E) -> Result<T,E> {
let (tx,rx) = mpsc::channel();
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_c = pair.clone();
thread::spawn(move|| {
let &(ref lock, ref cvar) = &*pair_c;
let mut started = lock.lock().unwrap();
while !*started { started = cvar.wait(started).unwrap(); }
self.unlock();
tx.send(match self.receiver.recv().unwrap() {
Ok(t) => Ok(t),
Err(e) => f(e),
})
});
Deferred::<T,E> {
starter : pair,
receiver : rx
}
}
pub fn finally<F>(self, f:F) where F: Send + 'static + FnOnce(Result<T,E>) {
self.to_promise().finally(f);
}
pub fn finally_sync<F>(self, f:F) where F: Send + 'static + FnOnce(Result<T,E>) {
self.to_promise().finally_sync(f);
}
pub fn to_promise(self) -> Promise<T,E> {
self.unlock();
Promise { receiver: self.receiver }
}
pub fn vec_to_promise(vector:Vec<Deferred<T,E>>, control: ControlFlow) -> Promise<Vec<T>,Vec<Result<T,E>>> {
match control {
ControlFlow::Series => Deferred::process_series(vector, 0),
ControlFlow::Parallel => Deferred::process_parallel(vector, 0, 0, true),
ControlFlow::ParallelLimit(limit) => Deferred::process_parallel(vector, limit, 0, true),
ControlFlow::ParallelCPUS => Deferred::process_parallel(vector, num_cpus::get(), 0, true)
}
}
pub fn first_to_promise(num_first:usize, wait:bool, vector:Vec<Deferred<T,E>>, control: ControlFlow) -> Promise<Vec<T>,Vec<Result<T,E>>> {
match control {
ControlFlow::Series => Deferred::process_series(vector, num_first),
ControlFlow::Parallel => Deferred::process_parallel(vector, 0, num_first, wait),
ControlFlow::ParallelLimit(limit) => Deferred::process_parallel(vector, limit, num_first, wait),
ControlFlow::ParallelCPUS => Deferred::process_parallel(vector, num_cpus::get(), num_first, wait)
}
}
fn process_series(vector:Vec<Deferred<T,E>>, num_first:usize) -> Promise<Vec<T>,Vec<Result<T,E>>> {
let (tx,rx) = mpsc::channel();
thread::spawn(move || {
let mut results:Vec<T> = Vec::new();
for defer in vector {
defer.unlock();
match defer.receiver.recv().unwrap() {
Ok(t) => {
results.push(t);
if num_first > 0 && results.len() >= num_first { break }
},
Err(e) => {
let mut results_error:Vec<Result<T,E>> = Vec::new();
for t in results { results_error.push(Ok(t)) }
results_error.push(Err(e));
let res:Result<Vec<T>,Vec<Result<T,E>>> = Err(results_error);
tx.send(res).unwrap();
return
}
}
}
let ok_results:Result<Vec<T>, Vec<Result<T,E>>> = Ok(results);
tx.send(ok_results).unwrap()
});
Promise::<Vec<T>, Vec<Result<T,E>>> { receiver: rx }
}
fn process_parallel(vector:Vec<Deferred<T,E>>, limit:usize, num_first:usize, wait:bool) -> Promise<Vec<T>,Vec<Result<T,E>>> {
let (tx,rx) = mpsc::channel();
thread::spawn(move || {
let mut num_results_received = 0;
let mut results:Vec<Option<Result<T,E>>> = vec![];
for _ in 0..vector.len() { results.push(None); }
let mut it = vector.into_iter();
let (txinter, rxinter) = mpsc::channel();
let mut id_process = 0;
let mut active_process = 0;
let mut is_error = false;
loop {
if active_process > 0 {
match rxinter.recv() {
Ok(r) => {
let finished:(usize,Result<T,E>) = r;
if finished.1.is_err() { is_error = true } else { num_results_received += 1 }
results[finished.0] = Some(finished.1);
active_process -= 1;
},
Err(_) => break,
}
}
if !wait && num_first > 0 && num_results_received >= num_first { break }
loop {
if num_first > 0 && num_results_received >= num_first { break }
match it.next() {
Some(defer) => {
active_process += 1;
defer.unlock();
let txinter_cloned = txinter.clone();
thread::spawn(move || {
match defer.receiver.recv() {
Ok(r) => { &txinter_cloned.send((id_process, r)); } ,
Err(_) => (),
}
});
id_process += 1;
},
None => break
}
if limit > 0 && active_process >= limit { break }
}
if active_process == 0 { break }
}
if num_first > 0 { is_error = num_results_received < num_first }
let ok_results:Result<Vec<T>, Vec<Result<T,E>>> = match is_error {
false => {
let mut v:Vec<T> = Vec::new();
for r in results {
if r.is_none() { continue }
v.push(match r.unwrap() { Ok(t) => t, Err(_) => continue })
}
Ok(v)
},
true => {
let mut v:Vec<Result<T,E>> = Vec::new();
for r in results {
if r.is_none() { continue }
v.push(r.unwrap())
}
Err(v)
}
};
tx.send(ok_results).unwrap()
});
Promise::<Vec<T>, Vec<Result<T,E>>> { receiver: rx }
}
fn unlock(&self) {
let &(ref lock, ref cvar) = &*self.starter;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
}
}
pub struct Promise<T,E> {
receiver : mpsc::Receiver<Result<T,E>>
}
impl<T,E> Promise<T,E> where T: Send + 'static , E: Send + 'static {
pub fn new<F>(f:F) -> Promise<T,E> where F: Send + 'static + FnOnce() -> Result<T,E> {
let (tx,rx) = mpsc::channel();
thread::spawn(move || { tx.send(f()) });
Promise::<T,E> { receiver: rx }
}
pub fn sync(self) -> Result<T,E> {
self.receiver.recv().unwrap()
}
pub fn all(vector:Vec<Promise<T,E>>) -> Promise<Vec<T>, Vec<Result<T,E>>> {
let (tx,rx) = mpsc::channel();
thread::spawn(move || {
let results:Vec<Result<T,E>> = vector.iter().map(|p| p.receiver.recv().unwrap() ).collect();
let is_error = results.iter().find(|r| r.is_err()).is_some();
let ok_results:Result<Vec<T>, Vec<Result<T,E>>> = match is_error {
false => {
let mut v:Vec<T> = Vec::new();
for r in results { v.push(match r { Ok(t) => t, Err(_) => unreachable!() })}
Ok(v)
},
true => Err(results)
};
tx.send(ok_results).unwrap()
});
Promise::<Vec<T>, Vec<Result<T,E>>> { receiver: rx }
}
pub fn then<TT,EE,FT,FE>(self,ft:FT,fe:FE) -> Promise<TT,EE>
where TT: Send + 'static, EE: Send + 'static,
FT: Send + 'static + FnOnce(T) -> Result<TT,EE>,
FE: Send + 'static + FnOnce(E) -> Result<TT,EE> {
let (tx,rx) = mpsc::channel();
thread::spawn(move || {
let res = self.receiver.recv().unwrap();
match res {
Ok(t) => tx.send(ft(t)),
Err(e) => tx.send(fe(e))
}
});
Promise::<TT,EE> { receiver: rx }
}
pub fn chain<TT,EE,F>(self, f:F) -> Promise<TT,EE>
where TT: Send + 'static, EE: Send + 'static,
F: Send + 'static + FnOnce(Result<T,E>) -> Result<TT,EE> {
let (tx,rx) = mpsc::channel();
thread::spawn(move || {
let res = self.receiver.recv().unwrap();
tx.send(f(res))
});
Promise::<TT,EE> { receiver: rx }
}
pub fn success<TT,F>(self,f:F) -> Promise<TT,E> where F: Send + 'static + FnOnce(T) -> Result<TT,E>, TT: Send + 'static {
let (tx,rx) = mpsc::channel();
thread::spawn(move || {
let res = self.receiver.recv().unwrap();
match res {
Ok(t) => tx.send(f(t)),
Err(e) => tx.send(Err(e))
}
});
Promise::<TT,E> { receiver: rx }
}
pub fn fail<F>(self,f:F) -> Promise<T,E> where F: Send + 'static + FnOnce(E) -> Result<T,E> {
let (tx,rx) = mpsc::channel();
thread::spawn(move || {
let res = self.receiver.recv().unwrap();
match res {
Ok(t) => tx.send(Ok(t)),
Err(e) => tx.send(f(e))
}
});
Promise::<T,E> { receiver: rx }
}
pub fn finally<F>(self, f:F) where F: Send + 'static + FnOnce(Result<T,E>) {
thread::spawn(move || {
f(self.receiver.recv().unwrap());
});
}
pub fn finally_sync<F>(self, f:F) where F: Send + 'static + FnOnce(Result<T,E>) {
f(self.sync());
}
}
pub enum Emit<Ev> {
Event(Ev),
Continue,
Stop
}
pub struct EventLoopHandler<Ev> {
tx : mpsc::Sender<Option<Ev>>,
finisher : Arc<(Mutex<bool>, Condvar)>,
finished : Arc<Mutex<bool>>,
}
impl<Ev> Clone for EventLoopHandler<Ev> {
fn clone(&self) -> EventLoopHandler<Ev> {
EventLoopHandler {
tx : self.tx.clone(),
finisher : self.finisher.clone(),
finished : self.finished.clone(),
}
}
}
impl<Ev> EventLoopHandler<Ev> where Ev: Send + 'static {
pub fn emit(&self, event:Ev) -> Result<(), Ev>{
if self.is_active() {
match self.tx.send(Some(event)) {
Ok(_) => Ok(()),
Err(e) => Err(e.0.unwrap())
}
} else {
Err(event)
}
}
pub fn emit_until<F>(&self, f:F) where F : Send + 'static + Fn() -> Emit<Ev> {
let handler = self.clone();
thread::spawn(move || {
loop {
match f() {
Emit::Event(e) => match handler.emit(e) {
Ok(_) => (),
Err(_) => break,
},
Emit::Continue => continue,
Emit::Stop => break,
};
}
});
}
pub fn is_active(&self) -> bool {
let lock_finished = self.finished.lock().unwrap();
!*lock_finished
}
pub fn finish(&self) {
let mut lock_finished = self.finished.lock().unwrap();
if !*lock_finished {
*lock_finished = true;
let _ = self.tx.send(None);
}
}
pub fn finish_in_ms(&self, duration_ms:u32) {
let handler = self.clone();
thread::spawn(move|| {
thread::sleep_ms(duration_ms);
let _ = handler.finish();
});
}
}
pub struct EventLoop<Ev> {
handler : EventLoopHandler<Ev>,
receiver : mpsc::Receiver<Ev>,
}
impl<Ev> EventLoop<Ev> where Ev: Send + 'static {
pub fn new() -> EventLoop<Ev> {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_cloned = pair.clone();
let (tx,rx) = mpsc::channel();
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
loop {
match rx.recv() {
Ok(t) => match t {
Some(v) => sender.send(v).unwrap(),
None => break
},
Err(_) => break
};
}
let &(ref lock, ref cvar) = &*pair_cloned;
let mut finished = lock.lock().unwrap();
*finished = true;
cvar.notify_one();
});
EventLoop {
handler : EventLoopHandler {
tx : tx,
finisher : pair,
finished : Arc::new(Mutex::new(false)),
},
receiver : receiver,
}
}
pub fn on<F>(f:F) -> EventLoop<Ev> where F : Send + 'static + Fn(Ev) {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_cloned = pair.clone();
let (tx,rx) = mpsc::channel();
let (_, receiver) = mpsc::channel();
thread::spawn(move || {
loop {
match rx.recv() {
Ok(t) => match t {
Some(v) => f(v) ,
None => break
},
Err(_) => break
};
}
let &(ref lock, ref cvar) = &*pair_cloned;
let mut finished = lock.lock().unwrap();
*finished = true;
cvar.notify_one();
});
EventLoop {
handler : EventLoopHandler {
tx : tx,
finisher : pair,
finished : Arc::new(Mutex::new(false)),
},
receiver : receiver,
}
}
pub fn on_managed<F>(f:F) -> EventLoop<Ev> where F : Send + 'static + Fn(Ev) -> Emit<Ev> {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_cloned = pair.clone();
let (tx,rx) = mpsc::channel();
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
loop {
let rec = match rx.recv() {
Ok(t) => match t {
Some(v) => v ,
None => break
},
Err(_) => break
};
match f(rec) {
Emit::Event(e) => sender.send(e).unwrap(),
Emit::Continue => (),
Emit::Stop => break,
}
}
let &(ref lock, ref cvar) = &*pair_cloned;
let mut finished = lock.lock().unwrap();
*finished = true;
cvar.notify_one();
});
EventLoop {
handler : EventLoopHandler {
tx : tx,
finisher : pair,
finished : Arc::new(Mutex::new(false)),
},
receiver : receiver,
}
}
pub fn emit(&self, event:Ev) -> Result<(), Ev>{
self.handler.emit(event)
}
pub fn emit_until<F>(&self, f:F) where F : Send + 'static + Fn() -> Emit<Ev> {
self.handler.emit_until(f);
}
pub fn finish(self) -> EventLoop<Ev> {
self.handler.finish();
self
}
pub fn finish_in_ms(self, duration_ms:u32) -> EventLoop<Ev> {
self.handler.finish_in_ms(duration_ms);
self
}
pub fn is_active(&self) -> bool {
self.handler.is_active()
}
pub fn get_handler(&self) -> EventLoopHandler<Ev> {
self.handler.clone()
}
pub fn to_promise(self) -> Promise<Vec<Ev>,()> {
let handler = self.get_handler();
Promise::new(move || {
let &(ref lock, ref cvar) = &*handler.finisher;
let mut finished = lock.lock().unwrap();
while !*finished { finished = cvar.wait(finished).unwrap(); }
let mut vec = Vec::new();
loop {
match self.receiver.recv() {
Ok(val) => vec.push(val),
Err(_) => break,
}
}
Ok(vec)
})
}
}