use super::assert_stream;
use core::{fmt, pin::Pin};
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;
/// Identifies which of the two input streams should be polled first.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub enum PollNext {
    /// Poll the first (left) stream first.
    Left,
    /// Poll the second (right) stream first.
    Right,
}

impl PollNext {
    /// Flips `self` to the opposite side and returns the value it held
    /// *before* the flip.
    pub fn toggle(&mut self) -> Self {
        let flipped = self.other();
        core::mem::replace(self, flipped)
    }

    /// Returns the opposite side without modifying `self`.
    fn other(&self) -> Self {
        match *self {
            Self::Right => Self::Left,
            Self::Left => Self::Right,
        }
    }
}

impl Default for PollNext {
    /// The default side is [`PollNext::Left`].
    fn default() -> Self {
        PollNext::Left
    }
}
/// Tracks which of the two input streams have already reported completion.
enum InternalState {
    /// Neither stream has finished yet.
    Start,
    /// The left stream has finished; only the right one remains.
    LeftFinished,
    /// The right stream has finished; only the left one remains.
    RightFinished,
    /// Both streams have finished.
    BothFinished,
}

impl InternalState {
    /// Records that the stream on side `ps` has finished.
    ///
    /// From `Start` this moves to the matching single-side-finished state.
    /// If the opposite side had already finished, it moves to `BothFinished`.
    /// Every other combination leaves the state untouched.
    fn finish(&mut self, ps: PollNext) {
        let next = match (&*self, ps) {
            (Self::Start, PollNext::Left) => Self::LeftFinished,
            (Self::Start, PollNext::Right) => Self::RightFinished,
            (Self::LeftFinished, PollNext::Right) | (Self::RightFinished, PollNext::Left) => {
                Self::BothFinished
            }
            // Side already recorded as finished, or everything is done: no transition.
            _ => return,
        };
        *self = next;
    }
}
pin_project! {
/// Stream returned by [`select_with_strategy()`]: it yields items from two
/// inner streams, asking a caller-supplied closure which one to poll first.
#[must_use = "streams do nothing unless polled"]
#[project = SelectWithStrategyProj]
pub struct SelectWithStrategy<St1, St2, Clos, State> {
// First ("left") input stream; pin-projected.
#[pin]
stream1: St1,
// Second ("right") input stream; pin-projected.
#[pin]
stream2: St2,
// Which of the two input streams have already finished.
internal_state: InternalState,
// Caller-defined state handed to `clos` as `&mut State` on each invocation.
state: State,
// Strategy closure that picks the side to poll first.
clos: Clos,
}
}
#[allow(clippy::too_long_first_doc_paragraph)]
/// Combines two streams with the same item type into one, letting the closure
/// `which` decide which stream gets polled first.
///
/// While neither stream has finished, `which` is called on every poll with a
/// `&mut State` and returns the [`PollNext`] side to try first; the other side
/// is only polled if the first one did not produce an item. The state value is
/// created with `State::default()` and lives inside the returned
/// [`SelectWithStrategy`], so the closure can base its decision on earlier
/// choices.
///
/// Once one of the streams has finished, the remaining stream is polled
/// exclusively and `which` is no longer consulted. The combined stream ends
/// when both inputs have ended.
///
/// Both input streams are consumed and owned by the returned value.
pub fn select_with_strategy<St1, St2, Clos, State>(
    stream1: St1,
    stream2: St2,
    which: Clos,
) -> SelectWithStrategy<St1, St2, Clos, State>
where
    St1: Stream,
    St2: Stream<Item = St1::Item>,
    Clos: FnMut(&mut State) -> PollNext,
    State: Default,
{
    let combined = SelectWithStrategy {
        stream1,
        stream2,
        internal_state: InternalState::Start,
        state: State::default(),
        clos: which,
    };
    assert_stream::<St1::Item, _>(combined)
}
impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
pub fn get_ref(&self) -> (&St1, &St2) {
(&self.stream1, &self.stream2)
}
pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
(&mut self.stream1, &mut self.stream2)
}
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
let this = self.project();
(this.stream1, this.stream2)
}
pub fn into_inner(self) -> (St1, St2) {
(self.stream1, self.stream2)
}
}
impl<St1, St2, Clos, State> FusedStream for SelectWithStrategy<St1, St2, Clos, State>
where
    St1: Stream,
    St2: Stream<Item = St1::Item>,
    Clos: FnMut(&mut State) -> PollNext,
{
    /// Returns `true` only after both inner streams have been recorded as
    /// finished.
    fn is_terminated(&self) -> bool {
        matches!(self.internal_state, InternalState::BothFinished)
    }
}
#[inline]
fn poll_side<St1, St2, Clos, State>(
select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
side: PollNext,
cx: &mut Context<'_>,
) -> Poll<Option<St1::Item>>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
match side {
PollNext::Left => select.stream1.as_mut().poll_next(cx),
PollNext::Right => select.stream2.as_mut().poll_next(cx),
}
}
/// Polls the stream on `side` first and, if it yields no item, the stream on
/// the opposite side.
///
/// * An item from the first side is returned immediately; the other side is
///   not polled.
/// * Any side that reports `Ready(None)` is recorded in `internal_state`.
/// * `Ready(None)` is returned only when both sides ended during this call.
///   If only the second side ended while the first is still pending, the
///   result is `Pending`.
/// * Otherwise the second side's result (an item or `Pending`) is returned.
#[inline]
fn poll_inner<St1, St2, Clos, State>(
    select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
    side: PollNext,
    cx: &mut Context<'_>,
) -> Poll<Option<St1::Item>>
where
    St1: Stream,
    St2: Stream<Item = St1::Item>,
{
    let preferred_exhausted = match poll_side(select, side, cx) {
        Poll::Pending => false,
        Poll::Ready(None) => {
            select.internal_state.finish(side);
            true
        }
        ready @ Poll::Ready(Some(_)) => return ready,
    };

    let fallback = side.other();
    let outcome = poll_side(select, fallback, cx);
    if let Poll::Ready(None) = outcome {
        select.internal_state.finish(fallback);
        // The combined stream is only over if the preferred side ended too.
        return if preferred_exhausted { Poll::Ready(None) } else { Poll::Pending };
    }
    outcome
}
impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
where
    St1: Stream,
    St2: Stream<Item = St1::Item>,
    Clos: FnMut(&mut State) -> PollNext,
{
    type Item = St1::Item;

    /// Produces the next item of the combined stream.
    ///
    /// While both inputs are live, the strategy closure picks the side to
    /// poll first. After one input has ended only the other is polled, and
    /// after both have ended this always returns `Ready(None)`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
        let mut this = self.project();

        // Resolve the two-stream and already-done cases up front; what is
        // left is a poll of the single stream that is still running.
        let remaining = match *this.internal_state {
            InternalState::BothFinished => return Poll::Ready(None),
            InternalState::Start => {
                let choice = (this.clos)(this.state);
                return poll_inner(&mut this, choice, cx);
            }
            InternalState::LeftFinished => this.stream2.poll_next(cx),
            InternalState::RightFinished => this.stream1.poll_next(cx),
        };

        if let Poll::Ready(None) = remaining {
            *this.internal_state = InternalState::BothFinished;
        }
        remaining
    }
}
impl<St1, St2, Clos, State> fmt::Debug for SelectWithStrategy<St1, St2, Clos, State>
where
    St1: fmt::Debug,
    St2: fmt::Debug,
    State: fmt::Debug,
{
    /// Formats the two inner streams and the strategy state. The closure and
    /// the internal finished-state tracking are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("SelectWithStrategy");
        out.field("stream1", &self.stream1);
        out.field("stream2", &self.stream2);
        out.field("state", &self.state);
        out.finish()
    }
}