use std::cmp::Ordering;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::net::{Ipv4Addr, SocketAddr};
use std::ops::Bound;
use std::time::Duration;
use itertools::Itertools;
use lru::LruCache;
use rand::Rng;
use rand::prelude::SliceRandom;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use tokio::time::Instant;
use tracing::{info, warn};
use crate::delta::{Delta, DeltaSerializer, NodeDelta};
use crate::digest::{Digest, NodeDigest};
use crate::listener::Listeners;
use crate::types::{DeletionStatus, DeletionStatusMutation};
use crate::{
ChitchatId, GARBAGE_COLLECTED_NODE_HISTORY_SIZE, Heartbeat, KeyChangeEvent, Version,
VersionedValue,
};
/// State held for a single node: its identity, heartbeat and versioned key-values.
#[derive(Clone, Serialize, Deserialize)]
pub struct NodeState {
/// Identity of the node this state belongs to.
chitchat_id: ChitchatId,
/// Latest heartbeat recorded for the node.
heartbeat: Heartbeat,
/// Key-values of the node, tombstoned entries included.
key_values: BTreeMap<String, VersionedValue>,
/// Listeners notified from `set_versioned_value`; not serialized.
#[serde(skip)]
listeners: Listeners,
/// Highest version recorded for this node.
max_version: Version,
/// Highest version among the entries dropped by `gc_keys_marked_for_deletion`,
/// or the value taken over from a delta by `reset_node`.
last_gc_version: Version,
}
impl Debug for NodeState {
    /// Prints the heartbeat, the key-values and both version counters.
    ///
    /// The node id and the listeners are intentionally left out of the output.
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let mut debug_struct = f.debug_struct("NodeState");
        debug_struct.field("heartbeat", &self.heartbeat);
        debug_struct.field("key_values", &self.key_values);
        debug_struct.field("max_version", &self.max_version);
        debug_struct.field("last_gc_version", &self.last_gc_version);
        debug_struct.finish()
    }
}
impl NodeState {
fn new(chitchat_id: ChitchatId, listeners: Listeners) -> NodeState {
NodeState {
chitchat_id,
heartbeat: Heartbeat(0),
key_values: Default::default(),
max_version: 0u64,
listeners,
last_gc_version: 0u64,
}
}
/// Returns the identity of the node this state belongs to.
pub fn chitchat_id(&self) -> &ChitchatId {
&self.chitchat_id
}
/// Returns the current `last_gc_version` counter.
pub fn last_gc_version(&self) -> Version {
self.last_gc_version
}
/// Overwrites the `last_gc_version` counter; no validation is performed.
pub(crate) fn set_last_gc_version(&mut self, last_gc_version: Version) {
self.last_gc_version = last_gc_version;
}
pub fn for_test() -> NodeState {
NodeState {
chitchat_id: ChitchatId {
node_id: "test-node".to_string(),
generation_id: 0,
gossip_advertise_addr: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 7280),
},
heartbeat: Heartbeat(0),
key_values: Default::default(),
max_version: Default::default(),
listeners: Listeners::default(),
last_gc_version: 0u64,
}
}
/// Returns the latest heartbeat recorded for this node.
pub fn heartbeat(&self) -> Heartbeat {
self.heartbeat
}
/// Returns the highest version recorded for this node.
pub fn max_version(&self) -> Version {
self.max_version
}
/// Iterates over every stored entry in key order, tombstoned entries included.
pub fn key_values_including_deleted(&self) -> impl Iterator<Item = (&str, &VersionedValue)> {
self.key_values
.iter()
.map(|(key, versioned_value)| (key.as_str(), versioned_value))
}
/// Iterates over the `(key, value)` pairs whose entry is not reported as deleted.
pub fn key_values(&self) -> impl Iterator<Item = (&str, &str)> {
    self.key_values_including_deleted()
        .filter_map(|(key, versioned_value)| {
            if versioned_value.is_deleted() {
                None
            } else {
                Some((key, versioned_value.value.as_str()))
            }
        })
}
/// Overwrites the `max_version` counter; no monotonicity check is performed here.
pub fn set_max_version(&mut self, max_version: Version) {
self.max_version = max_version;
}
/// Decides what to do with `node_delta` without modifying the state.
///
/// - `Reject` when the delta starts after our `max_version`, when it would need
///   a reset but does not start from version 0, or when it carries nothing newer
///   than what we already have.
/// - `ApplyAfterReset` when the delta's `last_gc_version` is ahead of both our
///   `last_gc_version` and our `max_version`, and the delta starts from 0.
/// - `Apply` otherwise.
#[must_use]
fn check_delta_status(&self, node_delta: &NodeDelta) -> DeltaStatus {
    let local_max_version = self.max_version();
    if node_delta.from_version_excluded > local_max_version {
        info!(
            node=?node_delta.chitchat_id,
            from_version=node_delta.from_version_excluded,
            last_gc_version=node_delta.last_gc_version,
            current_last_gc_version=self.last_gc_version,
            "received delta from the future, ignoring it"
        );
        return DeltaStatus::Reject;
    }
    // A reset is only required when the delta's GC frontier is ahead of both
    // our own GC frontier and the highest version we hold.
    let requires_reset = node_delta.last_gc_version > self.last_gc_version
        && node_delta.last_gc_version > local_max_version;
    if requires_reset {
        if node_delta.from_version_excluded == 0 {
            return DeltaStatus::ApplyAfterReset;
        }
        warn!(
            node=?node_delta.chitchat_id,
            from_version=node_delta.from_version_excluded,
            last_gc_version=node_delta.last_gc_version,
            current_last_gc_version=self.last_gc_version,
            "received an inapplicable delta, ignoring it");
        return DeltaStatus::Reject;
    }
    if local_max_version < node_delta.max_version {
        DeltaStatus::Apply
    } else {
        DeltaStatus::Reject
    }
}
/// Returns `(last_gc_version, max_version)`.
///
/// `ClusterState::apply_delta` asserts that this tuple never decreases when a
/// delta is applied.
pub fn monotonic_property(&self) -> (Version, Version) {
(self.last_gc_version(), self.max_version())
}
/// Replaces this state with a fresh one for the same node and listeners, then
/// records `last_gc_version`. Key-values and heartbeat are discarded.
fn reset_node(&mut self, last_gc_version: Version) {
    let mut fresh_state = NodeState::new(self.chitchat_id.clone(), self.listeners.clone());
    fresh_state.max_version = 0;
    fresh_state.last_gc_version = last_gc_version;
    *self = fresh_state;
}
/// Applies `node_delta` to this state and returns the status computed by
/// `check_delta_status`.
///
/// A rejected delta leaves the state untouched. On `ApplyAfterReset` the state
/// is reset first. Mutations at or below the (post-reset) `max_version` are
/// skipped, as are deletion mutations at or below `last_gc_version`.
///
/// # Panics
///
/// Panics if, once the mutations are applied, `max_version` exceeds the
/// `max_version` announced by the delta.
fn apply_delta(&mut self, node_delta: NodeDelta, now: Instant) -> DeltaStatus {
    let delta_status = self.check_delta_status(&node_delta);
    if delta_status == DeltaStatus::Reject {
        return delta_status;
    }
    if delta_status == DeltaStatus::ApplyAfterReset {
        info!(
            "resetting for node id {:?} last_gc_version {}",
            &node_delta.chitchat_id, node_delta.last_gc_version
        );
        self.reset_node(node_delta.last_gc_version);
    }
    // Captured after the optional reset: a reset brings it back to 0.
    let known_max_version = self.max_version();
    for mutation in node_delta.key_values {
        let already_known = mutation.version <= known_max_version;
        if already_known
            || (mutation.status.scheduled_for_deletion()
                && mutation.version <= self.last_gc_version)
        {
            continue;
        }
        let versioned_value = VersionedValue {
            value: mutation.value,
            version: mutation.version,
            status: mutation.status.into_status(now),
        };
        self.set_versioned_value(mutation.key, versioned_value);
    }
    assert!(node_delta.max_version >= self.max_version);
    self.max_version = node_delta.max_version;
    delta_status
}
/// Iterates, in key order, over the entries whose key starts with `prefix` and
/// that are not reported as deleted.
pub fn iter_prefix<'a>(
    &'a self,
    prefix: &'a str,
) -> impl Iterator<Item = (&'a str, &'a VersionedValue)> + 'a {
    // Keys are sorted, so every match is contiguous starting at `prefix`.
    let from_prefix = (Bound::Included(prefix), Bound::Unbounded);
    let candidates = self.key_values.range::<str, _>(from_prefix);
    candidates
        .take_while(move |(key, _)| key.starts_with(prefix))
        .filter_map(|(key, versioned_value)| {
            if versioned_value.is_deleted() {
                None
            } else {
                Some((key.as_str(), versioned_value))
            }
        })
}
/// Counts the entries yielded by `key_values()`, i.e. deleted entries are not
/// counted. This walks the whole map.
pub fn num_key_values(&self) -> usize {
self.key_values().count()
}
/// Returns `true` when `get(key)` yields a value, so a deleted key counts as absent.
pub fn contains_key(&self, key: &str) -> bool {
self.get(key).is_some()
}
/// Returns the value stored under `key`, or `None` when the key is missing or
/// its entry is reported as deleted.
pub fn get(&self, key: &str) -> Option<&str> {
    self.get_versioned(key)
        .filter(|versioned_value| !versioned_value.is_deleted())
        .map(|versioned_value| versioned_value.value.as_str())
}
/// Returns the raw entry stored under `key`, including entries marked as deleted.
pub fn get_versioned(&self, key: &str) -> Option<&VersionedValue> {
self.key_values.get(key)
}
/// Stores `value` under `key` with version `max_version + 1`.
///
/// Nothing happens (no version bump) when the key already holds the same value
/// with status `DeletionStatus::Set`.
pub fn set(&mut self, key: impl ToString, value: impl ToString) {
    let key = key.to_string();
    let value = value.to_string();
    let is_unchanged = self.get_versioned(&key).is_some_and(|current| {
        current.value == value && matches!(current.status, DeletionStatus::Set)
    });
    if is_unchanged {
        return;
    }
    let next_version = self.max_version + 1;
    self.set_with_version(key, value, next_version);
}
/// Stores `value` under `key` with version `max_version + 1` and status
/// `DeletionStatus::DeleteAfterTtl` stamped with the current instant.
///
/// Nothing happens (no version bump, TTL start not refreshed) when the key
/// already holds the same value with a `DeleteAfterTtl` status.
pub fn set_with_ttl(&mut self, key: impl ToString, value: impl ToString) {
    let key = key.to_string();
    let value = value.to_string();
    if let Some(previous_versioned_value) = self.get_versioned(&key) {
        if previous_versioned_value.value == value
            && matches!(
                previous_versioned_value.status,
                DeletionStatus::DeleteAfterTtl(_)
            )
        {
            return;
        }
    }
    let new_version = self.max_version + 1;
    // `key` and `value` are already owned strings: move them instead of
    // allocating a second copy of each.
    self.set_versioned_value(
        key,
        VersionedValue {
            value,
            version: new_version,
            status: DeletionStatus::DeleteAfterTtl(Instant::now()),
        },
    );
}
/// Turns the entry under `key` into a tombstone: bumps `max_version`, stamps the
/// entry with it, empties the value and sets the deletion status.
///
/// Logs a warning and does nothing when the key is not present.
pub fn delete(&mut self, key: &str) {
    if let Some(versioned_value) = self.key_values.get_mut(key) {
        self.max_version += 1;
        versioned_value.version = self.max_version;
        versioned_value.value = String::new();
        versioned_value.status = DeletionStatusMutation::Delete.into_status(Instant::now());
    } else {
        warn!("Key `{key}` does not exist in the node's state and could not be deleted.",);
    }
}
/// Schedules the entry under `key` for deletion after a TTL: bumps
/// `max_version`, stamps the entry with it and sets the `DeleteAfterTtl`
/// status. The value itself is left untouched.
///
/// Logs a warning and does nothing when the key is not present.
pub fn delete_after_ttl(&mut self, key: &str) {
    let Some(versioned_value) = self.key_values.get_mut(key) else {
        warn!(
            "Key `{key}` does not exist in the node's state and could not be scheduled for an \
             eventual deletion.",
        );
        return;
    };
    self.max_version += 1;
    versioned_value.version = self.max_version;
    versioned_value.status = DeletionStatusMutation::DeleteAfterTtl.into_status(Instant::now());
}
/// Increments this node's heartbeat by delegating to `Heartbeat::inc`.
pub(crate) fn inc_heartbeat(&mut self) {
self.heartbeat.inc();
}
/// Records `heartbeat_new_value` if it is newer than the current heartbeat.
///
/// Returns `true` only when an already-initialized (non-zero) heartbeat was
/// replaced by a strictly greater one. While the stored heartbeat is still 0,
/// the new value is stored unconditionally and `false` is returned.
pub fn try_set_heartbeat(&mut self, heartbeat_new_value: Heartbeat) -> bool {
    let is_first_heartbeat = self.heartbeat.0 == 0;
    let is_newer = heartbeat_new_value > self.heartbeat;
    if is_first_heartbeat || is_newer {
        self.heartbeat = heartbeat_new_value;
    }
    !is_first_heartbeat && is_newer
}
/// Summarizes this state as a `NodeDigest`: heartbeat, `last_gc_version` and
/// `max_version`.
fn digest(&self) -> NodeDigest {
NodeDigest {
heartbeat: self.heartbeat,
last_gc_version: self.last_gc_version,
max_version: self.max_version,
}
}
fn gc_keys_marked_for_deletion(&mut self, grace_period: Duration) {
let now = Instant::now();
let mut max_deleted_version = self.last_gc_version;
self.key_values
.retain(|_, versioned_value: &mut VersionedValue| {
let Some(deleted_start_instant) = versioned_value
.status
.time_of_start_scheduled_for_deletion()
else {
return true;
};
if now < deleted_start_instant + grace_period {
return true;
}
max_deleted_version = versioned_value.version.max(max_deleted_version);
false
});
self.last_gc_version = max_deleted_version;
}
/// Removes `key` from the map outright: no tombstone is left and neither
/// version counter is touched.
pub(crate) fn remove_key_value_internal(&mut self, key: &str) {
self.key_values.remove(key);
}
/// Iterates over the entries (tombstones included) whose version is strictly
/// greater than `floor_version`.
fn stale_key_values(
&self,
floor_version: u64,
) -> impl Iterator<Item = (&str, &VersionedValue)> {
self.key_values_including_deleted()
.filter(move |(_key, versioned_value)| versioned_value.version > floor_version)
}
pub(crate) fn set_versioned_value(
&mut self,
key: String,
versioned_value_update: VersionedValue,
) {
let key_clone = key.clone();
let key_change_event = KeyChangeEvent {
key: key_clone.as_str(),
value: &versioned_value_update.value,
node: &self.chitchat_id,
};
self.max_version = versioned_value_update.version.max(self.max_version);
match self.key_values.entry(key) {
Entry::Occupied(mut occupied) => {
let occupied_versioned_value = occupied.get_mut();
if occupied_versioned_value.version >= versioned_value_update.version {
return;
}
*occupied_versioned_value = versioned_value_update.clone();
}
Entry::Vacant(vacant) => {
vacant.insert(versioned_value_update.clone());
}
};
if !versioned_value_update.is_deleted() {
self.listeners.trigger_event(key_change_event);
}
}
/// Stores `value` under `key` at an explicit `version` with status
/// `DeletionStatus::Set`.
///
/// # Panics
///
/// Panics if `version` is not strictly greater than the current `max_version`.
fn set_with_version(&mut self, key: impl ToString, value: impl ToString, version: Version) {
    assert!(version > self.max_version);
    let versioned_value = VersionedValue {
        value: value.to_string(),
        version,
        status: DeletionStatus::Set,
    };
    self.set_versioned_value(key.to_string(), versioned_value);
}
}
/// Outcome of `NodeState::check_delta_status` for an incoming node delta.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum DeltaStatus {
/// The delta must be ignored.
Reject,
/// The delta can be applied on top of the current state.
Apply,
/// The node state must be reset before the delta is applied.
ApplyAfterReset,
}
/// State of every known node, plus the seed addresses and a bounded history of
/// removed nodes.
pub(crate) struct ClusterState {
/// Per-node state, ordered by node id.
node_states: BTreeMap<ChitchatId, NodeState>,
/// Watch channel from which the current seed addresses are read.
seed_addrs: watch::Receiver<HashSet<SocketAddr>>,
/// Listeners cloned into every `NodeState` created by `node_state_mut_or_init`.
pub(crate) listeners: Listeners,
/// Last heartbeat of nodes dropped through `remove_node`, kept in an LRU cache
/// sized by `GARBAGE_COLLECTED_NODE_HISTORY_SIZE`.
garbage_collected_nodes: LruCache<ChitchatId, Heartbeat>,
}
impl Debug for ClusterState {
    /// Prints the current seed addresses and the per-node states under the
    /// struct name `Cluster`.
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let current_seed_addrs = self.seed_addrs.borrow();
        let mut debug_struct = f.debug_struct("Cluster");
        debug_struct.field("seed_addrs", &current_seed_addrs);
        debug_struct.field("node_states", &self.node_states);
        debug_struct.finish()
    }
}
#[cfg(test)]
impl Default for ClusterState {
    /// Test-only constructor: an empty cluster state with no seed addresses.
    fn default() -> Self {
        // The sender half is dropped when this function returns; the receiver
        // keeps serving the initial (empty) set.
        let (_seed_addrs_tx, seed_addrs_rx) = watch::channel(HashSet::new());
        ClusterState::with_seed_addrs(seed_addrs_rx)
    }
}
impl ClusterState {
pub fn with_seed_addrs(seed_addrs: watch::Receiver<HashSet<SocketAddr>>) -> ClusterState {
ClusterState {
seed_addrs,
node_states: BTreeMap::new(),
listeners: Default::default(),
garbage_collected_nodes: LruCache::new(GARBAGE_COLLECTED_NODE_HISTORY_SIZE),
}
}
/// Returns the map of all known node states.
pub(crate) fn node_states(&self) -> &BTreeMap<ChitchatId, NodeState> {
&self.node_states
}
/// Returns the state of `chitchat_id`, creating an empty one when the node is
/// not known yet.
///
/// On creation the node is also removed from the history of garbage collected
/// nodes.
pub(crate) fn node_state_mut_or_init(&mut self, chitchat_id: &ChitchatId) -> &mut NodeState {
    match self.node_states.entry(chitchat_id.clone()) {
        Entry::Occupied(existing) => existing.into_mut(),
        Entry::Vacant(slot) => {
            // The node is tracked again: forget that it was garbage collected.
            self.garbage_collected_nodes.pop(chitchat_id);
            slot.insert(NodeState::new(
                chitchat_id.clone(),
                self.listeners.clone(),
            ))
        }
    }
}
/// Returns the state of `chitchat_id`, or `None` when the node is unknown.
pub fn node_state(&self, chitchat_id: &ChitchatId) -> Option<&NodeState> {
self.node_states.get(chitchat_id)
}
/// Returns the mutable state of `chitchat_id`, or `None` when the node is
/// unknown. Unlike `node_state_mut_or_init`, this never creates a state.
pub(crate) fn node_state_mut(&mut self, chitchat_id: &ChitchatId) -> Option<&mut NodeState> {
self.node_states.get_mut(chitchat_id)
}
/// Iterates over the ids of all known nodes, in key order.
pub fn nodes(&self) -> impl Iterator<Item = &ChitchatId> {
self.node_states.keys()
}
/// Returns a copy of the seed addresses currently held by the watch channel.
pub fn seed_addrs(&self) -> HashSet<SocketAddr> {
self.seed_addrs.borrow().clone()
}
/// Forgets the state of `chitchat_id`. If the node was known, its last
/// heartbeat is recorded in the history of garbage collected nodes.
pub(crate) fn remove_node(&mut self, chitchat_id: &ChitchatId) {
    let Some(removed_state) = self.node_states.remove(chitchat_id) else {
        return;
    };
    self.garbage_collected_nodes
        .push(chitchat_id.clone(), removed_state.heartbeat);
}
/// Applies every node delta of `delta` to the matching node state; deltas for
/// nodes that are not known are skipped.
///
/// Returns `true` when at least one node state was reset in the process.
///
/// # Panics
///
/// Panics if applying a node delta makes `(last_gc_version, max_version)` go
/// backwards for that node.
pub(crate) fn apply_delta(&mut self, delta: Delta) -> bool {
    let now = Instant::now();
    let mut contains_reset = false;
    for node_delta in delta.node_deltas {
        let Some(node_state) = self.node_state_mut(&node_delta.chitchat_id) else {
            continue;
        };
        let monotonic_property_before = node_state.monotonic_property();
        let delta_status = node_state.apply_delta(node_delta, now);
        let monotonic_property_after = node_state.monotonic_property();
        assert!(
            monotonic_property_after >= monotonic_property_before,
            "after {monotonic_property_after:?}, before {monotonic_property_before:?}"
        );
        if delta_status == DeltaStatus::ApplyAfterReset {
            contains_reset = true;
        }
    }
    contains_reset
}
/// Builds a digest with one entry per known node, leaving out the nodes listed
/// in `scheduled_for_deletion`.
pub fn compute_digest(&self, scheduled_for_deletion: &HashSet<&ChitchatId>) -> Digest {
    let node_digests = self
        .node_states
        .iter()
        .filter_map(|(chitchat_id, node_state)| {
            if scheduled_for_deletion.contains(chitchat_id) {
                None
            } else {
                Some((chitchat_id.clone(), node_state.digest()))
            }
        })
        .collect();
    Digest { node_digests }
}
/// Runs the key garbage collection of every node state with the given grace
/// period.
pub fn gc_keys_marked_for_deletion(&mut self, marked_for_deletion_grace_period: Duration) {
    self.node_states.values_mut().for_each(|node_state| {
        node_state.gc_keys_marked_for_deletion(marked_for_deletion_grace_period)
    });
}
/// Builds the delta to send to a peer that advertised `digest`, stopping as
/// soon as the serializer created with `mtu` refuses more data.
///
/// Nodes listed in `scheduled_for_deletion` and nodes for which the peer is
/// already up to date are skipped. A node is sent from version 0 (reset) when
/// our `last_gc_version` is ahead of both the peer's `last_gc_version` and the
/// peer's `max_version`; otherwise it is sent from the peer's `max_version`.
/// Nodes are emitted in the order produced by `SortedStaleNodes`, and the
/// key-values of each node by increasing version.
pub fn compute_partial_delta_respecting_mtu(
    &self,
    digest: &Digest,
    mtu: usize,
    scheduled_for_deletion: &HashSet<&ChitchatId>,
) -> Delta {
    let mut stale_nodes = SortedStaleNodes::default();
    for (chitchat_id, node_state) in &self.node_states {
        if scheduled_for_deletion.contains(chitchat_id) {
            continue;
        }
        // A node missing from the digest is treated as known at version 0.
        let (digest_last_gc_version, digest_max_version) =
            match digest.node_digests.get(chitchat_id) {
                Some(node_digest) => (node_digest.last_gc_version, node_digest.max_version),
                None => (0, 0),
            };
        if digest_max_version >= node_state.max_version {
            continue;
        }
        let should_reset = node_state.last_gc_version > digest_last_gc_version
            && node_state.last_gc_version > digest_max_version;
        let mut from_version_excluded = digest_max_version;
        if should_reset {
            warn!(
                "Node to reset {chitchat_id:?} last gc version: {} max version: {}",
                node_state.last_gc_version, digest_max_version
            );
            from_version_excluded = 0;
        }
        stale_nodes.offer(chitchat_id, node_state, from_version_excluded);
    }
    let mut delta_serializer = DeltaSerializer::with_mtu(mtu);
    for stale_node in stale_nodes.into_iter() {
        let node_fits = delta_serializer.try_add_node(
            stale_node.chitchat_id.clone(),
            stale_node.node_state.last_gc_version,
            stale_node.from_version_excluded,
        );
        if !node_fits {
            break;
        }
        let mut num_key_values_added = 0usize;
        for (key, versioned_value) in stale_node.stale_key_values() {
            if !delta_serializer.try_add_kv(key, versioned_value.clone()) {
                return delta_serializer.finish();
            }
            num_key_values_added += 1;
        }
        if num_key_values_added == 0 {
            // Nothing to send for this node: only try to advertise its max version.
            let _ = delta_serializer.try_set_max_version(stale_node.node_state.max_version);
        }
    }
    delta_serializer.finish()
}
/// Returns the heartbeat recorded when `chitchat_id` was removed through
/// `remove_node`, if it is still in the history. Uses `peek`, so the lookup
/// itself does not go through the cache's `get`.
pub fn last_heartbeat_if_deleted(&self, chitchat_id: &ChitchatId) -> Option<Heartbeat> {
self.garbage_collected_nodes.peek(chitchat_id).copied()
}
}
/// Sort key describing how far behind a peer is for a given node.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
struct Staleness {
    /// `true` when the peer is sent the node from version 0.
    is_unknown: bool,
    /// Highest version we hold for the node.
    max_version: u64,
    /// Number of key-values the peer is missing.
    num_stale_key_values: usize,
}

