use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::time::Duration;
pub(crate) use binary::*;
pub(crate) use simple::*;
#[cfg(feature = "timestamp")]
use super::Timestamp;
use crate::block::{BlockStructure, Replication};
use crate::channel::RecvTimeoutError;
use crate::network::{Coord, NetworkDataIterator, NetworkMessage};
use crate::operator::iteration::IterationStateLock;
use crate::operator::source::Source;
use crate::operator::start::watermark_frontier::WatermarkFrontier;
use crate::operator::{ExchangeData, Operator, StreamElement};
use crate::scheduler::{BlockId, ExecutionMetadata};
mod binary;
mod simple;
mod watermark_frontier;
/// Source of network messages consumed by a [`Start`] operator.
///
/// The receiver is cloned together with the `Start` operator that owns it.
pub(crate) trait StartReceiver: Clone {
    /// Element type carried by the messages this receiver yields.
    type Out;

    /// Prepare the receiver from the block's execution metadata; called from `Start::setup`.
    fn setup(&mut self, metadata: &mut ExecutionMetadata);

    /// Coordinates of the upstream replicas; `Start` uses their count to know how many
    /// `Terminate` / `FlushAndRestart` markers to expect.
    fn prev_replicas(&self) -> Vec<Coord>;

    /// Number of cached replicas.
    /// NOTE(review): semantics are defined by the implementors (simple/binary); confirm there.
    fn cached_replicas(&self) -> usize;

    /// Receive the next message with no timeout.
    fn recv(&mut self) -> NetworkMessage<Self::Out>;

    /// Receive the next message, giving up with an error if none arrives within `timeout`.
    fn recv_timeout(
        &mut self,
        timeout: Duration,
    ) -> Result<NetworkMessage<Self::Out>, RecvTimeoutError>;

    /// Structural description of this receiver, returned by `Start::structure`.
    fn structure(&self) -> BlockStructure;
}
/// [`Start`] operator fed by two upstream blocks (left and right inputs).
pub(crate) type BinaryStartOperator<OutL, OutR> = Start<BinaryStartReceiver<OutL, OutR>>;
/// [`Start`] operator fed by a single upstream block.
pub(crate) type SimpleStartOperator<Out> = Start<SimpleStartReceiver<Out>>;

