Skip to main content

mentedb_core/
event.rs

1//! Event System — publish/subscribe bus for memory-graph events.
2
3use parking_lot::RwLock;
4
5use crate::edge::EdgeType;
6use crate::types::{AgentId, MemoryId, SpaceId};
7
/// A unique subscriber handle returned by [`EventBus::subscribe`].
///
/// Pass it to [`EventBus::unsubscribe`] to remove the corresponding callback.
/// It is a plain `usize` alias, so it is `Copy` and cheap to store.
pub type SubscriberId = usize;
10
11/// Events emitted by the memory system.
12#[derive(Debug, Clone)]
13pub enum MenteEvent {
14    MemoryCreated {
15        id: MemoryId,
16        agent_id: AgentId,
17    },
18    MemoryUpdated {
19        id: MemoryId,
20        version: u64,
21    },
22    MemoryDeleted {
23        id: MemoryId,
24    },
25    EdgeCreated {
26        source: MemoryId,
27        target: MemoryId,
28        edge_type: EdgeType,
29    },
30    BeliefChanged {
31        id: MemoryId,
32        old_confidence: f32,
33        new_confidence: f32,
34    },
35    SpaceCreated {
36        id: SpaceId,
37    },
38    ContradictionDetected {
39        a: MemoryId,
40        b: MemoryId,
41    },
42}
43
44/// Thread-safe event bus.
45type Subscriber = Box<dyn Fn(&MenteEvent) + Send + Sync>;
46
47struct Entry {
48    id: SubscriberId,
49    callback: Subscriber,
50}
51
52/// A publish/subscribe event bus protected by `parking_lot::RwLock`.
53pub struct EventBus {
54    subscribers: RwLock<Vec<Entry>>,
55    next_id: RwLock<usize>,
56}
57
58impl std::fmt::Debug for EventBus {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("EventBus")
61            .field("subscriber_count", &self.subscribers.read().len())
62            .finish()
63    }
64}
65
66impl Default for EventBus {
67    fn default() -> Self {
68        Self {
69            subscribers: RwLock::new(Vec::new()),
70            next_id: RwLock::new(0),
71        }
72    }
73}
74
75impl EventBus {
76    pub fn new() -> Self {
77        Self::default()
78    }
79
80    /// Register a subscriber callback. Returns a handle for unsubscription.
81    pub fn subscribe(
82        &self,
83        callback: impl Fn(&MenteEvent) + Send + Sync + 'static,
84    ) -> SubscriberId {
85        let mut next = self.next_id.write();
86        let id = *next;
87        *next += 1;
88        self.subscribers.write().push(Entry {
89            id,
90            callback: Box::new(callback),
91        });
92        id
93    }
94
95    /// Remove a subscriber by handle.
96    pub fn unsubscribe(&self, id: SubscriberId) {
97        self.subscribers.write().retain(|e| e.id != id);
98    }
99
100    /// Publish an event to all current subscribers.
101    pub fn publish(&self, event: MenteEvent) {
102        let subs = self.subscribers.read();
103        for entry in subs.iter() {
104            (entry.callback)(&event);
105        }
106    }
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use uuid::Uuid;

    /// Builds a subscriber callback that adds one to `counter` per delivered event.
    fn incrementer(counter: Arc<AtomicUsize>) -> impl Fn(&MenteEvent) + Send + Sync + 'static {
        move |_: &MenteEvent| {
            counter.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// A single subscriber sees a published event exactly once.
    #[test]
    fn subscribe_and_publish() {
        let hits = Arc::new(AtomicUsize::new(0));
        let bus = EventBus::new();

        bus.subscribe(incrementer(Arc::clone(&hits)));
        bus.publish(MenteEvent::SpaceCreated { id: Uuid::new_v4() });

        assert_eq!(hits.load(Ordering::Relaxed), 1);
    }

    /// A subscriber removed before the publish receives nothing.
    #[test]
    fn unsubscribe() {
        let hits = Arc::new(AtomicUsize::new(0));
        let bus = EventBus::new();

        let handle = bus.subscribe(incrementer(Arc::clone(&hits)));
        bus.unsubscribe(handle);
        bus.publish(MenteEvent::SpaceCreated { id: Uuid::new_v4() });

        assert_eq!(hits.load(Ordering::Relaxed), 0);
    }

    /// Every one of several subscribers is invoked for a single event.
    #[test]
    fn multiple_subscribers() {
        let hits = Arc::new(AtomicUsize::new(0));
        let bus = EventBus::new();

        (0..3).for_each(|_| {
            bus.subscribe(incrementer(Arc::clone(&hits)));
        });
        bus.publish(MenteEvent::MemoryDeleted { id: Uuid::new_v4() });

        assert_eq!(hits.load(Ordering::Relaxed), 3);
    }
}
154}