//! Enstream provides a way to convert a [`Future`] into a [`Stream`]:
//! the future receives a [`Yielder`], and every item it yields becomes a stream item.
//!
//! # Example
//!
//! ```
//! #![feature(generic_associated_types, type_alias_impl_trait)]
//!
//! use std::future::Future;
//!
//! use enstream::{HandlerFn, Yielder, enstream};
//! use futures_util::{future::FutureExt, pin_mut, stream::StreamExt};
//!
//! struct StreamState<'a> {
//!     val: &'a str
//! }
//!
//! type StreamStateFut<'yielder> = impl Future<Output = ()> + 'yielder;
//!
//! impl<'a> HandlerFn<'a, &'a str> for StreamState<'a> {
//!     type Fut<'yielder> = StreamStateFut<'yielder>
//!     where
//!         'a: 'yielder;
//!
//!     fn call<'yielder>(self, mut yielder: Yielder<'yielder, &'a str>) -> Self::Fut<'yielder>
//!     where
//!         'a: 'yielder
//!     {
//!         async move {
//!             yielder.yield_item(self.val).await;
//!         }
//!     }
//! }
//!
//! let owned = String::from("test");
//!
//! let stream = enstream(StreamState {
//!     val: &owned
//! });
//!
//! pin_mut!(stream);
//!
//! assert_eq!(stream.next().now_or_never().flatten(), Some("test"));
//! ```

#![no_std]
#![feature(generic_associated_types)]

mod yield_now;

use core::{
    cell::{Cell, UnsafeCell},
    future::Future,
    hint::unreachable_unchecked,
    marker::PhantomData,
    mem::MaybeUninit,
    pin::Pin,
    ptr::NonNull,
    task::{Context, Poll},
};

use futures_util::stream::{FusedStream, Stream};
use pin_project::pin_project;
use pinned_aliasable::Aliasable;
use yield_now::YieldNow;

/// [`Future`] generator that can be converted to a [`Stream`].
pub trait HandlerFn<'scope, T: 'scope> {
    type Fut<'yielder>: Future<Output = ()> + 'yielder
    where
        'scope: 'yielder;

    /// Creates a new [`Future`] that uses the provided [`Yielder`] as its [`Stream`] item source.
    ///
    /// The `'yielder` lifetime is defined inside the library internals,
    /// so you cannot use it to access elements of the outer scope.
    ///
    /// For those cases, [`HandlerFn`] provides the `'scope` lifetime,
    /// which is required to outlive `'yielder`.
    fn call<'yielder>(self, yielder: Yielder<'yielder, T>) -> Self::Fut<'yielder>
    where
        'scope: 'yielder;
}

/// [`Stream`] item yielder.
pub struct Yielder<'a, T>(NonNull<Option<T>>, PhantomData<Cell<&'a ()>>);

impl<'a, T> Yielder<'a, T> {
    /// Yields an item to the stream.
    ///
    /// Each call to [`yield_item`] overwrites the value previously
    /// yielded to the stream.
    ///
    /// To ensure that the previous value is not overwritten,
    /// you have to `await` the future returned by [`yield_item`].
    ///
    /// [`yield_item`]: Yielder::yield_item
    pub async fn yield_item(&mut self, val: T) {
        // Safety:
        //
        // Validity of the internal NonNull pointer
        // is constrained by the lifetime of PhantomData,
        // which is defined by the lifetime of an Enstream struct.
        unsafe { self.0.as_ptr().write(Some(val)) }

        YieldNow::Created.await;
    }
}

// Safety:
//
// Yielder doesn't use any special thread semantics,
// so it's safe to Send it to another thread as long as you can Send T.
unsafe impl<'a, T: Send> Send for Yielder<'a, T> {}

// Safety:
//
// Since we don't provide any access to T
// we can make Yielder Sync even if T is not Sync.
unsafe impl<'a, T: Send> Sync for Yielder<'a, T> {}

#[pin_project(project = EnstreamStateProj)]
enum EnstreamState<G, F> {
    Gen(MaybeUninit<G>),
    Fut(#[pin] F),
    Completed,
}

#[pin_project]
struct Enstream<T, G, F> {
    #[pin]
    cell: Aliasable<UnsafeCell<Option<T>>>,

    #[pin]
    state: EnstreamState<G, F>,
}

impl<'yielder, 'scope: 'yielder, T: 'scope, G: 'scope> Stream
    for Enstream<T, G, <G as HandlerFn<'scope, T>>::Fut<'yielder>>
where
    G: HandlerFn<'scope, T>,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // Safety: the pointer is obtained from a reference to the cell,
        // so it is guaranteed to be non-null.
        let mut cell_pointer = unsafe { NonNull::new_unchecked(this.cell.as_ref().get().get()) };

        let poll_result = match this.state.as_mut().project() {
            EnstreamStateProj::Fut(fut) => fut.poll(cx),

            EnstreamStateProj::Gen(gen) => {
                // Safety: EnstreamState::Gen is always initialized with
                // a valid future generator instance that is read only
                // once, since after the initial read we replace EnstreamState::Gen
                // with EnstreamState::Fut or EnstreamState::Completed.
                let gen = unsafe { gen.assume_init_read() };

                // Since the generator is provided by the user,
                // we have to protect ourselves from a panic
                // while the future is being initialized.
                this.state.set(EnstreamState::Completed);

                let fut = gen.call(Yielder(cell_pointer, PhantomData));

                this.state.set(EnstreamState::Fut(fut));

                match this.state.as_mut().project() {
                    EnstreamStateProj::Fut(fut) => fut.poll(cx),
                    // Safety: EnstreamState was set to Fut above.
                    _ => unsafe { unreachable_unchecked() },
                }
            }

            EnstreamStateProj::Completed => return Poll::Ready(None),
        };

        match poll_result {
            Poll::Ready(()) => {
                this.state.set(EnstreamState::Completed);
                Poll::Ready(None)
            }
            // Safety: relies on the yielder writing to the cell only during
            // the poll above, which has already returned.
            Poll::Pending => match unsafe { cell_pointer.as_mut() }.take() {
                Some(val) => Poll::Ready(Some(val)),
                None => Poll::Pending,
            },
        }
    }
}

impl<'yielder, 'scope: 'yielder, T: 'scope, G: 'scope> FusedStream
    for Enstream<T, G, <G as HandlerFn<'scope, T>>::Fut<'yielder>>
where
    G: HandlerFn<'scope, T>,
{
    fn is_terminated(&self) -> bool {
        matches!(self.state, EnstreamState::Completed)
    }
}

/// Creates a new [`Stream`] from the provided [`HandlerFn`].
pub fn enstream<'scope, T: 'scope, G: 'scope>(generator: G) -> impl FusedStream<Item = T> + 'scope
where
    G: HandlerFn<'scope, T>,
{
    Enstream {
        cell: Aliasable::new(UnsafeCell::new(None)),
        state: EnstreamState::Gen(MaybeUninit::new(generator)),
    }
}