atomr_distributed_data/
pruning.rs1use std::collections::BTreeMap;
20
21use serde::{Deserialize, Serialize};
22
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25pub enum PruningPhase {
26 Initialized { owner: String },
28 Performed { owner: String, obsolete_at: u64 },
32}
33
34#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
38pub struct PruningState {
39 pub markers: BTreeMap<String, PruningPhase>,
40}
41
42impl PruningState {
43 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn initialize(&mut self, removed_node: String, owner: String) {
51 self.markers.entry(removed_node).or_insert(PruningPhase::Initialized { owner });
52 }
53
54 pub fn mark_performed(&mut self, removed_node: &str, obsolete_at: u64) -> bool {
59 match self.markers.get_mut(removed_node) {
60 Some(PruningPhase::Initialized { owner }) => {
61 let owner = std::mem::take(owner);
62 self.markers.insert(removed_node.to_string(), PruningPhase::Performed { owner, obsolete_at });
63 true
64 }
65 _ => false,
66 }
67 }
68
69 pub fn is_pruned(&self, removed_node: &str) -> bool {
71 self.markers.contains_key(removed_node)
72 }
73
74 pub fn owner(&self, removed_node: &str) -> Option<&str> {
76 match self.markers.get(removed_node)? {
77 PruningPhase::Initialized { owner } | PruningPhase::Performed { owner, .. } => Some(owner),
78 }
79 }
80
81 pub fn gc(&mut self, current_round: u64) -> usize {
84 let before = self.markers.len();
85 self.markers.retain(|_, phase| match phase {
86 PruningPhase::Initialized { .. } => true,
87 PruningPhase::Performed { obsolete_at, .. } => *obsolete_at > current_round,
88 });
89 before - self.markers.len()
90 }
91
92 pub fn merge(&mut self, other: &Self) {
96 for (k, v) in &other.markers {
97 match (self.markers.get(k), v) {
98 (None, _) => {
99 self.markers.insert(k.clone(), v.clone());
100 }
101 (Some(PruningPhase::Initialized { .. }), PruningPhase::Performed { .. }) => {
102 self.markers.insert(k.clone(), v.clone());
103 }
104 (
105 Some(PruningPhase::Performed { obsolete_at: lhs, .. }),
106 PruningPhase::Performed { obsolete_at: rhs, .. },
107 ) if rhs > lhs => {
108 self.markers.insert(k.clone(), v.clone());
109 }
110 _ => {}
111 }
112 }
113 }
114}
115
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a state holding a single `Initialized` marker: node "dead", owner "alive".
    fn state_with_marker() -> PruningState {
        let mut state = PruningState::new();
        state.initialize(String::from("dead"), String::from("alive"));
        state
    }

    #[test]
    fn initialize_records_owner() {
        let state = state_with_marker();
        assert!(state.is_pruned("dead"));
        assert_eq!(state.owner("dead"), Some("alive"));
    }

    #[test]
    fn double_initialize_is_idempotent() {
        let mut state = PruningState::new();
        state.initialize(String::from("dead"), String::from("alive1"));
        state.initialize(String::from("dead"), String::from("alive2"));
        // The second call must not overwrite the first owner.
        assert_eq!(state.owner("dead"), Some("alive1"));
    }

    #[test]
    fn perform_advances_phase() {
        let mut state = state_with_marker();
        assert!(state.mark_performed("dead", 100));
        // Already performed: a second transition is rejected.
        assert!(!state.mark_performed("dead", 200));
    }

    #[test]
    fn gc_drops_obsolete_markers() {
        let mut state = state_with_marker();
        state.mark_performed("dead", 5);
        let dropped = state.gc(10);
        assert_eq!(dropped, 1);
        assert!(!state.is_pruned("dead"));
    }

    #[test]
    fn gc_keeps_initialized_markers() {
        let mut state = state_with_marker();
        let dropped = state.gc(10_000);
        assert_eq!(dropped, 0);
        assert!(state.is_pruned("dead"));
    }

    #[test]
    fn merge_promotes_initialized_to_performed() {
        let mut local = state_with_marker();

        let mut remote = state_with_marker();
        remote.mark_performed("dead", 50);

        local.merge(&remote);
        assert!(matches!(local.markers["dead"], PruningPhase::Performed { obsolete_at: 50, .. }));
    }

    #[test]
    fn merge_picks_latest_obsolete_at() {
        let mut local = state_with_marker();
        local.mark_performed("dead", 10);

        let mut remote = state_with_marker();
        remote.mark_performed("dead", 50);

        local.merge(&remote);
        assert!(matches!(local.markers["dead"], PruningPhase::Performed { obsolete_at: 50, .. }));
    }
}
192
/// Counts acks and nacks for a write that needs `target` acks.
#[derive(Debug)]
pub struct WriteAggregator {
    /// Number of acks required; `new` clamps this to at least 1.
    target: usize,
    /// Acks recorded so far.
    received: usize,
    /// Nacks recorded so far.
    nacks: usize,
}

