arnalisa 0.6.8

Pipeline system for calculating values
Documentation
//! A bin that outputs the values of a XIO job run.
//!
//! ```text
//!   ┌────[xiosource]────┐
//!   │         <output 0>│⇒
//!   │         <output 1>│⇒
//!   │         <output 2>│⇒
//!   ┊         …         ┊⇒
//!   │         <output n>│⇒
//!   └───────────────────┘
//! ```

use crate::bins::{
    BinDescription, Iteration, SourceBin, SourceId, SourceNames,
    SourceOnlyBin, SourceOnlyBinDescription,
};
use crate::{error, GetCalibration, Item, Proceed, Result, Scope};
use crossbeam_channel::Receiver;
use indexmap::{IndexMap, IndexSet};
use log::debug;
use snafu::ensure;
use snafu::OptionExt;

/// The current internal state of the bin.
#[derive(Clone, Debug)]
pub enum BinState {
    /// Initialized, waiting for the `Started` event.
    Initialized,
    /// Job was Started, waiting for the `MeasurementData` or `Finished`
    /// event.
    WaitingForData {
        /// The tags received by the `Started` event.
        tags: IndexMap<String, Vec<String>>,
    },
    /// Already received some data, waiting for more `MeasurementData`
    /// events or a `Finished` event.
    Running {
        /// The tags received by the `Started` event.
        tags: IndexMap<String, Vec<String>>,
        /// The last measurement data row.
        data: IndexMap<String, Item>,
    },
    /// Finished state, the measurement was successfully finished.
    Finished,
    /// Failed state, something went wrong.
    Failed,
}

/// Description of the xiosource bin.
#[derive(Debug)]
pub struct Description {
    /// The mapping of `MeasurementData` tags to the output sources.
    pub mapping: IndexMap<String, IndexMap<String, String>>,
    /// The receiver from which the job events get fetched.
    pub receiver: Option<Receiver<xio_webapi::JobEvent>>,
    /// Additional fields that get produced on every row.
    pub additional_sources: IndexMap<String, Item>,
}

#[derive(Debug)]
struct XioDataSource {
    tag: String,
    field: String,
}

/// A bin that outputs the values of a XIO job run.
#[derive(Debug)]
pub struct Bin {
    scope: Scope,
    xio_sources: IndexMap<String, XioDataSource>,
    additional_sources: IndexMap<String, Item>,
    receiver: Option<Receiver<xio_webapi::JobEvent>>,
    state: BinState,
}

static BIN_TYPE: &str = "xio_arnalisa";

impl BinDescription for Description {
    type Bin = Bin;

    fn check_validity(
        &self,
        _scope: &Scope,
        _get_calibration: &mut dyn GetCalibration,
    ) -> Result<()> {
        // TODO: implement validity check
        // TODO: check that receiver field names don't overlap with
        // xio source names.
        Ok(())
    }

    fn bin_type(&self) -> &'static str {
        BIN_TYPE
    }
}

impl SourceNames for Description {
    fn source_names(&self) -> Result<IndexSet<String>> {
        Ok(self
            .mapping
            .iter()
            .map(|(_k, v)| v.iter())
            .flatten()
            .map(|(_k, v)| v.to_string())
            .chain(self.additional_sources.keys().map(|k| k.to_string()))
            .collect())
    }
}

impl Description {
    fn build_xio_sources(
        &self,
    ) -> Result<IndexMap<String, XioDataSource>> {
        let mut s = IndexMap::new();

        for (tag, mapping) in self.mapping.iter() {
            for (xio_source, arnalisa_sink) in mapping {
                let entry = s.entry(arnalisa_sink.to_string());
                use indexmap::map::Entry as E;
                match entry {
                    E::Occupied(_) => {
                        error::XioMappingDuplicateSource {
                            mapping: self.mapping.clone(),
                            name: arnalisa_sink.to_string(),
                        }
                        .fail()?;
                    }
                    E::Vacant(v) => {
                        v.insert(XioDataSource {
                            tag: tag.to_string(),
                            field: xio_source.to_string(),
                        });
                    }
                }
            }
        }

        Ok(s)
    }
}

impl SourceOnlyBinDescription for Description {
    fn build_bin(&self, scope: &Scope) -> Result<Self::Bin> {
        // TODO: add required
        Ok(Bin {
            scope: scope.clone(),
            xio_sources: self.build_xio_sources()?,
            additional_sources: self.additional_sources.clone(),
            receiver: self.receiver.clone(),
            state: BinState::Initialized,
        })
    }
}

