s2gpp 1.0.2

Algorithm for Highly Efficient Detection of Correlation Anomalies in Multivariate Time Series
Documentation
use actix::io::SinkWrite;
use actix::{Actor, System};
use futures_sink::Sink;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;

pub struct MySink<T>
where
    T: Unpin,
{
    sender: mpsc::UnboundedSender<T>,
    content: Option<T>,
}

impl<T> MySink<T>
where
    T: Unpin,
{
    pub fn new(sender: mpsc::UnboundedSender<T>) -> Self {
        MySink {
            sender,
            content: None,
        }
    }
}

impl<T> Sink<T> for MySink<T>
where
    T: Unpin + Clone,
{
    type Error = ();

    fn poll_ready(self: Pin<&mut Self>, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        match &this.content {
            Some(content) => {
                let _r = this.sender.send(content.clone());
                Poll::Ready(Ok(()))
            }
            None => Poll::Ready(Ok(())),
        }
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        self.get_mut().content = Some(item);
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_ready(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        self.poll_ready(cx)
    }
}

pub struct SinkActor<T>
where
    T: Unpin + Clone,
{
    pub sink: SinkWrite<T, MySink<T>>,
    pub result: Option<T>,
}

impl<T> SinkActor<T>
where
    T: Unpin + Clone,
{
    pub fn new(sink: SinkWrite<T, MySink<T>>) -> Self {
        Self { sink, result: None }
    }
}

impl<T> Actor for SinkActor<T>
where
    T: Unpin + 'static + Clone,
{
    type Context = actix::Context<SinkActor<T>>;
}

impl<T> actix::io::WriteHandler<()> for SinkActor<T>
where
    T: Unpin + 'static + Clone,
{
    fn finished(&mut self, _ctxt: &mut Self::Context) {
        System::current().stop();
    }
}