// atomr_distributed_data/pruning.rs
//! Pruning state for replicated CRDTs.
//!
//! When a cluster member is permanently removed, its contributions to
//! a CRDT (vector entries, set elements added by it, etc.) must be
//! transferred to a still-alive "seen-by" node so causal ordering
//! is preserved. The transfer is two-phase:
//!
//! 1. **Initialized**: a still-alive `owner` is chosen to take over
//!    the removed node's contributions. The pruning is announced to
//!    every replica.
//! 2. **Performed**: every replica has applied the pruning. The
//!    pruning marker can then be garbage-collected once the
//!    `obsolete_at` round has fully propagated.
//!
//! This module ships the type and the state-machine helpers; the
//! actual rewriting of CRDT internals happens in the per-CRDT
//! `prune` implementation (added per-CRDT as needed).

use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};
/// Per-(removed-node, owner) pruning state.
///
/// A marker only moves forward: `Initialized` → `Performed` (via
/// [`PruningState::mark_performed`]), and is then dropped entirely by
/// [`PruningState::gc`] once its `obsolete_at` round has passed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PruningPhase {
    /// Pruning announced; not yet observed by every replica.
    /// `owner` is the still-alive node that takes over the removed
    /// node's contributions.
    Initialized { owner: String },
    /// Pruning observed by every replica. The marker can be
    /// garbage-collected after `obsolete_at` rounds have elapsed
    /// since `Performed` was set.
    Performed { owner: String, obsolete_at: u64 },
}
33
/// State carried alongside a CRDT entry — maps each *removed* node
/// to its pruning phase. The map's keys are the addresses of nodes
/// that have left the cluster.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PruningState {
    // Keyed by removed-node address. A BTreeMap gives deterministic
    // iteration and serialization order.
    pub markers: BTreeMap<String, PruningPhase>,
}
41
42impl PruningState {
43    pub fn new() -> Self {
44        Self::default()
45    }
46
47    /// Announce that `removed_node` is being pruned, with `owner`
48    /// taking over. No-op if a marker for `removed_node` already
49    /// exists in any phase.
50    pub fn initialize(&mut self, removed_node: String, owner: String) {
51        self.markers.entry(removed_node).or_insert(PruningPhase::Initialized { owner });
52    }
53
54    /// Mark that pruning of `removed_node` has been observed
55    /// everywhere; the marker can be aged out at `obsolete_at`.
56    /// Returns `true` if the phase advanced from Initialized →
57    /// Performed.
58    pub fn mark_performed(&mut self, removed_node: &str, obsolete_at: u64) -> bool {
59        match self.markers.get_mut(removed_node) {
60            Some(PruningPhase::Initialized { owner }) => {
61                let owner = std::mem::take(owner);
62                self.markers.insert(removed_node.to_string(), PruningPhase::Performed { owner, obsolete_at });
63                true
64            }
65            _ => false,
66        }
67    }
68
69    /// True if `removed_node` is currently being pruned.
70    pub fn is_pruned(&self, removed_node: &str) -> bool {
71        self.markers.contains_key(removed_node)
72    }
73
74    /// Return the owner that has taken over `removed_node`, if any.
75    pub fn owner(&self, removed_node: &str) -> Option<&str> {
76        match self.markers.get(removed_node)? {
77            PruningPhase::Initialized { owner } | PruningPhase::Performed { owner, .. } => Some(owner),
78        }
79    }
80
81    /// Discard pruning markers whose `obsolete_at` is ≤ `current_round`.
82    /// Returns the number of markers removed.
83    pub fn gc(&mut self, current_round: u64) -> usize {
84        let before = self.markers.len();
85        self.markers.retain(|_, phase| match phase {
86            PruningPhase::Initialized { .. } => true,
87            PruningPhase::Performed { obsolete_at, .. } => *obsolete_at > current_round,
88        });
89        before - self.markers.len()
90    }
91
92    /// Merge `other` into self. Performed wins over Initialized when
93    /// the same node is referenced; latest `obsolete_at` wins among
94    /// two Performed entries.
95    pub fn merge(&mut self, other: &Self) {
96        for (k, v) in &other.markers {
97            match (self.markers.get(k), v) {
98                (None, _) => {
99                    self.markers.insert(k.clone(), v.clone());
100                }
101                (Some(PruningPhase::Initialized { .. }), PruningPhase::Performed { .. }) => {
102                    self.markers.insert(k.clone(), v.clone());
103                }
104                (
105                    Some(PruningPhase::Performed { obsolete_at: lhs, .. }),
106                    PruningPhase::Performed { obsolete_at: rhs, .. },
107                ) if rhs > lhs => {
108                    self.markers.insert(k.clone(), v.clone());
109                }
110                _ => {}
111            }
112        }
113    }
114}
115
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn initialize_records_owner() {
        let mut state = PruningState::new();
        state.initialize("dead".into(), "alive".into());
        assert!(state.is_pruned("dead"));
        assert_eq!(state.owner("dead"), Some("alive"));
    }

    #[test]
    fn double_initialize_is_idempotent() {
        let mut state = PruningState::new();
        state.initialize("dead".into(), "alive1".into());
        state.initialize("dead".into(), "alive2".into());
        // The first recorded owner sticks.
        assert_eq!(state.owner("dead"), Some("alive1"));
    }

    #[test]
    fn perform_advances_phase() {
        let mut state = PruningState::new();
        state.initialize("dead".into(), "alive".into());
        assert!(state.mark_performed("dead", 100));
        // Already Performed: a second attempt reports no transition.
        assert!(!state.mark_performed("dead", 200));
    }

    #[test]
    fn gc_drops_obsolete_markers() {
        let mut state = PruningState::new();
        state.initialize("dead".into(), "alive".into());
        state.mark_performed("dead", 5);
        assert_eq!(state.gc(10), 1);
        assert!(!state.is_pruned("dead"));
    }

    #[test]
    fn gc_keeps_initialized_markers() {
        let mut state = PruningState::new();
        state.initialize("dead".into(), "alive".into());
        // Initialized markers survive any round number.
        assert_eq!(state.gc(10_000), 0);
        assert!(state.is_pruned("dead"));
    }

    #[test]
    fn merge_promotes_initialized_to_performed() {
        let mut lhs = PruningState::new();
        lhs.initialize("dead".into(), "alive".into());

        let mut rhs = PruningState::new();
        rhs.initialize("dead".into(), "alive".into());
        rhs.mark_performed("dead", 50);

        lhs.merge(&rhs);
        assert!(matches!(
            lhs.markers["dead"],
            PruningPhase::Performed { obsolete_at: 50, .. }
        ));
    }

    #[test]
    fn merge_picks_latest_obsolete_at() {
        let mut lhs = PruningState::new();
        lhs.initialize("dead".into(), "alive".into());
        lhs.mark_performed("dead", 10);

        let mut rhs = PruningState::new();
        rhs.initialize("dead".into(), "alive".into());
        rhs.mark_performed("dead", 50);

        lhs.merge(&rhs);
        assert!(matches!(
            lhs.markers["dead"],
            PruningPhase::Performed { obsolete_at: 50, .. }
        ));
    }
}

