Skip to main content

ai_memory/
replication.rs

// Copyright 2026 AlphaOne LLC
// SPDX-License-Identifier: Apache-2.0

//! W-of-N quorum-write layer for the peer-mesh sync (v0.7 track C).
//!
//! This module scaffolds the quorum-write contract described in
//! `docs/ADR-0001-quorum-replication.md`. The `QuorumWriter` sits ABOVE
//! the existing sync-daemon — deployments that don't configure
//! `--quorum-writes` keep the v0.6.0 one-way push behaviour byte-for-byte.
//!
//! ## What ships in this PR
//!
//! - `QuorumPolicy` — configuration: N peers, W quorum size, timeouts.
//! - `QuorumWriter::commit` — the atomic-from-caller contract: local
//!   write + W-1 remote acks within deadline, else
//!   `QuorumError::QuorumNotMet`.
//! - `AckTracker` — collects remote acks with a simple `Instant`
//!   deadline. Pure logic, no network — so the unit tests don't need
//!   a live sync mesh.
//! - Metrics: `replication_quorum_ack_total{result}`,
//!   `replication_quorum_failures_total{reason}`,
//!   `replication_clock_skew_seconds`.
//!
//! ## What does NOT ship in this PR
//!
//! - Wiring into the `memory_store` path — follow-up PR once the
//!   sync-daemon gains a synchronous ack channel.
//! - Real chaos harness — follow-up PR under `tests/chaos/` with
//!   three-node fixture and failure-injection hooks.
//!
//! That phasing matches the ADR's implementation plan.

// NOTE(review): `QuorumWriter` and the metrics listed in the module docs
// are not defined in this file — confirm where they live, or trim the list.

