1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
//! Hub patterns: dynamic many-to-many fan-out / fan-in.
//!
//! `BroadcastHub` and `MergeHub`. Hubs let consumers attach to a live source
//! (Broadcast) or producers attach to a live sink (Merge) at runtime, after
//! the graph has materialized.
//!
//! Built on `tokio::sync::broadcast` (`BroadcastHub`) and `tokio::sync::mpsc`
//! (`MergeHub`). The `BroadcastHub` buffer is bounded: a subscriber that falls
//! too far behind silently skips the elements it lagged past. The `MergeHub`
//! channel is unbounded.
use futures::stream::{self, StreamExt};
use tokio::sync::{broadcast, mpsc};
use crate::source::Source;
// -- BroadcastHub --------------------------------------------------
/// Hub that fans the elements of attached producer sources out to any
/// number of dynamically created consumers.
///
/// Holds the sending half of a `tokio::sync::broadcast` channel.
pub struct BroadcastHub<T>
where
    T: Clone + Send + 'static,
{
    /// Sending half of the broadcast channel. Cloned for every attached
    /// producer task and used to create a subscription per consumer.
    sender: broadcast::Sender<T>,
}
impl<T: Clone + Send + 'static> BroadcastHub<T> {
pub fn new(buffer_size: usize) -> Self {
assert!(buffer_size >= 1, "buffer_size must be >= 1");
let (sender, _rx) = broadcast::channel(buffer_size);
Self { sender }
}
/// Attach a producer source. Spawns a task that pumps each
/// element into the broadcast channel; returns immediately.
pub fn attach(&self, source: Source<T>) {
let tx = self.sender.clone();
tokio::spawn(async move {
let mut s = source.into_boxed();
while let Some(item) = s.next().await {
let _ = tx.send(item); // ok if no active receivers
}
});
}
/// Return a new consumer source. Yields elements broadcast after
/// this call (late subscribers miss earlier elements). Slow
/// subscribers silently skip lagged elements.
pub fn consumer(&self) -> Source<T> {
let rx = self.sender.subscribe();
let stream = stream::unfold(rx, |mut rx| async move {
loop {
match rx.recv().await {
Ok(item) => return Some((item, rx)),
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => return None,
}
}
});
Source { inner: stream.boxed() }
}
/// Number of currently-attached consumers.
pub fn consumer_count(&self) -> usize {
self.sender.receiver_count()
}
}
// -- MergeHub ------------------------------------------------------
/// Hub that funnels the elements of any number of dynamically attached
/// producers into a single consumer source.
pub struct MergeHub<T>
where
    T: Send + 'static,
{
    /// Sending half of the unbounded channel; cloned for every attached
    /// producer task.
    sender: mpsc::UnboundedSender<T>,
    /// Receiving half, parked here until [`MergeHub::source`] takes it out;
    /// `None` from then on.
    receiver: parking_lot::Mutex<Option<mpsc::UnboundedReceiver<T>>>,
}
impl<T: Send + 'static> Default for MergeHub<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> MergeHub<T>
where
    T: Send + 'static,
{
    /// Create a hub backed by a fresh unbounded channel. The receiving half
    /// is stored until [`MergeHub::source`] claims it.
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();
        Self {
            sender,
            receiver: parking_lot::Mutex::new(Some(receiver)),
        }
    }

    /// Attach a producer source.
    ///
    /// Spawns a Tokio task that forwards every element of `source` into the
    /// merged channel. The task stops early as soon as a send fails.
    pub fn attach(&self, source: Source<T>) {
        let merged = self.sender.clone();
        tokio::spawn(async move {
            let mut elements = source.into_boxed();
            while let Some(element) = elements.next().await {
                if merged.send(element).is_err() {
                    // Nothing can be delivered any more; stop pumping.
                    break;
                }
            }
        });
    }

    /// Take the merged consumer source.
    ///
    /// The receiver exists only once: the first call hands it out wrapped in
    /// a source, every later call returns `Source::empty()`.
    pub fn source(&self) -> Source<T> {
        let taken = self.receiver.lock().take();
        if let Some(receiver) = taken {
            Source::from_receiver(receiver)
        } else {
            Source::empty()
        }
    }
}
#[cfg(test)]
mod tests {
    //! Tests for `BroadcastHub` and `MergeHub`.
    //!
    //! "Nothing was received" checks poll the stream directly under a
    //! timeout. Collecting under a timeout and defaulting to an empty `Vec`
    //! would pass even if elements had been delivered, because a hub that is
    //! still alive keeps its channel open and the collect never finishes.
    use super::*;
    use crate::sink::Sink;
    use std::time::Duration;

