use std::fmt::Debug;
use std::{
ops::{Deref, DerefMut},
rc::Rc,
sync::Mutex,
};
use indexmap::{IndexMap, IndexSet};
use serde::{Deserialize, Serialize};
use crate::runtime::BiCommunicationClient;
use crate::{runtime::communication::Distributable, types::*};
pub trait DistKey: Key + Distributable {}
impl<T: Key + Distributable> DistKey for T {}
pub trait DistData: MaybeData + Distributable {}
impl<T: MaybeData + Distributable> DistData for T {}
pub trait DistTimestamp: MaybeTime + Distributable {}
impl<T: MaybeTime + Distributable> DistTimestamp for T {}
pub(super) type Version = u64;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(super) enum NetworkMessage<K, V, T> {
Data(NetworkDataMessage<K, V, T>),
Epoch(T),
BarrierMarker,
SuspendMarker,
Acquire(NetworkAcquire<K>),
Upgrade(Version),
AckUpgrade(Version),
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(super) struct NetworkDataMessage<K, V, T> {
pub content: DataMessage<K, V, T>,
pub version: Version,
}
impl<K, V, T> NetworkDataMessage<K, V, T> {
pub(super) fn new(content: DataMessage<K, V, T>, version: Version) -> Self {
Self { content, version }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(super) struct NetworkAcquire<K> {
pub(super) key: K,
collection: IndexMap<OperatorId, Vec<u8>>,
}
impl<K> NetworkAcquire<K> {
pub(super) fn new(key: K, collection: IndexMap<OperatorId, Vec<u8>>) -> Self {
Self { key, collection }
}
}
impl<K> From<NetworkAcquire<K>> for Acquire<K> {
fn from(value: NetworkAcquire<K>) -> Self {
Self {
key: value.key,
collection: Rc::new(Mutex::new(value.collection)),
}
}
}
impl<K> TryFrom<Collect<K>> for NetworkAcquire<K> {
type Error = Collect<K>;
fn try_from(value: Collect<K>) -> Result<Self, Self::Error> {
value
.try_unwrap()
.map(|(key, collection)| Self::new(key, collection))
}
}
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct RemoteState<T> {
pub is_barred: bool,
pub frontier: Option<T>,
pub last_version: Option<Version>,
pub last_ack_version: Option<Version>,
#[serde(skip)]
pub sent_suspend: bool,
}
impl<T> Default for RemoteState<T> {
fn default() -> Self {
Self {
is_barred: Default::default(),
frontier: Default::default(),
last_ack_version: Default::default(),
last_version: None,
sent_suspend: false,
}
}
}
pub(super) struct Container<T> {
inner: Option<T>,
}
impl<T> Container<T> {
pub(super) fn new(value: T) -> Self {
Self { inner: Some(value) }
}
pub(super) fn apply(&mut self, func: impl FnOnce(T) -> T) {
let new_value = func(self.inner.take().expect("Always Some"));
self.inner = Some(new_value);
}
}
impl<T> Deref for Container<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.inner.as_ref().expect("Always Some")
}
}
impl<T> DerefMut for Container<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.as_mut().expect("Always Some")
}
}
pub type WorkerPartitioner<K> = fn(&K, &IndexSet<WorkerId>) -> WorkerId;
use crate::types::OperatorId;
#[derive(Clone)]
pub struct Interrogate<K> {
shared: Rc<Mutex<IndexSet<K>>>,
tester: Rc<dyn Fn(&K) -> bool>,
}
impl<K> Interrogate<K>
where
K: Key,
{
pub(crate) fn new(tester: Rc<dyn Fn(&K) -> bool>) -> Self {
let shared = Rc::new(Mutex::new(IndexSet::new()));
Self { shared, tester }
}
pub fn add_keys<'a>(&mut self, keys: impl IntoIterator<Item = &'a K>) {
#[allow(clippy::unwrap_used)]
let mut guard = self.shared.lock().unwrap();
for key in keys {
if (self.tester)(key) {
guard.insert(key.clone());
}
}
}
pub(crate) fn try_unwrap(self) -> Result<IndexSet<K>, Self> {
#[allow(clippy::unwrap_used)]
Rc::try_unwrap(self.shared)
.map(|x| Mutex::into_inner(x).unwrap())
.map_err(|x| Self {
shared: x,
tester: self.tester,
})
}
}
#[derive(Clone, Debug)]
pub struct Collect<K> {
pub key: K,
collection: Rc<Mutex<IndexMap<OperatorId, Vec<u8>>>>,
}
impl<K> Collect<K>
where
K: Key,
{
pub(crate) fn new(key: K) -> Self {
Self {
key,
collection: Rc::new(Mutex::new(IndexMap::new())),
}
}
pub fn add_state<S: Distributable>(&mut self, operator_id: OperatorId, state: S) {
#[allow(clippy::unwrap_used)]
self.collection
.lock()
.unwrap()
.insert(operator_id, BiCommunicationClient::encode(state));
}
}
impl<K> Collect<K> {
pub(crate) fn try_unwrap(self) -> Result<(K, IndexMap<OperatorId, Vec<u8>>), Self> {
#[allow(clippy::unwrap_used)]
match Rc::try_unwrap(self.collection).map(|mutex| mutex.into_inner().unwrap()) {
Ok(collection) => Ok((self.key, collection)),
Err(collection) => Err(Self {
key: self.key,
collection,
}),
}
}
}
impl<K> Debug for Interrogate<K>
where
K: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Interrogate")
.field("shared", &self.shared)
.field("tester", &"Fn(&K) -> bool")
.finish()
}
}
#[derive(Debug, Clone)]
pub struct Acquire<K> {
key: K,
collection: Rc<Mutex<IndexMap<OperatorId, Vec<u8>>>>,
}
impl<K> Acquire<K> {
#[cfg(test)]
pub(crate) fn new(key: K, collection: IndexMap<OperatorId, Vec<u8>>) -> Self {
Self {
key,
collection: Rc::new(Mutex::new(collection)),
}
}
}
impl<K> Acquire<K>
where
K: Key,
{
pub fn take_state<S: Distributable>(&self, operator_id: &OperatorId) -> Option<(K, S)> {
#[allow(clippy::unwrap_used)]
self.collection
.lock()
.unwrap()
.swap_remove(operator_id)
.map(|x| BiCommunicationClient::decode(&x))
.map(|s| (self.key.clone(), s))
}
}