1use super::{
8 EventPriority, MembraneDynamicsConfig, NeuromorphicEvent, NeuromorphicMetrics, PlasticityModel,
9 STDPConfig, Spike, SpikeTrain,
10};
11use crate::error::{OptimError, Result};
12use crate::optimizers::Optimizer;
13use scirs2_core::ndarray::{Array1, Array2, ArrayBase, Data, DataMut, Dimension};
14use scirs2_core::numeric::Float;
15use std::cmp::Reverse;
16use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
17use std::fmt::Debug;
18use std::sync::{Arc, Mutex, RwLock};
19use std::time::{Duration, Instant};
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
23pub enum EventType {
24 Spike,
26
27 WeightUpdate,
29
30 ThresholdCrossing,
32
33 PlasticityEvent,
35
36 ExternalStimulus,
38
39 TimerEvent,
41
42 ErrorEvent,
44
45 HomeostaticEvent,
47
48 SynchronizationEvent,
50
51 EnergyEvent,
53}
54
55#[derive(Debug, Clone)]
57pub struct EventDrivenConfig<T: Float + Debug + Send + Sync + 'static> {
58 pub max_queue_size: usize,
60
61 pub processing_timeout: T,
63
64 pub priority_scheduling: bool,
66
67 pub event_threshold: T,
69
70 pub event_batching: bool,
72
73 pub batch_size: usize,
75
76 pub temporal_correlation: bool,
78
79 pub correlation_window: T,
81
82 pub adaptive_handling: bool,
84
85 pub rate_limits: HashMap<EventType, T>,
87
88 pub event_compression: bool,
90
91 pub compression_algorithm: EventCompressionAlgorithm,
93
94 pub distributed_processing: bool,
96
97 pub load_balancing: LoadBalancingStrategy,
99}
100
101#[derive(Debug, Clone, Copy)]
103pub enum EventCompressionAlgorithm {
104 None,
106
107 DeltaEncoding,
109
110 HuffmanEncoding,
112
113 RunLengthEncoding,
115
116 SparseEncoding,
118
119 PredictiveEncoding,
121}
122
123#[derive(Debug, Clone, Copy)]
125pub enum LoadBalancingStrategy {
126 RoundRobin,
128
129 TypeBased,
131
132 LoadAware,
134
135 LocalityAware,
137
138 Dynamic,
140}
141
142impl<T: Float + Debug + Send + Sync + 'static> Default for EventDrivenConfig<T> {
143 fn default() -> Self {
144 let mut rate_limits = HashMap::new();
145 rate_limits.insert(
146 EventType::Spike,
147 T::from(1000.0).unwrap_or_else(|| T::zero()),
148 );
149 rate_limits.insert(
150 EventType::WeightUpdate,
151 T::from(100.0).unwrap_or_else(|| T::zero()),
152 );
153 rate_limits.insert(
154 EventType::PlasticityEvent,
155 T::from(50.0).unwrap_or_else(|| T::zero()),
156 );
157
158 Self {
159 max_queue_size: 10000,
160 processing_timeout: T::from(1.0).unwrap_or_else(|| T::zero()),
161 priority_scheduling: true,
162 event_threshold: T::from(0.001).unwrap_or_else(|| T::zero()),
163 event_batching: true,
164 batch_size: 32,
165 temporal_correlation: true,
166 correlation_window: T::from(10.0).unwrap_or_else(|| T::zero()),
167 adaptive_handling: true,
168 rate_limits,
169 event_compression: false,
170 compression_algorithm: EventCompressionAlgorithm::None,
171 distributed_processing: false,
172 load_balancing: LoadBalancingStrategy::RoundRobin,
173 }
174 }
175}
176
177#[derive(Debug, Clone)]
179struct PriorityEventEntry<T: Float + Debug + Send + Sync + 'static> {
180 event: NeuromorphicEvent<T>,
181 insertion_time: Instant,
182}
183
184impl<T: Float + Debug + Send + Sync + 'static> PartialEq for PriorityEventEntry<T> {
185 fn eq(&self, other: &Self) -> bool {
186 self.event.priority == other.event.priority
187 }
188}
189
190impl<T: Float + Debug + Send + Sync + 'static> Eq for PriorityEventEntry<T> {}
191
192impl<T: Float + Debug + Send + Sync + 'static> PartialOrd for PriorityEventEntry<T> {
193 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
194 Some(self.cmp(other))
195 }
196}
197
198impl<T: Float + Debug + Send + Sync + 'static> Ord for PriorityEventEntry<T> {
199 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
200 other
202 .event
203 .priority
204 .cmp(&self.event.priority)
205 .then_with(|| self.insertion_time.cmp(&other.insertion_time))
206 }
207}
208
209pub struct EventDrivenOptimizer<T: Float + Debug + Send + Sync + 'static> {
211 config: EventDrivenConfig<T>,
213
214 stdp_config: STDPConfig<T>,
216
217 membrane_config: MembraneDynamicsConfig<T>,
219
220 event_queue: BinaryHeap<PriorityEventEntry<T>>,
222
223 event_stats: HashMap<EventType, EventStatistics<T>>,
225
226 system_state: SystemState<T>,
228
229 event_handlers: HashMap<EventType, Box<dyn EventHandler<T>>>,
231
232 correlation_tracker: TemporalCorrelationTracker<T>,
234
235 rate_limiter: EventRateLimiter<T>,
237
238 metrics: NeuromorphicMetrics<T>,
240
241 distributed_coordinator: Option<DistributedEventCoordinator<T>>,
243
244 compression_engine: EventCompressionEngine<T>,
246
247 adaptive_handler: AdaptiveEventHandler<T>,
249}
250
251#[derive(Debug, Clone)]
253pub struct EventStatistics<T: Float + Debug + Send + Sync + 'static> {
254 pub total_processed: usize,
256
257 pub avg_processing_time: T,
259
260 pub event_rate: T,
262
263 pub avg_queue_wait_time: T,
265
266 pub error_count: usize,
268
269 pub last_update: Instant,
271}
272
273#[derive(Debug, Clone)]
275pub struct SystemState<T: Float + Debug + Send + Sync + 'static> {
276 pub membrane_potentials: Array1<T>,
278
279 pub synaptic_weights: Array2<T>,
281
282 pub last_spike_times: Array1<T>,
284
285 pub refractory_until: Array1<T>,
287
288 pub current_time: T,
290
291 pub active_neurons: HashSet<usize>,
293
294 pub pending_updates: HashMap<(usize, usize), T>,
296}
297
298trait EventHandler<T: Float + Debug + Send + Sync + 'static>: Send + Sync {
300 fn handle_event(
301 &mut self,
302 event: &NeuromorphicEvent<T>,
303 state: &mut SystemState<T>,
304 ) -> Result<()>;
305 fn can_handle(&self, eventtype: EventType) -> bool;
306}
307
308struct SpikeEventHandler<T: Float + Debug + Send + Sync + 'static> {
310 stdp_config: STDPConfig<T>,
311 membrane_config: MembraneDynamicsConfig<T>,
312}
313
314impl<T: Float + Debug + Send + Sync + 'static> EventHandler<T> for SpikeEventHandler<T> {
315 fn handle_event(
316 &mut self,
317 event: &NeuromorphicEvent<T>,
318 state: &mut SystemState<T>,
319 ) -> Result<()> {
320 let neuron_id = event.source_neuron;
321
322 if neuron_id < state.membrane_potentials.len() {
324 state.membrane_potentials[neuron_id] = self.membrane_config.reset_potential;
326
327 state.refractory_until[neuron_id] =
329 state.current_time + self.membrane_config.refractory_period;
330
331 state.last_spike_times[neuron_id] = state.current_time;
333
334 state.active_neurons.insert(neuron_id);
336
337 self.trigger_stdp_updates(neuron_id, state)?;
339 }
340
341 Ok(())
342 }
343
344 fn can_handle(&self, event_type: EventType) -> bool {
345 event_type == EventType::Spike
346 }
347}
348
349impl<T: Float + Debug + Send + Sync + 'static> SpikeEventHandler<T> {
350 fn trigger_stdp_updates(&self, post_neuron: usize, state: &mut SystemState<T>) -> Result<()> {
351 for pre_neuron in 0..state.last_spike_times.len() {
353 if pre_neuron != post_neuron {
354 let pre_spike_time = state.last_spike_times[pre_neuron];
355
356 if pre_spike_time > T::from(-1000.0).unwrap_or_else(|| T::zero()) {
357 let dt = state.current_time - pre_spike_time;
358 let weight_change = self.compute_stdp_weight_change(dt);
359
360 state
362 .pending_updates
363 .insert((pre_neuron, post_neuron), weight_change);
364 }
365 }
366 }
367
368 Ok(())
369 }
370
371 fn compute_stdp_weight_change(&self, dt: T) -> T {
372 if dt > T::zero() {
373 let exp_arg = -dt / self.stdp_config.tau_pot;
375 self.stdp_config.learning_rate_pot * exp_arg.exp()
376 } else {
377 let exp_arg = dt / self.stdp_config.tau_dep;
379 -self.stdp_config.learning_rate_dep * exp_arg.exp()
380 }
381 }
382}
383
384struct WeightUpdateEventHandler<T: Float + Debug + Send + Sync + 'static> {
386 stdp_config: STDPConfig<T>,
387}
388
389impl<T: Float + Debug + Send + Sync + 'static> EventHandler<T> for WeightUpdateEventHandler<T> {
390 fn handle_event(
391 &mut self,
392 event: &NeuromorphicEvent<T>,
393 state: &mut SystemState<T>,
394 ) -> Result<()> {
395 let source = event.source_neuron;
396
397 if let Some(target) = event.target_neuron {
398 if source < state.synaptic_weights.nrows() && target < state.synaptic_weights.ncols() {
399 let current_weight = state.synaptic_weights[[source, target]];
401 let new_weight = (current_weight + event.value)
402 .max(self.stdp_config.weight_min)
403 .min(self.stdp_config.weight_max);
404
405 state.synaptic_weights[[source, target]] = new_weight;
406 }
407 }
408
409 Ok(())
410 }
411
412 fn can_handle(&self, event_type: EventType) -> bool {
413 event_type == EventType::WeightUpdate
414 }
415}
416
417struct TemporalCorrelationTracker<T: Float + Debug + Send + Sync + 'static> {
419 correlation_window: T,
420 event_history: VecDeque<(T, EventType, usize)>,
421 correlation_patterns: HashMap<(EventType, EventType), T>,
422}
423
424impl<T: Float + Debug + Send + Sync + 'static + std::ops::AddAssign> TemporalCorrelationTracker<T> {
425 fn new(correlation_window: T) -> Self {
426 Self {
427 correlation_window,
428 event_history: VecDeque::new(),
429 correlation_patterns: HashMap::new(),
430 }
431 }
432
433 fn add_event(&mut self, time: T, event_type: EventType, neuron_id: usize) {
434 self.event_history.push_back((time, event_type, neuron_id));
436
437 while let Some(&(old_time, _, _)) = self.event_history.front() {
439 if time - old_time > self.correlation_window {
440 self.event_history.pop_front();
441 } else {
442 break;
443 }
444 }
445
446 self.update_correlations(time, event_type);
448 }
449
450 fn update_correlations(&mut self, current_time: T, current_event: EventType) {
451 for &(event_time, event_type_, _) in &self.event_history {
452 if current_time - event_time <= self.correlation_window {
453 let correlation_key = (event_type_, current_event);
454 let time_diff = current_time - event_time;
455 let correlation_strength = (-time_diff / self.correlation_window).exp();
456
457 *self
458 .correlation_patterns
459 .entry(correlation_key)
460 .or_insert(T::zero()) += correlation_strength;
461 }
462 }
463 }
464
465 fn get_correlation(&self, event1: EventType, event2: EventType) -> T {
466 self.correlation_patterns
467 .get(&(event1, event2))
468 .copied()
469 .unwrap_or(T::zero())
470 }
471}
472
473struct EventRateLimiter<T: Float + Debug + Send + Sync + 'static> {
475 rate_limits: HashMap<EventType, T>,
476 event_counts: HashMap<EventType, usize>,
477 last_reset: Instant,
478 reset_interval: Duration,
479}
480
481impl<T: Float + Debug + Send + Sync + 'static> EventRateLimiter<T> {
482 fn new(rate_limits: HashMap<EventType, T>) -> Self {
483 Self {
484 rate_limits,
485 event_counts: HashMap::new(),
486 last_reset: Instant::now(),
487 reset_interval: Duration::from_secs(1),
488 }
489 }
490
491 fn can_process(&mut self, event_type: EventType) -> bool {
492 if self.last_reset.elapsed() >= self.reset_interval {
494 self.event_counts.clear();
495 self.last_reset = Instant::now();
496 }
497
498 if let Some(&limit) = self.rate_limits.get(&event_type) {
499 let current_count = self.event_counts.get(&event_type).copied().unwrap_or(0);
500 if T::from(current_count).unwrap_or_else(|| T::zero()) < limit {
501 *self.event_counts.entry(event_type).or_insert(0) += 1;
502 true
503 } else {
504 false
505 }
506 } else {
507 true
508 }
509 }
510}
511
512struct EventCompressionEngine<T: Float + Debug + Send + Sync + 'static> {
514 algorithm: EventCompressionAlgorithm,
515 compression_buffer: Vec<u8>,
516 decompression_buffer: Vec<u8>,
517 _phantom: std::marker::PhantomData<T>,
518}
519
520impl<T: Float + Debug + Send + Sync + 'static> EventCompressionEngine<T> {
521 fn new(algorithm: EventCompressionAlgorithm) -> Self {
522 Self {
523 algorithm,
524 compression_buffer: Vec::new(),
525 decompression_buffer: Vec::new(),
526 _phantom: std::marker::PhantomData,
527 }
528 }
529
530 fn compress_event(&mut self, event: &NeuromorphicEvent<T>) -> Result<Vec<u8>> {
531 match self.algorithm {
532 EventCompressionAlgorithm::None => {
533 self.serialize_event(event)
535 }
536 EventCompressionAlgorithm::DeltaEncoding => self.delta_encode_event(event),
537 EventCompressionAlgorithm::SparseEncoding => self.sparse_encode_event(event),
538 _ => {
539 self.serialize_event(event)
541 }
542 }
543 }
544
545 fn serialize_event(&self, event: &NeuromorphicEvent<T>) -> Result<Vec<u8>> {
546 let mut data = Vec::new();
548 data.extend_from_slice(&(event.event_type as u8).to_le_bytes());
549 data.extend_from_slice(&event.source_neuron.to_le_bytes());
550
551 if let Some(target) = event.target_neuron {
552 data.push(1);
553 data.extend_from_slice(&target.to_le_bytes());
554 } else {
555 data.push(0);
556 }
557
558 Ok(data)
559 }
560
561 fn delta_encode_event(&mut self, event: &NeuromorphicEvent<T>) -> Result<Vec<u8>> {
562 Ok(vec![0u8; 16])
564 }
565
566 fn sparse_encode_event(&mut self, event: &NeuromorphicEvent<T>) -> Result<Vec<u8>> {
567 Ok(vec![0u8; 8])
569 }
570}
571
572struct AdaptiveEventHandler<T: Float + Debug + Send + Sync + 'static> {
574 adaptation_rate: T,
575 performance_history: VecDeque<T>,
576 current_strategy: AdaptationStrategy,
577}
578
579#[derive(Debug, Clone, Copy)]
580enum AdaptationStrategy {
581 Conservative,
582 Balanced,
583 Aggressive,
584}
585
586impl<T: Float + Debug + Send + Sync + 'static + std::iter::Sum> AdaptiveEventHandler<T> {
587 fn new() -> Self {
588 Self {
589 adaptation_rate: T::from(0.1).unwrap_or_else(|| T::zero()),
590 performance_history: VecDeque::new(),
591 current_strategy: AdaptationStrategy::Balanced,
592 }
593 }
594
595 fn adapt_processing(&mut self, current_performance: T) {
596 self.performance_history.push_back(current_performance);
597
598 if self.performance_history.len() > 100 {
599 self.performance_history.pop_front();
600 }
601
602 if self.performance_history.len() >= 10 {
603 let recent_avg = self
604 .performance_history
605 .iter()
606 .rev()
607 .take(10)
608 .cloned()
609 .sum::<T>()
610 / T::from(10).unwrap_or_else(|| T::zero());
611 let older_avg = if self.performance_history.len() >= 20 {
612 self.performance_history
613 .iter()
614 .rev()
615 .skip(10)
616 .take(10)
617 .cloned()
618 .sum::<T>()
619 / T::from(10).unwrap_or_else(|| T::zero())
620 } else {
621 recent_avg
622 };
623
624 let performance_change = recent_avg - older_avg;
625
626 self.current_strategy =
627 if performance_change > T::from(0.1).unwrap_or_else(|| T::zero()) {
628 AdaptationStrategy::Aggressive
629 } else if performance_change < T::from(-0.1).unwrap_or_else(|| T::zero()) {
630 AdaptationStrategy::Conservative
631 } else {
632 AdaptationStrategy::Balanced
633 };
634 }
635 }
636
637 fn get_adaptation_factor(&self) -> T {
638 match self.current_strategy {
639 AdaptationStrategy::Conservative => T::from(0.5).unwrap_or_else(|| T::zero()),
640 AdaptationStrategy::Balanced => T::one(),
641 AdaptationStrategy::Aggressive => T::from(1.5).unwrap_or_else(|| T::zero()),
642 }
643 }
644}
645
646struct DistributedEventCoordinator<T: Float + Debug + Send + Sync + 'static> {
648 load_balancing: LoadBalancingStrategy,
649 worker_loads: HashMap<usize, T>,
650 current_worker: usize,
651 total_workers: usize,
652}
653
654impl<T: Float + Debug + Send + Sync + 'static> DistributedEventCoordinator<T> {
655 fn new(strategy: LoadBalancingStrategy, num_workers: usize) -> Self {
656 Self {
657 load_balancing: strategy,
658 worker_loads: HashMap::new(),
659 current_worker: 0,
660 total_workers: num_workers,
661 }
662 }
663
664 fn assign_worker(&mut self, event: &NeuromorphicEvent<T>) -> usize {
665 match self.load_balancing {
666 LoadBalancingStrategy::RoundRobin => {
667 let worker = self.current_worker;
668 self.current_worker = (self.current_worker + 1) % self.total_workers;
669 worker
670 }
671 LoadBalancingStrategy::TypeBased => {
672 (event.event_type as usize) % self.total_workers
674 }
675 LoadBalancingStrategy::LoadAware => {
676 self.worker_loads
678 .iter()
679 .min_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
680 .map(|(&worker_id, _)| worker_id)
681 .unwrap_or(0)
682 }
683 _ => 0,
684 }
685 }
686
687 fn update_worker_load(&mut self, worker_id: usize, load: T) {
688 self.worker_loads.insert(worker_id, load);
689 }
690}
691
692impl<
693 T: Float
694 + Debug
695 + Send
696 + Sync
697 + 'static
698 + std::iter::Sum
699 + scirs2_core::ndarray::ScalarOperand
700 + std::ops::AddAssign,
701 > EventDrivenOptimizer<T>
702{
703 pub fn new(
705 config: EventDrivenConfig<T>,
706 stdp_config: STDPConfig<T>,
707 membrane_config: MembraneDynamicsConfig<T>,
708 num_neurons: usize,
709 ) -> Self {
710 let mut optimizer = Self {
711 config: config.clone(),
712 stdp_config: stdp_config.clone(),
713 membrane_config: membrane_config.clone(),
714 event_queue: BinaryHeap::new(),
715 event_stats: HashMap::new(),
716 system_state: SystemState {
717 membrane_potentials: Array1::from_elem(
718 num_neurons,
719 membrane_config.resting_potential,
720 ),
721 synaptic_weights: Array2::ones((num_neurons, num_neurons))
722 * T::from(0.1).unwrap_or_else(|| T::zero()),
723 last_spike_times: Array1::from_elem(
724 num_neurons,
725 T::from(-1000.0).unwrap_or_else(|| T::zero()),
726 ),
727 refractory_until: Array1::zeros(num_neurons),
728 current_time: T::zero(),
729 active_neurons: HashSet::new(),
730 pending_updates: HashMap::new(),
731 },
732 event_handlers: HashMap::new(),
733 correlation_tracker: TemporalCorrelationTracker::new(config.correlation_window),
734 rate_limiter: EventRateLimiter::new(config.rate_limits.clone()),
735 metrics: NeuromorphicMetrics::default(),
736 distributed_coordinator: if config.distributed_processing {
737 Some(DistributedEventCoordinator::new(config.load_balancing, 4))
738 } else {
739 None
740 },
741 compression_engine: EventCompressionEngine::new(config.compression_algorithm),
742 adaptive_handler: AdaptiveEventHandler::new(),
743 };
744
745 optimizer.register_default_handlers();
747
748 optimizer
749 }
750
751 fn register_default_handlers(&mut self) {
753 let spike_handler = Box::new(SpikeEventHandler {
754 stdp_config: self.stdp_config.clone(),
755 membrane_config: self.membrane_config.clone(),
756 });
757
758 let weight_handler = Box::new(WeightUpdateEventHandler {
759 stdp_config: self.stdp_config.clone(),
760 });
761
762 self.event_handlers.insert(EventType::Spike, spike_handler);
763 self.event_handlers
764 .insert(EventType::WeightUpdate, weight_handler);
765 }
766
767 pub fn enqueue_event(&mut self, event: NeuromorphicEvent<T>) -> Result<()> {
769 if !self.rate_limiter.can_process(event.event_type) {
771 return Err(OptimError::InvalidConfig("Rate limit exceeded".to_string()));
772 }
773
774 if self.event_queue.len() >= self.config.max_queue_size {
776 return Err(OptimError::InvalidConfig("Event queue full".to_string()));
777 }
778
779 let timestamp = event.timestamp;
781 let event_type = event.event_type;
782 let source_neuron = event.source_neuron;
783
784 let entry = PriorityEventEntry {
785 event,
786 insertion_time: Instant::now(),
787 };
788
789 self.event_queue.push(entry);
790
791 if self.config.temporal_correlation {
793 self.correlation_tracker
794 .add_event(timestamp, event_type, source_neuron);
795 }
796
797 Ok(())
798 }
799
800 pub fn process_events(&mut self) -> Result<usize> {
802 let mut processed_count = 0;
803 let start_time = Instant::now();
804 let timeout =
805 Duration::from_millis(self.config.processing_timeout.to_u64().unwrap_or(1000));
806
807 while !self.event_queue.is_empty() && start_time.elapsed() < timeout {
808 if self.config.event_batching {
809 let batch_size = self.config.batch_size.min(self.event_queue.len());
810 processed_count += self.process_event_batch(batch_size)?;
811 } else if let Some(entry) = self.event_queue.pop() {
812 self.process_single_event(&entry.event)?;
813 processed_count += 1;
814 }
815 }
816
817 self.apply_pending_updates()?;
819
820 let processing_rate = T::from(processed_count).unwrap_or_else(|| T::zero())
822 / T::from(start_time.elapsed().as_millis()).unwrap();
823 self.adaptive_handler.adapt_processing(processing_rate);
824
825 Ok(processed_count)
826 }
827
828 fn process_event_batch(&mut self, batch_size: usize) -> Result<usize> {
830 let mut batch_events = Vec::with_capacity(batch_size);
831
832 for _ in 0..batch_size {
834 if let Some(entry) = self.event_queue.pop() {
835 batch_events.push(entry.event);
836 } else {
837 break;
838 }
839 }
840
841 for event in &batch_events {
843 self.process_single_event(event)?;
844 }
845
846 Ok(batch_events.len())
847 }
848
849 fn process_single_event(&mut self, event: &NeuromorphicEvent<T>) -> Result<()> {
851 let start_time = Instant::now();
852
853 if let Some(handler) = self.event_handlers.get_mut(&event.event_type) {
855 handler.handle_event(event, &mut self.system_state)?;
856 } else {
857 self.default_event_handling(event)?;
859 }
860
861 let processing_time = start_time.elapsed().as_nanos() as f64 / 1_000_000.0;
863 self.update_event_statistics(
864 event.event_type,
865 T::from(processing_time).unwrap_or_else(|| T::zero()),
866 );
867
868 self.metrics.energy_consumption += event.energy_cost;
870
871 Ok(())
872 }
873
874 fn default_event_handling(&mut self, event: &NeuromorphicEvent<T>) -> Result<()> {
876 match event.event_type {
877 EventType::ExternalStimulus => {
878 if event.source_neuron < self.system_state.membrane_potentials.len() {
880 self.system_state.membrane_potentials[event.source_neuron] += event.value;
881 }
882 }
883 EventType::TimerEvent => {
884 self.system_state.current_time = event.timestamp;
886 }
887 _ => {
888 }
890 }
891
892 Ok(())
893 }
894
895 fn apply_pending_updates(&mut self) -> Result<()> {
897 for ((pre, post), weight_change) in self.system_state.pending_updates.drain() {
898 if pre < self.system_state.synaptic_weights.nrows()
899 && post < self.system_state.synaptic_weights.ncols()
900 {
901 let current_weight = self.system_state.synaptic_weights[[pre, post]];
902 let new_weight = (current_weight + weight_change)
903 .max(self.stdp_config.weight_min)
904 .min(self.stdp_config.weight_max);
905
906 self.system_state.synaptic_weights[[pre, post]] = new_weight;
907 }
908 }
909
910 Ok(())
911 }
912
913 fn update_event_statistics(&mut self, event_type: EventType, processing_time: T) {
915 let stats = self
916 .event_stats
917 .entry(event_type)
918 .or_insert_with(|| EventStatistics {
919 total_processed: 0,
920 avg_processing_time: T::zero(),
921 event_rate: T::zero(),
922 avg_queue_wait_time: T::zero(),
923 error_count: 0,
924 last_update: Instant::now(),
925 });
926
927 stats.total_processed += 1;
928
929 let alpha = T::from(0.1).unwrap_or_else(|| T::zero());
931 stats.avg_processing_time =
932 stats.avg_processing_time * (T::one() - alpha) + processing_time * alpha;
933
934 let time_since_last = stats.last_update.elapsed().as_secs_f64();
936 if time_since_last > 0.0 {
937 let current_rate = T::one() / T::from(time_since_last).unwrap_or_else(|| T::zero());
938 stats.event_rate = stats.event_rate * (T::one() - alpha) + current_rate * alpha;
939 }
940
941 stats.last_update = Instant::now();
942 }
943
944 pub fn get_event_statistics(&self) -> &HashMap<EventType, EventStatistics<T>> {
946 &self.event_stats
947 }
948
949 pub fn get_system_state(&self) -> &SystemState<T> {
951 &self.system_state
952 }
953
954 pub fn get_metrics(&self) -> &NeuromorphicMetrics<T> {
956 &self.metrics
957 }
958
959 pub fn clear_event_queue(&mut self) {
961 self.event_queue.clear();
962 }
963
964 pub fn get_queue_size(&self) -> usize {
966 self.event_queue.len()
967 }
968
969 pub fn enable_distributed_processing(&mut self, num_workers: usize) {
971 self.distributed_coordinator = Some(DistributedEventCoordinator::new(
972 self.config.load_balancing,
973 num_workers,
974 ));
975 self.config.distributed_processing = true;
976 }
977
978 pub fn disable_distributed_processing(&mut self) {
980 self.distributed_coordinator = None;
981 self.config.distributed_processing = false;
982 }
983}
984
985impl<T: Float + Debug + Send + Sync + 'static> Default for EventStatistics<T> {
986 fn default() -> Self {
987 Self {
988 total_processed: 0,
989 avg_processing_time: T::zero(),
990 event_rate: T::zero(),
991 avg_queue_wait_time: T::zero(),
992 error_count: 0,
993 last_update: Instant::now(),
994 }
995 }
996}