scoped_stream_sink/lib.rs
1#![no_std]
2#![allow(clippy::type_complexity)]
3
4//! Make asynchronous [`Stream`] and [`Sink`] easy.
5//!
6//! This crate contains the [`ScopedStream`] and [`ScopedSink`] types.
7//! They use the normal Rust lifetime mechanism to ensure safety
8//! (e.g. preventing interior data from being sent outside of its scope).
9//! Unlike [`async_stream`](https://docs.rs/async-stream/latest/async_stream/),
10//! it doesn't use macros.
11//!
12//! ## 📌 Plan for 2.0
13//!
14//! Since AFIT (and RPITIT) have been stabilized, I plan to upgrade this library's interface to use them.
15//! This _should_ eliminate the [`Box::pin`](alloc::boxed::Box::pin) requirement, at the cost of complicated type bounds
16//! (and maybe making it harder to use, too).
17//! So far I've been unsuccessful in fully reasoning out the type bounds.
18//!
19//! So here is the (rough) plan for a (possible) 2.0:
20//! - Eliminate [`Box::pin`](alloc::boxed::Box::pin) requirement (maybe add type alias for dynamic version).
21//! - Beef up [`StreamSink`] functionality (right now it's kinda experimental).
22//!
23//! ## `no-std` Support
24//!
25//! Currently, this crate requires `alloc` (because of [`Box`](alloc::boxed::Box) and such).
26//! But it's perfectly usable on platforms like WASM.
27//! Do note that the default `std` feature requires stdlib and so is incompatible with `no-std`.
28//!
29//! # Examples
30//!
31//! Using [`ScopedStream`]:
32//! ```
33//! use std::time::Duration;
34//!
35//! use futures_util::{SinkExt, StreamExt};
36//!
37//! use scoped_stream_sink::*;
38//!
39//! #[tokio::main]
40//! async fn main() {
41//! // Create new scoped stream
42//! let mut stream = ScopedStream::new(|mut sink| Box::pin(async move {
43//! // We have to Box::pin it because otherwise the trait bounds are too complex
44//! // The interior sink cannot outlast the lifetime of its outer stream
45//!
46//! // This will not work
47//! // tokio::spawn(async move { sink.send(10000).await.unwrap() }).await.unwrap();
48//!
49//! // Assume this is a complex task
50//! let (mut a, mut b) = (1usize, 1);
51//! for _ in 0..10 {
52//! sink.send(a).await.unwrap();
53//! (a, b) = (b, a + b);
54//! tokio::time::sleep(Duration::from_millis(100)).await;
55//! }
56//! }));
57//!
58//! let mut v = Vec::new();
59//! while let Some(i) = stream.next().await {
60//! v.push(i);
61//! }
62//! println!("{v:?}");
63//! }
64//! ```
65//!
66//! Using [`ScopedSink`]:
67//! ```
68//! use std::time::Duration;
69//!
70//! use anyhow::Error;
71//! use futures_util::{SinkExt, StreamExt};
72//!
73//! use scoped_stream_sink::*;
74//!
75//! #[tokio::main]
76//! async fn main() -> Result<(), Error> {
77//! // Create new sink
78//! let mut sink = <ScopedSink<usize, Error>>::new(|mut stream| Box::pin(async move {
79//! // Unlike ScopedStream, this closure will be called over and over again,
80//! // until all values are consumed
81//!
82//! // Assume this is a complex task
83//! tokio::time::sleep(Duration::from_millis(100)).await;
84//! if let Some(v) = stream.next().await {
85//! println!("Value: {v}");
86//! }
87//!
88//! Ok(())
89//! }));
90//!
91//! for i in 0..10 {
92//! sink.send(i).await?;
93//! }
94//! sink.close().await?;
95//!
96//! Ok(())
97//! }
98//! ```
99//!
100//! The following examples will fail to compile:
101//! ```compile_fail
102//! # use anyhow::Error;
103//! # use futures_util::{SinkExt, StreamExt};
104//! # use scoped_stream_sink::*;
105//! let sink = <ScopedSink<usize, Error>>::new(|mut stream| Box::pin(async move {
106//! // Moving inner stream into another thread will fail
107//! // because it might live for longer than the sink.
108//! tokio::spawn(async move {
109//! if let Some(v) = stream.next().await {
110//! println!("Value: {v}");
111//! }
112//! }).await?;
113//!
114//! Ok(())
115//! }));
116//! ```
117//!
118//! ```compile_fail
119//! # use anyhow::Error;
120//! # use futures_util::{SinkExt, StreamExt};
121//! # use scoped_stream_sink::*;
122//! let stream = <ScopedTryStream<usize, Error>>::new(|mut sink| Box::pin(async move {
123//! // Moving inner sink into another thread will fail
124//! // because it might live for longer than the stream.
125//! tokio::spawn(async move {
126//! sink.send(1).await.unwrap();
127//! }).await?;
128//!
129//! Ok(())
130//! }));
131//! ```
132//!
133//! Some very hacky generator out of [`ScopedStream`]:
134//! ```
135//! use core::pin::pin;
136//! use core::ptr::NonNull;
137//! use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
138//!
139//! use futures_util::{SinkExt, StreamExt};
140//! use scoped_stream_sink::*;
141//!
142//! /// Create a null waker. It does nothing when woken.
143//! fn nil_waker() -> Waker {
144//! fn raw() -> RawWaker {
145//! RawWaker::new(NonNull::dangling().as_ptr(), &VTABLE)
146//! }
147//!
148//! unsafe fn clone(_: *const ()) -> RawWaker {
149//! raw()
150//! }
151//! unsafe fn wake(_: *const ()) {}
152//! unsafe fn wake_by_ref(_: *const ()) {}
153//! unsafe fn drop(_: *const ()) {}
154//!
155//! static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
156//!
157//! unsafe { Waker::from_raw(raw()) }
158//! }
159//!
160//! // Create a generator
161//! let mut stream = ScopedStream::new(|mut sink| Box::pin(async move {
162//! for i in 0usize..10 {
163//! sink.send(i).await.unwrap();
164//! }
165//! }));
166//! let mut stream = pin!(stream);
167//!
168//! // Setup waker and context
169//! let waker = nil_waker();
170//! let mut cx = Context::from_waker(&waker);
171//!
172//! // The loop
173//! loop {
174//! let v = match stream.as_mut().poll_next(&mut cx) {
175//! Poll::Pending => continue, // Should not happen, but continue anyways
176//! Poll::Ready(None) => break, // Stop iteration
177//! Poll::Ready(Some(v)) => v, // Process value
178//! };
179//!
180//! println!("{v}");
181//! }
182//! ```
183
184extern crate alloc;
185#[cfg(any(feature = "std", test))]
186#[macro_use]
187extern crate std;
188
189#[cfg(feature = "std")]
190mod local_thread;
191mod scoped_sink;
192mod scoped_stream;
193mod scoped_stream_sink;
194mod stream_sink;
195mod stream_sink_ext;
196
197pub use crate::scoped_sink::*;
198pub use crate::scoped_stream::*;
199pub use crate::scoped_stream_sink::*;
200pub use crate::stream_sink::*;
201pub use crate::stream_sink_ext::*;
202
203#[doc(no_inline)]
204pub use futures_core::Stream;
205#[doc(no_inline)]
206pub use futures_sink::Sink;
207
208#[cfg(feature = "std")]
209pub(crate) use local_thread::*;
210
211/* Keep this commented out until it is ready
212pub(crate) mod sealed {
213 pub(crate) trait Sealed {}
214}
215*/
216
217pub mod prelude {
218 pub use crate::{
219 LocalScopedSink, LocalScopedStream, LocalScopedStreamSink, LocalScopedTryStream,
220 StreamSink as _, StreamSinkExt as _,
221 };
222
223 #[doc(no_inline)]
224 pub use futures_core::Stream as _;
225 #[doc(no_inline)]
226 pub use futures_sink::Sink as _;
227
228 #[cfg(feature = "std")]
229 pub use crate::{ScopedSink, ScopedStream, ScopedStreamSink, ScopedTryStream};
230 #[doc(no_inline)]
231 #[cfg(feature = "std")]
232 pub use futures_util::{SinkExt as _, StreamExt as _};
233}