gen_z/
lib.rs

1use futures::{
2  channel::mpsc::{self, Sender},
3  future::{Either, FutureExt},
4  stream::{self, StreamExt},
5  Future, Stream,
6};
7
8pub fn gen_z_sender<
9  'fut,
10  T: 'fut + Send,
11  Fut: Future<Output = ()> + 'fut + Send,
12  Gen: FnOnce(Sender<T>) -> Fut,
13>(
14  gen: Gen,
15) -> impl Stream<Item = T> + 'fut {
16  let (send, recv) = mpsc::channel::<T>(0); // 0 + #senders
17  let fut: Fut = gen(send);
18  // HACK: Join as a stream to ensure future and outputs are polled evenly, and filter out fut's result
19  let fut = fut.map(Either::Left).into_stream();
20  let joined = stream::select(fut, recv.map(Either::Right)).filter_map(|x| {
21    futures::future::ready(match x {
22      Either::Left(_) => None,
23      Either::Right(y) => Some(y),
24    })
25  });
26  joined.fuse()
27}
28
29/// Just provides a nicer panic message for unexpected failures, and makes lifetimes more obvious
30async fn send_infallible<T: Send>(sender: &mut Sender<T>, item: T) -> () {
31  use futures::sink::SinkExt;
32  SinkExt::send(sender, item)
33    .await
34    .expect("Infallible sends should only be used where they cannot fail")
35}
36
37/// A wrapper around a Sender to restrict its behaviour to infallible sends; a "yield" helper
38pub struct Yielder<T: Send> {
39  sender: Sender<T>,
40}
41
42impl<T: Send> Yielder<T> {
43  pub fn new(sender: Sender<T>) -> Yielder<T> {
44    Yielder { sender }
45  }
46
47  pub fn send(&mut self, item: T) -> impl Future<Output = ()> + '_ {
48    send_infallible(&mut self.sender, item)
49  }
50
51  // TODO: Consider implementing a feature that implements trait-Fn to allow direct calls
52}
53
54/// Given an infallible future and a yield helper, produce a stream until the future arrives
55pub fn gen_z<
56  'fut,
57  T: 'fut + Send,
58  Fut: Future<Output = ()> + 'fut + Send,
59  Gen: 'fut + FnOnce(Yielder<T>) -> Fut,
60>(
61  gen: Gen,
62) -> impl Stream<Item = T> + 'fut {
63  gen_z_sender(|sender| gen(Yielder::<T>::new(sender)))
64}