sklears_compose/modular_framework/
event_system.rs

1//! Event System for Component Communication
2//!
3//! This module provides comprehensive event-driven communication capabilities for
4//! modular components including event buses, component events, and subscription
5//! management for decoupled component interaction.
6
7use serde::{Deserialize, Serialize};
8use sklears_core::error::{Result as SklResult, SklearsError};
9use std::collections::HashMap;
10
11/// Event bus for component communication
12///
13/// Provides publish-subscribe messaging between components with event routing,
14/// subscription management, and event queuing for asynchronous processing.
15pub struct EventBus {
16    /// Subscribers mapped by event type
17    subscribers: HashMap<String, Vec<String>>,
18    /// Event queue for processing
19    event_queue: Vec<ComponentEvent>,
20    /// Event handlers for different event types
21    event_handlers: HashMap<String, Box<dyn EventHandler>>,
22    /// Event routing configuration
23    routing_config: EventRoutingConfig,
24    /// Event statistics for monitoring
25    event_stats: EventStatistics,
26}
27
28impl EventBus {
29    /// Create a new event bus
30    #[must_use]
31    pub fn new() -> Self {
32        Self {
33            subscribers: HashMap::new(),
34            event_queue: Vec::new(),
35            event_handlers: HashMap::new(),
36            routing_config: EventRoutingConfig::default(),
37            event_stats: EventStatistics::new(),
38        }
39    }
40
41    /// Subscribe a component to an event type
42    pub fn subscribe(&mut self, event_type: &str, component_id: &str) -> SklResult<()> {
43        self.subscribers
44            .entry(event_type.to_string())
45            .or_default()
46            .push(component_id.to_string());
47
48        self.event_stats.total_subscriptions += 1;
49        Ok(())
50    }
51
52    /// Unsubscribe a component from an event type
53    pub fn unsubscribe(&mut self, event_type: &str, component_id: &str) -> SklResult<()> {
54        if let Some(subscribers) = self.subscribers.get_mut(event_type) {
55            subscribers.retain(|id| id != component_id);
56            self.event_stats.total_unsubscriptions += 1;
57        }
58        Ok(())
59    }
60
61    /// Publish an event to the bus
62    pub fn publish(&mut self, event: ComponentEvent) -> SklResult<()> {
63        self.event_queue.push(event.clone());
64        self.event_stats.total_events_published += 1;
65        self.event_stats
66            .events_by_type
67            .entry(event.event_type.clone())
68            .and_modify(|e| *e += 1)
69            .or_insert(1);
70
71        // Route event immediately if synchronous routing is enabled
72        if self.routing_config.synchronous_routing {
73            self.route_event(&event)?;
74        }
75
76        Ok(())
77    }
78
79    /// Emit an event (alias for publish for backward compatibility)
80    pub fn emit_event(&mut self, event: ComponentEvent) -> SklResult<()> {
81        self.publish(event)
82    }
83
84    /// Process all queued events
85    pub fn process_events(&mut self) -> SklResult<Vec<EventProcessingResult>> {
86        let mut results = Vec::new();
87        let events_to_process = self.event_queue.drain(..).collect::<Vec<_>>();
88
89        for event in events_to_process {
90            let result = self.process_single_event(&event)?;
91            results.push(result);
92        }
93
94        Ok(results)
95    }
96
97    /// Process a single event
98    fn process_single_event(&mut self, event: &ComponentEvent) -> SklResult<EventProcessingResult> {
99        let start_time = std::time::SystemTime::now();
100        let mut delivery_count = 0;
101        let mut failed_deliveries = 0;
102
103        // Route to target if specified
104        if let Some(target) = &event.target {
105            match self.deliver_to_target(event, target) {
106                Ok(()) => delivery_count += 1,
107                Err(_) => failed_deliveries += 1,
108            }
109        } else {
110            // Broadcast to all subscribers of this event type
111            if let Some(subscribers) = self.subscribers.get(&event.event_type) {
112                for subscriber in subscribers {
113                    match self.deliver_to_target(event, subscriber) {
114                        Ok(()) => delivery_count += 1,
115                        Err(_) => failed_deliveries += 1,
116                    }
117                }
118            }
119        }
120
121        let processing_time = std::time::SystemTime::now()
122            .duration_since(start_time)
123            .unwrap_or_default();
124        self.event_stats.total_processing_time += processing_time;
125
126        Ok(EventProcessingResult {
127            event_id: event.event_id.clone(),
128            event_type: event.event_type.clone(),
129            delivery_count,
130            failed_deliveries,
131            processing_time,
132            success: failed_deliveries == 0,
133        })
134    }
135
136    /// Route an event to its subscribers
137    fn route_event(&mut self, event: &ComponentEvent) -> SklResult<()> {
138        if let Some(subscribers) = self.subscribers.get(&event.event_type) {
139            for subscriber in subscribers {
140                if let Some(handler) = self.event_handlers.get(&event.event_type) {
141                    handler.handle_event(event, subscriber)?;
142                }
143            }
144        }
145        Ok(())
146    }
147
148    /// Deliver event to specific target
149    fn deliver_to_target(&self, event: &ComponentEvent, target: &str) -> SklResult<()> {
150        // In a real implementation, this would deliver to the actual component
151        // For now, we'll just validate the target exists
152        if self
153            .subscribers
154            .values()
155            .any(|subs| subs.contains(&target.to_string()))
156        {
157            Ok(())
158        } else {
159            Err(SklearsError::InvalidInput(format!(
160                "Target component {target} not found"
161            )))
162        }
163    }
164
165    /// Register an event handler
166    pub fn register_handler(&mut self, event_type: &str, handler: Box<dyn EventHandler>) {
167        self.event_handlers.insert(event_type.to_string(), handler);
168    }
169
170    /// Get event statistics
171    #[must_use]
172    pub fn get_statistics(&self) -> &EventStatistics {
173        &self.event_stats
174    }
175
176    /// Get subscribers for an event type
177    #[must_use]
178    pub fn get_subscribers(&self, event_type: &str) -> Vec<String> {
179        self.subscribers
180            .get(event_type)
181            .cloned()
182            .unwrap_or_default()
183    }
184
185    /// Clear all events from the queue
186    pub fn clear_queue(&mut self) {
187        self.event_queue.clear();
188    }
189
190    /// Get current queue size
191    #[must_use]
192    pub fn queue_size(&self) -> usize {
193        self.event_queue.len()
194    }
195
196    /// Configure event routing
197    pub fn configure_routing(&mut self, config: EventRoutingConfig) {
198        self.routing_config = config;
199    }
200}
201
202impl std::fmt::Debug for EventBus {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        f.debug_struct("EventBus")
205            .field("subscribers", &self.subscribers)
206            .field(
207                "event_queue",
208                &format!("<{} events>", self.event_queue.len()),
209            )
210            .field(
211                "event_handlers",
212                &format!("<{} handlers>", self.event_handlers.len()),
213            )
214            .field("routing_config", &self.routing_config)
215            .field("event_stats", &self.event_stats)
216            .finish()
217    }
218}
219
220/// Component event for inter-component communication
221#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct ComponentEvent {
223    /// Unique event identifier
224    pub event_id: String,
225    /// Source component identifier
226    pub source: String,
227    /// Target component (optional, if None broadcasts to all subscribers)
228    pub target: Option<String>,
229    /// Event type identifier
230    pub event_type: String,
231    /// Event data payload
232    pub data: HashMap<String, String>,
233    /// Event timestamp
234    pub timestamp: std::time::SystemTime,
235    /// Event priority level
236    pub priority: EventPriority,
237    /// Event metadata
238    pub metadata: EventMetadata,
239}
240
241impl ComponentEvent {
242    /// Create a new component event
243    #[must_use]
244    pub fn new(source: &str, event_type: &str) -> Self {
245        Self {
246            event_id: uuid::Uuid::new_v4().to_string(),
247            source: source.to_string(),
248            target: None,
249            event_type: event_type.to_string(),
250            data: HashMap::new(),
251            timestamp: std::time::SystemTime::now(),
252            priority: EventPriority::Normal,
253            metadata: EventMetadata::new(),
254        }
255    }
256
257    /// Set target component
258    #[must_use]
259    pub fn with_target(mut self, target: &str) -> Self {
260        self.target = Some(target.to_string());
261        self
262    }
263
264    /// Add data to the event
265    #[must_use]
266    pub fn with_data(mut self, key: &str, value: &str) -> Self {
267        self.data.insert(key.to_string(), value.to_string());
268        self
269    }
270
271    /// Set event priority
272    #[must_use]
273    pub fn with_priority(mut self, priority: EventPriority) -> Self {
274        self.priority = priority;
275        self
276    }
277
278    /// Add metadata to the event
279    #[must_use]
280    pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
281        self.metadata
282            .custom_fields
283            .insert(key.to_string(), value.to_string());
284        self
285    }
286}
287
288/// Event priority levels
289#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
290pub enum EventPriority {
291    /// Low
292    Low,
293    /// Normal
294    Normal,
295    /// High
296    High,
297    /// Critical
298    Critical,
299    /// Emergency
300    Emergency,
301}
302
303/// Event metadata
304#[derive(Debug, Clone, Serialize, Deserialize)]
305pub struct EventMetadata {
306    /// Event category
307    pub category: EventCategory,
308    /// Correlation ID for related events
309    pub correlation_id: Option<String>,
310    /// Event sequence number
311    pub sequence_number: Option<u64>,
312    /// Custom metadata fields
313    pub custom_fields: HashMap<String, String>,
314}
315
316impl EventMetadata {
317    #[must_use]
318    pub fn new() -> Self {
319        Self {
320            category: EventCategory::General,
321            correlation_id: None,
322            sequence_number: None,
323            custom_fields: HashMap::new(),
324        }
325    }
326}
327
328/// Event categories for classification
329#[derive(Debug, Clone, Serialize, Deserialize)]
330pub enum EventCategory {
331    /// General
332    General,
333    /// Lifecycle
334    Lifecycle,
335    /// Error
336    Error,
337    /// Performance
338    Performance,
339    /// Security
340    Security,
341    /// Configuration
342    Configuration,
343    /// Custom
344    Custom(String),
345}
346
347/// Event handler trait for processing events
348pub trait EventHandler: Send + Sync {
349    /// Handle an event for a specific component
350    fn handle_event(&self, event: &ComponentEvent, target: &str) -> SklResult<()>;
351
352    /// Get handler identifier
353    fn handler_id(&self) -> &str;
354
355    /// Check if handler can process the event type
356    fn can_handle(&self, event_type: &str) -> bool;
357}
358
359/// Event routing configuration
360#[derive(Debug, Clone)]
361pub struct EventRoutingConfig {
362    /// Enable synchronous event routing
363    pub synchronous_routing: bool,
364    /// Maximum event queue size
365    pub max_queue_size: usize,
366    /// Event timeout duration
367    pub event_timeout: std::time::Duration,
368    /// Enable event persistence
369    pub enable_persistence: bool,
370    /// Routing rules for specific event types
371    pub routing_rules: HashMap<String, RoutingRule>,
372}
373
374impl Default for EventRoutingConfig {
375    fn default() -> Self {
376        Self {
377            synchronous_routing: false,
378            max_queue_size: 1000,
379            event_timeout: std::time::Duration::from_secs(30),
380            enable_persistence: false,
381            routing_rules: HashMap::new(),
382        }
383    }
384}
385
386/// Routing rule for specific event types
387#[derive(Debug, Clone)]
388pub struct RoutingRule {
389    /// Target components for this event type
390    pub targets: Vec<String>,
391    /// Routing strategy
392    pub strategy: RoutingStrategy,
393    /// Whether to require acknowledgment
394    pub require_acknowledgment: bool,
395    /// Maximum retry attempts
396    pub max_retries: u32,
397}
398
/// Strategy a [`RoutingRule`] names for choosing among its targets.
///
/// Implements `PartialEq`, `Eq` and `Hash` so strategies can be compared
/// directly and used as map or set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RoutingStrategy {
    /// Broadcast to all targets
    Broadcast,
    /// Round-robin among targets
    RoundRobin,
    /// Send to first available target
    FirstAvailable,
    /// Load balance based on target capacity
    LoadBalanced,
}
411
/// Outcome of processing one queued event.
#[derive(Debug, Clone)]
pub struct EventProcessingResult {
    /// Identifier of the processed event.
    pub event_id: String,
    /// Type of the processed event.
    pub event_type: String,
    /// How many deliveries succeeded.
    pub delivery_count: usize,
    /// How many deliveries failed.
    pub failed_deliveries: usize,
    /// Time spent processing the event.
    pub processing_time: std::time::Duration,
    /// `true` when no delivery failed.
    pub success: bool,
}
428
/// Counters and timings the [`EventBus`] collects for monitoring.
///
/// All counters start at zero and all durations at zero length.
#[derive(Debug, Clone, Default)]
pub struct EventStatistics {
    /// Total events published
    pub total_events_published: u64,
    /// Total subscriptions
    pub total_subscriptions: u64,
    /// Total unsubscriptions
    pub total_unsubscriptions: u64,
    /// Number of published events per event type
    pub events_by_type: HashMap<String, u64>,
    /// Total processing time
    pub total_processing_time: std::time::Duration,
    /// Average processing time per published event, as last computed by
    /// [`EventStatistics::update_average_processing_time`]
    pub average_processing_time: std::time::Duration,
}

