#![feature(test)]
#![feature(slice_partition_dedup)]
extern crate test;
pub use crate::utils::*;
use chrono::prelude::*;
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::Path;
use plotly::{Plot, Scatter};
use plotly::layout::{Axis, Layout, Margin};
use plotly::common::Font;
use plotly::color::Rgba;

pub mod load_log_dad141;
pub mod load_plot;
pub mod load_process;
pub mod load_to_hourly;
pub mod utils;

pub const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
pub const ERROR_STR_GENERAL: &str = "E+999999.";
pub const ERROR_STR_NONE: &str = "E+999998.";
pub const ERROR_STR_INVALID: &str = "E+999997.";
pub const ERROR_STR_SKIPPED: &str = "E+999996.";
pub const ERROR_STR_PARSE: &str = "E+999995.";
pub const ERROR_FLT_GENERAL: f64 = 999999.;
pub const ERROR_FLT_NONE: f64 = 999998.;
pub const ERROR_FLT_INVALID: f64 = 999997.;
pub const ERROR_FLT_SKIPPED: f64 = 999996.;
pub const ERROR_FLT_PARSE: f64 = 999995.;

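/// A time series of load measurements: parallel vectors of timestamps with a
/// fixed UTC offset and load values in kg. Invalid or missing samples are
/// stored as NaN so that the time axis stays complete.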
#[derive(Debug, Clone)]
pub struct TimeLoad {
    pub time: Vec<DateTime<FixedOffset>>,
    pub load: Vec<f64>,
}

impl TimeLoad {
    pub fn new(capacity: usize) -> TimeLoad {
        TimeLoad {
            time: Vec::with_capacity(capacity),
            load: Vec::with_capacity(capacity),
        }
    }

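    /// Read a TimeLoad series from a CSV file, skipping the header line.
    /// Each row is expected as `datetime,load`, with the datetime in RFC 3339
    /// format. Rows that cannot be read or whose datetime cannot be parsed are
    /// skipped with a message; loads that cannot be parsed are stored as NaN
    /// so that the corresponding datetime is kept.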
    pub fn from_csv<P>(fin: P) -> TimeLoad
    where
        P: AsRef<Path>,
    {
        let file = File::open(fin).unwrap();
        let buf = BufReader::new(file);
        let mut timeload = TimeLoad::new(10000);

        for l in buf.lines().skip(1) {
            let l_unwrap = match l {
                Ok(l_ok) => l_ok,
                Err(l_err) => {
                    println!("Could not read line: {}", l_err);
                    continue;
                }
            };
            let mut l_split = l_unwrap.split(',');
            let l_split_datetime = l_split.next().unwrap();
            let l_split_load = l_split.next().unwrap();
            let parsed_datetime = match DateTime::parse_from_rfc3339(l_split_datetime) {
                Ok(parsed_datetime) => parsed_datetime,
                Err(e) => {
                    println!(
                        "Could not parse datetime: {}, error {}",
                        l_split_datetime, e
                    );
                    continue;
                }
            };
            timeload.time.push(parsed_datetime);
            match l_split_load.parse::<f64>() {
                Ok(parsed_load) => timeload.load.push(parsed_load),
                Err(e) => {
                    println!(
                        "Could not parse load: {}, at datetime {}. Error: {}",
                        l_split_load, parsed_datetime, e
                    );
                    timeload.load.push(f64::NAN);
                }
            }
        }
        timeload
    }

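    /// Panic if the time series is not strictly increasing.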
    pub fn is_ordered(&self) {
        self.time.windows(2).for_each(|w| {
            assert!(
                w[1] > w[0],
                "time series is not ordered: {} does not come after {}",
                w[1],
                w[0]
            )
        });
    }

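    /// Panic if the time series is not strictly increasing
    /// or if the time step between samples is not constant.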
    pub fn is_ordered_and_continuous(&self) {
        self.time
            .windows(2)
            .map(|w| {
                assert!(
                    w[1] > w[0],
                    "time series is not ordered: {} does not come after {}",
                    w[1],
                    w[0]
                );
                w[1] - w[0]
            })
            .reduce(|wp, wn| {
                assert_eq!(wp, wn, "time series is not continuous");
                wn
            });
    }

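    /// Return a new TimeLoad resampled at the minimum time step found in the
    /// series, inserting NaN loads where timestamps are missing so that the
    /// result is continuous.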
    pub fn fill_missing_with_nan(&self) -> TimeLoad {
        let min_delta = self
            .time
            .windows(2)
            .map(|dtw| dtw[1] - dtw[0])
            .min()
            .unwrap();
        let mut timeload = TimeLoad::new(self.time.len());
        for (dtw, load) in self.time.windows(2).zip(self.load.iter()) {
            let mut current_dt: DateTime<FixedOffset> = dtw[0];
            timeload.time.push(current_dt);
            timeload.load.push(*load);
            while current_dt + min_delta < dtw[1] {
                current_dt = current_dt.checked_add_signed(min_delta).unwrap();
                timeload.time.push(current_dt);
                timeload.load.push(f64::NAN);
            }
        }
        timeload.time.push(self.time[self.time.len() - 1]);
        timeload.load.push(self.load[self.load.len() - 1]);
        timeload
    }

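    /// Set the load to NaN at each of the given datetimes,
    /// reporting any datetime that is not found in the series.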
    pub fn replace_bad_datetimes_with_nan(&mut self, bad_datetimes: Vec<DateTime<FixedOffset>>) {
        for bdt in bad_datetimes.into_iter() {
            match self.time.iter().position(|d| *d == bdt) {
                Some(i) => self.load[i] = f64::NAN,
                None => println!("could not find and exclude bad datetime {}", bdt),
            }
        }
    }

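    /// Aggregate the series to hourly values: each timestamp is rounded to the
    /// nearest hour (minutes >= 30 round up) and the non-NaN loads that fall on
    /// the same hour are averaged with `mean_or_nan`.
    /// Returns an error if the series is empty.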
    pub fn to_hourly(&self) -> Result<TimeLoad, EmptyTimeLoad> {
        if self.time.is_empty() {
            return Err(EmptyTimeLoad {});
        }

        let mut hourly_timeload = TimeLoad::new(self.time.len() / 60);
        let mut hourly_time: Option<DateTime<FixedOffset>> = None;
        let mut hourly_loads: Vec<f64> = Vec::with_capacity(60);

        self.time.iter().zip(self.load.iter()).for_each(|(t, l)| {
            let mut iter_time = *t;
            if iter_time.minute() >= 30u32 {
                iter_time += chrono::Duration::hours(1i64);
            }
            iter_time = iter_time.with_minute(0u32).unwrap();
            iter_time = iter_time.trunc_subsecs(0u16);

            match hourly_time {
                Some(ht) => {
                    if ht == iter_time {
                        if !l.is_nan() {
                            hourly_loads.push(*l)
                        }
                    } else {
                        let hourly_mean_load = mean_or_nan(&hourly_loads);
                        hourly_timeload.time.push(ht);
                        hourly_timeload.load.push(hourly_mean_load);

                        hourly_time = Some(iter_time);
                        hourly_loads.clear();
                        if !l.is_nan() {
                            hourly_loads.push(*l)
                        }
                    }
                }
                None => {
                    hourly_time = Some(iter_time);
                    if !l.is_nan() {
                        hourly_loads.push(*l)
                    }
                }
            }
        });

        let hourly_mean_load = mean_or_nan(&hourly_loads);
        hourly_timeload.time.push(hourly_time.unwrap());
        hourly_timeload.load.push(hourly_mean_load);

        Ok(hourly_timeload)
    }

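    /// Set the load to NaN for every sample whose time of day falls strictly
    /// between `time_init` and `time_stop`, on every day of the series.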
    pub fn replace_bad_time_interval_with_nan(
        &mut self,
        time_init: NaiveTime,
        time_stop: NaiveTime,
    ) {
        self.time
            .iter()
            .zip(self.load.iter_mut())
            .for_each(|(t, l)| {
                if (t.time() > time_init) && (t.time() < time_stop) {
                    *l = f64::NAN;
                }
            });
    }

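    /// Set to NaN all loads strictly outside the range [`min_load`, `max_load`],
    /// reporting each replaced value.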
    pub fn replace_outliers_with_nan(&mut self, min_load: f64, max_load: f64) {
        self.load.iter_mut().for_each(|l| {
            if (*l > max_load) || (*l < min_load) {
                println!(
                    "setting to NaN a value out of range (min: {}, max: {}): {}",
                    min_load, max_load, l
                );
                *l = f64::NAN;
            }
        });
    }

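    /// Set to NaN all loads greater than `max_value`, such as the `ERROR_FLT_*`
    /// sentinel values used to mark invalid measurements.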
    pub fn replace_errors_with_nan(&mut self, max_value: f64) {
        self.load.iter_mut().for_each(|l| {
            if *l > max_value {
                println!("found invalid value: {}", l);
                *l = f64::NAN;
            }
        });
    }

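    /// Write the series to a CSV file with a `datetime,load_kg` header and one
    /// `datetime,load` row per sample, datetimes in RFC 3339 format.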
    pub fn to_csv<P>(self, fout: P)
    where
        P: AsRef<Path>,
    {
        let file = File::create(fout).unwrap();
        let mut buf = BufWriter::new(file);
        buf.write_all("datetime,load_kg\n".as_bytes()).unwrap();
        for (t, w) in self.time.iter().zip(self.load.iter()) {
            buf.write_all(format!("{},{}\n", t.to_rfc3339(), w).as_bytes())
                .unwrap();
        }
    }

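    /// Plot load against datetime with plotly, writing the figure to `fout`
    /// as a standalone HTML file and opening it in the default browser.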
    pub fn plotly_plot_datetime<P>(&self, fout: P) -> Result<(), Box<dyn std::error::Error>>
    where
        P: AsRef<Path>,
    {
        let margin = Margin::new().left(100).pad(20);
        let mut plot = Plot::new();
        let trace = Scatter::new(
            self.time.iter().map(|t| t.to_rfc3339()).collect(),
            self.load.clone(),
        );
        plot.add_trace(trace);
        let background_color: Rgba = Rgba::new(200, 200, 200, 0.5);
        let layout = Layout::new()
            .x_axis(Axis::new().title("Time").zero_line(false).line_width(2).n_ticks(12))
            .y_axis(Axis::new().title("Load kg").zero_line(false).tick_format("d").line_width(2))
            .font(Font::new().size(16))
            .plot_background_color(background_color)
            .margin(margin)
            .height(720)
            .width(1280);
        plot.set_layout(layout);
        plot.write_html(fout);
        plot.show();
        Ok(())
    }
}

impl std::fmt::Display for TimeLoad {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "datetime, load [kg]")?;
        for (t, w) in self.time.iter().zip(self.load.iter()) {
            writeln!(f, "{},{}", t.to_rfc3339(), w)?
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {

    use super::*;

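    // Minimal illustrative sketch of fill_missing_with_nan(): a hand-built
    // three-sample series with one missing minute should be padded with a NaN
    // load at the minimum time step, giving a continuous series of four samples.
    #[test]
    fn test_fill_missing_with_nan_example() {
        let mut tl = TimeLoad::new(3);
        tl.time = vec![
            DateTime::parse_from_rfc3339("2021-10-13T23:00:00-08:00").unwrap(),
            DateTime::parse_from_rfc3339("2021-10-13T23:01:00-08:00").unwrap(),
            DateTime::parse_from_rfc3339("2021-10-13T23:03:00-08:00").unwrap(),
        ];
        tl.load = vec![1.0, 2.0, 4.0];
        let filled = tl.fill_missing_with_nan();
        filled.is_ordered_and_continuous();
        assert_eq!(filled.time.len(), 4);
        assert!(filled.load[2].is_nan());
    }
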
    #[test]
    fn datetime_parsing_with_timezone() {
        let timezone_hours: i32 = -8;
        let timezone = timezone_hours * 60 * 60;
        let timezone_fixed_offset = FixedOffset::east_opt(timezone).unwrap();

        let dtstr_dst = "2021-11-07T01:30:00-07:00";
        let dtiso_dst = DateTime::parse_from_rfc3339(dtstr_dst).unwrap();
        let dtfix_dst = dtiso_dst.with_timezone(&timezone_fixed_offset);
        println!(
            "datetime str {} parsed as {}, fixed {}",
            dtstr_dst, dtiso_dst, dtfix_dst
        );

        let dtstr_pst = "2021-11-07T01:30:00-08:00";
        let dtiso_pst = DateTime::parse_from_rfc3339(dtstr_pst).unwrap();
        let dtfix_pst = dtiso_pst.with_timezone(&timezone_fixed_offset);
        println!(
            "datetime str {} parsed as {}, fixed {}",
            dtstr_pst, dtiso_pst, dtfix_pst
        );

        let timediff = chrono::Duration::hours(1);
        assert!(dtfix_pst - timediff == dtfix_dst);
    }

    #[test]
    fn test_get_current_datetime_offset() {
        let dtnow: DateTime<Local> = Local::now();
        println!(
            "local time is {}",
            dtnow.to_rfc3339_opts(SecondsFormat::Secs, false)
        );
        let local_offset = dtnow.offset();
        println!("local offset is {}", local_offset);
    }

    #[test]
    fn test_find_anomaly_homogeneous() {
        let a = [5.0f64; 15];
        let expected: Vec<f64> = Vec::new();
        let (_, anomalies_load) = find_anomalies(&a, 7usize, 6usize, 5.0f64);
        assert!(anomalies_load == expected);
    }

    #[test]
    fn test_find_anomaly_linear() {
        let v: Vec<f64> = (1..15).map(|n| n as f64).collect();
        let expected: Vec<f64> = Vec::new();
        let (_, anomalies_load) = find_anomalies(&v, 7usize, 6usize, 5.0f64);
        assert!(anomalies_load == expected);
    }

    #[test]
    fn test_find_anomaly_nans() {
        let mut v: Vec<f64> = (1..15).map(|n| n as f64).collect();
        v.iter_mut().enumerate().for_each(|(i, e)| {
            if i < 6usize {
                *e = f64::NAN
            }
        });
        let expected: Vec<f64> = Vec::new();
        let (_, anomalies_load) = find_anomalies(&v, 7usize, 6usize, 5.0f64);
        assert!(anomalies_load == expected);
    }

    #[test]
    fn test_find_anomaly_discontinuity() {
        let mut v: Vec<f64> = (1..15).map(|n| n as f64).collect();
        v.iter_mut().enumerate().for_each(|(i, e)| {
            if i < 8usize {
                *e = 20.
            }
        });
        let (_, anomalies_load) = find_anomalies(&v, 7usize, 6usize, 5.0f64);
        let expected: Vec<f64> = vec![20.0, 20.0, 20.0, 20.0, 9.0, 10.0, 11.0, 12.0, 13.0];
        assert!(anomalies_load == expected);
    }

    #[test]
    fn test_deduplicate_load_series() {
        let mut v_all = vec![1., 2., 2., 2., 3., 4., 4., 5., 6., 6.];
        let v_unique = vec![1., 2., 3., 4., 5., 6.];
        let v_duplicates = vec![2., 2., 4., 6.];

        v_all.sort_by(|a, b| a.partial_cmp(b).unwrap());
        let (dedup, duplicates) = v_all.partition_dedup_by(|a, b| a == b);

        let mut duplicates = duplicates.to_owned();
        duplicates.sort_by(|a, b| a.partial_cmp(b).unwrap());

        assert!(v_unique == dedup);
        assert!(v_duplicates == duplicates);
    }

    #[test]
    fn test_discharge_by_index() {
        let vall: Vec<f64> = (1..20).map(|n| n as f64).collect();
        let indices: Vec<usize> = vec![2, 3, 7, 12, 13, 18];
        let mut expected: Vec<f64> = (1..20).map(|n| n as f64).collect();
        for i in indices.iter().rev() {
            expected.remove(*i);
        }
        let vout = discharge_by_index(&vall, &indices);
        println!("vout\n{:?}\nexpected\n{:?}", vout, expected);
        assert!(compare_vecf64_approx(&vout, &expected));
    }

    #[test]
    fn test_setnan_by_index() {
        let mut vall: Vec<f64> = (1..20).map(|n| n as f64).collect();
        let indices: Vec<usize> = vec![2, 3, 7, 12, 13, 18];
        let mut expected: Vec<f64> = (1..20).map(|n| n as f64).collect();
        indices.iter().for_each(|i| expected[*i] = f64::NAN);
        setnan_by_index(&mut vall, &indices);
        assert!(compare_vecf64_approx(&vall, &expected));
    }

    #[test]
    fn test_all_steps() {
        let mut timezone: i32 = -8;
        timezone *= 60 * 60;
        let timezone_fixed_offset = FixedOffset::east_opt(timezone).unwrap();

        let mut tl = TimeLoad::from_csv(String::from("./test/timeload_raw.csv"));
        tl.time
            .iter_mut()
            .for_each(|t| *t = t.with_timezone(&timezone_fixed_offset));
        println!("{}", tl);

        tl.is_ordered();

        let mut ctl = tl.fill_missing_with_nan();
        ctl.is_ordered_and_continuous();
        println!("{}", ctl);

        let bad = read_bad_datetimes("./test/bad_datetimes.csv");
        ctl.replace_bad_datetimes_with_nan(bad);
        println!("{}", ctl);

        let time_init = NaiveTime::parse_from_str("01:02", "%H:%M").unwrap();
        let time_stop = NaiveTime::parse_from_str("01:05", "%H:%M").unwrap();
        ctl.replace_bad_time_interval_with_nan(time_init, time_stop);
        println!("{}", ctl);

        ctl.replace_errors_with_nan(99995.);
        println!("{}", ctl);

        ctl.replace_outliers_with_nan(10000., 18000.);
        println!("{}", ctl);

        let (anomalies_indices, _) = find_anomalies(&ctl.load, 16usize, 8usize, 40.0f64);
        println!("indices of the anomalies:\n{:?}", anomalies_indices);
        let mut atl = TimeLoad::new(anomalies_indices.len());
        for i in anomalies_indices.iter() {
            atl.time.push(ctl.time[*i]);
            atl.load.push(ctl.load[*i]);
        }
        atl.to_csv("./test/timeload_anomalies.csv");

        setnan_by_index(&mut ctl.load[..], &anomalies_indices);

        let mavg_window = make_window(3., 1., 5usize);
        let smooth = mavg(&ctl.load[..], &mavg_window, 5usize, 80.);
        println!("{:?}", smooth);

        ctl.load = smooth;

        ctl.plotly_plot_datetime("./test/timeload_processed.html").unwrap();

        ctl.to_csv("./test/timeload_processed.csv");
    }

    #[test]
    fn test_to_hourly() {
        let tl = TimeLoad::from_csv(String::from("./test/short_for_hourly.csv"));
        let htl = &tl.to_hourly().expect("empty data set");
        let correct_hourly_loads = vec![1.0f64, 1.5f64];
        let correct_hourly_times = vec![
            DateTime::parse_from_rfc3339("2021-10-13T23:00:00-08:00").unwrap(),
            DateTime::parse_from_rfc3339("2021-10-14T00:00:00-08:00").unwrap(),
        ];
        assert!(compare_vecf64_approx(&htl.load, &correct_hourly_loads));
        assert!(htl.time == correct_hourly_times);
    }

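    // Minimal illustrative sketch of replace_outliers_with_nan(): values
    // strictly outside [min_load, max_load] become NaN, in-range values are kept.
    #[test]
    fn test_replace_outliers_with_nan_example() {
        let mut tl = TimeLoad::new(3);
        tl.time = vec![
            DateTime::parse_from_rfc3339("2021-10-13T23:00:00-08:00").unwrap(),
            DateTime::parse_from_rfc3339("2021-10-13T23:01:00-08:00").unwrap(),
            DateTime::parse_from_rfc3339("2021-10-13T23:02:00-08:00").unwrap(),
        ];
        tl.load = vec![9.0, 15.0, 21.0];
        tl.replace_outliers_with_nan(10.0, 20.0);
        assert!(tl.load[0].is_nan());
        assert!((tl.load[1] - 15.0).abs() < f64::EPSILON);
        assert!(tl.load[2].is_nan());
    }
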
    #[test]
    fn test_all_steps_parallel() {
        let mut timezone: i32 = -8;
        timezone *= 60 * 60;
        let timezone_fixed_offset = FixedOffset::east_opt(timezone).unwrap();

        let mut tl = TimeLoad::from_csv(String::from("./test/parallel_timeload_raw.csv"));
        tl.time
            .iter_mut()
            .for_each(|t| *t = t.with_timezone(&timezone_fixed_offset));
        println!("{}", tl);

        tl.is_ordered();

        let mut ctl = tl.fill_missing_with_nan();
        ctl.is_ordered_and_continuous();
        println!("{}", ctl);

        let bad = read_bad_datetimes("./test/parallel_bad_datetimes.csv");
        ctl.replace_bad_datetimes_with_nan(bad);
        println!("{}", ctl);

        let time_init = NaiveTime::parse_from_str("01:02", "%H:%M").unwrap();
        let time_stop = NaiveTime::parse_from_str("01:05", "%H:%M").unwrap();
        ctl.replace_bad_time_interval_with_nan(time_init, time_stop);
        println!("{}", ctl);

        ctl.replace_errors_with_nan(99995.);
        println!("{}", ctl);

        ctl.replace_outliers_with_nan(10000., 18000.);
        println!("{}", ctl);

        let (anomalies_indices, _) = find_anomalies(&ctl.load, 16usize, 8usize, 40.0f64);
        println!("indices of the anomalies:\n{:?}", anomalies_indices);
        let mut atl = TimeLoad::new(anomalies_indices.len());
        for i in anomalies_indices.iter() {
            atl.time.push(ctl.time[*i]);
            atl.load.push(ctl.load[*i]);
        }
        atl.to_csv("./test/parallel_timeload_anomalies.csv");

        setnan_by_index(&mut ctl.load[..], &anomalies_indices);

        let mavg_window = make_window(3., 1., 2usize);
        let smooth = mavg_parallel_fold(&ctl.load[..], &mavg_window);
        println!("{:?}", smooth);

        let correct_smooth = vec![
            f64::NAN,
            f64::NAN,
            13003.0,
            13004.0,
            13005.0,
            13006.0,
            13007.0,
            13008.0,
            13008.8,
            13009.6,
            13010.3,
            13011.1,
            13012.0,
            13013.0,
            13014.0,
            13015.0,
            13016.0,
            13017.0,
            13018.0,
            f64::NAN,
            f64::NAN,
        ];

        assert!(compare_vecf64_approx(&smooth, &correct_smooth));

        ctl.load = smooth;

        ctl.plotly_plot_datetime("./test/parallel_timeload_processed.html")
            .unwrap();

        ctl.to_csv("./test/parallel_timeload_processed.csv");
    }

    #[test]
    fn test_awat() {
        let min_w: u8 = 5;
        let max_w: u8 = 21;

        let v = [1.0f64; 30];
        let (k, b) = awat_regression(&v, 21usize);
        println!("selected polynomial order is {}, with associated b of {}", k, b);
        let mut calc_w = (b * max_w as f64).round() as u8;
        if calc_w % 2 == 0 {
            calc_w += 1
        }
        println!("calc window width: {}", calc_w);
        let w = std::cmp::max(min_w, calc_w);
        println!("selected window width: {}", w);
        assert!(w == min_w);

        let v: [f64; 30] = core::array::from_fn(|i| if i % 2 == 0 { 1. } else { 10. });
        let (k, b) = awat_regression(&v, 21usize);
        println!("selected polynomial order is {}, with associated b of {}", k, b);
        let mut calc_w = (b * max_w as f64).round() as u8;
        if calc_w % 2 == 0 {
            calc_w += 1
        }
        println!("calc window width: {}", calc_w);
        let w = std::cmp::max(min_w, calc_w);
        println!("selected window width: {}", w);
        assert!(w == calc_w);
    }
}

#[bench]
fn bench_mavg_parallel_simd(b: &mut test::Bencher) {
    let v = vec![1000.; 100_000];
    let w = make_window(3., 1., 180usize);
    b.iter(|| {
        mavg_parallel_simd(&v, &w);
    });
}

#[bench]
fn bench_mavg_parallel_fold(b: &mut test::Bencher) {
    let v = vec![1000.; 100_000];
    let w = make_window(3., 1., 180usize);
    b.iter(|| {
        mavg_parallel_fold(&v, &w);
    });
}

#[bench]
fn bench_mavg(b: &mut test::Bencher) {
    let v = vec![1000.; 100_000];
    let w = make_window(3., 1., 180usize);
    b.iter(|| {
        mavg(&v, &w, 1usize, 1f64);
    });
}