use indexmap::IndexSet;
use serde::{Deserialize, Serialize};
use std::rc::Rc;
use crate::{
keyed::distributed::{Acquire, Collect, Interrogate},
snapshot::Barrier,
};
use super::{Timestamp, WorkerId};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DataMessage<K, V, T> {
pub key: K,
pub value: V,
pub timestamp: T,
}
impl<K, V, T> DataMessage<K, V, T> {
pub fn new(key: K, value: V, timestamp: T) -> Self {
Self {
timestamp,
key,
value,
}
}
}
#[derive(Debug)]
pub enum Message<K, V, T> {
Data(DataMessage<K, V, T>),
Epoch(T),
AbsBarrier(Barrier),
Rescale(RescaleMessage),
SuspendMarker(SuspendMarker),
Interrogate(Interrogate<K>),
Collect(Collect<K>),
Acquire(Acquire<K>),
}
macro_rules! impl_from_variants {
($($variant:ident($variant_type:ty)),* $(,)?) => {
$(
impl<K, V, T> From<$variant_type> for Message<K, V, T> {
fn from(value: $variant_type) -> Self {
Message::$variant(value)
}
}
)*
};
}
impl_from_variants!(
Data(DataMessage<K, V, T>),
AbsBarrier(Barrier),
Rescale(RescaleMessage),
SuspendMarker(SuspendMarker),
Interrogate(Interrogate<K>),
Collect(Collect<K>),
Acquire(Acquire<K>),
);
impl<K, V, T> From<T> for Message<K, V, T>
where
T: Timestamp,
{
fn from(value: T) -> Self {
Message::Epoch(value)
}
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RescaleMessage {
workers: IndexSet<WorkerId>,
version: u64,
rc: Rc<()>,
}
impl RescaleMessage {
pub(crate) fn new(workers: IndexSet<WorkerId>, version: u64) -> Self {
Self {
workers,
version,
rc: Rc::new(()),
}
}
pub fn get_new_workers(&self) -> &IndexSet<WorkerId> {
&self.workers
}
pub fn get_version(&self) -> u64 {
self.version
}
pub(crate) fn strong_count(&self) -> usize {
Rc::strong_count(&self.rc)
}
}
impl<K, V, T> Clone for Message<K, V, T>
where
K: Clone,
V: Clone,
T: Clone,
{
fn clone(&self) -> Self {
match self {
Self::Data(x) => Self::Data(x.clone()),
Self::Epoch(x) => Self::Epoch(x.clone()),
Self::AbsBarrier(x) => Self::AbsBarrier(x.clone()),
Self::Rescale(x) => Self::Rescale(x.clone()),
Self::SuspendMarker(x) => Self::SuspendMarker(x.clone()),
Self::Interrogate(x) => Self::Interrogate(x.clone()),
Self::Collect(x) => Self::Collect(x.clone()),
Self::Acquire(x) => Self::Acquire(x.clone()),
}
}
}
#[derive(Debug, Clone, Default)]
pub struct SuspendMarker {
rc: Rc<()>,
}
impl SuspendMarker {
pub(crate) fn strong_count(&self) -> usize {
Rc::strong_count(&self.rc)
}
}