arnalisa 0.6.8

Pipeline system for calculating values
Documentation
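//! A bin that calculates a linear regression over a sliding window.
//!
//! The most recent `num_items` points `(x, y)` are fitted with the line
//! `y = b0 + b1 * x`; `mid_x` and `mid_y` are the coordinates of the
//! point in the middle of the window.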
//!
//! ```text
//!   ┌───[linear_regression]───┐
//!  ⇒│x                      b0│⇒
//!  ⇒│y                      b1│⇒
//!  ⇒│num_items           mid_x│⇒
//!   │                    mid_y│⇒
//!   └─────────────────────────┘
//! ```

use super::{
    build_names, BinBuildEnvironment, BinDescription, Calculator,
    FetchItem, GetCalibration, Item, Iteration, Result, Scope, SinkBin,
    SinkNames, SourceBin, SourceId, SourceNames, SourceSinkBinDescription,
    WriteDotSimple, SINK_X, SINK_Y,
};
use crate::{error, R64};
use indexmap::IndexSet;
use std::collections::VecDeque;

/// Type name of this bin; returned by `bin_type()` and reported in
/// `MissingSourceName` errors.
static BIN_TYPE: &str = "linear_regression";
/// Name of the sink that supplies the number of points in the window.
static SINK_NUM_ITEMS: &str = "num_items";
/// Name of the source carrying the intercept of the fitted line.
static SOURCE_B0: &str = "b0";
/// Name of the source carrying the slope of the fitted line.
static SOURCE_B1: &str = "b1";
/// Name of the source carrying the `x` value of the middle window point.
static SOURCE_MID_X: &str = "mid_x";
/// Name of the source carrying the `y` value of the middle window point.
static SOURCE_MID_Y: &str = "mid_y";

/// A single `(x, y)` sample stored in the sliding window.
#[derive(Debug, Clone)]
struct Point {
    /// Value received on the `x` input, converted to a float.
    x: R64,
    /// Value received on the `y` input, converted to a float.
    y: R64,
}

/// A bin that calculates a linear regression.
///
/// The bin keeps the most recent points in a FIFO together with running
/// sums of their coordinates, and publishes the regression results of
/// the last calculation through its four sources.
#[derive(Debug)]
pub struct Bin {
    /// Scope of this bin; handed to the inputs when fetching items and
    /// attached to error reports.
    scope: Scope,

    /// Input resolved for the `SINK_X` sink.
    source_x: Box<dyn FetchItem>,
    /// Input resolved for the `SINK_Y` sink.
    source_y: Box<dyn FetchItem>,
    /// Input resolved for the `num_items` sink (requested window length).
    source_num_items: Box<dyn FetchItem>,

    /// Points currently in the window, oldest at the front.
    fifo: VecDeque<Point>,
    /// Running sum of the `x` values of all points in `fifo`.
    x_sliding_sum: R64,
    /// Running sum of the `y` values of all points in `fifo`.
    y_sliding_sum: R64,
    /// Number of points in `fifo`; updated on every push and pop.
    sliding_count: usize,

    /// Intercept from the last calculation, or `Item::Nothing`.
    result_b0: Item,
    /// Slope from the last calculation, or `Item::Nothing`.
    result_b1: Item,
    /// `x` of the middle window point from the last calculation, or
    /// `Item::Nothing`.
    result_mid_x: Item,
    /// `y` of the middle window point from the last calculation, or
    /// `Item::Nothing`.
    result_mid_y: Item,
}

// Marks `Bin` as a sink bin; no `SinkBin` items are overridden here.
impl SinkBin for Bin {}

impl SourceBin for Bin {
    /// Returns a copy of the result stored for the requested source.
    ///
    /// Recognised source names are `b0`, `b1`, `mid_x` and `mid_y`.
    ///
    /// # Errors
    ///
    /// Fails with `MissingSourceName` (carrying this bin's scope, the
    /// requested name and the bin type) for any other name.
    fn get_source_data(&self, source: &SourceId) -> Result<Item> {
        // Lookup table: source name -> currently stored result.
        let outputs = [
            (SOURCE_B0, &self.result_b0),
            (SOURCE_B1, &self.result_b1),
            (SOURCE_MID_X, &self.result_mid_x),
            (SOURCE_MID_Y, &self.result_mid_y),
        ];

        for (name, item) in outputs.iter() {
            if source.id == *name {
                return Ok((*item).clone());
            }
        }

        error::MissingSourceName {
            scope: self.scope.clone(),
            name: source.id.to_string(),
            bin_type: BIN_TYPE.to_string(),
        }
        .fail()
    }
}

impl Calculator for Bin {
    /// Performs one calculation step of the sliding-window regression.
    ///
    /// Steps, in order:
    ///
    /// 1. Fetch `x`, `y` and `num_items` from the inputs. A `num_items`
    ///    value that cannot be converted to `usize` counts as `0`.
    /// 2. If both `x` and `y` convert to floats, append the point to
    ///    the window; otherwise the window is left as it is.
    /// 3. Drop the oldest points until at most `num_items` remain.
    /// 4. Reset all four results to `Item::Nothing`.
    /// 5. If the window holds exactly `num_items` (non-zero) points,
    ///    publish the middle point as `mid_x`/`mid_y` and, when the
    ///    `x` values are not all identical (`ss_xx != 0`), the
    ///    intercept `b0` and slope `b1` of the fitted line.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while fetching the input items.
    fn calculate(&mut self, _iteration: &Iteration) -> Result<()> {
        let item_x = self.source_x.fetch_item(&self.scope)?;
        let item_y = self.source_y.fetch_item(&self.scope)?;
        let window = self
            .source_num_items
            .fetch_item(&self.scope)?
            .to_usize()
            .unwrap_or_default();

        // Only complete points enter the window.
        if let (Ok(new_x), Ok(new_y)) =
            (item_x.to_float(), item_y.to_float())
        {
            self.x_sliding_sum += new_x;
            self.y_sliding_sum += new_y;
            self.sliding_count += 1;
            self.fifo.push_back(Point { x: new_x, y: new_y });
        }

        // Shrink the window to the requested length, oldest first,
        // keeping the running sums in step with the FIFO.
        while self.fifo.len() > window {
            if let Some(dropped) = self.fifo.pop_front() {
                self.x_sliding_sum -= dropped.x;
                self.y_sliding_sum -= dropped.y;
                self.sliding_count -= 1;
            }
        }

        // Results are only valid for the step that produced them.
        self.result_b0 = Item::Nothing;
        self.result_b1 = Item::Nothing;
        self.result_mid_x = Item::Nothing;
        self.result_mid_y = Item::Nothing;

        // Nothing to publish until the window is completely filled.
        if window == 0 || self.sliding_count != window {
            return Ok(());
        }

        let n = R64::from(self.sliding_count as f64);
        let mean_x = self.x_sliding_sum / n;
        let mean_y = self.y_sliding_sum / n;

        // Raw sums of products over the window.
        let mut sum_xy = R64::from(0f64);
        let mut sum_xx = R64::from(0f64);
        for Point { x, y } in &self.fifo {
            sum_xy += *x * *y;
            sum_xx += *x * *x;
        }

        // Centre the raw sums: ss = sum - n * mean * mean.
        // NOTE(review): this raw-moment form may lose precision when
        // the x values are large compared to their spread.
        let ss_xx = sum_xx - n * mean_x * mean_x;
        let ss_xy = sum_xy - n * mean_y * mean_x;

        // A zero spread in x has no defined slope; b0/b1 stay Nothing.
        if ss_xx != 0f64 {
            let slope = ss_xy / ss_xx;
            let intercept = mean_y - slope * mean_x;

            self.result_b0 = intercept.into();
            self.result_b1 = slope.into();
        }

        // The window holds exactly `window` points here, so the index
        // `window / 2` is in bounds.
        let middle = &self.fifo[window / 2];
        self.result_mid_x = middle.x.into();
        self.result_mid_y = middle.y.into();

        Ok(())
    }
}

/// Description for the linear_regression bin.
///
/// The description is a unit struct and carries no settings; the window
/// length is supplied at run time through the `num_items` sink.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Description;

impl BinDescription for Description {
    /// The bin type built from this description.
    type Bin = Bin;

    /// Always succeeds: the description has no fields that could be
    /// invalid, so neither the scope nor the calibration is consulted.
    fn check_validity(
        &self,
        _scope: &Scope,
        _get_calibration: &mut dyn GetCalibration,
    ) -> Result<()> {
        Ok(())
    }

    /// Returns the type name of this bin, `"linear_regression"`.
    fn bin_type(&self) -> &'static str {
        BIN_TYPE
    }
}

impl SinkNames for Description {
    /// Names of the sinks of this bin, in declaration order: the `x`
    /// sink, the `y` sink and `num_items`.
    fn sink_names(&self) -> IndexSet<String> {
        let sinks = [SINK_X, SINK_Y, SINK_NUM_ITEMS];
        build_names(&sinks)
    }
}

impl SourceNames for Description {
    /// Names of the sources of this bin, in declaration order: `b0`,
    /// `b1`, `mid_x` and `mid_y`. This implementation never fails.
    fn source_names(&self) -> Result<IndexSet<String>> {
        let sources = [SOURCE_B0, SOURCE_B1, SOURCE_MID_X, SOURCE_MID_Y];
        Ok(build_names(&sources))
    }
}

impl SourceSinkBinDescription for Description {
    /// Builds a fresh bin with an empty window and no results.
    ///
    /// The three inputs are resolved through `env` in the order `x`,
    /// `y`, `num_items`.
    ///
    /// # Errors
    ///
    /// Propagates the error of the first input that fails to resolve.
    fn build_bin(
        &self,
        scope: &Scope,
        env: &mut dyn BinBuildEnvironment,
    ) -> Result<Self::Bin> {
        let scope = scope.clone();

        let source_x = env.resolve(SINK_X)?;
        let source_y = env.resolve(SINK_Y)?;
        let source_num_items = env.resolve(SINK_NUM_ITEMS)?;

        // Both running sums start at zero for the empty window.
        let zero = R64::from(0f64);

        Ok(Bin {
            scope,

            source_x,
            source_y,
            source_num_items,

            fifo: VecDeque::new(),
            x_sliding_sum: zero,
            y_sliding_sum: zero,
            sliding_count: 0,

            result_b0: Item::Nothing,
            result_b1: Item::Nothing,
            result_mid_x: Item::Nothing,
            result_mid_y: Item::Nothing,
        })
    }
}

// No `WriteDotSimple` items are overridden for this description.
impl WriteDotSimple for Description {}

#[cfg(test)]
mod tests {
    use super::Description;
    use crate::bins::{directsource, verificationsink};
    use crate::Item as I;
    use crate::{run_bin, Result};
    use indexmap::indexset;

    /// Feeds 16 rows through the bin with a constant window length of 4
    /// and checks the four outputs after every row.
    #[test]
    fn simulate() -> Result<()> {
        use crate::Item::*;

        // Columns: x, y, num_items.
        let input = directsource::Description {
            columns: indexset![
                "x".to_string(),
                "y".to_string(),
                "num_items".to_string(),
            ],
            rows: vec![
                // x is Nothing: these rows add no point to the window.
                vec![Nothing, I::from(5.0f64), I::from(4.0f64)],
                vec![Nothing, I::from(5.0f64), I::from(4.0f64)],
                vec![Nothing, I::from(5.0f64), I::from(4.0f64)],
                vec![Nothing, I::from(5.0f64), I::from(4.0f64)],
                // x counts up from 0; y is a step 5 -> 10 -> 0.
                vec![I::from(0.0f64), I::from(5.0f64), I::from(4.0f64)],
                vec![I::from(1.0f64), I::from(5.0f64), I::from(4.0f64)],
                vec![I::from(2.0f64), I::from(5.0f64), I::from(4.0f64)],
                vec![I::from(3.0f64), I::from(5.0f64), I::from(4.0f64)],
                vec![I::from(4.0f64), I::from(10.0f64), I::from(4.0f64)],
                vec![I::from(5.0f64), I::from(10.0f64), I::from(4.0f64)],
                vec![I::from(6.0f64), I::from(10.0f64), I::from(4.0f64)],
                vec![I::from(7.0f64), I::from(10.0f64), I::from(4.0f64)],
                vec![I::from(8.0f64), I::from(0.0f64), I::from(4.0f64)],
                vec![I::from(9.0f64), I::from(0.0f64), I::from(4.0f64)],
                vec![I::from(10.0f64), I::from(0.0f64), I::from(4.0f64)],
                vec![I::from(11.0f64), I::from(0.0f64), I::from(4.0f64)],
            ]
            .into(),
        };
        // Columns: b0, b1, mid_x, mid_y.
        let verification = verificationsink::Description {
            columns: indexset![
                "b0".to_string(),
                "b1".to_string(),
                "mid_x".to_string(),
                "mid_y".to_string(),
            ],
            expected: vec![
                // Four rows without a point, then three rows while the
                // window holds fewer than 4 points: no output.
                vec![Nothing, Nothing, Nothing, Nothing],
                vec![Nothing, Nothing, Nothing, Nothing],
                vec![Nothing, Nothing, Nothing, Nothing],
                vec![Nothing, Nothing, Nothing, Nothing],
                vec![Nothing, Nothing, Nothing, Nothing],
                vec![Nothing, Nothing, Nothing, Nothing],
                vec![Nothing, Nothing, Nothing, Nothing],
                // First full window: x = 0..=3, y constant 5.
                vec![
                    I::from(5.0f64),
                    I::from(0.0f64),
                    I::from(2f64),
                    I::from(5f64),
                ],
                // The step from 5 to 10 moves through the window.
                vec![
                    I::from(2.5f64),
                    I::from(1.5f64),
                    I::from(3f64),
                    I::from(5f64),
                ],
                vec![
                    I::from(0.5f64),
                    I::from(2.0f64),
                    I::from(4f64),
                    I::from(10f64),
                ],
                vec![
                    I::from(2f64),
                    I::from(1.5f64),
                    I::from(5f64),
                    I::from(10f64),
                ],
                // Window x = 4..=7, y constant 10.
                vec![
                    I::from(10f64),
                    I::from(0.0f64),
                    I::from(6f64),
                    I::from(10f64),
                ],
                // The step from 10 to 0 moves through the window.
                vec![
                    I::from(27f64),
                    I::from(-3f64),
                    I::from(7f64),
                    I::from(10f64),
                ],
                vec![
                    I::from(35f64),
                    I::from(-4f64),
                    I::from(8f64),
                    I::from(0f64),
                ],
                vec![
                    I::from(28f64),
                    I::from(-3f64),
                    I::from(9f64),
                    I::from(0f64),
                ],
                // Window x = 8..=11, y constant 0.
                vec![
                    I::from(0f64),
                    I::from(0f64),
                    I::from(10f64),
                    I::from(0f64),
                ],
            ]
            .into(),
        };

        run_bin(&input, &Description {}, &verification)
    }
}