// Source file: optirs_core/streaming/adaptive_streaming/optimizer.rs

1// Core adaptive streaming optimizer implementation
2//
3// This module contains the main AdaptiveStreamingOptimizer that orchestrates
4// all streaming optimization components including drift detection, performance
5// tracking, resource management, and adaptive learning rate control.
6
7use super::anomaly_detection::{
8    AnomalyDetector, AnomalyDiagnostics, EnsembleAnomalyDetector, MLAnomalyDetector,
9    StatisticalAnomalyDetector,
10};
11use super::buffering::{AdaptiveBuffer, BufferDiagnostics};
12use super::config::*;
13use super::drift_detection::{DriftDiagnostics, EnhancedDriftDetector};
14use super::meta_learning::{
15    ExperienceReplay, MetaAction, MetaLearner, MetaLearningDiagnostics, MetaState, StrategySelector,
16};
17use super::performance::{
18    DataStatistics, PerformanceDiagnostics, PerformanceSnapshot, PerformanceTracker,
19};
20use super::resource_management::{ResourceDiagnostics, ResourceManager, ResourceUsage};
21
22use crate::adaptive_selection::OptimizerType;
23// Removed dependency on learned_optimizers - using stub implementation
24use scirs2_core::ndarray::{Array, Array1, Array2, Dimension, IxDyn};
25use scirs2_core::numeric::Float;
26use scirs2_core::ScientificNumber;
27use serde::{Deserialize, Serialize};
28use std::collections::{HashMap, VecDeque};
29use std::marker::PhantomData;
30use std::sync::{Arc, Mutex};
31use std::time::{Duration, Instant};
32
33/// Stub implementation for AdaptiveLearningRateController
34/// (replaces the missing learned_optimizers dependency)
35#[derive(Debug, Clone)]
36pub struct AdaptiveLearningRateController<A: Float> {
37    base_lr: A,
38}
39
40impl<A: Float> AdaptiveLearningRateController<A> {
41    pub fn new(_config: &StreamingConfig) -> Result<Self, crate::error::OptimError> {
42        Ok(Self {
43            base_lr: A::from(0.001).unwrap_or_else(|| A::one()),
44        })
45    }
46
47    pub fn update_learning_rate(&mut self, _gradient: &Array1<A>) -> A {
48        self.base_lr
49    }
50
51    pub fn current_rate(&self) -> A {
52        self.base_lr
53    }
54
55    pub fn compute_adaptation(&self, _performance_metrics: &[A]) -> A {
56        // Simple adaptation: return the current learning rate
57        self.base_lr
58    }
59
60    pub fn apply_adaptation(&mut self, adaptation: A) {
61        // Apply the computed adaptation to the base learning rate
62        self.base_lr = adaptation;
63    }
64
65    pub fn last_change(&self) -> Option<A> {
66        // For now, return None as we're not tracking changes
67        None
68    }
69}
70
/// Streaming data point for optimization
///
/// A single observation flowing through the streaming optimizer, carrying
/// features, an optional supervision target, and bookkeeping metadata.
#[derive(Debug, Clone)]
pub struct StreamingDataPoint<A: Float + Send + Sync> {
    /// Input features
    pub features: Array1<A>,
    /// Target values (optional for unsupervised learning)
    pub target: Option<Array1<A>>,
    /// Timestamp when data was received (monotonic `Instant`, not wall-clock)
    pub timestamp: Instant,
    /// Data source identifier
    pub source_id: Option<String>,
    /// Data quality score (0.0 to 1.0); used downstream to weight gradients
    /// and down-weighted for adapted anomalous points
    pub quality_score: A,
    /// Additional metadata
    pub metadata: HashMap<String, String>,
}
87
/// Adaptation instruction for optimizer components
///
/// Produced by `compute_adaptations` and consumed by `apply_adaptations`,
/// which dispatches on `adaptation_type` to the target subsystem.
#[derive(Debug, Clone)]
pub struct Adaptation<A: Float + Send + Sync> {
    /// Type of adaptation
    pub adaptation_type: AdaptationType,
    /// Magnitude of adaptation (interpretation depends on the type,
    /// e.g. the new learning rate for `LearningRate`)
    pub magnitude: A,
    /// Target component for adaptation
    pub target_component: String,
    /// Adaptation parameters
    pub parameters: HashMap<String, A>,
    /// Priority of this adaptation (higher priorities are applied first)
    pub priority: AdaptationPriority,
    /// Timestamp when adaptation was computed
    pub timestamp: Instant,
}
104
/// Types of adaptations that can be applied
///
/// Each variant maps to one subsystem in `apply_adaptations`; `Custom`
/// carries a free-form name and is currently only logged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AdaptationType {
    /// Adjust learning rate
    LearningRate,
    /// Modify buffer size
    BufferSize,
    /// Change drift sensitivity
    DriftSensitivity,
    /// Update resource allocation (currently a no-op — see apply_adaptations)
    ResourceAllocation,
    /// Adjust performance thresholds
    PerformanceThreshold,
    /// Modify anomaly detection parameters
    AnomalyDetection,
    /// Update meta-learning parameters
    MetaLearning,
    /// Custom adaptation type
    Custom(String),
}
125
/// Priority levels for adaptations
///
/// The derived `Ord` follows the explicit discriminants, so
/// `Critical > High > Normal > Low`; `compute_adaptations` sorts
/// descending by this ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdaptationPriority {
    /// Low priority adaptation
    Low = 0,
    /// Normal priority adaptation
    Normal = 1,
    /// High priority adaptation
    High = 2,
    /// Critical adaptation that must be applied immediately
    Critical = 3,
}
138
/// Statistics for adaptive streaming optimization
///
/// Accumulated across `adaptive_step` calls and exposed via
/// `get_adaptive_stats`. Derives `Serialize` for export; note this
/// requires `ResourceUsage` to be serializable as well.
#[derive(Debug, Clone, Serialize)]
pub struct AdaptiveStreamingStats {
    /// Total number of data points processed
    pub total_data_points: usize,
    /// Total number of optimization steps performed
    pub optimization_steps: usize,
    /// Number of drift events detected
    pub drift_events: usize,
    /// Number of anomalies detected
    pub anomalies_detected: usize,
    /// Number of adaptations applied
    pub adaptations_applied: usize,
    /// Current buffer size
    pub current_buffer_size: usize,
    /// Current learning rate
    pub current_learning_rate: f64,
    /// Average processing time per batch (running mean over all steps)
    pub avg_processing_time_ms: f64,
    /// Resource utilization statistics
    pub resource_utilization: ResourceUsage,
    /// Performance trend (negative = improvement, i.e. falling loss)
    pub performance_trend: f64,
    /// Meta-learning effectiveness score
    pub meta_learning_score: f64,
}
165
/// Main adaptive streaming optimizer
///
/// Orchestrates buffering, drift detection, anomaly detection, performance
/// tracking, resource management, meta-learning, and learning-rate control
/// around a wrapped `base_optimizer`. The generic `D` is the dimension of
/// the parameter array being optimized.
pub struct AdaptiveStreamingOptimizer<O, A, D>
where
    A: Float + Default + Clone + Send + Sync + std::iter::Sum,
    D: Dimension,
{
    /// Base optimizer instance
    base_optimizer: O,
    /// Streaming configuration
    config: StreamingConfig,
    /// Adaptive buffer for incoming data
    buffer: AdaptiveBuffer<A>,
    /// Drift detection system
    drift_detector: EnhancedDriftDetector<A>,
    /// Performance tracking system
    performance_tracker: PerformanceTracker<A>,
    /// Resource management system
    resource_manager: ResourceManager,
    /// Meta-learning system
    meta_learner: MetaLearner<A>,
    /// Anomaly detection system
    anomaly_detector: AnomalyDetector<A>,
    /// Learning rate controller
    learning_rate_controller: AdaptiveLearningRateController<A>,
    /// Current model parameters (None until the caller initializes them)
    parameters: Option<Array<A, D>>,
    /// Optimization statistics
    stats: AdaptiveStreamingStats,
    /// Last adaptation timestamp
    last_adaptation: Instant,
    /// Adaptation history (bounded to the most recent 1000 entries)
    adaptation_history: VecDeque<Adaptation<A>>,
    /// Performance baseline for comparison
    performance_baseline: Option<A>,
    /// Phantom data for dimension type
    /// NOTE(review): redundant — `D` already appears in `parameters`;
    /// kept because `new()` constructs it.
    _phantom: PhantomData<D>,
}
203
204impl<O, A, D> AdaptiveStreamingOptimizer<O, A, D>
205where
206    A: Float
207        + Default
208        + Clone
209        + Send
210        + Sync
211        + std::iter::Sum
212        + std::fmt::Debug
213        + std::ops::DivAssign
214        + scirs2_core::ndarray::ScalarOperand
215        + 'static,
216    D: Dimension,
217    O: Clone,
218{
219    /// Creates a new adaptive streaming optimizer
220    pub fn new(base_optimizer: O, config: StreamingConfig) -> Result<Self, String> {
221        // Validate configuration
222        config.validate()?;
223
224        let buffer = AdaptiveBuffer::new(&config)?;
225        let drift_detector = EnhancedDriftDetector::new(&config)?;
226        let performance_tracker = PerformanceTracker::new(&config)?;
227        let resource_manager = ResourceManager::new(&config)?;
228        let meta_learner = MetaLearner::new(&config)?;
229        let anomaly_detector = AnomalyDetector::new(&config)?;
230        let learning_rate_controller =
231            AdaptiveLearningRateController::new(&config).map_err(|e| e.to_string())?;
232
233        let stats = AdaptiveStreamingStats {
234            total_data_points: 0,
235            optimization_steps: 0,
236            drift_events: 0,
237            anomalies_detected: 0,
238            adaptations_applied: 0,
239            current_buffer_size: config.buffer_config.initial_size,
240            current_learning_rate: config.learning_rate_config.initial_rate,
241            avg_processing_time_ms: 0.0,
242            resource_utilization: ResourceUsage::default(),
243            performance_trend: 0.0,
244            meta_learning_score: 0.0,
245        };
246
247        Ok(Self {
248            base_optimizer,
249            config,
250            buffer,
251            drift_detector,
252            performance_tracker,
253            resource_manager,
254            meta_learner,
255            anomaly_detector,
256            learning_rate_controller,
257            parameters: None,
258            stats,
259            last_adaptation: Instant::now(),
260            adaptation_history: VecDeque::with_capacity(1000),
261            performance_baseline: None,
262            _phantom: PhantomData,
263        })
264    }
265
    /// Performs an adaptive optimization step with streaming data
    ///
    /// Pipeline: filter anomalies → buffer the batch → (if the buffer is
    /// ready) detect drift → compute and apply adaptations → run the
    /// optimization step → evaluate and record performance → update the
    /// meta-learner and statistics. When the buffer is not ready yet, the
    /// current parameters are returned unchanged (error if uninitialized).
    pub fn adaptive_step(
        &mut self,
        data_batch: Vec<StreamingDataPoint<A>>,
    ) -> Result<Array<A, D>, String> {
        let start_time = Instant::now();

        // Update resource utilization tracking
        self.resource_manager.update_utilization()?;

        // Add data to buffer and check for anomalies
        let filtered_batch = self.filter_anomalies(data_batch)?;
        self.buffer.add_batch(filtered_batch)?;

        // Check if buffer should be processed; if not, this step is a no-op
        // apart from buffering, and the last parameters are echoed back.
        if !self.should_process_buffer()? {
            return self
                .parameters
                .clone()
                .ok_or("No parameters available".to_string());
        }

        // Get batch from buffer for processing
        let processing_batch = self.buffer.get_batch_for_processing()?;
        self.stats.total_data_points += processing_batch.len();

        // Detect drift in the data
        let drift_detected = self.drift_detector.detect_drift(&processing_batch)?;
        if drift_detected {
            self.stats.drift_events += 1;
        }

        // Compute necessary adaptations
        let adaptations = self.compute_adaptations(&processing_batch, drift_detected)?;

        // Apply adaptations to system components
        self.apply_adaptations(&adaptations)?;

        // Perform actual optimization step
        let updated_parameters = self.perform_optimization_step(&processing_batch)?;

        // Evaluate performance of the optimization step
        let performance = self.evaluate_performance(&processing_batch, &updated_parameters)?;

        // Update performance tracking
        self.performance_tracker
            .add_performance(performance.clone())?;

        // Update meta-learner with experience
        self.update_meta_learner(&processing_batch, &adaptations, &performance)?;

        // Update statistics
        self.stats.optimization_steps += 1;
        self.stats.adaptations_applied += adaptations.len();
        self.stats.current_buffer_size = self.buffer.current_size();
        self.stats.current_learning_rate = self
            .learning_rate_controller
            .current_rate()
            .to_f64()
            .unwrap_or(0.0);
        self.stats.performance_trend = self.compute_performance_trend();
        self.stats.meta_learning_score = self
            .meta_learner
            .get_effectiveness_score()
            .to_f64()
            .unwrap_or(0.0);

        // Incremental running mean: optimization_steps was incremented above,
        // so (steps - 1) is the count the previous average was taken over.
        let processing_time = start_time.elapsed().as_millis() as f64;
        self.stats.avg_processing_time_ms = (self.stats.avg_processing_time_ms
            * (self.stats.optimization_steps - 1) as f64
            + processing_time)
            / self.stats.optimization_steps as f64;

        // Store updated parameters
        self.parameters = Some(updated_parameters.clone());

        Ok(updated_parameters)
    }
344
345    /// Filters out anomalous data points
346    fn filter_anomalies(
347        &mut self,
348        data_batch: Vec<StreamingDataPoint<A>>,
349    ) -> Result<Vec<StreamingDataPoint<A>>, String> {
350        if !self.config.anomaly_config.enable_detection {
351            return Ok(data_batch);
352        }
353
354        let mut filtered_batch = Vec::new();
355
356        for data_point in data_batch {
357            let is_anomaly = self.anomaly_detector.detect_anomaly(&data_point)?;
358
359            if is_anomaly {
360                self.stats.anomalies_detected += 1;
361
362                // Apply anomaly response strategy
363                match &self.config.anomaly_config.response_strategy {
364                    AnomalyResponseStrategy::Ignore => {
365                        // Include the data point anyway
366                        filtered_batch.push(data_point);
367                    }
368                    AnomalyResponseStrategy::Filter => {
369                        // Skip this data point
370                        continue;
371                    }
372                    AnomalyResponseStrategy::Adaptive => {
373                        // Adapt the data point or model
374                        let adapted_point = self.adapt_for_anomaly(data_point)?;
375                        filtered_batch.push(adapted_point);
376                    }
377                    AnomalyResponseStrategy::Reset => {
378                        // Reset relevant components (implemented in apply_adaptations)
379                        filtered_batch.push(data_point);
380                    }
381                    AnomalyResponseStrategy::Custom(_) => {
382                        // Custom handling (simplified)
383                        filtered_batch.push(data_point);
384                    }
385                }
386            } else {
387                filtered_batch.push(data_point);
388            }
389        }
390
391        Ok(filtered_batch)
392    }
393
394    /// Adapts a data point that was detected as anomalous
395    fn adapt_for_anomaly(
396        &self,
397        mut data_point: StreamingDataPoint<A>,
398    ) -> Result<StreamingDataPoint<A>, String> {
399        // Simple adaptation: reduce the influence of extreme values
400        let median = self.compute_feature_median(&data_point.features)?;
401
402        for (i, value) in data_point.features.iter_mut().enumerate() {
403            let diff = (*value - median[i]).abs();
404            let threshold =
405                median[i] * A::from(self.config.anomaly_config.threshold).expect("unwrap failed");
406
407            if diff > threshold {
408                // Clip the value to be within the threshold
409                let sign = if *value > median[i] {
410                    A::one()
411                } else {
412                    -A::one()
413                };
414                *value = median[i] + sign * threshold;
415            }
416        }
417
418        // Reduce quality score for adapted anomalous data
419        data_point.quality_score = data_point.quality_score * A::from(0.5).expect("unwrap failed");
420
421        Ok(data_point)
422    }
423
    /// Computes feature-wise median from recent data
    ///
    /// Placeholder: returns the input features unchanged. A real
    /// implementation would maintain a rolling window per feature and return
    /// its median; `adapt_for_anomaly` treats the result as the "typical"
    /// value of each feature, so with this stub no clipping ever triggers
    /// (the point is its own median).
    fn compute_feature_median(&self, features: &Array1<A>) -> Result<Array1<A>, String> {
        // Simplified implementation - in practice would use rolling window
        Ok(features.clone())
    }
429
430    /// Checks if the buffer should be processed
431    fn should_process_buffer(&self) -> Result<bool, String> {
432        let buffer_quality = self.buffer.get_quality_metrics();
433        let buffer_size = self.buffer.current_size();
434
435        // Check size threshold
436        let size_threshold = self.config.buffer_config.initial_size;
437        let size_ready = buffer_size >= size_threshold;
438
439        // Check quality threshold
440        let quality_ready = buffer_quality.average_quality
441            >= A::from(self.config.buffer_config.quality_threshold).expect("unwrap failed");
442
443        // Check timeout
444        let timeout_ready = self.buffer.time_since_last_processing()
445            >= self.config.buffer_config.processing_timeout;
446
447        // Check resource availability
448        let resources_available = self
449            .resource_manager
450            .has_sufficient_resources_for_processing()?;
451
452        Ok((size_ready && quality_ready) || timeout_ready && resources_available)
453    }
454
    /// Computes necessary adaptations based on current state
    ///
    /// Gathers adaptation proposals from the learning-rate controller, the
    /// drift detector (only when drift was detected), the buffer, and the
    /// meta-learner, then sorts them by descending priority so critical
    /// adaptations are applied first.
    fn compute_adaptations(
        &mut self,
        batch: &[StreamingDataPoint<A>],
        drift_detected: bool,
    ) -> Result<Vec<Adaptation<A>>, String> {
        let mut adaptations = Vec::new();

        // Learning rate adaptation
        // Note: compute_adaptation doesn't currently use these parameters
        let lr_value = self.learning_rate_controller.compute_adaptation(&[]);
        let lr_adaptation = Adaptation {
            adaptation_type: AdaptationType::LearningRate,
            magnitude: lr_value,
            target_component: String::from("learning_rate"),
            parameters: HashMap::new(),
            priority: AdaptationPriority::Normal,
            timestamp: Instant::now(),
        };
        adaptations.push(lr_adaptation);

        // Drift-based adaptations
        if drift_detected {
            if let Some(drift_adaptation) = self.drift_detector.compute_sensitivity_adaptation()? {
                adaptations.push(drift_adaptation);
            }
        }

        // Buffer size adaptation
        if let Some(buffer_adaptation) = self
            .buffer
            .compute_size_adaptation(&self.performance_tracker)?
        {
            adaptations.push(buffer_adaptation);
        }

        // Resource allocation adaptation
        // NOTE: Skipping resource adaptation due to type mismatch (f32 vs A)
        // if let Some(resource_adaptation) = self.resource_manager.compute_allocation_adaptation()? {
        //     adaptations.push(resource_adaptation);
        // }

        // Meta-learning based adaptations
        let meta_adaptations = self
            .meta_learner
            .recommend_adaptations(batch, &self.performance_tracker)?;
        adaptations.extend(meta_adaptations);

        // Sort adaptations by priority (descending: Critical first)
        adaptations.sort_by(|a, b| b.priority.cmp(&a.priority));

        Ok(adaptations)
    }
508
509    /// Applies computed adaptations to system components
510    fn apply_adaptations(&mut self, adaptations: &[Adaptation<A>]) -> Result<(), String> {
511        for adaptation in adaptations {
512            match &adaptation.adaptation_type {
513                AdaptationType::LearningRate => {
514                    self.learning_rate_controller
515                        .apply_adaptation(adaptation.magnitude);
516                }
517                AdaptationType::BufferSize => {
518                    self.buffer.apply_size_adaptation(adaptation)?;
519                }
520                AdaptationType::DriftSensitivity => {
521                    self.drift_detector
522                        .apply_sensitivity_adaptation(adaptation)?;
523                }
524                AdaptationType::ResourceAllocation => {
525                    // NOTE: Skipping due to type mismatch (f32 vs A)
526                    // self.resource_manager
527                    //     .apply_allocation_adaptation(adaptation)?;
528                }
529                AdaptationType::PerformanceThreshold => {
530                    self.performance_tracker
531                        .apply_threshold_adaptation(adaptation)?;
532                }
533                AdaptationType::AnomalyDetection => {
534                    self.anomaly_detector.apply_adaptation(adaptation)?;
535                }
536                AdaptationType::MetaLearning => {
537                    self.meta_learner.apply_adaptation(adaptation)?;
538                }
539                AdaptationType::Custom(name) => {
540                    // Handle custom adaptations
541                    println!("Applying custom adaptation: {}", name);
542                }
543            }
544
545            // Store adaptation in history
546            if self.adaptation_history.len() >= 1000 {
547                self.adaptation_history.pop_front();
548            }
549            self.adaptation_history.push_back(adaptation.clone());
550        }
551
552        self.last_adaptation = Instant::now();
553        Ok(())
554    }
555
556    /// Performs the actual optimization step
557    fn perform_optimization_step(
558        &mut self,
559        batch: &[StreamingDataPoint<A>],
560    ) -> Result<Array<A, D>, String> {
561        // Compute gradients from the batch
562        let gradients = self.compute_batch_gradients(batch)?;
563
564        // Get current learning rate
565        let learning_rate = self.learning_rate_controller.current_rate();
566
567        // Apply optimization step (simplified implementation)
568        let mut updated_parameters = if let Some(params) = self.parameters.clone() {
569            params
570        } else {
571            // Cannot initialize parameters without proper dimension info
572            return Err("Parameters not initialized".to_string());
573        };
574
575        // Simple gradient descent update (in practice would use the base optimizer)
576        for (param, &grad) in updated_parameters.iter_mut().zip(gradients.iter()) {
577            *param = *param - learning_rate * grad;
578        }
579
580        Ok(updated_parameters)
581    }
582
583    /// Computes batch gradients from streaming data
584    fn compute_batch_gradients(
585        &self,
586        batch: &[StreamingDataPoint<A>],
587    ) -> Result<Array1<A>, String> {
588        if batch.is_empty() {
589            return Err("Cannot compute gradients from empty batch".to_string());
590        }
591
592        let feature_dim = batch[0].features.len();
593        let mut gradients = Array1::zeros(feature_dim);
594
595        // Simplified gradient computation (in practice would depend on loss function)
596        for data_point in batch {
597            for (i, &feature) in data_point.features.iter().enumerate() {
598                gradients[i] = gradients[i] + feature * data_point.quality_score;
599            }
600        }
601
602        // Normalize by batch size
603        let batch_size = A::from(batch.len()).expect("unwrap failed");
604        gradients /= batch_size;
605
606        Ok(gradients)
607    }
608
609    /// Evaluates performance of the optimization step
610    fn evaluate_performance(
611        &self,
612        batch: &[StreamingDataPoint<A>],
613        parameters: &Array<A, D>,
614    ) -> Result<PerformanceSnapshot<A>, String> {
615        // Compute various performance metrics
616        let loss = self.compute_loss(batch, parameters)?;
617        let accuracy = self.compute_accuracy(batch, parameters)?;
618        let convergence_rate = self.compute_convergence_rate(parameters)?;
619
620        // Compute data statistics
621        let data_stats = self.compute_data_statistics(batch)?;
622
623        // Get resource usage
624        let resource_usage = self.resource_manager.current_usage()?;
625
626        let performance = PerformanceSnapshot {
627            timestamp: Instant::now(),
628            loss,
629            accuracy: Some(accuracy),
630            convergence_rate: Some(convergence_rate),
631            gradient_norm: Some(A::from(1.0).expect("unwrap failed")), // Simplified
632            parameter_update_magnitude: Some(A::from(0.1).expect("unwrap failed")), // Simplified
633            data_statistics: data_stats,
634            resource_usage,
635            custom_metrics: HashMap::new(),
636        };
637
638        Ok(performance)
639    }
640
641    /// Computes loss for the current batch and parameters
642    fn compute_loss(
643        &self,
644        batch: &[StreamingDataPoint<A>],
645        _parameters: &Array<A, D>,
646    ) -> Result<A, String> {
647        // Simplified loss computation (Mean Squared Error)
648        let mut total_loss = A::zero();
649        let mut count = 0;
650
651        for data_point in batch {
652            if let Some(ref target) = data_point.target {
653                let prediction = &data_point.features; // Simplified
654                let diff = prediction - target;
655                let squared_diff = diff.mapv(|x| x * x);
656                total_loss = total_loss + squared_diff.sum();
657                count += 1;
658            }
659        }
660
661        if count > 0 {
662            Ok(total_loss / A::from(count).expect("unwrap failed"))
663        } else {
664            Ok(A::zero())
665        }
666    }
667
668    /// Computes accuracy for the current batch and parameters
669    fn compute_accuracy(
670        &self,
671        batch: &[StreamingDataPoint<A>],
672        _parameters: &Array<A, D>,
673    ) -> Result<A, String> {
674        // Simplified accuracy computation
675        let mut correct = 0;
676        let mut total = 0;
677
678        for data_point in batch {
679            if data_point.target.is_some() {
680                // Simplified: assume accuracy based on quality score
681                if data_point.quality_score > A::from(0.5).expect("unwrap failed") {
682                    correct += 1;
683                }
684                total += 1;
685            }
686        }
687
688        if total > 0 {
689            Ok(A::from(correct).expect("unwrap failed") / A::from(total).expect("unwrap failed"))
690        } else {
691            Ok(A::one())
692        }
693    }
694
    /// Computes convergence rate
    ///
    /// Relative improvement between the first and last of (up to) the ten
    /// most recent losses; zero when fewer than two samples exist.
    ///
    /// NOTE(review): divides by `recent_losses[0]` without a zero check —
    /// with Float semantics a zero first loss yields inf/NaN rather than a
    /// panic. Also assumes `get_recent_losses` orders oldest-first; confirm
    /// against PerformanceTracker.
    fn compute_convergence_rate(&self, _parameters: &Array<A, D>) -> Result<A, String> {
        // Simplified convergence rate computation
        let recent_losses = self.performance_tracker.get_recent_losses(10);
        if recent_losses.len() >= 2 {
            let improvement = recent_losses[0] - recent_losses[recent_losses.len() - 1];
            Ok(improvement / recent_losses[0])
        } else {
            Ok(A::zero())
        }
    }
706
707    /// Computes comprehensive data statistics
708    fn compute_data_statistics(
709        &self,
710        batch: &[StreamingDataPoint<A>],
711    ) -> Result<DataStatistics<A>, String> {
712        if batch.is_empty() {
713            return Ok(DataStatistics::default());
714        }
715
716        let feature_dim = batch[0].features.len();
717        let mut feature_means = Array1::zeros(feature_dim);
718        let mut feature_stds = Array1::zeros(feature_dim);
719        let mut quality_scores = Vec::new();
720
721        // Compute means
722        for data_point in batch {
723            feature_means = feature_means + &data_point.features;
724            quality_scores.push(data_point.quality_score);
725        }
726        feature_means /= A::from(batch.len()).expect("unwrap failed");
727
728        // Compute standard deviations
729        for data_point in batch {
730            let diff = &data_point.features - &feature_means;
731            feature_stds = feature_stds + &diff.mapv(|x| x * x);
732        }
733        feature_stds /= A::from(batch.len()).expect("unwrap failed");
734        feature_stds = feature_stds.mapv(|x| x.sqrt());
735
736        let avg_quality = quality_scores.iter().copied().sum::<A>()
737            / A::from(quality_scores.len()).expect("unwrap failed");
738
739        Ok(DataStatistics {
740            sample_count: batch.len(),
741            feature_means,
742            feature_stds,
743            average_quality: avg_quality,
744            timestamp: Instant::now(),
745        })
746    }
747
748    /// Updates meta-learner with experience from this optimization step
749    fn update_meta_learner(
750        &mut self,
751        batch: &[StreamingDataPoint<A>],
752        adaptations: &[Adaptation<A>],
753        performance: &PerformanceSnapshot<A>,
754    ) -> Result<(), String> {
755        if !self.config.meta_learning_config.enable_meta_learning {
756            return Ok(());
757        }
758
759        // Extract meta-state from current situation
760        let meta_state = self.extract_meta_state(performance)?;
761
762        // Extract meta-action from applied adaptations
763        let meta_action = self.extract_meta_action(adaptations)?;
764
765        // Compute reward based on performance improvement
766        let reward = self.compute_meta_reward(performance)?;
767
768        // Update meta-learner
769        self.meta_learner
770            .update_experience(meta_state, meta_action, reward)?;
771
772        Ok(())
773    }
774
775    /// Extracts meta-state representation from performance data
776    fn extract_meta_state(
777        &self,
778        performance: &PerformanceSnapshot<A>,
779    ) -> Result<MetaState<A>, String> {
780        let state = MetaState {
781            performance_metrics: vec![
782                performance.loss,
783                performance.accuracy.unwrap_or(A::zero()),
784                performance.convergence_rate.unwrap_or(A::zero()),
785            ],
786            resource_state: vec![
787                A::from(performance.resource_usage.memory_usage_mb as f64).expect("unwrap failed"),
788                A::from(performance.resource_usage.cpu_usage_percent).expect("unwrap failed"),
789            ],
790            drift_indicators: vec![A::from(if self.drift_detector.is_drift_detected() {
791                1.0
792            } else {
793                0.0
794            })
795            .expect("unwrap failed")],
796            adaptation_history: self.adaptation_history.len(),
797            timestamp: Instant::now(),
798        };
799
800        Ok(state)
801    }
802
803    /// Extracts meta-action representation from adaptations
804    fn extract_meta_action(&self, adaptations: &[Adaptation<A>]) -> Result<MetaAction<A>, String> {
805        let mut adaptation_vector = Vec::new();
806        let mut adaptation_types = Vec::new();
807
808        for adaptation in adaptations {
809            adaptation_vector.push(adaptation.magnitude);
810            adaptation_types.push(adaptation.adaptation_type.clone());
811        }
812
813        let action = MetaAction {
814            adaptation_magnitudes: adaptation_vector,
815            adaptation_types,
816            learning_rate_change: self
817                .learning_rate_controller
818                .last_change()
819                .unwrap_or(A::zero()),
820            buffer_size_change: A::from(self.buffer.last_size_change()).unwrap_or(A::zero()),
821            timestamp: Instant::now(),
822        };
823
824        Ok(action)
825    }
826
827    /// Computes reward for meta-learning based on performance improvement
828    fn compute_meta_reward(&self, performance: &PerformanceSnapshot<A>) -> Result<A, String> {
829        // Compare with baseline or previous performance
830        let reward = if let Some(baseline) = self.performance_baseline {
831            performance.loss - baseline // Negative reward for higher loss
832        } else {
833            A::zero()
834        };
835
836        Ok(reward)
837    }
838
839    /// Gets current adaptive streaming statistics
840    pub fn get_adaptive_stats(&self) -> AdaptiveStreamingStats {
841        let mut stats = self.stats.clone();
842        stats.resource_utilization = self.resource_manager.current_usage().unwrap_or_default();
843        stats
844    }
845
846    /// Counts the number of adaptations applied in recent history
847    fn count_adaptations_applied(&self) -> usize {
848        let recent_threshold = Instant::now() - Duration::from_secs(300); // Last 5 minutes
849        self.adaptation_history
850            .iter()
851            .filter(|adaptation| adaptation.timestamp > recent_threshold)
852            .count()
853    }
854
855    /// Computes performance trend over recent optimization steps
856    fn compute_performance_trend(&self) -> f64 {
857        let recent_performance = self.performance_tracker.get_recent_performance(20);
858        if recent_performance.len() >= 2 {
859            let recent_avg = recent_performance
860                .iter()
861                .rev()
862                .take(5)
863                .map(|p| p.loss.to_f64().unwrap_or(0.0))
864                .sum::<f64>()
865                / 5.0;
866
867            let older_avg = recent_performance
868                .iter()
869                .take(5)
870                .map(|p| p.loss.to_f64().unwrap_or(0.0))
871                .sum::<f64>()
872                / 5.0;
873
874            // Negative trend means improvement (lower loss)
875            (recent_avg - older_avg) / older_avg
876        } else {
877            0.0
878        }
879    }
880
881    /// Forces an adaptation cycle even if normal triggers haven't fired
882    pub fn force_adaptation(&mut self) -> Result<(), String> {
883        let empty_batch = Vec::new();
884        let adaptations = self.compute_adaptations(&empty_batch, false)?;
885        self.apply_adaptations(&adaptations)?;
886        Ok(())
887    }
888
889    /// Resets the optimizer to initial state while preserving learned knowledge
890    pub fn soft_reset(&mut self) -> Result<(), String> {
891        // Reset components while preserving meta-learning knowledge
892        self.buffer.reset()?;
893        self.drift_detector.reset()?;
894        self.performance_tracker.reset()?;
895
896        // Don't reset meta-learner to preserve learned adaptations
897        // self.meta_learner.reset()?;
898
899        self.stats = AdaptiveStreamingStats {
900            total_data_points: 0,
901            optimization_steps: 0,
902            drift_events: 0,
903            anomalies_detected: 0,
904            adaptations_applied: 0,
905            current_buffer_size: self.config.buffer_config.initial_size,
906            current_learning_rate: self.config.learning_rate_config.initial_rate,
907            avg_processing_time_ms: 0.0,
908            resource_utilization: ResourceUsage::default(),
909            performance_trend: 0.0,
910            meta_learning_score: self.meta_learner.get_effectiveness_score() as f64,
911        };
912
913        self.adaptation_history.clear();
914        self.performance_baseline = None;
915
916        Ok(())
917    }
918
919    /// Gets detailed diagnostic information
920    pub fn get_diagnostics(&self) -> StreamingDiagnostics {
921        StreamingDiagnostics {
922            buffer_diagnostics: self.buffer.get_diagnostics(),
923            drift_diagnostics: self.drift_detector.get_diagnostics(),
924            performance_diagnostics: self.performance_tracker.get_diagnostics(),
925            resource_diagnostics: self.resource_manager.get_diagnostics(),
926            meta_learning_diagnostics: self.meta_learner.get_diagnostics(),
927            anomaly_diagnostics: self.anomaly_detector.get_diagnostics(),
928        }
929    }
930}
931
/// Comprehensive diagnostic information for the streaming optimizer.
///
/// Aggregates the per-subsystem diagnostic snapshots collected by
/// `get_diagnostics` on the optimizer into one report.
#[derive(Debug, Clone)]
pub struct StreamingDiagnostics {
    /// Diagnostics from the adaptive buffer.
    pub buffer_diagnostics: BufferDiagnostics,
    /// Diagnostics from the drift detector.
    pub drift_diagnostics: DriftDiagnostics,
    /// Diagnostics from the performance tracker.
    pub performance_diagnostics: PerformanceDiagnostics,
    /// Diagnostics from the resource manager.
    pub resource_diagnostics: ResourceDiagnostics,
    /// Diagnostics from the meta-learner.
    pub meta_learning_diagnostics: MetaLearningDiagnostics,
    /// Diagnostics from the anomaly detector.
    pub anomaly_diagnostics: AnomalyDiagnostics,
}