1use futures::{
2 channel::mpsc::{self, Sender},
3 future::{Either, FutureExt},
4 stream::{self, StreamExt},
5 Future, Stream,
6};
7
8pub fn gen_z_sender<
9 'fut,
10 T: 'fut + Send,
11 Fut: Future<Output = ()> + 'fut + Send,
12 Gen: FnOnce(Sender<T>) -> Fut,
13>(
14 gen: Gen,
15) -> impl Stream<Item = T> + 'fut {
16 let (send, recv) = mpsc::channel::<T>(0); let fut: Fut = gen(send);
18 let fut = fut.map(Either::Left).into_stream();
20 let joined = stream::select(fut, recv.map(Either::Right)).filter_map(|x| {
21 futures::future::ready(match x {
22 Either::Left(_) => None,
23 Either::Right(y) => Some(y),
24 })
25 });
26 joined.fuse()
27}
28
29async fn send_infallible<T: Send>(sender: &mut Sender<T>, item: T) -> () {
31 use futures::sink::SinkExt;
32 SinkExt::send(sender, item)
33 .await
34 .expect("Infallible sends should only be used where they cannot fail")
35}
36
37pub struct Yielder<T: Send> {
39 sender: Sender<T>,
40}
41
42impl<T: Send> Yielder<T> {
43 pub fn new(sender: Sender<T>) -> Yielder<T> {
44 Yielder { sender }
45 }
46
47 pub fn send(&mut self, item: T) -> impl Future<Output = ()> + '_ {
48 send_infallible(&mut self.sender, item)
49 }
50
51 }
53
54pub fn gen_z<
56 'fut,
57 T: 'fut + Send,
58 Fut: Future<Output = ()> + 'fut + Send,
59 Gen: 'fut + FnOnce(Yielder<T>) -> Fut,
60>(
61 gen: Gen,
62) -> impl Stream<Item = T> + 'fut {
63 gen_z_sender(|sender| gen(Yielder::<T>::new(sender)))
64}