use futures::future::FusedFuture;
use futures::stream::FusedStream;
use futures::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
/// A wrapper around an optional future or stream.
///
/// The wrapped value can be removed with `take` or swapped with `replace`.
/// While no value is present, polling the wrapper returns `Poll::Pending`.
pub struct Optional<T> {
// The wrapped future or stream; `None` while the wrapper is empty.
task: Option<T>,
// Waker stored by a pending poll; `take` and `replace` wake it.
waker: Option<Waker>,
}
/// `Optional<T>` is `Unpin` whenever the wrapped `T` is.
impl<T> Unpin for Optional<T> where T: Unpin {}
impl<T> Default for Optional<T> {
fn default() -> Self {
Self {
task: None,
waker: None,
}
}
}
impl<T> From<Option<T>> for Optional<T> {
fn from(task: Option<T>) -> Self {
Self { task, waker: None }
}
}
impl<T> From<T> for Optional<T> {
    /// Wraps `fut` in a filled `Optional` with no stored waker.
    fn from(fut: T) -> Self {
        // Same construction as `Optional::new`: `Some(fut)` and no waker.
        Self::new(fut)
    }
}
impl<T> Optional<T> {
    /// Creates an `Optional` holding `task`, with no stored waker.
    pub fn new(task: T) -> Self {
        let task = Some(task);
        Self { task, waker: None }
    }

    /// Creates an `Optional` holding `future`.
    ///
    /// Identical to [`Optional::new`], but only available when `T` is a [`Future`].
    pub fn with_future(future: T) -> Self
    where
        T: Future,
    {
        Self::new(future)
    }

    /// Creates an `Optional` holding `stream`.
    ///
    /// Identical to [`Optional::new`], but only available when `T` is a [`Stream`].
    pub fn with_stream(stream: T) -> Self
    where
        T: Stream,
    {
        Self::new(stream)
    }

    /// Removes and returns the wrapped value, leaving the wrapper empty.
    ///
    /// Any waker stored by an earlier poll is consumed and woken.
    pub fn take(&mut self) -> Option<T> {
        let previous = self.task.take();
        self.wake_stored_waker();
        previous
    }

    /// Returns `true` if a value is currently wrapped.
    pub fn is_some(&self) -> bool {
        !self.task.is_none()
    }

    /// Returns `true` if the wrapper is empty.
    pub fn is_none(&self) -> bool {
        !self.task.is_some()
    }

    /// Returns a shared reference to the wrapped value, if any.
    pub fn as_ref(&self) -> Option<&T> {
        self.task.as_ref()
    }

    /// Returns a mutable reference to the wrapped value, if any.
    pub fn as_mut(&mut self) -> Option<&mut T> {
        self.task.as_mut()
    }

    /// Stores `task` in the wrapper and returns the value it previously held, if any.
    ///
    /// Any waker stored by an earlier poll is consumed and woken.
    pub fn replace(&mut self, task: T) -> Option<T> {
        let previous = self.task.replace(task);
        self.wake_stored_waker();
        previous
    }

    /// Returns a pinned mutable reference to the wrapped value, if any.
    pub fn as_pin_mut(&mut self) -> Option<Pin<&mut T>>
    where
        T: Unpin,
    {
        let task = self.task.as_mut()?;
        Some(Pin::new(task))
    }

    /// Returns a pinned shared reference to the wrapped value, if any.
    pub fn as_pin_ref(&self) -> Option<Pin<&T>>
    where
        T: Unpin,
    {
        let task = self.task.as_ref()?;
        Some(Pin::new(task))
    }

