use std::fmt::{Debug, Display, Formatter};
use serde::{Deserialize, Serialize};
pub(crate) use network_channel::*;
pub(crate) use topology::*;
use crate::operator::StreamElement;
use crate::scheduler::{BlockId, HostId, ReplicaId};
#[cfg(feature = "async-tokio")]
mod tokio;
#[cfg(feature = "async-tokio")]
use tokio::*;
#[cfg(not(feature = "async-tokio"))]
mod sync;
#[cfg(not(feature = "async-tokio"))]
use sync::*;
mod network_channel;
mod topology;
/// Owning iterator over the items carried by a [`NetworkData`] payload.
///
/// There is one variant per payload representation; at the moment only
/// `Batch` exists, which wraps the `IntoIter` of the underlying `Vec`.
#[derive(Debug, Clone)]
pub enum NetworkDataIterator<T> {
    /// Yields the elements of a batch in the order they are stored in the `Vec`.
    Batch(std::vec::IntoIter<T>),
}

impl<T> Iterator for NetworkDataIterator<T> {
    type Item = T;

    /// Returns the next item of the wrapped iterator, or `None` once it is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            NetworkDataIterator::Batch(i) => i.next(),
        }
    }

    /// Reports the exact number of items still to be yielded.
    ///
    /// The default `Iterator::size_hint` returns `(0, None)`, which hides the
    /// length that `vec::IntoIter` already knows and prevents `collect` and
    /// `extend` from reserving the right capacity up front. Forwarding the
    /// inner hint costs nothing and restores that information.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            NetworkDataIterator::Batch(i) => i.size_hint(),
        }
    }
}
/// Payload stored inside a [`NetworkMessage`].
///
/// Only one representation exists at the moment: a batch of items kept in a
/// `Vec`. The type is serializable and deserializable through serde.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub enum NetworkData<T> {
/// A batch of items, kept in the order in which they were supplied.
Batch(Vec<T>),
}
/// A group of [`StreamElement`]s together with the [`Coord`] of its sender.
///
/// Build one with `new_single` or `new_batch`; consume the elements through
/// `into_iter`. The type is serializable and deserializable through serde.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct NetworkMessage<T> {
/// Coordinate supplied when the message was created; exposed by `sender()`.
sender: Coord,
/// The elements carried by this message.
data: NetworkData<StreamElement<T>>,
}
/// A block id paired with a host id.
///
/// This is a [`Coord`] without the replica component: `BlockCoord::from(coord)`
/// copies `block_id` and `host_id` and drops `replica_id`.
/// Displayed as `(bNN.hNN)`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct BlockCoord {
/// Id of the block.
pub block_id: BlockId,
/// Id of the host.
pub host_id: HostId,
}
/// A triple of block id, host id and replica id.
///
/// Displayed as `(bNN.hNN.rNN)`. The type is `Copy`, hashable, totally ordered
/// (fields compared in declaration order) and serializable through serde.
// NOTE(review): presumably this identifies one replica of a block running on a
// given host — confirm against the scheduler module.
#[derive(
Debug, Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd, Default, Deserialize, Serialize,
)]
pub struct Coord {
/// Id of the block.
pub block_id: BlockId,
/// Id of the host.
pub host_id: HostId,
/// Id of the replica.
pub replica_id: ReplicaId,
}
/// A [`Coord`] paired with the id of a "previous" block.
///
/// Displayed as `Endpoint{(bNN) -> coord}`. Converting it into a
/// [`DemuxCoord`] keeps `prev_block_id` and reduces `coord` to a [`BlockCoord`].
// NOTE(review): presumably `coord` is the receiving replica and `prev_block_id`
// the block the data comes from — confirm with the callers.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct ReceiverEndpoint {
/// Full coordinate (block, host, replica) of this endpoint.
pub coord: Coord,
/// Id of the previous block associated with this endpoint.
pub prev_block_id: BlockId,
}
/// A [`BlockCoord`] paired with the id of a "previous" block.
///
/// Displayed as `Demux{(bNN) -> coord}`. It has the same shape as
/// [`ReceiverEndpoint`] but without the replica id, so every `Coord` that
/// shares block and host maps to the same `DemuxCoord`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct DemuxCoord {
/// Block and host on the destination side.
pub coord: BlockCoord,
/// Id of the block on the source side (`from.block_id` in `DemuxCoord::new`).
pub prev_block_id: BlockId,
}
impl<T> NetworkMessage<T> {
    /// Creates a message that carries exactly one element.
    ///
    /// Equivalent to calling [`NetworkMessage::new_batch`] with a one-element `Vec`.
    pub fn new_single(data: StreamElement<T>, sender: Coord) -> Self {
        Self::new_batch(vec![data], sender)
    }

