use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
/// Phase of the pruning marker kept for one removed node.
///
/// A marker starts as `Initialized` and can be advanced to `Performed`
/// (see `PruningState::mark_performed`); nothing in this file moves it back.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PruningPhase {
/// A marker has been recorded for the removed node together with `owner`.
/// NOTE(review): `owner` is presumably the node responsible for carrying out
/// the pruning — confirm against the callers.
Initialized { owner: String },
/// The marker has been advanced. `PruningState::gc` keeps it only while
/// `obsolete_at` is strictly greater than the round passed to `gc`.
Performed { owner: String, obsolete_at: u64 },
}
/// Collection of pruning markers, one per removed node.
///
/// `Default` (and `PruningState::new`) yields an empty collection.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PruningState {
/// Current pruning phase for each removed node, keyed by the removed node's
/// string identifier. A `BTreeMap` keeps the keys in sorted order.
pub markers: BTreeMap<String, PruningPhase>,
}
impl PruningState {
    /// Creates an empty state with no pruning markers.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records an `Initialized` marker for `removed_node` with the given `owner`.
    ///
    /// If a marker for `removed_node` already exists (in either phase) it is
    /// left untouched, so the first recorded owner wins and repeated calls are
    /// no-ops.
    pub fn initialize(&mut self, removed_node: String, owner: String) {
        self.markers
            .entry(removed_node)
            .or_insert(PruningPhase::Initialized { owner });
    }

    /// Advances the marker for `removed_node` from `Initialized` to `Performed`,
    /// keeping its owner and recording `obsolete_at`.
    ///
    /// Returns `true` if the transition happened. Returns `false`, leaving the
    /// state unchanged, when there is no marker for `removed_node` or when it
    /// has already been performed.
    pub fn mark_performed(&mut self, removed_node: &str, obsolete_at: u64) -> bool {
        match self.markers.get_mut(removed_node) {
            Some(phase) => match phase {
                PruningPhase::Initialized { owner } => {
                    // Move the owner out and overwrite the entry in place; this
                    // avoids a second map lookup and a fresh key allocation.
                    let owner = std::mem::take(owner);
                    *phase = PruningPhase::Performed { owner, obsolete_at };
                    true
                }
                PruningPhase::Performed { .. } => false,
            },
            None => false,
        }
    }

    /// Returns `true` if any marker (in either phase) exists for `removed_node`.
    pub fn is_pruned(&self, removed_node: &str) -> bool {
        self.markers.contains_key(removed_node)
    }

    /// Returns the owner recorded for `removed_node`, or `None` if there is no
    /// marker for it.
    pub fn owner(&self, removed_node: &str) -> Option<&str> {
        match self.markers.get(removed_node)? {
            PruningPhase::Initialized { owner } | PruningPhase::Performed { owner, .. } => {
                Some(owner.as_str())
            }
        }
    }

    /// Drops every `Performed` marker whose `obsolete_at` is not strictly
    /// greater than `current_round` and returns how many markers were removed.
    ///
    /// `Initialized` markers are always kept.
    pub fn gc(&mut self, current_round: u64) -> usize {
        let before = self.markers.len();
        self.markers.retain(|_, phase| match phase {
            PruningPhase::Initialized { .. } => true,
            PruningPhase::Performed { obsolete_at, .. } => *obsolete_at > current_round,
        });
        before - self.markers.len()
    }

    /// Merges the markers of `other` into `self`.
    ///
    /// For each marker in `other`:
    /// - it is copied if `self` has no marker for that node;
    /// - a `Performed` marker replaces a local `Initialized` one;
    /// - between two `Performed` markers, the one with the strictly larger
    ///   `obsolete_at` wins;
    /// - otherwise the local marker is kept.
    ///
    /// NOTE(review): when both sides are in the same phase but disagree on the
    /// owner (or tie on `obsolete_at`), the local value is kept, so the result
    /// depends on merge direction — confirm this is intended.
    pub fn merge(&mut self, other: &Self) {
        for (node, incoming) in &other.markers {
            // Exhaustive decision table: a new `PruningPhase` variant will fail
            // to compile here instead of being silently ignored.
            let adopt_incoming = match (self.markers.get(node), incoming) {
                (None, _) => true,
                (Some(PruningPhase::Initialized { .. }), PruningPhase::Performed { .. }) => true,
                (
                    Some(PruningPhase::Performed { obsolete_at: ours, .. }),
                    PruningPhase::Performed { obsolete_at: theirs, .. },
                ) => theirs > ours,
                (Some(_), PruningPhase::Initialized { .. }) => false,
            };
            if adopt_incoming {
                self.markers.insert(node.clone(), incoming.clone());
            }
        }
    }
}
// Unit tests for `PruningState`: marker lifecycle, garbage collection and merging.
#[cfg(test)]
mod tests {
use super::*;
// A freshly initialized marker is visible and reports its owner.
#[test]
fn initialize_records_owner() {
let mut p = PruningState::new();
p.initialize("dead".into(), "alive".into());
assert!(p.is_pruned("dead"));
assert_eq!(p.owner("dead"), Some("alive"));
}
// A second `initialize` for the same node must not overwrite the first owner.
#[test]
fn double_initialize_is_idempotent() {
let mut p = PruningState::new();
p.initialize("dead".into(), "alive1".into());
p.initialize("dead".into(), "alive2".into());
assert_eq!(p.owner("dead"), Some("alive1"));
}
// `mark_performed` succeeds once; a repeated call on a performed marker returns false.
#[test]
fn perform_advances_phase() {
let mut p = PruningState::new();
p.initialize("dead".into(), "alive".into());
assert!(p.mark_performed("dead", 100));
assert!(!p.mark_performed("dead", 200));
}
// A performed marker whose `obsolete_at` (5) is below the current round (10) is removed.
#[test]
fn gc_drops_obsolete_markers() {
let mut p = PruningState::new();
p.initialize("dead".into(), "alive".into());
p.mark_performed("dead", 5);
let removed = p.gc(10);
assert_eq!(removed, 1);
assert!(!p.is_pruned("dead"));
}
// Markers still in the `Initialized` phase survive `gc` regardless of the round.
#[test]
fn gc_keeps_initialized_markers() {
let mut p = PruningState::new();
p.initialize("dead".into(), "alive".into());
let removed = p.gc(10_000);
assert_eq!(removed, 0);
assert!(p.is_pruned("dead"));
}
// Merging a `Performed` marker into an `Initialized` one adopts the performed marker.
#[test]
fn merge_promotes_initialized_to_performed() {
let mut a = PruningState::new();
a.initialize("dead".into(), "alive".into());
let mut b = PruningState::new();
b.initialize("dead".into(), "alive".into());
b.mark_performed("dead", 50);
a.merge(&b);
assert!(matches!(a.markers["dead"], PruningPhase::Performed { obsolete_at: 50, .. }));
}
// When both sides are `Performed`, the larger `obsolete_at` wins.
#[test]
fn merge_picks_latest_obsolete_at() {
let mut a = PruningState::new();
a.initialize("dead".into(), "alive".into());
a.mark_performed("dead", 10);
let mut b = PruningState::new();
b.initialize("dead".into(), "alive".into());
b.mark_performed("dead", 50);
a.merge(&b);
assert!(matches!(a.markers["dead"], PruningPhase::Performed { obsolete_at: 50, .. }));
}
}
/// Counts positive and negative acknowledgements for a write and decides
/// whether the required number of acknowledgements has been reached or can no
/// longer be reached.
#[derive(Debug)]
pub struct WriteAggregator {
    /// Number of acks required; always at least 1.
    target: usize,
    /// Acks received so far.
    received: usize,
    /// Nacks received so far.
    nacks: usize,
}

