use mdcs_core::lattice::Lattice;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, VecDeque};
/// Sequence number used to tag buffered deltas and to record peer acknowledgements.
pub type SeqNo = u64;
/// Identifier of a replica; used as the key for per-peer acknowledgement tracking.
pub type ReplicaId = String;
/// A delta paired with the sequence number it was assigned when buffered.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct TaggedDelta<D> {
/// Sequence number attached to this delta.
pub seq: SeqNo,
/// The delta payload itself.
pub delta: D,
}
/// Ordered buffer of sequence-tagged deltas that have not yet been acknowledged.
#[derive(Debug, Clone)]
pub struct DeltaBuffer<D: Lattice> {
/// Sequence number given to the most recently pushed delta; 0 before any push.
current_seq: SeqNo,
/// Buffered deltas, oldest at the front.
deltas: VecDeque<TaggedDelta<D>>,
/// Length above which `push` folds the oldest entry into its successor.
max_buffer_size: usize,
}
impl<D: Lattice> DeltaBuffer<D> {
pub fn new(max_buffer_size: usize) -> Self {
Self {
current_seq: 0,
deltas: VecDeque::new(),
max_buffer_size,
}
}
pub fn push(&mut self, delta: D) {
self.current_seq += 1;
self.deltas.push_back(TaggedDelta {
seq: self.current_seq,
delta,
});
if self.deltas.len() > self.max_buffer_size {
self.compact_oldest();
}
}
pub fn deltas_since(&self, acked_seq: SeqNo) -> Vec<&TaggedDelta<D>> {
self.deltas.iter().filter(|td| td.seq > acked_seq).collect()
}
pub fn delta_group_since(&self, acked_seq: SeqNo) -> Option<D> {
let deltas: Vec<_> = self.deltas_since(acked_seq);
if deltas.is_empty() {
return None;
}
let mut group = D::bottom();
for td in deltas {
group.join_assign(&td.delta);
}
Some(group)
}
pub fn ack(&mut self, acked_seq: SeqNo) -> usize {
let initial_len = self.deltas.len();
self.deltas.retain(|td| td.seq > acked_seq);
initial_len - self.deltas.len()
}
pub fn current_seq(&self) -> SeqNo {
self.current_seq
}
pub fn len(&self) -> usize {
self.deltas.len()
}
pub fn is_empty(&self) -> bool {
self.deltas.is_empty()
}
pub fn clear(&mut self) {
self.deltas.clear();
}
fn compact_oldest(&mut self) {
if self.deltas.len() < 2 {
return;
}
let oldest = self.deltas.pop_front().unwrap();
if let Some(second) = self.deltas.front_mut() {
second.delta = oldest.delta.join(&second.delta);
}
}
}
/// Records, per registered peer, the highest sequence number that peer has acknowledged.
#[derive(Debug, Clone)]
pub struct AckTracker {
/// Peer id mapped to its highest acknowledged sequence number (0 when first registered).
acked: BTreeMap<ReplicaId, SeqNo>,
}
impl AckTracker {
pub fn new() -> Self {
Self {
acked: BTreeMap::new(),
}
}
pub fn register_peer(&mut self, peer_id: ReplicaId) {
self.acked.entry(peer_id).or_insert(0);
}
pub fn update_ack(&mut self, peer_id: &str, seq: SeqNo) {
if let Some(acked) = self.acked.get_mut(peer_id) {
*acked = (*acked).max(seq);
}
}
pub fn get_ack(&self, peer_id: &str) -> SeqNo {
self.acked.get(peer_id).copied().unwrap_or(0)
}
pub fn min_acked(&self) -> SeqNo {
self.acked.values().copied().min().unwrap_or(0)
}
pub fn peers(&self) -> impl Iterator<Item = &ReplicaId> {
self.acked.keys()
}
}
impl Default for AckTracker {
fn default() -> Self {
Self::new()
}
}
/// A replica that holds a lattice state `S`, a buffer of deltas of type `D`,
/// and per-peer acknowledgement bookkeeping.
#[derive(Debug, Clone)]
pub struct DeltaReplica<S: Lattice, D: Lattice = S> {
/// Identifier of this replica.
pub id: ReplicaId,
/// Current local state; initialised to `S::bottom()` by the constructors.
state: S,
/// Buffer of sequence-tagged deltas held by this replica.
buffer: DeltaBuffer<D>,
/// Highest sequence number acknowledged by each registered peer.
acks: AckTracker,
/// Marker for the delta type parameter `D`.
_phantom: std::marker::PhantomData<D>,
}
impl<S: Lattice, D: Lattice> DeltaReplica<S, D> {
pub fn new(id: impl Into<ReplicaId>) -> Self {
Self::with_buffer_size(id, 100)
}
pub fn with_buffer_size(id: impl Into<ReplicaId>, buffer_size: usize) -> Self {
Self {
id: id.into(),
state: S::bottom(),
buffer: DeltaBuffer::new(buffer_size),
acks: AckTracker::new(),
_phantom: std::marker::PhantomData,
}
}
pub fn state(&self) -> &S {
&self.state
}
pub fn buffer(&self) -> &DeltaBuffer<D> {
&self.buffer
}
pub fn register_peer(&mut self, peer_id: ReplicaId) {
self.acks.register_peer(peer_id);
}
pub fn current_seq(&self) -> SeqNo {
self.buffer.current_seq()
}
}
impl<S: Lattice + Clone> DeltaReplica<S, S> {
    /// Applies a local mutation and returns the delta it produced.
    ///
    /// `mutator` receives the current state and returns a delta. The delta
    /// is joined into the local state and a copy is appended to the delta
    /// buffer before the delta is handed back to the caller.
    pub fn mutate<F>(&mut self, mutator: F) -> S
    where
        F: FnOnce(&S) -> S,
    {
        let delta = mutator(&self.state);
        self.state.join_assign(&delta);
        self.buffer.push(delta.clone());
        delta
    }

