s2gpp 1.0.2

Algorithm for Highly Efficient Detection of Correlation Anomalies in Multivariate Time Series
Documentation
// Sink plumbing for this module; `MySink` and `SinkActor` are imported from it below.
mod sink;
// Test module, compiled only when building with `cfg(test)`.
#[cfg(test)]
mod tests;

use crate::interface::sink::{MySink, SinkActor};
use crate::training::DetectionResponse;
use crate::{Parameters, StartTrainingMessage, Training};
use actix::io::SinkWrite;
use actix::{Actor, Handler};
use anyhow::{Error, Result};
use ndarray::{Array1, Array2};
use tokio::sync::mpsc;

/// Interface for types that are built from [`Parameters`] and can be fitted
/// on a two-dimensional array with element type `A`.
///
/// NOTE(review): the name suggests a blocking/synchronous facade over the
/// actor-based training — confirm against the implementors.
pub trait SyncInterface<A> {
    /// Constructs the implementor from the given `parameters`.
    fn init(parameters: Parameters) -> Self;

    /// Fits on `data` and returns a [`SyncResult`].
    ///
    /// # Errors
    ///
    /// Returns an `anyhow` error; the failure conditions are defined by the
    /// implementor.
    fn fit(&mut self, data: Array2<A>) -> Result<SyncResult>;
}

/// One-dimensional `f32` array produced by a fit; it is the item type written
/// to the sink from `DetectionResponse::anomaly_score` and returned by
/// [`actor_fit`].
pub type SyncResult = Array1<f32>;

/// Lets a [`SinkActor`] carrying [`SyncResult`] items receive the
/// [`DetectionResponse`] message.
impl Handler<DetectionResponse> for SinkActor<SyncResult> {
    type Result = ();

    /// Writes the anomaly score of `response` to the wrapped sink and then
    /// closes the sink.
    fn handle(&mut self, response: DetectionResponse, _ctx: &mut Self::Context) -> Self::Result {
        let scores = response.anomaly_score;

        // The outcome of the write is deliberately discarded; the sink is
        // closed regardless of whether the item was accepted.
        let _ = self.sink.write(scores);
        self.sink.close();
    }
}

pub async fn actor_fit(actor: Training, data: Array2<f32>) -> Result<SyncResult> {
    let (sender, mut receiver) = mpsc::unbounded_channel();

    let sink_actor = SinkActor::create(move |ctx| {
        let sink = MySink::new(sender);
        SinkActor::new(SinkWrite::new(sink, ctx))
    });

    let addr = actor.start();
    addr.do_send(StartTrainingMessage {
        nodes: Default::default(),
        source: Some(sink_actor.recipient()),
        data: Some(data),
    });

    if let Some(r) = receiver.recv().await {
        Ok(r)
    } else {
        Err(Error::msg("Await resulted in None value!"))
    }
}