use crate::coordinate::{Coordinate, DagPosition, Region};
use crate::event::{EventKind, StoredEvent};
use flume::{Receiver, Sender, TrySendError};
use parking_lot::Mutex;
pub(crate) struct FanoutList<T: Clone + RegionFanoutItem> {
senders: Mutex<Vec<FanoutSender<T>>>,
}
struct FanoutSender<T: Clone> {
tx: Sender<T>,
region: Region,
}
pub(crate) struct FilteredSubscriberList {
senders: Mutex<Vec<FilteredSender>>,
}
struct FilteredSender {
tx: Sender<Notification>,
region: Region,
}
#[derive(Clone, Debug)]
pub(crate) struct CommittedEventEnvelope {
pub notification: Notification,
pub stored: StoredEvent<serde_json::Value>,
}
pub(crate) type SubscriberList = FilteredSubscriberList;
pub(crate) type ReactorSubscriberList = FanoutList<CommittedEventEnvelope>;
pub(crate) trait RegionFanoutItem {
fn matches_region(&self, region: &Region) -> bool;
}
#[derive(Clone, Debug)]
pub struct Notification {
pub event_id: u128,
pub correlation_id: u128,
pub causation_id: Option<u128>,
pub coord: Coordinate,
pub kind: EventKind,
pub sequence: u64,
pub position: DagPosition,
}
pub(crate) fn notification_matches_region(region: &Region, value: &Notification) -> bool {
region.matches_event(value.coord.entity(), value.coord.scope(), value.kind)
}
impl RegionFanoutItem for CommittedEventEnvelope {
fn matches_region(&self, region: &Region) -> bool {
notification_matches_region(region, &self.notification)
}
}
impl<T: Clone + RegionFanoutItem> FanoutList<T> {
pub(crate) fn new() -> Self {
Self {
senders: Mutex::new(Vec::new()),
}
}
pub(crate) fn subscribe_with_region(&self, capacity: usize, region: Region) -> Receiver<T> {
let (tx, rx) = flume::bounded(capacity);
self.senders.lock().push(FanoutSender { tx, region });
rx
}
pub(crate) fn has_subscribers(&self) -> bool {
!self.senders.lock().is_empty()
}
pub(crate) fn broadcast(&self, value: &T) {
let mut guard = self.senders.lock();
let subscribers_before = guard.len();
guard.retain(|sub| {
if !value.matches_region(&sub.region) {
return true;
}
match sub.tx.try_send(value.clone()) {
Ok(()) => true,
Err(TrySendError::Full(_)) => false,
Err(TrySendError::Disconnected(_)) => false,
}
});
tracing::trace!(
target: "batpak::fanout",
subscribers_before,
subscribers_after = guard.len(),
pruned = subscribers_before.saturating_sub(guard.len()),
"reactor fanout try_send pass",
);
}
}
impl FilteredSubscriberList {
pub(crate) fn new() -> Self {
Self {
senders: Mutex::new(Vec::new()),
}
}
pub(crate) fn subscribe_with_region(
&self,
capacity: usize,
region: Region,
) -> Receiver<Notification> {
let (tx, rx) = flume::bounded(capacity);
self.senders.lock().push(FilteredSender { tx, region });
rx
}
pub(crate) fn broadcast(&self, value: &Notification) {
let mut guard = self.senders.lock();
let subscribers_before = guard.len();
guard.retain(|sub| {
match notification_matches_region(&sub.region, value) {
false => true, true => match sub.tx.try_send(value.clone()) {
Ok(()) => true,
Err(TrySendError::Full(_)) => false,
Err(TrySendError::Disconnected(_)) => false,
},
}
});
tracing::trace!(
target: "batpak::fanout",
subscribers_before,
subscribers_after = guard.len(),
pruned = subscribers_before.saturating_sub(guard.len()),
"subscription fanout try_send pass",
);
}
}