s2gpp 1.0.2

Algorithm for Highly Efficient Detection of Correlation Anomalies in Multivariate Time Series
Documentation
use crate::data_manager::stats_collector::messages::{MinMaxDoneMessage, MinMaxNodeMessage};
use crate::utils::Stats;
use actix::prelude::*;
use actix_telepathy::prelude::*;
use ndarray::{Array1, Axis};

use crate::data_manager::DataManager;

pub struct MinMaxCalculation {
    pub nodes: Vec<RemoteAddr>,
    pub min: Option<Array1<f32>>,
    pub max: Option<Array1<f32>>,
}

pub trait MinMaxCalculator {
    fn set_intermediate_minmax(&mut self, min: Array1<f32>, max: Array1<f32>);
    fn calculate_minmax(&mut self, addr: Addr<Self>)
    where
        Self: actix::Actor;
}

impl MinMaxCalculator for DataManager {
    fn set_intermediate_minmax(&mut self, min: Array1<f32>, max: Array1<f32>) {
        self.minmax_calculation.as_mut().unwrap().min = Some(min);
        self.minmax_calculation.as_mut().unwrap().max = Some(max);
    }

    fn calculate_minmax(&mut self, addr: Addr<Self>) {
        let min = self.data.as_ref().unwrap().to_shared().min_axis(Axis(0));
        let max = self.data.as_ref().unwrap().to_shared().max_axis(Axis(0));

        let main = match self.cluster_nodes.get_main_node() {
            None => AnyAddr::Local(addr),
            Some(remote_addr) => {
                let mut remote_addr = remote_addr.clone();
                remote_addr.change_id("DataManager".to_string());
                AnyAddr::Remote(remote_addr)
            }
        };
        main.do_send(MinMaxNodeMessage {
            min,
            max,
            source: RemoteAddr::new_from_id(self.parameters.local_host, "DataManager"),
        });
    }
}

impl Handler<MinMaxNodeMessage> for DataManager {
    type Result = ();

    fn handle(&mut self, msg: MinMaxNodeMessage, ctx: &mut Self::Context) -> Self::Result {
        let minmax_calculation = self.minmax_calculation.as_mut().unwrap();
        minmax_calculation.nodes.push(msg.source);
        match (&minmax_calculation.min, &minmax_calculation.max) {
            (Some(min), Some(max)) => {
                let new_min: Array1<f32> = msg
                    .min
                    .iter()
                    .zip(min.iter())
                    .map(|(sent, local)| sent.min(*local))
                    .collect();
                let new_max: Array1<f32> = msg
                    .max
                    .iter()
                    .zip(max.iter())
                    .map(|(sent, local)| sent.max(*local))
                    .collect();

                minmax_calculation.min = Some(new_min);
                minmax_calculation.max = Some(new_max);
            }
            _ => {
                self.set_intermediate_minmax(msg.min, msg.max);
            }
        }

        if self.minmax_calculation.as_ref().unwrap().nodes.len() == self.parameters.n_cluster_nodes
        {
            for node in self.minmax_calculation.as_ref().unwrap().nodes.iter() {
                let receiving_node = match &node.network_interface {
                    Some(_) => AnyAddr::Remote(node.clone()),
                    None => AnyAddr::Local(ctx.address()),
                };
                receiving_node.do_send(MinMaxDoneMessage {
                    min: self
                        .minmax_calculation
                        .as_ref()
                        .unwrap()
                        .min
                        .as_ref()
                        .unwrap()
                        .clone(),
                    max: self
                        .minmax_calculation
                        .as_ref()
                        .unwrap()
                        .max
                        .as_ref()
                        .unwrap()
                        .clone(),
                });
            }
        }
    }
}