futures_state_stream/
lib.rs

1//! A version of the `futures` crate's `Stream` which returns a state value when it completes.
2//!
3//! This is useful in contexts where a value "becomes" a stream for a period of time, but then
4//! switches back to its original type.
5#![warn(missing_docs)]
6#![doc(html_root_url="https://docs.rs/futures-state-stream/0.2")]
7
8#[macro_use]
9extern crate futures;
10
11use futures::{Async, Poll, Future, Stream};
12use std::mem;
13use std::panic::AssertUnwindSafe;
14
/// An event from a `StateStream`.
///
/// A successful, ready poll of a `StateStream` produces exactly one of these: either another
/// item, or the final state which marks the end of the stream.
///
/// The common comparison and formatting traits are derived so that events can be logged,
/// duplicated and asserted on whenever the item and state types allow it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamEvent<I, S> {
    /// The next item in the stream.
    Next(I),
    /// The state of the stream, returned when streaming has completed.
    Done(S),
}
22
23/// A variant of `futures::Stream` which returns a state value when it completes.
24pub trait StateStream {
25    /// The items being streamed.
26    type Item;
27    /// The state returned when streaming completes.
28    type State;
29    /// The error returned.
30    type Error;
31
32    /// Similar to `Stream::poll`.
33    ///
34    /// The end of a stream is indicated by a `StreamEvent::Done` value. The result of calling
35    /// `poll` after the end of the stream or an error has been reached is unspecified.
36    fn poll(&mut self) -> Poll<StreamEvent<Self::Item, Self::State>, (Self::Error, Self::State)>;
37
38    /// Returns a future which yields the next element of the stream.
39    #[inline]
40    fn into_future(self) -> IntoFuture<Self>
41    where
42        Self: Sized,
43    {
44        IntoFuture(Some(self))
45    }
46
47    /// Returns a normal `Stream` which yields the elements of this stream and discards the state.
48    #[inline]
49    fn into_stream(self) -> IntoStream<Self>
50    where
51        Self: Sized,
52    {
53        IntoStream(self)
54    }
55
56    /// Returns a stream which applies a transform to items of this stream.
57    #[inline]
58    fn map<F, B>(self, f: F) -> Map<Self, F>
59    where
60        Self: Sized,
61        F: FnMut(Self::Item) -> B,
62    {
63        Map { stream: self, f: f }
64    }
65
66    /// Returns a stream which applies a transform to errors of this stream.
67    #[inline]
68    fn map_err<F, B>(self, f: F) -> MapErr<Self, F>
69    where
70        Self: Sized,
71        F: FnMut(Self::Error) -> B,
72    {
73        MapErr { stream: self, f: f }
74    }
75
76    /// Returns a stream which applies a transform to the state of this stream.
77    #[inline]
78    fn map_state<F, B>(self, f: F) -> MapState<Self, F>
79    where
80        Self: Sized,
81        F: FnOnce(Self::State) -> B,
82    {
83        MapState {
84            stream: self,
85            f: Some(f),
86        }
87    }
88
89    /// Returns a stream which filters items of this stream by a predicate.
90    #[inline]
91    fn filter<F>(self, f: F) -> Filter<Self, F>
92    where
93        Self: Sized,
94        F: FnMut(&Self::Item) -> bool,
95    {
96        Filter { stream: self, f: f }
97    }
98
99    /// Returns a stream which filters and transforms items of this stream by a predicate.
100    #[inline]
101    fn filter_map<F, B>(self, f: F) -> FilterMap<Self, F>
102    where
103        Self: Sized,
104        F: FnMut(Self::Item) -> Option<B>,
105    {
106
107        FilterMap { stream: self, f: f }
108    }
109
110    /// Returns a stream which collects all items of this stream into a `Vec`, returning it along
111    /// with the stream's state.
112    #[inline]
113    fn collect(self) -> Collect<Self>
114    where
115        Self: Sized,
116    {
117        Collect {
118            stream: self,
119            items: vec![],
120        }
121    }
122
123    /// Returns a future which applies a closure to each item of this stream.
124    #[inline]
125    fn for_each<F>(self, f: F) -> ForEach<Self, F>
126    where
127        Self: Sized,
128        F: FnMut(Self::Item),
129    {
130        ForEach { stream: self, f: f }
131    }
132}
133
134impl<S: ?Sized> StateStream for Box<S>
135where
136    S: StateStream,
137{
138    type Item = S::Item;
139    type State = S::State;
140    type Error = S::Error;
141
142    #[inline]
143    fn poll(&mut self) -> Poll<StreamEvent<S::Item, S::State>, (S::Error, S::State)> {
144        S::poll(self)
145    }
146}
147
148impl<'a, S: ?Sized> StateStream for &'a mut S
149where
150    S: StateStream,
151{
152    type Item = S::Item;
153    type State = S::State;
154    type Error = S::Error;
155
156    #[inline]
157    fn poll(&mut self) -> Poll<StreamEvent<S::Item, S::State>, (S::Error, S::State)> {
158        S::poll(self)
159    }
160}
161
162impl<S> StateStream for AssertUnwindSafe<S>
163where
164    S: StateStream,
165{
166    type Item = S::Item;
167    type State = S::State;
168    type Error = S::Error;
169
170    #[inline]
171    fn poll(&mut self) -> Poll<StreamEvent<S::Item, S::State>, (S::Error, S::State)> {
172        self.0.poll()
173    }
174}
175
176/// An extension trait adding functionality to `Future`.
177pub trait FutureExt: Future {
178    /// Returns a stream which evaluates this future and then the resulting stream.
179    #[inline]
180    fn flatten_state_stream<E, S>(self) -> FlattenStateStream<Self>
181    where
182        Self: Sized + Future<Error = (E, S)>,
183        Self::Item: StateStream<State = S, Error = E>,
184    {
185        FlattenStateStream(FlattenStateStreamState::Future(self))
186    }
187}
188
189impl<F: ?Sized> FutureExt for F
190where
191    F: Future,
192{
193}
194
195/// An extension trait adding functionality to `Stream`.
196pub trait StreamExt: Stream {
197    /// Returns a `StateStream` yielding the items of this stream with a state.
198    #[inline]
199    fn into_state_stream<S>(self, state: S) -> FromStream<Self, S>
200    where
201        Self: Sized,
202    {
203        FromStream(self, Some(state))
204    }
205}
206
207impl<S: ?Sized> StreamExt for S
208where
209    S: Stream,
210{
211}
212
213/// Returns a stream produced by iteratively applying a closure to a state.
214#[inline]
215pub fn unfold<T, F, Fut, It, St>(init: T, f: F) -> Unfold<T, F, Fut>
216where
217    F: FnMut(T) -> Fut,
218    Fut: futures::IntoFuture<Item = StreamEvent<(It, T), St>>,
219{
220    Unfold {
221        state: UnfoldState::Ready(init),
222        f: f,
223    }
224}
225
226/// A `StateStream` yielding elements of a normal `Stream.
227pub struct FromStream<S, T>(S, Option<T>);
228
229impl<S, T> StateStream for FromStream<S, T>
230where
231    S: Stream,
232{
233    type Item = S::Item;
234    type State = T;
235    type Error = S::Error;
236
237    #[inline]
238    fn poll(&mut self) -> Poll<StreamEvent<S::Item, T>, (S::Error, T)> {
239        self.0
240            .poll()
241            .map(|a| match a {
242                Async::Ready(Some(i)) => Async::Ready(StreamEvent::Next(i)),
243                Async::Ready(None) => Async::Ready(StreamEvent::Done(
244                    self.1.take().expect("poll called after completion"),
245                )),
246                Async::NotReady => Async::NotReady,
247            })
248            .map_err(|e| {
249                (e, self.1.take().expect("poll called after completion"))
250            })
251    }
252}
253
254/// A future yielding the next element of a stream.
255pub struct IntoFuture<S>(Option<S>);
256
257impl<S> Future for IntoFuture<S>
258where
259    S: StateStream,
260{
261    type Item = (StreamEvent<S::Item, S::State>, S);
262    type Error = (S::Error, S::State, S);
263
264    #[inline]
265    fn poll(&mut self) -> Poll<(StreamEvent<S::Item, S::State>, S), (S::Error, S::State, S)> {
266        let item = match self.0.as_mut().expect("polling IntoFuture twice").poll() {
267            Ok(Async::NotReady) => return Ok(Async::NotReady),
268            Ok(Async::Ready(i)) => Ok(i),
269            Err(e) => Err(e),
270        };
271        let stream = self.0.take().unwrap();
272        match item {
273            Ok(i) => Ok(Async::Ready((i, stream))),
274            Err((e, s)) => Err((e, s, stream)),
275        }
276    }
277}
278
279/// A `Stream` yielding the elements of a `StateStream` and discarding its state.
280pub struct IntoStream<S>(S);
281
282impl<S> Stream for IntoStream<S>
283where
284    S: StateStream,
285{
286    type Item = S::Item;
287    type Error = S::Error;
288
289    #[inline]
290    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
291        match self.0.poll() {
292            Ok(Async::Ready(StreamEvent::Next(i))) => Ok(Async::Ready(Some(i))),
293            Ok(Async::Ready(StreamEvent::Done(_))) => Ok(Async::Ready(None)),
294            Ok(Async::NotReady) => Ok(Async::NotReady),
295            Err((e, _)) => Err(e),
296        }
297    }
298}
299
300/// A stream which applies a transform to the elements of a stream.
301pub struct Map<S, F> {
302    stream: S,
303    f: F,
304}
305
306impl<S, F, B> StateStream for Map<S, F>
307where
308    S: StateStream,
309    F: FnMut(S::Item) -> B,
310{
311    type Item = B;
312    type State = S::State;
313    type Error = S::Error;
314
315    #[inline]
316    fn poll(&mut self) -> Poll<StreamEvent<B, S::State>, (S::Error, S::State)> {
317        self.stream.poll().map(|a| match a {
318            Async::Ready(StreamEvent::Next(i)) => Async::Ready(StreamEvent::Next((self.f)(i))),
319            Async::Ready(StreamEvent::Done(s)) => Async::Ready(StreamEvent::Done(s)),
320            Async::NotReady => Async::NotReady,
321        })
322    }
323}
324
325/// A stream which applies a transform to the errors of a stream.
326pub struct MapErr<S, F> {
327    stream: S,
328    f: F,
329}
330
331impl<S, F, B> StateStream for MapErr<S, F>
332where
333    S: StateStream,
334    F: FnMut(S::Error) -> B,
335{
336    type Item = S::Item;
337    type State = S::State;
338    type Error = B;
339
340    #[inline]
341    fn poll(&mut self) -> Poll<StreamEvent<S::Item, S::State>, (B, S::State)> {
342        match self.stream.poll() {
343            Ok(a) => Ok(a),
344            Err((e, s)) => Err(((self.f)(e), s)),
345        }
346    }
347}
348
349/// A stream which applies a transform to the state of a stream.
350pub struct MapState<S, F> {
351    stream: S,
352    f: Option<F>,
353}
354
355impl<S, F, B> StateStream for MapState<S, F>
356where
357    S: StateStream,
358    F: FnOnce(S::State) -> B,
359{
360    type Item = S::Item;
361    type State = B;
362    type Error = S::Error;
363
364    #[inline]
365    fn poll(&mut self) -> Poll<StreamEvent<S::Item, B>, (S::Error, B)> {
366        self.stream
367            .poll()
368            .map(|a| match a {
369                Async::Ready(StreamEvent::Next(i)) => Async::Ready(StreamEvent::Next(i)),
370                Async::Ready(StreamEvent::Done(s)) => {
371                    let f = self.f.take().expect("polled MapState after completion");
372                    Async::Ready(StreamEvent::Done(f(s)))
373                }
374                Async::NotReady => Async::NotReady,
375            })
376            .map_err(|(e, s)| {
377                let f = self.f.take().expect("polled MapState after completion");
378                (e, f(s))
379            })
380    }
381}
382
383/// A future which collects the items of a stream.
384pub struct Collect<S>
385where
386    S: StateStream,
387{
388    stream: S,
389    items: Vec<S::Item>,
390}
391
392impl<S> Future for Collect<S>
393where
394    S: StateStream,
395{
396    type Item = (Vec<S::Item>, S::State);
397    type Error = (S::Error, S::State);
398
399    #[inline]
400    fn poll(&mut self) -> Poll<(Vec<S::Item>, S::State), (S::Error, S::State)> {
401        loop {
402            match self.stream.poll() {
403                Ok(Async::Ready(StreamEvent::Next(i))) => self.items.push(i),
404                Ok(Async::Ready(StreamEvent::Done(s))) => {
405                    let items = mem::replace(&mut self.items, vec![]);
406                    return Ok(Async::Ready((items, s)));
407                }
408                Ok(Async::NotReady) => return Ok(Async::NotReady),
409                Err(e) => return Err(e),
410            }
411        }
412    }
413}
414
415/// A stream which iteratively applies a closure to state to produce items.
416pub struct Unfold<T, F, Fut>
417where
418    Fut: futures::IntoFuture,
419{
420    state: UnfoldState<T, Fut::Future>,
421    f: F,
422}
423
424enum UnfoldState<T, F> {
425    Empty,
426    Ready(T),
427    Processing(F),
428}
429
430impl<T, F, Fut, It, St, E> StateStream for Unfold<T, F, Fut>
431    where F: FnMut(T) -> Fut,
432          Fut: futures::IntoFuture<Item = StreamEvent<(It, T), St>, Error = (E, St)>
433{
434    type Item = It;
435    type State = St;
436    type Error = E;
437
438    #[inline]
439    fn poll(&mut self) -> Poll<StreamEvent<It, St>, (E, St)> {
440        loop {
441            match mem::replace(&mut self.state, UnfoldState::Empty) {
442                UnfoldState::Empty => panic!("polled an Unfold after completion"),
443                UnfoldState::Ready(state) => {
444                    self.state = UnfoldState::Processing((self.f)(state).into_future())
445                }
446                UnfoldState::Processing(mut fut) => {
447                    match try!(fut.poll()) {
448                        Async::Ready(StreamEvent::Next((i, state))) => {
449                            self.state = UnfoldState::Ready(state);
450                            return Ok(Async::Ready(StreamEvent::Next(i)));
451                        }
452                        Async::Ready(StreamEvent::Done(s)) => {
453                            return Ok(Async::Ready(StreamEvent::Done(s)));
454                        }
455                        Async::NotReady => {
456                            self.state = UnfoldState::Processing(fut);
457                            return Ok(Async::NotReady);
458                        }
459                    }
460                }
461            }
462        }
463    }
464}
465
466enum FlattenStateStreamState<F>
467where
468    F: Future,
469{
470    Future(F),
471    Stream(F::Item),
472}
473
474/// A stream which evaluates a future and then then stream returned by it.
475pub struct FlattenStateStream<F>(FlattenStateStreamState<F>)
476where
477    F: Future;
478
479impl<F, E, S> StateStream for FlattenStateStream<F>
480where
481    F: Future<Error = (E, S)>,
482    F::Item: StateStream<Error = E, State = S>,
483{
484    type Item = <F::Item as StateStream>::Item;
485    type State = S;
486    type Error = E;
487
488    #[inline]
489    fn poll(&mut self) -> Poll<StreamEvent<Self::Item, Self::State>, (Self::Error, Self::State)> {
490        loop {
491            self.0 = match self.0 {
492                FlattenStateStreamState::Future(ref mut f) => {
493                    match f.poll() {
494                        Ok(Async::NotReady) => return Ok(Async::NotReady),
495                        Ok(Async::Ready(stream)) => FlattenStateStreamState::Stream(stream),
496                        Err(e) => return Err(e),
497                    }
498                }
499                FlattenStateStreamState::Stream(ref mut s) => return s.poll(),
500            };
501        }
502    }
503}
504
505/// A stream which applies a filter to the items of a stream.
506pub struct Filter<S, F> {
507    stream: S,
508    f: F,
509}
510
511impl<S, F> StateStream for Filter<S, F>
512where
513    S: StateStream,
514    F: FnMut(&S::Item) -> bool,
515{
516    type Item = S::Item;
517    type State = S::State;
518    type Error = S::Error;
519
520    #[inline]
521    fn poll(&mut self) -> Poll<StreamEvent<S::Item, S::State>, (S::Error, S::State)> {
522        loop {
523            match self.stream.poll() {
524                Ok(Async::Ready(StreamEvent::Next(i))) => {
525                    if (self.f)(&i) {
526                        return Ok(Async::Ready(StreamEvent::Next(i)));
527                    }
528                }
529                s => return s,
530            }
531        }
532    }
533}
534
535/// A stream which applies a filter and transform to items of a stream.
536pub struct FilterMap<S, F> {
537    stream: S,
538    f: F,
539}
540
541impl<S, F, B> StateStream for FilterMap<S, F>
542where
543    S: StateStream,
544    F: FnMut(S::Item) -> Option<B>,
545{
546    type Item = B;
547    type State = S::State;
548    type Error = S::Error;
549
550    #[inline]
551    fn poll(&mut self) -> Poll<StreamEvent<B, S::State>, (S::Error, S::State)> {
552        loop {
553            match try!(self.stream.poll()) {
554                Async::Ready(StreamEvent::Next(i)) => {
555                    if let Some(i) = (self.f)(i) {
556                        return Ok(Async::Ready(StreamEvent::Next(i)));
557                    }
558                }
559                Async::Ready(StreamEvent::Done(s)) => {
560                    return Ok(Async::Ready(StreamEvent::Done(s)));
561                }
562                Async::NotReady => return Ok(Async::NotReady),
563            }
564        }
565    }
566}
567
568/// A future which applies a closure to each item of a stream.
569pub struct ForEach<S, F> {
570    stream: S,
571    f: F,
572}
573
574impl<S, F> Future for ForEach<S, F>
575where
576    S: StateStream,
577    F: FnMut(S::Item),
578{
579    type Item = S::State;
580    type Error = (S::Error, S::State);
581
582    #[inline]
583    fn poll(&mut self) -> Poll<S::State, (S::Error, S::State)> {
584        loop {
585            match try_ready!(self.stream.poll()) {
586                StreamEvent::Next(i) => (self.f)(i),
587                StreamEvent::Done(s) => return Ok(Async::Ready(s)),
588            }
589        }
590    }
591}