tokio_stream_ext/
switch_map.rs

1use core::task::Context;
2use futures::StreamExt;
3use pin_project_lite::pin_project;
4use std::pin::Pin;
5use std::task::Poll;
6use tokio_stream::Stream;
7
8pin_project! {
9    pub struct SwitchMap<I, T, O>
10    {
11        #[pin]
12        from: I,
13        mapper: T,
14        #[pin]
15        mapped_stream: Option<O>,
16    }
17}
18
19#[allow(dead_code)]
20pub fn switch_map<I, T, O>(from: I, mapper: T) -> SwitchMap<I, T, O>
21where
22    I: Stream,
23    T: Fn(I::Item) -> Option<O> + Clone,
24    O: Stream,
25{
26    SwitchMap {
27        from: from,
28        mapper,
29        mapped_stream: None,
30    }
31}
32
33impl<I, T, O> Stream for SwitchMap<I, T, O>
34where
35    I: Stream,
36    T: Fn(I::Item) -> Option<O> + Clone,
37    O: Stream,
38{
39    type Item = O::Item;
40
41    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42        let mut me = self.project();
43        while let Poll::Ready(p) = me.from.poll_next_unpin(cx) {
44            if let Some(state) = p {
45                me.mapped_stream.set((me.mapper)(state));
46            } else {
47                return Poll::Ready(None);
48            }
49        }
50        if let Some(mut mapped) = me.mapped_stream.as_mut().as_pin_mut() {
51            while let Poll::Ready(p) = mapped.poll_next_unpin(cx) {
52                if let Some(state) = p {
53                    return Poll::Ready(Some(state));
54                } else {
55                    me.mapped_stream.set(None);
56                    return Poll::Pending;
57                }
58            }
59        }
60
61        Poll::Pending
62    }
63}