s2gpp 1.0.2

Algorithm for Highly Efficient Detection of Correlation Anomalies in Multivariate Time Series
Documentation
pub(crate) mod messages;

pub use messages::DataReceivedMessage;

use csv::{ReaderBuilder, Trim};
use ndarray::prelude::*;

pub use crate::data_manager::data_reader::messages::DataPartitionMessage;
use actix::{Actor, Addr};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::str::FromStr;

use crate::data_manager::DataManager;
use crate::utils::AnyClusterNodesIterator;
use std::ops::Not;

use log::*;

/// Settings that control how [`DataReader::read_csv`] reads and partitions a CSV file.
pub struct DataReading {
    /// Whether the file starts with a header line; when set, one line is subtracted from
    /// the line count that is used to size the partitions.
    pub with_header: bool,
    /// Upper bound on the number of extra rows appended to a partition (while the receiver
    /// iterator is not at its last position); those rows also start the next partition.
    pub overlap: usize,
}

/// Capability of an actor to read a CSV file.
///
/// The implementation for `DataManager` in this module splits the rows into partitions and
/// sends them out as [`DataPartitionMessage`]s.
pub trait DataReader {
    /// Reads the CSV file located at `file_path`.
    ///
    /// `addr` is the address of the implementing actor; the `DataManager` implementation
    /// hands it to `to_any` when building its list of receivers.
    fn read_csv(&mut self, file_path: &str, addr: Addr<Self>)
    where
        Self: Actor;
}

impl DataReader for DataManager {
    /// Reads the CSV file at `file_path`, splits its rows into consecutive partitions and
    /// sends one [`DataPartitionMessage`] per partition to the receivers built from the
    /// cluster nodes and `addr`.
    ///
    /// Every partition holds `n_lines / n_receivers` rows (integer division). When there is
    /// more than one receiver, a partition is additionally extended by overlap rows that are
    /// re-used as the beginning of the following partition: up to `overlap` rows while the
    /// receiver iterator is not at its last position, and up to the division remainder once
    /// it is. Whatever is still buffered when the file ends is sent to the next receiver.
    ///
    /// The header setting of the CSV parser follows `with_header`, so the number of parsed
    /// records always matches the row count used for partitioning.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened, if `self.data_reading` is `None`, if there are
    /// no receivers, if a CSV record cannot be parsed, or if the receiver iterator is
    /// exhausted when a partition is ready to be sent.
    fn read_csv(&mut self, file_path: &str, addr: Addr<Self>) {
        let open_file = || {
            File::open(file_path)
                .unwrap_or_else(|e| panic!("Could not open file '{}': {}", file_path, e))
        };

        // First pass over the file: count the lines so the partition size is known up front.
        let line_count = BufReader::new(open_file()).lines().count();

        // Copy the settings once instead of unwrapping the option for every record.
        let reading = self
            .data_reading
            .as_ref()
            .expect("data_reading must be set before reading a CSV file");
        let with_header = reading.with_header;
        let overlap = reading.overlap;

        // `saturating_sub` keeps an empty file from underflowing the row count.
        let n_lines = if with_header {
            line_count.saturating_sub(1)
        } else {
            line_count
        };

        let mut nodes = self.cluster_nodes.clone();
        nodes.change_ids("DataManager");
        let receivers = nodes.to_any(addr);
        let n_receivers = receivers.len();

        // Second pass: parse the records. The parser must agree with `n_lines` about whether
        // the first line is a header, otherwise a headerless file would lose its first row.
        let mut reader = ReaderBuilder::new()
            .has_headers(with_header)
            .trim(Trim::All)
            .from_reader(open_file());

        let partition_len = n_lines / n_receivers;
        // Rows left over by the integer division; they extend the last partition.
        let last_overlap = n_lines - partition_len * n_receivers;
        let mut receiver_iterator: AnyClusterNodesIterator<Self> = receivers.into_iter();
        let mut buffer = vec![];
        let mut overlap_buffer = vec![];

        for record in reader.records() {
            let record = record.unwrap_or_else(|e| panic!("{}", e));
            let strings = record.iter().map(|x| x.to_string()).collect();

            if buffer.len() < partition_len {
                buffer.push(strings);
                continue;
            }

            // With a single receiver there is nobody to share overlap rows with.
            let takes_overlap = n_receivers > 1 && {
                let overlap_limit = if receiver_iterator.last_position().not() {
                    overlap
                } else {
                    last_overlap
                };
                overlap_buffer.len() < overlap_limit
            };
            if takes_overlap {
                overlap_buffer.push(strings);
                continue;
            }

            // The partition is complete: ship the main rows plus a copy of the overlap rows.
            let mut data = std::mem::take(&mut buffer);
            data.extend(overlap_buffer.iter().cloned());
            receiver_iterator
                .next()
                .expect("no receiver left for the data partition")
                .do_send(DataPartitionMessage { data });
            debug!(
                "Sent data to receiver {}",
                receiver_iterator.get_position() - 1
            );

            // The overlap rows open the next partition, followed by the current record.
            buffer.append(&mut overlap_buffer);
            buffer.push(strings);
        }

        // Flush whatever is still buffered once the file is exhausted.
        let mut data = buffer;
        data.extend(overlap_buffer);
        receiver_iterator
            .next()
            .expect("no receiver left for the final data partition")
            .do_send(DataPartitionMessage { data });
        debug!(
            "Sent data to receiver {}",
            receiver_iterator.get_position() - 1
        );
    }
}

#[allow(dead_code)]
pub fn read_data_(file_path: &str) -> Array2<f32> {
    let file = File::open(file_path).unwrap();
    let count_reader = BufReader::new(file);
    let n_lines = count_reader.lines().count() - 1;

    let file = File::open(file_path).unwrap();
    let mut reader = ReaderBuilder::new()
        .has_headers(true)
        .trim(Trim::All)
        .from_reader(file);

    let n_rows = n_lines;
    let n_columns = reader.headers().unwrap().len();

    let flat_data: Array1<f32> = reader
        .records()
        .into_iter()
        .flat_map(|rec| {
            rec.unwrap()
                .iter()
                .map(|b| f32::from_str(b).unwrap())
                .collect::<Vec<f32>>()
        })
        .collect();

    flat_data
        .into_shape((n_rows, n_columns))
        .expect("Could not deserialize sent data")
}