extern crate desync;
extern crate flo_stream;
extern crate futures;
extern crate rand;
use desync::*;
use flo_stream::*;
use futures::*;
use futures::stream;
use futures::executor;
use std::sync::*;
use std::time;
/// Counts how many zeros occur in ten million pseudo-random values by fanning
/// chunks of the input out to five `Desync`-backed workers and summing the
/// per-chunk counts each worker produces.
///
/// Progress is printed as `So far: N` whenever a worker result is folded in,
/// followed by a single `Final count was N` line.
fn main() {
    /// Turns a stream of `u32` chunks into a stream of per-chunk zero counts.
    ///
    /// Each chunk is processed through `pipe` on a dedicated `Desync`, and
    /// every chunk is preceded by a 10ms busy-wait that stands in for an
    /// expensive computation. The returned stream only ever yields `Ok`.
    fn count_zeros<In: 'static + Unpin + Send + Stream<Item = Vec<u32>>>(
        input: In,
    ) -> impl Stream<Item = Result<u32, ()>> {
        let worker = Arc::new(Desync::new(()));

        pipe(worker, input, |_state, next| {
            async move {
                // Burn CPU for a fixed interval to simulate real work. A
                // monotonic `Instant` is used so that a wall-clock adjustment
                // can never make the elapsed-time computation fail.
                let busy_for = time::Duration::from_millis(10);
                let start = time::Instant::now();
                let mut _spins = 0u64;
                while start.elapsed() < busy_for {
                    _spins += 1;
                }

                // Count the zero entries in this chunk.
                let count: u32 = next.iter().map(|&val| u32::from(val == 0)).sum();

                Ok(count)
            }
            .boxed()
        })
    }

    // The publisher is converted into a sink so the input stream can be
    // forwarded into it; subscribers are created from that same sink.
    // NOTE(review): the `1` is presumably the publisher's buffer size, and
    // `SinglePublisher` presumably hands each message to one subscriber only
    // - confirm against the flo_stream documentation.
    let work_publisher = SinglePublisher::new(1);
    let mut work_publisher = work_publisher.to_sink();

    // Five workers, each consuming from its own subscription.
    let workers = (0..5)
        .map(|_| count_zeros(work_publisher.subscribe().unwrap()))
        .collect::<Vec<_>>();

    // Ten million values in the range 0..1024, delivered in chunks of up to
    // 32000 values and wrapped in `Ok` so the stream can be forwarded to the
    // sink.
    let input_stream = stream::iter((0..10_000_000).map(|_| rand::random::<u32>() % 1024));
    let input_work = input_stream.chunks(32000).map(Ok);
    let work_done = input_work.forward(work_publisher);

    // Fold every worker's per-chunk counts into one shared running total.
    let final_count = Arc::new(Desync::new(0));
    for worker in workers {
        pipe_in(Arc::clone(&final_count), worker, |state, next| {
            async move {
                *state += next.expect("count_zeros only ever yields Ok");
                println!("So far: {}", *state);
            }
            .boxed()
        });
    }

    // Drive the input through the publisher until it is exhausted.
    executor::block_on(work_done).expect("forwarding the input to the work publisher failed");

    // NOTE(review): this waits on `final_count` once all input has been
    // forwarded; whether every worker result has been folded in by then
    // depends on desync/flo_stream scheduling - confirm before relying on the
    // printed total being complete.
    final_count.sync(|count| println!("Final count was {}", count));
}