use std::{
pin::Pin,
task::{Context, Poll},
};
use futures_util::{
Stream, StreamExt, ready,
stream::{Fuse, FusedStream},
};
use pin_project::pin_project;
#[pin_project]
#[derive(Debug)]
pub struct Grouped<S, T, K = <<S as Stream>::Item as GroupItem>::K> {
#[pin]
stream: Fuse<S>,
current: Option<(K, T)>,
}
impl<S: Default + Stream, T, K> Default for Grouped<S, T, K> {
fn default() -> Self {
S::default().into()
}
}
pub trait GroupItem: Sized {
type K: PartialEq;
type V;
type Grouped<T: Extend<Self::V>>;
fn poll_next_group<S: FusedStream<Item = Self>, T: Default + Extend<Self::V>>(
stream: Pin<&mut S>,
current: &mut Option<(Self::K, T)>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Grouped<T>>>;
}
impl<K: PartialEq, V> GroupItem for (K, V) {
type K = K;
type V = V;
type Grouped<T: Extend<Self::V>> = (K, T);
fn poll_next_group<S: FusedStream<Item = Self>, T: Default + Extend<Self::V>>(
mut stream: Pin<&mut S>,
current: &mut Option<(Self::K, T)>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Grouped<T>>> {
let mut next = || {
if stream.is_terminated() {
Poll::Ready(None)
} else {
stream.as_mut().poll_next(cx)
}
};
let (k, t) = match current.as_mut() {
Some(current) => current,
None => {
let Some((k, v)) = ready!(next()) else {
return Poll::Ready(None);
};
let mut t = T::default();
t.extend(std::iter::once(v));
current.insert((k, t))
}
};
while let Some((mut k_other, v)) = ready!(next()) {
if k_other != *k {
std::mem::swap(&mut k_other, k);
let t_other = std::mem::take(t);
t.extend(std::iter::once(v));
return Poll::Ready(Some((k_other, t_other)));
}
t.extend(std::iter::once(v));
}
let current = current.take().expect("has just dealt with Some");
Poll::Ready(Some(current))
}
}
impl<K: PartialEq, V, E> GroupItem for Result<(K, V), E> {
type K = K;
type V = V;
type Grouped<T: Extend<Self::V>> = Result<(K, T), E>;
fn poll_next_group<S: FusedStream<Item = Self>, T: Default + Extend<Self::V>>(
mut stream: Pin<&mut S>,
current: &mut Option<(Self::K, T)>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Grouped<T>>> {
let mut next = || {
if stream.is_terminated() {
Poll::Ready(None)
} else {
stream.as_mut().poll_next(cx)
}
};
let (k, t) = match current.as_mut() {
Some(current) => current,
None => {
let Some((k, v)) = ready!(next()?) else {
return Poll::Ready(None);
};
let mut t = T::default();
t.extend(std::iter::once(v));
current.insert((k, t))
}
};
while let Some((mut k_other, v)) = ready!(next()?) {
if k_other != *k {
std::mem::swap(&mut k_other, k);
let t_other = std::mem::take(t);
t.extend(std::iter::once(v));
return Poll::Ready(Some(Ok((k_other, t_other))));
}
t.extend(std::iter::once(v));
}
let current = current.take().expect("has just dealt with Some");
Poll::Ready(Some(Ok(current)))
}
}
impl<S: Stream<Item = I>, I: GroupItem, T: Default + Extend<I::V>> Stream for Grouped<S, T, I::K> {
type Item = I::Grouped<T>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
I::poll_next_group(this.stream, this.current, cx)
}
}
impl<S: Stream<Item = I>, I: GroupItem, T: Default + Extend<I::V>> FusedStream
for Grouped<S, T, I::K>
{
fn is_terminated(&self) -> bool {
self.current.is_none() && self.stream.is_terminated()
}
}
impl<S: Stream, T, K> From<S> for Grouped<S, T, K> {
fn from(stream: S) -> Self {
Self {
stream: stream.fuse(),
current: None,
}
}
}
pub trait GroupSequential: Sized + Stream<Item: GroupItem<V = Self::V>> {
type V;
#[must_use]
fn group_sequential<T: Default + Extend<Self::V>>(self) -> Grouped<Self, T> {
self.into()
}
}
impl<S: Stream<Item = I>, I: GroupItem> GroupSequential for S {
type V = I::V;
}