1#![allow(dead_code)]
12use std::collections::HashSet;
36use std::time::{Duration, Instant};
37
38use serde::{Deserialize, Serialize};
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct QuorumPolicy {
44 pub n: usize,
46 pub w: usize,
49 pub ack_timeout: Duration,
52 pub clock_skew_warn: Duration,
55}
56
57impl QuorumPolicy {
58 pub fn new(
65 n: usize,
66 w: usize,
67 ack_timeout: Duration,
68 clock_skew_warn: Duration,
69 ) -> Result<Self, QuorumError> {
70 if n == 0 {
71 return Err(QuorumError::InvalidPolicy {
72 detail: "n must be >= 1".to_string(),
73 });
74 }
75 Ok(Self {
76 n,
77 w: w.clamp(1, n),
78 ack_timeout,
79 clock_skew_warn,
80 })
81 }
82
83 pub fn majority(n: usize) -> Result<Self, QuorumError> {
90 let w = n.div_ceil(2).max(1);
91 Self::new(n, w, Duration::from_secs(2), Duration::from_secs(30))
92 }
93}
94
95#[non_exhaustive]
98#[derive(Debug, Clone, PartialEq, Eq)]
99pub enum QuorumError {
100 QuorumNotMet {
103 got: usize,
104 needed: usize,
105 reason: QuorumFailureReason,
106 },
107 InvalidPolicy { detail: String },
109 LocalWriteFailed { detail: String },
111}
112
113impl std::fmt::Display for QuorumError {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 match self {
116 Self::QuorumNotMet {
117 got,
118 needed,
119 reason,
120 } => write!(
121 f,
122 "quorum not met (got {got}, need {needed}, reason {reason:?})"
123 ),
124 Self::InvalidPolicy { detail } => write!(f, "invalid quorum policy: {detail}"),
125 Self::LocalWriteFailed { detail } => write!(f, "local write failed: {detail}"),
126 }
127 }
128}
129
130impl std::error::Error for QuorumError {}
131
132#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
134#[serde(rename_all = "snake_case")]
135pub enum QuorumFailureReason {
136 Unreachable,
139 Timeout,
142 IdDrift,
145 InFlight,
152}
153
154#[derive(Debug)]
156pub struct AckTracker {
157 policy: QuorumPolicy,
158 deadline: Instant,
159 local_committed: bool,
160 acks: HashSet<String>,
161 id_drifts: Vec<String>,
162}
163
164impl AckTracker {
165 #[must_use]
168 pub fn new(policy: QuorumPolicy, now: Instant) -> Self {
169 let deadline = now + policy.ack_timeout;
170 Self {
171 policy,
172 deadline,
173 local_committed: false,
174 acks: HashSet::new(),
175 id_drifts: Vec::new(),
176 }
177 }
178
179 pub fn record_local(&mut self) {
182 self.local_committed = true;
183 }
184
185 pub fn record_peer_ack(&mut self, peer_id: impl Into<String>) {
189 self.acks.insert(peer_id.into());
190 }
191
192 pub fn record_id_drift(&mut self, peer_id: impl Into<String>) {
196 self.id_drifts.push(peer_id.into());
197 }
198
199 #[must_use]
202 pub fn is_quorum_met(&self, now: Instant) -> bool {
203 if !self.local_committed || now > self.deadline {
204 return false;
205 }
206 let total = self.acks.len() + 1;
208 total >= self.policy.w
209 }
210
211 pub fn finalise(&self, now: Instant) -> Result<usize, QuorumError> {
219 if !self.local_committed {
220 return Err(QuorumError::LocalWriteFailed {
221 detail: "local commit not recorded before finalise".to_string(),
222 });
223 }
224 let got = self.acks.len() + 1;
225 if got >= self.policy.w {
226 return Ok(got);
227 }
228 let reason = if now > self.deadline {
242 if self.acks.is_empty() {
243 QuorumFailureReason::Unreachable
244 } else {
245 QuorumFailureReason::Timeout
246 }
247 } else {
248 QuorumFailureReason::InFlight
249 };
250 Err(QuorumError::QuorumNotMet {
251 got,
252 needed: self.policy.w,
253 reason,
254 })
255 }
256
257 #[must_use]
260 pub fn id_drift_count(&self) -> usize {
261 self.id_drifts.len()
262 }
263}
264
#[cfg(test)]
mod tests {
    use super::*;

