use std::{
collections::{HashMap, VecDeque},
fmt, hint,
marker::PhantomData,
sync::{
Arc, Condvar, Mutex, MutexGuard,
atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering, fence},
mpsc,
},
thread,
time::Duration,
};
use arc_swap::ArcSwap;
use crossbeam_queue::ArrayQueue;
use ractor::{Actor, ActorProcessingErr, ActorRef};
use tokio::sync::Notify;
use crate::{
StreamError, StreamResult,
actor::block_on_ractor_runtime,
stream::{BoxStream, NotUsed, Source, current_stream_cancelled},
};
// Lifecycle states stored in `TopicShared::lifecycle`.
/// The topic accepts publishes.
const TOPIC_OPEN: u8 = 0;
/// Close has started: new publishes are rejected while in-flight ones drain.
const TOPIC_CLOSING: u8 = 1;
/// The topic is closed (or its actor has terminated).
const TOPIC_CLOSED: u8 = 2;
// Terminal states stored in `TopicSlot::terminal_state`.
/// The subscriber slot has not terminated.
const SLOT_OPEN: u8 = 0;
/// The subscriber slot terminated via `TopicSlot::complete`.
const SLOT_COMPLETE: u8 = 1;
/// The subscriber slot terminated via `TopicSlot::fail`.
const SLOT_ERROR: u8 = 2;
/// Timeout of one condvar wait in `TopicStream::wait_for_wake`; after it elapses the
/// blocking subscriber re-checks its slot even if no wake-up arrived.
const SLOT_WAIT_BACKSTOP: Duration = Duration::from_millis(10);
/// Maximum number of values a subscriber pulls from its slot in one drain pass.
const TOPIC_DRAIN_BATCH: usize = 256;
/// Reply channel on which the topic actor acknowledges a request to the blocked caller.
type Ack = mpsc::Sender<StreamResult<()>>;
/// Policy applied when a subscriber's bounded buffer is full at publish time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TopicOverflow {
/// `publish` waits until every active subscriber has room; `try_publish` reports `Full`.
Backpressure,
/// The oldest buffered values of the full subscriber are discarded to make room.
Sliding,
/// The new value is skipped for the full subscriber.
Dropping,
}
/// Error returned by [`Topic::publish`]; every variant hands the rejected value back.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicPublishError<T> {
/// The topic was not open when the publish was attempted.
Closed(T),
}
/// Error returned by [`Topic::try_publish`]; every variant hands the rejected value back.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicTryPublishError<T> {
/// The topic was not open when the publish was attempted.
Closed(T),
/// Backpressure mode only: at least one active subscriber buffer was full.
Full(T),
/// Another publish currently holds (or is queued for) the publish turn.
Busy(T),
}
/// Handle to a broadcast topic. Clones share the same underlying topic state.
pub struct Topic<T: Send + Sync + 'static> {
// Shared by every clone; the topic actor is stopped when the last clone drops it.
inner: Arc<TopicInner<T>>,
}
/// State behind every `Topic` handle; dropping it stops the topic actor.
struct TopicInner<T: Send + Sync + 'static> {
/// Actor that handles close, subscribe and unsubscribe requests.
actor: ActorRef<TopicMessage<T>>,
/// State read directly by publishers and subscribers.
shared: Arc<TopicShared<T>>,
/// Source of subscriber ids; starts at 1 and is bumped once per subscription.
next_subscriber_id: Arc<AtomicU64>,
}
/// State shared between the topic handle, its actor and the subscriber streams.
struct TopicShared<T: Send + Sync + 'static> {
/// Current subscriber snapshot; replaced wholesale by the actor, loaded by publishers.
subscribers: Arc<ArcSwap<TopicSlotTable<T>>>,
/// Buffer capacity given to every subscriber slot.
capacity: usize,
/// Overflow policy applied when a subscriber buffer is full.
overflow: TopicOverflow,
/// One of `TOPIC_OPEN`, `TOPIC_CLOSING`, `TOPIC_CLOSED`.
lifecycle: AtomicU8,
/// Number of live `PublishPermit`s; close waits for this to reach zero.
active_publishers: AtomicUsize,
/// Highest publish sequence number handed out so far.
next_sequence: AtomicU64,
/// Sequence number of the most recently finished publish.
delivered_sequence: AtomicU64,
/// Number of publishers currently registered as waiting for buffer space.
space_waiters: AtomicUsize,
/// Signalled by `notify_space` to wake publishers waiting for buffer space.
space_available: Notify,
/// Signalled once the topic becomes closed.
closed_notified: Notify,
}
/// Snapshot of the subscriber slots that a publish fans out to.
struct TopicSlotTable<T: Send + Sync + 'static> {
/// Slots registered with the actor at the time the snapshot was built.
slots: Vec<Arc<TopicSlot<T>>>,
}
/// Per-subscriber mailbox: a bounded queue plus the state needed to park and wake its consumer.
struct TopicSlot<T: Send + Sync + 'static> {
/// Subscriber id, sent back to the actor on unsubscribe.
id: u64,
/// Topic actor that receives the `Unsubscribe` message for this slot.
actor: ActorRef<TopicMessage<T>>,
/// Bounded queue of values published to this subscriber.
buffer: ArrayQueue<Arc<T>>,
/// Cleared when the slot completes, fails or is deactivated; inactive slots ignore pushes.
active: AtomicBool,
/// Set by the consumer just before it waits; cleared by whoever wakes it.
parked: AtomicBool,
/// One of `SLOT_OPEN`, `SLOT_COMPLETE`, `SLOT_ERROR`.
terminal_state: AtomicU8,
/// Terminal outcome, written after `terminal_state` leaves `SLOT_OPEN`.
terminal: Mutex<Option<TopicSlotTerminal>>,
/// Mutex paired with `available` for the blocking consumer.
available_lock: Mutex<()>,
/// Condvar the blocking consumer (`TopicStream`) waits on.
available: Condvar,
/// Notification the async consumer (`TopicBenchmarkStream`) waits on.
async_available: Notify,
}
/// Terminal outcome recorded on a subscriber slot.
#[derive(Clone)]
enum TopicSlotTerminal {
/// The slot ended normally; the stream yields `None`.
Complete,
/// The slot ended with an error; the stream yields it once as `Err`.
Error(StreamError),
}
impl<T: Send + Sync + 'static> Clone for Topic<T> {
    /// Returns another handle to the same topic; only the shared `Arc` is cloned.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
