// tokio_stream_ext/switch_map.rs
use core::task::Context;
use futures::StreamExt;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::Poll;
use tokio_stream::Stream;

8pin_project! {
9 pub struct SwitchMap<I, T, O>
10 {
11 #[pin]
12 from: I,
13 mapper: T,
14 #[pin]
15 mapped_stream: Option<O>,
16 }
17}
18
19#[allow(dead_code)]
20pub fn switch_map<I, T, O>(from: I, mapper: T) -> SwitchMap<I, T, O>
21where
22 I: Stream,
23 T: Fn(I::Item) -> Option<O> + Clone,
24 O: Stream,
25{
26 SwitchMap {
27 from: from,
28 mapper,
29 mapped_stream: None,
30 }
31}
32
33impl<I, T, O> Stream for SwitchMap<I, T, O>
34where
35 I: Stream,
36 T: Fn(I::Item) -> Option<O> + Clone,
37 O: Stream,
38{
39 type Item = O::Item;
40
41 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42 let mut me = self.project();
43 while let Poll::Ready(p) = me.from.poll_next_unpin(cx) {
44 if let Some(state) = p {
45 me.mapped_stream.set((me.mapper)(state));
46 } else {
47 return Poll::Ready(None);
48 }
49 }
50 if let Some(mut mapped) = me.mapped_stream.as_mut().as_pin_mut() {
51 while let Poll::Ready(p) = mapped.poll_next_unpin(cx) {
52 if let Some(state) = p {
53 return Poll::Ready(Some(state));
54 } else {
55 me.mapped_stream.set(None);
56 return Poll::Pending;
57 }
58 }
59 }
60
61 Poll::Pending
62 }
63}