use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use futures_util::stream::{self, BoxStream, StreamExt};
use std::sync::RwLock;
use tokio::sync::{
broadcast::{self, Receiver, Sender},
watch,
};
use tokio::time::timeout;
use super::emitter::{EmitterError, EventEmitter};
use super::event::Event;
/// Point-in-time snapshot of an [`EventHub`]'s counters, as returned by
/// [`EventHub::metrics`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EventHubMetrics {
/// Capacity the hub's broadcast channel was created with.
pub capacity: usize,
/// Running total of events that subscribers reported as missed (lagged).
pub dropped: usize,
}
/// Broadcast hub that fans published [`Event`]s out to every subscriber.
///
/// Constructed through [`EventHub::new`], which hands back an `Arc` so that
/// emitters and streams can keep a handle to the hub.
#[derive(Debug)]
pub struct EventHub {
// `Some` while the hub is open; `close()` takes the sender out, leaving `None`.
sender: RwLock<Option<Sender<Event>>>,
// Running total of events that lagging subscribers reported as missed.
dropped_events: AtomicUsize,
// Channel capacity after clamping to at least 1 in `new`.
capacity: usize,
}
impl EventHub {
    /// Builds a hub backed by a broadcast channel holding up to `capacity` events.
    ///
    /// A `capacity` of zero is raised to one before the channel is created.
    pub fn new(capacity: usize) -> Arc<Self> {
        let capacity = if capacity == 0 { 1 } else { capacity };
        // The initial receiver is discarded; subscribers obtain their own via `subscribe`.
        let (tx, _) = broadcast::channel(capacity);
        Arc::new(Self {
            capacity,
            dropped_events: AtomicUsize::new(0),
            sender: RwLock::new(Some(tx)),
        })
    }

    /// Sends `event` to all current subscribers.
    ///
    /// # Errors
    ///
    /// Returns [`EmitterError::Closed`] when the hub has been closed, and also
    /// when the underlying broadcast `send` fails (tokio reports a failure when
    /// there are no active receivers).
    pub fn publish(&self, event: Event) -> Result<(), EmitterError> {
        match self.current_sender() {
            None => Err(EmitterError::Closed),
            Some(tx) => match tx.send(event) {
                Ok(_) => Ok(()),
                Err(_) => Err(EmitterError::Closed),
            },
        }
    }

    /// Opens a new [`EventStream`] on this hub.
    ///
    /// If the hub is already closed, the returned stream is attached to a
    /// throwaway channel whose sender is dropped immediately, so it reports
    /// `Closed` instead of waiting forever.
    pub fn subscribe(self: &Arc<Self>) -> EventStream {
        let receiver = if let Some(tx) = self.current_sender() {
            tx.subscribe()
        } else {
            let (closed_tx, rx) = broadcast::channel::<Event>(self.capacity);
            drop(closed_tx);
            rx
        };
        let hub = Arc::clone(self);
        EventStream {
            receiver,
            hub,
            shutdown: None,
        }
    }

    /// Channel capacity in effect (after the clamp applied in [`EventHub::new`]).
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Running total of events recorded as missed by lagging subscribers.
    pub fn dropped(&self) -> usize {
        self.dropped_events.load(Ordering::Relaxed)
    }

    /// Bundles [`capacity`](Self::capacity) and [`dropped`](Self::dropped) into one value.
    pub fn metrics(&self) -> EventHubMetrics {
        let capacity = self.capacity();
        let dropped = self.dropped();
        EventHubMetrics { capacity, dropped }
    }

    /// Returns an [`EventEmitter`] handle that publishes into this hub.
    pub fn emitter(self: &Arc<Self>) -> HubEmitter {
        let hub = Arc::clone(self);
        HubEmitter { hub }
    }

    /// Closes the hub by dropping its stored sender.
    ///
    /// Later calls to [`publish`](Self::publish) fail with
    /// [`EmitterError::Closed`]. Calling `close` more than once is harmless.
    ///
    /// # Panics
    ///
    /// Panics if the sender lock is poisoned.
    pub fn close(&self) {
        let mut slot = self.sender.write().expect("hub sender lock poisoned");
        drop(slot.take());
    }

    /// Clones the sender out from under the read lock, or yields `None` once
    /// the hub has been closed.
    fn current_sender(&self) -> Option<Sender<Event>> {
        let guard = self.sender.read().expect("hub sender lock poisoned");
        (*guard).clone()
    }

