use crate::backend::native::v2::pubsub::{NodeMetadata, PubSubEvent, SubscriberId, SubscriptionFilter};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
/// One registry entry: the subscriber's id, the sending half of its channel,
/// and the filter it registered with.
type Subscription = (SubscriberId, Sender<PubSubEvent>, SubscriptionFilter);

/// In-process event publisher that fans events out to registered subscribers.
///
/// Each subscriber is represented by a [`Subscription`] entry. Both fields sit
/// behind a `Mutex`, so every method takes `&self`.
#[derive(Debug)]
pub struct Publisher {
    /// Registered subscribers, in subscription order.
    senders: Arc<Mutex<Vec<Subscription>>>,
    /// Raw value that will be used for the next subscriber id.
    next_id: Arc<Mutex<u64>>,
}
impl Publisher {
    /// Creates a publisher with no subscribers.
    ///
    /// The id counter starts at 1.
    pub fn new() -> Self {
        Self {
            senders: Arc::new(Mutex::new(Vec::new())),
            next_id: Arc::new(Mutex::new(1)),
        }
    }

    /// Registers a new subscriber that receives events accepted by `filter`.
    ///
    /// Returns the id assigned to the subscriber (built with
    /// `SubscriberId::from_raw` from an incrementing counter) together with the
    /// receiving half of a fresh unbounded `mpsc` channel.
    ///
    /// # Panics
    ///
    /// Panics if one of the internal mutexes is poisoned.
    pub fn subscribe(&self, filter: SubscriptionFilter) -> (SubscriberId, Receiver<PubSubEvent>) {
        let (tx, rx) = mpsc::channel();
        // The counter lock is released before the registry lock is taken, so
        // the two locks are never held at the same time.
        let id = {
            let mut next = self.next_id.lock().unwrap();
            let id = SubscriberId::from_raw(*next);
            *next += 1;
            id
        };
        self.senders.lock().unwrap().push((id, tx, filter));
        (id, rx)
    }

    /// Removes the subscriber registered under `id`.
    ///
    /// Returns `true` if an entry was removed and `false` if no subscriber with
    /// that id is currently registered (never subscribed, already
    /// unsubscribed, or already pruned after its receiver was dropped).
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn unsubscribe(&self, id: SubscriberId) -> bool {
        let mut senders = self.senders.lock().unwrap();
        let original_len = senders.len();
        senders.retain(|(sub_id, _, _)| *sub_id != id);
        senders.len() < original_len
    }

    /// Sends a clone of `event` to every subscriber whose filter accepts it
    /// according to `SubscriptionFilter::matches_simple`.
    ///
    /// Delivery is best effort: a failed send is never reported to the caller.
    /// A subscriber whose receiver has been dropped is removed from the
    /// registry the first time a delivery to it fails.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn emit(&self, event: PubSubEvent) {
        self.deliver(&event, |filter| filter.matches_simple(&event));
    }

    /// Like [`Publisher::emit`], but decides with `SubscriptionFilter::matches`,
    /// which additionally receives `node_metadata`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn emit_with_metadata(&self, event: PubSubEvent, node_metadata: Option<&NodeMetadata>) {
        self.deliver(&event, |filter| filter.matches(&event, node_metadata));
    }

    /// Returns the number of subscribers currently registered.
    ///
    /// A subscriber whose receiver was dropped is still counted until it is
    /// unsubscribed or a delivery to it fails.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn subscriber_count(&self) -> usize {
        self.senders.lock().unwrap().len()
    }

    /// Shared fan-out used by both emit methods: sends a clone of `event` to
    /// each subscriber for which `wants` returns `true`, dropping entries whose
    /// channel is disconnected.
    fn deliver<F>(&self, event: &PubSubEvent, wants: F)
    where
        F: Fn(&SubscriptionFilter) -> bool,
    {
        let mut senders = self.senders.lock().unwrap();
        senders.retain(|(_, sender, filter)| {
            if !wants(filter) {
                // Not addressed to this subscriber; keep it untouched.
                return true;
            }
            // `Sender::send` only fails once the receiver is gone, which is
            // permanent, so a failed send means the entry can never be
            // delivered to again and would otherwise leak.
            sender.send(event.clone()).is_ok()
        });
    }
}
impl Default for Publisher {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::native::v2::pubsub::{PubSubEvent, PubSubEventType, SubscriptionFilter};

    // `emit` sends synchronously on the calling thread, so by the time it
    // returns every delivered event is already queued. The helpers below
    // therefore use the non-blocking `try_recv`: a missing event fails the test
    // immediately instead of hanging, and "nothing more arrived" needs no sleep.

    /// Pops the next queued event, failing the test if the queue is empty.
    fn next_event(rx: &std::sync::mpsc::Receiver<PubSubEvent>) -> PubSubEvent {
        rx.try_recv()
            .expect("a matching event should already be queued")
    }

    /// Asserts that the channel is still connected and has nothing queued.
    fn assert_no_event(rx: &std::sync::mpsc::Receiver<PubSubEvent>) {
        assert!(matches!(
            rx.try_recv(),
            Err(std::sync::mpsc::TryRecvError::Empty)
        ));
    }

