1use super::anomaly_detection::{
8 AnomalyDetector, AnomalyDiagnostics, EnsembleAnomalyDetector, MLAnomalyDetector,
9 StatisticalAnomalyDetector,
10};
11use super::buffering::{AdaptiveBuffer, BufferDiagnostics};
12use super::config::*;
13use super::drift_detection::{DriftDiagnostics, EnhancedDriftDetector};
14use super::meta_learning::{
15 ExperienceReplay, MetaAction, MetaLearner, MetaLearningDiagnostics, MetaState, StrategySelector,
16};
17use super::performance::{
18 DataStatistics, PerformanceDiagnostics, PerformanceSnapshot, PerformanceTracker,
19};
20use super::resource_management::{ResourceDiagnostics, ResourceManager, ResourceUsage};
21
22use crate::adaptive_selection::OptimizerType;
23use scirs2_core::ndarray::{Array, Array1, Array2, Dimension, IxDyn};
25use scirs2_core::numeric::Float;
26use scirs2_core::ScientificNumber;
27use serde::{Deserialize, Serialize};
28use std::collections::{HashMap, VecDeque};
29use std::marker::PhantomData;
30use std::sync::{Arc, Mutex};
31use std::time::{Duration, Instant};
32
33#[derive(Debug, Clone)]
36pub struct AdaptiveLearningRateController<A: Float> {
37 base_lr: A,
38}
39
40impl<A: Float> AdaptiveLearningRateController<A> {
41 pub fn new(_config: &StreamingConfig) -> Result<Self, crate::error::OptimError> {
42 Ok(Self {
43 base_lr: A::from(0.001).unwrap_or_else(|| A::one()),
44 })
45 }
46
47 pub fn update_learning_rate(&mut self, _gradient: &Array1<A>) -> A {
48 self.base_lr
49 }
50
51 pub fn current_rate(&self) -> A {
52 self.base_lr
53 }
54
55 pub fn compute_adaptation(&self, _performance_metrics: &[A]) -> A {
56 self.base_lr
58 }
59
60 pub fn apply_adaptation(&mut self, adaptation: A) {
61 self.base_lr = adaptation;
63 }
64
65 pub fn last_change(&self) -> Option<A> {
66 None
68 }
69}
70
71#[derive(Debug, Clone)]
73pub struct StreamingDataPoint<A: Float + Send + Sync> {
74 pub features: Array1<A>,
76 pub target: Option<Array1<A>>,
78 pub timestamp: Instant,
80 pub source_id: Option<String>,
82 pub quality_score: A,
84 pub metadata: HashMap<String, String>,
86}
87
88#[derive(Debug, Clone)]
90pub struct Adaptation<A: Float + Send + Sync> {
91 pub adaptation_type: AdaptationType,
93 pub magnitude: A,
95 pub target_component: String,
97 pub parameters: HashMap<String, A>,
99 pub priority: AdaptationPriority,
101 pub timestamp: Instant,
103}
104
/// Identifies the optimizer component an [`Adaptation`] is addressed to.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AdaptationType {
    /// Replaces the learning rate held by the learning-rate controller.
    LearningRate,
    /// Forwarded to the adaptive buffer as a size adaptation.
    BufferSize,
    /// Forwarded to the drift detector as a sensitivity adaptation.
    DriftSensitivity,
    /// Resource-allocation adaptation.
    ResourceAllocation,
    /// Forwarded to the performance tracker as a threshold adaptation.
    PerformanceThreshold,
    /// Forwarded to the anomaly detector.
    AnomalyDetection,
    /// Forwarded to the meta-learner.
    MetaLearning,
    /// User-defined adaptation identified by name.
    Custom(String),
}
125
/// Priority of an adaptation.
///
/// Variants are ordered `Low < Normal < High < Critical`; the derived `Ord`
/// follows the declaration order, which matches the explicit discriminants.
/// The enum is a plain tag, so it is `Copy` and `Hash` in addition to the
/// comparison traits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AdaptationPriority {
    /// Lowest priority.
    Low = 0,
    /// Default priority.
    Normal = 1,
    /// Elevated priority.
    High = 2,
    /// Highest priority.
    Critical = 3,
}
138
139#[derive(Debug, Clone, Serialize)]
141pub struct AdaptiveStreamingStats {
142 pub total_data_points: usize,
144 pub optimization_steps: usize,
146 pub drift_events: usize,
148 pub anomalies_detected: usize,
150 pub adaptations_applied: usize,
152 pub current_buffer_size: usize,
154 pub current_learning_rate: f64,
156 pub avg_processing_time_ms: f64,
158 pub resource_utilization: ResourceUsage,
160 pub performance_trend: f64,
162 pub meta_learning_score: f64,
164}
165
166pub struct AdaptiveStreamingOptimizer<O, A, D>
168where
169 A: Float + Default + Clone + Send + Sync + std::iter::Sum,
170 D: Dimension,
171{
172 base_optimizer: O,
174 config: StreamingConfig,
176 buffer: AdaptiveBuffer<A>,
178 drift_detector: EnhancedDriftDetector<A>,
180 performance_tracker: PerformanceTracker<A>,
182 resource_manager: ResourceManager,
184 meta_learner: MetaLearner<A>,
186 anomaly_detector: AnomalyDetector<A>,
188 learning_rate_controller: AdaptiveLearningRateController<A>,
190 parameters: Option<Array<A, D>>,
192 stats: AdaptiveStreamingStats,
194 last_adaptation: Instant,
196 adaptation_history: VecDeque<Adaptation<A>>,
198 performance_baseline: Option<A>,
200 _phantom: PhantomData<D>,
202}
203
204impl<O, A, D> AdaptiveStreamingOptimizer<O, A, D>
205where
206 A: Float
207 + Default
208 + Clone
209 + Send
210 + Sync
211 + std::iter::Sum
212 + std::fmt::Debug
213 + std::ops::DivAssign
214 + scirs2_core::ndarray::ScalarOperand
215 + 'static,
216 D: Dimension,
217 O: Clone,
218{
219 pub fn new(base_optimizer: O, config: StreamingConfig) -> Result<Self, String> {
221 config.validate()?;
223
224 let buffer = AdaptiveBuffer::new(&config)?;
225 let drift_detector = EnhancedDriftDetector::new(&config)?;
226 let performance_tracker = PerformanceTracker::new(&config)?;
227 let resource_manager = ResourceManager::new(&config)?;
228 let meta_learner = MetaLearner::new(&config)?;
229 let anomaly_detector = AnomalyDetector::new(&config)?;
230 let learning_rate_controller =
231 AdaptiveLearningRateController::new(&config).map_err(|e| e.to_string())?;
232
233 let stats = AdaptiveStreamingStats {
234 total_data_points: 0,
235 optimization_steps: 0,
236 drift_events: 0,
237 anomalies_detected: 0,
238 adaptations_applied: 0,
239 current_buffer_size: config.buffer_config.initial_size,
240 current_learning_rate: config.learning_rate_config.initial_rate,
241 avg_processing_time_ms: 0.0,
242 resource_utilization: ResourceUsage::default(),
243 performance_trend: 0.0,
244 meta_learning_score: 0.0,
245 };
246
247 Ok(Self {
248 base_optimizer,
249 config,
250 buffer,
251 drift_detector,
252 performance_tracker,
253 resource_manager,
254 meta_learner,
255 anomaly_detector,
256 learning_rate_controller,
257 parameters: None,
258 stats,
259 last_adaptation: Instant::now(),
260 adaptation_history: VecDeque::with_capacity(1000),
261 performance_baseline: None,
262 _phantom: PhantomData,
263 })
264 }
265
266 pub fn adaptive_step(
268 &mut self,
269 data_batch: Vec<StreamingDataPoint<A>>,
270 ) -> Result<Array<A, D>, String> {
271 let start_time = Instant::now();
272
273 self.resource_manager.update_utilization()?;
275
276 let filtered_batch = self.filter_anomalies(data_batch)?;
278 self.buffer.add_batch(filtered_batch)?;
279
280 if !self.should_process_buffer()? {
282 return self
283 .parameters
284 .clone()
285 .ok_or("No parameters available".to_string());
286 }
287
288 let processing_batch = self.buffer.get_batch_for_processing()?;
290 self.stats.total_data_points += processing_batch.len();
291
292 let drift_detected = self.drift_detector.detect_drift(&processing_batch)?;
294 if drift_detected {
295 self.stats.drift_events += 1;
296 }
297
298 let adaptations = self.compute_adaptations(&processing_batch, drift_detected)?;
300
301 self.apply_adaptations(&adaptations)?;
303
304 let updated_parameters = self.perform_optimization_step(&processing_batch)?;
306
307 let performance = self.evaluate_performance(&processing_batch, &updated_parameters)?;
309
310 self.performance_tracker
312 .add_performance(performance.clone())?;
313
314 self.update_meta_learner(&processing_batch, &adaptations, &performance)?;
316
317 self.stats.optimization_steps += 1;
319 self.stats.adaptations_applied += adaptations.len();
320 self.stats.current_buffer_size = self.buffer.current_size();
321 self.stats.current_learning_rate = self
322 .learning_rate_controller
323 .current_rate()
324 .to_f64()
325 .unwrap_or(0.0);
326 self.stats.performance_trend = self.compute_performance_trend();
327 self.stats.meta_learning_score = self
328 .meta_learner
329 .get_effectiveness_score()
330 .to_f64()
331 .unwrap_or(0.0);
332
333 let processing_time = start_time.elapsed().as_millis() as f64;
334 self.stats.avg_processing_time_ms = (self.stats.avg_processing_time_ms
335 * (self.stats.optimization_steps - 1) as f64
336 + processing_time)
337 / self.stats.optimization_steps as f64;
338
339 self.parameters = Some(updated_parameters.clone());
341
342 Ok(updated_parameters)
343 }
344
345 fn filter_anomalies(
347 &mut self,
348 data_batch: Vec<StreamingDataPoint<A>>,
349 ) -> Result<Vec<StreamingDataPoint<A>>, String> {
350 if !self.config.anomaly_config.enable_detection {
351 return Ok(data_batch);
352 }
353
354 let mut filtered_batch = Vec::new();
355
356 for data_point in data_batch {
357 let is_anomaly = self.anomaly_detector.detect_anomaly(&data_point)?;
358
359 if is_anomaly {
360 self.stats.anomalies_detected += 1;
361
362 match &self.config.anomaly_config.response_strategy {
364 AnomalyResponseStrategy::Ignore => {
365 filtered_batch.push(data_point);
367 }
368 AnomalyResponseStrategy::Filter => {
369 continue;
371 }
372 AnomalyResponseStrategy::Adaptive => {
373 let adapted_point = self.adapt_for_anomaly(data_point)?;
375 filtered_batch.push(adapted_point);
376 }
377 AnomalyResponseStrategy::Reset => {
378 filtered_batch.push(data_point);
380 }
381 AnomalyResponseStrategy::Custom(_) => {
382 filtered_batch.push(data_point);
384 }
385 }
386 } else {
387 filtered_batch.push(data_point);
388 }
389 }
390
391 Ok(filtered_batch)
392 }
393
394 fn adapt_for_anomaly(
396 &self,
397 mut data_point: StreamingDataPoint<A>,
398 ) -> Result<StreamingDataPoint<A>, String> {
399 let median = self.compute_feature_median(&data_point.features)?;
401
402 for (i, value) in data_point.features.iter_mut().enumerate() {
403 let diff = (*value - median[i]).abs();
404 let threshold = median[i] * A::from(self.config.anomaly_config.threshold).unwrap();
405
406 if diff > threshold {
407 let sign = if *value > median[i] {
409 A::one()
410 } else {
411 -A::one()
412 };
413 *value = median[i] + sign * threshold;
414 }
415 }
416
417 data_point.quality_score = data_point.quality_score * A::from(0.5).unwrap();
419
420 Ok(data_point)
421 }
422
423 fn compute_feature_median(&self, features: &Array1<A>) -> Result<Array1<A>, String> {
425 Ok(features.clone())
427 }
428
429 fn should_process_buffer(&self) -> Result<bool, String> {
431 let buffer_quality = self.buffer.get_quality_metrics();
432 let buffer_size = self.buffer.current_size();
433
434 let size_threshold = self.config.buffer_config.initial_size;
436 let size_ready = buffer_size >= size_threshold;
437
438 let quality_ready = buffer_quality.average_quality
440 >= A::from(self.config.buffer_config.quality_threshold).unwrap();
441
442 let timeout_ready = self.buffer.time_since_last_processing()
444 >= self.config.buffer_config.processing_timeout;
445
446 let resources_available = self
448 .resource_manager
449 .has_sufficient_resources_for_processing()?;
450
451 Ok((size_ready && quality_ready) || timeout_ready && resources_available)
452 }
453
454 fn compute_adaptations(
456 &mut self,
457 batch: &[StreamingDataPoint<A>],
458 drift_detected: bool,
459 ) -> Result<Vec<Adaptation<A>>, String> {
460 let mut adaptations = Vec::new();
461
462 let lr_value = self.learning_rate_controller.compute_adaptation(&[]);
465 let lr_adaptation = Adaptation {
466 adaptation_type: AdaptationType::LearningRate,
467 magnitude: lr_value,
468 target_component: String::from("learning_rate"),
469 parameters: HashMap::new(),
470 priority: AdaptationPriority::Normal,
471 timestamp: Instant::now(),
472 };
473 adaptations.push(lr_adaptation);
474
475 if drift_detected {
477 if let Some(drift_adaptation) = self.drift_detector.compute_sensitivity_adaptation()? {
478 adaptations.push(drift_adaptation);
479 }
480 }
481
482 if let Some(buffer_adaptation) = self
484 .buffer
485 .compute_size_adaptation(&self.performance_tracker)?
486 {
487 adaptations.push(buffer_adaptation);
488 }
489
490 let meta_adaptations = self
498 .meta_learner
499 .recommend_adaptations(batch, &self.performance_tracker)?;
500 adaptations.extend(meta_adaptations);
501
502 adaptations.sort_by(|a, b| b.priority.cmp(&a.priority));
504
505 Ok(adaptations)
506 }
507
508 fn apply_adaptations(&mut self, adaptations: &[Adaptation<A>]) -> Result<(), String> {
510 for adaptation in adaptations {
511 match &adaptation.adaptation_type {
512 AdaptationType::LearningRate => {
513 self.learning_rate_controller
514 .apply_adaptation(adaptation.magnitude);
515 }
516 AdaptationType::BufferSize => {
517 self.buffer.apply_size_adaptation(adaptation)?;
518 }
519 AdaptationType::DriftSensitivity => {
520 self.drift_detector
521 .apply_sensitivity_adaptation(adaptation)?;
522 }
523 AdaptationType::ResourceAllocation => {
524 }
528 AdaptationType::PerformanceThreshold => {
529 self.performance_tracker
530 .apply_threshold_adaptation(adaptation)?;
531 }
532 AdaptationType::AnomalyDetection => {
533 self.anomaly_detector.apply_adaptation(adaptation)?;
534 }
535 AdaptationType::MetaLearning => {
536 self.meta_learner.apply_adaptation(adaptation)?;
537 }
538 AdaptationType::Custom(name) => {
539 println!("Applying custom adaptation: {}", name);
541 }
542 }
543
544 if self.adaptation_history.len() >= 1000 {
546 self.adaptation_history.pop_front();
547 }
548 self.adaptation_history.push_back(adaptation.clone());
549 }
550
551 self.last_adaptation = Instant::now();
552 Ok(())
553 }
554
555 fn perform_optimization_step(
557 &mut self,
558 batch: &[StreamingDataPoint<A>],
559 ) -> Result<Array<A, D>, String> {
560 let gradients = self.compute_batch_gradients(batch)?;
562
563 let learning_rate = self.learning_rate_controller.current_rate();
565
566 let mut updated_parameters = if let Some(params) = self.parameters.clone() {
568 params
569 } else {
570 return Err("Parameters not initialized".to_string());
572 };
573
574 for (param, &grad) in updated_parameters.iter_mut().zip(gradients.iter()) {
576 *param = *param - learning_rate * grad;
577 }
578
579 Ok(updated_parameters)
580 }
581
582 fn compute_batch_gradients(
584 &self,
585 batch: &[StreamingDataPoint<A>],
586 ) -> Result<Array1<A>, String> {
587 if batch.is_empty() {
588 return Err("Cannot compute gradients from empty batch".to_string());
589 }
590
591 let feature_dim = batch[0].features.len();
592 let mut gradients = Array1::zeros(feature_dim);
593
594 for data_point in batch {
596 for (i, &feature) in data_point.features.iter().enumerate() {
597 gradients[i] = gradients[i] + feature * data_point.quality_score;
598 }
599 }
600
601 let batch_size = A::from(batch.len()).unwrap();
603 gradients /= batch_size;
604
605 Ok(gradients)
606 }
607
608 fn evaluate_performance(
610 &self,
611 batch: &[StreamingDataPoint<A>],
612 parameters: &Array<A, D>,
613 ) -> Result<PerformanceSnapshot<A>, String> {
614 let loss = self.compute_loss(batch, parameters)?;
616 let accuracy = self.compute_accuracy(batch, parameters)?;
617 let convergence_rate = self.compute_convergence_rate(parameters)?;
618
619 let data_stats = self.compute_data_statistics(batch)?;
621
622 let resource_usage = self.resource_manager.current_usage()?;
624
625 let performance = PerformanceSnapshot {
626 timestamp: Instant::now(),
627 loss,
628 accuracy: Some(accuracy),
629 convergence_rate: Some(convergence_rate),
630 gradient_norm: Some(A::from(1.0).unwrap()), parameter_update_magnitude: Some(A::from(0.1).unwrap()), data_statistics: data_stats,
633 resource_usage,
634 custom_metrics: HashMap::new(),
635 };
636
637 Ok(performance)
638 }
639
640 fn compute_loss(
642 &self,
643 batch: &[StreamingDataPoint<A>],
644 _parameters: &Array<A, D>,
645 ) -> Result<A, String> {
646 let mut total_loss = A::zero();
648 let mut count = 0;
649
650 for data_point in batch {
651 if let Some(ref target) = data_point.target {
652 let prediction = &data_point.features; let diff = prediction - target;
654 let squared_diff = diff.mapv(|x| x * x);
655 total_loss = total_loss + squared_diff.sum();
656 count += 1;
657 }
658 }
659
660 if count > 0 {
661 Ok(total_loss / A::from(count).unwrap())
662 } else {
663 Ok(A::zero())
664 }
665 }
666
667 fn compute_accuracy(
669 &self,
670 batch: &[StreamingDataPoint<A>],
671 _parameters: &Array<A, D>,
672 ) -> Result<A, String> {
673 let mut correct = 0;
675 let mut total = 0;
676
677 for data_point in batch {
678 if data_point.target.is_some() {
679 if data_point.quality_score > A::from(0.5).unwrap() {
681 correct += 1;
682 }
683 total += 1;
684 }
685 }
686
687 if total > 0 {
688 Ok(A::from(correct).unwrap() / A::from(total).unwrap())
689 } else {
690 Ok(A::one())
691 }
692 }
693
694 fn compute_convergence_rate(&self, _parameters: &Array<A, D>) -> Result<A, String> {
696 let recent_losses = self.performance_tracker.get_recent_losses(10);
698 if recent_losses.len() >= 2 {
699 let improvement = recent_losses[0] - recent_losses[recent_losses.len() - 1];
700 Ok(improvement / recent_losses[0])
701 } else {
702 Ok(A::zero())
703 }
704 }
705
706 fn compute_data_statistics(
708 &self,
709 batch: &[StreamingDataPoint<A>],
710 ) -> Result<DataStatistics<A>, String> {
711 if batch.is_empty() {
712 return Ok(DataStatistics::default());
713 }
714
715 let feature_dim = batch[0].features.len();
716 let mut feature_means = Array1::zeros(feature_dim);
717 let mut feature_stds = Array1::zeros(feature_dim);
718 let mut quality_scores = Vec::new();
719
720 for data_point in batch {
722 feature_means = feature_means + &data_point.features;
723 quality_scores.push(data_point.quality_score);
724 }
725 feature_means /= A::from(batch.len()).unwrap();
726
727 for data_point in batch {
729 let diff = &data_point.features - &feature_means;
730 feature_stds = feature_stds + &diff.mapv(|x| x * x);
731 }
732 feature_stds /= A::from(batch.len()).unwrap();
733 feature_stds = feature_stds.mapv(|x| x.sqrt());
734
735 let avg_quality =
736 quality_scores.iter().copied().sum::<A>() / A::from(quality_scores.len()).unwrap();
737
738 Ok(DataStatistics {
739 sample_count: batch.len(),
740 feature_means,
741 feature_stds,
742 average_quality: avg_quality,
743 timestamp: Instant::now(),
744 })
745 }
746
747 fn update_meta_learner(
749 &mut self,
750 batch: &[StreamingDataPoint<A>],
751 adaptations: &[Adaptation<A>],
752 performance: &PerformanceSnapshot<A>,
753 ) -> Result<(), String> {
754 if !self.config.meta_learning_config.enable_meta_learning {
755 return Ok(());
756 }
757
758 let meta_state = self.extract_meta_state(performance)?;
760
761 let meta_action = self.extract_meta_action(adaptations)?;
763
764 let reward = self.compute_meta_reward(performance)?;
766
767 self.meta_learner
769 .update_experience(meta_state, meta_action, reward)?;
770
771 Ok(())
772 }
773
774 fn extract_meta_state(
776 &self,
777 performance: &PerformanceSnapshot<A>,
778 ) -> Result<MetaState<A>, String> {
779 let state = MetaState {
780 performance_metrics: vec![
781 performance.loss,
782 performance.accuracy.unwrap_or(A::zero()),
783 performance.convergence_rate.unwrap_or(A::zero()),
784 ],
785 resource_state: vec![
786 A::from(performance.resource_usage.memory_usage_mb as f64).unwrap(),
787 A::from(performance.resource_usage.cpu_usage_percent).unwrap(),
788 ],
789 drift_indicators: vec![A::from(if self.drift_detector.is_drift_detected() {
790 1.0
791 } else {
792 0.0
793 })
794 .unwrap()],
795 adaptation_history: self.adaptation_history.len(),
796 timestamp: Instant::now(),
797 };
798
799 Ok(state)
800 }
801
802 fn extract_meta_action(&self, adaptations: &[Adaptation<A>]) -> Result<MetaAction<A>, String> {
804 let mut adaptation_vector = Vec::new();
805 let mut adaptation_types = Vec::new();
806
807 for adaptation in adaptations {
808 adaptation_vector.push(adaptation.magnitude);
809 adaptation_types.push(adaptation.adaptation_type.clone());
810 }
811
812 let action = MetaAction {
813 adaptation_magnitudes: adaptation_vector,
814 adaptation_types,
815 learning_rate_change: self
816 .learning_rate_controller
817 .last_change()
818 .unwrap_or(A::zero()),
819 buffer_size_change: A::from(self.buffer.last_size_change()).unwrap_or(A::zero()),
820 timestamp: Instant::now(),
821 };
822
823 Ok(action)
824 }
825
826 fn compute_meta_reward(&self, performance: &PerformanceSnapshot<A>) -> Result<A, String> {
828 let reward = if let Some(baseline) = self.performance_baseline {
830 performance.loss - baseline } else {
832 A::zero()
833 };
834
835 Ok(reward)
836 }
837
838 pub fn get_adaptive_stats(&self) -> AdaptiveStreamingStats {
840 let mut stats = self.stats.clone();
841 stats.resource_utilization = self.resource_manager.current_usage().unwrap_or_default();
842 stats
843 }
844
845 fn count_adaptations_applied(&self) -> usize {
847 let recent_threshold = Instant::now() - Duration::from_secs(300); self.adaptation_history
849 .iter()
850 .filter(|adaptation| adaptation.timestamp > recent_threshold)
851 .count()
852 }
853
854 fn compute_performance_trend(&self) -> f64 {
856 let recent_performance = self.performance_tracker.get_recent_performance(20);
857 if recent_performance.len() >= 2 {
858 let recent_avg = recent_performance
859 .iter()
860 .rev()
861 .take(5)
862 .map(|p| p.loss.to_f64().unwrap_or(0.0))
863 .sum::<f64>()
864 / 5.0;
865
866 let older_avg = recent_performance
867 .iter()
868 .take(5)
869 .map(|p| p.loss.to_f64().unwrap_or(0.0))
870 .sum::<f64>()
871 / 5.0;
872
873 (recent_avg - older_avg) / older_avg
875 } else {
876 0.0
877 }
878 }
879
880 pub fn force_adaptation(&mut self) -> Result<(), String> {
882 let empty_batch = Vec::new();
883 let adaptations = self.compute_adaptations(&empty_batch, false)?;
884 self.apply_adaptations(&adaptations)?;
885 Ok(())
886 }
887
888 pub fn soft_reset(&mut self) -> Result<(), String> {
890 self.buffer.reset()?;
892 self.drift_detector.reset()?;
893 self.performance_tracker.reset()?;
894
895 self.stats = AdaptiveStreamingStats {
899 total_data_points: 0,
900 optimization_steps: 0,
901 drift_events: 0,
902 anomalies_detected: 0,
903 adaptations_applied: 0,
904 current_buffer_size: self.config.buffer_config.initial_size,
905 current_learning_rate: self.config.learning_rate_config.initial_rate,
906 avg_processing_time_ms: 0.0,
907 resource_utilization: ResourceUsage::default(),
908 performance_trend: 0.0,
909 meta_learning_score: self.meta_learner.get_effectiveness_score() as f64,
910 };
911
912 self.adaptation_history.clear();
913 self.performance_baseline = None;
914
915 Ok(())
916 }
917
918 pub fn get_diagnostics(&self) -> StreamingDiagnostics {
920 StreamingDiagnostics {
921 buffer_diagnostics: self.buffer.get_diagnostics(),
922 drift_diagnostics: self.drift_detector.get_diagnostics(),
923 performance_diagnostics: self.performance_tracker.get_diagnostics(),
924 resource_diagnostics: self.resource_manager.get_diagnostics(),
925 meta_learning_diagnostics: self.meta_learner.get_diagnostics(),
926 anomaly_diagnostics: self.anomaly_detector.get_diagnostics(),
927 }
928 }
929}
930
931#[derive(Debug, Clone)]
933pub struct StreamingDiagnostics {
934 pub buffer_diagnostics: BufferDiagnostics,
935 pub drift_diagnostics: DriftDiagnostics,
936 pub performance_diagnostics: PerformanceDiagnostics,
937 pub resource_diagnostics: ResourceDiagnostics,
938 pub meta_learning_diagnostics: MetaLearningDiagnostics,
939 pub anomaly_diagnostics: AnomalyDiagnostics,
940}