// s2gpp 1.0.2
//
// Algorithm for Highly Efficient Detection of Correlation Anomalies in Multivariate Time Series
// Documentation
use actix::prelude::*;
use actix_telepathy::prelude::*;
use ndarray::prelude::*;

use std::sync::{Arc, Mutex};

use actix_broker::BrokerSubscribe;
use ndarray_linalg::assert::close_l1;
use std::collections::HashMap;
use std::net::SocketAddr;

use crate::data_manager::messages::DataLoadedAndProcessed;
use crate::data_manager::{DataManager, LoadDataMessage};
use crate::parameters::{Parameters, Role};
use crate::utils::ClusterNodes;
use log::*;
use ndarray::arr3;
use port_scanner::request_open_port;
use rayon::prelude::*;
use tokio::time::{sleep, Duration};

/// Holder for the arrays delivered by a `DataLoadedAndProcessed` message.
///
/// Both fields are `None` after `DataResult::default()` and are filled in by
/// `OwnListener`'s `Handler<DataLoadedAndProcessed>` implementation.
#[derive(Default)]
struct DataResult {
    /// The `phase_space` payload of the message, once it has been received.
    pub phase_space: Option<ArcArray<f32, Ix3>>,
    /// The `data_ref` payload of the message, once it has been received.
    pub data_ref: Option<ArcArray<f32, Ix3>>,
}

/// Test actor that listens for `ClusterLog` events, starts a `DataManager`
/// when a new member joins, and stores the manager's output in `result`.
struct OwnListener {
    /// Remote nodes seen so far, keyed by the index chosen in the
    /// `ClusterLog` handler (1 when this node is `Role::Main`, 0 otherwise).
    pub cluster_nodes: HashMap<usize, RemoteAddr>,
    /// Parameters that are cloned into every `DataManager` this actor starts.
    pub parameters: Parameters,
    /// Shared slot that the test body inspects after the data has arrived.
    pub result: Arc<Mutex<DataResult>>,
}

// Empty impls: no trait items are overridden for this test actor.
impl ClusterListener for OwnListener {}
impl Supervised for OwnListener {}

impl OwnListener {
    /// Starts a new `DataManager` for the currently known cluster nodes and
    /// sends it a `LoadDataMessage`.
    ///
    /// The manager is handed a recipient built from this actor's own address,
    /// together with clones of the node list and of `self.parameters`.
    fn load_data(&mut self, ctx: &mut Context<Self>) {
        let own_address = ctx.address();
        let nodes = ClusterNodes::from(self.cluster_nodes.clone());

        let data_manager =
            DataManager::new(nodes.clone(), self.parameters.clone(), own_address.recipient())
                .start();

        data_manager.do_send(LoadDataMessage { nodes });
    }
}

impl Actor for OwnListener {
    type Context = Context<Self>;

    /// Called when the actor starts: subscribes it to `ClusterLog` messages
    /// via `subscribe_system_async`.
    fn started(&mut self, ctx: &mut Self::Context) {
        self.subscribe_system_async::<ClusterLog>(ctx);
    }
}

impl Handler<ClusterLog> for OwnListener {
    type Result = ();

    /// Reacts to cluster membership changes.
    ///
    /// * `NewMember`: logs the address, records the remote node in
    ///   `cluster_nodes` (under key 1 when this node is `Role::Main`, under
    ///   key 0 for any other role) and then triggers `load_data`.
    /// * `MemberLeft`: ignored.
    fn handle(&mut self, msg: ClusterLog, ctx: &mut Context<Self>) -> Self::Result {
        match msg {
            ClusterLog::NewMember(addr, remote_addr) => {
                // Format the address directly; an intermediate `to_string()`
                // would only allocate a throwaway `String`.
                debug!("new member {}", addr);

                // The key is the index of the *other* node as seen from here.
                let peer_id = match &self.parameters.role {
                    Role::Main { .. } => 1,
                    _ => 0,
                };
                self.cluster_nodes.insert(peer_id, remote_addr);

                self.load_data(ctx)
            }
            ClusterLog::MemberLeft(_addr) => {}
        }
    }
}

impl Handler<DataLoadedAndProcessed> for OwnListener {
    type Result = ();

    fn handle(&mut self, msg: DataLoadedAndProcessed, _ctx: &mut Self::Context) -> Self::Result {
        let mut dataresult = self.result.lock().unwrap();
        (*dataresult).data_ref = Some(msg.data_ref);
        (*dataresult).phase_space = Some(msg.phase_space);
    }
}

/// Per-node configuration for `test_data_management`; each instance is
/// unpacked into one call of `start_reading`.
struct TestParams {
    /// Address this node binds to (passed as `ip_address`).
    ip: SocketAddr,
    /// Seed nodes handed to the cluster (passed as `seed_nodes`).
    seeds: Vec<SocketAddr>,
    /// Whether this node runs with `Role::Main` (`true`) or `Role::Sub` (`false`).
    main: bool,
    /// Array the node's resulting phase space is compared against.
    expected_phase_space: ArcArray<f32, Ix3>,
    /// Array the node's resulting data reference is compared against.
    expected_data_ref: ArcArray<f32, Ix3>,
}