    #[test]
    fn test_publisher_creation() {
        let pubber = Publisher::new();
        assert_eq!(pubber.subscriber_count(), 0);
    }

    #[test]
    fn test_default_publisher() {
        let pubber = Publisher::default();
        assert_eq!(pubber.subscriber_count(), 0);
    }

    #[test]
    fn test_subscribe_unsubscribe() {
        let pubber = Publisher::new();
        let (id, _rx) = pubber.subscribe(SubscriptionFilter::all());
        assert_eq!(pubber.subscriber_count(), 1);

        assert!(pubber.unsubscribe(id));
        assert_eq!(pubber.subscriber_count(), 0);

        // A second removal of the same id finds nothing.
        assert!(!pubber.unsubscribe(id));
    }

    #[test]
    fn test_multiple_subscribers() {
        let pubber = Publisher::new();
        let filter = SubscriptionFilter::all();

        let (_id1, _rx1) = pubber.subscribe(filter.clone());
        assert_eq!(pubber.subscriber_count(), 1);

        let (_id2, _rx2) = pubber.subscribe(filter.clone());
        assert_eq!(pubber.subscriber_count(), 2);

        let (_id3, _rx3) = pubber.subscribe(filter);
        assert_eq!(pubber.subscriber_count(), 3);
    }

    #[test]
    fn test_emit_to_single_subscriber() {
        let pubber = Publisher::new();
        let (_id, rx) = pubber.subscribe(SubscriptionFilter::all());

        pubber.emit(PubSubEvent::SnapshotCommitted { snapshot_id: 100 });

        assert_eq!(next_event(&rx).snapshot_id(), 100);
    }

    #[test]
    fn test_emit_to_multiple_subscribers() {
        let pubber = Publisher::new();
        let filter = SubscriptionFilter::all();
        let (_id1, rx1) = pubber.subscribe(filter.clone());
        let (_id2, rx2) = pubber.subscribe(filter);

        pubber.emit(PubSubEvent::SnapshotCommitted { snapshot_id: 200 });

        assert_eq!(next_event(&rx1).snapshot_id(), 200);
        assert_eq!(next_event(&rx2).snapshot_id(), 200);
    }

    #[test]
    fn test_filter_by_event_type() {
        let pubber = Publisher::new();
        let node_filter = SubscriptionFilter::event_types(vec![PubSubEventType::Node]);
        let (_id, rx) = pubber.subscribe(node_filter);

        pubber.emit(PubSubEvent::NodeChanged {
            node_id: 1,
            snapshot_id: 100,
        });
        pubber.emit(PubSubEvent::EdgeChanged {
            edge_id: 1,
            snapshot_id: 100,
        });

        // Only the node event passes the filter.
        assert!(next_event(&rx).is_node_event());
        assert_no_event(&rx);
    }

    #[test]
    fn test_filter_by_specific_node() {
        let pubber = Publisher::new();
        let filter = SubscriptionFilter::nodes(vec![42, 43]);
        let (_id, rx) = pubber.subscribe(filter);

        pubber.emit(PubSubEvent::NodeChanged {
            node_id: 42,
            snapshot_id: 100,
        });
        pubber.emit(PubSubEvent::NodeChanged {
            node_id: 99,
            snapshot_id: 101,
        });

        // Only the event for node 42 (snapshot 100) is delivered.
        assert_eq!(next_event(&rx).snapshot_id(), 100);
        assert_no_event(&rx);
    }

    #[test]
    fn test_best_effort_delivery() {
        let pubber = Publisher::new();
        let (_id, rx) = pubber.subscribe(SubscriptionFilter::all());
        drop(rx);

        // Emitting to a subscriber whose receiver is gone must not panic.
        pubber.emit(PubSubEvent::SnapshotCommitted { snapshot_id: 100 });
    }

    #[test]
    fn test_mixed_filters() {
        let pubber = Publisher::new();
        let node_filter = SubscriptionFilter::event_types(vec![PubSubEventType::Node]);
        let (_id1, rx1) = pubber.subscribe(node_filter);
        let edge_filter = SubscriptionFilter::event_types(vec![PubSubEventType::Edge]);
        let (_id2, rx2) = pubber.subscribe(edge_filter);
        let all_filter = SubscriptionFilter::all();
        let (_id3, rx3) = pubber.subscribe(all_filter);

        pubber.emit(PubSubEvent::NodeChanged {
            node_id: 1,
            snapshot_id: 100,
        });
        pubber.emit(PubSubEvent::EdgeChanged {
            edge_id: 1,
            snapshot_id: 100,
        });

        // Node-only subscriber sees exactly the node event.
        assert!(next_event(&rx1).is_node_event());
        assert_no_event(&rx1);

        // Edge-only subscriber sees exactly the edge event.
        assert!(next_event(&rx2).is_edge_event());
        assert_no_event(&rx2);

        // Unfiltered subscriber sees both, in emission order.
        assert!(next_event(&rx3).is_node_event());
        assert!(next_event(&rx3).is_edge_event());
    }
}