use crate::buffer::{DeltaReplica, ReplicaId, SeqNo};
use mdcs_core::lattice::Lattice;
use std::collections::VecDeque;
/// A message travelling between two replicas during anti-entropy synchronisation.
///
/// `D` is the type of the delta payload carried by [`AntiEntropyMessage::Delta`].
#[derive(Debug, Clone)]
pub enum AntiEntropyMessage<D> {
/// Ships a delta from one replica to another.
Delta {
/// Identifier of the replica that produced the delta.
from: ReplicaId,
/// Identifier of the replica the delta is addressed to.
to: ReplicaId,
/// The delta payload handed to the recipient.
delta: D,
/// Sequence number attached to this delta; the recipient copies it into its `Ack`.
seq: SeqNo,
},
/// Acknowledges a `Delta` that has been delivered to its recipient.
Ack {
/// Identifier of the replica that received the delta and is acknowledging it.
from: ReplicaId,
/// Identifier of the replica that originally sent the delta.
to: ReplicaId,
/// Sequence number copied from the acknowledged `Delta`.
seq: SeqNo,
},
}
/// In-memory stand-in for an unreliable network carrying [`AntiEntropyMessage`]s.
///
/// Messages handed to `send` may be dropped, duplicated or reordered according to
/// the stored [`NetworkConfig`]; `receive` hands them out from the front of the queue.
#[derive(Debug)]
pub struct NetworkSimulator<D> {
/// Messages queued for delivery; consumed front-first.
in_flight: VecDeque<AntiEntropyMessage<D>>,
/// Messages dropped by `send`, retained so `retransmit_lost` can re-queue them.
lost: Vec<AntiEntropyMessage<D>>,
/// Loss, duplication and reordering rates applied by `send`.
config: NetworkConfig,
/// State of the internal pseudo-random generator. It always starts from the same
/// seed, so the injected faults are reproducible from run to run.
rng_state: u64,
}
/// Fault-injection settings for the simulated network.
///
/// Every rate is compared against a pseudo-random draw from `[0, 1)`, so it acts
/// as a probability: a rate of `0.0` (or less) never fires and a rate of `1.0`
/// (or more) fires on every message. The derived [`Default`] sets all three rates
/// to `0.0`, i.e. a perfectly reliable, in-order network.
#[derive(Debug, Clone, Default)]
pub struct NetworkConfig {
    /// Probability that a sent message is dropped instead of being queued.
    pub loss_rate: f64,
    /// Probability that a sent message is queued twice.
    pub dup_rate: f64,
    /// Probability that a sent message swaps places with a message that is
    /// already queued (only possible when the queue is not empty).
    pub reorder_rate: f64,
}

impl NetworkConfig {
    /// Returns a configuration that only drops messages, with probability `loss_rate`.
    pub fn lossy(loss_rate: f64) -> Self {
        Self {
            loss_rate,
            ..Self::default()
        }
    }

    /// Returns a configuration that only duplicates messages, with probability `dup_rate`.
    pub fn with_dups(dup_rate: f64) -> Self {
        Self {
            dup_rate,
            ..Self::default()
        }
    }

    /// Returns a preset that enables every fault at once:
    /// 10% loss, 20% duplication and 30% reordering.
    pub fn chaotic() -> Self {
        Self {
            loss_rate: 0.1,
            dup_rate: 0.2,
            reorder_rate: 0.3,
        }
    }
}
impl<D: Clone> NetworkSimulator<D> {
    /// Creates an empty simulator that injects faults according to `config`.
    ///
    /// The pseudo-random generator always starts from the same fixed seed, so an
    /// identical sequence of calls produces identical faults on every run.
    pub fn new(config: NetworkConfig) -> Self {
        Self {
            config,
            rng_state: 12345,
            lost: Vec::new(),
            in_flight: VecDeque::new(),
        }
    }

    /// Advances the linear congruential generator and returns a value in `[0, 1)`.
    fn next_random(&mut self) -> f64 {
        let advanced = self.rng_state.wrapping_mul(1103515245).wrapping_add(12345);
        self.rng_state = advanced;
        // Keep 15 bits of the state (bits 16..=30) and scale them by 2^15.
        let bits = (advanced >> 16) & 0x7fff;
        bits as f64 / 32768.0
    }

