s2gpp 1.0.2

Algorithm for Highly Efficient Detection of Correlation Anomalies in Multivariate Time Series
Documentation
use actix::{Actor, ActorContext, Addr, AsyncContext, Context, Handler, Recipient};
use std::cmp::Ordering;

use ndarray::{Array1, Array2, Array3, Dim};

use crate::data_manager::data_reader::{DataPartitionMessage, DataReader, DataReading};
pub use crate::data_manager::messages::{DataLoadedAndProcessed, LoadDataMessage};
use crate::data_manager::preprocessor::{Preprocessing, PreprocessingDoneMessage, Preprocessor};
use crate::parameters::{Parameters, Role};
use actix_telepathy::prelude::*;

use crate::data_manager::data_reader::messages::LocalReadDataMessage;
use crate::data_manager::phase_spacer::PhaseSpacer;
use crate::data_manager::reference_dataset_builder::ReferenceDatasetBuilder;
pub use crate::data_manager::stats_collector::DatasetStats;
use crate::data_manager::stats_collector::{
    MinMaxCalculation, MinMaxCalculator, MinMaxDoneMessage, MinMaxNodeMessage, StdCalculation,
    StdCalculator, StdDoneMessage, StdNodeMessage,
};
use crate::messages::PoisonPill;
use crate::utils::itertools::FromToAble;
use crate::utils::{ClusterNodes, ConsoleLogger};
use log::*;
use std::str::FromStr;

pub mod data_reader;
mod messages;
mod phase_spacer;
mod preprocessor;
mod reference_dataset_builder;
mod stats_collector;
#[cfg(test)]
mod tests;

/// Actor that holds this node's data and drives it through the loading
/// pipeline visible in this module: reading, dataset statistics (min/max and
/// std), preprocessing, reference-dataset construction and phase-space
/// construction. The final result is sent to `receiver` as a
/// `DataLoadedAndProcessed` message.
///
/// The messages listed in `remote_messages` are the ones declared as remotely
/// receivable for this actor.
#[derive(RemoteActor)]
#[remote_messages(
    DataPartitionMessage,
    StdNodeMessage,
    StdDoneMessage,
    MinMaxNodeMessage,
    MinMaxDoneMessage
)]
pub struct DataManager {
    /// Numeric data held by this node; set when a `DataPartitionMessage` or a
    /// `LocalReadDataMessage` is handled, `None` before that.
    data: Option<Array2<f32>>,
    /// Known cluster nodes; replaced by the node set carried in `LoadDataMessage`.
    cluster_nodes: ClusterNodes,
    /// Run configuration (this module reads `role`, `pattern_length`,
    /// `n_threads`, `column_start` and `column_end` from it).
    parameters: Parameters,
    /// Reading settings; initialized when `LoadDataMessage` is handled.
    data_reading: Option<DataReading>,
    /// State of the min/max computation; initialized in `calculate_datastats`.
    minmax_calculation: Option<MinMaxCalculation>,
    /// State of the std computation; initialized in `calculate_datastats`.
    std_calculation: Option<StdCalculation>,
    /// State of the preprocessing step; created in `preprocess`.
    preprocessing: Option<Preprocessing>,
    /// Recipient that is sent `DataLoadedAndProcessed` in `finalize`.
    receiver: Recipient<DataLoadedAndProcessed>,
    /// Statistics filled in by the `MinMaxDoneMessage` and `StdDoneMessage` handlers.
    dataset_stats: DatasetStats,
    /// Output of `ReferenceDatasetBuilder`; set in `build_reference_dataset`.
    reference_dataset: Option<Array3<f32>>,
    /// Output of `PhaseSpacer`; set in `build_phase_space`.
    phase_space: Option<Array3<f32>>,
    /// Partition messages that arrived before all nodes were connected; they
    /// are re-sent to this actor by `resolve_buffer`.
    partition_buffer: Vec<DataPartitionMessage>,
}

impl DataManager {
    /// Creates a manager that has not loaded any data yet.
    ///
    /// `nodes` has `change_ids("DataManager")` applied before it is stored.
    /// `receiver` is the recipient that will be sent the
    /// `DataLoadedAndProcessed` message once the pipeline has finished.
    pub fn new(
        mut nodes: ClusterNodes,
        parameters: Parameters,
        receiver: Recipient<DataLoadedAndProcessed>,
    ) -> Self {
        nodes.change_ids("DataManager");

        Self {
            data: None,
            cluster_nodes: nodes,
            parameters,
            data_reading: None,
            minmax_calculation: None,
            std_calculation: None,
            preprocessing: None,
            receiver,
            dataset_stats: DatasetStats::default(),
            reference_dataset: None,
            phase_space: None,
            partition_buffer: vec![],
        }
    }

