ex_futures/stream/
unsync_fork.rs

1use futures::{Stream, Poll, Async};
2
3use super::fork::Side;
4
5use std::rc::Rc;
6use std::collections::VecDeque;
7use std::cell::RefCell;
8
9
10pub type LeftUnsyncFork<S, F> = UnsyncFork<S, F>;
11pub type RightUnsyncFork<S, F> = UnsyncFork<S, F>;
12
13
14
15/// UnsyncFork given stream into two stream. Please have a look at document of `StreamExt` trait.
16pub fn unsync_fork<S, F, T>(stream: S, router: F) -> (LeftUnsyncFork<S, F>, RightUnsyncFork<S, F>)
17where
18    S: Stream,
19    F: FnMut(&S::Item) -> T,
20    Side: From<T>,
21{
22    let shared = Shared {
23        router: router,
24        stream: stream,
25        queues: Queues::new(),
26    };
27
28    let shared = Rc::new(RefCell::new(shared));
29
30    let left = UnsyncFork {
31        route: Side::Left,
32        shared: shared.clone(),
33    };
34
35    let right = UnsyncFork {
36        route: Side::Right,
37        shared: shared,
38    };
39
40    (left, right)
41}
42
43
44
45#[derive(Debug)]
46struct Queues<T, E> {
47    left: VecDeque<Result<Option<T>, E>>,
48    right: VecDeque<Result<Option<T>, E>>,
49}
50
51
52impl<T, E> Queues<T, E> {
53    fn new() -> Queues<T, E> {
54        Queues {
55            left: VecDeque::new(),
56            right: VecDeque::new(),
57        }
58    }
59
60    fn get_queue_mut(&mut self, route: Side) -> &mut VecDeque<Result<Option<T>, E>> {
61        match route {
62            Side::Left => &mut self.left,
63            Side::Right => &mut self.right,
64        }
65    }
66
67    fn push_none(&mut self) {
68        self.left.push_back(Ok(None));
69        self.right.push_back(Ok(None));
70    }
71
72    fn push_err(&mut self, err: E)
73    where
74        E: Clone,
75    {
76        self.left.push_back(Err(err.clone()));
77        self.right.push_back(Err(err));
78    }
79}
80
81
82#[derive(Debug)]
83struct Shared<S: Stream, F> {
84    router: F,
85    stream: S,
86    queues: Queues<S::Item, S::Error>,
87}
88
89
90
91/// UnsyncFork any kind of stream into two stream like that the river branches.
92/// The closure being passed this function is called "router". Each item of original stream is
93/// passed to branch following to "router" decision.
94/// "Router" can return not only `Side` which is `Left` or `Right` but also
95/// `bool` (`true` is considered as `Left`).
96///
97/// # Examples
98///
99/// ```
100/// # extern crate futures;
101/// # extern crate ex_futures;
102/// use ex_futures::StreamExt;
103///
104/// # fn main() {
105/// let (tx, rx) = ::futures::sync::mpsc::channel::<usize>(42);
106///
107/// let (even, odd) = rx.unsync_fork(|i| i % 2 == 0);
108/// # }
109/// ```
110///
111/// # Notice
112///
113/// The value being returned by this function is not `Sync`. We will provide `Sync` version later.
114pub struct UnsyncFork<S: Stream, F> {
115    route: Side,
116    shared: Rc<RefCell<Shared<S, F>>>,
117}
118
119
120impl<S, F, T> Stream for UnsyncFork<S, F>
121where
122    S: Stream,
123    S::Error: Clone,
124    F: FnMut(&S::Item) -> T,
125    T: Into<Side>,
126{
127    type Item = S::Item;
128    type Error = S::Error;
129
130    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
131        {
132            let mut shared = self.shared.borrow_mut();
133
134            let msg = shared.queues.get_queue_mut(self.route.clone()).pop_front();
135
136            let poll = match msg {
137                Some(Ok(Some(msg))) => return Ok(Async::Ready(Some(msg))),
138                Some(Ok(None)) => return Ok(Async::Ready(None)),
139                Some(Err(e)) => return Err(e),
140                None => shared.stream.poll(),
141            };
142
143            match poll {
144                Err(e) => shared.queues.push_err(e),
145                Ok(Async::Ready(Some(msg))) => {
146                    let route = (&mut shared.router)(&msg).into();
147                    shared.queues.get_queue_mut(route).push_back(Ok(Some(msg)));
148                }
149                Ok(Async::Ready(None)) => shared.queues.push_none(),
150                Ok(Async::NotReady) => {
151                    return Ok(Async::NotReady);
152                }
153            }
154        }
155
156        self.poll()
157    }
158}
159
160
161
162impl<S: Stream, F> ::std::fmt::Debug for UnsyncFork<S, F> {
163    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> {
164        match self.route {
165            Side::Left => write!(f, "LeftRoute"),
166            Side::Right => write!(f, "RightRoute"),
167        }
168    }
169}