futures_async_stream/
lib.rs

1// SPDX-License-Identifier: Apache-2.0 OR MIT
2
3/*!
4<!-- tidy:crate-doc:start -->
5Async stream for Rust and the futures crate.
6
7This crate provides useful features for streams, using `async_await` and
8unstable [`coroutines`](https://github.com/rust-lang/rust/issues/43122).
9
10## Usage
11
12Add this to your `Cargo.toml`:
13
14```toml
15[dependencies]
16futures-async-stream = "0.2"
17futures = "0.3"
18```
19
20*Compiler support: requires rustc nightly-2024-04-25+*
21
22## `#[for_await]`
23
24Processes streams using a for loop.
25
26This is a reimplement of [futures-await]'s `#[async]` for loops for
27futures 0.3 and is an experimental implementation of [the idea listed as the
28next step of async/await](https://github.com/rust-lang/rfcs/blob/HEAD/text/2394-async_await.md#for-await-and-processing-streams).
29
30```rust
31#![feature(proc_macro_hygiene, stmt_expr_attributes)]
32
33use futures::stream::Stream;
34use futures_async_stream::for_await;
35
36async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
37    let mut vec = vec![];
38    #[for_await]
39    for value in stream {
40        vec.push(value);
41    }
42    vec
43}
44```
45
46`value` has the `Item` type of the stream passed in. Note that async for
47loops can only be used inside of `async` functions, closures, blocks,
48`#[stream]` functions and `stream_block!` macros.
49
50## `#[stream]`
51
52Creates streams via coroutines.
53
54This is a reimplement of [futures-await]'s `#[stream]` for futures 0.3 and
55is an experimental implementation of [the idea listed as the next step of
56async/await](https://github.com/rust-lang/rfcs/blob/HEAD/text/2394-async_await.md#generators-and-streams).
57
58```rust
59#![feature(coroutines)]
60
61use futures::stream::Stream;
62use futures_async_stream::stream;
63
64// Returns a stream of i32
65#[stream(item = i32)]
66async fn foo(stream: impl Stream<Item = String>) {
67    // `for_await` is built into `stream`. If you use `for_await` only in `stream`, there is no need to import `for_await`.
68    #[for_await]
69    for x in stream {
70        yield x.parse().unwrap();
71    }
72}
73```
74
75To early exit from a `#[stream]` function or block, use `return`.
76
77`#[stream]` on async fn must have an item type specified via
78`item = some::Path` and the values output from the stream must be yielded
79via the `yield` expression.
80
81`#[stream]` can also be used on async blocks:
82
83```rust
84#![feature(coroutines, proc_macro_hygiene, stmt_expr_attributes)]
85
86use futures::stream::Stream;
87use futures_async_stream::stream;
88
89fn foo() -> impl Stream<Item = i32> {
90    #[stream]
91    async move {
92        for i in 0..10 {
93            yield i;
94        }
95    }
96}
97```
98
99Note that `#[stream]` on async block does not require the `item` argument,
100but it may require additional type annotations.
101
102## Using async stream functions in traits
103
104You can use async stream functions in traits by passing `boxed` or
105`boxed_local` as an argument.
106
107```rust
108#![feature(coroutines)]
109
110use futures_async_stream::stream;
111
112trait Foo {
113    #[stream(boxed, item = u32)]
114    async fn method(&mut self);
115}
116
117struct Bar(u32);
118
119impl Foo for Bar {
120    #[stream(boxed, item = u32)]
121    async fn method(&mut self) {
122        while self.0 < u32::MAX {
123            self.0 += 1;
124            yield self.0;
125        }
126    }
127}
128```
129
130A async stream function that received a `boxed` argument is converted to a
131function that returns `Pin<Box<dyn Stream<Item = item> + Send + 'lifetime>>`.
132If you passed `boxed_local` instead of `boxed`, async stream function
133returns a non-thread-safe stream (`Pin<Box<dyn Stream<Item = item> + 'lifetime>>`).
134
135```rust
136#![feature(coroutines)]
137
138use std::pin::Pin;
139
140use futures::stream::Stream;
141use futures_async_stream::stream;
142
143// The trait itself can be defined without unstable features.
144trait Foo {
145    fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>>;
146}
147
148struct Bar(u32);
149
150impl Foo for Bar {
151    #[stream(boxed, item = u32)]
152    async fn method(&mut self) {
153        while self.0 < u32::MAX {
154            self.0 += 1;
155            yield self.0;
156        }
157    }
158}
159```
160
161## `#[try_stream]`
162
163`?` operator can be used with the `#[try_stream]`. The `Item` of the
164returned stream is `Result` with `Ok` being the value yielded and `Err` the
165error type returned by `?` operator or `return Err(...)`.
166
167```rust
168#![feature(coroutines)]
169
170use futures::stream::Stream;
171use futures_async_stream::try_stream;
172
173#[try_stream(ok = i32, error = Box<dyn std::error::Error>)]
174async fn foo(stream: impl Stream<Item = String>) {
175    #[for_await]
176    for x in stream {
177        yield x.parse()?;
178    }
179}
180```
181
182`#[try_stream]` can be used wherever `#[stream]` can be used.
183
184To early exit from a `#[try_stream]` function or block, use `return Ok(())`.
185
186<!--
187## List of features that may be added in the future as an extension of this feature:
188
189- `async_sink` (https://github.com/rust-lang-nursery/futures-rs/pull/1548#issuecomment-486205382)
190- Support `.await` in macro (https://github.com/rust-lang-nursery/futures-rs/pull/1548#discussion_r285341883)
191- Parallel version of `for_await` (https://github.com/rustasync/runtime/pull/25)
192-->
193
194## How to write the equivalent code without this API?
195
196### `#[for_await]`
197
198You can write this by combining `while let` loop, `.await`, `pin!` macro,
199and `StreamExt::next()` method:
200
201```rust
202use std::pin::pin;
203
204use futures::stream::{Stream, StreamExt};
205
206async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
207    let mut vec = vec![];
208    let mut stream = pin!(stream);
209    while let Some(value) = stream.next().await {
210        vec.push(value);
211    }
212    vec
213}
214```
215
216### `#[stream]`
217
218You can write this by manually implementing the combinator:
219
220```rust
221use std::{
222    pin::Pin,
223    task::{ready, Context, Poll},
224};
225
226use futures::stream::Stream;
227use pin_project::pin_project;
228
229fn foo<S>(stream: S) -> impl Stream<Item = i32>
230where
231    S: Stream<Item = String>,
232{
233    Foo { stream }
234}
235
236#[pin_project]
237struct Foo<S> {
238    #[pin]
239    stream: S,
240}
241
242impl<S> Stream for Foo<S>
243where
244    S: Stream<Item = String>,
245{
246    type Item = i32;
247
248    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
249        if let Some(x) = ready!(self.project().stream.poll_next(cx)) {
250            Poll::Ready(Some(x.parse().unwrap()))
251        } else {
252            Poll::Ready(None)
253        }
254    }
255}
256```
257
258[futures-await]: https://github.com/alexcrichton/futures-await
259
260<!-- tidy:crate-doc:end -->
261*/
262
263#![no_std]
264#![doc(test(
265    no_crate_inject,
266    attr(
267        deny(warnings, rust_2018_idioms, single_use_lifetimes),
268        allow(dead_code, unused_variables)
269    )
270))]
271#![warn(
272    // Lints that may help when writing public library.
273    missing_debug_implementations,
274    missing_docs,
275    clippy::alloc_instead_of_core,
276    clippy::exhaustive_enums,
277    clippy::exhaustive_structs,
278    clippy::impl_trait_in_params,
279    clippy::missing_inline_in_public_items,
280    clippy::std_instead_of_alloc,
281    clippy::std_instead_of_core,
282)]
283#![feature(coroutine_trait)]
284
285#[cfg(test)]
286extern crate std;
287
288#[cfg(test)]
289#[path = "gen/assert_impl.rs"]
290mod assert_impl;
291
292#[doc(inline)]
293pub use futures_async_stream_macro::for_await;
294#[doc(inline)]
295pub use futures_async_stream_macro::stream;
296#[doc(inline)]
297pub use futures_async_stream_macro::stream_block;
298#[doc(inline)]
299pub use futures_async_stream_macro::try_stream;
300#[doc(inline)]
301pub use futures_async_stream_macro::try_stream_block;
302
303mod future {
304    use core::{
305        future::Future,
306        ops::{Coroutine, CoroutineState},
307        pin::Pin,
308        ptr::NonNull,
309        task::{Context, Poll},
310    };
311
312    use pin_project::pin_project;
313
314    // Based on https://github.com/rust-lang/rust/blob/1.66.0/library/core/src/future/mod.rs.
315    // TODO: can we avoid GenFuture? https://github.com/rust-lang/rust/commit/9f36f988ad873f5d34cd9c08e4903c597ffc9532
316
317    /// This type is needed because:
318    ///
319    /// a) Coroutines cannot implement `for<'a, 'b> Coroutine<&'a mut Context<'b>>`, so we need to pass
320    ///    a raw pointer (see <https://github.com/rust-lang/rust/issues/68923>).
321    /// b) Raw pointers and `NonNull` aren't `Send` or `Sync`, so that would make every single future
322    ///    non-Send/Sync as well, and we don't want that.
323    ///
324    /// It also simplifies the lowering of `.await`.
325    #[doc(hidden)]
326    #[derive(Debug, Copy, Clone)]
327    pub struct ResumeTy(pub(crate) NonNull<Context<'static>>);
328
329    // SAFETY: the caller of the `get_context` function that dereferences a
330    // pointer must guarantee that no data races will occur.
331    // Note: Since https://github.com/rust-lang/rust/pull/95985, `Context` is
332    // `!Send` and `!Sync`.
333    unsafe impl Send for ResumeTy {}
334    // SAFETY: see `Send` impl
335    unsafe impl Sync for ResumeTy {}
336
337    /// Wrap a coroutine in a future.
338    ///
339    /// This function returns a `GenFuture` underneath, but hides it in `impl Trait` to give
340    /// better error messages (`impl Future` rather than `GenFuture<[closure.....]>`).
341    #[doc(hidden)]
342    #[inline]
343    pub fn from_coroutine<G>(gen: G) -> impl Future<Output = G::Return>
344    where
345        G: Coroutine<ResumeTy, Yield = ()>,
346    {
347        GenFuture(gen)
348    }
349
350    #[pin_project]
351    pub(crate) struct GenFuture<G>(#[pin] G);
352
353    impl<G> Future for GenFuture<G>
354    where
355        G: Coroutine<ResumeTy, Yield = ()>,
356    {
357        type Output = G::Return;
358
359        #[inline]
360        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
361            let this = self.project();
362            // Resume the coroutine, turning the `&mut Context` into a `NonNull` raw pointer. The
363            // `.await` lowering will safely cast that back to a `&mut Context`.
364            match this.0.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
365                CoroutineState::Yielded(()) => Poll::Pending,
366                CoroutineState::Complete(x) => Poll::Ready(x),
367            }
368        }
369    }
370
371    #[doc(hidden)]
372    #[must_use]
373    #[inline]
374    pub unsafe fn get_context<'a, 'b>(cx: ResumeTy) -> &'a mut Context<'b> {
375        // SAFETY: the caller must guarantee that `cx.0` is a valid pointer
376        // that fulfills all the requirements for a mutable reference.
377        unsafe { &mut *cx.0.as_ptr().cast::<Context<'b>>() }
378    }
379}
380
381mod stream {
382    use core::{
383        future::Future,
384        ops::{Coroutine, CoroutineState},
385        pin::Pin,
386        ptr::NonNull,
387        task::{Context, Poll},
388    };
389
390    use futures_core::stream::Stream;
391    use pin_project::pin_project;
392
393    use crate::future::ResumeTy;
394
395    /// Wrap a coroutine in a stream.
396    ///
397    /// This function returns a `GenStream` underneath, but hides it in `impl Trait` to give
398    /// better error messages (`impl Stream` rather than `GenStream<[closure.....]>`).
399    #[doc(hidden)]
400    #[inline]
401    pub fn from_coroutine<G, T>(gen: G) -> impl Stream<Item = T>
402    where
403        G: Coroutine<ResumeTy, Yield = Poll<T>, Return = ()>,
404    {
405        GenStream(gen)
406    }
407
408    #[pin_project]
409    pub(crate) struct GenStream<G>(#[pin] G);
410
411    impl<G, T> Stream for GenStream<G>
412    where
413        G: Coroutine<ResumeTy, Yield = Poll<T>, Return = ()>,
414    {
415        type Item = T;
416
417        #[inline]
418        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
419            let this = self.project();
420            match this.0.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
421                CoroutineState::Yielded(x) => x.map(Some),
422                CoroutineState::Complete(()) => Poll::Ready(None),
423            }
424        }
425    }
426
427    // This is equivalent to the `futures::stream::StreamExt::next` method.
428    // But we want to make this crate dependency as small as possible, so we define our `next` function.
429    #[doc(hidden)]
430    #[inline]
431    pub fn next<S>(stream: &mut S) -> impl Future<Output = Option<S::Item>> + '_
432    where
433        S: Stream + Unpin,
434    {
435        Next(stream)
436    }
437
438    pub(crate) struct Next<'a, S>(&'a mut S);
439
440    impl<S> Future for Next<'_, S>
441    where
442        S: Stream + Unpin,
443    {
444        type Output = Option<S::Item>;
445
446        #[inline]
447        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
448            Pin::new(&mut self.0).poll_next(cx)
449        }
450    }
451}
452
453mod try_stream {
454    use core::{
455        ops::{Coroutine, CoroutineState},
456        pin::Pin,
457        ptr::NonNull,
458        task::{Context, Poll},
459    };
460
461    use futures_core::stream::{FusedStream, Stream};
462    use pin_project::pin_project;
463
464    use crate::future::ResumeTy;
465
466    /// Wrap a coroutine in a stream.
467    ///
468    /// This function returns a `GenStream` underneath, but hides it in `impl Trait` to give
469    /// better error messages (`impl Stream` rather than `GenStream<[closure.....]>`).
470    #[doc(hidden)]
471    #[inline]
472    pub fn from_coroutine<G, T, E>(gen: G) -> impl FusedStream<Item = Result<T, E>>
473    where
474        G: Coroutine<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
475    {
476        GenTryStream(Some(gen))
477    }
478
479    #[pin_project]
480    pub(crate) struct GenTryStream<G>(#[pin] Option<G>);
481
482    impl<G, T, E> Stream for GenTryStream<G>
483    where
484        G: Coroutine<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
485    {
486        type Item = Result<T, E>;
487
488        #[inline]
489        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
490            let mut this = self.project();
491            if let Some(gen) = this.0.as_mut().as_pin_mut() {
492                let res = match gen.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
493                    CoroutineState::Yielded(x) => x.map(|x| Some(Ok(x))),
494                    CoroutineState::Complete(Err(e)) => Poll::Ready(Some(Err(e))),
495                    CoroutineState::Complete(Ok(())) => Poll::Ready(None),
496                };
497                if let Poll::Ready(Some(Err(_)) | None) = &res {
498                    this.0.set(None);
499                }
500                res
501            } else {
502                Poll::Ready(None)
503            }
504        }
505    }
506
507    impl<G, T, E> FusedStream for GenTryStream<G>
508    where
509        G: Coroutine<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
510    {
511        #[inline]
512        fn is_terminated(&self) -> bool {
513            self.0.is_none()
514        }
515    }
516}
517
518// Not public API.
519#[doc(hidden)]
520pub mod __private {
521    #[doc(hidden)]
522    pub use core::{
523        marker::Send,
524        option::Option::{None, Some},
525        pin::Pin,
526        result::Result::{self, Ok},
527        task::Poll,
528    };
529
530    #[doc(hidden)]
531    pub mod future {
532        #[doc(hidden)]
533        pub use core::future::Future;
534
535        #[doc(hidden)]
536        pub use crate::future::{from_coroutine, get_context, ResumeTy};
537    }
538
539    #[doc(hidden)]
540    pub mod stream {
541        #[doc(hidden)]
542        pub use futures_core::stream::Stream;
543
544        #[doc(hidden)]
545        pub use crate::stream::{from_coroutine, next};
546    }
547
548    #[doc(hidden)]
549    pub mod try_stream {
550        #[doc(hidden)]
551        pub use crate::try_stream::from_coroutine;
552    }
553}
554
555// TODO: generate this with codegen
556#[allow(clippy::wildcard_imports)]
557#[cfg(test)]
558mod tests {
559    use core::marker::PhantomPinned;
560
561    use static_assertions::{
562        assert_impl_all as assert_impl, assert_not_impl_all as assert_not_impl,
563    };
564
565    use crate::*;
566
567    assert_impl!(future::GenFuture<()>: Send);
568    assert_not_impl!(future::GenFuture<*const ()>: Send);
569    assert_impl!(future::GenFuture<()>: Sync);
570    assert_not_impl!(future::GenFuture<*const ()>: Sync);
571    assert_impl!(future::GenFuture<()>: Unpin);
572    assert_not_impl!(future::GenFuture<PhantomPinned>: Unpin);
573
574    assert_impl!(stream::GenStream<()>: Send);
575    assert_not_impl!(stream::GenStream<*const ()>: Send);
576    assert_impl!(stream::GenStream<()>: Sync);
577    assert_not_impl!(stream::GenStream<*const ()>: Sync);
578    assert_impl!(stream::GenStream<()>: Unpin);
579    assert_not_impl!(stream::GenStream<PhantomPinned>: Unpin);
580
581    assert_impl!(stream::Next<'_, ()>: Send);
582    assert_not_impl!(stream::Next<'_, *const ()>: Send);
583    assert_impl!(stream::Next<'_, ()>: Sync);
584    assert_not_impl!(stream::Next<'_, *const ()>: Sync);
585    assert_impl!(stream::Next<'_, PhantomPinned>: Unpin);
586
587    assert_impl!(try_stream::GenTryStream<()>: Send);
588    assert_not_impl!(try_stream::GenTryStream<*const ()>: Send);
589    assert_impl!(try_stream::GenTryStream<()>: Sync);
590    assert_not_impl!(try_stream::GenTryStream<*const ()>: Sync);
591    assert_impl!(try_stream::GenTryStream<()>: Unpin);
592    assert_not_impl!(try_stream::GenTryStream<PhantomPinned>: Unpin);
593}