1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
extern crate serde_json; use super::{ BinBuildEnvironment, BinDescription, Calculator, FetchItem, GetCalibration, Item, Iteration, Result, Scope, SinkBin, SinkNames, SinkOnlyBin, SinkOnlyBinDescription, }; use crate::error; use crossbeam_channel::Sender; use indexmap::{IndexMap, IndexSet}; static BIN_TYPE: &'static str = "channelsink"; pub struct Bin { scope: Scope, sender: Sender<IndexMap<String, Item>>, sources: IndexMap<String, Box<FetchItem>>, } impl SinkBin for Bin {} impl Calculator for Bin { fn calculate(&mut self, _iteration: &Iteration) -> Result<()> { let items = self .sources .iter() .map(|(s, ds)| { ds.fetch_item(&self.scope).map(|d| (s.to_string(), d)) }) .collect::<Result<IndexMap<String, Item>>>()?; if self.sender.send(items).is_err() { error::ChannelSendFailed.fail()?; } Ok(()) } } impl SinkOnlyBin for Bin {} #[derive(Debug)] pub struct Description { pub sinks: IndexSet<String>, pub sender: Sender<IndexMap<String, Item>>, } impl BinDescription for Description { type Bin = Bin; fn check_validity( &self, _scope: &Scope, _get_calibration: &mut GetCalibration, ) -> Result<()> { Ok(()) } fn bin_type(&self) -> &'static str { BIN_TYPE } } impl SinkNames for Description { fn sink_names(&self) -> IndexSet<String> { self.sinks.clone() } } impl SinkOnlyBinDescription for Description { fn build_bin( &self, scope: &Scope, env: &mut BinBuildEnvironment, ) -> Result<Self::Bin> { Ok(Bin { scope: scope.clone(), sender: self.sender.clone(), sources: self .sinks .iter() .map(|s| env.resolve(s).map(|ds| (s.to_string(), ds))) .collect::<Result<IndexMap<_, _>>>()?, }) } }