optirs_core/streaming/adaptive_streaming/
buffering.rs

// Adaptive buffering strategies for streaming optimization
//
// This module provides sophisticated buffer management including adaptive sizing,
// quality-based filtering, priority queuing, and intelligent data retention
// strategies for streaming optimization scenarios.
6
7use super::config::*;
8use super::optimizer::{Adaptation, AdaptationPriority, AdaptationType, StreamingDataPoint};
9use super::performance::{PerformanceSnapshot, PerformanceTracker};
10
11use scirs2_core::numeric::Float;
12use serde::{Deserialize, Serialize};
13use std::cmp::Ordering;
14use std::collections::{BinaryHeap, HashMap, VecDeque};
15use std::time::{Duration, Instant};
16
17/// Adaptive buffer for managing streaming data with quality-based retention
18pub struct AdaptiveBuffer<A: Float + Send + Sync> {
19    /// Buffer configuration
20    config: BufferConfig,
21    /// Main data buffer with priority queue
22    buffer: BinaryHeap<PrioritizedDataPoint<A>>,
23    /// Secondary buffer for low-quality data
24    secondary_buffer: VecDeque<StreamingDataPoint<A>>,
25    /// Buffer quality metrics
26    quality_metrics: BufferQualityMetrics<A>,
27    /// Buffer sizing strategy
28    sizing_strategy: BufferSizingStrategy<A>,
29    /// Data retention policy
30    retention_policy: DataRetentionPolicy<A>,
31    /// Buffer statistics
32    statistics: BufferStatistics<A>,
33    /// Last processing timestamp
34    last_processing: Instant,
35    /// Size change tracking
36    size_change_log: VecDeque<SizeChangeEvent>,
37}
38
39/// Data point with priority information for buffering
40#[derive(Debug, Clone)]
41pub struct PrioritizedDataPoint<A: Float + Send + Sync> {
42    /// The actual data point
43    pub data_point: StreamingDataPoint<A>,
44    /// Priority score (higher = more important)
45    pub priority_score: A,
46    /// Buffer insertion timestamp
47    pub buffer_timestamp: Instant,
48    /// Expected processing time
49    pub expected_processing_time: Duration,
50    /// Data freshness score
51    pub freshness_score: A,
52    /// Relevance score for current model
53    pub relevance_score: A,
54}
55
56/// Buffer quality metrics for adaptive management
57#[derive(Debug, Clone)]
58pub struct BufferQualityMetrics<A: Float + Send + Sync> {
59    /// Average quality score of buffered data
60    pub average_quality: A,
61    /// Quality variance
62    pub quality_variance: A,
63    /// Minimum quality in buffer
64    pub min_quality: A,
65    /// Maximum quality in buffer
66    pub max_quality: A,
67    /// Data freshness distribution
68    pub freshness_distribution: Vec<A>,
69    /// Priority distribution
70    pub priority_distribution: Vec<A>,
71    /// Quality trend over time
72    pub quality_trend: QualityTrend<A>,
73}
74
75/// Quality trend analysis
76#[derive(Debug, Clone)]
77pub struct QualityTrend<A: Float + Send + Sync> {
78    /// Recent quality changes
79    pub recent_changes: VecDeque<A>,
80    /// Trend direction
81    pub trend_direction: TrendDirection,
82    /// Trend magnitude
83    pub trend_magnitude: A,
84    /// Trend confidence
85    pub confidence: A,
86}
87
/// Direction in which a tracked quantity (quality, throughput) is moving
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TrendDirection {
    /// Values are getting better
    Improving,
    /// Values are getting worse
    Degrading,
    /// No significant movement
    Stable,
    /// Values swing back and forth
    Oscillating,
}
100
101/// Buffer sizing strategy implementation
102pub struct BufferSizingStrategy<A: Float + Send + Sync> {
103    /// Current strategy type
104    strategy_type: BufferSizeStrategy,
105    /// Target size
106    target_size: usize,
107    /// Size adjustment parameters
108    adjustment_params: SizeAdjustmentParams<A>,
109    /// Performance feedback
110    performance_feedback: VecDeque<SizingPerformanceFeedback<A>>,
111    /// Sizing history
112    sizing_history: VecDeque<SizingEvent>,
113}
114
115/// Parameters for size adjustment
116#[derive(Debug, Clone)]
117pub struct SizeAdjustmentParams<A: Float + Send + Sync> {
118    /// Growth rate for increasing buffer size
119    pub growth_rate: A,
120    /// Shrinkage rate for decreasing buffer size
121    pub shrinkage_rate: A,
122    /// Stability threshold (minimum change for adjustment)
123    pub stability_threshold: A,
124    /// Performance sensitivity
125    pub performance_sensitivity: A,
126    /// Quality sensitivity
127    pub quality_sensitivity: A,
128    /// Memory pressure sensitivity
129    pub memory_sensitivity: A,
130}
131
132/// Performance feedback for buffer sizing
133#[derive(Debug, Clone)]
134pub struct SizingPerformanceFeedback<A: Float + Send + Sync> {
135    /// Buffer size when feedback was recorded
136    pub buffer_size: usize,
137    /// Processing latency
138    pub processing_latency: Duration,
139    /// Throughput (items per second)
140    pub throughput: A,
141    /// Quality score achieved
142    pub quality_score: A,
143    /// Memory usage
144    pub memory_usage: usize,
145    /// Timestamp of feedback
146    pub timestamp: Instant,
147}
148
149/// Buffer sizing event
150#[derive(Debug, Clone)]
151pub struct SizingEvent {
152    /// Event timestamp
153    pub timestamp: Instant,
154    /// Old buffer size
155    pub old_size: usize,
156    /// New buffer size
157    pub new_size: usize,
158    /// Reason for size change
159    pub reason: SizingReason,
160    /// Performance impact
161    pub performance_impact: Option<f64>,
162}
163
/// Why a buffer size change was requested
#[derive(Debug, Clone)]
pub enum SizingReason {
    /// Resize aimed at better overall performance
    PerformanceOptimization,
    /// Resize aimed at better data quality
    QualityImprovement,
    /// Resize triggered by memory pressure
    MemoryPressure,
    /// Resize needed to meet latency requirements
    LatencyRequirement,
    /// Resize aimed at higher throughput
    ThroughputOptimization,
    /// Resize requested manually
    Manual,
    /// Resize following a configuration change
    Configuration,
}
182
183/// Data retention policy for buffer management
184pub struct DataRetentionPolicy<A: Float + Send + Sync> {
185    /// Retention strategy
186    strategy: RetentionStrategy,
187    /// Age-based retention parameters
188    age_policy: AgeBasedRetention,
189    /// Quality-based retention parameters
190    quality_policy: QualityBasedRetention<A>,
191    /// Relevance-based retention parameters
192    relevance_policy: RelevanceBasedRetention<A>,
193    /// Combined retention scoring
194    retention_scorer: RetentionScorer<A>,
195}
196
/// Available strategies for choosing which data to retain
#[derive(Debug, Clone)]
pub enum RetentionStrategy {
    /// First in, first out
    FIFO,
    /// Last in, first out
    LIFO,
    /// Least recently used
    LRU,
    /// Retain by priority
    Priority,
    /// Retain by quality
    Quality,
    /// Retain by age
    Age,
    /// Retain using a combination of several factors
    Hybrid,
    /// Retain adaptively, driven by performance
    Adaptive,
}
217
/// Settings for age-driven retention
#[derive(Debug, Clone)]
pub struct AgeBasedRetention {
    /// Hard upper bound on how old retained data may be
    pub max_age: Duration,
    /// Age from which data becomes a candidate for removal
    pub soft_age_limit: Duration,
    /// Weight given to age in retention scoring
    pub age_weight: f64,
    /// Whether the age limits may adapt
    pub adaptive_limits: bool,
}
230
231/// Quality-based retention configuration
232#[derive(Debug, Clone)]
233pub struct QualityBasedRetention<A: Float + Send + Sync> {
234    /// Minimum quality threshold
235    pub min_quality_threshold: A,
236    /// Quality weight in retention scoring
237    pub quality_weight: A,
238    /// Enable adaptive quality thresholds
239    pub adaptive_thresholds: bool,
240    /// Quality distribution targets
241    pub quality_targets: QualityDistributionTargets<A>,
242}
243
244/// Target quality distribution for buffer content
245#[derive(Debug, Clone)]
246pub struct QualityDistributionTargets<A: Float + Send + Sync> {
247    /// Target percentage of high-quality data
248    pub high_quality_target: A,
249    /// Target percentage of medium-quality data
250    pub medium_quality_target: A,
251    /// Target percentage of low-quality data
252    pub low_quality_target: A,
253    /// Quality boundaries
254    pub high_quality_threshold: A,
255    pub medium_quality_threshold: A,
256}
257
258/// Relevance-based retention configuration
259#[derive(Debug, Clone)]
260pub struct RelevanceBasedRetention<A: Float + Send + Sync> {
261    /// Relevance calculation method
262    pub relevance_method: RelevanceMethod,
263    /// Relevance weight in retention scoring
264    pub relevance_weight: A,
265    /// Enable temporal relevance decay
266    pub temporal_decay: bool,
267    /// Relevance decay rate
268    pub decay_rate: A,
269}
270
/// Ways of computing how relevant a data point is
#[derive(Debug, Clone)]
pub enum RelevanceMethod {
    /// Relevance derived from distance
    Distance,
    /// Relevance derived from similarity
    Similarity,
    /// Relevance derived from feature importance
    FeatureImportance,
    /// Relevance derived from model uncertainty
    Uncertainty,
    /// Relevance derived from diversity
    Diversity,
    /// Custom relevance function, identified by the carried string
    Custom(String),
}
287
288/// Retention scoring system
289pub struct RetentionScorer<A: Float + Send + Sync> {
290    /// Scoring weights
291    weights: RetentionWeights<A>,
292    /// Scoring history for adaptation
293    scoring_history: VecDeque<RetentionScore<A>>,
294    /// Performance feedback
295    performance_feedback: VecDeque<RetentionPerformanceFeedback<A>>,
296}
297
298/// Weights for different retention factors
299#[derive(Debug, Clone)]
300pub struct RetentionWeights<A: Float + Send + Sync> {
301    /// Age weight
302    pub age_weight: A,
303    /// Quality weight
304    pub quality_weight: A,
305    /// Relevance weight
306    pub relevance_weight: A,
307    /// Priority weight
308    pub priority_weight: A,
309    /// Freshness weight
310    pub freshness_weight: A,
311    /// Diversity weight
312    pub diversity_weight: A,
313}
314
315/// Retention score for a data point
316#[derive(Debug, Clone)]
317pub struct RetentionScore<A: Float + Send + Sync> {
318    /// Overall retention score
319    pub overall_score: A,
320    /// Individual component scores
321    pub component_scores: HashMap<String, A>,
322    /// Retention decision
323    pub should_retain: bool,
324    /// Confidence in decision
325    pub confidence: A,
326    /// Scoring timestamp
327    pub timestamp: Instant,
328}
329
330/// Performance feedback for retention decisions
331#[derive(Debug, Clone)]
332pub struct RetentionPerformanceFeedback<A: Float + Send + Sync> {
333    /// Number of items retained
334    pub items_retained: usize,
335    /// Number of items discarded
336    pub items_discarded: usize,
337    /// Quality of retained items
338    pub retained_quality: A,
339    /// Quality of discarded items
340    pub discarded_quality: A,
341    /// Performance impact
342    pub performance_impact: A,
343    /// Feedback timestamp
344    pub timestamp: Instant,
345}
346
347/// Buffer statistics for monitoring and optimization
348#[derive(Debug, Clone)]
349pub struct BufferStatistics<A: Float + Send + Sync> {
350    /// Total items processed
351    pub total_items_processed: u64,
352    /// Total items discarded
353    pub total_items_discarded: u64,
354    /// Average buffer utilization
355    pub avg_buffer_utilization: A,
356    /// Peak buffer utilization
357    pub peak_buffer_utilization: A,
358    /// Average processing latency
359    pub avg_processing_latency: Duration,
360    /// Throughput statistics
361    pub throughput_stats: ThroughputStatistics<A>,
362    /// Quality statistics
363    pub quality_stats: QualityStatistics<A>,
364    /// Memory usage statistics
365    pub memory_stats: MemoryStatistics,
366}
367
368/// Throughput statistics
369#[derive(Debug, Clone)]
370pub struct ThroughputStatistics<A: Float + Send + Sync> {
371    /// Current throughput (items per second)
372    pub current_throughput: A,
373    /// Average throughput
374    pub avg_throughput: A,
375    /// Peak throughput
376    pub peak_throughput: A,
377    /// Throughput trend
378    pub throughput_trend: TrendDirection,
379    /// Throughput stability
380    pub stability: A,
381}
382
383/// Quality statistics for buffer content
384#[derive(Debug, Clone)]
385pub struct QualityStatistics<A: Float + Send + Sync> {
386    /// Current average quality
387    pub current_avg_quality: A,
388    /// Historical average quality
389    pub historical_avg_quality: A,
390    /// Quality improvement rate
391    pub quality_improvement_rate: A,
392    /// Quality distribution
393    pub quality_distribution: HashMap<String, A>,
394    /// Quality prediction
395    pub predicted_quality: Option<A>,
396}
397
/// Memory usage figures for the buffer
#[derive(Debug, Clone)]
pub struct MemoryStatistics {
    /// Bytes in use right now
    pub current_usage_bytes: usize,
    /// Highest number of bytes in use seen so far
    pub peak_usage_bytes: usize,
    /// Mean number of bytes in use
    pub avg_usage_bytes: usize,
    /// Ratio of useful data to total memory
    pub memory_efficiency: f64,
    /// Degree of memory fragmentation
    pub fragmentation: f64,
}
412
/// Log entry describing one change of the buffer's target size
#[derive(Debug, Clone)]
pub struct SizeChangeEvent {
    /// When the change happened
    pub timestamp: Instant,
    /// Target size before the change
    pub old_size: usize,
    /// Target size after the change
    pub new_size: usize,
    /// Signed difference `new_size - old_size`
    pub change_magnitude: i32,
    /// Textual description of why the size changed
    pub reason: String,
}
427
428impl<A: Float + Default + Clone + Send + Sync + std::iter::Sum + std::fmt::Debug>
429    AdaptiveBuffer<A>
430{
431    /// Creates a new adaptive buffer
432    pub fn new(config: &StreamingConfig) -> Result<Self, String> {
433        let buffer_config = config.buffer_config.clone();
434
435        let quality_metrics = BufferQualityMetrics {
436            average_quality: A::zero(),
437            quality_variance: A::zero(),
438            min_quality: A::one(),
439            max_quality: A::zero(),
440            freshness_distribution: Vec::new(),
441            priority_distribution: Vec::new(),
442            quality_trend: QualityTrend {
443                recent_changes: VecDeque::with_capacity(50),
444                trend_direction: TrendDirection::Stable,
445                trend_magnitude: A::zero(),
446                confidence: A::zero(),
447            },
448        };
449
450        let sizing_strategy = BufferSizingStrategy::new(
451            buffer_config.size_strategy.clone(),
452            buffer_config.initial_size,
453        );
454
455        let retention_policy = DataRetentionPolicy::new(RetentionStrategy::Hybrid);
456
457        let statistics = BufferStatistics {
458            total_items_processed: 0,
459            total_items_discarded: 0,
460            avg_buffer_utilization: A::zero(),
461            peak_buffer_utilization: A::zero(),
462            avg_processing_latency: Duration::ZERO,
463            throughput_stats: ThroughputStatistics {
464                current_throughput: A::zero(),
465                avg_throughput: A::zero(),
466                peak_throughput: A::zero(),
467                throughput_trend: TrendDirection::Stable,
468                stability: A::zero(),
469            },
470            quality_stats: QualityStatistics {
471                current_avg_quality: A::zero(),
472                historical_avg_quality: A::zero(),
473                quality_improvement_rate: A::zero(),
474                quality_distribution: HashMap::new(),
475                predicted_quality: None,
476            },
477            memory_stats: MemoryStatistics {
478                current_usage_bytes: 0,
479                peak_usage_bytes: 0,
480                avg_usage_bytes: 0,
481                memory_efficiency: 0.0,
482                fragmentation: 0.0,
483            },
484        };
485
486        Ok(Self {
487            config: buffer_config,
488            buffer: BinaryHeap::new(),
489            secondary_buffer: VecDeque::new(),
490            quality_metrics,
491            sizing_strategy,
492            retention_policy,
493            statistics,
494            last_processing: Instant::now(),
495            size_change_log: VecDeque::with_capacity(100),
496        })
497    }
498
499    /// Adds a batch of data points to the buffer
500    pub fn add_batch(&mut self, batch: Vec<StreamingDataPoint<A>>) -> Result<(), String> {
501        for data_point in batch {
502            self.add_single_point(data_point)?;
503        }
504
505        // Update quality metrics after batch addition
506        self.update_quality_metrics()?;
507
508        // Check if buffer needs resizing
509        self.check_buffer_resizing()?;
510
511        // Apply retention policy if buffer is too large
512        if self.current_size() > self.sizing_strategy.target_size {
513            self.apply_retention_policy()?;
514        }
515
516        Ok(())
517    }
518
519    /// Adds a single data point to the buffer
520    fn add_single_point(&mut self, data_point: StreamingDataPoint<A>) -> Result<(), String> {
521        // Calculate priority score for the data point
522        let priority_score = self.calculate_priority_score(&data_point)?;
523
524        // Calculate freshness and relevance scores
525        let freshness_score = self.calculate_freshness_score(&data_point);
526        let relevance_score = self.calculate_relevance_score(&data_point)?;
527
528        let prioritized_point = PrioritizedDataPoint {
529            data_point,
530            priority_score,
531            buffer_timestamp: Instant::now(),
532            expected_processing_time: Duration::from_millis(100), // Estimated
533            freshness_score,
534            relevance_score,
535        };
536
537        // Add to appropriate buffer based on quality
538        if priority_score >= A::from(self.config.quality_threshold).unwrap() {
539            self.buffer.push(prioritized_point);
540        } else {
541            // Add to secondary buffer for potential later processing
542            self.secondary_buffer
543                .push_back(prioritized_point.data_point);
544        }
545
546        // Update statistics
547        self.statistics.total_items_processed += 1;
548
549        Ok(())
550    }
551
552    /// Calculates priority score for a data point
553    fn calculate_priority_score(&self, data_point: &StreamingDataPoint<A>) -> Result<A, String> {
554        let mut score = data_point.quality_score;
555
556        // Adjust score based on recency
557        let age = data_point.timestamp.elapsed().as_secs_f64();
558        let recency_bonus = A::from(1.0 / (1.0 + age / 3600.0)).unwrap(); // Hour-based decay
559        score = score + recency_bonus * A::from(0.1).unwrap();
560
561        // Adjust score based on feature variance (novelty)
562        let novelty_score = self.calculate_novelty_score(data_point)?;
563        score = score + novelty_score * A::from(0.2).unwrap();
564
565        Ok(score)
566    }
567
568    /// Calculates novelty score based on feature variance
569    fn calculate_novelty_score(&self, data_point: &StreamingDataPoint<A>) -> Result<A, String> {
570        // Simple novelty calculation based on distance from recent data
571        if self.buffer.is_empty() {
572            return Ok(A::from(0.5).unwrap()); // Medium novelty for first data
573        }
574
575        // Calculate average distance from recent buffer content
576        let recent_points: Vec<_> = self.buffer.iter().take(10).collect();
577        if recent_points.is_empty() {
578            return Ok(A::from(0.5).unwrap());
579        }
580
581        let mut total_distance = A::zero();
582        for recent_point in &recent_points {
583            let distance = self.calculate_feature_distance(
584                &data_point.features,
585                &recent_point.data_point.features,
586            )?;
587            total_distance = total_distance + distance;
588        }
589
590        let avg_distance = total_distance / A::from(recent_points.len()).unwrap();
591
592        // Normalize to 0-1 range
593        let normalized_novelty = avg_distance / (avg_distance + A::one());
594        Ok(normalized_novelty)
595    }
596
597    /// Calculates distance between feature vectors
598    fn calculate_feature_distance(
599        &self,
600        features1: &scirs2_core::ndarray::Array1<A>,
601        features2: &scirs2_core::ndarray::Array1<A>,
602    ) -> Result<A, String> {
603        if features1.len() != features2.len() {
604            return Err("Feature vectors have different lengths".to_string());
605        }
606
607        let mut distance = A::zero();
608        for (f1, f2) in features1.iter().zip(features2.iter()) {
609            let diff = *f1 - *f2;
610            distance = distance + diff * diff;
611        }
612
613        Ok(distance.sqrt())
614    }
615
616    /// Calculates freshness score based on data age
617    fn calculate_freshness_score(&self, data_point: &StreamingDataPoint<A>) -> A {
618        let age_seconds = data_point.timestamp.elapsed().as_secs_f64();
619        let max_age = 3600.0; // 1 hour maximum age
620
621        let freshness = (max_age - age_seconds.min(max_age)) / max_age;
622        A::from(freshness.max(0.0)).unwrap()
623    }
624
625    /// Calculates relevance score for current model context
626    fn calculate_relevance_score(&self, _data_point: &StreamingDataPoint<A>) -> Result<A, String> {
627        // Simplified relevance calculation
628        // In practice, this would consider current model parameters, recent performance, etc.
629        Ok(A::from(0.7).unwrap()) // Default moderate relevance
630    }
631
632    /// Gets a batch of data for processing
633    pub fn get_batch_for_processing(&mut self) -> Result<Vec<StreamingDataPoint<A>>, String> {
634        let batch_size = self.calculate_optimal_batch_size()?;
635        let mut processing_batch = Vec::with_capacity(batch_size);
636
637        // Extract high-priority items from main buffer
638        while processing_batch.len() < batch_size && !self.buffer.is_empty() {
639            if let Some(prioritized_point) = self.buffer.pop() {
640                processing_batch.push(prioritized_point.data_point);
641            }
642        }
643
644        // Fill remaining space with secondary buffer items if needed
645        while processing_batch.len() < batch_size && !self.secondary_buffer.is_empty() {
646            if let Some(data_point) = self.secondary_buffer.pop_front() {
647                processing_batch.push(data_point);
648            }
649        }
650
651        // Update last processing time
652        self.last_processing = Instant::now();
653
654        // Update throughput statistics
655        self.update_throughput_stats(processing_batch.len())?;
656
657        Ok(processing_batch)
658    }
659
660    /// Calculates optimal batch size based on current conditions
661    fn calculate_optimal_batch_size(&self) -> Result<usize, String> {
662        let mut batch_size = self.config.initial_size.min(32); // Default reasonable batch size
663
664        // Adjust based on buffer fullness
665        let buffer_utilization =
666            self.current_size() as f64 / self.sizing_strategy.target_size as f64;
667        if buffer_utilization > 0.8 {
668            batch_size = (batch_size as f64 * 1.5) as usize; // Larger batches when buffer is full
669        } else if buffer_utilization < 0.3 {
670            batch_size = (batch_size as f64 * 0.7) as usize; // Smaller batches when buffer is sparse
671        }
672
673        // Adjust based on processing latency
674        if self.statistics.avg_processing_latency > Duration::from_millis(500) {
675            batch_size = (batch_size as f64 * 0.8) as usize; // Smaller batches for slow processing
676        }
677
678        // Ensure minimum and maximum bounds
679        Ok(batch_size.max(1).min(self.current_size().min(100)))
680    }
681
682    /// Updates quality metrics for the buffer
683    fn update_quality_metrics(&mut self) -> Result<(), String> {
684        if self.buffer.is_empty() && self.secondary_buffer.is_empty() {
685            return Ok(());
686        }
687
688        let mut quality_sum = A::zero();
689        let mut quality_values = Vec::new();
690
691        // Collect quality scores from main buffer
692        for prioritized_point in &self.buffer {
693            let quality = prioritized_point.data_point.quality_score;
694            quality_sum = quality_sum + quality;
695            quality_values.push(quality);
696        }
697
698        // Collect quality scores from secondary buffer
699        for data_point in &self.secondary_buffer {
700            let quality = data_point.quality_score;
701            quality_sum = quality_sum + quality;
702            quality_values.push(quality);
703        }
704
705        if !quality_values.is_empty() {
706            let count = A::from(quality_values.len()).unwrap();
707            self.quality_metrics.average_quality = quality_sum / count;
708
709            // Update min/max quality
710            self.quality_metrics.min_quality =
711                quality_values.iter().cloned().fold(A::one(), A::min);
712            self.quality_metrics.max_quality =
713                quality_values.iter().cloned().fold(A::zero(), A::max);
714
715            // Calculate quality variance
716            let mean = self.quality_metrics.average_quality;
717            let variance_sum = quality_values
718                .iter()
719                .map(|&q| (q - mean) * (q - mean))
720                .sum::<A>();
721            self.quality_metrics.quality_variance = variance_sum / count;
722
723            // Update quality trend
724            self.update_quality_trend(self.quality_metrics.average_quality)?;
725        }
726
727        Ok(())
728    }
729
730    /// Updates quality trend analysis
731    fn update_quality_trend(&mut self, current_quality: A) -> Result<(), String> {
732        let trend = &mut self.quality_metrics.quality_trend;
733
734        // Add current quality to recent changes
735        if trend.recent_changes.len() >= 50 {
736            trend.recent_changes.pop_front();
737        }
738        trend.recent_changes.push_back(current_quality);
739
740        // Analyze trend if we have enough data
741        if trend.recent_changes.len() >= 10 {
742            let recent: Vec<A> = trend.recent_changes.iter().cloned().collect();
743            let first_half_avg = recent.iter().take(recent.len() / 2).cloned().sum::<A>()
744                / A::from(recent.len() / 2).unwrap();
745            let second_half_avg = recent.iter().skip(recent.len() / 2).cloned().sum::<A>()
746                / A::from(recent.len() - recent.len() / 2).unwrap();
747
748            let change = second_half_avg - first_half_avg;
749            let change_threshold = A::from(0.05).unwrap(); // 5% change threshold
750
751            trend.trend_direction = if change > change_threshold {
752                TrendDirection::Improving
753            } else if change < -change_threshold {
754                TrendDirection::Degrading
755            } else {
756                TrendDirection::Stable
757            };
758
759            trend.trend_magnitude = change.abs();
760            trend.confidence = A::from(0.8).unwrap(); // Simplified confidence
761        }
762
763        Ok(())
764    }
765
766    /// Checks if buffer needs resizing
767    fn check_buffer_resizing(&mut self) -> Result<(), String> {
768        if !self.config.enable_adaptive_sizing {
769            return Ok(());
770        }
771
772        let current_size = self.current_size();
773        let target_size = self.sizing_strategy.target_size;
774        let utilization = current_size as f64 / target_size as f64;
775
776        // Check if resize is needed
777        let should_resize = if utilization > 0.9 {
778            // Buffer is nearly full - consider growing
779            Some(SizingReason::ThroughputOptimization)
780        } else if utilization < 0.3 && target_size > self.config.min_size {
781            // Buffer is underutilized - consider shrinking
782            Some(SizingReason::MemoryPressure)
783        } else {
784            None
785        };
786
787        if let Some(reason) = should_resize {
788            self.resize_buffer(reason)?;
789        }
790
791        Ok(())
792    }
793
794    /// Resizes the buffer based on current conditions
795    fn resize_buffer(&mut self, reason: SizingReason) -> Result<(), String> {
796        let old_size = self.sizing_strategy.target_size;
797        let new_size = match reason {
798            SizingReason::ThroughputOptimization => {
799                // Grow buffer
800                let growth_factor = 1.0
801                    + self
802                        .sizing_strategy
803                        .adjustment_params
804                        .growth_rate
805                        .to_f64()
806                        .unwrap_or(0.2);
807                ((old_size as f64) * growth_factor) as usize
808            }
809            SizingReason::MemoryPressure => {
810                // Shrink buffer
811                let shrink_factor = 1.0
812                    - self
813                        .sizing_strategy
814                        .adjustment_params
815                        .shrinkage_rate
816                        .to_f64()
817                        .unwrap_or(0.2);
818                ((old_size as f64) * shrink_factor) as usize
819            }
820            _ => old_size, // No change for other reasons
821        };
822
823        // Apply size bounds
824        let bounded_size = new_size.max(self.config.min_size).min(self.config.max_size);
825
826        if bounded_size != old_size {
827            self.sizing_strategy.target_size = bounded_size;
828
829            // Log the size change
830            let change_event = SizeChangeEvent {
831                timestamp: Instant::now(),
832                old_size,
833                new_size: bounded_size,
834                change_magnitude: bounded_size as i32 - old_size as i32,
835                reason: format!("{:?}", reason),
836            };
837
838            if self.size_change_log.len() >= 100 {
839                self.size_change_log.pop_front();
840            }
841            self.size_change_log.push_back(change_event);
842        }
843
844        Ok(())
845    }
846
847    /// Applies retention policy to manage buffer size
848    fn apply_retention_policy(&mut self) -> Result<(), String> {
849        let target_size = self.sizing_strategy.target_size;
850        let current_size = self.current_size();
851
852        if current_size <= target_size {
853            return Ok(());
854        }
855
856        let items_to_remove = current_size - target_size;
857        let mut removed_count = 0;
858
859        // Apply retention policy to secondary buffer first
860        while removed_count < items_to_remove && !self.secondary_buffer.is_empty() {
861            if self.should_remove_from_secondary()? {
862                self.secondary_buffer.pop_front();
863                removed_count += 1;
864                self.statistics.total_items_discarded += 1;
865            } else {
866                break;
867            }
868        }
869
870        // If still need to remove items, apply to main buffer
871        let mut temp_buffer = Vec::new();
872        while let Some(item) = self.buffer.pop() {
873            temp_buffer.push(item);
874        }
875
876        // Sort by retention score and keep the best items
877        temp_buffer.sort_by(|a, b| {
878            let score_a = self
879                .calculate_retention_score(&a.data_point)
880                .unwrap_or(A::zero());
881            let score_b = self
882                .calculate_retention_score(&b.data_point)
883                .unwrap_or(A::zero());
884            score_b.partial_cmp(&score_a).unwrap_or(Ordering::Equal)
885        });
886
887        // Keep only the target number of items
888        let items_to_keep = (temp_buffer.len()).saturating_sub(items_to_remove - removed_count);
889        for item in temp_buffer.into_iter().take(items_to_keep) {
890            self.buffer.push(item);
891        }
892
893        Ok(())
894    }
895
896    /// Determines if an item should be removed from secondary buffer
897    fn should_remove_from_secondary(&self) -> Result<bool, String> {
898        // Simple policy: remove oldest items first
899        if let Some(oldest) = self.secondary_buffer.front() {
900            let age = oldest.timestamp.elapsed();
901            Ok(age > Duration::from_secs(3600)) // Remove items older than 1 hour
902        } else {
903            Ok(false)
904        }
905    }
906
907    /// Calculates retention score for a data point
908    fn calculate_retention_score(&self, data_point: &StreamingDataPoint<A>) -> Result<A, String> {
909        let age_score = self.calculate_age_score(data_point);
910        let quality_score = data_point.quality_score;
911        let freshness_score = self.calculate_freshness_score(data_point);
912
913        // Weighted combination
914        let retention_score = quality_score * A::from(0.5).unwrap()
915            + freshness_score * A::from(0.3).unwrap()
916            + age_score * A::from(0.2).unwrap();
917
918        Ok(retention_score)
919    }
920
921    /// Calculates age score for retention
922    fn calculate_age_score(&self, data_point: &StreamingDataPoint<A>) -> A {
923        let age_seconds = data_point.timestamp.elapsed().as_secs_f64();
924        let max_age = 7200.0; // 2 hours
925
926        let age_score = (max_age - age_seconds.min(max_age)) / max_age;
927        A::from(age_score.max(0.0)).unwrap()
928    }
929
930    /// Updates throughput statistics
931    fn update_throughput_stats(&mut self, items_processed: usize) -> Result<(), String> {
932        let time_since_last = self.last_processing.elapsed().as_secs_f64();
933        if time_since_last > 0.0 {
934            let current_throughput = items_processed as f64 / time_since_last;
935            let throughput_value = A::from(current_throughput).unwrap();
936
937            self.statistics.throughput_stats.current_throughput = throughput_value;
938
939            // Update average throughput (simple moving average)
940            let alpha = A::from(0.1).unwrap(); // Smoothing factor
941            self.statistics.throughput_stats.avg_throughput = alpha * throughput_value
942                + (A::one() - alpha) * self.statistics.throughput_stats.avg_throughput;
943
944            // Update peak throughput
945            self.statistics.throughput_stats.peak_throughput = self
946                .statistics
947                .throughput_stats
948                .peak_throughput
949                .max(throughput_value);
950        }
951
952        Ok(())
953    }
954
955    /// Gets current buffer size (total items across all buffers)
956    pub fn current_size(&self) -> usize {
957        self.buffer.len() + self.secondary_buffer.len()
958    }
959
960    /// Gets time since last processing
961    pub fn time_since_last_processing(&self) -> Duration {
962        self.last_processing.elapsed()
963    }
964
965    /// Gets current buffer quality metrics
966    pub fn get_quality_metrics(&self) -> BufferQualityMetrics<A> {
967        self.quality_metrics.clone()
968    }
969
970    /// Computes size adaptation based on performance feedback
971    pub fn compute_size_adaptation(
972        &self,
973        performance_tracker: &PerformanceTracker<A>,
974    ) -> Result<Option<Adaptation<A>>, String> {
975        // Get recent performance data
976        let recent_performance = performance_tracker.get_recent_performance(10);
977        if recent_performance.is_empty() {
978            return Ok(None);
979        }
980
981        // Calculate average processing time
982        let avg_processing_time = recent_performance
983            .iter()
984            .map(|p| p.timestamp.elapsed().as_millis() as f64)
985            .sum::<f64>()
986            / recent_performance.len() as f64;
987
988        // If processing is too slow, suggest reducing buffer size
989        if avg_processing_time > 1000.0 {
990            // More than 1 second
991            let adaptation = Adaptation {
992                adaptation_type: AdaptationType::BufferSize,
993                magnitude: A::from(-0.2).unwrap(), // Reduce by 20%
994                target_component: "adaptive_buffer".to_string(),
995                parameters: std::collections::HashMap::new(),
996                priority: AdaptationPriority::Normal,
997                timestamp: Instant::now(),
998            };
999            return Ok(Some(adaptation));
1000        }
1001
1002        // If processing is very fast and buffer is often empty, suggest increasing size
1003        let avg_utilization = self.current_size() as f64 / self.sizing_strategy.target_size as f64;
1004        if avg_processing_time < 100.0 && avg_utilization < 0.3 {
1005            let adaptation = Adaptation {
1006                adaptation_type: AdaptationType::BufferSize,
1007                magnitude: A::from(0.3).unwrap(), // Increase by 30%
1008                target_component: "adaptive_buffer".to_string(),
1009                parameters: std::collections::HashMap::new(),
1010                priority: AdaptationPriority::Low,
1011                timestamp: Instant::now(),
1012            };
1013            return Ok(Some(adaptation));
1014        }
1015
1016        Ok(None)
1017    }
1018
1019    /// Applies size adaptation to the buffer
1020    pub fn apply_size_adaptation(&mut self, adaptation: &Adaptation<A>) -> Result<(), String> {
1021        if adaptation.adaptation_type == AdaptationType::BufferSize {
1022            let current_target = self.sizing_strategy.target_size;
1023            let change_factor = A::one() + adaptation.magnitude;
1024            let new_target =
1025                (current_target as f64 * change_factor.to_f64().unwrap_or(1.0)) as usize;
1026
1027            // Apply bounds
1028            let bounded_target = new_target
1029                .max(self.config.min_size)
1030                .min(self.config.max_size);
1031
1032            if bounded_target != current_target {
1033                self.sizing_strategy.target_size = bounded_target;
1034
1035                // Log the change
1036                let change_event = SizeChangeEvent {
1037                    timestamp: Instant::now(),
1038                    old_size: current_target,
1039                    new_size: bounded_target,
1040                    change_magnitude: bounded_target as i32 - current_target as i32,
1041                    reason: "adaptation".to_string(),
1042                };
1043
1044                if self.size_change_log.len() >= 100 {
1045                    self.size_change_log.pop_front();
1046                }
1047                self.size_change_log.push_back(change_event);
1048            }
1049        }
1050
1051        Ok(())
1052    }
1053
1054    /// Gets the last size change amount
1055    pub fn last_size_change(&self) -> f32 {
1056        if let Some(last_change) = self.size_change_log.back() {
1057            last_change.change_magnitude as f32
1058        } else {
1059            0.0
1060        }
1061    }
1062
1063    /// Resets the buffer to initial state
1064    pub fn reset(&mut self) -> Result<(), String> {
1065        self.buffer.clear();
1066        self.secondary_buffer.clear();
1067
1068        self.quality_metrics = BufferQualityMetrics {
1069            average_quality: A::zero(),
1070            quality_variance: A::zero(),
1071            min_quality: A::one(),
1072            max_quality: A::zero(),
1073            freshness_distribution: Vec::new(),
1074            priority_distribution: Vec::new(),
1075            quality_trend: QualityTrend {
1076                recent_changes: VecDeque::with_capacity(50),
1077                trend_direction: TrendDirection::Stable,
1078                trend_magnitude: A::zero(),
1079                confidence: A::zero(),
1080            },
1081        };
1082
1083        self.statistics.total_items_processed = 0;
1084        self.statistics.total_items_discarded = 0;
1085        self.last_processing = Instant::now();
1086        self.size_change_log.clear();
1087
1088        Ok(())
1089    }
1090
1091    /// Gets diagnostic information
1092    pub fn get_diagnostics(&self) -> BufferDiagnostics {
1093        BufferDiagnostics {
1094            current_size: self.current_size(),
1095            target_size: self.sizing_strategy.target_size,
1096            utilization: self.current_size() as f64 / self.sizing_strategy.target_size as f64,
1097            average_quality: self.quality_metrics.average_quality.to_f64().unwrap_or(0.0),
1098            total_processed: self.statistics.total_items_processed,
1099            total_discarded: self.statistics.total_items_discarded,
1100            size_changes: self.size_change_log.len(),
1101        }
1102    }
1103}
1104
1105// Implement Ord for PrioritizedDataPoint to work with BinaryHeap
1106impl<A: Float + Send + Sync + Send + Sync> Ord for PrioritizedDataPoint<A> {
1107    fn cmp(&self, other: &Self) -> Ordering {
1108        self.priority_score
1109            .partial_cmp(&other.priority_score)
1110            .unwrap_or(Ordering::Equal)
1111    }
1112}
1113
1114impl<A: Float + Send + Sync + Send + Sync> PartialOrd for PrioritizedDataPoint<A> {
1115    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1116        Some(self.cmp(other))
1117    }
1118}
1119
1120impl<A: Float + Send + Sync + Send + Sync> PartialEq for PrioritizedDataPoint<A> {
1121    fn eq(&self, other: &Self) -> bool {
1122        self.priority_score == other.priority_score
1123    }
1124}
1125
1126impl<A: Float + Send + Sync + Send + Sync> Eq for PrioritizedDataPoint<A> {}
1127
1128impl<A: Float + Send + Sync + Send + Sync> BufferSizingStrategy<A> {
1129    fn new(strategy_type: BufferSizeStrategy, initial_size: usize) -> Self {
1130        Self {
1131            strategy_type,
1132            target_size: initial_size,
1133            adjustment_params: SizeAdjustmentParams {
1134                growth_rate: A::from(0.2).unwrap(),
1135                shrinkage_rate: A::from(0.15).unwrap(),
1136                stability_threshold: A::from(0.05).unwrap(),
1137                performance_sensitivity: A::from(0.1).unwrap(),
1138                quality_sensitivity: A::from(0.1).unwrap(),
1139                memory_sensitivity: A::from(0.2).unwrap(),
1140            },
1141            performance_feedback: VecDeque::with_capacity(100),
1142            sizing_history: VecDeque::with_capacity(100),
1143        }
1144    }
1145}
1146
1147impl<A: Float + Send + Sync + Send + Sync> DataRetentionPolicy<A> {
1148    fn new(strategy: RetentionStrategy) -> Self {
1149        Self {
1150            strategy,
1151            age_policy: AgeBasedRetention {
1152                max_age: Duration::from_secs(7200),        // 2 hours
1153                soft_age_limit: Duration::from_secs(3600), // 1 hour
1154                age_weight: 0.3,
1155                adaptive_limits: true,
1156            },
1157            quality_policy: QualityBasedRetention {
1158                min_quality_threshold: A::from(0.3).unwrap(),
1159                quality_weight: A::from(0.5).unwrap(),
1160                adaptive_thresholds: true,
1161                quality_targets: QualityDistributionTargets {
1162                    high_quality_target: A::from(0.3).unwrap(),
1163                    medium_quality_target: A::from(0.5).unwrap(),
1164                    low_quality_target: A::from(0.2).unwrap(),
1165                    high_quality_threshold: A::from(0.8).unwrap(),
1166                    medium_quality_threshold: A::from(0.5).unwrap(),
1167                },
1168            },
1169            relevance_policy: RelevanceBasedRetention {
1170                relevance_method: RelevanceMethod::Similarity,
1171                relevance_weight: A::from(0.2).unwrap(),
1172                temporal_decay: true,
1173                decay_rate: A::from(0.1).unwrap(),
1174            },
1175            retention_scorer: RetentionScorer::new(),
1176        }
1177    }
1178}
1179
1180impl<A: Float + Send + Sync + Send + Sync> RetentionScorer<A> {
1181    fn new() -> Self {
1182        Self {
1183            weights: RetentionWeights {
1184                age_weight: A::from(0.2).unwrap(),
1185                quality_weight: A::from(0.3).unwrap(),
1186                relevance_weight: A::from(0.2).unwrap(),
1187                priority_weight: A::from(0.15).unwrap(),
1188                freshness_weight: A::from(0.1).unwrap(),
1189                diversity_weight: A::from(0.05).unwrap(),
1190            },
1191            scoring_history: VecDeque::with_capacity(1000),
1192            performance_feedback: VecDeque::with_capacity(100),
1193        }
1194    }
1195}
1196
/// Point-in-time diagnostic figures describing an adaptive buffer.
#[derive(Clone, Debug)]
pub struct BufferDiagnostics {
    /// Number of items currently held in the buffer.
    pub current_size: usize,
    /// Size the buffer is currently aiming for.
    pub target_size: usize,
    /// Ratio of `current_size` to `target_size`.
    pub utilization: f64,
    /// Average quality of the buffered data.
    pub average_quality: f64,
    /// Total number of items processed.
    pub total_processed: u64,
    /// Total number of items discarded.
    pub total_discarded: u64,
    /// Number of size-change events currently recorded.
    pub size_changes: usize,
}