ex_futures/stream/
unsync_fork.rs1use futures::{Stream, Poll, Async};
2
3use super::fork::Side;
4
5use std::rc::Rc;
6use std::collections::VecDeque;
7use std::cell::RefCell;
8
9
10pub type LeftUnsyncFork<S, F> = UnsyncFork<S, F>;
11pub type RightUnsyncFork<S, F> = UnsyncFork<S, F>;
12
13
14
15pub fn unsync_fork<S, F, T>(stream: S, router: F) -> (LeftUnsyncFork<S, F>, RightUnsyncFork<S, F>)
17where
18 S: Stream,
19 F: FnMut(&S::Item) -> T,
20 Side: From<T>,
21{
22 let shared = Shared {
23 router: router,
24 stream: stream,
25 queues: Queues::new(),
26 };
27
28 let shared = Rc::new(RefCell::new(shared));
29
30 let left = UnsyncFork {
31 route: Side::Left,
32 shared: shared.clone(),
33 };
34
35 let right = UnsyncFork {
36 route: Side::Right,
37 shared: shared,
38 };
39
40 (left, right)
41}
42
43
44
/// Pending outcomes for each side of a fork.
///
/// Every entry is a complete poll outcome: `Ok(Some(item))` for a routed
/// item, `Ok(None)` for the end-of-stream marker, or `Err(e)` for an error.
#[derive(Debug)]
struct Queues<T, E> {
    /// Outcomes waiting to be handed to the left handle.
    left: VecDeque<Result<Option<T>, E>>,
    /// Outcomes waiting to be handed to the right handle.
    right: VecDeque<Result<Option<T>, E>>,
}
50
51
52impl<T, E> Queues<T, E> {
53 fn new() -> Queues<T, E> {
54 Queues {
55 left: VecDeque::new(),
56 right: VecDeque::new(),
57 }
58 }
59
60 fn get_queue_mut(&mut self, route: Side) -> &mut VecDeque<Result<Option<T>, E>> {
61 match route {
62 Side::Left => &mut self.left,
63 Side::Right => &mut self.right,
64 }
65 }
66
67 fn push_none(&mut self) {
68 self.left.push_back(Ok(None));
69 self.right.push_back(Ok(None));
70 }
71
72 fn push_err(&mut self, err: E)
73 where
74 E: Clone,
75 {
76 self.left.push_back(Err(err.clone()));
77 self.right.push_back(Err(err));
78 }
79}
80
81
82#[derive(Debug)]
83struct Shared<S: Stream, F> {
84 router: F,
85 stream: S,
86 queues: Queues<S::Item, S::Error>,
87}
88
89
90
91pub struct UnsyncFork<S: Stream, F> {
115 route: Side,
116 shared: Rc<RefCell<Shared<S, F>>>,
117}
118
119
120impl<S, F, T> Stream for UnsyncFork<S, F>
121where
122 S: Stream,
123 S::Error: Clone,
124 F: FnMut(&S::Item) -> T,
125 T: Into<Side>,
126{
127 type Item = S::Item;
128 type Error = S::Error;
129
130 fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
131 {
132 let mut shared = self.shared.borrow_mut();
133
134 let msg = shared.queues.get_queue_mut(self.route.clone()).pop_front();
135
136 let poll = match msg {
137 Some(Ok(Some(msg))) => return Ok(Async::Ready(Some(msg))),
138 Some(Ok(None)) => return Ok(Async::Ready(None)),
139 Some(Err(e)) => return Err(e),
140 None => shared.stream.poll(),
141 };
142
143 match poll {
144 Err(e) => shared.queues.push_err(e),
145 Ok(Async::Ready(Some(msg))) => {
146 let route = (&mut shared.router)(&msg).into();
147 shared.queues.get_queue_mut(route).push_back(Ok(Some(msg)));
148 }
149 Ok(Async::Ready(None)) => shared.queues.push_none(),
150 Ok(Async::NotReady) => {
151 return Ok(Async::NotReady);
152 }
153 }
154 }
155
156 self.poll()
157 }
158}
159
160
161
162impl<S: Stream, F> ::std::fmt::Debug for UnsyncFork<S, F> {
163 fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> {
164 match self.route {
165 Side::Left => write!(f, "LeftRoute"),
166 Side::Right => write!(f, "RightRoute"),
167 }
168 }
169}