use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use parking_lot::Mutex;
use crate::value::{EdgeId, Value, VertexId};
#[derive(Clone, Debug, PartialEq)]
pub enum GraphEvent {
VertexAdded {
id: VertexId,
label: String,
properties: HashMap<String, Value>,
},
VertexRemoved { id: VertexId, label: String },
VertexPropertyChanged {
id: VertexId,
key: String,
old_value: Option<Value>,
new_value: Value,
},
EdgeAdded {
id: EdgeId,
src: VertexId,
dst: VertexId,
label: String,
properties: HashMap<String, Value>,
},
EdgeRemoved {
id: EdgeId,
src: VertexId,
dst: VertexId,
label: String,
},
EdgePropertyChanged {
id: EdgeId,
key: String,
old_value: Option<Value>,
new_value: Value,
},
Batch(Vec<GraphEvent>),
}
impl GraphEvent {
pub fn vertex_id(&self) -> Option<VertexId> {
match self {
Self::VertexAdded { id, .. }
| Self::VertexRemoved { id, .. }
| Self::VertexPropertyChanged { id, .. } => Some(*id),
_ => None,
}
}
pub fn edge_id(&self) -> Option<EdgeId> {
match self {
Self::EdgeAdded { id, .. }
| Self::EdgeRemoved { id, .. }
| Self::EdgePropertyChanged { id, .. } => Some(*id),
_ => None,
}
}
pub fn label(&self) -> Option<&str> {
match self {
Self::VertexAdded { label, .. }
| Self::VertexRemoved { label, .. }
| Self::EdgeAdded { label, .. }
| Self::EdgeRemoved { label, .. } => Some(label),
_ => None,
}
}
pub fn property_key(&self) -> Option<&str> {
match self {
Self::VertexPropertyChanged { key, .. }
| Self::EdgePropertyChanged { key, .. } => Some(key),
_ => None,
}
}
pub fn is_vertex_event(&self) -> bool {
matches!(
self,
Self::VertexAdded { .. }
| Self::VertexRemoved { .. }
| Self::VertexPropertyChanged { .. }
)
}
pub fn is_edge_event(&self) -> bool {
matches!(
self,
Self::EdgeAdded { .. }
| Self::EdgeRemoved { .. }
| Self::EdgePropertyChanged { .. }
)
}
pub fn is_removal(&self) -> bool {
matches!(
self,
Self::VertexRemoved { .. } | Self::EdgeRemoved { .. }
)
}
pub fn is_batch(&self) -> bool {
matches!(self, Self::Batch(_))
}
pub fn flatten(self) -> Vec<GraphEvent> {
match self {
Self::Batch(events) => events.into_iter().flat_map(|e| e.flatten()).collect(),
other => vec![other],
}
}
}
pub struct EventBus {
subscribers: Mutex<Vec<mpsc::SyncSender<GraphEvent>>>,
subscriber_count: AtomicUsize,
}
impl EventBus {
pub const DEFAULT_CAPACITY: usize = 1024;
pub fn new() -> Self {
Self {
subscribers: Mutex::new(Vec::new()),
subscriber_count: AtomicUsize::new(0),
}
}
pub fn subscribe(&self) -> mpsc::Receiver<GraphEvent> {
self.subscribe_with_capacity(Self::DEFAULT_CAPACITY)
}
pub fn subscribe_with_capacity(&self, capacity: usize) -> mpsc::Receiver<GraphEvent> {
let (tx, rx) = mpsc::sync_channel(capacity);
let mut subs = self.subscribers.lock();
subs.push(tx);
self.subscriber_count.store(subs.len(), Ordering::Release);
rx
}
pub fn emit(&self, event: GraphEvent) {
if self.subscriber_count.load(Ordering::Acquire) == 0 {
return;
}
let mut subs = self.subscribers.lock();
subs.retain(|tx| {
match tx.try_send(event.clone()) {
Ok(()) => true,
Err(mpsc::TrySendError::Full(_)) => true, Err(mpsc::TrySendError::Disconnected(_)) => false, }
});
self.subscriber_count.store(subs.len(), Ordering::Release);
}
pub fn subscriber_count(&self) -> usize {
self.subscriber_count.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.subscriber_count() == 0
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for EventBus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventBus")
.field("subscriber_count", &self.subscriber_count())
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_graph_event_vertex_id() {
let event = GraphEvent::VertexAdded {
id: VertexId(1),
label: "person".to_string(),
properties: HashMap::new(),
};
assert_eq!(event.vertex_id(), Some(VertexId(1)));
assert_eq!(event.edge_id(), None);
}
#[test]
fn test_graph_event_edge_id() {
let event = GraphEvent::EdgeAdded {
id: EdgeId(10),
src: VertexId(1),
dst: VertexId(2),
label: "knows".to_string(),
properties: HashMap::new(),
};
assert_eq!(event.edge_id(), Some(EdgeId(10)));
assert_eq!(event.vertex_id(), None);
}
#[test]
fn test_graph_event_label() {
let event = GraphEvent::VertexAdded {
id: VertexId(1),
label: "person".to_string(),
properties: HashMap::new(),
};
assert_eq!(event.label(), Some("person"));
let prop_event = GraphEvent::VertexPropertyChanged {
id: VertexId(1),
key: "age".to_string(),
old_value: None,
new_value: Value::Int(30),
};
assert_eq!(prop_event.label(), None);
assert_eq!(prop_event.property_key(), Some("age"));
}
#[test]
fn test_graph_event_is_methods() {
let vertex_event = GraphEvent::VertexAdded {
id: VertexId(1),
label: "person".to_string(),
properties: HashMap::new(),
};
assert!(vertex_event.is_vertex_event());
assert!(!vertex_event.is_edge_event());
assert!(!vertex_event.is_removal());
assert!(!vertex_event.is_batch());
let removal = GraphEvent::VertexRemoved {
id: VertexId(1),
label: "person".to_string(),
};
assert!(removal.is_removal());
let batch = GraphEvent::Batch(vec![]);
assert!(batch.is_batch());
}
#[test]
fn test_graph_event_flatten() {
let batch = GraphEvent::Batch(vec![
GraphEvent::VertexAdded {
id: VertexId(1),
label: "a".to_string(),
properties: HashMap::new(),
},
GraphEvent::Batch(vec![GraphEvent::VertexAdded {
id: VertexId(2),
label: "b".to_string(),
properties: HashMap::new(),
}]),
]);
let flat = batch.flatten();
assert_eq!(flat.len(), 2);
assert_eq!(flat[0].vertex_id(), Some(VertexId(1)));
assert_eq!(flat[1].vertex_id(), Some(VertexId(2)));
}
#[test]
fn test_event_bus_no_subscribers_fast_path() {
let bus = EventBus::new();
assert!(bus.is_empty());
assert_eq!(bus.subscriber_count(), 0);
bus.emit(GraphEvent::VertexAdded {
id: VertexId(1),
label: "test".to_string(),
properties: HashMap::new(),
});
}
#[test]
fn test_event_bus_single_subscriber() {
let bus = EventBus::new();
let rx = bus.subscribe();
assert_eq!(bus.subscriber_count(), 1);
bus.emit(GraphEvent::VertexAdded {
id: VertexId(1),
label: "person".to_string(),
properties: HashMap::new(),
});
let event = rx.try_recv().unwrap();
assert_eq!(event.vertex_id(), Some(VertexId(1)));
}
#[test]
fn test_event_bus_multiple_subscribers() {
let bus = EventBus::new();
let rx1 = bus.subscribe();
let rx2 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 2);
bus.emit(GraphEvent::VertexRemoved {
id: VertexId(5),
label: "test".to_string(),
});
assert!(rx1.try_recv().is_ok());
assert!(rx2.try_recv().is_ok());
}
#[test]
fn test_event_bus_dead_subscriber_cleanup() {
let bus = EventBus::new();
let rx1 = bus.subscribe();
let _rx2 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 2);
drop(rx1);
bus.emit(GraphEvent::VertexAdded {
id: VertexId(1),
label: "test".to_string(),
properties: HashMap::new(),
});
assert_eq!(bus.subscriber_count(), 1);
}
#[test]
fn test_event_bus_capacity_overflow() {
let bus = EventBus::new();
let rx = bus.subscribe_with_capacity(2);
for i in 0..5 {
bus.emit(GraphEvent::VertexAdded {
id: VertexId(i),
label: "test".to_string(),
properties: HashMap::new(),
});
}
assert!(rx.try_recv().is_ok());
assert!(rx.try_recv().is_ok());
assert_eq!(bus.subscriber_count(), 1);
}
#[test]
fn test_event_bus_all_subscribers_dropped() {
let bus = EventBus::new();
let rx = bus.subscribe();
drop(rx);
bus.emit(GraphEvent::VertexAdded {
id: VertexId(1),
label: "test".to_string(),
properties: HashMap::new(),
});
assert_eq!(bus.subscriber_count(), 0);
assert!(bus.is_empty());
}
}