Make asynchronous Stream and Sink easy.
This crate contains the ScopedStream and ScopedSink types.
They use Rust’s normal lifetime mechanism to ensure safety
(e.g. preventing interior data from being sent outside of its scope).
Unlike async_stream, it doesn’t use macros.
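The lifetime argument can be illustrated without the crate at all. The following is a minimal, synchronous, std-only sketch and not this crate's implementation: `Emitter`, `collect_scoped`, and `fibonacci_prefix` are hypothetical names chosen for illustration. It shows how a higher-ranked lifetime bound keeps a handle from leaving the closure it was given to.

```rust
/// Hypothetical handle that is only valid while the scope is running.
struct Emitter<'scope> {
    out: &'scope mut Vec<usize>,
}

impl Emitter<'_> {
    /// Pushes one value into the buffer owned by the scope.
    fn emit(&mut self, value: usize) {
        self.out.push(value);
    }
}

/// Runs `body` with a handle that borrows a local buffer.
/// The `for<'scope>` bound forces `body` to accept any lifetime,
/// so it cannot store the handle somewhere that outlives this call.
fn collect_scoped<F>(body: F) -> Vec<usize>
where
    F: for<'scope> FnOnce(Emitter<'scope>),
{
    let mut out = Vec::new();
    body(Emitter { out: &mut out });
    out
}

/// Collects the first `n` Fibonacci numbers through the scoped handle.
fn fibonacci_prefix(n: usize) -> Vec<usize> {
    collect_scoped(|mut emitter| {
        let (mut a, mut b) = (1usize, 1usize);
        for _ in 0..n {
            emitter.emit(a);
            (a, b) = (b, a + b);
        }
    })
}
```

Assigning the `Emitter` to a variable declared outside the closure is rejected at compile time, which is roughly the kind of guarantee the scoped types aim for in the asynchronous case.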
§📌 Plan for 2.0
Since AFIT (async fn in traits) and RPITIT (return-position impl Trait in traits) have been stabilized, I plan to upgrade this library’s interface to use them.
This should eliminate the Box::pin requirement, at the cost of more complicated type bounds
(and possibly an interface that is harder to use).
So far I have not managed to fully work out the type bounds.
Here is the (rough) plan for a (possible) 2.0:
- Eliminate the Box::pin requirement (maybe add a type alias for the dynamic version).
- Beef up StreamSink functionality (right now it’s kinda experimental).
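To make the trade-off concrete, here is a small std-only sketch comparing a boxed-future trait method with an AFIT one. It is not the planned 2.0 interface: `BoxedProducer`, `AfitProducer`, `Counter`, `block_on`, and `run_both` are hypothetical names, and the AFIT trait assumes Rust 1.75 or newer.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical trait in the boxed style: the future is type-erased.
trait BoxedProducer {
    fn produce(&mut self) -> Pin<Box<dyn Future<Output = usize> + '_>>;
}

/// Hypothetical AFIT equivalent: no `Box::pin`, but the future type is unnameable.
trait AfitProducer {
    async fn produce(&mut self) -> usize;
}

struct Counter(usize);

impl BoxedProducer for Counter {
    fn produce(&mut self) -> Pin<Box<dyn Future<Output = usize> + '_>> {
        Box::pin(async move {
            self.0 += 1;
            self.0
        })
    }
}

impl AfitProducer for Counter {
    async fn produce(&mut self) -> usize {
        self.0 += 1;
        self.0
    }
}

/// Waker that does nothing; enough for futures that complete immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polls a future to completion (illustration only, not a real executor).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Calls both variants on the same counter and returns the two results.
fn run_both() -> (usize, usize) {
    let mut counter = Counter(0);
    let boxed = block_on(BoxedProducer::produce(&mut counter));
    let afit = block_on(AfitProducer::produce(&mut counter));
    (boxed, afit)
}
```

The AFIT version avoids the allocation, but because its future type cannot be named, expressing bounds such as `Send` on it is where the type bounds tend to get complicated.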
§no-std Support
Currently, this crate requires alloc (because of Box and such).
But it’s perfectly usable on platforms like WASM.
Note that the default std feature requires the standard library and is therefore incompatible with no-std.
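As a rough illustration of why alloc is enough, the sketch below builds and polls a boxed, pinned future using imports from core and alloc only. It does not use this crate; `BoxFuture`, `NoopWake`, `poll_once`, and `sum_later` are hypothetical names, and it assumes a target where `alloc::sync::Arc` is available.

```rust
// Only `core` and `alloc` paths are imported below.
extern crate alloc;

use alloc::boxed::Box;
use alloc::sync::Arc;
use alloc::task::Wake;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};

/// The shape of value that `Box::pin(async move { .. })` coerces to.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

/// Waker that does nothing when woken.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a boxed future once with the no-op waker.
fn poll_once<T>(fut: &mut BoxFuture<'_, T>) -> Poll<T> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}

/// Builds a future that borrows `data`, similar to a scoped closure body.
fn sum_later(data: &[u32]) -> BoxFuture<'_, u32> {
    Box::pin(async move { data.iter().sum::<u32>() })
}
```

For example, polling `sum_later(&[1, 2, 3])` once with `poll_once` yields `Poll::Ready(6)`, since the future never suspends.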
§Examples
Using ScopedStream:
use std::time::Duration;

use futures_util::{SinkExt, StreamExt};
use scoped_stream_sink::*;

#[tokio::main]
async fn main() {
    // Create a new scoped stream
    let mut stream = ScopedStream::new(|mut sink| Box::pin(async move {
        // We have to Box::pin it because otherwise the trait bounds are too complex

        // The interior sink cannot outlive its outer stream
        // This will not work:
        // tokio::spawn(async move { sink.send(10000).await.unwrap() }).await.unwrap();

        // Assume this is a complex task
        let (mut a, mut b) = (1usize, 1);
        for _ in 0..10 {
            sink.send(a).await.unwrap();
            (a, b) = (b, a + b);
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }));

    let mut v = Vec::new();
    while let Some(i) = stream.next().await {
        v.push(i);
    }
    println!("{v:?}");
}
Using ScopedSink:
use std::time::Duration;

use anyhow::Error;
use futures_util::{SinkExt, StreamExt};
use scoped_stream_sink::*;

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Create a new sink
    let mut sink = <ScopedSink<usize, Error>>::new(|mut stream| Box::pin(async move {
        // Unlike ScopedStream, this closure will be called over and over again,
        // until all values are consumed

        // Assume this is a complex task
        tokio::time::sleep(Duration::from_millis(100)).await;
        if let Some(v) = stream.next().await {
            println!("Value: {v}");
        }
        Ok(())
    }));

    for i in 0..10 {
        sink.send(i).await?;
    }
    sink.close().await?;
    Ok(())
}
The following examples will fail to compile:
let sink = <ScopedSink<usize, Error>>::new(|mut stream| Box::pin(async move {
    // Moving the inner stream into a spawned task will fail
    // because the task might live longer than the sink.
    tokio::spawn(async move {
        if let Some(v) = stream.next().await {
            println!("Value: {v}");
        }
    }).await?;
    Ok(())
}));

let stream = <ScopedTryStream<usize, Error>>::new(|mut sink| Box::pin(async move {
    // Moving the inner sink into a spawned task will fail
    // because the task might live longer than the stream.
    tokio::spawn(async move {
        sink.send(1).await.unwrap();
    }).await?;
    Ok(())
}));
A very hacky generator built out of ScopedStream:
use core::pin::pin;
use core::ptr::NonNull;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

use futures_util::{SinkExt, StreamExt};
use scoped_stream_sink::*;

/// Create a null waker. It does nothing when woken.
fn nil_waker() -> Waker {
    fn raw() -> RawWaker {
        RawWaker::new(NonNull::dangling().as_ptr(), &VTABLE)
    }

    unsafe fn clone(_: *const ()) -> RawWaker {
        raw()
    }
    unsafe fn wake(_: *const ()) {}
    unsafe fn wake_by_ref(_: *const ()) {}
    unsafe fn drop(_: *const ()) {}

    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);

    unsafe { Waker::from_raw(raw()) }
}

// Create a generator
let mut stream = ScopedStream::new(|mut sink| Box::pin(async move {
    for i in 0usize..10 {
        sink.send(i).await.unwrap();
    }
}));
let mut stream = pin!(stream);

// Set up the waker and context
let waker = nil_waker();
let mut cx = Context::from_waker(&waker);

// The loop
loop {
    let v = match stream.as_mut().poll_next(&mut cx) {
        Poll::Pending => continue, // Should not happen, but continue anyway
        Poll::Ready(None) => break, // Stop iteration
        Poll::Ready(Some(v)) => v, // Process value
    };
    println!("{v}");
}
Re-exports§
Modules§
Structs§
- Chain: Return type of StreamSinkExt::chain().
- Close: Return type of StreamSinkExt::close().
- ErrorCast: Return type of StreamSinkExt::error_cast().
- LocalScopedSink: Local sink with a scoped future.
- LocalScopedStream: Local stream with a scoped future.
- LocalScopedStreamSink: Locally scoped version of StreamSink. Does not implement Send.
- LocalScopedTryStream: Local stream with a scoped future.
- LocalSinkInner: Inner type for LocalScopedSink.
- LocalSinkPart: Sink half of the inner LocalScopedStreamSink. Can receive either the send type or a Result type. Closing completes once the outer LocalScopedStreamSink is closed and has received all data.
- LocalStreamInner: Inner type of LocalScopedStream.
- LocalStreamPart: Stream half of the inner LocalScopedStreamSink. Produces values of the receive type. Can only be closed from its outer LocalScopedStreamSink.
- LocalTryStreamInner: Inner type of LocalScopedTryStream.
- MapError: Return type of StreamSinkExt::map_error().
- MapRecv: Return type of StreamSinkExt::map_recv().
- MapSend: Return type of StreamSinkExt::map_send().
- Ready: Return type of StreamSinkExt::ready().
- ScopedSink: Sink with a scoped future.
- ScopedStream: Stream with a scoped future.
- ScopedStreamSink: Scoped version of StreamSink. Makes building a StreamSink much easier.
- ScopedTryStream: Fallible stream with a scoped future.
- SendIter: Return type of StreamSinkExt::send_iter().
- SendOne: Return type of StreamSinkExt::send_one().
- SendTryIter: Return type of StreamSinkExt::send_try_iter().
- SinkInner: Inner type for ScopedSink.
- SinkPart: Sink half of the inner ScopedStreamSink. Can receive either the send type or a Result type. Closing completes once the outer ScopedStreamSink is closed and has received all data.
- StreamInner: Inner type of ScopedStream.
- StreamPart: Stream half of the inner ScopedStreamSink. Produces values of the receive type. Can only be closed from its outer ScopedStreamSink.
- StreamSinkFallibleWrapper: Wraps a type that implements both Stream and Sink where the Stream half returns a Result. Although such a type is not very usable, both implementations are combined here into StreamSink.
- StreamSinkPair: Wraps a pair of Stream and Sink. Implements StreamSink.
- StreamSinkWrapper: Wraps a type that implements both Stream and Sink. Although such a type is not very usable, both implementations are combined here into StreamSink.
- TrySendFuture: Return type of StreamSinkExt::try_send_future().
- TrySendOne: Return type of StreamSinkExt::try_send_one().
- TryStreamInner: Inner type of ScopedTryStream.
Enums§
- State: State enum for StreamSink.
Traits§
- StreamSink: Combines Stream and Sink into one trait.
- StreamSinkExt: Extension trait for StreamSink. Contains helper methods for using StreamSink.
Type Aliases§
- DynLocalSinkFn: Erased type for the local scope function.
- DynLocalSinkFuture: Erased type for the locally scoped future.
- DynSinkFn: Erased type for the scope function.
- DynSinkFuture: Erased type for the scoped future.