sqlx_core/ext/
async_stream.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_channel::mpsc;
6use futures_core::future::BoxFuture;
7use futures_core::stream::Stream;
8use futures_util::{pin_mut, FutureExt, SinkExt};
9
10use crate::error::Error;
11
12pub struct TryAsyncStream<'a, T> {
13 receiver: mpsc::Receiver<Result<T, Error>>,
14 future: BoxFuture<'a, Result<(), Error>>,
15}
16
17impl<'a, T> TryAsyncStream<'a, T> {
18 pub fn new<F, Fut>(f: F) -> Self
19 where
20 F: FnOnce(mpsc::Sender<Result<T, Error>>) -> Fut + Send,
21 Fut: 'a + Future<Output = Result<(), Error>> + Send,
22 T: 'a + Send,
23 {
24 let (mut sender, receiver) = mpsc::channel(0);
25
26 let future = f(sender.clone());
27 let future = async move {
28 if let Err(error) = future.await {
29 let _ = sender.send(Err(error)).await;
30 }
31
32 Ok(())
33 }
34 .fuse()
35 .boxed();
36
37 Self { future, receiver }
38 }
39}
40
41impl<'a, T> Stream for TryAsyncStream<'a, T> {
42 type Item = Result<T, Error>;
43
44 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
45 let future = &mut self.future;
46 pin_mut!(future);
47
48 let _ = future.poll(cx);
52
53 let receiver = &mut self.receiver;
54 pin_mut!(receiver);
55
56 receiver.poll_next(cx)
58 }
59}
60
/// Builds a [`TryAsyncStream`] from a block of async code.
///
/// The block is wrapped in a `move` closure and an `async move` body and
/// passed to `TryAsyncStream::new`, so it must evaluate to
/// `Result<(), Error>`. Within the block a local `r#yield!` macro is
/// available: `r#yield!(value)` sends `Ok(value)` to the consumer of the
/// stream and waits for the send to complete.
///
/// The type is referenced through `$crate` so that the expansion always
/// names the crate that defines this macro, regardless of where it is
/// invoked from.
macro_rules! try_stream {
    ($($block:tt)*) => {
        $crate::ext::async_stream::TryAsyncStream::new(move |mut sender| async move {
            macro_rules! r#yield {
                ($v:expr) => {{
                    // The send result is deliberately discarded; a failed
                    // send does not abort the producer block.
                    let _ = futures_util::sink::SinkExt::send(&mut sender, Ok($v)).await;
                }}
            }

            $($block)*
        })
    }
}