impl<T: Send + Sync + 'static> Topic<T> {
pub fn new(capacity: usize, overflow: TopicOverflow) -> StreamResult<Self> {
assert!(capacity > 0, "topic capacity must be greater than zero");
let shared = Arc::new(TopicShared {
subscribers: Arc::new(ArcSwap::from_pointee(TopicSlotTable { slots: Vec::new() })),
capacity,
overflow,
lifecycle: AtomicU8::new(TOPIC_OPEN),
active_publishers: AtomicUsize::new(0),
next_sequence: AtomicU64::new(0),
delivered_sequence: AtomicU64::new(0),
space_waiters: AtomicUsize::new(0),
space_available: Notify::new(),
closed_notified: Notify::new(),
});
let state = TopicActorState {
shared: Arc::clone(&shared),
subscribers: HashMap::new(),
closed: false,
};
let (actor, _handle) =
block_on_ractor_runtime(Actor::spawn(None, TopicActor::<T>::default(), state))?
.map_err(|error| {
StreamError::Failed(format!("topic actor failed to spawn: {error}"))
})?;
Ok(Self {
inner: Arc::new(TopicInner {
actor,
shared,
next_subscriber_id: Arc::new(AtomicU64::new(1)),
}),
})
}
pub async fn publish(&self, value: T) -> Result<(), TopicPublishError<T>> {
let Ok(_permit) = self.inner.shared.begin_publish() else {
return Err(TopicPublishError::Closed(value));
};
let sequence = self.inner.shared.claim_sequence();
self.inner.shared.wait_publish_turn(sequence);
let table = self.inner.shared.subscribers.load_full();
if self.inner.shared.overflow == TopicOverflow::Backpressure {
self.inner.shared.wait_for_capacity(&table).await;
}
self.inner
.shared
.publish_to_snapshot(&table, Arc::new(value));
self.inner.shared.finish_publish(sequence);
Ok(())
}
pub fn try_publish(&self, value: T) -> Result<(), TopicTryPublishError<T>> {
let Ok(_permit) = self.inner.shared.begin_publish() else {
return Err(TopicTryPublishError::Closed(value));
};
let Some(sequence) = self.inner.shared.try_claim_sequence() else {
return Err(TopicTryPublishError::Busy(value));
};
let table = self.inner.shared.subscribers.load_full();
if self.inner.shared.overflow == TopicOverflow::Backpressure
&& !self.inner.shared.snapshot_has_capacity(&table)
{
self.inner.shared.finish_publish(sequence);
return Err(TopicTryPublishError::Full(value));
}
self.inner
.shared
.publish_to_snapshot(&table, Arc::new(value));
self.inner.shared.finish_publish(sequence);
Ok(())
}
#[must_use]
pub fn subscribe(&self) -> Source<T>
where
T: Clone,
{
let topic = self.clone();
let actor = self.inner.actor.clone();
let next_subscriber_id = Arc::clone(&self.inner.next_subscriber_id);
Source::from_materialized_factory(move |_materializer| {
let id = next_subscriber_id.fetch_add(1, Ordering::Relaxed);
let capacity = topic.registered_capacity();
let slot = TopicSlot::new(id, actor.clone(), capacity);
topic.register_slot(Arc::clone(&slot), id)?;
let stream: BoxStream<T> = Box::new(TopicStream {
shared: Arc::clone(&topic.inner.shared),
slot,
pending: VecDeque::new(),
terminated: false,
});
Ok((stream, NotUsed))
})
}
#[must_use]
pub fn subscriber_count(&self) -> usize {
self.inner.shared.subscriber_count()
}
pub fn close(&self) -> StreamResult<()> {
let (reply, receiver) = mpsc::channel();
self.inner
.actor
.send_message(TopicMessage::Close { reply })
.map_err(|error| StreamError::ActorAskSendFailed {
reason: error.to_string(),
})?;
receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.shared.is_closed()
}
pub async fn closed(&self) {
loop {
if self.is_closed() {
return;
}
let notified = self.inner.shared.closed_notified.notified();
let mut notified = std::pin::pin!(notified);
notified.as_mut().enable();
if self.is_closed() {
return;
}
notified.as_mut().await;
}
}
fn registered_capacity(&self) -> usize {
self.inner.shared.capacity
}
fn register_slot(&self, slot: Arc<TopicSlot<T>>, id: u64) -> StreamResult<()> {
let (reply, receiver) = mpsc::channel();
self.inner
.actor
.send_message(TopicMessage::Subscribe { id, slot, reply })
.map_err(|error| StreamError::ActorAskSendFailed {
reason: error.to_string(),
})?;
receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
}
}
impl<T: Clone + Send + Sync + 'static> Topic<T> {
    /// Registers a subscriber slot and returns the async benchmark stream that reads it.
    #[doc(hidden)]
    pub fn __benchmark_subscribe(&self) -> StreamResult<TopicBenchmarkStream<T>> {
        let inner = &self.inner;
        let id = inner.next_subscriber_id.fetch_add(1, Ordering::Relaxed);
        let capacity = self.registered_capacity();
        let slot = TopicSlot::new(id, inner.actor.clone(), capacity);
        // Blocks until the actor has acknowledged the registration.
        self.register_slot(Arc::clone(&slot), id)?;
        let stream = TopicBenchmarkStream {
            shared: Arc::clone(&inner.shared),
            slot,
            pending: VecDeque::new(),
            terminated: false,
        };
        Ok(stream)
    }
}
impl<T: Send + Sync + 'static> TopicShared<T> {
fn begin_publish(&self) -> StreamResult<PublishPermit<'_>> {
if self.lifecycle.load(Ordering::Acquire) != TOPIC_OPEN {
return Err(closed_error());
}
self.active_publishers.fetch_add(1, Ordering::AcqRel);
if self.lifecycle.load(Ordering::Acquire) == TOPIC_OPEN {
Ok(PublishPermit {
active_publishers: &self.active_publishers,
})
} else {
self.active_publishers.fetch_sub(1, Ordering::AcqRel);
Err(closed_error())
}
}
fn claim_sequence(&self) -> u64 {
self.next_sequence.fetch_add(1, Ordering::AcqRel) + 1
}
fn try_claim_sequence(&self) -> Option<u64> {
let delivered = self.delivered_sequence.load(Ordering::Acquire);
self.next_sequence
.compare_exchange(
delivered,
delivered + 1,
Ordering::AcqRel,
Ordering::Acquire,
)
.ok()
.map(|_| delivered + 1)
}
fn wait_publish_turn(&self, sequence: u64) {
let mut spins = 0_u32;
while self.delivered_sequence.load(Ordering::Acquire) + 1 != sequence {
spins = spins.wrapping_add(1);
if spins < 64 {
hint::spin_loop();
} else {
thread::yield_now();
}
}
}
fn finish_publish(&self, sequence: u64) {
self.delivered_sequence.store(sequence, Ordering::Release);
}
fn publish_to_snapshot(&self, table: &TopicSlotTable<T>, value: Arc<T>) {
match self.overflow {
TopicOverflow::Backpressure => {
for slot in &table.slots {
slot.enqueue_backpressured(Arc::clone(&value));
}
}
TopicOverflow::Sliding => {
for slot in &table.slots {
slot.enqueue_sliding(Arc::clone(&value));
}
}
TopicOverflow::Dropping => {
for slot in &table.slots {
slot.enqueue_dropping(Arc::clone(&value));
}
}
}
}
async fn wait_for_capacity(&self, table: &TopicSlotTable<T>) {
loop {
if self.snapshot_has_capacity(table) {
return;
}
let notified = self.space_available.notified();
let mut notified = std::pin::pin!(notified);
notified.as_mut().enable();
self.space_waiters.fetch_add(1, Ordering::AcqRel);
if self.snapshot_has_capacity(table) {
self.space_waiters.fetch_sub(1, Ordering::AcqRel);
return;
}
notified.as_mut().await;
self.space_waiters.fetch_sub(1, Ordering::AcqRel);
}
}
fn snapshot_has_capacity(&self, table: &TopicSlotTable<T>) -> bool {
table.slots.iter().all(|slot| slot.has_capacity())
}
fn notify_space(&self) {
if self.space_waiters.load(Ordering::Acquire) != 0 {
self.space_available.notify_waiters();
}
}
fn subscriber_count(&self) -> usize {
let table = self.subscribers.load();
table.slots.iter().filter(|slot| slot.is_active()).count()
}
fn is_closed(&self) -> bool {
self.lifecycle.load(Ordering::Acquire) == TOPIC_CLOSED
}
fn wait_for_publishers_to_drain(&self) {
while self.active_publishers.load(Ordering::Acquire) != 0 {
thread::yield_now();
}
}
fn mark_actor_terminated(&self) {
self.lifecycle.store(TOPIC_CLOSED, Ordering::Release);
self.closed_notified.notify_waiters();
self.notify_space();
}
}
/// Guard representing one in-flight publish; the counter it borrows is decremented
/// when the guard is dropped.
struct PublishPermit<'a> {
    /// Counter of in-flight publishes this permit was counted in.
    active_publishers: &'a AtomicUsize,
}

