use std::cmp::Ordering;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use futures::stream::{Fuse, Stream, StreamExt};
use pin_project::pin_project;
use crate::CollateRef;
#[pin_project]
pub struct Diff<C, T, L, R> {
collator: C,
#[pin]
left: Fuse<L>,
#[pin]
right: Fuse<R>,
pending_left: Option<T>,
pending_right: Option<T>,
}
impl<C, T, L, R> Stream for Diff<C, T, L, R>
where
C: CollateRef<T>,
L: Stream<Item = T> + Unpin,
R: Stream<Item = T> + Unpin,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cxt: &mut Context) -> Poll<Option<Self::Item>> {
let mut this = self.project();
Poll::Ready(loop {
let left_done = if this.left.is_done() {
true
} else if this.pending_left.is_none() {
match ready!(Pin::new(&mut this.left).poll_next(cxt)) {
Some(value) => {
*this.pending_left = Some(value);
false
}
None => true,
}
} else {
false
};
let right_done = if this.right.is_done() {
true
} else if this.pending_right.is_none() {
match ready!(Pin::new(&mut this.right).poll_next(cxt)) {
Some(value) => {
*this.pending_right = Some(value);
false
}
None => true,
}
} else {
false
};
if this.pending_left.is_some() && this.pending_right.is_some() {
let l_value = this.pending_left.as_ref().unwrap();
let r_value = this.pending_right.as_ref().unwrap();
match this.collator.cmp_ref(l_value, r_value) {
Ordering::Equal => {
this.pending_left.take();
this.pending_right.take();
}
Ordering::Less => {
break this.pending_left.take();
}
Ordering::Greater => {
this.pending_right.take();
}
}
} else if right_done && this.pending_left.is_some() {
break this.pending_left.take();
} else if left_done {
break None;
}
})
}
}
pub fn diff<C, T, L, R>(collator: C, left: L, right: R) -> Diff<C, T, L, R>
where
C: CollateRef<T>,
L: Stream<Item = T>,
R: Stream<Item = T>,
{
Diff {
collator,
left: left.fuse(),
right: right.fuse(),
pending_left: None,
pending_right: None,
}
}