    /// Consumes the stored waker, if there is one, and wakes it.
    fn wake_stored_waker(&mut self) {
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}
impl<F> Future for Optional<F>
where
F: Future + Unpin,
{
type Output = F::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let Some(future) = self.as_pin_mut() else {
self.waker.replace(cx.waker().clone());
return Poll::Pending;
};
match future.poll(cx) {
Poll::Ready(output) => {
self.task.take();
Poll::Ready(output)
}
Poll::Pending => {
self.waker.replace(cx.waker().clone());
Poll::Pending
}
}
}
}
impl<F> FusedFuture for Optional<F>
where
    F: Future + Unpin,
{
    /// Reports the wrapper as terminated whenever it holds no future.
    ///
    /// The wrapper is emptied when the wrapped future completes or when the
    /// value is removed with `take`.
    fn is_terminated(&self) -> bool {
        self.task.is_none()
    }
}
/// Polls the wrapped stream, if there is one.
///
/// * With no stream set, the waker is stored and `Poll::Pending` is returned.
/// * Items produced by the wrapped stream are forwarded unchanged.
/// * When the wrapped stream ends, it is dropped (the wrapper becomes empty)
///   and `Poll::Ready(None)` is returned.
/// * When the wrapped stream is pending, the waker is stored and
///   `Poll::Pending` is returned.
impl<S> Stream for Optional<S>
where
    S: Stream + Unpin,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Optional<S>` is `Unpin` because `S` is, so the pin can be unwrapped.
        let this = self.get_mut();

        let polled = match this.task.as_mut() {
            Some(stream) => Pin::new(stream).poll_next(cx),
            None => Poll::Pending,
        };

        match polled {
            Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
            Poll::Ready(None) => {
                // Drop the exhausted stream so it is never polled again.
                this.task = None;
                Poll::Ready(None)
            }
            Poll::Pending => {
                // Keep the waker for `take`/`replace`; skip the clone when the
                // stored waker already wakes the same task.
                if !matches!(&this.waker, Some(waker) if waker.will_wake(cx.waker())) {
                    this.waker = Some(cx.waker().clone());
                }
                Poll::Pending
            }
        }
    }

    /// Forwards the wrapped stream's size hint, or `(0, Some(0))` when empty.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.task.as_ref().map_or((0, Some(0)), S::size_hint)
    }
}
impl<S> FusedStream for Optional<S>
where
    S: Stream + Unpin,
{
    /// Reports the wrapper as terminated whenever it holds no stream.
    fn is_terminated(&self) -> bool {
        self.is_none()
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use futures::StreamExt;

    /// Polls `future` exactly once using a no-op waker.
    fn poll_once<F>(future: &mut Optional<F>) -> Poll<F::Output>
    where
        F: Future + Unpin,
    {
        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
        Pin::new(future).poll(&mut cx)
    }

    /// Polls `stream` for its next item exactly once using a no-op waker.
    fn poll_next_once<S>(stream: &mut Optional<S>) -> Poll<Option<S::Item>>
    where
        S: Stream + Unpin,
    {
        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
        Pin::new(stream).poll_next(&mut cx)
    }

    /// Asserts that a filled `future` resolves to `expected` on the first poll
    /// and that the wrapper is empty afterwards.
    #[track_caller]
    fn assert_resolves_to<F>(future: &mut Optional<F>, expected: F::Output)
    where
        F: Future + Unpin,
        F::Output: PartialEq + std::fmt::Debug,
    {
        assert!(future.is_some());
        assert_eq!(poll_once(future), Poll::Ready(expected));
        assert!(future.is_none());
    }

    /// Asserts that a filled `stream` yields `expected`, then ends, and that
    /// the wrapper only becomes empty once the end has been observed.
    #[track_caller]
    fn assert_yields_then_ends<S>(stream: &mut Optional<S>, expected: S::Item)
    where
        S: Stream + Unpin,
        S::Item: PartialEq + std::fmt::Debug,
    {
        assert!(stream.is_some());
        assert_eq!(poll_next_once(stream), Poll::Ready(Some(expected)));
        assert!(stream.is_some());
        assert_eq!(poll_next_once(stream), Poll::Ready(None));
        assert!(stream.is_none());
    }

    #[test]
    fn test_optional_future() {
        let mut future = Optional::new(futures::future::ready(0));
        assert_resolves_to(&mut future, 0);
    }

    #[test]
    fn reusable_optional_future() {
        let mut future = Optional::new(futures::future::ready(0));
        assert_resolves_to(&mut future, 0);

        // Refilling the emptied wrapper makes it pollable again.
        future.replace(futures::future::ready(1));
        assert_resolves_to(&mut future, 1);
    }

    #[test]
    fn convert_future_to_optional_future() {
        let fut = futures::future::ready(0);
        let mut future = Optional::from(fut);
        assert_resolves_to(&mut future, 0);
    }

    #[test]
    fn test_optional_stream() {
        let mut stream = Optional::new(futures::stream::once(async { 0 }).boxed());
        assert_yields_then_ends(&mut stream, 0);
    }

    #[test]
    fn reusable_optional_stream() {
        let mut stream = Optional::new(futures::stream::once(async { 0 }).boxed());
        assert_yields_then_ends(&mut stream, 0);

        // Refilling the emptied wrapper makes it pollable again.
        stream.replace(futures::stream::once(async { 1 }).boxed());
        assert_yields_then_ends(&mut stream, 1);
    }

    #[test]
    fn convert_stream_to_optional_stream() {
        let st = futures::stream::once(async { 0 }).boxed();
        let mut stream = Optional::from(st);
        assert_yields_then_ends(&mut stream, 0);
    }
}