impl WriteAggregator {
    /// Creates an aggregator that needs `target` acks.
    ///
    /// A `target` of 0 is raised to 1, so at least one ack is always required.
    pub fn new(target: usize) -> Self {
        Self {
            target: target.max(1),
            received: 0,
            nacks: 0,
        }
    }

    /// Records one positive acknowledgement.
    pub fn ack(&mut self) {
        self.received += 1;
    }

    /// Records one negative acknowledgement.
    pub fn nack(&mut self) {
        self.nacks += 1;
    }

    /// Returns `true` once at least `target` acks have been received.
    pub fn is_satisfied(&self) -> bool {
        self.received >= self.target
    }

    /// Returns `true` when the target can no longer be reached given
    /// `cluster_size` potential responders.
    ///
    /// That is the case when more than `cluster_size - target` nacks have
    /// arrived, or when `target` exceeds `cluster_size` to begin with (the
    /// target is then unreachable even before any nack is seen).
    ///
    /// NOTE(review): assumes every responder counted in `cluster_size` sends at
    /// most one ack or nack — confirm against the caller.
    pub fn is_failed(&self, cluster_size: usize) -> bool {
        match cluster_size.checked_sub(self.target) {
            Some(tolerable_nacks) => self.nacks > tolerable_nacks,
            // More acks are required than there are responders.
            None => true,
        }
    }

    /// Number of acks received so far.
    pub fn received(&self) -> usize {
        self.received
    }

    /// Number of acks required (never 0).
    pub fn target(&self) -> usize {
        self.target
    }
}
/// Counts replies to a read and reports when enough of them have arrived.
#[derive(Debug)]
pub struct ReadAggregator {
    /// Number of replies required; never below 1.
    target: usize,
    /// Replies counted so far.
    received: usize,
}

impl ReadAggregator {
    /// Builds an aggregator waiting for `target` replies.
    ///
    /// A requested target of 0 is bumped to 1.
    pub fn new(target: usize) -> Self {
        let required = std::cmp::max(target, 1);
        Self {
            target: required,
            received: 0,
        }
    }

    /// Counts one more reply.
    pub fn reply(&mut self) {
        self.received += 1;
    }

    /// Whether the number of replies has reached the target.
    pub fn is_satisfied(&self) -> bool {
        self.target <= self.received
    }

    /// The number of replies this aggregator waits for.
    pub fn target(&self) -> usize {
        self.target
    }
}
// Unit tests for `WriteAggregator` and `ReadAggregator`.
#[cfg(test)]
mod aggregator_tests {
use super::*;
// The write is satisfied only once the third of three required acks arrives.
#[test]
fn write_satisfied_after_target_acks() {
let mut a = WriteAggregator::new(3);
a.ack();
a.ack();
assert!(!a.is_satisfied());
a.ack();
assert!(a.is_satisfied());
}
// With 4 responders and a target of 3, one nack is tolerable; the second makes
// the target unreachable.
#[test]
fn write_fails_when_too_many_nacks() {
let mut a = WriteAggregator::new(3);
a.nack();
assert!(!a.is_failed(4));
a.nack();
assert!(a.is_failed(4));
}
// The read is satisfied only once the second of two required replies arrives.
#[test]
fn read_satisfied_after_target_replies() {
let mut a = ReadAggregator::new(2);
a.reply();
assert!(!a.is_satisfied());
a.reply();
assert!(a.is_satisfied());
}
}