1use crate::error::{IoError, IoResult};
7use std::collections::VecDeque;
8use std::time::{Duration, Instant};
9
/// Tuning profile recorded on an [`AdaptiveConfig`].
///
/// In the code visible in this file the strategy is only stored on the
/// configuration; no buffer logic branches on it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AdaptiveStrategy {
    /// Profile set by [`AdaptiveConfig::low_latency`].
    LowLatency,
    /// Profile set by [`AdaptiveConfig::high_throughput`].
    HighThroughput,
    /// Profile set by `AdaptiveConfig::default()`.
    Balanced,
    /// Not produced by any preset in this file; presumably meant for
    /// hand-built configurations (TODO confirm).
    Custom,
}
22
23#[derive(Debug, Clone)]
25pub struct AdaptiveConfig {
26 pub initial_size: usize,
28 pub min_size: usize,
30 pub max_size: usize,
32 pub strategy: AdaptiveStrategy,
34 pub target_latency: Duration,
36 pub underrun_threshold: f32,
38 pub overrun_threshold: f32,
40 pub adjustment_step: usize,
42 pub measurement_window: Duration,
44}
45
46impl Default for AdaptiveConfig {
47 fn default() -> Self {
48 Self {
49 initial_size: 4096,
50 min_size: 1024,
51 max_size: 16384,
52 strategy: AdaptiveStrategy::Balanced,
53 target_latency: Duration::from_millis(50),
54 underrun_threshold: 0.2,
55 overrun_threshold: 0.8,
56 adjustment_step: 512,
57 measurement_window: Duration::from_secs(1),
58 }
59 }
60}
61
62impl AdaptiveConfig {
63 pub fn low_latency() -> Self {
65 Self {
66 initial_size: 2048,
67 min_size: 512,
68 max_size: 8192,
69 strategy: AdaptiveStrategy::LowLatency,
70 target_latency: Duration::from_millis(20),
71 underrun_threshold: 0.3,
72 overrun_threshold: 0.7,
73 adjustment_step: 256,
74 ..Default::default()
75 }
76 }
77
78 pub fn high_throughput() -> Self {
80 Self {
81 initial_size: 8192,
82 min_size: 4096,
83 max_size: 32768,
84 strategy: AdaptiveStrategy::HighThroughput,
85 target_latency: Duration::from_millis(100),
86 underrun_threshold: 0.1,
87 overrun_threshold: 0.9,
88 adjustment_step: 1024,
89 ..Default::default()
90 }
91 }
92}
93
94pub struct AdaptiveBuffer<T> {
96 buffer: VecDeque<T>,
97 config: AdaptiveConfig,
98 current_capacity: usize,
99 stats: BufferStats,
100}
101
/// Internal bookkeeping: lifetime counters plus bounded windows of recent
/// fill-level and latency samples.
#[derive(Clone, Debug)]
struct BufferStats {
    /// Number of push attempts.
    total_writes: u64,
    /// Number of pop attempts.
    total_reads: u64,
    /// Number of pops that found nothing to return.
    underruns: u64,
    /// Number of pushes that found the buffer at capacity.
    overruns: u64,
    /// When the capacity was last changed (initially: when the stats were created).
    last_adjustment: Instant,
    /// Most recent fill-level samples, oldest first.
    fill_levels: VecDeque<f32>,
    /// Most recent latency samples, oldest first.
    latencies: VecDeque<Duration>,
}

impl BufferStats {
    /// Maximum number of samples retained in each window.
    const WINDOW: usize = 100;

    /// Creates zeroed counters and empty sample windows, stamping
    /// `last_adjustment` with the current instant.
    fn new() -> Self {
        let fill_levels = VecDeque::with_capacity(Self::WINDOW);
        let latencies = VecDeque::with_capacity(Self::WINDOW);
        Self {
            total_writes: 0,
            total_reads: 0,
            underruns: 0,
            overruns: 0,
            last_adjustment: Instant::now(),
            fill_levels,
            latencies,
        }
    }

    /// Appends a fill-level sample, evicting the oldest once the window is
    /// over its limit.
    fn record_fill_level(&mut self, level: f32) {
        let window = &mut self.fill_levels;
        window.push_back(level);
        if window.len() > Self::WINDOW {
            window.pop_front();
        }
    }

    /// Arithmetic mean of the retained fill-level samples, or `0.0` when
    /// there are none.
    fn average_fill_level(&self) -> f32 {
        match self.fill_levels.len() {
            0 => 0.0,
            count => self.fill_levels.iter().sum::<f32>() / count as f32,
        }
    }

    /// Appends a latency sample, evicting the oldest once the window is over
    /// its limit.
    #[allow(dead_code)]
    fn record_latency(&mut self, latency: Duration) {
        let window = &mut self.latencies;
        window.push_back(latency);
        if window.len() > Self::WINDOW {
            window.pop_front();
        }
    }