    /// Submits `msg` to the simulated network.
    ///
    /// In order, the message may be:
    /// 1. dropped (moved to the lost list, nothing is queued),
    /// 2. duplicated (an extra copy is queued ahead of the message itself),
    /// 3. reordered (after being queued last, it swaps places with a randomly
    ///    chosen message that was already queued).
    pub fn send(&mut self, msg: AntiEntropyMessage<D>) {
        if self.next_random() < self.config.loss_rate {
            self.lost.push(msg);
            return;
        }

        if self.next_random() < self.config.dup_rate {
            self.in_flight.push_back(msg.clone());
        }

        // The reorder draw is taken unconditionally, before looking at the queue,
        // so the random sequence does not depend on how many messages are queued.
        let reorder_drawn = self.next_random() < self.config.reorder_rate;
        // Index the new message will occupy once it has been pushed.
        let tail = self.in_flight.len();
        let swap_with = if reorder_drawn && tail != 0 {
            let raw = (self.next_random() * tail as f64) as usize;
            Some(raw.min(tail))
        } else {
            None
        };

        self.in_flight.push_back(msg);
        if let Some(pick) = swap_with {
            // `pick == tail` is a self-swap and leaves the queue untouched.
            self.in_flight.swap(pick, tail);
        }
    }

    /// Removes and returns the message at the front of the queue, if any.
    pub fn receive(&mut self) -> Option<AntiEntropyMessage<D>> {
        self.in_flight.pop_front()
    }

    /// Moves every lost message to the back of the delivery queue, in the order
    /// in which the messages were lost. Re-queued messages are not subjected to
    /// loss, duplication or reordering again.
    pub fn retransmit_lost(&mut self) {
        self.in_flight.extend(self.lost.drain(..));
    }

    /// Returns `true` when no message is waiting for delivery.
    ///
    /// Lost messages are not counted.
    pub fn is_empty(&self) -> bool {
        self.in_flight.is_empty()
    }

    /// Number of messages currently waiting for delivery.
    pub fn in_flight_count(&self) -> usize {
        self.in_flight.len()
    }

    /// Number of dropped messages that have not been retransmitted yet.
    pub fn lost_count(&self) -> usize {
        self.lost.len()
    }
}
/// A group of replicas wired together through a single [`NetworkSimulator`].
///
/// Both the replica state and the deltas exchanged between replicas have type `S`.
#[derive(Debug)]
pub struct AntiEntropyCluster<S: Lattice + Clone> {
/// The replicas, addressed by index in the cluster's methods.
replicas: Vec<DeltaReplica<S, S>>,
/// The simulated network that carries every delta and acknowledgement.
network: NetworkSimulator<S>,
}
impl<S: Lattice + Clone> AntiEntropyCluster<S> {
    /// Builds a cluster of `n` replicas named `replica_0` .. `replica_{n-1}`.
    ///
    /// Every replica registers all other replicas as peers, and all traffic goes
    /// through one simulated network configured by `config`.
    pub fn new(n: usize, config: NetworkConfig) -> Self {
        let mut replicas = Vec::with_capacity(n);
        for i in 0..n {
            let mut replica = DeltaReplica::new(format!("replica_{}", i));
            for j in (0..n).filter(|&j| j != i) {
                replica.register_peer(format!("replica_{}", j));
            }
            replicas.push(replica);
        }
        Self {
            replicas,
            network: NetworkSimulator::new(config),
        }
    }