impl EventStatistics {
    /// Create statistics with every counter and duration at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Recompute `average_processing_time` as
    /// `total_processing_time / total_events_published`.
    ///
    /// Does nothing while no event has been published. The division is done in
    /// nanoseconds with a 128-bit intermediate, so the event count is never
    /// narrowed to `u32`; a narrowed count could wrap to zero and make the
    /// division panic.
    pub fn update_average_processing_time(&mut self) {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        if self.total_events_published == 0 {
            return;
        }

        let events = u128::from(self.total_events_published);
        let average_nanos = self.total_processing_time.as_nanos() / events;

        // The average cannot exceed the total, so its whole seconds fit in
        // `u64`; the remainder is below one second and fits in `u32`.
        let secs = (average_nanos / NANOS_PER_SEC) as u64;
        let subsec_nanos = (average_nanos % NANOS_PER_SEC) as u32;
        self.average_processing_time = std::time::Duration::new(secs, subsec_nanos);
    }
}
473
474impl Default for EventBus {
475    fn default() -> Self {
476        Self::new()
477    }
478}
479
480impl Default for EventMetadata {
481    fn default() -> Self {
482        Self::new()
483    }
484}
485
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh bus has nothing queued, nothing published and no subscribers.
    #[test]
    fn test_event_bus_creation() {
        let event_bus = EventBus::new();
        assert_eq!(event_bus.queue_size(), 0);
        assert_eq!(event_bus.get_statistics().total_events_published, 0);
        assert!(event_bus.get_subscribers("test_event").is_empty());
    }

    /// Subscribing then unsubscribing leaves no subscriber and counts both.
    #[test]
    fn test_subscription_management() {
        let mut event_bus = EventBus::new();

        let result = event_bus.subscribe("test_event", "component_1");
        assert!(result.is_ok());

        let subscribers = event_bus.get_subscribers("test_event");
        assert_eq!(subscribers.len(), 1);
        assert!(subscribers.contains(&"component_1".to_string()));

        let result = event_bus.unsubscribe("test_event", "component_1");
        assert!(result.is_ok());

        let subscribers = event_bus.get_subscribers("test_event");
        assert_eq!(subscribers.len(), 0);

        let stats = event_bus.get_statistics();
        assert_eq!(stats.total_subscriptions, 1);
        assert_eq!(stats.total_unsubscriptions, 1);
    }

    /// Publishing queues the event and updates the counters.
    #[test]
    fn test_event_publishing() {
        let mut event_bus = EventBus::new();

        let event = ComponentEvent::new("source_component", "test_event")
            .with_data("key", "value")
            .with_priority(EventPriority::High);

        let result = event_bus.publish(event);
        assert!(result.is_ok());
        assert_eq!(event_bus.queue_size(), 1);

        let stats = event_bus.get_statistics();
        assert_eq!(stats.total_events_published, 1);
        assert_eq!(stats.events_by_type.get("test_event"), Some(&1));
    }

    /// Processing drains the queue and delivers to the single subscriber.
    #[test]
    fn test_event_processing() {
        let mut event_bus = EventBus::new();
        event_bus.subscribe("test_event", "component_1").unwrap();

        let event = ComponentEvent::new("source_component", "test_event");
        event_bus.publish(event).unwrap();

        let results = event_bus.process_events().unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type, "test_event");
        assert_eq!(results[0].delivery_count, 1);
        assert_eq!(results[0].failed_deliveries, 0);
        assert!(results[0].success);
        assert_eq!(event_bus.queue_size(), 0);
    }

    /// A targeted event is delivered to the target only, not to every subscriber.
    #[test]
    fn test_targeted_events() {
        let mut event_bus = EventBus::new();
        event_bus.subscribe("test_event", "component_1").unwrap();
        event_bus.subscribe("test_event", "component_2").unwrap();

        let event =
            ComponentEvent::new("source_component", "test_event").with_target("component_1");

        event_bus.publish(event).unwrap();
        let results = event_bus.process_events().unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].delivery_count, 1);
        assert_eq!(results[0].failed_deliveries, 0);
        assert!(results[0].success);
    }

    /// A target that never subscribed to anything is reported as a failed delivery.
    #[test]
    fn test_unknown_target_is_reported_as_failure() {
        let mut event_bus = EventBus::new();
        event_bus.subscribe("test_event", "component_1").unwrap();

        let event = ComponentEvent::new("source_component", "test_event").with_target("ghost");

        event_bus.publish(event).unwrap();
        let results = event_bus.process_events().unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].delivery_count, 0);
        assert_eq!(results[0].failed_deliveries, 1);
        assert!(!results[0].success);
    }

    /// Publication counters are tracked in total and per event type.
    #[test]
    fn test_event_statistics() {
        let mut event_bus = EventBus::new();
        event_bus.subscribe("event_type_1", "component_1").unwrap();
        event_bus.subscribe("event_type_2", "component_2").unwrap();

        let event1 = ComponentEvent::new("source", "event_type_1");
        let event2 = ComponentEvent::new("source", "event_type_1");
        let event3 = ComponentEvent::new("source", "event_type_2");

        event_bus.publish(event1).unwrap();
        event_bus.publish(event2).unwrap();
        event_bus.publish(event3).unwrap();

        let stats = event_bus.get_statistics();
        assert_eq!(stats.total_events_published, 3);
        assert_eq!(stats.total_subscriptions, 2);
        assert_eq!(stats.events_by_type.get("event_type_1"), Some(&2));
        assert_eq!(stats.events_by_type.get("event_type_2"), Some(&1));
    }
}