Skip to main content

streamx/
latest.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures::Stream;
5
6/// A stream that yields only the latest value from the upstream when polled.
7///
8/// When polled, this stream drains all immediately available items from the
9/// upstream and yields only the most recent one. All previously buffered items
10/// are discarded.
11///
12/// This is useful when you only care about the most recent state and want to
13/// skip intermediate values that arrived while the downstream was not polling.
14pub struct LatestStream<T> {
15  source: Pin<Box<T>>,
16}
17
18impl<T: Stream> Stream for LatestStream<T> {
19  type Item = T::Item;
20
21  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
22    let mut latest: Option<T::Item> = None;
23
24    loop {
25      match self.source.as_mut().poll_next(cx) {
26        Poll::Ready(Some(item)) => {
27          // Keep draining and store only the latest
28          latest = Some(item);
29        }
30        Poll::Ready(None) => {
31          // Stream ended - return any pending latest or None
32          return Poll::Ready(latest);
33        }
34        Poll::Pending => {
35          // No more ready items - return latest if we got one
36          if let Some(item) = latest {
37            return Poll::Ready(Some(item));
38          }
39          return Poll::Pending;
40        }
41      }
42    }
43  }
44}
45
46/// Extension trait that adds `.latest()` to streams.
47pub trait StreamLatestExt: Stream + Sized {
48  /// Wraps this stream so that only the latest available value is yielded when polled.
49  ///
50  /// This is useful when you have a fast producer and a slow consumer, and you
51  /// only care about the most recent value rather than processing every item.
52  ///
53  /// # Example
54  ///
55  /// ```
56  /// use futures::StreamExt;
57  /// use futures::executor::block_on;
58  /// use streamx::StreamLatestExt;
59  ///
60  /// let mut latest = futures::stream::iter([1_u32, 2, 3]).latest();
61  ///
62  /// // The iterator is immediately ready, so only the latest value (3) is returned.
63  /// let value = block_on(async { latest.next().await });
64  /// assert_eq!(value, Some(3));
65  /// ```
66  fn latest(self) -> LatestStream<Self> {
67    LatestStream {
68      source: Box::pin(self),
69    }
70  }
71}
72
73impl<T: Stream + Sized> StreamLatestExt for T {}
74
75#[cfg(test)]
76mod tests {
77  use std::pin::Pin;
78  use std::task::{Context, Poll};
79
80  use futures::StreamExt;
81
82  use super::StreamLatestExt;
83
84  struct MpscStream<T>(tokio::sync::mpsc::UnboundedReceiver<T>);
85
86  impl<T> futures::Stream for MpscStream<T> {
87    type Item = T;
88
89    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90      self.0.poll_recv(cx)
91    }
92  }
93
94  #[tokio::test]
95  async fn latest_returns_most_recent_value() {
96    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
97
98    tx.send(1).unwrap();
99    tx.send(2).unwrap();
100    tx.send(3).unwrap();
101
102    let mut latest = MpscStream(rx).latest();
103
104    // Should skip 1 and 2, returning only 3
105    let value = latest.next().await;
106    assert_eq!(value, Some(3));
107  }
108
109  #[tokio::test]
110  async fn latest_returns_each_value_when_polled_immediately() {
111    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
112
113    let mut latest = MpscStream(rx).latest();
114
115    tx.send(1).unwrap();
116    assert_eq!(latest.next().await, Some(1));
117
118    tx.send(2).unwrap();
119    assert_eq!(latest.next().await, Some(2));
120
121    tx.send(3).unwrap();
122    assert_eq!(latest.next().await, Some(3));
123  }
124
125  #[tokio::test]
126  async fn latest_returns_none_when_stream_ends() {
127    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
128
129    let mut latest = MpscStream(rx).latest();
130
131    drop(tx);
132
133    assert_eq!(latest.next().await, None);
134  }
135
136  #[tokio::test]
137  async fn latest_returns_final_value_when_stream_ends_with_pending_items() {
138    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
139
140    tx.send(1).unwrap();
141    tx.send(2).unwrap();
142    drop(tx);
143
144    let mut latest = MpscStream(rx).latest();
145
146    // Should return the latest value (2) even though stream is closed
147    assert_eq!(latest.next().await, Some(2));
148    // Then return None
149    assert_eq!(latest.next().await, None);
150  }
151
152  #[tokio::test]
153  async fn latest_works_with_iter_stream() {
154    let mut latest = futures::stream::iter([1_u32, 2, 3, 4, 5]).latest();
155
156    // All values are immediately ready, so should return only the last
157    assert_eq!(latest.next().await, Some(5));
158    assert_eq!(latest.next().await, None);
159  }
160
161  #[tokio::test]
162  async fn latest_skips_intermediate_values_between_polls() {
163    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
164
165    let mut latest = MpscStream(rx).latest();
166
167    // First batch
168    tx.send(1).unwrap();
169    tx.send(2).unwrap();
170    assert_eq!(latest.next().await, Some(2));
171
172    // Second batch
173    tx.send(10).unwrap();
174    tx.send(20).unwrap();
175    tx.send(30).unwrap();
176    assert_eq!(latest.next().await, Some(30));
177
178    // Single value
179    tx.send(100).unwrap();
180    assert_eq!(latest.next().await, Some(100));
181
182    drop(tx);
183    assert_eq!(latest.next().await, None);
184  }
185}