use std::collections::VecDeque;
use std::fmt::Display;
use std::sync::Arc;
use crate::block::{BlockStructure, OperatorReceiver, OperatorStructure, Replication};
use crate::operator::iteration::IterationStateLock;
use crate::operator::start::{BinaryElement, BinaryStartOperator, Start};
use crate::operator::{ExchangeData, Operator, StreamElement};
use crate::scheduler::{BlockId, ExecutionMetadata};
use super::source::Source;
/// Operator that pairs the elements of two upstream streams positionally.
///
/// Incoming elements are buffered per side until both sides have one ready;
/// the oldest buffered element of each side is then emitted as a `(left, right)` tuple.
#[derive(Clone)]
pub struct Zip<Out1, Out2>
where
    Out1: ExchangeData,
    Out2: ExchangeData,
{
    /// Start operator receiving from both upstream blocks.
    prev: BinaryStartOperator<Out1, Out2>,
    /// Left-side elements still waiting for a right-side partner.
    stash1: VecDeque<StreamElement<Out1>>,
    /// Right-side elements still waiting for a left-side partner.
    stash2: VecDeque<StreamElement<Out2>>,
    /// Block feeding the left side; reported as a receiver in `structure`.
    prev_block_id1: BlockId,
    /// Block feeding the right side; reported as a receiver in `structure`.
    prev_block_id2: BlockId,
}
impl<Out1, Out2> Display for Zip<Out1, Out2>
where
    Out1: ExchangeData,
    Out2: ExchangeData,
{
    /// Formats the operator as `Zip[<left type>, <right type>]`, where both
    /// type names come from `std::any::type_name`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let left_type = std::any::type_name::<Out1>();
        let right_type = std::any::type_name::<Out2>();
        write!(f, "Zip[{}, {}]", left_type, right_type)
    }
}
impl<Out1: ExchangeData, Out2: ExchangeData> Zip<Out1, Out2> {
pub(super) fn new(
prev_block_id1: BlockId,
prev_block_id2: BlockId,
left_cache: bool,
right_cache: bool,
state_lock: Option<Arc<IterationStateLock>>,
) -> Self {
Self {
prev: Start::multiple(
prev_block_id1,
prev_block_id2,
left_cache,
right_cache,
state_lock,
),
stash1: Default::default(),
stash2: Default::default(),
prev_block_id1,
prev_block_id2,
}
}
}
impl<Out1, Out2> Operator for Zip<Out1, Out2>
where
    Out1: ExchangeData,
    Out2: ExchangeData,
{
    type Out = (Out1, Out2);

    /// Propagates setup to the underlying start operator.
    fn setup(&mut self, metadata: &mut ExecutionMetadata) {
        self.prev.setup(metadata);
    }

    /// Returns the next `(left, right)` pair, or a control element.
    ///
    /// Elements are pulled from the start operator and appended to the stash
    /// of their side until both stashes are non-empty. The front of each
    /// stash is then popped and paired. For timestamped pairs, the larger of
    /// the two timestamps is used.
    ///
    /// `Watermark`, `FlushBatch` and `Terminate` are returned as soon as they
    /// arrive, even if unpaired elements are still stashed. `FlushAndRestart`
    /// first discards every unpaired element and is then returned.
    ///
    /// # Panics
    ///
    /// Panics if a timestamped element is paired with a non-timestamped one.
    #[inline]
    fn next(&mut self) -> StreamElement<(Out1, Out2)> {
        loop {
            let both_ready = !self.stash1.is_empty() && !self.stash2.is_empty();
            if both_ready {
                break;
            }

            let incoming = self.prev.next();
            match incoming {
                StreamElement::Item(BinaryElement::Left(value)) => {
                    self.stash1.push_back(StreamElement::Item(value));
                }
                StreamElement::Item(BinaryElement::Right(value)) => {
                    self.stash2.push_back(StreamElement::Item(value));
                }
                StreamElement::Timestamped(BinaryElement::Left(value), ts) => {
                    self.stash1.push_back(StreamElement::Timestamped(value, ts));
                }
                StreamElement::Timestamped(BinaryElement::Right(value), ts) => {
                    self.stash2.push_back(StreamElement::Timestamped(value, ts));
                }
                // Any other `BinaryElement` variant is skipped; keep pulling.
                StreamElement::Item(_) | StreamElement::Timestamped(_, _) => {}
                StreamElement::FlushAndRestart => {
                    // Unpaired leftovers must not be zipped with the next run's data.
                    self.stash1.clear();
                    self.stash2.clear();
                    return incoming.map(|_| unreachable!());
                }
                StreamElement::Watermark(_)
                | StreamElement::FlushBatch
                | StreamElement::Terminate => return incoming.map(|_| unreachable!()),
            }
        }

        let left = self.stash1.pop_front().unwrap();
        let right = self.stash2.pop_front().unwrap();
        match (left, right) {
            (StreamElement::Item(l), StreamElement::Item(r)) => StreamElement::Item((l, r)),
            (StreamElement::Timestamped(l, lts), StreamElement::Timestamped(r, rts)) => {
                StreamElement::Timestamped((l, r), lts.max(rts))
            }
            _ => panic!("Unsupported mixing of timestamped and non-timestamped items"),
        }
    }

    /// Describes this operator as "Zip", with one receiver per upstream block:
    /// first the left (`Out1`) one, then the right (`Out2`) one.
    fn structure(&self) -> BlockStructure {
        let left_receiver = OperatorReceiver::new::<Out1>(self.prev_block_id1);
        let right_receiver = OperatorReceiver::new::<Out2>(self.prev_block_id2);

        let mut zip_operator = OperatorStructure::new::<(Out1, Out2), _>("Zip");
        zip_operator.receivers.push(left_receiver);
        zip_operator.receivers.push(right_receiver);

        BlockStructure::default().add_operator(zip_operator)
    }
}
impl<Out1, Out2> Source for Zip<Out1, Out2>
where
    Out1: ExchangeData,
    Out2: ExchangeData,
{
    /// Zip places no limit on its replication.
    fn replication(&self) -> Replication {
        Replication::Unlimited
    }
}
#[cfg(test)]
mod tests {
    use crate::network::{Coord, NetworkMessage, NetworkSender};
    use crate::operator::zip::Zip;
    use crate::operator::{Operator, StreamElement};
    use crate::test::FakeNetworkTopology;