    /// Clock-skew threshold used by every hand-built policy in these tests.
    const SKEW: Duration = Duration::from_secs(30);

    /// Fresh reading of the monotonic clock.
    fn now() -> Instant {
        Instant::now()
    }

    /// Builds a policy that is expected to be valid.
    fn policy(n: usize, w: usize, ack_timeout: Duration) -> QuorumPolicy {
        QuorumPolicy::new(n, w, ack_timeout, SKEW).unwrap()
    }

    /// Tracker started at `start` with the local commit already recorded.
    fn committed_tracker(policy: QuorumPolicy, start: Instant) -> AckTracker {
        let mut tracker = AckTracker::new(policy, start);
        tracker.record_local();
        tracker
    }

    #[test]
    fn policy_rejects_zero_n() {
        let outcome = QuorumPolicy::new(0, 1, Duration::from_millis(500), SKEW);
        assert!(matches!(
            outcome.unwrap_err(),
            QuorumError::InvalidPolicy { .. }
        ));
    }

    #[test]
    fn policy_clamps_w_to_n() {
        let clamped = policy(3, 9, Duration::from_millis(500));
        assert_eq!(clamped.n, 3);
        assert_eq!(clamped.w, 3);
    }

    #[test]
    fn majority_default_matches_adr() {
        for (n, expected_w) in [(1, 1), (3, 2), (5, 3), (7, 4)] {
            assert_eq!(QuorumPolicy::majority(n).unwrap().w, expected_w);
        }
    }

    #[test]
    fn quorum_met_with_local_plus_peers() {
        let mut tracker = committed_tracker(QuorumPolicy::majority(3).unwrap(), now());
        tracker.record_peer_ack("peer-1");
        assert!(tracker.is_quorum_met(now()));
    }

    #[test]
    fn quorum_dedupes_duplicate_peer() {
        let mut tracker = committed_tracker(policy(5, 3, Duration::from_millis(500)), now());
        // Three acks from the same peer still count as a single ack.
        for _ in 0..3 {
            tracker.record_peer_ack("peer-1");
        }
        assert!(!tracker.is_quorum_met(now()));
        tracker.record_peer_ack("peer-2");
        assert!(tracker.is_quorum_met(now()));
    }

    #[test]
    fn quorum_not_met_without_local() {
        let mut tracker = AckTracker::new(QuorumPolicy::majority(3).unwrap(), now());
        for peer in ["peer-1", "peer-2"] {
            tracker.record_peer_ack(peer);
        }
        assert!(!tracker.is_quorum_met(now()));
    }

    #[test]
    fn quorum_expired_after_deadline() {
        let t0 = now();
        let tracker = committed_tracker(policy(3, 2, Duration::from_millis(1)), t0);
        let later = t0 + Duration::from_millis(50);
        assert!(!tracker.is_quorum_met(later));

        let err = tracker.finalise(later).unwrap_err();
        if let QuorumError::QuorumNotMet {
            got,
            needed,
            reason,
        } = &err
        {
            assert_eq!(*got, 1);
            assert_eq!(*needed, 2);
            assert_eq!(*reason, QuorumFailureReason::Unreachable);
        } else {
            panic!("expected QuorumNotMet, got {err:?}");
        }
    }

    #[test]
    fn quorum_finalise_reports_timeout_when_partial_acks() {
        let t0 = now();
        let mut tracker = committed_tracker(policy(5, 3, Duration::from_millis(1)), t0);
        tracker.record_peer_ack("peer-1");

        let err = tracker
            .finalise(t0 + Duration::from_millis(50))
            .unwrap_err();
        if let QuorumError::QuorumNotMet { reason, .. } = &err {
            assert_eq!(*reason, QuorumFailureReason::Timeout);
        } else {
            panic!("expected QuorumNotMet/Timeout, got {err:?}");
        }
    }

