1#![feature(test)]
2#![feature(slice_partition_dedup)]
3extern crate test;
4pub use crate::utils::*;
5use chrono::prelude::*;
6use plotters::prelude::*;
7use std::fs::File;
8use std::io::{BufRead, BufReader, BufWriter, Write};
9use std::path::Path;
10
11pub mod load_log_dad141;
12pub mod load_plot;
13pub mod load_process;
14pub mod utils;
15
16pub const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
17pub const ERROR_STR_GENERAL: &str = "E+999999.";
18pub const ERROR_STR_NONE: &str = "E+999998.";
19pub const ERROR_STR_INVALID: &str = "E+999997.";
20pub const ERROR_STR_SKIPPED: &str = "E+999996.";
21pub const ERROR_STR_PARSE: &str = "E+999995.";
22pub const ERROR_FLT_GENERAL: f64 = 999999.;
23pub const ERROR_FLT_NONE: f64 = 999998.;
24pub const ERROR_FLT_INVALID: f64 = 999997.;
25pub const ERROR_FLT_SKIPPED: f64 = 999996.;
26pub const ERROR_FLT_PARSE: f64 = 999995.;
27
28#[derive(Debug, Clone)]
31pub struct TimeLoad {
32 pub time: Vec<DateTime<FixedOffset>>,
33 pub load: Vec<f64>,
34}
35
36impl TimeLoad {
37 pub fn new(capacity: usize) -> TimeLoad {
40 let time: Vec<DateTime<FixedOffset>> = Vec::with_capacity(capacity);
41 let load: Vec<f64> = Vec::with_capacity(capacity);
42 let timeload: TimeLoad = TimeLoad { time, load };
43 timeload
44 }
45
46 pub fn from_csv<P>(fin: P) -> TimeLoad
52 where
53 P: AsRef<Path>,
54 {
55 let file = File::open(fin).unwrap();
56 let buf = BufReader::new(file);
57 let mut timeload = TimeLoad::new(10000 as usize);
58 for l in buf.lines().skip(1) {
59 let l_unwrap = match l {
60 Ok(l_ok) => l_ok,
61 Err(l_err) => {
62 println!("Err, could not read/unwrap line {}", l_err);
63 continue;
64 }
65 };
66 let mut l_split = l_unwrap.split(',');
67 let l_split_datetime = l_split.next().unwrap();
68 let l_split_load = l_split.next().unwrap();
69 let parsed_datetime = match DateTime::parse_from_rfc3339(l_split_datetime) {
70 Ok(parsed_datetime) => parsed_datetime,
71 Err(e) => {
72 println!(
73 "Could not parse datetime: {}, error {}",
74 l_split_datetime, e
75 );
76 continue;
77 }
78 };
79 timeload.time.push(parsed_datetime);
80 match l_split_load.parse::<f64>() {
81 Ok(parsed_load) => timeload.load.push(parsed_load),
82 Err(e) => {
83 println!(
84 "Could not parse load: {}, at datetime {}. Error: {}",
85 l_split_load, parsed_datetime, e
86 );
87 timeload.load.push(f64::NAN);
88 }
89 }
90 }
91 timeload
92 }
93
94 pub fn is_ordered(&self) {
96 self.time.windows(2).for_each(|w| {
97 assert!(
98 w[1] > w[0],
99 "time series is not ordered: {} < {}",
100 w[1],
101 w[0]
102 )
103 });
104 }
105
106 pub fn is_ordered_and_continuous(&self) {
108 self.time
109 .windows(2)
110 .map(|w| {
111 assert!(
112 w[1] > w[0],
113 "time series is not ordered: {} < {}",
114 w[1],
115 w[0]
116 );
117 w[1] - w[0]
118 })
119 .reduce(|wp, wn| {
120 assert_eq!(wp, wn, "time series is not continuous");
121 wn
122 });
123 }
124
125 pub fn fill_missing_with_nan(&self) -> TimeLoad {
129 let min_delta = self
130 .time
131 .windows(2)
132 .map(|dtw| dtw[1] - dtw[0])
133 .min()
134 .unwrap();
135 let mut timeload = TimeLoad::new(self.time.len());
136 for (dtw, load) in self.time.windows(2).zip(self.load.iter()) {
137 let mut current_dt: DateTime<FixedOffset> = dtw[0];
138 timeload.time.push(current_dt);
139 timeload.load.push(*load);
140 while current_dt + min_delta < dtw[1] {
141 current_dt = current_dt.checked_add_signed(min_delta).unwrap();
142 timeload.time.push(current_dt);
143 timeload.load.push(f64::NAN);
144 }
145 }
146 timeload.time.push(self.time[self.time.len() - 1]);
147 timeload.load.push(self.load[self.load.len() - 1]);
148 timeload
149 }
150
151 pub fn replace_bad_datetimes_with_nan(&mut self, bad_datetimes: Vec<DateTime<FixedOffset>>) {
153 for bdt in bad_datetimes.into_iter() {
154 match self.time.iter().position(|d| *d == bdt) {
155 Some(i) => self.load[i] = f64::NAN,
156 None => println!("could not find and exclude bad datetime {}", bdt),
157 }
158 }
159 }
160
161 pub fn replace_bad_time_interval_with_nan(
164 &mut self,
165 time_init: NaiveTime,
166 time_stop: NaiveTime,
167 ) {
168 self.time
169 .iter()
170 .zip(self.load.iter_mut())
171 .for_each(|(t, l)| {
172 if (t.time() > time_init) & (t.time() < time_stop) {
173 *l = f64::NAN;
174 }
175 });
176 }
177
178 pub fn replace_outliers_with_nan(&mut self, min_load: f64, max_load: f64) {
180 self.load.iter_mut().for_each(|l| {
181 if (*l > max_load) | (*l < min_load) {
182 println!(
183 "setting to NAN value out of range (min: {}, max {}): {}",
184 min_load, max_load, l
185 );
186 *l = f64::NAN;
187 }
188 });
189 }
190
191 pub fn replace_errors_with_nan(&mut self, max_value: f64) {
194 self.load.iter_mut().for_each(|l| {
195 if *l > max_value {
196 println!("found invalid value: {}", l);
197 *l = f64::NAN;
198 }
199 });
200 }
201
202 pub fn to_csv<P>(self, fout: P)
205 where
206 P: AsRef<Path>,
207 {
208 let file = File::create(fout).unwrap();
209 let mut buf = BufWriter::new(file);
210 buf.write_all("datetime,load_kg\n".as_bytes()).unwrap();
211 for (t, w) in self.time.iter().zip(self.load.iter()) {
212 buf.write_all(format!("{},{}\n", t.to_rfc3339(), w).as_bytes())
213 .unwrap();
214 }
215 }
216
217 pub fn plot_datetime<P>(&self, fout: P) -> Result<(), Box<dyn std::error::Error>>
219 where
220 P: AsRef<Path>,
221 {
222 let (xmin, xmax) = min_and_max(self.time.iter());
223 let xspan: chrono::Duration = xmax - xmin;
224 let xfmt = suitable_xfmt(xspan);
225 let (ymin, ymax) = min_and_max(self.load.iter().filter(|x| !x.is_nan()));
226 let yspan = (ymax - ymin) / 10f64;
227 let ymin = ymin - yspan;
228 let ymax = ymax + yspan;
229 let root = SVGBackend::new(&fout, (1600, 800)).into_drawing_area();
230 root.fill(&WHITE)?;
231 let mut chart = ChartBuilder::on(&root)
232 .margin(50)
233 .x_label_area_size(40)
234 .y_label_area_size(100)
235 .build_cartesian_2d(xmin.clone()..xmax.clone(), ymin..ymax)?;
236 chart
237 .configure_mesh()
238 .light_line_style(&TRANSPARENT)
239 .bold_line_style(RGBColor(100, 100, 100).mix(0.5).stroke_width(2))
240 .set_all_tick_mark_size(2)
241 .label_style(("sans-serif", 20))
242 .y_desc("load [kg]")
243 .x_labels(16)
244 .y_labels(25)
245 .x_label_formatter(&|x| x.format(xfmt).to_string())
246 .y_label_formatter(&|x: &f64| format!("{:5}", x))
247 .x_desc(format!("datetime [{}]", xfmt.replace("%", "")))
248 .draw()?;
249 let witer = &mut self.load[..].split(|x| x.is_nan());
250 let titer = &mut self.time[..].into_iter();
251 for wchunk in witer.into_iter() {
252 if wchunk.len() == 0 {
253 titer.next();
254 continue;
255 } else {
256 let area =
257 AreaSeries::new(titer.zip(wchunk).map(|(x, y)| (*x, *y)), 0.0, &RED.mix(0.2))
258 .border_style(BLACK.stroke_width(1));
259 chart.draw_series(area)?;
260 }
261 }
262 Ok(())
263 }
264}
265
266impl std::fmt::Display for TimeLoad {
267 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268 write!(f, "datetime, load [kg]\n")?;
269 for (t, w) in self.time.iter().zip(self.load.iter()) {
270 write!(f, "{},{}\n", t.to_rfc3339(), w)?
271 }
272 Ok(())
273 }
274}
275
276#[cfg(test)]
281mod tests {
282
283 use super::*;
284
285 #[test]
286 fn datetime_parsing_with_timezone() {
289 let timezone_hours: i32 = -8;
290 let timezone = timezone_hours * 60 * 60;
291 let timezone_fixed_offset = FixedOffset::east(timezone);
292
293 let dtstr_dst = "2021-11-07T01:30:00-07:00";
295 let dtiso_dst = DateTime::parse_from_rfc3339(dtstr_dst).unwrap();
296 let dtfix_dst = dtiso_dst.with_timezone(&timezone_fixed_offset);
297 println!(
298 "datetime str {} parsed as {}, fixed {}",
299 dtstr_dst, dtiso_dst, dtfix_dst
300 );
301
302 let dtstr_pst = "2021-11-07T01:30:00-08:00";
304 let dtiso_pst = DateTime::parse_from_rfc3339(dtstr_pst).unwrap();
305 let dtfix_pst = dtiso_pst.with_timezone(&timezone_fixed_offset);
306 println!(
307 "datetime str {} parsed as {}, fixed {}",
308 dtstr_pst, dtiso_pst, dtfix_pst
309 );
310
311 let timediff = chrono::Duration::hours(1);
313 assert!(dtfix_pst - timediff == dtfix_dst);
314 }
315
316 #[test]
317 fn test_get_current_datetime_offset() {
319 let dtnow: DateTime<Local> = Local::now();
320 println!(
321 "local time is {}",
322 dtnow.to_rfc3339_opts(SecondsFormat::Secs, false)
323 );
324 let local_offset = dtnow.offset();
325 println!("local offet is {}", local_offset);
326 }
327
328 #[test]
329 fn test_find_anomaly_homogeneous() {
331 let a = [5.0f64; 15];
332 let expected: Vec<f64> = Vec::new();
333 let (_, anomalies_load) = find_anomalies(&a, 7usize, 6usize, 5.0f64);
334 assert!(anomalies_load == expected);
335 }
336
337 #[test]
338 fn test_find_anomaly_linear() {
340 let v: Vec<f64> = (1..15).map(|n| n as f64).collect();
341 let expected: Vec<f64> = Vec::new();
342 let (_, anomalies_load) = find_anomalies(&v, 7usize, 6usize, 5.0f64);
343 assert!(anomalies_load == expected);
344 }
345
346 #[test]
347 fn test_find_anomaly_nans() {
349 let mut v: Vec<f64> = (1..15).map(|n| n as f64).collect();
350 v.iter_mut().enumerate().for_each(|(i, e)| {
351 if i < 6usize {
352 *e = f64::NAN
353 }
354 });
355 let expected: Vec<f64> = Vec::new();
356 let (_, anomalies_load) = find_anomalies(&v, 7usize, 6usize, 5.0f64);
357 assert!(anomalies_load == expected);
358 }
359
360 #[test]
361 fn test_find_anomaly_discontinuity() {
363 let mut v: Vec<f64> = (1..15).map(|n| n as f64).collect();
364 v.iter_mut().enumerate().for_each(|(i, e)| {
365 if i < 8usize {
366 *e = 20.
367 }
368 });
369 let (_, anomalies_load) = find_anomalies(&v, 7usize, 6usize, 5.0f64);
370 let expected: Vec<f64> = vec![20.0, 20.0, 20.0, 20.0, 9.0, 10.0, 11.0, 12.0, 13.0];
371 assert!(anomalies_load == expected);
372 }
373
374 #[test]
375 fn test_deduplicate_load_series() {
378 let mut v_all = vec![1., 2., 2., 2., 3., 4., 4., 5., 6., 6.];
379 let v_unique = vec![1., 2., 3., 4., 5., 6.];
380 let v_duplicates = vec![2., 2., 4., 6.];
381
382 v_all.sort_by(|a, b| a.partial_cmp(b).unwrap());
383 let (dedup, duplicates) = v_all.partition_dedup_by(|a, b| a == b);
384
385 let mut duplicates = duplicates.to_owned();
386 duplicates.sort_by(|a, b| a.partial_cmp(b).unwrap());
387
388 assert!(v_unique == dedup);
390 assert!(v_duplicates == duplicates);
391 }
392
393 #[test]
394 fn test_discharge_by_index() {
395 let vall: Vec<f64> = (1..20).map(|n| n as f64).collect();
396 let indices: Vec<usize> = vec![2, 3, 7, 12, 13, 18];
397 let mut expected: Vec<f64> = (1..20).map(|n| n as f64).collect();
398 for i in indices.iter().rev() {
399 expected.remove(*i);
400 }
401 let vout = discharge_by_index(&vall, &indices);
402 println!("vout\n{:?}\nexpected\n{:?}", vout, expected);
403 assert!(compare_vecf64_approx(&vout, &expected));
404 }
405
406 #[test]
407 fn test_setnan_by_index() {
408 let mut vall: Vec<f64> = (1..20).map(|n| n as f64).collect();
409 let indices: Vec<usize> = vec![2, 3, 7, 12, 13, 18];
410 let mut expected: Vec<f64> = (1..20).map(|n| n as f64).collect();
411 indices.iter().for_each(|i| expected[*i] = f64::NAN);
412 setnan_by_index(&mut vall, &indices);
413 assert!(compare_vecf64_approx(&vall, &expected));
414 }
415
416 #[test]
417 fn test_all_steps() {
419 let mut timezone: i32 = -8;
421 timezone *= 60 * 60;
422 let timezone_fixed_offset = FixedOffset::east(timezone);
423
424 let mut tl = TimeLoad::from_csv(String::from("./test/timeload_raw.csv"));
426 tl.time
427 .iter_mut()
428 .for_each(|t| *t = t.with_timezone(&timezone_fixed_offset));
429 println!("{}", tl);
430
431 tl.is_ordered();
433
434 let mut ctl = tl.fill_missing_with_nan();
436 ctl.is_ordered_and_continuous();
437 println!("{}", ctl);
438
439 let bad = read_bad_datetimes("./test/bad_datetimes.csv");
441 ctl.replace_bad_datetimes_with_nan(bad);
442 println!("{}", ctl);
443
444 let time_init = NaiveTime::parse_from_str("01:02", "%H:%M").unwrap();
447 let time_stop = NaiveTime::parse_from_str("01:05", "%H:%M").unwrap();
448 ctl.replace_bad_time_interval_with_nan(time_init, time_stop);
449 println!("{}", ctl);
450
451 ctl.replace_errors_with_nan(99995.);
453 println!("{}", ctl);
454
455 ctl.replace_outliers_with_nan(10000., 18000.);
457 println!("{}", ctl);
458
459 let (anomalies_indices, _) = find_anomalies(&ctl.load, 16usize, 8usize, 40.0f64);
461 println!("indices of the anomalies:\n{:?}", anomalies_indices);
462 let mut atl = TimeLoad::new(anomalies_indices.len());
463 for i in anomalies_indices.iter() {
464 atl.time.push(ctl.time.get(*i).unwrap().clone());
465 atl.load.push(ctl.load.get(*i).unwrap().clone());
466 }
467 atl.to_csv("./test/timeload_anomalies.csv");
468
469 setnan_by_index(&mut ctl.load[..], &anomalies_indices);
470
471 let mavg_window = make_window(3., 1., 5usize);
473 let smooth = mavg(&ctl.load[..], &mavg_window, 5 as usize, 80.);
474 println!("{:?}", smooth);
475
476 ctl.load = smooth;
479
480 ctl.plot_datetime("./test/timeload_processed.svg").unwrap();
482
483 ctl.to_csv("./test/timeload_processed.csv");
485 }
486
487 #[test]
488 fn test_all_steps_parallel() {
490 let mut timezone: i32 = -8;
492 timezone *= 60 * 60;
493 let timezone_fixed_offset = FixedOffset::east(timezone);
494
495 let mut tl = TimeLoad::from_csv(String::from("./test/parallel_timeload_raw.csv"));
497 tl.time
498 .iter_mut()
499 .for_each(|t| *t = t.with_timezone(&timezone_fixed_offset));
500 println!("{}", tl);
501
502 tl.is_ordered();
504
505 let mut ctl = tl.fill_missing_with_nan();
507 ctl.is_ordered_and_continuous();
508 println!("{}", ctl);
509
510 let bad = read_bad_datetimes("./test/parallel_bad_datetimes.csv");
512 ctl.replace_bad_datetimes_with_nan(bad);
513 println!("{}", ctl);
514
515 let time_init = NaiveTime::parse_from_str("01:02", "%H:%M").unwrap();
518 let time_stop = NaiveTime::parse_from_str("01:05", "%H:%M").unwrap();
519 ctl.replace_bad_time_interval_with_nan(time_init, time_stop);
520 println!("{}", ctl);
521
522 ctl.replace_errors_with_nan(99995.);
524 println!("{}", ctl);
525
526 ctl.replace_outliers_with_nan(10000., 18000.);
528 println!("{}", ctl);
529
530 let (anomalies_indices, _) = find_anomalies(&ctl.load, 16usize, 8usize, 40.0f64);
532 println!("indices of the anomalies:\n{:?}", anomalies_indices);
533 let mut atl = TimeLoad::new(anomalies_indices.len());
534 for i in anomalies_indices.iter() {
535 atl.time.push(ctl.time.get(*i).unwrap().clone());
536 atl.load.push(ctl.load.get(*i).unwrap().clone());
537 }
538 atl.to_csv("./test/parallel_timeload_anomalies.csv");
539
540 setnan_by_index(&mut ctl.load[..], &anomalies_indices);
541
542 let mavg_window = make_window(3., 1., 2usize);
544 let smooth = mavg_parallel_fold(&ctl.load[..], &mavg_window);
545 println!("{:?}", smooth);
546
547 let correct_smooth = vec![
548 f64::NAN,
549 f64::NAN,
550 13003.0,
551 13004.0,
552 13005.0,
553 13006.0,
554 13007.0,
555 13008.0,
556 13008.8,
557 13009.6,
558 13010.3,
559 13011.1,
560 13012.0,
561 13013.0,
562 13014.0,
563 13015.0,
564 13016.0,
565 13017.0,
566 13018.0,
567 f64::NAN,
568 f64::NAN,
569 ];
570
571 assert! {compare_vecf64_approx(&smooth, &correct_smooth)};
572
573 ctl.load = smooth;
576
577 ctl.plot_datetime("./test/parallel_timeload_processed.svg")
579 .unwrap();
580
581 ctl.to_csv("./test/parallel_timeload_processed.csv");
583 }
584}
585
586#[bench]
587fn bench_mavg_parallel_simd(b: &mut test::Bencher) {
588 let v = vec![1000.; 1E+5 as usize];
589 let w = make_window(3., 1., 180 as usize);
590 b.iter(|| {
591 mavg_parallel_simd(&v, &w);
592 });
593}
594
595#[bench]
596fn bench_mavg_parallel_fold(b: &mut test::Bencher) {
597 let v = vec![1000.; 1E+5 as usize];
598 let w = make_window(3., 1., 180 as usize);
599 b.iter(|| {
600 mavg_parallel_fold(&v, &w);
601 });
602}
603
604#[bench]
605fn bench_mavg(b: &mut test::Bencher) {
606 let v = vec![1000.; 1E+5 as usize];
607 let w = make_window(3., 1., 180 as usize);
608 b.iter(|| {
609 mavg(&v, &w, 1usize, 1f64);
610 });
611}