optirs_core/neuromorphic/
event_driven.rs

1// Event-Driven Optimization Algorithms
2//
3// This module implements event-driven optimization algorithms that process
4// updates asynchronously based on neuromorphic events, designed for
5// neuromorphic computing platforms with event-based architectures.
6
7use super::{
8    EventPriority, MembraneDynamicsConfig, NeuromorphicEvent, NeuromorphicMetrics, PlasticityModel,
9    STDPConfig, Spike, SpikeTrain,
10};
11use crate::error::{OptimError, Result};
12use crate::optimizers::Optimizer;
13use scirs2_core::ndarray::{Array1, Array2, ArrayBase, Data, DataMut, Dimension};
14use scirs2_core::numeric::Float;
15use std::cmp::Reverse;
16use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
17use std::fmt::Debug;
18use std::sync::{Arc, Mutex, RwLock};
19use std::time::{Duration, Instant};
20
/// Kinds of events that can flow through the event-driven optimizer.
///
/// The variants carry no payload. Their declaration order determines the
/// value produced by numeric casts (`Spike` is 0, `EnergyEvent` is 9), so
/// new variants should be appended rather than inserted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventType {
    /// A neuron emitted a spike.
    Spike,
    /// A synaptic weight should be updated.
    WeightUpdate,
    /// A membrane potential crossed its threshold.
    ThresholdCrossing,
    /// An event triggered by a plasticity rule.
    PlasticityEvent,
    /// A stimulus injected from outside the network.
    ExternalStimulus,
    /// A timer tick.
    TimerEvent,
    /// An error signal for backpropagation.
    ErrorEvent,
    /// A homeostatic adaptation step.
    HomeostaticEvent,
    /// A population synchronization signal.
    SynchronizationEvent,
    /// An energy budget notification.
    EnergyEvent,
}
54
55/// Event-driven optimization configuration
56#[derive(Debug, Clone)]
57pub struct EventDrivenConfig<T: Float + Debug + Send + Sync + 'static> {
58    /// Maximum event queue size
59    pub max_queue_size: usize,
60
61    /// Event processing timeout (ms)
62    pub processing_timeout: T,
63
64    /// Enable event priority scheduling
65    pub priority_scheduling: bool,
66
67    /// Event filtering threshold
68    pub event_threshold: T,
69
70    /// Enable event batching
71    pub event_batching: bool,
72
73    /// Batch size for event processing
74    pub batch_size: usize,
75
76    /// Enable temporal event correlation
77    pub temporal_correlation: bool,
78
79    /// Temporal correlation window (ms)
80    pub correlation_window: T,
81
82    /// Enable adaptive event handling
83    pub adaptive_handling: bool,
84
85    /// Event rate limits (events/second)
86    pub rate_limits: HashMap<EventType, T>,
87
88    /// Enable event compression
89    pub event_compression: bool,
90
91    /// Compression algorithm
92    pub compression_algorithm: EventCompressionAlgorithm,
93
94    /// Enable distributed event processing
95    pub distributed_processing: bool,
96
97    /// Load balancing strategy
98    pub load_balancing: LoadBalancingStrategy,
99}
100
/// Selects how the compression engine encodes events.
#[derive(Debug, Clone, Copy)]
pub enum EventCompressionAlgorithm {
    /// Events are serialized without compression.
    None,
    /// Delta encoding.
    DeltaEncoding,
    /// Huffman encoding.
    HuffmanEncoding,
    /// Run-length encoding.
    RunLengthEncoding,
    /// Sparse encoding.
    SparseEncoding,
    /// Predictive encoding.
    PredictiveEncoding,
}
122
/// Policy used to choose a worker when events are processed in a distributed fashion.
#[derive(Debug, Clone, Copy)]
pub enum LoadBalancingStrategy {
    /// Cycle through the workers in order.
    RoundRobin,
    /// Partition events by their event type.
    TypeBased,
    /// Prefer the worker with the lowest load.
    LoadAware,
    /// Locality-aware distribution.
    LocalityAware,
    /// Dynamic load balancing.
    Dynamic,
}
141
142impl<T: Float + Debug + Send + Sync + 'static> Default for EventDrivenConfig<T> {
143    fn default() -> Self {
144        let mut rate_limits = HashMap::new();
145        rate_limits.insert(
146            EventType::Spike,
147            T::from(1000.0).unwrap_or_else(|| T::zero()),
148        );
149        rate_limits.insert(
150            EventType::WeightUpdate,
151            T::from(100.0).unwrap_or_else(|| T::zero()),
152        );
153        rate_limits.insert(
154            EventType::PlasticityEvent,
155            T::from(50.0).unwrap_or_else(|| T::zero()),
156        );
157
158        Self {
159            max_queue_size: 10000,
160            processing_timeout: T::from(1.0).unwrap_or_else(|| T::zero()),
161            priority_scheduling: true,
162            event_threshold: T::from(0.001).unwrap_or_else(|| T::zero()),
163            event_batching: true,
164            batch_size: 32,
165            temporal_correlation: true,
166            correlation_window: T::from(10.0).unwrap_or_else(|| T::zero()),
167            adaptive_handling: true,
168            rate_limits,
169            event_compression: false,
170            compression_algorithm: EventCompressionAlgorithm::None,
171            distributed_processing: false,
172            load_balancing: LoadBalancingStrategy::RoundRobin,
173        }
174    }
175}
176
177/// Priority queue entry for event scheduling
178#[derive(Debug, Clone)]
179struct PriorityEventEntry<T: Float + Debug + Send + Sync + 'static> {
180    event: NeuromorphicEvent<T>,
181    insertion_time: Instant,
182}
183
184impl<T: Float + Debug + Send + Sync + 'static> PartialEq for PriorityEventEntry<T> {
185    fn eq(&self, other: &Self) -> bool {
186        self.event.priority == other.event.priority
187    }
188}
189
190impl<T: Float + Debug + Send + Sync + 'static> Eq for PriorityEventEntry<T> {}
191
192impl<T: Float + Debug + Send + Sync + 'static> PartialOrd for PriorityEventEntry<T> {
193    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
194        Some(self.cmp(other))
195    }
196}
197
198impl<T: Float + Debug + Send + Sync + 'static> Ord for PriorityEventEntry<T> {
199    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
200        // Higher priority events come first (reverse order)
201        other
202            .event
203            .priority
204            .cmp(&self.event.priority)
205            .then_with(|| self.insertion_time.cmp(&other.insertion_time))
206    }
207}
208
209/// Event-driven optimizer
210pub struct EventDrivenOptimizer<T: Float + Debug + Send + Sync + 'static> {
211    /// Configuration
212    config: EventDrivenConfig<T>,
213
214    /// STDP configuration
215    stdp_config: STDPConfig<T>,
216
217    /// Membrane dynamics configuration
218    membrane_config: MembraneDynamicsConfig<T>,
219
220    /// Event queue with priority scheduling
221    event_queue: BinaryHeap<PriorityEventEntry<T>>,
222
223    /// Event processing statistics
224    event_stats: HashMap<EventType, EventStatistics<T>>,
225
226    /// Current system state
227    system_state: SystemState<T>,
228
229    /// Event handlers
230    event_handlers: HashMap<EventType, Box<dyn EventHandler<T>>>,
231
232    /// Temporal correlation tracker
233    correlation_tracker: TemporalCorrelationTracker<T>,
234
235    /// Event rate limiter
236    rate_limiter: EventRateLimiter<T>,
237
238    /// Performance metrics
239    metrics: NeuromorphicMetrics<T>,
240
241    /// Distributed processing coordinator
242    distributed_coordinator: Option<DistributedEventCoordinator<T>>,
243
244    /// Event compression engine
245    compression_engine: EventCompressionEngine<T>,
246
247    /// Adaptive handler
248    adaptive_handler: AdaptiveEventHandler<T>,
249}
250
251/// Event processing statistics
252#[derive(Debug, Clone)]
253pub struct EventStatistics<T: Float + Debug + Send + Sync + 'static> {
254    /// Total events processed
255    pub total_processed: usize,
256
257    /// Average processing time (ms)
258    pub avg_processing_time: T,
259
260    /// Event rate (events/second)
261    pub event_rate: T,
262
263    /// Queue wait time (ms)
264    pub avg_queue_wait_time: T,
265
266    /// Error count
267    pub error_count: usize,
268
269    /// Last update time
270    pub last_update: Instant,
271}
272
273/// System state for event-driven optimization
274#[derive(Debug, Clone)]
275pub struct SystemState<T: Float + Debug + Send + Sync + 'static> {
276    /// Current membrane potentials
277    pub membrane_potentials: Array1<T>,
278
279    /// Synaptic weights
280    pub synaptic_weights: Array2<T>,
281
282    /// Last spike times
283    pub last_spike_times: Array1<T>,
284
285    /// Refractory states
286    pub refractory_until: Array1<T>,
287
288    /// Current simulation time
289    pub current_time: T,
290
291    /// Active neurons
292    pub active_neurons: HashSet<usize>,
293
294    /// Pending weight updates
295    pub pending_updates: HashMap<(usize, usize), T>,
296}
297
298/// Event handler trait
299trait EventHandler<T: Float + Debug + Send + Sync + 'static>: Send + Sync {
300    fn handle_event(
301        &mut self,
302        event: &NeuromorphicEvent<T>,
303        state: &mut SystemState<T>,
304    ) -> Result<()>;
305    fn can_handle(&self, eventtype: EventType) -> bool;
306}
307
308/// Spike event handler
309struct SpikeEventHandler<T: Float + Debug + Send + Sync + 'static> {
310    stdp_config: STDPConfig<T>,
311    membrane_config: MembraneDynamicsConfig<T>,
312}
313
314impl<T: Float + Debug + Send + Sync + 'static> EventHandler<T> for SpikeEventHandler<T> {
315    fn handle_event(
316        &mut self,
317        event: &NeuromorphicEvent<T>,
318        state: &mut SystemState<T>,
319    ) -> Result<()> {
320        let neuron_id = event.source_neuron;
321
322        // Generate spike
323        if neuron_id < state.membrane_potentials.len() {
324            // Reset membrane potential
325            state.membrane_potentials[neuron_id] = self.membrane_config.reset_potential;
326
327            // Set refractory period
328            state.refractory_until[neuron_id] =
329                state.current_time + self.membrane_config.refractory_period;
330
331            // Update last spike time
332            state.last_spike_times[neuron_id] = state.current_time;
333
334            // Add to active neurons
335            state.active_neurons.insert(neuron_id);
336
337            // Trigger STDP updates for connected synapses
338            self.trigger_stdp_updates(neuron_id, state)?;
339        }
340
341        Ok(())
342    }
343
344    fn can_handle(&self, event_type: EventType) -> bool {
345        event_type == EventType::Spike
346    }
347}
348
349impl<T: Float + Debug + Send + Sync + 'static> SpikeEventHandler<T> {
350    fn trigger_stdp_updates(&self, post_neuron: usize, state: &mut SystemState<T>) -> Result<()> {
351        // Check all presynaptic connections
352        for pre_neuron in 0..state.last_spike_times.len() {
353            if pre_neuron != post_neuron {
354                let pre_spike_time = state.last_spike_times[pre_neuron];
355
356                if pre_spike_time > T::from(-1000.0).unwrap_or_else(|| T::zero()) {
357                    let dt = state.current_time - pre_spike_time;
358                    let weight_change = self.compute_stdp_weight_change(dt);
359
360                    // Add to pending updates
361                    state
362                        .pending_updates
363                        .insert((pre_neuron, post_neuron), weight_change);
364                }
365            }
366        }
367
368        Ok(())
369    }
370
371    fn compute_stdp_weight_change(&self, dt: T) -> T {
372        if dt > T::zero() {
373            // Post-before-pre: LTP
374            let exp_arg = -dt / self.stdp_config.tau_pot;
375            self.stdp_config.learning_rate_pot * exp_arg.exp()
376        } else {
377            // Pre-before-post: LTD
378            let exp_arg = dt / self.stdp_config.tau_dep;
379            -self.stdp_config.learning_rate_dep * exp_arg.exp()
380        }
381    }
382}
383
384/// Weight update event handler
385struct WeightUpdateEventHandler<T: Float + Debug + Send + Sync + 'static> {
386    stdp_config: STDPConfig<T>,
387}
388
389impl<T: Float + Debug + Send + Sync + 'static> EventHandler<T> for WeightUpdateEventHandler<T> {
390    fn handle_event(
391        &mut self,
392        event: &NeuromorphicEvent<T>,
393        state: &mut SystemState<T>,
394    ) -> Result<()> {
395        let source = event.source_neuron;
396
397        if let Some(target) = event.target_neuron {
398            if source < state.synaptic_weights.nrows() && target < state.synaptic_weights.ncols() {
399                // Apply weight update
400                let current_weight = state.synaptic_weights[[source, target]];
401                let new_weight = (current_weight + event.value)
402                    .max(self.stdp_config.weight_min)
403                    .min(self.stdp_config.weight_max);
404
405                state.synaptic_weights[[source, target]] = new_weight;
406            }
407        }
408
409        Ok(())
410    }
411
412    fn can_handle(&self, event_type: EventType) -> bool {
413        event_type == EventType::WeightUpdate
414    }
415}
416
417/// Temporal correlation tracker
418struct TemporalCorrelationTracker<T: Float + Debug + Send + Sync + 'static> {
419    correlation_window: T,
420    event_history: VecDeque<(T, EventType, usize)>,
421    correlation_patterns: HashMap<(EventType, EventType), T>,
422}
423
424impl<T: Float + Debug + Send + Sync + 'static + std::ops::AddAssign> TemporalCorrelationTracker<T> {
425    fn new(correlation_window: T) -> Self {
426        Self {
427            correlation_window,
428            event_history: VecDeque::new(),
429            correlation_patterns: HashMap::new(),
430        }
431    }
432
433    fn add_event(&mut self, time: T, event_type: EventType, neuron_id: usize) {
434        // Add new event
435        self.event_history.push_back((time, event_type, neuron_id));
436
437        // Remove old events outside correlation window
438        while let Some(&(old_time, _, _)) = self.event_history.front() {
439            if time - old_time > self.correlation_window {
440                self.event_history.pop_front();
441            } else {
442                break;
443            }
444        }
445
446        // Update correlation patterns
447        self.update_correlations(time, event_type);
448    }
449
450    fn update_correlations(&mut self, current_time: T, current_event: EventType) {
451        for &(event_time, event_type_, _) in &self.event_history {
452            if current_time - event_time <= self.correlation_window {
453                let correlation_key = (event_type_, current_event);
454                let time_diff = current_time - event_time;
455                let correlation_strength = (-time_diff / self.correlation_window).exp();
456
457                *self
458                    .correlation_patterns
459                    .entry(correlation_key)
460                    .or_insert(T::zero()) += correlation_strength;
461            }
462        }
463    }
464
465    fn get_correlation(&self, event1: EventType, event2: EventType) -> T {
466        self.correlation_patterns
467            .get(&(event1, event2))
468            .copied()
469            .unwrap_or(T::zero())
470    }
471}
472
473/// Event rate limiter
474struct EventRateLimiter<T: Float + Debug + Send + Sync + 'static> {
475    rate_limits: HashMap<EventType, T>,
476    event_counts: HashMap<EventType, usize>,
477    last_reset: Instant,
478    reset_interval: Duration,
479}
480
481impl<T: Float + Debug + Send + Sync + 'static> EventRateLimiter<T> {
482    fn new(rate_limits: HashMap<EventType, T>) -> Self {
483        Self {
484            rate_limits,
485            event_counts: HashMap::new(),
486            last_reset: Instant::now(),
487            reset_interval: Duration::from_secs(1),
488        }
489    }
490
491    fn can_process(&mut self, event_type: EventType) -> bool {
492        // Reset counters if interval elapsed
493        if self.last_reset.elapsed() >= self.reset_interval {
494            self.event_counts.clear();
495            self.last_reset = Instant::now();
496        }
497
498        if let Some(&limit) = self.rate_limits.get(&event_type) {
499            let current_count = self.event_counts.get(&event_type).copied().unwrap_or(0);
500            if T::from(current_count).unwrap_or_else(|| T::zero()) < limit {
501                *self.event_counts.entry(event_type).or_insert(0) += 1;
502                true
503            } else {
504                false
505            }
506        } else {
507            true
508        }
509    }
510}
511
512/// Event compression engine
513struct EventCompressionEngine<T: Float + Debug + Send + Sync + 'static> {
514    algorithm: EventCompressionAlgorithm,
515    compression_buffer: Vec<u8>,
516    decompression_buffer: Vec<u8>,
517    _phantom: std::marker::PhantomData<T>,
518}
519
520impl<T: Float + Debug + Send + Sync + 'static> EventCompressionEngine<T> {
521    fn new(algorithm: EventCompressionAlgorithm) -> Self {
522        Self {
523            algorithm,
524            compression_buffer: Vec::new(),
525            decompression_buffer: Vec::new(),
526            _phantom: std::marker::PhantomData,
527        }
528    }
529
530    fn compress_event(&mut self, event: &NeuromorphicEvent<T>) -> Result<Vec<u8>> {
531        match self.algorithm {
532            EventCompressionAlgorithm::None => {
533                // No compression, serialize directly
534                self.serialize_event(event)
535            }
536            EventCompressionAlgorithm::DeltaEncoding => self.delta_encode_event(event),
537            EventCompressionAlgorithm::SparseEncoding => self.sparse_encode_event(event),
538            _ => {
539                // Fallback to no compression
540                self.serialize_event(event)
541            }
542        }
543    }
544
545    fn serialize_event(&self, event: &NeuromorphicEvent<T>) -> Result<Vec<u8>> {
546        // Simplified serialization
547        let mut data = Vec::new();
548        data.extend_from_slice(&(event.event_type as u8).to_le_bytes());
549        data.extend_from_slice(&event.source_neuron.to_le_bytes());
550
551        if let Some(target) = event.target_neuron {
552            data.push(1);
553            data.extend_from_slice(&target.to_le_bytes());
554        } else {
555            data.push(0);
556        }
557
558        Ok(data)
559    }
560
561    fn delta_encode_event(&mut self, event: &NeuromorphicEvent<T>) -> Result<Vec<u8>> {
562        // Simplified delta encoding implementation
563        Ok(vec![0u8; 16])
564    }
565
566    fn sparse_encode_event(&mut self, event: &NeuromorphicEvent<T>) -> Result<Vec<u8>> {
567        // Simplified sparse encoding implementation
568        Ok(vec![0u8; 8])
569    }
570}
571
572/// Adaptive event handler
573struct AdaptiveEventHandler<T: Float + Debug + Send + Sync + 'static> {
574    adaptation_rate: T,
575    performance_history: VecDeque<T>,
576    current_strategy: AdaptationStrategy,
577}
578
/// Processing strategy selected by the adaptive handler.
#[derive(Debug, Clone, Copy)]
enum AdaptationStrategy {
    /// Chosen when performance is falling.
    Conservative,
    /// Chosen when performance is roughly stable.
    Balanced,
    /// Chosen when performance is rising.
    Aggressive,
}
585
586impl<T: Float + Debug + Send + Sync + 'static + std::iter::Sum> AdaptiveEventHandler<T> {
587    fn new() -> Self {
588        Self {
589            adaptation_rate: T::from(0.1).unwrap_or_else(|| T::zero()),
590            performance_history: VecDeque::new(),
591            current_strategy: AdaptationStrategy::Balanced,
592        }
593    }
594
595    fn adapt_processing(&mut self, current_performance: T) {
596        self.performance_history.push_back(current_performance);
597
598        if self.performance_history.len() > 100 {
599            self.performance_history.pop_front();
600        }
601
602        if self.performance_history.len() >= 10 {
603            let recent_avg = self
604                .performance_history
605                .iter()
606                .rev()
607                .take(10)
608                .cloned()
609                .sum::<T>()
610                / T::from(10).unwrap_or_else(|| T::zero());
611            let older_avg = if self.performance_history.len() >= 20 {
612                self.performance_history
613                    .iter()
614                    .rev()
615                    .skip(10)
616                    .take(10)
617                    .cloned()
618                    .sum::<T>()
619                    / T::from(10).unwrap_or_else(|| T::zero())
620            } else {
621                recent_avg
622            };
623
624            let performance_change = recent_avg - older_avg;
625
626            self.current_strategy =
627                if performance_change > T::from(0.1).unwrap_or_else(|| T::zero()) {
628                    AdaptationStrategy::Aggressive
629                } else if performance_change < T::from(-0.1).unwrap_or_else(|| T::zero()) {
630                    AdaptationStrategy::Conservative
631                } else {
632                    AdaptationStrategy::Balanced
633                };
634        }
635    }
636
637    fn get_adaptation_factor(&self) -> T {
638        match self.current_strategy {
639            AdaptationStrategy::Conservative => T::from(0.5).unwrap_or_else(|| T::zero()),
640            AdaptationStrategy::Balanced => T::one(),
641            AdaptationStrategy::Aggressive => T::from(1.5).unwrap_or_else(|| T::zero()),
642        }
643    }
644}
645
646/// Distributed event coordinator
647struct DistributedEventCoordinator<T: Float + Debug + Send + Sync + 'static> {
648    load_balancing: LoadBalancingStrategy,
649    worker_loads: HashMap<usize, T>,
650    current_worker: usize,
651    total_workers: usize,
652}
653
654impl<T: Float + Debug + Send + Sync + 'static> DistributedEventCoordinator<T> {
655    fn new(strategy: LoadBalancingStrategy, num_workers: usize) -> Self {
656        Self {
657            load_balancing: strategy,
658            worker_loads: HashMap::new(),
659            current_worker: 0,
660            total_workers: num_workers,
661        }
662    }
663
664    fn assign_worker(&mut self, event: &NeuromorphicEvent<T>) -> usize {
665        match self.load_balancing {
666            LoadBalancingStrategy::RoundRobin => {
667                let worker = self.current_worker;
668                self.current_worker = (self.current_worker + 1) % self.total_workers;
669                worker
670            }
671            LoadBalancingStrategy::TypeBased => {
672                // Hash event type to worker
673                (event.event_type as usize) % self.total_workers
674            }
675            LoadBalancingStrategy::LoadAware => {
676                // Find worker with minimum load
677                self.worker_loads
678                    .iter()
679                    .min_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
680                    .map(|(&worker_id, _)| worker_id)
681                    .unwrap_or(0)
682            }
683            _ => 0,
684        }
685    }
686
687    fn update_worker_load(&mut self, worker_id: usize, load: T) {
688        self.worker_loads.insert(worker_id, load);
689    }
690}
691
692impl<
693        T: Float
694            + Debug
695            + Send
696            + Sync
697            + 'static
698            + std::iter::Sum
699            + scirs2_core::ndarray::ScalarOperand
700            + std::ops::AddAssign,
701    > EventDrivenOptimizer<T>
702{
703    /// Create a new event-driven optimizer
704    pub fn new(
705        config: EventDrivenConfig<T>,
706        stdp_config: STDPConfig<T>,
707        membrane_config: MembraneDynamicsConfig<T>,
708        num_neurons: usize,
709    ) -> Self {
710        let mut optimizer = Self {
711            config: config.clone(),
712            stdp_config: stdp_config.clone(),
713            membrane_config: membrane_config.clone(),
714            event_queue: BinaryHeap::new(),
715            event_stats: HashMap::new(),
716            system_state: SystemState {
717                membrane_potentials: Array1::from_elem(
718                    num_neurons,
719                    membrane_config.resting_potential,
720                ),
721                synaptic_weights: Array2::ones((num_neurons, num_neurons))
722                    * T::from(0.1).unwrap_or_else(|| T::zero()),
723                last_spike_times: Array1::from_elem(
724                    num_neurons,
725                    T::from(-1000.0).unwrap_or_else(|| T::zero()),
726                ),
727                refractory_until: Array1::zeros(num_neurons),
728                current_time: T::zero(),
729                active_neurons: HashSet::new(),
730                pending_updates: HashMap::new(),
731            },
732            event_handlers: HashMap::new(),
733            correlation_tracker: TemporalCorrelationTracker::new(config.correlation_window),
734            rate_limiter: EventRateLimiter::new(config.rate_limits.clone()),
735            metrics: NeuromorphicMetrics::default(),
736            distributed_coordinator: if config.distributed_processing {
737                Some(DistributedEventCoordinator::new(config.load_balancing, 4))
738            } else {
739                None
740            },
741            compression_engine: EventCompressionEngine::new(config.compression_algorithm),
742            adaptive_handler: AdaptiveEventHandler::new(),
743        };
744
745        // Register default event handlers
746        optimizer.register_default_handlers();
747
748        optimizer
749    }
750
751    /// Register default event handlers
752    fn register_default_handlers(&mut self) {
753        let spike_handler = Box::new(SpikeEventHandler {
754            stdp_config: self.stdp_config.clone(),
755            membrane_config: self.membrane_config.clone(),
756        });
757
758        let weight_handler = Box::new(WeightUpdateEventHandler {
759            stdp_config: self.stdp_config.clone(),
760        });
761
762        self.event_handlers.insert(EventType::Spike, spike_handler);
763        self.event_handlers
764            .insert(EventType::WeightUpdate, weight_handler);
765    }
766
767    /// Add event to the processing queue
768    pub fn enqueue_event(&mut self, event: NeuromorphicEvent<T>) -> Result<()> {
769        // Check rate limits
770        if !self.rate_limiter.can_process(event.event_type) {
771            return Err(OptimError::InvalidConfig("Rate limit exceeded".to_string()));
772        }
773
774        // Check queue capacity
775        if self.event_queue.len() >= self.config.max_queue_size {
776            return Err(OptimError::InvalidConfig("Event queue full".to_string()));
777        }
778
779        // Extract event fields before moving
780        let timestamp = event.timestamp;
781        let event_type = event.event_type;
782        let source_neuron = event.source_neuron;
783
784        let entry = PriorityEventEntry {
785            event,
786            insertion_time: Instant::now(),
787        };
788
789        self.event_queue.push(entry);
790
791        // Update correlation tracking
792        if self.config.temporal_correlation {
793            self.correlation_tracker
794                .add_event(timestamp, event_type, source_neuron);
795        }
796
797        Ok(())
798    }
799
800    /// Process events from the queue
801    pub fn process_events(&mut self) -> Result<usize> {
802        let mut processed_count = 0;
803        let start_time = Instant::now();
804        let timeout =
805            Duration::from_millis(self.config.processing_timeout.to_u64().unwrap_or(1000));
806
807        while !self.event_queue.is_empty() && start_time.elapsed() < timeout {
808            if self.config.event_batching {
809                let batch_size = self.config.batch_size.min(self.event_queue.len());
810                processed_count += self.process_event_batch(batch_size)?;
811            } else if let Some(entry) = self.event_queue.pop() {
812                self.process_single_event(&entry.event)?;
813                processed_count += 1;
814            }
815        }
816
817        // Apply pending weight updates
818        self.apply_pending_updates()?;
819
820        // Update adaptive processing
821        let processing_rate = T::from(processed_count).unwrap_or_else(|| T::zero())
822            / T::from(start_time.elapsed().as_millis()).unwrap();
823        self.adaptive_handler.adapt_processing(processing_rate);
824
825        Ok(processed_count)
826    }
827
828    /// Process a batch of events
829    fn process_event_batch(&mut self, batch_size: usize) -> Result<usize> {
830        let mut batch_events = Vec::with_capacity(batch_size);
831
832        // Collect batch events
833        for _ in 0..batch_size {
834            if let Some(entry) = self.event_queue.pop() {
835                batch_events.push(entry.event);
836            } else {
837                break;
838            }
839        }
840
841        // Process batch
842        for event in &batch_events {
843            self.process_single_event(event)?;
844        }
845
846        Ok(batch_events.len())
847    }
848
849    /// Process a single event
850    fn process_single_event(&mut self, event: &NeuromorphicEvent<T>) -> Result<()> {
851        let start_time = Instant::now();
852
853        // Find appropriate handler
854        if let Some(handler) = self.event_handlers.get_mut(&event.event_type) {
855            handler.handle_event(event, &mut self.system_state)?;
856        } else {
857            // Default handling
858            self.default_event_handling(event)?;
859        }
860
861        // Update statistics
862        let processing_time = start_time.elapsed().as_nanos() as f64 / 1_000_000.0;
863        self.update_event_statistics(
864            event.event_type,
865            T::from(processing_time).unwrap_or_else(|| T::zero()),
866        );
867
868        // Update energy consumption
869        self.metrics.energy_consumption += event.energy_cost;
870
871        Ok(())
872    }
873
874    /// Default event handling
875    fn default_event_handling(&mut self, event: &NeuromorphicEvent<T>) -> Result<()> {
876        match event.event_type {
877            EventType::ExternalStimulus => {
878                // Apply external stimulus to neuron
879                if event.source_neuron < self.system_state.membrane_potentials.len() {
880                    self.system_state.membrane_potentials[event.source_neuron] += event.value;
881                }
882            }
883            EventType::TimerEvent => {
884                // Update system time
885                self.system_state.current_time = event.timestamp;
886            }
887            _ => {
888                // Ignore unknown events
889            }
890        }
891
892        Ok(())
893    }
894
895    /// Apply pending weight updates
896    fn apply_pending_updates(&mut self) -> Result<()> {
897        for ((pre, post), weight_change) in self.system_state.pending_updates.drain() {
898            if pre < self.system_state.synaptic_weights.nrows()
899                && post < self.system_state.synaptic_weights.ncols()
900            {
901                let current_weight = self.system_state.synaptic_weights[[pre, post]];
902                let new_weight = (current_weight + weight_change)
903                    .max(self.stdp_config.weight_min)
904                    .min(self.stdp_config.weight_max);
905
906                self.system_state.synaptic_weights[[pre, post]] = new_weight;
907            }
908        }
909
910        Ok(())
911    }
912
913    /// Update event processing statistics
914    fn update_event_statistics(&mut self, event_type: EventType, processing_time: T) {
915        let stats = self
916            .event_stats
917            .entry(event_type)
918            .or_insert_with(|| EventStatistics {
919                total_processed: 0,
920                avg_processing_time: T::zero(),
921                event_rate: T::zero(),
922                avg_queue_wait_time: T::zero(),
923                error_count: 0,
924                last_update: Instant::now(),
925            });
926
927        stats.total_processed += 1;
928
929        // Update average processing _time using exponential moving average
930        let alpha = T::from(0.1).unwrap_or_else(|| T::zero());
931        stats.avg_processing_time =
932            stats.avg_processing_time * (T::one() - alpha) + processing_time * alpha;
933
934        // Update event rate
935        let time_since_last = stats.last_update.elapsed().as_secs_f64();
936        if time_since_last > 0.0 {
937            let current_rate = T::one() / T::from(time_since_last).unwrap_or_else(|| T::zero());
938            stats.event_rate = stats.event_rate * (T::one() - alpha) + current_rate * alpha;
939        }
940
941        stats.last_update = Instant::now();
942    }
943
944    /// Get event processing statistics
945    pub fn get_event_statistics(&self) -> &HashMap<EventType, EventStatistics<T>> {
946        &self.event_stats
947    }
948
949    /// Get current system state
950    pub fn get_system_state(&self) -> &SystemState<T> {
951        &self.system_state
952    }
953
954    /// Get current metrics
955    pub fn get_metrics(&self) -> &NeuromorphicMetrics<T> {
956        &self.metrics
957    }
958
959    /// Clear event queue
960    pub fn clear_event_queue(&mut self) {
961        self.event_queue.clear();
962    }
963
964    /// Get queue size
965    pub fn get_queue_size(&self) -> usize {
966        self.event_queue.len()
967    }
968
969    /// Enable distributed processing
970    pub fn enable_distributed_processing(&mut self, num_workers: usize) {
971        self.distributed_coordinator = Some(DistributedEventCoordinator::new(
972            self.config.load_balancing,
973            num_workers,
974        ));
975        self.config.distributed_processing = true;
976    }
977
978    /// Disable distributed processing
979    pub fn disable_distributed_processing(&mut self) {
980        self.distributed_coordinator = None;
981        self.config.distributed_processing = false;
982    }
983}
984
985impl<T: Float + Debug + Send + Sync + 'static> Default for EventStatistics<T> {
986    fn default() -> Self {
987        Self {
988            total_processed: 0,
989            avg_processing_time: T::zero(),
990            event_rate: T::zero(),
991            avg_queue_wait_time: T::zero(),
992            error_count: 0,
993            last_update: Instant::now(),
994        }
995    }
996}