/// Operator that pulls elements from the network through a [`StartReceiver`].
///
/// It merges the per-replica `Terminate`, `FlushAndRestart` and watermark markers into single
/// downstream events.
#[derive(Debug)]
pub(crate) struct Start<Receiver>
where
    Receiver: StartReceiver + Send,
{
    /// Receive timeout taken from the batch mode during `setup`; `None` means no timeout.
    max_delay: Option<Duration>,
    /// Coordinate of this replica, known only after `setup`.
    coord: Option<Coord>,
    /// Where network messages come from.
    receiver: Receiver,
    /// Batch currently being drained, paired with the coordinate of its sender.
    batch_iter: Option<(Coord, NetworkDataIterator<StreamElement<Receiver::Out>>)>,
    /// Upstream replicas that have not yet sent `Terminate`.
    missing_terminate: usize,
    /// Upstream replicas that have not yet sent `FlushAndRestart` in the current round.
    missing_flush_and_restart: usize,
    /// Total number of upstream replicas, used to reset the counters above.
    num_previous_replicas: usize,
    /// Set after a receive timed out, so the following receive waits without a timeout.
    already_timed_out: bool,
    /// Per-replica watermark tracking.
    watermark_frontier: WatermarkFrontier,
    /// Set after emitting `FlushAndRestart`; the next batch element waits on `state_lock` first.
    wait_for_state: bool,
    /// Optional iteration-state lock waited on after a restart.
    state_lock: Option<Arc<IterationStateLock>>,
    /// Generation passed to `IterationStateLock::wait_for_update`; bumped by 2 per restart.
    state_generation: usize,
}
impl<Receiver> Clone for Start<Receiver>
where
    Receiver: StartReceiver + Send,
{
    /// Duplicate the operator's configuration, counters and shared state handle.
    ///
    /// The batch currently being drained is NOT carried over: the clone starts with no
    /// pending batch.
    fn clone(&self) -> Self {
        let Self {
            max_delay,
            coord,
            receiver,
            batch_iter: _,
            missing_terminate,
            missing_flush_and_restart,
            num_previous_replicas,
            already_timed_out,
            watermark_frontier,
            wait_for_state,
            state_lock,
            state_generation,
        } = self;
        Self {
            max_delay: *max_delay,
            coord: *coord,
            receiver: receiver.clone(),
            batch_iter: None,
            missing_terminate: *missing_terminate,
            missing_flush_and_restart: *missing_flush_and_restart,
            num_previous_replicas: *num_previous_replicas,
            already_timed_out: *already_timed_out,
            watermark_frontier: watermark_frontier.clone(),
            wait_for_state: *wait_for_state,
            state_lock: state_lock.clone(),
            state_generation: *state_generation,
        }
    }
}
impl<Receiver> Display for Start<Receiver>
where
    Receiver: StartReceiver + Send,
{
    /// Render the operator as its output type name (per `std::any::type_name`) in brackets.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let out_type = std::any::type_name::<Receiver::Out>();
        f.write_str("[")?;
        f.write_str(out_type)?;
        f.write_str("]")
    }
}
impl<Out> Start<SimpleStartReceiver<Out>>
where
    Out: ExchangeData,
{
    /// Build a `Start` that receives from the single upstream block `previous_block_id`.
    ///
    /// `state_lock`, when present, is waited on after a `FlushAndRestart` is emitted.
    pub(crate) fn single(
        previous_block_id: BlockId,
        state_lock: Option<Arc<IterationStateLock>>,
    ) -> SimpleStartOperator<Out> {
        let receiver = SimpleStartReceiver::new(previous_block_id);
        Self::new(receiver, state_lock)
    }
}
impl<OutL, OutR> Start<BinaryStartReceiver<OutL, OutR>>
where
    OutL: ExchangeData,
    OutR: ExchangeData,
{
    /// Build a `Start` that receives from two upstream blocks, a left and a right one.
    ///
    /// `left_cache` / `right_cache` are forwarded to `BinaryStartReceiver::new`. Judging by
    /// this module's tests, they presumably make that side replay its elements after a
    /// restart; confirm in `binary.rs`. `state_lock`, when present, is waited on after a
    /// `FlushAndRestart` is emitted.
    pub(crate) fn multiple(
        previous_block_id1: BlockId,
        previous_block_id2: BlockId,
        left_cache: bool,
        right_cache: bool,
        state_lock: Option<Arc<IterationStateLock>>,
    ) -> BinaryStartOperator<OutL, OutR> {
        let receiver = BinaryStartReceiver::new(
            previous_block_id1,
            previous_block_id2,
            left_cache,
            right_cache,
        );
        Self::new(receiver, state_lock)
    }
}
impl<Receiver: StartReceiver + Send> Start<Receiver> {
fn new(receiver: Receiver, state_lock: Option<Arc<IterationStateLock>>) -> Self {
Self {
coord: Default::default(),
max_delay: Default::default(),
receiver,
batch_iter: None,
missing_terminate: Default::default(),
missing_flush_and_restart: Default::default(),
num_previous_replicas: 0,
already_timed_out: Default::default(),
watermark_frontier: Default::default(),
wait_for_state: Default::default(),
state_lock,
state_generation: Default::default(),
}
}
pub(crate) fn receiver(&self) -> &Receiver {
&self.receiver
}
}
impl<Receiver> Operator for Start<Receiver>
where
Receiver: StartReceiver + Send,
Receiver::Out: ExchangeData,
{
type Out = Receiver::Out;
fn setup(&mut self, metadata: &mut ExecutionMetadata) {
self.receiver.setup(metadata);
let prev_replicas = self.receiver.prev_replicas();
self.num_previous_replicas = prev_replicas.len();
self.missing_terminate = self.num_previous_replicas;
self.missing_flush_and_restart = self.num_previous_replicas;
self.watermark_frontier = WatermarkFrontier::new(prev_replicas);
log::trace!(
"{} initialized <{}>",
metadata.coord,
std::any::type_name::<Receiver::Out>()
);
self.coord = Some(metadata.coord);
self.max_delay = metadata.batch_mode.max_delay();
}
fn next(&mut self) -> StreamElement<Receiver::Out> {
let coord = self.coord.unwrap();
loop {
if self.missing_terminate == 0 {
log::trace!("{} ended", coord);
return StreamElement::Terminate;
}
if self.missing_flush_and_restart == 0 {
log::trace!("{} flush_restart", coord);
self.missing_flush_and_restart = self.num_previous_replicas;
self.watermark_frontier.reset();
self.wait_for_state = true;
self.state_generation += 2;
return StreamElement::FlushAndRestart;
}
if let Some((sender, ref mut inner)) = self.batch_iter {
let msg = match inner.next() {
None => {
self.batch_iter = None;
continue;
}
Some(item) => {
match item {
StreamElement::Watermark(ts) => {
match self.watermark_frontier.update(sender, ts) {
Some(ts) => StreamElement::Watermark(ts), None => continue,
}
}
StreamElement::FlushAndRestart => {
#[cfg(feature = "timestamp")]
{
self.watermark_frontier.update(sender, Timestamp::MAX);
}
self.missing_flush_and_restart -= 1;
continue;
}
StreamElement::Terminate => {
self.missing_terminate -= 1;
log::trace!(
"{} received terminate, {} left",
coord,
self.missing_terminate
);
continue;
}
_ => item,
}
}
};
if self.wait_for_state {
if let Some(lock) = self.state_lock.as_ref() {
lock.wait_for_update(self.state_generation);
}
self.wait_for_state = false;
}
return msg;
}
let net_msg = match (self.already_timed_out, self.max_delay) {
(false, Some(max_delay)) => {
match self.receiver.recv_timeout(max_delay) {
Ok(net_msg) => net_msg,
Err(_) => {
self.already_timed_out = true;
NetworkMessage::new_single(
StreamElement::FlushBatch,
Default::default(),
)
}
}
}
_ => {
self.already_timed_out = false;
self.receiver.recv()
}
};
self.batch_iter = Some((net_msg.sender(), net_msg.into_iter()));
}
}
fn structure(&self) -> BlockStructure {
self.receiver.structure()
}
}
impl<Receiver: StartReceiver + Send> Source for Start<Receiver>
where
    Receiver::Out: ExchangeData,
{
    /// Always reports `Replication::Unlimited` for a `Start` operator.
    fn replication(&self) -> Replication {
        Replication::Unlimited
    }
}
#[cfg(test)]
mod tests {
    use crate::network::NetworkMessage;
    use crate::operator::{BinaryElement, Operator, Start, StreamElement};
    // Only used by the `timestamp`-gated helper and tests below.
    #[cfg(feature = "timestamp")]
    use crate::operator::Timestamp;
    use crate::test::FakeNetworkTopology;

