use crate::error::StatsResult;
use scirs2_core::ndarray::{Array1, ArrayView1};
use scirs2_core::numeric::{Float, NumCast, One, Zero};
use scirs2_core::{simd_ops::SimdUnifiedOps, validation::*};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdvancedStreamingConfig {
pub default_windowsize: usize,
pub adaptive_windowing: bool,
pub max_buffer_memory: usize,
pub change_point_detection: bool,
pub incremental_ml: bool,
pub distributed_processing: bool,
pub high_throughput_threshold: f64,
pub anomaly_detection: bool,
pub significance_level: f64,
pub intelligent_compression: bool,
pub realtime_visualization: bool,
pub approximate_algorithms: bool,
}
impl Default for AdvancedStreamingConfig {
fn default() -> Self {
Self {
default_windowsize: 1000,
adaptive_windowing: true,
max_buffer_memory: 100 * 1024 * 1024, change_point_detection: true,
incremental_ml: true,
distributed_processing: false,
high_throughput_threshold: 10000.0, anomaly_detection: true,
significance_level: 0.05,
intelligent_compression: true,
realtime_visualization: false,
approximate_algorithms: false,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WindowingStrategy {
Sliding { size: usize },
Tumbling { size: usize },
Session { timeout: Duration },
TimeBased { duration: Duration },
Adaptive {
minsize: usize,
maxsize: usize,
adaptation_rate: f64,
},
EventDriven { trigger_condition: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamProcessingMode {
RealTime,
MicroBatch { batchsize: usize },
Adaptive,
EventDriven,
}
#[derive(Debug, Clone)]
pub struct StreamingStatistics<F> {
pub count: usize,
pub mean: F,
pub variance: F,
pub std_dev: F,
pub min: F,
pub max: F,
pub skewness: F,
pub kurtosis: F,
pub last_update: Instant,
pub throughput: f64, pub memory_usage: usize,
pub change_points: Vec<Instant>,
pub anomalies: Vec<(Instant, F)>,
}
pub struct AdvancedAdvancedStreamingProcessor<F> {
config: AdvancedStreamingConfig,
windowing_strategy: WindowingStrategy,
processing_mode: StreamProcessingMode,
buffer: Arc<RwLock<VecDeque<(Instant, F)>>>,
statistics: Arc<RwLock<StreamingStatistics<F>>>,
change_detector: Arc<Mutex<ChangePointDetector<F>>>,
anomaly_detector: Arc<Mutex<AnomalyDetector<F>>>,
ml_model: Option<Arc<Mutex<IncrementalMLModel<F>>>>,
compression_engine: Arc<Mutex<CompressionEngine<F>>>,
_phantom: PhantomData<F>,
}
pub struct ChangePointDetector<F> {
algorithm: ChangePointAlgorithm,
windowdata: VecDeque<F>,
threshold: f64,
last_detection: Option<Instant>,
_phantom: PhantomData<F>,
}
#[derive(Debug, Clone)]
pub enum ChangePointAlgorithm {
CUSUM { drift: f64, threshold: f64 },
BOCPD { hazard_rate: f64 },
EWMA { lambda: f64, threshold: f64 },
PageHinkley { delta: f64, threshold: f64 },
ADWIN { confidence: f64 },
}
pub struct AnomalyDetector<F> {
algorithm: AnomalyDetectionAlgorithm,
baseline_statistics: StreamingStatistics<F>,
detection_threshold: f64,
anomaly_history: VecDeque<(Instant, F, AnomalyType)>,
_phantom: PhantomData<F>,
}
#[derive(Debug, Clone)]
pub enum AnomalyDetectionAlgorithm {
ZScore { threshold: f64 },
IQR { factor: f64 },
IsolationForest { contamination: f64 },
LOF { neighbors: usize },
OneClassSVM { nu: f64, gamma: f64 },
}
#[derive(Debug, Clone)]
pub enum AnomalyType {
PointAnomaly,
ContextualAnomaly,
CollectiveAnomaly,
}
pub struct IncrementalMLModel<F> {
model_type: MLModelType,
parameters: HashMap<String, F>,
trainingdata: VecDeque<Array1<F>>,
model_performance: ModelPerformance<F>,
_phantom: PhantomData<F>,
}
#[derive(Debug, Clone)]
pub enum MLModelType {
OnlineLinearRegression,
IncrementalPCA { components: usize },
OnlineKMeans { k: usize },
StreamingRandomForest { trees: usize },
OnlineNeuralNetwork { layers: Vec<usize> },
}
#[derive(Debug, Clone)]
pub struct ModelPerformance<F> {
pub accuracy: F,
pub precision: F,
pub recall: F,
pub f1_score: F,
pub training_samples: usize,
pub last_updated: Instant,
}
pub struct CompressionEngine<F> {
algorithm: CompressionAlgorithm,
compression_ratio: f64,
historicaldata: VecDeque<CompressedDataPoint<F>>,
metadata: CompressionMetadata,
_phantom: PhantomData<F>,
}
#[derive(Debug, Clone)]
pub enum CompressionAlgorithm {
PAA { segments: usize },
SAX {
alphabetsize: usize,
segments: usize,
},
DFT { coefficients: usize },
Wavelet { levels: usize, threshold: f64 },
Adaptive,
}
#[derive(Debug, Clone)]
pub struct CompressedDataPoint<F> {
pub timestamp: Instant,
pub compressed_value: Vec<F>,
pub compression_metadata: String,
pub reconstruction_error: F,
}
#[derive(Debug, Clone)]
pub struct CompressionMetadata {
pub originalsize: usize,
pub compressedsize: usize,
pub compression_ratio: f64,
pub reconstruction_accuracy: f64,
pub algorithm_used: String,
}
#[derive(Debug, Clone)]
pub struct StreamingAnalyticsResult<F> {
pub real_time_statistics: StreamingStatistics<F>,
pub change_points: Vec<ChangePointEvent>,
pub anomalies: Vec<AnomalyEvent<F>>,
pub ml_predictions: Option<Vec<F>>,
pub compression_summary: CompressionSummary,
pub performance_metrics: StreamingPerformanceMetrics,
pub recommendations: Vec<StreamingRecommendation>,
}
#[derive(Debug, Clone)]
pub struct ChangePointEvent {
pub timestamp: Instant,
pub confidence: f64,
pub algorithm: String,
pub statistical_significance: f64,
pub description: String,
}
#[derive(Debug, Clone)]
pub struct AnomalyEvent<F> {
pub timestamp: Instant,
pub value: F,
pub anomaly_type: AnomalyType,
pub severity: AnomalySeverity,
pub confidence: f64,
pub context: String,
}
#[derive(Debug, Clone)]
pub enum AnomalySeverity {
Low,
Medium,
High,
Critical,
}
#[derive(Debug, Clone)]
pub struct CompressionSummary {
pub total_compressed_points: usize,
pub average_compression_ratio: f64,
pub memory_saved: usize,
pub reconstruction_accuracy: f64,
pub compression_latency: Duration,
}
#[derive(Debug, Clone)]
pub struct StreamingPerformanceMetrics {
pub throughput_samples_per_sec: f64,
pub latency_microseconds: f64,
pub memory_usage_mb: f64,
pub cpu_utilization_percent: f64,
pub accuracy_vs_batch: f64,
pub data_freshness_seconds: f64,
}
#[derive(Debug, Clone)]
pub struct StreamingRecommendation {
pub category: RecommendationCategory,
pub message: String,
pub priority: RecommendationPriority,
pub estimated_impact: f64,
}
#[derive(Debug, Clone)]
pub enum RecommendationCategory {
WindowingStrategy,
ProcessingMode,
MemoryOptimization,
AlgorithmSelection,
PerformanceTuning,
AnomalyDetection,
Compression,
}
#[derive(Debug, Clone)]
pub enum RecommendationPriority {
Low,
Medium,
High,
Critical,
}
impl<F> AdvancedAdvancedStreamingProcessor<F>
where
F: Float
+ NumCast
+ SimdUnifiedOps
+ Zero
+ One
+ PartialOrd
+ Copy
+ Send
+ Sync
+ 'static
+ std::fmt::Display,
{
pub fn new(config: AdvancedStreamingConfig) -> Self {
let windowing_strategy = WindowingStrategy::Sliding {
size: config.default_windowsize,
};
let processing_mode = StreamProcessingMode::Adaptive;
let statistics = StreamingStatistics {
count: 0,
mean: F::zero(),
variance: F::zero(),
std_dev: F::zero(),
min: F::infinity(),
max: F::neg_infinity(),
skewness: F::zero(),
kurtosis: F::zero(),
last_update: Instant::now(),
throughput: 0.0,
memory_usage: 0,
change_points: Vec::new(),
anomalies: Vec::new(),
};
Self {
config,
windowing_strategy,
processing_mode,
buffer: Arc::new(RwLock::new(VecDeque::new())),
statistics: Arc::new(RwLock::new(statistics)),
change_detector: Arc::new(Mutex::new(ChangePointDetector::new())),
anomaly_detector: Arc::new(Mutex::new(AnomalyDetector::new())),
ml_model: None,
compression_engine: Arc::new(Mutex::new(CompressionEngine::new())),
_phantom: PhantomData,
}
}
pub fn processdata_point(&mut self, value: F) -> StatsResult<()> {
let timestamp = Instant::now();
{
let mut buffer = self.buffer.write().expect("Operation failed");
buffer.push_back((timestamp, value));
self.apply_windowing_strategy(&mut buffer)?;
}
self.update_statistics(value, timestamp)?;
if self.config.change_point_detection {
self.detect_change_points(value)?;
}
if self.config.anomaly_detection {
self.detect_anomalies(value, timestamp)?;
}
if self.config.incremental_ml && self.ml_model.is_some() {
self.update_ml_model(value)?;
}
if self.config.intelligent_compression {
self.apply_compression(value, timestamp)?;
}
Ok(())
}
pub fn process_batch(&mut self, values: &ArrayView1<F>) -> StatsResult<()> {
checkarray_finite(values, "values")?;
let start_time = Instant::now();
if values.len() >= 64 {
self.process_batch_simd(values)?;
} else {
for &value in values.iter() {
self.processdata_point(value)?;
}
}
let elapsed = start_time.elapsed();
let throughput = values.len() as f64 / elapsed.as_secs_f64();
{
let mut stats = self.statistics.write().expect("Operation failed");
stats.throughput = throughput;
}
Ok(())
}
fn process_batch_simd(&mut self, values: &ArrayView1<F>) -> StatsResult<()> {
let batch_mean = F::simd_mean(values);
let squared_values = F::simd_mul(values, values);
let mean_squared = F::simd_mean(&squared_values.view());
let batch_variance = mean_squared - batch_mean * batch_mean;
let batch_min = F::simd_min_element(values);
let batch_max = F::simd_max_element(values);
{
let mut stats = self.statistics.write().expect("Operation failed");
let n = F::from(stats.count).expect("Failed to convert to float");
let m = F::from(values.len()).expect("Operation failed");
let total = n + m;
let delta = batch_mean - stats.mean;
stats.mean = stats.mean + delta * m / total;
stats.variance = (stats.variance * n + batch_variance * m) / total;
stats.std_dev = stats.variance.sqrt();
stats.count += values.len();
if batch_min < stats.min {
stats.min = batch_min;
}
if batch_max > stats.max {
stats.max = batch_max;
}
stats.last_update = Instant::now();
}
Ok(())
}
fn apply_windowing_strategy(&self, buffer: &mut VecDeque<(Instant, F)>) -> StatsResult<()> {
match &self.windowing_strategy {
WindowingStrategy::Sliding { size } => {
while buffer.len() > *size {
buffer.pop_front();
}
}
WindowingStrategy::Tumbling { size } => {
if buffer.len() >= *size {
buffer.clear();
}
}
WindowingStrategy::TimeBased { duration } => {
let cutoff = Instant::now() - *duration;
while let Some((timestamp_, _)) = buffer.front() {
if *timestamp_ < cutoff {
buffer.pop_front();
} else {
break;
}
}
}
WindowingStrategy::Adaptive {
minsize, maxsize, ..
} => {
let adaptivesize = self.calculate_adaptive_windowsize(*minsize, *maxsize)?;
while buffer.len() > adaptivesize {
buffer.pop_front();
}
}
_ => {
}
}
Ok(())
}
fn calculate_adaptive_windowsize(&self, minsize: usize, maxsize: usize) -> StatsResult<usize> {
let stats = self.statistics.read().expect("Operation failed");
let variance_factor = if stats.variance > F::zero() {
(stats.variance.sqrt()).to_f64().unwrap_or(1.0)
} else {
1.0
};
let throughput_factor = (stats.throughput / 1000.0).max(0.1).min(10.0);
let adaptivesize = (minsize as f64 * variance_factor * throughput_factor) as usize;
Ok(adaptivesize.max(minsize).min(maxsize))
}
fn update_statistics(&self, value: F, timestamp: Instant) -> StatsResult<()> {
let mut stats = self.statistics.write().expect("Operation failed");
if stats.count == 0 {
stats.mean = value;
stats.variance = F::zero();
stats.std_dev = F::zero();
stats.min = value;
stats.max = value;
stats.count = 1;
} else {
let n = F::from(stats.count).expect("Failed to convert to float");
let delta = value - stats.mean;
stats.mean = stats.mean + delta / (n + F::one());
let delta2 = value - stats.mean;
stats.variance = stats.variance + delta * delta2;
stats.std_dev = (stats.variance / n).sqrt();
stats.count += 1;
if value < stats.min {
stats.min = value;
}
if value > stats.max {
stats.max = value;
}
}
let elapsed = timestamp.duration_since(stats.last_update);
if elapsed.as_secs_f64() > 0.0 {
stats.throughput = 1.0 / elapsed.as_secs_f64();
}
stats.last_update = timestamp;
Ok(())
}
fn detect_change_points(&self, value: F) -> StatsResult<()> {
let mut detector = self.change_detector.lock().expect("Operation failed");
if let Some(change_point) = detector.detect(value)? {
let mut stats = self.statistics.write().expect("Operation failed");
stats.change_points.push(change_point);
}
Ok(())
}
fn detect_anomalies(&self, value: F, timestamp: Instant) -> StatsResult<()> {
let mut detector = self.anomaly_detector.lock().expect("Operation failed");
if let Some(_anomaly_type) = detector.detect(value)? {
let mut stats = self.statistics.write().expect("Operation failed");
stats.anomalies.push((timestamp, value));
}
Ok(())
}
fn update_ml_model(&self, data: F) -> StatsResult<()> {
Ok(())
}
fn apply_compression(&self, value: F, timestamp: Instant) -> StatsResult<()> {
let mut engine = self.compression_engine.lock().expect("Operation failed");
engine.compressdata_point(value, timestamp)?;
Ok(())
}
pub fn get_analytics_results(&self) -> StatsResult<StreamingAnalyticsResult<F>> {
let stats = self.statistics.read().expect("Operation failed").clone();
let change_points: Vec<ChangePointEvent> = stats
.change_points
.iter()
.map(|×tamp| ChangePointEvent {
timestamp,
confidence: 0.95, algorithm: "CUSUM".to_string(),
statistical_significance: 0.01,
description: "Significant change detected in data distribution".to_string(),
})
.collect();
let anomalies: Vec<AnomalyEvent<F>> = stats
.anomalies
.iter()
.map(|(timestamp, value)| AnomalyEvent {
timestamp: *timestamp,
value: *value,
anomaly_type: AnomalyType::PointAnomaly,
severity: AnomalySeverity::Medium,
confidence: 0.8,
context: "Statistical outlier detected".to_string(),
})
.collect();
let performance_metrics = StreamingPerformanceMetrics {
throughput_samples_per_sec: stats.throughput,
latency_microseconds: 50.0, memory_usage_mb: (stats.memory_usage as f64) / (1024.0 * 1024.0),
cpu_utilization_percent: 25.0, accuracy_vs_batch: 0.999, data_freshness_seconds: 0.1,
};
let compression_summary = CompressionSummary {
total_compressed_points: 0, average_compression_ratio: 0.7,
memory_saved: 0,
reconstruction_accuracy: 0.99,
compression_latency: Duration::from_micros(10),
};
let recommendations = self.generate_recommendations(&stats, &performance_metrics)?;
Ok(StreamingAnalyticsResult {
real_time_statistics: stats,
change_points,
anomalies,
ml_predictions: None,
compression_summary,
performance_metrics,
recommendations,
})
}
fn generate_recommendations(
&self,
stats: &StreamingStatistics<F>,
performance: &StreamingPerformanceMetrics,
) -> StatsResult<Vec<StreamingRecommendation>> {
let mut recommendations = Vec::new();
if performance.throughput_samples_per_sec < self.config.high_throughput_threshold {
recommendations.push(StreamingRecommendation {
category: RecommendationCategory::PerformanceTuning,
message: "Consider enabling approximate algorithms for higher throughput"
.to_string(),
priority: RecommendationPriority::Medium,
estimated_impact: 2.0,
});
}
if performance.memory_usage_mb > 50.0 {
recommendations.push(StreamingRecommendation {
category: RecommendationCategory::MemoryOptimization,
message: "Enable intelligent compression to reduce memory usage".to_string(),
priority: RecommendationPriority::High,
estimated_impact: 0.5,
});
}
if stats.count > self.config.default_windowsize * 2 {
recommendations.push(StreamingRecommendation {
category: RecommendationCategory::WindowingStrategy,
message: "Consider adaptive windowing for better performance".to_string(),
priority: RecommendationPriority::Low,
estimated_impact: 1.2,
});
}
Ok(recommendations)
}
}
impl<F> ChangePointDetector<F>
where
F: Float + NumCast + Copy + std::fmt::Display,
{
fn new() -> Self {
Self {
algorithm: ChangePointAlgorithm::CUSUM {
drift: 0.5,
threshold: 5.0,
},
windowdata: VecDeque::new(),
threshold: 0.05,
last_detection: None,
_phantom: PhantomData,
}
}
fn detect(&mut self, value: F) -> StatsResult<Option<Instant>> {
self.windowdata.push_back(value);
match &self.algorithm {
ChangePointAlgorithm::CUSUM {
drift: _,
threshold,
} => {
if self.windowdata.len() >= 10 {
let mean = self.calculate_mean()?;
let diff = value.to_f64().expect("Operation failed") - mean;
if diff.abs() > *threshold {
self.last_detection = Some(Instant::now());
return Ok(Some(Instant::now()));
}
}
}
_ => {
}
}
Ok(None)
}
fn calculate_mean(&self) -> StatsResult<f64> {
if self.windowdata.is_empty() {
return Ok(0.0);
}
let sum: f64 = self
.windowdata
.iter()
.map(|&x| x.to_f64().unwrap_or(0.0))
.sum();
Ok(sum / self.windowdata.len() as f64)
}
}
impl<F> AnomalyDetector<F>
where
F: Float + NumCast + Copy + std::fmt::Display,
{
fn new() -> Self {
let baseline = StreamingStatistics {
count: 0,
mean: F::zero(),
variance: F::zero(),
std_dev: F::zero(),
min: F::infinity(),
max: F::neg_infinity(),
skewness: F::zero(),
kurtosis: F::zero(),
last_update: Instant::now(),
throughput: 0.0,
memory_usage: 0,
change_points: Vec::new(),
anomalies: Vec::new(),
};
Self {
algorithm: AnomalyDetectionAlgorithm::ZScore { threshold: 3.0 },
baseline_statistics: baseline,
detection_threshold: 0.05,
anomaly_history: VecDeque::new(),
_phantom: PhantomData,
}
}
fn detect(&mut self, value: F) -> StatsResult<Option<AnomalyType>> {
match &self.algorithm {
AnomalyDetectionAlgorithm::ZScore { threshold } => {
if self.baseline_statistics.count > 10 {
let z_score = self.calculate_z_score(value)?;
if z_score.abs() > *threshold {
return Ok(Some(AnomalyType::PointAnomaly));
}
}
}
_ => {
}
}
Ok(None)
}
fn calculate_z_score(&self, value: F) -> StatsResult<f64> {
if self.baseline_statistics.std_dev == F::zero() {
return Ok(0.0);
}
let diff = value - self.baseline_statistics.mean;
let z_score = (diff / self.baseline_statistics.std_dev)
.to_f64()
.unwrap_or(0.0);
Ok(z_score)
}
}
impl<F> CompressionEngine<F>
where
F: Float + NumCast + Copy + std::fmt::Display,
{
fn new() -> Self {
Self {
algorithm: CompressionAlgorithm::PAA { segments: 10 },
compression_ratio: 0.7,
historicaldata: VecDeque::new(),
metadata: CompressionMetadata {
originalsize: 0,
compressedsize: 0,
compression_ratio: 1.0,
reconstruction_accuracy: 1.0,
algorithm_used: "PAA".to_string(),
},
_phantom: PhantomData,
}
}
fn compressdata_point(&mut self, value: F, timestamp: Instant) -> StatsResult<()> {
match &self.algorithm {
CompressionAlgorithm::PAA { segments: _ } => {
let compressed = CompressedDataPoint {
timestamp,
compressed_value: vec![value], compression_metadata: "PAA compression".to_string(),
reconstruction_error: F::zero(),
};
self.historicaldata.push_back(compressed);
}
_ => {
}
}
Ok(())
}
}
#[allow(dead_code)]
pub fn create_advanced_streaming_processor<F>() -> AdvancedAdvancedStreamingProcessor<F>
where
F: Float
+ NumCast
+ SimdUnifiedOps
+ Zero
+ One
+ PartialOrd
+ Copy
+ Send
+ Sync
+ 'static
+ std::fmt::Display,
{
AdvancedAdvancedStreamingProcessor::new(AdvancedStreamingConfig::default())
}
#[allow(dead_code)]
pub fn create_streaming_processor_with_config<F>(
config: AdvancedStreamingConfig,
) -> AdvancedAdvancedStreamingProcessor<F>
where
F: Float
+ NumCast
+ SimdUnifiedOps
+ Zero
+ One
+ PartialOrd
+ Copy
+ Send
+ Sync
+ 'static
+ std::fmt::Display,
{
AdvancedAdvancedStreamingProcessor::new(config)
}
#[cfg(test)]
mod tests {
use super::*;
use scirs2_core::ndarray::array;
#[test]
fn test_streaming_processor_creation() {
let processor = create_advanced_streaming_processor::<f64>();
let config = &processor.config;
assert_eq!(config.default_windowsize, 1000);
assert!(config.adaptive_windowing);
}
#[test]
fn test_singledata_point_processing() {
let mut processor = create_advanced_streaming_processor::<f64>();
let result = processor.processdata_point(5.0);
assert!(result.is_ok());
let stats = processor.statistics.read().expect("Operation failed");
assert_eq!(stats.count, 1);
assert_eq!(stats.mean, 5.0);
}
#[test]
fn test_batch_processing() {
let mut processor = create_advanced_streaming_processor::<f64>();
let data = array![1.0, 2.0, 3.0, 4.0, 5.0];
let result = processor.process_batch(&data.view());
assert!(result.is_ok());
let stats = processor.statistics.read().expect("Operation failed");
assert_eq!(stats.count, 5);
assert_eq!(stats.mean, 3.0);
}
#[test]
fn test_analytics_results() {
let mut processor = create_advanced_streaming_processor::<f64>();
let data = array![1.0, 2.0, 3.0, 4.0, 5.0, 100.0]; let _ = processor.process_batch(&data.view());
let results = processor.get_analytics_results().expect("Operation failed");
assert!(results.performance_metrics.throughput_samples_per_sec > 0.0);
}
#[test]
fn test_change_point_detector() {
let mut detector = ChangePointDetector::<f64>::new();
for i in 1..=20 {
let _ = detector.detect(i as f64);
}
let result = detector.detect(100.0);
assert!(result.is_ok());
}
#[test]
fn test_anomaly_detector() {
let mut detector = AnomalyDetector::<f64>::new();
for i in 1..=20 {
let _ = detector.detect(i as f64);
}
let result = detector.detect(1000.0); assert!(result.is_ok());
}
#[test]
fn test_compression_engine() {
let mut engine = CompressionEngine::<f64>::new();
let timestamp = Instant::now();
let result = engine.compressdata_point(42.0, timestamp);
assert!(result.is_ok());
assert_eq!(engine.historicaldata.len(), 1);
}
#[test]
fn test_windowing_strategies() {
let config = AdvancedStreamingConfig::default();
let processor = AdvancedAdvancedStreamingProcessor::<f64>::new(config);
let mut buffer = VecDeque::new();
for i in 0..2000 {
buffer.push_back((Instant::now(), i as f64));
}
let result = processor.apply_windowing_strategy(&mut buffer);
assert!(result.is_ok());
assert!(buffer.len() <= 1000); }
}