1use parking_lot::RwLock;
4
5use crate::edge::EdgeType;
6use crate::types::{AgentId, MemoryId, SpaceId};
7
/// Handle returned by `EventBus::subscribe`; pass it to `EventBus::unsubscribe`
/// to remove that subscription again.
pub type SubscriberId = usize;
10
11#[derive(Debug, Clone)]
13pub enum MenteEvent {
14 MemoryCreated {
15 id: MemoryId,
16 agent_id: AgentId,
17 },
18 MemoryUpdated {
19 id: MemoryId,
20 version: u64,
21 },
22 MemoryDeleted {
23 id: MemoryId,
24 },
25 EdgeCreated {
26 source: MemoryId,
27 target: MemoryId,
28 edge_type: EdgeType,
29 },
30 BeliefChanged {
31 id: MemoryId,
32 old_confidence: f32,
33 new_confidence: f32,
34 },
35 SpaceCreated {
36 id: SpaceId,
37 },
38 ContradictionDetected {
39 a: MemoryId,
40 b: MemoryId,
41 },
42}
43
44type Subscriber = Box<dyn Fn(&MenteEvent) + Send + Sync>;
46
47struct Entry {
48 id: SubscriberId,
49 callback: Subscriber,
50}
51
52pub struct EventBus {
54 subscribers: RwLock<Vec<Entry>>,
55 next_id: RwLock<usize>,
56}
57
58impl std::fmt::Debug for EventBus {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 f.debug_struct("EventBus")
61 .field("subscriber_count", &self.subscribers.read().len())
62 .finish()
63 }
64}
65
66impl Default for EventBus {
67 fn default() -> Self {
68 Self {
69 subscribers: RwLock::new(Vec::new()),
70 next_id: RwLock::new(0),
71 }
72 }
73}
74
75impl EventBus {
76 pub fn new() -> Self {
77 Self::default()
78 }
79
80 pub fn subscribe(
82 &self,
83 callback: impl Fn(&MenteEvent) + Send + Sync + 'static,
84 ) -> SubscriberId {
85 let mut next = self.next_id.write();
86 let id = *next;
87 *next += 1;
88 self.subscribers.write().push(Entry {
89 id,
90 callback: Box::new(callback),
91 });
92 id
93 }
94
95 pub fn unsubscribe(&self, id: SubscriberId) {
97 self.subscribers.write().retain(|e| e.id != id);
98 }
99
100 pub fn publish(&self, event: MenteEvent) {
102 let subs = self.subscribers.read();
103 for entry in subs.iter() {
104 (entry.callback)(&event);
105 }
106 }
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use uuid::Uuid;

    /// Builds a callback that bumps `hits` by one for every event it receives.
    fn counting_callback(
        hits: &Arc<AtomicUsize>,
    ) -> impl Fn(&MenteEvent) + Send + Sync + 'static {
        let hits = Arc::clone(hits);
        move |_: &MenteEvent| {
            hits.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// A single subscriber sees a single published event exactly once.
    #[test]
    fn subscribe_and_publish() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));

        bus.subscribe(counting_callback(&hits));
        bus.publish(MenteEvent::SpaceCreated { id: Uuid::new_v4() });

        assert_eq!(hits.load(Ordering::Relaxed), 1);
    }

    /// A subscriber removed before publishing is never invoked.
    #[test]
    fn unsubscribe() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));

        let sid = bus.subscribe(counting_callback(&hits));
        bus.unsubscribe(sid);
        bus.publish(MenteEvent::SpaceCreated { id: Uuid::new_v4() });

        assert_eq!(hits.load(Ordering::Relaxed), 0);
    }

    /// Every one of several subscribers receives the published event.
    #[test]
    fn multiple_subscribers() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));

        (0..3).for_each(|_| {
            bus.subscribe(counting_callback(&hits));
        });
        bus.publish(MenteEvent::MemoryDeleted { id: Uuid::new_v4() });

        assert_eq!(hits.load(Ordering::Relaxed), 3);
    }
}