use super::assert_stream;
use crate::stream::{Fuse, StreamExt};
use core::{fmt, pin::Pin};
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub enum PollNext {
Left,
Right,
}
impl PollNext {
pub fn toggle(&mut self) -> Self {
let old = *self;
match self {
PollNext::Left => *self = PollNext::Right,
PollNext::Right => *self = PollNext::Left,
}
old
}
}
impl Default for PollNext {
fn default() -> Self {
PollNext::Left
}
}
pin_project! {
#[must_use = "streams do nothing unless polled"]
pub struct SelectWithStrategy<St1, St2, Clos, State> {
#[pin]
stream1: Fuse<St1>,
#[pin]
stream2: Fuse<St2>,
state: State,
clos: Clos,
}
}
pub fn select_with_strategy<St1, St2, Clos, State>(
stream1: St1,
stream2: St2,
which: Clos,
) -> SelectWithStrategy<St1, St2, Clos, State>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
Clos: FnMut(&mut State) -> PollNext,
State: Default,
{
assert_stream::<St1::Item, _>(SelectWithStrategy {
stream1: stream1.fuse(),
stream2: stream2.fuse(),
state: Default::default(),
clos: which,
})
}
impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
pub fn get_ref(&self) -> (&St1, &St2) {
(self.stream1.get_ref(), self.stream2.get_ref())
}
pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
(self.stream1.get_mut(), self.stream2.get_mut())
}
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
let this = self.project();
(this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
}
pub fn into_inner(self) -> (St1, St2) {
(self.stream1.into_inner(), self.stream2.into_inner())
}
}
impl<St1, St2, Clos, State> FusedStream for SelectWithStrategy<St1, St2, Clos, State>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
Clos: FnMut(&mut State) -> PollNext,
{
fn is_terminated(&self) -> bool {
self.stream1.is_terminated() && self.stream2.is_terminated()
}
}
impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
Clos: FnMut(&mut State) -> PollNext,
{
type Item = St1::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
let this = self.project();
match (this.clos)(this.state) {
PollNext::Left => poll_inner(this.stream1, this.stream2, cx),
PollNext::Right => poll_inner(this.stream2, this.stream1, cx),
}
}
}
fn poll_inner<St1, St2>(
a: Pin<&mut St1>,
b: Pin<&mut St2>,
cx: &mut Context<'_>,
) -> Poll<Option<St1::Item>>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
let a_done = match a.poll_next(cx) {
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
Poll::Ready(None) => true,
Poll::Pending => false,
};
match b.poll_next(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) if a_done => Poll::Ready(None),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
}
}
impl<St1, St2, Clos, State> fmt::Debug for SelectWithStrategy<St1, St2, Clos, State>
where
St1: fmt::Debug,
St2: fmt::Debug,
State: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SelectWithStrategy")
.field("stream1", &self.stream1)
.field("stream2", &self.stream2)
.field("state", &self.state)
.finish()
}
}