scoped_stream_sink/
lib.rs

1#![no_std]
2#![allow(clippy::type_complexity)]
3
4//! Make asynchronous [`Stream`] and [`Sink`] easy.
5//!
//! This crate contains the [`ScopedStream`] and [`ScopedSink`] types.
//! They use the normal Rust lifetime mechanism to ensure safety
//! (e.g. preventing interior data from being sent outside of its scope).
9//! Unlike [`async_stream`](https://docs.rs/async-stream/latest/async_stream/),
10//! it doesn't use macro.
11//!
12//! ## 📌 Plan for 2.0
13//!
//! Since AFIT (and RPITIT) are stabilized, I plan to upgrade this library's interface to use them.
//! This _should_ eliminate the [`Box::pin`](alloc::boxed::Box::pin) requirement, at the cost of complicated type bounds
//! (and harder to use too, maybe).
//! So far I've been unsuccessful in fully reasoning about the type bounds.
//!
//! So here is the (rough) plan for a (possible) 2.0:
20//! - Eliminate [`Box::pin`](alloc::boxed::Box::pin) requirement (maybe add type alias for dynamic version).
21//! - Beef up [`StreamSink`] functionality (right now it's kinda experimental).
22//!
23//! ## `no-std` Support
24//!
25//! Currently, this crate requires `alloc` (because of [`Box`](alloc::boxed::Box) and such).
26//! But it's perfectly usable on platforms like WASM.
27//! Do note that the default `std` feature requires stdlib and so is incompatible with `no-std`.
28//!
29//! # Examples
30//!
31//! Using [`ScopedStream`]:
32//! ```
33//! use std::time::Duration;
34//!
35//! use futures_util::{SinkExt, StreamExt};
36//!
37//! use scoped_stream_sink::*;
38//!
39//! #[tokio::main]
40//! async fn main() {
41//!     // Create new scoped stream
42//!     let mut stream = ScopedStream::new(|mut sink| Box::pin(async move {
//!         // We have to Box::pin it because otherwise the trait bounds are too complex
//!         // Interior sink cannot outlast the lifetime of its outer stream
45//!
46//!         // This will not work
47//!         // tokio::spawn(async move { sink.send(10000).await.unwrap() }).await.unwrap();
48//!
49//!         // Assume this is a complex task
50//!         let (mut a, mut b) = (1usize, 1);
51//!         for _ in 0..10 {
52//!             sink.send(a).await.unwrap();
53//!             (a, b) = (b, a + b);
54//!             tokio::time::sleep(Duration::from_millis(100)).await;
55//!         }
56//!     }));
57//!
58//!     let mut v = Vec::new();
59//!     while let Some(i) = stream.next().await {
60//!         v.push(i);
61//!     }
62//!     println!("{v:?}");
63//! }
64//! ```
65//!
66//! Using [`ScopedSink`]:
67//! ```
68//! use std::time::Duration;
69//!
70//! use anyhow::Error;
71//! use futures_util::{SinkExt, StreamExt};
72//!
73//! use scoped_stream_sink::*;
74//!
75//! #[tokio::main]
76//! async fn main() -> Result<(), Error> {
77//!     // Create new sink
78//!     let mut sink = <ScopedSink<usize, Error>>::new(|mut stream| Box::pin(async move {
79//!         // Unlike ScopedStream, this closure will be called over and over again,
80//!         // until all values are consumed
81//!
82//!         // Assume this is a complex task
83//!         tokio::time::sleep(Duration::from_millis(100)).await;
84//!         if let Some(v) = stream.next().await {
85//!             println!("Value: {v}");
86//!         }
87//!
88//!         Ok(())
89//!     }));
90//!
91//!     for i in 0..10 {
92//!         sink.send(i).await?;
93//!     }
94//!     sink.close().await?;
95//!
96//!     Ok(())
97//! }
98//! ```
99//!
//! The following examples will fail to compile:
101//! ```compile_fail
102//! # use anyhow::Error;
103//! # use futures_util::{SinkExt, StreamExt};
104//! # use scoped_stream_sink::*;
105//! let sink = <ScopedSink<usize, Error>>::new(|mut stream| Box::pin(async move {
106//!     // Moving inner stream into another thread will fail
107//!     // because it might live for longer than the sink.
108//!     tokio::spawn(async move {
109//!         if let Some(v) = stream.next().await {
110//!             println!("Value: {v}");
111//!         }
112//!     }).await?;
113//!
114//!     Ok(())
115//! }));
116//! ```
117//!
118//! ```compile_fail
119//! # use anyhow::Error;
120//! # use futures_util::{SinkExt, StreamExt};
121//! # use scoped_stream_sink::*;
122//! let stream = <ScopedTryStream<usize, Error>>::new(|mut sink| Box::pin(async move {
123//!     // Moving inner sink into another thread will fail
124//!     // because it might live for longer than the stream.
125//!     tokio::spawn(async move {
126//!         sink.send(1).await.unwrap();
127//!     }).await?;
128//!
129//!     Ok(())
130//! }));
131//! ```
132//!
133//! Some very hacky generator out of [`ScopedStream`]:
134//! ```
135//! use core::pin::pin;
136//! use core::ptr::NonNull;
137//! use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
138//!
139//! use futures_util::{SinkExt, StreamExt};
140//! use scoped_stream_sink::*;
141//!
//! /// Create a null waker. It does nothing when woken.
143//! fn nil_waker() -> Waker {
144//!     fn raw() -> RawWaker {
145//!         RawWaker::new(NonNull::dangling().as_ptr(), &VTABLE)
146//!     }
147//!
148//!     unsafe fn clone(_: *const ()) -> RawWaker {
149//!         raw()
150//!     }
151//!     unsafe fn wake(_: *const ()) {}
152//!     unsafe fn wake_by_ref(_: *const ()) {}
153//!     unsafe fn drop(_: *const ()) {}
154//!
155//!     static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
156//!
157//!     unsafe { Waker::from_raw(raw()) }
158//! }
159//!
160//! // Create a generator
161//! let mut stream = ScopedStream::new(|mut sink| Box::pin(async move {
162//!     for i in 0usize..10 {
163//!         sink.send(i).await.unwrap();
164//!     }
165//! }));
166//! let mut stream = pin!(stream);
167//!
168//! // Setup waker and context
169//! let waker = nil_waker();
170//! let mut cx = Context::from_waker(&waker);
171//!
172//! // The loop
173//! loop {
174//!     let v = match stream.as_mut().poll_next(&mut cx) {
175//!         Poll::Pending => continue, // Should not happen, but continue anyways
176//!         Poll::Ready(None) => break, // Stop iteration
177//!         Poll::Ready(Some(v)) => v, // Process value
178//!     };
179//!
180//!     println!("{v}");
181//! }
182//! ```
183
184extern crate alloc;
185#[cfg(any(feature = "std", test))]
186#[macro_use]
187extern crate std;
188
189#[cfg(feature = "std")]
190mod local_thread;
191mod scoped_sink;
192mod scoped_stream;
193mod scoped_stream_sink;
194mod stream_sink;
195mod stream_sink_ext;
196
197pub use crate::scoped_sink::*;
198pub use crate::scoped_stream::*;
199pub use crate::scoped_stream_sink::*;
200pub use crate::stream_sink::*;
201pub use crate::stream_sink_ext::*;
202
203#[doc(no_inline)]
204pub use futures_core::Stream;
205#[doc(no_inline)]
206pub use futures_sink::Sink;
207
208#[cfg(feature = "std")]
209pub(crate) use local_thread::*;
210
/* Keep this commented out until it is ready
212pub(crate) mod sealed {
213    pub(crate) trait Sealed {}
214}
215*/
216
217pub mod prelude {
218    pub use crate::{
219        LocalScopedSink, LocalScopedStream, LocalScopedStreamSink, LocalScopedTryStream,
220        StreamSink as _, StreamSinkExt as _,
221    };
222
223    #[doc(no_inline)]
224    pub use futures_core::Stream as _;
225    #[doc(no_inline)]
226    pub use futures_sink::Sink as _;
227
228    #[cfg(feature = "std")]
229    pub use crate::{ScopedSink, ScopedStream, ScopedStreamSink, ScopedTryStream};
230    #[doc(no_inline)]
231    #[cfg(feature = "std")]
232    pub use futures_util::{SinkExt as _, StreamExt as _};
233}