// SPDX-License-Identifier: Apache-2.0 OR MIT

/*!
<!-- Note: Document from sync-markdown-to-rustdoc:start through sync-markdown-to-rustdoc:end
     is synchronized from README.md. Any changes to that range are not preserved. -->
<!-- tidy:sync-markdown-to-rustdoc:start -->

Async stream for Rust and the futures crate.

This crate provides useful features for streams, using `async_await` and
unstable [`coroutines`](https://github.com/rust-lang/rust/issues/43122).

## Usage

Add this to your `Cargo.toml`:

```toml
[dependencies]
futures-async-stream = "0.2"
futures = "0.3"
```

*Compiler support: requires rustc nightly-2024-04-25+*

## `#[for_await]`

Processes streams using a for loop.

This is a reimplementation of [futures-await]'s `#[async]` for loops for
futures 0.3 and is an experimental implementation of [the idea listed as the
next step of async/await](https://rust-lang.github.io/rfcs/2394-async_await.html#for-await-and-processing-streams).

```
#![feature(proc_macro_hygiene, stmt_expr_attributes)]

use futures::stream::Stream;
use futures_async_stream::for_await;

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = vec![];
    #[for_await]
    for value in stream {
        vec.push(value);
    }
    vec
}
```

`value` has the `Item` type of the stream passed in. Note that async for
loops can only be used inside `async` functions, closures, and blocks, and
inside `#[stream]` functions and `stream_block!` macros.

## `#[stream]`

Creates streams via coroutines.

This is a reimplementation of [futures-await]'s `#[stream]` for futures 0.3 and
is an experimental implementation of [the idea listed as the next step of
async/await](https://rust-lang.github.io/rfcs/2394-async_await.html#generators-and-streams).

```
#![feature(coroutines)]

use futures::stream::Stream;
use futures_async_stream::stream;

// Returns a stream of i32
#[stream(item = i32)]
async fn foo(stream: impl Stream<Item = String>) {
    // `for_await` is built into `stream`. If you use `for_await` only in `stream`, there is no need to import `for_await`.
    #[for_await]
    for x in stream {
        yield x.parse().unwrap();
    }
}
```

To exit early from a `#[stream]` function or block, use `return`.

`#[stream]` on an async fn must have an item type specified via
`item = some::Path`, and the values output from the stream must be yielded
via the `yield` expression.

`#[stream]` can also be used on async blocks:

```
#![feature(coroutines, proc_macro_hygiene, stmt_expr_attributes)]

use futures::stream::Stream;
use futures_async_stream::stream;

fn foo() -> impl Stream<Item = i32> {
    #[stream]
    async move {
        for i in 0..10 {
            yield i;
        }
    }
}
```

Note that `#[stream]` on an async block does not require the `item` argument,
but it may require additional type annotations.

## Using async stream functions in traits

You can use async stream functions in traits by passing `boxed` or
`boxed_local` as an argument.

```
#![feature(coroutines)]

use futures_async_stream::stream;

trait Foo {
    #[stream(boxed, item = u32)]
    async fn method(&mut self);
}

struct Bar(u32);

impl Foo for Bar {
    #[stream(boxed, item = u32)]
    async fn method(&mut self) {
        while self.0 < u32::MAX {
            self.0 += 1;
            yield self.0;
        }
    }
}
```

An async stream function that receives a `boxed` argument is converted to a
function that returns `Pin<Box<dyn Stream<Item = item> + Send + 'lifetime>>`.
If you pass `boxed_local` instead of `boxed`, the async stream function
returns a non-thread-safe stream (`Pin<Box<dyn Stream<Item = item> + 'lifetime>>`).

```
#![feature(coroutines)]

use std::pin::Pin;

use futures::stream::Stream;
use futures_async_stream::stream;

// The trait itself can be defined without unstable features.
trait Foo {
    fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>>;
}

struct Bar(u32);

impl Foo for Bar {
    #[stream(boxed, item = u32)]
    async fn method(&mut self) {
        while self.0 < u32::MAX {
            self.0 += 1;
            yield self.0;
        }
    }
}
```
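The only difference between the two return types is the `Send` bound. As a
std-only analogy, the sketch below applies the same boxing pattern to a plain
`Future` instead of a `Stream`. The names `boxed`, `boxed_local`, `NoopWake`,
and `poll_once` are hypothetical helpers for illustration and are not items
from this crate.

```rust
use std::{
    future::Future,
    pin::Pin,
    rc::Rc,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
};

// Hypothetical analogue of `boxed`: the boxed future must be `Send`,
// and it borrows from its argument for the elided lifetime.
fn boxed(value: &mut u32) -> Pin<Box<dyn Future<Output = u32> + Send + '_>> {
    Box::pin(async move {
        *value += 1;
        *value
    })
}

// Hypothetical analogue of `boxed_local`: no `Send` bound, so the future
// may hold non-thread-safe state such as an `Rc`.
fn boxed_local(value: Rc<u32>) -> Pin<Box<dyn Future<Output = u32>>> {
    Box::pin(async move { *value + 1 })
}

// A waker that does nothing, used only to build a `Context` for polling.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a future once. The futures above never wait on anything,
// so a single poll is enough to complete them.
fn poll_once<F: Future + Unpin>(mut fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match Pin::new(&mut fut).poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

Adding `+ Send` to the return type of `boxed_local` in this sketch would be
rejected by the compiler, because the async block holds an `Rc`.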

## `#[try_stream]`

The `?` operator can be used with `#[try_stream]`. The `Item` of the
returned stream is `Result`, with `Ok` being the value yielded and `Err` the
error type returned by the `?` operator or `return Err(...)`.

```
#![feature(coroutines)]

use futures::stream::Stream;
use futures_async_stream::try_stream;

#[try_stream(ok = i32, error = Box<dyn std::error::Error>)]
async fn foo(stream: impl Stream<Item = String>) {
    #[for_await]
    for x in stream {
        yield x.parse()?;
    }
}
```

`#[try_stream]` can be used wherever `#[stream]` can be used.

To exit early from a `#[try_stream]` function or block, use `return Ok(())`.
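
As a rough synchronous analogy (plain iterators, no coroutines, and not how
this crate is implemented), the sketch below mirrors the same contract: each
parsed value becomes an `Ok` item, the first failure becomes a single `Err`
item, and nothing is produced afterwards. The name `parse_all` is hypothetical.

```rust
use std::error::Error;

// Hypothetical synchronous stand-in for a `#[try_stream]` function:
// produces `Ok(value)` per input and stops after the first `Err`.
fn parse_all(inputs: Vec<String>) -> impl Iterator<Item = Result<i32, Box<dyn Error>>> {
    let mut inputs = inputs.into_iter();
    let mut failed = false;
    std::iter::from_fn(move || -> Option<Result<i32, Box<dyn Error>>> {
        if failed {
            // An error was already reported, so the sequence is finished.
            return None;
        }
        // Running out of inputs ends the sequence normally.
        let text = inputs.next()?;
        match text.parse::<i32>() {
            Ok(value) => Some(Ok(value)),
            Err(e) => {
                failed = true;
                Some(Err(e.into()))
            }
        }
    })
}
```

Collecting the inputs `"1"`, `"x"`, `"3"` from this sketch gives two items,
`Ok(1)` followed by an `Err`; `"3"` is never parsed.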

<!--
## List of features that may be added in the future as an extension of this feature:

- `async_sink` (https://github.com/rust-lang-nursery/futures-rs/pull/1548#issuecomment-486205382)
- Support `.await` in macro (https://github.com/rust-lang-nursery/futures-rs/pull/1548#discussion_r285341883)
- Parallel version of `for_await` (https://github.com/rustasync/runtime/pull/25)
-->

## How to write the equivalent code without this API?

### `#[for_await]`

You can write this by combining a `while let` loop, `.await`, the `pin!` macro,
and the `StreamExt::next()` method:

```
use std::pin::pin;

use futures::stream::{Stream, StreamExt};

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = vec![];
    let mut stream = pin!(stream);
    while let Some(value) = stream.next().await {
        vec.push(value);
    }
    vec
}
```

### `#[stream]`

You can write this by manually implementing the combinator:

```
use std::{
    pin::Pin,
    task::{ready, Context, Poll},
};

use futures::stream::Stream;
use pin_project::pin_project;

fn foo<S>(stream: S) -> impl Stream<Item = i32>
where
    S: Stream<Item = String>,
{
    Foo { stream }
}

#[pin_project]
struct Foo<S> {
    #[pin]
    stream: S,
}

impl<S> Stream for Foo<S>
where
    S: Stream<Item = String>,
{
    type Item = i32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(x) = ready!(self.project().stream.poll_next(cx)) {
            Poll::Ready(Some(x.parse().unwrap()))
        } else {
            Poll::Ready(None)
        }
    }
}
```

[futures-await]: https://github.com/alexcrichton/futures-await

<!-- tidy:sync-markdown-to-rustdoc:end -->
*/

#![no_std]
#![doc(test(
    no_crate_inject,
    attr(
        deny(warnings, rust_2018_idioms, single_use_lifetimes),
        allow(dead_code, unused_variables)
    )
))]
#![warn(
    // Lints that may help when writing public library.
    missing_debug_implementations,
    missing_docs,
    clippy::alloc_instead_of_core,
    clippy::exhaustive_enums,
    clippy::exhaustive_structs,
    clippy::impl_trait_in_params,
    clippy::missing_inline_in_public_items,
    clippy::std_instead_of_alloc,
    clippy::std_instead_of_core,
)]
#![feature(coroutine_trait)]

#[cfg(test)]
extern crate std;

#[cfg(test)]
mod tests;

#[cfg(test)]
#[path = "gen/tests/assert_impl.rs"]
mod assert_impl;
#[cfg(test)]
#[path = "gen/tests/track_size.rs"]
mod track_size;

#[doc(inline)]
pub use futures_async_stream_macro::for_await;
#[doc(inline)]
pub use futures_async_stream_macro::stream;
#[doc(inline)]
pub use futures_async_stream_macro::stream_block;
#[doc(inline)]
pub use futures_async_stream_macro::try_stream;
#[doc(inline)]
pub use futures_async_stream_macro::try_stream_block;

mod future {
    use core::{
        future::Future,
        ops::{Coroutine, CoroutineState},
        pin::Pin,
        ptr::NonNull,
        task::{Context, Poll},
    };

    use pin_project::pin_project;

    // Based on https://github.com/rust-lang/rust/blob/1.66.0/library/core/src/future/mod.rs.
    // TODO: can we avoid GenFuture? https://github.com/rust-lang/rust/commit/9f36f988ad873f5d34cd9c08e4903c597ffc9532

    /// This type is needed because:
    ///
    /// a) Coroutines cannot implement `for<'a, 'b> Coroutine<&'a mut Context<'b>>`, so we need to pass
    ///    a raw pointer (see <https://github.com/rust-lang/rust/issues/68923>).
    /// b) Raw pointers and `NonNull` aren't `Send` or `Sync`, so that would make every single future
    ///    non-Send/Sync as well, and we don't want that.
    ///
    /// It also simplifies the lowering of `.await`.
    #[doc(hidden)]
    #[derive(Debug, Clone, Copy)]
    pub struct ResumeTy(pub(crate) NonNull<Context<'static>>);

    // SAFETY: the caller of the `get_context` function that dereferences a
    // pointer must guarantee that no data races will occur.
    // Note: Since https://github.com/rust-lang/rust/pull/95985, `Context` is
    // `!Send` and `!Sync`.
    unsafe impl Send for ResumeTy {}
    // SAFETY: see `Send` impl
    unsafe impl Sync for ResumeTy {}

    // Needed to work around old nightly bug (fixed in nightly-2024-05-24 by https://github.com/rust-lang/rust/pull/125392)
    impl core::panic::UnwindSafe for ResumeTy {}
    impl core::panic::RefUnwindSafe for ResumeTy {}

    /// Wrap a coroutine in a future.
    ///
    /// This function returns a `GenFuture` underneath, but hides it in `impl Trait` to give
    /// better error messages (`impl Future` rather than `GenFuture<[closure.....]>`).
    #[doc(hidden)]
    #[inline]
    pub fn from_coroutine<G>(g: G) -> impl Future<Output = G::Return>
    where
        G: Coroutine<ResumeTy, Yield = ()>,
    {
        GenFuture(g)
    }

    #[pin_project]
    pub(crate) struct GenFuture<G>(#[pin] G);

    impl<G> Future for GenFuture<G>
    where
        G: Coroutine<ResumeTy, Yield = ()>,
    {
        type Output = G::Return;

        #[inline]
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let this = self.project();
            // Resume the coroutine, turning the `&mut Context` into a `NonNull` raw pointer. The
            // `.await` lowering will safely cast that back to a `&mut Context`.
            match this.0.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
                CoroutineState::Yielded(()) => Poll::Pending,
                CoroutineState::Complete(x) => Poll::Ready(x),
            }
        }
    }

    #[doc(hidden)]
    #[inline]
    #[must_use]
    pub unsafe fn get_context<'a, 'b>(cx: ResumeTy) -> &'a mut Context<'b> {
        // SAFETY: the caller must guarantee that `cx.0` is a valid pointer
        // that fulfills all the requirements for a mutable reference.
        unsafe { &mut *cx.0.as_ptr().cast::<Context<'b>>() }
    }
}

mod stream {
    use core::{
        future::Future,
        ops::{Coroutine, CoroutineState},
        pin::Pin,
        ptr::NonNull,
        task::{Context, Poll},
    };

    use futures_core::stream::Stream;
    use pin_project::pin_project;

    use crate::future::ResumeTy;

    /// Wrap a coroutine in a stream.
    ///
    /// This function returns a `GenStream` underneath, but hides it in `impl Trait` to give
    /// better error messages (`impl Stream` rather than `GenStream<[closure.....]>`).
    #[doc(hidden)]
    #[inline]
    pub fn from_coroutine<G, T>(g: G) -> impl Stream<Item = T>
    where
        G: Coroutine<ResumeTy, Yield = Poll<T>, Return = ()>,
    {
        GenStream(g)
    }

    #[pin_project]
    pub(crate) struct GenStream<G>(#[pin] G);

    impl<G, T> Stream for GenStream<G>
    where
        G: Coroutine<ResumeTy, Yield = Poll<T>, Return = ()>,
    {
        type Item = T;

        #[inline]
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.project();
            match this.0.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
                CoroutineState::Yielded(x) => x.map(Some),
                CoroutineState::Complete(()) => Poll::Ready(None),
            }
        }
    }

    // This is equivalent to the `futures::stream::StreamExt::next` method.
    // But we want to keep this crate's dependencies as small as possible, so we define our own `next` function.
    #[doc(hidden)]
    #[inline]
    pub fn next<S>(stream: &mut S) -> impl Future<Output = Option<S::Item>> + '_
    where
        S: Stream + Unpin,
    {
        Next(stream)
    }

    pub(crate) struct Next<'a, S>(&'a mut S);

    impl<S> Future for Next<'_, S>
    where
        S: Stream + Unpin,
    {
        type Output = Option<S::Item>;

        #[inline]
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            Pin::new(&mut self.0).poll_next(cx)
        }
    }
}

mod try_stream {
    use core::{
        ops::{Coroutine, CoroutineState},
        pin::Pin,
        ptr::NonNull,
        task::{Context, Poll},
    };

    use futures_core::stream::{FusedStream, Stream};
    use pin_project::pin_project;

    use crate::future::ResumeTy;

    /// Wrap a coroutine in a stream.
    ///
    /// This function returns a `GenTryStream` underneath, but hides it in `impl Trait` to give
    /// better error messages (`impl FusedStream` rather than `GenTryStream<[closure.....]>`).
    #[doc(hidden)]
    #[inline]
    pub fn from_coroutine<G, T, E>(g: G) -> impl FusedStream<Item = Result<T, E>>
    where
        G: Coroutine<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
    {
        GenTryStream(Some(g))
    }

    #[pin_project]
    pub(crate) struct GenTryStream<G>(#[pin] Option<G>);

    impl<G, T, E> Stream for GenTryStream<G>
    where
        G: Coroutine<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
    {
        type Item = Result<T, E>;

        #[inline]
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let mut this = self.project();
            if let Some(g) = this.0.as_mut().as_pin_mut() {
                let res = match g.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
                    CoroutineState::Yielded(x) => x.map(|x| Some(Ok(x))),
                    CoroutineState::Complete(Err(e)) => Poll::Ready(Some(Err(e))),
                    CoroutineState::Complete(Ok(())) => Poll::Ready(None),
                };
                if let Poll::Ready(Some(Err(_)) | None) = &res {
                    this.0.set(None);
                }
                res
            } else {
                Poll::Ready(None)
            }
        }
    }

    impl<G, T, E> FusedStream for GenTryStream<G>
    where
        G: Coroutine<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
    {
        #[inline]
        fn is_terminated(&self) -> bool {
            self.0.is_none()
        }
    }
}

// Not public API.
#[doc(hidden)]
pub mod __private {
    #[doc(hidden)]
    pub use core::{
        marker::Send,
        option::Option::{None, Some},
        pin::Pin,
        result::Result::{self, Ok},
        task::Poll,
    };

    #[doc(hidden)]
    pub mod future {
        #[doc(hidden)]
        pub use core::future::Future;

        #[doc(hidden)]
        pub use crate::future::{ResumeTy, from_coroutine, get_context};
    }

    #[doc(hidden)]
    pub mod stream {
        #[doc(hidden)]
        pub use futures_core::stream::Stream;

        #[doc(hidden)]
        pub use crate::stream::{from_coroutine, next};
    }

    #[doc(hidden)]
    pub mod try_stream {
        #[doc(hidden)]
        pub use crate::try_stream::from_coroutine;
    }
}
564        pub use crate::try_stream::from_coroutine;
565    }
566}