impl Drop for PublishPermit<'_> {
    /// Releases the permit by decrementing the in-flight publish counter by one.
    fn drop(&mut self) {
        let counter: &AtomicUsize = self.active_publishers;
        let _previous = counter.fetch_sub(1, Ordering::AcqRel);
    }
}
impl<T: Send + Sync + 'static> TopicSlot<T> {
    /// Creates an active, open slot with an empty buffer of `capacity` values.
    fn new(id: u64, actor: ActorRef<TopicMessage<T>>, capacity: usize) -> Arc<Self> {
        Arc::new(Self {
            id,
            actor,
            buffer: ArrayQueue::new(capacity),
            active: AtomicBool::new(true),
            parked: AtomicBool::new(false),
            terminal_state: AtomicU8::new(SLOT_OPEN),
            terminal: Mutex::new(None),
            available_lock: Mutex::new(()),
            available: Condvar::new(),
            async_available: Notify::new(),
        })
    }

    /// Locks the terminal outcome, recovering the guard if the mutex was poisoned.
    fn terminal_lock(&self) -> MutexGuard<'_, Option<TopicSlotTerminal>> {
        self.terminal
            .lock()
            .unwrap_or_else(|poison| poison.into_inner())
    }

    /// Whether the slot still accepts values.
    fn is_active(&self) -> bool {
        self.active.load(Ordering::Acquire)
    }

    /// True when a push would not be blocked by this slot: it is inactive or not full.
    fn has_capacity(&self) -> bool {
        !self.is_active() || !self.buffer.is_full()
    }

    /// Pushes `value` and, when the push succeeded, attempts to wake the consumer.
    ///
    /// The wake is attempted after every successful push rather than only when the
    /// buffer looked empty beforehand: the consumer can drain the buffer and park
    /// between such a check and the push, and would then never be woken. `wake` is a
    /// no-op unless the consumer has flagged itself as parked.
    fn push_and_wake(&self, value: Arc<T>) {
        if self.buffer.push(value).is_ok() {
            self.wake();
        }
    }

    /// Backpressure policy: pushes if there is room. The publisher is expected to
    /// have waited for capacity beforehand; a value that does not fit is dropped.
    fn enqueue_backpressured(&self, value: Arc<T>) {
        if !self.is_active() {
            return;
        }
        self.push_and_wake(value);
    }

    /// Sliding policy: discards the oldest values while the buffer is full, then pushes.
    fn enqueue_sliding(&self, value: Arc<T>) {
        if !self.is_active() {
            return;
        }
        while self.buffer.is_full() {
            if self.buffer.pop().is_none() {
                break;
            }
        }
        self.push_and_wake(value);
    }

    /// Dropping policy: skips the value when the buffer is full.
    fn enqueue_dropping(&self, value: Arc<T>) {
        if !self.is_active() || self.buffer.is_full() {
            return;
        }
        self.push_and_wake(value);
    }

    /// Removes the oldest buffered value, if any.
    fn pop(&self) -> Option<Arc<T>> {
        self.buffer.pop()
    }

    /// Flags the consumer as about to wait.
    fn park(&self) {
        self.parked.store(true, Ordering::Release);
    }

    /// Clears the parked flag after the consumer stopped waiting.
    fn unpark(&self) {
        self.parked.store(false, Ordering::Release);
    }

    /// Wakes the consumer if it is flagged as parked; otherwise does nothing.
    fn wake(&self) {
        if self.parked.swap(false, Ordering::AcqRel) {
            // Taking the lock means a blocking consumer that set the flag while
            // holding it has already entered its condvar wait before we notify.
            let _guard = self
                .available_lock
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            self.available.notify_one();
            self.async_available.notify_waiters();
        }
    }

    /// Terminates the slot normally. Has no effect if it already terminated.
    fn complete(&self) {
        if self
            .terminal_state
            .compare_exchange(
                SLOT_OPEN,
                SLOT_COMPLETE,
                Ordering::AcqRel,
                Ordering::Acquire,
            )
            .is_err()
        {
            return;
        }
        self.active.store(false, Ordering::Release);
        *self.terminal_lock() = Some(TopicSlotTerminal::Complete);
        self.wake();
    }

    /// Terminates the slot with `error`. Has no effect if it already terminated.
    fn fail(&self, error: StreamError) {
        if self
            .terminal_state
            .compare_exchange(SLOT_OPEN, SLOT_ERROR, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return;
        }
        self.active.store(false, Ordering::Release);
        *self.terminal_lock() = Some(TopicSlotTerminal::Error(error));
        self.wake();
    }

    /// Returns the terminal outcome once it has been recorded.
    ///
    /// May still return `None` briefly after `terminal_state` changed, because the
    /// outcome is stored under the mutex after the state transition.
    fn terminal(&self) -> Option<TopicSlotTerminal> {
        if self.terminal_state.load(Ordering::Acquire) == SLOT_OPEN {
            return None;
        }
        self.terminal_lock().clone()
    }

    /// Stops accepting values, discards what is buffered and wakes the consumer.
    fn deactivate(&self) {
        self.active.store(false, Ordering::Release);
        while self.buffer.pop().is_some() {}
        self.wake();
    }

    /// Deactivates the slot and tells the actor to drop it from the subscriber table.
    /// A failure to send the message is ignored.
    fn unsubscribe(&self) {
        self.deactivate();
        let _ = self
            .actor
            .send_message(TopicMessage::Unsubscribe { id: self.id });
    }
}
// Stops the topic actor when the last `Topic` handle releases the shared inner state.
impl<T: Send + Sync + 'static> Drop for TopicInner<T> {
fn drop(&mut self) {
// `None`: no stop reason is supplied.
self.actor.stop(None);
}
}
/// Requests handled by the topic actor.
enum TopicMessage<T: Send + Sync + 'static> {
/// Close the topic; the actor acknowledges on `reply` when done.
Close {
reply: Ack,
},
/// Register `slot` under `id`; the actor acknowledges on `reply`.
Subscribe {
id: u64,
slot: Arc<TopicSlot<T>>,
reply: Ack,
},
/// Remove the slot registered under `id`; no acknowledgement is sent.
Unsubscribe {
id: u64,
},
}
// Compiled only with the `cluster` feature: an empty `ractor::Message` impl for
// `TopicMessage`. NOTE(review): presumably needed because ractor provides no blanket
// impl in that configuration; confirm against the ractor documentation.
#[cfg(feature = "cluster")]
impl<T: Send + Sync + 'static> ractor::Message for TopicMessage<T> {}
/// Actor type that owns the subscriber registry of one topic.
struct TopicActor<T> {
// Ties the actor to `T` without storing a value of that type.
_marker: PhantomData<fn() -> T>,
}
// Implemented by hand so that `T` does not have to implement `Default`.
impl<T> Default for TopicActor<T> {
fn default() -> Self {
Self {
_marker: PhantomData,
}
}
}
/// State owned by the topic actor; it is also passed in as the actor's start arguments.
struct TopicActorState<T: Send + Sync + 'static> {
/// State shared with publishers and subscriber streams.
shared: Arc<TopicShared<T>>,
/// Registered subscriber slots keyed by subscriber id.
subscribers: HashMap<u64, Arc<TopicSlot<T>>>,
/// Set once the actor has finished closing the topic.
closed: bool,
}
impl<T: Send + Sync + 'static> Actor for TopicActor<T> {
    type Msg = TopicMessage<T>;
    type State = TopicActorState<T>;
    type Arguments = TopicActorState<T>;

    /// Uses the start arguments as the initial actor state.
    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(args)
    }

    /// Handles close, subscribe and unsubscribe requests.
    async fn handle(
        &self,
        _myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            TopicMessage::Close { reply } => {
                close_topic(state);
                // The requester may have gone away; a failed acknowledgement is ignored.
                let _ = reply.send(Ok(()));
            }
            TopicMessage::Subscribe { id, slot, reply } => {
                let topic_closed = state.closed
                    || state.shared.lifecycle.load(Ordering::Acquire) == TOPIC_CLOSED;
                if topic_closed {
                    // Subscribing to a closed topic yields a stream that completes at once.
                    slot.complete();
                } else {
                    state.subscribers.insert(id, slot);
                    publish_topic_slot_table(state);
                }
                let _ = reply.send(Ok(()));
            }
            TopicMessage::Unsubscribe { id } => {
                state.subscribers.remove(&id);
                publish_topic_slot_table(state);
                // The removed slot no longer holds publishers back.
                state.shared.notify_space();
            }
        }
        Ok(())
    }

    /// When the actor stops without the topic having been closed, fails every
    /// registered subscriber and marks the topic closed.
    async fn post_stop(
        &self,
        _myself: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        if state.closed {
            return Ok(());
        }
        for slot in state.subscribers.values() {
            slot.fail(StreamError::ActorTerminated);
        }
        state.subscribers.clear();
        publish_topic_slot_table(state);
        state.shared.mark_actor_terminated();
        Ok(())
    }
}
/// Closes the topic: moves the lifecycle to closing, waits for in-flight publishes,
/// marks it closed, completes every subscriber, then wakes anything waiting on buffer
/// space or on closure. Does nothing when the state is already marked closed.
fn close_topic<T: Send + Sync + 'static>(state: &mut TopicActorState<T>) {
    if state.closed {
        return;
    }
    let transition = state.shared.lifecycle.compare_exchange(
        TOPIC_OPEN,
        TOPIC_CLOSING,
        Ordering::AcqRel,
        Ordering::Acquire,
    );
    // Already closed elsewhere: only record that locally. Any other outcome (fresh
    // transition, or already closing) continues with the close sequence.
    if matches!(transition, Err(TOPIC_CLOSED)) {
        state.closed = true;
        return;
    }
    state.shared.wait_for_publishers_to_drain();
    state
        .shared
        .lifecycle
        .store(TOPIC_CLOSED, Ordering::Release);
    state.subscribers.values().for_each(|slot| slot.complete());
    state.subscribers.clear();
    publish_topic_slot_table(state);
    state.shared.notify_space();
    state.shared.closed_notified.notify_waiters();
    state.closed = true;
}
/// Replaces the shared subscriber snapshot with the actor's current set of slots.
fn publish_topic_slot_table<T: Send + Sync + 'static>(state: &TopicActorState<T>) {
    let slots: Vec<_> = state.subscribers.values().map(Arc::clone).collect();
    let table = Arc::new(TopicSlotTable { slots });
    state.shared.subscribers.store(table);
}
/// Builds the error returned internally when a publish is attempted on a non-open topic.
fn closed_error() -> StreamError {
    let reason = String::from("topic is closed");
    StreamError::Failed(reason)
}
/// Blocking subscriber stream created by `Topic::subscribe`.
struct TopicStream<T: Clone + Send + Sync + 'static> {
/// Topic state, used to notify publishers that buffer space was freed.
shared: Arc<TopicShared<T>>,
/// This subscriber's slot.
slot: Arc<TopicSlot<T>>,
/// Values already taken from the slot but not yet yielded.
pending: VecDeque<Arc<T>>,
/// Set once the stream has reported its terminal outcome or cancellation.
terminated: bool,
}
/// Async subscriber stream created by `Topic::__benchmark_subscribe`.
#[doc(hidden)]
pub struct TopicBenchmarkStream<T: Clone + Send + Sync + 'static> {
/// Topic state, used to notify publishers that buffer space was freed.
shared: Arc<TopicShared<T>>,
/// This subscriber's slot.
slot: Arc<TopicSlot<T>>,
/// Values already taken from the slot but not yet yielded.
pending: VecDeque<Arc<T>>,
/// Set once the stream has reported its terminal outcome.
terminated: bool,
}
impl<T: Clone + Send + Sync + 'static> Iterator for TopicStream<T> {
    type Item = StreamResult<T>;

    /// Blocks until the next value, the terminal outcome or cancellation.
    ///
    /// Yields buffered values first. Once the slot is terminal and drained it yields
    /// `None` for completion or a single `Err` for failure. When the current stream is
    /// flagged cancelled it yields `Err(StreamError::Cancelled)`. After any of these,
    /// every later call returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        if self.terminated {
            return None;
        }
        loop {
            if let Some(value) = self.pending.pop_front() {
                return Some(Ok(value.as_ref().clone()));
            }
            if let Some(value) = self.drain_batch() {
                return Some(Ok(value.as_ref().clone()));
            }
            if let Some(terminal) = self.slot.terminal() {
                // A value can be pushed after the drain attempt above but before the
                // slot turned terminal. Drain once more so it is delivered instead of
                // being silently dropped.
                if let Some(value) = self.drain_batch() {
                    return Some(Ok(value.as_ref().clone()));
                }
                self.terminated = true;
                return match terminal {
                    TopicSlotTerminal::Complete => None,
                    TopicSlotTerminal::Error(error) => Some(Err(error)),
                };
            }
            if current_stream_cancelled()
                .as_ref()
                .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
            {
                self.terminated = true;
                return Some(Err(StreamError::Cancelled));
            }
            self.wait_for_wake();
        }
    }
}
impl<T: Clone + Send + Sync + 'static> TopicStream<T> {
/// Pops up to `TOPIC_DRAIN_BATCH` values from the slot: returns the first and
/// stashes the rest in `pending`. Returns `None` (without notifying) when the slot
/// is empty; otherwise tells waiting publishers that space was freed.
fn drain_batch(&mut self) -> Option<Arc<T>> {
let first = self.slot.pop()?;
let mut drained = 1_usize;
while drained < TOPIC_DRAIN_BATCH {
let Some(value) = self.slot.pop() else {
break;
};
self.pending.push_back(value);
drained += 1;
}
self.shared.notify_space();
Some(first)
}
/// Blocks until woken or until `SLOT_WAIT_BACKSTOP` elapses.
///
/// The statement order matters: the parked flag is set while holding
/// `available_lock`, then the slot is re-checked, and only then does the thread
/// wait. `TopicSlot::wake` takes the same lock before notifying.
fn wait_for_wake(&self) {
let guard = self
.slot
.available_lock
.lock()
.unwrap_or_else(|poison| poison.into_inner());
self.slot.park();
// Full fence between publishing the parked flag and re-reading the slot state.
fence(Ordering::SeqCst);
if !self.slot.buffer.is_empty() || self.slot.terminal().is_some() {
// Something arrived in the meantime; do not wait.
self.slot.unpark();
return;
}
// The timeout bounds the wait in case no wake-up arrives.
let _guard = self
.slot
.available
.wait_timeout(guard, SLOT_WAIT_BACKSTOP)
.unwrap_or_else(|poison| poison.into_inner())
.0;
self.slot.unpark();
}
}
// Dropping the stream unsubscribes its slot and wakes publishers waiting for space.
impl<T: Clone + Send + Sync + 'static> Drop for TopicStream<T> {
fn drop(&mut self) {
self.slot.unsubscribe();
self.shared.notify_space();
}
}
impl<T: Clone + Send + Sync + 'static> TopicBenchmarkStream<T> {
    /// Waits for the next value or the terminal outcome.
    ///
    /// Yields buffered values first. Once the slot is terminal and drained it yields
    /// `None` for completion or a single `Err` for failure; later calls return `None`.
    #[doc(hidden)]
    pub async fn next(&mut self) -> Option<StreamResult<T>> {
        if self.terminated {
            return None;
        }
        loop {
            if let Some(value) = self.pending.pop_front() {
                return Some(Ok(value.as_ref().clone()));
            }
            if let Some(value) = self.drain_batch() {
                return Some(Ok(value.as_ref().clone()));
            }
            if let Some(terminal) = self.slot.terminal() {
                // A value can be pushed after the drain attempt above but before the
                // slot turned terminal. Drain once more so it is not lost.
                if let Some(value) = self.drain_batch() {
                    return Some(Ok(value.as_ref().clone()));
                }
                self.terminated = true;
                return match terminal {
                    TopicSlotTerminal::Complete => None,
                    TopicSlotTerminal::Error(error) => Some(Err(error)),
                };
            }
            self.wait_for_wake().await;
        }
    }

    /// Consumes values without cloning them until `target` have been counted.
    ///
    /// # Errors
    ///
    /// Returns an error when the stream completes or fails before `target` values
    /// were seen, or when it had already terminated.
    #[doc(hidden)]
    pub async fn count_items(&mut self, target: u64) -> StreamResult<u64> {
        let mut count = 0_u64;
        while count < target {
            if self.terminated {
                return Err(StreamError::Failed(
                    "topic stream ended before requested count".into(),
                ));
            }
            // Saturate rather than truncate where `usize` is narrower than `u64`.
            let remaining = usize::try_from(target - count).unwrap_or(usize::MAX);
            if !self.pending.is_empty() {
                let drained = self.pending.len().min(remaining);
                self.pending.drain(..drained);
                count += drained as u64;
                continue;
            }
            if let Some(drained) = self.drain_available_count(remaining) {
                count += drained as u64;
                continue;
            }
            if let Some(terminal) = self.slot.terminal() {
                // Count values that were pushed between the empty drain above and the
                // slot turning terminal before giving up.
                if let Some(drained) = self.drain_available_count(remaining) {
                    count += drained as u64;
                    continue;
                }
                self.terminated = true;
                return match terminal {
                    TopicSlotTerminal::Complete => Err(StreamError::Failed(
                        "topic stream completed before requested count".into(),
                    )),
                    TopicSlotTerminal::Error(error) => Err(error),
                };
            }
            self.wait_for_wake().await;
        }
        Ok(count)
    }

    /// Pops up to `TOPIC_DRAIN_BATCH` values from the slot: returns the first and
    /// stashes the rest in `pending`. Returns `None` when the slot is empty; otherwise
    /// tells waiting publishers that space was freed.
    fn drain_batch(&mut self) -> Option<Arc<T>> {
        let first = self.slot.pop()?;
        let mut drained = 1_usize;
        while drained < TOPIC_DRAIN_BATCH {
            let Some(value) = self.slot.pop() else {
                break;
            };
            self.pending.push_back(value);
            drained += 1;
        }
        self.shared.notify_space();
        Some(first)
    }

    /// Pops and discards up to `limit` values (capped at `TOPIC_DRAIN_BATCH`).
    /// Returns how many were removed, or `None` when none were available.
    fn drain_available_count(&mut self, limit: usize) -> Option<usize> {
        let mut drained = 0_usize;
        let limit = limit.min(TOPIC_DRAIN_BATCH);
        while drained < limit {
            let Some(_value) = self.slot.pop() else {
                break;
            };
            drained += 1;
        }
        if drained == 0 {
            return None;
        }
        self.shared.notify_space();
        Some(drained)
    }

    /// Waits until the slot is woken.
    ///
    /// The notification is enabled before the parked flag is set and the slot is
    /// re-checked, so a wake-up sent in between is kept.
    async fn wait_for_wake(&self) {
        let notified = self.slot.async_available.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();
        self.slot.park();
        // Full fence between publishing the parked flag and re-reading the slot state.
        fence(Ordering::SeqCst);
        if !self.slot.buffer.is_empty() || self.slot.terminal().is_some() {
            self.slot.unpark();
            return;
        }
        notified.as_mut().await;
        self.slot.unpark();
    }
}
// Dropping the stream unsubscribes its slot and wakes publishers waiting for space.
impl<T: Clone + Send + Sync + 'static> Drop for TopicBenchmarkStream<T> {
fn drop(&mut self) {
self.slot.unsubscribe();
self.shared.notify_space();
}
}
impl<T: Send + Sync + 'static> fmt::Debug for Topic<T> {
    /// Formats the topic as its closed flag and active subscriber count; the
    /// remaining state is elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let closed = self.is_closed();
        let subscribers = self.subscriber_count();
        let mut builder = f.debug_struct("Topic");
        builder.field("closed", &closed);
        builder.field("subscribers", &subscribers);
        builder.finish_non_exhaustive()
    }
}
impl<T> fmt::Display for TopicPublishError<T> {
    /// Writes a short description of the error; the rejected value is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            TopicPublishError::Closed(_) => "topic is closed",
        };
        f.write_str(message)
    }
}
impl<T> fmt::Display for TopicTryPublishError<T> {
    /// Writes a short description of the error; the rejected value is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            TopicTryPublishError::Closed(_) => "topic is closed",
            TopicTryPublishError::Full(_) => "topic subscriber buffer is full",
            TopicTryPublishError::Busy(_) => "topic publish turn is busy",
        };
        f.write_str(message)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Sink, stream::Materializer};
