use crate::coordinate::{Coordinate, DagPosition, Region};
use crate::event::{EventKind, StoredEvent};
use flume::{Receiver, Sender, TrySendError};
use parking_lot::Mutex;
pub(crate) struct FanoutList<T: Clone + RegionFanoutItem> {
senders: Mutex<Vec<FanoutSender<T>>>,
}
struct FanoutSender<T: Clone> {
tx: Sender<T>,
region: Region,
}
pub(crate) struct FilteredSubscriberList {
senders: Mutex<Vec<FilteredSender>>,
}
struct FilteredSender {
tx: Sender<Notification>,
region: Region,
}
#[derive(Clone, Debug)]
pub(crate) struct CommittedEventEnvelope {
pub notification: Notification,
pub stored: StoredEvent<serde_json::Value>,
}
pub(crate) type SubscriberList = FilteredSubscriberList;
pub(crate) type ReactorSubscriberList = FanoutList<CommittedEventEnvelope>;
pub(crate) trait RegionFanoutItem {
fn matches_region(&self, region: &Region) -> bool;
}
#[derive(Clone, Debug)]
pub struct Notification {
pub event_id: crate::id::EventId,
pub correlation_id: u128,
pub causation_id: Option<u128>,
pub coord: Coordinate,
pub kind: EventKind,
pub sequence: u64,
pub position: DagPosition,
}
pub(crate) fn notification_matches_region(region: &Region, value: &Notification) -> bool {
region.matches_event_on_lane(
value.coord.entity(),
value.coord.scope(),
value.kind,
Some(value.position.lane()),
)
}
impl RegionFanoutItem for CommittedEventEnvelope {
fn matches_region(&self, region: &Region) -> bool {
notification_matches_region(region, &self.notification)
}
}
impl<T: Clone + RegionFanoutItem> FanoutList<T> {
pub(crate) fn new() -> Self {
Self {
senders: Mutex::new(Vec::new()),
}
}
pub(crate) fn subscribe_with_region(&self, capacity: usize, region: Region) -> Receiver<T> {
let (tx, rx) = flume::bounded(capacity);
self.senders.lock().push(FanoutSender { tx, region });
rx
}
pub(crate) fn has_subscribers(&self) -> bool {
!self.senders.lock().is_empty()
}
pub(crate) fn broadcast(&self, value: &T) {
let mut guard = self.senders.lock();
let subscribers_before = guard.len();
guard.retain(|sub| {
if !value.matches_region(&sub.region) {
return !sub.tx.is_disconnected();
}
match sub.tx.try_send(value.clone()) {
Ok(()) => true,
Err(TrySendError::Full(_)) => false,
Err(TrySendError::Disconnected(_)) => false,
}
});
tracing::trace!(
target: "batpak::fanout",
subscribers_before,
subscribers_after = guard.len(),
pruned = subscribers_before.saturating_sub(guard.len()),
"reactor fanout try_send pass",
);
}
}
impl FilteredSubscriberList {
pub(crate) fn new() -> Self {
Self {
senders: Mutex::new(Vec::new()),
}
}
pub(crate) fn subscribe_with_region(
&self,
capacity: usize,
region: Region,
) -> Receiver<Notification> {
let (tx, rx) = flume::bounded(capacity);
self.senders.lock().push(FilteredSender { tx, region });
rx
}
pub(crate) fn broadcast(&self, value: &Notification) {
let mut guard = self.senders.lock();
let subscribers_before = guard.len();
guard.retain(|sub| {
match notification_matches_region(&sub.region, value) {
false => !sub.tx.is_disconnected(),
true => match sub.tx.try_send(value.clone()) {
Ok(()) => true,
Err(TrySendError::Full(_)) => false,
Err(TrySendError::Disconnected(_)) => false,
},
}
});
tracing::trace!(
target: "batpak::fanout",
subscribers_before,
subscribers_after = guard.len(),
pruned = subscribers_before.saturating_sub(guard.len()),
"subscription fanout try_send pass",
);
}
}
#[cfg(test)]
mod fanout_subscriber_tests {
use super::{CommittedEventEnvelope, FanoutList, FilteredSubscriberList, Notification};
use crate::coordinate::{Coordinate, DagPosition, Region};
use crate::event::EventKind;
#[test]
fn has_subscribers_tracks_subscription_state() {
let fanout: FanoutList<CommittedEventEnvelope> = FanoutList::new();
assert!(
!fanout.has_subscribers(),
"a freshly constructed fanout has no subscribers"
);
let _rx = fanout.subscribe_with_region(1, Region::all());
assert!(
fanout.has_subscribers(),
"after a subscribe the fanout must report subscribers"
);
}
fn notification(scope: &str) -> Notification {
Notification {
event_id: crate::id::EventId::from_u128(1),
correlation_id: 1,
causation_id: None,
coord: Coordinate::new("entity", scope).expect("coordinate"),
kind: EventKind::DATA,
sequence: 1,
position: DagPosition::root(),
}
}
#[test]
fn dropped_out_of_region_subscriber_is_pruned_not_leaked() {
let list = FilteredSubscriberList::new();
let rx = list.subscribe_with_region(4, Region::scope("alpha"));
assert_eq!(list.senders.lock().len(), 1);
drop(rx);
list.broadcast(¬ification("beta"));
assert_eq!(
list.senders.lock().len(),
0,
"a dropped out-of-region subscriber must be pruned (audit R4)"
);
}
#[test]
fn live_out_of_region_subscriber_is_retained() {
let list = FilteredSubscriberList::new();
let _rx = list.subscribe_with_region(4, Region::scope("alpha"));
list.broadcast(¬ification("beta")); assert_eq!(
list.senders.lock().len(),
1,
"a live out-of-region subscriber must be retained"
);
}
}