    /// Build a `Timestamp` from a millisecond count.
    #[cfg(feature = "timestamp")]
    fn ts(millis: u64) -> Timestamp {
        millis as i64
    }

    #[test]
    fn test_single() {
        let mut t = FakeNetworkTopology::new(1, 2);
        let (from1, sender1) = t.senders_mut()[0].pop().unwrap();
        let (from2, sender2) = t.senders_mut()[0].pop().unwrap();

        let mut start_block = Start::single(sender1.receiver_endpoint.prev_block_id, None);
        start_block.setup(&mut t.metadata());

        sender1
            .send(NetworkMessage::new_batch(
                vec![StreamElement::Item(42), StreamElement::FlushAndRestart],
                from1,
            ))
            .unwrap();

        assert_eq!(StreamElement::Item(42), start_block.next());
        assert_eq!(StreamElement::FlushBatch, start_block.next());

        sender2
            .send(NetworkMessage::new_batch(
                vec![StreamElement::FlushAndRestart],
                from2,
            ))
            .unwrap();

        assert_eq!(StreamElement::FlushAndRestart, start_block.next());

        sender1
            .send(NetworkMessage::new_single(StreamElement::Terminate, from1))
            .unwrap();
        sender2
            .send(NetworkMessage::new_single(StreamElement::Terminate, from2))
            .unwrap();

        assert_eq!(StreamElement::Terminate, start_block.next());
    }