/// Runs a main node and a sub node in parallel on two local addresses and
/// lets each of them compare its loaded data against fixed expectations
/// (see `start_reading`).
#[test]
#[ignore] //gitlab workflows don't get the timing right
fn test_data_management() {
    // Builds a loopback address from `request_open_port`, falling back to
    // port 8000 when no port is returned.
    let local_addr = || -> SocketAddr {
        let port = request_open_port().unwrap_or(8000);
        format!("127.0.0.1:{}", port).parse().unwrap()
    };
    let main_addr = local_addr();
    let sub_addr = local_addr();

    // Expected phase space on the main node.
    let main_phase_space = arr3(&[
        [
            [2.18241731, 4.45332896, 2.18779886],
            [2.66874929, 3.94888408, 2.75649561],
            [2.7519442, 3.59467315, 2.90582321],
            [2.40929932, 2.88509776, 3.00519938],
            [3.19799933, 2.14067433, 3.50562114],
            [3.16218139, 2.19186071, 3.4127196],
            [3.35120425, 2.04415429, 3.52248153],
            [2.87850828, 2.52159156, 3.21316997],
            [2.05225496, 3.13280805, 3.09949073],
            [1.96246017, 3.51603486, 2.89856417],
            [1.64841517, 3.98851209, 3.0967204],
            [2.10740366, 3.59520781, 2.84686332],
            [2.40328684, 3.04233411, 2.85587264],
            [2.74209838, 2.48709708, 2.71956808],
        ],
        [
            [2.66874929, 3.94888408, 2.75649561],
            [2.7519442, 3.59467315, 2.90582321],
            [2.40929932, 2.88509776, 3.00519938],
            [3.19799933, 2.14067433, 3.50562114],
            [3.16218139, 2.19186071, 3.4127196],
            [3.35120425, 2.04415429, 3.52248153],
            [2.87850828, 2.52159156, 3.21316997],
            [2.05225496, 3.13280805, 3.09949073],
            [1.96246017, 3.51603486, 2.89856417],
            [1.64841517, 3.98851209, 3.0967204],
            [2.10740366, 3.59520781, 2.84686332],
            [2.40328684, 3.04233411, 2.85587264],
            [2.74209838, 2.48709708, 2.71956808],
            [2.59091275, 2.34457487, 2.3470922],
        ],
        [
            [2.7519442, 3.59467315, 2.90582321],
            [2.40929932, 2.88509776, 3.00519938],
            [3.19799933, 2.14067433, 3.50562114],
            [3.16218139, 2.19186071, 3.4127196],
            [3.35120425, 2.04415429, 3.52248153],
            [2.87850828, 2.52159156, 3.21316997],
            [2.05225496, 3.13280805, 3.09949073],
            [1.96246017, 3.51603486, 2.89856417],
            [1.64841517, 3.98851209, 3.0967204],
            [2.10740366, 3.59520781, 2.84686332],
            [2.40328684, 3.04233411, 2.85587264],
            [2.74209838, 2.48709708, 2.71956808],
            [2.59091275, 2.34457487, 2.3470922],
            [3.10206848, 2.36022877, 2.47890813],
        ],
    ]);

    // Expected data reference on the main node.
    let main_data_ref = arr3(&[
        [
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
            [0.03510282, 0.23744748, 0.0171921],
        ],
        [
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
            [0.0953012, 0.29507496, 0.07676196],
        ],
        [
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
            [0.15549958, 0.35270244, 0.13633183],
        ],
    ]);

    // Expected phase space on the sub node.
    let sub_phase_space = arr3(&[
        [
            [3.8206407, 2.36796312, 2.04560368],
            [3.69942443, 2.69881685, 1.92483223],
            [3.43937846, 2.47279845, 2.82239268],
            [2.68208942, 2.62061081, 2.47343859],
            [2.7694733, 2.76932965, 3.06451969],
            [3.11964936, 2.8656878, 3.40156312],
            [3.52588479, 2.65253322, 3.51483735],
            [3.70875014, 2.34566095, 4.06043449],
            [3.81531461, 2.84629147, 3.61087337],
            [4.01906744, 3.09874158, 3.6589223],
            [3.61144769, 2.81179194, 3.08275119],
            [2.90571607, 2.68702359, 2.5996823],
            [3.065959, 2.82708009, 2.32338734],
            [2.94808153, 3.21541566, 2.10343877],
        ],
        [
            [3.69942443, 2.69881685, 1.92483223],
            [3.43937846, 2.47279845, 2.82239268],
            [2.68208942, 2.62061081, 2.47343859],
            [2.7694733, 2.76932965, 3.06451969],
            [3.11964936, 2.8656878, 3.40156312],
            [3.52588479, 2.65253322, 3.51483735],
            [3.70875014, 2.34566095, 4.06043449],
            [3.81531461, 2.84629147, 3.61087337],
            [4.01906744, 3.09874158, 3.6589223],
            [3.61144769, 2.81179194, 3.08275119],
            [2.90571607, 2.68702359, 2.5996823],
            [3.065959, 2.82708009, 2.32338734],
            [2.94808153, 3.21541566, 2.10343877],
            [3.10133045, 2.82264117, 2.03362369],
        ],
        [
            [3.43937846, 2.47279845, 2.82239268],
            [2.68208942, 2.62061081, 2.47343859],
            [2.7694733, 2.76932965, 3.06451969],
            [3.11964936, 2.8656878, 3.40156312],
            [3.52588479, 2.65253322, 3.51483735],
            [3.70875014, 2.34566095, 4.06043449],
            [3.81531461, 2.84629147, 3.61087337],
            [4.01906744, 3.09874158, 3.6589223],
            [3.61144769, 2.81179194, 3.08275119],
            [2.90571607, 2.68702359, 2.5996823],
            [3.065959, 2.82708009, 2.32338734],
            [2.94808153, 3.21541566, 2.10343877],
            [3.10133045, 2.82264117, 2.03362369],
            [3.11609803, 2.0462867, 1.60809344],
        ],
    ]);

    // The sub node is expected to see the same data reference as the main node.
    let sub_data_ref = main_data_ref.clone();

    let main_node = TestParams {
        ip: main_addr,
        seeds: vec![],
        main: true,
        expected_phase_space: main_phase_space.to_shared(),
        expected_data_ref: main_data_ref.to_shared(),
    };
    // The sub node uses the main node's address as its only seed.
    let sub_node = TestParams {
        ip: sub_addr,
        seeds: vec![main_addr],
        main: false,
        expected_phase_space: sub_phase_space.to_shared(),
        expected_data_ref: sub_data_ref.to_shared(),
    };

    // Each `start_reading` call blocks on its own runtime, so the two nodes
    // are driven from separate rayon workers.
    let node_setups = [main_node, sub_node];
    node_setups.into_par_iter().for_each(|node| {
        start_reading(
            node.ip,
            node.seeds.clone(),
            node.main,
            node.expected_phase_space.clone(),
            node.expected_data_ref.clone(),
        )
    });
}

