use futures::channel::mpsc;
use std::sync::{Arc};
use futures::prelude::*;
use futures::executor;
use ::desync::*;
use std::thread;
/// Runs a pipe in a feedback loop: every value the pipe emits is sent back to its
/// own input (via a relay thread), and the test checks that 100,000 consecutive
/// values come out in order without the loop stalling.
#[test]
#[cfg(not(miri))]
fn pipe_blockage() {
    // Feedback channel: the checking loop below writes the pipe's output here.
    // It is seeded with 0 so the first value emitted by the pipe is 1.
    let (mut feedback_tx, mut feedback_rx) = mpsc::channel(1000);
    executor::block_on(async { feedback_tx.send(0).await.unwrap() });

    // Channel that supplies the pipe's input stream.
    let (mut pipe_tx, pipe_rx) = mpsc::channel(1000);

    // Relay thread: copies everything from the feedback channel to the pipe input.
    thread::spawn(move || {
        executor::block_on(async move {
            while let Some(value) = feedback_rx.next().await {
                pipe_tx.send(value).await.unwrap();
            }
        })
    });

    // The pipe ignores the Desync's contents and simply increments each item.
    let target = Arc::new(Desync::new(0));
    let mut incremented = pipe(target, pipe_rx, move |_, item| {
        futures::future::ready(item + 1).boxed()
    });

    executor::block_on(async move {
        println!("Running...");

        for expected in 1..=100_000i64 {
            let produced = incremented.next().await.unwrap();

            // Feed the value back before checking it, so the relay keeps moving.
            feedback_tx.send(produced).await.unwrap();
            assert_eq!(expected, produced);

            if expected % 10_000 == 0 {
                println!("{} iterations", expected);
            }
        }
    });
}
/// Checks that items sent through a pipe are combined with the current contents
/// of the target `Desync`, including after those contents are changed between items.
///
/// The whole scenario is repeated 1000 times, presumably to shake out
/// scheduling-dependent failures.
#[test]
#[cfg(not(miri))]
fn pipe_through() {
    for _ in 0..1000 {
        let (mut sender, receiver) = mpsc::channel(10);
        let obj = Arc::new(Desync::new(1));

        // Each piped item is added to whatever value the Desync currently holds.
        let mut pipe_out = pipe(Arc::clone(&obj), receiver, |core, item| {
            future::ready(item + *core).boxed()
        });

        executor::block_on(async {
            // With the core at 1, every item comes out incremented by one.
            sender.send(2).await.unwrap();
            assert_eq!(pipe_out.next().await, Some(3));

            sender.send(42).await.unwrap();
            assert_eq!(pipe_out.next().await, Some(43));

            // Run an empty synchronous job, then queue an update of the core to 2.
            // The assertion below expects that update to be visible to the next item.
            obj.sync(|_| {});
            obj.desync(|core| *core = 2);

            sender.send(44).await.unwrap();
            let val = pipe_out.next().await;
            println!("{:?}", val);
            // assert_eq! reports both values on failure, unlike assert!(a == b).
            assert_eq!(val, Some(46));
        });
    }
}