Skip to main content

streamify/
future.rs

1use futures_core::stream::Stream;
2use std::future::Future;
3use std::marker::Unpin;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7pub struct StreamifyFuture<T: Future> {
8  inner: T,
9  completed: bool,
10}
11
12impl<I: Future> Unpin for StreamifyFuture<I> {}
13
14impl<T: Future> StreamifyFuture<T> {
15  pub fn new(inner: T) -> Self {
16    StreamifyFuture {
17      inner,
18      completed: false,
19    }
20  }
21}
22
23impl<T: Future> Stream for StreamifyFuture<T> {
24  type Item = <T as Future>::Output;
25
26  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
27    if self.completed {
28      return Poll::Ready(None);
29    }
30
31    let pinned_future = unsafe { Pin::new_unchecked(&mut self.inner) };
32
33    let status = pinned_future.poll(cx);
34    if let Poll::Ready(val) = status {
35      self.completed = true;
36      Poll::Ready(Some(val))
37    } else {
38      Poll::Pending
39    }
40  }
41
42  fn size_hint(&self) -> (usize, Option<usize>) {
43    (1, Some(1))
44  }
45}
46
47pub fn from_future<T: Future>(f: T) -> StreamifyFuture<T> {
48  StreamifyFuture::new(f)
49}