streem/streamer.rs
1//! Types and Traits for consuming Streams
2
3use std::pin::Pin;
4
5/// Enables consuming any Stream + Unpin
6pub trait IntoStreamer: futures_core::Stream {
7 /// Construct a type that consumes the stream
8 ///
9 /// Example
10 /// ```rust
11 /// use streem::IntoStreamer;
12 /// let stream = streem::from_fn(|yielder| async move { yielder.yield_(1).await; });
13 /// let pinned = std::pin::pin!(stream);
14 /// let streamer = pinned.into_streamer();
15 /// ```
16 fn into_streamer(self) -> Streamer<Self>
17 where
18 Self: Sized + Unpin,
19 {
20 Streamer { inner: self }
21 }
22}
23
24impl<S> IntoStreamer for S where S: futures_core::Stream {}
25
26/// Enables calling async next on a stream
27pub struct Streamer<S> {
28 inner: S,
29}
30
31impl<S> Streamer<S> {
32 /// Receive the next value from the stream
33 ///
34 /// Example:
35 /// ```rust
36 /// let input_stream = std::pin::pin!(streem::from_fn(|yielder| async move {
37 /// # for i in 0..10 {
38 /// # yielder.yield_(i).await;
39 /// # }
40 /// }));
41 /// use streem::IntoStreamer;
42 ///
43 /// let mut streamer = input_stream.into_streamer();
44 ///
45 /// # let _ = futures_executor::block_on(async move {
46 /// while let Some(item) = streamer.next().await {
47 /// println!("{item}");
48 /// }
49 /// # });
50 /// ```
51 pub async fn next(&mut self) -> Option<S::Item>
52 where
53 S: futures_core::Stream + Unpin,
54 {
55 std::future::poll_fn(|cx| Pin::new(&mut self.inner).poll_next(cx)).await
56 }
57
58 /// Receive a result of the next Ok stream value, or the given Err value
59 ///
60 /// Example:
61 /// ```rust
62 /// # fn fallible_fn(i: i32) -> Result<i32, String> {
63 /// # Ok(i)
64 /// # }
65 /// #
66 /// let input_stream = std::pin::pin!(streem::try_from_fn(|yielder| async move {
67 /// # for i in 0..10 {
68 /// # let value = fallible_fn(i)?;
69 /// #
70 /// # yielder.yield_ok(value).await;
71 /// # }
72 /// #
73 /// # Ok(()) as Result<_, String>
74 /// }));
75 /// use streem::IntoStreamer;
76 ///
77 /// let mut streamer = input_stream.into_streamer();
78 ///
79 /// # let _ = futures_executor::block_on(async move {
80 /// while let Some(item) = streamer.try_next().await? {
81 /// println!("{item}");
82 /// }
83 /// # Ok(()) as Result<_, String>
84 /// # });
85 /// ```
86 pub async fn try_next<T, E>(&mut self) -> Result<Option<T>, E>
87 where
88 S: futures_core::Stream<Item = Result<T, E>> + Unpin,
89 {
90 self.next().await.transpose()
91 }
92}