promising_future/
futurestream.rs1use std::sync::Arc;
2use std::sync::mpsc::{Sender, Receiver, channel};
3use std::iter::FromIterator;
4
5use super::Pollresult::*;
6use super::Future;
7
8use cvmx::CvMx;
9
10#[derive(Clone)]
21pub struct FutureStream<T: Send> {
22 tx: Sender<Option<T>>, inner: Arc<CvMx<FutureStreamInner<T>>>, }
25
26pub struct FutureStreamWaiter<'a, T: Send + 'a> {
60 fs: &'a FutureStream<T>,
61 rx: Option<Receiver<Option<T>>>, }
63
/// State shared between all clones of a `FutureStream`.
struct FutureStreamInner<T>
    where T: Send
{
    /// Incremented by `FutureStream::add`, decremented each time a waiter
    /// receives a value from the channel.
    pending: usize,
    /// Receiving half of the channel; `None` while a waiter has taken it.
    rx: Option<Receiver<Option<T>>>,
}
68
69impl<T: Send> FutureStream<T> {
70 pub fn new() -> FutureStream<T> {
71 let (tx, rx) = channel();
72 let inner = FutureStreamInner {
73 rx: Some(rx),
74 pending: 0,
75 };
76
77 FutureStream {
78 tx: tx,
79 inner: Arc::new(CvMx::new(inner)),
80 }
81 }
82
83 pub fn add(&self, fut: Future<T>) where T: 'static {
85 let mut inner = self.inner.mx.lock().unwrap();
86 let tx = self.tx.clone();
87
88 inner.pending += 1;
89 fut.callback_unit(move |v| { let _ = tx.send(v); })
91 }
92
93 pub fn outstanding(&self) -> usize {
95 self.inner.mx.lock().unwrap().pending
96 }
97
98 pub fn waiter<'fs>(&'fs self) -> FutureStreamWaiter<'fs, T> {
100 let mut inner = self.inner.mx.lock().unwrap();
101
102 loop {
103 match inner.rx.take() {
104 None => { inner = self.inner.cv.wait(inner).unwrap() },
105 Some(rx) => return FutureStreamWaiter::new(self, rx),
106 }
107 }
108 }
109
110 pub fn try_waiter<'fs>(&'fs self) -> Option<FutureStreamWaiter<'fs, T>> {
112 let mut inner = self.inner.mx.lock().unwrap();
113
114 match inner.rx.take() {
115 None => None,
116 Some(rx) => Some(FutureStreamWaiter::new(self, rx)),
117 }
118 }
119
120 fn return_waiter(&self, rx: Receiver<Option<T>>) {
121 let mut inner = self.inner.mx.lock().unwrap();
122
123 assert!(inner.rx.is_none());
124 inner.rx = Some(rx);
125 self.inner.cv.notify_one();
126 }
127
128 pub fn poll(&self) -> Option<Future<T>> {
130 self.waiter().poll()
131 }
132
133 pub fn wait(&self) -> Option<Future<T>> {
136 self.waiter().wait()
137 }
138}
139
140impl<'fs, T: Send> FutureStreamWaiter<'fs, T> {
141 fn new(fs: &'fs FutureStream<T>, rx: Receiver<Option<T>>) -> FutureStreamWaiter<'fs, T> {
142 FutureStreamWaiter { fs: fs, rx: Some(rx) }
143 }
144
145 pub fn wait(&mut self) -> Option<Future<T>> {
148 if { let l = self.fs.inner.mx.lock().unwrap(); l.pending == 0 } {
149 None
151 } else {
152 match self.rx.as_ref().unwrap().recv() {
154 Ok(val) => {
155 let mut l = self.fs.inner.mx.lock().unwrap();
156 l.pending -= 1;
157 Some(Future::from(val))
158 },
159 Err(_) => None,
160 }
161 }
162 }
163
164 pub fn poll(&mut self) -> Option<Future<T>> {
166 let mut inner = self.fs.inner.mx.lock().unwrap();
167
168 if inner.pending == 0 {
169 None
170 } else {
171 match self.rx.as_ref().unwrap().try_recv() {
172 Ok(val) => { inner.pending -= 1; Some(Future::from(val)) },
173 Err(_) => None,
174 }
175 }
176 }
177}
178
179impl<'fs, T: Send> Drop for FutureStreamWaiter<'fs, T> {
180 fn drop(&mut self) {
181 self.fs.return_waiter(self.rx.take().unwrap())
183 }
184}
185
186pub struct FutureStreamIter<'a, T: Send + 'a>(FutureStreamWaiter<'a, T>);
190
191impl<'fs, T: Send + 'fs> IntoIterator for FutureStreamWaiter<'fs, T> {
192 type Item = T;
193 type IntoIter = FutureStreamIter<'fs, T>;
194
195 fn into_iter(self) -> Self::IntoIter { FutureStreamIter(self) }
196}
197
198impl<'a, T: Send + 'a> Iterator for FutureStreamIter<'a, T> {
199 type Item = T;
200
201 fn next(&mut self) -> Option<Self::Item> {
203 loop {
204 match self.0.wait() {
205 None => return None,
206 Some(fut) => {
207 match fut.poll() {
208 Unresolved(_) => panic!("FutureStreamWait.wait returned unresolved Future"),
209 Resolved(v@Some(_)) => return v,
210 Resolved(None) => (),
211 }
212 },
213 }
214 }
215 }
216}
217
218impl<'a, T: Send + 'a> IntoIterator for &'a FutureStream<T> {
219 type Item = T;
220 type IntoIter = FutureStreamIter<'a, T>;
221
222 fn into_iter(self) -> Self::IntoIter { self.waiter().into_iter() }
223}
224
225impl<T: Send + 'static> FromIterator<Future<T>> for FutureStream<T> {
226 fn from_iter<I>(iterator: I) -> Self
228 where I: IntoIterator<Item=Future<T>>
229 {
230 let stream = FutureStream::new();
231 for f in iterator.into_iter() {
232 stream.add(f)
233 }
234
235 stream
236 }
237}