    #[test]
    #[cfg(feature = "timestamp")]
    fn test_single_watermark() {
        let mut t = FakeNetworkTopology::new(1, 2);
        let (from1, sender1) = t.senders_mut()[0].pop().unwrap();
        let (from2, sender2) = t.senders_mut()[0].pop().unwrap();

        let mut start_block = Start::single(sender1.receiver_endpoint.prev_block_id, None);
        start_block.setup(&mut t.metadata());

        sender1
            .send(NetworkMessage::new_batch(
                vec![
                    StreamElement::Timestamped(42, ts(10)),
                    StreamElement::Watermark(ts(20)),
                ],
                from1,
            ))
            .unwrap();

        assert_eq!(StreamElement::Timestamped(42, ts(10)), start_block.next());
        assert_eq!(StreamElement::FlushBatch, start_block.next());

        sender2
            .send(NetworkMessage::new_batch(
                vec![StreamElement::Watermark(ts(100))],
                from2,
            ))
            .unwrap();

        assert_eq!(StreamElement::Watermark(ts(20)), start_block.next());
        assert_eq!(StreamElement::FlushBatch, start_block.next());

        sender1
            .send(NetworkMessage::new_batch(
                vec![StreamElement::FlushAndRestart],
                from1,
            ))
            .unwrap();
        sender2
            .send(NetworkMessage::new_batch(
                vec![StreamElement::Watermark(ts(110))],
                from2,
            ))
            .unwrap();

        assert_eq!(StreamElement::Watermark(ts(110)), start_block.next());
    }

    #[test]
    #[cfg(feature = "timestamp")]
    fn test_multiple_no_cache() {
        let mut t = FakeNetworkTopology::new(2, 1);
        let (from1, sender1) = t.senders_mut()[0].pop().unwrap();
        let (from2, sender2) = t.senders_mut()[1].pop().unwrap();

        let mut start_block = Start::multiple(from1.block_id, from2.block_id, false, false, None);
        start_block.setup(&mut t.metadata());

        sender1
            .send(NetworkMessage::new_batch(
                vec![
                    StreamElement::Timestamped(42, ts(10)),
                    StreamElement::Watermark(ts(20)),
                ],
                from1,
            ))
            .unwrap();

        assert_eq!(
            StreamElement::Timestamped(BinaryElement::Left(42), ts(10)),
            start_block.next()
        );
        assert_eq!(StreamElement::FlushBatch, start_block.next());

        sender2
            .send(NetworkMessage::new_batch(
                vec![
                    StreamElement::Timestamped(69, ts(10)),
                    StreamElement::Watermark(ts(20)),
                ],
                from2,
            ))
            .unwrap();

        assert_eq!(
            StreamElement::Timestamped(BinaryElement::Right(69), ts(10)),
            start_block.next()
        );
        assert_eq!(StreamElement::Watermark(ts(20)), start_block.next());

        sender1
            .send(NetworkMessage::new_batch(
                vec![StreamElement::FlushAndRestart],
                from1,
            ))
            .unwrap();

        assert_eq!(
            StreamElement::Item(BinaryElement::LeftEnd),
            start_block.next()
        );

        sender2
            .send(NetworkMessage::new_batch(
                vec![StreamElement::FlushAndRestart],
                from2,
            ))
            .unwrap();

        assert_eq!(
            StreamElement::Item(BinaryElement::RightEnd),
            start_block.next()
        );
        assert_eq!(StreamElement::FlushAndRestart, start_block.next());
    }

