transform_stream/lib.rs
1//! Lightweight async stream wrapper.
2//!
3//! Inspired by <https://github.com/tokio-rs/async-stream>
4//!
5//! # Usage
6//! ```
7//! use transform_stream::{try_stream, AsyncTryStream};
8//! use futures_util::{pin_mut, StreamExt};
9//! use std::io;
10//!
11//! let stream: AsyncTryStream<Vec<u8>, io::Error, _> = try_stream!{
12//! yield_!(vec![b'1', b'2']);
13//! yield_!(vec![b'3', b'4']);
14//! Ok(())
15//! };
16//!
17//! futures_executor::block_on(async {
18//! pin_mut!(stream);
19//! assert_eq!(stream.next().await.unwrap().unwrap(), vec![b'1', b'2']);
20//! assert_eq!(stream.next().await.unwrap().unwrap(), vec![b'3', b'4']);
21//! assert!(stream.next().await.is_none());
22//! });
23//! ```
24
25#![deny(
26 missing_debug_implementations,
27 missing_docs,
28 clippy::all,
29 clippy::cargo
30)]
31
32mod scope;
33mod stream_impl;
34mod try_stream_impl;
35mod yielder;
36
37pub use self::stream_impl::AsyncStream;
38pub use self::try_stream_impl::AsyncTryStream;
39pub use self::yielder::Yielder;
40
41pub(crate) fn next_id() -> u64 {
42 use std::sync::atomic::{AtomicU64, Ordering};
43 static ID: AtomicU64 = AtomicU64::new(1);
44 ID.fetch_add(1, Ordering::Relaxed)
45}
46
47/// Create a new stream
48#[macro_export]
49macro_rules! stream {
50 {$($block:tt)*} => {
51 $crate::AsyncStream::new(|mut __y| async move{
52 #[allow(unused_macros)]
53 macro_rules! yield_ {
54 ($v:expr) => {
55 __y.yield_($v).await
56 };
57 }
58
59 $($block)*
60 })
61 }
62}
63
64/// Create a new try stream
65#[macro_export]
66macro_rules! try_stream{
67 {$($block:tt)*} => {
68 $crate::AsyncTryStream::new(|mut __y| async move{
69 macro_rules! yield_ {
70 ($v:expr) => {
71 __y.yield_ok($v).await
72 };
73 }
74
75 $($block)*
76 })
77 }
78}