co_primitives/library/
co_try_stream_ext.rs1use futures::{pin_mut, Stream, TryStreamExt};
5use std::{marker::PhantomData, task::Poll};
6
7#[async_trait::async_trait]
8pub trait CoTryStreamExt: Stream<Item = Result<Self::Ok, Self::Error>> {
9 type Ok;
10 type Error;
11
12 async fn try_first(self) -> Result<Option<Self::Ok>, Self::Error>
13 where
14 Self: Sized,
15 {
16 Ok(try_first(self).await?)
17 }
18
19 fn try_ignore_elements<T>(self) -> TryIgnoreElements<Self, T>
21 where
22 Self: Sized,
23 {
24 TryIgnoreElements { inner: self, _out: PhantomData }
25 }
26}
27impl<S, T, E> CoTryStreamExt for S
28where
29 S: ?Sized + Stream<Item = Result<T, E>>,
30{
31 type Ok = T;
32 type Error = E;
33}
34
35async fn try_first<T, E, S>(stream: S) -> Result<Option<T>, E>
36where
37 S: Stream<Item = Result<T, E>> + Sized,
38{
39 pin_mut!(stream);
40 stream.try_next().await
41}
42
43#[pin_project::pin_project]
44pub struct TryIgnoreElements<S, O> {
45 #[pin]
46 inner: S,
47 _out: PhantomData<O>,
48}
49impl<S, T, E, O> Stream for TryIgnoreElements<S, O>
50where
51 S: Stream<Item = Result<T, E>>,
52{
53 type Item = Result<O, E>;
54
55 fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
56 let mut this = self.project();
57 match this.inner.as_mut().poll_next(cx) {
58 Poll::Ready(Some(Ok(_))) => Poll::Pending,
60 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
62 Poll::Ready(None) => Poll::Ready(None),
64 Poll::Pending => Poll::Pending,
66 }
67 }
68}