    /// Two consumers subscribed before the producer both see every element.
    #[tokio::test]
    async fn broadcast_hub_fans_to_two_consumers() {
        let hub = BroadcastHub::<i32>::new(16);
        let c1 = hub.consumer();
        let c2 = hub.consumer();
        // Attach AFTER subscribers so they don't miss elements.
        hub.attach(Source::from_iter(vec![1, 2, 3]));
        // Drop the hub so its retained sender is released — otherwise
        // consumers never observe `Closed` and would hang forever.
        drop(hub);
        // Both consumers see the same elements.
        let (a, b) = tokio::join!(Sink::collect(c1), Sink::collect(c2));
        assert_eq!(a, vec![1, 2, 3]);
        assert_eq!(b, vec![1, 2, 3]);
    }

    /// A consumer created after the producer finished observes none of the
    /// earlier elements.
    #[tokio::test]
    async fn broadcast_hub_late_consumer_misses_earlier_elements() {
        let hub = BroadcastHub::<i32>::new(16);
        // Pre-subscribe so the broadcast channel doesn't drop messages
        // before we measure the late subscriber.
        let c_pre = hub.consumer();
        hub.attach(Source::from_iter(vec![1, 2, 3]));
        // The hub keeps a sender alive, so `Sink::collect` would never
        // observe `Closed` — read exactly three items under a timeout and
        // fail loudly if the deadline passes first.
        let pre = tokio::time::timeout(Duration::from_millis(200), async move {
            let mut got = Vec::new();
            let mut s = c_pre.into_boxed();
            while got.len() < 3 {
                match s.next().await {
                    Some(v) => got.push(v),
                    None => break,
                }
            }
            got
        })
        .await
        .expect("timed out waiting for the pre-subscribed consumer");
        assert_eq!(pre, vec![1, 2, 3]);
        // Late consumer attaches after all three elements were sent. Poll
        // its first element directly: either the deadline elapses or the
        // stream ends, but no element may arrive.
        let mut late = hub.consumer().into_boxed();
        let first = tokio::time::timeout(Duration::from_millis(50), late.next()).await;
        assert!(
            !matches!(first, Ok(Some(_))),
            "late consumer must not observe elements sent before it subscribed"
        );
    }

    /// `consumer_count` reflects the number of live subscriptions.
    #[tokio::test]
    async fn broadcast_hub_consumer_count_grows_with_subscribers() {
        let hub = BroadcastHub::<i32>::new(4);
        assert_eq!(hub.consumer_count(), 0);
        let _c1 = hub.consumer();
        let _c2 = hub.consumer();
        assert_eq!(hub.consumer_count(), 2);
    }

    /// Elements from every attached producer end up in the merged source.
    #[tokio::test]
    async fn merge_hub_aggregates_multiple_producers() {
        let hub = MergeHub::<i32>::new();
        hub.attach(Source::from_iter(vec![1, 2, 3]));
        hub.attach(Source::from_iter(vec![10, 20, 30]));
        let merged = hub.source();
        // Drop the hub so the merged channel closes once attach tasks
        // finish — without this, `Sink::collect` waits forever.
        drop(hub);
        let mut got = Sink::collect(merged).await;
        got.sort();
        assert_eq!(got, vec![1, 2, 3, 10, 20, 30]);
    }

    /// The second call to `source` hands out a source without elements.
    #[tokio::test]
    async fn merge_hub_source_can_be_taken_only_once() {
        let hub = MergeHub::<i32>::new();
        hub.attach(Source::from_iter(vec![1]));
        let _ = hub.source();
        // Poll the first element directly so a delivered element cannot be
        // masked by a timeout that defaults to an empty collection.
        let mut second = hub.source().into_boxed();
        let first = tokio::time::timeout(Duration::from_millis(50), second.next()).await;
        assert!(
            !matches!(first, Ok(Some(_))),
            "second call to source() must not yield any element"
        );
    }
}