use core::fmt::{self, Debug, Formatter};
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use completion_core::{CompletionFuture, CompletionStream};
use futures_core::{ready, Stream};
use pin_project_lite::pin_project;
/// Creates a stream by repeatedly applying `f` to a state, starting from `seed`.
///
/// Whenever the stream holds a state, polling it hands that state to `f` and
/// drives the returned future. If the future resolves to
/// `Some((item, next_state))`, `item` is yielded and `next_state` is kept for
/// the following step; if it resolves to `None`, the stream ends.
///
/// # Panics
///
/// The returned stream panics if it is polled again after it has ended.
pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
where
    F: FnMut(T) -> Fut,
    Fut: CompletionFuture<Output = Option<(Item, T)>>,
{
    // The seed is the first state; no step future exists until the first poll.
    let state = Some(seed);
    let fut = None;
    Unfold { state, f, fut }
}
pin_project! {
/// Stream returned by [`unfold`].
pub struct Unfold<T, F, Fut> {
// State to hand to `f` on the next poll. `None` while a step future is in
// flight and after the stream has ended.
state: Option<T>,
// Closure that turns a state into the future for the next step.
f: F,
// Future for the step currently in progress, if any. It is projected as
// pinned so it can be polled in place.
#[pin]
fut: Option<Fut>,
}
}
// Shows the stored state and the in-flight future; the closure `f` is left
// out because `F` is not required to implement `Debug`.
impl<T: Debug, F, Fut: Debug> Debug for Unfold<T, F, Fut> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self { state, fut, .. } = self;
        let mut out = f.debug_struct("Unfold");
        out.field("state", state);
        out.field("fut", fut);
        out.finish()
    }
}
impl<T, F, Fut, Item> CompletionStream for Unfold<T, F, Fut>
where
    F: FnMut(T) -> Fut,
    Fut: CompletionFuture<Output = Option<(Item, T)>>,
{
    type Item = Item;

    // Advances the stream by one step.
    //
    // Panics if neither a state nor an in-flight future is available, i.e.
    // when the stream is polled again after it has ended.
    unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // A stored state means the previous step finished: start the next one.
        if let Some(state) = this.state.take() {
            let next_step = (this.f)(state);
            this.fut.set(Some(next_step));
        }

        let in_flight = this
            .fut
            .as_mut()
            .as_pin_mut()
            .expect("`Unfold` polled after completion");
        let step = ready!(in_flight.poll(cx));

        // The step future has completed; drop it before handing out the result.
        this.fut.set(None);

        match step {
            Some((item, next_state)) => {
                *this.state = Some(next_state);
                Poll::Ready(Some(item))
            }
            None => Poll::Ready(None),
        }
    }

    // Forwards cancellation to the in-flight step future. With no future in
    // flight there is nothing to cancel, so this is immediately ready.
    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        match self.project().fut.as_pin_mut() {
            Some(in_flight) => in_flight.poll_cancel(cx),
            None => Poll::Ready(()),
        }
    }

    // Reports an exact `(0, Some(0))` once neither a state nor a future
    // remains; otherwise the length is unknown.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match (&self.fut, &self.state) {
            (None, None) => (0, Some(0)),
            _ => (0, None),
        }
    }
}
impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
Fut: CompletionFuture<Output = Option<(Item, T)>> + Future<Output = Option<(Item, T)>>,
{
type Item = Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe { CompletionStream::poll_next(self, cx) }
}
fn size_hint(&self) -> (usize, Option<usize>) {
CompletionStream::size_hint(self)
}
}
/// Creates a fallible stream by repeatedly applying `f` to a state, starting
/// from `seed`.
///
/// Whenever the stream holds a state, polling it hands that state to `f` and
/// drives the returned future. The future's result decides what happens next:
///
/// - `Ok(Some((item, next_state)))` yields `Ok(item)` and keeps `next_state`
///   for the following step.
/// - `Ok(None)` ends the stream.
/// - `Err(e)` yields `Err(e)`. No state is kept afterwards, so the next poll
///   reports the end of the stream.
pub fn try_unfold<T, E, F, Fut, Item>(seed: T, f: F) -> TryUnfold<T, F, Fut>
where
    F: FnMut(T) -> Fut,
    Fut: CompletionFuture<Output = Result<Option<(Item, T)>, E>>,
{
    // The seed is the first state; no step future exists until the first poll.
    let state = Some(seed);
    let fut = None;
    TryUnfold { state, f, fut }
}
pin_project! {
/// Stream returned by [`try_unfold`].
pub struct TryUnfold<T, F, Fut> {
// State to hand to `f` on the next poll. `None` while a step future is in
// flight, and after the stream has ended or produced an error.
state: Option<T>,
// Closure that turns a state into the future for the next step.
f: F,
// Future for the step currently in progress, if any. It is projected as
// pinned so it can be polled in place.
#[pin]
fut: Option<Fut>,
}
}
// Shows the stored state and the in-flight future; the closure `f` is left
// out because `F` is not required to implement `Debug`.
impl<T: Debug, F, Fut: Debug> Debug for TryUnfold<T, F, Fut> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self { state, fut, .. } = self;
        let mut out = f.debug_struct("TryUnfold");
        out.field("state", state);
        out.field("fut", fut);
        out.finish()
    }
}
impl<T, E, F, Fut, Item> CompletionStream for TryUnfold<T, F, Fut>
where
    F: FnMut(T) -> Fut,
    Fut: CompletionFuture<Output = Result<Option<(Item, T)>, E>>,
{
    type Item = Result<Item, E>;

    // Advances the stream by one step.
    //
    // When neither a state nor an in-flight future is available (the stream
    // ended or a previous step failed), this reports the end of the stream
    // rather than panicking.
    unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // A stored state means the previous step finished: start the next one.
        if let Some(state) = this.state.take() {
            let next_step = (this.f)(state);
            this.fut.set(Some(next_step));
        }

        let in_flight = match this.fut.as_mut().as_pin_mut() {
            Some(in_flight) => in_flight,
            None => return Poll::Ready(None),
        };
        let step = ready!(in_flight.poll(cx));

        // The step future has completed; drop it before handing out the result.
        this.fut.set(None);

        let outcome = match step {
            Ok(Some((item, next_state))) => {
                *this.state = Some(next_state);
                Some(Ok(item))
            }
            Ok(None) => None,
            // The state is not restored on error, so the stream ends afterwards.
            Err(e) => Some(Err(e)),
        };
        Poll::Ready(outcome)
    }

    // Forwards cancellation to the in-flight step future. With no future in
    // flight there is nothing to cancel, so this is immediately ready.
    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        match self.project().fut.as_pin_mut() {
            Some(in_flight) => in_flight.poll_cancel(cx),
            None => Poll::Ready(()),
        }
    }

    // Reports an exact `(0, Some(0))` once neither a state nor a future
    // remains; otherwise the length is unknown.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match (&self.state, &self.fut) {
            (None, None) => (0, Some(0)),
            _ => (0, None),
        }
    }
}
impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
Fut: CompletionFuture<Output = Result<Option<(Item, T)>, E>>
+ Future<Output = Result<Option<(Item, T)>, E>>,
{
type Item = Result<Item, E>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe { CompletionStream::poll_next(self, cx) }
}
fn size_hint(&self) -> (usize, Option<usize>) {
CompletionStream::size_hint(self)
}
}