/// Runs a single cluster node and checks the data it ends up with.
///
/// Starts an `OwnListener`, waits 200 ms, creates the `Cluster`, waits another
/// 200 ms and then compares the first three entries along axis 0 of the
/// received phase space and data reference with the expected arrays using
/// `close_l1` (tolerance `0.0005`).
///
/// * `ip_address` - address of this node; also used as `local_host`.
/// * `seed_nodes` - seeds for the cluster; the first entry is the `mainhost`
///   when `main` is `false`.
/// * `main` - selects `Role::Main` (reading `data/test.csv`) or `Role::Sub`.
/// * `expected_phase_space` / `expected_data_ref` - comparison targets.
///
/// # Panics
/// Panics if `main` is `false` and `seed_nodes` is empty, if the mutex is
/// poisoned, or if no data has arrived by the time the result is inspected.
#[actix_rt::main]
async fn start_reading(
    ip_address: SocketAddr,
    seed_nodes: Vec<SocketAddr>,
    main: bool,
    expected_phase_space: ArcArray<f32, Ix3>,
    expected_data_ref: ArcArray<f32, Ix3>,
) {
    let result = Arc::new(Mutex::new(DataResult::default()));

    let role = if main {
        Role::Main {
            data_path: Some("data/test.csv".to_string()),
        }
    } else {
        Role::Sub {
            mainhost: seed_nodes[0],
        }
    };
    let parameters = Parameters {
        role,
        local_host: ip_address,
        pattern_length: 20,
        latent: 6,
        rate: 100,
        n_threads: 1,
        n_cluster_nodes: 2,
        ..Default::default()
    };

    let _listener = OwnListener {
        cluster_nodes: HashMap::new(),
        parameters,
        result: Arc::clone(&result),
    }
    .start();

    // Fixed pauses: one before the cluster is created, one before the result
    // is read.
    let pause = Duration::from_millis(200);
    sleep(pause).await;

    let _cluster = Cluster::new(ip_address, seed_nodes.clone());

    sleep(pause).await;

    // No `.await` happens below, so holding the std mutex guard is fine.
    let collected = result.lock().unwrap();

    let phase_space = collected.phase_space.as_ref().unwrap();
    println!("phase_space {:?}", phase_space.shape());

    let data_ref = collected.data_ref.as_ref().unwrap();
    println!("data_ref {:?}", data_ref.shape());

    // Only the first three entries along axis 0 are compared.
    let phase_space_head = phase_space.slice(s![..3_usize, .., ..]).to_owned();
    close_l1(&phase_space_head, &expected_phase_space, 0.0005);

    let data_ref_head = data_ref.slice(s![..3_usize, .., ..]).to_owned();
    close_l1(&data_ref_head, &expected_data_ref, 0.0005);
}