kizzasi_io/adaptive.rs

//! Adaptive buffering and rate control for streams
//!
//! This module provides buffering strategies that dynamically adapt to network
//! conditions, latency, and throughput requirements.
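//!
//! A minimal usage sketch (module path assumed to be `kizzasi_io::adaptive`;
//! error handling elided):
//!
//! ```ignore
//! use kizzasi_io::adaptive::{AdaptiveBuffer, AdaptiveConfig, RateLimiter};
//!
//! // Buffer that grows and shrinks with observed fill levels.
//! let mut buffer: AdaptiveBuffer<f32> = AdaptiveBuffer::new(AdaptiveConfig::low_latency());
//! buffer.push(0.25)?;
//! let sample = buffer.pop()?;
//!
//! // Token-bucket limiter pacing 48_000 samples per second.
//! let mut limiter = RateLimiter::new(48_000.0);
//! let allowed = limiter.try_consume(480);
//! ```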

use crate::error::{IoError, IoResult};
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Strategy for adapting buffer size
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdaptiveStrategy {
    /// Minimize latency (smaller buffers)
    LowLatency,
    /// Maximize throughput (larger buffers)
    HighThroughput,
    /// Balance latency and throughput
    Balanced,
    /// Custom thresholds
    Custom,
}

/// Configuration for adaptive buffer
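///
/// A hedged example of a hand-rolled configuration (the specific values are
/// illustrative, not recommendations):
///
/// ```ignore
/// let config = AdaptiveConfig {
///     strategy: AdaptiveStrategy::Custom,
///     underrun_threshold: 0.15,
///     overrun_threshold: 0.85,
///     adjustment_step: 256,
///     ..AdaptiveConfig::default()
/// };
/// let buffer: AdaptiveBuffer<u8> = AdaptiveBuffer::new(config);
/// ```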
#[derive(Debug, Clone)]
pub struct AdaptiveConfig {
    /// Initial buffer size
    pub initial_size: usize,
    /// Minimum buffer size
    pub min_size: usize,
    /// Maximum buffer size
    pub max_size: usize,
    /// Adaptation strategy
    pub strategy: AdaptiveStrategy,
    /// Target latency (for low latency mode)
    pub target_latency: Duration,
    /// Underrun threshold (0.0 to 1.0)
    pub underrun_threshold: f32,
    /// Overrun threshold (0.0 to 1.0)
    pub overrun_threshold: f32,
    /// Adjustment step size (samples)
    pub adjustment_step: usize,
    /// Minimum interval between capacity adjustments
    pub measurement_window: Duration,
}

impl Default for AdaptiveConfig {
    fn default() -> Self {
        Self {
            initial_size: 4096,
            min_size: 1024,
            max_size: 16384,
            strategy: AdaptiveStrategy::Balanced,
            target_latency: Duration::from_millis(50),
            underrun_threshold: 0.2,
            overrun_threshold: 0.8,
            adjustment_step: 512,
            measurement_window: Duration::from_secs(1),
        }
    }
}

impl AdaptiveConfig {
    /// Create low latency configuration
    pub fn low_latency() -> Self {
        Self {
            initial_size: 2048,
            min_size: 512,
            max_size: 8192,
            strategy: AdaptiveStrategy::LowLatency,
            target_latency: Duration::from_millis(20),
            underrun_threshold: 0.3,
            overrun_threshold: 0.7,
            adjustment_step: 256,
            ..Default::default()
        }
    }

    /// Create high throughput configuration
    pub fn high_throughput() -> Self {
        Self {
            initial_size: 8192,
            min_size: 4096,
            max_size: 32768,
            strategy: AdaptiveStrategy::HighThroughput,
            target_latency: Duration::from_millis(100),
            underrun_threshold: 0.1,
            overrun_threshold: 0.9,
            adjustment_step: 1024,
            ..Default::default()
        }
    }
}

/// Adaptive buffer that adjusts size based on usage patterns
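///
/// A minimal sketch of typical use (error handling elided; values are arbitrary):
///
/// ```ignore
/// let mut buffer: AdaptiveBuffer<i16> = AdaptiveBuffer::with_capacity(1024);
/// buffer.push(42)?;
/// assert_eq!(buffer.pop()?, 42);
///
/// // Inspect how the buffer has behaved so far.
/// let stats = buffer.stats();
/// println!("average fill level: {:.2}", stats.average_fill_level);
/// ```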
pub struct AdaptiveBuffer<T> {
    buffer: VecDeque<T>,
    config: AdaptiveConfig,
    current_capacity: usize,
    stats: BufferStats,
}

/// Statistics for buffer performance
#[derive(Debug, Clone)]
struct BufferStats {
    total_writes: u64,
    total_reads: u64,
    underruns: u64,
    overruns: u64,
    last_adjustment: Instant,
    fill_levels: VecDeque<f32>,
    latencies: VecDeque<Duration>,
}

impl BufferStats {
    fn new() -> Self {
        Self {
            total_writes: 0,
            total_reads: 0,
            underruns: 0,
            overruns: 0,
            last_adjustment: Instant::now(),
            fill_levels: VecDeque::with_capacity(100),
            latencies: VecDeque::with_capacity(100),
        }
    }

    fn record_fill_level(&mut self, level: f32) {
        self.fill_levels.push_back(level);
        if self.fill_levels.len() > 100 {
            self.fill_levels.pop_front();
        }
    }

    fn average_fill_level(&self) -> f32 {
        if self.fill_levels.is_empty() {
            return 0.0;
        }
        self.fill_levels.iter().sum::<f32>() / self.fill_levels.len() as f32
    }

    #[allow(dead_code)]
    fn record_latency(&mut self, latency: Duration) {
        self.latencies.push_back(latency);
        if self.latencies.len() > 100 {
            self.latencies.pop_front();
        }
    }

    fn average_latency(&self) -> Duration {
        if self.latencies.is_empty() {
            return Duration::from_secs(0);
        }
        let total: Duration = self.latencies.iter().sum();
        total / self.latencies.len() as u32
    }
}

impl<T> AdaptiveBuffer<T> {
    /// Create new adaptive buffer with configuration
    pub fn new(config: AdaptiveConfig) -> Self {
        let capacity = config.initial_size;
        Self {
            buffer: VecDeque::with_capacity(capacity),
            config,
            current_capacity: capacity,
            stats: BufferStats::new(),
        }
    }

    /// Create a buffer with the given initial capacity and otherwise default configuration
    pub fn with_capacity(capacity: usize) -> Self {
        let config = AdaptiveConfig {
            initial_size: capacity,
            ..Default::default()
        };
        Self::new(config)
    }

    /// Push item to buffer
    pub fn push(&mut self, item: T) -> IoResult<()> {
        let fill_level = self.fill_level();
        self.stats.record_fill_level(fill_level);
        self.stats.total_writes += 1;

        if self.buffer.len() >= self.current_capacity {
            self.stats.overruns += 1;
            if fill_level > self.config.overrun_threshold {
                // Grow for future pushes; this push still reports backpressure.
                self.try_grow()?;
            }
            return Err(IoError::BufferFull);
        }

        self.buffer.push_back(item);
        Ok(())
    }

    /// Pop item from buffer
    pub fn pop(&mut self) -> IoResult<T> {
        let fill_level = self.fill_level();
        self.stats.record_fill_level(fill_level);
        self.stats.total_reads += 1;

        match self.buffer.pop_front() {
            Some(item) => {
                if fill_level < self.config.underrun_threshold {
                    self.try_shrink();
                }
                Ok(item)
            }
            None => {
                self.stats.underruns += 1;
                Err(IoError::BufferEmpty)
            }
        }
    }

    /// Get current fill level (0.0 to 1.0)
    pub fn fill_level(&self) -> f32 {
        if self.current_capacity == 0 {
            return 0.0;
        }
        self.buffer.len() as f32 / self.current_capacity as f32
    }

    /// Get the number of items currently in the buffer
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// Check if buffer is empty
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Get current capacity
    pub fn capacity(&self) -> usize {
        self.current_capacity
    }

    /// Get buffer statistics
    pub fn stats(&self) -> AdaptiveBufferStats {
        AdaptiveBufferStats {
            total_writes: self.stats.total_writes,
            total_reads: self.stats.total_reads,
            underruns: self.stats.underruns,
            overruns: self.stats.overruns,
            current_size: self.buffer.len(),
            current_capacity: self.current_capacity,
            average_fill_level: self.stats.average_fill_level(),
            average_latency: self.stats.average_latency(),
        }
    }

    /// Try to grow buffer capacity
    fn try_grow(&mut self) -> IoResult<()> {
        let elapsed = self.stats.last_adjustment.elapsed();
        if elapsed < self.config.measurement_window {
            return Ok(());
        }

        let new_capacity =
            (self.current_capacity + self.config.adjustment_step).min(self.config.max_size);

        if new_capacity > self.current_capacity {
            let old_capacity = self.current_capacity;
            self.current_capacity = new_capacity;
            self.buffer.reserve(self.config.adjustment_step);
            self.stats.last_adjustment = Instant::now();
            tracing::debug!(
                "Grew adaptive buffer to {} (from {})",
                new_capacity,
                old_capacity
            );
        }

        Ok(())
    }

    /// Try to shrink buffer capacity
    fn try_shrink(&mut self) {
        let elapsed = self.stats.last_adjustment.elapsed();
        if elapsed < self.config.measurement_window {
            return;
        }

        let new_capacity = self
            .current_capacity
            .saturating_sub(self.config.adjustment_step)
            .max(self.config.min_size);

        if new_capacity < self.current_capacity {
            let old_capacity = self.current_capacity;
            self.current_capacity = new_capacity;
            self.stats.last_adjustment = Instant::now();
            tracing::debug!(
                "Shrunk adaptive buffer to {} (from {})",
                new_capacity,
                old_capacity
            );
        }
    }

    /// Clear buffer
    pub fn clear(&mut self) {
        self.buffer.clear();
    }
}

/// Statistics about adaptive buffer performance
#[derive(Debug, Clone)]
pub struct AdaptiveBufferStats {
    pub total_writes: u64,
    pub total_reads: u64,
    pub underruns: u64,
    pub overruns: u64,
    pub current_size: usize,
    pub current_capacity: usize,
    pub average_fill_level: f32,
    pub average_latency: Duration,
}

/// Rate limiter for controlling data flow
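///
/// This is a token-bucket limiter: tokens accrue at `target_rate` per second,
/// capped at `max_burst`. A sketch assuming a 48 kHz stream paced in 10 ms blocks:
///
/// ```ignore
/// let mut limiter = RateLimiter::new(48_000.0);
/// if limiter.try_consume(480) {
///     // forward one 10 ms block downstream
/// }
/// ```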
pub struct RateLimiter {
    /// Target rate (samples per second)
    target_rate: f32,
    /// Current accumulated tokens
    tokens: f32,
    /// Maximum burst size
    max_burst: f32,
    /// Last update time
    last_update: Instant,
}

impl RateLimiter {
    /// Create new rate limiter
    pub fn new(samples_per_second: f32) -> Self {
        Self {
            target_rate: samples_per_second,
            tokens: 0.0,
            max_burst: samples_per_second,
            last_update: Instant::now(),
        }
    }

    /// Create with custom burst size
    pub fn with_burst(samples_per_second: f32, max_burst: f32) -> Self {
        Self {
            target_rate: samples_per_second,
            tokens: 0.0,
            max_burst,
            last_update: Instant::now(),
        }
    }

    /// Try to consume tokens for n samples.
    ///
    /// Returns true if allowed, false if rate limited.
    pub fn try_consume(&mut self, n_samples: usize) -> bool {
        self.update_tokens();

        if self.tokens >= n_samples as f32 {
            self.tokens -= n_samples as f32;
            true
        } else {
            false
        }
    }

    /// Wait until tokens are available (async)
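    ///
    /// Sketch of use inside an async task (timing is approximate, driven by
    /// `tokio::time::sleep`):
    ///
    /// ```ignore
    /// limiter.consume(256).await; // resumes once 256 tokens have accrued
    /// ```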
    pub async fn consume(&mut self, n_samples: usize) {
        while !self.try_consume(n_samples) {
            let deficit = n_samples as f32 - self.tokens;
            let wait_time = Duration::from_secs_f32(deficit / self.target_rate);
            tokio::time::sleep(wait_time).await;
        }
    }

    /// Update token bucket based on elapsed time
    fn update_tokens(&mut self) {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_update).as_secs_f32();
        self.last_update = now;

        self.tokens = (self.tokens + elapsed * self.target_rate).min(self.max_burst);
    }

    /// Get current token count
    pub fn available_tokens(&mut self) -> f32 {
        self.update_tokens();
        self.tokens
    }

    /// Set new target rate
    pub fn set_rate(&mut self, samples_per_second: f32) {
        self.target_rate = samples_per_second;
    }
}

/// Adaptive rate controller that adjusts based on buffer levels
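///
/// A sketch of one possible feedback loop (the surrounding `buffer` and `block`
/// are assumed, not part of this module's API):
///
/// ```ignore
/// let mut controller = AdaptiveRateController::new(48_000.0, 8_000.0, 96_000.0);
///
/// // Periodically feed back the observed fill level, then pace the next block.
/// controller.adjust_rate(buffer.fill_level());
/// if controller.try_consume(block.len()) {
///     // send `block`
/// }
/// ```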
pub struct AdaptiveRateController {
    rate_limiter: RateLimiter,
    target_fill_level: f32,
    min_rate: f32,
    max_rate: f32,
    adjustment_factor: f32,
}

impl AdaptiveRateController {
    /// Create new adaptive rate controller
    pub fn new(initial_rate: f32, min_rate: f32, max_rate: f32) -> Self {
        Self {
            rate_limiter: RateLimiter::new(initial_rate),
            target_fill_level: 0.5,
            min_rate,
            max_rate,
            adjustment_factor: 0.1,
        }
    }

    /// Adjust rate based on buffer fill level
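    ///
    /// Worked example with the defaults (`target_fill_level = 0.5`,
    /// `adjustment_factor = 0.1`): a fill level of 0.8 gives an error of +0.3, so
    /// the rate is lowered by 3%; a fill level of 0.2 raises it by 3%.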
    pub fn adjust_rate(&mut self, fill_level: f32) {
        let error = fill_level - self.target_fill_level;
        let current_rate = self.rate_limiter.target_rate;

        // Proportional adjustment: a fill level above the target lowers the rate,
        // and a fill level below the target raises it.
        let adjustment = -error * self.adjustment_factor * current_rate;
        let new_rate = (current_rate + adjustment).clamp(self.min_rate, self.max_rate);

        self.rate_limiter.set_rate(new_rate);
    }

    /// Try to consume tokens
    pub fn try_consume(&mut self, n_samples: usize) -> bool {
        self.rate_limiter.try_consume(n_samples)
    }

    /// Wait and consume tokens
    pub async fn consume(&mut self, n_samples: usize) {
        self.rate_limiter.consume(n_samples).await;
    }

    /// Get current rate
    pub fn current_rate(&self) -> f32 {
        self.rate_limiter.target_rate
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_adaptive_buffer_basic() {
        let mut buffer = AdaptiveBuffer::with_capacity(10);

        // Push items
        for i in 0..5 {
            buffer.push(i).unwrap();
        }

        assert_eq!(buffer.len(), 5);
        assert!(buffer.fill_level() > 0.0);

        // Pop items
        for i in 0..5 {
            assert_eq!(buffer.pop().unwrap(), i);
        }

        assert!(buffer.is_empty());
    }

    #[test]
    fn test_rate_limiter() {
        let mut limiter = RateLimiter::new(1000.0); // 1000 samples/sec

        // The bucket starts empty (and is capped at 1000), so 2000 samples are refused
        assert!(!limiter.try_consume(2000));

        // After ~500 ms at 1000 tokens/sec, roughly 500 tokens have accrued
        std::thread::sleep(Duration::from_millis(500));
        assert!(limiter.try_consume(500));
    }

    #[test]
    fn test_adaptive_config() {
        let config = AdaptiveConfig::low_latency();
        assert_eq!(config.strategy, AdaptiveStrategy::LowLatency);
        assert!(config.min_size < config.initial_size);
        assert!(config.initial_size < config.max_size);
    }

    #[tokio::test]
    async fn test_rate_limiter_async() {
        let mut limiter = RateLimiter::new(1000.0); // 1000 samples/sec

        let start = Instant::now();
        limiter.consume(100).await; // 100 samples at 1000/sec takes roughly 100 ms
        let elapsed = start.elapsed();

        // Should complete within a reasonable time; allow generous tolerance
        // for test environment variability
        assert!(elapsed < Duration::from_millis(500));
    }
}