    /// Builds the payload to send to `peer_id`.
    ///
    /// Returns the join of all buffered deltas newer than the sequence
    /// number recorded for that peer, paired with the buffer's current
    /// sequence number, or `None` when nothing newer is buffered.
    pub fn prepare_sync(&self, peer_id: &str) -> Option<(S, SeqNo)> {
        let acked = self.acks.get_ack(peer_id);
        let group = self.buffer.delta_group_since(acked)?;
        Some((group, self.buffer.current_seq()))
    }

    /// Joins a delta received from elsewhere into the local state.
    pub fn receive_delta(&mut self, delta: &S) {
        self.state.join_assign(delta);
    }

    /// Records that `peer_id` acknowledged everything up to `seq`, then
    /// drops buffered deltas that the ack tracker reports as acknowledged
    /// by every peer.
    ///
    /// `seq` is clamped to the buffer's current sequence number. A peer
    /// cannot have received deltas that do not exist yet, and recording a
    /// larger value would cause deltas produced later (with sequence
    /// numbers up to that value) to be skipped by `prepare_sync` and
    /// discarded by garbage collection.
    pub fn process_ack(&mut self, peer_id: &str, seq: SeqNo) {
        let seq = seq.min(self.buffer.current_seq());
        self.acks.update_ack(peer_id, seq);
        self.buffer.ack(self.acks.min_acked());
    }

    /// Borrows the complete local state.
    pub fn full_state(&self) -> &S {
        &self.state
    }

