use futures::{Stream, StreamExt, stream};
use std::task::Poll;
pub(crate) fn map_last_stream<T>(
mut input: impl Stream<Item = T> + Unpin,
map_f: impl Fn(T, bool) -> T,
) -> impl Stream<Item = T> + Unpin {
let mut current_value = None;
stream::poll_fn(move |cx| match futures::ready!(input.poll_next_unpin(cx)) {
Some(new_val) => {
match current_value.take() {
None => {
current_value = Some(new_val);
cx.waker().wake_by_ref();
Poll::Pending
}
Some(existing) => {
current_value = Some(new_val);
Poll::Ready(Some(map_f(existing, false)))
}
}
}
None => match current_value.take() {
Some(existing) => {
cx.waker().wake_by_ref();
Poll::Ready(Some(map_f(existing, true)))
}
None => Poll::Ready(None),
},
})
}
#[cfg(test)]
mod tests {
use super::*;
use futures::stream;
#[tokio::test]
async fn test_map_last_stream_empty_stream() {
let input = stream::empty::<i32>();
let mapped = map_last_stream(input, |x, _| x + 10);
let result: Vec<i32> = mapped.collect().await;
assert_eq!(result, Vec::<i32>::new());
}
#[tokio::test]
async fn test_map_last_stream_single_element() {
let input = stream::iter(vec![5]);
let mapped = map_last_stream(input, |x, _| x * 2);
let result: Vec<i32> = mapped.collect().await;
assert_eq!(result, vec![10]);
}
#[tokio::test]
async fn test_map_last_stream_multiple_elements() {
let input = stream::iter(vec![1, 2, 3, 4]);
let mapped = map_last_stream(input, |x, is_last| if is_last { x + 100 } else { x });
let result: Vec<i32> = mapped.collect().await;
assert_eq!(result, vec![1, 2, 3, 104]); }
#[tokio::test]
async fn test_map_last_stream_preserves_order() {
let input = stream::iter(vec![10, 20, 30, 40, 50]);
let mapped = map_last_stream(input, |x, is_last| if is_last { x - 50 } else { x });
let result: Vec<i32> = mapped.collect().await;
assert_eq!(result, vec![10, 20, 30, 40, 0]); }
}