    /// Mean of the retained latency samples, or a zero duration when there
    /// are none.
    fn average_latency(&self) -> Duration {
        let count = self.latencies.len();
        if count == 0 {
            return Duration::ZERO;
        }
        let sum: Duration = self.latencies.iter().sum();
        // The window never holds more than `WINDOW` entries, so the cast is lossless.
        sum / count as u32
    }
}
157
158impl<T> AdaptiveBuffer<T> {
159 pub fn new(config: AdaptiveConfig) -> Self {
161 let capacity = config.initial_size;
162 Self {
163 buffer: VecDeque::with_capacity(capacity),
164 config,
165 current_capacity: capacity,
166 stats: BufferStats::new(),
167 }
168 }
169
170 pub fn with_capacity(capacity: usize) -> Self {
172 let config = AdaptiveConfig {
173 initial_size: capacity,
174 ..Default::default()
175 };
176 Self::new(config)
177 }
178
179 pub fn push(&mut self, item: T) -> IoResult<()> {
181 let fill_level = self.fill_level();
182 self.stats.record_fill_level(fill_level);
183 self.stats.total_writes += 1;
184
185 if self.buffer.len() >= self.current_capacity {
186 self.stats.overruns += 1;
187 if fill_level > self.config.overrun_threshold {
188 self.try_grow()?;
189 }
190 return Err(IoError::BufferFull);
191 }
192
193 self.buffer.push_back(item);
194 Ok(())
195 }
196
197 pub fn pop(&mut self) -> IoResult<T> {
199 let fill_level = self.fill_level();
200 self.stats.record_fill_level(fill_level);
201 self.stats.total_reads += 1;
202
203 match self.buffer.pop_front() {
204 Some(item) => {
205 if fill_level < self.config.underrun_threshold {
206 self.try_shrink();
207 }
208 Ok(item)
209 }
210 None => {
211 self.stats.underruns += 1;
212 Err(IoError::BufferEmpty)
213 }
214 }
215 }
216
217 pub fn fill_level(&self) -> f32 {
219 if self.current_capacity == 0 {
220 return 0.0;
221 }
222 self.buffer.len() as f32 / self.current_capacity as f32
223 }
224
225 pub fn len(&self) -> usize {
227 self.buffer.len()
228 }
229
230 pub fn is_empty(&self) -> bool {
232 self.buffer.is_empty()
233 }
234
235 pub fn capacity(&self) -> usize {
237 self.current_capacity
238 }
239
240 pub fn stats(&self) -> AdaptiveBufferStats {
242 AdaptiveBufferStats {
243 total_writes: self.stats.total_writes,
244 total_reads: self.stats.total_reads,
245 underruns: self.stats.underruns,
246 overruns: self.stats.overruns,
247 current_size: self.buffer.len(),
248 current_capacity: self.current_capacity,
249 average_fill_level: self.stats.average_fill_level(),
250 average_latency: self.stats.average_latency(),
251 }
252 }
253
254 fn try_grow(&mut self) -> IoResult<()> {
256 let elapsed = self.stats.last_adjustment.elapsed();
257 if elapsed < self.config.measurement_window {
258 return Ok(());
259 }
260
261 let new_capacity =
262 (self.current_capacity + self.config.adjustment_step).min(self.config.max_size);
263
264 if new_capacity > self.current_capacity {
265 self.current_capacity = new_capacity;
266 self.buffer.reserve(self.config.adjustment_step);
267 self.stats.last_adjustment = Instant::now();
268 tracing::debug!(
269 "Grew adaptive buffer to {} (from {})",
270 new_capacity,
271 self.current_capacity
272 );
273 }
274
275 Ok(())
276 }
277
278 fn try_shrink(&mut self) {
280 let elapsed = self.stats.last_adjustment.elapsed();
281 if elapsed < self.config.measurement_window {
282 return;
283 }
284
285 let new_capacity = (self
286 .current_capacity
287 .saturating_sub(self.config.adjustment_step))
288 .max(self.config.min_size);
289
290 if new_capacity < self.current_capacity {
291 self.current_capacity = new_capacity;
292 self.stats.last_adjustment = Instant::now();
293 tracing::debug!(
294 "Shrunk adaptive buffer to {} (from {})",
295 new_capacity,
296 self.current_capacity
297 );
298 }
299 }
300
301 pub fn clear(&mut self) {
303 self.buffer.clear();
304 }
305}
306
/// Point-in-time snapshot of an adaptive buffer's counters and averages, as
/// returned by `AdaptiveBuffer::stats`.
#[derive(Clone, Debug)]
pub struct AdaptiveBufferStats {
    /// Number of push attempts so far.
    pub total_writes: u64,
    /// Number of pop attempts so far.
    pub total_reads: u64,
    /// Number of pops that found the buffer empty.
    pub underruns: u64,
    /// Number of pushes that found the buffer at capacity.
    pub overruns: u64,
    /// Items queued when the snapshot was taken.
    pub current_size: usize,
    /// Logical capacity when the snapshot was taken.
    pub current_capacity: usize,
    /// Mean of the recently recorded fill levels.
    pub average_fill_level: f32,
    /// Mean of the recently recorded latencies.
    pub average_latency: Duration,
}
319
320pub struct RateLimiter {
322 target_rate: f32,
324 tokens: f32,
326 max_burst: f32,
328 last_update: Instant,
330}
331
332impl RateLimiter {
333 pub fn new(samples_per_second: f32) -> Self {
335 Self {
336 target_rate: samples_per_second,
337 tokens: 0.0,
338 max_burst: samples_per_second,
339 last_update: Instant::now(),
340 }
341 }
342
343 pub fn with_burst(samples_per_second: f32, max_burst: f32) -> Self {
345 Self {
346 target_rate: samples_per_second,
347 tokens: 0.0,
348 max_burst,
349 last_update: Instant::now(),
350 }
351 }
352
353 pub fn try_consume(&mut self, n_samples: usize) -> bool {
356 self.update_tokens();
357
358 if self.tokens >= n_samples as f32 {
359 self.tokens -= n_samples as f32;
360 true
361 } else {
362 false
363 }
364 }
365
366 pub async fn consume(&mut self, n_samples: usize) {
368 while !self.try_consume(n_samples) {
369 let deficit = n_samples as f32 - self.tokens;
370 let wait_time = Duration::from_secs_f32(deficit / self.target_rate);
371 tokio::time::sleep(wait_time).await;
372 }
373 }
374
375 fn update_tokens(&mut self) {
377 let now = Instant::now();
378 let elapsed = now.duration_since(self.last_update).as_secs_f32();
379 self.last_update = now;
380
381 self.tokens = (self.tokens + elapsed * self.target_rate).min(self.max_burst);
382 }
383
384 pub fn available_tokens(&mut self) -> f32 {
386 self.update_tokens();
387 self.tokens
388 }
389
390 pub fn set_rate(&mut self, samples_per_second: f32) {
392 self.target_rate = samples_per_second;
393 }
394}
395
396pub struct AdaptiveRateController {
398 rate_limiter: RateLimiter,
399 target_fill_level: f32,
400 min_rate: f32,
401 max_rate: f32,
402 adjustment_factor: f32,
403}
404
405impl AdaptiveRateController {
406 pub fn new(initial_rate: f32, min_rate: f32, max_rate: f32) -> Self {
408 Self {
409 rate_limiter: RateLimiter::new(initial_rate),
410 target_fill_level: 0.5,
411 min_rate,
412 max_rate,
413 adjustment_factor: 0.1,
414 }
415 }
416
417 pub fn adjust_rate(&mut self, fill_level: f32) {
419 let error = fill_level - self.target_fill_level;
420 let current_rate = self.rate_limiter.target_rate;
421
422 let adjustment = -error * self.adjustment_factor * current_rate;
425 let new_rate = (current_rate + adjustment).clamp(self.min_rate, self.max_rate);
426
427 self.rate_limiter.set_rate(new_rate);
428 }
429
430 pub fn try_consume(&mut self, n_samples: usize) -> bool {
432 self.rate_limiter.try_consume(n_samples)
433 }
434
435 pub async fn consume(&mut self, n_samples: usize) {
437 self.rate_limiter.consume(n_samples).await;
438 }
439
440 pub fn current_rate(&self) -> f32 {
442 self.rate_limiter.target_rate
443 }
444}
445
#[cfg(test)]
mod tests {
    use super::*;

