running_average/
lib.rs

//! The `RunningAverage` and `RealTimeRunningAverage` types calculate a running average over a time window of the specified width using constant memory.
2//! 
//! The `RunningAverage` type can be used when processing streams of temporal data, while `RealTimeRunningAverage` can be used when the measured events are happening in real time.
4//! 
5//! For example `RealTimeRunningAverage` can be used to measure download throughput by inserting how many bytes were transferred.
6//! ```
7//! use running_average::RealTimeRunningAverage;
8//! 
9//! // By default use 8 second window with 16 accumulators
10//! let mut tw = RealTimeRunningAverage::default();
11//! 
12//! // Connect and start downloading
13//! // Got 2KB of data
14//! tw.insert(2000);
15//! 
16//! // Waiting for more data
17//! // Got 1KB of data
18//! tw.insert(1000);
19//! 
20//! // Print average transfer for last 8 seconds
21//! println!("{}", tw.measurement());
22//! ```
23
24use std::collections::VecDeque;
25use std::time::{Instant, Duration};
26use std::ops::AddAssign;
27use std::iter::Sum;
28use std::default::Default;
29
/// A point in time that `RunningAverage` can work with and that a `TimeSource` can produce as its `Instant` type.
pub trait TimeInstant {
    /// Returns the `Duration` that elapsed between `earlier` and `self`.
    fn duration_since(&self, earlier: Self) -> Duration;
    /// Moves `self` into the future by `step`.
    fn forward(&mut self, step: Duration);
}
37
38/// Types implementing this trait can be used as TimeSource for RealTimeRunningAverage.
39pub trait TimeSource {
40    /// Type implementing TimeInstant for this TimeSource.
41    type Instant: TimeInstant + Copy;
42    /// Return current Instant.
43    fn now(&self) -> Self::Instant;
44}
45
46impl TimeInstant for Instant {
47    fn duration_since(&self, earlier: Self) -> Duration {
48        self.duration_since(earlier)
49    }
50
51    fn forward(&mut self, duration: Duration) {
52        *self += duration;
53    }
54}
55
56/// TimeSource that uses real time clock via `Instant::now()`.
57#[derive(Debug)]
58pub struct RealTimeSource;
59impl TimeSource for RealTimeSource {
60    type Instant = Instant;
61
62    fn now(&self) -> Self::Instant {
63        Instant::now()
64    }
65}
66
/// Converts a `Duration` into seconds expressed as `f64`: whole seconds plus the sub-second nanoseconds scaled down.
fn dts(duration: Duration) -> f64 {
    let whole = duration.as_secs() as f64;
    let fraction = f64::from(duration.subsec_nanos()) * 1e-9;
    whole + fraction
}
70
/// Converts a non-negative number of seconds expressed as `f64` into a `Duration`.
///
/// The whole part becomes the seconds and the fractional part is scaled up to nanoseconds;
/// anything finer than a nanosecond is truncated.
///
/// # Panics
///
/// Panics if `seconds` is negative or NaN, which means time appears to go backwards.
fn std(seconds: f64) -> Duration {
    assert!(seconds >= 0.0, "RunningAverage negative duration - time going backwards?");
    let whole = seconds.floor();
    // The fraction is in [0, 1), so it has to be multiplied by 1e9 (not 1e-9) to get nanoseconds.
    let nanos = ((seconds - whole) * 1e9) as u32;
    Duration::new(whole as u64, nanos)
}
75
76impl TimeInstant for f64 {
77    fn duration_since(&self, earlier: Self) -> Duration {
78        std(self - earlier)
79    }
80
81    fn forward(&mut self, duration: Duration) {
82        *self += dts(duration);
83    }
84}
85
/// `TimeSource` whose clock only moves when `ManualTimeSource::time_shift()` is called.
#[derive(Debug)]
pub struct ManualTimeSource {
    /// Current value of the clock.
    now: f64,
}
91
92impl TimeSource for ManualTimeSource {
93    type Instant = f64;
94
95    fn now(&self) -> Self::Instant {
96        self.now
97    }
98}
99
100impl ManualTimeSource {
101    pub fn new() -> ManualTimeSource {
102        ManualTimeSource {
103            now: 0.0
104        }
105    }
106
107    pub fn time_shift(&mut self, seconds: f64) {
108        self.now += seconds;
109    }
110}
111
/// Result of a running average calculation.
#[derive(Debug)]
pub struct Measurement<T> {
    /// Sum of all samples within the time window.
    value: T,
    /// Width of the time window the samples were collected over.
    duration: Duration,
}
118
119use std::fmt;
120impl<T> fmt::Display for Measurement<T> where T: Clone + fmt::Display + ToRate, <T as ToRate>::Output: Into<f64> {
121    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
122        write!(f, "{:.3}", self.rate().into())
123    }
124}
125
126impl<T> Measurement<T> {
127    /// Returns pointer to internal value of the measurement which is sum of all samples within time window
128    pub fn value(&self) -> &T {
129        &self.value
130    }
131
132    /// Returns internal value of the measurement which is sum of all samples within time window consuming self
133    pub fn unwrap(self) -> T {
134        self.value
135    }
136
137    /// Calculates actual running average value based on sum of all samples and width of the time window
138    pub fn rate(&self) -> <T as ToRate>::Output where T: Clone + ToRate {
139        self.value.clone().to_rate(self.duration)
140    }
141
142    /// Calculates actual running average value based on sum of all samples and width of the time window consuming self
143    pub fn to_rate(self) -> <T as ToRate>::Output where T: ToRate {
144        self.value.to_rate(self.duration)
145    }
146}
147
148/// Represents running average calculation window.
149/// It is using specified window width that will consist of given number of accumulator buckets to ensure constant memory usage.
150#[derive(Debug)]
151pub struct RunningAverage<V: Default, I: TimeInstant + Copy> {
152    window: VecDeque<V>,
153    front: Option<I>,
154    duration: Duration,
155}
156
157impl<V: Default, I: TimeInstant + Copy> Default for RunningAverage<V, I> {
158    /// Crate new RunningAverage instance with window of 8 seconds width and 16 buckets.
159    fn default() -> RunningAverage<V, I> {
160        RunningAverage::new(Duration::from_secs(8))
161    }
162}
163
164impl<V: Default, I: TimeInstant + Copy> RunningAverage<V, I> {
165    /// Crate new RunningAverage instance that will average over window of width of given duration using 16 buckets.
166    pub fn new(duration: Duration) -> RunningAverage<V, I> {
167        RunningAverage::with_capacity(duration, 16)
168    }
169
170    /// Crate new RunningAverage instance that will average over window of width of given duration with specific number of buckets to use.
171    pub fn with_capacity(duration: Duration, capacity: usize) -> RunningAverage<V, I> {
172        assert!(capacity > 0, "RunningAverage capacity cannot be 0");
173        RunningAverage {
174            window: (0..capacity).map(|_| V::default()).collect(),
175            front: None,
176            duration: duration,
177        }
178    }
179
180    fn shift(&mut self, now: I) {
181        let front = self.front.get_or_insert(now);
182        let slot_duration = self.duration / self.window.len() as u32;
183        let mut slots_to_go = self.window.len();
184
185        while now.duration_since(*front) >= slot_duration {
186            // Stop if we zeroed all slots or this can loop for long time if shift was not called recently
187            if slots_to_go == 0 {
188                let since_front = now.duration_since(*front);
189                front.forward(since_front);
190                break;
191            }
192            self.window.pop_back();
193            self.window.push_front(V::default());
194            front.forward(slot_duration);
195            slots_to_go -= 1;
196        }
197    }
198    
199    /// Insert value to be average over at given time instant.
200    /// Panics if now is less than previous now - time cannot go backwards
201    pub fn insert(&mut self, now: I, val: V) where V: AddAssign<V> {
202        self.shift(now);
203        *self.window.front_mut().unwrap() += val;
204    }
205
206    /// Calculate running average using time window ending at given time instant.
207    /// Panics if now is less than previous now - time cannot go backwards.
208    pub fn measurement<'i>(&'i mut self, now: I) -> Measurement<V> where V: Sum<&'i V> {
209        self.shift(now);
210
211        Measurement {
212            value: self.window.iter().sum(),
213            duration: self.duration,
214        }
215    }
216}
217
218/// Represents running average calculation window where `shift` and `measurement` are using given time source to obtain value of `now` instant.
219/// It is using specified window width that will consist of given number of accumulator buckets to ensure constant memory usage.
220#[derive(Debug)]
221pub struct RealTimeRunningAverage<V: Default, TS: TimeSource = RealTimeSource> {
222    inner: RunningAverage<V, TS::Instant>,
223    time_source: TS,
224}
225
226impl<V: Default> Default for RealTimeRunningAverage<V, RealTimeSource> {
227    fn default() -> RealTimeRunningAverage<V, RealTimeSource> {
228        RealTimeRunningAverage::new(Duration::from_secs(8))
229    }
230}
231
232impl<V: Default> RealTimeRunningAverage<V, RealTimeSource> {
233    /// Crate new instance with window of given width duration and using RealTimeSource as time source for `now` instant.
234    /// Note: new() is parametrizing output to RealTimeSource as this cannot be inferred otherwise.
235    pub fn new(duration: Duration) -> RealTimeRunningAverage<V, RealTimeSource> {
236        let time_source = RealTimeSource;
237
238        RealTimeRunningAverage {
239            inner: RunningAverage::new(duration),
240            time_source,
241        }
242    }
243}
244
245impl<V: Default, TS: TimeSource> RealTimeRunningAverage<V, TS> {
246    /// Crate new instance with window of given width duration and using given as time source for `now` instant.
247    pub fn with_time_source(duration: Duration, capacity: usize, time_source: TS) -> RealTimeRunningAverage<V, TS> {
248        RealTimeRunningAverage {
249            inner: RunningAverage::with_capacity(duration, capacity),
250            time_source,
251        }
252    }
253
254    /// Insert value to be average over now.
255    /// Panics if time source time goes backwards.
256    pub fn insert(&mut self, val: V) where V: AddAssign<V> {
257        let now = self.time_source.now();
258        self.inner.insert(now, val)
259    }
260    
261    /// Calculate running average using time window ending now.
262    /// Panics if time source time goes backwards.
263    pub fn measurement<'i>(&'i mut self) -> Measurement<V> where V: Sum<&'i V> {
264        let now = self.time_source.now();
265        self.inner.measurement(now)
266    }
267
268    /// Return mutable reference to time source used.
269    pub fn time_source(&mut self) -> &mut TS {
270        &mut self.time_source
271    }
272}
273
/// Conversion of an accumulated value into a rate over a time window; `Measurement::rate()` is calculated through this trait.
/// Note: This is not implemented for u64 as it cannot be converted precisely to f64 - use f64 instead for big numbers
/// Note: Duration can be converted to f64 but will be rounded to fit in it so it is not 100% precise for max Duration
pub trait ToRate {
    /// Type of the calculated rate.
    type Output;
    /// Consumes the value and converts it into a rate over the given `window`.
    fn to_rate(self, window: Duration) -> Self::Output;
}
281
282impl<T: Into<f64>> ToRate for T {
283    type Output = f64;
284
285    fn to_rate(self, duration: Duration) -> f64 {
286        let v: f64 = self.into();
287        v / dts(duration)
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a 4 second wide window with the given number of buckets on top of a manually driven clock.
    fn manual_window<V: Default>(capacity: usize) -> RealTimeRunningAverage<V, ManualTimeSource> {
        RealTimeRunningAverage::with_time_source(Duration::from_secs(4), capacity, ManualTimeSource::new())
    }

    #[test]
    fn const_over_different_capacity() {
        for capacity in 1..31 {
            let mut tw = manual_window(capacity);

            // One sample right away and one more after each of the next three seconds
            tw.insert(10);
            for _ in 0..3 {
                tw.time_source().time_shift(1.0);
                tw.insert(10);
            }

            assert_eq!(tw.measurement().unwrap(), 40, "for capacity {}: {:?}", capacity, tw);
            assert_eq!(tw.measurement().to_rate(), 10.0, "for capacity {}: {:?}", capacity, tw);
        }
    }

    #[test]
    fn const_half_time_over_different_capacity() {
        for capacity in 1..31 {
            let mut tw = manual_window(capacity);

            // Two samples in the first two seconds, then two seconds without any data
            tw.insert(10);
            tw.time_source().time_shift(1.0);
            tw.insert(10);
            for _ in 0..2 {
                tw.time_source().time_shift(1.0);
            }

            assert_eq!(tw.measurement().unwrap(), 20, "for capacity {}: {:?}", capacity, tw);
            assert_eq!(tw.measurement().to_rate(), 5.0, "for capacity {}: {:?}", capacity, tw);
        }
    }

    #[test]
    fn default_int() {
        let mut tw = RealTimeRunningAverage::default();

        for _ in 0..2 {
            tw.insert(10);
        }

        // Note: this may fail as it is based on real time
        assert_eq!(tw.measurement().unwrap(), 20, "default: {:?}", tw);
        assert_eq!(tw.measurement().to_rate(), 2.5, "default: {:?}", tw);
    }

    #[test]
    fn default_f64() {
        let mut tw = RealTimeRunningAverage::default();

        tw.insert(10f64);
        tw.insert(10.0);

        // Note: this may fail as it is based on real time
        assert_eq!(tw.measurement().unwrap(), 20.0, "default: {:?}", tw);
        assert_eq!(tw.measurement().to_rate(), 2.5, "default: {:?}", tw);
    }

    #[test]
    fn long_time_shift() {
        let mut tw = manual_window(16);

        // This sample is expected to be long gone after the big shift
        tw.insert(10);
        tw.time_source().time_shift(1_000_000_000.0);

        tw.insert(10);
        for _ in 0..3 {
            tw.time_source().time_shift(1.0);
            tw.insert(10);
        }

        assert_eq!(tw.measurement().unwrap(), 40, "long: {:?}", tw);
        assert_eq!(tw.measurement().to_rate(), 10.0, "long: {:?}", tw);
    }

    #[test]
    fn measurement_display() {
        let mut tw = RealTimeRunningAverage::default();

        for _ in 0..2 {
            tw.insert(10);
        }

        assert_eq!(&format!("{}", tw.measurement()), "2.500");
    }
}