    #[test]
    fn id_drift_counted_but_does_not_satisfy_quorum() {
        let mut tracker = committed_tracker(QuorumPolicy::majority(3).unwrap(), now());
        for peer in ["peer-1", "peer-2"] {
            tracker.record_id_drift(peer);
        }
        assert_eq!(tracker.id_drift_count(), 2);
        assert!(!tracker.is_quorum_met(now()));
    }

    #[test]
    fn finalise_without_local_commit_errors_local_write_failed() {
        let tracker = AckTracker::new(QuorumPolicy::majority(3).unwrap(), now());
        let outcome = tracker.finalise(now());
        assert!(matches!(
            outcome.unwrap_err(),
            QuorumError::LocalWriteFailed { .. }
        ));
    }

    #[test]
    fn quorum_error_is_displayable_and_is_an_error() {
        let e = QuorumError::QuorumNotMet {
            got: 1,
            needed: 3,
            reason: QuorumFailureReason::Timeout,
        };
        assert!(e.to_string().contains("quorum not met"));
        // Compiles only if `QuorumError` implements `std::error::Error`.
        let _: &dyn std::error::Error = &e;
    }

    #[test]
    fn single_node_quorum_is_trivially_met() {
        let tracker = committed_tracker(policy(1, 1, Duration::from_millis(500)), now());
        assert!(tracker.is_quorum_met(now()));
    }

    #[test]
    fn finalise_pre_deadline_partial_acks_is_inflight() {
        let t0 = now();
        let mut tracker = committed_tracker(policy(5, 3, Duration::from_secs(10)), t0);
        tracker.record_peer_ack("peer-1");

        let err = tracker.finalise(t0).unwrap_err();
        if let QuorumError::QuorumNotMet { reason, .. } = &err {
            assert_eq!(*reason, QuorumFailureReason::InFlight);
        } else {
            panic!("expected QuorumNotMet/InFlight, got {err:?}");
        }
    }

    #[test]
    fn invalid_policy_display_contains_detail() {
        let rendered = QuorumError::InvalidPolicy {
            detail: "n must be >= 1".to_string(),
        }
        .to_string();
        assert!(rendered.contains("invalid quorum policy"));
        assert!(rendered.contains("n must be >= 1"));
    }

    #[test]
    fn local_write_failed_display_contains_detail() {
        let rendered = QuorumError::LocalWriteFailed {
            detail: "disk full".to_string(),
        }
        .to_string();
        assert!(rendered.contains("local write failed"));
        assert!(rendered.contains("disk full"));
    }

    #[test]
    fn quorum_policy_serde_roundtrip() {
        let original = policy(5, 3, Duration::from_secs(2));
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: QuorumPolicy = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.n, original.n);
        assert_eq!(decoded.w, original.w);
    }

    #[test]
    fn quorum_failure_reason_serde_snake_case() {
        let encoded = serde_json::to_string(&QuorumFailureReason::InFlight).unwrap();
        assert_eq!(encoded, "\"in_flight\"");
        let decoded: QuorumFailureReason = serde_json::from_str("\"unreachable\"").unwrap();
        assert_eq!(decoded, QuorumFailureReason::Unreachable);
    }

    #[test]
    fn finalise_succeeds_returns_count() {
        let t0 = now();
        let mut tracker = committed_tracker(policy(3, 2, Duration::from_secs(10)), t0);
        tracker.record_peer_ack("p1");
        assert_eq!(tracker.finalise(t0).unwrap(), 2);
    }

    #[test]
    fn id_drift_count_zero_initially() {
        let tracker = AckTracker::new(QuorumPolicy::majority(3).unwrap(), now());
        assert_eq!(tracker.id_drift_count(), 0);
    }
}