// load_lpp/lib.rs

1#![feature(test)]
2#![feature(slice_partition_dedup)]
3extern crate test;
4pub use crate::utils::*;
5use chrono::prelude::*;
6use plotters::prelude::*;
7use std::fs::File;
8use std::io::{BufRead, BufReader, BufWriter, Write};
9use std::path::Path;
10
11pub mod load_log_dad141;
12pub mod load_plot;
13pub mod load_process;
14pub mod utils;
15
/// Crate version taken from Cargo at compile time, if available.
pub const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
// Sentinel strings used in the raw csv to flag error conditions
// instead of a valid load reading.
pub const ERROR_STR_GENERAL: &str = "E+999999.";
pub const ERROR_STR_NONE: &str = "E+999998.";
pub const ERROR_STR_INVALID: &str = "E+999997.";
pub const ERROR_STR_SKIPPED: &str = "E+999996.";
pub const ERROR_STR_PARSE: &str = "E+999995.";
// Float counterparts of the sentinel strings; values this large are treated
// as errors, not measurements (see replace_errors_with_nan).
// NOTE(review): the tests pass 99995. (five digits) as the error threshold
// while these sentinels are six digits — confirm the intended cutoff.
pub const ERROR_FLT_GENERAL: f64 = 999999.;
pub const ERROR_FLT_NONE: f64 = 999998.;
pub const ERROR_FLT_INVALID: f64 = 999997.;
pub const ERROR_FLT_SKIPPED: f64 = 999996.;
pub const ERROR_FLT_PARSE: f64 = 999995.;
27
/// The main struct for the load time series.
///
/// `time` and `load` are parallel vectors: `load[i]` is the measurement
/// taken at `time[i]`. NAN in `load` marks a missing or invalid sample.
// #[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub struct TimeLoad {
    /// Measurement timestamps with a fixed utc offset.
    pub time: Vec<DateTime<FixedOffset>>,
    /// Load values in kg; NAN marks missing/invalid samples.
    pub load: Vec<f64>,
}
35
36impl TimeLoad {
37    /// Initiate a new TimeLoad instance
38    /// using the given capacity for the time and load vectors
39    pub fn new(capacity: usize) -> TimeLoad {
40        let time: Vec<DateTime<FixedOffset>> = Vec::with_capacity(capacity);
41        let load: Vec<f64> = Vec::with_capacity(capacity);
42        let timeload: TimeLoad = TimeLoad { time, load };
43        timeload
44    }
45
46    /// Initiate a TimeLoad from csv
47    /// setting load to NAN in case of load parsing errors,
48    /// but panic for datatime errors.
49    /// Do not check the continuity of the time series and presence of error flags,
50    /// these are checked separately afterwards.
51    pub fn from_csv<P>(fin: P) -> TimeLoad
52    where
53        P: AsRef<Path>,
54    {
55        let file = File::open(fin).unwrap();
56        let buf = BufReader::new(file);
57        let mut timeload = TimeLoad::new(10000 as usize);
58        for l in buf.lines().skip(1) {
59            let l_unwrap = match l {
60                Ok(l_ok) => l_ok,
61                Err(l_err) => {
62                    println!("Err, could not read/unwrap line {}", l_err);
63                    continue;
64                }
65            };
66            let mut l_split = l_unwrap.split(',');
67            let l_split_datetime = l_split.next().unwrap();
68            let l_split_load = l_split.next().unwrap();
69            let parsed_datetime = match DateTime::parse_from_rfc3339(l_split_datetime) {
70                Ok(parsed_datetime) => parsed_datetime,
71                Err(e) => {
72                    println!(
73                        "Could not parse datetime: {}, error {}",
74                        l_split_datetime, e
75                    );
76                    continue;
77                }
78            };
79            timeload.time.push(parsed_datetime);
80            match l_split_load.parse::<f64>() {
81                Ok(parsed_load) => timeload.load.push(parsed_load),
82                Err(e) => {
83                    println!(
84                        "Could not parse load: {}, at datetime {}. Error: {}",
85                        l_split_load, parsed_datetime, e
86                    );
87                    timeload.load.push(f64::NAN);
88                }
89            }
90        }
91        timeload
92    }
93
94    // Assert that the time series is ordered.
95    pub fn is_ordered(&self) {
96        self.time.windows(2).for_each(|w| {
97            assert!(
98                w[1] > w[0],
99                "time series is not ordered: {} < {}",
100                w[1],
101                w[0]
102            )
103        });
104    }
105
106    // Assert that the time series is ordered and continuous.
107    pub fn is_ordered_and_continuous(&self) {
108        self.time
109            .windows(2)
110            .map(|w| {
111                assert!(
112                    w[1] > w[0],
113                    "time series is not ordered: {} < {}",
114                    w[1],
115                    w[0]
116                );
117                w[1] - w[0]
118            })
119            .reduce(|wp, wn| {
120                assert_eq!(wp, wn, "time series is not continuous");
121                wn
122            });
123    }
124
125    /// Fill the datetime gaps with NANs to have continuous datetime.
126    /// Take a reference to the read TimeLoad and return a new continuous TimeLoad.
127    /// Heuristically use the minimum time interval in the data to determine the desired time step for the output.
128    pub fn fill_missing_with_nan(&self) -> TimeLoad {
129        let min_delta = self
130            .time
131            .windows(2)
132            .map(|dtw| dtw[1] - dtw[0])
133            .min()
134            .unwrap();
135        let mut timeload = TimeLoad::new(self.time.len());
136        for (dtw, load) in self.time.windows(2).zip(self.load.iter()) {
137            let mut current_dt: DateTime<FixedOffset> = dtw[0];
138            timeload.time.push(current_dt);
139            timeload.load.push(*load);
140            while current_dt + min_delta < dtw[1] {
141                current_dt = current_dt.checked_add_signed(min_delta).unwrap();
142                timeload.time.push(current_dt);
143                timeload.load.push(f64::NAN);
144            }
145        }
146        timeload.time.push(self.time[self.time.len() - 1]);
147        timeload.load.push(self.load[self.load.len() - 1]);
148        timeload
149    }
150
151    /// Set to NAN the load values corresponsiding to the input bad datetimes.
152    pub fn replace_bad_datetimes_with_nan(&mut self, bad_datetimes: Vec<DateTime<FixedOffset>>) {
153        for bdt in bad_datetimes.into_iter() {
154            match self.time.iter().position(|d| *d == bdt) {
155                Some(i) => self.load[i] = f64::NAN,
156                None => println!("could not find and exclude bad datetime {}", bdt),
157            }
158        }
159    }
160
161    /// Replace all values measured within the time interval with NANs.
162    /// Given in standard time, fixed offset for the chosen timezone.
163    pub fn replace_bad_time_interval_with_nan(
164        &mut self,
165        time_init: NaiveTime,
166        time_stop: NaiveTime,
167    ) {
168        self.time
169            .iter()
170            .zip(self.load.iter_mut())
171            .for_each(|(t, l)| {
172                if (t.time() > time_init) & (t.time() < time_stop) {
173                    *l = f64::NAN;
174                }
175            });
176    }
177
178    /// Set to NAN all the load values that are out of the expected range.
179    pub fn replace_outliers_with_nan(&mut self, min_load: f64, max_load: f64) {
180        self.load.iter_mut().for_each(|l| {
181            if (*l > max_load) | (*l < min_load) {
182                println!(
183                    "setting to NAN value out of range (min: {}, max {}): {}",
184                    min_load, max_load, l
185                );
186                *l = f64::NAN;
187            }
188        });
189    }
190
191    /// Consider all the values > max_value as invalid and replace them with NAN.
192    /// These high values are reserved for the errors.
193    pub fn replace_errors_with_nan(&mut self, max_value: f64) {
194        self.load.iter_mut().for_each(|l| {
195            if *l > max_value {
196                println!("found invalid value: {}", l);
197                *l = f64::NAN;
198            }
199        });
200    }
201
202    /// Write the datetime and load columns to a csv file at the given path.
203    /// Use RFC 3339 - ISO 8601 for datetime.
204    pub fn to_csv<P>(self, fout: P)
205    where
206        P: AsRef<Path>,
207    {
208        let file = File::create(fout).unwrap();
209        let mut buf = BufWriter::new(file);
210        buf.write_all("datetime,load_kg\n".as_bytes()).unwrap();
211        for (t, w) in self.time.iter().zip(self.load.iter()) {
212            buf.write_all(format!("{},{}\n", t.to_rfc3339(), w).as_bytes())
213                .unwrap();
214        }
215    }
216
    /// Plot the load time series to svg.
    ///
    /// NAN values split the load series into separate area chunks so gaps
    /// are left blank instead of being interpolated across.
    pub fn plot_datetime<P>(&self, fout: P) -> Result<(), Box<dyn std::error::Error>>
    where
        P: AsRef<Path>,
    {
        // x range spans the whole series; y range is padded by a tenth of
        // the (NAN-filtered) data span on each side.
        let (xmin, xmax) = min_and_max(self.time.iter());
        let xspan: chrono::Duration = xmax - xmin;
        let xfmt = suitable_xfmt(xspan);
        let (ymin, ymax) = min_and_max(self.load.iter().filter(|x| !x.is_nan()));
        let yspan = (ymax - ymin) / 10f64;
        let ymin = ymin - yspan;
        let ymax = ymax + yspan;
        let root = SVGBackend::new(&fout, (1600, 800)).into_drawing_area();
        root.fill(&WHITE)?;
        let mut chart = ChartBuilder::on(&root)
            .margin(50)
            .x_label_area_size(40)
            .y_label_area_size(100)
            .build_cartesian_2d(xmin.clone()..xmax.clone(), ymin..ymax)?;
        chart
            .configure_mesh()
            .light_line_style(&TRANSPARENT)
            .bold_line_style(RGBColor(100, 100, 100).mix(0.5).stroke_width(2))
            .set_all_tick_mark_size(2)
            .label_style(("sans-serif", 20))
            .y_desc("load [kg]")
            .x_labels(16)
            .y_labels(25)
            .x_label_formatter(&|x| x.format(xfmt).to_string())
            .y_label_formatter(&|x: &f64| format!("{:5}", x))
            .x_desc(format!("datetime [{}]", xfmt.replace("%", "")))
            .draw()?;
        // One shared time iterator walks in lockstep with the NAN-separated
        // load chunks; its alignment is maintained across loop iterations.
        let witer = &mut self.load[..].split(|x| x.is_nan());
        let titer = &mut self.time[..].into_iter();
        for wchunk in witer.into_iter() {
            if wchunk.len() == 0 {
                // Empty chunk: leading or consecutive NAN separator; consume
                // its timestamp to keep time and load aligned.
                titer.next();
                continue;
            } else {
                // NOTE(review): when the chunk runs out, zip pulls one extra
                // item from titer and discards it (std Iterator::zip pulls
                // from the first iterator before the second) — that discarded
                // timestamp is the NAN separator's, which keeps the iterators
                // aligned for the next chunk. Subtle but load-bearing.
                let area =
                    AreaSeries::new(titer.zip(wchunk).map(|(x, y)| (*x, *y)), 0.0, &RED.mix(0.2))
                        .border_style(BLACK.stroke_width(1));
                chart.draw_series(area)?;
            }
        }
        Ok(())
    }
264}
265
266impl std::fmt::Display for TimeLoad {
267    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268        write!(f, "datetime, load [kg]\n")?;
269        for (t, w) in self.time.iter().zip(self.load.iter()) {
270            write!(f, "{},{}\n", t.to_rfc3339(), w)?
271        }
272        Ok(())
273    }
274}
275
276// use crate::utils::compare_vecf64;
277// Run the tests with:
278// cargo test -- --nocapture
279// to allow println!(...) to stdout.
280#[cfg(test)]
281mod tests {
282
283    use super::*;
284
    #[test]
    // Test the correct correction for the daylight saving,
    // needed for long term monitorings.
    // Two wall-clock times on either side of the PDT->PST fall-back are
    // normalized to a fixed UTC-8 offset and must differ by exactly 1 hour.
    fn datetime_parsing_with_timezone() {
        // NOTE(review): FixedOffset::east is deprecated in newer chrono in
        // favor of east_opt — confirm the pinned crate version.
        let timezone_hours: i32 = -8;
        let timezone = timezone_hours * 60 * 60;
        let timezone_fixed_offset = FixedOffset::east(timezone);

        // test DST (UTC-7 offset, before the fall-back)
        let dtstr_dst = "2021-11-07T01:30:00-07:00";
        let dtiso_dst = DateTime::parse_from_rfc3339(dtstr_dst).unwrap();
        let dtfix_dst = dtiso_dst.with_timezone(&timezone_fixed_offset);
        println!(
            "datetime str {} parsed as {}, fixed {}",
            dtstr_dst, dtiso_dst, dtfix_dst
        );

        // test PST (UTC-8 offset, after the fall-back)
        let dtstr_pst = "2021-11-07T01:30:00-08:00";
        let dtiso_pst = DateTime::parse_from_rfc3339(dtstr_pst).unwrap();
        let dtfix_pst = dtiso_pst.with_timezone(&timezone_fixed_offset);
        println!(
            "datetime str {} parsed as {}, fixed {}",
            dtstr_pst, dtiso_pst, dtfix_pst
        );

        // assert that DST goes 1 hour back
        let timediff = chrono::Duration::hours(1);
        assert!(dtfix_pst - timediff == dtfix_dst);
    }
315
316    #[test]
317    // Get the reading datetime with the correct offset
318    fn test_get_current_datetime_offset() {
319        let dtnow: DateTime<Local> = Local::now();
320        println!(
321            "local time is {}",
322            dtnow.to_rfc3339_opts(SecondsFormat::Secs, false)
323        );
324        let local_offset = dtnow.offset();
325        println!("local offet is {}", local_offset);
326    }
327
    #[test]
    // Assert that a homogenous load time series gives no anomalies.
    // find_anomalies is defined in utils; presumably (data, window, step,
    // threshold) — exact parameter semantics not visible here.
    fn test_find_anomaly_homogeneous() {
        let a = [5.0f64; 15];
        let expected: Vec<f64> = Vec::new();
        let (_, anomalies_load) = find_anomalies(&a, 7usize, 6usize, 5.0f64);
        assert!(anomalies_load == expected);
    }
336
    #[test]
    // Assert that a linear load time series with small increment (+1 per
    // sample, below the 5.0 threshold) gives no anomalies.
    fn test_find_anomaly_linear() {
        let v: Vec<f64> = (1..15).map(|n| n as f64).collect();
        let expected: Vec<f64> = Vec::new();
        let (_, anomalies_load) = find_anomalies(&v, 7usize, 6usize, 5.0f64);
        assert!(anomalies_load == expected);
    }
345
    #[test]
    // Assert that NANs are correctly handled while finding anomalies:
    // the first 6 samples are set to NAN and must not be flagged.
    fn test_find_anomaly_nans() {
        let mut v: Vec<f64> = (1..15).map(|n| n as f64).collect();
        v.iter_mut().enumerate().for_each(|(i, e)| {
            if i < 6usize {
                *e = f64::NAN
            }
        });
        let expected: Vec<f64> = Vec::new();
        let (_, anomalies_load) = find_anomalies(&v, 7usize, 6usize, 5.0f64);
        assert!(anomalies_load == expected);
    }
359
    #[test]
    // Assert that the anomalies are correctly identified by adding a big
    // discontinuity: the first 8 samples are flattened to 20., creating a
    // jump (20. -> 9.) larger than the 5.0 threshold at index 8.
    fn test_find_anomaly_discontinuity() {
        let mut v: Vec<f64> = (1..15).map(|n| n as f64).collect();
        v.iter_mut().enumerate().for_each(|(i, e)| {
            if i < 8usize {
                *e = 20.
            }
        });
        let (_, anomalies_load) = find_anomalies(&v, 7usize, 6usize, 5.0f64);
        // values flagged around the discontinuity, per utils::find_anomalies
        let expected: Vec<f64> = vec![20.0, 20.0, 20.0, 20.0, 9.0, 10.0, 11.0, 12.0, 13.0];
        assert!(anomalies_load == expected);
    }
373
    #[test]
    // Deduplicate removes consecutive repeated elements,
    // thus if the input is sorted dedup returns no duplicates.
    // Uses the nightly slice_partition_dedup feature (see crate attributes).
    fn test_deduplicate_load_series() {
        let mut v_all = vec![1., 2., 2., 2., 3., 4., 4., 5., 6., 6.];
        let v_unique = vec![1., 2., 3., 4., 5., 6.];
        let v_duplicates = vec![2., 2., 4., 6.];

        // sort first so equal values are adjacent, then split the slice into
        // (deduplicated prefix, duplicates moved to the end)
        v_all.sort_by(|a, b| a.partial_cmp(b).unwrap());
        let (dedup, duplicates) = v_all.partition_dedup_by(|a, b| a == b);

        // the duplicates half comes back in unspecified order; sort to compare
        let mut duplicates = duplicates.to_owned();
        duplicates.sort_by(|a, b| a.partial_cmp(b).unwrap());

        // println!("test:\ndedup {:?} == {:?}?\nduplicates {:?} == {:?}?", dedup, v_unique, duplicates, v_duplicates);
        assert!(v_unique == dedup);
        assert!(v_duplicates == duplicates);
    }
392
    #[test]
    // discharge_by_index (utils) should drop the elements at the given
    // indices; the expected vector is built by removing in reverse order
    // so earlier removals do not shift the remaining indices.
    fn test_discharge_by_index() {
        let vall: Vec<f64> = (1..20).map(|n| n as f64).collect();
        let indices: Vec<usize> = vec![2, 3, 7, 12, 13, 18];
        let mut expected: Vec<f64> = (1..20).map(|n| n as f64).collect();
        for i in indices.iter().rev() {
            expected.remove(*i);
        }
        let vout = discharge_by_index(&vall, &indices);
        println!("vout\n{:?}\nexpected\n{:?}", vout, expected);
        assert!(compare_vecf64_approx(&vout, &expected));
    }
405
    #[test]
    // setnan_by_index (utils) should overwrite the elements at the given
    // indices with NAN in place; compare_vecf64_approx presumably treats
    // NAN == NAN as equal for this comparison.
    fn test_setnan_by_index() {
        let mut vall: Vec<f64> = (1..20).map(|n| n as f64).collect();
        let indices: Vec<usize> = vec![2, 3, 7, 12, 13, 18];
        let mut expected: Vec<f64> = (1..20).map(|n| n as f64).collect();
        indices.iter().for_each(|i| expected[*i] = f64::NAN);
        setnan_by_index(&mut vall, &indices);
        assert!(compare_vecf64_approx(&vall, &expected));
    }
415
    #[test]
    // full processing test, including all the optional steps;
    // reads fixtures from ./test/ and writes processed outputs next to them
    fn test_all_steps() {
        // define time zone for the test (UTC-8, in seconds)
        let mut timezone: i32 = -8;
        timezone *= 60 * 60;
        let timezone_fixed_offset = FixedOffset::east(timezone);

        // read the time series and adjust to the defined time zone
        let mut tl = TimeLoad::from_csv(String::from("./test/timeload_raw.csv"));
        tl.time
            .iter_mut()
            .for_each(|t| *t = t.with_timezone(&timezone_fixed_offset));
        println!("{}", tl);

        // make sure the time series is ordered before processing
        tl.is_ordered();

        // make continuous by filling missing values with NANs, then assert continuity
        let mut ctl = tl.fill_missing_with_nan();
        ctl.is_ordered_and_continuous();
        println!("{}", ctl);

        // read bad datetimes and replace them with NANs
        let bad = read_bad_datetimes("./test/bad_datetimes.csv");
        ctl.replace_bad_datetimes_with_nan(bad);
        println!("{}", ctl);

        // define a daily interval over which values should be ignored (maintenance, etc.),
        // then set these intervals to NAN
        let time_init = NaiveTime::parse_from_str("01:02", "%H:%M").unwrap();
        let time_stop = NaiveTime::parse_from_str("01:05", "%H:%M").unwrap();
        ctl.replace_bad_time_interval_with_nan(time_init, time_stop);
        println!("{}", ctl);

        // replace errors with NANs, in this case all the values above 99995.
        ctl.replace_errors_with_nan(99995.);
        println!("{}", ctl);

        // keep only load values within a specific range, set the outliers to NAN
        ctl.replace_outliers_with_nan(10000., 18000.);
        println!("{}", ctl);

        // find anomalies, save them to csv, then set them to NAN
        let (anomalies_indices, _) = find_anomalies(&ctl.load, 16usize, 8usize, 40.0f64);
        println!("indices of the anomalies:\n{:?}", anomalies_indices);
        let mut atl = TimeLoad::new(anomalies_indices.len());
        for i in anomalies_indices.iter() {
            atl.time.push(ctl.time.get(*i).unwrap().clone());
            atl.load.push(ctl.load.get(*i).unwrap().clone());
        }
        atl.to_csv("./test/timeload_anomalies.csv");

        setnan_by_index(&mut ctl.load[..], &anomalies_indices);

        // apply a weighted moving average to smooth the filtered time series
        let mavg_window = make_window(3., 1., 5usize);
        let smooth = mavg(&ctl.load[..], &mavg_window, 5 as usize, 80.);
        println!("{:?}", smooth);

        // in this case simply replace the original load series with the smooth one;
        // if preferred keep both and compare
        ctl.load = smooth;

        // plot the filtered and smooth load series
        ctl.plot_datetime("./test/timeload_processed.svg").unwrap();

        // save the filtered and smooth load series
        ctl.to_csv("./test/timeload_processed.csv");
    }
486
    #[test]
    // full processing test, including all the optional steps,
    // using the parallel fold moving average and checking the exact result
    fn test_all_steps_parallel() {
        // define time zone for the test (UTC-8, in seconds)
        let mut timezone: i32 = -8;
        timezone *= 60 * 60;
        let timezone_fixed_offset = FixedOffset::east(timezone);

        // read the time series and adjust to the defined time zone
        let mut tl = TimeLoad::from_csv(String::from("./test/parallel_timeload_raw.csv"));
        tl.time
            .iter_mut()
            .for_each(|t| *t = t.with_timezone(&timezone_fixed_offset));
        println!("{}", tl);

        // make sure the time series is ordered before processing
        tl.is_ordered();

        // make continuous by filling missing values with NANs, then assert continuity
        let mut ctl = tl.fill_missing_with_nan();
        ctl.is_ordered_and_continuous();
        println!("{}", ctl);

        // read bad datetimes and replace them with NANs
        let bad = read_bad_datetimes("./test/parallel_bad_datetimes.csv");
        ctl.replace_bad_datetimes_with_nan(bad);
        println!("{}", ctl);

        // define a daily interval over which values should be ignored (maintenance, etc.),
        // then set these intervals to NAN
        let time_init = NaiveTime::parse_from_str("01:02", "%H:%M").unwrap();
        let time_stop = NaiveTime::parse_from_str("01:05", "%H:%M").unwrap();
        ctl.replace_bad_time_interval_with_nan(time_init, time_stop);
        println!("{}", ctl);

        // replace errors with NANs, in this case all the values above 99995.
        ctl.replace_errors_with_nan(99995.);
        println!("{}", ctl);

        // keep only load values within a specific range, set the outliers to NAN
        ctl.replace_outliers_with_nan(10000., 18000.);
        println!("{}", ctl);

        // find anomalies, save them to csv, then set them to NAN
        let (anomalies_indices, _) = find_anomalies(&ctl.load, 16usize, 8usize, 40.0f64);
        println!("indices of the anomalies:\n{:?}", anomalies_indices);
        let mut atl = TimeLoad::new(anomalies_indices.len());
        for i in anomalies_indices.iter() {
            atl.time.push(ctl.time.get(*i).unwrap().clone());
            atl.load.push(ctl.load.get(*i).unwrap().clone());
        }
        atl.to_csv("./test/parallel_timeload_anomalies.csv");

        setnan_by_index(&mut ctl.load[..], &anomalies_indices);

        // apply a weighted moving average to smooth the filtered time series
        let mavg_window = make_window(3., 1., 2usize);
        let smooth = mavg_parallel_fold(&ctl.load[..], &mavg_window);
        println!("{:?}", smooth);

        // expected smoothed output; NAN at the edges where the window
        // cannot be fully populated
        let correct_smooth = vec![
            f64::NAN,
            f64::NAN,
            13003.0,
            13004.0,
            13005.0,
            13006.0,
            13007.0,
            13008.0,
            13008.8,
            13009.6,
            13010.3,
            13011.1,
            13012.0,
            13013.0,
            13014.0,
            13015.0,
            13016.0,
            13017.0,
            13018.0,
            f64::NAN,
            f64::NAN,
        ];

        assert! {compare_vecf64_approx(&smooth, &correct_smooth)};

        // in this case simply replace the original load series with the smooth one;
        // if preferred keep both and compare
        ctl.load = smooth;

        // plot the filtered and smooth load series
        ctl.plot_datetime("./test/parallel_timeload_processed.svg")
            .unwrap();

        // save the filtered and smooth load series
        ctl.to_csv("./test/parallel_timeload_processed.csv");
    }
584}
585
#[bench]
// Benchmark the SIMD parallel moving average (utils) on 1e5 samples with a
// 180-point window.
// NOTE(review): these #[bench] items sit outside #[cfg(test)], so they are
// compiled into non-test builds as well — consider gating them.
fn bench_mavg_parallel_simd(b: &mut test::Bencher) {
    let v = vec![1000.; 1E+5 as usize];
    let w = make_window(3., 1., 180 as usize);
    b.iter(|| {
        mavg_parallel_simd(&v, &w);
    });
}
594
#[bench]
// Benchmark the fold-based parallel moving average (utils) on the same
// 1e5-sample input for comparison with the SIMD variant.
fn bench_mavg_parallel_fold(b: &mut test::Bencher) {
    let v = vec![1000.; 1E+5 as usize];
    let w = make_window(3., 1., 180 as usize);
    b.iter(|| {
        mavg_parallel_fold(&v, &w);
    });
}
603
#[bench]
// Benchmark the sequential moving average (utils) as the baseline for the
// two parallel variants above.
fn bench_mavg(b: &mut test::Bencher) {
    let v = vec![1000.; 1E+5 as usize];
    let w = make_window(3., 1., 180 as usize);
    b.iter(|| {
        mavg(&v, &w, 1usize, 1f64);
    });
}