Skip to main content

oximedia_proxy/
proxy_bandwidth.rs

1#![allow(dead_code)]
2//! Bandwidth estimation and throttling for proxy file transfers.
3//!
4//! Provides tools to measure available bandwidth, estimate transfer times,
5//! and apply rate limiting to proxy file transfers to avoid saturating
6//! network links during production workflows.
7
8use std::collections::VecDeque;
9use std::time::Duration;
10
11/// Units for bandwidth measurement.
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub enum BandwidthUnit {
14    /// Bits per second.
15    Bps,
16    /// Kilobits per second.
17    Kbps,
18    /// Megabits per second.
19    Mbps,
20    /// Gigabits per second.
21    Gbps,
22}
23
24impl BandwidthUnit {
25    /// Convert a value in this unit to bits per second.
26    #[allow(clippy::cast_precision_loss)]
27    pub fn to_bps(&self, value: f64) -> f64 {
28        match self {
29            Self::Bps => value,
30            Self::Kbps => value * 1_000.0,
31            Self::Mbps => value * 1_000_000.0,
32            Self::Gbps => value * 1_000_000_000.0,
33        }
34    }
35
36    /// Convert bits per second to this unit.
37    #[allow(clippy::cast_precision_loss)]
38    pub fn from_bps(&self, bps: f64) -> f64 {
39        match self {
40            Self::Bps => bps,
41            Self::Kbps => bps / 1_000.0,
42            Self::Mbps => bps / 1_000_000.0,
43            Self::Gbps => bps / 1_000_000_000.0,
44        }
45    }
46}
47
48/// A single bandwidth measurement sample.
49#[derive(Debug, Clone, Copy)]
50pub struct BandwidthSample {
51    /// Bytes transferred in this sample.
52    pub bytes: u64,
53    /// Duration of the sample.
54    pub duration_ms: u64,
55}
56
57impl BandwidthSample {
58    /// Create a new sample.
59    pub fn new(bytes: u64, duration_ms: u64) -> Self {
60        Self { bytes, duration_ms }
61    }
62
63    /// Compute bits per second for this sample.
64    #[allow(clippy::cast_precision_loss)]
65    pub fn bps(&self) -> f64 {
66        if self.duration_ms == 0 {
67            return 0.0;
68        }
69        (self.bytes as f64 * 8.0 * 1000.0) / self.duration_ms as f64
70    }
71}
72
73/// Rolling bandwidth estimator based on recent samples.
74pub struct BandwidthEstimator {
75    /// Recent samples.
76    samples: VecDeque<BandwidthSample>,
77    /// Maximum number of samples to keep.
78    max_samples: usize,
79}
80
81impl BandwidthEstimator {
82    /// Create a new estimator with a given window size.
83    pub fn new(max_samples: usize) -> Self {
84        Self {
85            samples: VecDeque::new(),
86            max_samples: max_samples.max(1),
87        }
88    }
89
90    /// Add a new sample.
91    pub fn add_sample(&mut self, sample: BandwidthSample) {
92        if self.samples.len() >= self.max_samples {
93            self.samples.pop_front();
94        }
95        self.samples.push_back(sample);
96    }
97
98    /// Number of samples currently stored.
99    pub fn sample_count(&self) -> usize {
100        self.samples.len()
101    }
102
103    /// Average bandwidth in bits per second.
104    #[allow(clippy::cast_precision_loss)]
105    pub fn average_bps(&self) -> f64 {
106        if self.samples.is_empty() {
107            return 0.0;
108        }
109        let total_bytes: u64 = self.samples.iter().map(|s| s.bytes).sum();
110        let total_ms: u64 = self.samples.iter().map(|s| s.duration_ms).sum();
111        if total_ms == 0 {
112            return 0.0;
113        }
114        (total_bytes as f64 * 8.0 * 1000.0) / total_ms as f64
115    }
116
117    /// Estimated time to transfer the given number of bytes.
118    #[allow(clippy::cast_precision_loss)]
119    pub fn estimate_transfer_time(&self, bytes: u64) -> Duration {
120        let bps = self.average_bps();
121        if bps <= 0.0 {
122            return Duration::from_secs(u64::MAX);
123        }
124        let bits = bytes as f64 * 8.0;
125        let seconds = bits / bps;
126        Duration::from_secs_f64(seconds)
127    }
128
129    /// Peak bandwidth observed across all samples.
130    pub fn peak_bps(&self) -> f64 {
131        self.samples.iter().map(|s| s.bps()).fold(0.0_f64, f64::max)
132    }
133
134    /// Minimum bandwidth observed.
135    pub fn min_bps(&self) -> f64 {
136        self.samples
137            .iter()
138            .map(|s| s.bps())
139            .fold(f64::MAX, f64::min)
140    }
141
142    /// Clear all samples.
143    pub fn clear(&mut self) {
144        self.samples.clear();
145    }
146}
147
148/// Rate limiter for proxy transfers using a token bucket algorithm.
149pub struct RateLimiter {
150    /// Maximum rate in bytes per second.
151    max_bytes_per_sec: u64,
152    /// Token bucket capacity in bytes.
153    bucket_capacity: u64,
154    /// Current tokens available.
155    tokens: u64,
156    /// Last refill timestamp in milliseconds.
157    last_refill_ms: u64,
158}
159
160impl RateLimiter {
161    /// Create a new rate limiter.
162    pub fn new(max_bytes_per_sec: u64) -> Self {
163        let capacity = max_bytes_per_sec; // 1 second of burst
164        Self {
165            max_bytes_per_sec,
166            bucket_capacity: capacity,
167            tokens: capacity,
168            last_refill_ms: 0,
169        }
170    }
171
172    /// Set the bucket capacity (burst size).
173    pub fn with_burst_size(mut self, bytes: u64) -> Self {
174        self.bucket_capacity = bytes;
175        self.tokens = self.tokens.min(bytes);
176        self
177    }
178
179    /// Refill tokens based on elapsed time.
180    #[allow(clippy::cast_precision_loss)]
181    pub fn refill(&mut self, current_time_ms: u64) {
182        if current_time_ms <= self.last_refill_ms {
183            return;
184        }
185        let elapsed_ms = current_time_ms - self.last_refill_ms;
186        let new_tokens = (self.max_bytes_per_sec as f64 * elapsed_ms as f64 / 1000.0) as u64;
187        self.tokens = (self.tokens + new_tokens).min(self.bucket_capacity);
188        self.last_refill_ms = current_time_ms;
189    }
190
191    /// Try to consume tokens for a transfer of the given size.
192    /// Returns `true` if the transfer is allowed.
193    pub fn try_consume(&mut self, bytes: u64) -> bool {
194        if bytes <= self.tokens {
195            self.tokens -= bytes;
196            true
197        } else {
198            false
199        }
200    }
201
202    /// Current available tokens.
203    pub fn available_tokens(&self) -> u64 {
204        self.tokens
205    }
206
207    /// Maximum rate in bytes per second.
208    pub fn max_rate(&self) -> u64 {
209        self.max_bytes_per_sec
210    }
211}
212
213/// Transfer time estimate for a specific file.
214#[derive(Debug, Clone)]
215pub struct TransferEstimate {
216    /// File size in bytes.
217    pub file_size_bytes: u64,
218    /// Estimated bandwidth in bits per second.
219    pub estimated_bps: f64,
220    /// Estimated transfer duration.
221    pub estimated_duration: Duration,
222    /// Confidence level from 0.0 to 1.0.
223    pub confidence: f64,
224}
225
226impl TransferEstimate {
227    /// Create a new transfer estimate.
228    #[allow(clippy::cast_precision_loss)]
229    pub fn new(file_size_bytes: u64, estimated_bps: f64, sample_count: usize) -> Self {
230        let bits = file_size_bytes as f64 * 8.0;
231        let seconds = if estimated_bps > 0.0 {
232            bits / estimated_bps
233        } else {
234            f64::MAX
235        };
236        let confidence = (sample_count as f64 / 20.0).min(1.0);
237        Self {
238            file_size_bytes,
239            estimated_bps,
240            estimated_duration: Duration::from_secs_f64(seconds.min(u64::MAX as f64)),
241            confidence,
242        }
243    }
244
245    /// Whether the estimate has high confidence (>= 0.8).
246    pub fn is_high_confidence(&self) -> bool {
247        self.confidence >= 0.8
248    }
249}
250
251#[cfg(test)]
252mod tests {
253    use super::*;
254
255    #[test]
256    fn test_bandwidth_unit_to_bps() {
257        assert!((BandwidthUnit::Kbps.to_bps(1.0) - 1000.0).abs() < 1e-9);
258        assert!((BandwidthUnit::Mbps.to_bps(1.0) - 1_000_000.0).abs() < 1e-9);
259        assert!((BandwidthUnit::Gbps.to_bps(1.0) - 1_000_000_000.0).abs() < 1e-9);
260    }
261
262    #[test]
263    fn test_bandwidth_unit_from_bps() {
264        assert!((BandwidthUnit::Mbps.from_bps(1_000_000.0) - 1.0).abs() < 1e-9);
265        assert!((BandwidthUnit::Kbps.from_bps(1_000.0) - 1.0).abs() < 1e-9);
266    }
267
268    #[test]
269    fn test_bandwidth_sample_bps() {
270        let sample = BandwidthSample::new(1000, 1000);
271        // 1000 bytes in 1000ms = 8000 bps
272        assert!((sample.bps() - 8000.0).abs() < 1e-9);
273    }
274
275    #[test]
276    fn test_bandwidth_sample_zero_duration() {
277        let sample = BandwidthSample::new(1000, 0);
278        assert_eq!(sample.bps(), 0.0);
279    }
280
281    #[test]
282    fn test_estimator_empty() {
283        let est = BandwidthEstimator::new(10);
284        assert_eq!(est.average_bps(), 0.0);
285        assert_eq!(est.sample_count(), 0);
286    }
287
288    #[test]
289    fn test_estimator_single_sample() {
290        let mut est = BandwidthEstimator::new(10);
291        est.add_sample(BandwidthSample::new(1_000_000, 1000));
292        // 1MB in 1s = 8Mbps
293        assert!((est.average_bps() - 8_000_000.0).abs() < 1e-3);
294    }
295
296    #[test]
297    fn test_estimator_window_eviction() {
298        let mut est = BandwidthEstimator::new(3);
299        for i in 0..5 {
300            est.add_sample(BandwidthSample::new(100 * (i + 1), 1000));
301        }
302        assert_eq!(est.sample_count(), 3);
303    }
304
305    #[test]
306    fn test_estimate_transfer_time() {
307        let mut est = BandwidthEstimator::new(10);
308        // 1MB/s = 8Mbps
309        est.add_sample(BandwidthSample::new(1_000_000, 1000));
310        let time = est.estimate_transfer_time(10_000_000);
311        // 10MB at 1MB/s = 10 seconds
312        assert!((time.as_secs_f64() - 10.0).abs() < 0.01);
313    }
314
315    #[test]
316    fn test_peak_and_min_bps() {
317        let mut est = BandwidthEstimator::new(10);
318        est.add_sample(BandwidthSample::new(100, 1000)); // 800 bps
319        est.add_sample(BandwidthSample::new(1000, 1000)); // 8000 bps
320        est.add_sample(BandwidthSample::new(500, 1000)); // 4000 bps
321        assert!((est.peak_bps() - 8000.0).abs() < 1e-9);
322        assert!((est.min_bps() - 800.0).abs() < 1e-9);
323    }
324
325    #[test]
326    fn test_rate_limiter_consume() {
327        let mut limiter = RateLimiter::new(1_000_000);
328        assert!(limiter.try_consume(500_000));
329        assert_eq!(limiter.available_tokens(), 500_000);
330        assert!(limiter.try_consume(500_000));
331        assert!(!limiter.try_consume(1));
332    }
333
334    #[test]
335    fn test_rate_limiter_refill() {
336        let mut limiter = RateLimiter::new(1000);
337        limiter.try_consume(1000);
338        assert_eq!(limiter.available_tokens(), 0);
339        limiter.refill(500); // 0.5s -> 500 bytes
340        assert_eq!(limiter.available_tokens(), 500);
341    }
342
343    #[test]
344    fn test_rate_limiter_burst() {
345        let limiter = RateLimiter::new(1000).with_burst_size(500);
346        assert_eq!(limiter.available_tokens(), 500);
347    }
348
349    #[test]
350    fn test_transfer_estimate_confidence() {
351        let est = TransferEstimate::new(1_000_000, 8_000_000.0, 5);
352        assert!(!est.is_high_confidence());
353        let est2 = TransferEstimate::new(1_000_000, 8_000_000.0, 20);
354        assert!(est2.is_high_confidence());
355    }
356
357    #[test]
358    fn test_estimator_clear() {
359        let mut est = BandwidthEstimator::new(10);
360        est.add_sample(BandwidthSample::new(100, 100));
361        est.clear();
362        assert_eq!(est.sample_count(), 0);
363        assert_eq!(est.average_bps(), 0.0);
364    }
365}