pub mod windowing;
pub mod batching;
pub mod buffering;
pub mod partitioning;
pub use windowing::*;
pub use batching::*;
pub use buffering::*;
pub use partitioning::*;
use crate::error::{MetricsError, Result};
use scirs2_core::numeric::Float;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::time::{Duration, Instant, SystemTime};
pub use super::config::{WindowAdaptationStrategy, AdaptationTrigger, BatchingStrategy, BufferConfig, EvictionPolicy};
/// Sliding window over streamed values whose capacity can be resized at run time.
///
/// Data points are kept oldest-first in a deque; whenever the deque grows beyond
/// the current capacity the oldest points are evicted. Registered triggers decide
/// when a resize is attempted, and the configured strategy decides the new capacity.
#[derive(Debug)]
pub struct AdaptiveWindowManager<F: Float + std::fmt::Debug> {
/// Current capacity of the window; always kept within `[min_window_size, max_window_size]`
/// by the constructor and by every resize path in this file.
current_window_size: usize,
/// Lower bound applied to every computed or requested capacity.
min_window_size: usize,
/// Upper bound applied to every computed or requested capacity.
max_window_size: usize,
/// Strategy used to derive a new capacity once a trigger fires.
strategy: WindowAdaptationStrategy,
/// Buffered data points, oldest at the front.
window_data: VecDeque<WindowDataPoint<F>>,
/// Log of past resizes, oldest first (capped at 100 entries when recording).
adaptation_history: VecDeque<WindowAdaptation>,
/// Derives summary metrics from the window after each insertion.
performance_tracker: WindowPerformanceTracker<F>,
/// Monotonic time of construction or of the most recently recorded resize.
last_adaptation: Instant,
/// Triggers evaluated in insertion order; the first satisfied one wins.
adaptation_triggers: Vec<AdaptationTrigger>,
}
impl<F: Float + std::fmt::Debug + Send + Sync> AdaptiveWindowManager<F> {
/// Creates an empty window manager.
///
/// `initial_size` is first raised to `min_size` and then capped at `max_size`,
/// so when the bounds are inverted (`min_size > max_size`) the cap wins.
/// No triggers are registered; add them with `add_trigger`.
pub fn new(
    min_size: usize,
    max_size: usize,
    initial_size: usize,
    strategy: WindowAdaptationStrategy,
) -> Self {
    // Lower bound first, upper bound second -- the order matters for inverted bounds.
    let raised_to_min = std::cmp::max(initial_size, min_size);
    let starting_size = std::cmp::min(raised_to_min, max_size);

    Self {
        current_window_size: starting_size,
        min_window_size: min_size,
        max_window_size: max_size,
        strategy,
        window_data: VecDeque::new(),
        adaptation_history: VecDeque::new(),
        performance_tracker: WindowPerformanceTracker::new(),
        last_adaptation: Instant::now(),
        adaptation_triggers: Vec::new(),
    }
}
/// Appends one observation to the window and, if a registered trigger fires,
/// resizes the window according to the configured strategy.
///
/// Steps, in order:
/// 1. push the point (with unit weight) and evict from the front until the
///    window fits the current capacity;
/// 2. refresh the performance tracker from the window contents;
/// 3. evaluate the registered triggers;
/// 4. when a trigger fires and the strategy yields a different capacity,
///    record the change, apply it, and evict again.
///
/// # Returns
/// A [`WindowUpdateResult`] whose `window_size` and `performance_metrics`
/// describe the state *before* any resize performed by this call;
/// `new_window_size` and `trigger_reason` are `Some` only when
/// `adaptation_performed` is `true`.
///
/// # Errors
/// Propagates any error returned while checking triggers or computing the
/// new capacity.
pub fn add_data_point(&mut self, value: F, timestamp: SystemTime) -> Result<WindowUpdateResult<F>> {
    self.window_data.push_back(WindowDataPoint {
        value,
        timestamp,
        weight: F::one(),
    });
    while self.window_data.len() > self.current_window_size {
        self.window_data.pop_front();
    }
    self.performance_tracker.update(&self.window_data);

    let fired_trigger = self.check_adaptation_triggers()?;
    let mut result = WindowUpdateResult {
        window_size: self.current_window_size,
        adaptation_performed: false,
        new_window_size: None,
        performance_metrics: self.performance_tracker.get_current_metrics(),
        trigger_reason: None,
    };

    if let Some(trigger) = fired_trigger {
        let new_size = self.adapt_window_size(&trigger)?;
        if new_size != self.current_window_size {
            // `record_adaptation` reads `self.current_window_size` as the old
            // size, so it has to run before the field is overwritten;
            // otherwise the history entry would show old_size == new_size.
            self.record_adaptation(trigger.clone(), new_size);
            self.current_window_size = new_size;

            result.adaptation_performed = true;
            result.new_window_size = Some(new_size);
            result.trigger_reason = Some(trigger);

            // A smaller capacity may leave the window over-full.
            while self.window_data.len() > self.current_window_size {
                self.window_data.pop_front();
            }
        }
    }
    Ok(result)
}
/// Returns a clone of the first registered trigger whose condition currently
/// holds, or `None` when no trigger is satisfied.
///
/// Conditions per variant:
/// * `Time` -- at least the given duration has elapsed since `last_adaptation`;
/// * `SampleCount` -- the window currently holds at least that many points;
/// * `Performance` -- tracked accuracy is below, or tracked latency is above,
///   its threshold;
/// * `Drift` -- tracked variance exceeds `confidence`;
/// * `Manual` -- never fires from this check;
/// * `Combined` -- every sub-trigger is reported as met by
///   `check_single_trigger` (an empty list therefore counts as met).
///
/// # Errors
/// Propagates errors from `check_single_trigger`.
///
/// # Panics
/// Panics if a `Drift` confidence cannot be converted to `F`.
fn check_adaptation_triggers(&self) -> Result<Option<AdaptationTrigger>> {
    for candidate in self.adaptation_triggers.iter() {
        let condition_met = match candidate {
            AdaptationTrigger::Time(interval) => self.last_adaptation.elapsed() >= *interval,
            AdaptationTrigger::SampleCount(required) => self.window_data.len() >= *required,
            AdaptationTrigger::Performance { accuracy_threshold, latency_threshold } => {
                let snapshot = self.performance_tracker.get_current_metrics();
                snapshot.accuracy < *accuracy_threshold
                    || snapshot.processing_latency > *latency_threshold
            }
            AdaptationTrigger::Drift { confidence } => {
                let snapshot = self.performance_tracker.get_current_metrics();
                snapshot.variance > F::from(*confidence).expect("Failed to convert to float")
            }
            AdaptationTrigger::Manual => false,
            AdaptationTrigger::Combined(triggers) => {
                let mut every_sub_trigger_met = true;
                for sub_trigger in triggers {
                    if self.check_single_trigger(sub_trigger)?.is_none() {
                        every_sub_trigger_met = false;
                        break;
                    }
                }
                every_sub_trigger_met
            }
        };
        if condition_met {
            return Ok(Some(candidate.clone()));
        }
    }
    Ok(None)
}
fn check_single_trigger(&self, trigger: &AdaptationTrigger) -> Result<Option<AdaptationTrigger>> {
match trigger {
AdaptationTrigger::Time(duration) => {
if self.last_adaptation.elapsed() >= *duration {
Ok(Some(trigger.clone()))
} else {
Ok(None)
}
}
AdaptationTrigger::SampleCount(count) => {
if self.window_data.len() >= *count {
Ok(Some(trigger.clone()))
} else {
Ok(None)
}
}
_ => Ok(None), }
}
fn adapt_window_size(&self, trigger: &AdaptationTrigger) -> Result<usize> {
let current_metrics = self.performance_tracker.get_current_metrics();
let new_size = match &self.strategy {
WindowAdaptationStrategy::Fixed => self.current_window_size,
WindowAdaptationStrategy::ExponentialDecay { decay_rate } => {
let decay = F::from(*decay_rate).expect("Failed to convert to float");
let adapted_size = F::from(self.current_window_size).expect("Failed to convert to float") * decay;
adapted_size.to_usize().unwrap_or(self.current_window_size)
}
WindowAdaptationStrategy::PerformanceBased { target_accuracy } => {
if current_metrics.accuracy < *target_accuracy {
(self.current_window_size as f64 * 1.2) as usize
} else {
(self.current_window_size as f64 * 0.9) as usize
}
}
WindowAdaptationStrategy::DriftBased => {
match trigger {
AdaptationTrigger::Drift { confidence } => {
if *confidence > 0.8 {
(self.current_window_size as f64 * 0.5) as usize
} else {
(self.current_window_size as f64 * 0.8) as usize
}
}
_ => {
if current_metrics.variance > F::from(0.1).expect("Failed to convert constant to float") {
(self.current_window_size as f64 * 0.9) as usize
} else {
(self.current_window_size as f64 * 1.1) as usize
}
}
}
}
WindowAdaptationStrategy::Hybrid { strategies, weights } => {
let mut weighted_size = 0.0;
let mut total_weight = 0.0;
for (strategy, weight) in strategies.iter().zip(weights.iter()) {
let temp_manager = AdaptiveWindowManager {
current_window_size: self.current_window_size,
min_window_size: self.min_window_size,
max_window_size: self.max_window_size,
strategy: strategy.clone(),
window_data: self.window_data.clone(),
adaptation_history: VecDeque::new(),
performance_tracker: self.performance_tracker.clone(),
last_adaptation: self.last_adaptation,
adaptation_triggers: vec![],
};
let size = temp_manager.adapt_window_size(trigger)?;
weighted_size += size as f64 * weight;
total_weight += weight;
}
if total_weight > 0.0 {
(weighted_size / total_weight) as usize
} else {
self.current_window_size
}
}
WindowAdaptationStrategy::MLBased { model_type: _ } => {
self.current_window_size
}
};
Ok(new_size.max(self.min_window_size).min(self.max_window_size))
}
fn record_adaptation(&mut self, trigger: AdaptationTrigger, new_size: usize) {
let adaptation = WindowAdaptation {
timestamp: SystemTime::now(),
old_size: self.current_window_size,
new_size,
trigger,
performance_before: self.performance_tracker.get_current_metrics(),
};
self.adaptation_history.push_back(adaptation);
while self.adaptation_history.len() > 100 {
self.adaptation_history.pop_front();
}
self.last_adaptation = Instant::now();
}
/// Summarises the values currently held in the window.
///
/// For an empty window all numeric fields are zero and `age` is zero.
/// Otherwise the result holds the arithmetic mean, the population variance
/// (sum of squared deviations divided by the number of points), the minimum
/// and maximum value, and `age` -- the wall-clock time elapsed since the
/// oldest point's timestamp (zero if that timestamp lies in the future).
///
/// # Panics
/// Panics if the number of points cannot be converted to `F`.
pub fn get_window_statistics(&self) -> WindowStatistics<F> {
    let oldest_timestamp = match self.window_data.front() {
        Some(oldest) => oldest.timestamp,
        None => {
            return WindowStatistics {
                size: self.current_window_size,
                actual_size: 0,
                mean: F::zero(),
                variance: F::zero(),
                min_value: F::zero(),
                max_value: F::zero(),
                age: Duration::from_secs(0),
            };
        }
    };

    let point_count = self.window_data.len();
    let count = F::from(point_count).expect("Operation failed");

    // First pass: sum and extrema, visiting points oldest-first.
    let mut sum = F::zero();
    let mut min_value = F::infinity();
    let mut max_value = F::neg_infinity();
    for point in self.window_data.iter() {
        sum = sum + point.value;
        min_value = min_value.min(point.value);
        max_value = max_value.max(point.value);
    }
    let mean = sum / count;

    // Second pass: squared deviations from the mean.
    let mut squared_deviation_sum = F::zero();
    for point in self.window_data.iter() {
        let deviation = point.value - mean;
        squared_deviation_sum = squared_deviation_sum + deviation * deviation;
    }
    let variance = squared_deviation_sum / count;

    let age = SystemTime::now()
        .duration_since(oldest_timestamp)
        .unwrap_or_default();

    WindowStatistics {
        size: self.current_window_size,
        actual_size: point_count,
        mean,
        variance,
        min_value,
        max_value,
        age,
    }
}
/// Registers an additional adaptation trigger.
///
/// Triggers are appended, so earlier registrations are evaluated first when
/// the trigger list is scanned.
pub fn add_trigger(&mut self, trigger: AdaptationTrigger) {
self.adaptation_triggers.push(trigger);
}
/// Removes every registered adaptation trigger; with no triggers registered
/// the trigger scan finds nothing to fire.
pub fn clear_triggers(&mut self) {
self.adaptation_triggers.clear();
}
/// Returns a copy of the recorded resize events, oldest first.
pub fn get_adaptation_history(&self) -> Vec<WindowAdaptation> {
    let mut events = Vec::with_capacity(self.adaptation_history.len());
    events.extend(self.adaptation_history.iter().cloned());
    events
}
/// Returns the current window capacity (not the number of points held).
pub fn get_current_size(&self) -> usize {
self.current_window_size
}
/// Manually sets the window capacity.
///
/// `size` is clamped to `[min_window_size, max_window_size]`. When the
/// clamped value differs from the current capacity the change is recorded in
/// the adaptation history with an `AdaptationTrigger::Manual` trigger, applied,
/// and the oldest points are evicted until the window fits. A request that
/// clamps to the current capacity does nothing.
///
/// # Errors
/// Currently always returns `Ok(())`.
pub fn set_window_size(&mut self, size: usize) -> Result<()> {
    let clamped = std::cmp::min(std::cmp::max(size, self.min_window_size), self.max_window_size);
    if clamped == self.current_window_size {
        return Ok(());
    }

    // Record first: the history entry takes its old size from the field
    // that is about to be overwritten.
    self.record_adaptation(AdaptationTrigger::Manual, clamped);
    self.current_window_size = clamped;

    let excess = self.window_data.len().saturating_sub(clamped);
    for _ in 0..excess {
        self.window_data.pop_front();
    }
    Ok(())
}
}
/// A single observation stored in the sliding window.
#[derive(Debug, Clone)]
pub struct WindowDataPoint<F: Float + std::fmt::Debug> {
/// Observed value.
pub value: F,
/// Wall-clock time associated with the observation (supplied by the caller).
pub timestamp: SystemTime,
/// Weight of the observation; `AdaptiveWindowManager::add_data_point` always stores `F::one()`.
pub weight: F,
}
/// One entry of the window manager's resize history.
#[derive(Debug, Clone)]
pub struct WindowAdaptation {
/// Wall-clock time at which the resize was recorded.
pub timestamp: SystemTime,
/// Window capacity read from the manager at the moment of recording.
pub old_size: usize,
/// Window capacity requested by the resize.
pub new_size: usize,
/// Trigger that caused the resize (`Manual` for explicit `set_window_size` calls).
pub trigger: AdaptationTrigger,
/// Performance metrics captured when the resize was recorded.
pub performance_before: WindowPerformanceMetrics<f64>,
}
/// Outcome of adding one data point to an `AdaptiveWindowManager`.
#[derive(Debug, Clone)]
pub struct WindowUpdateResult<F: Float + std::fmt::Debug> {
/// Window capacity in effect before any resize performed by the call.
pub window_size: usize,
/// `true` when the call changed the window capacity.
pub adaptation_performed: bool,
/// The new capacity; `Some` only when `adaptation_performed` is `true`.
pub new_window_size: Option<usize>,
/// Metrics computed from the window right after the point was inserted.
pub performance_metrics: WindowPerformanceMetrics<F>,
/// The trigger that caused the resize; `Some` only when `adaptation_performed` is `true`.
pub trigger_reason: Option<AdaptationTrigger>,
}
/// Descriptive statistics of the values currently held in a window.
///
/// All numeric fields are zero when the window is empty.
#[derive(Debug, Clone)]
pub struct WindowStatistics<F: Float + std::fmt::Debug> {
/// Configured window capacity.
pub size: usize,
/// Number of data points actually held.
pub actual_size: usize,
/// Arithmetic mean of the values.
pub mean: F,
/// Population variance of the values (divides by the number of points).
pub variance: F,
/// Smallest value in the window.
pub min_value: F,
/// Largest value in the window.
pub max_value: F,
/// Wall-clock time elapsed since the oldest point's timestamp.
pub age: Duration,
}
/// Derives and stores summary metrics for a sliding window.
#[derive(Debug, Clone)]
pub struct WindowPerformanceTracker<F: Float + std::fmt::Debug> {
/// Metrics produced by the most recent non-empty `update` (defaults before that).
current_metrics: WindowPerformanceMetrics<F>,
/// Past metric snapshots, oldest first (capped at 1000 entries by `update`).
metrics_history: VecDeque<WindowPerformanceMetrics<F>>,
/// Monotonic time of construction or of the most recent non-empty `update`.
last_update: Instant,
}
impl<F: Float + std::fmt::Debug> WindowPerformanceTracker<F> {
pub fn new() -> Self {
Self {
current_metrics: WindowPerformanceMetrics::default(),
metrics_history: VecDeque::new(),
last_update: Instant::now(),
}
}
/// Recomputes the metrics from the given window contents and appends the
/// snapshot to the history. An empty window leaves the tracker untouched.
///
/// Computed fields:
/// * `variance` -- population variance of the point values;
/// * `processing_latency` -- time elapsed since the previous non-empty update
///   (or since construction);
/// * `throughput` -- number of points divided by that elapsed time in seconds
///   (infinite if the elapsed time is zero);
/// * `memory_usage` -- number of points times `size_of::<F>()`, i.e. the
///   values only;
/// * `accuracy` -- hard-coded to 0.95; nothing is measured here.
///
/// The history keeps at most 1000 snapshots.
///
/// # Panics
/// Panics if a count, the elapsed seconds, or the accuracy constant cannot
/// be converted to `F`.
pub fn update(&mut self, window_data: &VecDeque<WindowDataPoint<F>>) {
    let sample_count = window_data.len();
    if sample_count == 0 {
        return;
    }
    let count = F::from(sample_count).expect("Operation failed");

    let mut sum = F::zero();
    for point in window_data {
        sum = sum + point.value;
    }
    let mean = sum / count;

    let mut squared_deviation_sum = F::zero();
    for point in window_data {
        let deviation = point.value - mean;
        squared_deviation_sum = squared_deviation_sum + deviation * deviation;
    }
    let variance = squared_deviation_sum / count;

    let processing_time = self.last_update.elapsed();
    let snapshot = WindowPerformanceMetrics {
        accuracy: F::from(0.95).expect("Failed to convert constant to float"),
        variance,
        processing_latency: processing_time,
        throughput: count / F::from(processing_time.as_secs_f64()).expect("Operation failed"),
        memory_usage: F::from(sample_count * std::mem::size_of::<F>()).expect("Operation failed"),
    };

    self.metrics_history.push_back(snapshot.clone());
    self.current_metrics = snapshot;
    while self.metrics_history.len() > 1000 {
        self.metrics_history.pop_front();
    }
    self.last_update = Instant::now();
}
/// Returns a copy of the most recently computed metrics
/// (the default metrics if no non-empty update has happened yet).
pub fn get_current_metrics(&self) -> WindowPerformanceMetrics<F> {
self.current_metrics.clone()
}
/// Returns a copy of the stored metric snapshots, oldest first.
pub fn get_metrics_history(&self) -> Vec<WindowPerformanceMetrics<F>> {
    let mut snapshots = Vec::with_capacity(self.metrics_history.len());
    snapshots.extend(self.metrics_history.iter().cloned());
    snapshots
}
}
/// Snapshot of the metrics a `WindowPerformanceTracker` derives from a window.
#[derive(Debug, Clone)]
pub struct WindowPerformanceMetrics<F: Float + std::fmt::Debug> {
/// Accuracy figure; the tracker in this file stores a constant 0.95 (default: 1).
pub accuracy: F,
/// Population variance of the window values.
pub variance: F,
/// Time elapsed between consecutive tracker updates.
pub processing_latency: Duration,
/// Number of window points divided by `processing_latency` in seconds.
pub throughput: F,
/// Number of window points multiplied by `size_of::<F>()`, i.e. bytes used by the values alone.
pub memory_usage: F,
}
/// Baseline metrics used before any window data has been observed:
/// accuracy of one, everything else zero.
impl<F: Float + std::fmt::Debug> Default for WindowPerformanceMetrics<F> {
    fn default() -> Self {
        let zero = F::zero();
        Self {
            accuracy: F::one(),
            variance: zero,
            processing_latency: Duration::from_secs(0),
            throughput: zero,
            memory_usage: zero,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `f64` manager that uses the `Fixed` strategy.
    fn fixed_manager(min: usize, max: usize, initial: usize) -> AdaptiveWindowManager<f64> {
        AdaptiveWindowManager::new(min, max, initial, WindowAdaptationStrategy::Fixed)
    }

    /// An in-range initial size is used as-is.
    #[test]
    fn test_adaptive_window_manager_creation() {
        let manager = fixed_manager(10, 1000, 100);
        assert_eq!(manager.get_current_size(), 100);
    }

    /// Adding a point without registered triggers performs no adaptation.
    #[test]
    fn test_window_data_point_addition() {
        let mut manager = fixed_manager(10, 1000, 100);
        let outcome = manager
            .add_data_point(1.0, SystemTime::now())
            .expect("Operation failed");
        assert_eq!(outcome.window_size, 100);
        assert!(!outcome.adaptation_performed);
    }

    /// Manual sizes are clamped to the configured bounds.
    #[test]
    fn test_window_size_bounds() {
        let mut manager = fixed_manager(10, 100, 50);

        manager.set_window_size(5).expect("Operation failed");
        assert_eq!(manager.get_current_size(), 10);

        manager.set_window_size(200).expect("Operation failed");
        assert_eq!(manager.get_current_size(), 100);
    }

    /// Mean and point count reflect the inserted values 0..5.
    #[test]
    fn test_window_statistics() {
        let mut manager = fixed_manager(10, 1000, 5);
        for step in 0..5 {
            manager
                .add_data_point(f64::from(step), SystemTime::now())
                .expect("Operation failed");
        }

        let stats = manager.get_window_statistics();
        assert_eq!(stats.actual_size, 5);
        assert_eq!(stats.mean, 2.0);
    }

    /// Triggers can be registered and cleared.
    #[test]
    fn test_adaptation_triggers() {
        let mut manager = AdaptiveWindowManager::<f64>::new(
            10,
            1000,
            100,
            WindowAdaptationStrategy::PerformanceBased { target_accuracy: 0.9 },
        );

        manager.add_trigger(AdaptationTrigger::SampleCount(50));
        assert_eq!(manager.adaptation_triggers.len(), 1);

        manager.clear_triggers();
        assert_eq!(manager.adaptation_triggers.len(), 0);
    }

    /// The tracker reports positive throughput and variance for varied data.
    #[test]
    fn test_performance_tracker() {
        let mut tracker = WindowPerformanceTracker::<f64>::new();
        let window_data: VecDeque<WindowDataPoint<f64>> = (0..10)
            .map(|step| WindowDataPoint {
                value: f64::from(step),
                timestamp: SystemTime::now(),
                weight: 1.0,
            })
            .collect();

        tracker.update(&window_data);

        let metrics = tracker.get_current_metrics();
        assert!(metrics.throughput > 0.0);
        assert!(metrics.variance > 0.0);
    }
}