    /// Re-sends every buffered `DataPartitionMessage` to `addr` and leaves the
    /// buffer empty.
    ///
    /// Messages are replayed in the order in which they were buffered.
    fn resolve_buffer(&mut self, addr: Addr<Self>) {
        debug!("resolve buffer");
        // `drain(..)` yields oldest-first; popping from the back would replay
        // the messages in reverse arrival order.
        for msg in self.partition_buffer.drain(..) {
            addr.do_send(msg);
        }
    }

    /// Resets the min/max and std calculation state to empty and then starts
    /// both calculations via `calculate_minmax` and `calculate_std`.
    fn calculate_datastats(&mut self, addr: Addr<Self>) {
        self.minmax_calculation = Some(MinMaxCalculation {
            nodes: vec![],
            min: None,
            max: None,
        });
        self.std_calculation = Some(StdCalculation {
            nodes: vec![],
            n: None,
            mean: None,
            m2: None,
        });

        self.calculate_minmax(addr.clone());
        self.calculate_std(addr);
    }

    /// Starts preprocessing once `dataset_stats` reports that it is done;
    /// does nothing otherwise.
    ///
    /// Called after each statistics result arrives, so whichever of the two
    /// results completes the stats triggers the next step.
    fn datastats_finished(&mut self, addr: Addr<Self>) {
        if self.dataset_stats.is_done() {
            self.preprocess(addr);
        }
    }

    /// Creates the preprocessing state from the loaded data and hands off to
    /// `distribute_work`.
    ///
    /// # Panics
    ///
    /// Panics with "Data should be set by now!" if no data has been loaded.
    fn preprocess(&mut self, addr: Addr<Self>) {
        ConsoleLogger::new(3, 12, "Preprocessing Data".to_string()).print();
        let shared_data = self
            .data
            .as_ref()
            .expect("Data should be set by now!")
            .to_shared();
        self.preprocessing = Some(Preprocessing::new(
            shared_data,
            self.parameters.n_threads,
            self.parameters.pattern_length,
        ));
        self.distribute_work(addr);
    }

    /// Builds the reference dataset from the collected statistics and the
    /// parameters and stores it in `reference_dataset`.
    fn build_reference_dataset(&mut self) {
        let reference_dataset =
            ReferenceDatasetBuilder::new(self.dataset_stats.clone(), self.parameters.clone())
                .build();
        self.reference_dataset = Some(reference_dataset);
    }

    /// Builds the phase space from the loaded data and the parameters and
    /// stores it in `phase_space`.
    ///
    /// # Panics
    ///
    /// Panics if no data has been loaded.
    fn build_phase_space(&mut self) {
        let phase_space = PhaseSpacer::new(
            self.data.as_ref().unwrap().to_shared(),
            self.parameters.clone(),
        )
        .build();
        self.phase_space = Some(phase_space);
    }

    /// Sends the reference dataset, the phase space and the dataset statistics
    /// to `receiver` as one `DataLoadedAndProcessed` message.
    ///
    /// # Panics
    ///
    /// Panics if the reference dataset or the phase space has not been built
    /// yet, or if sending to `receiver` fails.
    fn finalize(&mut self) {
        let result = DataLoadedAndProcessed {
            data_ref: self.reference_dataset.as_ref().unwrap().to_shared(),
            phase_space: self.phase_space.as_ref().unwrap().to_shared(),
            dataset_stats: self.dataset_stats.clone(),
        };
        self.receiver.do_send(result).unwrap();
    }
}

impl Actor for DataManager {
    type Context = Context<Self>;

    /// Passes a recipient for this actor's own address to `register` as soon
    /// as the actor has started.
    fn started(&mut self, ctx: &mut Self::Context) {
        let own_recipient = ctx.address().recipient();
        self.register(own_recipient);
    }
}

impl Handler<LoadDataMessage> for DataManager {
    type Result = ();

