hakuban 0.8.5

Data-object sharing library
Documentation
use std::{
	future::Future,
	ops::{Deref, DerefMut},
	pin::{self, Pin},
	task::Poll,
};

use futures::{
	future::{self, Pending},
	FutureExt, Stream, StreamExt,
};

/// Small convenience helpers for boolean values; implemented for `bool` in this file.
pub(crate) trait BoolUtils {
	/// Asserts that the value is `true`; the `bool` implementation panics otherwise.
	fn assert_true(&self);
	// fn map<T>(&self, on_true: T, on_false: T) -> T;
	/// Wraps `value` in `Ok` when `self` is `true` and in `Err` when `self` is `false`.
	fn false_to_err<T>(&self, value: T) -> Result<T, T>;
	/// Wraps `value` in `Err` when `self` is `true` and in `Ok` when `self` is `false`.
	fn true_to_err<T>(&self, value: T) -> Result<T, T>;
}

impl BoolUtils for bool {
	/// Panics unless `self` is `true`.
	///
	/// `#[track_caller]` makes the reported panic location point at the call site
	/// rather than at this helper, and the message names the failed check.
	#[track_caller]
	fn assert_true(&self) {
		assert!(*self, "BoolUtils::assert_true called on `false`");
	}

	/// Returns `Ok(value)` when `self` is `true`, `Err(value)` when it is `false`.
	fn false_to_err<T>(&self, value: T) -> Result<T, T> {
		if *self {
			Ok(value)
		} else {
			Err(value)
		}
	}

	/// Returns `Err(value)` when `self` is `true`, `Ok(value)` when it is `false`.
	fn true_to_err<T>(&self, value: T) -> Result<T, T> {
		if *self {
			Err(value)
		} else {
			Ok(value)
		}
	}
}

/// Extension for streams that are driven by a companion "generator" future.
///
/// Available on every `Send + Sync` stream whose items are `Send + Sync + 'static`.
pub(crate) trait Generator: Stream + Sized + Send + Sync
where
	<Self as Stream>::Item: Send + Sync + 'static,
{
	/// Returns a stream that yields the items of `self` while also polling `future`.
	///
	/// On every poll the generator `future` is polled first (until it completes), then the
	/// inner stream. The resulting stream ends only once the inner stream has ended *and* the
	/// generator future has completed; if the inner stream ends first, the result stays pending
	/// until the future finishes.
	fn with_generator(self, future: impl Future<Output = ()> + Send + Sync + 'static) -> impl Stream<Item = <Self as Stream>::Item> + Send + Sync {
		// Fused: the inner stream can end while the generator is still running, in which case we
		// keep getting polled. A stream must not be polled again after returning `None`, so the
		// fuse turns those later polls into a harmless `Ready(None)`.
		let mut boxed_stream = Box::pin(self.fuse());
		let mut boxed_future = Some(Box::pin(future));
		futures::stream::poll_fn(move |cx| {
			// Drive the generator; forget it once it has completed so it is never polled again.
			if let Some(future) = boxed_future.as_mut() {
				if future.poll_unpin(cx).is_ready() {
					boxed_future = None;
				}
			}
			match boxed_stream.poll_next_unpin(cx) {
				Poll::Ready(Some(value)) => Poll::Ready(Some(value)),
				Poll::Ready(None) if boxed_future.is_none() => Poll::Ready(None),
				// Either no item is available yet, or the stream is done but the generator is not.
				Poll::Ready(None) | Poll::Pending => Poll::Pending,
			}
		})
	}

	/*
	fn with_try_generator<E: Send + Sync, Fut: Future<Output = Result<(), E>> + Send + Sync + 'static>(
		self, future: Fut,
	) -> impl Stream<Item = Result<<Self as Stream>::Item, E>> + Send + Sync {
		let mut boxed_stream = Some(Box::pin(self));
		let mut boxed_future = Some(Box::pin(future));
		futures::stream::poll_fn(move |cx| {
			if let Some(ref mut future) = boxed_future {
				if let Poll::Ready(value) = future.poll_unpin(cx) {
					boxed_future.take();
					if let Err(error) = value {
						boxed_stream.take();
						return Poll::Ready(Some(Err(error)));
					};
				};
			}
			if let Some(ref mut stream) = boxed_stream {
				match stream.poll_next_unpin(cx) {
					Poll::Ready(Some(value)) => Poll::Ready(Some(Ok(value))),
					Poll::Ready(None) if boxed_future.is_none() => Poll::Ready(None),
					_ => Poll::Pending,
				}
			} else {
				Poll::Ready(None)
			}
		})
	}
	*/
}

