futures_rx/stream_ext/
start_with.rs1use std::{
2 collections::VecDeque,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use futures::{
8 stream::{Fuse, FusedStream},
9 Stream, StreamExt,
10};
11use pin_project_lite::pin_project;
12
13pin_project! {
14 #[must_use = "streams do nothing unless polled"]
16 pub struct StartWith<S: Stream> {
17 #[pin]
18 stream: Fuse<S>,
19 value: Option<VecDeque<S::Item>>,
20 }
21}
22
23impl<S: Stream> StartWith<S> {
24 pub(crate) fn new<I: IntoIterator<Item = S::Item>>(stream: S, value: I) -> Self {
25 let items = VecDeque::from_iter(value);
26
27 Self {
28 stream: stream.fuse(),
29 value: Some(items),
30 }
31 }
32}
33
34impl<S: FusedStream> FusedStream for StartWith<S> {
35 fn is_terminated(&self) -> bool {
36 self.stream.is_terminated()
37 }
38}
39
40impl<S: Stream> Stream for StartWith<S> {
41 type Item = S::Item;
42
43 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44 let mut this = self.project();
45
46 if let Some(value) = this.value.as_mut() {
47 if let Some(event) = value.pop_front() {
48 return Poll::Ready(Some(event));
49 } else {
50 *this.value = None;
51 }
52 }
53
54 this.stream.as_mut().poll_next(cx)
55 }
56
57 fn size_hint(&self) -> (usize, Option<usize>) {
58 let len = self.value.as_ref().map(|it| it.len()).unwrap_or_default();
59 let (a, b) = self.stream.size_hint();
60
61 (a + len, b.map(|it| it + len))
62 }
63}
64
#[cfg(test)]
mod test {
    use futures::{executor::block_on, stream, StreamExt};

    use crate::RxExt;

    /// The prefix item is emitted ahead of the source stream's items.
    #[test]
    fn smoke() {
        let source = stream::iter(1..=5);
        let all_events = block_on(source.start_with([0]).collect::<Vec<_>>());

        assert_eq!(all_events, [0, 1, 2, 3, 4, 5]);
    }
}