optirs_core/streaming/adaptive_streaming/
optimizer.rs

1// Core adaptive streaming optimizer implementation
2//
3// This module contains the main AdaptiveStreamingOptimizer that orchestrates
4// all streaming optimization components including drift detection, performance
5// tracking, resource management, and adaptive learning rate control.
6
7use super::anomaly_detection::{
8    AnomalyDetector, AnomalyDiagnostics, EnsembleAnomalyDetector, MLAnomalyDetector,
9    StatisticalAnomalyDetector,
10};
11use super::buffering::{AdaptiveBuffer, BufferDiagnostics};
12use super::config::*;
13use super::drift_detection::{DriftDiagnostics, EnhancedDriftDetector};
14use super::meta_learning::{
15    ExperienceReplay, MetaAction, MetaLearner, MetaLearningDiagnostics, MetaState, StrategySelector,
16};
17use super::performance::{
18    DataStatistics, PerformanceDiagnostics, PerformanceSnapshot, PerformanceTracker,
19};
20use super::resource_management::{ResourceDiagnostics, ResourceManager, ResourceUsage};
21
22use crate::adaptive_selection::OptimizerType;
23// Removed dependency on learned_optimizers - using stub implementation
24use scirs2_core::ndarray::{Array, Array1, Array2, Dimension, IxDyn};
25use scirs2_core::numeric::Float;
26use scirs2_core::ScientificNumber;
27use serde::{Deserialize, Serialize};
28use std::collections::{HashMap, VecDeque};
29use std::marker::PhantomData;
30use std::sync::{Arc, Mutex};
31use std::time::{Duration, Instant};
32
33/// Stub implementation for AdaptiveLearningRateController
34/// (replaces the missing learned_optimizers dependency)
35#[derive(Debug, Clone)]
36pub struct AdaptiveLearningRateController<A: Float> {
37    base_lr: A,
38}
39
40impl<A: Float> AdaptiveLearningRateController<A> {
41    pub fn new(_config: &StreamingConfig) -> Result<Self, crate::error::OptimError> {
42        Ok(Self {
43            base_lr: A::from(0.001).unwrap_or_else(|| A::one()),
44        })
45    }
46
47    pub fn update_learning_rate(&mut self, _gradient: &Array1<A>) -> A {
48        self.base_lr
49    }
50
51    pub fn current_rate(&self) -> A {
52        self.base_lr
53    }
54
55    pub fn compute_adaptation(&self, _performance_metrics: &[A]) -> A {
56        // Simple adaptation: return the current learning rate
57        self.base_lr
58    }
59
60    pub fn apply_adaptation(&mut self, adaptation: A) {
61        // Apply the computed adaptation to the base learning rate
62        self.base_lr = adaptation;
63    }
64
65    pub fn last_change(&self) -> Option<A> {
66        // For now, return None as we're not tracking changes
67        None
68    }
69}
70
71/// Streaming data point for optimization
72#[derive(Debug, Clone)]
73pub struct StreamingDataPoint<A: Float + Send + Sync> {
74    /// Input features
75    pub features: Array1<A>,
76    /// Target values (optional for unsupervised learning)
77    pub target: Option<Array1<A>>,
78    /// Timestamp when data was received
79    pub timestamp: Instant,
80    /// Data source identifier
81    pub source_id: Option<String>,
82    /// Data quality score (0.0 to 1.0)
83    pub quality_score: A,
84    /// Additional metadata
85    pub metadata: HashMap<String, String>,
86}
87
88/// Adaptation instruction for optimizer components
89#[derive(Debug, Clone)]
90pub struct Adaptation<A: Float + Send + Sync> {
91    /// Type of adaptation
92    pub adaptation_type: AdaptationType,
93    /// Magnitude of adaptation
94    pub magnitude: A,
95    /// Target component for adaptation
96    pub target_component: String,
97    /// Adaptation parameters
98    pub parameters: HashMap<String, A>,
99    /// Priority of this adaptation
100    pub priority: AdaptationPriority,
101    /// Timestamp when adaptation was computed
102    pub timestamp: Instant,
103}
104
/// Kinds of adjustment the streaming optimizer knows how to apply.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AdaptationType {
    /// Change the learning rate.
    LearningRate,
    /// Change the buffer size.
    BufferSize,
    /// Change how sensitive drift detection is.
    DriftSensitivity,
    /// Change how resources are allocated.
    ResourceAllocation,
    /// Change performance thresholds.
    PerformanceThreshold,
    /// Change anomaly detection parameters.
    AnomalyDetection,
    /// Change meta-learning parameters.
    MetaLearning,
    /// A user-defined adjustment identified by name.
    Custom(String),
}
125
/// Priority levels for adaptations.
///
/// Variants are ordered from least to most urgent, so they can be compared and
/// sorted directly (`Low < Normal < High < Critical`). The type is `Copy` and
/// `Hash`, which lets it be passed by value and used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AdaptationPriority {
    /// Low priority adaptation
    Low = 0,
    /// Normal priority adaptation
    Normal = 1,
    /// High priority adaptation
    High = 2,
    /// Critical adaptation that must be applied immediately
    Critical = 3,
}
138
139/// Statistics for adaptive streaming optimization
140#[derive(Debug, Clone, Serialize)]
141pub struct AdaptiveStreamingStats {
142    /// Total number of data points processed
143    pub total_data_points: usize,
144    /// Total number of optimization steps performed
145    pub optimization_steps: usize,
146    /// Number of drift events detected
147    pub drift_events: usize,
148    /// Number of anomalies detected
149    pub anomalies_detected: usize,
150    /// Number of adaptations applied
151    pub adaptations_applied: usize,
152    /// Current buffer size
153    pub current_buffer_size: usize,
154    /// Current learning rate
155    pub current_learning_rate: f64,
156    /// Average processing time per batch
157    pub avg_processing_time_ms: f64,
158    /// Resource utilization statistics
159    pub resource_utilization: ResourceUsage,
160    /// Performance trend (improvement/degradation)
161    pub performance_trend: f64,
162    /// Meta-learning effectiveness score
163    pub meta_learning_score: f64,
164}
165
166/// Main adaptive streaming optimizer
167pub struct AdaptiveStreamingOptimizer<O, A, D>
168where
169    A: Float + Default + Clone + Send + Sync + std::iter::Sum,
170    D: Dimension,
171{
172    /// Base optimizer instance
173    base_optimizer: O,
174    /// Streaming configuration
175    config: StreamingConfig,
176    /// Adaptive buffer for incoming data
177    buffer: AdaptiveBuffer<A>,
178    /// Drift detection system
179    drift_detector: EnhancedDriftDetector<A>,
180    /// Performance tracking system
181    performance_tracker: PerformanceTracker<A>,
182    /// Resource management system
183    resource_manager: ResourceManager,
184    /// Meta-learning system
185    meta_learner: MetaLearner<A>,
186    /// Anomaly detection system
187    anomaly_detector: AnomalyDetector<A>,
188    /// Learning rate controller
189    learning_rate_controller: AdaptiveLearningRateController<A>,
190    /// Current model parameters
191    parameters: Option<Array<A, D>>,
192    /// Optimization statistics
193    stats: AdaptiveStreamingStats,
194    /// Last adaptation timestamp
195    last_adaptation: Instant,
196    /// Adaptation history
197    adaptation_history: VecDeque<Adaptation<A>>,
198    /// Performance baseline for comparison
199    performance_baseline: Option<A>,
200    /// Phantom data for dimension type
201    _phantom: PhantomData<D>,
202}
203
204impl<O, A, D> AdaptiveStreamingOptimizer<O, A, D>
205where
206    A: Float
207        + Default
208        + Clone
209        + Send
210        + Sync
211        + std::iter::Sum
212        + std::fmt::Debug
213        + std::ops::DivAssign
214        + scirs2_core::ndarray::ScalarOperand
215        + 'static,
216    D: Dimension,
217    O: Clone,
218{
219    /// Creates a new adaptive streaming optimizer
220    pub fn new(base_optimizer: O, config: StreamingConfig) -> Result<Self, String> {
221        // Validate configuration
222        config.validate()?;
223
224        let buffer = AdaptiveBuffer::new(&config)?;
225        let drift_detector = EnhancedDriftDetector::new(&config)?;
226        let performance_tracker = PerformanceTracker::new(&config)?;
227        let resource_manager = ResourceManager::new(&config)?;
228        let meta_learner = MetaLearner::new(&config)?;
229        let anomaly_detector = AnomalyDetector::new(&config)?;
230        let learning_rate_controller =
231            AdaptiveLearningRateController::new(&config).map_err(|e| e.to_string())?;
232
233        let stats = AdaptiveStreamingStats {
234            total_data_points: 0,
235            optimization_steps: 0,
236            drift_events: 0,
237            anomalies_detected: 0,
238            adaptations_applied: 0,
239            current_buffer_size: config.buffer_config.initial_size,
240            current_learning_rate: config.learning_rate_config.initial_rate,
241            avg_processing_time_ms: 0.0,
242            resource_utilization: ResourceUsage::default(),
243            performance_trend: 0.0,
244            meta_learning_score: 0.0,
245        };
246
247        Ok(Self {
248            base_optimizer,
249            config,
250            buffer,
251            drift_detector,
252            performance_tracker,
253            resource_manager,
254            meta_learner,
255            anomaly_detector,
256            learning_rate_controller,
257            parameters: None,
258            stats,
259            last_adaptation: Instant::now(),
260            adaptation_history: VecDeque::with_capacity(1000),
261            performance_baseline: None,
262            _phantom: PhantomData,
263        })
264    }
265
266    /// Performs an adaptive optimization step with streaming data
267    pub fn adaptive_step(
268        &mut self,
269        data_batch: Vec<StreamingDataPoint<A>>,
270    ) -> Result<Array<A, D>, String> {
271        let start_time = Instant::now();
272
273        // Update resource utilization tracking
274        self.resource_manager.update_utilization()?;
275
276        // Add data to buffer and check for anomalies
277        let filtered_batch = self.filter_anomalies(data_batch)?;
278        self.buffer.add_batch(filtered_batch)?;
279
280        // Check if buffer should be processed
281        if !self.should_process_buffer()? {
282            return self
283                .parameters
284                .clone()
285                .ok_or("No parameters available".to_string());
286        }
287
288        // Get batch from buffer for processing
289        let processing_batch = self.buffer.get_batch_for_processing()?;
290        self.stats.total_data_points += processing_batch.len();
291
292        // Detect drift in the data
293        let drift_detected = self.drift_detector.detect_drift(&processing_batch)?;
294        if drift_detected {
295            self.stats.drift_events += 1;
296        }
297
298        // Compute necessary adaptations
299        let adaptations = self.compute_adaptations(&processing_batch, drift_detected)?;
300
301        // Apply adaptations to system components
302        self.apply_adaptations(&adaptations)?;
303
304        // Perform actual optimization step
305        let updated_parameters = self.perform_optimization_step(&processing_batch)?;
306
307        // Evaluate performance of the optimization step
308        let performance = self.evaluate_performance(&processing_batch, &updated_parameters)?;
309
310        // Update performance tracking
311        self.performance_tracker
312            .add_performance(performance.clone())?;
313
314        // Update meta-learner with experience
315        self.update_meta_learner(&processing_batch, &adaptations, &performance)?;
316
317        // Update statistics
318        self.stats.optimization_steps += 1;
319        self.stats.adaptations_applied += adaptations.len();
320        self.stats.current_buffer_size = self.buffer.current_size();
321        self.stats.current_learning_rate = self
322            .learning_rate_controller
323            .current_rate()
324            .to_f64()
325            .unwrap_or(0.0);
326        self.stats.performance_trend = self.compute_performance_trend();
327        self.stats.meta_learning_score = self
328            .meta_learner
329            .get_effectiveness_score()
330            .to_f64()
331            .unwrap_or(0.0);
332
333        let processing_time = start_time.elapsed().as_millis() as f64;
334        self.stats.avg_processing_time_ms = (self.stats.avg_processing_time_ms
335            * (self.stats.optimization_steps - 1) as f64
336            + processing_time)
337            / self.stats.optimization_steps as f64;
338
339        // Store updated parameters
340        self.parameters = Some(updated_parameters.clone());
341
342        Ok(updated_parameters)
343    }
344
345    /// Filters out anomalous data points
346    fn filter_anomalies(
347        &mut self,
348        data_batch: Vec<StreamingDataPoint<A>>,
349    ) -> Result<Vec<StreamingDataPoint<A>>, String> {
350        if !self.config.anomaly_config.enable_detection {
351            return Ok(data_batch);
352        }
353
354        let mut filtered_batch = Vec::new();
355
356        for data_point in data_batch {
357            let is_anomaly = self.anomaly_detector.detect_anomaly(&data_point)?;
358
359            if is_anomaly {
360                self.stats.anomalies_detected += 1;
361
362                // Apply anomaly response strategy
363                match &self.config.anomaly_config.response_strategy {
364                    AnomalyResponseStrategy::Ignore => {
365                        // Include the data point anyway
366                        filtered_batch.push(data_point);
367                    }
368                    AnomalyResponseStrategy::Filter => {
369                        // Skip this data point
370                        continue;
371                    }
372                    AnomalyResponseStrategy::Adaptive => {
373                        // Adapt the data point or model
374                        let adapted_point = self.adapt_for_anomaly(data_point)?;
375                        filtered_batch.push(adapted_point);
376                    }
377                    AnomalyResponseStrategy::Reset => {
378                        // Reset relevant components (implemented in apply_adaptations)
379                        filtered_batch.push(data_point);
380                    }
381                    AnomalyResponseStrategy::Custom(_) => {
382                        // Custom handling (simplified)
383                        filtered_batch.push(data_point);
384                    }
385                }
386            } else {
387                filtered_batch.push(data_point);
388            }
389        }
390
391        Ok(filtered_batch)
392    }
393
394    /// Adapts a data point that was detected as anomalous
395    fn adapt_for_anomaly(
396        &self,
397        mut data_point: StreamingDataPoint<A>,
398    ) -> Result<StreamingDataPoint<A>, String> {
399        // Simple adaptation: reduce the influence of extreme values
400        let median = self.compute_feature_median(&data_point.features)?;
401
402        for (i, value) in data_point.features.iter_mut().enumerate() {
403            let diff = (*value - median[i]).abs();
404            let threshold = median[i] * A::from(self.config.anomaly_config.threshold).unwrap();
405
406            if diff > threshold {
407                // Clip the value to be within the threshold
408                let sign = if *value > median[i] {
409                    A::one()
410                } else {
411                    -A::one()
412                };
413                *value = median[i] + sign * threshold;
414            }
415        }
416
417        // Reduce quality score for adapted anomalous data
418        data_point.quality_score = data_point.quality_score * A::from(0.5).unwrap();
419
420        Ok(data_point)
421    }
422
423    /// Computes feature-wise median from recent data
424    fn compute_feature_median(&self, features: &Array1<A>) -> Result<Array1<A>, String> {
425        // Simplified implementation - in practice would use rolling window
426        Ok(features.clone())
427    }
428
429    /// Checks if the buffer should be processed
430    fn should_process_buffer(&self) -> Result<bool, String> {
431        let buffer_quality = self.buffer.get_quality_metrics();
432        let buffer_size = self.buffer.current_size();
433
434        // Check size threshold
435        let size_threshold = self.config.buffer_config.initial_size;
436        let size_ready = buffer_size >= size_threshold;
437
438        // Check quality threshold
439        let quality_ready = buffer_quality.average_quality
440            >= A::from(self.config.buffer_config.quality_threshold).unwrap();
441
442        // Check timeout
443        let timeout_ready = self.buffer.time_since_last_processing()
444            >= self.config.buffer_config.processing_timeout;
445
446        // Check resource availability
447        let resources_available = self
448            .resource_manager
449            .has_sufficient_resources_for_processing()?;
450
451        Ok((size_ready && quality_ready) || timeout_ready && resources_available)
452    }
453
454    /// Computes necessary adaptations based on current state
455    fn compute_adaptations(
456        &mut self,
457        batch: &[StreamingDataPoint<A>],
458        drift_detected: bool,
459    ) -> Result<Vec<Adaptation<A>>, String> {
460        let mut adaptations = Vec::new();
461
462        // Learning rate adaptation
463        // Note: compute_adaptation doesn't currently use these parameters
464        let lr_value = self.learning_rate_controller.compute_adaptation(&[]);
465        let lr_adaptation = Adaptation {
466            adaptation_type: AdaptationType::LearningRate,
467            magnitude: lr_value,
468            target_component: String::from("learning_rate"),
469            parameters: HashMap::new(),
470            priority: AdaptationPriority::Normal,
471            timestamp: Instant::now(),
472        };
473        adaptations.push(lr_adaptation);
474
475        // Drift-based adaptations
476        if drift_detected {
477            if let Some(drift_adaptation) = self.drift_detector.compute_sensitivity_adaptation()? {
478                adaptations.push(drift_adaptation);
479            }
480        }
481
482        // Buffer size adaptation
483        if let Some(buffer_adaptation) = self
484            .buffer
485            .compute_size_adaptation(&self.performance_tracker)?
486        {
487            adaptations.push(buffer_adaptation);
488        }
489
490        // Resource allocation adaptation
491        // NOTE: Skipping resource adaptation due to type mismatch (f32 vs A)
492        // if let Some(resource_adaptation) = self.resource_manager.compute_allocation_adaptation()? {
493        //     adaptations.push(resource_adaptation);
494        // }
495
496        // Meta-learning based adaptations
497        let meta_adaptations = self
498            .meta_learner
499            .recommend_adaptations(batch, &self.performance_tracker)?;
500        adaptations.extend(meta_adaptations);
501
502        // Sort adaptations by priority
503        adaptations.sort_by(|a, b| b.priority.cmp(&a.priority));
504
505        Ok(adaptations)
506    }
507
508    /// Applies computed adaptations to system components
509    fn apply_adaptations(&mut self, adaptations: &[Adaptation<A>]) -> Result<(), String> {
510        for adaptation in adaptations {
511            match &adaptation.adaptation_type {
512                AdaptationType::LearningRate => {
513                    self.learning_rate_controller
514                        .apply_adaptation(adaptation.magnitude);
515                }
516                AdaptationType::BufferSize => {
517                    self.buffer.apply_size_adaptation(adaptation)?;
518                }
519                AdaptationType::DriftSensitivity => {
520                    self.drift_detector
521                        .apply_sensitivity_adaptation(adaptation)?;
522                }
523                AdaptationType::ResourceAllocation => {
524                    // NOTE: Skipping due to type mismatch (f32 vs A)
525                    // self.resource_manager
526                    //     .apply_allocation_adaptation(adaptation)?;
527                }
528                AdaptationType::PerformanceThreshold => {
529                    self.performance_tracker
530                        .apply_threshold_adaptation(adaptation)?;
531                }
532                AdaptationType::AnomalyDetection => {
533                    self.anomaly_detector.apply_adaptation(adaptation)?;
534                }
535                AdaptationType::MetaLearning => {
536                    self.meta_learner.apply_adaptation(adaptation)?;
537                }
538                AdaptationType::Custom(name) => {
539                    // Handle custom adaptations
540                    println!("Applying custom adaptation: {}", name);
541                }
542            }
543
544            // Store adaptation in history
545            if self.adaptation_history.len() >= 1000 {
546                self.adaptation_history.pop_front();
547            }
548            self.adaptation_history.push_back(adaptation.clone());
549        }
550
551        self.last_adaptation = Instant::now();
552        Ok(())
553    }
554
555    /// Performs the actual optimization step
556    fn perform_optimization_step(
557        &mut self,
558        batch: &[StreamingDataPoint<A>],
559    ) -> Result<Array<A, D>, String> {
560        // Compute gradients from the batch
561        let gradients = self.compute_batch_gradients(batch)?;
562
563        // Get current learning rate
564        let learning_rate = self.learning_rate_controller.current_rate();
565
566        // Apply optimization step (simplified implementation)
567        let mut updated_parameters = if let Some(params) = self.parameters.clone() {
568            params
569        } else {
570            // Cannot initialize parameters without proper dimension info
571            return Err("Parameters not initialized".to_string());
572        };
573
574        // Simple gradient descent update (in practice would use the base optimizer)
575        for (param, &grad) in updated_parameters.iter_mut().zip(gradients.iter()) {
576            *param = *param - learning_rate * grad;
577        }
578
579        Ok(updated_parameters)
580    }
581
582    /// Computes batch gradients from streaming data
583    fn compute_batch_gradients(
584        &self,
585        batch: &[StreamingDataPoint<A>],
586    ) -> Result<Array1<A>, String> {
587        if batch.is_empty() {
588            return Err("Cannot compute gradients from empty batch".to_string());
589        }
590
591        let feature_dim = batch[0].features.len();
592        let mut gradients = Array1::zeros(feature_dim);
593
594        // Simplified gradient computation (in practice would depend on loss function)
595        for data_point in batch {
596            for (i, &feature) in data_point.features.iter().enumerate() {
597                gradients[i] = gradients[i] + feature * data_point.quality_score;
598            }
599        }
600
601        // Normalize by batch size
602        let batch_size = A::from(batch.len()).unwrap();
603        gradients /= batch_size;
604
605        Ok(gradients)
606    }
607
608    /// Evaluates performance of the optimization step
609    fn evaluate_performance(
610        &self,
611        batch: &[StreamingDataPoint<A>],
612        parameters: &Array<A, D>,
613    ) -> Result<PerformanceSnapshot<A>, String> {
614        // Compute various performance metrics
615        let loss = self.compute_loss(batch, parameters)?;
616        let accuracy = self.compute_accuracy(batch, parameters)?;
617        let convergence_rate = self.compute_convergence_rate(parameters)?;
618
619        // Compute data statistics
620        let data_stats = self.compute_data_statistics(batch)?;
621
622        // Get resource usage
623        let resource_usage = self.resource_manager.current_usage()?;
624
625        let performance = PerformanceSnapshot {
626            timestamp: Instant::now(),
627            loss,
628            accuracy: Some(accuracy),
629            convergence_rate: Some(convergence_rate),
630            gradient_norm: Some(A::from(1.0).unwrap()), // Simplified
631            parameter_update_magnitude: Some(A::from(0.1).unwrap()), // Simplified
632            data_statistics: data_stats,
633            resource_usage,
634            custom_metrics: HashMap::new(),
635        };
636
637        Ok(performance)
638    }
639
640    /// Computes loss for the current batch and parameters
641    fn compute_loss(
642        &self,
643        batch: &[StreamingDataPoint<A>],
644        _parameters: &Array<A, D>,
645    ) -> Result<A, String> {
646        // Simplified loss computation (Mean Squared Error)
647        let mut total_loss = A::zero();
648        let mut count = 0;
649
650        for data_point in batch {
651            if let Some(ref target) = data_point.target {
652                let prediction = &data_point.features; // Simplified
653                let diff = prediction - target;
654                let squared_diff = diff.mapv(|x| x * x);
655                total_loss = total_loss + squared_diff.sum();
656                count += 1;
657            }
658        }
659
660        if count > 0 {
661            Ok(total_loss / A::from(count).unwrap())
662        } else {
663            Ok(A::zero())
664        }
665    }
666
667    /// Computes accuracy for the current batch and parameters
668    fn compute_accuracy(
669        &self,
670        batch: &[StreamingDataPoint<A>],
671        _parameters: &Array<A, D>,
672    ) -> Result<A, String> {
673        // Simplified accuracy computation
674        let mut correct = 0;
675        let mut total = 0;
676
677        for data_point in batch {
678            if data_point.target.is_some() {
679                // Simplified: assume accuracy based on quality score
680                if data_point.quality_score > A::from(0.5).unwrap() {
681                    correct += 1;
682                }
683                total += 1;
684            }
685        }
686
687        if total > 0 {
688            Ok(A::from(correct).unwrap() / A::from(total).unwrap())
689        } else {
690            Ok(A::one())
691        }
692    }
693
694    /// Computes convergence rate
695    fn compute_convergence_rate(&self, _parameters: &Array<A, D>) -> Result<A, String> {
696        // Simplified convergence rate computation
697        let recent_losses = self.performance_tracker.get_recent_losses(10);
698        if recent_losses.len() >= 2 {
699            let improvement = recent_losses[0] - recent_losses[recent_losses.len() - 1];
700            Ok(improvement / recent_losses[0])
701        } else {
702            Ok(A::zero())
703        }
704    }
705
706    /// Computes comprehensive data statistics
707    fn compute_data_statistics(
708        &self,
709        batch: &[StreamingDataPoint<A>],
710    ) -> Result<DataStatistics<A>, String> {
711        if batch.is_empty() {
712            return Ok(DataStatistics::default());
713        }
714
715        let feature_dim = batch[0].features.len();
716        let mut feature_means = Array1::zeros(feature_dim);
717        let mut feature_stds = Array1::zeros(feature_dim);
718        let mut quality_scores = Vec::new();
719
720        // Compute means
721        for data_point in batch {
722            feature_means = feature_means + &data_point.features;
723            quality_scores.push(data_point.quality_score);
724        }
725        feature_means /= A::from(batch.len()).unwrap();
726
727        // Compute standard deviations
728        for data_point in batch {
729            let diff = &data_point.features - &feature_means;
730            feature_stds = feature_stds + &diff.mapv(|x| x * x);
731        }
732        feature_stds /= A::from(batch.len()).unwrap();
733        feature_stds = feature_stds.mapv(|x| x.sqrt());
734
735        let avg_quality =
736            quality_scores.iter().copied().sum::<A>() / A::from(quality_scores.len()).unwrap();
737
738        Ok(DataStatistics {
739            sample_count: batch.len(),
740            feature_means,
741            feature_stds,
742            average_quality: avg_quality,
743            timestamp: Instant::now(),
744        })
745    }
746
747    /// Updates meta-learner with experience from this optimization step
748    fn update_meta_learner(
749        &mut self,
750        batch: &[StreamingDataPoint<A>],
751        adaptations: &[Adaptation<A>],
752        performance: &PerformanceSnapshot<A>,
753    ) -> Result<(), String> {
754        if !self.config.meta_learning_config.enable_meta_learning {
755            return Ok(());
756        }
757
758        // Extract meta-state from current situation
759        let meta_state = self.extract_meta_state(performance)?;
760
761        // Extract meta-action from applied adaptations
762        let meta_action = self.extract_meta_action(adaptations)?;
763
764        // Compute reward based on performance improvement
765        let reward = self.compute_meta_reward(performance)?;
766
767        // Update meta-learner
768        self.meta_learner
769            .update_experience(meta_state, meta_action, reward)?;
770
771        Ok(())
772    }
773
774    /// Extracts meta-state representation from performance data
775    fn extract_meta_state(
776        &self,
777        performance: &PerformanceSnapshot<A>,
778    ) -> Result<MetaState<A>, String> {
779        let state = MetaState {
780            performance_metrics: vec![
781                performance.loss,
782                performance.accuracy.unwrap_or(A::zero()),
783                performance.convergence_rate.unwrap_or(A::zero()),
784            ],
785            resource_state: vec![
786                A::from(performance.resource_usage.memory_usage_mb as f64).unwrap(),
787                A::from(performance.resource_usage.cpu_usage_percent).unwrap(),
788            ],
789            drift_indicators: vec![A::from(if self.drift_detector.is_drift_detected() {
790                1.0
791            } else {
792                0.0
793            })
794            .unwrap()],
795            adaptation_history: self.adaptation_history.len(),
796            timestamp: Instant::now(),
797        };
798
799        Ok(state)
800    }
801
802    /// Extracts meta-action representation from adaptations
803    fn extract_meta_action(&self, adaptations: &[Adaptation<A>]) -> Result<MetaAction<A>, String> {
804        let mut adaptation_vector = Vec::new();
805        let mut adaptation_types = Vec::new();
806
807        for adaptation in adaptations {
808            adaptation_vector.push(adaptation.magnitude);
809            adaptation_types.push(adaptation.adaptation_type.clone());
810        }
811
812        let action = MetaAction {
813            adaptation_magnitudes: adaptation_vector,
814            adaptation_types,
815            learning_rate_change: self
816                .learning_rate_controller
817                .last_change()
818                .unwrap_or(A::zero()),
819            buffer_size_change: A::from(self.buffer.last_size_change()).unwrap_or(A::zero()),
820            timestamp: Instant::now(),
821        };
822
823        Ok(action)
824    }
825
826    /// Computes reward for meta-learning based on performance improvement
827    fn compute_meta_reward(&self, performance: &PerformanceSnapshot<A>) -> Result<A, String> {
828        // Compare with baseline or previous performance
829        let reward = if let Some(baseline) = self.performance_baseline {
830            performance.loss - baseline // Negative reward for higher loss
831        } else {
832            A::zero()
833        };
834
835        Ok(reward)
836    }
837
838    /// Gets current adaptive streaming statistics
839    pub fn get_adaptive_stats(&self) -> AdaptiveStreamingStats {
840        let mut stats = self.stats.clone();
841        stats.resource_utilization = self.resource_manager.current_usage().unwrap_or_default();
842        stats
843    }
844
845    /// Counts the number of adaptations applied in recent history
846    fn count_adaptations_applied(&self) -> usize {
847        let recent_threshold = Instant::now() - Duration::from_secs(300); // Last 5 minutes
848        self.adaptation_history
849            .iter()
850            .filter(|adaptation| adaptation.timestamp > recent_threshold)
851            .count()
852    }
853
854    /// Computes performance trend over recent optimization steps
855    fn compute_performance_trend(&self) -> f64 {
856        let recent_performance = self.performance_tracker.get_recent_performance(20);
857        if recent_performance.len() >= 2 {
858            let recent_avg = recent_performance
859                .iter()
860                .rev()
861                .take(5)
862                .map(|p| p.loss.to_f64().unwrap_or(0.0))
863                .sum::<f64>()
864                / 5.0;
865
866            let older_avg = recent_performance
867                .iter()
868                .take(5)
869                .map(|p| p.loss.to_f64().unwrap_or(0.0))
870                .sum::<f64>()
871                / 5.0;
872
873            // Negative trend means improvement (lower loss)
874            (recent_avg - older_avg) / older_avg
875        } else {
876            0.0
877        }
878    }
879
880    /// Forces an adaptation cycle even if normal triggers haven't fired
881    pub fn force_adaptation(&mut self) -> Result<(), String> {
882        let empty_batch = Vec::new();
883        let adaptations = self.compute_adaptations(&empty_batch, false)?;
884        self.apply_adaptations(&adaptations)?;
885        Ok(())
886    }
887
888    /// Resets the optimizer to initial state while preserving learned knowledge
889    pub fn soft_reset(&mut self) -> Result<(), String> {
890        // Reset components while preserving meta-learning knowledge
891        self.buffer.reset()?;
892        self.drift_detector.reset()?;
893        self.performance_tracker.reset()?;
894
895        // Don't reset meta-learner to preserve learned adaptations
896        // self.meta_learner.reset()?;
897
898        self.stats = AdaptiveStreamingStats {
899            total_data_points: 0,
900            optimization_steps: 0,
901            drift_events: 0,
902            anomalies_detected: 0,
903            adaptations_applied: 0,
904            current_buffer_size: self.config.buffer_config.initial_size,
905            current_learning_rate: self.config.learning_rate_config.initial_rate,
906            avg_processing_time_ms: 0.0,
907            resource_utilization: ResourceUsage::default(),
908            performance_trend: 0.0,
909            meta_learning_score: self.meta_learner.get_effectiveness_score() as f64,
910        };
911
912        self.adaptation_history.clear();
913        self.performance_baseline = None;
914
915        Ok(())
916    }
917
918    /// Gets detailed diagnostic information
919    pub fn get_diagnostics(&self) -> StreamingDiagnostics {
920        StreamingDiagnostics {
921            buffer_diagnostics: self.buffer.get_diagnostics(),
922            drift_diagnostics: self.drift_detector.get_diagnostics(),
923            performance_diagnostics: self.performance_tracker.get_diagnostics(),
924            resource_diagnostics: self.resource_manager.get_diagnostics(),
925            meta_learning_diagnostics: self.meta_learner.get_diagnostics(),
926            anomaly_diagnostics: self.anomaly_detector.get_diagnostics(),
927        }
928    }
929}
930
931/// Comprehensive diagnostic information for streaming optimizer
932#[derive(Debug, Clone)]
933pub struct StreamingDiagnostics {
934    pub buffer_diagnostics: BufferDiagnostics,
935    pub drift_diagnostics: DriftDiagnostics,
936    pub performance_diagnostics: PerformanceDiagnostics,
937    pub resource_diagnostics: ResourceDiagnostics,
938    pub meta_learning_diagnostics: MetaLearningDiagnostics,
939    pub anomaly_diagnostics: AnomalyDiagnostics,
940}