arnalisa 0.6.8

Pipeline system for calculating values
Documentation
//! A bin that calculates the last calm point of a curve.
//! The calm point means that the `y` input value has not changed by
//! more than `y_max_delta` since this point.
//!
//! ```text
//!   ┌───[last_calm_point]───┐
//!  ⇒│x                calm_x│⇒
//!  ⇒│y                calm_y│⇒
//!  ⇒│y_max_delta            │
//!   └───────────────────────┘
//! ```

use super::{
    build_names, BinBuildEnvironment, BinDescription, Calculator,
    FetchItem, GetCalibration, Item, Iteration, Result, Scope, SinkBin,
    SinkNames, SourceBin, SourceId, SourceNames, SourceSinkBinDescription,
    WriteDotSimple, SINK_X, SINK_Y,
};
use crate::error;
use crate::R64;
use indexmap::{IndexMap, IndexSet};

static BIN_TYPE: &str = "last_calm_point";
static SINK_Y_MAX_DELTA: &str = "y_max_delta";
static SOURCE_CALM_X: &str = "calm_x";
static SOURCE_CALM_Y: &str = "calm_y";

#[derive(Debug, Clone)]
struct Point {
    x: R64,
    y: R64,
}

/// A bin calculates the last calm point of a curve.
#[derive(Debug)]
pub struct Bin {
    scope: Scope,

    source_x: Box<dyn FetchItem>,
    source_y: Box<dyn FetchItem>,
    source_y_max_delta: Box<dyn FetchItem>,

    curve: IndexMap<R64, R64>,
    min: Option<Point>,
    max: Option<Point>,
    last_calm: Option<Point>,

    result_calm_x: Item,
    result_calm_y: Item,
}

impl SinkBin for Bin {}

impl SourceBin for Bin {
    fn get_source_data(&self, source: &SourceId) -> Result<Item> {
        if source.id == SOURCE_CALM_X {
            Ok(self.result_calm_x.clone())
        } else if source.id == SOURCE_CALM_Y {
            Ok(self.result_calm_y.clone())
        } else {
            error::MissingSourceName {
                scope: self.scope.clone(),
                name: source.id.to_string(),
                bin_type: BIN_TYPE.to_string(),
            }
            .fail()
        }
    }
}

impl Calculator for Bin {
    fn calculate(&mut self, _iteration: &Iteration) -> Result<()> {
        let x = self.source_x.fetch_item(&self.scope)?;
        let y = self.source_y.fetch_item(&self.scope)?;
        let y_max_delta =
            self.source_y_max_delta.fetch_item(&self.scope)?;

        match (x.to_float(), y.to_float(), y_max_delta.to_float()) {
            (Ok(_), Ok(_), Ok(y_max_delta)) if y_max_delta <= 0f64 => {
                // Clear when y_max_delta is zero or lower
                self.clear();
            }
            (Ok(_), Err(_), _)
            | (Err(_), Ok(_), _)
            | (Err(_), Err(_), _) => {
                // Do nothing when either X or Y is not a float
            }
            (Ok(_), Ok(_), Err(_)) => {
                // Clear when y_max_delta is not a float
                self.clear();
            }
            (Ok(x), Ok(y), Ok(y_max_delta)) => {
                let point = Point { x, y };
                self.add_point(&point);

                if self.current_delta() > y_max_delta
                    || self.last_calm.is_none()
                {
                    self.update_calculation(y_max_delta);
                }
            }
        }

        let (calm_x, calm_y) = match self.last_calm {
            Some(Point { ref x, ref y }) => {
                (Item::from(*x), Item::from(*y))
            }
            None => (Item::Nothing, Item::Nothing),
        };
        self.result_calm_x = calm_x;
        self.result_calm_y = calm_y;
        Ok(())
    }
}

impl Bin {
    fn clear(&mut self) {
        self.curve.clear();
        self.min = None;
        self.max = None;
        self.last_calm = None;
    }

    fn add_point(&mut self, p: &Point) {
        self.curve.insert(p.x, p.y);
        self.curve.sort_keys();

        self.min = Self::min_point(&self.min, &Some(p.clone()));
        self.max = Self::max_point(&self.max, &Some(p.clone()));
    }

