1use serde::{Deserialize, Serialize};
8use sklears_core::error::{Result as SklResult, SklearsError};
9use std::collections::HashMap;
10
11pub struct EventBus {
16 subscribers: HashMap<String, Vec<String>>,
18 event_queue: Vec<ComponentEvent>,
20 event_handlers: HashMap<String, Box<dyn EventHandler>>,
22 routing_config: EventRoutingConfig,
24 event_stats: EventStatistics,
26}
27
28impl EventBus {
29 #[must_use]
31 pub fn new() -> Self {
32 Self {
33 subscribers: HashMap::new(),
34 event_queue: Vec::new(),
35 event_handlers: HashMap::new(),
36 routing_config: EventRoutingConfig::default(),
37 event_stats: EventStatistics::new(),
38 }
39 }
40
41 pub fn subscribe(&mut self, event_type: &str, component_id: &str) -> SklResult<()> {
43 self.subscribers
44 .entry(event_type.to_string())
45 .or_default()
46 .push(component_id.to_string());
47
48 self.event_stats.total_subscriptions += 1;
49 Ok(())
50 }
51
52 pub fn unsubscribe(&mut self, event_type: &str, component_id: &str) -> SklResult<()> {
54 if let Some(subscribers) = self.subscribers.get_mut(event_type) {
55 subscribers.retain(|id| id != component_id);
56 self.event_stats.total_unsubscriptions += 1;
57 }
58 Ok(())
59 }
60
61 pub fn publish(&mut self, event: ComponentEvent) -> SklResult<()> {
63 self.event_queue.push(event.clone());
64 self.event_stats.total_events_published += 1;
65 self.event_stats
66 .events_by_type
67 .entry(event.event_type.clone())
68 .and_modify(|e| *e += 1)
69 .or_insert(1);
70
71 if self.routing_config.synchronous_routing {
73 self.route_event(&event)?;
74 }
75
76 Ok(())
77 }
78
79 pub fn emit_event(&mut self, event: ComponentEvent) -> SklResult<()> {
81 self.publish(event)
82 }
83
84 pub fn process_events(&mut self) -> SklResult<Vec<EventProcessingResult>> {
86 let mut results = Vec::new();
87 let events_to_process = self.event_queue.drain(..).collect::<Vec<_>>();
88
89 for event in events_to_process {
90 let result = self.process_single_event(&event)?;
91 results.push(result);
92 }
93
94 Ok(results)
95 }
96
97 fn process_single_event(&mut self, event: &ComponentEvent) -> SklResult<EventProcessingResult> {
99 let start_time = std::time::SystemTime::now();
100 let mut delivery_count = 0;
101 let mut failed_deliveries = 0;
102
103 if let Some(target) = &event.target {
105 match self.deliver_to_target(event, target) {
106 Ok(()) => delivery_count += 1,
107 Err(_) => failed_deliveries += 1,
108 }
109 } else {
110 if let Some(subscribers) = self.subscribers.get(&event.event_type) {
112 for subscriber in subscribers {
113 match self.deliver_to_target(event, subscriber) {
114 Ok(()) => delivery_count += 1,
115 Err(_) => failed_deliveries += 1,
116 }
117 }
118 }
119 }
120
121 let processing_time = std::time::SystemTime::now()
122 .duration_since(start_time)
123 .unwrap_or_default();
124 self.event_stats.total_processing_time += processing_time;
125
126 Ok(EventProcessingResult {
127 event_id: event.event_id.clone(),
128 event_type: event.event_type.clone(),
129 delivery_count,
130 failed_deliveries,
131 processing_time,
132 success: failed_deliveries == 0,
133 })
134 }
135
136 fn route_event(&mut self, event: &ComponentEvent) -> SklResult<()> {
138 if let Some(subscribers) = self.subscribers.get(&event.event_type) {
139 for subscriber in subscribers {
140 if let Some(handler) = self.event_handlers.get(&event.event_type) {
141 handler.handle_event(event, subscriber)?;
142 }
143 }
144 }
145 Ok(())
146 }
147
148 fn deliver_to_target(&self, event: &ComponentEvent, target: &str) -> SklResult<()> {
150 if self
153 .subscribers
154 .values()
155 .any(|subs| subs.contains(&target.to_string()))
156 {
157 Ok(())
158 } else {
159 Err(SklearsError::InvalidInput(format!(
160 "Target component {target} not found"
161 )))
162 }
163 }
164
165 pub fn register_handler(&mut self, event_type: &str, handler: Box<dyn EventHandler>) {
167 self.event_handlers.insert(event_type.to_string(), handler);
168 }
169
170 #[must_use]
172 pub fn get_statistics(&self) -> &EventStatistics {
173 &self.event_stats
174 }
175
176 #[must_use]
178 pub fn get_subscribers(&self, event_type: &str) -> Vec<String> {
179 self.subscribers
180 .get(event_type)
181 .cloned()
182 .unwrap_or_default()
183 }
184
185 pub fn clear_queue(&mut self) {
187 self.event_queue.clear();
188 }
189
190 #[must_use]
192 pub fn queue_size(&self) -> usize {
193 self.event_queue.len()
194 }
195
196 pub fn configure_routing(&mut self, config: EventRoutingConfig) {
198 self.routing_config = config;
199 }
200}
201
202impl std::fmt::Debug for EventBus {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 f.debug_struct("EventBus")
205 .field("subscribers", &self.subscribers)
206 .field(
207 "event_queue",
208 &format!("<{} events>", self.event_queue.len()),
209 )
210 .field(
211 "event_handlers",
212 &format!("<{} handlers>", self.event_handlers.len()),
213 )
214 .field("routing_config", &self.routing_config)
215 .field("event_stats", &self.event_stats)
216 .finish()
217 }
218}
219
220#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct ComponentEvent {
223 pub event_id: String,
225 pub source: String,
227 pub target: Option<String>,
229 pub event_type: String,
231 pub data: HashMap<String, String>,
233 pub timestamp: std::time::SystemTime,
235 pub priority: EventPriority,
237 pub metadata: EventMetadata,
239}
240
241impl ComponentEvent {
242 #[must_use]
244 pub fn new(source: &str, event_type: &str) -> Self {
245 Self {
246 event_id: uuid::Uuid::new_v4().to_string(),
247 source: source.to_string(),
248 target: None,
249 event_type: event_type.to_string(),
250 data: HashMap::new(),
251 timestamp: std::time::SystemTime::now(),
252 priority: EventPriority::Normal,
253 metadata: EventMetadata::new(),
254 }
255 }
256
257 #[must_use]
259 pub fn with_target(mut self, target: &str) -> Self {
260 self.target = Some(target.to_string());
261 self
262 }
263
264 #[must_use]
266 pub fn with_data(mut self, key: &str, value: &str) -> Self {
267 self.data.insert(key.to_string(), value.to_string());
268 self
269 }
270
271 #[must_use]
273 pub fn with_priority(mut self, priority: EventPriority) -> Self {
274 self.priority = priority;
275 self
276 }
277
278 #[must_use]
280 pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
281 self.metadata
282 .custom_fields
283 .insert(key.to_string(), value.to_string());
284 self
285 }
286}
287
288#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
290pub enum EventPriority {
291 Low,
293 Normal,
295 High,
297 Critical,
299 Emergency,
301}
302
303#[derive(Debug, Clone, Serialize, Deserialize)]
305pub struct EventMetadata {
306 pub category: EventCategory,
308 pub correlation_id: Option<String>,
310 pub sequence_number: Option<u64>,
312 pub custom_fields: HashMap<String, String>,
314}
315
316impl EventMetadata {
317 #[must_use]
318 pub fn new() -> Self {
319 Self {
320 category: EventCategory::General,
321 correlation_id: None,
322 sequence_number: None,
323 custom_fields: HashMap::new(),
324 }
325 }
326}
327
328#[derive(Debug, Clone, Serialize, Deserialize)]
330pub enum EventCategory {
331 General,
333 Lifecycle,
335 Error,
337 Performance,
339 Security,
341 Configuration,
343 Custom(String),
345}
346
347pub trait EventHandler: Send + Sync {
349 fn handle_event(&self, event: &ComponentEvent, target: &str) -> SklResult<()>;
351
352 fn handler_id(&self) -> &str;
354
355 fn can_handle(&self, event_type: &str) -> bool;
357}
358
359#[derive(Debug, Clone)]
361pub struct EventRoutingConfig {
362 pub synchronous_routing: bool,
364 pub max_queue_size: usize,
366 pub event_timeout: std::time::Duration,
368 pub enable_persistence: bool,
370 pub routing_rules: HashMap<String, RoutingRule>,
372}
373
374impl Default for EventRoutingConfig {
375 fn default() -> Self {
376 Self {
377 synchronous_routing: false,
378 max_queue_size: 1000,
379 event_timeout: std::time::Duration::from_secs(30),
380 enable_persistence: false,
381 routing_rules: HashMap::new(),
382 }
383 }
384}
385
386#[derive(Debug, Clone)]
388pub struct RoutingRule {
389 pub targets: Vec<String>,
391 pub strategy: RoutingStrategy,
393 pub require_acknowledgment: bool,
395 pub max_retries: u32,
397}
398
399#[derive(Debug, Clone)]
401pub enum RoutingStrategy {
402 Broadcast,
404 RoundRobin,
406 FirstAvailable,
408 LoadBalanced,
410}
411
412#[derive(Debug, Clone)]
414pub struct EventProcessingResult {
415 pub event_id: String,
417 pub event_type: String,
419 pub delivery_count: usize,
421 pub failed_deliveries: usize,
423 pub processing_time: std::time::Duration,
425 pub success: bool,
427}
428
429#[derive(Debug, Clone)]
431pub struct EventStatistics {
432 pub total_events_published: u64,
434 pub total_subscriptions: u64,
436 pub total_unsubscriptions: u64,
438 pub events_by_type: HashMap<String, u64>,
440 pub total_processing_time: std::time::Duration,
442 pub average_processing_time: std::time::Duration,
444}
445
446impl Default for EventStatistics {
447 fn default() -> Self {
448 Self::new()
449 }
450}
451
452impl EventStatistics {
453 #[must_use]
454 pub fn new() -> Self {
455 Self {
456 total_events_published: 0,
457 total_subscriptions: 0,
458 total_unsubscriptions: 0,
459 events_by_type: HashMap::new(),
460 total_processing_time: std::time::Duration::from_secs(0),
461 average_processing_time: std::time::Duration::from_secs(0),
462 }
463 }
464
465 pub fn update_average_processing_time(&mut self) {
467 if self.total_events_published > 0 {
468 self.average_processing_time =
469 self.total_processing_time / self.total_events_published as u32;
470 }
471 }
472}
473
474impl Default for EventBus {
475 fn default() -> Self {
476 Self::new()
477 }
478}
479
480impl Default for EventMetadata {
481 fn default() -> Self {
482 Self::new()
483 }
484}
485
486#[allow(non_snake_case)]
487#[cfg(test)]
488mod tests {
489 use super::*;
490
491 #[test]
492 fn test_event_bus_creation() {
493 let event_bus = EventBus::new();
494 assert_eq!(event_bus.queue_size(), 0);
495 assert_eq!(event_bus.get_statistics().total_events_published, 0);
496 }
497
498 #[test]
499 fn test_subscription_management() {
500 let mut event_bus = EventBus::new();
501
502 let result = event_bus.subscribe("test_event", "component_1");
503 assert!(result.is_ok());
504
505 let subscribers = event_bus.get_subscribers("test_event");
506 assert_eq!(subscribers.len(), 1);
507 assert!(subscribers.contains(&"component_1".to_string()));
508
509 let result = event_bus.unsubscribe("test_event", "component_1");
510 assert!(result.is_ok());
511
512 let subscribers = event_bus.get_subscribers("test_event");
513 assert_eq!(subscribers.len(), 0);
514 }
515
516 #[test]
517 fn test_event_publishing() {
518 let mut event_bus = EventBus::new();
519
520 let event = ComponentEvent::new("source_component", "test_event")
521 .with_data("key", "value")
522 .with_priority(EventPriority::High);
523
524 let result = event_bus.publish(event);
525 assert!(result.is_ok());
526 assert_eq!(event_bus.queue_size(), 1);
527 assert_eq!(event_bus.get_statistics().total_events_published, 1);
528 }
529
530 #[test]
531 fn test_event_processing() {
532 let mut event_bus = EventBus::new();
533 event_bus.subscribe("test_event", "component_1").unwrap();
534
535 let event = ComponentEvent::new("source_component", "test_event");
536 event_bus.publish(event).unwrap();
537
538 let results = event_bus.process_events().unwrap();
539 assert_eq!(results.len(), 1);
540 assert_eq!(event_bus.queue_size(), 0);
541 }
542
543 #[test]
544 fn test_targeted_events() {
545 let mut event_bus = EventBus::new();
546 event_bus.subscribe("test_event", "component_1").unwrap();
547 event_bus.subscribe("test_event", "component_2").unwrap();
548
549 let event =
550 ComponentEvent::new("source_component", "test_event").with_target("component_1");
551
552 event_bus.publish(event).unwrap();
553 let results = event_bus.process_events().unwrap();
554
555 assert_eq!(results.len(), 1);
556 assert_eq!(results[0].delivery_count, 1);
557 }
558
559 #[test]
560 fn test_event_statistics() {
561 let mut event_bus = EventBus::new();
562 event_bus.subscribe("event_type_1", "component_1").unwrap();
563 event_bus.subscribe("event_type_2", "component_2").unwrap();
564
565 let event1 = ComponentEvent::new("source", "event_type_1");
566 let event2 = ComponentEvent::new("source", "event_type_1");
567 let event3 = ComponentEvent::new("source", "event_type_2");
568
569 event_bus.publish(event1).unwrap();
570 event_bus.publish(event2).unwrap();
571 event_bus.publish(event3).unwrap();
572
573 let stats = event_bus.get_statistics();
574 assert_eq!(stats.total_events_published, 3);
575 assert_eq!(stats.total_subscriptions, 2);
576 assert_eq!(stats.events_by_type.get("event_type_1"), Some(&2));
577 assert_eq!(stats.events_by_type.get("event_type_2"), Some(&1));
578 }
579}