use crate::bins::{
BinDescription, Iteration, SourceBin, SourceId, SourceNames,
SourceOnlyBin, SourceOnlyBinDescription,
};
use crate::{error, GetCalibration, Item, Proceed, Result, Scope};
use crossbeam_channel::Receiver;
use indexmap::{IndexMap, IndexSet};
use snafu::ensure;
use snafu::OptionExt;
/// Processing state of a `Bin`, advanced by `fetch_next` as job events
/// arrive on the receiver.
#[derive(Clone, Debug)]
pub enum BinState {
/// State assigned by `build_bin`; no `Started` event has been processed yet.
Initialized,
/// A `Started` event has been processed, but no data row is available yet.
WaitingForData {
/// Tags carried by the `Started` event. `build_data` looks up the
/// column names of a tag in this map.
tags: IndexMap<String, Vec<String>>,
},
/// At least one `Data` event has been processed; a row can be queried
/// through `get_source_data`.
Running {
/// Tags carried over unchanged from `WaitingForData`.
tags: IndexMap<String, Vec<String>>,
/// Row built by `build_data` from the most recent `Data` event,
/// keyed by source name.
data: IndexMap<String, Item>,
},
/// A `Stopped` event was processed while in the `Running` state.
Finished,
/// NOTE(review): nothing in the visible code assigns this state; confirm
/// whether it is set elsewhere or is a leftover.
Failed,
}
/// Description from which an XIO source `Bin` is built.
#[derive(Debug)]
pub struct Description {
/// Source mapping: tag -> (XIO field name -> name of the source the bin
/// exposes). `build_xio_sources` inverts this into a per-source lookup.
pub mapping: IndexMap<String, IndexMap<String, String>>,
/// Channel delivering the job events; `build_bin` clones it into the bin.
pub receiver: Receiver<xio_webapi::JobEvent>,
}
/// Origin of the value of a single source: which tag's data rows carry it
/// and which column (field) within those rows.
#[derive(Debug)]
struct XioDataSource {
/// Tag whose data events provide the value.
tag: String,
/// Column name within the tag's data row.
field: String,
}
/// Source-only bin that exposes values received as job events over a channel.
#[derive(Debug)]
pub struct Bin {
/// Scope the bin was built for; attached to `InvalidSourceName` errors.
scope: Scope,
/// Lookup from source name to the tag/field that provides its value.
xio_sources: IndexMap<String, XioDataSource>,
/// Channel from which `fetch_next` reads job events.
receiver: Receiver<xio_webapi::JobEvent>,
/// Current position in the event-processing state machine.
state: BinState,
}
/// Type identifier of this bin; returned by `bin_type` and attached to
/// `InvalidSourceName` errors.
static BIN_TYPE: &str = "xio_arnalisa";
impl BinDescription for Description {
    type Bin = Bin;

    /// Validates the description before a bin is built from it.
    ///
    /// This bin performs no checks: neither the scope nor the calibration
    /// accessor is consulted and the result is always `Ok(())`.
    fn check_validity(
        &self,
        _scope: &Scope,
        _get_calibration: &mut dyn GetCalibration,
    ) -> Result<()> {
        Ok(())
    }

