1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
//! Event notification system for modules
//!
//! Handles event subscriptions and delivery to modules.
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Mutex as TokioMutex;
use tracing::{debug, info, warn};
use crate::module::ipc::protocol::{EventMessage, EventPayload, ModuleMessage};
use crate::module::traits::{EventType, ModuleError};
// Expose loaded_modules for ModuleManager to track loaded modules
impl EventManager {
    /// Borrow the shared registry of loaded modules.
    ///
    /// The map is keyed by module name and stores `(version, loaded_timestamp)`.
    /// The mutex is returned unlocked; callers lock it themselves to read or
    /// insert entries.
    pub fn loaded_modules(&self) -> &Arc<TokioMutex<HashMap<String, (String, u64)>>> {
        let registry = &self.loaded_modules;
        registry
    }

    /// Drop `module_name` from the loaded-module registry (used on unload).
    ///
    /// Removing a name that is not present is a no-op.
    pub async fn remove_loaded_module(&self, module_name: &str) {
        // The guard is a temporary, so the lock is released at the end of this statement.
        self.loaded_modules.lock().await.remove(module_name);
    }
}
/// Event subscription manager.
///
/// Tracks which modules are subscribed to which event types, holds the channel
/// used to reach each module, and records per-module delivery counters. Each
/// piece of state lives behind its own `Arc<TokioMutex<..>>`.
// NOTE(review): several methods hold more than one of these locks at a time;
// they need to acquire them in one consistent order to stay deadlock-free —
// confirm whenever a new multi-lock method is added.
pub struct EventManager {
/// Module ids subscribed to each event type (event type -> module ids).
/// Ids are appended on subscribe and filtered out on unsubscribe, so each
/// list keeps subscription order.
subscribers: Arc<TokioMutex<HashMap<EventType, Vec<String>>>>,
/// Delivery channel for each module (module_id -> sender).
/// Events are pushed into these senders with `try_send`.
module_channels: Arc<TokioMutex<HashMap<String, mpsc::Sender<ModuleMessage>>>>,
/// Loaded modules, replayed as `ModuleLoaded` events to newly subscribing modules
/// (module_name -> (version, loaded_timestamp)).
/// Code in this file only reads and removes entries; insertion happens through
/// the `loaded_modules()` accessor (presumably by the ModuleManager — verify
/// against the caller).
loaded_modules: Arc<TokioMutex<HashMap<String, (String, u64)>>>,
/// Event delivery statistics (for monitoring and debugging)
/// (module_id -> (successful_deliveries, failed_deliveries, channel_full_count)).
/// Updated by `publish_event`; a module's entry is removed by `unsubscribe_module`.
delivery_stats: Arc<TokioMutex<HashMap<String, (u64, u64, u64)>>>,
}
impl EventManager {
/// Build an event manager with no subscribers, no module channels, no
/// recorded loaded modules and empty delivery statistics.
pub fn new() -> Self {
    let subscribers = Arc::new(TokioMutex::new(HashMap::new()));
    let module_channels = Arc::new(TokioMutex::new(HashMap::new()));
    let loaded_modules = Arc::new(TokioMutex::new(HashMap::new()));
    let delivery_stats = Arc::new(TokioMutex::new(HashMap::new()));

    Self {
        subscribers,
        module_channels,
        loaded_modules,
        delivery_stats,
    }
}
/// Subscribe a module to events
///
/// When a module subscribes, it will receive ModuleLoaded events for all already-loaded modules
/// to ensure consistency (hotloaded modules know about existing modules).
///
/// ModuleLoaded events are only published AFTER subscription (after startup is complete).
/// This ensures consistent event ordering: subscription -> ModuleLoaded.
pub async fn subscribe_module(
&self,
module_id: String,
event_types: Vec<EventType>,
sender: mpsc::Sender<ModuleMessage>,
) -> Result<(), ModuleError> {
info!(
"Module {} subscribing to events: {:?}",
module_id, event_types
);
let mut subscribers = self.subscribers.lock().await;
let mut channels = self.module_channels.lock().await;
let loaded_modules = self.loaded_modules.lock().await;
// Register module channel
channels.insert(module_id.clone(), sender.clone());
// Add module to subscribers for each event type
for event_type in &event_types {
subscribers
.entry(*event_type)
.or_insert_with(Vec::new)
.push(module_id.clone());
}
// Extract module name from module_id (format: {module_name}_{uuid})
let subscribing_module_name = module_id
.split('_')
.next()
.unwrap_or(&module_id)
.to_string();
// If module subscribes to ModuleLoaded:
// 1. Send it events for all already-loaded modules (hotloaded modules get existing modules)
// 2. Publish ModuleLoaded for this newly subscribing module (if it's loaded)
// This ensures ModuleLoaded only happens AFTER subscription (startup complete)
let should_publish_loaded = if event_types.contains(&EventType::ModuleLoaded) {
use crate::module::ipc::protocol::{EventMessage, EventPayload, ModuleMessage};
// Collect already-loaded modules to send (clone data before dropping lock)
let already_loaded: Vec<(String, String)> = loaded_modules
.iter()
.map(|(name, (version, _))| (name.clone(), version.clone()))
.collect();
// Check if this module is loaded (for publishing ModuleLoaded after subscription)
let module_version = loaded_modules
.get(&subscribing_module_name)
.map(|(version, _)| version.clone());
// Send all already-loaded modules to this newly subscribing module
for (module_name, version) in &already_loaded {
let payload = EventPayload::ModuleLoaded {
module_name: module_name.clone(),
version: version.clone(),
};
let event_msg = ModuleMessage::Event(EventMessage {
event_type: EventType::ModuleLoaded,
payload,
});
// Send to module (non-blocking, best-effort)
if sender.try_send(event_msg).is_err() {
warn!("Failed to send ModuleLoaded event for {} to newly subscribing module {} (channel full)", module_name, module_id);
}
}
// Return version if module is loaded (for publishing after dropping locks)
module_version
} else {
None
};
// Drop all locks before publishing (to avoid deadlock)
drop(loaded_modules);
drop(subscribers);
drop(channels);
// Now publish ModuleLoaded for this newly subscribing module (if it's loaded)
// This ensures ModuleLoaded only happens AFTER subscription (startup complete)
// Other already-subscribed modules will receive this event
if let Some(version) = should_publish_loaded {
use crate::module::ipc::protocol::EventPayload;
let payload = EventPayload::ModuleLoaded {
module_name: subscribing_module_name.clone(),
version,
};
// Publish to all subscribers (including the newly subscribing module)
if let Err(e) = self.publish_event(EventType::ModuleLoaded, payload).await {
warn!(
"Failed to publish ModuleLoaded event for newly subscribing module {}: {}",
subscribing_module_name, e
);
} else {
info!(
"Published ModuleLoaded event for {} (after subscription)",
subscribing_module_name
);
}
}
Ok(())
}
/// Remove every trace of `module_id` from the event system (used on unload).
///
/// Drops the module's delivery channel, strips the id from the subscriber
/// list of every event type, and discards its delivery statistics. Event
/// types whose list becomes empty keep their (empty) entry. Unknown ids are
/// a no-op.
///
/// # Errors
///
/// The current implementation always returns `Ok(())`.
pub async fn unsubscribe_module(&self, module_id: &str) -> Result<(), ModuleError> {
    debug!("Module {} unsubscribing from events", module_id);

    // All three locks are held together for the duration of the removal.
    let mut by_event = self.subscribers.lock().await;
    let mut senders = self.module_channels.lock().await;
    let mut delivery_stats = self.delivery_stats.lock().await;

    senders.remove(module_id);
    by_event
        .values_mut()
        .for_each(|ids| ids.retain(|id| id != module_id));
    delivery_stats.remove(module_id);

    Ok(())
}
/// Publish an event to all subscribed modules
pub async fn publish_event(
&self,
event_type: EventType,
payload: EventPayload,
) -> Result<(), ModuleError> {
debug!("Publishing event: {:?}", event_type);
let subscribers = self.subscribers.lock().await;
let channels = self.module_channels.lock().await;
// Get list of modules subscribed to this event type
let module_ids = subscribers.get(&event_type).cloned().unwrap_or_default();
if module_ids.is_empty() {
return Ok(()); // No subscribers
}
// Create event message (shared via Arc to avoid cloning)
let event_message = Arc::new(ModuleMessage::Event(EventMessage {
event_type,
payload,
}));
// Build snapshot of channels while holding locks
let channels_snapshot: Vec<(String, mpsc::Sender<ModuleMessage>)> = {
module_ids
.iter()
.filter_map(|id| channels.get(id).map(|sender| (id.clone(), sender.clone())))
.collect()
};
// Explicitly drop locks before sending to avoid deadlock
drop(subscribers);
drop(channels);
// Send to all subscribed modules without holding locks
let mut failed_modules = Vec::new();
let mut stats_updates = Vec::new();
for (module_id, sender) in channels_snapshot {
let event_msg_clone = Arc::clone(&event_message);
// Use try_send to avoid blocking if channel is full or receiver is dropped
match sender.try_send((*event_msg_clone).clone()) {
Ok(()) => {
// Track successful delivery
stats_updates.push((module_id.clone(), true, false));
}
Err(mpsc::error::TrySendError::Full(_)) => {
warn!(
"Event channel full for module {} (event: {:?}), event dropped",
module_id, event_type
);
// Track channel full (but don't remove subscription - module might catch up)
stats_updates.push((module_id.clone(), false, true));
// Don't add to failed_modules - module is still alive, just slow
}
Err(mpsc::error::TrySendError::Closed(_)) => {
warn!(
"Event channel closed for module {} (event: {:?}), removing subscription",
module_id, event_type
);
// Track failed delivery and mark for removal
stats_updates.push((module_id.clone(), false, false));
failed_modules.push(module_id);
}
}
}
// Update delivery statistics
{
let mut stats = self.delivery_stats.lock().await;
for (module_id, success, channel_full) in stats_updates {
let entry = stats.entry(module_id).or_insert((0, 0, 0));
if success {
entry.0 += 1;
} else if channel_full {
entry.2 += 1; // Channel full count
} else {
entry.1 += 1; // Failed delivery count
}
}
}
// Clean up failed channels and remove from subscribers
if !failed_modules.is_empty() {
let mut channels = self.module_channels.lock().await;
let mut subscribers = self.subscribers.lock().await;
for module_id in failed_modules {
channels.remove(&module_id);
// Remove from all subscriber lists
for subscribers_list in subscribers.values_mut() {
subscribers_list.retain(|id| id != &module_id);
}
}
}
Ok(())
}
/// Return a copy of the module ids currently subscribed to `event_type`.
///
/// Yields an empty vector when nothing has ever subscribed to that type.
pub async fn get_subscribers(&self, event_type: EventType) -> Vec<String> {
    let by_event = self.subscribers.lock().await;
    match by_event.get(&event_type) {
        Some(ids) => ids.clone(),
        None => Vec::new(),
    }
}
}
impl Default for EventManager {
fn default() -> Self {
Self::new()
}
}