    /// Exchanges full states with `other`: each replica joins the state
    /// the other one had before the call.
    pub fn sync_with(&mut self, other: &mut DeltaReplica<S, S>) {
        // Only our own pre-sync state needs a snapshot; `other.state` is
        // still untouched when we read it.
        let mine = self.state.clone();
        self.receive_delta(&other.state);
        other.receive_delta(&mine);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use mdcs_core::gset::GSet;

    /// Builds a `GSet` holding only `value`.
    fn singleton(value: i32) -> GSet<i32> {
        let mut set = GSet::new();
        set.insert(value);
        set
    }

    /// Builds a buffer created with `capacity` and pushes the singleton
    /// deltas `1..=count` into it, in order.
    fn filled_buffer(capacity: usize, count: i32) -> DeltaBuffer<GSet<i32>> {
        let mut buffer = DeltaBuffer::new(capacity);
        for i in 1..=count {
            buffer.push(singleton(i));
        }
        buffer
    }

    // Each push bumps the sequence number and the length by one.
    #[test]
    fn test_delta_buffer_basic() {
        let mut buffer: DeltaBuffer<GSet<i32>> = DeltaBuffer::new(10);

        buffer.push(singleton(1));
        assert_eq!(buffer.current_seq(), 1);
        assert_eq!(buffer.len(), 1);

        buffer.push(singleton(2));
        assert_eq!(buffer.current_seq(), 2);
        assert_eq!(buffer.len(), 2);
    }

    // Grouping since seq 2 contains only the deltas pushed as 3, 4 and 5.
    #[test]
    fn test_delta_buffer_group() {
        let buffer = filled_buffer(10, 5);
        let group = buffer.delta_group_since(2).unwrap();

        for absent in 1..=2 {
            assert!(!group.contains(&absent));
        }
        for present in 3..=5 {
            assert!(group.contains(&present));
        }
    }

    // Acknowledging seq 3 removes exactly the first three entries.
    #[test]
    fn test_delta_buffer_ack() {
        let mut buffer = filled_buffer(10, 5);
        assert_eq!(buffer.len(), 5);

        let removed = buffer.ack(3);
        assert_eq!(removed, 3);
        assert_eq!(buffer.len(), 2);
    }

    // Exceeding the limit compacts entries without losing any element.
    #[test]
    fn test_delta_buffer_compaction() {
        let buffer = filled_buffer(3, 5);
        assert!(buffer.len() <= 3);

        let group = buffer.delta_group_since(0).unwrap();
        for i in 1..=5 {
            assert!(group.contains(&i));
        }
    }

    // The minimum follows the slowest registered peer.
    #[test]
    fn test_ack_tracker() {
        let mut tracker = AckTracker::new();
        for name in ["peer1", "peer2"].iter() {
            tracker.register_peer(name.to_string());
        }
        assert_eq!(tracker.get_ack("peer1"), 0);
        assert_eq!(tracker.get_ack("peer2"), 0);

        tracker.update_ack("peer1", 5);
        assert_eq!(tracker.get_ack("peer1"), 5);
        assert_eq!(tracker.min_acked(), 0);

        tracker.update_ack("peer2", 3);
        assert_eq!(tracker.min_acked(), 3);

        tracker.update_ack("peer2", 7);
        assert_eq!(tracker.min_acked(), 5);
    }

    // A mutation lands in the state and advances the sequence number.
    #[test]
    fn test_delta_replica_basic() {
        let mut replica: DeltaReplica<GSet<i32>> = DeltaReplica::new("replica1");
        replica.mutate(|_state| singleton(42));

        assert!(replica.state().contains(&42));
        assert_eq!(replica.current_seq(), 1);
    }

    // After a full sync both replicas hold both elements.
    #[test]
    fn test_delta_replica_sync() {
        let mut replica1: DeltaReplica<GSet<i32>> = DeltaReplica::new("r1");
        let mut replica2: DeltaReplica<GSet<i32>> = DeltaReplica::new("r2");
        replica1.mutate(|_| singleton(1));
        replica2.mutate(|_| singleton(2));

        assert!(replica1.state().contains(&1));
        assert!(!replica1.state().contains(&2));
        assert!(!replica2.state().contains(&1));
        assert!(replica2.state().contains(&2));

        replica1.sync_with(&mut replica2);

        assert!(replica1.state().contains(&1));
        assert!(replica1.state().contains(&2));
        assert!(replica2.state().contains(&1));
        assert!(replica2.state().contains(&2));
    }
}