use error::*;
use futures::{
Future,
Stream,
future
};
use futures::sync::mpsc::{
UnboundedSender,
unbounded as unbounded_channel
};
use futures::sync::oneshot::{
Sender as OneshotSender
};
use std::sync::Arc;
use parking_lot::RwLock;
use std::ops::{
Deref,
DerefMut
};
use super::CancelHookFt;
use thread_id;
use uuid::Uuid;
/// Sending half of an unbounded channel of `bool` flags.
/// NOTE(review): named for cancellation; what each flag value means is decided by the
/// receiving side, which is not visible in this file — confirm against the caller.
pub type CancelSender = UnboundedSender<bool>;
/// Sending half of a oneshot channel that carries an `Option<T>`.
/// NOTE(review): rustc does not enforce bounds written on a type alias parameter, so the
/// `T: Send + 'static` bound here is presumably documentation only — confirm against the
/// toolchain in use before relying on it.
pub type ExitSender<T: Send + 'static> = OneshotSender<Option<T>>;
/// Sender type handed back by `interruptible_future`. Sending `true` requests cancellation of
/// the wrapped future; `false` is what the wrapped future itself sends when it finishes.
pub type InterruptSender = UnboundedSender<bool>;
/// Boxes a future that is already resolved to the error `err`.
///
/// Handy for returning a failure from code paths that must produce a boxed future.
pub fn future_error<T: 'static, E: 'static>(err: E) -> Box<Future<Item=T, Error=E>> {
    let failed = future::err::<T, E>(err);
    Box::new(failed)
}
/// Boxes a future that is already resolved to the value `d`.
///
/// Handy for returning a ready value from code paths that must produce a boxed future.
pub fn future_ok<T: 'static, E: 'static>(d: T) -> Box<Future<Item=T, Error=E>> {
    let ready = future::ok::<T, E>(d);
    Box::new(ready)
}
/// Generates a fresh version 4 UUID and returns its hyphenated string form.
pub fn uuid_v4() -> String {
    let id = Uuid::new_v4();
    id.hyphenated().to_string()
}
/// Wraps `ft` so that it can be interrupted through a channel.
///
/// Returns the sending half of an unbounded `bool` channel together with a boxed future.
/// Only the first message received on the channel is inspected:
///
/// * `true` takes the cancel path: the hook stored in `on_cancel` (if any) is taken out of the
///   lock, leaving `None` behind, and run. The result is `Some(t)` when the hook succeeds with
///   `t`, and `None` when there is no hook or the hook fails.
/// * `false` (which `ft` itself sends when it finishes), a closed channel, or a receiver error
///   leave the outcome to `ft`: `Some(t)` when `ft` succeeded with `t`, `None` when it failed.
///
/// Every arm of the final `then` returns `Ok`, so the returned future does not resolve with an
/// error even though its error type is `()`; a failure of `ft` is reported as `Ok(None)`.
pub fn interruptible_future<T: Send + 'static>(ft: Box<Future<Item=T, Error=()>>, on_cancel: Arc<RwLock<Option<CancelHookFt<T>>>>)
-> (InterruptSender, Box<Future<Item=Option<T>, Error=()>>)
{
let (tx, rx) = unbounded_channel();
// One sender goes back to the caller; the other is moved into the closure below.
let out_tx = tx.clone();
let ft = Box::new(ft.then(move |result| {
// Signal the receiving side that the wrapped future finished without a cancel request.
// The send result is deliberately ignored.
let _ = tx.unbounded_send(false);
match result {
Ok(t) => Ok(Some(t)),
// The error value of `ft` is dropped; the error type is `Option<T>` so that both joined
// futures share one error type.
Err(_) => Err::<_, Option<T>>(None)
}
})
.join(rx.into_future().then(move |res| {
let was_canceled = match res {
Ok((b, _)) => match b {
Some(b) => b,
// `None` means the stream ended without a message; treated as "not canceled".
None => false
},
// Receiver error: treated as "not canceled". This early return also appears first in the
// closure, ahead of the other branches that produce the boxed future.
Err(_) => return future_ok(None)
};
if was_canceled {
// The write guard lives only inside this block, so the lock is released before the hook
// future is driven.
let on_cancel = {
let mut cancel_guard = on_cancel.write();
cancel_guard.deref_mut().take()
};
if let Some(ft) = on_cancel {
// The hook's outcome is reported through the error channel of this future.
// NOTE(review): presumably so that `join` resolves immediately instead of waiting for the
// wrapped future — confirm against the futures 0.1 `join` semantics.
Box::new(ft.then(|result| {
match result {
Ok(t) => future_error(Some(t)),
Err(_) => future_error(None)
}
}))
}else{
future_error(None)
}
}else{
future_ok(None)
}
}))
// Collapse both channels into `Ok`: a successful join yields the wrapped future's value, an
// error yields whatever the failing side carried (the hook's value or `None`).
.then(|result: Result<(Option<T>, Option<T>), Option<T>>| {
match result {
Ok((ft_res, _)) => Ok(ft_res),
Err(t) => Ok(t)
}
}));
(out_tx, ft)
}
/// Reports whether a thread ID is currently stored in `id`.
///
/// Takes the read lock only for the duration of the check.
pub fn is_initialized(id: &Arc<RwLock<Option<usize>>>) -> bool {
    let slot = id.read();
    slot.is_some()
}
/// Succeeds when a thread ID is stored in `id`.
///
/// # Errors
///
/// Returns a `WomboErrorKind::NotInitialized` error when no thread ID is stored.
pub fn check_initialized(id: &Arc<RwLock<Option<usize>>>) -> Result<(), WomboError> {
    if !is_initialized(id) {
        return Err(WomboError::new(
            WomboErrorKind::NotInitialized,
            "Wombo not initialized."
        ));
    }
    Ok(())
}
/// Succeeds when no thread ID is stored in `id`.
///
/// # Errors
///
/// Returns an error with the message "Wombo already initialized." when a thread ID is stored.
pub fn check_not_initialized(id: &Arc<RwLock<Option<usize>>>) -> Result<(), WomboError> {
    if is_initialized(id) {
        // NOTE(review): the kind is `NotInitialized` although the message says the opposite;
        // confirm whether a more specific error kind exists before changing it.
        return Err(WomboError::new(
            WomboErrorKind::NotInitialized,
            "Wombo already initialized."
        ));
    }
    Ok(())
}
/// Returns a copy of the thread ID stored in `id`, or `None` when nothing is stored.
pub fn get_thread_id(id: &Arc<RwLock<Option<usize>>>) -> Option<usize> {
    // `Option<usize>` is `Copy`, so reading through the guard yields an owned value.
    *id.read()
}
/// Stores the calling thread's ID (as reported by `thread_id::get`) in `id` and returns it.
///
/// Any previously stored ID is overwritten.
pub fn set_thread_id(id: &Arc<RwLock<Option<usize>>>) -> usize {
    // The ID is looked up before the write lock is taken.
    let current = thread_id::get();
    *id.write() = Some(current);
    current
}
/// Resets `id` to `None`, discarding any stored thread ID.
pub fn clear_thread_id(id: &Arc<RwLock<Option<usize>>>) {
    *id.write() = None;
}
/// Stores `tx` in `cancel_tx`, replacing (and dropping) any sender that was there before.
pub fn set_cancel_tx(cancel_tx: &Arc<RwLock<Option<CancelSender>>>, tx: CancelSender) {
    *cancel_tx.write() = Some(tx);
}
/// Removes and returns the sender stored in `cancel_tx`, leaving `None` in its place.
///
/// Returns `None` when no sender is stored.
pub fn take_cancel_tx(cancel_tx: &Arc<RwLock<Option<CancelSender>>>) -> Option<CancelSender> {
    cancel_tx.write().take()
}
/// Stores `tx` in `exit_tx`, replacing (and dropping) any sender that was there before.
pub fn set_exit_tx<T: Send + 'static>(exit_tx: &Arc<RwLock<Option<ExitSender<T>>>>, tx: ExitSender<T>) {
    *exit_tx.write() = Some(tx);
}
/// Removes and returns the sender stored in `exit_tx`, leaving `None` in its place.
///
/// Returns `None` when no sender is stored.
pub fn take_exit_tx<T: Send + 'static>(exit_tx: &Arc<RwLock<Option<ExitSender<T>>>>) -> Option<ExitSender<T>> {
    exit_tx.write().take()
}
#[cfg(test)]
mod tests {
    #![allow(unused_imports)]
    use tokio_core::reactor::{
        Core,
        Handle
    };
    use futures::lazy;
    use super::*;
    use tokio_timer::Timer;
    use std::time::Duration;

