use futures::prelude::*;
use futures::task::{Waker, Poll, Context};
use futures::stream::{BoxStream};
use std::pin::*;
use std::sync::*;
/// State shared between a `SwitchingStream` and the `StreamSwitch` that controls it.
struct SwitchingStreamCore<TValue> {
/// The stream that items are currently being read from
stream: BoxStream<'static, TValue>,
/// Set to true once `stream` has returned `None`; reset to false when a replacement stream is installed
closed: bool,
/// True until the `StreamSwitch` is dropped; while it is true a replacement stream may still be supplied
switch_available: bool,
/// Waker captured by the most recent poll of the `SwitchingStream`; taken and woken when the switch is used or dropped
wake: Option<Waker>
}
/// The reading half of a switchable stream: yields the items of whichever stream is
/// currently installed in the shared core.
///
/// The stream only finishes once the current source has returned `None` and the
/// associated `StreamSwitch` has been dropped.
pub struct SwitchingStream<TValue> {
/// State shared with the `StreamSwitch`
core: Arc<Mutex<SwitchingStreamCore<TValue>>>
}
/// The controlling half of a switchable stream: replaces the stream that the
/// associated `SwitchingStream` reads from.
///
/// Dropping the switch marks the shared core as no longer switchable, which lets the
/// `SwitchingStream` finish once its current source is exhausted.
pub struct StreamSwitch<TValue> {
/// State shared with the `SwitchingStream`
core: Arc<Mutex<SwitchingStreamCore<TValue>>>
}
impl<TValue> Stream for SwitchingStream<TValue> {
    type Item = TValue;

    /// Polls the currently installed stream for its next item.
    ///
    /// Items and `Pending` results from the installed stream are passed straight
    /// through. When the installed stream is exhausted, the result depends on whether
    /// the `StreamSwitch` still exists: if it does, this returns `Poll::Pending` (a
    /// replacement may still arrive), otherwise it returns `Poll::Ready(None)`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the shared state is poisoned.
    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<TValue>> {
        let mut shared = self.core.lock().unwrap();

        // Always remember the latest waker so the switch can wake this task when a
        // new stream is installed or when the switch is dropped.
        shared.wake = Some(context.waker().clone());

        if !shared.closed {
            match shared.stream.poll_next_unpin(context) {
                // The installed stream is exhausted: record that and fall through to
                // decide whether the switching stream as a whole is finished.
                Poll::Ready(None) => {
                    shared.closed = true;
                }

                // An item or `Pending`: hand it to the caller unchanged.
                other => {
                    return other;
                }
            }
        }

        // The installed stream has ended. Stay pending for as long as a replacement
        // could still be supplied through the switch.
        if shared.switch_available {
            Poll::Pending
        } else {
            Poll::Ready(None)
        }
    }
}
impl<TValue> StreamSwitch<TValue> {
    /// Replaces the stream read by the associated `SwitchingStream` with `new_stream`.
    ///
    /// The previously installed stream is dropped, so any items it had not yet yielded
    /// are discarded. If the previous stream had already finished, the switching
    /// stream is re-opened. The task that last polled the `SwitchingStream` (if any)
    /// is woken so that it starts reading from the new stream.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the shared state is poisoned.
    pub fn switch_to_stream<TStream>(&self, new_stream: TStream)
    where
        TStream: 'static + Send + Stream<Item = TValue>,
    {
        let replacement = new_stream.boxed();

        // Update the shared state inside its own scope so the lock is released
        // before the waker runs.
        let pending_waker = {
            let mut shared = self.core.lock().unwrap();
            shared.closed = false;
            shared.stream = replacement;
            shared.wake.take()
        };

        if let Some(pending_waker) = pending_waker {
            pending_waker.wake();
        }
    }
}
impl<TValue> Drop for StreamSwitch<TValue> {
    /// Marks the shared state as no longer switchable and wakes the task that last
    /// polled the `SwitchingStream`, so that it can finish once its current stream is
    /// exhausted.
    ///
    /// This never panics on a poisoned mutex: the guard is recovered from the poison
    /// error instead.
    fn drop(&mut self) {
        let waker = {
            // The mutex is poisoned if the installed stream panicked while being
            // polled. Unwrapping the lock result here would panic inside `drop`; if
            // that happens while the thread is already unwinding the process aborts,
            // and otherwise `switch_available` would never be cleared. The state
            // touched here is a flag and an optional waker, so it is safe to keep
            // using it after a panic elsewhere.
            let mut core = self
                .core
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            core.switch_available = false;
            core.wake.take()
        };

        // Wake outside of the lock so the woken task can take it immediately.
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}
/// Wraps `stream` so that its source can be replaced while it is being read.
///
/// Returns a `SwitchingStream`, which initially yields the items of `stream`, and a
/// `StreamSwitch`, which can install a different stream in its place. The returned
/// stream does not finish until its current source is exhausted and the switch has
/// been dropped.
pub fn switchable_stream<TStream>(
    stream: TStream,
) -> (SwitchingStream<TStream::Item>, StreamSwitch<TStream::Item>)
where
    TStream: 'static + Send + Stream,
{
    // Both halves hold a reference to the same state.
    let shared = Arc::new(Mutex::new(SwitchingStreamCore {
        stream: stream.boxed(),
        closed: false,
        switch_available: true,
        wake: None,
    }));

    let switch = StreamSwitch { core: Arc::clone(&shared) };
    let output = SwitchingStream { core: shared };

    (output, switch)
}