Skip to main content

dynomite/msg/
response_mgr.rs

1//! Per-DC response aggregation and quorum decisions.
2//!
3//! When the engine forwards a request to multiple peer replicas, the
4//! arriving responses are routed through a [`ResponseMgr`] that
5//! tracks how many replies are good, how many errored, and whether
//! the body checksums agree. The aggregator retains at most
//! [`MAX_REPLICAS_PER_DC`] good responses, mirroring the fixed-size
//! array in the C reference, because the consistency model is
//! fundamentally a small, finite-state decision table.
10//!
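//! The decision logic rests on three rules:
//!
//! 1. quorum size is `max_responses / 2 + 1`, matching the C
//!    formula in `init_response_mgr`;
//! 2. once at least `quorum_responses` good replies have arrived,
//!    the manager checks whether their body checksums agree; the
//!    request is done when they agree, or when they disagree and no
//!    further replies are pending;
//! 3. if even the outstanding replies could not lift the good count
//!    to `quorum_responses`, the request is declared failed.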
18
19use crate::core::types::MsgId;
20
21use super::message::Msg;
22use super::msg_type::MsgType;
23
/// Upper bound on how many replicas inside one datacenter a
/// [`ResponseMgr`] keeps track of; the same limit as
/// `MAX_REPLICAS_PER_DC` in the C engine.
pub const MAX_REPLICAS_PER_DC: usize = 3;
27
28/// One accepted response together with its payload checksum.
29#[derive(Debug)]
30struct GoodResponse {
31    msg: Box<Msg>,
32    checksum: u32,
33}
34
/// Verdict a [`ResponseMgr`] has reached for its outstanding request.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum QuorumOutcome {
    /// No final decision yet; further replies may still change it.
    Pending,
    /// A majority of the good replies share one body checksum.
    Achieved,
    /// Quorum-many good replies arrived, their bodies disagree, and
    /// nothing else is outstanding; reconciliation is left to the
    /// dispatcher.
    Mismatched,
    /// Error replies have made a quorum unreachable.
    Failed,
}
48
49/// Per-DC response aggregator.
50///
51/// The manager is bound to a single request and caps the reply count
52/// at the rack count of its datacenter.
53#[derive(Debug)]
54pub struct ResponseMgr {
55    is_read: bool,
56    max_responses: u8,
57    quorum_responses: u8,
58    error_responses: u8,
59    good: Vec<GoodResponse>,
60    err_rsp: Option<Box<Msg>>,
61    msg_id: MsgId,
62    msg_type: MsgType,
63    dc_name: Option<String>,
64}
65
66impl ResponseMgr {
67    /// Build a manager bound to `req` that expects at most
68    /// `max_responses` replies.
69    ///
70    /// `max_responses` must be in the range `1..=MAX_REPLICAS_PER_DC`;
71    /// the reference engine derives it from the rack count of the
72    /// target datacenter and panics on out-of-range values, which the
73    /// Rust port surfaces as a debug assertion.
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// use dynomite::msg::{Msg, MsgType, ResponseMgr};
79    ///
80    /// let req = Msg::new(1, MsgType::ReqRedisGet, true);
81    /// let mgr = ResponseMgr::new(&req, 3, None);
82    /// assert_eq!(mgr.max_responses(), 3);
83    /// assert_eq!(mgr.quorum_responses(), 2);
84    /// ```
85    #[must_use]
86    pub fn new(req: &Msg, max_responses: u8, dc_name: Option<String>) -> Self {
87        debug_assert!(max_responses >= 1);
88        debug_assert!(usize::from(max_responses) <= MAX_REPLICAS_PER_DC);
89        let max_responses = max_responses.max(1);
90        Self {
91            is_read: req.flags().is_read,
92            max_responses,
93            quorum_responses: max_responses / 2 + 1,
94            error_responses: 0,
95            good: Vec::with_capacity(MAX_REPLICAS_PER_DC),
96            err_rsp: None,
97            msg_id: req.id(),
98            msg_type: req.ty(),
99            dc_name,
100        }
101    }
102
103    /// True when the manager is bound to a read request.
104    ///
105    /// # Examples
106    /// ```
107    /// use dynomite::msg::{Msg, MsgType, ResponseMgr};
108    /// let req = Msg::new(1, MsgType::ReqRedisGet, true);
109    /// assert!(ResponseMgr::new(&req, 1, None).is_read());
110    /// ```
111    #[must_use]
112    pub fn is_read(&self) -> bool {
113        self.is_read
114    }
115
116    /// Highest response count this manager will ever accept.
117    #[must_use]
118    pub fn max_responses(&self) -> u8 {
119        self.max_responses
120    }
121
122    /// Number of good responses required before quorum can be
123    /// declared.
124    #[must_use]
125    pub fn quorum_responses(&self) -> u8 {
126        self.quorum_responses
127    }
128
129    /// Number of accepted (non-error) responses received so far.
130    #[must_use]
131    pub fn good_responses(&self) -> u8 {
132        u8::try_from(self.good.len()).unwrap_or(u8::MAX)
133    }
134
135    /// Number of error responses received so far.
136    #[must_use]
137    pub fn error_responses(&self) -> u8 {
138        self.error_responses
139    }
140
141    /// Number of replies still expected before any decision.
142    #[must_use]
143    pub fn pending_responses(&self) -> u8 {
144        self.max_responses
145            .saturating_sub(self.good_responses())
146            .saturating_sub(self.error_responses)
147    }
148
149    /// Datacenter label this manager was created for.
150    #[must_use]
151    pub fn dc_name(&self) -> Option<&str> {
152        self.dc_name.as_deref()
153    }
154
155    /// Id of the request this manager is bound to.
156    #[must_use]
157    pub fn msg_id(&self) -> MsgId {
158        self.msg_id
159    }
160
161    /// Type tag of the request this manager is bound to.
162    #[must_use]
163    pub fn msg_type(&self) -> MsgType {
164        self.msg_type
165    }
166
167    /// Submit `rsp` paired with its body checksum `checksum`.
168    ///
169    /// Errors are tallied separately; the first error response is
170    /// retained as the canonical error to propagate when no quorum
171    /// is possible. Good responses past `MAX_REPLICAS_PER_DC` are
172    /// dropped to mirror the fixed-size array in the reference
173    /// engine.
174    ///
175    /// # Examples
176    ///
177    /// ```
178    /// use dynomite::msg::{Msg, MsgType, ResponseMgr};
179    ///
180    /// let req = Msg::new(1, MsgType::ReqRedisGet, true);
181    /// let mut mgr = ResponseMgr::new(&req, 1, None);
182    /// let rsp = Msg::new(2, MsgType::RspRedisStatus, false);
183    /// mgr.submit_response(rsp, 0xdead_beef);
184    /// assert_eq!(mgr.good_responses(), 1);
185    /// ```
186    pub fn submit_response(&mut self, rsp: Msg, checksum: u32) {
187        if rsp.flags().is_error {
188            self.error_responses = self.error_responses.saturating_add(1);
189            if self.err_rsp.is_none() {
190                self.err_rsp = Some(Box::new(rsp));
191            }
192            return;
193        }
194        if self.good.len() < MAX_REPLICAS_PER_DC {
195            self.good.push(GoodResponse {
196                msg: Box::new(rsp),
197                checksum,
198            });
199        }
200    }
201
202    /// Determine whether the manager has reached a final decision
203    /// and, if so, what kind. Mirrors `rspmgr_check_is_done` plus
204    /// `rspmgr_is_quorum_achieved`.
205    ///
206    /// # Examples
207    ///
208    /// ```
209    /// use dynomite::msg::{
210    ///     Msg, MsgType, QuorumOutcome, ResponseMgr,
211    /// };
212    ///
213    /// let req = Msg::new(1, MsgType::ReqRedisGet, true);
214    /// let mut mgr = ResponseMgr::new(&req, 1, None);
215    /// assert_eq!(mgr.outcome(), QuorumOutcome::Pending);
216    /// mgr.submit_response(Msg::new(2, MsgType::RspRedisStatus, false), 1);
217    /// assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
218    /// ```
219    #[must_use]
220    pub fn outcome(&self) -> QuorumOutcome {
221        let good = self.good_responses();
222        let pending = self.pending_responses();
223
224        if good < self.quorum_responses {
225            if pending + good < self.quorum_responses {
226                return QuorumOutcome::Failed;
227            }
228            return QuorumOutcome::Pending;
229        }
230
231        if self.is_quorum_achieved() {
232            return QuorumOutcome::Achieved;
233        }
234
235        if pending == 0 {
236            QuorumOutcome::Mismatched
237        } else {
238            QuorumOutcome::Pending
239        }
240    }
241
242    /// Convenience: true when `outcome` reports anything but
243    /// [`QuorumOutcome::Pending`].
244    ///
245    /// # Examples
246    ///
247    /// ```
248    /// use dynomite::msg::{Msg, MsgType, ResponseMgr};
249    ///
250    /// let req = Msg::new(1, MsgType::ReqRedisGet, true);
251    /// let mgr = ResponseMgr::new(&req, 3, None);
252    /// assert!(!mgr.is_done());
253    /// ```
254    #[must_use]
255    pub fn is_done(&self) -> bool {
256        !matches!(self.outcome(), QuorumOutcome::Pending)
257    }
258
259    fn is_quorum_achieved(&self) -> bool {
260        let good = self.good_responses();
261        if self.quorum_responses == 1 && good == self.quorum_responses {
262            return true;
263        }
264        if good < self.quorum_responses {
265            return false;
266        }
267        let chk0 = self.good[0].checksum;
268        let chk1 = self.good[1].checksum;
269        if chk0 == chk1 {
270            return true;
271        }
272        if good < 3 {
273            return false;
274        }
275        let chk2 = self.good[2].checksum;
276        chk1 == chk2 || chk0 == chk2
277    }
278
279    /// Borrow the first error response, if any.
280    ///
281    /// # Examples
282    /// ```
283    /// use dynomite::msg::{Msg, MsgType, ResponseMgr};
284    /// let req = Msg::new(1, MsgType::ReqRedisGet, true);
285    /// let mgr = ResponseMgr::new(&req, 1, None);
286    /// assert!(mgr.error_response().is_none());
287    /// ```
288    #[must_use]
289    pub fn error_response(&self) -> Option<&Msg> {
290        self.err_rsp.as_deref()
291    }
292
293    /// Iterate over the accepted responses paired with their
294    /// checksums.
295    pub fn good_iter(&self) -> impl Iterator<Item = (&Msg, u32)> {
296        self.good.iter().map(|g| (g.msg.as_ref(), g.checksum))
297    }
298
299    /// Pick a response to forward to the client according to the
300    /// majority-checksum rule.
301    ///
302    /// Returns `None` when quorum has not been achieved (the caller
303    /// should consult [`Self::outcome`] and propagate the error
304    /// response from [`Self::error_response`]).
305    ///
306    /// # Examples
307    ///
308    /// ```
309    /// use dynomite::msg::{
310    ///     Msg, MsgType, QuorumOutcome, ResponseMgr,
311    /// };
312    ///
313    /// let req = Msg::new(1, MsgType::ReqRedisGet, true);
314    /// let mut mgr = ResponseMgr::new(&req, 3, None);
315    /// for id in 2..=4 {
316    ///     mgr.submit_response(Msg::new(id, MsgType::RspRedisStatus, false), 7);
317    /// }
318    /// assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
319    /// assert!(mgr.pick_response().is_some());
320    /// ```
321    #[must_use]
322    pub fn pick_response(&self) -> Option<&Msg> {
323        if !matches!(self.outcome(), QuorumOutcome::Achieved) {
324            return None;
325        }
326        match self.good.len() {
327            1 | 2 => Some(self.good[0].msg.as_ref()),
328            3 => {
329                let c0 = self.good[0].checksum;
330                let c1 = self.good[1].checksum;
331                let c2 = self.good[2].checksum;
332                if c0 == c1 {
333                    Some(self.good[0].msg.as_ref())
334                } else if c1 == c2 {
335                    Some(self.good[1].msg.as_ref())
336                } else if c0 == c2 {
337                    Some(self.good[0].msg.as_ref())
338                } else {
339                    None
340                }
341            }
342            _ => None,
343        }
344    }
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use crate::msg::{Msg, MsgType};

    /// Read request every manager under test is bound to.
    fn read_req() -> Msg {
        let mut msg = Msg::new(1, MsgType::ReqRedisGet, true);
        msg.flags_mut().is_read = true;
        msg
    }

    /// Status reply with the given id, optionally flagged as an error.
    fn reply(id: u64, is_error: bool) -> Msg {
        let mut msg = Msg::new(id, MsgType::RspRedisStatus, false);
        msg.flags_mut().is_error = is_error;
        msg
    }

    /// Submit one non-error reply per `(id, checksum)` pair, in order.
    fn feed_good(mgr: &mut ResponseMgr, replies: &[(u64, u32)]) {
        for &(id, checksum) in replies {
            mgr.submit_response(reply(id, false), checksum);
        }
    }

    /// Submit one error reply (checksum 0) per id, in order.
    fn feed_errors(mgr: &mut ResponseMgr, ids: &[u64]) {
        for &id in ids {
            mgr.submit_response(reply(id, true), 0);
        }
    }

    #[test]
    fn dc_one_one_good() {
        let mut mgr = ResponseMgr::new(&read_req(), 1, Some("dc1".into()));
        assert_eq!(mgr.outcome(), QuorumOutcome::Pending);
        feed_good(&mut mgr, &[(2, 1)]);
        assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
        assert!(mgr.pick_response().is_some());
    }

    #[test]
    fn dc_one_single_error() {
        let mut mgr = ResponseMgr::new(&read_req(), 1, None);
        feed_errors(&mut mgr, &[2]);
        assert_eq!(mgr.outcome(), QuorumOutcome::Failed);
        assert!(mgr.error_response().is_some());
        assert!(mgr.pick_response().is_none());
    }

    #[test]
    fn dc_quorum_two_matching() {
        let mut mgr = ResponseMgr::new(&read_req(), 2, None);
        assert_eq!(mgr.quorum_responses(), 2);
        feed_good(&mut mgr, &[(2, 7)]);
        assert_eq!(mgr.outcome(), QuorumOutcome::Pending);
        feed_good(&mut mgr, &[(3, 7)]);
        assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
    }

    #[test]
    fn dc_quorum_two_mismatched_no_third_response() {
        let mut mgr = ResponseMgr::new(&read_req(), 2, None);
        feed_good(&mut mgr, &[(2, 7), (3, 9)]);
        assert_eq!(mgr.outcome(), QuorumOutcome::Mismatched);
    }

    #[test]
    fn dc_quorum_one_good_one_error_fails() {
        let mut mgr = ResponseMgr::new(&read_req(), 2, None);
        feed_good(&mut mgr, &[(2, 7)]);
        feed_errors(&mut mgr, &[3]);
        assert_eq!(mgr.outcome(), QuorumOutcome::Failed);
    }

    #[test]
    fn dc_safe_quorum_three_all_match() {
        let mut mgr = ResponseMgr::new(&read_req(), 3, None);
        assert_eq!(mgr.quorum_responses(), 2);
        feed_good(&mut mgr, &[(2, 11), (3, 11), (4, 11)]);
        assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
        assert_eq!(mgr.pick_response().unwrap().id(), 2);
    }

    #[test]
    fn dc_safe_quorum_two_match_one_dissent() {
        let mut mgr = ResponseMgr::new(&read_req(), 3, None);
        feed_good(&mut mgr, &[(2, 5), (3, 5)]);
        // Two agreeing replies already form a quorum of two.
        assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
        feed_good(&mut mgr, &[(4, 9)]);
        assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
    }

    #[test]
    fn dc_safe_quorum_three_disagreeing_mismatched() {
        let mut mgr = ResponseMgr::new(&read_req(), 3, None);
        feed_good(&mut mgr, &[(2, 1), (3, 2)]);
        // The first two disagree; one reply is still outstanding.
        assert_eq!(mgr.outcome(), QuorumOutcome::Pending);
        feed_good(&mut mgr, &[(4, 3)]);
        assert_eq!(mgr.outcome(), QuorumOutcome::Mismatched);
        assert!(mgr.pick_response().is_none());
    }

    #[test]
    fn dc_safe_quorum_two_errors_fail_immediately() {
        let mut mgr = ResponseMgr::new(&read_req(), 3, None);
        feed_errors(&mut mgr, &[2, 3]);
        assert_eq!(mgr.outcome(), QuorumOutcome::Failed);
    }

    #[test]
    fn dc_safe_quorum_three_errors_fail() {
        let mut mgr = ResponseMgr::new(&read_req(), 3, None);
        feed_errors(&mut mgr, &[2, 3, 4]);
        assert_eq!(mgr.outcome(), QuorumOutcome::Failed);
    }

    #[test]
    fn dc_safe_quorum_one_dissent_picks_majority() {
        let mut mgr = ResponseMgr::new(&read_req(), 3, None);
        feed_good(&mut mgr, &[(2, 1), (3, 2), (4, 2)]);
        assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
        // Checksums at indices 1 and 2 agree, so the reply at index 1 wins.
        assert_eq!(mgr.pick_response().unwrap().id(), 3);
    }

    #[test]
    fn excess_good_responses_are_dropped() {
        let mut mgr = ResponseMgr::new(&read_req(), 3, None);
        (2..=10).for_each(|id| mgr.submit_response(reply(id, false), 1));
        assert_eq!(usize::from(mgr.good_responses()), MAX_REPLICAS_PER_DC);
    }
}