use crate::coordinate::{Coordinate, DagPosition, Region};
use crate::event::{EventKind, StoredEvent};
use flume::{Receiver, Sender, TrySendError};
use parking_lot::Mutex;
pub(crate) struct FanoutList<T: Clone> {
senders: Mutex<Vec<Sender<T>>>,
}
pub(crate) struct FilteredSubscriberList {
senders: Mutex<Vec<FilteredSender>>,
}
struct FilteredSender {
tx: Sender<Notification>,
region: Region,
}
#[derive(Clone, Debug)]
pub(crate) struct CommittedEventEnvelope {
pub notification: Notification,
pub stored: StoredEvent<serde_json::Value>,
}
pub(crate) type SubscriberList = FilteredSubscriberList;
pub(crate) type ReactorSubscriberList = FanoutList<CommittedEventEnvelope>;
#[derive(Clone, Debug)]
pub struct Notification {
pub event_id: u128,
pub correlation_id: u128,
pub causation_id: Option<u128>,
pub coord: Coordinate,
pub kind: EventKind,
pub sequence: u64,
pub position: DagPosition,
}
impl<T: Clone> FanoutList<T> {
pub(crate) fn new() -> Self {
Self {
senders: Mutex::new(Vec::new()),
}
}
pub(crate) fn subscribe(&self, capacity: usize) -> Receiver<T> {
let (tx, rx) = flume::bounded(capacity);
self.senders.lock().push(tx);
rx
}
pub(crate) fn has_subscribers(&self) -> bool {
!self.senders.lock().is_empty()
}
pub(crate) fn broadcast(&self, value: &T) {
let mut guard = self.senders.lock();
guard.retain(|tx| match tx.try_send(value.clone()) {
Ok(()) => true,
Err(TrySendError::Full(_)) => false,
Err(TrySendError::Disconnected(_)) => false,
});
}
}
impl FilteredSubscriberList {
pub(crate) fn new() -> Self {
Self {
senders: Mutex::new(Vec::new()),
}
}
pub(crate) fn subscribe_with_region(
&self,
capacity: usize,
region: Region,
) -> Receiver<Notification> {
let (tx, rx) = flume::bounded(capacity);
self.senders.lock().push(FilteredSender { tx, region });
rx
}
pub(crate) fn broadcast(&self, value: &Notification) {
let mut guard = self.senders.lock();
guard.retain(|sub| {
match sub
.region
.matches_event(value.coord.entity(), value.coord.scope(), value.kind)
{
false => true, true => match sub.tx.try_send(value.clone()) {
Ok(()) => true,
Err(TrySendError::Full(_)) => false,
Err(TrySendError::Disconnected(_)) => false,
},
}
});
}
}