use std::{
collections::{BTreeMap, VecDeque},
fmt,
sync::{
Arc, Condvar, Mutex, MutexGuard,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
},
time::Duration,
};
use arc_swap::ArcSwap;
use smallvec::SmallVec;
use crate::stream::{BoxStream, NotUsed, Sink, Source, SourceRuntimeHints, StreamCompletion};
use crate::{StreamError, StreamResult};
type Partitioner<T> = Arc<dyn Fn(&PartitionConsumerInfo, &T) -> isize + Send + Sync>;
const MERGE_HUB_BATCH_LIMIT: usize = 256;
const BROADCAST_HUB_BATCH_LIMIT: usize = 256;
const BROADCAST_HUB_SINGLE_CONSUMER_BATCH_LIMIT: usize = 64;
const PARTITION_HUB_BATCH_LIMIT: usize = 1024;
const PARTITION_HUB_WIDE_BATCH_LIMIT: usize = 2048;
const PARTITION_HUB_SINGLE_CONSUMER_BATCH_LIMIT: usize = 256;
const FAN_OUT_CONSUMER_BATCH_LIMIT: usize = 256;
fn fan_out_wait_timeout() -> Duration {
Duration::from_millis(1)
}
#[derive(Clone)]
pub struct MergeHubDrainingControl {
state: Arc<MergeHubState>,
on_drain: Arc<dyn Fn() + Send + Sync>,
}
impl fmt::Debug for MergeHubDrainingControl {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MergeHubDrainingControl").finish()
}
}
impl MergeHubDrainingControl {
pub fn drain_and_complete(&self) {
let mut state = self.state.lock();
state.draining = true;
self.state.condvar.notify_all();
drop(state);
(self.on_drain)();
}
}
pub struct MergeHub;
impl MergeHub {
#[must_use]
pub fn source<T: Send + 'static>(
per_producer_buffer_size: usize,
) -> Source<T, Sink<T, NotUsed>> {
Self::source_with_draining(per_producer_buffer_size)
.map_materialized_value(|(sink, _)| sink)
}
#[must_use]
pub fn source_with_draining<T: Send + 'static>(
per_producer_buffer_size: usize,
) -> Source<T, (Sink<T, NotUsed>, MergeHubDrainingControl)> {
assert!(
per_producer_buffer_size > 0,
"MergeHub per_producer_buffer_size must be greater than zero"
);
Source::from_terminal_batch_materialized_factory(move |_| {
let state = Arc::new(MergeHubShared::<T>::new(per_producer_buffer_size));
let source = Box::new(MergeHubSourceStream {
state: Arc::clone(&state),
local: VecDeque::new(),
prefer_direct: false,
}) as BoxStream<T>;
let sink = merge_hub_sink(Arc::clone(&state));
let control = MergeHubDrainingControl {
state: Arc::clone(&state.state),
on_drain: Arc::new({
let state = Arc::clone(&state);
move || state.finish_if_draining()
}),
};
Ok((source, (sink, control)))
})
}
}
pub struct BroadcastHub;
impl BroadcastHub {
#[must_use]
pub fn sink<T: Clone + Send + 'static>(
buffer_size: usize,
) -> Sink<T, BroadcastHubConsumerSource<T>> {
Self::sink_starting_after(0, buffer_size)
}
#[must_use]
pub fn sink_starting_after<T: Clone + Send + 'static>(
start_after_nr_of_consumers: usize,
buffer_size: usize,
) -> Sink<T, BroadcastHubConsumerSource<T>> {
assert!(
buffer_size > 0,
"BroadcastHub buffer_size must be greater than zero"
);
Sink::from_hinted_runner(move |input, materializer, hints| {
let state = Arc::new(FanOutHubShared::new(
FanOutMode::Broadcast,
start_after_nr_of_consumers,
buffer_size,
None::<Partitioner<T>>,
));
let source = BroadcastHubConsumerSource {
state: Arc::clone(&state),
completion: Arc::new(Mutex::new(None)),
};
let completion = materializer.spawn_stream(move |cancelled| {
FanOutProducer::new(input, state).run(cancelled, hints)
});
source.attach_completion(completion);
Ok(source)
})
}
}
pub struct PartitionHub;
impl PartitionHub {
#[must_use]
pub fn sink<T: Clone + Send + 'static, F>(
partitioner: F,
start_after_nr_of_consumers: usize,
buffer_size: usize,
) -> Sink<T, PartitionHubConsumerSource<T>>
where
F: Fn(&PartitionConsumerInfo, &T) -> isize + Send + Sync + 'static,
{
assert!(
buffer_size > 0,
"PartitionHub buffer_size must be greater than zero"
);
let partitioner = Arc::new(partitioner);
Sink::from_hinted_runner(move |input, materializer, hints| {
let partitioner = Arc::clone(&partitioner);
let state = Arc::new(FanOutHubShared::new(
FanOutMode::Partition,
start_after_nr_of_consumers,
buffer_size,
Some(partitioner),
));
let source = PartitionHubConsumerSource {
state: Arc::clone(&state),
completion: Arc::new(Mutex::new(None)),
};
let completion = materializer.spawn_stream(move |cancelled| {
FanOutProducer::new(input, state).run(cancelled, hints)
});
source.attach_completion(completion);
Ok(source)
})
}
}
#[derive(Clone)]
pub struct BroadcastHubConsumerSource<T> {
state: Arc<FanOutHubShared<T>>,
completion: Arc<Mutex<Option<StreamCompletion<NotUsed>>>>,
}
impl<T: Clone + Send + 'static> BroadcastHubConsumerSource<T> {
fn attach_completion(&self, completion: StreamCompletion<NotUsed>) {
*self
.completion
.lock()
.expect("broadcast hub completion poisoned") = Some(completion);
}
#[must_use]
pub fn source(&self) -> Source<T, NotUsed> {
let state = Arc::clone(&self.state);
Source::from_materialized_factory(move |_| {
let lane = state.register_consumer();
let stream = Box::new(FanOutConsumerStream {
state: Arc::clone(&state),
lane,
local: None,
detached: false,
}) as BoxStream<T>;
Ok((stream, NotUsed))
})
}
}
impl<T: Clone + Send + 'static> fmt::Debug for BroadcastHubConsumerSource<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BroadcastHubConsumerSource").finish()
}
}
#[derive(Clone)]
pub struct PartitionHubConsumerSource<T> {
state: Arc<FanOutHubShared<T>>,
completion: Arc<Mutex<Option<StreamCompletion<NotUsed>>>>,
}
impl<T: Clone + Send + 'static> PartitionHubConsumerSource<T> {
fn attach_completion(&self, completion: StreamCompletion<NotUsed>) {
*self
.completion
.lock()
.expect("partition hub completion poisoned") = Some(completion);
}
#[must_use]
pub fn source(&self) -> Source<T, NotUsed> {
let state = Arc::clone(&self.state);
Source::from_materialized_factory(move |_| {
let lane = state.register_consumer();
let stream = Box::new(FanOutConsumerStream {
state: Arc::clone(&state),
lane,
local: None,
detached: false,
}) as BoxStream<T>;
Ok((stream, NotUsed))
})
}
}
impl<T: Clone + Send + 'static> fmt::Debug for PartitionHubConsumerSource<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PartitionHubConsumerSource").finish()
}
}
#[derive(Clone, Debug)]
pub struct PartitionConsumerInfo {
consumer_ids: SmallVec<[u64; 16]>,
queue_sizes: SmallVec<[(u64, usize); 16]>,
}
impl PartitionConsumerInfo {
#[must_use]
pub fn size(&self) -> usize {
self.consumer_ids.len()
}
#[must_use]
pub fn consumer_ids(&self) -> &[u64] {
&self.consumer_ids
}
#[must_use]
pub fn consumer_id_by_idx(&self, idx: usize) -> u64 {
self.consumer_ids[idx]
}
#[must_use]
pub fn queue_size(&self, consumer_id: u64) -> usize {
self.queue_sizes
.iter()
.find_map(|(id, size)| (*id == consumer_id).then_some(*size))
.unwrap_or(0)
}
}
fn merge_hub_sink<T: Send + 'static>(state: Arc<MergeHubShared<T>>) -> Sink<T, NotUsed> {
Sink::from_raw_hinted_runner(move |input, materializer, hints| {
if hints.inline_micro_max_success_items.is_some() {
state.register_direct_producer(input)?;
return Ok(NotUsed);
}
let input = materializer.checked_stream(input, None);
let producer_id = state.register_producer()?;
let hub = Arc::clone(&state);
let completion = materializer.spawn_stream(move |cancelled| {
let mut input = input;
loop {
if cancelled.load(std::sync::atomic::Ordering::SeqCst) {
hub.fail(StreamError::Cancelled);
hub.deregister_producer(producer_id);
return Err(StreamError::Cancelled);
}
match input.next() {
Some(Ok(item)) => hub.push_item(producer_id, item)?,
Some(Err(error)) => {
hub.fail(error.clone());
hub.deregister_producer(producer_id);
return Err(error);
}
None => {
hub.deregister_producer(producer_id);
return Ok(NotUsed);
}
}
}
});
state.store_producer_completion(completion);
Ok(NotUsed)
})
}
struct MergeHubShared<T> {
state: Arc<MergeHubState>,
shared: Mutex<MergeHubInner<T>>,
condvar: Condvar,
failed: AtomicBool,
}
#[derive(Debug)]
struct MergeHubState {
inner: Mutex<MergeHubFlags>,
condvar: Condvar,
}
#[derive(Debug, Default)]
struct MergeHubFlags {
draining: bool,
}
impl MergeHubState {
fn lock(&self) -> MutexGuard<'_, MergeHubFlags> {
self.inner.lock().expect("merge hub flags poisoned")
}
}
struct MergeHubInner<T> {
queue: VecDeque<(u64, T)>,
direct_producers: VecDeque<MergeHubDirectProducer<T>>,
queued_per_producer: BTreeMap<u64, usize>,
producer_completions: Vec<StreamCompletion<NotUsed>>,
active_producers: usize,
next_producer_id: u64,
source_closed: bool,
completed: bool,
failed: Option<StreamError>,
per_producer_buffer_size: usize,
}
struct MergeHubDirectProducer<T> {
id: u64,
input: BoxStream<T>,
}
enum MergeHubProducerTerminal {
Active,
Completed,
Failed(StreamError),
}
impl<T> MergeHubShared<T> {
fn new(per_producer_buffer_size: usize) -> Self {
Self {
state: Arc::new(MergeHubState {
inner: Mutex::new(MergeHubFlags::default()),
condvar: Condvar::new(),
}),
shared: Mutex::new(MergeHubInner {
queue: VecDeque::new(),
direct_producers: VecDeque::new(),
queued_per_producer: BTreeMap::new(),
producer_completions: Vec::new(),
active_producers: 0,
next_producer_id: 0,
source_closed: false,
completed: false,
failed: None,
per_producer_buffer_size,
}),
condvar: Condvar::new(),
failed: AtomicBool::new(false),
}
}
fn register_direct_producer(&self, input: BoxStream<T>) -> StreamResult<()> {
let mut inner = self.shared.lock().expect("merge hub poisoned");
prune_finished_producer_completions(&mut inner.producer_completions);
let flags = self.state.lock();
if flags.draining || inner.source_closed || inner.completed {
return Err(StreamError::Failed(
"merge hub is draining or closed to new producers".to_owned(),
));
}
if let Some(error) = inner.failed.clone() {
return Err(error);
}
let id = inner.next_producer_id;
inner.next_producer_id += 1;
inner.active_producers += 1;
inner
.direct_producers
.push_back(MergeHubDirectProducer { id, input });
drop(flags);
drop(inner);
self.condvar.notify_all();
Ok(())
}
fn register_producer(&self) -> StreamResult<u64> {
let mut inner = self.shared.lock().expect("merge hub poisoned");
prune_finished_producer_completions(&mut inner.producer_completions);
let flags = self.state.lock();
if flags.draining || inner.source_closed || inner.completed {
return Err(StreamError::Failed(
"merge hub is draining or closed to new producers".to_owned(),
));
}
if let Some(error) = inner.failed.clone() {
return Err(error);
}
let id = inner.next_producer_id;
inner.next_producer_id += 1;
inner.active_producers += 1;
inner.queued_per_producer.insert(id, 0);
Ok(id)
}
fn store_producer_completion(&self, completion: StreamCompletion<NotUsed>) {
let mut inner = self.shared.lock().expect("merge hub poisoned");
prune_finished_producer_completions(&mut inner.producer_completions);
inner.producer_completions.push(completion);
}
fn push_item(&self, producer_id: u64, item: T) -> StreamResult<()> {
let mut inner = self.shared.lock().expect("merge hub poisoned");
loop {
if let Some(error) = inner.failed.clone() {
inner.queued_per_producer.remove(&producer_id);
return Err(error);
}
if inner.source_closed {
inner.queued_per_producer.remove(&producer_id);
return Err(StreamError::Cancelled);
}
let queued = inner
.queued_per_producer
.get(&producer_id)
.copied()
.unwrap_or(0);
if queued < inner.per_producer_buffer_size {
inner.queue.push_back((producer_id, item));
inner.queued_per_producer.insert(producer_id, queued + 1);
self.condvar.notify_all();
return Ok(());
}
inner = self
.condvar
.wait(inner)
.expect("merge hub poisoned while waiting");
}
}
fn deregister_producer(&self, producer_id: u64) {
let mut inner = self.shared.lock().expect("merge hub poisoned");
prune_finished_producer_completions(&mut inner.producer_completions);
inner.queued_per_producer.remove(&producer_id);
inner
.direct_producers
.retain(|producer| producer.id != producer_id);
inner.active_producers = inner.active_producers.saturating_sub(1);
if inner.active_producers == 0 {
let flags = self.state.lock();
if flags.draining {
inner.completed = true;
}
}
self.condvar.notify_all();
}
fn pop_direct_producer(
&self,
inner: &mut MergeHubInner<T>,
) -> Option<MergeHubDirectProducer<T>> {
inner.direct_producers.pop_front()
}
fn restore_direct_producer(
&self,
producer: MergeHubDirectProducer<T>,
terminal: MergeHubProducerTerminal,
) -> StreamResult<()> {
match terminal {
MergeHubProducerTerminal::Active => {
let mut inner = self.shared.lock().expect("merge hub poisoned");
if let Some(error) = inner.failed.clone() {
inner.active_producers = inner.active_producers.saturating_sub(1);
return Err(error);
}
if inner.source_closed {
inner.active_producers = inner.active_producers.saturating_sub(1);
return Err(StreamError::Cancelled);
}
inner.direct_producers.push_back(producer);
Ok(())
}
MergeHubProducerTerminal::Completed => {
self.deregister_producer(producer.id);
Ok(())
}
MergeHubProducerTerminal::Failed(error) => {
self.fail(error.clone());
self.deregister_producer(producer.id);
Err(error)
}
}
}
fn fail(&self, error: StreamError) {
let mut inner = self.shared.lock().expect("merge hub poisoned");
if inner.failed.is_none() {
inner.failed = Some(error);
self.failed.store(true, Ordering::SeqCst);
}
self.condvar.notify_all();
}
fn failed_error(&self) -> Option<StreamError> {
if !self.failed.load(Ordering::SeqCst) {
return None;
}
self.shared
.lock()
.expect("merge hub poisoned")
.failed
.clone()
}
fn finish_if_draining(&self) {
let flags = self.state.lock();
if !flags.draining {
return;
}
drop(flags);
let mut inner = self.shared.lock().expect("merge hub poisoned");
prune_finished_producer_completions(&mut inner.producer_completions);
if inner.active_producers == 0 {
inner.completed = true;
self.condvar.notify_all();
}
}
}
fn prune_finished_producer_completions(completions: &mut Vec<StreamCompletion<NotUsed>>) {
let mut index = 0;
while index < completions.len() {
if completions[index].try_wait().is_some() {
drop(completions.swap_remove(index));
} else {
index += 1;
}
}
}
struct MergeHubSourceStream<T> {
state: Arc<MergeHubShared<T>>,
local: VecDeque<T>,
prefer_direct: bool,
}
impl<T> Iterator for MergeHubSourceStream<T> {
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(error) = self.state.failed_error() {
self.local.clear();
let mut inner = self.state.shared.lock().expect("merge hub poisoned");
inner.source_closed = true;
return Some(Err(error));
}
if let Some(item) = self.local.pop_front() {
return Some(Ok(item));
}
let mut inner = self.state.shared.lock().expect("merge hub poisoned");
loop {
if let Some(error) = inner.failed.clone() {
inner.source_closed = true;
return Some(Err(error));
}
let should_drain_direct = !inner.direct_producers.is_empty()
&& (self.prefer_direct || inner.queue.is_empty());
if should_drain_direct {
let Some(mut producer) = self.state.pop_direct_producer(&mut inner) else {
continue;
};
let drain_limit = inner
.per_producer_buffer_size
.clamp(1, MERGE_HUB_BATCH_LIMIT);
drop(inner);
let mut batch = std::mem::take(&mut self.local);
batch.clear();
batch.reserve(drain_limit.saturating_sub(batch.capacity()));
let mut terminal = MergeHubProducerTerminal::Active;
for _ in 0..drain_limit {
match producer.input.next() {
Some(Ok(item)) => batch.push_back(item),
Some(Err(error)) => {
terminal = MergeHubProducerTerminal::Failed(error);
break;
}
None => {
terminal = MergeHubProducerTerminal::Completed;
break;
}
}
}
let restore_result = self.state.restore_direct_producer(producer, terminal);
match restore_result {
Ok(()) => {
self.prefer_direct = false;
if let Some(first) = batch.pop_front() {
self.local = batch;
return Some(Ok(first));
}
inner = self.state.shared.lock().expect("merge hub poisoned");
continue;
}
Err(error) => {
self.local.clear();
return Some(Err(error));
}
}
}
if !inner.queue.is_empty() {
let drain_n = inner.queue.len().min(MERGE_HUB_BATCH_LIMIT);
let mut batch = std::mem::take(&mut self.local);
batch.clear();
batch.reserve(drain_n.saturating_sub(batch.capacity()));
for _ in 0..drain_n {
if let Some((producer_id, item)) = inner.queue.pop_front() {
if let Some(queued) = inner.queued_per_producer.get_mut(&producer_id) {
*queued = queued.saturating_sub(1);
}
batch.push_back(item);
}
}
self.state.condvar.notify_all();
drop(inner);
let first = batch
.pop_front()
.expect("merge hub drained non-empty batch");
self.local = batch;
self.prefer_direct = true;
return Some(Ok(first));
}
if inner.completed {
inner.source_closed = true;
return None;
}
inner = self
.state
.condvar
.wait(inner)
.expect("merge hub poisoned while waiting");
}
}
}
impl<T> Drop for MergeHubSourceStream<T> {
fn drop(&mut self) {
let mut inner = self.state.shared.lock().expect("merge hub poisoned");
inner.source_closed = true;
self.state.condvar.notify_all();
}
}
#[derive(Clone, Copy)]
enum FanOutMode {
Broadcast,
Partition,
}
struct FanOutHubShared<T> {
registry: Mutex<FanOutRegistry<T>>,
snapshot: ArcSwap<FanOutSnapshot<T>>,
producer_wait: Mutex<()>,
producer_condvar: Condvar,
producer_epoch: AtomicU64,
topology_epoch: AtomicU64,
mode: FanOutMode,
start_after_nr_of_consumers: usize,
buffer_size: usize,
partitioner: Option<Partitioner<T>>,
}
struct FanOutRegistry<T> {
consumers: BTreeMap<u64, Arc<FanOutConsumerLane<T>>>,
next_consumer_id: u64,
terminal: Option<FanOutTerminal>,
}
struct FanOutSnapshot<T> {
consumers: Vec<Arc<FanOutConsumerLane<T>>>,
terminal: Option<FanOutTerminal>,
}
#[derive(Clone, Debug)]
enum FanOutTerminal {
Completed,
Failed(StreamError),
}
impl FanOutTerminal {
fn producer_error(&self) -> StreamError {
match self {
Self::Completed => StreamError::Cancelled,
Self::Failed(error) => error.clone(),
}
}
}
struct FanOutConsumerLane<T> {
id: u64,
state: Mutex<FanOutLaneState<T>>,
condvar: Condvar,
queued: AtomicUsize,
active: AtomicBool,
failed: AtomicBool,
}
struct FanOutLaneState<T> {
chunks: VecDeque<Vec<T>>,
queued: usize,
terminal: Option<FanOutTerminal>,
}
impl<T> FanOutConsumerLane<T> {
fn new(id: u64, buffer_size: usize) -> Self {
Self {
id,
state: Mutex::new(FanOutLaneState {
chunks: VecDeque::with_capacity((buffer_size / FAN_OUT_CONSUMER_BATCH_LIMIT) + 1),
queued: 0,
terminal: None,
}),
condvar: Condvar::new(),
queued: AtomicUsize::new(0),
active: AtomicBool::new(true),
failed: AtomicBool::new(false),
}
}
fn terminal(id: u64, terminal: FanOutTerminal) -> Self {
let failed = matches!(terminal, FanOutTerminal::Failed(_));
Self {
id,
state: Mutex::new(FanOutLaneState {
chunks: VecDeque::new(),
queued: 0,
terminal: Some(terminal),
}),
condvar: Condvar::new(),
queued: AtomicUsize::new(0),
active: AtomicBool::new(false),
failed: AtomicBool::new(failed),
}
}
fn id(&self) -> u64 {
self.id
}
fn queued_len(&self) -> usize {
self.queued.load(Ordering::Acquire)
}
fn is_active(&self) -> bool {
self.active.load(Ordering::Acquire)
}
fn deactivate(&self) {
self.active.store(false, Ordering::Release);
self.condvar.notify_all();
}
fn set_terminal(&self, terminal: FanOutTerminal) {
let mut state = self.state.lock().expect("fan-out lane poisoned");
if matches!(terminal, FanOutTerminal::Failed(_)) {
state.chunks.clear();
state.queued = 0;
self.queued.store(0, Ordering::Release);
self.failed.store(true, Ordering::Release);
}
state.terminal = Some(terminal);
drop(state);
self.condvar.notify_all();
}
}
impl PartitionConsumerInfo {
fn from_cached_lanes<T>(consumer_ids: &[u64], lanes: &[Arc<FanOutConsumerLane<T>>]) -> Self {
Self {
consumer_ids: consumer_ids.iter().copied().collect(),
queue_sizes: lanes
.iter()
.map(|lane| (lane.id(), lane.queued_len()))
.collect(),
}
}
}
struct PartitionTopologyCache<T> {
epoch: u64,
consumer_ids: SmallVec<[u64; 16]>,
lanes: SmallVec<[Arc<FanOutConsumerLane<T>>; 16]>,
}
type PartitionRoutedBatches<T> = SmallVec<[(Arc<FanOutConsumerLane<T>>, VecDeque<T>); 16]>;
impl<T> PartitionTopologyCache<T> {
fn new() -> Self {
Self {
epoch: u64::MAX,
consumer_ids: SmallVec::new(),
lanes: SmallVec::new(),
}
}
fn clear(&mut self) {
self.epoch = u64::MAX;
self.consumer_ids.clear();
self.lanes.clear();
}
}
impl<T> FanOutHubShared<T> {
fn new(
mode: FanOutMode,
start_after_nr_of_consumers: usize,
buffer_size: usize,
partitioner: Option<Partitioner<T>>,
) -> Self {
Self {
registry: Mutex::new(FanOutRegistry {
consumers: BTreeMap::new(),
next_consumer_id: 0,
terminal: None,
}),
snapshot: ArcSwap::from_pointee(FanOutSnapshot {
consumers: Vec::new(),
terminal: None,
}),
producer_wait: Mutex::new(()),
producer_condvar: Condvar::new(),
producer_epoch: AtomicU64::new(0),
topology_epoch: AtomicU64::new(0),
mode,
start_after_nr_of_consumers,
buffer_size,
partitioner,
}
}
fn register_consumer(&self) -> Arc<FanOutConsumerLane<T>> {
let mut registry = self.registry.lock().expect("fan-out hub poisoned");
let id = registry.next_consumer_id;
registry.next_consumer_id += 1;
if let Some(terminal) = registry.terminal.clone() {
return Arc::new(FanOutConsumerLane::terminal(id, terminal));
}
let lane = Arc::new(FanOutConsumerLane::new(id, self.buffer_size));
registry.consumers.insert(id, Arc::clone(&lane));
self.publish_snapshot_locked(®istry);
drop(registry);
self.notify_topology_transition();
lane
}
fn remove_consumer(&self, consumer_id: u64) {
let lane = {
let mut registry = self.registry.lock().expect("fan-out hub poisoned");
let lane = registry.consumers.remove(&consumer_id);
if let Some(lane) = &lane {
lane.deactivate();
self.publish_snapshot_locked(®istry);
}
lane
};
if let Some(lane) = lane {
lane.condvar.notify_all();
self.notify_topology_transition();
}
}
fn wait_for_broadcast_capacity(&self, max_items: usize) -> StreamResult<usize> {
loop {
let observed = self.producer_epoch.load(Ordering::Acquire);
let snapshot = self.snapshot.load();
if let Some(terminal) = &snapshot.terminal {
return Err(terminal.producer_error());
}
let lanes = self.active_lanes(&snapshot);
if lanes.len() < self.start_after_nr_of_consumers || lanes.is_empty() {
self.wait_for_producer_transition(observed);
continue;
}
let free = lanes
.iter()
.map(|lane| self.buffer_size.saturating_sub(lane.queued_len()))
.min()
.unwrap_or(0);
if free > 0 {
let max_items = if lanes.len() == 1 {
max_items.min(BROADCAST_HUB_SINGLE_CONSUMER_BATCH_LIMIT)
} else {
max_items
};
return Ok(free.min(max_items).max(1));
}
self.wait_for_producer_transition(observed);
}
}
fn push_broadcast_item(&self, item: T) -> StreamResult<()>
where
T: Clone,
{
let mut batch = VecDeque::new();
batch.push_back(item);
self.push_broadcast_batch(&mut batch)
}
fn push_broadcast_batch(&self, batch: &mut VecDeque<T>) -> StreamResult<()>
where
T: Clone,
{
if batch.is_empty() {
return Ok(());
}
while !batch.is_empty() {
let observed = self.producer_epoch.load(Ordering::Acquire);
let snapshot = self.snapshot.load();
if let Some(terminal) = &snapshot.terminal {
batch.clear();
return Err(terminal.producer_error());
}
let lanes = self.active_lanes(&snapshot);
if lanes.len() < self.start_after_nr_of_consumers || lanes.is_empty() {
self.wait_for_producer_transition(observed);
continue;
}
let free = lanes
.iter()
.map(|lane| self.buffer_size.saturating_sub(lane.queued_len()))
.min()
.unwrap_or(0);
if free == 0 {
self.wait_for_producer_transition(observed);
continue;
}
let take_n = free.min(batch.len());
if !self.try_push_broadcast_batch(&lanes, batch, take_n)? {
self.wait_for_producer_transition(observed);
}
}
Ok(())
}
fn try_push_broadcast_batch(
&self,
lanes: &[Arc<FanOutConsumerLane<T>>],
batch: &mut VecDeque<T>,
take_n: usize,
) -> StreamResult<bool>
where
T: Clone,
{
if take_n == 0 || lanes.is_empty() {
return Ok(false);
}
let mut guards = SmallVec::<
[(
Arc<FanOutConsumerLane<T>>,
MutexGuard<'_, FanOutLaneState<T>>,
); 16],
>::new();
for lane in lanes {
if !lane.is_active() {
return Ok(false);
}
let guard = lane.state.lock().expect("fan-out lane poisoned");
if !lane.is_active() {
return Ok(false);
}
if let Some(terminal) = &guard.terminal {
return Err(terminal.producer_error());
}
if self.buffer_size.saturating_sub(guard.queued) < take_n {
return Ok(false);
}
guards.push((Arc::clone(lane), guard));
}
let lane_count = guards.len();
if lane_count == 0 {
return Ok(false);
}
let mut notify_lanes = SmallVec::<[Arc<FanOutConsumerLane<T>>; 16]>::new();
for (index, (lane, guard)) in guards.iter_mut().enumerate() {
let chunk = if index + 1 == lane_count {
batch.drain(..take_n).collect()
} else {
batch.iter().take(take_n).cloned().collect()
};
guard.chunks.push_back(chunk);
guard.queued += take_n;
lane.queued.store(guard.queued, Ordering::Release);
notify_lanes.push(Arc::clone(lane));
}
drop(guards);
for lane in notify_lanes {
lane.condvar.notify_one();
}
Ok(true)
}
fn select_partition(
&self,
item: &T,
cache: &mut PartitionTopologyCache<T>,
) -> StreamResult<Option<Arc<FanOutConsumerLane<T>>>> {
let Some(partitioner) = &self.partitioner else {
return Err(StreamError::Failed(
"partition hub partitioner missing".to_owned(),
));
};
let topology_epoch = self.topology_epoch.load(Ordering::Acquire);
if cache.epoch != topology_epoch || cache.lanes.is_empty() {
self.refresh_partition_topology(cache)?;
}
let info = PartitionConsumerInfo::from_cached_lanes(&cache.consumer_ids, &cache.lanes);
let selected = partitioner(&info, item);
if selected < 0 {
return Ok(None);
}
let selected = selected as u64;
let selected_idx = selected as usize;
if cache.consumer_ids.get(selected_idx).copied() == Some(selected) {
return Ok(Some(Arc::clone(&cache.lanes[selected_idx])));
}
cache
.consumer_ids
.iter()
.position(|id| *id == selected)
.map(|idx| Some(Arc::clone(&cache.lanes[idx])))
.ok_or_else(|| {
StreamError::Failed("partition hub selected unknown consumer".to_owned())
})
}
fn refresh_partition_topology(
&self,
cache: &mut PartitionTopologyCache<T>,
) -> StreamResult<()> {
loop {
let observed = self.producer_epoch.load(Ordering::Acquire);
let topology_epoch = self.topology_epoch.load(Ordering::Acquire);
let snapshot = self.snapshot.load();
if let Some(terminal) = &snapshot.terminal {
cache.clear();
return Err(terminal.producer_error());
}
let lanes = self.active_lanes(&snapshot);
if lanes.len() >= self.start_after_nr_of_consumers && !lanes.is_empty() {
cache.epoch = topology_epoch;
cache.consumer_ids.clear();
cache
.consumer_ids
.extend(lanes.iter().map(|lane| lane.id()));
cache.lanes = lanes;
return Ok(());
}
self.wait_for_producer_transition(observed);
}
}
fn enqueue_partition(&self, selected: Arc<FanOutConsumerLane<T>>, item: T) -> StreamResult<()> {
let mut batch = VecDeque::new();
batch.push_back(item);
self.enqueue_partition_batch(selected, &mut batch)
}
fn enqueue_partition_batch(
&self,
selected: Arc<FanOutConsumerLane<T>>,
batch: &mut VecDeque<T>,
) -> StreamResult<()> {
if batch.is_empty() {
return Ok(());
}
let mut state = selected.state.lock().expect("fan-out lane poisoned");
loop {
if !selected.is_active() {
batch.clear();
return Err(StreamError::Failed(
"partition hub selected unknown consumer".to_owned(),
));
}
if let Some(terminal) = &state.terminal {
batch.clear();
return Err(terminal.producer_error());
}
let free = self.buffer_size.saturating_sub(state.queued);
if free > 0 {
let take_n = free.min(batch.len());
let chunk = batch.drain(..take_n).collect();
state.chunks.push_back(chunk);
state.queued += take_n;
selected.queued.store(state.queued, Ordering::Release);
selected.condvar.notify_one();
if batch.is_empty() {
return Ok(());
}
}
let (guard, _) = selected
.condvar
.wait_timeout(state, fan_out_wait_timeout())
.expect("fan-out lane poisoned while waiting");
state = guard;
}
}
fn complete(&self) {
let lanes = {
let mut registry = self.registry.lock().expect("fan-out hub poisoned");
if registry.terminal.is_none() {
registry.terminal = Some(FanOutTerminal::Completed);
self.publish_snapshot_locked(®istry);
}
registry.consumers.values().cloned().collect::<Vec<_>>()
};
for lane in lanes {
lane.set_terminal(FanOutTerminal::Completed);
}
self.notify_topology_transition();
}
fn fail(&self, error: StreamError) {
let terminal = FanOutTerminal::Failed(error);
let lanes = {
let mut registry = self.registry.lock().expect("fan-out hub poisoned");
if registry.terminal.is_none() {
registry.terminal = Some(terminal.clone());
self.publish_snapshot_locked(®istry);
}
registry.consumers.values().cloned().collect::<Vec<_>>()
};
for lane in lanes {
lane.set_terminal(terminal.clone());
}
self.notify_topology_transition();
}
fn publish_snapshot_locked(&self, registry: &FanOutRegistry<T>) {
self.snapshot.store(Arc::new(FanOutSnapshot {
consumers: registry.consumers.values().cloned().collect(),
terminal: registry.terminal.clone(),
}));
}
fn active_lanes(
&self,
snapshot: &FanOutSnapshot<T>,
) -> SmallVec<[Arc<FanOutConsumerLane<T>>; 16]> {
snapshot
.consumers
.iter()
.filter(|lane| lane.is_active())
.cloned()
.collect()
}
fn partition_batch_limit(&self) -> usize {
let snapshot = self.snapshot.load();
let active = snapshot
.consumers
.iter()
.filter(|lane| lane.is_active())
.count();
match active {
0 | 1 => PARTITION_HUB_SINGLE_CONSUMER_BATCH_LIMIT,
2..=7 => PARTITION_HUB_BATCH_LIMIT,
_ => PARTITION_HUB_WIDE_BATCH_LIMIT,
}
}
fn notify_producer_transition(&self) {
self.producer_epoch.fetch_add(1, Ordering::Release);
self.producer_condvar.notify_one();
}
fn notify_topology_transition(&self) {
self.topology_epoch.fetch_add(1, Ordering::Release);
self.notify_producer_transition();
}
fn wait_for_producer_transition(&self, observed_epoch: u64) {
let guard = self
.producer_wait
.lock()
.expect("fan-out producer wait poisoned");
if self.producer_epoch.load(Ordering::Acquire) == observed_epoch {
let (_guard, _) = self
.producer_condvar
.wait_timeout(guard, fan_out_wait_timeout())
.expect("fan-out producer wait poisoned while waiting");
}
}
}
fn push_partition_routed<T>(
routed: &mut PartitionRoutedBatches<T>,
lane: Arc<FanOutConsumerLane<T>>,
item: T,
) {
for (existing, batch) in routed.iter_mut() {
if existing.id() == lane.id() {
batch.push_back(item);
return;
}
}
let mut batch = VecDeque::new();
batch.push_back(item);
routed.push((lane, batch));
}
struct FanOutProducer<T> {
input: BoxStream<T>,
state: Arc<FanOutHubShared<T>>,
}
impl<T> FanOutProducer<T> {
fn new(input: BoxStream<T>, state: Arc<FanOutHubShared<T>>) -> Self {
Self { input, state }
}
}
impl<T: Send + 'static + Clone> FanOutProducer<T> {
fn run(
mut self,
cancelled: Arc<std::sync::atomic::AtomicBool>,
hints: SourceRuntimeHints,
) -> StreamResult<NotUsed> {
struct ProducerDropGuard<T> {
state: Arc<FanOutHubShared<T>>,
disarmed: bool,
}
impl<T> ProducerDropGuard<T> {
fn new(state: Arc<FanOutHubShared<T>>) -> Self {
Self {
state,
disarmed: false,
}
}
fn disarm(&mut self) {
self.disarmed = true;
}
}
impl<T> Drop for ProducerDropGuard<T> {
fn drop(&mut self) {
if !self.disarmed && std::thread::panicking() {
self.state.fail(StreamError::Failed(
"fan-out hub producer panicked".to_owned(),
));
}
}
}
let mut guard = ProducerDropGuard::new(Arc::clone(&self.state));
match self.state.mode {
FanOutMode::Broadcast => {
let batch_producer = hints.inline_micro_max_success_items.is_some();
let mut batch = VecDeque::new();
loop {
if cancelled.load(Ordering::SeqCst) {
self.state.fail(StreamError::Cancelled);
guard.disarm();
return Err(StreamError::Cancelled);
}
if batch_producer {
let capacity = self
.state
.wait_for_broadcast_capacity(BROADCAST_HUB_BATCH_LIMIT)?;
batch.clear();
batch.reserve(capacity.saturating_sub(batch.capacity()));
let mut terminal = None;
for _ in 0..capacity {
if cancelled.load(Ordering::SeqCst) {
terminal = Some(Err(StreamError::Cancelled));
break;
}
match self.input.next() {
Some(Ok(item)) => batch.push_back(item),
Some(Err(error)) => {
terminal = Some(Err(error));
break;
}
None => {
terminal = Some(Ok(()));
break;
}
}
}
if !batch.is_empty() {
self.state.push_broadcast_batch(&mut batch)?;
}
match terminal {
Some(Err(StreamError::Cancelled)) => {
self.state.fail(StreamError::Cancelled);
guard.disarm();
return Err(StreamError::Cancelled);
}
Some(Err(error)) => {
self.state.fail(error.clone());
guard.disarm();
return Err(error);
}
Some(Ok(())) => {
self.state.complete();
guard.disarm();
return Ok(NotUsed);
}
None => continue,
}
}
match self.input.next() {
Some(Ok(item)) => self.state.push_broadcast_item(item)?,
Some(Err(error)) => {
self.state.fail(error.clone());
guard.disarm();
return Err(error);
}
None => {
self.state.complete();
guard.disarm();
return Ok(NotUsed);
}
}
}
}
FanOutMode::Partition => {
let batch_producer = hints.inline_micro_max_success_items.is_some();
let mut topology_cache = PartitionTopologyCache::new();
let mut routed = PartitionRoutedBatches::new();
loop {
if cancelled.load(Ordering::SeqCst) {
self.state.fail(StreamError::Cancelled);
guard.disarm();
return Err(StreamError::Cancelled);
}
if batch_producer {
for (_, batch) in routed.iter_mut() {
batch.clear();
}
let partition_batch_limit = self.state.partition_batch_limit();
let mut terminal = None;
for _ in 0..partition_batch_limit {
if cancelled.load(Ordering::SeqCst) {
terminal = Some(Err(StreamError::Cancelled));
break;
}
match self.input.next() {
Some(Ok(item)) => {
if let Some(selected) =
self.state.select_partition(&item, &mut topology_cache)?
{
push_partition_routed(&mut routed, selected, item);
}
}
Some(Err(error)) => {
terminal = Some(Err(error));
break;
}
None => {
terminal = Some(Ok(()));
break;
}
}
}
for (selected, batch) in routed.iter_mut() {
if !batch.is_empty() {
self.state
.enqueue_partition_batch(Arc::clone(selected), batch)?;
}
}
match terminal {
Some(Err(StreamError::Cancelled)) => {
self.state.fail(StreamError::Cancelled);
guard.disarm();
return Err(StreamError::Cancelled);
}
Some(Err(error)) => {
self.state.fail(error.clone());
guard.disarm();
return Err(error);
}
Some(Ok(())) => {
self.state.complete();
guard.disarm();
return Ok(NotUsed);
}
None => continue,
}
}
match self.input.next() {
Some(Ok(item)) => {
if let Some(selected) =
self.state.select_partition(&item, &mut topology_cache)?
{
self.state.enqueue_partition(selected, item)?;
}
}
Some(Err(error)) => {
self.state.fail(error.clone());
guard.disarm();
return Err(error);
}
None => {
self.state.complete();
guard.disarm();
return Ok(NotUsed);
}
}
}
}
}
}
}
struct FanOutConsumerStream<T> {
state: Arc<FanOutHubShared<T>>,
lane: Arc<FanOutConsumerLane<T>>,
local: Option<std::vec::IntoIter<T>>,
detached: bool,
}
impl<T: Clone + Send + 'static> Iterator for FanOutConsumerStream<T> {
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
if self.lane.failed.load(Ordering::Acquire) {
self.local = None;
} else if let Some(local) = &mut self.local {
if let Some(item) = local.next() {
return Some(Ok(item));
}
self.local = None;
}
let mut state = self.lane.state.lock().expect("fan-out lane poisoned");
loop {
if let Some(FanOutTerminal::Failed(error)) = state.terminal.clone() {
return Some(Err(error));
}
if let Some(batch) = state.chunks.pop_front() {
state.queued = state.queued.saturating_sub(batch.len());
self.lane.queued.store(state.queued, Ordering::Release);
if matches!(self.state.mode, FanOutMode::Partition) {
self.lane.condvar.notify_one();
}
self.state.notify_producer_transition();
drop(state);
let mut batch = batch.into_iter();
let first = batch.next().expect("fan-out drained non-empty lane batch");
if batch.len() > 0 {
self.local = Some(batch);
}
return Some(Ok(first));
}
if matches!(state.terminal, Some(FanOutTerminal::Completed)) {
return None;
}
let (guard, _) = self
.lane
.condvar
.wait_timeout(state, fan_out_wait_timeout())
.expect("fan-out lane poisoned while waiting");
state = guard;
}
}
}
impl<T> Drop for FanOutConsumerStream<T> {
fn drop(&mut self) {
if !self.detached {
self.state.remove_consumer(self.lane.id());
self.detached = true;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::testkit::{TestSink, TestSource};
use crate::{Keep, Materializer, Sink, Source};
use std::{
panic::{self, AssertUnwindSafe},
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
thread,
time::{Duration, Instant},
};
#[test]
fn merge_hub_accepts_dynamic_producers_and_drains() {
let materializer = Materializer::new();
let ((hub_sink, control), completion) = MergeHub::source_with_draining::<i32>(4)
.to_mat(Sink::collect(), Keep::both)
.run_with_materializer(&materializer)
.expect("merge hub materializes");
hub_sink
.clone()
.run_with(Source::from_iter([1, 2, 3]))
.expect("first producer attaches");
hub_sink
.run_with(Source::from_iter([4, 5]))
.expect("second producer attaches");
control.drain_and_complete();
let mut result = completion.wait().expect("merge hub completes");
result.sort_unstable();
assert_eq!(result, vec![1, 2, 3, 4, 5]);
}
#[test]
fn merge_hub_producer_error_fails_downstream_consumer() {
let materializer = Materializer::new();
let (hub_sink, sink) = MergeHub::source::<i32>(4)
.to_mat(TestSink::probe(), Keep::both)
.run_with_materializer(&materializer)
.expect("merge hub materializes");
let producer_ok = TestSource::probe::<i32>()
.to_mat(hub_sink.clone(), Keep::left)
.run_with_materializer(&materializer)
.expect("successful producer attaches");
let producer_fail = TestSource::probe::<i32>()
.to_mat(hub_sink, Keep::left)
.run_with_materializer(&materializer)
.expect("failing producer attaches");
sink.request(1);
assert_eq!(producer_ok.expect_request(), 1);
producer_ok.send_next(1);
sink.assert_next(1);
producer_fail.send_error(StreamError::Failed("producer failed".to_owned()));
sink.request(1);
assert_eq!(
sink.expect_error(),
StreamError::Failed("producer failed".to_owned())
);
}
#[test]
fn broadcast_hub_backpressures_slowest_consumer() {
let materializer = Materializer::new();
let (publisher, hub_source) = TestSource::probe::<i32>()
.to_mat(BroadcastHub::sink(1), Keep::both)
.run_with_materializer(&materializer)
.expect("broadcast hub materializes");
let sink_a = hub_source
.source()
.run_with(TestSink::probe())
.expect("first consumer materializes");
let sink_b = hub_source
.source()
.run_with(TestSink::probe())
.expect("second consumer materializes");
sink_a.request(1);
sink_b.request(1);
assert_eq!(publisher.expect_request(), 1);
publisher.send_next(1);
sink_a.assert_next(1);
sink_b.assert_next(1);
sink_a.request(1);
assert_eq!(publisher.expect_request(), 1);
publisher.send_next(2);
sink_a.assert_next(2);
sink_b.expect_no_message(Duration::from_millis(250));
sink_a.request(1);
sink_a.expect_no_message(Duration::from_millis(250));
sink_b.request(1);
sink_b.assert_next(2);
assert_eq!(publisher.expect_request(), 1);
}
#[test]
fn broadcast_hub_late_consumer_sees_only_late_elements() {
let materializer = Materializer::new();
let (publisher, hub_source) = TestSource::probe::<i32>()
.to_mat(BroadcastHub::sink(2), Keep::both)
.run_with_materializer(&materializer)
.expect("broadcast hub materializes");
let sink_a = hub_source
.source()
.run_with(TestSink::probe())
.expect("first consumer materializes");
sink_a.request(1);
assert_eq!(publisher.expect_request(), 1);
publisher.send_next(1);
sink_a.assert_next(1);
let sink_b = hub_source
.source()
.run_with(TestSink::probe())
.expect("late consumer materializes");
sink_a.request(1);
sink_b.request(1);
assert_eq!(publisher.expect_request(), 1);
publisher.send_next(2);
sink_a.assert_next(2);
sink_b.assert_next(2);
publisher.send_complete();
sink_a.request(1);
sink_b.request(1);
sink_a.expect_complete();
sink_b.expect_complete();
}
#[test]
fn partition_hub_routes_elements_to_selected_consumers() {
let materializer = Materializer::new();
let hub = Source::from_iter([0, 1, 2, 3])
.run_with_materializer(
PartitionHub::sink(
|info, item| {
let idx = (*item as usize) % info.size();
info.consumer_id_by_idx(idx) as isize
},
2,
8,
),
&materializer,
)
.expect("partition hub materializes");
let sink_a = hub
.source()
.run_with(TestSink::probe())
.expect("first consumer materializes");
let sink_b = hub
.source()
.run_with(TestSink::probe())
.expect("second consumer materializes");
sink_a.request(2);
sink_b.request(2);
sink_a.assert_next_n([0, 2]);
sink_b.assert_next_n([1, 3]);
}
#[test]
fn partition_hub_evaluates_stateful_partitioner_once_per_blocked_element() {
let materializer = Materializer::new();
let partition_calls = Arc::new(AtomicUsize::new(0));
let partition_calls_for_hub = Arc::clone(&partition_calls);
let (publisher, hub) = TestSource::probe::<i32>()
.to_mat(
PartitionHub::sink(
move |info, _item| {
partition_calls_for_hub.fetch_add(1, Ordering::SeqCst);
info.consumer_id_by_idx(0) as isize
},
1,
1,
),
Keep::both,
)
.run_with_materializer(&materializer)
.expect("partition hub materializes");
let sink = hub
.source()
.run_with(TestSink::probe())
.expect("consumer materializes");
assert_eq!(publisher.expect_request(), 1);
publisher.send_next(1);
wait_for_partition_calls(&partition_calls, 1);
assert_eq!(publisher.expect_request(), 1);
publisher.send_next(2);
sink.expect_no_message(Duration::from_millis(250));
wait_for_partition_calls(&partition_calls, 2);
sink.request(1);
sink.assert_next(1);
sink.request(1);
sink.assert_next(2);
assert_eq!(partition_calls.load(Ordering::SeqCst), 2);
}
#[test]
fn partition_hub_invalidates_cached_topology_on_consumer_churn() {
let materializer = Materializer::new();
let (publisher, hub) = TestSource::probe::<i32>()
.to_mat(
PartitionHub::sink(
|info, _item| info.consumer_id_by_idx(info.size() - 1) as isize,
1,
8,
),
Keep::both,
)
.run_with_materializer(&materializer)
.expect("partition hub materializes");
let sink_a = hub
.source()
.run_with(TestSink::probe())
.expect("first consumer materializes");
sink_a.request(1);
assert_eq!(publisher.expect_request(), 1);
publisher.send_next(1);
sink_a.assert_next(1);
drop(sink_a);
let sink_b = hub
.source()
.run_with(TestSink::probe())
.expect("replacement consumer materializes");
sink_b.request(1);
assert_eq!(publisher.expect_request(), 1);
publisher.send_next(2);
sink_b.assert_next(2);
publisher.send_complete();
sink_b.request(1);
sink_b.expect_complete();
}
#[test]
fn broadcast_hub_drains_local_chunk_before_completion() {
let materializer = Materializer::new();
let hub = Source::from_iter([1, 2, 3])
.run_with_materializer(BroadcastHub::sink_starting_after(1, 8), &materializer)
.expect("broadcast hub materializes");
let sink = hub
.source()
.run_with(TestSink::probe())
.expect("consumer materializes");
sink.request(1);
sink.assert_next(1);
sink.request(3);
sink.assert_next_n([2, 3]);
sink.expect_complete();
}
#[test]
fn broadcast_hub_panicking_upstream_fails_consumers() {
let materializer = Materializer::new();
let hub = Source::from_fn_iter(|| {
let mut yielded = false;
std::iter::from_fn(move || {
if !yielded {
yielded = true;
Some(1)
} else {
panic!("boom");
}
})
})
.run_with_materializer(BroadcastHub::sink_starting_after(1, 8), &materializer)
.expect("broadcast hub materializes");
let sink = hub
.source()
.run_with(TestSink::probe())
.expect("consumer materializes");
sink.request(2);
match panic::catch_unwind(AssertUnwindSafe(|| sink.expect_error())) {
Ok(error) => assert_eq!(
error,
StreamError::Failed("fan-out hub producer panicked".to_owned())
),
Err(payload) => {
assert_eq!(
panic_message(payload),
"expected stream error, got next element"
);
sink.request(1);
assert_eq!(
sink.expect_error(),
StreamError::Failed("fan-out hub producer panicked".to_owned())
);
}
}
}
fn panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
match payload.downcast::<String>() {
Ok(message) => *message,
Err(payload) => match payload.downcast::<&'static str>() {
Ok(message) => (*message).to_owned(),
Err(_) => "<non-string panic payload>".to_owned(),
},
}
}
fn wait_for_partition_calls(counter: &AtomicUsize, expected: usize) {
let deadline = Instant::now() + Duration::from_secs(1);
while Instant::now() < deadline {
if counter.load(Ordering::SeqCst) == expected {
return;
}
thread::sleep(Duration::from_millis(5));
}
assert_eq!(counter.load(Ordering::SeqCst), expected);
}
}