    /// Delivers `data` as a single batch on `sender`, tagged as coming from `from`.
    fn send_batch(sender: &NetworkSender<i32>, from: Coord, data: Vec<StreamElement<i32>>) {
        sender.send(NetworkMessage::new_batch(data, from)).unwrap();
    }

    /// Items are paired positionally, and unpaired leftovers are dropped on `FlushAndRestart`.
    #[test]
    fn zip() {
        let mut topology = FakeNetworkTopology::new(2, 1);
        let (left_coord, left_sender) = topology.senders_mut()[0].pop().unwrap();
        let (right_coord, right_sender) = topology.senders_mut()[1].pop().unwrap();

        let mut zip =
            Zip::<i32, i32>::new(left_coord.block_id, right_coord.block_id, false, false, None);
        zip.setup(&mut topology.metadata());

        // First run: the left side has one more item than the right side.
        send_batch(&left_sender, left_coord, vec![StreamElement::Item(1), StreamElement::Item(2)]);
        send_batch(&right_sender, right_coord, vec![StreamElement::Item(100)]);
        assert_eq!(zip.next(), StreamElement::Item((1, 100)));

        // Restart: the leftover `2` must be discarded rather than paired with `300`.
        send_batch(&left_sender, left_coord, vec![StreamElement::FlushAndRestart]);
        send_batch(&right_sender, right_coord, vec![StreamElement::FlushAndRestart]);
        assert_eq!(zip.next(), StreamElement::FlushAndRestart);

        send_batch(&left_sender, left_coord, vec![StreamElement::Item(3)]);
        send_batch(&right_sender, right_coord, vec![StreamElement::Item(300)]);
        assert_eq!(zip.next(), StreamElement::Item((3, 300)));
    }
}