1#![allow(dead_code)]
12use std::collections::HashSet;
36use std::time::{Duration, Instant};
37
38use serde::{Deserialize, Serialize};
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct QuorumPolicy {
44 pub n: usize,
46 pub w: usize,
49 pub ack_timeout: Duration,
52 pub clock_skew_warn: Duration,
55}
56
57impl QuorumPolicy {
58 pub fn new(
65 n: usize,
66 w: usize,
67 ack_timeout: Duration,
68 clock_skew_warn: Duration,
69 ) -> Result<Self, QuorumError> {
70 if n == 0 {
71 return Err(QuorumError::InvalidPolicy {
72 detail: "n must be >= 1".to_string(),
73 });
74 }
75 Ok(Self {
76 n,
77 w: w.clamp(1, n),
78 ack_timeout,
79 clock_skew_warn,
80 })
81 }
82
83 pub fn majority(n: usize) -> Result<Self, QuorumError> {
90 let w = n.div_ceil(2).max(1);
91 Self::new(n, w, Duration::from_secs(2), Duration::from_secs(30))
92 }
93}
94
95#[non_exhaustive]
98#[derive(Debug, Clone, PartialEq, Eq)]
99pub enum QuorumError {
100 QuorumNotMet {
103 got: usize,
104 needed: usize,
105 reason: QuorumFailureReason,
106 },
107 InvalidPolicy { detail: String },
109 LocalWriteFailed { detail: String },
111}
112
113impl std::fmt::Display for QuorumError {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 match self {
116 Self::QuorumNotMet {
117 got,
118 needed,
119 reason,
120 } => write!(
121 f,
122 "quorum not met (got {got}, need {needed}, reason {reason:?})"
123 ),
124 Self::InvalidPolicy { detail } => write!(f, "invalid quorum policy: {detail}"),
125 Self::LocalWriteFailed { detail } => write!(f, "local write failed: {detail}"),
126 }
127 }
128}
129
130impl std::error::Error for QuorumError {}
131
132#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
134#[serde(rename_all = "snake_case")]
135pub enum QuorumFailureReason {
136 Unreachable,
139 Timeout,
142 IdDrift,
145 InFlight,
152}
153
154#[derive(Debug)]
156pub struct AckTracker {
157 policy: QuorumPolicy,
158 deadline: Instant,
159 local_committed: bool,
160 acks: HashSet<String>,
161 id_drifts: Vec<String>,
162}
163
164impl AckTracker {
165 #[must_use]
168 pub fn new(policy: QuorumPolicy, now: Instant) -> Self {
169 let deadline = now + policy.ack_timeout;
170 Self {
171 policy,
172 deadline,
173 local_committed: false,
174 acks: HashSet::new(),
175 id_drifts: Vec::new(),
176 }
177 }
178
179 pub fn record_local(&mut self) {
182 self.local_committed = true;
183 }
184
185 pub fn record_peer_ack(&mut self, peer_id: impl Into<String>) {
189 self.acks.insert(peer_id.into());
190 }
191
192 pub fn record_id_drift(&mut self, peer_id: impl Into<String>) {
196 self.id_drifts.push(peer_id.into());
197 }
198
199 #[must_use]
202 pub fn is_quorum_met(&self, now: Instant) -> bool {
203 if !self.local_committed || now > self.deadline {
204 return false;
205 }
206 let total = self.acks.len() + 1;
208 total >= self.policy.w
209 }
210
211 pub fn finalise(&self, now: Instant) -> Result<usize, QuorumError> {
219 if !self.local_committed {
220 return Err(QuorumError::LocalWriteFailed {
221 detail: "local commit not recorded before finalise".to_string(),
222 });
223 }
224 let got = self.acks.len() + 1;
225 if got >= self.policy.w {
226 return Ok(got);
227 }
228 let reason = if now > self.deadline {
242 if self.acks.is_empty() {
243 QuorumFailureReason::Unreachable
244 } else {
245 QuorumFailureReason::Timeout
246 }
247 } else {
248 QuorumFailureReason::InFlight
249 };
250 Err(QuorumError::QuorumNotMet {
251 got,
252 needed: self.policy.w,
253 reason,
254 })
255 }
256
257 #[must_use]
260 pub fn id_drift_count(&self) -> usize {
261 self.id_drifts.len()
262 }
263
264 #[must_use]
272 pub fn acked_peer_ids(&self) -> &HashSet<String> {
273 &self.acks
274 }
275}
276
277#[cfg(test)]
278mod tests {
279 use super::*;
280
281 fn instant_base() -> Instant {
282 Instant::now()
283 }
284
285 #[test]
286 fn policy_rejects_zero_n() {
287 let err = QuorumPolicy::new(0, 1, Duration::from_millis(500), Duration::from_secs(30))
288 .unwrap_err();
289 assert!(matches!(err, QuorumError::InvalidPolicy { .. }));
290 }
291
292 #[test]
293 fn policy_clamps_w_to_n() {
294 let p =
295 QuorumPolicy::new(3, 9, Duration::from_millis(500), Duration::from_secs(30)).unwrap();
296 assert_eq!(p.n, 3);
297 assert_eq!(p.w, 3);
298 }
299
300 #[test]
301 fn majority_default_matches_adr() {
302 assert_eq!(QuorumPolicy::majority(1).unwrap().w, 1);
304 assert_eq!(QuorumPolicy::majority(3).unwrap().w, 2);
305 assert_eq!(QuorumPolicy::majority(5).unwrap().w, 3);
306 assert_eq!(QuorumPolicy::majority(7).unwrap().w, 4);
307 }
308
309 #[test]
310 fn quorum_met_with_local_plus_peers() {
311 let policy = QuorumPolicy::majority(3).unwrap();
312 let mut tracker = AckTracker::new(policy, instant_base());
313 tracker.record_local();
314 tracker.record_peer_ack("peer-1");
315 assert!(tracker.is_quorum_met(instant_base()));
316 }
317
318 #[test]
319 fn quorum_dedupes_duplicate_peer() {
320 let policy =
321 QuorumPolicy::new(5, 3, Duration::from_millis(500), Duration::from_secs(30)).unwrap();
322 let mut tracker = AckTracker::new(policy, instant_base());
323 tracker.record_local();
324 tracker.record_peer_ack("peer-1");
325 tracker.record_peer_ack("peer-1");
326 tracker.record_peer_ack("peer-1");
327 assert!(!tracker.is_quorum_met(instant_base()));
329 tracker.record_peer_ack("peer-2");
330 assert!(tracker.is_quorum_met(instant_base()));
331 }
332
333 #[test]
334 fn quorum_not_met_without_local() {
335 let policy = QuorumPolicy::majority(3).unwrap();
336 let mut tracker = AckTracker::new(policy, instant_base());
337 tracker.record_peer_ack("peer-1");
339 tracker.record_peer_ack("peer-2");
340 assert!(!tracker.is_quorum_met(instant_base()));
341 }
342
343 #[test]
344 fn quorum_expired_after_deadline() {
345 let policy =
346 QuorumPolicy::new(3, 2, Duration::from_millis(1), Duration::from_secs(30)).unwrap();
347 let t0 = instant_base();
348 let mut tracker = AckTracker::new(policy, t0);
349 tracker.record_local();
350 let later = t0 + Duration::from_millis(50);
351 assert!(!tracker.is_quorum_met(later));
353 let err = tracker.finalise(later).unwrap_err();
354 match err {
355 QuorumError::QuorumNotMet {
356 got,
357 needed,
358 reason,
359 } => {
360 assert_eq!(got, 1);
361 assert_eq!(needed, 2);
362 assert_eq!(reason, QuorumFailureReason::Unreachable);
363 }
364 other => panic!("expected QuorumNotMet, got {other:?}"),
365 }
366 }
367
368 #[test]
369 fn quorum_finalise_reports_timeout_when_partial_acks() {
370 let policy =
371 QuorumPolicy::new(5, 3, Duration::from_millis(1), Duration::from_secs(30)).unwrap();
372 let t0 = instant_base();
373 let mut tracker = AckTracker::new(policy, t0);
374 tracker.record_local();
375 tracker.record_peer_ack("peer-1");
376 let err = tracker
379 .finalise(t0 + Duration::from_millis(50))
380 .unwrap_err();
381 match err {
382 QuorumError::QuorumNotMet { reason, .. } => {
383 assert_eq!(reason, QuorumFailureReason::Timeout);
384 }
385 other => panic!("expected QuorumNotMet/Timeout, got {other:?}"),
386 }
387 }
388
389 #[test]
390 fn id_drift_counted_but_does_not_satisfy_quorum() {
391 let policy = QuorumPolicy::majority(3).unwrap();
392 let mut tracker = AckTracker::new(policy, instant_base());
393 tracker.record_local();
394 tracker.record_id_drift("peer-1");
395 tracker.record_id_drift("peer-2");
396 assert_eq!(tracker.id_drift_count(), 2);
398 assert!(!tracker.is_quorum_met(instant_base()));
399 }
400
401 #[test]
402 fn finalise_without_local_commit_errors_local_write_failed() {
403 let policy = QuorumPolicy::majority(3).unwrap();
404 let tracker = AckTracker::new(policy, instant_base());
405 let err = tracker.finalise(instant_base()).unwrap_err();
406 assert!(matches!(err, QuorumError::LocalWriteFailed { .. }));
407 }
408
409 #[test]
410 fn quorum_error_is_displayable_and_is_an_error() {
411 let e = QuorumError::QuorumNotMet {
412 got: 1,
413 needed: 3,
414 reason: QuorumFailureReason::Timeout,
415 };
416 let display = format!("{e}");
417 assert!(display.contains("quorum not met"));
418 let _: &dyn std::error::Error = &e;
420 }
421
422 #[test]
423 fn single_node_quorum_is_trivially_met() {
424 let policy =
429 QuorumPolicy::new(1, 1, Duration::from_millis(500), Duration::from_secs(30)).unwrap();
430 let mut tracker = AckTracker::new(policy, instant_base());
431 tracker.record_local();
432 assert!(tracker.is_quorum_met(instant_base()));
433 }
434
435 #[test]
440 fn finalise_pre_deadline_partial_acks_is_inflight() {
441 let policy =
442 QuorumPolicy::new(5, 3, Duration::from_secs(10), Duration::from_secs(30)).unwrap();
443 let t0 = instant_base();
444 let mut tracker = AckTracker::new(policy, t0);
445 tracker.record_local();
446 tracker.record_peer_ack("peer-1");
447 let err = tracker.finalise(t0).unwrap_err();
449 match err {
450 QuorumError::QuorumNotMet { reason, .. } => {
451 assert_eq!(reason, QuorumFailureReason::InFlight);
452 }
453 other => panic!("expected QuorumNotMet/InFlight, got {other:?}"),
454 }
455 }
456
457 #[test]
458 fn invalid_policy_display_contains_detail() {
459 let e = QuorumError::InvalidPolicy {
460 detail: "n must be >= 1".to_string(),
461 };
462 let s = format!("{e}");
463 assert!(s.contains("invalid quorum policy"));
464 assert!(s.contains("n must be >= 1"));
465 }
466
467 #[test]
468 fn local_write_failed_display_contains_detail() {
469 let e = QuorumError::LocalWriteFailed {
470 detail: "disk full".to_string(),
471 };
472 let s = format!("{e}");
473 assert!(s.contains("local write failed"));
474 assert!(s.contains("disk full"));
475 }
476
477 #[test]
478 fn quorum_policy_serde_roundtrip() {
479 let p = QuorumPolicy::new(5, 3, Duration::from_secs(2), Duration::from_secs(30)).unwrap();
480 let json = serde_json::to_string(&p).unwrap();
481 let back: QuorumPolicy = serde_json::from_str(&json).unwrap();
482 assert_eq!(back.n, p.n);
483 assert_eq!(back.w, p.w);
484 }
485
486 #[test]
487 fn quorum_failure_reason_serde_snake_case() {
488 let json = serde_json::to_string(&QuorumFailureReason::InFlight).unwrap();
489 assert_eq!(json, "\"in_flight\"");
490 let back: QuorumFailureReason = serde_json::from_str("\"unreachable\"").unwrap();
491 assert_eq!(back, QuorumFailureReason::Unreachable);
492 }
493
494 #[test]
495 fn finalise_succeeds_returns_count() {
496 let policy =
497 QuorumPolicy::new(3, 2, Duration::from_secs(10), Duration::from_secs(30)).unwrap();
498 let t0 = instant_base();
499 let mut tracker = AckTracker::new(policy, t0);
500 tracker.record_local();
501 tracker.record_peer_ack("p1");
502 let n = tracker.finalise(t0).unwrap();
503 assert_eq!(n, 2);
504 }
505
506 #[test]
507 fn id_drift_count_zero_initially() {
508 let policy = QuorumPolicy::majority(3).unwrap();
509 let tracker = AckTracker::new(policy, instant_base());
510 assert_eq!(tracker.id_drift_count(), 0);
511 }
512}