type_toppings/
stream.rs

1impl<S> crate::StreamExt for S
2where
3    S: futures::stream::Stream,
4{
5    fn chain_ready<T>(self, item: T) -> futures::stream::Chain<Self, futures::stream::Once<std::future::Ready<T>>>
6    where
7        Self: Sized,
8        Self: futures::Stream<Item = T>,
9    {
10        self.chain_future(std::future::ready(item))
11    }
12
13    fn chain_future<T, F>(self, fut: F) -> futures::stream::Chain<Self, futures::stream::Once<F>>
14    where
15        Self: Sized,
16        Self: futures::Stream<Item = T>,
17        F: core::future::Future<Output = T>,
18    {
19        futures::StreamExt::chain(self, futures::stream::once(fut))
20    }
21}
22
23#[cfg(test)]
24mod tests {
25    use crate::StreamExt as _;
26
27    #[test]
28    fn test_chain_ready() {
29        let initial_stream = futures::stream::iter(vec![1, 2, 3]);
30        let chained_stream = initial_stream.chain_ready(4);
31
32        let collected: Vec<_> = futures::executor::block_on_stream(chained_stream).collect();
33
34        assert_eq!(collected, vec![1, 2, 3, 4]);
35    }
36
37    #[test]
38    fn test_chain_future() {
39        let initial_stream = futures::stream::iter(vec![1, 2, 3]);
40        let chained_stream = initial_stream.chain_future(Box::pin(async { 4 }));
41
42        let collected: Vec<_> = futures::executor::block_on_stream(chained_stream).collect();
43
44        assert_eq!(collected, vec![1, 2, 3, 4]);
45    }
46}