    /// Returns the replica at position `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    pub fn replica(&self, idx: usize) -> &DeltaReplica<S, S> {
        &self.replicas[idx]
    }

    /// Returns the replica at position `idx` for in-place modification.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    pub fn replica_mut(&mut self, idx: usize) -> &mut DeltaReplica<S, S> {
        &mut self.replicas[idx]
    }

    /// Forwards `mutator` to `DeltaReplica::mutate` on the replica at
    /// `replica_idx` and returns its result (presumably the produced delta;
    /// confirm against `DeltaReplica::mutate`).
    ///
    /// # Panics
    ///
    /// Panics if `replica_idx` is out of bounds.
    pub fn mutate<F>(&mut self, replica_idx: usize, mutator: F) -> S
    where
        F: FnOnce(&S) -> S,
    {
        self.replicas[replica_idx].mutate(mutator)
    }

    /// Asks the replica at `from_idx` to prepare a sync for the replica at
    /// `to_idx` and, if it has something to send, puts the resulting `Delta`
    /// message on the network. Nothing is delivered until the network is processed.
    ///
    /// # Panics
    ///
    /// Panics if either index is out of bounds.
    pub fn initiate_sync(&mut self, from_idx: usize, to_idx: usize) {
        let to_id = self.replicas[to_idx].id.clone();
        let sender = &mut self.replicas[from_idx];
        if let Some((delta, seq)) = sender.prepare_sync(&to_id) {
            let msg = AntiEntropyMessage::Delta {
                from: sender.id.clone(),
                // Last use of `to_id`, so it is moved rather than cloned.
                to: to_id,
                delta,
                seq,
            };
            self.network.send(msg);
        }
    }

    /// Delivers at most one queued message.
    ///
    /// A `Delta` is handed to its recipient, which then sends an `Ack` back through
    /// the network; an `Ack` is handed to the replica it is addressed to. A message
    /// whose recipient is not part of the cluster is discarded.
    ///
    /// Returns `true` if a message was taken from the network, `false` if the
    /// network had nothing to deliver.
    pub fn process_one(&mut self) -> bool {
        let msg = match self.network.receive() {
            Some(msg) => msg,
            None => return false,
        };

        match msg {
            AntiEntropyMessage::Delta {
                from,
                to,
                delta,
                seq,
            } => {
                if let Some(recipient) = self.replicas.iter_mut().find(|r| r.id == to) {
                    recipient.receive_delta(&delta);
                    let ack = AntiEntropyMessage::Ack {
                        from: recipient.id.clone(),
                        to: from,
                        seq,
                    };
                    self.network.send(ack);
                }
            }
            AntiEntropyMessage::Ack { from, to, seq } => {
                if let Some(recipient) = self.replicas.iter_mut().find(|r| r.id == to) {
                    recipient.process_ack(&from, seq);
                }
            }
        }
        true
    }

    /// Processes messages until the network has nothing left to deliver,
    /// including the acknowledgements generated along the way.
    pub fn drain_network(&mut self) {
        while self.process_one() {}
    }

    /// Initiates a sync from the replica at `from_idx` to every other replica.
    ///
    /// Messages are only queued; call [`Self::drain_network`] to deliver them.
    pub fn broadcast(&mut self, from_idx: usize) {
        for to_idx in 0..self.replicas.len() {
            if to_idx != from_idx {
                self.initiate_sync(from_idx, to_idx);
            }
        }
    }

    /// Has every replica broadcast to all others, then drains the network.
    pub fn full_sync_round(&mut self) {
        for from_idx in 0..self.replicas.len() {
            self.broadcast(from_idx);
        }
        self.drain_network();
    }

    /// Returns `true` when all replicas hold equal state.
    ///
    /// A cluster with fewer than two replicas is converged by definition.
    pub fn is_converged(&self) -> bool {
        match self.replicas.split_first() {
            Some((head, rest)) => {
                let reference = head.state();
                rest.iter().all(|r| r.state() == reference)
            }
            None => true,
        }
    }

    /// Re-queues every message the network has dropped so far and then drains
    /// the network.
    pub fn retransmit_and_process(&mut self) {
        self.network.retransmit_lost();
        self.drain_network();
    }

    /// Number of replicas in the cluster.
    pub fn len(&self) -> usize {
        self.replicas.len()
    }

    /// Returns `true` if the cluster contains no replicas.
    pub fn is_empty(&self) -> bool {
        self.replicas.is_empty()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use mdcs_core::gset::GSet;

    /// Builds a grow-only set holding just `value`; the tests use it as a delta.
    fn singleton(value: i32) -> GSet<i32> {
        let mut set = GSet::new();
        set.insert(value);
        set
    }

    /// With the default (fault-free) configuration a sent message comes straight back out.
    #[test]
    fn test_network_simulator_basic() {
        let mut net: NetworkSimulator<i32> = NetworkSimulator::new(NetworkConfig::default());
        net.send(AntiEntropyMessage::Delta {
            from: "r1".to_string(),
            to: "r2".to_string(),
            delta: 42,
            seq: 1,
        });
        assert_eq!(net.in_flight_count(), 1);

        match net.receive().unwrap() {
            AntiEntropyMessage::Delta { delta, .. } => assert_eq!(delta, 42),
            AntiEntropyMessage::Ack { .. } => panic!("Expected delta message"),
        }
        assert!(net.is_empty());
    }

    /// One full sync round over a reliable network is enough for convergence.
    #[test]
    fn test_cluster_basic_convergence() {
        let mut cluster: AntiEntropyCluster<GSet<i32>> =
            AntiEntropyCluster::new(3, NetworkConfig::default());
        cluster.mutate(0, |_| singleton(1));
        cluster.mutate(1, |_| singleton(2));
        assert!(!cluster.is_converged());

        cluster.full_sync_round();

        assert!(cluster.is_converged());
        for i in 0..3 {
            assert!(cluster.replica(i).state().contains(&1));
            assert!(cluster.replica(i).state().contains(&2));
        }
    }

    /// Repeated rounds plus retransmission overcome a 50% loss rate.
    #[test]
    fn test_convergence_under_loss() {
        let mut cluster: AntiEntropyCluster<GSet<i32>> =
            AntiEntropyCluster::new(3, NetworkConfig::lossy(0.5));
        for i in 0..3 {
            let val = (i + 1) as i32;
            cluster.mutate(i, move |_| singleton(val));
        }

        for _ in 0..10 {
            cluster.full_sync_round();
            cluster.retransmit_and_process();
        }

        assert!(cluster.is_converged());
        for i in 0..3 {
            for val in 1..=3 {
                assert!(
                    cluster.replica(i).state().contains(&val),
                    "Replica {} missing value {}",
                    i,
                    val
                );
            }
        }
    }

    /// Duplicated messages do not prevent convergence.
    #[test]
    fn test_convergence_with_duplicates() {
        let mut cluster: AntiEntropyCluster<GSet<i32>> =
            AntiEntropyCluster::new(2, NetworkConfig::with_dups(0.5));
        cluster.mutate(0, |_| singleton(1));
        cluster.mutate(1, |_| singleton(2));

        for _ in 0..5 {
            cluster.full_sync_round();
        }

        assert!(cluster.is_converged());
        assert!(cluster.replica(0).state().contains(&1));
        assert!(cluster.replica(0).state().contains(&2));
    }

    /// Loss, duplication and reordering combined still lead to convergence.
    #[test]
    fn test_convergence_chaotic_network() {
        let mut cluster: AntiEntropyCluster<GSet<i32>> =
            AntiEntropyCluster::new(4, NetworkConfig::chaotic());
        for i in 0..4 {
            for j in 0..5 {
                let val = (i * 10 + j) as i32;
                cluster.mutate(i, move |_| singleton(val));
            }
        }

        for _ in 0..20 {
            cluster.full_sync_round();
            cluster.retransmit_and_process();
        }

        assert!(cluster.is_converged());
        for i in 0..4 {
            for j in 0..4 {
                for k in 0..5 {
                    let val = j * 10 + k;
                    assert!(
                        cluster.replica(i).state().contains(&val),
                        "Replica {} missing value {}",
                        i,
                        val
                    );
                }
            }
        }
    }

    /// Further sync rounds after the first one leave the receiver's state unchanged.
    #[test]
    fn test_idempotence_repeated_resends() {
        let mut cluster: AntiEntropyCluster<GSet<i32>> =
            AntiEntropyCluster::new(2, NetworkConfig::default());
        cluster.mutate(0, |_| singleton(42));

        let initial_state = cluster.replica(1).state().clone();
        cluster.full_sync_round();
        let after_one = cluster.replica(1).state().clone();
        for _ in 0..10 {
            cluster.full_sync_round();
        }
        let after_many = cluster.replica(1).state().clone();

        assert_eq!(after_one, after_many);
        assert_ne!(initial_state, after_one);
    }
}