1#![warn(missing_docs)]
6#![doc(html_root_url="https://docs.rs/futures-state-stream/0.2")]
7
8#[macro_use]
9extern crate futures;
10
11use futures::{Async, Poll, Future, Stream};
12use std::mem;
13use std::panic::AssertUnwindSafe;
14
/// An event produced by polling a `StateStream`.
///
/// The common comparison, cloning and formatting traits are derived so that
/// events can be inspected, logged and asserted on whenever the item and
/// state types support it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamEvent<I, S> {
    /// The stream produced another item.
    Next(I),
    /// The stream finished and handed back its state.
    Done(S),
}
22
23pub trait StateStream {
25 type Item;
27 type State;
29 type Error;
31
32 fn poll(&mut self) -> Poll<StreamEvent<Self::Item, Self::State>, (Self::Error, Self::State)>;
37
38 #[inline]
40 fn into_future(self) -> IntoFuture<Self>
41 where
42 Self: Sized,
43 {
44 IntoFuture(Some(self))
45 }
46
47 #[inline]
49 fn into_stream(self) -> IntoStream<Self>
50 where
51 Self: Sized,
52 {
53 IntoStream(self)
54 }
55
56 #[inline]
58 fn map<F, B>(self, f: F) -> Map<Self, F>
59 where
60 Self: Sized,
61 F: FnMut(Self::Item) -> B,
62 {
63 Map { stream: self, f: f }
64 }
65
66 #[inline]
68 fn map_err<F, B>(self, f: F) -> MapErr<Self, F>
69 where
70 Self: Sized,
71 F: FnMut(Self::Error) -> B,
72 {
73 MapErr { stream: self, f: f }
74 }
75
76 #[inline]
78 fn map_state<F, B>(self, f: F) -> MapState<Self, F>
79 where
80 Self: Sized,
81 F: FnOnce(Self::State) -> B,
82 {
83 MapState {
84 stream: self,
85 f: Some(f),
86 }
87 }
88
89 #[inline]
91 fn filter<F>(self, f: F) -> Filter<Self, F>
92 where
93 Self: Sized,
94 F: FnMut(&Self::Item) -> bool,
95 {
96 Filter { stream: self, f: f }
97 }
98
99 #[inline]
101 fn filter_map<F, B>(self, f: F) -> FilterMap<Self, F>
102 where
103 Self: Sized,
104 F: FnMut(Self::Item) -> Option<B>,
105 {
106
107 FilterMap { stream: self, f: f }
108 }
109
110 #[inline]
113 fn collect(self) -> Collect<Self>
114 where
115 Self: Sized,
116 {
117 Collect {
118 stream: self,
119 items: vec![],
120 }
121 }
122
123 #[inline]
125 fn for_each<F>(self, f: F) -> ForEach<Self, F>
126 where
127 Self: Sized,
128 F: FnMut(Self::Item),
129 {
130 ForEach { stream: self, f: f }
131 }
132}
133
134impl<S: ?Sized> StateStream for Box<S>
135where
136 S: StateStream,
137{
138 type Item = S::Item;
139 type State = S::State;
140 type Error = S::Error;
141
142 #[inline]
143 fn poll(&mut self) -> Poll<StreamEvent<S::Item, S::State>, (S::Error, S::State)> {
144 S::poll(self)
145 }
146}
147
148impl<'a, S: ?Sized> StateStream for &'a mut S
149where
150 S: StateStream,
151{
152 type Item = S::Item;
153 type State = S::State;
154 type Error = S::Error;
155
156 #[inline]
157 fn poll(&mut self) -> Poll<StreamEvent<S::Item, S::State>, (S::Error, S::State)> {
158 S::poll(self)
159 }
160}
161
162impl<S> StateStream for AssertUnwindSafe<S>
163where
164 S: StateStream,
165{
166 type Item = S::Item;
167 type State = S::State;
168 type Error = S::Error;
169
170 #[inline]
171 fn poll(&mut self) -> Poll<StreamEvent<S::Item, S::State>, (S::Error, S::State)> {
172 self.0.poll()
173 }
174}
175
176pub trait FutureExt: Future {
178 #[inline]
180 fn flatten_state_stream<E, S>(self) -> FlattenStateStream<Self>
181 where
182 Self: Sized + Future<Error = (E, S)>,
183 Self::Item: StateStream<State = S, Error = E>,
184 {
185 FlattenStateStream(FlattenStateStreamState::Future(self))
186 }
187}
188
189impl<F: ?Sized> FutureExt for F
190where
191 F: Future,
192{
193}
194
195pub trait StreamExt: Stream {
197 #[inline]
199 fn into_state_stream<S>(self, state: S) -> FromStream<Self, S>
200 where
201 Self: Sized,
202 {
203 FromStream(self, Some(state))
204 }
205}
206
207impl<S: ?Sized> StreamExt for S
208where
209 S: Stream,
210{
211}
212
213#[inline]
215pub fn unfold<T, F, Fut, It, St>(init: T, f: F) -> Unfold<T, F, Fut>
216where
217 F: FnMut(T) -> Fut,
218 Fut: futures::IntoFuture<Item = StreamEvent<(It, T), St>>,
219{
220 Unfold {
221 state: UnfoldState::Ready(init),
222 f: f,
223 }
224}
225
226pub struct FromStream<S, T>(S, Option<T>);
228
229impl<S, T> StateStream for FromStream<S, T>
230where
231 S: Stream,
232{
233 type Item = S::Item;
234 type State = T;
235 type Error = S::Error;
236
237 #[inline]
238 fn poll(&mut self) -> Poll<StreamEvent<S::Item, T>, (S::Error, T)> {
239 self.0
240 .poll()
241 .map(|a| match a {
242 Async::Ready(Some(i)) => Async::Ready(StreamEvent::Next(i)),
243 Async::Ready(None) => Async::Ready(StreamEvent::Done(
244 self.1.take().expect("poll called after completion"),
245 )),
246 Async::NotReady => Async::NotReady,
247 })
248 .map_err(|e| {
249 (e, self.1.take().expect("poll called after completion"))
250 })
251 }
252}
253
254pub struct IntoFuture<S>(Option<S>);
256
257impl<S> Future for IntoFuture<S>
258where
259 S: StateStream,
260{
261 type Item = (StreamEvent<S::Item, S::State>, S);
262 type Error = (S::Error, S::State, S);
263
264 #[inline]
265 fn poll(&mut self) -> Poll<(StreamEvent<S::Item, S::State>, S), (S::Error, S::State, S)> {
266 let item = match self.0.as_mut().expect("polling IntoFuture twice").poll() {
267 Ok(Async::NotReady) => return Ok(Async::NotReady),
268 Ok(Async::Ready(i)) => Ok(i),
269 Err(e) => Err(e),
270 };
271 let stream = self.0.take().unwrap();
272 match item {
273 Ok(i) => Ok(Async::Ready((i, stream))),
274 Err((e, s)) => Err((e, s, stream)),
275 }
276 }
277}
278
279pub struct IntoStream<S>(S);
281
282impl<S> Stream for IntoStream<S>
283where
284 S: StateStream,
285{
286 type Item = S::Item;
287 type Error = S::Error;
288
289 #[inline]
290 fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
291 match self.0.poll() {
292 Ok(Async::Ready(StreamEvent::Next(i))) => Ok(Async::Ready(Some(i))),
293 Ok(Async::Ready(StreamEvent::Done(_))) => Ok(Async::Ready(None)),
294 Ok(Async::NotReady) => Ok(Async::NotReady),
295 Err((e, _)) => Err(e),
296 }
297 }
298}
299
300pub struct Map<S, F> {
302 stream: S,
303 f: F,
304}
305
306impl<S, F, B> StateStream for Map<S, F>
307where
308 S: StateStream,
309 F: FnMut(S::Item) -> B,
310{
311 type Item = B;
312 type State = S::State;
313 type Error = S::Error;
314
315 #[inline]
316 fn poll(&mut self) -> Poll<StreamEvent<B, S::State>, (S::Error, S::State)> {
317 self.stream.poll().map(|a| match a {
318 Async::Ready(StreamEvent::Next(i)) => Async::Ready(StreamEvent::Next((self.f)(i))),
319 Async::Ready(StreamEvent::Done(s)) => Async::Ready(StreamEvent::Done(s)),
320 Async::NotReady => Async::NotReady,
321 })
322 }
323}
324
325pub struct MapErr<S, F> {
327 stream: S,
328 f: F,
329}
330
331impl<S, F, B> StateStream for MapErr<S, F>
332where
333 S: StateStream,
334 F: FnMut(S::Error) -> B,
335{
336 type Item = S::Item;
337 type State = S::State;
338 type Error = B;
339
340 #[inline]
341 fn poll(&mut self) -> Poll<StreamEvent<S::Item, S::State>, (B, S::State)> {
342 match self.stream.poll() {
343 Ok(a) => Ok(a),
344 Err((e, s)) => Err(((self.f)(e), s)),
345 }
346 }
347}
348
349pub struct MapState<S, F> {
351 stream: S,
352 f: Option<F>,
353}
354
355impl<S, F, B> StateStream for MapState<S, F>
356where
357 S: StateStream,
358 F: FnOnce(S::State) -> B,
359{
360 type Item = S::Item;
361 type State = B;
362 type Error = S::Error;
363
364 #[inline]
365 fn poll(&mut self) -> Poll<StreamEvent<S::Item, B>, (S::Error, B)> {
366 self.stream
367 .poll()
368 .map(|a| match a {
369 Async::Ready(StreamEvent::Next(i)) => Async::Ready(StreamEvent::Next(i)),
370 Async::Ready(StreamEvent::Done(s)) => {
371 let f = self.f.take().expect("polled MapState after completion");
372 Async::Ready(StreamEvent::Done(f(s)))
373 }
374 Async::NotReady => Async::NotReady,
375 })
376 .map_err(|(e, s)| {
377 let f = self.f.take().expect("polled MapState after completion");
378 (e, f(s))
379 })
380 }
381}
382
383pub struct Collect<S>
385where
386 S: StateStream,
387{
388 stream: S,
389 items: Vec<S::Item>,
390}
391
392impl<S> Future for Collect<S>
393where
394 S: StateStream,
395{
396 type Item = (Vec<S::Item>, S::State);
397 type Error = (S::Error, S::State);
398
399 #[inline]
400 fn poll(&mut self) -> Poll<(Vec<S::Item>, S::State), (S::Error, S::State)> {
401 loop {
402 match self.stream.poll() {
403 Ok(Async::Ready(StreamEvent::Next(i))) => self.items.push(i),
404 Ok(Async::Ready(StreamEvent::Done(s))) => {
405 let items = mem::replace(&mut self.items, vec![]);
406 return Ok(Async::Ready((items, s)));
407 }
408 Ok(Async::NotReady) => return Ok(Async::NotReady),
409 Err(e) => return Err(e),
410 }
411 }
412 }
413}
414
415pub struct Unfold<T, F, Fut>
417where
418 Fut: futures::IntoFuture,
419{
420 state: UnfoldState<T, Fut::Future>,
421 f: F,
422}
423
424enum UnfoldState<T, F> {
425 Empty,
426 Ready(T),
427 Processing(F),
428}
429
430impl<T, F, Fut, It, St, E> StateStream for Unfold<T, F, Fut>
431 where F: FnMut(T) -> Fut,
432 Fut: futures::IntoFuture<Item = StreamEvent<(It, T), St>, Error = (E, St)>
433{
434 type Item = It;
435 type State = St;
436 type Error = E;
437
438 #[inline]
439 fn poll(&mut self) -> Poll<StreamEvent<It, St>, (E, St)> {
440 loop {
441 match mem::replace(&mut self.state, UnfoldState::Empty) {
442 UnfoldState::Empty => panic!("polled an Unfold after completion"),
443 UnfoldState::Ready(state) => {
444 self.state = UnfoldState::Processing((self.f)(state).into_future())
445 }
446 UnfoldState::Processing(mut fut) => {
447 match try!(fut.poll()) {
448 Async::Ready(StreamEvent::Next((i, state))) => {
449 self.state = UnfoldState::Ready(state);
450 return Ok(Async::Ready(StreamEvent::Next(i)));
451 }
452 Async::Ready(StreamEvent::Done(s)) => {
453 return Ok(Async::Ready(StreamEvent::Done(s)));
454 }
455 Async::NotReady => {
456 self.state = UnfoldState::Processing(fut);
457 return Ok(Async::NotReady);
458 }
459 }
460 }
461 }
462 }
463 }
464}
465
466enum FlattenStateStreamState<F>
467where
468 F: Future,
469{
470 Future(F),
471 Stream(F::Item),
472}
473
474pub struct FlattenStateStream<F>(FlattenStateStreamState<F>)
476where
477 F: Future;
478
479impl<F, E, S> StateStream for FlattenStateStream<F>
480where
481 F: Future<Error = (E, S)>,
482 F::Item: StateStream<Error = E, State = S>,
483{
484 type Item = <F::Item as StateStream>::Item;
485 type State = S;
486 type Error = E;
487
488 #[inline]
489 fn poll(&mut self) -> Poll<StreamEvent<Self::Item, Self::State>, (Self::Error, Self::State)> {
490 loop {
491 self.0 = match self.0 {
492 FlattenStateStreamState::Future(ref mut f) => {
493 match f.poll() {
494 Ok(Async::NotReady) => return Ok(Async::NotReady),
495 Ok(Async::Ready(stream)) => FlattenStateStreamState::Stream(stream),
496 Err(e) => return Err(e),
497 }
498 }
499 FlattenStateStreamState::Stream(ref mut s) => return s.poll(),
500 };
501 }
502 }
503}
504
505pub struct Filter<S, F> {
507 stream: S,
508 f: F,
509}
510
511impl<S, F> StateStream for Filter<S, F>
512where
513 S: StateStream,
514 F: FnMut(&S::Item) -> bool,
515{
516 type Item = S::Item;
517 type State = S::State;
518 type Error = S::Error;
519
520 #[inline]
521 fn poll(&mut self) -> Poll<StreamEvent<S::Item, S::State>, (S::Error, S::State)> {
522 loop {
523 match self.stream.poll() {
524 Ok(Async::Ready(StreamEvent::Next(i))) => {
525 if (self.f)(&i) {
526 return Ok(Async::Ready(StreamEvent::Next(i)));
527 }
528 }
529 s => return s,
530 }
531 }
532 }
533}
534
535pub struct FilterMap<S, F> {
537 stream: S,
538 f: F,
539}
540
541impl<S, F, B> StateStream for FilterMap<S, F>
542where
543 S: StateStream,
544 F: FnMut(S::Item) -> Option<B>,
545{
546 type Item = B;
547 type State = S::State;
548 type Error = S::Error;
549
550 #[inline]
551 fn poll(&mut self) -> Poll<StreamEvent<B, S::State>, (S::Error, S::State)> {
552 loop {
553 match try!(self.stream.poll()) {
554 Async::Ready(StreamEvent::Next(i)) => {
555 if let Some(i) = (self.f)(i) {
556 return Ok(Async::Ready(StreamEvent::Next(i)));
557 }
558 }
559 Async::Ready(StreamEvent::Done(s)) => {
560 return Ok(Async::Ready(StreamEvent::Done(s)));
561 }
562 Async::NotReady => return Ok(Async::NotReady),
563 }
564 }
565 }
566}
567
568pub struct ForEach<S, F> {
570 stream: S,
571 f: F,
572}
573
574impl<S, F> Future for ForEach<S, F>
575where
576 S: StateStream,
577 F: FnMut(S::Item),
578{
579 type Item = S::State;
580 type Error = (S::Error, S::State);
581
582 #[inline]
583 fn poll(&mut self) -> Poll<S::State, (S::Error, S::State)> {
584 loop {
585 match try_ready!(self.stream.poll()) {
586 StreamEvent::Next(i) => (self.f)(i),
587 StreamEvent::Done(s) => return Ok(Async::Ready(s)),
588 }
589 }
590 }
591}