impl Ord for Staleness {
    /// Known nodes sort before unknown ones. Two unknown nodes are ordered by
    /// decreasing `max_version`; two known nodes by increasing
    /// `num_stale_key_values`. The remaining field is ignored in each case, so
    /// values that differ under `==` can still compare as `Equal` here.
    fn cmp(&self, other: &Self) -> Ordering {
        match self.is_unknown.cmp(&other.is_unknown) {
            Ordering::Equal if self.is_unknown => other.max_version.cmp(&self.max_version),
            Ordering::Equal => self
                .num_stale_key_values
                .cmp(&other.num_stale_key_values),
            different => different,
        }
    }
}

impl PartialOrd for Staleness {
    /// Total order: always `Some`, delegating to [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Stale nodes grouped by their `Staleness` key; nodes sharing a key are kept in
/// the same bucket.
#[derive(Default)]
struct SortedStaleNodes<'a> {
/// Buckets of stale nodes, ordered by `Staleness`.
stale_nodes: BTreeMap<Staleness, Vec<StaleNode<'a>>>,
}
/// Computes the staleness of `node_state` relative to `floor_version`.
///
/// Returns `None` when the node has nothing above `floor_version`. A floor of 0
/// marks the node as unknown, in which case the count covers the non-deleted
/// key-values; otherwise it covers every entry above the floor.
fn staleness_score(node_state: &NodeState, floor_version: u64) -> Option<Staleness> {
    let max_version = node_state.max_version();
    if max_version <= floor_version {
        return None;
    }
    let is_unknown = floor_version == 0;
    let num_stale_key_values = match is_unknown {
        true => node_state.num_key_values(),
        false => node_state.stale_key_values(floor_version).count(),
    };
    let staleness = Staleness {
        is_unknown,
        max_version,
        num_stale_key_values,
    };
    Some(staleness)
}
impl<'a> SortedStaleNodes<'a> {
    /// Registers `node_state` as stale relative to `from_version_excluded`.
    /// Nodes for which `staleness_score` returns `None` are ignored.
    fn offer(
        &mut self,
        chitchat_id: &'a ChitchatId,
        node_state: &'a NodeState,
        from_version_excluded: u64,
    ) {
        if let Some(staleness) = staleness_score(node_state, from_version_excluded) {
            let bucket = self.stale_nodes.entry(staleness).or_default();
            bucket.push(StaleNode {
                chitchat_id,
                node_state,
                from_version_excluded,
            });
        }
    }

