use crate::ops::SharedOp;
use crate::prelude::*;
use futures::{
executor::ThreadPool, future::Future, future::FutureExt, task::SpawnExt,
};
use std::sync::Mutex;
lazy_static! {
/// Shared thread pool on which `from_future` and `from_future_with_err`
/// spawn their futures.
///
/// The pool sits behind a `Mutex`; callers lock it for the `spawn` call.
/// `lazy_static!` builds the value on first access, and the `unwrap` below
/// panics at that point if `ThreadPool::new()` returns an error.
pub static ref DEFAULT_RUNTIME: Mutex<ThreadPool> =
Mutex::new(ThreadPool::new().unwrap());
}
/// Wraps a future in an observable that emits the future's output.
///
/// When the observable's closure is handed a subscriber, the future is mapped
/// into a task and spawned on [`DEFAULT_RUNTIME`]. Once the future resolves,
/// the task checks whether the subscriber is still open: if so it forwards the
/// output through `next` and then calls `complete`; a closed subscriber
/// receives nothing.
///
/// The observable is passed through `to_shared()` before being returned.
///
/// # Panics
///
/// Panics if the `DEFAULT_RUNTIME` mutex is poisoned or if spawning the task
/// on the pool fails.
pub fn from_future<O, U, F, Item>(
    f: F,
) -> ops::SharedOp<Observable<impl FnOnce(Subscriber<O, U>) + Clone, Item, ()>>
where
    Item: 'static,
    O: Observer<Item, ()> + Send + 'static,
    U: SubscriptionLike + Send + 'static,
    F: Future<Output = Item> + Send + Clone + Sync + 'static,
{
    let source = Observable::new(move |mut sink| {
        let task = f.map(move |output| {
            // Nothing is delivered to a subscriber that has already closed.
            if sink.is_closed() {
                return;
            }
            sink.next(output);
            sink.complete();
        });
        // The lock guard is a temporary, so it is released as soon as this
        // statement finishes.
        DEFAULT_RUNTIME
            .lock()
            .unwrap()
            .spawn(task)
            .unwrap();
    });
    source.to_shared()
}
/// Wraps a fallible future in an observable that emits either its value or
/// its error.
///
/// When the observable's closure is handed a subscriber, the future is mapped
/// into a task and spawned on [`DEFAULT_RUNTIME`]. Once the future resolves
/// and the subscriber is still open, the output is converted into a
/// `Result<Item, Error>`:
///
/// * `Ok(item)` is forwarded through `next`, followed by `complete`;
/// * `Err(err)` is forwarded through `error`.
///
/// A closed subscriber receives nothing, and the output is not converted.
/// The observable is passed through `to_shared()` before being returned.
///
/// # Panics
///
/// Panics if the `DEFAULT_RUNTIME` mutex is poisoned or if spawning the task
/// on the pool fails.
pub fn from_future_with_err<O, U, F, Item, Error>(
    f: F,
) -> SharedOp<Observable<impl FnOnce(Subscriber<O, U>) + Clone, Item, Error>>
where
    Error: 'static,
    Item: 'static,
    O: Observer<Item, Error> + Send + 'static,
    U: SubscriptionLike + Send + 'static,
    F: Future + Send + Clone + Sync + 'static,
    <F as Future>::Output: Into<Result<Item, Error>>,
{
    let source = Observable::new(move |mut sink| {
        let task = f.map(move |output| {
            // Nothing is delivered to a subscriber that has already closed.
            if sink.is_closed() {
                return;
            }
            // Convert only after the closed check, as the order matters if
            // the conversion has side effects.
            let outcome: Result<Item, Error> = output.into();
            match outcome {
                Ok(item) => {
                    sink.next(item);
                    sink.complete();
                }
                Err(err) => sink.error(err),
            }
        });
        // The lock guard is a temporary, so it is released as soon as this
        // statement finishes.
        DEFAULT_RUNTIME
            .lock()
            .unwrap()
            .spawn(task)
            .unwrap();
    });
    source.to_shared()
}
/// Smoke test: both constructors deliver the future's value to the `next`
/// callback passed to `subscribe`.
///
/// The value is written from the thread pool, so the test polls the shared
/// cell with a bounded deadline instead of sleeping for a fixed interval; a
/// fixed sleep can fail spuriously when the machine is under load.
#[test]
fn smoke() {
    use futures::future;
    use std::sync::Arc;
    use std::time::{Duration, Instant};

    let observed = Arc::new(Mutex::new(0));

    // Waits until the cell holds `expected`, giving up after one second so a
    // real failure still surfaces through the assertion that follows.
    let wait_for = |expected: i32| {
        let deadline = Instant::now() + Duration::from_secs(1);
        loop {
            // The guard is a temporary of this statement, so the lock is not
            // held while sleeping.
            let current = *observed.lock().unwrap();
            if current == expected || Instant::now() >= deadline {
                break;
            }
            std::thread::sleep(Duration::from_millis(1));
        }
    };

    let sink = Arc::clone(&observed);
    from_future_with_err(future::ok(1)).subscribe(move |v| {
        *sink.lock().unwrap() = v;
    });
    wait_for(1);
    assert_eq!(*observed.lock().unwrap(), 1);

    let sink = Arc::clone(&observed);
    from_future(future::ready(2)).subscribe(move |v| {
        *sink.lock().unwrap() = v;
    });
    wait_for(2);
    assert_eq!(*observed.lock().unwrap(), 2);
}