1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures::Stream;
5
6pub struct LatestStream<T> {
15 source: Pin<Box<T>>,
16}
17
18impl<T: Stream> Stream for LatestStream<T> {
19 type Item = T::Item;
20
21 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
22 let mut latest: Option<T::Item> = None;
23
24 loop {
25 match self.source.as_mut().poll_next(cx) {
26 Poll::Ready(Some(item)) => {
27 latest = Some(item);
29 }
30 Poll::Ready(None) => {
31 return Poll::Ready(latest);
33 }
34 Poll::Pending => {
35 if let Some(item) = latest {
37 return Poll::Ready(Some(item));
38 }
39 return Poll::Pending;
40 }
41 }
42 }
43 }
44}
45
46pub trait StreamLatestExt: Stream + Sized {
48 fn latest(self) -> LatestStream<Self> {
67 LatestStream {
68 source: Box::pin(self),
69 }
70 }
71}
72
73impl<T: Stream + Sized> StreamLatestExt for T {}
74
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use futures::StreamExt;

    use super::StreamLatestExt;

    /// Thin `Stream` wrapper around a tokio unbounded receiver, giving the
    /// tests a source whose contents they control from the sending side.
    struct MpscStream<T>(tokio::sync::mpsc::UnboundedReceiver<T>);

    impl<T> futures::Stream for MpscStream<T> {
        type Item = T;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
            self.get_mut().0.poll_recv(cx)
        }
    }

    /// Items queued before the first poll collapse into the newest one.
    #[tokio::test]
    async fn latest_returns_most_recent_value() {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<u32>();

        for value in 1..=3_u32 {
            sender.send(value).unwrap();
        }

        let mut stream = MpscStream(receiver).latest();

        let newest = stream.next().await;
        assert_eq!(newest, Some(3));
    }

    /// With exactly one item queued per poll, nothing is skipped.
    #[tokio::test]
    async fn latest_returns_each_value_when_polled_immediately() {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<u32>();

        let mut stream = MpscStream(receiver).latest();

        for value in 1..=3_u32 {
            sender.send(value).unwrap();
            assert_eq!(stream.next().await, Some(value));
        }
    }

    /// A closed, empty channel ends the stream right away.
    #[tokio::test]
    async fn latest_returns_none_when_stream_ends() {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<u32>();

        let mut stream = MpscStream(receiver).latest();

        drop(sender);

        assert_eq!(stream.next().await, None);
    }

    /// Items still queued when the channel closes are not lost: the newest one
    /// is delivered first, then the end of the stream.
    #[tokio::test]
    async fn latest_returns_final_value_when_stream_ends_with_pending_items() {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<u32>();

        for value in 1..=2_u32 {
            sender.send(value).unwrap();
        }
        drop(sender);

        let mut stream = MpscStream(receiver).latest();

        assert_eq!(stream.next().await, Some(2));
        assert_eq!(stream.next().await, None);
    }

    /// An always-ready finite stream is drained to its last element in one poll.
    #[tokio::test]
    async fn latest_works_with_iter_stream() {
        let mut stream = futures::stream::iter([1_u32, 2, 3, 4, 5]).latest();

        assert_eq!(stream.next().await, Some(5));
        assert_eq!(stream.next().await, None);
    }

    /// Each burst of sends between two polls yields only the burst's last item.
    #[tokio::test]
    async fn latest_skips_intermediate_values_between_polls() {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<u32>();

        let mut stream = MpscStream(receiver).latest();

        let bursts: [&[u32]; 3] = [&[1, 2], &[10, 20, 30], &[100]];
        for burst in bursts.iter() {
            for value in burst.iter() {
                sender.send(*value).unwrap();
            }
            assert_eq!(stream.next().await, burst.last().copied());
        }

        drop(sender);
        assert_eq!(stream.next().await, None);
    }
}