impl WriteAggregator {
    /// Creates an aggregator that requires `target` acks.
    ///
    /// A `target` of 0 is raised to 1. Both counters start at zero.
    pub fn new(target: usize) -> Self {
        let target = usize::max(target, 1);
        Self {
            target,
            received: 0,
            nacks: 0,
        }
    }

    /// Records one ack.
    pub fn ack(&mut self) {
        self.received += 1;
    }

    /// Records one nack.
    pub fn nack(&mut self) {
        self.nacks += 1;
    }

    /// Returns `true` once at least `target` acks have been recorded.
    pub fn is_satisfied(&self) -> bool {
        self.target <= self.received
    }

    /// Returns `true` once the number of nacks exceeds `cluster_size - target`
    /// (with the subtraction saturating at zero).
    ///
    /// Assuming each of the `cluster_size` nodes answers at most once, that is
    /// the point where fewer than `target` acks can still arrive.
    pub fn is_failed(&self, cluster_size: usize) -> bool {
        let tolerated_nacks = cluster_size.saturating_sub(self.target);
        tolerated_nacks < self.nacks
    }

    /// Number of acks recorded so far.
    pub fn received(&self) -> usize {
        self.received
    }

    /// Number of acks required (always at least 1).
    pub fn target(&self) -> usize {
        self.target
    }
}
236
/// Counts replies for a read that needs `target` replies.
#[derive(Debug)]
pub struct ReadAggregator {
    /// Number of replies required; `new` clamps this to at least 1.
    target: usize,
    /// Replies recorded so far.
    received: usize,
}

impl ReadAggregator {
    /// Creates an aggregator that requires `target` replies.
    ///
    /// A `target` of 0 is raised to 1. The reply counter starts at zero.
    pub fn new(target: usize) -> Self {
        let target = usize::max(target, 1);
        Self { target, received: 0 }
    }

    /// Records one reply.
    pub fn reply(&mut self) {
        self.received += 1;
    }

    /// Returns `true` once at least `target` replies have been recorded.
    pub fn is_satisfied(&self) -> bool {
        self.target <= self.received
    }

    /// Number of replies required (always at least 1).
    pub fn target(&self) -> usize {
        self.target
    }
}
263
#[cfg(test)]
mod aggregator_tests {
    use super::*;

    #[test]
    fn write_satisfied_after_target_acks() {
        let mut writes = WriteAggregator::new(3);
        writes.ack();
        writes.ack();
        // Two of three acks: not yet enough.
        assert!(!writes.is_satisfied());
        writes.ack();
        assert!(writes.is_satisfied());
    }

    #[test]
    fn write_fails_when_too_many_nacks() {
        let mut writes = WriteAggregator::new(3);
        writes.nack();
        // With 4 nodes and a target of 3, a single nack is still tolerable.
        assert!(!writes.is_failed(4));
        writes.nack();
        assert!(writes.is_failed(4));
    }

    #[test]
    fn read_satisfied_after_target_replies() {
        let mut reads = ReadAggregator::new(2);
        reads.reply();
        assert!(!reads.is_satisfied());
        reads.reply();
        assert!(reads.is_satisfied());
    }
}