futures_async_stream/lib.rs
1// SPDX-License-Identifier: Apache-2.0 OR MIT
2
3/*!
4<!-- Note: Document from sync-markdown-to-rustdoc:start through sync-markdown-to-rustdoc:end
5 is synchronized from README.md. Any changes to that range are not preserved. -->
6<!-- tidy:sync-markdown-to-rustdoc:start -->
7
8Async stream for Rust and the futures crate.
9
10This crate provides useful features for streams, using `async_await` and
11unstable [`coroutines`](https://github.com/rust-lang/rust/issues/43122).
12
13## Usage
14
15Add this to your `Cargo.toml`:
16
17```toml
18[dependencies]
19futures-async-stream = "0.2"
20futures = "0.3"
21```
22
23*Compiler support: requires rustc nightly-2024-04-25+*
24
25## `#[for_await]`
26
27Processes streams using a for loop.
28
This is a reimplementation of [futures-await]'s `#[async]` for loops for
30futures 0.3 and is an experimental implementation of [the idea listed as the
31next step of async/await](https://rust-lang.github.io/rfcs/2394-async_await.html#for-await-and-processing-streams).
32
33```
34#![feature(proc_macro_hygiene, stmt_expr_attributes)]
35
36use futures::stream::Stream;
37use futures_async_stream::for_await;
38
39async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
40 let mut vec = vec![];
41 #[for_await]
42 for value in stream {
43 vec.push(value);
44 }
45 vec
46}
47```
48
49`value` has the `Item` type of the stream passed in. Note that async for
50loops can only be used inside of `async` functions, closures, blocks,
51`#[stream]` functions and `stream_block!` macros.
52
53## `#[stream]`
54
55Creates streams via coroutines.
56
This is a reimplementation of [futures-await]'s `#[stream]` for futures 0.3 and
58is an experimental implementation of [the idea listed as the next step of
59async/await](https://rust-lang.github.io/rfcs/2394-async_await.html#generators-and-streams).
60
61```
62#![feature(coroutines)]
63
64use futures::stream::Stream;
65use futures_async_stream::stream;
66
67// Returns a stream of i32
68#[stream(item = i32)]
69async fn foo(stream: impl Stream<Item = String>) {
70 // `for_await` is built into `stream`. If you use `for_await` only in `stream`, there is no need to import `for_await`.
71 #[for_await]
72 for x in stream {
73 yield x.parse().unwrap();
74 }
75}
76```
77
To exit early from a `#[stream]` function or block, use `return`.
79
80`#[stream]` on async fn must have an item type specified via
81`item = some::Path` and the values output from the stream must be yielded
82via the `yield` expression.
83
84`#[stream]` can also be used on async blocks:
85
86```
87#![feature(coroutines, proc_macro_hygiene, stmt_expr_attributes)]
88
89use futures::stream::Stream;
90use futures_async_stream::stream;
91
92fn foo() -> impl Stream<Item = i32> {
93 #[stream]
94 async move {
95 for i in 0..10 {
96 yield i;
97 }
98 }
99}
100```
101
102Note that `#[stream]` on async block does not require the `item` argument,
103but it may require additional type annotations.
104
105## Using async stream functions in traits
106
107You can use async stream functions in traits by passing `boxed` or
108`boxed_local` as an argument.
109
110```
111#![feature(coroutines)]
112
113use futures_async_stream::stream;
114
115trait Foo {
116 #[stream(boxed, item = u32)]
117 async fn method(&mut self);
118}
119
120struct Bar(u32);
121
122impl Foo for Bar {
123 #[stream(boxed, item = u32)]
124 async fn method(&mut self) {
125 while self.0 < u32::MAX {
126 self.0 += 1;
127 yield self.0;
128 }
129 }
130}
131```
132
An async stream function that receives a `boxed` argument is converted to a
function that returns `Pin<Box<dyn Stream<Item = item> + Send + 'lifetime>>`.
If you pass `boxed_local` instead of `boxed`, the async stream function
returns a non-thread-safe stream (`Pin<Box<dyn Stream<Item = item> + 'lifetime>>`).
137
138```
139#![feature(coroutines)]
140
141use std::pin::Pin;
142
143use futures::stream::Stream;
144use futures_async_stream::stream;
145
146// The trait itself can be defined without unstable features.
147trait Foo {
148 fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>>;
149}
150
151struct Bar(u32);
152
153impl Foo for Bar {
154 #[stream(boxed, item = u32)]
155 async fn method(&mut self) {
156 while self.0 < u32::MAX {
157 self.0 += 1;
158 yield self.0;
159 }
160 }
161}
162```
163
164## `#[try_stream]`
165
The `?` operator can be used with `#[try_stream]`. The `Item` of the
returned stream is `Result` with `Ok` being the value yielded and `Err` the
error type returned by the `?` operator or `return Err(...)`.
169
170```
171#![feature(coroutines)]
172
173use futures::stream::Stream;
174use futures_async_stream::try_stream;
175
176#[try_stream(ok = i32, error = Box<dyn std::error::Error>)]
177async fn foo(stream: impl Stream<Item = String>) {
178 #[for_await]
179 for x in stream {
180 yield x.parse()?;
181 }
182}
183```
184
185`#[try_stream]` can be used wherever `#[stream]` can be used.
186
To exit early from a `#[try_stream]` function or block, use `return Ok(())`.
188
189<!--
190## List of features that may be added in the future as an extension of this feature:
191
192- `async_sink` (https://github.com/rust-lang-nursery/futures-rs/pull/1548#issuecomment-486205382)
193- Support `.await` in macro (https://github.com/rust-lang-nursery/futures-rs/pull/1548#discussion_r285341883)
194- Parallel version of `for_await` (https://github.com/rustasync/runtime/pull/25)
195-->
196
197## How to write the equivalent code without this API?
198
199### `#[for_await]`
200
201You can write this by combining `while let` loop, `.await`, `pin!` macro,
202and `StreamExt::next()` method:
203
204```
205use std::pin::pin;
206
207use futures::stream::{Stream, StreamExt};
208
209async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
210 let mut vec = vec![];
211 let mut stream = pin!(stream);
212 while let Some(value) = stream.next().await {
213 vec.push(value);
214 }
215 vec
216}
217```
218
219### `#[stream]`
220
221You can write this by manually implementing the combinator:
222
223```
224use std::{
225 pin::Pin,
226 task::{ready, Context, Poll},
227};
228
229use futures::stream::Stream;
230use pin_project::pin_project;
231
232fn foo<S>(stream: S) -> impl Stream<Item = i32>
233where
234 S: Stream<Item = String>,
235{
236 Foo { stream }
237}
238
239#[pin_project]
240struct Foo<S> {
241 #[pin]
242 stream: S,
243}
244
245impl<S> Stream for Foo<S>
246where
247 S: Stream<Item = String>,
248{
249 type Item = i32;
250
251 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
252 if let Some(x) = ready!(self.project().stream.poll_next(cx)) {
253 Poll::Ready(Some(x.parse().unwrap()))
254 } else {
255 Poll::Ready(None)
256 }
257 }
258}
259```
260
261[futures-await]: https://github.com/alexcrichton/futures-await
262
263<!-- tidy:sync-markdown-to-rustdoc:end -->
264*/
265
266#![no_std]
267#![doc(test(
268 no_crate_inject,
269 attr(
270 deny(warnings, rust_2018_idioms, single_use_lifetimes),
271 allow(dead_code, unused_variables)
272 )
273))]
274#![warn(
275 // Lints that may help when writing public library.
276 missing_debug_implementations,
277 missing_docs,
278 clippy::alloc_instead_of_core,
279 clippy::exhaustive_enums,
280 clippy::exhaustive_structs,
281 clippy::impl_trait_in_params,
282 clippy::missing_inline_in_public_items,
283 clippy::std_instead_of_alloc,
284 clippy::std_instead_of_core,
285)]
286#![feature(coroutine_trait)]
287
288#[cfg(test)]
289extern crate std;
290
291#[cfg(test)]
292mod tests;
293
294#[cfg(test)]
295#[path = "gen/tests/assert_impl.rs"]
296mod assert_impl;
297#[cfg(test)]
298#[path = "gen/tests/track_size.rs"]
299mod track_size;
300
301#[doc(inline)]
302pub use futures_async_stream_macro::for_await;
303#[doc(inline)]
304pub use futures_async_stream_macro::stream;
305#[doc(inline)]
306pub use futures_async_stream_macro::stream_block;
307#[doc(inline)]
308pub use futures_async_stream_macro::try_stream;
309#[doc(inline)]
310pub use futures_async_stream_macro::try_stream_block;
311
312mod future {
313 use core::{
314 future::Future,
315 ops::{Coroutine, CoroutineState},
316 pin::Pin,
317 ptr::NonNull,
318 task::{Context, Poll},
319 };
320
321 use pin_project::pin_project;
322
323 // Based on https://github.com/rust-lang/rust/blob/1.66.0/library/core/src/future/mod.rs.
324 // TODO: can we avoid GenFuture? https://github.com/rust-lang/rust/commit/9f36f988ad873f5d34cd9c08e4903c597ffc9532
325
326 /// This type is needed because:
327 ///
328 /// a) Coroutines cannot implement `for<'a, 'b> Coroutine<&'a mut Context<'b>>`, so we need to pass
329 /// a raw pointer (see <https://github.com/rust-lang/rust/issues/68923>).
330 /// b) Raw pointers and `NonNull` aren't `Send` or `Sync`, so that would make every single future
331 /// non-Send/Sync as well, and we don't want that.
332 ///
333 /// It also simplifies the lowering of `.await`.
334 #[doc(hidden)]
335 #[derive(Debug, Clone, Copy)]
336 pub struct ResumeTy(pub(crate) NonNull<Context<'static>>);
337
338 // SAFETY: the caller of the `get_context` function that dereferences a
339 // pointer must guarantee that no data races will occur.
340 // Note: Since https://github.com/rust-lang/rust/pull/95985, `Context` is
341 // `!Send` and `!Sync`.
342 unsafe impl Send for ResumeTy {}
343 // SAFETY: see `Send` impl
344 unsafe impl Sync for ResumeTy {}
345
346 // Needed to work around old nightly bug (fixed in nightly-2024-05-24 by https://github.com/rust-lang/rust/pull/125392)
347 impl core::panic::UnwindSafe for ResumeTy {}
348 impl core::panic::RefUnwindSafe for ResumeTy {}
349
350 /// Wrap a coroutine in a future.
351 ///
352 /// This function returns a `GenFuture` underneath, but hides it in `impl Trait` to give
353 /// better error messages (`impl Future` rather than `GenFuture<[closure.....]>`).
354 #[doc(hidden)]
355 #[inline]
356 pub fn from_coroutine<G>(g: G) -> impl Future<Output = G::Return>
357 where
358 G: Coroutine<ResumeTy, Yield = ()>,
359 {
360 GenFuture(g)
361 }
362
363 #[pin_project]
364 pub(crate) struct GenFuture<G>(#[pin] G);
365
366 impl<G> Future for GenFuture<G>
367 where
368 G: Coroutine<ResumeTy, Yield = ()>,
369 {
370 type Output = G::Return;
371
372 #[inline]
373 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
374 let this = self.project();
375 // Resume the coroutine, turning the `&mut Context` into a `NonNull` raw pointer. The
376 // `.await` lowering will safely cast that back to a `&mut Context`.
377 match this.0.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
378 CoroutineState::Yielded(()) => Poll::Pending,
379 CoroutineState::Complete(x) => Poll::Ready(x),
380 }
381 }
382 }
383
384 #[doc(hidden)]
385 #[inline]
386 #[must_use]
387 pub unsafe fn get_context<'a, 'b>(cx: ResumeTy) -> &'a mut Context<'b> {
388 // SAFETY: the caller must guarantee that `cx.0` is a valid pointer
389 // that fulfills all the requirements for a mutable reference.
390 unsafe { &mut *cx.0.as_ptr().cast::<Context<'b>>() }
391 }
392}
393
394mod stream {
395 use core::{
396 future::Future,
397 ops::{Coroutine, CoroutineState},
398 pin::Pin,
399 ptr::NonNull,
400 task::{Context, Poll},
401 };
402
403 use futures_core::stream::Stream;
404 use pin_project::pin_project;
405
406 use crate::future::ResumeTy;
407
408 /// Wrap a coroutine in a stream.
409 ///
410 /// This function returns a `GenStream` underneath, but hides it in `impl Trait` to give
411 /// better error messages (`impl Stream` rather than `GenStream<[closure.....]>`).
412 #[doc(hidden)]
413 #[inline]
414 pub fn from_coroutine<G, T>(g: G) -> impl Stream<Item = T>
415 where
416 G: Coroutine<ResumeTy, Yield = Poll<T>, Return = ()>,
417 {
418 GenStream(g)
419 }
420
421 #[pin_project]
422 pub(crate) struct GenStream<G>(#[pin] G);
423
424 impl<G, T> Stream for GenStream<G>
425 where
426 G: Coroutine<ResumeTy, Yield = Poll<T>, Return = ()>,
427 {
428 type Item = T;
429
430 #[inline]
431 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
432 let this = self.project();
433 match this.0.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
434 CoroutineState::Yielded(x) => x.map(Some),
435 CoroutineState::Complete(()) => Poll::Ready(None),
436 }
437 }
438 }
439
440 // This is equivalent to the `futures::stream::StreamExt::next` method.
441 // But we want to make this crate dependency as small as possible, so we define our `next` function.
442 #[doc(hidden)]
443 #[inline]
444 pub fn next<S>(stream: &mut S) -> impl Future<Output = Option<S::Item>> + '_
445 where
446 S: Stream + Unpin,
447 {
448 Next(stream)
449 }
450
451 pub(crate) struct Next<'a, S>(&'a mut S);
452
453 impl<S> Future for Next<'_, S>
454 where
455 S: Stream + Unpin,
456 {
457 type Output = Option<S::Item>;
458
459 #[inline]
460 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
461 Pin::new(&mut self.0).poll_next(cx)
462 }
463 }
464}
465
466mod try_stream {
467 use core::{
468 ops::{Coroutine, CoroutineState},
469 pin::Pin,
470 ptr::NonNull,
471 task::{Context, Poll},
472 };
473
474 use futures_core::stream::{FusedStream, Stream};
475 use pin_project::pin_project;
476
477 use crate::future::ResumeTy;
478
479 /// Wrap a coroutine in a stream.
480 ///
481 /// This function returns a `GenStream` underneath, but hides it in `impl Trait` to give
482 /// better error messages (`impl Stream` rather than `GenStream<[closure.....]>`).
483 #[doc(hidden)]
484 #[inline]
485 pub fn from_coroutine<G, T, E>(g: G) -> impl FusedStream<Item = Result<T, E>>
486 where
487 G: Coroutine<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
488 {
489 GenTryStream(Some(g))
490 }
491
492 #[pin_project]
493 pub(crate) struct GenTryStream<G>(#[pin] Option<G>);
494
495 impl<G, T, E> Stream for GenTryStream<G>
496 where
497 G: Coroutine<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
498 {
499 type Item = Result<T, E>;
500
501 #[inline]
502 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
503 let mut this = self.project();
504 if let Some(g) = this.0.as_mut().as_pin_mut() {
505 let res = match g.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
506 CoroutineState::Yielded(x) => x.map(|x| Some(Ok(x))),
507 CoroutineState::Complete(Err(e)) => Poll::Ready(Some(Err(e))),
508 CoroutineState::Complete(Ok(())) => Poll::Ready(None),
509 };
510 if let Poll::Ready(Some(Err(_)) | None) = &res {
511 this.0.set(None);
512 }
513 res
514 } else {
515 Poll::Ready(None)
516 }
517 }
518 }
519
520 impl<G, T, E> FusedStream for GenTryStream<G>
521 where
522 G: Coroutine<ResumeTy, Yield = Poll<T>, Return = Result<(), E>>,
523 {
524 #[inline]
525 fn is_terminated(&self) -> bool {
526 self.0.is_none()
527 }
528 }
529}
530
531// Not public API.
532#[doc(hidden)]
533pub mod __private {
534 #[doc(hidden)]
535 pub use core::{
536 marker::Send,
537 option::Option::{None, Some},
538 pin::Pin,
539 result::Result::{self, Ok},
540 task::Poll,
541 };
542
543 #[doc(hidden)]
544 pub mod future {
545 #[doc(hidden)]
546 pub use core::future::Future;
547
548 #[doc(hidden)]
549 pub use crate::future::{ResumeTy, from_coroutine, get_context};
550 }
551
552 #[doc(hidden)]
553 pub mod stream {
554 #[doc(hidden)]
555 pub use futures_core::stream::Stream;
556
557 #[doc(hidden)]
558 pub use crate::stream::{from_coroutine, next};
559 }
560
561 #[doc(hidden)]
562 pub mod try_stream {
563 #[doc(hidden)]
564 pub use crate::try_stream::from_coroutine;
565 }
566}