1use super::config::*;
8use super::optimizer::{Adaptation, AdaptationPriority, AdaptationType, StreamingDataPoint};
9use super::performance::{PerformanceSnapshot, PerformanceTracker};
10
11use scirs2_core::numeric::Float;
12use serde::{Deserialize, Serialize};
13use std::cmp::Ordering;
14use std::collections::{BinaryHeap, HashMap, VecDeque};
15use std::time::{Duration, Instant};
16
/// Priority-ordered staging buffer for streaming data points with adaptive
/// sizing, quality tracking, and retention-based eviction.
pub struct AdaptiveBuffer<A: Float + Send + Sync> {
    /// Buffer configuration (size bounds, quality threshold, feature flags).
    config: BufferConfig,
    /// Primary tier: max-heap ordered by `PrioritizedDataPoint::priority_score`.
    buffer: BinaryHeap<PrioritizedDataPoint<A>>,
    /// Secondary tier: FIFO for points whose priority fell below the quality threshold.
    secondary_buffer: VecDeque<StreamingDataPoint<A>>,
    /// Aggregate quality statistics over both tiers.
    quality_metrics: BufferQualityMetrics<A>,
    /// Adaptive target-size controller.
    sizing_strategy: BufferSizingStrategy<A>,
    /// Policy governing which items are evicted when over target size.
    retention_policy: DataRetentionPolicy<A>,
    /// Lifetime counters and throughput/quality/memory statistics.
    statistics: BufferStatistics<A>,
    /// Instant of the most recent batch drain; basis for throughput computation.
    last_processing: Instant,
    /// Bounded (100-entry) log of target-size changes.
    size_change_log: VecDeque<SizeChangeEvent>,
}
38
/// A streaming data point wrapped with the scores used for heap ordering
/// and retention decisions.
#[derive(Debug, Clone)]
pub struct PrioritizedDataPoint<A: Float + Send + Sync> {
    /// The wrapped data point.
    pub data_point: StreamingDataPoint<A>,
    /// Combined priority (quality + recency bonus + novelty); heap key.
    pub priority_score: A,
    /// When the point entered the buffer.
    pub buffer_timestamp: Instant,
    /// Estimated time to process this point.
    pub expected_processing_time: Duration,
    /// Freshness in [0, 1] at insertion time.
    pub freshness_score: A,
    /// Relevance in [0, 1] at insertion time.
    pub relevance_score: A,
}
55
/// Aggregate quality statistics computed over all buffered points.
#[derive(Debug, Clone)]
pub struct BufferQualityMetrics<A: Float + Send + Sync> {
    /// Mean quality score across both buffer tiers.
    pub average_quality: A,
    /// Population variance of the quality scores.
    pub quality_variance: A,
    /// Lowest observed quality score.
    pub min_quality: A,
    /// Highest observed quality score.
    pub max_quality: A,
    /// Histogram/sample of freshness scores (currently unpopulated by this module).
    pub freshness_distribution: Vec<A>,
    /// Histogram/sample of priority scores (currently unpopulated by this module).
    pub priority_distribution: Vec<A>,
    /// Direction/magnitude trend of the rolling average quality.
    pub quality_trend: QualityTrend<A>,
}
74
/// Rolling trend of the buffer's average quality.
#[derive(Debug, Clone)]
pub struct QualityTrend<A: Float + Send + Sync> {
    /// Sliding window (up to 50) of recent average-quality samples.
    pub recent_changes: VecDeque<A>,
    /// Classified direction of the trend.
    pub trend_direction: TrendDirection,
    /// Absolute difference between the second- and first-half window means.
    pub trend_magnitude: A,
    /// Confidence in the classification (currently a fixed 0.8 once >= 10 samples).
    pub confidence: A,
}
87
/// Qualitative direction of a tracked metric over time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TrendDirection {
    /// Metric is increasing.
    Improving,
    /// Metric is decreasing.
    Degrading,
    /// Metric is roughly constant.
    Stable,
    /// Metric alternates without a clear direction.
    Oscillating,
}
100
/// Controller that adapts the buffer's target size over time.
pub struct BufferSizingStrategy<A: Float + Send + Sync> {
    /// Which sizing algorithm is in effect.
    strategy_type: BufferSizeStrategy,
    /// Current target number of buffered items.
    target_size: usize,
    /// Tunable rates/sensitivities for size adjustments.
    adjustment_params: SizeAdjustmentParams<A>,
    /// Recent (size, performance) observations used for feedback.
    performance_feedback: VecDeque<SizingPerformanceFeedback<A>>,
    /// History of sizing decisions.
    sizing_history: VecDeque<SizingEvent>,
}
114
/// Tunable parameters governing how aggressively the buffer resizes.
#[derive(Debug, Clone)]
pub struct SizeAdjustmentParams<A: Float + Send + Sync> {
    /// Fractional growth applied when expanding (e.g. 0.2 => +20%).
    pub growth_rate: A,
    /// Fractional reduction applied when shrinking (e.g. 0.15 => -15%).
    pub shrinkage_rate: A,
    /// Dead-band below which no resize is triggered.
    pub stability_threshold: A,
    /// Weight of latency/throughput signals in resize decisions.
    pub performance_sensitivity: A,
    /// Weight of quality signals in resize decisions.
    pub quality_sensitivity: A,
    /// Weight of memory-pressure signals in resize decisions.
    pub memory_sensitivity: A,
}
131
/// One observation linking a buffer size to the performance it produced.
#[derive(Debug, Clone)]
pub struct SizingPerformanceFeedback<A: Float + Send + Sync> {
    /// Buffer size at observation time.
    pub buffer_size: usize,
    /// Measured processing latency.
    pub processing_latency: Duration,
    /// Measured throughput (items/second).
    pub throughput: A,
    /// Quality score at observation time.
    pub quality_score: A,
    /// Memory footprint in bytes.
    pub memory_usage: usize,
    /// When the observation was taken.
    pub timestamp: Instant,
}
148
/// Record of a single target-size change decision.
#[derive(Debug, Clone)]
pub struct SizingEvent {
    /// When the resize happened.
    pub timestamp: Instant,
    /// Target size before the change.
    pub old_size: usize,
    /// Target size after the change.
    pub new_size: usize,
    /// Why the resize was triggered.
    pub reason: SizingReason,
    /// Measured performance delta attributable to the change, if known.
    pub performance_impact: Option<f64>,
}
163
/// Why a buffer resize was triggered.
#[derive(Debug, Clone)]
pub enum SizingReason {
    /// General performance tuning.
    PerformanceOptimization,
    /// Resize to improve data quality mix.
    QualityImprovement,
    /// Shrink due to memory pressure / low utilization.
    MemoryPressure,
    /// Resize to meet a latency target.
    LatencyRequirement,
    /// Grow to raise throughput (high utilization).
    ThroughputOptimization,
    /// Operator-initiated change.
    Manual,
    /// Change originating from configuration.
    Configuration,
}
182
/// Policy bundle deciding which buffered items to keep under pressure.
pub struct DataRetentionPolicy<A: Float + Send + Sync> {
    /// Overall eviction strategy in effect.
    strategy: RetentionStrategy,
    /// Age-based limits (hard and soft).
    age_policy: AgeBasedRetention,
    /// Quality-threshold rules.
    quality_policy: QualityBasedRetention<A>,
    /// Relevance-based rules (similarity, temporal decay).
    relevance_policy: RelevanceBasedRetention<A>,
    /// Weighted scorer combining the individual policies.
    retention_scorer: RetentionScorer<A>,
}
196
/// High-level eviction ordering strategies.
#[derive(Debug, Clone)]
pub enum RetentionStrategy {
    /// First in, first out.
    FIFO,
    /// Last in, first out.
    LIFO,
    /// Least recently used first.
    LRU,
    /// Lowest priority first.
    Priority,
    /// Lowest quality first.
    Quality,
    /// Oldest first.
    Age,
    /// Weighted combination of the above.
    Hybrid,
    /// Strategy chosen dynamically from feedback.
    Adaptive,
}
217
/// Age limits for retained items.
#[derive(Debug, Clone)]
pub struct AgeBasedRetention {
    /// Hard limit: items older than this are always eviction candidates.
    pub max_age: Duration,
    /// Soft limit: items older than this are preferred for eviction.
    pub soft_age_limit: Duration,
    /// Weight of age in combined retention scoring.
    pub age_weight: f64,
    /// Whether the limits may be tuned at runtime.
    pub adaptive_limits: bool,
}
230
/// Quality-threshold rules for retention.
#[derive(Debug, Clone)]
pub struct QualityBasedRetention<A: Float + Send + Sync> {
    /// Items below this quality are eviction candidates.
    pub min_quality_threshold: A,
    /// Weight of quality in combined retention scoring.
    pub quality_weight: A,
    /// Whether thresholds may be tuned at runtime.
    pub adaptive_thresholds: bool,
    /// Desired quality-band mix of retained items.
    pub quality_targets: QualityDistributionTargets<A>,
}
243
/// Desired proportions of retained items per quality band, plus the band
/// boundaries. Items at or above `high_quality_threshold` are "high";
/// between the two thresholds "medium"; below "low".
#[derive(Debug, Clone)]
pub struct QualityDistributionTargets<A: Float + Send + Sync> {
    /// Target fraction of high-quality items.
    pub high_quality_target: A,
    /// Target fraction of medium-quality items.
    pub medium_quality_target: A,
    /// Target fraction of low-quality items.
    pub low_quality_target: A,
    /// Lower bound of the "high" band.
    pub high_quality_threshold: A,
    /// Lower bound of the "medium" band.
    pub medium_quality_threshold: A,
}
257
/// Relevance-based retention rules.
#[derive(Debug, Clone)]
pub struct RelevanceBasedRetention<A: Float + Send + Sync> {
    /// How relevance is computed.
    pub relevance_method: RelevanceMethod,
    /// Weight of relevance in combined retention scoring.
    pub relevance_weight: A,
    /// Whether relevance decays over time.
    pub temporal_decay: bool,
    /// Decay rate applied when `temporal_decay` is set.
    pub decay_rate: A,
}
270
/// Methods for computing an item's relevance.
#[derive(Debug, Clone)]
pub enum RelevanceMethod {
    /// Feature-space distance to a reference.
    Distance,
    /// Similarity to a reference.
    Similarity,
    /// Weighted by per-feature importance.
    FeatureImportance,
    /// Prefer items the model is uncertain about.
    Uncertainty,
    /// Prefer items that increase sample diversity.
    Diversity,
    /// Named custom method resolved elsewhere.
    Custom(String),
}
287
/// Combines the individual retention policies into a single weighted score.
pub struct RetentionScorer<A: Float + Send + Sync> {
    /// Per-component weights (age, quality, relevance, ...).
    weights: RetentionWeights<A>,
    /// Recent scoring decisions, for auditing/tuning.
    scoring_history: VecDeque<RetentionScore<A>>,
    /// Feedback on how past retention decisions worked out.
    performance_feedback: VecDeque<RetentionPerformanceFeedback<A>>,
}
297
/// Weights of each component in the combined retention score.
#[derive(Debug, Clone)]
pub struct RetentionWeights<A: Float + Send + Sync> {
    /// Weight of item age.
    pub age_weight: A,
    /// Weight of quality score.
    pub quality_weight: A,
    /// Weight of relevance score.
    pub relevance_weight: A,
    /// Weight of priority score.
    pub priority_weight: A,
    /// Weight of freshness score.
    pub freshness_weight: A,
    /// Weight of diversity contribution.
    pub diversity_weight: A,
}
314
/// Result of scoring one item for retention.
#[derive(Debug, Clone)]
pub struct RetentionScore<A: Float + Send + Sync> {
    /// Combined weighted score.
    pub overall_score: A,
    /// Per-component breakdown keyed by component name.
    pub component_scores: HashMap<String, A>,
    /// Final keep/evict decision.
    pub should_retain: bool,
    /// Confidence in the decision.
    pub confidence: A,
    /// When the score was computed.
    pub timestamp: Instant,
}
329
/// Outcome statistics for one round of retention decisions.
#[derive(Debug, Clone)]
pub struct RetentionPerformanceFeedback<A: Float + Send + Sync> {
    /// Number of items kept.
    pub items_retained: usize,
    /// Number of items evicted.
    pub items_discarded: usize,
    /// Average quality of kept items.
    pub retained_quality: A,
    /// Average quality of evicted items.
    pub discarded_quality: A,
    /// Observed performance effect of the round.
    pub performance_impact: A,
    /// When the round happened.
    pub timestamp: Instant,
}
346
/// Lifetime counters and rolling statistics for the buffer.
#[derive(Debug, Clone)]
pub struct BufferStatistics<A: Float + Send + Sync> {
    /// Total items ever accepted into the buffer.
    pub total_items_processed: u64,
    /// Total items evicted by the retention policy.
    pub total_items_discarded: u64,
    /// Average fill ratio (current size / target size).
    pub avg_buffer_utilization: A,
    /// Highest fill ratio observed.
    pub peak_buffer_utilization: A,
    /// Average per-batch processing latency.
    pub avg_processing_latency: Duration,
    /// Throughput statistics (items/second).
    pub throughput_stats: ThroughputStatistics<A>,
    /// Quality statistics over time.
    pub quality_stats: QualityStatistics<A>,
    /// Memory footprint statistics.
    pub memory_stats: MemoryStatistics,
}
367
/// Throughput statistics (items/second).
#[derive(Debug, Clone)]
pub struct ThroughputStatistics<A: Float + Send + Sync> {
    /// Most recent instantaneous throughput.
    pub current_throughput: A,
    /// Exponential moving average of throughput.
    pub avg_throughput: A,
    /// Highest throughput observed.
    pub peak_throughput: A,
    /// Trend classification of throughput.
    pub throughput_trend: TrendDirection,
    /// Stability measure of the throughput signal.
    pub stability: A,
}
382
/// Quality statistics tracked over the buffer's lifetime.
#[derive(Debug, Clone)]
pub struct QualityStatistics<A: Float + Send + Sync> {
    /// Current average quality of buffered items.
    pub current_avg_quality: A,
    /// Long-run average quality.
    pub historical_avg_quality: A,
    /// Rate at which quality is improving (or degrading, if negative).
    pub quality_improvement_rate: A,
    /// Quality histogram keyed by band name.
    pub quality_distribution: HashMap<String, A>,
    /// Forecast quality, when a predictor is available.
    pub predicted_quality: Option<A>,
}
397
/// Memory footprint statistics for the buffer.
#[derive(Debug, Clone)]
pub struct MemoryStatistics {
    /// Current usage in bytes.
    pub current_usage_bytes: usize,
    /// Peak usage in bytes.
    pub peak_usage_bytes: usize,
    /// Average usage in bytes.
    pub avg_usage_bytes: usize,
    /// Useful-data-to-allocated ratio.
    pub memory_efficiency: f64,
    /// Fragmentation estimate.
    pub fragmentation: f64,
}
412
/// Log entry describing one change of the buffer's target size.
#[derive(Debug, Clone)]
pub struct SizeChangeEvent {
    /// When the change happened.
    pub timestamp: Instant,
    /// Target size before the change.
    pub old_size: usize,
    /// Target size after the change.
    pub new_size: usize,
    /// Signed delta (new - old).
    pub change_magnitude: i32,
    /// Human-readable reason (e.g. a `Debug`-formatted `SizingReason`).
    pub reason: String,
}
427
impl<A: Float + Default + Clone + Send + Sync + std::iter::Sum + std::fmt::Debug>
    AdaptiveBuffer<A>
{
    /// Create an adaptive buffer from the streaming configuration.
    ///
    /// All statistics start zeroed, the sizing strategy starts at the
    /// configured initial size, and a hybrid retention policy is installed.
    pub fn new(config: &StreamingConfig) -> Result<Self, String> {
        let cfg = config.buffer_config.clone();

        // Quality tracking starts neutral: min is seeded with 1 and max with 0
        // so the first real observation replaces both.
        let initial_trend = QualityTrend {
            recent_changes: VecDeque::with_capacity(50),
            trend_direction: TrendDirection::Stable,
            trend_magnitude: A::zero(),
            confidence: A::zero(),
        };
        let quality_metrics = BufferQualityMetrics {
            average_quality: A::zero(),
            quality_variance: A::zero(),
            min_quality: A::one(),
            max_quality: A::zero(),
            freshness_distribution: Vec::new(),
            priority_distribution: Vec::new(),
            quality_trend: initial_trend,
        };

        let statistics = BufferStatistics {
            total_items_processed: 0,
            total_items_discarded: 0,
            avg_buffer_utilization: A::zero(),
            peak_buffer_utilization: A::zero(),
            avg_processing_latency: Duration::ZERO,
            throughput_stats: ThroughputStatistics {
                current_throughput: A::zero(),
                avg_throughput: A::zero(),
                peak_throughput: A::zero(),
                throughput_trend: TrendDirection::Stable,
                stability: A::zero(),
            },
            quality_stats: QualityStatistics {
                current_avg_quality: A::zero(),
                historical_avg_quality: A::zero(),
                quality_improvement_rate: A::zero(),
                quality_distribution: HashMap::new(),
                predicted_quality: None,
            },
            memory_stats: MemoryStatistics {
                current_usage_bytes: 0,
                peak_usage_bytes: 0,
                avg_usage_bytes: 0,
                memory_efficiency: 0.0,
                fragmentation: 0.0,
            },
        };

        let sizing_strategy =
            BufferSizingStrategy::new(cfg.size_strategy.clone(), cfg.initial_size);

        Ok(Self {
            config: cfg,
            buffer: BinaryHeap::new(),
            secondary_buffer: VecDeque::new(),
            quality_metrics,
            sizing_strategy,
            retention_policy: DataRetentionPolicy::new(RetentionStrategy::Hybrid),
            statistics,
            last_processing: Instant::now(),
            size_change_log: VecDeque::with_capacity(100),
        })
    }
498
499 pub fn add_batch(&mut self, batch: Vec<StreamingDataPoint<A>>) -> Result<(), String> {
501 for data_point in batch {
502 self.add_single_point(data_point)?;
503 }
504
505 self.update_quality_metrics()?;
507
508 self.check_buffer_resizing()?;
510
511 if self.current_size() > self.sizing_strategy.target_size {
513 self.apply_retention_policy()?;
514 }
515
516 Ok(())
517 }
518
519 fn add_single_point(&mut self, data_point: StreamingDataPoint<A>) -> Result<(), String> {
521 let priority_score = self.calculate_priority_score(&data_point)?;
523
524 let freshness_score = self.calculate_freshness_score(&data_point);
526 let relevance_score = self.calculate_relevance_score(&data_point)?;
527
528 let prioritized_point = PrioritizedDataPoint {
529 data_point,
530 priority_score,
531 buffer_timestamp: Instant::now(),
532 expected_processing_time: Duration::from_millis(100), freshness_score,
534 relevance_score,
535 };
536
537 if priority_score >= A::from(self.config.quality_threshold).unwrap() {
539 self.buffer.push(prioritized_point);
540 } else {
541 self.secondary_buffer
543 .push_back(prioritized_point.data_point);
544 }
545
546 self.statistics.total_items_processed += 1;
548
549 Ok(())
550 }
551
552 fn calculate_priority_score(&self, data_point: &StreamingDataPoint<A>) -> Result<A, String> {
554 let mut score = data_point.quality_score;
555
556 let age = data_point.timestamp.elapsed().as_secs_f64();
558 let recency_bonus = A::from(1.0 / (1.0 + age / 3600.0)).unwrap(); score = score + recency_bonus * A::from(0.1).unwrap();
560
561 let novelty_score = self.calculate_novelty_score(data_point)?;
563 score = score + novelty_score * A::from(0.2).unwrap();
564
565 Ok(score)
566 }
567
568 fn calculate_novelty_score(&self, data_point: &StreamingDataPoint<A>) -> Result<A, String> {
570 if self.buffer.is_empty() {
572 return Ok(A::from(0.5).unwrap()); }
574
575 let recent_points: Vec<_> = self.buffer.iter().take(10).collect();
577 if recent_points.is_empty() {
578 return Ok(A::from(0.5).unwrap());
579 }
580
581 let mut total_distance = A::zero();
582 for recent_point in &recent_points {
583 let distance = self.calculate_feature_distance(
584 &data_point.features,
585 &recent_point.data_point.features,
586 )?;
587 total_distance = total_distance + distance;
588 }
589
590 let avg_distance = total_distance / A::from(recent_points.len()).unwrap();
591
592 let normalized_novelty = avg_distance / (avg_distance + A::one());
594 Ok(normalized_novelty)
595 }
596
597 fn calculate_feature_distance(
599 &self,
600 features1: &scirs2_core::ndarray::Array1<A>,
601 features2: &scirs2_core::ndarray::Array1<A>,
602 ) -> Result<A, String> {
603 if features1.len() != features2.len() {
604 return Err("Feature vectors have different lengths".to_string());
605 }
606
607 let mut distance = A::zero();
608 for (f1, f2) in features1.iter().zip(features2.iter()) {
609 let diff = *f1 - *f2;
610 distance = distance + diff * diff;
611 }
612
613 Ok(distance.sqrt())
614 }
615
616 fn calculate_freshness_score(&self, data_point: &StreamingDataPoint<A>) -> A {
618 let age_seconds = data_point.timestamp.elapsed().as_secs_f64();
619 let max_age = 3600.0; let freshness = (max_age - age_seconds.min(max_age)) / max_age;
622 A::from(freshness.max(0.0)).unwrap()
623 }
624
    /// Relevance scoring placeholder: the data point is ignored and a fixed
    /// mid-high score is returned, so relevance currently contributes a
    /// constant term wherever it is used.
    // TODO: replace with a real relevance model (see RelevanceMethod).
    fn calculate_relevance_score(&self, _data_point: &StreamingDataPoint<A>) -> Result<A, String> {
        Ok(A::from(0.7).unwrap())
    }
631
632 pub fn get_batch_for_processing(&mut self) -> Result<Vec<StreamingDataPoint<A>>, String> {
634 let batch_size = self.calculate_optimal_batch_size()?;
635 let mut processing_batch = Vec::with_capacity(batch_size);
636
637 while processing_batch.len() < batch_size && !self.buffer.is_empty() {
639 if let Some(prioritized_point) = self.buffer.pop() {
640 processing_batch.push(prioritized_point.data_point);
641 }
642 }
643
644 while processing_batch.len() < batch_size && !self.secondary_buffer.is_empty() {
646 if let Some(data_point) = self.secondary_buffer.pop_front() {
647 processing_batch.push(data_point);
648 }
649 }
650
651 self.last_processing = Instant::now();
653
654 self.update_throughput_stats(processing_batch.len())?;
656
657 Ok(processing_batch)
658 }
659
660 fn calculate_optimal_batch_size(&self) -> Result<usize, String> {
662 let mut batch_size = self.config.initial_size.min(32); let buffer_utilization =
666 self.current_size() as f64 / self.sizing_strategy.target_size as f64;
667 if buffer_utilization > 0.8 {
668 batch_size = (batch_size as f64 * 1.5) as usize; } else if buffer_utilization < 0.3 {
670 batch_size = (batch_size as f64 * 0.7) as usize; }
672
673 if self.statistics.avg_processing_latency > Duration::from_millis(500) {
675 batch_size = (batch_size as f64 * 0.8) as usize; }
677
678 Ok(batch_size.max(1).min(self.current_size().min(100)))
680 }
681
682 fn update_quality_metrics(&mut self) -> Result<(), String> {
684 if self.buffer.is_empty() && self.secondary_buffer.is_empty() {
685 return Ok(());
686 }
687
688 let mut quality_sum = A::zero();
689 let mut quality_values = Vec::new();
690
691 for prioritized_point in &self.buffer {
693 let quality = prioritized_point.data_point.quality_score;
694 quality_sum = quality_sum + quality;
695 quality_values.push(quality);
696 }
697
698 for data_point in &self.secondary_buffer {
700 let quality = data_point.quality_score;
701 quality_sum = quality_sum + quality;
702 quality_values.push(quality);
703 }
704
705 if !quality_values.is_empty() {
706 let count = A::from(quality_values.len()).unwrap();
707 self.quality_metrics.average_quality = quality_sum / count;
708
709 self.quality_metrics.min_quality =
711 quality_values.iter().cloned().fold(A::one(), A::min);
712 self.quality_metrics.max_quality =
713 quality_values.iter().cloned().fold(A::zero(), A::max);
714
715 let mean = self.quality_metrics.average_quality;
717 let variance_sum = quality_values
718 .iter()
719 .map(|&q| (q - mean) * (q - mean))
720 .sum::<A>();
721 self.quality_metrics.quality_variance = variance_sum / count;
722
723 self.update_quality_trend(self.quality_metrics.average_quality)?;
725 }
726
727 Ok(())
728 }
729
730 fn update_quality_trend(&mut self, current_quality: A) -> Result<(), String> {
732 let trend = &mut self.quality_metrics.quality_trend;
733
734 if trend.recent_changes.len() >= 50 {
736 trend.recent_changes.pop_front();
737 }
738 trend.recent_changes.push_back(current_quality);
739
740 if trend.recent_changes.len() >= 10 {
742 let recent: Vec<A> = trend.recent_changes.iter().cloned().collect();
743 let first_half_avg = recent.iter().take(recent.len() / 2).cloned().sum::<A>()
744 / A::from(recent.len() / 2).unwrap();
745 let second_half_avg = recent.iter().skip(recent.len() / 2).cloned().sum::<A>()
746 / A::from(recent.len() - recent.len() / 2).unwrap();
747
748 let change = second_half_avg - first_half_avg;
749 let change_threshold = A::from(0.05).unwrap(); trend.trend_direction = if change > change_threshold {
752 TrendDirection::Improving
753 } else if change < -change_threshold {
754 TrendDirection::Degrading
755 } else {
756 TrendDirection::Stable
757 };
758
759 trend.trend_magnitude = change.abs();
760 trend.confidence = A::from(0.8).unwrap(); }
762
763 Ok(())
764 }
765
766 fn check_buffer_resizing(&mut self) -> Result<(), String> {
768 if !self.config.enable_adaptive_sizing {
769 return Ok(());
770 }
771
772 let current_size = self.current_size();
773 let target_size = self.sizing_strategy.target_size;
774 let utilization = current_size as f64 / target_size as f64;
775
776 let should_resize = if utilization > 0.9 {
778 Some(SizingReason::ThroughputOptimization)
780 } else if utilization < 0.3 && target_size > self.config.min_size {
781 Some(SizingReason::MemoryPressure)
783 } else {
784 None
785 };
786
787 if let Some(reason) = should_resize {
788 self.resize_buffer(reason)?;
789 }
790
791 Ok(())
792 }
793
794 fn resize_buffer(&mut self, reason: SizingReason) -> Result<(), String> {
796 let old_size = self.sizing_strategy.target_size;
797 let new_size = match reason {
798 SizingReason::ThroughputOptimization => {
799 let growth_factor = 1.0
801 + self
802 .sizing_strategy
803 .adjustment_params
804 .growth_rate
805 .to_f64()
806 .unwrap_or(0.2);
807 ((old_size as f64) * growth_factor) as usize
808 }
809 SizingReason::MemoryPressure => {
810 let shrink_factor = 1.0
812 - self
813 .sizing_strategy
814 .adjustment_params
815 .shrinkage_rate
816 .to_f64()
817 .unwrap_or(0.2);
818 ((old_size as f64) * shrink_factor) as usize
819 }
820 _ => old_size, };
822
823 let bounded_size = new_size.max(self.config.min_size).min(self.config.max_size);
825
826 if bounded_size != old_size {
827 self.sizing_strategy.target_size = bounded_size;
828
829 let change_event = SizeChangeEvent {
831 timestamp: Instant::now(),
832 old_size,
833 new_size: bounded_size,
834 change_magnitude: bounded_size as i32 - old_size as i32,
835 reason: format!("{:?}", reason),
836 };
837
838 if self.size_change_log.len() >= 100 {
839 self.size_change_log.pop_front();
840 }
841 self.size_change_log.push_back(change_event);
842 }
843
844 Ok(())
845 }
846
847 fn apply_retention_policy(&mut self) -> Result<(), String> {
849 let target_size = self.sizing_strategy.target_size;
850 let current_size = self.current_size();
851
852 if current_size <= target_size {
853 return Ok(());
854 }
855
856 let items_to_remove = current_size - target_size;
857 let mut removed_count = 0;
858
859 while removed_count < items_to_remove && !self.secondary_buffer.is_empty() {
861 if self.should_remove_from_secondary()? {
862 self.secondary_buffer.pop_front();
863 removed_count += 1;
864 self.statistics.total_items_discarded += 1;
865 } else {
866 break;
867 }
868 }
869
870 let mut temp_buffer = Vec::new();
872 while let Some(item) = self.buffer.pop() {
873 temp_buffer.push(item);
874 }
875
876 temp_buffer.sort_by(|a, b| {
878 let score_a = self
879 .calculate_retention_score(&a.data_point)
880 .unwrap_or(A::zero());
881 let score_b = self
882 .calculate_retention_score(&b.data_point)
883 .unwrap_or(A::zero());
884 score_b.partial_cmp(&score_a).unwrap_or(Ordering::Equal)
885 });
886
887 let items_to_keep = (temp_buffer.len()).saturating_sub(items_to_remove - removed_count);
889 for item in temp_buffer.into_iter().take(items_to_keep) {
890 self.buffer.push(item);
891 }
892
893 Ok(())
894 }
895
896 fn should_remove_from_secondary(&self) -> Result<bool, String> {
898 if let Some(oldest) = self.secondary_buffer.front() {
900 let age = oldest.timestamp.elapsed();
901 Ok(age > Duration::from_secs(3600)) } else {
903 Ok(false)
904 }
905 }
906
907 fn calculate_retention_score(&self, data_point: &StreamingDataPoint<A>) -> Result<A, String> {
909 let age_score = self.calculate_age_score(data_point);
910 let quality_score = data_point.quality_score;
911 let freshness_score = self.calculate_freshness_score(data_point);
912
913 let retention_score = quality_score * A::from(0.5).unwrap()
915 + freshness_score * A::from(0.3).unwrap()
916 + age_score * A::from(0.2).unwrap();
917
918 Ok(retention_score)
919 }
920
921 fn calculate_age_score(&self, data_point: &StreamingDataPoint<A>) -> A {
923 let age_seconds = data_point.timestamp.elapsed().as_secs_f64();
924 let max_age = 7200.0; let age_score = (max_age - age_seconds.min(max_age)) / max_age;
927 A::from(age_score.max(0.0)).unwrap()
928 }
929
    /// Fold the size of a just-drained batch into the throughput statistics.
    ///
    /// Computes instantaneous throughput as items / seconds elapsed since
    /// `last_processing`, then updates the exponential moving average
    /// (alpha = 0.1) and the observed peak.
    ///
    /// NOTE(review): this reads `self.last_processing.elapsed()`, so it must
    /// be called BEFORE the caller resets `last_processing`; otherwise the
    /// interval collapses to ~0 and the computed throughput explodes.
    fn update_throughput_stats(&mut self, items_processed: usize) -> Result<(), String> {
        let time_since_last = self.last_processing.elapsed().as_secs_f64();
        if time_since_last > 0.0 {
            let current_throughput = items_processed as f64 / time_since_last;
            let throughput_value = A::from(current_throughput).unwrap();

            self.statistics.throughput_stats.current_throughput = throughput_value;

            // Exponential moving average with smoothing factor 0.1.
            let alpha = A::from(0.1).unwrap();
            self.statistics.throughput_stats.avg_throughput = alpha * throughput_value
                + (A::one() - alpha) * self.statistics.throughput_stats.avg_throughput;

            self.statistics.throughput_stats.peak_throughput = self
                .statistics
                .throughput_stats
                .peak_throughput
                .max(throughput_value);
        }

        Ok(())
    }
954
    /// Total number of buffered points across both tiers.
    pub fn current_size(&self) -> usize {
        self.buffer.len() + self.secondary_buffer.len()
    }
959
    /// Time elapsed since the last batch was drained for processing.
    pub fn time_since_last_processing(&self) -> Duration {
        self.last_processing.elapsed()
    }
964
    /// Snapshot (clone) of the current buffer quality metrics.
    pub fn get_quality_metrics(&self) -> BufferQualityMetrics<A> {
        self.quality_metrics.clone()
    }
969
    /// Propose a buffer-size adaptation based on recent performance.
    ///
    /// Returns `Ok(None)` when no performance history exists or no change is
    /// warranted. Otherwise proposes a shrink (magnitude -0.2, Normal
    /// priority) when the averaged time metric exceeds 1000 ms, or a grow
    /// (magnitude +0.3, Low priority) when it is under 100 ms and buffer
    /// utilization is below 0.3.
    ///
    /// NOTE(review): `p.timestamp.elapsed()` measures the AGE of each
    /// snapshot, not how long its processing took — confirm this is the
    /// intended signal before relying on the 100/1000 ms thresholds.
    pub fn compute_size_adaptation(
        &self,
        performance_tracker: &PerformanceTracker<A>,
    ) -> Result<Option<Adaptation<A>>, String> {
        let recent_performance = performance_tracker.get_recent_performance(10);
        if recent_performance.is_empty() {
            return Ok(None);
        }

        // Mean of snapshot ages in milliseconds (see NOTE above).
        let avg_processing_time = recent_performance
            .iter()
            .map(|p| p.timestamp.elapsed().as_millis() as f64)
            .sum::<f64>()
            / recent_performance.len() as f64;

        if avg_processing_time > 1000.0 {
            // Slow processing: propose shrinking the buffer by 20%.
            let adaptation = Adaptation {
                adaptation_type: AdaptationType::BufferSize,
                magnitude: A::from(-0.2).unwrap(),
                target_component: "adaptive_buffer".to_string(),
                parameters: std::collections::HashMap::new(),
                priority: AdaptationPriority::Normal,
                timestamp: Instant::now(),
            };
            return Ok(Some(adaptation));
        }

        let avg_utilization = self.current_size() as f64 / self.sizing_strategy.target_size as f64;
        if avg_processing_time < 100.0 && avg_utilization < 0.3 {
            // Fast and underutilized: propose growing the buffer by 30%.
            let adaptation = Adaptation {
                adaptation_type: AdaptationType::BufferSize,
                magnitude: A::from(0.3).unwrap(),
                target_component: "adaptive_buffer".to_string(),
                parameters: std::collections::HashMap::new(),
                priority: AdaptationPriority::Low,
                timestamp: Instant::now(),
            };
            return Ok(Some(adaptation));
        }

        Ok(None)
    }
1018
1019 pub fn apply_size_adaptation(&mut self, adaptation: &Adaptation<A>) -> Result<(), String> {
1021 if adaptation.adaptation_type == AdaptationType::BufferSize {
1022 let current_target = self.sizing_strategy.target_size;
1023 let change_factor = A::one() + adaptation.magnitude;
1024 let new_target =
1025 (current_target as f64 * change_factor.to_f64().unwrap_or(1.0)) as usize;
1026
1027 let bounded_target = new_target
1029 .max(self.config.min_size)
1030 .min(self.config.max_size);
1031
1032 if bounded_target != current_target {
1033 self.sizing_strategy.target_size = bounded_target;
1034
1035 let change_event = SizeChangeEvent {
1037 timestamp: Instant::now(),
1038 old_size: current_target,
1039 new_size: bounded_target,
1040 change_magnitude: bounded_target as i32 - current_target as i32,
1041 reason: "adaptation".to_string(),
1042 };
1043
1044 if self.size_change_log.len() >= 100 {
1045 self.size_change_log.pop_front();
1046 }
1047 self.size_change_log.push_back(change_event);
1048 }
1049 }
1050
1051 Ok(())
1052 }
1053
1054 pub fn last_size_change(&self) -> f32 {
1056 if let Some(last_change) = self.size_change_log.back() {
1057 last_change.change_magnitude as f32
1058 } else {
1059 0.0
1060 }
1061 }
1062
1063 pub fn reset(&mut self) -> Result<(), String> {
1065 self.buffer.clear();
1066 self.secondary_buffer.clear();
1067
1068 self.quality_metrics = BufferQualityMetrics {
1069 average_quality: A::zero(),
1070 quality_variance: A::zero(),
1071 min_quality: A::one(),
1072 max_quality: A::zero(),
1073 freshness_distribution: Vec::new(),
1074 priority_distribution: Vec::new(),
1075 quality_trend: QualityTrend {
1076 recent_changes: VecDeque::with_capacity(50),
1077 trend_direction: TrendDirection::Stable,
1078 trend_magnitude: A::zero(),
1079 confidence: A::zero(),
1080 },
1081 };
1082
1083 self.statistics.total_items_processed = 0;
1084 self.statistics.total_items_discarded = 0;
1085 self.last_processing = Instant::now();
1086 self.size_change_log.clear();
1087
1088 Ok(())
1089 }
1090
1091 pub fn get_diagnostics(&self) -> BufferDiagnostics {
1093 BufferDiagnostics {
1094 current_size: self.current_size(),
1095 target_size: self.sizing_strategy.target_size,
1096 utilization: self.current_size() as f64 / self.sizing_strategy.target_size as f64,
1097 average_quality: self.quality_metrics.average_quality.to_f64().unwrap_or(0.0),
1098 total_processed: self.statistics.total_items_processed,
1099 total_discarded: self.statistics.total_items_discarded,
1100 size_changes: self.size_change_log.len(),
1101 }
1102 }
1103}
1104
1105impl<A: Float + Send + Sync + Send + Sync> Ord for PrioritizedDataPoint<A> {
1107 fn cmp(&self, other: &Self) -> Ordering {
1108 self.priority_score
1109 .partial_cmp(&other.priority_score)
1110 .unwrap_or(Ordering::Equal)
1111 }
1112}
1113
1114impl<A: Float + Send + Sync + Send + Sync> PartialOrd for PrioritizedDataPoint<A> {
1115 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1116 Some(self.cmp(other))
1117 }
1118}
1119
1120impl<A: Float + Send + Sync + Send + Sync> PartialEq for PrioritizedDataPoint<A> {
1121 fn eq(&self, other: &Self) -> bool {
1122 self.priority_score == other.priority_score
1123 }
1124}
1125
1126impl<A: Float + Send + Sync + Send + Sync> Eq for PrioritizedDataPoint<A> {}
1127
1128impl<A: Float + Send + Sync + Send + Sync> BufferSizingStrategy<A> {
1129 fn new(strategy_type: BufferSizeStrategy, initial_size: usize) -> Self {
1130 Self {
1131 strategy_type,
1132 target_size: initial_size,
1133 adjustment_params: SizeAdjustmentParams {
1134 growth_rate: A::from(0.2).unwrap(),
1135 shrinkage_rate: A::from(0.15).unwrap(),
1136 stability_threshold: A::from(0.05).unwrap(),
1137 performance_sensitivity: A::from(0.1).unwrap(),
1138 quality_sensitivity: A::from(0.1).unwrap(),
1139 memory_sensitivity: A::from(0.2).unwrap(),
1140 },
1141 performance_feedback: VecDeque::with_capacity(100),
1142 sizing_history: VecDeque::with_capacity(100),
1143 }
1144 }
1145}
1146
1147impl<A: Float + Send + Sync + Send + Sync> DataRetentionPolicy<A> {
1148 fn new(strategy: RetentionStrategy) -> Self {
1149 Self {
1150 strategy,
1151 age_policy: AgeBasedRetention {
1152 max_age: Duration::from_secs(7200), soft_age_limit: Duration::from_secs(3600), age_weight: 0.3,
1155 adaptive_limits: true,
1156 },
1157 quality_policy: QualityBasedRetention {
1158 min_quality_threshold: A::from(0.3).unwrap(),
1159 quality_weight: A::from(0.5).unwrap(),
1160 adaptive_thresholds: true,
1161 quality_targets: QualityDistributionTargets {
1162 high_quality_target: A::from(0.3).unwrap(),
1163 medium_quality_target: A::from(0.5).unwrap(),
1164 low_quality_target: A::from(0.2).unwrap(),
1165 high_quality_threshold: A::from(0.8).unwrap(),
1166 medium_quality_threshold: A::from(0.5).unwrap(),
1167 },
1168 },
1169 relevance_policy: RelevanceBasedRetention {
1170 relevance_method: RelevanceMethod::Similarity,
1171 relevance_weight: A::from(0.2).unwrap(),
1172 temporal_decay: true,
1173 decay_rate: A::from(0.1).unwrap(),
1174 },
1175 retention_scorer: RetentionScorer::new(),
1176 }
1177 }
1178}
1179
1180impl<A: Float + Send + Sync + Send + Sync> RetentionScorer<A> {
1181 fn new() -> Self {
1182 Self {
1183 weights: RetentionWeights {
1184 age_weight: A::from(0.2).unwrap(),
1185 quality_weight: A::from(0.3).unwrap(),
1186 relevance_weight: A::from(0.2).unwrap(),
1187 priority_weight: A::from(0.15).unwrap(),
1188 freshness_weight: A::from(0.1).unwrap(),
1189 diversity_weight: A::from(0.05).unwrap(),
1190 },
1191 scoring_history: VecDeque::with_capacity(1000),
1192 performance_feedback: VecDeque::with_capacity(100),
1193 }
1194 }
1195}
1196
/// Plain-data snapshot of buffer health, suitable for logging/monitoring.
#[derive(Debug, Clone)]
pub struct BufferDiagnostics {
    /// Items currently held across both tiers.
    pub current_size: usize,
    /// Current adaptive target size.
    pub target_size: usize,
    /// current_size / target_size.
    pub utilization: f64,
    /// Mean quality of buffered items.
    pub average_quality: f64,
    /// Lifetime count of accepted items.
    pub total_processed: u64,
    /// Lifetime count of evicted items.
    pub total_discarded: u64,
    /// Number of entries in the size-change log.
    pub size_changes: usize,
}