s2gpp 1.0.2

Algorithm for Highly Efficient Detection of Correlation Anomalies in Multivariate Time Series
Documentation
mod helper;
mod messages;

pub use crate::data_manager::preprocessor::messages::{
    PreprocessColumnMessage, PreprocessingDoneMessage, ProcessedColumnMessage,
};
use actix::{Addr, AsyncContext, Handler, SyncArbiter};
use ndarray::ArcArray2;

use crate::data_manager::preprocessor::helper::PreprocessorHelper;

use crate::messages::PoisonPill;

use crate::data_manager::DataManager;

/// Bookkeeping for the column-wise preprocessing phase.
///
/// Holds the address of the helper pool plus three counters that track how
/// many columns exist, how many have been handed to helpers, and how many
/// have been reported back as processed.
pub struct Preprocessing {
    /// Address of the `PreprocessorHelper` pool started through `SyncArbiter`.
    helpers: Addr<PreprocessorHelper>,
    /// Number of columns in the input data (`data.ncols()` at construction).
    n_cols_total: usize,
    /// Number of columns for which a `ProcessedColumnMessage` has been received.
    n_cols_processed: usize,
    /// Number of columns for which a `PreprocessColumnMessage` has been sent.
    n_cols_distributed: usize,
}

impl Preprocessing {
    /// Starts the helper pool and returns fresh preprocessing state.
    ///
    /// * `data` - the array to preprocess; each helper is constructed with its
    ///   own clone of this handle.
    /// * `n_threads` - number of threads passed to `SyncArbiter::start`.
    /// * `window_size` - forwarded unchanged to every `PreprocessorHelper`.
    ///
    /// Both progress counters start at zero; the total is taken from
    /// `data.ncols()`.
    pub fn new(data: ArcArray2<f32>, n_threads: usize, window_size: usize) -> Self {
        // Read the column count first: `data` is moved into the factory below.
        let column_count = data.ncols();
        let make_helper = move || PreprocessorHelper::new(data.clone(), window_size);

        Self {
            helpers: SyncArbiter::start(n_threads, make_helper),
            n_cols_total: column_count,
            n_cols_processed: 0,
            n_cols_distributed: 0,
        }
    }
}

/// Behaviour of an actor that hands out column preprocessing work.
pub trait Preprocessor {
    /// Dispatches pending preprocessing work.
    ///
    /// `source` is the address of the implementing actor. The `DataManager`
    /// implementation in this module attaches it to every
    /// `PreprocessColumnMessage` it sends to the helpers.
    fn distribute_work(&mut self, source: Addr<Self>)
    where
        Self: actix::Actor;
}

impl Preprocessor for DataManager {
    /// Sends not-yet-distributed columns to the helper pool, keeping at most
    /// `parameters.n_threads` columns in flight at any time.
    ///
    /// A column counts as "in flight" once a `PreprocessColumnMessage` has
    /// been sent for it and until the matching `ProcessedColumnMessage` has
    /// been counted. `source` is cloned into every message sent.
    ///
    /// # Panics
    ///
    /// Panics if `self.preprocessing` is `None`.
    fn distribute_work(&mut self, source: Addr<Self>) {
        let preprocessing = self
            .preprocessing
            .as_mut()
            .expect("preprocessing state must be initialized before distributing work");

        // Saturating arithmetic: if the counters ever drift, dispatch nothing
        // instead of underflowing `usize` (a panic in debug builds, a huge
        // wrapped budget in release builds).
        let in_flight = preprocessing
            .n_cols_distributed
            .saturating_sub(preprocessing.n_cols_processed);
        let free_slots = self.parameters.n_threads.saturating_sub(in_flight);

        // The range is built from copies of the counters, so bumping
        // `n_cols_distributed` inside the loop does not affect the iteration.
        let pending = preprocessing.n_cols_distributed..preprocessing.n_cols_total;
        for column in pending.take(free_slots) {
            preprocessing.helpers.do_send(PreprocessColumnMessage {
                column,
                source: source.clone(),
                std: 0.0,
            });
            preprocessing.n_cols_distributed += 1;
        }
    }
}

impl Handler<ProcessedColumnMessage> for DataManager {
    type Result = ();

    /// Counts one finished column, then either refills the helper pool or,
    /// once every column has been counted, signals completion.
    ///
    /// On completion a `PreprocessingDoneMessage` is sent to this actor's own
    /// address and a `PoisonPill` is sent to the helper pool.
    ///
    /// Panics if `self.preprocessing` is `None`.
    fn handle(&mut self, _msg: ProcessedColumnMessage, ctx: &mut Self::Context) -> Self::Result {
        let own_addr = ctx.address();
        let state = self.preprocessing.as_mut().unwrap();
        state.n_cols_processed += 1;

        // Columns are still outstanding: hand out more work and wait.
        if state.n_cols_processed != state.n_cols_total {
            self.distribute_work(own_addr);
            return;
        }

        own_addr.do_send(PreprocessingDoneMessage);
        // NOTE(review): a single PoisonPill is sent to a pool that may have
        // been started with several threads; confirm this stops every helper.
        state.helpers.do_send(PoisonPill);
    }
}