    /// Adds `missed` to the dropped-event counter and logs a warning.
    ///
    /// A `missed` of zero is ignored entirely (no counter update, no log).
    fn record_lag(&self, missed: u64) {
        if missed > 0 {
            // On targets where `usize` is narrower than `u64`, cap instead of truncating.
            let delta = match usize::try_from(missed) {
                Ok(value) => value,
                Err(_) => usize::MAX,
            };
            let before = self.dropped_events.fetch_add(delta, Ordering::Relaxed);
            tracing::warn!(
                target: "weavegraph::event_bus",
                missed,
                total_dropped = before.saturating_add(delta),
                "event stream lagged; dropped events"
            );
        }
    }
}
/// Cloneable [`EventEmitter`] handle that forwards emitted events to an
/// [`EventHub`].
#[derive(Clone, Debug)]
pub struct HubEmitter {
// Hub that receives every event passed to `emit`.
hub: Arc<EventHub>,
}
impl EventEmitter for HubEmitter {
fn emit(&self, event: Event) -> Result<(), EmitterError> {
self.hub.publish(event)
}
}
/// A single subscriber's view of the events broadcast by an [`EventHub`].
#[derive(Debug)]
pub struct EventStream {
// Broadcast receiver the events are read from.
receiver: Receiver<Event>,
// Owning hub; used to record lag (missed events) in its dropped counter.
hub: Arc<EventHub>,
// Optional shutdown flag set via `with_shutdown`; only `into_async_stream` reads it.
shutdown: Option<watch::Receiver<bool>>,
}
impl EventStream {
pub async fn recv(&mut self) -> Result<Event, broadcast::error::RecvError> {
match self.receiver.recv().await {
Err(broadcast::error::RecvError::Lagged(n)) => {
self.hub.record_lag(n);
Err(broadcast::error::RecvError::Lagged(n))
}
result => result,
}
}
pub fn try_recv(&mut self) -> Result<Event, broadcast::error::TryRecvError> {
match self.receiver.try_recv() {
Err(broadcast::error::TryRecvError::Lagged(n)) => {
self.hub.record_lag(n);
Err(broadcast::error::TryRecvError::Lagged(n))
}
result => result,
}
}
pub fn into_inner(self) -> Receiver<Event> {
self.receiver
}
pub fn into_blocking_iter(self) -> BlockingEventIter {
BlockingEventIter {
receiver: self.receiver,
hub: self.hub,
}
}
pub fn with_shutdown(mut self, shutdown: watch::Receiver<bool>) -> Self {
self.shutdown = Some(shutdown);
self
}
pub fn into_async_stream(self) -> BoxStream<'static, Event> {
let EventStream {
receiver,
hub,
shutdown,
} = self;
stream::unfold(
(receiver, hub, shutdown),
|(mut receiver, hub, mut shutdown)| async move {
loop {
let recv_result = if let Some(ref mut rx) = shutdown {
tokio::select! {
biased;
changed = rx.changed() => {
if changed.is_ok() && *rx.borrow() { return None; }
continue;
}
result = receiver.recv() => result,
}
} else {
receiver.recv().await
};
match recv_result {
Ok(ev) => return Some((ev, (receiver, hub, shutdown))),
Err(broadcast::error::RecvError::Lagged(n)) => hub.record_lag(n),
Err(broadcast::error::RecvError::Closed) => return None,
}
}
},
)
.boxed()
}
pub async fn next_timeout(&mut self, duration: Duration) -> Option<Event> {
loop {
match timeout(duration, self.recv()).await {
Ok(Ok(event)) => return Some(event),
Ok(Err(broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(broadcast::error::RecvError::Closed)) | Err(_) => return None,
}
}
}
}
/// Blocking iterator over a subscriber's events, produced by
/// `EventStream::into_blocking_iter`.
///
/// Derives `Debug` like the other public types in this module.
#[derive(Debug)]
pub struct BlockingEventIter {
    // Broadcast receiver the events are read from.
    receiver: Receiver<Event>,
    // Owning hub; used to record lag (missed events) in its dropped counter.
    hub: Arc<EventHub>,
}
impl Iterator for BlockingEventIter {
type Item = Event;
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.receiver.blocking_recv() {
Ok(event) => return Some(event),
Err(broadcast::error::RecvError::Lagged(n)) => self.hub.record_lag(n),
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
}