/// Blanket implementation: every `Send + Sync` stream with `Send + Sync + 'static` items
/// gets the `Generator` methods.
impl<S> Generator for S
where
	S: Stream + Sized + Send + Sync,
	S::Item: Send + Sync + 'static,
{
}

/// Extension trait adding [`for_each_till_next`](HakubanStreamExt::for_each_till_next) to streams.
pub trait HakubanStreamExt: Stream + Sized {
	/// Calls `f` for every item of the stream, dropping the future returned for the previous
	/// item as soon as the next item (or the end of the stream) arrives.
	///
	/// `raw_context` is lent to each invocation wrapped in a [`ForEachTillNextContext`]. The
	/// wrapper sends the value back through a oneshot channel when it is dropped, and the value
	/// is then handed to the next invocation. The returned future resolves to the context once
	/// the stream has ended.
	///
	/// The stream is checked before the user future on every iteration (`select_biased!`), so a
	/// ready item takes priority over the running user future.
	///
	/// NOTE(review): if `f` moves the context wrapper somewhere that outlives the future it
	/// returns, waiting for the context to come back presumably stalls until that wrapper is
	/// dropped; confirm against callers.
	fn for_each_till_next<F, C, Fut>(self, raw_context: C, mut f: F) -> impl Future<Output = C>
	where
		F: FnMut(ForEachTillNextContext<C>, Self::Item) -> Fut,
		Fut: Future<Output = ()>,
	{
		async move {
			let mut stream = pin::pin!(self.fuse());
			let mut stream_next_future = Box::pin(stream.next());
			// No item has been seen yet, so start with a never-completing placeholder future.
			let mut user_future: Pin<Box<future::Fuse<futures::future::Either<Pending<()>, Fut>>>> =
				Box::pin(futures::future::Either::Left(futures::future::pending()).fuse());
			// Pre-load the return channel with the initial context so the first `await` on the
			// receiver below yields it immediately.
			let (mut context_return_box_sender, mut context_return_box_receiver) = futures::channel::oneshot::channel();
			context_return_box_sender.send(raw_context).is_ok().assert_true();
			loop {
				futures::select_biased! {
					item = stream_next_future => {
						// Cancel the handler of the previous item, then wait for its context
						// wrapper to hand the context back.
						drop(user_future);
						let raw_context = context_return_box_receiver.await.unwrap();
						if let Some(item) = item {
							// Fresh channel for this invocation; the wrapper owns the sender.
							(context_return_box_sender, context_return_box_receiver) = futures::channel::oneshot::channel();
							let context = ForEachTillNextContext { inner: Some(raw_context), return_box: Some(context_return_box_sender) };
							user_future = Box::pin(futures::future::Either::Right(f(context, item)).fuse());
							stream_next_future = Box::pin(stream.next());
						} else {
							// Stream ended: give the context back to the caller.
							return raw_context;
						}
					},
					_ = user_future => {
						// Handler finished on its own; keep looping until the next item arrives.
					},
				};
			}
		}
	}
}

/// Blanket implementation of `HakubanStreamExt` for every `Send + Sync` stream with
/// `Send + Sync + 'static` items.
// NOTE(review): the trait itself only requires `Stream + Sized`; these extra bounds mirror the
// `Generator` impl and may be stricter than needed — confirm before relaxing.
impl<S> HakubanStreamExt for S
where
	S: Stream + Sized + Send + Sync,
	S::Item: Send + Sync + 'static,
{
}

/// Handle through which a `for_each_till_next` callback accesses the shared context value.
///
/// Dereferences to `T`. When dropped, the value is sent back through `return_box`.
pub struct ForEachTillNextContext<T> {
	/// The lent context value; taken out and sent back when the handle is dropped.
	inner: Option<T>,
	/// Oneshot sender used to return `inner` on drop.
	return_box: Option<futures::channel::oneshot::Sender<T>>,
}

impl<T> Drop for ForEachTillNextContext<T> {
	/// Hands the wrapped value back through the return channel.
	///
	/// A failed send (receiver already gone) is deliberately ignored.
	fn drop(&mut self) {
		let sender = self.return_box.take().unwrap();
		let value = self.inner.take().unwrap();
		let _ = sender.send(value);
	}
}

impl<T> Deref for ForEachTillNextContext<T> {
	type Target = T;

	/// Borrows the wrapped context value; panics if it has already been taken out.
	fn deref(&self) -> &Self::Target {
		let borrowed: Option<&T> = self.inner.as_ref();
		borrowed.unwrap()
	}
}

impl<T> DerefMut for ForEachTillNextContext<T> {
	/// Mutably borrows the wrapped context value; panics if it has already been taken out.
	fn deref_mut(&mut self) -> &mut Self::Target {
		let borrowed: Option<&mut T> = self.inner.as_mut();
		borrowed.unwrap()
	}
}