use std::{
future::Future,
ops::{Deref, DerefMut},
pin::{self, Pin},
task::Poll,
};
use futures::{
future::{self, Pending},
FutureExt, Stream, StreamExt,
};
pub(crate) trait BoolUtils {
fn assert_true(&self);
fn false_to_err<T>(&self, value: T) -> Result<T, T>;
fn true_to_err<T>(&self, value: T) -> Result<T, T>;
}
impl BoolUtils for bool {
fn assert_true(&self) {
if !*self {
panic!();
}
}
fn false_to_err<T>(&self, value: T) -> Result<T, T> {
if *self {
Ok(value)
} else {
Err(value)
}
}
fn true_to_err<T>(&self, value: T) -> Result<T, T> {
if !*self {
Ok(value)
} else {
Err(value)
}
}
}
pub(crate) trait Generator: Stream + Sized + Send + Sync
where
<Self as Stream>::Item: Send + Sync + 'static,
{
fn with_generator(self, future: impl Future<Output = ()> + Send + Sync + 'static) -> impl Stream<Item = <Self as Stream>::Item> + Send + Sync {
let mut boxed_stream = Box::pin(self);
let mut boxed_future = Some(Box::pin(future));
futures::stream::poll_fn(move |cx| {
boxed_future.take_if(|future| future.poll_unpin(cx) == Poll::Ready(()));
match boxed_stream.poll_next_unpin(cx) {
Poll::Ready(Some(value)) => Poll::Ready(Some(value)),
Poll::Ready(None) if boxed_future.is_none() => Poll::Ready(None),
_ => Poll::Pending,
}
})
}
}
impl<S> Generator for S
where
S: Stream + Sized + Send + Sync,
S::Item: Send + Sync + 'static,
{
}
pub trait HakubanStreamExt: Stream + Sized {
fn for_each_till_next<F, C, Fut>(self, raw_context: C, mut f: F) -> impl Future<Output = C>
where
F: FnMut(ForEachTillNextContext<C>, Self::Item) -> Fut,
Fut: Future<Output = ()>,
{
async move {
let mut stream = pin::pin!(self.fuse());
let mut stream_next_future = Box::pin(stream.next());
let mut user_future: Pin<Box<future::Fuse<futures::future::Either<Pending<()>, Fut>>>> =
Box::pin(futures::future::Either::Left(futures::future::pending()).fuse());
let (mut context_return_box_sender, mut context_return_box_receiver) = futures::channel::oneshot::channel();
context_return_box_sender.send(raw_context).is_ok().assert_true();
loop {
futures::select_biased! {
item = stream_next_future => {
drop(user_future);
let raw_context = context_return_box_receiver.await.unwrap();
if let Some(item) = item {
(context_return_box_sender, context_return_box_receiver) = futures::channel::oneshot::channel();
let context = ForEachTillNextContext { inner: Some(raw_context), return_box: Some(context_return_box_sender) };
user_future = Box::pin(futures::future::Either::Right(f(context, item)).fuse());
stream_next_future = Box::pin(stream.next());
} else {
return raw_context;
}
},
_ = user_future => {
},
};
}
}
}
}
impl<S> HakubanStreamExt for S
where
S: Stream + Sized + Send + Sync,
S::Item: Send + Sync + 'static,
{
}
pub struct ForEachTillNextContext<T> {
inner: Option<T>,
return_box: Option<futures::channel::oneshot::Sender<T>>,
}
impl<T> Drop for ForEachTillNextContext<T> {
fn drop(&mut self) {
self.return_box.take().unwrap().send(self.inner.take().unwrap()).ok();
}
}
impl<T> Deref for ForEachTillNextContext<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.inner.as_ref().unwrap()
}
}
impl<T> DerefMut for ForEachTillNextContext<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.as_mut().unwrap()
}
}