A library for creating shareable streams that can be cloned and consumed by multiple tasks.
SharedStream wraps any Stream to make it cloneable. All clones share the same underlying stream state, so clones created at the same time will see the same items, while clones created after partial consumption will only see the remaining items.
§Examples
use stream_shared::SharedStream;
use futures_util::stream;
use futures_util::StreamExt;

// `.await` needs an async context; tokio is assumed here, any runtime works.
#[tokio::main]
async fn main() {
    let data = vec![1, 2, 3, 4, 5];
    let stream = stream::iter(data.clone());
    let shared_stream = SharedStream::new(stream);

    // Clone the stream for multiple consumers
    let consumer1 = shared_stream.clone();
    let consumer2 = shared_stream.clone();

    // Both consumers will receive all items
    let result1: Vec<i32> = consumer1.collect().await;
    let result2: Vec<i32> = consumer2.collect().await;
    assert_eq!(result1, data);
    assert_eq!(result2, data);
}
§Requirements
The underlying Stream type must be Unpin and the stream’s items must implement Clone.
With a !Unpin stream, you’ll first have to pin the stream. This can be done by boxing the stream using Box::pin or pinning it to the stack using the pin_mut! macro from the pin_utils crate.
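To illustrate why Box::pin satisfies an Unpin bound, here is a minimal std-only sketch. It uses a plain Future rather than a Stream, because the standard library has no stable Stream trait; the same reasoning should carry over to streams, since Pin<Box<S>> is always Unpin. The names NoopWake, poll_once, and boxed_answer are hypothetical helpers for this illustration and are not part of stream_shared.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient for futures that are ready on first poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once. The `Unpin` bound stands in for the kind of bound
/// described above: the value of a bare `async` block would be rejected here.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

/// Hypothetical helper: heap-pins an `async` block (which is `!Unpin`).
/// The resulting `Pin<Box<_>>` is `Unpin`, so it passes the bound above.
fn boxed_answer() -> Pin<Box<dyn Future<Output = u32>>> {
    Box::pin(async { 40 + 2 })
}
```

Calling poll_once(&mut boxed_answer()) compiles, whereas passing a mutable reference to the bare async block does not. Boxing costs one heap allocation; stack pinning avoids it but ties the pinned value to the current scope.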
§Behavior
When you clone a SharedStream, the clone will start from the current position of the stream being cloned, not from the beginning of the original data. Each SharedStream maintains its own independent position. This means:
- Clones created from the same stream at the same time will see the same items
- Clones created after consumption will only see items remaining from that stream’s position
- Each clone can be consumed independently and can itself be cloned from its current position
For example, with a stream containing 20 items:
use stream_shared::SharedStream;
use futures_util::stream;
let data = (1..=20).collect::<Vec<i32>>();
let stream_with_20_items = stream::iter(data);
let original = SharedStream::new(stream_with_20_items);
// ... consume 10 items from original ...
let clone1 = original.clone(); // clone1 will have 10 remaining items
// ... consume 2 items from clone1 ...
let clone2 = clone1.clone(); // clone2 will have 8 remaining items
// Each stream maintains its own position independently
let clone3 = original.clone(); // clone3 will have 10 remaining items
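The position rules above can be checked with a small synchronous model. This is only a sketch of the observable behavior, assuming a buffered source and a per-handle index; it is not how SharedStream is implemented, and ModelShared, Inner, and clone_positions_demo are hypothetical names. Unlike the real type, the model uses an Iterator instead of a Stream and never frees buffered items.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared state: the lazily drained source plus every item pulled so far.
struct Inner<I: Iterator> {
    source: I,
    buffer: Vec<I::Item>,
}

/// A handle with its own read position into the shared buffer.
struct ModelShared<I: Iterator> {
    inner: Rc<RefCell<Inner<I>>>,
    pos: usize,
}

impl<I: Iterator> ModelShared<I> {
    fn new(source: I) -> Self {
        let inner = Inner { source, buffer: Vec::new() };
        ModelShared { inner: Rc::new(RefCell::new(inner)), pos: 0 }
    }
}

/// Cloning copies the current position, so a clone starts where its parent is.
impl<I: Iterator> Clone for ModelShared<I> {
    fn clone(&self) -> Self {
        ModelShared { inner: Rc::clone(&self.inner), pos: self.pos }
    }
}

impl<I: Iterator> Iterator for ModelShared<I>
where
    I::Item: Clone,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        let mut inner = self.inner.borrow_mut();
        // Pull from the source only when this handle is at the front.
        if self.pos == inner.buffer.len() {
            let item = inner.source.next()?;
            inner.buffer.push(item);
        }
        let item = inner.buffer[self.pos].clone();
        self.pos += 1;
        Some(item)
    }
}

/// Replays the 20-item scenario and returns the remaining item counts
/// for (clone1 at creation, clone2, clone3).
fn clone_positions_demo() -> (usize, usize, usize) {
    let mut original = ModelShared::new(1..=20);
    original.by_ref().take(10).for_each(drop); // consume 10 from original
    let mut clone1 = original.clone();
    let clone1_remaining = clone1.clone().count();
    clone1.by_ref().take(2).for_each(drop); // consume 2 from clone1
    let clone2 = clone1.clone();
    let clone3 = original.clone();
    (clone1_remaining, clone2.count(), clone3.count())
}
```

In this model each item is cloned out of the buffer on every read, which is also why the item type needs to implement Clone.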
§Thread Safety
SharedStream is both Send and Sync when the underlying stream and its items are Send and Sync. This means cloned streams can be safely moved across threads and shared between tasks running on different threads.
use stream_shared::SharedStream;
use futures_util::stream;
use futures_util::StreamExt;
use tokio::task;

#[tokio::main]
async fn main() {
    let data = vec![1, 2, 3, 4, 5];
    let stream = stream::iter(data.clone());
    let shared_stream = SharedStream::new(stream);

    // Clone and move into tasks that may run on different threads
    let stream1 = shared_stream.clone();
    let stream2 = shared_stream.clone();
    let handle1 = task::spawn(async move {
        stream1.collect::<Vec<i32>>().await
    });
    let handle2 = task::spawn(async move {
        stream2.collect::<Vec<i32>>().await
    });

    // Each JoinHandle resolves to a Result, hence the unwrap calls
    let (result1, result2) = tokio::join!(handle1, handle2);
    assert_eq!(result1.unwrap(), data);
    assert_eq!(result2.unwrap(), data);
}
§Performance Considerations
SharedStream introduces some overhead compared to consuming a stream directly:
- Memory overhead: Each item must be cloned for every active consumer
- Synchronization cost: Uses Shared<Future> internally, which has coordination overhead
- Item lifetime: Items are kept in memory until all clones have consumed them
For best performance:
- Minimize the number of concurrent clones when possible
- Prefer small, cheap-to-clone items (consider Arc<T> for large data)
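As a rough illustration of the Arc<T> suggestion, the sketch below compares cloning a payload directly with cloning an Arc around it. It uses only the standard library; fan_out and all_share_allocation are hypothetical helpers that mimic handing one clone to each consumer, and they are not part of the stream_shared API.

```rust
use std::sync::Arc;

/// Hands one clone of `item` to each of `consumers` readers, roughly what a
/// fan-out to several stream clones implies. Hypothetical helper.
fn fan_out<T: Clone>(item: &T, consumers: usize) -> Vec<T> {
    (0..consumers).map(|_| item.clone()).collect()
}

/// True when every copy points at the same heap allocation as `original`,
/// i.e. cloning only bumped a reference count instead of copying the data.
fn all_share_allocation<T>(original: &Arc<T>, copies: &[Arc<T>]) -> bool {
    copies.iter().all(|c| Arc::ptr_eq(c, original))
}
```

With a payload such as Arc::new(vec![0u8; 1024 * 1024]), fan_out produces handles that all share one buffer, while fanning out a plain Vec<u8> allocates and copies a new buffer per consumer. The trade-off is that data behind an Arc is read-only unless it is combined with interior mutability.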
Structs§
- SharedStream: A cloneable stream wrapper that allows multiple consumers to share the same stream.
Traits§
- SharedStreamExt: Extension trait for Stream that provides the into_shared method.