arnalisa 0.6.8

Pipeline system for calculating values
Documentation
//! A bin that reads its output values from a JSONL file.
//!
//! ```text
//!   ┌───[jsonlsource]───┐
//!   │         <output 0>│⇒
//!   │         <output 1>│⇒
//!   │         <output 2>│⇒
//!   ┊         …         ┊⇒
//!   │         <output n>│⇒
//!   └───────────────────┘
//! ```

use serde_json;

use super::{
    BinDescription, GetCalibration, Item, Iteration, Proceed, Result,
    Scope, SourceBin, SourceId, SourceNames, SourceOnlyBin,
    SourceOnlyBinDescription,
};
use crate::error;
use indexmap::IndexSet;
use snafu::{ensure, OptionExt, ResultExt};
use std::fs::File;
use std::io::{BufRead, BufReader};

/// Type name of this bin; returned by `bin_type()` and included in
/// `InvalidSourceName` errors.
static BIN_TYPE: &str = "jsonlsource";

/// A bin that reads its output values from a JSONL file.
///
/// The first line of the file is parsed as the header (a JSON array of
/// source names); every following line is parsed as one row that must
/// contain exactly one item per header entry.
#[derive(Debug)]
pub struct Bin {
    /// Scope the bin was built with; reported in `InvalidSourceName` errors.
    scope: Scope,
    /// Path of the JSONL file; reported in `InvalidJsonLItemCount` errors.
    file_path: String,
    /// Line iterator over the file, already advanced past the header line.
    lines: std::io::Lines<BufReader<File>>,
    /// Source names taken from the first line of the file.
    header: Vec<String>,

    /// Values of the most recently fetched row, in the same order as
    /// `header`; every entry is `Item::Nothing` until a row has been fetched.
    outputs: Vec<Item>,
}

impl SourceOnlyBin for Bin {
    fn fetch_next(&mut self, iteration: &Iteration) -> Result<Proceed> {
        if let Some(line) = self.lines.next() {
            let items: Vec<Item> =
                serde_json::from_str(&line.context(error::Io)?)
                    .context(error::SerdeJson)?;

            ensure!(
                items.len() == self.header.len(),
                error::InvalidJsonLItemCount {
                    path: self.file_path.clone(),
                    line: iteration.index,
                    required: self.header.len(),
                    found: items.len(),
                }
            );

            self.outputs.iter_mut().zip(items.into_iter()).for_each(
                |(output, item)| {
                    *output = item;
                },
            );

            Ok(Proceed::Continue)
        } else {
            Ok(Proceed::Stop)
        }
    }
}

impl SourceBin for Bin {
    fn get_source_data(&self, source: &SourceId) -> Result<Item> {
        let index = self.header.iter().position(|s| s == &source.id);

        match index {
            Some(index) => Ok(self.outputs.get(index).cloned().unwrap()),
            None => error::InvalidSourceName {
                scope: self.scope.clone(),
                name: source.id.to_string(),
                bin_type: BIN_TYPE.to_string(),
            }
            .fail(),
        }
    }
}

/// Description for the jsonl source bin.
///
/// The described file is opened both when the source names are queried
/// and when the bin is built.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Description {
    /// The path of the input file. Its first line is parsed as a JSON
    /// array of strings that names the sources.
    pub file_path: String,
}

impl Description {
    fn open_file_and_load_header(
        &self,
    ) -> Result<(Vec<String>, std::io::Lines<BufReader<File>>)> {
        let file = File::open(&self.file_path).context(error::Io)?;
        let reader = BufReader::new(file);
        let mut lines = reader.lines();
        let first_line = lines
            .next()
            .context(error::InvalidJsonL {
                path: self.file_path.to_string(),
                line: 1usize,
            })?
            .context(error::Io)?;
        Ok((
            serde_json::from_str::<Vec<String>>(&first_line)
                .context(error::SerdeJson)?,
            lines,
        ))
    }
}

impl BinDescription for Description {
    type Bin = Bin;

    /// Returns the type name of this bin, `"jsonlsource"`.
    fn bin_type(&self) -> &'static str {
        BIN_TYPE
    }

    /// Performs no checks and always succeeds; neither the scope nor the
    /// calibration source is consulted.
    fn check_validity(
        &self,
        _scope: &Scope,
        _get_calibration: &mut dyn GetCalibration,
    ) -> Result<()> {
        Ok(())
    }
}

impl SourceNames for Description {
    /// Returns the source names listed in the header line of the file.
    ///
    /// The file is opened and its first line parsed on every call.
    fn source_names(&self) -> Result<IndexSet<String>> {
        let (header, _) = self.open_file_and_load_header()?;
        Ok(header.into_iter().collect())
    }
}

impl SourceOnlyBinDescription for Description {
    /// Builds the bin by opening the file and reading its header line.
    ///
    /// Every output starts out as `Item::Nothing`, one per header entry.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or its header cannot be loaded.
    fn build_bin(&self, scope: &Scope) -> Result<Self::Bin> {
        let (header, lines) = self.open_file_and_load_header()?;

        // One placeholder per source until the first row is fetched.
        let outputs = vec![Item::Nothing; header.len()];

        Ok(Bin {
            scope: scope.clone(),
            file_path: self.file_path.clone(),
            lines,
            header,
            outputs,
        })
    }
}