datastreams_rs/
lib.rs

1#![no_std]
2
3extern crate alloc;
4
5use alloc::vec::Vec;
6use core::cmp::max;
7use core::ops::Index;
8
/// Fixed-size, bucketed time series stored in a ring buffer.
///
/// The stream keeps `history_size` buckets, each `granularity_ms` wide, and at most one
/// value per bucket (a later write to the same bucket replaces the earlier one).
/// `create_data_stream` sets `hi_ts - lo_ts == history_size * granularity_ms`, and sliding
/// the window moves both edges by the same amount.
#[derive(Debug, Clone)]
pub struct DataStream {
    /// Lower edge of the retained window.
    lo_ts: u64,
    /// Timestamp associated with the newest slot (`threshold_index`).
    hi_ts: u64,

    /// Number of buckets, i.e. ring-buffer slots.
    history_size: usize,
    /// Width of one bucket, in milliseconds.
    granularity_ms: u64,

    /// Stored value per slot; only meaningful where the matching `exists` entry is `true`.
    values: Vec<f64>,
    /// Occupancy flag per slot.
    exists: Vec<bool>,
    /// Slot holding the newest bucket; the slot after it (modulo `history_size`) is the oldest.
    threshold_index: usize,
}
20
21#[must_use]
22pub fn create_data_stream(hi_ts: u64, history_size: usize, granularity_ms: u64) -> DataStream {
23    let history_ms = (history_size as u64) * granularity_ms;
24
25    let mut values: Vec<f64> = Vec::new();
26    values.resize(history_size, 0f64);
27    let mut exists: Vec<bool> = Vec::new();
28    exists.resize(history_size, false);
29
30    DataStream {
31        hi_ts: max(hi_ts, history_ms),
32        lo_ts: if history_ms >= hi_ts {
33            0
34        } else {
35            hi_ts - history_ms
36        },
37
38        history_size,
39        granularity_ms,
40
41        values,
42        exists,
43
44        threshold_index: history_size - 1,
45    }
46}
47
/// Arithmetic mean of `batch`, or `NaN` when `batch` is empty.
///
/// The parameter stays `&Vec<f64>` (not `&[f64]`) so this function can be handed directly
/// to `DataStreamAggregations::agg`, whose `func` bound is `Fn(&Vec<f64>) -> f64`.
#[must_use]
// usize -> f64 only loses precision beyond 2^53 elements.
#[allow(clippy::cast_precision_loss)]
pub fn aggregation_avg(batch: &Vec<f64>) -> f64 {
    let count = batch.len();
    if count == 0 {
        return f64::NAN;
    }

    let total = batch.iter().fold(0f64, |acc, item| acc + item);
    total / count as f64
}
62
/// Largest value in `batch`, ignoring `NaN` entries.
///
/// Returns `NaN` when `batch` is empty or holds only `NaN`s. Unlike a plain `>` scan, the
/// result does not depend on where a `NaN` appears in the batch.
///
/// The parameter stays `&Vec<f64>` (not `&[f64]`) so this function can be handed directly
/// to `DataStreamAggregations::agg`, whose `func` bound is `Fn(&Vec<f64>) -> f64`.
#[must_use]
pub fn aggregation_max(batch: &Vec<f64>) -> f64 {
    // `f64::max` returns the non-NaN operand, so seeding with NaN yields NaN only when
    // no real number is present (which also covers the empty batch).
    batch.iter().copied().fold(f64::NAN, f64::max)
}
79
/// Smallest value in `batch`, ignoring `NaN` entries.
///
/// Returns `NaN` when `batch` is empty or holds only `NaN`s. Unlike a plain `<` scan, the
/// result does not depend on where a `NaN` appears in the batch.
///
/// The parameter stays `&Vec<f64>` (not `&[f64]`) so this function can be handed directly
/// to `DataStreamAggregations::agg`, whose `func` bound is `Fn(&Vec<f64>) -> f64`.
#[must_use]
pub fn aggregation_min(batch: &Vec<f64>) -> f64 {
    // `f64::min` returns the non-NaN operand, so seeding with NaN yields NaN only when
    // no real number is present (which also covers the empty batch).
    batch.iter().copied().fold(f64::NAN, f64::min)
}
96
/// Windowed aggregation over the buckets held by a stream.
///
/// The crate's `aggregation_avg`, `aggregation_max` and `aggregation_min` reducers can be
/// passed directly as `func`.
pub trait DataStreamAggregations {
    /// Reduces consecutive windows of `aggregation_ms` milliseconds with `func`, writing
    /// the per-window results into `out`.
    fn agg<F: Fn(&Vec<f64>) -> f64>(
        &mut self,
        func: F,
        aggregation_ms: u64,
        out: &mut Vec<f64>,
    );
}
102
103impl DataStreamAggregations for DataStream {
104    fn agg<F>(&mut self, func: F, aggregation_ms: u64, out: &mut Vec<f64>)
105    where
106        F: Fn(&Vec<f64>) -> f64,
107    {
108        out.clear();
109
110        let mut buf: Vec<f64> = Vec::new();
111        let mut local_offset_ts: u64 = 0;
112        let mut pos: usize = (self.threshold_index + 1) % self.history_size;
113        let mut terminated = false;
114
115        loop {
116            if terminated {
117                return;
118            }
119
120            if pos == self.threshold_index {
121                terminated = true;
122            }
123
124            if self.exists[pos] {
125                buf.push(self.values[pos]);
126            }
127
128            local_offset_ts += self.granularity_ms;
129            if local_offset_ts >= aggregation_ms {
130                if buf.is_empty() {
131                    out.push(f64::NAN);
132                } else {
133                    out.push(func(&buf));
134                }
135                buf.clear();
136                local_offset_ts = 0;
137            }
138
139            pos = (pos + 1) % self.history_size;
140        }
141    }
142}
143
/// Ingestion and point queries for a bucketed time series.
///
/// Every method takes `&mut self`, including the read-only queries, and implementors must
/// use that same receiver.
pub trait DataStreamOperations {
    /// Records `value` at timestamp `ts`.
    fn add_value(&mut self, ts: u64, value: f64);

