#![allow(unused_imports)]
use crate::prelude::*;
use futures::{executor::ThreadPool, future::FutureExt, task::SpawnExt};
use std::sync::Mutex;
lazy_static! {
pub static ref DEFAULT_RUNTIME: Mutex<ThreadPool> =
Mutex::new(ThreadPool::new().unwrap());
}
pub macro from_future($f:expr) {
Observable::new(move |mut subscriber| {
let f = $f.map(move |v| {
if !subscriber.is_closed() {
subscriber.next(&v);
subscriber.complete();
}
});
DEFAULT_RUNTIME.lock().unwrap().spawn(f).unwrap();
})
.to_shared()
}
pub macro from_future_with_err($f:expr) {
Observable::new(move |mut subscriber| {
let f = $f.map(move |v| {
if !subscriber.is_closed() {
match v {
Ok(ref item) => {
subscriber.next(item);
subscriber.complete();
}
Err(ref err) => subscriber.error(err),
};
}
});
DEFAULT_RUNTIME.lock().unwrap().spawn(f).unwrap();
})
.to_shared()
}
#[test]
fn smoke() {
use futures::future;
use std::sync::Arc;
let res = Arc::new(Mutex::new(0));
let c_res = res.clone();
{
from_future_with_err!(future::ok(1)).subscribe(move |v| {
*res.lock().unwrap() = *v;
});
std::thread::sleep(std::time::Duration::from_millis(10));
assert_eq!(*c_res.lock().unwrap(), 1);
}
let res = c_res.clone();
from_future!(future::ready(2)).subscribe(move |v| {
*res.lock().unwrap() = *v;
});
std::thread::sleep(std::time::Duration::from_millis(10));
assert_eq!(*c_res.lock().unwrap(), 2);
}