    /// Creates a message that carries all the elements of `data`, in order.
    pub fn new_batch(data: Vec<StreamElement<T>>, sender: Coord) -> Self {
        let data = NetworkData::Batch(data);
        Self { sender, data }
    }

    /// Returns the coordinate that was given as `sender` at construction time.
    pub fn sender(&self) -> Coord {
        let Self { sender, .. } = self;
        *sender
    }

    /// Returns how many elements are stored in this message.
    pub fn num_items(&self) -> usize {
        // `NetworkData` has a single variant, so this pattern is irrefutable;
        // adding a variant will turn this into a compile error.
        let NetworkData::Batch(items) = &self.data;
        items.len()
    }
}
/// Consuming a message yields its elements in the order they were stored.
impl<T> IntoIterator for NetworkMessage<T> {
    type Item = StreamElement<T>;
    type IntoIter = NetworkDataIterator<StreamElement<T>>;

    /// Drops the sender information and returns an iterator over the payload.
    fn into_iter(self) -> Self::IntoIter {
        // Single-variant enum: the pattern cannot fail.
        let NetworkData::Batch(items) = self.data;
        NetworkDataIterator::Batch(items.into_iter())
    }
}
impl Coord {
pub fn new(block_id: BlockId, host_id: HostId, replica_id: ReplicaId) -> Self {
Self {
block_id,
host_id,
replica_id,
}
}
}
impl ReceiverEndpoint {
pub fn new(coord: Coord, prev_block_id: BlockId) -> Self {
Self {
coord,
prev_block_id,
}
}
}
impl DemuxCoord {
    /// Builds the demux coordinate of the channel going from `from` to `to`.
    ///
    /// Only the block and host of `to` and the block id of `from` are kept;
    /// replica ids on both sides are ignored.
    pub fn new(from: Coord, to: Coord) -> Self {
        let coord = BlockCoord::from(to);
        let prev_block_id = from.block_id;
        Self { coord, prev_block_id }
    }

    /// Returns `true` when the channel `from -> to` maps to this demux coordinate.
    pub fn includes_channel(&self, from: Coord, to: Coord) -> bool {
        // Derived equality compares `coord` first and `prev_block_id` second,
        // i.e. exactly the two checks that define membership.
        *self == Self::new(from, to)
    }
}
/// Formats the coordinate as `(bNN.hNN.rNN)`, each id padded to two digits.
impl Display for Coord {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { block_id, host_id, replica_id } = self;
        f.write_fmt(format_args!(
            "(b{:02}.h{:02}.r{:02})",
            block_id, host_id, replica_id
        ))
    }
}
/// Formats the coordinate as `(bNN.hNN)`, each id padded to two digits.
impl Display for BlockCoord {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { block_id, host_id } = self;
        f.write_fmt(format_args!("(b{:02}.h{:02})", block_id, host_id))
    }
}
/// Formats the endpoint as `Endpoint{(bNN) -> <coord>}`, where `<coord>` is the
/// `Display` output of the inner [`Coord`].
impl Display for ReceiverEndpoint {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { coord, prev_block_id } = self;
        f.write_fmt(format_args!(
            "Endpoint{{(b{:02}) -> {}}}",
            prev_block_id, coord
        ))
    }
}
/// Formats the demux coordinate as `Demux{(bNN) -> <coord>}`, where `<coord>`
/// is the `Display` output of the inner [`BlockCoord`].
impl Display for DemuxCoord {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { coord, prev_block_id } = self;
        f.write_fmt(format_args!(
            "Demux{{(b{:02}) -> {}}}",
            prev_block_id, coord
        ))
    }
}
impl From<Coord> for BlockCoord {
fn from(coord: Coord) -> Self {
Self {
block_id: coord.block_id,
host_id: coord.host_id,
}
}
}
impl From<ReceiverEndpoint> for DemuxCoord {
fn from(endpoint: ReceiverEndpoint) -> Self {
Self {
coord: endpoint.coord.into(),
prev_block_id: endpoint.prev_block_id,
}
}
}