#![no_std]
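//! A `no_std` + `alloc` sliding-window time series: values land in fixed-width
//! time buckets held in a circular buffer, with simple windowed aggregations
//! (avg / max / min) on top.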

extern crate alloc;

use alloc::vec;
use alloc::vec::Vec;
use core::cmp::max;

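/// A sliding window of the most recent `history_size` buckets, each
/// `granularity_ms` wide, stored as a circular buffer. `threshold_index`
/// points at the newest bucket (timestamp `hi_ts`); `exists` marks which
/// buckets actually hold a value.
///
/// A minimal usage sketch (the crate name `data_stream` below is a
/// placeholder; adjust the `use` path to the real crate name):
///
/// ```ignore
/// // NOTE: `data_stream` is a placeholder for the actual crate name.
/// use data_stream::{
///     aggregation_avg, create_data_stream, DataStreamAggregations, DataStreamOperations,
/// };
///
/// // 10 buckets of 1 s each, ending at the given epoch-millisecond timestamp.
/// let mut stream = create_data_stream(1_714_321_497_981, 10, 1_000);
/// stream.add_value(1_714_321_497_981, 42.0);
///
/// // Average over 2 s chunks, oldest to newest.
/// let mut out = Vec::new();
/// stream.agg(aggregation_avg, 2_000, &mut out);
/// ```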
pub struct DataStream {
    lo_ts: u64,
    hi_ts: u64,

    history_size: usize,
    granularity_ms: u64,

    values: Vec<f64>,
    exists: Vec<bool>,
    threshold_index: usize,
}

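/// Creates a `DataStream` whose observation window ends at `hi_ts` (clamped
/// so that the window never starts before timestamp 0). `history_size` must
/// be at least 1.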
#[must_use]
pub fn create_data_stream(hi_ts: u64, history_size: usize, granularity_ms: u64) -> DataStream {
    let history_ms = (history_size as u64) * granularity_ms;

    DataStream {
        hi_ts: max(hi_ts, history_ms),
        lo_ts: if history_ms >= hi_ts {
            0
        } else {
            hi_ts - history_ms
        },

        history_size,
        granularity_ms,

        values: vec![0f64; history_size],
        exists: vec![false; history_size],

        threshold_index: history_size - 1,
    }
}

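/// Arithmetic mean of `batch`, or `NaN` if the batch is empty.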
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn aggregation_avg(batch: &[f64]) -> f64 {
    if batch.is_empty() {
        return f64::NAN;
    }

    let sum: f64 = batch.iter().sum();

    sum / (batch.len() as f64)
}

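/// Largest value in `batch`, or `NaN` if the batch is empty.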
#[must_use]
pub fn aggregation_max(batch: &[f64]) -> f64 {
    if batch.is_empty() {
        return f64::NAN;
    }

    let mut max_value = batch[0];

    for &x in batch {
        if x > max_value {
            max_value = x;
        }
    }

    max_value
}

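/// Smallest value in `batch`, or `NaN` if the batch is empty.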
#[must_use]
pub fn aggregation_min(batch: &[f64]) -> f64 {
    if batch.is_empty() {
        return f64::NAN;
    }

    let mut min_value = batch[0];

    for &x in batch {
        if x < min_value {
            min_value = x;
        }
    }

    min_value
}

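/// Windowed aggregation over a `DataStream`: the history is walked oldest to
/// newest in chunks of `aggregation_ms`, and `func` is applied to the values
/// present in each complete chunk. An empty chunk yields `NaN`; a trailing
/// partial chunk is discarded.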
pub trait DataStreamAggregations {
    fn agg<F>(&mut self, func: F, aggregation_ms: u64, out: &mut Vec<f64>)
    where
        F: Fn(&[f64]) -> f64;
}

impl DataStreamAggregations for DataStream {
    fn agg<F>(&mut self, func: F, aggregation_ms: u64, out: &mut Vec<f64>)
    where
        F: Fn(&[f64]) -> f64,
    {
        out.clear();

        let mut buf: Vec<f64> = Vec::new();
        let mut local_offset_ts: u64 = 0;
        // Start at the oldest bucket; `threshold_index` is the newest one.
        let mut pos: usize = (self.threshold_index + 1) % self.history_size;
        let mut terminated = false;

        loop {
            if terminated {
                return;
            }

            // The newest bucket is still processed below; the loop exits on
            // the next iteration.
            if pos == self.threshold_index {
                terminated = true;
            }

            if self.exists[pos] {
                buf.push(self.values[pos]);
            }

            // Flush the current chunk once it spans `aggregation_ms`.
            local_offset_ts += self.granularity_ms;
            if local_offset_ts >= aggregation_ms {
                if buf.is_empty() {
                    out.push(f64::NAN);
                } else {
                    out.push(func(buf.as_slice()));
                }
                buf.clear();
                local_offset_ts = 0;
            }

            pos = (pos + 1) % self.history_size;
        }
    }
}

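/// Core operations on a `DataStream`: inserting values and querying the
/// stored data. Timestamps are in milliseconds. Values older than the
/// observation window are ignored by `add_value`; newer values slide the
/// window forward. When the stream is empty, value queries return `NaN`
/// and `last_timestamp` returns `0`.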
pub trait DataStreamOperations {
    fn add_value(&mut self, ts: u64, value: f64);
    fn max_timestamp(&mut self) -> u64;
    fn max_value(&mut self) -> f64;
    fn last_timestamp(&mut self) -> u64;
    fn last_value(&mut self) -> f64;
    fn value_counts(&mut self) -> usize;
}

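/// A single observation yielded by the `DataStream` iterator: the
/// reconstructed bucket timestamp and the stored value.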
#[allow(dead_code)]
pub struct DataStreamValue {
    timestamp: u64,
    value: f64,
}

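/// Iterator over the occupied buckets of a `DataStream`, oldest to newest.
/// Empty buckets are skipped; each yielded timestamp is reconstructed from
/// `lo_ts` and the bucket offset.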
pub struct DataStreamIterMut<'a> {
    terminated: bool,
    pos: usize,
    offset: u64,
    data: &'a DataStream,
}

impl DataStream {
    #[allow(dead_code)]
    fn iter(&self) -> DataStreamIterMut {
        <&Self as IntoIterator>::into_iter(self)
    }
}

impl Iterator for DataStreamIterMut<'_> {
    type Item = DataStreamValue;

    fn next(&mut self) -> Option<Self::Item> {
        if self.terminated {
            return None;
        }

        loop {
            if self.terminated {
                return None;
            }

            // The newest bucket (`threshold_index`) is the last one visited.
            if self.pos == self.data.threshold_index {
                self.terminated = true;
            }

            if self.data.exists[self.pos] {
                let timestamp = self.data.lo_ts + self.data.granularity_ms * (self.offset + 1);
                let value = self.data.values[self.pos];
                self.offset += 1;
                self.pos = (self.pos + 1) % self.data.history_size;

                return Some(DataStreamValue { timestamp, value });
            }

            // Skip empty buckets.
            self.offset += 1;
            self.pos = (self.pos + 1) % self.data.history_size;
        }
    }
}

impl<'a> IntoIterator for &'a DataStream {
    type Item = DataStreamValue;
    type IntoIter = DataStreamIterMut<'a>;

    fn into_iter(self) -> Self::IntoIter {
        DataStreamIterMut {
            terminated: false,
            // Iteration starts at the oldest bucket, just past the newest one.
            pos: (self.threshold_index + 1) % self.history_size,
            offset: 0,
            data: self,
        }
    }
}

impl DataStreamOperations for DataStream {
    fn add_value(&mut self, ts: u64, value: f64) {
        // Too old: the timestamp falls before the observation window.
        if ts < self.lo_ts {
            return;
        }

        if ts > self.hi_ts {
            // Newer than the current window: slide the window forward and
            // clear the buckets that get recycled for the new time range.
            let buckets_shift = (ts - self.hi_ts) / self.granularity_ms;
            if buckets_shift > 0 {
                let shift_value = buckets_shift * self.granularity_ms;
                let buckets_shift = usize::try_from(buckets_shift).unwrap();
                for i in 0..buckets_shift {
                    let offset = (i + self.threshold_index + 1) % self.history_size;
                    self.exists[offset] = false;
                }
                self.lo_ts += shift_value;
                self.hi_ts += shift_value;
                self.threshold_index = (self.threshold_index + buckets_shift) % self.history_size;
            }
        }

        // Map the timestamp onto its bucket within the circular buffer.
        let offset = ts - self.lo_ts;
        let bucket = offset / self.granularity_ms;

        let bucket: usize = usize::try_from(bucket).unwrap();
        let bucket = (bucket + self.threshold_index) % self.history_size;

        self.values[bucket] = value;
        self.exists[bucket] = true;
    }

    fn max_timestamp(&mut self) -> u64 {
        self.hi_ts
    }

    fn max_value(&mut self) -> f64 {
        let mut is_exists = false;
        let mut value = 0f64;

        for i in 0..self.history_size {
            if self.exists[i] && (!is_exists || self.values[i] > value) {
                is_exists = true;
                value = self.values[i];
            }
        }

        if is_exists {
            value
        } else {
            f64::NAN
        }
    }

    fn last_timestamp(&mut self) -> u64 {
        let mut offset = 0;
        // Walk backwards from the newest bucket until an occupied one is found.
        let mut index = self.threshold_index % self.history_size;

        loop {
            if self.exists[index] {
                return self.hi_ts - self.granularity_ms * offset;
            }

            offset += 1;
            index = if index == 0 {
                self.history_size - 1
            } else {
                index - 1
            };
            if index == self.threshold_index {
                break;
            }
        }

        0
    }

    fn last_value(&mut self) -> f64 {
        // Walk backwards from the newest bucket until an occupied one is
        // found, mirroring `last_timestamp`.
        for i in 0..self.history_size {
            let index = (self.threshold_index + self.history_size - i) % self.history_size;
            if self.exists[index] {
                return self.values[index];
            }
        }

        f64::NAN
    }

    fn value_counts(&mut self) -> usize {
        self.exists.iter().filter(|&&is_exists| is_exists).count()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;
    use core::iter::zip;

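    // Shared fixture: a 10-bucket window of 1 s buckets ending at a fixed
    // epoch-millisecond timestamp.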
    const HISTORY_SIZE: usize = 10;
    const BUCKET_SIZE_MS: u64 = 1_000;
    const INITIAL_TS: u64 = 1_714_321_497_981;

    #[test]
    fn should_skip_add_value_for_too_late_values() {
        let mut data_stream: DataStream =
            create_data_stream(INITIAL_TS, HISTORY_SIZE, BUCKET_SIZE_MS);

        data_stream.add_value(42u64, 1f64);

        assert_eq!(INITIAL_TS, data_stream.max_timestamp());
    }

    #[test]
    fn should_use_zero_as_marker_for_no_values_in_data_stream() {
        let mut data_stream: DataStream =
            create_data_stream(INITIAL_TS, HISTORY_SIZE, BUCKET_SIZE_MS);

        assert_eq!(0u64, data_stream.last_timestamp());
    }

    #[test]
    fn should_add_value_inside_observation_window() {
        let mut data_stream: DataStream =
            create_data_stream(INITIAL_TS, HISTORY_SIZE, BUCKET_SIZE_MS);
        let next_ts = INITIAL_TS;
        let next_value = 3f64;

        data_stream.add_value(next_ts, next_value);

        assert_eq!(INITIAL_TS, data_stream.max_timestamp());
        assert_eq!(0, next_ts - data_stream.last_timestamp());
        assert_eq!(next_ts, data_stream.last_timestamp());
        assert_eq!(next_value, data_stream.last_value());
    }

    #[test]
    fn should_return_last_value() {
        let mut data_stream: DataStream =
            create_data_stream(INITIAL_TS, HISTORY_SIZE, BUCKET_SIZE_MS);

        data_stream.add_value(INITIAL_TS, 3f64);
        data_stream.add_value(INITIAL_TS - BUCKET_SIZE_MS, 2f64);
        data_stream.add_value(INITIAL_TS - 2 * BUCKET_SIZE_MS, 1f64);

        assert_eq!(3f64, data_stream.last_value());
    }

    #[test]
    fn should_evaluate_max_value() {
        let mut data_stream: DataStream =
            create_data_stream(INITIAL_TS, HISTORY_SIZE, BUCKET_SIZE_MS);

        data_stream.add_value(INITIAL_TS - BUCKET_SIZE_MS, 1f64);
        data_stream.add_value(INITIAL_TS - 2 * BUCKET_SIZE_MS, 3f64);
        data_stream.add_value(INITIAL_TS - 3 * BUCKET_SIZE_MS, 2f64);

        assert_eq!(3f64, data_stream.max_value());
    }

    #[test]
    fn should_evaluate_max_value_nan_if_empty_data_stream() {
        let mut data_stream: DataStream =
            create_data_stream(INITIAL_TS, HISTORY_SIZE, BUCKET_SIZE_MS);

        assert!(f64::is_nan(data_stream.max_value()));
    }

    #[test]
    fn should_process_window_shift() {
        let mut data_stream: DataStream =
            create_data_stream(INITIAL_TS, HISTORY_SIZE, BUCKET_SIZE_MS);

        data_stream.add_value(INITIAL_TS, 1f64);
        data_stream.add_value(INITIAL_TS - BUCKET_SIZE_MS, 0f64);
        data_stream.add_value(INITIAL_TS, 1f64);
        data_stream.add_value(INITIAL_TS + BUCKET_SIZE_MS, 2f64);
        data_stream.add_value(INITIAL_TS + 2 * BUCKET_SIZE_MS, 3f64);

        assert_eq!(3f64, data_stream.last_value());
        assert_eq!(
            0,
            INITIAL_TS + 2 * BUCKET_SIZE_MS - data_stream.last_timestamp()
        );
        assert_eq!(
            INITIAL_TS + 2 * BUCKET_SIZE_MS,
            data_stream.last_timestamp()
        );
    }

    #[test]
    fn should_count_existed_values_without_shift() {
        let mut data_stream: DataStream =
            create_data_stream(INITIAL_TS, HISTORY_SIZE, BUCKET_SIZE_MS);

        let ts = vec![
            INITIAL_TS - 3 * BUCKET_SIZE_MS,
            INITIAL_TS - 2 * BUCKET_SIZE_MS,
            INITIAL_TS - BUCKET_SIZE_MS,
            INITIAL_TS,
        ];
        let values = vec![1f64, 2f64, 3f64, 4f64];

        assert_eq!(ts.len(), values.len());

        let mut offset: usize = 0;
        zip(ts.clone(), values.clone()).for_each(|(t, y)| {
            data_stream.add_value(t, y);
            offset += 1;
            assert_eq!(offset, data_stream.value_counts());
        });

        assert_eq!(ts.len(), data_stream.value_counts());
    }

    #[test]
    fn should_count_existed_values_with_shift() {
        let mut data_stream: DataStream =
            create_data_stream(INITIAL_TS, HISTORY_SIZE, BUCKET_SIZE_MS);

        let ts = vec![
            INITIAL_TS - 2 * BUCKET_SIZE_MS,
            INITIAL_TS - BUCKET_SIZE_MS,
            INITIAL_TS,
            INITIAL_TS + BUCKET_SIZE_MS,
            INITIAL_TS + 2 * BUCKET_SIZE_MS,
        ];
        let values = vec![1f64, 2f64, 3f64, 4f64, 5f64];

        assert_eq!(ts.len(), values.len());

        let mut offset: usize = 0;
        zip(ts.clone(), values.clone()).for_each(|(t, y)| {
            data_stream.add_value(t, y);
            offset += 1;
            assert_eq!(offset, data_stream.value_counts());
        });

        assert_eq!(ts.len(), data_stream.value_counts());
    }

    #[test]
    fn should_return_values_using_iterator() {
        let mut data_stream: DataStream =
            create_data_stream(INITIAL_TS, HISTORY_SIZE, BUCKET_SIZE_MS);

        let ts = vec![
            INITIAL_TS - 2 * BUCKET_SIZE_MS,
            INITIAL_TS,
            INITIAL_TS + BUCKET_SIZE_MS,
            INITIAL_TS + 2 * BUCKET_SIZE_MS,
        ];
        let values = vec![1f64, 2f64, 3f64, 4f64];

        assert_eq!(ts.len(), values.len());

        zip(ts.clone(), values.clone()).for_each(|(t, y)| {
            data_stream.add_value(t, y);
        });

        for (offset, data_stream_value) in data_stream.into_iter().enumerate() {
            let _t = data_stream_value.timestamp;
            let _v = data_stream_value.value;

            assert_eq!(values[offset], data_stream_value.value);
            assert_eq!(ts[offset], data_stream_value.timestamp);
        }
    }

    #[test]
    fn should_use_iterator() {
        let mut data_stream: DataStream =
            create_data_stream(INITIAL_TS, HISTORY_SIZE, BUCKET_SIZE_MS);

        let ts = vec![
            INITIAL_TS - 2 * BUCKET_SIZE_MS,
            INITIAL_TS,
            INITIAL_TS + BUCKET_SIZE_MS,
            INITIAL_TS + 2 * BUCKET_SIZE_MS,
        ];
        let values = vec![1f64, 2f64, 3f64, 4f64];

        assert_eq!(ts.len(), values.len());

        zip(ts.clone(), values.clone()).for_each(|(t, y)| {
            data_stream.add_value(t, y);
        });

        for (offset, data_stream_value) in data_stream.iter().enumerate() {
            let _t = data_stream_value.timestamp;
            let _v = data_stream_value.value;

            assert_eq!(values[offset], data_stream_value.value);
            assert_eq!(ts[offset], data_stream_value.timestamp);
        }
    }

    #[test]
    fn should_evaluate_avg_aggregation() {
        const LOCAL_HISTORY_SIZE: usize = 4;
        let mut data_stream: DataStream =
            create_data_stream(INITIAL_TS, LOCAL_HISTORY_SIZE, BUCKET_SIZE_MS);

        let ts = vec![
            INITIAL_TS - BUCKET_SIZE_MS,
            INITIAL_TS,
            INITIAL_TS + BUCKET_SIZE_MS,
            INITIAL_TS + 2 * BUCKET_SIZE_MS,
        ];
        let values = vec![2f64, 2f64, 3f64, 3f64];
        assert_eq!(ts.len(), values.len());
        zip(ts.clone(), values.clone()).for_each(|(t, y)| {
            data_stream.add_value(t, y);
        });

        const AGGREGATION_MS: u64 = 2 * BUCKET_SIZE_MS;
        let mut out: Vec<f64> = Vec::new();
        data_stream.agg(aggregation_avg, AGGREGATION_MS, &mut out);

        assert_eq!(2usize, out.len());
        assert_eq!(2f64, out[0]);
        assert_eq!(3f64, out[1]);
    }

550
551 #[test]
552 fn should_evaluate_max_aggregation() {
553 const LOCAL_HISTORY_SIZE: usize = 4;
554 let mut data_stream: DataStream =
555 create_data_stream(INITIAL_TS, LOCAL_HISTORY_SIZE, BUCKET_SIZE_MS);
556
557 let ts = vec![
558 INITIAL_TS - BUCKET_SIZE_MS,
559 INITIAL_TS,
560 INITIAL_TS + BUCKET_SIZE_MS,
561 INITIAL_TS + 2 * BUCKET_SIZE_MS,
562 ];
563 let values = vec![5f64, 2f64, 3f64, 10f64];
564 assert_eq!(ts.len(), values.len());
565 zip(ts.clone(), values.clone()).for_each(|(t, y)| {
566 data_stream.add_value(t, y);
567 });
568
569 const AGGREGATION_MS: u64 = 2 * BUCKET_SIZE_MS;
570 let mut out: Vec<f64> = Vec::new();
571 data_stream.agg(aggregation_max, AGGREGATION_MS, &mut out);
572
573 assert_eq!(2usize, out.len());
574 assert_eq!(5f64, *out.index(0));
575 assert_eq!(10f64, *out.index(1));
576 }
577
578 #[test]
579 fn should_evaluate_min_aggregation() {
580 const LOCAL_HISTORY_SIZE: usize = 4;
581 let mut data_stream: DataStream =
582 create_data_stream(INITIAL_TS, LOCAL_HISTORY_SIZE, BUCKET_SIZE_MS);
583
584 let ts = vec![
585 INITIAL_TS - BUCKET_SIZE_MS,
586 INITIAL_TS,
587 INITIAL_TS + BUCKET_SIZE_MS,
588 INITIAL_TS + 2 * BUCKET_SIZE_MS,
589 ];
590 let values = vec![5f64, 2f64, 3f64, 10f64];
591 assert_eq!(ts.len(), values.len());
592 zip(ts.clone(), values.clone()).for_each(|(t, y)| {
593 data_stream.add_value(t, y);
594 });
595
596 const AGGREGATION_MS: u64 = 2 * BUCKET_SIZE_MS;
597 let mut out: Vec<f64> = Vec::new();
598 data_stream.agg(aggregation_min, AGGREGATION_MS, &mut out);
599
600 assert_eq!(2usize, out.len());
601 assert_eq!(2f64, *out.index(0));
602 assert_eq!(3f64, *out.index(1));
603 }
604}