    /// Number of stored values.
    fn value_counts(&mut self) -> usize;

    /// Upper edge of the stream's time window.
    fn max_timestamp(&mut self) -> u64;

    /// Timestamp of the newest stored value (`DataStream` reports `0` when it is empty).
    fn last_timestamp(&mut self) -> u64;

    /// Largest stored value (`DataStream` reports `NaN` when it is empty).
    fn max_value(&mut self) -> f64;

    /// Newest stored value (`DataStream` reports `NaN` when it is empty).
    fn last_value(&mut self) -> f64;
}
152
/// One stored sample, as yielded when iterating a [`DataStream`].
///
/// The fields are public so that code outside this crate can read iterator items.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct DataStreamValue {
    /// Timestamp of the sample's bucket, derived from the stream window (bucket-aligned),
    /// not necessarily the exact timestamp passed to `add_value`.
    pub timestamp: u64,
    /// The stored value.
    pub value: f64,
}
158
159pub struct DataStreamIterMut<'a> {
160    terminated: bool,
161    pos: usize,
162    offset: u64,
163    data: &'a DataStream,
164}
165
166impl DataStream {
167    #[allow(dead_code)]
168    fn iter(&self) -> DataStreamIterMut {
169        <&Self as IntoIterator>::into_iter(self)
170    }
171}
172
173impl Iterator for DataStreamIterMut<'_> {
174    type Item = DataStreamValue;
175
176    fn next(&mut self) -> Option<Self::Item> {
177        if self.terminated {
178            return None;
179        }
180
181        loop {
182            if self.terminated {
183                return None;
184            }
185
186            if self.pos == self.data.threshold_index {
187                self.terminated = true;
188            }
189
190            if self.data.exists[self.pos] {
191                let timestamp = self.data.lo_ts + self.data.granularity_ms * (self.offset + 1);
192                let value = self.data.values[self.pos];
193                self.offset += 1;
194                self.pos = (self.pos + 1) % self.data.history_size;
195
196                return Option::from(DataStreamValue { timestamp, value });
197            }
198
199            self.offset += 1;
200            self.pos = (self.pos + 1) % self.data.history_size;
201        }
202    }
203}
204
205impl<'a> IntoIterator for &'a DataStream {
206    type Item = DataStreamValue;
207    type IntoIter = DataStreamIterMut<'a>;
208
209    fn into_iter(self) -> Self::IntoIter {
210        DataStreamIterMut {
211            terminated: false,
212            pos: (self.threshold_index + 1) % self.history_size,
213            offset: 0,
214            data: self,
215        }
216    }
217}
218
219impl DataStreamOperations for DataStream {
220    fn add_value(&mut self, ts: u64, value: f64) {
221        if ts < self.lo_ts {
222            return;
223        }
224
225        if ts > self.hi_ts {
226            let buckets_shift = (ts - self.hi_ts) / self.granularity_ms;
227            if buckets_shift > 0 {
228                let shift_value = buckets_shift * self.granularity_ms;
229                let buckets_shift = usize::try_from(buckets_shift).unwrap();
230                for i in 0..buckets_shift {
231                    let offset = (i + self.threshold_index + 1) % self.history_size;
232                    self.exists[offset] = false;
233                }
234                self.lo_ts += shift_value;
235                self.hi_ts += shift_value;
236                self.threshold_index = (self.threshold_index + buckets_shift) % self.history_size;
237            }
238        }
239
240        let offset = ts - self.lo_ts;
241        let bucket = offset / self.granularity_ms;
242
243        let bucket: usize = usize::try_from(bucket).unwrap();
244        let bucket = (bucket + self.threshold_index) % self.history_size;
245
246        self.values[bucket] = value;
247        self.exists[bucket] = true;
248    }
249
250    fn max_timestamp(&mut self) -> u64 {
251        self.hi_ts
252    }
253
254    fn max_value(&mut self) -> f64 {
255        let mut is_exists = false;
256        let mut value = 0f64;
257
258        for i in 0..self.history_size {
259            if self.exists[i] && (!is_exists || self.values[i] > value) {
260                is_exists = true;
261                value = self.values[i];
262            }
263        }
264
265        if is_exists {
266            value
267        } else {
268            f64::NAN
269        }
270    }
271
272    fn last_timestamp(&mut self) -> u64 {
273        let mut offset = 0;
274        let mut index = self.threshold_index % self.history_size;
275
276        loop {
277            if self.exists[index] {
278                return self.hi_ts - self.granularity_ms * offset;
279            }
280
281            offset += 1;
282            index = if index == 0 {
283                self.history_size - 1
284            } else {
285                index - 1
286            };
287            if index == self.threshold_index {
288                break;
289            }
290        }
291
292        0
293    }
294
295    fn last_value(&mut self) -> f64 {
296        for i in 0..self.history_size {
297            let index = (i + self.threshold_index) % self.history_size;
298            if self.exists[index] {
299                return self.values[index];
300            }
301        }
302
303        f64::NAN
304    }
305
306    fn value_counts(&mut self) -> usize {
307        let mut value_counts: usize = 0;
308
309        for is_exists in self.exists.iter() {
310            if *is_exists {
311                value_counts += 1;
312            }
313        }
314
315        value_counts
316    }
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;
    use core::iter::zip;

    const HISTORY_SIZE: usize = 10;
    const BUCKET_SIZE_MS: u64 = 1_000;
    const INITIAL_TS: u64 = 1_714_321_497_981;

    /// Stream with the default test geometry, anchored at `INITIAL_TS`.
    fn default_stream() -> DataStream {
        create_data_stream(INITIAL_TS, HISTORY_SIZE, BUCKET_SIZE_MS)
    }

    /// Stream with `history_size` buckets, anchored at `INITIAL_TS`.
    fn stream_with_history(history_size: usize) -> DataStream {
        create_data_stream(INITIAL_TS, history_size, BUCKET_SIZE_MS)
    }

    /// Adds every `(ts[i], values[i])` pair to `data_stream`, in order.
    fn feed(data_stream: &mut DataStream, ts: &[u64], values: &[f64]) {
        assert_eq!(ts.len(), values.len());
        for (&t, &y) in zip(ts, values) {
            data_stream.add_value(t, y);
        }
    }

    /// Feeds four consecutive one-bucket samples into a four-bucket stream and aggregates
    /// them over two-bucket windows.
    fn aggregate_pairs<F>(values: [f64; 4], func: F) -> Vec<f64>
    where
        F: Fn(&Vec<f64>) -> f64,
    {
        let mut data_stream = stream_with_history(4);
        let ts = [
            INITIAL_TS - BUCKET_SIZE_MS,
            INITIAL_TS,
            INITIAL_TS + BUCKET_SIZE_MS,
            INITIAL_TS + 2 * BUCKET_SIZE_MS,
        ];
        feed(&mut data_stream, &ts, &values);

        let mut out: Vec<f64> = Vec::new();
        data_stream.agg(func, 2 * BUCKET_SIZE_MS, &mut out);
        out
    }

    /// Timestamps and values shared by the iterator tests (note the gap at `INITIAL_TS - 1s`).
    fn iterator_fixture() -> ([u64; 4], [f64; 4]) {
        (
            [
                INITIAL_TS - 2 * BUCKET_SIZE_MS,
                INITIAL_TS,
                INITIAL_TS + BUCKET_SIZE_MS,
                INITIAL_TS + 2 * BUCKET_SIZE_MS,
            ],
            [1f64, 2f64, 3f64, 4f64],
        )
    }

    #[test]
    fn should_skip_add_value_for_too_late_values() {
        let mut data_stream = default_stream();

        data_stream.add_value(42u64, 1f64);

        assert_eq!(INITIAL_TS, data_stream.max_timestamp());
        assert_eq!(0, data_stream.value_counts());
    }

    #[test]
    fn should_use_zero_as_marker_for_no_values_in_data_stream() {
        let mut data_stream = default_stream();

        assert_eq!(0u64, data_stream.last_timestamp());
    }

    #[test]
    fn should_anchor_window_at_zero_for_small_timestamps() {
        let mut data_stream = create_data_stream(1_500, 4, BUCKET_SIZE_MS);

        assert_eq!(4 * BUCKET_SIZE_MS, data_stream.max_timestamp());
    }

    #[test]
    fn should_add_value_inside_observation_window() {
        let mut data_stream = default_stream();

        data_stream.add_value(INITIAL_TS, 3f64);

        assert_eq!(INITIAL_TS, data_stream.max_timestamp());
        assert_eq!(INITIAL_TS, data_stream.last_timestamp());
        assert_eq!(3f64, data_stream.last_value());
    }

    #[test]
    fn should_return_last_value() {
        let mut data_stream = default_stream();

        feed(
            &mut data_stream,
            &[
                INITIAL_TS,
                INITIAL_TS - BUCKET_SIZE_MS,
                INITIAL_TS - 2 * BUCKET_SIZE_MS,
            ],
            &[3f64, 2f64, 1f64],
        );

        assert_eq!(3f64, data_stream.last_value());
    }

    #[test]
    fn should_evaluate_max_value() {
        let mut data_stream = default_stream();

        feed(
            &mut data_stream,
            &[
                INITIAL_TS - BUCKET_SIZE_MS,
                INITIAL_TS - 2 * BUCKET_SIZE_MS,
                INITIAL_TS - 3 * BUCKET_SIZE_MS,
            ],
            &[1f64, 3f64, 2f64],
        );

        assert_eq!(3f64, data_stream.max_value());
    }

    #[test]
    fn should_evaluate_max_value_nan_if_empty_data_stream() {
        let mut data_stream = default_stream();

        assert!(data_stream.max_value().is_nan());
    }

    #[test]
    fn should_process_window_shift() {
        let mut data_stream = default_stream();

        feed(
            &mut data_stream,
            &[
                INITIAL_TS,
                INITIAL_TS - BUCKET_SIZE_MS,
                INITIAL_TS,
                INITIAL_TS + BUCKET_SIZE_MS,
                INITIAL_TS + 2 * BUCKET_SIZE_MS,
            ],
            &[1f64, 0f64, 1f64, 2f64, 3f64],
        );

        assert_eq!(3f64, data_stream.last_value());
        assert_eq!(INITIAL_TS + 2 * BUCKET_SIZE_MS, data_stream.last_timestamp());
        assert_eq!(INITIAL_TS + 2 * BUCKET_SIZE_MS, data_stream.max_timestamp());
    }

    #[test]
    fn should_count_existed_values_without_shift() {
        let mut data_stream = default_stream();
        let ts = [
            INITIAL_TS - 3 * BUCKET_SIZE_MS,
            INITIAL_TS - 2 * BUCKET_SIZE_MS,
            INITIAL_TS - BUCKET_SIZE_MS,
            INITIAL_TS,
        ];
        let values = [1f64, 2f64, 3f64, 4f64];

        for (added_before, (&t, &y)) in zip(&ts, &values).enumerate() {
            data_stream.add_value(t, y);
            assert_eq!(added_before + 1, data_stream.value_counts());
        }

        assert_eq!(ts.len(), data_stream.value_counts());
    }

    #[test]
    fn should_count_existed_values_with_shift() {
        let mut data_stream = default_stream();
        let ts = [
            INITIAL_TS - 2 * BUCKET_SIZE_MS,
            INITIAL_TS - BUCKET_SIZE_MS,
            INITIAL_TS,
            INITIAL_TS + BUCKET_SIZE_MS,
            INITIAL_TS + 2 * BUCKET_SIZE_MS,
        ];
        let values = [1f64, 2f64, 3f64, 4f64, 5f64];

        for (added_before, (&t, &y)) in zip(&ts, &values).enumerate() {
            data_stream.add_value(t, y);
            assert_eq!(added_before + 1, data_stream.value_counts());
        }

        assert_eq!(ts.len(), data_stream.value_counts());
    }

    #[test]
    fn should_return_values_using_iterator() {
        let mut data_stream = default_stream();
        let (ts, values) = iterator_fixture();
        feed(&mut data_stream, &ts, &values);

        // Compare whole sequences so an iterator that yields nothing cannot pass vacuously.
        let seen: Vec<(u64, f64)> = (&data_stream)
            .into_iter()
            .map(|item| (item.timestamp, item.value))
            .collect();
        let expected: Vec<(u64, f64)> = zip(ts, values).collect();

        assert_eq!(expected, seen);
    }

    #[test]
    fn should_use_iterator() {
        let mut data_stream = default_stream();
        let (ts, values) = iterator_fixture();
        feed(&mut data_stream, &ts, &values);

        let seen: Vec<(u64, f64)> = data_stream
            .iter()
            .map(|item| (item.timestamp, item.value))
            .collect();
        let expected: Vec<(u64, f64)> = zip(ts, values).collect();

        assert_eq!(expected, seen);
    }

    #[test]
    fn should_evaluate_avg_aggregation() {
        let out = aggregate_pairs([2f64, 2f64, 3f64, 3f64], aggregation_avg);

        assert_eq!(vec![2f64, 3f64], out);
    }

    #[test]
    fn should_evaluate_max_aggregation() {
        let out = aggregate_pairs([5f64, 2f64, 3f64, 10f64], aggregation_max);

        assert_eq!(vec![5f64, 10f64], out);
    }

    #[test]
    fn should_evaluate_min_aggregation() {
        let out = aggregate_pairs([5f64, 2f64, 3f64, 10f64], aggregation_min);

        assert_eq!(vec![2f64, 3f64], out);
    }

    #[test]
    fn should_emit_nan_for_window_without_values() {
        let mut data_stream = stream_with_history(4);
        data_stream.add_value(INITIAL_TS, 7f64);

        let mut out: Vec<f64> = Vec::new();
        data_stream.agg(aggregation_avg, 2 * BUCKET_SIZE_MS, &mut out);

        assert_eq!(2, out.len());
        assert!(out[0].is_nan());
        assert_eq!(7f64, out[1]);
    }

    #[test]
    fn should_return_nan_for_empty_aggregation_batch() {
        let empty: Vec<f64> = Vec::new();

        assert!(aggregation_avg(&empty).is_nan());
        assert!(aggregation_max(&empty).is_nan());
        assert!(aggregation_min(&empty).is_nan());
    }
}