use crate::stream::Stream;
use crate::sync::Mutex;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
/// Lifecycle of the value a `StreamFuture` is waiting for.
#[derive(Debug)]
enum FutureValue<T> {
    /// No value has been stored yet.
    Pending,
    /// A value has been stored and has not yet been handed out by `poll`.
    Ready(T),
    /// The value was already returned by `poll`; polling again panics.
    Finished,
}
/// State shared between a `StreamFuture` and the observer it registers on
/// its source stream.
#[derive(Debug)]
struct StreamFutureStorage<T> {
    /// Current stage of the awaited value.
    value: FutureValue<T>,
    /// Waker saved by the most recent `poll` that returned `Poll::Pending`;
    /// taken (and woken) by the observer once a value arrives.
    waker: Option<Waker>,
}
impl<T> Default for StreamFutureStorage<T> {
    /// Builds the initial state: no value yet and no waker registered.
    ///
    /// Written by hand rather than derived because `#[derive(Default)]` would
    /// add a `T: Default` bound that this type does not need.
    fn default() -> Self {
        Self {
            waker: None,
            value: FutureValue::Pending,
        }
    }
}
/// A future that resolves to a value observed on a `Stream`.
///
/// Once the future has produced its value it is finished: polling it again
/// panics unless `reload` is called first to re-arm it.
#[derive(Debug)]
pub struct StreamFuture<T> {
    /// Value slot and waker, shared with the observer registered on `stream`.
    storage: Arc<Mutex<StreamFutureStorage<T>>>,
    /// The stream this future takes its value from.
    stream: Stream<T>,
}
impl<T: Clone + Send + 'static> StreamFuture<T> {
    /// Creates a future bound to `stream` and immediately starts observing it.
    pub(crate) fn new(stream: Stream<T>) -> Self {
        let this = StreamFuture {
            storage: Default::default(),
            stream,
        };
        this.register_callback();
        this
    }

    /// Registers an observer on the source stream that stores the received
    /// value and wakes the task waiting on this future, if any.
    ///
    /// The observer only holds a weak reference to the storage, so it does not
    /// keep the storage alive once the future has been dropped.
    fn register_callback(&self) {
        let weak = Arc::downgrade(&self.storage);
        self.stream.observe(move |val| {
            if let Some(st) = weak.upgrade() {
                let mut storage = st.lock();
                storage.value = FutureValue::Ready(val.into_owned());
                let waker = storage.waker.take();
                // Release the lock before waking: `wake` runs arbitrary
                // executor code, which must not execute while the storage
                // mutex is held (it may poll this future and lock it again).
                drop(storage);
                if let Some(waker) = waker {
                    waker.wake();
                }
            }
            // NOTE(review): `false` presumably tells the stream to drop this
            // observer after its first event (the `basic` test relies on the
            // second send not overwriting the value) -- confirm against
            // `Stream::observe`.
            false
        });
    }

    /// Returns the stream this future takes its value from.
    #[inline]
    pub fn get_source(&self) -> &Stream<T> {
        &self.stream
    }

    /// Re-arms a finished future so it can wait for another value.
    ///
    /// Does nothing unless the previous value has already been returned by
    /// `poll`; a pending future or one holding an unread value is left as is.
    pub fn reload(&self) {
        let mut storage = self.storage.lock();
        match storage.value {
            FutureValue::Finished => {}
            FutureValue::Pending | FutureValue::Ready(_) => return,
        }
        *storage = Default::default();
        // Release the storage lock before touching the stream. The observer
        // locks the storage from inside the stream's dispatch, so registering
        // while still holding the guard would take the two locks in the
        // opposite order.
        // NOTE(review): assumes the stream may dispatch under its own internal
        // lock -- not visible from this file.
        drop(storage);
        self.register_callback();
    }
}
impl<T> Future for StreamFuture<T> {
    type Output = T;

    /// Returns the stored value if one has arrived, otherwise records the
    /// caller's waker and stays pending.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let mut guard = self.storage.lock();
        // Optimistically mark the slot as finished while taking its previous
        // content; only the pending arm has to undo that.
        let previous = mem::replace(&mut guard.value, FutureValue::Finished);
        match previous {
            FutureValue::Ready(value) => Poll::Ready(value),
            FutureValue::Pending => {
                guard.value = FutureValue::Pending;
                guard.waker = Some(ctx.waker().clone());
                Poll::Pending
            }
            FutureValue::Finished => panic!("future polled again after completion"),
        }
    }
}
// `poll` only reaches its state through `self.storage` (an `Arc<Mutex<..>>`),
// so nothing visible here depends on the future staying at a fixed address.
// NOTE(review): this marks the type `Unpin` for every `T`, independent of
// whether `Stream<T>` itself is `Unpin` -- confirm that is intended.
impl<T> Unpin for StreamFuture<T> {}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::Sink;
    use futures::executor::block_on;

    /// The future resolves to the first value sent after it was created,
    /// even when more values were sent before it is awaited.
    #[test]
    fn basic() {
        let sink = Sink::new();
        let future = StreamFuture::new(sink.stream());
        sink.send(42);
        sink.send(13);
        assert_eq!(block_on(future), 42);
    }

    /// Polling a completed future again must panic with the dedicated message.
    /// Pinning `expected` keeps this test from passing on an unrelated panic.
    #[test]
    #[should_panic(expected = "future polled again after completion")]
    fn invalid_poll() {
        let sink = Sink::new();
        let mut future = StreamFuture::new(sink.stream());
        sink.send(42);
        let _a = block_on(&mut future);
        let _b = block_on(&mut future);
    }

    /// After `reload`, a finished future can be awaited again and yields the
    /// next value sent on the stream.
    #[test]
    fn reload() {
        let sink = Sink::new();
        let mut future = StreamFuture::new(sink.stream());
        sink.send(42);
        assert_eq!(block_on(&mut future), 42);
        future.reload();
        sink.send(13);
        assert_eq!(block_on(&mut future), 13);
    }
}