    fn min_point(a: &Option<Point>, b: &Option<Point>) -> Option<Point> {
        match (a, b) {
            (&Some(ref a), &Some(ref b)) => {
                if a.y < b.y {
                    Some(a.clone())
                } else {
                    Some(b.clone())
                }
            }
            (&Some(ref a), &None) => Some(a.clone()),
            (&None, &Some(ref b)) => Some(b.clone()),
            (&None, &None) => None,
        }
    }

    fn max_point(a: &Option<Point>, b: &Option<Point>) -> Option<Point> {
        match (a, b) {
            (&Some(ref a), &Some(ref b)) => {
                if a.y > b.y {
                    Some(a.clone())
                } else {
                    Some(b.clone())
                }
            }
            (&Some(ref a), &None) => Some(a.clone()),
            (&None, &Some(ref b)) => Some(b.clone()),
            (&None, &None) => None,
        }
    }

    fn delta(a: &Option<Point>, b: &Option<Point>) -> R64 {
        use decorum::Real;
        match (a, b) {
            (&Some(ref a), &Some(ref b)) => {
                (a.clone().y - b.clone().y).abs()
            }
            _ => R64::from(0f64),
        }
    }

    fn current_delta(&self) -> R64 {
        use decorum::Real;
        match (&self.min, &self.max) {
            (
                &Some(Point {
                    x: ref _min_x,
                    y: ref min_y,
                }),
                &Some(Point {
                    x: ref _max_x,
                    y: ref max_y,
                }),
            ) => (*max_y - *min_y).abs(),
            _ => R64::from(0f64),
        }
    }

    fn update_calculation(&mut self, max_delta: R64) {
        let mut min = None;
        let mut max = None;

        let mut keep = true;

        let mut recalculated = self
            .curve
            .iter()
            .rev()
            .filter_map(|(x, y)| {
                if keep {
                    let point = Point { x: *x, y: *y };

                    let delta: R64 = {
                        let mut delta = R64::from(0f64);

                        delta = delta
                            .max(Self::delta(&Some(point.clone()), &min));
                        delta = delta
                            .max(Self::delta(&Some(point.clone()), &max));
                        delta = delta.max(Self::delta(&min, &max));
                        delta
                    };
                    min = Self::min_point(&min, &Some(point.clone()));
                    max = Self::max_point(&max, &Some(point.clone()));

                    keep = delta <= max_delta;
                    if keep {
                        Some((*x, *y))
                    } else {
                        None
                    }
                } else {
                    None
                }
            })
            .collect::<IndexMap<R64, R64>>();
        recalculated.sort_keys();

        self.min = min;
        self.max = max;
        self.curve = recalculated;
        self.last_calm = self
            .curve
            .iter()
            .map(|(x, y)| Point { x: *x, y: *y })
            .next();
    }
}

/// Description of the last_calm_point bin.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Description;

impl BinDescription for Description {
    type Bin = Bin;

    fn check_validity(
        &self,
        _scope: &Scope,
        _get_calibration: &mut dyn GetCalibration,
    ) -> Result<()> {
        Ok(())
    }

    fn bin_type(&self) -> &'static str {
        BIN_TYPE
    }
}

impl SinkNames for Description {
    fn sink_names(&self) -> IndexSet<String> {
        build_names(&[SINK_X, SINK_Y, SINK_Y_MAX_DELTA])
    }
}

impl SourceNames for Description {
    fn source_names(&self) -> Result<IndexSet<String>> {
        Ok(build_names(&[SOURCE_CALM_X, SOURCE_CALM_Y]))
    }
}

impl SourceSinkBinDescription for Description {
    fn build_bin(
        &self,
        scope: &Scope,
        env: &mut dyn BinBuildEnvironment,
    ) -> Result<Self::Bin> {
        Ok(Bin {
            scope: scope.clone(),

            source_x: env.resolve(SINK_X)?,
            source_y: env.resolve(SINK_Y)?,
            source_y_max_delta: env.resolve(SINK_Y_MAX_DELTA)?,

            curve: IndexMap::new(),
            min: None,
            max: None,
            last_calm: None,
            result_calm_x: Item::Nothing,
            result_calm_y: Item::Nothing,
        })
    }
}