use futures::executor::block_on;
use std::{
collections::HashSet,
sync::{
Arc,
atomic::{AtomicBool, AtomicUsize},
},
thread,
time::{Duration, Instant},
};
// Blocks on a stream completion and unwraps its result.
fn wait<T>(completion: crate::StreamCompletion<T>) -> T {
completion.wait().unwrap()
}
// Polls `condition` about once per millisecond until it holds or `timeout` elapses;
// returns the result of the final check.
fn wait_until<F>(timeout: Duration, mut condition: F) -> bool
where
F: FnMut() -> bool,
{
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if condition() {
return true;
}
thread::yield_now();
thread::sleep(Duration::from_millis(1));
}
condition()
}
// Materializes a subscription directly, returning the raw blocking stream.
fn materialize_topic<T: Clone + Send + Sync + 'static>(topic: &Topic<T>) -> BoxStream<T> {
let materializer = Materializer::new();
let (stream, _) = topic.subscribe().factory.create(&materializer).unwrap();
stream
}
// Concurrent publishers: every subscriber must observe all values exactly once and
// in the same global order.
#[test]
fn every_subscriber_sees_every_post_subscription_element() {
const SUBSCRIBERS: usize = 4;
const PUBLISHERS: usize = 4;
const PER_PUBLISHER: usize = 128;
let topic = Topic::new(1_024, TopicOverflow::Backpressure).unwrap();
let completions = (0..SUBSCRIBERS)
.map(|_| topic.subscribe().run_with(Sink::collect()).unwrap())
.collect::<Vec<_>>();
assert!(wait_until(Duration::from_secs(1), || topic
.subscriber_count()
== SUBSCRIBERS));
let mut handles = Vec::new();
for publisher in 0..PUBLISHERS {
let topic = topic.clone();
handles.push(thread::spawn(move || {
for seq in 0..PER_PUBLISHER {
// Publisher index in the high 32 bits keeps every value unique.
let value = ((publisher as u64) << 32) | seq as u64;
block_on(topic.publish(value)).unwrap();
}
}));
}
for handle in handles {
handle.join().unwrap();
}
topic.close().unwrap();
let observed = completions.into_iter().map(wait).collect::<Vec<_>>();
let first = observed.first().unwrap();
assert_eq!(first.len(), PUBLISHERS * PER_PUBLISHER);
let unique = first.iter().copied().collect::<HashSet<_>>();
assert_eq!(unique.len(), PUBLISHERS * PER_PUBLISHER);
for values in &observed[1..] {
assert_eq!(values, first, "subscribers disagreed on global order");
}
}
// Publishing with no subscribers succeeds and the values are not replayed to a
// subscriber that joins later.
#[test]
fn late_subscriber_sees_nothing_prior_and_zero_subscriber_publish_drops() {
let topic = Topic::new(8, TopicOverflow::Backpressure).unwrap();
assert_eq!(topic.subscriber_count(), 0);
block_on(topic.publish(1_u64)).unwrap();
topic.try_publish(2).unwrap();
let completion = topic.subscribe().run_with(Sink::collect()).unwrap();
assert!(wait_until(Duration::from_secs(1), || topic
.subscriber_count()
== 1));
block_on(topic.publish(3)).unwrap();
topic.close().unwrap();
assert_eq!(wait(completion), vec![3]);
}
// Sliding overflow keeps the newest `capacity` values.
#[test]
fn sliding_overflow_drops_oldest_for_slow_subscriber() {
let topic = Topic::new(2, TopicOverflow::Sliding).unwrap();
let mut stream = materialize_topic(&topic);
topic.try_publish(1_u64).unwrap();
topic.try_publish(2).unwrap();
topic.try_publish(3).unwrap();
topic.try_publish(4).unwrap();
topic.close().unwrap();
assert_eq!(stream.next(), Some(Ok(3)));
assert_eq!(stream.next(), Some(Ok(4)));
assert_eq!(stream.next(), None);
}
// Dropping overflow keeps the oldest `capacity` values.
#[test]
fn dropping_overflow_drops_newest_for_slow_subscriber() {
let topic = Topic::new(2, TopicOverflow::Dropping).unwrap();
let mut stream = materialize_topic(&topic);
topic.try_publish(1_u64).unwrap();
topic.try_publish(2).unwrap();
topic.try_publish(3).unwrap();
topic.try_publish(4).unwrap();
topic.close().unwrap();
assert_eq!(stream.next(), Some(Ok(1)));
assert_eq!(stream.next(), Some(Ok(2)));
assert_eq!(stream.next(), None);
}
// With a full buffer the second publish must not finish until the subscriber reads.
#[test]
fn backpressure_stalls_publisher_until_slow_subscriber_drains() {
let topic = Topic::new(1, TopicOverflow::Backpressure).unwrap();
let mut stream = materialize_topic(&topic);
block_on(topic.publish(1_u64)).unwrap();
let completed = Arc::new(AtomicBool::new(false));
let publisher_completed = Arc::clone(&completed);
let publisher = {
let topic = topic.clone();
thread::spawn(move || {
block_on(topic.publish(2)).unwrap();
publisher_completed.store(true, Ordering::SeqCst);
})
};
// The publisher is expected to still be blocked after 25 ms.
assert!(!wait_until(Duration::from_millis(25), || completed
.load(Ordering::SeqCst)));
assert_eq!(stream.next(), Some(Ok(1)));
assert!(wait_until(Duration::from_secs(1), || completed.load(Ordering::SeqCst)));
publisher.join().unwrap();
topic.close().unwrap();
assert_eq!(stream.next(), Some(Ok(2)));
assert_eq!(stream.next(), None);
}
// Dropping a stream whose buffer is full must unblock later publishes.
#[test]
fn dropping_source_unsubscribes_and_frees_backpressured_slot() {
let topic = Topic::new(1, TopicOverflow::Backpressure).unwrap();
let stream = materialize_topic(&topic);
block_on(topic.publish(1_u64)).unwrap();
assert_eq!(topic.subscriber_count(), 1);
drop(stream);
assert!(wait_until(Duration::from_secs(1), || topic
.subscriber_count()
== 0));
block_on(topic.publish(2)).unwrap();
topic.close().unwrap();
}
// Values buffered before close are still delivered; publishes after close are rejected.
#[test]
fn close_drains_then_completes_and_rejects_late_publishes() {
let topic = Topic::new(4, TopicOverflow::Backpressure).unwrap();
let mut stream = materialize_topic(&topic);
block_on(topic.publish(1_u64)).unwrap();
block_on(topic.publish(2)).unwrap();
topic.close().unwrap();
assert_eq!(stream.next(), Some(Ok(1)));
assert_eq!(stream.next(), Some(Ok(2)));
assert_eq!(stream.next(), None);
assert_eq!(topic.try_publish(3), Err(TopicTryPublishError::Closed(3)));
assert_eq!(
block_on(topic.publish(4)),
Err(TopicPublishError::Closed(4))
);
}
// A thread blocked on `closed()` must return once `close()` is called.
#[test]
fn closed_future_wakes_on_close() {
let topic = Topic::<u64>::new(1, TopicOverflow::Backpressure).unwrap();
let waiting = Arc::new(AtomicBool::new(false));
let waiter_started = Arc::clone(&waiting);
let waiter = {
let topic = topic.clone();
thread::spawn(move || {
waiter_started.store(true, Ordering::SeqCst);
block_on(topic.closed());
})
};
assert!(wait_until(Duration::from_secs(1), || waiting.load(Ordering::SeqCst)));
topic.close().unwrap();
waiter.join().unwrap();
assert!(topic.is_closed());
}
// Stress test: publishers run while another thread subscribes and unsubscribes
// repeatedly; every publish must still succeed.
#[test]
fn publisher_subscriber_churn_hammer() {
const ROUNDS: usize = 200;
const PUBLISHERS: usize = 4;
let topic = Topic::new(8, TopicOverflow::Sliding).unwrap();
let published = Arc::new(AtomicUsize::new(0));
let mut publisher_handles = Vec::new();
for publisher in 0..PUBLISHERS {
let topic = topic.clone();
let published = Arc::clone(&published);
publisher_handles.push(thread::spawn(move || {
for seq in 0..ROUNDS {
let value = ((publisher as u64) << 32) | seq as u64;
block_on(topic.publish(value)).unwrap();
published.fetch_add(1, Ordering::Relaxed);
}
}));
}
let churn_topic = topic.clone();
let churn = thread::spawn(move || {
for _ in 0..ROUNDS {
let stream = materialize_topic(&churn_topic);
drop(stream);
}
});
for handle in publisher_handles {
handle.join().unwrap();
}
churn.join().unwrap();
topic.close().unwrap();
assert_eq!(published.load(Ordering::Relaxed), PUBLISHERS * ROUNDS);
}
// Subscribing after close yields a stream that completes without any values.
#[test]
fn post_close_subscribe_completes_empty() {
let topic = Topic::<u64>::new(8, TopicOverflow::Backpressure).unwrap();
topic.close().unwrap();
assert_eq!(topic.subscribe().run_collect().unwrap(), Vec::<u64>::new());
}
}