1use super::anomaly_detection::{
8 AnomalyDetector, AnomalyDiagnostics, EnsembleAnomalyDetector, MLAnomalyDetector,
9 StatisticalAnomalyDetector,
10};
11use super::buffering::{AdaptiveBuffer, BufferDiagnostics};
12use super::config::*;
13use super::drift_detection::{DriftDiagnostics, EnhancedDriftDetector};
14use super::meta_learning::{
15 ExperienceReplay, MetaAction, MetaLearner, MetaLearningDiagnostics, MetaState, StrategySelector,
16};
17use super::performance::{
18 DataStatistics, PerformanceDiagnostics, PerformanceSnapshot, PerformanceTracker,
19};
20use super::resource_management::{ResourceDiagnostics, ResourceManager, ResourceUsage};
21
22use crate::adaptive_selection::OptimizerType;
23use scirs2_core::ndarray::{Array, Array1, Array2, Dimension, IxDyn};
25use scirs2_core::numeric::Float;
26use scirs2_core::ScientificNumber;
27use serde::{Deserialize, Serialize};
28use std::collections::{HashMap, VecDeque};
29use std::marker::PhantomData;
30use std::sync::{Arc, Mutex};
31use std::time::{Duration, Instant};
32
/// Minimal adaptive learning-rate controller.
///
/// Currently a stub: it stores a single base rate and returns it unchanged
/// from every query (see the `impl` below); `update_learning_rate` performs
/// no adaptation yet.
#[derive(Debug, Clone)]
pub struct AdaptiveLearningRateController<A: Float> {
    // The rate handed out by `current_rate()` / `update_learning_rate()`;
    // overwritten wholesale by `apply_adaptation()`.
    base_lr: A,
}
39
40impl<A: Float> AdaptiveLearningRateController<A> {
41 pub fn new(_config: &StreamingConfig) -> Result<Self, crate::error::OptimError> {
42 Ok(Self {
43 base_lr: A::from(0.001).unwrap_or_else(|| A::one()),
44 })
45 }
46
47 pub fn update_learning_rate(&mut self, _gradient: &Array1<A>) -> A {
48 self.base_lr
49 }
50
51 pub fn current_rate(&self) -> A {
52 self.base_lr
53 }
54
55 pub fn compute_adaptation(&self, _performance_metrics: &[A]) -> A {
56 self.base_lr
58 }
59
60 pub fn apply_adaptation(&mut self, adaptation: A) {
61 self.base_lr = adaptation;
63 }
64
65 pub fn last_change(&self) -> Option<A> {
66 None
68 }
69}
70
/// A single observation flowing through the streaming optimizer.
#[derive(Debug, Clone)]
pub struct StreamingDataPoint<A: Float + Send + Sync> {
    // Input feature vector.
    pub features: Array1<A>,
    // Optional supervision target; when present, it is compared element-wise
    // against `features` in `compute_loss`.
    pub target: Option<Array1<A>>,
    // Arrival time of the sample.
    pub timestamp: Instant,
    // Identifier of the producing stream/source, if known.
    pub source_id: Option<String>,
    // Quality weight for this point; used to weight its gradient contribution
    // and compared against 0.5 in `compute_accuracy` (higher = more trusted).
    pub quality_score: A,
    // Free-form key/value annotations.
    pub metadata: HashMap<String, String>,
}
87
/// A single adaptation proposal produced by one of the subsystems and
/// dispatched by `apply_adaptations`.
#[derive(Debug, Clone)]
pub struct Adaptation<A: Float + Send + Sync> {
    // Which subsystem/behavior this adaptation targets.
    pub adaptation_type: AdaptationType,
    // Scalar strength of the change (interpretation depends on the type).
    pub magnitude: A,
    // Human-readable name of the component being adapted.
    pub target_component: String,
    // Optional named parameters carried alongside the magnitude.
    pub parameters: HashMap<String, A>,
    // Urgency; adaptations are applied highest-priority first.
    pub priority: AdaptationPriority,
    // When the proposal was created.
    pub timestamp: Instant,
}
104
/// Kind of adaptation; determines which subsystem handles it in
/// `apply_adaptations`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AdaptationType {
    // Adjust the learning-rate controller's base rate.
    LearningRate,
    // Resize the adaptive buffer.
    BufferSize,
    // Retune the drift detector's sensitivity.
    DriftSensitivity,
    // Reallocate compute/memory resources (currently a no-op in dispatch).
    ResourceAllocation,
    // Move the performance tracker's thresholds.
    PerformanceThreshold,
    // Retune the anomaly detector.
    AnomalyDetection,
    // Adjust the meta-learner itself.
    MetaLearning,
    // Extension point; currently only logged when applied.
    Custom(String),
}
125
/// Urgency of an adaptation. The `Ord` derive makes `Critical` compare
/// greatest, which `compute_adaptations` relies on to sort proposals in
/// descending priority order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdaptationPriority {
    Low = 0,
    Normal = 1,
    High = 2,
    Critical = 3,
}
138
/// Running counters and gauges describing the optimizer's activity,
/// refreshed at the end of every `adaptive_step`.
#[derive(Debug, Clone, Serialize)]
pub struct AdaptiveStreamingStats {
    // Total data points pulled from the buffer for processing.
    pub total_data_points: usize,
    // Number of completed optimization steps.
    pub optimization_steps: usize,
    // Number of times the drift detector fired.
    pub drift_events: usize,
    // Number of points flagged anomalous by the detector.
    pub anomalies_detected: usize,
    // Total adaptations dispatched via `apply_adaptations`.
    pub adaptations_applied: usize,
    // Current number of points held in the buffer.
    pub current_buffer_size: usize,
    // Learning rate currently in effect (converted to f64 for reporting).
    pub current_learning_rate: f64,
    // Running mean of per-step wall-clock processing time.
    pub avg_processing_time_ms: f64,
    // Last observed resource usage snapshot.
    pub resource_utilization: ResourceUsage,
    // Relative loss trend over recent steps (positive = loss rising).
    pub performance_trend: f64,
    // Meta-learner effectiveness score (converted to f64 for reporting).
    pub meta_learning_score: f64,
}
165
/// Streaming optimizer that wraps a base optimizer with adaptive buffering,
/// drift detection, anomaly filtering, resource awareness, and meta-learning.
pub struct AdaptiveStreamingOptimizer<O, A, D>
where
    A: Float + Default + Clone + Send + Sync + std::iter::Sum,
    D: Dimension,
{
    // Wrapped optimizer. NOTE(review): not consulted by any method visible in
    // this file — the update rule is hand-rolled SGD; confirm intended use.
    base_optimizer: O,
    // Validated streaming configuration.
    config: StreamingConfig,
    // Adaptive staging buffer for incoming data points.
    buffer: AdaptiveBuffer<A>,
    // Concept-drift detector.
    drift_detector: EnhancedDriftDetector<A>,
    // Rolling record of performance snapshots.
    performance_tracker: PerformanceTracker<A>,
    // Tracks/gates resource availability.
    resource_manager: ResourceManager,
    // Learns which adaptations pay off over time.
    meta_learner: MetaLearner<A>,
    // Flags anomalous incoming points.
    anomaly_detector: AnomalyDetector<A>,
    // Supplies the step size for optimization updates.
    learning_rate_controller: AdaptiveLearningRateController<A>,
    // Current model parameters; `None` until first set.
    parameters: Option<Array<A, D>>,
    // Aggregate statistics, see `AdaptiveStreamingStats`.
    stats: AdaptiveStreamingStats,
    // When adaptations were last applied.
    last_adaptation: Instant,
    // Bounded (1000-entry) history of applied adaptations.
    adaptation_history: VecDeque<Adaptation<A>>,
    // Reference loss for the meta-learning reward; `None` until recorded.
    performance_baseline: Option<A>,
    // NOTE(review): `D` already appears in `parameters`, so this marker looks
    // redundant — harmless, but confirm before removing.
    _phantom: PhantomData<D>,
}
203
204impl<O, A, D> AdaptiveStreamingOptimizer<O, A, D>
205where
206 A: Float
207 + Default
208 + Clone
209 + Send
210 + Sync
211 + std::iter::Sum
212 + std::fmt::Debug
213 + std::ops::DivAssign
214 + scirs2_core::ndarray::ScalarOperand
215 + 'static,
216 D: Dimension,
217 O: Clone,
218{
219 pub fn new(base_optimizer: O, config: StreamingConfig) -> Result<Self, String> {
221 config.validate()?;
223
224 let buffer = AdaptiveBuffer::new(&config)?;
225 let drift_detector = EnhancedDriftDetector::new(&config)?;
226 let performance_tracker = PerformanceTracker::new(&config)?;
227 let resource_manager = ResourceManager::new(&config)?;
228 let meta_learner = MetaLearner::new(&config)?;
229 let anomaly_detector = AnomalyDetector::new(&config)?;
230 let learning_rate_controller =
231 AdaptiveLearningRateController::new(&config).map_err(|e| e.to_string())?;
232
233 let stats = AdaptiveStreamingStats {
234 total_data_points: 0,
235 optimization_steps: 0,
236 drift_events: 0,
237 anomalies_detected: 0,
238 adaptations_applied: 0,
239 current_buffer_size: config.buffer_config.initial_size,
240 current_learning_rate: config.learning_rate_config.initial_rate,
241 avg_processing_time_ms: 0.0,
242 resource_utilization: ResourceUsage::default(),
243 performance_trend: 0.0,
244 meta_learning_score: 0.0,
245 };
246
247 Ok(Self {
248 base_optimizer,
249 config,
250 buffer,
251 drift_detector,
252 performance_tracker,
253 resource_manager,
254 meta_learner,
255 anomaly_detector,
256 learning_rate_controller,
257 parameters: None,
258 stats,
259 last_adaptation: Instant::now(),
260 adaptation_history: VecDeque::with_capacity(1000),
261 performance_baseline: None,
262 _phantom: PhantomData,
263 })
264 }
265
    /// Runs one streaming optimization cycle over an incoming mini-batch.
    ///
    /// Pipeline: refresh resource usage → anomaly-filter the batch → buffer
    /// it → (if the buffer is ready) detect drift, compute and apply
    /// adaptations, take a gradient step, score performance, feed the
    /// meta-learner, and refresh the running statistics.
    ///
    /// Returns the updated parameters, or the previously stored parameters
    /// when the buffer is not yet ready. Errors if parameters were never set.
    pub fn adaptive_step(
        &mut self,
        data_batch: Vec<StreamingDataPoint<A>>,
    ) -> Result<Array<A, D>, String> {
        let start_time = Instant::now();

        self.resource_manager.update_utilization()?;

        // Drop/repair anomalous points before they reach the buffer.
        let filtered_batch = self.filter_anomalies(data_batch)?;
        self.buffer.add_batch(filtered_batch)?;

        // Not enough (or not good enough) data yet: echo current parameters.
        if !self.should_process_buffer()? {
            return self
                .parameters
                .clone()
                .ok_or("No parameters available".to_string());
        }

        let processing_batch = self.buffer.get_batch_for_processing()?;
        self.stats.total_data_points += processing_batch.len();

        let drift_detected = self.drift_detector.detect_drift(&processing_batch)?;
        if drift_detected {
            self.stats.drift_events += 1;
        }

        // Gather proposals from all subsystems, then dispatch them.
        let adaptations = self.compute_adaptations(&processing_batch, drift_detected)?;

        self.apply_adaptations(&adaptations)?;

        let updated_parameters = self.perform_optimization_step(&processing_batch)?;

        let performance = self.evaluate_performance(&processing_batch, &updated_parameters)?;

        self.performance_tracker
            .add_performance(performance.clone())?;

        self.update_meta_learner(&processing_batch, &adaptations, &performance)?;

        // Refresh aggregate statistics for observability.
        self.stats.optimization_steps += 1;
        self.stats.adaptations_applied += adaptations.len();
        self.stats.current_buffer_size = self.buffer.current_size();
        self.stats.current_learning_rate = self
            .learning_rate_controller
            .current_rate()
            .to_f64()
            .unwrap_or(0.0);
        self.stats.performance_trend = self.compute_performance_trend();
        self.stats.meta_learning_score = self
            .meta_learner
            .get_effectiveness_score()
            .to_f64()
            .unwrap_or(0.0);

        // Incremental running mean; `optimization_steps` was incremented
        // above, so the denominator is never zero here.
        let processing_time = start_time.elapsed().as_millis() as f64;
        self.stats.avg_processing_time_ms = (self.stats.avg_processing_time_ms
            * (self.stats.optimization_steps - 1) as f64
            + processing_time)
            / self.stats.optimization_steps as f64;

        self.parameters = Some(updated_parameters.clone());

        Ok(updated_parameters)
    }
344
345 fn filter_anomalies(
347 &mut self,
348 data_batch: Vec<StreamingDataPoint<A>>,
349 ) -> Result<Vec<StreamingDataPoint<A>>, String> {
350 if !self.config.anomaly_config.enable_detection {
351 return Ok(data_batch);
352 }
353
354 let mut filtered_batch = Vec::new();
355
356 for data_point in data_batch {
357 let is_anomaly = self.anomaly_detector.detect_anomaly(&data_point)?;
358
359 if is_anomaly {
360 self.stats.anomalies_detected += 1;
361
362 match &self.config.anomaly_config.response_strategy {
364 AnomalyResponseStrategy::Ignore => {
365 filtered_batch.push(data_point);
367 }
368 AnomalyResponseStrategy::Filter => {
369 continue;
371 }
372 AnomalyResponseStrategy::Adaptive => {
373 let adapted_point = self.adapt_for_anomaly(data_point)?;
375 filtered_batch.push(adapted_point);
376 }
377 AnomalyResponseStrategy::Reset => {
378 filtered_batch.push(data_point);
380 }
381 AnomalyResponseStrategy::Custom(_) => {
382 filtered_batch.push(data_point);
384 }
385 }
386 } else {
387 filtered_batch.push(data_point);
388 }
389 }
390
391 Ok(filtered_batch)
392 }
393
394 fn adapt_for_anomaly(
396 &self,
397 mut data_point: StreamingDataPoint<A>,
398 ) -> Result<StreamingDataPoint<A>, String> {
399 let median = self.compute_feature_median(&data_point.features)?;
401
402 for (i, value) in data_point.features.iter_mut().enumerate() {
403 let diff = (*value - median[i]).abs();
404 let threshold =
405 median[i] * A::from(self.config.anomaly_config.threshold).expect("unwrap failed");
406
407 if diff > threshold {
408 let sign = if *value > median[i] {
410 A::one()
411 } else {
412 -A::one()
413 };
414 *value = median[i] + sign * threshold;
415 }
416 }
417
418 data_point.quality_score = data_point.quality_score * A::from(0.5).expect("unwrap failed");
420
421 Ok(data_point)
422 }
423
    /// Per-feature "median" placeholder.
    ///
    /// NOTE(review): this is a stub — it returns the input unchanged, which
    /// makes the clamp in `adapt_for_anomaly` a no-op (the diff against the
    /// "median" is always zero). A real implementation should aggregate over
    /// a window of recent points.
    fn compute_feature_median(&self, features: &Array1<A>) -> Result<Array1<A>, String> {
        Ok(features.clone())
    }
429
430 fn should_process_buffer(&self) -> Result<bool, String> {
432 let buffer_quality = self.buffer.get_quality_metrics();
433 let buffer_size = self.buffer.current_size();
434
435 let size_threshold = self.config.buffer_config.initial_size;
437 let size_ready = buffer_size >= size_threshold;
438
439 let quality_ready = buffer_quality.average_quality
441 >= A::from(self.config.buffer_config.quality_threshold).expect("unwrap failed");
442
443 let timeout_ready = self.buffer.time_since_last_processing()
445 >= self.config.buffer_config.processing_timeout;
446
447 let resources_available = self
449 .resource_manager
450 .has_sufficient_resources_for_processing()?;
451
452 Ok((size_ready && quality_ready) || timeout_ready && resources_available)
453 }
454
455 fn compute_adaptations(
457 &mut self,
458 batch: &[StreamingDataPoint<A>],
459 drift_detected: bool,
460 ) -> Result<Vec<Adaptation<A>>, String> {
461 let mut adaptations = Vec::new();
462
463 let lr_value = self.learning_rate_controller.compute_adaptation(&[]);
466 let lr_adaptation = Adaptation {
467 adaptation_type: AdaptationType::LearningRate,
468 magnitude: lr_value,
469 target_component: String::from("learning_rate"),
470 parameters: HashMap::new(),
471 priority: AdaptationPriority::Normal,
472 timestamp: Instant::now(),
473 };
474 adaptations.push(lr_adaptation);
475
476 if drift_detected {
478 if let Some(drift_adaptation) = self.drift_detector.compute_sensitivity_adaptation()? {
479 adaptations.push(drift_adaptation);
480 }
481 }
482
483 if let Some(buffer_adaptation) = self
485 .buffer
486 .compute_size_adaptation(&self.performance_tracker)?
487 {
488 adaptations.push(buffer_adaptation);
489 }
490
491 let meta_adaptations = self
499 .meta_learner
500 .recommend_adaptations(batch, &self.performance_tracker)?;
501 adaptations.extend(meta_adaptations);
502
503 adaptations.sort_by(|a, b| b.priority.cmp(&a.priority));
505
506 Ok(adaptations)
507 }
508
509 fn apply_adaptations(&mut self, adaptations: &[Adaptation<A>]) -> Result<(), String> {
511 for adaptation in adaptations {
512 match &adaptation.adaptation_type {
513 AdaptationType::LearningRate => {
514 self.learning_rate_controller
515 .apply_adaptation(adaptation.magnitude);
516 }
517 AdaptationType::BufferSize => {
518 self.buffer.apply_size_adaptation(adaptation)?;
519 }
520 AdaptationType::DriftSensitivity => {
521 self.drift_detector
522 .apply_sensitivity_adaptation(adaptation)?;
523 }
524 AdaptationType::ResourceAllocation => {
525 }
529 AdaptationType::PerformanceThreshold => {
530 self.performance_tracker
531 .apply_threshold_adaptation(adaptation)?;
532 }
533 AdaptationType::AnomalyDetection => {
534 self.anomaly_detector.apply_adaptation(adaptation)?;
535 }
536 AdaptationType::MetaLearning => {
537 self.meta_learner.apply_adaptation(adaptation)?;
538 }
539 AdaptationType::Custom(name) => {
540 println!("Applying custom adaptation: {}", name);
542 }
543 }
544
545 if self.adaptation_history.len() >= 1000 {
547 self.adaptation_history.pop_front();
548 }
549 self.adaptation_history.push_back(adaptation.clone());
550 }
551
552 self.last_adaptation = Instant::now();
553 Ok(())
554 }
555
556 fn perform_optimization_step(
558 &mut self,
559 batch: &[StreamingDataPoint<A>],
560 ) -> Result<Array<A, D>, String> {
561 let gradients = self.compute_batch_gradients(batch)?;
563
564 let learning_rate = self.learning_rate_controller.current_rate();
566
567 let mut updated_parameters = if let Some(params) = self.parameters.clone() {
569 params
570 } else {
571 return Err("Parameters not initialized".to_string());
573 };
574
575 for (param, &grad) in updated_parameters.iter_mut().zip(gradients.iter()) {
577 *param = *param - learning_rate * grad;
578 }
579
580 Ok(updated_parameters)
581 }
582
583 fn compute_batch_gradients(
585 &self,
586 batch: &[StreamingDataPoint<A>],
587 ) -> Result<Array1<A>, String> {
588 if batch.is_empty() {
589 return Err("Cannot compute gradients from empty batch".to_string());
590 }
591
592 let feature_dim = batch[0].features.len();
593 let mut gradients = Array1::zeros(feature_dim);
594
595 for data_point in batch {
597 for (i, &feature) in data_point.features.iter().enumerate() {
598 gradients[i] = gradients[i] + feature * data_point.quality_score;
599 }
600 }
601
602 let batch_size = A::from(batch.len()).expect("unwrap failed");
604 gradients /= batch_size;
605
606 Ok(gradients)
607 }
608
609 fn evaluate_performance(
611 &self,
612 batch: &[StreamingDataPoint<A>],
613 parameters: &Array<A, D>,
614 ) -> Result<PerformanceSnapshot<A>, String> {
615 let loss = self.compute_loss(batch, parameters)?;
617 let accuracy = self.compute_accuracy(batch, parameters)?;
618 let convergence_rate = self.compute_convergence_rate(parameters)?;
619
620 let data_stats = self.compute_data_statistics(batch)?;
622
623 let resource_usage = self.resource_manager.current_usage()?;
625
626 let performance = PerformanceSnapshot {
627 timestamp: Instant::now(),
628 loss,
629 accuracy: Some(accuracy),
630 convergence_rate: Some(convergence_rate),
631 gradient_norm: Some(A::from(1.0).expect("unwrap failed")), parameter_update_magnitude: Some(A::from(0.1).expect("unwrap failed")), data_statistics: data_stats,
634 resource_usage,
635 custom_metrics: HashMap::new(),
636 };
637
638 Ok(performance)
639 }
640
641 fn compute_loss(
643 &self,
644 batch: &[StreamingDataPoint<A>],
645 _parameters: &Array<A, D>,
646 ) -> Result<A, String> {
647 let mut total_loss = A::zero();
649 let mut count = 0;
650
651 for data_point in batch {
652 if let Some(ref target) = data_point.target {
653 let prediction = &data_point.features; let diff = prediction - target;
655 let squared_diff = diff.mapv(|x| x * x);
656 total_loss = total_loss + squared_diff.sum();
657 count += 1;
658 }
659 }
660
661 if count > 0 {
662 Ok(total_loss / A::from(count).expect("unwrap failed"))
663 } else {
664 Ok(A::zero())
665 }
666 }
667
668 fn compute_accuracy(
670 &self,
671 batch: &[StreamingDataPoint<A>],
672 _parameters: &Array<A, D>,
673 ) -> Result<A, String> {
674 let mut correct = 0;
676 let mut total = 0;
677
678 for data_point in batch {
679 if data_point.target.is_some() {
680 if data_point.quality_score > A::from(0.5).expect("unwrap failed") {
682 correct += 1;
683 }
684 total += 1;
685 }
686 }
687
688 if total > 0 {
689 Ok(A::from(correct).expect("unwrap failed") / A::from(total).expect("unwrap failed"))
690 } else {
691 Ok(A::one())
692 }
693 }
694
695 fn compute_convergence_rate(&self, _parameters: &Array<A, D>) -> Result<A, String> {
697 let recent_losses = self.performance_tracker.get_recent_losses(10);
699 if recent_losses.len() >= 2 {
700 let improvement = recent_losses[0] - recent_losses[recent_losses.len() - 1];
701 Ok(improvement / recent_losses[0])
702 } else {
703 Ok(A::zero())
704 }
705 }
706
707 fn compute_data_statistics(
709 &self,
710 batch: &[StreamingDataPoint<A>],
711 ) -> Result<DataStatistics<A>, String> {
712 if batch.is_empty() {
713 return Ok(DataStatistics::default());
714 }
715
716 let feature_dim = batch[0].features.len();
717 let mut feature_means = Array1::zeros(feature_dim);
718 let mut feature_stds = Array1::zeros(feature_dim);
719 let mut quality_scores = Vec::new();
720
721 for data_point in batch {
723 feature_means = feature_means + &data_point.features;
724 quality_scores.push(data_point.quality_score);
725 }
726 feature_means /= A::from(batch.len()).expect("unwrap failed");
727
728 for data_point in batch {
730 let diff = &data_point.features - &feature_means;
731 feature_stds = feature_stds + &diff.mapv(|x| x * x);
732 }
733 feature_stds /= A::from(batch.len()).expect("unwrap failed");
734 feature_stds = feature_stds.mapv(|x| x.sqrt());
735
736 let avg_quality = quality_scores.iter().copied().sum::<A>()
737 / A::from(quality_scores.len()).expect("unwrap failed");
738
739 Ok(DataStatistics {
740 sample_count: batch.len(),
741 feature_means,
742 feature_stds,
743 average_quality: avg_quality,
744 timestamp: Instant::now(),
745 })
746 }
747
748 fn update_meta_learner(
750 &mut self,
751 batch: &[StreamingDataPoint<A>],
752 adaptations: &[Adaptation<A>],
753 performance: &PerformanceSnapshot<A>,
754 ) -> Result<(), String> {
755 if !self.config.meta_learning_config.enable_meta_learning {
756 return Ok(());
757 }
758
759 let meta_state = self.extract_meta_state(performance)?;
761
762 let meta_action = self.extract_meta_action(adaptations)?;
764
765 let reward = self.compute_meta_reward(performance)?;
767
768 self.meta_learner
770 .update_experience(meta_state, meta_action, reward)?;
771
772 Ok(())
773 }
774
775 fn extract_meta_state(
777 &self,
778 performance: &PerformanceSnapshot<A>,
779 ) -> Result<MetaState<A>, String> {
780 let state = MetaState {
781 performance_metrics: vec![
782 performance.loss,
783 performance.accuracy.unwrap_or(A::zero()),
784 performance.convergence_rate.unwrap_or(A::zero()),
785 ],
786 resource_state: vec![
787 A::from(performance.resource_usage.memory_usage_mb as f64).expect("unwrap failed"),
788 A::from(performance.resource_usage.cpu_usage_percent).expect("unwrap failed"),
789 ],
790 drift_indicators: vec![A::from(if self.drift_detector.is_drift_detected() {
791 1.0
792 } else {
793 0.0
794 })
795 .expect("unwrap failed")],
796 adaptation_history: self.adaptation_history.len(),
797 timestamp: Instant::now(),
798 };
799
800 Ok(state)
801 }
802
803 fn extract_meta_action(&self, adaptations: &[Adaptation<A>]) -> Result<MetaAction<A>, String> {
805 let mut adaptation_vector = Vec::new();
806 let mut adaptation_types = Vec::new();
807
808 for adaptation in adaptations {
809 adaptation_vector.push(adaptation.magnitude);
810 adaptation_types.push(adaptation.adaptation_type.clone());
811 }
812
813 let action = MetaAction {
814 adaptation_magnitudes: adaptation_vector,
815 adaptation_types,
816 learning_rate_change: self
817 .learning_rate_controller
818 .last_change()
819 .unwrap_or(A::zero()),
820 buffer_size_change: A::from(self.buffer.last_size_change()).unwrap_or(A::zero()),
821 timestamp: Instant::now(),
822 };
823
824 Ok(action)
825 }
826
827 fn compute_meta_reward(&self, performance: &PerformanceSnapshot<A>) -> Result<A, String> {
829 let reward = if let Some(baseline) = self.performance_baseline {
831 performance.loss - baseline } else {
833 A::zero()
834 };
835
836 Ok(reward)
837 }
838
839 pub fn get_adaptive_stats(&self) -> AdaptiveStreamingStats {
841 let mut stats = self.stats.clone();
842 stats.resource_utilization = self.resource_manager.current_usage().unwrap_or_default();
843 stats
844 }
845
846 fn count_adaptations_applied(&self) -> usize {
848 let recent_threshold = Instant::now() - Duration::from_secs(300); self.adaptation_history
850 .iter()
851 .filter(|adaptation| adaptation.timestamp > recent_threshold)
852 .count()
853 }
854
855 fn compute_performance_trend(&self) -> f64 {
857 let recent_performance = self.performance_tracker.get_recent_performance(20);
858 if recent_performance.len() >= 2 {
859 let recent_avg = recent_performance
860 .iter()
861 .rev()
862 .take(5)
863 .map(|p| p.loss.to_f64().unwrap_or(0.0))
864 .sum::<f64>()
865 / 5.0;
866
867 let older_avg = recent_performance
868 .iter()
869 .take(5)
870 .map(|p| p.loss.to_f64().unwrap_or(0.0))
871 .sum::<f64>()
872 / 5.0;
873
874 (recent_avg - older_avg) / older_avg
876 } else {
877 0.0
878 }
879 }
880
881 pub fn force_adaptation(&mut self) -> Result<(), String> {
883 let empty_batch = Vec::new();
884 let adaptations = self.compute_adaptations(&empty_batch, false)?;
885 self.apply_adaptations(&adaptations)?;
886 Ok(())
887 }
888
889 pub fn soft_reset(&mut self) -> Result<(), String> {
891 self.buffer.reset()?;
893 self.drift_detector.reset()?;
894 self.performance_tracker.reset()?;
895
896 self.stats = AdaptiveStreamingStats {
900 total_data_points: 0,
901 optimization_steps: 0,
902 drift_events: 0,
903 anomalies_detected: 0,
904 adaptations_applied: 0,
905 current_buffer_size: self.config.buffer_config.initial_size,
906 current_learning_rate: self.config.learning_rate_config.initial_rate,
907 avg_processing_time_ms: 0.0,
908 resource_utilization: ResourceUsage::default(),
909 performance_trend: 0.0,
910 meta_learning_score: self.meta_learner.get_effectiveness_score() as f64,
911 };
912
913 self.adaptation_history.clear();
914 self.performance_baseline = None;
915
916 Ok(())
917 }
918
919 pub fn get_diagnostics(&self) -> StreamingDiagnostics {
921 StreamingDiagnostics {
922 buffer_diagnostics: self.buffer.get_diagnostics(),
923 drift_diagnostics: self.drift_detector.get_diagnostics(),
924 performance_diagnostics: self.performance_tracker.get_diagnostics(),
925 resource_diagnostics: self.resource_manager.get_diagnostics(),
926 meta_learning_diagnostics: self.meta_learner.get_diagnostics(),
927 anomaly_diagnostics: self.anomaly_detector.get_diagnostics(),
928 }
929 }
930}
931
/// Combined diagnostics report aggregating one snapshot from each subsystem;
/// produced by `AdaptiveStreamingOptimizer::get_diagnostics`.
#[derive(Debug, Clone)]
pub struct StreamingDiagnostics {
    pub buffer_diagnostics: BufferDiagnostics,
    pub drift_diagnostics: DriftDiagnostics,
    pub performance_diagnostics: PerformanceDiagnostics,
    pub resource_diagnostics: ResourceDiagnostics,
    pub meta_learning_diagnostics: MetaLearningDiagnostics,
    pub anomaly_diagnostics: AnomalyDiagnostics,
}