1use parking_lot::RwLock;
4
5use crate::edge::EdgeType;
6use crate::types::{AgentId, MemoryId, SpaceId};
7
/// Handle identifying one registered subscriber.
///
/// Returned by `EventBus::subscribe` and accepted by `EventBus::unsubscribe`.
/// Values come from a per-bus counter that starts at 0 and grows by one for
/// every subscription.
pub type SubscriberId = usize;
10
/// Events that can be published on an `EventBus` and observed by subscribers.
///
/// The payload descriptions below are inferred from the field names; this
/// module only transports the values and never inspects them.
#[derive(Debug, Clone)]
pub enum MenteEvent {
    /// A memory was created. `agent_id` presumably names the agent
    /// responsible — confirm against the publisher.
    MemoryCreated {
        id: MemoryId,
        agent_id: AgentId,
    },
    /// A memory was updated. `version` is supplied by the publisher;
    /// NOTE(review): presumably the version after the update — confirm.
    MemoryUpdated {
        id: MemoryId,
        version: u64,
    },
    /// A memory was deleted.
    MemoryDeleted {
        id: MemoryId,
    },
    /// An edge of kind `edge_type` was created from `source` to `target`.
    EdgeCreated {
        source: MemoryId,
        target: MemoryId,
        edge_type: EdgeType,
    },
    /// The confidence attached to a memory changed from `old_confidence`
    /// to `new_confidence`. No value range is enforced here.
    BeliefChanged {
        id: MemoryId,
        old_confidence: f32,
        new_confidence: f32,
    },
    /// A space was created.
    SpaceCreated {
        id: SpaceId,
    },
    /// Two memories, `a` and `b`, were reported as contradicting each other.
    ContradictionDetected {
        a: MemoryId,
        b: MemoryId,
    },
}
43
/// Boxed callback stored for each subscriber.
///
/// It receives a shared reference to every published event and must be
/// `Send + Sync`.
type Subscriber = Box<dyn Fn(&MenteEvent) + Send + Sync>;
46
/// One registered subscriber: its handle paired with the callback to invoke.
struct Entry {
    // Handle returned from `subscribe`; `unsubscribe` matches on it.
    id: SubscriberId,
    // Callback invoked by `publish` for every event.
    callback: Subscriber,
}
51
/// Synchronous publish/subscribe hub for [`MenteEvent`]s.
///
/// All state sits behind locks, so every operation takes `&self`.
pub struct EventBus {
    // Registered subscribers, kept in registration order (new entries are
    // pushed to the back; removal preserves the order of the rest).
    subscribers: RwLock<Vec<Entry>>,
    // Id that the next call to `subscribe` will hand out.
    next_id: RwLock<usize>,
}
57
58impl std::fmt::Debug for EventBus {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 f.debug_struct("EventBus")
61 .field("subscriber_count", &self.subscribers.read().len())
62 .finish()
63 }
64}
65
66impl Default for EventBus {
67 fn default() -> Self {
68 Self {
69 subscribers: RwLock::new(Vec::new()),
70 next_id: RwLock::new(0),
71 }
72 }
73}
74
75impl EventBus {
76 pub fn new() -> Self {
78 Self::default()
79 }
80
81 pub fn subscribe(
83 &self,
84 callback: impl Fn(&MenteEvent) + Send + Sync + 'static,
85 ) -> SubscriberId {
86 let mut next = self.next_id.write();
87 let id = *next;
88 *next += 1;
89 self.subscribers.write().push(Entry {
90 id,
91 callback: Box::new(callback),
92 });
93 id
94 }
95
96 pub fn unsubscribe(&self, id: SubscriberId) {
98 self.subscribers.write().retain(|e| e.id != id);
99 }
100
101 pub fn publish(&self, event: MenteEvent) {
103 let subs = self.subscribers.read();
104 for entry in subs.iter() {
105 (entry.callback)(&event);
106 }
107 }
108}
109
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Subscribes a callback on `bus` that increments `hits` once per
    /// delivered event, and returns the subscription handle.
    fn attach_counter(bus: &EventBus, hits: &Arc<AtomicUsize>) -> SubscriberId {
        let hits = Arc::clone(hits);
        bus.subscribe(move |_| {
            hits.fetch_add(1, Ordering::Relaxed);
        })
    }

    /// A single subscriber sees a published event exactly once.
    #[test]
    fn subscribe_and_publish() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));
        attach_counter(&bus, &hits);

        bus.publish(MenteEvent::SpaceCreated { id: SpaceId::new() });

        assert_eq!(hits.load(Ordering::Relaxed), 1);
    }

    /// A subscriber removed before publishing receives nothing.
    #[test]
    fn unsubscribe() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));
        let handle = attach_counter(&bus, &hits);

        bus.unsubscribe(handle);
        bus.publish(MenteEvent::SpaceCreated { id: SpaceId::new() });

        assert_eq!(hits.load(Ordering::Relaxed), 0);
    }

    /// Every registered subscriber is invoked for one published event.
    #[test]
    fn multiple_subscribers() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));
        (0..3).for_each(|_| {
            attach_counter(&bus, &hits);
        });

        bus.publish(MenteEvent::MemoryDeleted {
            id: MemoryId::new(),
        });

        assert_eq!(hits.load(Ordering::Relaxed), 3);
    }
}