streem/
streamer.rs

1//! Types and Traits for consuming Streams
2
3use std::pin::Pin;
4
5/// Enables consuming any Stream + Unpin
6pub trait IntoStreamer: futures_core::Stream {
7    /// Construct a type that consumes the stream
8    ///
9    /// Example
10    /// ```rust
11    /// use streem::IntoStreamer;
12    /// let stream = streem::from_fn(|yielder| async move { yielder.yield_(1).await; });
13    /// let pinned = std::pin::pin!(stream);
14    /// let streamer = pinned.into_streamer();
15    /// ```
16    fn into_streamer(self) -> Streamer<Self>
17    where
18        Self: Sized + Unpin,
19    {
20        Streamer { inner: self }
21    }
22}
23
24impl<S> IntoStreamer for S where S: futures_core::Stream {}
25
/// Enables calling async next on a stream
///
/// Wraps a stream so that its items can be awaited one at a time through the inherent
/// `next` / `try_next` methods. Values are created with `IntoStreamer::into_streamer`.
///
/// `Debug` is derived so the wrapper can be logged or embedded in other `Debug` types whenever
/// the wrapped stream itself implements `Debug`.
#[derive(Debug)]
pub struct Streamer<S> {
    /// The wrapped stream that is polled for items
    inner: S,
}
30
31impl<S> Streamer<S> {
32    /// Receive the next value from the stream
33    ///
34    /// Example:
35    /// ```rust
36    /// let input_stream = std::pin::pin!(streem::from_fn(|yielder| async move {
37    /// #    for i in 0..10 {
38    /// #        yielder.yield_(i).await;
39    /// #    }
40    /// }));
41    /// use streem::IntoStreamer;
42    ///
43    /// let mut streamer = input_stream.into_streamer();
44    ///
45    /// # let _ = futures_executor::block_on(async move {
46    /// while let Some(item) = streamer.next().await {
47    ///     println!("{item}");
48    /// }
49    /// # });
50    /// ```
51    pub async fn next(&mut self) -> Option<S::Item>
52    where
53        S: futures_core::Stream + Unpin,
54    {
55        std::future::poll_fn(|cx| Pin::new(&mut self.inner).poll_next(cx)).await
56    }
57
58    /// Receive a result of the next Ok stream value, or the given Err value
59    ///
60    /// Example:
61    /// ```rust
62    /// # fn fallible_fn(i: i32) -> Result<i32, String> {
63    /// # Ok(i)
64    /// # }
65    /// #
66    /// let input_stream = std::pin::pin!(streem::try_from_fn(|yielder| async move {
67    /// #    for i in 0..10 {
68    /// #        let value = fallible_fn(i)?;
69    /// #
70    /// #        yielder.yield_ok(value).await;
71    /// #    }
72    /// #
73    /// #    Ok(()) as Result<_, String>
74    /// }));
75    /// use streem::IntoStreamer;
76    ///
77    /// let mut streamer = input_stream.into_streamer();
78    ///
79    /// # let _ = futures_executor::block_on(async move {
80    /// while let Some(item) = streamer.try_next().await? {
81    ///     println!("{item}");
82    /// }
83    /// # Ok(()) as Result<_, String>
84    /// # });
85    /// ```
86    pub async fn try_next<T, E>(&mut self) -> Result<Option<T>, E>
87    where
88        S: futures_core::Stream<Item = Result<T, E>> + Unpin,
89    {
90        self.next().await.transpose()
91    }
92}