    /// Items pushed into a fresh buffer come back out in insertion order.
    #[test]
    fn test_adaptive_buffer_basic() {
        let mut buffer = AdaptiveBuffer::with_capacity(10);

        (0..5).for_each(|value| buffer.push(value).unwrap());

        assert_eq!(buffer.len(), 5);
        assert!(buffer.fill_level() > 0.0);

        for expected in 0..5 {
            assert_eq!(buffer.pop().unwrap(), expected);
        }

        assert!(buffer.is_empty());
    }

    /// A fresh limiter has no tokens; after half a second at 1000 samples/s
    /// it can release 500 samples.
    #[test]
    fn test_rate_limiter() {
        let mut limiter = RateLimiter::new(1000.0);
        let granted_immediately = limiter.try_consume(2000);
        assert!(!granted_immediately);

        std::thread::sleep(Duration::from_millis(500));

        let granted_after_wait = limiter.try_consume(500);
        assert!(granted_after_wait);
    }

    /// The low-latency preset is labelled as such and keeps
    /// `min_size < initial_size < max_size`.
    #[test]
    fn test_adaptive_config() {
        let AdaptiveConfig {
            strategy,
            min_size,
            initial_size,
            max_size,
            ..
        } = AdaptiveConfig::low_latency();

        assert_eq!(strategy, AdaptiveStrategy::LowLatency);
        assert!(min_size < initial_size);
        assert!(initial_size < max_size);
    }

    /// Waiting for 100 samples at 1000 samples/s finishes well within 500 ms.
    #[tokio::test]
    async fn test_rate_limiter_async() {
        let mut limiter = RateLimiter::new(1000.0);

        let started = Instant::now();
        limiter.consume(100).await;
        let waited = started.elapsed();

        assert!(waited < Duration::from_millis(500));
    }
}