    /// Returns the type identifier of this bin (`BIN_TYPE`).
    fn bin_type(&self) -> &'static str {
        BIN_TYPE
    }
}
impl SourceNames for Description {
    /// Collects the names of all sources this description provides.
    ///
    /// The names are the values of the per-tag maps stored in `mapping`.
    /// They are gathered into a set, so a name that occurs more than once
    /// is reported once.
    fn source_names(&self) -> Result<IndexSet<String>> {
        let names: IndexSet<String> = self
            .mapping
            .values()
            .flat_map(|fields| fields.values())
            .cloned()
            .collect();
        Ok(names)
    }
}
impl Description {
fn build_xio_sources(
&self,
) -> Result<IndexMap<String, XioDataSource>> {
let mut s = IndexMap::new();
for (tag, mapping) in self.mapping.iter() {
for (xio_source, arnalisa_sink) in mapping {
let entry = s.entry(arnalisa_sink.to_string());
use indexmap::map::Entry as E;
match entry {
E::Occupied(_) => {
error::XioMappingDuplicateSource {
mapping: self.mapping.clone(),
name: arnalisa_sink.to_string(),
}
.fail()?;
}
E::Vacant(v) => {
v.insert(XioDataSource {
tag: tag.to_string(),
field: xio_source.to_string(),
});
}
}
}
}
Ok(s)
}
}
impl SourceOnlyBinDescription for Description {
    /// Builds a bin for `scope` from this description.
    ///
    /// The bin starts in the `Initialized` state and shares the event
    /// channel with the description (the receiver is cloned).
    ///
    /// # Errors
    ///
    /// Propagates the error of `build_xio_sources` when the mapping assigns
    /// a source name more than once.
    fn build_bin(&self, scope: &Scope) -> Result<Self::Bin> {
        let xio_sources = self.build_xio_sources()?;
        Ok(Bin {
            scope: scope.clone(),
            xio_sources,
            receiver: self.receiver.clone(),
            state: BinState::Initialized,
        })
    }
}
impl SourceBin for Bin {
fn get_source_data(&self, source: &SourceId) -> Result<Item> {
match &self.state {
BinState::Running { data, .. } => data
.get(&source.id)
.context(error::InvalidSourceName {
scope: self.scope.clone(),
name: source.id.to_string(),
bin_type: BIN_TYPE.to_string(),
})
.map(|item| item.clone()),
current_state => error::XioGettingDataWhileNotInRunningState {
current_state: current_state.clone(),
}
.fail(),
}
}
}
impl Bin {
fn build_data(
&self,
tags: &IndexMap<String, Vec<String>>,
tag: String,
data: Vec<xio_base_datatypes::DataValueDescriptive>,
) -> Result<IndexMap<String, Item>> {
let column_names =
tags.get(&tag).context(error::XioTagNotAvailable {
tag: tag.to_string(),
available: tags
.keys()
.cloned()
.collect::<IndexSet<String>>(),
})?;
ensure!(
data.len() == column_names.len(),
error::XioColumnCountMismatch {
tag: tag.to_string(),
expected: column_names.len(),
received: data.len()
}
);
let index = column_names
.iter()
.zip(data.into_iter())
.map(|(k, dv)| {
use xio_base_datatypes::DataValue as DV;
let v = match dv {
DV::Boolean(v) => Item::from(v),
DV::UInt8(v) => Item::from(v),
DV::UInt16(v) => Item::from(v),
DV::UInt32(v) => Item::from(v),
DV::UInt64(v) => Item::from(v),
DV::Int8(v) => Item::from(v),
DV::Int16(v) => Item::from(v),
DV::Int32(v) => Item::from(v),
DV::Int64(v) => Item::from(v),
DV::ParameterMask(_m) => Item::Nothing,
};
(k, v)
})
.collect::<IndexMap<_, _>>();
let row = self
.xio_sources
.iter()
.map(|(k, reference)| {
(
k.to_string(),
if reference.tag == tag {
index
.get(&reference.field)
.cloned()
.unwrap_or_else(|| Item::Nothing)
} else {
Item::Nothing
},
)
})
.collect();
Ok(row)
}
}
impl SourceOnlyBin for Bin {
fn fetch_next(&mut self, _iteration: &Iteration) -> Result<Proceed> {
for event in self.receiver.iter() {
use xio_webapi::JobEvent as E;
use BinState as S;
match (self.state.clone(), event) {
(S::Initialized, E::Started { tags, .. }) => {
self.state = S::WaitingForData { tags };
}
(
S::WaitingForData { tags },
E::Data { tag, values, .. },
) => {
let data = self.build_data(&tags, tag, values)?;
self.state = S::Running { tags, data };
return Ok(Proceed::Continue);
}
(S::WaitingForData { .. }, E::Position { .. }) => {}
(S::Running { tags, .. }, E::Data { tag, values, .. }) => {
let data = self.build_data(&tags, tag, values)?;
self.state = S::Running { tags, data };
return Ok(Proceed::Continue);
}
(S::Running { .. }, E::Position { .. }) => {}
(S::Running { .. }, E::Stopped { .. }) => {
self.state = S::Finished;
return Ok(Proceed::Stop);
}
(state, event) => {
return error::XioReceivedUnexpectedEventInState {
state: state.clone(),
event,
}
.fail();
}
}
}
Ok(Proceed::Stop)
}
}