use futures::{future, stream};
use futures::sync::mpsc;
use tokio::runtime::current_thread::Runtime;
use corona;
use corona::prelude::*;
/// Test fixture bundling a `Coroutine` with the single-threaded tokio `Runtime`
/// that the spawned coroutine is driven on.
struct Cor {
/// Cloned for every spawn performed by `cor_ft`.
coroutine: Coroutine,
/// Runtime on which `cor_ft` blocks until the spawned coroutine has produced its result.
runtime: Runtime,
}
impl Cor {
    /// Creates the fixture: a fresh current-thread runtime plus a fresh `Coroutine`.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be constructed.
    fn new() -> Cor {
        // Field initializers run in the order written, so the runtime is still
        // created before the coroutine.
        Cor {
            runtime: Runtime::new().unwrap(),
            coroutine: Coroutine::new(),
        }
    }

    /// Runs `f` as a coroutine on the fixture's runtime and asserts that it yields `42`.
    ///
    /// # Panics
    ///
    /// Panics if spawning the coroutine fails, if waiting for its result fails,
    /// or if the produced value is not `42`.
    fn cor_ft<F: FnOnce() -> u32 + 'static>(&mut self, f: F) {
        let spawner = self.coroutine.clone();
        // The spawn is wrapped in `future::lazy` so that it only happens once
        // `block_on` is polling it.
        // NOTE(review): presumably corona needs a running executor at spawn time -- confirm.
        let spawned = future::lazy(move || spawner.spawn(f).unwrap());
        let outcome = self.runtime.block_on(spawned);
        assert_eq!(42, outcome.unwrap());
    }
}
/// Waits on an already-resolved future from inside a coroutine and expects its value back.
#[test]
fn coro_wait() {
    let mut cor = Cor::new();
    cor.cor_ft(|| {
        let waited = future::ok::<_, ()>(42).coro_wait();
        waited.unwrap()
    });
}
/// Sums the items produced by `iter_ok` over a one-element stream.
#[test]
fn iter_ok() {
    let mut cor = Cor::new();
    cor.cor_ft(|| {
        let single = stream::once::<_, ()>(Ok(42));
        single.iter_ok().sum()
    });
}
/// Sums `iter_ok` over a stream that contains an error in the middle.
///
/// `cor_ft` asserts a total of 42, so the `Ok(100)` that follows the error
/// must not contribute to the sum.
#[test]
fn iter_ok_many() {
    let mut cor = Cor::new();
    cor.cor_ft(|| {
        let items = vec![Ok(42), Err(()), Ok(100)];
        stream::iter_result(items).iter_ok().sum()
    });
}
/// Iterates a stream with `iter_result`, drops the error entries and sums the rest.
///
/// `cor_ft` asserts a total of 42, i.e. both `Ok` items (12 and 30) are seen
/// even though an error sits between them.
#[test]
fn iter_result() {
    let mut cor = Cor::new();
    cor.cor_ft(|| {
        let items = vec![Ok(12), Err(()), Ok(30)];
        stream::iter_result(items)
            .iter_result()
            .filter_map(Result::ok)
            .sum()
    });
}
/// Waits on a future whose item is a reference to a value living inside the coroutine.
///
/// The future resolves to `&Num`, which borrows a local of the closure, so the
/// awaited future is not `'static`.
#[test]
fn reference() {
    Cor::new()
        .cor_ft(|| {
            struct Num(u32);
            let num = Num(42);
            // Borrow the local; the wrapped number is copied out of the
            // reference once the wait has completed.
            let num_ref = &num;
            future::ok::<_, ()>(num_ref)
                .coro_wait()
                .map(|&Num(num)| num)
                .unwrap()
        });
}
/// Pushes values into an mpsc channel from one coroutine and sums them in another.
///
/// The producer sends 2 with `coro_send` and then 20, 20 with `coro_send_many`;
/// the consumer is expected to add them up to 42.
#[test]
fn push_sink() {
    let total = Coroutine::new()
        .run(|| {
            let (mut tx, rx) = mpsc::channel(1);
            // Producer side: runs in its own coroutine and owns the sending half.
            corona::spawn(move || {
                tx.coro_send(2).unwrap();
                tx.coro_send_many(vec![20, 20]).unwrap().unwrap();
            });
            // Consumer side: the value of the outer coroutine is the sum of
            // everything received.
            rx.iter_ok().sum()
        })
        .unwrap();
    assert_eq!(42, total);
}
/// Pulls a single item out of a stream with `coro_next` from inside a coroutine.
#[test]
fn extract() {
    let mut single = stream::once::<_, ()>(Ok(42));
    let mut cor = Cor::new();
    // The stream is moved into the coroutine, which extracts its first item.
    cor.cor_ft(move || {
        let first = single.coro_next();
        first.unwrap().unwrap()
    });
}