use super::config::*;
use super::optimizer::{Adaptation, AdaptationPriority, AdaptationType, StreamingDataPoint};
use super::performance::{PerformanceSnapshot, PerformanceTracker};
use scirs2_core::numeric::Float;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::time::{Duration, Instant};
/// Two-tier buffer for streaming data points with a movable size target.
///
/// Points whose priority score reaches `config.quality_threshold` are kept in a
/// max-heap; all other points are queued in arrival order in a secondary deque.
pub struct AdaptiveBuffer<A: Float + Send + Sync> {
/// Buffer settings, cloned from `StreamingConfig::buffer_config` in `new`.
config: BufferConfig,
/// Primary store: a max-heap keyed on `PrioritizedDataPoint::priority_score`.
buffer: BinaryHeap<PrioritizedDataPoint<A>>,
/// Store for points below the quality threshold; drained from the front.
secondary_buffer: VecDeque<StreamingDataPoint<A>>,
/// Quality summary over both stores, recomputed by `add_batch` after insertion.
quality_metrics: BufferQualityMetrics<A>,
/// Holds the current target size and the growth/shrink rates.
sizing_strategy: BufferSizingStrategy<A>,
/// Retention settings built in `new`; the eviction code in this file does not read them.
retention_policy: DataRetentionPolicy<A>,
/// Running counters and throughput figures.
statistics: BufferStatistics<A>,
/// Set at construction, by `get_batch_for_processing`, and by `reset`.
last_processing: Instant,
/// Log of target-size changes, trimmed to at most 100 entries.
size_change_log: VecDeque<SizeChangeEvent>,
}
/// A data point wrapped with the scores computed when it entered the primary heap.
///
/// Ordering and equality of this type consider `priority_score` only.
#[derive(Debug, Clone)]
pub struct PrioritizedDataPoint<A: Float + Send + Sync> {
/// The wrapped point.
pub data_point: StreamingDataPoint<A>,
/// Heap key: quality score plus weighted recency and novelty bonuses.
pub priority_score: A,
/// When the point was inserted into the buffer.
pub buffer_timestamp: Instant,
/// Processing-time estimate; `add_single_point` always stores 100 ms.
pub expected_processing_time: Duration,
/// Freshness at insertion time, in `[0, 1]` (1 = just created, 0 = an hour old or more).
pub freshness_score: A,
/// Relevance at insertion time; the current scorer returns the constant 0.7.
pub relevance_score: A,
}
/// Summary of the `quality_score` values currently held in both buffer tiers.
#[derive(Debug, Clone)]
pub struct BufferQualityMetrics<A: Float + Send + Sync> {
/// Arithmetic mean of the buffered quality scores.
pub average_quality: A,
/// Population variance of the buffered quality scores (divides by the count).
pub quality_variance: A,
/// Smallest buffered quality score; initialised to 1.
pub min_quality: A,
/// Largest buffered quality score; initialised to 0.
pub max_quality: A,
/// Initialised empty; never populated by the code in this file.
pub freshness_distribution: Vec<A>,
/// Initialised empty; never populated by the code in this file.
pub priority_distribution: Vec<A>,
/// Direction and size of the recent drift in `average_quality`.
pub quality_trend: QualityTrend<A>,
}
/// Sliding-window trend estimate for the buffer's average quality.
#[derive(Debug, Clone)]
pub struct QualityTrend<A: Float + Send + Sync> {
/// Window of recent average-quality samples (absolute values, not deltas); at most 50.
pub recent_changes: VecDeque<A>,
/// Classification of the drift; only recomputed once the window holds 10 samples.
pub trend_direction: TrendDirection,
/// Absolute difference between the means of the newer and older halves of the window.
pub trend_magnitude: A,
/// Set to the fixed value 0.8 whenever the trend is recomputed; 0 before that.
pub confidence: A,
}
/// Qualitative direction of a measured trend.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TrendDirection {
/// For quality: the newer half-window mean exceeds the older one by more than 0.05.
Improving,
/// For quality: the newer half-window mean is lower than the older one by more than 0.05.
Degrading,
/// The change is within the threshold; also the initial value.
Stable,
/// Never produced by the code in this file.
Oscillating,
}
/// Target size of the buffer together with the parameters used to move it.
pub struct BufferSizingStrategy<A: Float + Send + Sync> {
/// Strategy selected in the configuration; stored but not consulted in this file.
strategy_type: BufferSizeStrategy,
/// Number of points the buffer aims to hold across both tiers.
target_size: usize,
/// Growth and shrink rates applied when the target is resized.
adjustment_params: SizeAdjustmentParams<A>,
/// Allocated with capacity 100; nothing in this file pushes to it.
performance_feedback: VecDeque<SizingPerformanceFeedback<A>>,
/// Allocated with capacity 100; nothing in this file pushes to it.
sizing_history: VecDeque<SizingEvent>,
}
/// Tunable rates for resizing the buffer target.
///
/// Only `growth_rate` and `shrinkage_rate` are read by the code in this file.
#[derive(Debug, Clone)]
pub struct SizeAdjustmentParams<A: Float + Send + Sync> {
/// Fractional increase applied when growing (default 0.2, i.e. +20 %).
pub growth_rate: A,
/// Fractional decrease applied when shrinking (default 0.15, i.e. -15 %).
pub shrinkage_rate: A,
/// Default 0.05; not read in this file.
pub stability_threshold: A,
/// Default 0.1; not read in this file.
pub performance_sensitivity: A,
/// Default 0.1; not read in this file.
pub quality_sensitivity: A,
/// Default 0.2; not read in this file.
pub memory_sensitivity: A,
}
/// One performance observation recorded against a buffer size.
///
/// Not constructed anywhere in this file; field meanings below follow the
/// field names and types only.
#[derive(Debug, Clone)]
pub struct SizingPerformanceFeedback<A: Float + Send + Sync> {
/// Buffer size the observation refers to.
pub buffer_size: usize,
/// Processing latency observed.
pub processing_latency: Duration,
/// Throughput observed (unit not established here).
pub throughput: A,
/// Quality score observed.
pub quality_score: A,
/// Memory usage observed (presumably bytes; confirm against the producer).
pub memory_usage: usize,
/// When the observation was taken.
pub timestamp: Instant,
}
/// Record of one target-size change, tagged with a typed reason.
///
/// Not constructed in this file; `AdaptiveBuffer` logs `SizeChangeEvent` instead.
#[derive(Debug, Clone)]
pub struct SizingEvent {
/// When the change happened.
pub timestamp: Instant,
/// Target size before the change.
pub old_size: usize,
/// Target size after the change.
pub new_size: usize,
/// Why the size was changed.
pub reason: SizingReason,
/// Optional measured impact of the change (scale not established here).
pub performance_impact: Option<f64>,
}
/// Reason attached to a buffer resize request.
///
/// `AdaptiveBuffer::resize_buffer` only changes the size for
/// `ThroughputOptimization` (grow) and `MemoryPressure` (shrink); every other
/// variant leaves the target as it is.
#[derive(Debug, Clone)]
pub enum SizingReason {
PerformanceOptimization,
QualityImprovement,
/// Requested when utilization falls below 0.3 and the target is above the minimum.
MemoryPressure,
LatencyRequirement,
/// Requested when utilization exceeds 0.9.
ThroughputOptimization,
Manual,
Configuration,
}
/// Bundle of retention settings (age, quality, relevance) and a scorer.
///
/// The values are populated by `DataRetentionPolicy::new`; none of the fields
/// are read by the eviction code in this file.
pub struct DataRetentionPolicy<A: Float + Send + Sync> {
/// Selected retention strategy.
strategy: RetentionStrategy,
/// Age limits and weight.
age_policy: AgeBasedRetention,
/// Quality threshold, weight and distribution targets.
quality_policy: QualityBasedRetention<A>,
/// Relevance method, weight and decay settings.
relevance_policy: RelevanceBasedRetention<A>,
/// Component weights plus history buffers for scoring.
retention_scorer: RetentionScorer<A>,
}
/// Names of the available retention strategies.
///
/// Only `Hybrid` is selected in this file (by `AdaptiveBuffer::new`), and no
/// code here branches on the variant, so per-variant behaviour is not
/// established by this file.
#[derive(Debug, Clone)]
pub enum RetentionStrategy {
FIFO,
LIFO,
LRU,
Priority,
Quality,
Age,
Hybrid,
Adaptive,
}
/// Age-related retention settings.
#[derive(Debug, Clone)]
pub struct AgeBasedRetention {
/// Upper age limit; defaults to 7200 s.
pub max_age: Duration,
/// Lower, "soft" age limit; defaults to 3600 s.
pub soft_age_limit: Duration,
/// Weight of the age component; defaults to 0.3.
pub age_weight: f64,
/// Flag for adaptive limits; defaults to `true`. Not read in this file.
pub adaptive_limits: bool,
}
/// Quality-related retention settings.
#[derive(Debug, Clone)]
pub struct QualityBasedRetention<A: Float + Send + Sync> {
/// Minimum quality threshold; defaults to 0.3.
pub min_quality_threshold: A,
/// Weight of the quality component; defaults to 0.5.
pub quality_weight: A,
/// Flag for adaptive thresholds; defaults to `true`. Not read in this file.
pub adaptive_thresholds: bool,
/// Desired split of the buffer across quality bands.
pub quality_targets: QualityDistributionTargets<A>,
}
/// Target shares for the high / medium / low quality bands and the band cut-offs.
///
/// The default shares (0.3, 0.5, 0.2) sum to 1.
#[derive(Debug, Clone)]
pub struct QualityDistributionTargets<A: Float + Send + Sync> {
/// Target share of high-quality points; defaults to 0.3.
pub high_quality_target: A,
/// Target share of medium-quality points; defaults to 0.5.
pub medium_quality_target: A,
/// Target share of low-quality points; defaults to 0.2.
pub low_quality_target: A,
/// Cut-off for the high band; defaults to 0.8.
pub high_quality_threshold: A,
/// Cut-off for the medium band; defaults to 0.5.
pub medium_quality_threshold: A,
}
/// Relevance-related retention settings.
#[derive(Debug, Clone)]
pub struct RelevanceBasedRetention<A: Float + Send + Sync> {
/// How relevance is measured; defaults to `RelevanceMethod::Similarity`.
pub relevance_method: RelevanceMethod,
/// Weight of the relevance component; defaults to 0.2.
pub relevance_weight: A,
/// Flag for temporal decay of relevance; defaults to `true`. Not read in this file.
pub temporal_decay: bool,
/// Decay rate; defaults to 0.1.
pub decay_rate: A,
}
/// Names of the available relevance measures.
///
/// No code in this file branches on the variant; `Similarity` is the default
/// chosen by `DataRetentionPolicy::new`.
#[derive(Debug, Clone)]
pub enum RelevanceMethod {
Distance,
Similarity,
FeatureImportance,
Uncertainty,
Diversity,
/// Caller-defined method identified by a string.
Custom(String),
}
/// Holds the component weights for retention scoring plus two history buffers.
///
/// `AdaptiveBuffer::calculate_retention_score` uses its own fixed weights and
/// does not read this structure.
pub struct RetentionScorer<A: Float + Send + Sync> {
/// Per-component weights.
weights: RetentionWeights<A>,
/// Allocated with capacity 1000; nothing in this file pushes to it.
scoring_history: VecDeque<RetentionScore<A>>,
/// Allocated with capacity 100; nothing in this file pushes to it.
performance_feedback: VecDeque<RetentionPerformanceFeedback<A>>,
}
/// Weights of the individual retention-score components.
///
/// The defaults set by `RetentionScorer::new` sum to 1.
#[derive(Debug, Clone)]
pub struct RetentionWeights<A: Float + Send + Sync> {
/// Defaults to 0.2.
pub age_weight: A,
/// Defaults to 0.3.
pub quality_weight: A,
/// Defaults to 0.2.
pub relevance_weight: A,
/// Defaults to 0.15.
pub priority_weight: A,
/// Defaults to 0.1.
pub freshness_weight: A,
/// Defaults to 0.05.
pub diversity_weight: A,
}
/// Result of scoring one item for retention.
///
/// Not constructed anywhere in this file; field meanings below follow the
/// field names and types only.
#[derive(Debug, Clone)]
pub struct RetentionScore<A: Float + Send + Sync> {
/// Combined score.
pub overall_score: A,
/// Individual component scores keyed by component name.
pub component_scores: HashMap<String, A>,
/// Retain/discard decision derived from the score.
pub should_retain: bool,
/// Confidence attached to the decision.
pub confidence: A,
/// When the score was computed.
pub timestamp: Instant,
}
/// Outcome summary of one retention pass.
///
/// Not constructed anywhere in this file; field meanings below follow the
/// field names and types only.
#[derive(Debug, Clone)]
pub struct RetentionPerformanceFeedback<A: Float + Send + Sync> {
/// Number of items kept.
pub items_retained: usize,
/// Number of items dropped.
pub items_discarded: usize,
/// Quality figure for the kept items.
pub retained_quality: A,
/// Quality figure for the dropped items.
pub discarded_quality: A,
/// Measured effect of the pass (scale not established here).
pub performance_impact: A,
/// When the pass happened.
pub timestamp: Instant,
}
/// Running counters and aggregate figures for an `AdaptiveBuffer`.
#[derive(Debug, Clone)]
pub struct BufferStatistics<A: Float + Send + Sync> {
/// Incremented once for every point accepted by `add_single_point`.
pub total_items_processed: u64,
/// Incremented by `apply_retention_policy` when it evicts points.
pub total_items_discarded: u64,
/// Initialised to zero; not updated by the code in this file.
pub avg_buffer_utilization: A,
/// Initialised to zero; not updated by the code in this file.
pub peak_buffer_utilization: A,
/// Read when choosing a batch size (above 500 ms shrinks the batch); initialised
/// to zero and not updated by the code in this file.
pub avg_processing_latency: Duration,
/// Throughput figures maintained by `update_throughput_stats`.
pub throughput_stats: ThroughputStatistics<A>,
/// Initialised in `new`; not updated by the code in this file.
pub quality_stats: QualityStatistics<A>,
/// Initialised in `new`; not updated by the code in this file.
pub memory_stats: MemoryStatistics,
}
/// Throughput figures, in items per second.
#[derive(Debug, Clone)]
pub struct ThroughputStatistics<A: Float + Send + Sync> {
/// Items in the latest batch divided by the seconds elapsed since `last_processing`.
pub current_throughput: A,
/// Exponential moving average of `current_throughput` with smoothing factor 0.1.
pub avg_throughput: A,
/// Largest `current_throughput` seen so far.
pub peak_throughput: A,
/// Initialised to `Stable`; not updated by the code in this file.
pub throughput_trend: TrendDirection,
/// Initialised to zero; not updated by the code in this file.
pub stability: A,
}
/// Longer-term quality figures.
///
/// All fields are initialised to zero / empty / `None` in `AdaptiveBuffer::new`
/// and are not updated by the code in this file.
#[derive(Debug, Clone)]
pub struct QualityStatistics<A: Float + Send + Sync> {
pub current_avg_quality: A,
pub historical_avg_quality: A,
pub quality_improvement_rate: A,
/// Quality values keyed by a string label (labels are not defined in this file).
pub quality_distribution: HashMap<String, A>,
/// Optional forecast; `None` until some producer fills it in.
pub predicted_quality: Option<A>,
}
/// Memory usage figures.
///
/// All fields are initialised to zero in `AdaptiveBuffer::new` and are not
/// updated by the code in this file.
#[derive(Debug, Clone)]
pub struct MemoryStatistics {
pub current_usage_bytes: usize,
pub peak_usage_bytes: usize,
pub avg_usage_bytes: usize,
/// Scale not established in this file.
pub memory_efficiency: f64,
/// Scale not established in this file.
pub fragmentation: f64,
}
/// Log entry written whenever the buffer's target size actually changes.
#[derive(Debug, Clone)]
pub struct SizeChangeEvent {
/// When the target was changed.
pub timestamp: Instant,
/// Target size before the change.
pub old_size: usize,
/// Target size after the change, already clamped to the configured bounds.
pub new_size: usize,
/// Signed difference `new_size - old_size`, computed with `as i32` casts.
pub change_magnitude: i32,
/// `Debug` rendering of the `SizingReason`, or the literal `"adaptation"`.
pub reason: String,
}
impl<A: Float + Default + Clone + Send + Sync + std::iter::Sum + std::fmt::Debug>
AdaptiveBuffer<A>
{
/// Creates an empty buffer from the `buffer_config` section of `config`.
///
/// The size target starts at `buffer_config.initial_size`, the retention
/// policy is created with `RetentionStrategy::Hybrid`, and every statistic
/// starts at zero. The current implementation never returns `Err`.
pub fn new(config: &StreamingConfig) -> Result<Self, String> {
    let settings = config.buffer_config.clone();

    let sizing_strategy =
        BufferSizingStrategy::new(settings.size_strategy.clone(), settings.initial_size);
    let retention_policy = DataRetentionPolicy::new(RetentionStrategy::Hybrid);

    let quality_trend = QualityTrend {
        recent_changes: VecDeque::with_capacity(50),
        trend_direction: TrendDirection::Stable,
        trend_magnitude: A::zero(),
        confidence: A::zero(),
    };
    // min starts at 1 and max at 0 so the first real sample replaces both.
    let quality_metrics = BufferQualityMetrics {
        average_quality: A::zero(),
        quality_variance: A::zero(),
        min_quality: A::one(),
        max_quality: A::zero(),
        freshness_distribution: Vec::new(),
        priority_distribution: Vec::new(),
        quality_trend,
    };

    let throughput_stats = ThroughputStatistics {
        current_throughput: A::zero(),
        avg_throughput: A::zero(),
        peak_throughput: A::zero(),
        throughput_trend: TrendDirection::Stable,
        stability: A::zero(),
    };
    let quality_stats = QualityStatistics {
        current_avg_quality: A::zero(),
        historical_avg_quality: A::zero(),
        quality_improvement_rate: A::zero(),
        quality_distribution: HashMap::new(),
        predicted_quality: None,
    };
    let memory_stats = MemoryStatistics {
        current_usage_bytes: 0,
        peak_usage_bytes: 0,
        avg_usage_bytes: 0,
        memory_efficiency: 0.0,
        fragmentation: 0.0,
    };
    let statistics = BufferStatistics {
        total_items_processed: 0,
        total_items_discarded: 0,
        avg_buffer_utilization: A::zero(),
        peak_buffer_utilization: A::zero(),
        avg_processing_latency: Duration::ZERO,
        throughput_stats,
        quality_stats,
        memory_stats,
    };

    Ok(Self {
        config: settings,
        buffer: BinaryHeap::new(),
        secondary_buffer: VecDeque::new(),
        quality_metrics,
        sizing_strategy,
        retention_policy,
        statistics,
        last_processing: Instant::now(),
        size_change_log: VecDeque::with_capacity(100),
    })
}
/// Inserts every point of `batch`, then refreshes the quality metrics,
/// re-evaluates the size target and, if the buffer is over target, evicts.
///
/// # Errors
/// Stops at the first point that fails to insert and returns its error;
/// points inserted before the failure stay in the buffer.
pub fn add_batch(&mut self, batch: Vec<StreamingDataPoint<A>>) -> Result<(), String> {
    batch
        .into_iter()
        .try_for_each(|point| self.add_single_point(point))?;

    self.update_quality_metrics()?;
    self.check_buffer_resizing()?;

    let over_target = self.current_size() > self.sizing_strategy.target_size;
    if over_target {
        self.apply_retention_policy()
    } else {
        Ok(())
    }
}
/// Scores one point and routes it to the matching tier.
///
/// A point whose priority score is at least `config.quality_threshold` is
/// wrapped and pushed onto the priority heap; any other point goes to the
/// back of the secondary queue without its scores. Either way the
/// `total_items_processed` counter is incremented.
///
/// # Errors
/// Propagates failures from the priority and relevance scorers.
fn add_single_point(&mut self, data_point: StreamingDataPoint<A>) -> Result<(), String> {
    let priority = self.calculate_priority_score(&data_point)?;
    let freshness = self.calculate_freshness_score(&data_point);
    let relevance = self.calculate_relevance_score(&data_point)?;

    let threshold = A::from(self.config.quality_threshold).expect("unwrap failed");
    if priority >= threshold {
        self.buffer.push(PrioritizedDataPoint {
            data_point,
            priority_score: priority,
            buffer_timestamp: Instant::now(),
            expected_processing_time: Duration::from_millis(100),
            freshness_score: freshness,
            relevance_score: relevance,
        });
    } else {
        self.secondary_buffer.push_back(data_point);
    }

    self.statistics.total_items_processed += 1;
    Ok(())
}
/// Priority of a point: `quality + 0.1 * recency + 0.2 * novelty`.
///
/// `recency` is `1 / (1 + age_in_hours)`, so it is 1 for a brand-new point and
/// falls towards 0 as the point ages.
///
/// # Errors
/// Propagates a failure from the novelty computation.
fn calculate_priority_score(&self, data_point: &StreamingDataPoint<A>) -> Result<A, String> {
    let age_secs = data_point.timestamp.elapsed().as_secs_f64();
    let recency = A::from(1.0 / (1.0 + age_secs / 3600.0)).expect("unwrap failed");
    let with_recency = data_point.quality_score + recency * A::from(0.1).expect("unwrap failed");

    let novelty = self.calculate_novelty_score(data_point)?;
    Ok(with_recency + novelty * A::from(0.2).expect("unwrap failed"))
}
/// Novelty of a point in `[0, 1)`, based on its mean Euclidean distance `d`
/// to up to ten points of the priority heap: `d / (d + 1)`.
///
/// The ten points are the first ten yielded by the heap's iterator, which
/// visits elements in arbitrary order. With an empty heap the neutral value
/// 0.5 is returned.
///
/// # Errors
/// Fails if a compared point has a feature vector of a different length.
fn calculate_novelty_score(&self, data_point: &StreamingDataPoint<A>) -> Result<A, String> {
    if self.buffer.is_empty() {
        return Ok(A::from(0.5).expect("unwrap failed"));
    }

    let mut compared = 0usize;
    let mut distance_sum = A::zero();
    for stored in self.buffer.iter().take(10) {
        let distance =
            self.calculate_feature_distance(&data_point.features, &stored.data_point.features)?;
        distance_sum = distance_sum + distance;
        compared += 1;
    }

    let mean_distance = distance_sum / A::from(compared).expect("unwrap failed");
    Ok(mean_distance / (mean_distance + A::one()))
}
/// Euclidean distance between two feature vectors.
///
/// # Errors
/// Returns an error when the vectors differ in length.
fn calculate_feature_distance(
    &self,
    features1: &scirs2_core::ndarray::Array1<A>,
    features2: &scirs2_core::ndarray::Array1<A>,
) -> Result<A, String> {
    if features1.len() != features2.len() {
        return Err("Feature vectors have different lengths".to_string());
    }

    let squared_sum = features1
        .iter()
        .zip(features2.iter())
        .fold(A::zero(), |acc, (&lhs, &rhs)| {
            let delta = lhs - rhs;
            acc + delta * delta
        });
    Ok(squared_sum.sqrt())
}
/// Linear freshness in `[0, 1]`: 1 for a point created right now, 0 for a
/// point that is one hour old or older.
fn calculate_freshness_score(&self, data_point: &StreamingDataPoint<A>) -> A {
    let window_secs = 3600.0;
    let elapsed_secs = data_point.timestamp.elapsed().as_secs_f64();
    let capped_age = elapsed_secs.min(window_secs);
    let remaining_fraction = (window_secs - capped_age) / window_secs;
    A::from(remaining_fraction.max(0.0)).expect("unwrap failed")
}
/// Relevance of a point. Currently a placeholder: the point is ignored and
/// the constant 0.7 is returned.
fn calculate_relevance_score(&self, _data_point: &StreamingDataPoint<A>) -> Result<A, String> {
    let fixed_relevance = A::from(0.7).expect("unwrap failed");
    Ok(fixed_relevance)
}
/// Removes and returns the next batch of points to process.
///
/// The batch size comes from `calculate_optimal_batch_size`. Points are taken
/// from the priority heap first (highest priority score first) and, if the
/// heap runs out, from the front of the secondary queue.
///
/// Throughput statistics are updated from the time elapsed since the
/// previous call, and only afterwards is `last_processing` reset to now.
///
/// # Errors
/// Propagates failures from batch sizing or the throughput update.
pub fn get_batch_for_processing(&mut self) -> Result<Vec<StreamingDataPoint<A>>, String> {
    let batch_size = self.calculate_optimal_batch_size()?;
    let mut processing_batch = Vec::with_capacity(batch_size);

    while processing_batch.len() < batch_size {
        match self.buffer.pop() {
            Some(prioritized_point) => processing_batch.push(prioritized_point.data_point),
            None => break,
        }
    }
    while processing_batch.len() < batch_size {
        match self.secondary_buffer.pop_front() {
            Some(data_point) => processing_batch.push(data_point),
            None => break,
        }
    }

    // The throughput update measures `last_processing.elapsed()`, so it must
    // run before the timestamp is reset; otherwise the interval is ~0 and the
    // computed items-per-second figure is meaninglessly large.
    self.update_throughput_stats(processing_batch.len())?;
    self.last_processing = Instant::now();
    Ok(processing_batch)
}
/// Chooses how many points the next processing batch should contain.
///
/// Starts from `min(config.initial_size, 32)`, scales by 1.5 when the buffer
/// is more than 80 % full or by 0.7 when it is less than 30 % full, and
/// scales by a further 0.8 when the average processing latency exceeds
/// 500 ms. The result is at least 1 but never more than
/// `min(current_size, 100)`, so an empty buffer yields 0.
fn calculate_optimal_batch_size(&self) -> Result<usize, String> {
    // Each scaling step truncates towards zero.
    let scale = |size: usize, factor: f64| (size as f64 * factor) as usize;

    let utilization = self.current_size() as f64 / self.sizing_strategy.target_size as f64;
    let mut size = self.config.initial_size.min(32);

    if utilization > 0.8 {
        size = scale(size, 1.5);
    } else if utilization < 0.3 {
        size = scale(size, 0.7);
    }

    let latency_is_high = self.statistics.avg_processing_latency > Duration::from_millis(500);
    if latency_is_high {
        size = scale(size, 0.8);
    }

    let ceiling = self.current_size().min(100);
    Ok(size.max(1).min(ceiling))
}
/// Recomputes mean, population variance, minimum and maximum of the quality
/// scores held in both buffer tiers, then feeds the new mean into the quality
/// trend.
///
/// When both tiers are empty the metrics are left untouched.
///
/// # Errors
/// Propagates a failure from the trend update.
fn update_quality_metrics(&mut self) -> Result<(), String> {
    let quality_values: Vec<A> = self
        .buffer
        .iter()
        .map(|point| point.data_point.quality_score)
        .chain(self.secondary_buffer.iter().map(|point| point.quality_score))
        .collect();
    if quality_values.is_empty() {
        return Ok(());
    }

    let count = A::from(quality_values.len()).expect("unwrap failed");
    let quality_sum = quality_values.iter().fold(A::zero(), |acc, &q| acc + q);
    let mean = quality_sum / count;

    // Seed the extrema with an observed value rather than with 1 and 0, so
    // the reported min/max are always values that are actually buffered, even
    // if scores fall outside [0, 1].
    let first = quality_values[0];
    let min_quality = quality_values.iter().copied().fold(first, A::min);
    let max_quality = quality_values.iter().copied().fold(first, A::max);

    let variance_sum = quality_values
        .iter()
        .map(|&q| (q - mean) * (q - mean))
        .sum::<A>();

    self.quality_metrics.average_quality = mean;
    self.quality_metrics.min_quality = min_quality;
    self.quality_metrics.max_quality = max_quality;
    self.quality_metrics.quality_variance = variance_sum / count;

    self.update_quality_trend(mean)
}
/// Appends `current_quality` to the trend window (at most 50 samples) and,
/// once the window holds at least 10 samples, reclassifies the trend.
///
/// The window is split in half by position; the trend is `Improving` when the
/// newer half's mean exceeds the older half's by more than 0.05, `Degrading`
/// when it is lower by more than 0.05, and `Stable` otherwise. Confidence is
/// set to the fixed value 0.8.
fn update_quality_trend(&mut self, current_quality: A) -> Result<(), String> {
    let trend = &mut self.quality_metrics.quality_trend;

    if trend.recent_changes.len() >= 50 {
        trend.recent_changes.pop_front();
    }
    trend.recent_changes.push_back(current_quality);

    let samples = trend.recent_changes.len();
    if samples < 10 {
        return Ok(());
    }

    let split = samples / 2;
    let older_mean = trend.recent_changes.iter().take(split).copied().sum::<A>()
        / A::from(split).expect("unwrap failed");
    let newer_mean = trend.recent_changes.iter().skip(split).copied().sum::<A>()
        / A::from(samples - split).expect("unwrap failed");

    let delta = newer_mean - older_mean;
    let threshold = A::from(0.05).expect("unwrap failed");

    trend.trend_direction = if delta > threshold {
        TrendDirection::Improving
    } else if delta < -threshold {
        TrendDirection::Degrading
    } else {
        TrendDirection::Stable
    };
    trend.trend_magnitude = delta.abs();
    trend.confidence = A::from(0.8).expect("unwrap failed");
    Ok(())
}
/// Grows or shrinks the size target based on current utilization.
///
/// Does nothing unless `config.enable_adaptive_sizing` is set. Above 90 %
/// utilization a growth is requested; below 30 % a shrink is requested,
/// provided the target is still above `config.min_size`.
fn check_buffer_resizing(&mut self) -> Result<(), String> {
    if !self.config.enable_adaptive_sizing {
        return Ok(());
    }

    let target = self.sizing_strategy.target_size;
    let utilization = self.current_size() as f64 / target as f64;

    if utilization > 0.9 {
        self.resize_buffer(SizingReason::ThroughputOptimization)
    } else if utilization < 0.3 && target > self.config.min_size {
        self.resize_buffer(SizingReason::MemoryPressure)
    } else {
        Ok(())
    }
}
/// Moves the size target according to `reason` and logs the change.
///
/// `ThroughputOptimization` multiplies the target by `1 + growth_rate`,
/// `MemoryPressure` by `1 - shrinkage_rate`; any other reason leaves it
/// unchanged. The result is truncated to an integer and clamped to
/// `[config.min_size, config.max_size]`. A log entry is written only when the
/// clamped target differs from the old one; the log keeps the latest 100.
fn resize_buffer(&mut self, reason: SizingReason) -> Result<(), String> {
    let previous = self.sizing_strategy.target_size;
    let params = &self.sizing_strategy.adjustment_params;

    // 0.2 is the fallback rate when the generic float cannot be converted.
    let factor = match reason {
        SizingReason::ThroughputOptimization => {
            Some(1.0 + params.growth_rate.to_f64().unwrap_or(0.2))
        }
        SizingReason::MemoryPressure => Some(1.0 - params.shrinkage_rate.to_f64().unwrap_or(0.2)),
        _ => None,
    };
    let proposed = match factor {
        Some(multiplier) => ((previous as f64) * multiplier) as usize,
        None => previous,
    };

    let bounded = proposed
        .max(self.config.min_size)
        .min(self.config.max_size);
    if bounded == previous {
        return Ok(());
    }

    self.sizing_strategy.target_size = bounded;
    if self.size_change_log.len() >= 100 {
        self.size_change_log.pop_front();
    }
    self.size_change_log.push_back(SizeChangeEvent {
        timestamp: Instant::now(),
        old_size: previous,
        new_size: bounded,
        change_magnitude: bounded as i32 - previous as i32,
        reason: format!("{:?}", reason),
    });
    Ok(())
}
fn apply_retention_policy(&mut self) -> Result<(), String> {
let target_size = self.sizing_strategy.target_size;
let current_size = self.current_size();
if current_size <= target_size {
return Ok(());
}
let items_to_remove = current_size - target_size;
let mut removed_count = 0;
while removed_count < items_to_remove && !self.secondary_buffer.is_empty() {
if self.should_remove_from_secondary()? {
self.secondary_buffer.pop_front();
removed_count += 1;
self.statistics.total_items_discarded += 1;
} else {
break;
}
}
let mut temp_buffer = Vec::new();
while let Some(item) = self.buffer.pop() {
temp_buffer.push(item);
}
temp_buffer.sort_by(|a, b| {
let score_a = self
.calculate_retention_score(&a.data_point)
.unwrap_or(A::zero());
let score_b = self
.calculate_retention_score(&b.data_point)
.unwrap_or(A::zero());
score_b.partial_cmp(&score_a).unwrap_or(Ordering::Equal)
});
let items_to_keep = (temp_buffer.len()).saturating_sub(items_to_remove - removed_count);
for item in temp_buffer.into_iter().take(items_to_keep) {
self.buffer.push(item);
}
Ok(())
}
/// Tells whether the front point of the secondary queue is older than one
/// hour. Returns `Ok(false)` when the queue is empty.
fn should_remove_from_secondary(&self) -> Result<bool, String> {
    let expired = match self.secondary_buffer.front() {
        Some(front) => front.timestamp.elapsed() > Duration::from_secs(3600),
        None => false,
    };
    Ok(expired)
}
/// Retention score of a point: `0.5 * quality + 0.3 * freshness + 0.2 * age`.
///
/// Freshness decays over one hour and the age score over two hours, so the
/// result changes as time passes. Higher scores are kept in preference.
fn calculate_retention_score(&self, data_point: &StreamingDataPoint<A>) -> Result<A, String> {
    let weight = |value: f64| A::from(value).expect("unwrap failed");

    let age_component = self.calculate_age_score(data_point);
    let freshness_component = self.calculate_freshness_score(data_point);

    let quality_term = data_point.quality_score * weight(0.5);
    let freshness_term = freshness_component * weight(0.3);
    let age_term = age_component * weight(0.2);
    Ok(quality_term + freshness_term + age_term)
}
/// Linear age score in `[0, 1]`: 1 for a point created right now, 0 for a
/// point that is two hours old or older.
fn calculate_age_score(&self, data_point: &StreamingDataPoint<A>) -> A {
    let window_secs = 7200.0;
    let elapsed_secs = data_point.timestamp.elapsed().as_secs_f64();
    let capped_age = elapsed_secs.min(window_secs);
    let remaining_fraction = (window_secs - capped_age) / window_secs;
    A::from(remaining_fraction.max(0.0)).expect("unwrap failed")
}
/// Updates current, average and peak throughput from one processed batch.
///
/// The rate is `items_processed` divided by the seconds elapsed since
/// `last_processing`; the average is an exponential moving average with
/// smoothing factor 0.1. Nothing is updated when no time has elapsed.
fn update_throughput_stats(&mut self, items_processed: usize) -> Result<(), String> {
    let elapsed_secs = self.last_processing.elapsed().as_secs_f64();
    if elapsed_secs <= 0.0 {
        return Ok(());
    }

    let rate = A::from(items_processed as f64 / elapsed_secs).expect("unwrap failed");
    let smoothing = A::from(0.1).expect("unwrap failed");

    let stats = &mut self.statistics.throughput_stats;
    stats.current_throughput = rate;
    stats.avg_throughput = smoothing * rate + (A::one() - smoothing) * stats.avg_throughput;
    stats.peak_throughput = stats.peak_throughput.max(rate);
    Ok(())
}
/// Total number of buffered points across the priority heap and the
/// secondary queue.
pub fn current_size(&self) -> usize {
    let prioritized = self.buffer.len();
    let queued = self.secondary_buffer.len();
    prioritized + queued
}
pub fn time_since_last_processing(&self) -> Duration {
self.last_processing.elapsed()
}
/// Returns a copy of the most recently computed quality metrics.
pub fn get_quality_metrics(&self) -> BufferQualityMetrics<A> {
    Clone::clone(&self.quality_metrics)
}
pub fn compute_size_adaptation(
&self,
performance_tracker: &PerformanceTracker<A>,
) -> Result<Option<Adaptation<A>>, String> {
let recent_performance = performance_tracker.get_recent_performance(10);
if recent_performance.is_empty() {
return Ok(None);
}
let avg_processing_time = recent_performance
.iter()
.map(|p| p.timestamp.elapsed().as_millis() as f64)
.sum::<f64>()
/ recent_performance.len() as f64;
if avg_processing_time > 1000.0 {
let adaptation = Adaptation {
adaptation_type: AdaptationType::BufferSize,
magnitude: A::from(-0.2).expect("unwrap failed"), target_component: "adaptive_buffer".to_string(),
parameters: std::collections::HashMap::new(),
priority: AdaptationPriority::Normal,
timestamp: Instant::now(),
};
return Ok(Some(adaptation));
}
let avg_utilization = self.current_size() as f64 / self.sizing_strategy.target_size as f64;
if avg_processing_time < 100.0 && avg_utilization < 0.3 {
let adaptation = Adaptation {
adaptation_type: AdaptationType::BufferSize,
magnitude: A::from(0.3).expect("unwrap failed"), target_component: "adaptive_buffer".to_string(),
parameters: std::collections::HashMap::new(),
priority: AdaptationPriority::Low,
timestamp: Instant::now(),
};
return Ok(Some(adaptation));
}
Ok(None)
}
/// Applies a `BufferSize` adaptation to the size target; other adaptation
/// types are ignored.
///
/// The target is multiplied by `1 + adaptation.magnitude`, truncated to an
/// integer and clamped to `[config.min_size, config.max_size]`. A log entry
/// with reason `"adaptation"` is written only when the target changes.
pub fn apply_size_adaptation(&mut self, adaptation: &Adaptation<A>) -> Result<(), String> {
    if adaptation.adaptation_type != AdaptationType::BufferSize {
        return Ok(());
    }

    let previous = self.sizing_strategy.target_size;
    // A factor that cannot be converted to f64 leaves the size unchanged.
    let factor = (A::one() + adaptation.magnitude).to_f64().unwrap_or(1.0);
    let proposed = (previous as f64 * factor) as usize;
    let bounded = proposed
        .max(self.config.min_size)
        .min(self.config.max_size);
    if bounded == previous {
        return Ok(());
    }

    self.sizing_strategy.target_size = bounded;
    if self.size_change_log.len() >= 100 {
        self.size_change_log.pop_front();
    }
    self.size_change_log.push_back(SizeChangeEvent {
        timestamp: Instant::now(),
        old_size: previous,
        new_size: bounded,
        change_magnitude: bounded as i32 - previous as i32,
        reason: "adaptation".to_string(),
    });
    Ok(())
}
/// Signed magnitude of the most recent target-size change, or 0.0 when no
/// change has been logged.
pub fn last_size_change(&self) -> f32 {
    match self.size_change_log.back() {
        Some(event) => event.change_magnitude as f32,
        None => 0.0,
    }
}
/// Empties both buffer tiers and the size-change log, restores the quality
/// metrics to their initial values, zeroes the processed/discarded counters
/// and restarts the processing timer.
///
/// The size target and the remaining statistics (throughput, latency, etc.)
/// are left as they are.
pub fn reset(&mut self) -> Result<(), String> {
    self.buffer.clear();
    self.secondary_buffer.clear();
    self.size_change_log.clear();

    let fresh_trend = QualityTrend {
        recent_changes: VecDeque::with_capacity(50),
        trend_direction: TrendDirection::Stable,
        trend_magnitude: A::zero(),
        confidence: A::zero(),
    };
    self.quality_metrics = BufferQualityMetrics {
        average_quality: A::zero(),
        quality_variance: A::zero(),
        min_quality: A::one(),
        max_quality: A::zero(),
        freshness_distribution: Vec::new(),
        priority_distribution: Vec::new(),
        quality_trend: fresh_trend,
    };

    let counters = &mut self.statistics;
    counters.total_items_processed = 0;
    counters.total_items_discarded = 0;

    self.last_processing = Instant::now();
    Ok(())
}
/// Captures a plain-number snapshot of the buffer's state.
///
/// `utilization` is `current_size / target_size` as `f64`; `average_quality`
/// falls back to 0.0 if the generic float cannot be converted.
pub fn get_diagnostics(&self) -> BufferDiagnostics {
    let current_size = self.current_size();
    let target_size = self.sizing_strategy.target_size;
    let average_quality = self.quality_metrics.average_quality.to_f64().unwrap_or(0.0);

    BufferDiagnostics {
        current_size,
        target_size,
        utilization: current_size as f64 / target_size as f64,
        average_quality,
        total_processed: self.statistics.total_items_processed,
        total_discarded: self.statistics.total_items_discarded,
        size_changes: self.size_change_log.len(),
    }
}
}
impl<A: Float + Send + Sync> Ord for PrioritizedDataPoint<A> {
    /// Orders points by `priority_score` alone.
    ///
    /// A NaN score ranks below every numeric score and equal to another NaN.
    /// This keeps the relation a total order, as `Ord` (and therefore
    /// `BinaryHeap`) requires; treating NaN as equal to everything would
    /// break transitivity.
    fn cmp(&self, other: &Self) -> Ordering {
        let (lhs, rhs) = (self.priority_score, other.priority_score);
        lhs.partial_cmp(&rhs)
            .unwrap_or_else(|| match (lhs.is_nan(), rhs.is_nan()) {
                (true, false) => Ordering::Less,
                (false, true) => Ordering::Greater,
                _ => Ordering::Equal,
            })
    }
}
impl<A: Float + Send + Sync> PartialOrd for PrioritizedDataPoint<A> {
    /// Always `Some`: delegates to the `Ord` implementation.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl<A: Float + Send + Sync + Send + Sync> PartialEq for PrioritizedDataPoint<A> {
fn eq(&self, other: &Self) -> bool {
self.priority_score == other.priority_score
}
}
/// Marker impl required by `Ord`; equality is defined by the `PartialEq` impl.
impl<A> Eq for PrioritizedDataPoint<A> where A: Float + Send + Sync {}
impl<A: Float + Send + Sync> BufferSizingStrategy<A> {
    /// Creates a sizing strategy whose target starts at `initial_size`.
    ///
    /// Adjustment parameters get fixed defaults (growth 0.2, shrinkage 0.15,
    /// stability threshold 0.05, performance and quality sensitivity 0.1,
    /// memory sensitivity 0.2); both history queues start empty.
    fn new(strategy_type: BufferSizeStrategy, initial_size: usize) -> Self {
        let num = |value: f64| A::from(value).expect("unwrap failed");

        let adjustment_params = SizeAdjustmentParams {
            growth_rate: num(0.2),
            shrinkage_rate: num(0.15),
            stability_threshold: num(0.05),
            performance_sensitivity: num(0.1),
            quality_sensitivity: num(0.1),
            memory_sensitivity: num(0.2),
        };

        Self {
            strategy_type,
            target_size: initial_size,
            adjustment_params,
            performance_feedback: VecDeque::with_capacity(100),
            sizing_history: VecDeque::with_capacity(100),
        }
    }
}
impl<A: Float + Send + Sync> DataRetentionPolicy<A> {
    /// Creates a retention policy for `strategy` with fixed default settings:
    /// a 2 h hard / 1 h soft age limit, a minimum quality of 0.3, quality
    /// band targets of 30 % / 50 % / 20 %, and similarity-based relevance
    /// with temporal decay.
    fn new(strategy: RetentionStrategy) -> Self {
        let num = |value: f64| A::from(value).expect("unwrap failed");

        let age_policy = AgeBasedRetention {
            max_age: Duration::from_secs(7200),
            soft_age_limit: Duration::from_secs(3600),
            age_weight: 0.3,
            adaptive_limits: true,
        };

        let quality_targets = QualityDistributionTargets {
            high_quality_target: num(0.3),
            medium_quality_target: num(0.5),
            low_quality_target: num(0.2),
            high_quality_threshold: num(0.8),
            medium_quality_threshold: num(0.5),
        };
        let quality_policy = QualityBasedRetention {
            min_quality_threshold: num(0.3),
            quality_weight: num(0.5),
            adaptive_thresholds: true,
            quality_targets,
        };

        let relevance_policy = RelevanceBasedRetention {
            relevance_method: RelevanceMethod::Similarity,
            relevance_weight: num(0.2),
            temporal_decay: true,
            decay_rate: num(0.1),
        };

        Self {
            strategy,
            age_policy,
            quality_policy,
            relevance_policy,
            retention_scorer: RetentionScorer::new(),
        }
    }
}
impl<A: Float + Send + Sync> RetentionScorer<A> {
    /// Creates a scorer with the default component weights (age 0.2, quality
    /// 0.3, relevance 0.2, priority 0.15, freshness 0.1, diversity 0.05) and
    /// empty history queues.
    fn new() -> Self {
        let num = |value: f64| A::from(value).expect("unwrap failed");

        let weights = RetentionWeights {
            age_weight: num(0.2),
            quality_weight: num(0.3),
            relevance_weight: num(0.2),
            priority_weight: num(0.15),
            freshness_weight: num(0.1),
            diversity_weight: num(0.05),
        };

        Self {
            weights,
            scoring_history: VecDeque::with_capacity(1000),
            performance_feedback: VecDeque::with_capacity(100),
        }
    }
}
/// Plain-number snapshot of an `AdaptiveBuffer`, produced by `get_diagnostics`.
#[derive(Debug, Clone)]
pub struct BufferDiagnostics {
/// Points currently held across both tiers.
pub current_size: usize,
/// Current size target.
pub target_size: usize,
/// `current_size / target_size`; not finite when the target is 0.
pub utilization: f64,
/// Mean quality score of the buffered points (0.0 if it cannot be converted to `f64`).
pub average_quality: f64,
/// Value of the `total_items_processed` counter.
pub total_processed: u64,
/// Value of the `total_items_discarded` counter.
pub total_discarded: u64,
/// Number of entries currently in the size-change log (at most 100).
pub size_changes: usize,
}