1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
use crate::chopper::chopper::{DataSink, MergeHeaderSink}; use crate::chopper::header_graph::{NumOfHeaderToProcess, PinId}; use crate::chopper::types::{Header, Row}; use crate::error::{CliResult, Error}; pub struct MergeJoin { input_pin_num: usize, header: Option<Header>, } impl MergeJoin { pub fn new(input_pin_num: usize) -> CliResult<Box<dyn MergeHeaderSink>> { if input_pin_num <= 0 { return Err(Error::from( "MergeJoin -- number of inputs must be at least 1", )); } let merge = MergeJoin { input_pin_num, header: None, }; Ok(Box::new(merge) as Box<dyn MergeHeaderSink>) } fn add_header(&mut self, header: &Header) { self.header = Some(header.clone()); } } impl MergeHeaderSink for MergeJoin { fn check_header(&mut self, _pin_id: PinId, header: &Header) -> CliResult<()> { match &self.header { Some(h) => { if !header.eq(h) { return Err(Error::from("MuxHeaderSink -- wrong header")); } } None => self.add_header(header), } Ok(()) } fn process_header(&mut self) -> Header { self.header.take().unwrap() } fn get_data_sink(self: Box<Self>) -> CliResult<Box<dyn DataSink>> { if self.header.is_some() { return Err(Error::from( "MuxHeaderSink -- all the headers must be processed before returning DataSink", )); } Ok(self.boxed()) } fn pin_num(&self) -> usize { self.input_pin_num } fn num_of_header_to_process(&self) -> NumOfHeaderToProcess { NumOfHeaderToProcess { counter: self.pin_num(), } } } impl DataSink for MergeJoin { fn write_row_to_pin(&mut self, _pin_id: PinId, row: Row) -> CliResult<Option<Row>> { self.write_row(row) } fn flush(&mut self) -> CliResult<()> { Ok(()) } fn boxed(self) -> Box<dyn DataSink> { Box::new(self) } }