    #[test]
    fn test_multiple_cache() {
        let mut t = FakeNetworkTopology::new(2, 1);
        let (from1, sender1) = t.senders_mut()[0].pop().unwrap();
        let (from2, sender2) = t.senders_mut()[1].pop().unwrap();

        let mut start_block = Start::multiple(from1.block_id, from2.block_id, true, false, None);
        start_block.setup(&mut t.metadata());

        sender1
            .send(NetworkMessage::new_single(StreamElement::Item(42), from1))
            .unwrap();
        sender1
            .send(NetworkMessage::new_single(StreamElement::Item(43), from1))
            .unwrap();
        sender2
            .send(NetworkMessage::new_single(StreamElement::Item(69), from2))
            .unwrap();

        let mut recv = [start_block.next(), start_block.next(), start_block.next()];
        // Left and right messages can arrive in any order.
        recv.sort();
        assert_eq!(StreamElement::Item(BinaryElement::Left(42)), recv[0]);
        assert_eq!(StreamElement::Item(BinaryElement::Left(43)), recv[1]);
        assert_eq!(StreamElement::Item(BinaryElement::Right(69)), recv[2]);

        sender1
            .send(NetworkMessage::new_batch(
                vec![StreamElement::FlushAndRestart, StreamElement::Terminate],
                from1,
            ))
            .unwrap();
        sender2
            .send(NetworkMessage::new_batch(
                vec![StreamElement::FlushAndRestart],
                from2,
            ))
            .unwrap();

        let mut recv = [start_block.next(), start_block.next()];
        // Left and right end markers can arrive in any order.
        recv.sort();
        assert_eq!(StreamElement::Item(BinaryElement::LeftEnd), recv[0]);
        assert_eq!(StreamElement::Item(BinaryElement::RightEnd), recv[1]);
        assert_eq!(StreamElement::FlushAndRestart, start_block.next());

        sender2
            .send(NetworkMessage::new_batch(
                vec![StreamElement::Item(6969), StreamElement::FlushAndRestart],
                from2,
            ))
            .unwrap();

        let mut recv = [
            start_block.next(),
            start_block.next(),
            start_block.next(),
            start_block.next(),
            start_block.next(),
        ];
        // Cached left elements and fresh right elements can interleave arbitrarily.
        recv.sort();
        assert_eq!(StreamElement::Item(BinaryElement::Left(42)), recv[0]);
        assert_eq!(StreamElement::Item(BinaryElement::Left(43)), recv[1]);
        assert_eq!(StreamElement::Item(BinaryElement::Right(6969)), recv[2]);
        assert_eq!(StreamElement::Item(BinaryElement::LeftEnd), recv[3]);
        assert_eq!(StreamElement::Item(BinaryElement::RightEnd), recv[4]);
        assert_eq!(StreamElement::FlushAndRestart, start_block.next());

        sender2
            .send(NetworkMessage::new_single(StreamElement::Terminate, from2))
            .unwrap();

        assert_eq!(StreamElement::Terminate, start_block.next());
    }

    #[test]
    fn test_multiple_cache_other_side() {
        let mut t = FakeNetworkTopology::new(2, 1);
        let (from1, sender1) = t.senders_mut()[0].pop().unwrap();
        let (from2, sender2) = t.senders_mut()[1].pop().unwrap();

        let mut start_block = Start::multiple(from1.block_id, from2.block_id, false, true, None);
        start_block.setup(&mut t.metadata());

        sender1
            .send(NetworkMessage::new_single(StreamElement::Item(42), from1))
            .unwrap();
        sender1
            .send(NetworkMessage::new_single(StreamElement::Item(43), from1))
            .unwrap();
        sender2
            .send(NetworkMessage::new_single(StreamElement::Item(69), from2))
            .unwrap();

        let mut recv = [start_block.next(), start_block.next(), start_block.next()];
        // Left and right messages can arrive in any order.
        recv.sort();
        assert_eq!(StreamElement::Item(BinaryElement::Left(42)), recv[0]);
        assert_eq!(StreamElement::Item(BinaryElement::Left(43)), recv[1]);
        assert_eq!(StreamElement::Item(BinaryElement::Right(69)), recv[2]);

        sender1
            .send(NetworkMessage::new_batch(
                vec![StreamElement::FlushAndRestart, StreamElement::Terminate],
                from1,
            ))
            .unwrap();
        sender2
            .send(NetworkMessage::new_batch(
                vec![StreamElement::FlushAndRestart],
                from2,
            ))
            .unwrap();

        let mut recv = [start_block.next(), start_block.next()];
        // Left and right end markers can arrive in any order.
        recv.sort();
        assert_eq!(StreamElement::Item(BinaryElement::LeftEnd), recv[0]);
        assert_eq!(StreamElement::Item(BinaryElement::RightEnd), recv[1]);
        assert_eq!(StreamElement::FlushAndRestart, start_block.next());

        sender2
            .send(NetworkMessage::new_single(StreamElement::Terminate, from2))
            .unwrap();

        assert_eq!(StreamElement::Terminate, start_block.next());
    }
}