impl WriteDotSimple for Description {}

#[cfg(test)]
mod tests {
    use super::Description;
    use crate::bins::{directsource, verificationsink};
    use crate::Item as I;
    use crate::{run_bin, Result};
    use indexmap::indexset;

    #[test]
    fn simulate() -> Result<()> {
        let input = directsource::Description {
            columns: indexset![
                "x".to_string(),
                "y".to_string(),
                "y_max_delta".to_string(),
            ],
            rows: vec![
                vec![I::from(0.0f64), I::from(9.0f64), I::from(3.0f64)],
                vec![I::from(1.0f64), I::from(0.0f64), I::from(3.0f64)],
                vec![I::from(2.0f64), I::from(3.0f64), I::from(3.0f64)],
                vec![I::from(3.0f64), I::from(7.0f64), I::from(3.0f64)],
                vec![I::from(4.0f64), I::from(4.0f64), I::from(3.0f64)],
                vec![I::from(5.0f64), I::from(5.0f64), I::from(3.0f64)],
                vec![I::from(6.0f64), I::from(5.0f64), I::from(3.0f64)],
                vec![I::from(7.0f64), I::from(5.0f64), I::from(3.0f64)],
                vec![I::from(8.0f64), I::from(5.0f64), I::from(3.0f64)],
                vec![I::from(9.0f64), I::from(5.0f64), I::from(3.0f64)],
                vec![I::from(10.0f64), I::from(0.0f64), I::from(3.0f64)],
                vec![I::from(11.0f64), I::from(0.0f64), I::from(3.0f64)],
                vec![I::from(12.0f64), I::from(0.0f64), I::from(3.0f64)],
                vec![I::from(13.0f64), I::from(0.0f64), I::from(3.0f64)],
                vec![I::from(14.0f64), I::from(0.0f64), I::from(3.0f64)],
                vec![I::from(15.0f64), I::from(3.0f64), I::from(3.0f64)],
                vec![I::from(16.0f64), I::from(0.0f64), I::from(3.0f64)],
                vec![I::from(17.0f64), I::from(3.0f64), I::from(3.0f64)],
                vec![I::from(18.0f64), I::from(0.0f64), I::from(3.0f64)],
                vec![I::from(19.0f64), I::from(3.0f64), I::from(3.0f64)],
            ]
            .into(),
        };
        let verification = verificationsink::Description {
            columns: indexset!["calm_x".to_string(), "calm_y".to_string()],
            expected: vec![
                vec![I::from(0.0f64), I::from(9.0f64)],
                vec![I::from(1.0f64), I::from(0.0f64)],
                vec![I::from(1.0f64), I::from(0.0f64)],
                vec![I::from(3.0f64), I::from(7.0f64)],
                vec![I::from(3.0f64), I::from(7.0f64)],
                vec![I::from(3.0f64), I::from(7.0f64)],
                vec![I::from(3.0f64), I::from(7.0f64)],
                vec![I::from(3.0f64), I::from(7.0f64)],
                vec![I::from(3.0f64), I::from(7.0f64)],
                vec![I::from(3.0f64), I::from(7.0f64)],
                vec![I::from(10.0f64), I::from(0.0f64)],
                vec![I::from(10.0f64), I::from(0.0f64)],
                vec![I::from(10.0f64), I::from(0.0f64)],
                vec![I::from(10.0f64), I::from(0.0f64)],
                vec![I::from(10.0f64), I::from(0.0f64)],
                vec![I::from(10.0f64), I::from(0.0f64)],
                vec![I::from(10.0f64), I::from(0.0f64)],
                vec![I::from(10.0f64), I::from(0.0f64)],
                vec![I::from(10.0f64), I::from(0.0f64)],
                vec![I::from(10.0f64), I::from(0.0f64)],
            ]
            .into(),
        };

        run_bin(&input, &Description {}, &verification)
    }
}