// -- WriteAggregator / ReadAggregator -------------------------------

/// Counts acks against a target derived from a [`crate::WriteConsistency`]
/// and `cluster_size`.
#[derive(Debug)]
pub struct WriteAggregator {
    // Positive acks needed before the write counts as successful.
    target: usize,
    // Positive acks observed so far.
    received: usize,
    // Negative acks observed so far.
    nacks: usize,
}

impl WriteAggregator {
    /// Build an aggregator that is satisfied after `target` positive
    /// acks. A zero target is clamped to one, so at least one ack is
    /// always required.
    pub fn new(target: usize) -> Self {
        Self {
            target: target.max(1),
            received: 0,
            nacks: 0,
        }
    }

    /// Record one positive acknowledgement.
    pub fn ack(&mut self) {
        self.received += 1;
    }

    /// Record one negative acknowledgement.
    pub fn nack(&mut self) {
        self.nacks += 1;
    }

    /// True when enough positive acks have arrived.
    pub fn is_satisfied(&self) -> bool {
        self.target <= self.received
    }

    /// True when so many negative acks have arrived that the target
    /// can no longer be reached.
    pub fn is_failed(&self, cluster_size: usize) -> bool {
        // At most `cluster_size - target` replicas may refuse before
        // success becomes impossible.
        let tolerable = cluster_size.saturating_sub(self.target);
        self.nacks > tolerable
    }

    /// Number of positive acks seen so far.
    pub fn received(&self) -> usize {
        self.received
    }

    /// The ack target this aggregator was built with.
    pub fn target(&self) -> usize {
        self.target
    }
}
236
/// Counts replies against a target derived from a [`crate::ReadConsistency`]
/// and `cluster_size`. Identical shape to
/// `WriteAggregator` but distinct so call sites cannot mix them up.
#[derive(Debug)]
pub struct ReadAggregator {
    // Replies needed before the read counts as complete.
    target: usize,
    // Replies observed so far.
    received: usize,
}

impl ReadAggregator {
    /// Build an aggregator that is satisfied after `target` replies.
    /// A zero target is clamped to one.
    pub fn new(target: usize) -> Self {
        Self {
            target: target.max(1),
            received: 0,
        }
    }

    /// Record one reply.
    pub fn reply(&mut self) {
        self.received += 1;
    }

    /// True once `target` replies have arrived.
    pub fn is_satisfied(&self) -> bool {
        self.target <= self.received
    }

    /// The reply target this aggregator was built with.
    pub fn target(&self) -> usize {
        self.target
    }
}
263
#[cfg(test)]
mod aggregator_tests {
    use super::*;

    #[test]
    fn write_satisfied_after_target_acks() {
        let mut agg = WriteAggregator::new(3);
        agg.ack();
        agg.ack();
        assert!(!agg.is_satisfied());
        agg.ack();
        assert!(agg.is_satisfied());
    }

    #[test]
    fn write_fails_when_too_many_nacks() {
        // cluster_size=4, target=3 → can tolerate 1 nack; 2 fails.
        let mut agg = WriteAggregator::new(3);
        agg.nack();
        assert!(!agg.is_failed(4));
        agg.nack();
        assert!(agg.is_failed(4));
    }

    #[test]
    fn read_satisfied_after_target_replies() {
        let mut agg = ReadAggregator::new(2);
        agg.reply();
        assert!(!agg.is_satisfied());
        agg.reply();
        assert!(agg.is_satisfied());
    }
}