use super::{
write_dot_bin, BinBuildEnvironment, BinDescription, Calculator,
DataSource, FetchItem, GetCalibration, Item, Iteration, Result, Scope,
SinkBin, SinkNames, SourceBin, SourceId, SourceNames, SourceReference,
SourceSinkBin, SourceSinkBinDescription, SourceSinkBinProcessor,
WriteDot,
};
use crate::bins::CheckContains;
use crate::error;
use indexmap::{IndexMap, IndexSet};
use snafu::{ensure, OptionExt, ResultExt};
use std::io::Write;
use std::rc::Rc;
use std::sync::RwLock;
/// Type name of the pipeline bin; returned by `Description::bin_type` and
/// embedded in the source-name errors raised in this module.
static BIN_TYPE: &str = "pipeline";
/// A built pipeline: a set of embedded bins plus the references that define
/// which data the pipeline exposes as its own sources.
#[derive(Debug)]
pub struct Bin {
/// Scope of this pipeline; passed to input fetchers and cloned into errors.
scope: Scope,
/// Fetchers for data coming from outside the pipeline; consulted when an
/// output reference names no embedded bin.
input_sources: IndexMap<String, Box<dyn FetchItem>>,
/// Maps each source name offered by the pipeline to the place its data
/// comes from (an embedded bin's source, or a pipeline input).
output_references: IndexMap<String, SourceReference>,
/// Maps an embedded bin id to its index in `bins`.
lookup: IndexMap<String, usize>,
/// The embedded bins, iterated in this order by `calculate`.
bins: Vec<Rc<RwLock<Box<dyn SourceSinkBin>>>>,
}
impl Bin {
fn bin(&self, id: &str) -> Result<Rc<RwLock<Box<dyn SourceSinkBin>>>> {
let index = *self.lookup.get(&id.to_string()).context(
error::BinNotPresent {
scope: self.scope.clone(),
bin: id.to_string(),
},
)?;
Ok(Rc::clone(self.bins.get(index).context(
error::BinNotPresent {
scope: self.scope.clone(),
bin: id.to_string(),
},
)?))
}
}
// Marker implementation: no `SinkBin` items are defined or overridden here.
impl SinkBin for Bin {}
impl SourceBin for Bin {
    /// Fetches the item currently offered under the pipeline source `source`.
    ///
    /// The source name is looked up in `output_references`. If the reference
    /// names an embedded bin, the request is forwarded to that bin's source;
    /// otherwise the item is fetched from the matching pipeline input.
    ///
    /// # Errors
    ///
    /// * `InvalidSourceName` if the pipeline has no source called `source.id`.
    /// * `BinNotPresent` if the referenced embedded bin does not exist.
    /// * `InvalidSinkName` if no input fetcher is registered for the
    ///   referenced input.
    /// * Any error reported by the embedded bin or the input fetcher.
    ///
    /// # Panics
    ///
    /// Panics if the lock of the referenced embedded bin is poisoned.
    fn get_source_data(&self, source: &SourceId) -> Result<Item> {
        let reference = self.output_references.get(&source.id).context(
            error::InvalidSourceName {
                scope: self.scope.clone(),
                name: source.id.to_string(),
                bin_type: BIN_TYPE.to_string(),
            },
        )?;
        match reference.bin {
            Some(ref bin_id) => {
                let bin_lock = self.bin(bin_id)?;
                // `get_source_data` only needs `&self`, so a shared read
                // lock is enough; an exclusive write lock would needlessly
                // exclude every other reader of the same bin.
                let bin = bin_lock.read().unwrap();
                let inner_source = SourceId {
                    id: reference.source.clone(),
                };
                bin.get_source_data(&inner_source)
            }
            None => {
                let data_source = self
                    .input_sources
                    .get(&reference.source)
                    .context(error::InvalidSinkName {
                        name: reference.source.to_string(),
                    })?;
                data_source.fetch_item(&self.scope)
            }
        }
    }
}
impl Calculator for Bin {
    /// Runs `calculate` on every embedded bin, in the order they are stored,
    /// stopping at the first error.
    ///
    /// # Panics
    ///
    /// Panics if the lock of an embedded bin is poisoned.
    fn calculate(&mut self, iteration: &Iteration) -> Result<()> {
        self.bins.iter().try_for_each(|shared| {
            let mut embedded = shared.write().unwrap();
            embedded.calculate(iteration)?;
            Ok(())
        })
    }
}
/// Serializable description of a pipeline bin.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Description {
/// Connections, keyed by the id of the consuming bin and then by the name
/// of the sink being fed. The empty id `""` holds the pipeline's own
/// outputs (its keys are the pipeline's source names).
pub pipes: IndexMap<String, IndexMap<String, SourceReference>>,
/// Descriptions of the embedded bins, keyed by bin id.
pub bins: IndexMap<String, super::SourceSinkDescription>,
/// Names of the pipeline's inputs; reported as its sink names.
pub inputs: IndexSet<String>,
}
/// List of shared, lock-protected embedded bins, as returned by
/// `Description::build_embedded_bins`.
type SourceSinkBinLockedVec = Vec<Rc<RwLock<Box<dyn SourceSinkBin>>>>;
impl Description {
fn check_bin_and_pipe_ids(
scope: Scope,
bins: &IndexSet<String>,
pipes: &IndexSet<String>,
) -> Result<()> {
ensure!(
*bins == *pipes,
error::BinNamesMismatchInPipeline {
scope,
bins: bins.clone(),
pipes: pipes.clone(),
}
);
Ok(())
}
fn build_embedded_bins(
&self,
scope: &Scope,
env: &mut dyn BinBuildEnvironment,
) -> Result<(IndexMap<String, usize>, SourceSinkBinLockedVec)> {
let mut lookup = IndexMap::new();
let mut bins: Vec<Rc<RwLock<Box<dyn SourceSinkBin>>>> = Vec::new();
for (index, key) in self
.pipes
.clone()
.sorted_by_pipe_connections(scope)?
.into_iter()
.enumerate()
{
let bin = {
let scope = scope.clone();
use crate::{CalibrationSource, R64};
struct Env<'a> {
parent: &'a mut dyn BinBuildEnvironment,
pipes: &'a IndexMap<
String,
IndexMap<String, SourceReference>,
>,
key: &'a str,
lookup: &'a IndexMap<String, usize>,
bins: &'a Vec<Rc<RwLock<Box<dyn SourceSinkBin>>>>,
scope: Scope,
}
impl<'a> GetCalibration for Env<'a> {
fn calibration(
&mut self,
source: &CalibrationSource,
) -> Result<IndexMap<R64, R64>> {
self.parent.calibration(source)
}
}
impl<'a> BinBuildEnvironment for Env<'a> {
fn resolve(
&mut self,
id: &str,
) -> Result<Box<dyn FetchItem>> {
let pipe = {
let source_id = id.to_string();
self.pipes[self.key].get(&source_id).context(
error::MissingSourceName {
scope: self.scope.clone(),
name: source_id,
bin_type: BIN_TYPE.to_string(),
},
)?
};
match *pipe {
SourceReference {
bin: Some(ref bin_id),
ref source,
} => {
if let Some(i) = self.lookup.get(bin_id) {
let i: usize = *i;
if let Some(bin) = self.bins.get(i) {
Ok(Box::new(DataSource {
data_provider: Rc::downgrade(
bin,
),
source: SourceId {
id: source.to_string(),
},
}))
} else {
error::BinNotPresent {
scope: self.scope.clone(),
bin: bin_id.to_string(),
}
.fail()
}
} else {
error::BinNotPresent {
scope: self.scope.clone(),
bin: bin_id.to_string(),
}
.fail()
}
}
SourceReference {
bin: None,
ref source,
} => self.parent.resolve(source),
}
}
}
let mut local_env = Env {
parent: env,
scope: scope.clone(),
bins: &bins,
lookup: &lookup,
key: &key,
pipes: &self.pipes,
};
self.bins[&key]
.build_bin(&scope.enter(&key), &mut local_env)?
};
lookup.insert(key, index);
bins.push(Rc::new(RwLock::new(Box::new(
SourceSinkBinProcessor::new(bin),
))));
}
Ok((lookup, bins))
}
}
/// Test whether a collection of source references points at given bins.
trait HasReferencesTo {
/// Returns `true` if at least one reference names a bin whose id is
/// contained in `items`.
fn has_references_to(&self, items: &IndexSet<String>) -> bool;
}
impl HasReferencesTo for IndexMap<String, SourceReference> {
    /// Returns `true` if any reference in the map names a bin whose id is in
    /// `items`; references without a bin never match.
    fn has_references_to(&self, items: &IndexSet<String>) -> bool {
        self.values()
            .filter_map(|reference| reference.bin.as_ref())
            .any(|bin_id| items.contains(bin_id))
    }
}
/// Ordering of bins by the connections between them.
trait SortedByPipeConnections {
/// Returns the ids of all entries that hold no reference to any entry of
/// the collection.
fn fetch_first_bin_row(&self) -> IndexSet<String>;
/// Consumes the collection and returns the bin ids ordered such that each
/// bin comes after the bins it references.
fn sorted_by_pipe_connections(
self,
scope: &Scope,
) -> Result<Vec<String>>;
}
impl SortedByPipeConnections
    for IndexMap<String, IndexMap<String, SourceReference>>
{
    /// Collects the ids of all entries whose references point at none of the
    /// ids currently present in the map.
    fn fetch_first_bin_row(&self) -> IndexSet<String> {
        let present: IndexSet<String> = self.keys().cloned().collect();
        self.iter()
            .filter(|(_, references)| !references.has_references_to(&present))
            .map(|(id, _)| id.clone())
            .collect()
    }

    /// Orders the bin ids so that every bin follows the bins it reads from.
    ///
    /// The entry with the empty id is dropped first. Then, repeatedly, all
    /// entries without references to remaining entries are removed from the
    /// map and appended to the result.
    ///
    /// # Errors
    ///
    /// Fails with `CircularPipeReference` when entries remain but each of
    /// them still references another remaining entry.
    fn sorted_by_pipe_connections(
        mut self,
        scope: &Scope,
    ) -> Result<Vec<String>> {
        let mut ordered = Vec::new();
        self.swap_remove("");
        loop {
            if self.is_empty() {
                return Ok(ordered);
            }
            let row = self.fetch_first_bin_row();
            ensure!(
                !row.is_empty(),
                error::CircularPipeReference {
                    scope: scope.clone(),
                }
            );
            for id in &row {
                self.swap_remove(id);
            }
            ordered.extend(row);
        }
    }
}
impl BinDescription for Description {
type Bin = Bin;
fn check_validity(
&self,
scope: &Scope,
get_calibration: &mut dyn GetCalibration,
) -> Result<()> {
let bin_ids: IndexSet<String> = self
.bins
.keys()
.filter_map(|s| {
if s.is_empty() {
None
} else {
Some(s.to_string())
}
})
.collect();
let pipe_ids: IndexSet<String> = self
.pipes
.keys()
.filter_map(|s| {
if s.is_empty() {
None
} else {
Some(s.to_string())
}
})
.collect();
Self::check_bin_and_pipe_ids(scope.clone(), &bin_ids, &pipe_ids)?;
for (name, bin) in &self.bins {
bin.check_validity(&scope.enter(name), get_calibration)?;
}
for (id, bin, pipes) in
bin_ids.iter().map(|s| (s, &self.bins[s], &self.pipes[s]))
{
bin.check_sink_names(
&scope.enter(id),
&pipes.keys().map(|s| s.to_string()).collect(),
)?;
}
for (bin_id, bin_source_pipes) in &self.pipes {
for (_pipe_id, pipe) in bin_source_pipes.iter() {
if let Some(ref source_bin_id) = pipe.bin {
let bin = &self.bins[source_bin_id];
bin.source_names()?.check_contains(
&scope.enter(bin_id),
bin.bin_type(),
&pipe.source,
)?;
} else if !bin_id.is_empty() {
self.inputs.check_contains(
&scope.enter(bin_id),
BIN_TYPE,
&pipe.source,
)?;
}
}
}
Ok(())
}
fn bin_type(&self) -> &'static str {
BIN_TYPE
}
}
impl WriteDot for Description {
    /// Writes the pipeline as a Graphviz `digraph` called `name` to `w`.
    ///
    /// The output contains an `in` node carrying the pipeline's sink names,
    /// an `out` node carrying its source names, one node per embedded bin
    /// and one edge per pipe. Pipes without a source bin start at `in`;
    /// pipes stored under the empty id end at `out`.
    ///
    /// # Errors
    ///
    /// Write failures are reported as `Io` errors; errors from the embedded
    /// bins and from `source_names` are passed on.
    fn write_dot<W: Write>(&self, w: &mut W, name: &str) -> Result<()> {
        // Graph header and global layout attributes.
        writeln!(w, "digraph {} {{", name).context(error::Io)?;
        writeln!(w, "node [shape=plaintext fontsize=8 fontname=sans]")
            .context(error::Io)?;
        writeln!(w, "rankdir=LR").context(error::Io)?;

        // Pseudo bins for the pipeline's own inputs and outputs.
        write_dot_bin(
            w,
            "in",
            "incoming",
            &IndexSet::new(),
            &self.sink_names(),
        )?;
        write_dot_bin(
            w,
            "out",
            "outgoing",
            &self.source_names()?,
            &IndexSet::new(),
        )?;

        // One node per embedded bin.
        for (id, bin) in &self.bins {
            bin.write_dot(w, id)?;
        }

        // One edge per pipe.
        for (sink_bin_id, sources) in &self.pipes {
            let sink_node = if sink_bin_id.is_empty() {
                "out"
            } else {
                sink_bin_id.as_str()
            };
            for (sink, reference) in sources {
                let source_node = match reference.bin {
                    Some(ref bin) => bin.to_string(),
                    None => "in".to_string(),
                };
                writeln!(
                    w,
                    "{}:{}_source -> {}:{}_sink",
                    source_node, reference.source, sink_node, sink
                )
                .context(error::Io)?;
            }
        }

        writeln!(w, "}}").context(error::Io)?;
        Ok(())
    }
}
impl SinkNames for Description {
/// Returns a copy of the pipeline's input names; these are the sinks the
/// pipeline offers.
fn sink_names(&self) -> IndexSet<String> {
self.inputs.clone()
}
}
impl SourceNames for Description {
    /// Returns the names of the sources offered by the pipeline: the keys of
    /// the pipes stored under the empty id, or an empty set when no such
    /// entry exists. This implementation never fails.
    fn source_names(&self) -> Result<IndexSet<String>> {
        let names: IndexSet<String> = self
            .pipes
            .get("")
            .map_or_else(IndexSet::new, |outputs| {
                outputs.keys().cloned().collect()
            });
        Ok(names)
    }
}
impl SourceSinkBinDescription for Description {
    /// Builds the pipeline bin described by `self`.
    ///
    /// All embedded bins are built first. The pipes stored under the empty
    /// id become the output references of the pipeline. For every output
    /// reference that names no embedded bin, the referenced input is
    /// resolved through `env` and kept as an input fetcher.
    ///
    /// # Errors
    ///
    /// Passes on any error from building the embedded bins or from
    /// resolving an input through `env`.
    fn build_bin(
        &self,
        scope: &Scope,
        env: &mut dyn BinBuildEnvironment,
    ) -> Result<Self::Bin> {
        let (lookup, bins) = self.build_embedded_bins(scope, env)?;
        let output_references =
            self.pipes.get("").cloned().unwrap_or_default();
        let mut input_sources = IndexMap::new();
        for reference in output_references.values() {
            if reference.bin.is_none() {
                // Key by the *input* name: `Bin::get_source_data` looks the
                // fetcher up via `reference.source`, not via the output
                // name, so keying by the output name breaks pass-through
                // outputs whose name differs from their input.
                input_sources.insert(
                    reference.source.to_string(),
                    env.resolve(&reference.source)?,
                );
            }
        }
        Ok(Bin {
            scope: scope.clone(),
            input_sources,
            output_references,
            lookup,
            bins,
        })
    }
}