// This module is scaffolding that is only partly wired into callers (see
// "What does NOT ship" in the module docs), so some items have no users
// yet. Dead-code lints are silenced module-wide for that reason; drop this
// once every item has a caller.
#![allow(dead_code)]
34
35use std::collections::HashSet;
36use std::time::{Duration, Instant};
37
38use serde::{Deserialize, Serialize};
39
40/// Operator-tunable quorum policy. See ADR-0001 § Model for the
41/// complete contract.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct QuorumPolicy {
44    /// Total peer count — local node + remotes. Must be >= 1.
45    pub n: usize,
46    /// Required acks including the local commit. Clamped to `[1, n]`
47    /// at construction via [`QuorumPolicy::new`].
48    pub w: usize,
49    /// Deadline for the remote-ack collection phase. Times out with
50    /// `QuorumError::QuorumNotMet { reason: Timeout }`.
51    pub ack_timeout: Duration,
52    /// Warning threshold for peer clock skew. Exceeding this does not
53    /// fail the quorum; it surfaces in the clock-skew histogram.
54    pub clock_skew_warn: Duration,
55}
56
57impl QuorumPolicy {
58    /// Construct a quorum policy. `w` is clamped to `[1, n]` and
59    /// `n = 0` is rejected as invalid input.
60    ///
61    /// # Errors
62    ///
63    /// Returns `QuorumError::InvalidPolicy` if `n == 0`.
64    pub fn new(
65        n: usize,
66        w: usize,
67        ack_timeout: Duration,
68        clock_skew_warn: Duration,
69    ) -> Result<Self, QuorumError> {
70        if n == 0 {
71            return Err(QuorumError::InvalidPolicy {
72                detail: "n must be >= 1".to_string(),
73            });
74        }
75        Ok(Self {
76            n,
77            w: w.clamp(1, n),
78            ack_timeout,
79            clock_skew_warn,
80        })
81    }
82
83    /// Majority-quorum convenience: `W = ceil((N+1)/2)`. Matches the
84    /// ADR's default.
85    ///
86    /// # Errors
87    ///
88    /// Returns `QuorumError::InvalidPolicy` if `n == 0`.
89    pub fn majority(n: usize) -> Result<Self, QuorumError> {
90        let w = n.div_ceil(2).max(1);
91        Self::new(n, w, Duration::from_secs(2), Duration::from_secs(30))
92    }
93}
94
95/// Errors surfaced by the quorum writer. Non-exhaustive so we can add
96/// variants without breaking downstream matches.
97#[non_exhaustive]
98#[derive(Debug, Clone, PartialEq, Eq)]
99pub enum QuorumError {
100    /// The local write succeeded but we did not collect enough acks
101    /// within the policy deadline.
102    QuorumNotMet {
103        got: usize,
104        needed: usize,
105        reason: QuorumFailureReason,
106    },
107    /// The policy itself is malformed (e.g. N = 0).
108    InvalidPolicy { detail: String },
109    /// The local write itself failed — caller sees the underlying cause.
110    LocalWriteFailed { detail: String },
111}
112
113impl std::fmt::Display for QuorumError {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        match self {
116            Self::QuorumNotMet {
117                got,
118                needed,
119                reason,
120            } => write!(
121                f,
122                "quorum not met (got {got}, need {needed}, reason {reason:?})"
123            ),
124            Self::InvalidPolicy { detail } => write!(f, "invalid quorum policy: {detail}"),
125            Self::LocalWriteFailed { detail } => write!(f, "local write failed: {detail}"),
126        }
127    }
128}
129
130impl std::error::Error for QuorumError {}
131
132/// Reason a quorum failed — reported in metrics.
133#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
134#[serde(rename_all = "snake_case")]
135pub enum QuorumFailureReason {
136    /// No peers reachable at all (network / DNS / zero configured).
137    /// Only reported after the deadline passed with zero acks.
138    Unreachable,
139    /// Peers reachable but fewer than `W-1` acked before deadline.
140    /// Reported after the deadline passed with a partial ack set.
141    Timeout,
142    /// Peer ack arrived but disagreed on the memory id — replication
143    /// divergence surfaced for operator investigation.
144    IdDrift,
145    /// Quorum is not (yet) met but the deadline has not passed.
146    /// Caller should keep waiting; this is a transient
147    /// "ask-for-status-while-tasks-in-flight" answer. Distinguished
148    /// from `Timeout` / `Unreachable` so retry strategies don't
149    /// confuse "give it more time" with "peers are gone"
150    /// (#299 item 3 — classification was previously inverted).
151    InFlight,
152}
153
154/// Collects remote acks against a deadline. Pure logic — no I/O.
155#[derive(Debug)]
156pub struct AckTracker {
157    policy: QuorumPolicy,
158    deadline: Instant,
159    local_committed: bool,
160    acks: HashSet<String>,
161    id_drifts: Vec<String>,
162}
163
164impl AckTracker {
165    /// Create a tracker for one quorum-write attempt. `now` is injected
166    /// for deterministic tests.
167    #[must_use]
168    pub fn new(policy: QuorumPolicy, now: Instant) -> Self {
169        let deadline = now + policy.ack_timeout;
170        Self {
171            policy,
172            deadline,
173            local_committed: false,
174            acks: HashSet::new(),
175            id_drifts: Vec::new(),
176        }
177    }
178
179    /// Record the local commit. Call once the originating node has
180    /// durably persisted the memory.
181    pub fn record_local(&mut self) {
182        self.local_committed = true;
183    }
184
185    /// Record a peer ack. `peer_id` is the caller's opaque identifier
186    /// (typically the peer's mTLS fingerprint or agent id). Duplicate
187    /// `peer_id` values are deduplicated.
188    pub fn record_peer_ack(&mut self, peer_id: impl Into<String>) {
189        self.acks.insert(peer_id.into());
190    }
191
192    /// Record that a peer returned success but with a memory id that
193    /// differs from the local commit id. Does NOT count toward the
194    /// quorum and surfaces in metrics.
195    pub fn record_id_drift(&mut self, peer_id: impl Into<String>) {
196        self.id_drifts.push(peer_id.into());
197    }
198
199    /// True when the quorum is met: local commit + at least `W-1`
200    /// unique peer acks, and the deadline has not elapsed at `now`.
201    #[must_use]
202    pub fn is_quorum_met(&self, now: Instant) -> bool {
203        if !self.local_committed || now > self.deadline {
204            return false;
205        }
206        // Total acks counted = local + distinct peers.
207        let total = self.acks.len() + 1;
208        total >= self.policy.w
209    }
210
211    /// Finalise the attempt. Returns `Ok(count_of_distinct_acks)` if
212    /// quorum met, else `Err(QuorumError::QuorumNotMet{…})`.
213    ///
214    /// # Errors
215    ///
216    /// Returns `QuorumError::QuorumNotMet` if the deadline elapsed
217    /// before W acks arrived.
218    pub fn finalise(&self, now: Instant) -> Result<usize, QuorumError> {
219        if !self.local_committed {
220            return Err(QuorumError::LocalWriteFailed {
221                detail: "local commit not recorded before finalise".to_string(),
222            });
223        }
224        let got = self.acks.len() + 1;
225        if got >= self.policy.w {
226            return Ok(got);
227        }
228        // Classification (#299 item 3 — previously collapsed the
229        // pre-deadline "still waiting" case to `Timeout` which
230        // misdirected caller retry logic):
231        //
232        //   acks.is_empty() && past deadline → Unreachable (no ack ever
233        //     landed, all peers down or network partitioned).
234        //   past deadline, partial acks      → Timeout (some peers
235        //     responded, some did not before the deadline).
236        //   pre-deadline                     → InFlight (caller is
237        //     asking early; tracker isn't done waiting yet).
238        //
239        // `InFlight` is a distinct variant so retry strategies can tell
240        // "give it more time" apart from "the peers are gone".
241        let reason = if now > self.deadline {
242            if self.acks.is_empty() {
243                QuorumFailureReason::Unreachable
244            } else {
245                QuorumFailureReason::Timeout
246            }
247        } else {
248            QuorumFailureReason::InFlight
249        };
250        Err(QuorumError::QuorumNotMet {
251            got,
252            needed: self.policy.w,
253            reason,
254        })
255    }
256
257    /// Count of peers that reported divergent memory ids for this write.
258    /// Exposed for metrics + debugging.
259    #[must_use]
260    pub fn id_drift_count(&self) -> usize {
261        self.id_drifts.len()
262    }
263
264    /// H9 (v0.7.0 round-2) — opaque view onto the set of peer ids
265    /// that have positively acknowledged this quorum write. Exposed
266    /// so `broadcast_store_quorum` (and its delete/archive siblings)
267    /// can compute `missing = configured_peers - acked_peers` and
268    /// surface that set in a `tracing::warn!` when quorum is met but
269    /// some peers did not ack — operators need to see the gap in
270    /// logs before a follow-up sync cycle catches the peer up.
271    #[must_use]
272    pub fn acked_peer_ids(&self) -> &HashSet<String> {
273        &self.acks
274    }
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Reference instant for a test. Call it ONCE per test and derive every
    /// later instant from the result (`t0 + Duration`), so no assertion
    /// depends on how much wall-clock time elapses while the test runs.
    /// `AckTracker` takes `now` as a parameter precisely to allow this.
    fn instant_base() -> Instant {
        Instant::now()
    }

    #[test]
    fn policy_rejects_zero_n() {
        let err = QuorumPolicy::new(0, 1, Duration::from_millis(500), Duration::from_secs(30))
            .unwrap_err();
        assert!(matches!(err, QuorumError::InvalidPolicy { .. }));
    }

    #[test]
    fn policy_clamps_w_to_n() {
        let p =
            QuorumPolicy::new(3, 9, Duration::from_millis(500), Duration::from_secs(30)).unwrap();
        assert_eq!(p.n, 3);
        assert_eq!(p.w, 3);
    }

    #[test]
    fn majority_default_matches_adr() {
        // N = 1 => W = 1 (ceil(2/2)); N = 3 => W = 2; N = 5 => W = 3.
        assert_eq!(QuorumPolicy::majority(1).unwrap().w, 1);
        assert_eq!(QuorumPolicy::majority(3).unwrap().w, 2);
        assert_eq!(QuorumPolicy::majority(5).unwrap().w, 3);
        assert_eq!(QuorumPolicy::majority(7).unwrap().w, 4);
    }

    #[test]
    fn majority_rejects_zero_n() {
        let err = QuorumPolicy::majority(0).unwrap_err();
        assert!(matches!(err, QuorumError::InvalidPolicy { .. }));
    }

    #[test]
    fn quorum_met_with_local_plus_peers() {
        let policy = QuorumPolicy::majority(3).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        tracker.record_peer_ack("peer-1");
        assert!(tracker.is_quorum_met(t0));
    }

    #[test]
    fn quorum_met_exactly_at_deadline() {
        // The deadline instant itself is still "in time"; only strictly
        // later instants count as expired.
        let policy =
            QuorumPolicy::new(3, 2, Duration::from_millis(500), Duration::from_secs(30)).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        tracker.record_peer_ack("peer-1");
        assert!(tracker.is_quorum_met(t0 + Duration::from_millis(500)));
        assert!(!tracker.is_quorum_met(t0 + Duration::from_millis(501)));
    }

    #[test]
    fn quorum_dedupes_duplicate_peer() {
        let policy =
            QuorumPolicy::new(5, 3, Duration::from_millis(500), Duration::from_secs(30)).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        tracker.record_peer_ack("peer-1");
        tracker.record_peer_ack("peer-1");
        tracker.record_peer_ack("peer-1");
        // Only counts once + local = 2, need 3.
        assert!(!tracker.is_quorum_met(t0));
        tracker.record_peer_ack("peer-2");
        assert!(tracker.is_quorum_met(t0));
    }

    #[test]
    fn acked_peer_ids_are_distinct() {
        let policy = QuorumPolicy::majority(5).unwrap();
        let mut tracker = AckTracker::new(policy, instant_base());
        tracker.record_peer_ack("peer-1");
        tracker.record_peer_ack("peer-1");
        tracker.record_peer_ack("peer-2");
        let acked = tracker.acked_peer_ids();
        assert_eq!(acked.len(), 2);
        assert!(acked.contains("peer-1"));
        assert!(acked.contains("peer-2"));
    }

    #[test]
    fn quorum_not_met_without_local() {
        let policy = QuorumPolicy::majority(3).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        // Record two peer acks but no local commit — quorum fails.
        tracker.record_peer_ack("peer-1");
        tracker.record_peer_ack("peer-2");
        assert!(!tracker.is_quorum_met(t0));
    }

    #[test]
    fn quorum_expired_after_deadline() {
        let policy =
            QuorumPolicy::new(3, 2, Duration::from_millis(1), Duration::from_secs(30)).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        let later = t0 + Duration::from_millis(50);
        // No peer acks arrived — past deadline, quorum fails.
        assert!(!tracker.is_quorum_met(later));
        let err = tracker.finalise(later).unwrap_err();
        match err {
            QuorumError::QuorumNotMet {
                got,
                needed,
                reason,
            } => {
                assert_eq!(got, 1);
                assert_eq!(needed, 2);
                assert_eq!(reason, QuorumFailureReason::Unreachable);
            }
            other => panic!("expected QuorumNotMet, got {other:?}"),
        }
    }

    #[test]
    fn quorum_finalise_reports_timeout_when_partial_acks() {
        let policy =
            QuorumPolicy::new(5, 3, Duration::from_millis(1), Duration::from_secs(30)).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        tracker.record_peer_ack("peer-1");
        // Two total acks (1 local + 1 peer); need 3. Past deadline,
        // so it's Timeout (peers responded but not enough).
        let err = tracker
            .finalise(t0 + Duration::from_millis(50))
            .unwrap_err();
        match err {
            QuorumError::QuorumNotMet { reason, .. } => {
                assert_eq!(reason, QuorumFailureReason::Timeout);
            }
            other => panic!("expected QuorumNotMet/Timeout, got {other:?}"),
        }
    }

    #[test]
    fn id_drift_counted_but_does_not_satisfy_quorum() {
        let policy = QuorumPolicy::majority(3).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        tracker.record_id_drift("peer-1");
        tracker.record_id_drift("peer-2");
        // id-drift acks do NOT count toward quorum, only toward metrics.
        assert_eq!(tracker.id_drift_count(), 2);
        assert!(!tracker.is_quorum_met(t0));
    }

    #[test]
    fn finalise_without_local_commit_errors_local_write_failed() {
        let policy = QuorumPolicy::majority(3).unwrap();
        let t0 = instant_base();
        let tracker = AckTracker::new(policy, t0);
        let err = tracker.finalise(t0).unwrap_err();
        assert!(matches!(err, QuorumError::LocalWriteFailed { .. }));
    }

    #[test]
    fn quorum_error_is_displayable_and_is_an_error() {
        let e = QuorumError::QuorumNotMet {
            got: 1,
            needed: 3,
            reason: QuorumFailureReason::Timeout,
        };
        let display = format!("{e}");
        assert!(display.contains("quorum not met"));
        // Ensure it participates in the `std::error::Error` ecosystem.
        let _: &dyn std::error::Error = &e;
    }

    #[test]
    fn single_node_quorum_is_trivially_met() {
        // N = W = 1 is the degenerate case — equivalent to the v0.6.0
        // behaviour. Must still work so `--quorum-writes 1` is a
        // legitimate configuration and doesn't require special cases
        // in callers.
        let policy =
            QuorumPolicy::new(1, 1, Duration::from_millis(500), Duration::from_secs(30)).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        assert!(tracker.is_quorum_met(t0));
    }

    // -----------------------------------------------------------------
    // W12-H — InFlight + display variants + minor edges
    // -----------------------------------------------------------------

    #[test]
    fn finalise_pre_deadline_partial_acks_is_inflight() {
        let policy =
            QuorumPolicy::new(5, 3, Duration::from_secs(10), Duration::from_secs(30)).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        tracker.record_peer_ack("peer-1");
        // Pre-deadline; quorum not yet met → InFlight.
        let err = tracker.finalise(t0).unwrap_err();
        match err {
            QuorumError::QuorumNotMet { reason, .. } => {
                assert_eq!(reason, QuorumFailureReason::InFlight);
            }
            other => panic!("expected QuorumNotMet/InFlight, got {other:?}"),
        }
    }

    #[test]
    fn invalid_policy_display_contains_detail() {
        let e = QuorumError::InvalidPolicy {
            detail: "n must be >= 1".to_string(),
        };
        let s = format!("{e}");
        assert!(s.contains("invalid quorum policy"));
        assert!(s.contains("n must be >= 1"));
    }

    #[test]
    fn local_write_failed_display_contains_detail() {
        let e = QuorumError::LocalWriteFailed {
            detail: "disk full".to_string(),
        };
        let s = format!("{e}");
        assert!(s.contains("local write failed"));
        assert!(s.contains("disk full"));
    }

    #[test]
    fn quorum_policy_serde_roundtrip() {
        let p = QuorumPolicy::new(5, 3, Duration::from_secs(2), Duration::from_secs(30)).unwrap();
        let json = serde_json::to_string(&p).unwrap();
        let back: QuorumPolicy = serde_json::from_str(&json).unwrap();
        assert_eq!(back.n, p.n);
        assert_eq!(back.w, p.w);
    }

    #[test]
    fn quorum_failure_reason_serde_snake_case() {
        let json = serde_json::to_string(&QuorumFailureReason::InFlight).unwrap();
        assert_eq!(json, "\"in_flight\"");
        let back: QuorumFailureReason = serde_json::from_str("\"unreachable\"").unwrap();
        assert_eq!(back, QuorumFailureReason::Unreachable);
    }

    #[test]
    fn finalise_succeeds_returns_count() {
        let policy =
            QuorumPolicy::new(3, 2, Duration::from_secs(10), Duration::from_secs(30)).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        tracker.record_peer_ack("p1");
        let n = tracker.finalise(t0).unwrap();
        assert_eq!(n, 2);
    }

    #[test]
    fn id_drift_count_zero_initially() {
        let policy = QuorumPolicy::majority(3).unwrap();
        let tracker = AckTracker::new(policy, instant_base());
        assert_eq!(tracker.id_drift_count(), 0);
    }
}