transform_stream/
lib.rs

1//! Lightweight async stream wrapper.
2//!
3//! Inspired by <https://github.com/tokio-rs/async-stream>
4//!
5//! # Usage
6//! ```
7//! use transform_stream::{try_stream, AsyncTryStream};
8//! use futures_util::{pin_mut, StreamExt};
9//! use std::io;
10//!
11//! let stream: AsyncTryStream<Vec<u8>, io::Error, _> = try_stream!{
12//!     yield_!(vec![b'1', b'2']);
13//!     yield_!(vec![b'3', b'4']);
14//!     Ok(())
15//! };
16//!
17//! futures_executor::block_on(async {
18//!     pin_mut!(stream);
19//!     assert_eq!(stream.next().await.unwrap().unwrap(), vec![b'1', b'2']);
20//!     assert_eq!(stream.next().await.unwrap().unwrap(), vec![b'3', b'4']);
21//!     assert!(stream.next().await.is_none());
22//! });
23//! ```
24
25#![deny(
26    missing_debug_implementations,
27    missing_docs,
28    clippy::all,
29    clippy::cargo
30)]
31
32mod scope;
33mod stream_impl;
34mod try_stream_impl;
35mod yielder;
36
37pub use self::stream_impl::AsyncStream;
38pub use self::try_stream_impl::AsyncTryStream;
39pub use self::yielder::Yielder;
40
/// Hands out the next value of a crate-wide counter.
///
/// The counter is a single static atomic that starts at `1` and grows by one
/// on every call; each call returns the value the counter held *before* its
/// own increment, so the first call yields `1`.
pub(crate) fn next_id() -> u64 {
    use std::sync::atomic::{self, AtomicU64};

    static NEXT: AtomicU64 = AtomicU64::new(1);

    // Only the atomicity of the increment matters here; no other memory is
    // published through this counter, so relaxed ordering is enough.
    NEXT.fetch_add(1, atomic::Ordering::Relaxed)
}
46
/// Builds an [`AsyncStream`] from a block of async code.
///
/// The tokens passed to the macro become the body of an `async move` block
/// inside a closure that is handed to `AsyncStream::new`. Within that body a
/// local `yield_!(value)` macro is available; it expands to a call of
/// `yield_(value)` on the closure's argument, followed by `.await`.
#[macro_export]
macro_rules! stream {
    { $($body:tt)* } => {
        $crate::AsyncStream::new(|mut __yielder| async move {
            // A body that never yields would otherwise leave this helper unused.
            #[allow(unused_macros)]
            macro_rules! yield_ {
                ($item:expr) => {
                    __yielder.yield_($item).await
                };
            }

            $($body)*
        })
    }
}
63
/// Builds an [`AsyncTryStream`] from a block of async code.
///
/// The tokens passed to the macro become the body of an `async move` block
/// inside a closure that is handed to `AsyncTryStream::new`. Within that body
/// a local `yield_!(value)` macro is available; it expands to a call of
/// `yield_ok(value)` on the closure's argument, followed by `.await`.
///
/// In the crate-level usage example the body finishes with `Ok(())`.
#[macro_export]
macro_rules! try_stream {
    { $($block:tt)* } => {
        $crate::AsyncTryStream::new(|mut __y| async move {
            // Mirrors `stream!`: a body that never calls `yield_!` must not
            // trip the `unused_macros` lint on this helper.
            #[allow(unused_macros)]
            macro_rules! yield_ {
                ($v:expr) => {
                    __y.yield_ok($v).await
                };
            }

            $($block)*
        })
    }
}