impl SourceBin for Bin {
    fn get_source_data(&self, source: &SourceId) -> Result<Item> {
        match &self.state {
            BinState::Running { data, .. } => data
                .get(&source.id)
                .context(error::InvalidSourceName {
                    scope: self.scope.clone(),
                    name: source.id.to_string(),
                    bin_type: BIN_TYPE.to_string(),
                })
                .map(|item| item.clone()),
            current_state => error::XioGettingDataWhileNotInRunningState {
                current_state: current_state.clone(),
            }
            .fail(),
        }
    }
}

impl Bin {
    fn build_data(
        &self,
        tags: &IndexMap<String, Vec<String>>,
        tag: String,
        data: Vec<xio_base_datatypes::DataValueDescriptive>,
    ) -> Result<IndexMap<String, Item>> {
        let column_names =
            tags.get(&tag).context(error::XioTagNotAvailable {
                tag: tag.to_string(),
                available: tags
                    .keys()
                    .cloned()
                    .collect::<IndexSet<String>>(),
            })?;
        ensure!(
            data.len() == column_names.len(),
            error::XioColumnCountMismatch {
                tag: tag.to_string(),
                expected: column_names.len(),
                received: data.len()
            }
        );

        let index = column_names
            .iter()
            .zip(data.into_iter())
            .map(|(k, dv)| {
                use xio_base_datatypes::DataValue as DV;
                let v = match dv {
                    DV::Boolean(v) => Item::from(v),
                    DV::UInt8(v) => Item::from(v),
                    DV::UInt16(v) => Item::from(v),
                    DV::UInt32(v) => Item::from(v),
                    DV::UInt64(v) => Item::from(v),
                    DV::Int8(v) => Item::from(v),
                    DV::Int16(v) => Item::from(v),
                    DV::Int32(v) => Item::from(v),
                    DV::Int64(v) => Item::from(v),
                    DV::ParameterMask(_m) => Item::Nothing,
                };
                (k, v)
            })
            .collect::<IndexMap<_, _>>();
        let row = self
            .xio_sources
            .iter()
            .map(|(k, reference)| {
                (
                    k.to_string(),
                    if reference.tag == tag {
                        index
                            .get(&reference.field)
                            .cloned()
                            .unwrap_or_else(|| Item::Nothing)
                    } else {
                        Item::Nothing
                    },
                )
            })
            .chain(
                self.additional_sources
                    .iter()
                    .map(|(k, v)| (k.clone(), v.clone())),
            )
            .collect();
        Ok(row)
    }

    fn process_events(
        &mut self,
        receiver: &Receiver<xio_webapi::JobEvent>,
    ) -> Result<Proceed> {
        for event in receiver.iter() {
            use xio_webapi::JobEvent as E;
            use BinState as S;
            debug!("Received event {:?} in state {:?}", event, self.state);
            match (self.state.clone(), event) {
                (S::Initialized, E::Started { tags, .. }) => {
                    self.state = S::WaitingForData { tags };
                }
                (
                    S::WaitingForData { tags },
                    E::Data { tag, values, .. },
                ) => {
                    let data = self.build_data(&tags, tag, values)?;
                    self.state = S::Running { tags, data };
                    return Ok(Proceed::Continue);
                }
                (S::WaitingForData { .. }, E::Position { .. }) => {}
                (S::Running { tags, .. }, E::Data { tag, values, .. }) => {
                    let data = self.build_data(&tags, tag, values)?;
                    self.state = S::Running { tags, data };
                    return Ok(Proceed::Continue);
                }
                (S::Running { .. }, E::Position { .. }) => {}
                (S::Running { .. }, E::Stopped { .. }) => {
                    self.state = S::Finished;
                    return Ok(Proceed::Stop);
                }
                (state, event) => {
                    return error::XioReceivedUnexpectedEventInState {
                        state: state.clone(),
                        event,
                    }
                    .fail();
                }
            }
        }
        Ok(Proceed::Stop)
    }
}

impl SourceOnlyBin for Bin {
    fn fetch_next(&mut self, _iteration: &Iteration) -> Result<Proceed> {
        let proceed = if let Some(receiver) = self.receiver.clone() {
            self.process_events(&receiver)
        } else {
            Ok(Proceed::Stop)
        };

        match proceed {
            Ok(Proceed::Stop) | Err(_) => {
                self.receiver = None;
            }
            _ => {}
        }

        proceed
    }
}