    /// Builds a cancel hook that is already resolved to `result`.
    fn fake_callback_ft<T: 'static>(result: T) -> Box<Future<Item=T, Error=()>> {
        let ready = future::ok::<T, ()>(result);
        Box::new(ready)
    }

    /// Interrupting a 10 second timer after 100 ms with no hook registered yields `None`.
    #[test]
    fn should_interrupt_timer_without_callback() {
        let mut core = Core::new().unwrap();
        let no_hook = Arc::new(RwLock::new(None));
        let timer = Timer::default();

        let long_sleep = timer
            .sleep(Duration::from_millis(10000))
            .map_err(|_| ())
            .map(|_| 1);
        let (interrupt_tx, test_ft) = interruptible_future(Box::new(long_sleep), no_hook);

        // Fires well before the long sleep and requests cancellation.
        let interrupt_ft = timer
            .sleep(Duration::from_millis(100))
            .map_err(|_| ())
            .map(move |_| {
                let _ = interrupt_tx.unbounded_send(true);
            });

        let (val, _) = match core.run(test_ft.join(interrupt_ft)) {
            Ok(pair) => pair,
            Err(e) => panic!("Error: {:?}", e)
        };
        assert_eq!(val, None);
    }

    /// Interrupting a 10 second timer after 100 ms yields the registered hook's value.
    #[test]
    fn should_interrupt_timer_with_callback() {
        let mut core = Core::new().unwrap();
        let hook = Arc::new(RwLock::new(Some(fake_callback_ft(2))));
        let timer = Timer::default();

        let long_sleep = timer
            .sleep(Duration::from_millis(10000))
            .map_err(|_| ())
            .map(|_| 1);
        let (interrupt_tx, test_ft) = interruptible_future(Box::new(long_sleep), hook);

        // Fires well before the long sleep and requests cancellation.
        let interrupt_ft = timer
            .sleep(Duration::from_millis(100))
            .map_err(|_| ())
            .map(move |_| {
                let _ = interrupt_tx.unbounded_send(true);
            });

        let (val, _) = match core.run(test_ft.join(interrupt_ft)) {
            Ok(pair) => pair,
            Err(e) => panic!("Error: {:?}", e)
        };
        assert_eq!(val, Some(2));
    }
}