    /// Yields the stale nodes from the greatest `Staleness` key to the smallest;
    /// nodes sharing a key are shuffled with `random_generator()`.
    fn into_iter(self) -> impl Iterator<Item = StaleNode<'a>> {
        let mut rng = random_generator();
        let buckets_most_stale_first = self.stale_nodes.into_values().rev();
        buckets_most_stale_first.flat_map(move |mut bucket| {
            bucket.shuffle(&mut rng);
            bucket.into_iter()
        })
    }
}
/// A node for which a peer is behind, together with the version from which
/// key-values must be sent.
#[derive(Debug)]
struct StaleNode<'a> {
/// Identity of the stale node.
chitchat_id: &'a ChitchatId,
/// Our state for that node.
node_state: &'a NodeState,
/// Only entries with a version strictly greater than this are stale.
from_version_excluded: u64,
}
impl StaleNode<'_> {
/// Iterates over the entries with a version above `from_version_excluded`,
/// sorted by increasing version (unstable sort).
fn stale_key_values(&self) -> impl Iterator<Item = (&str, &VersionedValue)> {
self.node_state
.stale_key_values(self.from_version_excluded)
.sorted_unstable_by_key(|(_, versioned_value)| versioned_value.version)
}
}
/// Serializable copy of a `ClusterState`: the node states and the seed addresses.
#[derive(Debug, Serialize, Deserialize)]
pub struct ClusterStateSnapshot {
/// Cloned state of every node known at snapshot time.
pub node_states: Vec<NodeState>,
/// Seed addresses at snapshot time.
pub seed_addrs: HashSet<SocketAddr>,
}
impl From<&ClusterState> for ClusterStateSnapshot {
fn from(cluster_state: &ClusterState) -> Self {
let node_states = cluster_state.node_states.values().cloned().collect();
Self {
node_states,
seed_addrs: cluster_state.seed_addrs(),
}
}
}
/// Random generator used outside of tests to shuffle equally stale nodes;
/// returns `rand::rng()`.
#[cfg(not(test))]
fn random_generator() -> impl Rng {
rand::rng()
}
/// Test variant of the random generator: a `StdRng` with a fixed seed, so that
/// shuffles are reproducible across test runs.
#[cfg(test)]
fn random_generator() -> impl Rng {
    use rand::SeedableRng;
    use rand::prelude::StdRng;
    const TEST_SEED: u64 = 9;
    StdRng::seed_from_u64(TEST_SEED)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::MAX_UDP_DATAGRAM_PAYLOAD_SIZE;
use crate::serialize::Serializable;
use crate::types::{DeletionStatusMutation, KeyValueMutation};
/// `StaleNode::stale_key_values` yields nothing for an empty node, and otherwise
/// only the entries above the floor, ordered by increasing version.
#[test]
fn test_stale_node_iter_stale_key_values() {
{
// Empty node state: nothing is stale.
let node = ChitchatId::for_local_test(10_001);
let node_state = NodeState::for_test();
let stale_node = StaleNode {
chitchat_id: &node,
node_state: &node_state,
from_version_excluded: 0u64,
};
assert!(stale_node.stale_key_values().next().is_none());
}
{
// Entries are inserted directly into the map, with versions in reverse key order.
let node = ChitchatId::for_local_test(10_001);
let mut node_state = NodeState::for_test();
node_state
.key_values
.insert("key_a".to_string(), VersionedValue::for_test("value_a", 3));
node_state
.key_values
.insert("key_b".to_string(), VersionedValue::for_test("value_b", 2));
node_state
.key_values
.insert("key_c".to_string(), VersionedValue::for_test("value_c", 1));
let stale_node = StaleNode {
chitchat_id: &node,
node_state: &node_state,
from_version_excluded: 1u64,
};
// `key_c` (version 1) is excluded by the floor; the rest is sorted by version.
assert_eq!(
stale_node.stale_key_values().collect::<Vec<_>>(),
vec![
("key_b", &VersionedValue::for_test("value_b", 2)),
("key_a", &VersionedValue::for_test("value_a", 3))
]
);
}
}
/// An empty `SortedStaleNodes` yields no node.
#[test]
fn test_sorted_stale_nodes_empty() {
let stale_nodes = SortedStaleNodes::default();
assert!(stale_nodes.into_iter().next().is_none());
}
/// Offering nodes with a floor of 0 registers them as unknown nodes.
///
/// The lookups below go through `Staleness::cmp`, which ignores
/// `num_stale_key_values` for unknown nodes; the counts written in the expected
/// keys therefore do not have to match the actual number of key-values.
#[test]
fn test_sorted_stale_nodes_insert() {
let mut stale_nodes = SortedStaleNodes::default();
let node1 = ChitchatId::for_local_test(10_001);
let node2 = ChitchatId::for_local_test(10_002);
let node3 = ChitchatId::for_local_test(10_003);
// No key-values, but a non-zero max version: still offered successfully.
let mut node_state1 = NodeState::for_test();
node_state1.set_max_version(2);
stale_nodes.offer(&node1, &node_state1, 0u64);
assert_eq!(stale_nodes.stale_nodes.len(), 1);
let mut node2_state = NodeState::for_test();
node2_state.set_with_version("key_a", "value_a", 1);
stale_nodes.offer(&node2, &node2_state, 0u64);
let expected_staleness = Staleness {
is_unknown: true,
max_version: 1,
num_stale_key_values: 0,
};
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);
let mut node3_state = NodeState::for_test();
node3_state.set_with_version("key_b", "value_b", 2);
node3_state.set_with_version("key_c", "value_c", 3);
stale_nodes.offer(&node3, &node3_state, 0u64);
let expected_staleness = Staleness {
is_unknown: true,
max_version: 3,
num_stale_key_values: 3,
};
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);
}
/// Nodes whose max version does not exceed the floor are not registered; a node
/// with entries above the floor is.
///
/// The final lookup goes through `Staleness::cmp`, which ignores `max_version`
/// for known nodes, so only `is_unknown` and `num_stale_key_values` matter in
/// the expected key.
#[test]
fn test_sorted_stale_nodes_offer() {
let mut stale_nodes = SortedStaleNodes::default();
// Max version 0 with floor 1: rejected.
let node1 = ChitchatId::for_local_test(10_001);
let node1_state = NodeState::for_test();
stale_nodes.offer(&node1, &node1_state, 1u64);
assert!(stale_nodes.stale_nodes.is_empty());
// Max version 1 with floor 1: rejected as well.
let node2 = ChitchatId::for_local_test(10_002);
let mut node2_state = NodeState::for_test();
node2_state.set_with_version("key_a", "value_a", 1);
stale_nodes.offer(&node2, &node2_state, 1u64);
assert!(stale_nodes.stale_nodes.is_empty());
// Two entries (versions 2 and 3) are above the floor.
let node3 = ChitchatId::for_local_test(10_002);
let mut node3_state = NodeState::for_test();
node3_state.set_with_version("key_a", "value_a", 1);
node3_state.set_with_version("key_b", "value_b", 2);
node3_state.set_with_version("key_c", "value_c", 3);
stale_nodes.offer(&node3, &node3_state, 1u64);
assert_eq!(stale_nodes.stale_nodes.len(), 1);
let expected_staleness = Staleness {
is_unknown: false,
max_version: 1,
num_stale_key_values: 2,
};
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);
}
/// Checks the iteration order: unknown nodes first, then known nodes by
/// decreasing number of stale key-values.
#[test]
fn test_sorted_stale_nodes_into_iter() {
let mut stale_nodes = SortedStaleNodes::default();
// Known node, 2 entries above floor 1.
let node1 = ChitchatId::for_local_test(10_001);
let mut node_state1 = NodeState::for_test();
node_state1.set_with_version("key_a", "value_a", 1);
node_state1.set_with_version("key_b", "value_b", 2);
node_state1.set_with_version("key_c", "value_c", 3);
stale_nodes.offer(&node1, &node_state1, 1u64);
// Known node, 1 entry above floor 2.
let node2 = ChitchatId::for_local_test(10_002);
let mut node_state2 = NodeState::for_test();
node_state2.set_with_version("key_a", "value", 1);
node_state2.set_with_version("key_b", "value_b", 2);
node_state2.set_with_version("key_c", "value_c", 5);
stale_nodes.offer(&node2, &node_state2, 2u64);
// Floor 7 is above the node's max version 3: not registered.
let node3 = ChitchatId::for_local_test(10_003);
let mut node_state3 = NodeState::for_test();
node_state3.set_with_version("key_a", "value_a", 1);
node_state3.set_with_version("key_b", "value_b", 2);
node_state3.set_with_version("key_c", "value_c", 3);
stale_nodes.offer(&node3, &node_state3, 7u64);
// Known node, 3 entries above floor 1.
let node4 = ChitchatId::for_local_test(10_004);
let mut node_state4 = NodeState::for_test();
node_state4.set_with_version("key_a", "value_a", 1);
node_state4.set_with_version("key_b", "value_b", 2);
node_state4.set_with_version("key_c", "value_c", 5);
node_state4.set_with_version("key_d", "value_d", 7);
stale_nodes.offer(&node4, &node_state4, 1);
// Max version 0 with floor 0: not registered.
let node5 = ChitchatId::for_local_test(10_005);
let node_state5 = NodeState::for_test();
stale_nodes.offer(&node5, &node_state5, 0);
// Unknown node (floor 0) with one entry.
let node6 = ChitchatId::for_local_test(10_006);
let mut node_state6 = NodeState::for_test();
node_state6.set_with_version("key_a", "value_a", 1);
stale_nodes.offer(&node6, &node_state6, 0u64);
assert_eq!(
stale_nodes
.into_iter()
.map(|stale_node| stale_node.chitchat_id.gossip_advertise_addr.port())
.collect::<Vec<_>>(),
vec![10_006, 10_004, 10_001, 10_002]
);
}
/// Looking up a node that was never added returns `None`.
#[test]
fn test_cluster_state_missing_node() {
let cluster_state = ClusterState::default();
let node_state = cluster_state.node_state(&ChitchatId::for_local_test(10_001));
assert!(node_state.is_none());
}
/// The first `set` on a fresh node state is stored with version 1.
#[test]
fn test_cluster_state_first_version_is_one() {
let mut cluster_state = ClusterState::default();
let node_state = cluster_state.node_state_mut_or_init(&ChitchatId::for_local_test(10_001));
node_state.set("key_a", "");
assert_eq!(
node_state.get_versioned("key_a").unwrap(),
&VersionedValue {
value: "".to_string(),
version: 1,
status: DeletionStatus::Set,
}
);
}
/// Successive `set` calls get increasing versions; setting one key leaves the
/// other keys untouched.
#[test]
fn test_cluster_state_set() {
let mut cluster_state = ClusterState::default();
let node_state = cluster_state.node_state_mut_or_init(&ChitchatId::for_local_test(10_001));
node_state.set("key_a", "1");
assert_eq!(
node_state.get_versioned("key_a").unwrap(),
&VersionedValue {
value: "1".to_string(),
version: 1,
status: DeletionStatus::Set,
}
);
node_state.set("key_b", "2");
// `key_a` is unchanged by the write to `key_b`.
assert_eq!(
node_state.get_versioned("key_a").unwrap(),
&VersionedValue {
value: "1".to_string(),
version: 1,
status: DeletionStatus::Set,
}
);
assert_eq!(
node_state.get_versioned("key_b").unwrap(),
&VersionedValue {
value: "2".to_string(),
version: 2,
status: DeletionStatus::Set,
}
);
// Overwriting `key_a` with a new value takes the next version.
node_state.set("key_a", "3");
assert_eq!(
node_state.get_versioned("key_a").unwrap(),
&VersionedValue {
value: "3".to_string(),
version: 3,
status: DeletionStatus::Set
}
);
}
/// Setting a key to the value it already holds is a no-op.
///
/// NOTE(review): despite the test name ("updates_version"), the assertions
/// check that the version stays at 1 — consider renaming the test.
#[test]
fn test_cluster_state_set_with_same_value_updates_version() {
let mut cluster_state = ClusterState::default();
let node_state = cluster_state.node_state_mut_or_init(&ChitchatId::for_local_test(10_001));
node_state.set("key", "1");
assert_eq!(
node_state.get_versioned("key").unwrap(),
&VersionedValue {
value: "1".to_string(),
version: 1,
status: DeletionStatus::Set
}
);
// Same value again: the entry, version included, is unchanged.
node_state.set("key", "1");
assert_eq!(
node_state.get_versioned("key").unwrap(),
&VersionedValue {
value: "1".to_string(),
version: 1,
status: DeletionStatus::Set,
}
);
}
/// `delete` leaves a tombstone (empty value, bumped version) that `get` hides;
/// a later `set` brings the key back with a new version.
#[test]
fn test_cluster_state_set_and_mark_for_deletion() {
let mut cluster_state = ClusterState::default();
let node_state = cluster_state.node_state_mut_or_init(&ChitchatId::for_local_test(10_001));
node_state.heartbeat = Heartbeat(10);
node_state.set("key", "1");
node_state.delete("key");
assert!(node_state.get("key").is_none());
{
// The tombstone is still visible through `get_versioned`.
let versioned_value = node_state.get_versioned("key").unwrap();
assert_eq!(&versioned_value.value, "");
assert_eq!(versioned_value.version, 2u64);
assert!(versioned_value.is_deleted());
assert!(
versioned_value
.status
.time_of_start_scheduled_for_deletion()
.is_some()
);
}
node_state.set("key", "2");
{
// Setting the key again clears the deletion status.
let versioned_value = node_state.get_versioned("key").unwrap();
assert_eq!(&versioned_value.value, "2");
assert_eq!(versioned_value.version, 3u64);
assert!(!versioned_value.is_deleted());
assert!(
versioned_value
.status
.time_of_start_scheduled_for_deletion()
.is_none()
);
}
}
/// `delete_after_ttl` keeps the value readable while scheduling its deletion;
/// a later `set` cancels the schedule.
#[test]
fn test_cluster_state_delete_after_ttl() {
let mut cluster_state = ClusterState::default();
let node_state = cluster_state.node_state_mut_or_init(&ChitchatId::for_local_test(10_001));
node_state.heartbeat = Heartbeat(10);
node_state.set("key", "1");
node_state.delete_after_ttl("key");
{
// Value still readable, version bumped, status is `DeleteAfterTtl`.
let value = node_state.get("key").unwrap();
assert_eq!(value, "1");
let versioned_value = node_state.get_versioned("key").unwrap();
assert_eq!(&versioned_value.value, "1");
assert_eq!(versioned_value.version, 2u64);
assert!(
versioned_value
.status
.time_of_start_scheduled_for_deletion()
.is_some()
);
assert!(!versioned_value.is_deleted());
assert!(matches!(
versioned_value.status,
DeletionStatus::DeleteAfterTtl(_)
));
}
node_state.set("key", "2");
{
// A plain `set` resets the status to `Set`.
let versioned_value = node_state.get_versioned("key").unwrap();
assert_eq!(&versioned_value.value, "2");
assert_eq!(versioned_value.version, 3u64);
assert!(!versioned_value.is_deleted());
assert!(
versioned_value
.status
.time_of_start_scheduled_for_deletion()
.is_none()
);
assert!(matches!(versioned_value.status, DeletionStatus::Set));
}
}
/// The digest reports, per node, the heartbeat, `last_gc_version` and
/// `max_version` of the node state.
#[test]
fn test_cluster_state_compute_digest() {
let mut cluster_state = ClusterState::default();
let node1 = ChitchatId::for_local_test(10_001);
let node1_state = cluster_state.node_state_mut_or_init(&node1);
node1_state.set("key_a", "");
let node2 = ChitchatId::for_local_test(10_002);
let node2_state = cluster_state.node_state_mut_or_init(&node2);
node2_state.set_last_gc_version(10u64);
node2_state.set("key_a", "");
node2_state.set("key_b", "");
// No node is excluded from the digest.
let digest = cluster_state.compute_digest(&HashSet::new());
let mut expected_node_digests = Digest::default();
expected_node_digests.add_node(node1.clone(), Heartbeat(0), 0, 1);
expected_node_digests.add_node(node2.clone(), Heartbeat(0), 10u64, 2);
assert_eq!(&digest, &expected_node_digests);
}
/// A tombstone survives garbage collection until the grace period has elapsed,
/// while live keys are never collected. Time is driven manually through the
/// paused tokio clock.
#[tokio::test]
async fn test_cluster_state_gc_keys_marked_for_deletion() {
tokio::time::pause();
let mut cluster_state = ClusterState::default();
let node1 = ChitchatId::for_local_test(10_001);
let node1_state = cluster_state.node_state_mut_or_init(&node1);
node1_state.set("key_a", "1");
node1_state.delete("key_a"); tokio::time::advance(Duration::from_secs(5)).await;
node1_state.set_with_version("key_b".to_string(), "3".to_string(), 13); node1_state.heartbeat = Heartbeat(110);
// Only 5s of the 10s grace period have elapsed: both keys are still stored.
cluster_state.gc_keys_marked_for_deletion(Duration::from_secs(10));
cluster_state
.node_state(&node1)
.unwrap()
.key_values
.get("key_a")
.unwrap();
cluster_state
.node_state(&node1)
.unwrap()
.key_values
.get("key_b")
.unwrap();
// 10s after the deletion the tombstone is collected; `key_b` is kept.
tokio::time::advance(Duration::from_secs(5)).await;
cluster_state.gc_keys_marked_for_deletion(Duration::from_secs(10));
assert!(
!cluster_state
.node_state(&node1)
.unwrap()
.key_values
.contains_key("key_a")
);
cluster_state
.node_state(&node1)
.unwrap()
.key_values
.get("key_b")
.unwrap();
}
/// Applying a delta stores the mutations that are newer than what the node
/// already holds and skips the others.
#[test]
fn test_cluster_state_apply_delta() {
let mut cluster_state = ClusterState::default();
let node1 = ChitchatId::for_local_test(10_001);
let node1_state = cluster_state.node_state_mut_or_init(&node1);
node1_state.set_with_version("key_a".to_string(), "1".to_string(), 1); node1_state.set_with_version("key_b".to_string(), "3".to_string(), 3);
let node2 = ChitchatId::for_local_test(10_002);
let node2_state = cluster_state.node_state_mut_or_init(&node2);
node2_state.set_with_version("key_c".to_string(), "3".to_string(), 1);
let mut delta = Delta::default();
delta.add_node(node1.clone(), 0u64, 0u64);
delta.add_kv(&node1, "key_b", "2", 2, false);
delta.add_kv(&node1, "key_a", "4", 4, false);
delta.add_node(node2.clone(), 3, 0);
delta.add_kv(&node2, "key_d", "4", 4, false);
cluster_state.apply_delta(delta);
let node1_state = cluster_state.node_state(&node1).unwrap();
// `key_a` at version 4 is newer than node1's max version (3): applied.
assert_eq!(
node1_state.get_versioned("key_a").unwrap(),
&VersionedValue {
value: "4".to_string(),
version: 4,
status: DeletionStatus::Set,
}
);
// `key_b` at version 2 is older than node1's max version: ignored.
assert_eq!(
node1_state.get_versioned("key_b").unwrap(),
&VersionedValue {
value: "3".to_string(),
version: 3,
status: DeletionStatus::Set,
}
);
let node2_state = cluster_state.node_state(&node2).unwrap();
// Only `key_d` remains: the previous `key_c` is gone, presumably because this
// delta goes through the reset path — TODO confirm against `Delta::add_node`.
assert_eq!(node2_state.key_values.len(), 1);
assert_eq!(
node2_state.get_versioned("key_d").unwrap(),
&VersionedValue {
value: "4".to_string(),
version: 4,
status: DeletionStatus::Set
}
);
}
/// Checks `compute_partial_delta_respecting_mtu` for a range of MTUs.
///
/// First scans MTUs from 100 up to (excluding) the size of the unconstrained
/// delta and records, for each new tuple count, the serialized size of the
/// delta obtained. Then, for each recorded size, checks that the delta computed
/// with that MTU and with that MTU plus one equals the delta built from the
/// first `num_entries` atoms of `expected_delta_atoms`.
///
/// Each atom is `(node, key, value, version, tombstone)`.
///
/// NOTE(review): the scan stops at `buf.len() - 1`, so an MTU that fits the
/// complete delta does not seem to be probed and the last expected atom may
/// never be compared — confirm.
fn test_with_varying_max_transmitted_kv_helper(
cluster_state: &ClusterState,
digest: &Digest,
dead_nodes: &HashSet<&ChitchatId>,
expected_delta_atoms: &[(&ChitchatId, &str, &str, Version, bool)],
) {
// Size of the delta when the MTU is not a constraint.
let max_delta =
cluster_state.compute_partial_delta_respecting_mtu(digest, usize::MAX, dead_nodes);
let mut buf = Vec::new();
max_delta.serialize(&mut buf);
// `mtu_per_num_entries[n]` is the serialized size of the first delta found with n tuples.
let mut mtu_per_num_entries = Vec::new();
for mtu in 100..buf.len() {
let delta = cluster_state.compute_partial_delta_respecting_mtu(digest, mtu, dead_nodes);
let num_tuples = delta.num_tuples();
if mtu_per_num_entries.len() == num_tuples + 1 {
// This tuple count has already been recorded.
continue;
}
buf.clear();
delta.serialize(&mut buf);
mtu_per_num_entries.push(buf.len());
}
for (num_entries, &mtu) in mtu_per_num_entries.iter().enumerate() {
let mut expected_delta = Delta::default();
for &(node, key, val, version, tombstone) in &expected_delta_atoms[..num_entries] {
expected_delta.add_node(node.clone(), 0u64, 0u64);
expected_delta.add_kv(node, key, val, version, tombstone);
}
{
let delta =
cluster_state.compute_partial_delta_respecting_mtu(digest, mtu, dead_nodes);
assert_eq!(&delta, &expected_delta);
}
{
// One extra byte must not be enough to fit one more tuple.
let delta =
cluster_state.compute_partial_delta_respecting_mtu(digest, mtu + 1, dead_nodes);
assert_eq!(&delta, &expected_delta);
}
}
}
/// Fixture shared by the delta tests.
///
/// - node 10_001: `key_a` (v1) and `key_b` (v2).
/// - node 10_002: `key_a` (v1), `key_b` (v2), `key_c` (v3) and `key_d`, set at
///   v4 and then deleted, which bumps it to v5.
fn test_cluster_state() -> ClusterState {
let mut cluster_state = ClusterState::default();
let node1 = ChitchatId::for_local_test(10_001);
let node1_state = cluster_state.node_state_mut_or_init(&node1);
node1_state.set_with_version("key_a".to_string(), "1".to_string(), 1); node1_state.set_with_version("key_b".to_string(), "2".to_string(), 2);
let node2 = ChitchatId::for_local_test(10_002);
let node2_state = cluster_state.node_state_mut_or_init(&node2);
node2_state.set_with_version("key_a".to_string(), "1".to_string(), 1); node2_state.set_with_version("key_b".to_string(), "2".to_string(), 2); node2_state.set_with_version("key_c".to_string(), "3".to_string(), 3); node2_state.set_with_version("key_d".to_string(), "4".to_string(), 4); node2_state.delete("key_d");
cluster_state
}
/// Runs the MTU sweep helper with a digest that knows both fixture nodes: node1 is added
/// with trailing arguments `0, 1` and node2 with `0, 2` (presumably last-GC version and max
/// version; confirm against `Digest::add_node`).
///
/// The expected atoms are node2's `key_c`, node2's `key_d` tombstone at version 5, then
/// node1's `key_b`.
#[test]
fn test_cluster_state_compute_delta_depth_first_single_node() {
    let node1 = ChitchatId::for_local_test(10_001);
    let node2 = ChitchatId::for_local_test(10_002);
    let cluster_state = test_cluster_state();

    let mut digest = Digest::default();
    digest.add_node(node1.clone(), Heartbeat(0), 0, 1);
    digest.add_node(node2.clone(), Heartbeat(0), 0, 2);

    let expected_atoms: [(&ChitchatId, &str, &str, Version, bool); 3] = [
        (&node2, "key_c", "3", 3, false),
        (&node2, "key_d", "", 5, true),
        (&node1, "key_b", "2", 2, false),
    ];
    let no_dead_nodes = HashSet::new();
    test_with_varying_max_transmitted_kv_helper(
        &cluster_state,
        &digest,
        &no_dead_nodes,
        &expected_atoms,
    );
}
/// Runs the MTU sweep helper against the shared fixture with a digest listing node1
/// (trailing arguments `0, 1`) and node2 (trailing arguments `0, 2`), and no dead nodes.
///
/// NOTE(review): the digest and expectations are the same as in
/// `test_cluster_state_compute_delta_depth_first_single_node`; confirm whether this test
/// was meant to cover a different scenario.
#[test]
fn test_cluster_state_compute_delta_depth_first_chitchat() {
    let cluster_state = test_cluster_state();
    let (node1, node2) = (
        ChitchatId::for_local_test(10_001),
        ChitchatId::for_local_test(10_002),
    );

    let mut digest = Digest::default();
    for (node, max_version) in [(&node1, 1), (&node2, 2)] {
        digest.add_node(node.clone(), Heartbeat(0), 0, max_version);
    }

    let alive_everywhere = HashSet::new();
    let expected_atoms: [(&ChitchatId, &str, &str, Version, bool); 3] = [
        (&node2, "key_c", "3", 3, false),
        (&node2, "key_d", "", 5, true),
        (&node1, "key_b", "2", 2, false),
    ];
    test_with_varying_max_transmitted_kv_helper(
        &cluster_state,
        &digest,
        &alive_everywhere,
        &expected_atoms,
    );
}
/// Runs the MTU sweep helper with a digest that only mentions node2 (trailing arguments
/// `0, 3`), so node1 is absent from the digest altogether.
///
/// NOTE(review): the last expected atom is a live `key_d = "4"` at version 4, although
/// `test_cluster_state` deletes `key_d` and the sibling tests expect a tombstone at
/// version 5. Confirm whether the helper ever reaches this atom.
#[test]
fn test_cluster_state_compute_delta_missing_node() {
    let node1 = ChitchatId::for_local_test(10_001);
    let node2 = ChitchatId::for_local_test(10_002);
    let cluster_state = test_cluster_state();

    let mut digest = Digest::default();
    digest.add_node(node2.clone(), Heartbeat(0), 0, 3);

    let expected_atoms: [(&ChitchatId, &str, &str, Version, bool); 3] = [
        (&node1, "key_a", "1", 1, false),
        (&node1, "key_b", "2", 2, false),
        (&node2, "key_d", "4", 4, false),
    ];
    let no_dead_nodes = HashSet::new();
    test_with_varying_max_transmitted_kv_helper(
        &cluster_state,
        &digest,
        &no_dead_nodes,
        &expected_atoms,
    );
}
/// Runs the MTU sweep helper with an empty digest and node2 listed as dead; only node1's
/// two keys are expected in the delta.
#[test]
fn test_cluster_state_compute_delta_should_ignore_dead_nodes() {
    let node1 = ChitchatId::for_local_test(10_001);
    let node2 = ChitchatId::for_local_test(10_002);
    let cluster_state = test_cluster_state();
    let empty_digest = Digest::default();

    let mut dead_nodes = HashSet::new();
    dead_nodes.insert(&node2);

    let expected_atoms: [(&ChitchatId, &str, &str, Version, bool); 2] = [
        (&node1, "key_a", "1", 1, false),
        (&node1, "key_b", "2", 2, false),
    ];
    test_with_varying_max_transmitted_kv_helper(
        &cluster_state,
        &empty_digest,
        &dead_nodes,
        &expected_atoms,
    );
}
/// Follows the delta computed for a fixed digest (node1 only, trailing arguments `0, 1`)
/// through three stages, with tokio's clock paused:
///
/// 1. Initial state: node2's `key_c` plus node1's `key_b`, node1 added as `(0, 1)`.
/// 2. After deleting node1's `key_a`, advancing 5s and running the cluster-wide GC with a
///    10s grace period: the delta additionally carries a `key_a` tombstone at version 3.
/// 3. After advancing by the full grace period and running node1's GC: node1 is added as
///    `(3, 0)` and the tombstone is gone.
///
/// The expected serialized lengths (76, 90, 75) are pinned by the assertions.
#[tokio::test]
async fn test_cluster_state_compute_delta_with_old_node_state_that_needs_reset() {
    const DELETE_GRACE_PERIOD: Duration = Duration::from_secs(10);
    tokio::time::pause();

    let node1 = ChitchatId::for_local_test(10_001);
    let node2 = ChitchatId::for_local_test(10_002);
    let mut cluster_state = ClusterState::default();
    {
        let node1_state = cluster_state.node_state_mut_or_init(&node1);
        node1_state.heartbeat = Heartbeat(10000);
        node1_state.set_with_version("key_a".to_string(), "1".to_string(), 1);
        node1_state.set_with_version("key_b".to_string(), "2".to_string(), 2);
        let node2_state = cluster_state.node_state_mut_or_init(&node2);
        node2_state.set_with_version("key_c".to_string(), "3".to_string(), 2);
    }

    // The same digest is used at every stage.
    let mut digest = Digest::default();
    digest.add_node(node1.clone(), Heartbeat(0), 0, 1);
    let no_dead_nodes = HashSet::new();

    // Every expected delta starts with node2's single key.
    let expected_with_node2 = || {
        let mut expected = Delta::default();
        expected.add_node(node2.clone(), 0u64, 0u64);
        expected.add_kv(&node2, "key_c", "3", 2, false);
        expected
    };

    // Stage 1: untouched state.
    {
        let delta = cluster_state.compute_partial_delta_respecting_mtu(
            &digest,
            MAX_UDP_DATAGRAM_PAYLOAD_SIZE,
            &no_dead_nodes,
        );
        let mut expected_delta = expected_with_node2();
        expected_delta.add_node(node1.clone(), 0u64, 1u64);
        expected_delta.add_kv(&node1, "key_b", "2", 2, false);
        expected_delta.set_serialized_len(76);
        assert_eq!(delta, expected_delta);
    }

    // Stage 2: key_a deleted, grace period only half elapsed.
    cluster_state
        .node_state_mut(&node1)
        .unwrap()
        .delete("key_a");
    tokio::time::advance(Duration::from_secs(5)).await;
    cluster_state.gc_keys_marked_for_deletion(DELETE_GRACE_PERIOD);
    {
        let delta = cluster_state.compute_partial_delta_respecting_mtu(
            &digest,
            MAX_UDP_DATAGRAM_PAYLOAD_SIZE,
            &no_dead_nodes,
        );
        let mut expected_delta = expected_with_node2();
        expected_delta.add_node(node1.clone(), 0u64, 1u64);
        expected_delta.add_kv(&node1, "key_b", "2", 2, false);
        expected_delta.add_kv(&node1, "key_a", "", 3, true);
        expected_delta.set_serialized_len(90);
        assert_eq!(delta, expected_delta);
    }

    // Stage 3: grace period fully elapsed, node1 garbage collected.
    tokio::time::advance(DELETE_GRACE_PERIOD).await;
    cluster_state
        .node_state_mut(&node1)
        .unwrap()
        .gc_keys_marked_for_deletion(DELETE_GRACE_PERIOD);
    {
        let delta = cluster_state.compute_partial_delta_respecting_mtu(
            &digest,
            MAX_UDP_DATAGRAM_PAYLOAD_SIZE,
            &no_dead_nodes,
        );
        let mut expected_delta = expected_with_node2();
        expected_delta.add_node(node1.clone(), 3u64, 0u64);
        expected_delta.add_kv(&node1, "key_b", "2", 2, false);
        expected_delta.set_serialized_len(75);
        assert_eq!(&delta, &expected_delta);
    }
}
/// `iter_prefix("Europe:")` must yield only the keys starting with that prefix, in order,
/// and must leave out `Europe:UK`, which was deleted beforehand.
#[test]
fn test_iter_prefix() {
    let mut node_state = NodeState::for_test();
    for key in [
        "Europe",
        "Europe:",
        "Europe:UK",
        "Asia:Japan",
        "Europe:Italy",
        "Africa:Uganda",
        "Oceania",
    ] {
        node_state.set(key, "");
    }
    node_state.delete("Europe:UK");

    let mut matching_keys: Vec<&str> = Vec::new();
    for (key, _value) in node_state.iter_prefix("Europe:") {
        matching_keys.push(key);
    }
    assert_eq!(matching_keys, &["Europe:", "Europe:Italy"]);
}
/// Applies a delta starting right after the state's current max version (2): it adds
/// `key_c` at version 4 and overwrites `key_b` at version 3. Afterwards the state must
/// hold three keys, have max version 4 and an unchanged last-GC version of 0.
#[test]
fn test_node_apply_delta_simple() {
    let mut node_state = NodeState::for_test();
    node_state.set_with_version("key_a", "val_a", 1);
    node_state.set_with_version("key_b", "val_a", 2);

    let new_key = KeyValueMutation {
        key: "key_c".to_string(),
        value: "val_c".to_string(),
        version: 4,
        status: DeletionStatusMutation::Set,
    };
    let overwritten_key = KeyValueMutation {
        key: "key_b".to_string(),
        value: "val_b2".to_string(),
        version: 3,
        status: DeletionStatusMutation::Set,
    };
    let node_delta = NodeDelta {
        chitchat_id: node_state.chitchat_id.clone(),
        from_version_excluded: 2,
        last_gc_version: 0u64,
        max_version: 4,
        key_values: vec![new_key, overwritten_key],
    };
    node_state.apply_delta(node_delta, Instant::now());

    assert_eq!(node_state.num_key_values(), 3);
    assert_eq!(node_state.max_version(), 4);
    assert_eq!(node_state.last_gc_version, 0);
    for (key, expected_value) in [("key_a", "val_a"), ("key_b", "val_b2"), ("key_c", "val_c")] {
        assert_eq!(node_state.get(key).unwrap(), expected_value);
    }
}
/// Re-sending the same value for `key_a` under a newer version (3 instead of 1) must bump
/// the stored version while keeping the value and the `Set` status.
#[test]
fn test_node_apply_same_value_different_version() {
    let mut node_state = NodeState::for_test();
    node_state.set_with_version("key_a", "val_a", 1);

    let same_value_newer_version = KeyValueMutation {
        key: "key_a".to_string(),
        value: "val_a".to_string(),
        version: 3,
        status: DeletionStatusMutation::Set,
    };
    let node_delta = NodeDelta {
        chitchat_id: node_state.chitchat_id.clone(),
        from_version_excluded: 1,
        last_gc_version: 0,
        max_version: 3,
        key_values: vec![same_value_newer_version],
    };
    node_state.apply_delta(node_delta, Instant::now());

    let stored = node_state.get_versioned("key_a").unwrap();
    assert_eq!(stored.version, 3);
    assert_eq!(stored.status, DeletionStatus::Set);
    assert_eq!(&stored.value, "val_a");
}
/// A delta whose `from_version_excluded` (6) is ahead of the state's max version (5) must
/// leave the state untouched: `key_a` keeps version 5 and its original value.
#[test]
fn test_node_skip_delta_from_the_future() {
    let mut node_state = NodeState::for_test();
    node_state.set_with_version("key_a", "val_a", 5);
    assert_eq!(node_state.max_version(), 5);

    let future_mutation = KeyValueMutation {
        key: "key_a".to_string(),
        value: "new_val".to_string(),
        version: 7,
        status: DeletionStatusMutation::Set,
    };
    let node_delta = NodeDelta {
        chitchat_id: node_state.chitchat_id.clone(),
        from_version_excluded: 6,
        last_gc_version: 0,
        max_version: 7,
        key_values: vec![future_mutation],
    };
    node_state.apply_delta(node_delta, Instant::now());

    let stored = node_state.get_versioned("key_a").unwrap();
    assert_eq!(stored.version, 5);
    assert_eq!(stored.status, DeletionStatus::Set);
    assert_eq!(&stored.value, "val_a");
}
/// The state is garbage collected up to version 18 and then advanced to version 31. A
/// delta starting at 31 with a different `last_gc_version` (30) must still be applied:
/// `key_a` ends up at version 32 with the new value.
#[tokio::test]
async fn test_node_apply_delta_different_last_gc_is_ok_if_below_max_version() {
    const GC_PERIOD: Duration = Duration::from_secs(10);
    tokio::time::pause();

    let mut node_state = NodeState::for_test();
    node_state.set_with_version("key_a", "val_a", 17);
    node_state.delete("key_a");
    tokio::time::advance(GC_PERIOD).await;
    node_state.gc_keys_marked_for_deletion(GC_PERIOD);
    assert_eq!(node_state.last_gc_version, 18);
    assert_eq!(node_state.max_version(), 18);

    node_state.set_with_version("key_a", "val_a", 31);
    let update = KeyValueMutation {
        key: "key_a".to_string(),
        value: "new_val".to_string(),
        version: 32,
        status: DeletionStatusMutation::Set,
    };
    let node_delta = NodeDelta {
        chitchat_id: node_state.chitchat_id.clone(),
        from_version_excluded: 31,
        last_gc_version: 30,
        max_version: 32,
        key_values: vec![update],
    };
    node_state.apply_delta(node_delta, Instant::now());

    let stored = node_state.get_versioned("key_a").unwrap();
    assert_eq!(stored.version, 32);
    assert_eq!(node_state.max_version(), 32);
    assert_eq!(stored.status, DeletionStatus::Set);
    assert_eq!(&stored.value, "new_val");
}
/// A delta starting from version 0 with `last_gc_version` 30 and max version 32 is applied
/// on a state whose max version is 17. The previously known `key_a` must disappear and
/// `key_b` must be present at version 32.
#[tokio::test]
async fn test_node_apply_delta_on_reset_fresher_version() {
    tokio::time::pause();
    let mut node_state = NodeState::for_test();
    node_state.set_with_version("key_a", "val_a", 17);
    assert_eq!(node_state.max_version(), 17);

    let fresher_mutation = KeyValueMutation {
        key: "key_b".to_string(),
        value: "val_b".to_string(),
        version: 32,
        status: DeletionStatusMutation::Set,
    };
    let node_delta = NodeDelta {
        chitchat_id: node_state.chitchat_id.clone(),
        from_version_excluded: 0,
        last_gc_version: 30,
        max_version: 32,
        key_values: vec![fresher_mutation],
    };
    node_state.apply_delta(node_delta, Instant::now());

    assert!(node_state.get_versioned("key_a").is_none());
    let stored_b = node_state.get_versioned("key_b").unwrap();
    assert_eq!(stored_b.version, 32);
}
/// A delta starting from version 0 whose max version (30) is behind the state's max
/// version (32) must not change anything: max version stays 32 and `key_b` keeps
/// `val_b2` at version 32.
#[tokio::test]
async fn test_node_apply_delta_no_reset_if_older_version() {
    tokio::time::pause();
    let mut node_state = NodeState::for_test();
    node_state.set_with_version("key_a", "val_a", 31);
    node_state.set_with_version("key_b", "val_b2", 32);
    assert_eq!(node_state.max_version(), 32);

    let stale_mutation = KeyValueMutation {
        key: "key_b".to_string(),
        value: "val_b".to_string(),
        version: 30,
        status: DeletionStatusMutation::Set,
    };
    let node_delta = NodeDelta {
        chitchat_id: node_state.chitchat_id.clone(),
        from_version_excluded: 0,
        last_gc_version: 17,
        max_version: 30,
        key_values: vec![stale_mutation],
    };
    node_state.apply_delta(node_delta, Instant::now());

    assert_eq!(node_state.max_version, 32);
    let stored_b = node_state.get_versioned("key_b").unwrap();
    assert_eq!(stored_b.version, 32);
    assert_eq!(stored_b.value, "val_b2");
}
/// Applies a delta starting from version 0 with `last_gc_version` 120 and max version 80
/// on a state at (last GC 50, max 70). The delta must be reported as
/// `DeltaStatus::ApplyAfterReset`, `monotonic_property()` must become `(120, 80)` without
/// going below its previous value, and only the delta's `key_c` must remain.
#[test]
fn test_apply_delta_reset_does_not_violate_monotonic_property() {
    let mut node_state = NodeState::for_test();
    node_state.set_with_version("key_a", "val_a", 50);
    node_state.set_with_version("key_b", "val_b", 70);
    node_state.last_gc_version = 50;
    assert_eq!(node_state.max_version(), 70);
    assert_eq!(node_state.last_gc_version, 50);
    let monotonic_before = node_state.monotonic_property();

    let replacement = KeyValueMutation {
        key: "key_c".to_string(),
        value: "val_c".to_string(),
        version: 80,
        status: DeletionStatusMutation::Set,
    };
    let node_delta = NodeDelta {
        chitchat_id: node_state.chitchat_id.clone(),
        from_version_excluded: 0,
        last_gc_version: 120,
        max_version: 80,
        key_values: vec![replacement],
    };
    let delta_status = node_state.apply_delta(node_delta, Instant::now());
    assert_eq!(delta_status, DeltaStatus::ApplyAfterReset);

    let monotonic_after = node_state.monotonic_property();
    assert_eq!(monotonic_after, (120, 80));
    assert!(monotonic_after >= monotonic_before);
    assert_eq!(node_state.max_version(), 80);
    assert_eq!(node_state.last_gc_version, 120);

    // The reset wiped the former keys; only the delta's key is left.
    for wiped_key in ["key_a", "key_b"] {
        assert!(node_state.get(wiped_key).is_none());
    }
    assert_eq!(node_state.get("key_c").unwrap(), "val_c");
}
/// A delta whose `last_gc_version` (70) equals the receiver's max version (70), and which
/// starts right after that version, must be reported as `DeltaStatus::Apply`: the
/// receiver's max version becomes 100 and `key_b` becomes readable.
#[test]
fn test_delta_accepted_when_sender_gc_equals_receiver_max() {
    let mut node_state_a = NodeState::for_test();
    node_state_a.set_with_version("key_a", "val_a", 70);
    node_state_a.last_gc_version = 50;
    assert_eq!(node_state_a.max_version(), 70);

    let node_delta = NodeDelta {
        // Address the delta to the node whose state receives it, like the other
        // `apply_delta` tests do, rather than to an unrelated id.
        chitchat_id: node_state_a.chitchat_id.clone(),
        from_version_excluded: 70,
        last_gc_version: 70,
        max_version: 100,
        key_values: vec![KeyValueMutation {
            key: "key_b".to_string(),
            value: "val_b".to_string(),
            version: 100,
            status: DeletionStatusMutation::Set,
        }],
    };
    let delta_status = node_state_a.apply_delta(node_delta, Instant::now());
    assert_eq!(delta_status, DeltaStatus::Apply);
    assert_eq!(node_state_a.max_version(), 100);
    assert_eq!(node_state_a.get("key_b").unwrap(), "val_b");
}
/// Applies, at cluster level, a delta built with `add_node(node, 20, 0)` on a node whose
/// max version is already 20. `ClusterState::apply_delta` must return `false`, and the
/// node must keep max version 20 and its `key_a`.
///
/// NOTE(review): the `20` passed to `add_node` is presumably the sender's last-GC version,
/// going by the test name; confirm against `Delta::add_node`.
#[test]
fn test_cluster_state_apply_delta_last_gc_equal_max_does_not_reset() {
    let node = ChitchatId::for_local_test(10_001);
    let mut cluster_state = ClusterState::default();
    {
        let initial_state = cluster_state.node_state_mut_or_init(&node);
        initial_state.set_with_version("key_a", "val_a", 20);
        initial_state.set_last_gc_version(0);
    }

    let mut delta = Delta::default();
    delta.add_node(node.clone(), 20, 0);
    delta.add_kv(&node, "key_b", "val_b", 18, false);
    assert!(!cluster_state.apply_delta(delta));

    let node_state_after = cluster_state.node_state(&node).unwrap();
    assert_eq!(node_state_after.max_version(), 20);
    assert_eq!(node_state_after.get("key_a"), Some("val_a"));
}
/// A key that was set and then deleted must no longer be returned by `get`.
#[test]
fn test_node_set_delete() {
    let mut node_state = NodeState::for_test();
    node_state.set("key_a", "val_b");
    node_state.delete("key_a");
    let lookup = node_state.get("key_a");
    assert!(lookup.is_none());
}
/// Setting a key again after `delete_after_ttl` must bring its status back to
/// `DeletionStatus::Set`.
#[test]
fn test_node_set_delete_after_ttl_set() {
    let mut node_state = NodeState::for_test();
    node_state.set("key_a", "val_b");
    node_state.delete_after_ttl("key_a");
    node_state.set("key_a", "val_b2");

    let versioned_value = node_state.get_versioned("key_a").unwrap();
    assert_eq!(versioned_value.status, DeletionStatus::Set);
}
/// `set_with_ttl` must store the value and mark the entry as
/// `DeletionStatus::DeleteAfterTtl`.
#[test]
fn test_node_set_with_ttl() {
    let mut node_state = NodeState::for_test();
    node_state.set_with_ttl("key_a", "val_b");

    let stored = node_state.get_versioned("key_a").unwrap();
    let has_ttl_status = matches!(stored.status, DeletionStatus::DeleteAfterTtl(_));
    assert!(has_ttl_status);
    assert_eq!(stored.value, "val_b");
}
}