//! A library for creating shareable streams that can be cloned and consumed by multiple tasks.
//!
//! [`SharedStream`] wraps any [`Stream`] to make it cloneable. All clones share the same underlying
//! stream state, so clones created at the same time will see the same items, while clones created
//! after partial consumption will only see the remaining items.
//!
//! # Examples
//!
//! ```
//! use stream_shared::SharedStream;
//! use futures_util::stream;
//! use futures_util::StreamExt;
//!
//! # tokio_test::block_on(async {
//! let data = vec![1, 2, 3, 4, 5];
//! let stream = stream::iter(data.clone());
//! let shared_stream = SharedStream::new(stream);
//!
//! // Clone the stream for multiple consumers
//! let consumer1 = shared_stream.clone();
//! let consumer2 = shared_stream.clone();
//!
//! // Both consumers will receive all items
//! let result1: Vec<i32> = consumer1.collect().await;
//! let result2: Vec<i32> = consumer2.collect().await;
//!
//! assert_eq!(result1, data);
//! assert_eq!(result2, data);
//! # });
//! ```
//!
//! # Requirements
//!
//! The underlying [`Stream`] type must be [`Unpin`] and the stream's items must implement [`Clone`].
//! With a [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
//! be done by boxing the stream using [`Box::pin`] or pinning it to the stack using the `pin_mut!`
//! macro from the `pin_utils` crate.
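//!
//! As a minimal sketch of why pinning satisfies an `Unpin` bound, the following uses only the
//! standard library. `NotUnpin` and `requires_unpin` are hypothetical stand-ins, not part of this
//! crate: one for a `!Unpin` stream, the other for a bound like the one described above. The
//! sketch also assumes Rust 1.68 or later, where the `std::pin::pin!` macro is available as an
//! alternative to `pin_mut!`.
//!
//! ```
//! use std::marker::PhantomPinned;
//! use std::pin::{pin, Pin};
//!
//! // Hypothetical stand-in for a stream type that is `!Unpin`.
//! struct NotUnpin {
//!     _pinned: PhantomPinned,
//! }
//!
//! // Hypothetical helper: compiles only when `T: Unpin`.
//! fn requires_unpin<T: Unpin>(_value: &T) {}
//!
//! fn main() {
//!     // Passing a bare `NotUnpin` would not compile:
//!     // requires_unpin(&NotUnpin { _pinned: PhantomPinned });
//!
//!     // Heap pinning: `Pin<Box<T>>` is `Unpin` regardless of `T`.
//!     let boxed: Pin<Box<NotUnpin>> = Box::pin(NotUnpin { _pinned: PhantomPinned });
//!     requires_unpin(&boxed);
//!
//!     // Stack pinning: `pin!` yields a `Pin<&mut T>`, which is also `Unpin`.
//!     let on_stack = pin!(NotUnpin { _pinned: PhantomPinned });
//!     requires_unpin(&on_stack);
//! }
//! ```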
//!
//! # Behavior
//!
//! When you clone a [`SharedStream`], the clone will start from the current position
//! of the stream being cloned, not from the beginning of the original data. Each
//! `SharedStream` maintains its own independent position. This means:
//!
//! - Clones created from the same stream at the same time will see the same items
//! - Clones created after consumption will only see items remaining from that stream's position
//! - Each clone can be consumed independently and can itself be cloned from its current position
//!
//! For example, with a stream containing 20 items:
//! ```
//! use stream_shared::SharedStream;
//! use futures_util::stream;
//! use futures_util::StreamExt;
//!
//! # tokio_test::block_on(async {
//! let data = (1..=20).collect::<Vec<i32>>();
//! let stream_with_20_items = stream::iter(data);
//! let mut original = SharedStream::new(stream_with_20_items);
//!
//! // Consume 10 items from original
//! for _ in 0..10 {
//!     original.next().await;
//! }
//! let mut clone1 = original.clone(); // clone1 has 10 remaining items
//!
//! // Consume 2 items from clone1
//! clone1.next().await;
//! clone1.next().await;
//! let clone2 = clone1.clone(); // clone2 has 8 remaining items
//!
//! // Each stream maintains its own position independently
//! let clone3 = original.clone(); // clone3 has 10 remaining items
//!
//! assert_eq!(clone2.count().await, 8);
//! assert_eq!(clone3.count().await, 10);
//! # });
//! ```
//!
//! # Thread Safety
//!
//! `SharedStream` is both [`Send`] and [`Sync`] when the underlying stream and its items
//! are `Send` and `Sync`. This means cloned streams can be safely moved across threads
//! and shared between tasks running on different threads.
//!
//! ```
//! use stream_shared::SharedStream;
//! use futures_util::stream;
//! use futures_util::StreamExt;
//! use tokio::task;
//!
//! # tokio_test::block_on(async {
//! let data = vec![1, 2, 3, 4, 5];
//! let stream = stream::iter(data.clone());
//! let shared_stream = SharedStream::new(stream);
//!
//! // Clone and move each clone into its own spawned task (which may run on another thread)
//! let stream1 = shared_stream.clone();
//! let stream2 = shared_stream.clone();
//!
//! let handle1 = task::spawn(async move {
//!     stream1.collect::<Vec<i32>>().await
//! });
//!
//! let handle2 = task::spawn(async move {
//!     stream2.collect::<Vec<i32>>().await
//! });
//!
//! let (result1, result2) = tokio::join!(handle1, handle2);
//! assert_eq!(result1.unwrap(), data);
//! assert_eq!(result2.unwrap(), data);
//! # });
//! ```
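//!
//! To check ahead of time whether a particular item type meets these bounds, a compile-time
//! assertion can help. This is a minimal sketch using only the standard library;
//! `assert_send_sync` is a hypothetical helper, not part of this crate. The same helper could
//! presumably be applied to a concrete `SharedStream<S>` type as well.
//!
//! ```
//! use std::rc::Rc;
//! use std::sync::Arc;
//!
//! // Hypothetical helper: compiles only when `T` is both `Send` and `Sync`.
//! fn assert_send_sync<T: Send + Sync>() {}
//!
//! fn main() {
//!     // These types are `Send + Sync`, so they are suitable items for cross-thread use.
//!     assert_send_sync::<Arc<String>>();
//!     assert_send_sync::<Vec<i32>>();
//!
//!     // `Rc<String>` is neither `Send` nor `Sync`; uncommenting the next line fails to compile.
//!     // assert_send_sync::<Rc<String>>();
//!     let _local_only: Rc<String> = Rc::new(String::from("single-threaded"));
//! }
//! ```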
//!
//! # Performance Considerations
//!
//! `SharedStream` introduces some overhead compared to consuming a stream directly:
//!
//! - **Memory overhead**: Each item must be cloned for every active consumer (only when consumed)
//! - **Synchronization cost**: Uses `Shared<Future>` internally, which has coordination overhead
//! - **Item lifetime**: Items are kept in memory until all clones have consumed them
//!
//! For best performance, prefer small, cheap-to-clone items, or wrap large data in `Arc<T>`.
//!
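//! As a minimal sketch of the `Arc<T>` suggestion, using only the standard library: cloning an
//! `Arc` copies a pointer and increments a reference count, while cloning the payload itself
//! copies the whole buffer. `LargePayload` and `make_payload` are hypothetical names used for
//! illustration only.
//!
//! ```
//! use std::sync::Arc;
//!
//! // Hypothetical large item type; cloning it directly copies the whole buffer.
//! #[derive(Clone)]
//! struct LargePayload {
//!     bytes: Vec<u8>,
//! }
//!
//! // Hypothetical constructor that wraps the payload in an `Arc` up front.
//! fn make_payload(len: usize) -> Arc<LargePayload> {
//!     Arc::new(LargePayload { bytes: vec![0u8; len] })
//! }
//!
//! fn main() {
//!     let item = make_payload(1024 * 1024);
//!
//!     // Cloning the `Arc` shares the allocation and only bumps the reference count.
//!     let for_consumer = Arc::clone(&item);
//!     assert!(Arc::ptr_eq(&item, &for_consumer));
//!     assert_eq!(Arc::strong_count(&item), 2);
//!
//!     // By contrast, cloning the payload allocates and copies a second 1 MiB buffer.
//!     let deep_copy: LargePayload = (*item).clone();
//!     assert_eq!(deep_copy.bytes.len(), item.bytes.len());
//!     assert_ne!(deep_copy.bytes.as_ptr(), item.bytes.as_ptr());
//! }
//! ```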
pub use SharedStreamExt;
pub use SharedStream;
use Stream;