Skip to main content

mentedb_core/
event.rs

1//! Event System: publish/subscribe bus for memory-graph events.
2
3use parking_lot::RwLock;
4
5use crate::edge::EdgeType;
6use crate::types::{AgentId, MemoryId, SpaceId};
7
/// Handle identifying one registered subscriber on an [`EventBus`].
///
/// Obtained from [`EventBus::subscribe`] and passed back to
/// [`EventBus::unsubscribe`] to remove that subscriber again.
pub type SubscriberId = usize;
10
11/// Events emitted by the memory system.
12#[derive(Debug, Clone)]
13pub enum MenteEvent {
14    MemoryCreated {
15        id: MemoryId,
16        agent_id: AgentId,
17    },
18    MemoryUpdated {
19        id: MemoryId,
20        version: u64,
21    },
22    MemoryDeleted {
23        id: MemoryId,
24    },
25    EdgeCreated {
26        source: MemoryId,
27        target: MemoryId,
28        edge_type: EdgeType,
29    },
30    BeliefChanged {
31        id: MemoryId,
32        old_confidence: f32,
33        new_confidence: f32,
34    },
35    SpaceCreated {
36        id: SpaceId,
37    },
38    ContradictionDetected {
39        a: MemoryId,
40        b: MemoryId,
41    },
42}
43
44/// Thread-safe event bus.
45type Subscriber = Box<dyn Fn(&MenteEvent) + Send + Sync>;
46
47struct Entry {
48    id: SubscriberId,
49    callback: Subscriber,
50}
51
52/// A publish/subscribe event bus protected by `parking_lot::RwLock`.
53pub struct EventBus {
54    subscribers: RwLock<Vec<Entry>>,
55    next_id: RwLock<usize>,
56}
57
58impl std::fmt::Debug for EventBus {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("EventBus")
61            .field("subscriber_count", &self.subscribers.read().len())
62            .finish()
63    }
64}
65
66impl Default for EventBus {
67    fn default() -> Self {
68        Self {
69            subscribers: RwLock::new(Vec::new()),
70            next_id: RwLock::new(0),
71        }
72    }
73}
74
75impl EventBus {
76    /// Creates a new empty event bus.
77    pub fn new() -> Self {
78        Self::default()
79    }
80
81    /// Register a subscriber callback. Returns a handle for unsubscription.
82    pub fn subscribe(
83        &self,
84        callback: impl Fn(&MenteEvent) + Send + Sync + 'static,
85    ) -> SubscriberId {
86        let mut next = self.next_id.write();
87        let id = *next;
88        *next += 1;
89        self.subscribers.write().push(Entry {
90            id,
91            callback: Box::new(callback),
92        });
93        id
94    }
95
96    /// Remove a subscriber by handle.
97    pub fn unsubscribe(&self, id: SubscriberId) {
98        self.subscribers.write().retain(|e| e.id != id);
99    }
100
101    /// Publish an event to all current subscribers.
102    pub fn publish(&self, event: MenteEvent) {
103        let subs = self.subscribers.read();
104        for entry in subs.iter() {
105            (entry.callback)(&event);
106        }
107    }
108}
109
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Subscribes a callback on `bus` that bumps `hits` once per received event.
    fn attach_counter(bus: &EventBus, hits: &Arc<AtomicUsize>) -> SubscriberId {
        let hits = Arc::clone(hits);
        bus.subscribe(move |_| {
            hits.fetch_add(1, Ordering::Relaxed);
        })
    }

    /// A single subscriber receives a published event exactly once.
    #[test]
    fn subscribe_and_publish() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));
        attach_counter(&bus, &hits);

        bus.publish(MenteEvent::SpaceCreated { id: SpaceId::new() });

        assert_eq!(hits.load(Ordering::Relaxed), 1);
    }

    /// A subscriber removed before publishing receives nothing.
    #[test]
    fn unsubscribe() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));
        let handle = attach_counter(&bus, &hits);

        bus.unsubscribe(handle);
        bus.publish(MenteEvent::SpaceCreated { id: SpaceId::new() });

        assert_eq!(hits.load(Ordering::Relaxed), 0);
    }

    /// Every registered subscriber receives the published event.
    #[test]
    fn multiple_subscribers() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));
        (0..3).for_each(|_| {
            attach_counter(&bus, &hits);
        });

        bus.publish(MenteEvent::MemoryDeleted {
            id: MemoryId::new(),
        });

        assert_eq!(hits.load(Ordering::Relaxed), 3);
    }
}