    /// Kicks off data loading.
    ///
    /// Stores the node set from the message and the reading settings (header
    /// expected, overlap of `pattern_length - 1`). A `Role::Main` with a data
    /// path reads the CSV file itself; every other role replays the partition
    /// messages that were buffered so far.
    fn handle(&mut self, msg: LoadDataMessage, ctx: &mut Self::Context) -> Self::Result {
        ConsoleLogger::new(1, 12, String::from("Reading Data")).print();
        self.cluster_nodes = msg.nodes;

        let overlap = self.parameters.pattern_length - 1;
        self.data_reading = Some(DataReading {
            with_header: true,
            overlap,
        });

        let own_addr = ctx.address();
        // The role is cloned so that `self` is free to be borrowed mutably below.
        match self.parameters.role.clone() {
            Role::Main {
                data_path: Some(data_path),
            } => self.read_csv(&data_path, own_addr),
            _ => self.resolve_buffer(own_addr),
        }
    }
}

impl Handler<DataPartitionMessage> for DataManager {
    type Result = ();

    fn handle(&mut self, msg: DataPartitionMessage, ctx: &mut Self::Context) -> Self::Result {
        if !self.cluster_nodes.all_connected(&self.parameters) {
            debug!("not all nodes are connected");
            self.partition_buffer.push(msg);
            return;
        }

        debug!("all nodes are now connected");

        ConsoleLogger::new(2, 12, "Calculating Data Stats".to_string()).print();
        let n_rows = msg.data.len();
        let until_column = match self.parameters.column_end.cmp(&0) {
            Ordering::Equal => msg.data[0].len(),
            Ordering::Greater => self.parameters.column_end as usize,
            Ordering::Less => (msg.data[0].len() as isize + self.parameters.column_end) as usize,
        };

        let n_columns = until_column - self.parameters.column_start;

        let flat_data: Array1<f32> = msg
            .data
            .into_iter()
            .flat_map(|rec| {
                rec.iter()
                    .fromto(self.parameters.column_start, until_column)
                    .map(|b| f32::from_str(b).unwrap())
                    .collect::<Vec<f32>>()
            })
            .collect();

        self.data = Some(
            flat_data
                .into_shape(Dim([n_rows, n_columns]))
                .expect("Could not deserialize sent data"),
        );

        self.calculate_datastats(ctx.address());
    }
}

impl Handler<LocalReadDataMessage> for DataManager {
    type Result = ();

    /// Stores the data carried by the message as this node's data and starts
    /// the statistics calculation.
    fn handle(&mut self, msg: LocalReadDataMessage, ctx: &mut Self::Context) -> Self::Result {
        let loaded = msg.data;
        let own_addr = ctx.address();

        self.data = Some(loaded);
        self.calculate_datastats(own_addr);
    }
}

impl Handler<MinMaxDoneMessage> for DataManager {
    type Result = ();

    fn handle(&mut self, msg: MinMaxDoneMessage, ctx: &mut Self::Context) -> Self::Result {
        self.dataset_stats.min_col = Some(msg.min);
        self.dataset_stats.max_col = Some(msg.max);
        self.datastats_finished(ctx.address());
    }
}

impl Handler<StdDoneMessage> for DataManager {
    type Result = ();

    fn handle(&mut self, msg: StdDoneMessage, ctx: &mut Self::Context) -> Self::Result {
        self.dataset_stats.std_col = Some(msg.std);
        self.dataset_stats.n = Some(msg.n);
        self.datastats_finished(ctx.address());
    }
}

impl Handler<PreprocessingDoneMessage> for DataManager {
    type Result = ();

    /// Runs the remaining steps once preprocessing has finished: builds the
    /// reference dataset, builds the phase space, and sends the result to the
    /// receiver. Neither the message nor the context is used.
    fn handle(&mut self, _msg: PreprocessingDoneMessage, _ctx: &mut Self::Context) -> Self::Result {
        // Step 4: reference dataset.
        ConsoleLogger::new(4, 12, String::from("Building Reference Dataset")).print();
        self.build_reference_dataset();

        // Step 5: phase space.
        ConsoleLogger::new(5, 12, String::from("Building Phase Space")).print();
        self.build_phase_space();

        self.finalize();
    }
}

impl Handler<PoisonPill> for DataManager {
    type Result = ();

    fn handle(&mut self, _msg: PoisonPill, ctx: &mut Self::Context) -> Self::Result {
        ctx.stop();
    }
}