use crate::prelude::*;
use futures::FutureExt;
use observable::of;
use std::future::Future;
pub fn from_future<F, Item, S>(
f: F,
scheduler: S,
) -> ObservableBase<FutureEmitter<F, S>>
where
F: Future<Output = Item>,
{
ObservableBase::new(FutureEmitter {
future: f,
scheduler,
})
}
#[derive(Clone)]
pub struct FutureEmitter<F, S> {
future: F,
scheduler: S,
}
impl<Item, F, S> Emitter for FutureEmitter<F, S>
where
F: Future<Output = Item>,
{
type Item = Item;
type Err = ();
}
impl<Item, F, S> SharedEmitter for FutureEmitter<F, S>
where
F: Future<Output = Item> + Send + Sync + 'static,
S: SharedScheduler,
{
fn emit<O>(self, subscriber: Subscriber<O, SharedSubscription>)
where
O: Observer<Item = Self::Item, Err = Self::Err> + Send + Sync + 'static,
{
let subscription = subscriber.subscription.clone();
let f = self
.future
.map(move |v| SharedEmitter::emit(of::OfEmitter(v), subscriber));
let (future, handle) = futures::future::abortable(f);
self.scheduler.spawn(future.map(|_| ()));
subscription.add(SpawnHandle::new(handle))
}
}
impl<Item, F, S> LocalEmitter<'static> for FutureEmitter<F, S>
where
F: Future<Output = Item> + 'static,
S: LocalScheduler,
{
fn emit<O>(self, subscriber: Subscriber<O, LocalSubscription>)
where
O: Observer<Item = Self::Item, Err = Self::Err> + 'static,
{
let subscription = subscriber.subscription.clone();
let f = self
.future
.map(move |v| LocalEmitter::emit(of::OfEmitter(v), subscriber));
let (future, handle) = futures::future::abortable(f);
self.scheduler.spawn(future.map(|_| ()));
subscription.add(SpawnHandle::new(handle))
}
}
pub fn from_future_result<F, S, Item, Err>(
future: F,
scheduler: S,
) -> ObservableBase<FutureResultEmitter<F, S, Item, Err>>
where
F: Future,
<F as Future>::Output: Into<Result<Item, Err>>,
{
ObservableBase::new(FutureResultEmitter {
future,
scheduler,
_marker: TypeHint::new(),
})
}
#[derive(Clone)]
pub struct FutureResultEmitter<F, S, Item, Err> {
future: F,
scheduler: S,
_marker: TypeHint<(Item, Err)>,
}
impl<Item, S, Err, F> Emitter for FutureResultEmitter<F, S, Item, Err>
where
F: Future,
<F as Future>::Output: Into<Result<Item, Err>>,
{
type Item = Item;
type Err = Err;
}
impl<Item, Err, S, F> SharedEmitter for FutureResultEmitter<F, S, Item, Err>
where
Item: Send + Sync + 'static,
Err: Send + Sync + 'static,
F: Future + Send + Clone + Sync + 'static,
<F as Future>::Output: Into<Result<Item, Err>>,
S: SharedScheduler,
{
fn emit<O>(self, subscriber: Subscriber<O, SharedSubscription>)
where
O: Observer<Item = Self::Item, Err = Self::Err> + Send + Sync + 'static,
{
let subscription = subscriber.subscription.clone();
let f = self.future.map(move |v| {
SharedEmitter::emit(of::ResultEmitter(v.into()), subscriber)
});
let (future, handle) = futures::future::abortable(f);
self.scheduler.spawn(future.map(|_| ()));
subscription.add(SpawnHandle::new(handle))
}
}
impl<Item, Err, S, F> LocalEmitter<'static>
for FutureResultEmitter<F, S, Item, Err>
where
F: Future + 'static,
<F as Future>::Output: Into<Result<Item, Err>>,
S: LocalScheduler,
{
fn emit<O>(self, subscriber: Subscriber<O, LocalSubscription>)
where
O: Observer<Item = Self::Item, Err = Self::Err> + 'static,
{
let subscription = subscriber.subscription.clone();
let f = self.future.map(move |v| {
LocalEmitter::emit(of::ResultEmitter(v.into()), subscriber)
});
let (future, handle) = futures::future::abortable(f);
self.scheduler.spawn(future.map(|_| ()));
subscription.add(SpawnHandle::new(handle))
}
}
#[cfg(test)]
mod tests {
use super::*;
use bencher::Bencher;
use futures::{
executor::{LocalPool, ThreadPool},
future,
};
use std::{
cell::RefCell,
rc::Rc,
sync::{Arc, Mutex},
};
#[test]
fn shared() {
let res = Arc::new(Mutex::new(0));
let c_res = res.clone();
let pool = ThreadPool::new().unwrap();
{
from_future_result(future::ok(1), pool.clone())
.into_shared()
.subscribe(move |v| {
*res.lock().unwrap() = v;
});
std::thread::sleep(std::time::Duration::from_millis(10));
assert_eq!(*c_res.lock().unwrap(), 1);
}
let res = c_res.clone();
from_future(future::ready(2), pool)
.into_shared()
.subscribe(move |v| {
*res.lock().unwrap() = v;
});
std::thread::sleep(std::time::Duration::from_millis(10));
assert_eq!(*c_res.lock().unwrap(), 2);
}
#[test]
fn local() {
let mut local = LocalPool::new();
let value = Rc::new(RefCell::new(0));
let v_c = value.clone();
from_future_result(future::ok(1), local.spawner()).subscribe(move |v| {
*v_c.borrow_mut() = v;
});
local.run();
assert_eq!(*value.borrow(), 1);
let v_c = value.clone();
from_future(future::ready(2), local.spawner()).subscribe(move |v| {
*v_c.borrow_mut() = v;
});
local.run();
assert_eq!(*value.borrow(), 2);
}
#[test]
fn bench() { do_bench(); }
benchmark_group!(do_bench, bench_from_future);
fn bench_from_future(b: &mut Bencher) { b.iter(local); }
}