1use crate::core::types::MsgId;
20
21use super::message::Msg;
22use super::msg_type::MsgType;
23
/// Upper bound on the number of good (non-error) responses a [`ResponseMgr`]
/// will ever store.
///
/// It is also used as the initial capacity of the good-response buffer, and
/// `ResponseMgr::new` debug-asserts that the requested response count does not
/// exceed it.
// NOTE(review): the name suggests this mirrors the replica count per
// datacenter — confirm against the topology configuration.
pub const MAX_REPLICAS_PER_DC: usize = 3;
27
/// A non-error response retained by [`ResponseMgr`], paired with the checksum
/// the caller supplied when submitting it.
#[derive(Debug)]
struct GoodResponse {
    /// The response message itself, boxed.
    msg: Box<Msg>,
    /// Checksum passed to `submit_response`; responses are considered to agree
    /// when their checksums are equal.
    checksum: u32,
}
34
/// Verdict reported by `ResponseMgr::outcome` for the responses seen so far.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum QuorumOutcome {
    /// No verdict yet: either too few good responses have arrived but the
    /// outstanding ones could still reach the quorum, or enough have arrived
    /// without agreeing while more responses are still outstanding.
    Pending,
    /// Enough good responses carry the same checksum to satisfy the quorum
    /// (for a quorum of one, a single good response suffices).
    Achieved,
    /// At least a quorum of good responses arrived, but their checksums do not
    /// agree and no further responses are outstanding.
    Mismatched,
    /// The good responses received plus those still outstanding can no longer
    /// reach the quorum.
    Failed,
}
48
/// Collects the responses to one request and decides whether a quorum of them
/// agree (by checksum).
#[derive(Debug)]
pub struct ResponseMgr {
    /// Copied from the request's `is_read` flag at construction.
    is_read: bool,
    /// Number of responses expected in total.
    max_responses: u8,
    /// Number of agreeing good responses required: `max_responses / 2 + 1`.
    quorum_responses: u8,
    /// Count of error responses submitted so far (saturating).
    error_responses: u8,
    /// Good responses in arrival order, each with its checksum.
    good: Vec<GoodResponse>,
    /// The first error response submitted, if any; later errors are only counted.
    err_rsp: Option<Box<Msg>>,
    /// Id of the request this manager was created for.
    msg_id: MsgId,
    /// Type of the request this manager was created for.
    msg_type: MsgType,
    /// Optional name supplied at construction.
    // NOTE(review): presumably the datacenter the responses come from — confirm with callers.
    dc_name: Option<String>,
}
65
66impl ResponseMgr {
67 #[must_use]
86 pub fn new(req: &Msg, max_responses: u8, dc_name: Option<String>) -> Self {
87 debug_assert!(max_responses >= 1);
88 debug_assert!(usize::from(max_responses) <= MAX_REPLICAS_PER_DC);
89 let max_responses = max_responses.max(1);
90 Self {
91 is_read: req.flags().is_read,
92 max_responses,
93 quorum_responses: max_responses / 2 + 1,
94 error_responses: 0,
95 good: Vec::with_capacity(MAX_REPLICAS_PER_DC),
96 err_rsp: None,
97 msg_id: req.id(),
98 msg_type: req.ty(),
99 dc_name,
100 }
101 }
102
103 #[must_use]
112 pub fn is_read(&self) -> bool {
113 self.is_read
114 }
115
116 #[must_use]
118 pub fn max_responses(&self) -> u8 {
119 self.max_responses
120 }
121
122 #[must_use]
125 pub fn quorum_responses(&self) -> u8 {
126 self.quorum_responses
127 }
128
129 #[must_use]
131 pub fn good_responses(&self) -> u8 {
132 u8::try_from(self.good.len()).unwrap_or(u8::MAX)
133 }
134
135 #[must_use]
137 pub fn error_responses(&self) -> u8 {
138 self.error_responses
139 }
140
141 #[must_use]
143 pub fn pending_responses(&self) -> u8 {
144 self.max_responses
145 .saturating_sub(self.good_responses())
146 .saturating_sub(self.error_responses)
147 }
148
149 #[must_use]
151 pub fn dc_name(&self) -> Option<&str> {
152 self.dc_name.as_deref()
153 }
154
155 #[must_use]
157 pub fn msg_id(&self) -> MsgId {
158 self.msg_id
159 }
160
161 #[must_use]
163 pub fn msg_type(&self) -> MsgType {
164 self.msg_type
165 }
166
167 pub fn submit_response(&mut self, rsp: Msg, checksum: u32) {
187 if rsp.flags().is_error {
188 self.error_responses = self.error_responses.saturating_add(1);
189 if self.err_rsp.is_none() {
190 self.err_rsp = Some(Box::new(rsp));
191 }
192 return;
193 }
194 if self.good.len() < MAX_REPLICAS_PER_DC {
195 self.good.push(GoodResponse {
196 msg: Box::new(rsp),
197 checksum,
198 });
199 }
200 }
201
202 #[must_use]
220 pub fn outcome(&self) -> QuorumOutcome {
221 let good = self.good_responses();
222 let pending = self.pending_responses();
223
224 if good < self.quorum_responses {
225 if pending + good < self.quorum_responses {
226 return QuorumOutcome::Failed;
227 }
228 return QuorumOutcome::Pending;
229 }
230
231 if self.is_quorum_achieved() {
232 return QuorumOutcome::Achieved;
233 }
234
235 if pending == 0 {
236 QuorumOutcome::Mismatched
237 } else {
238 QuorumOutcome::Pending
239 }
240 }
241
242 #[must_use]
255 pub fn is_done(&self) -> bool {
256 !matches!(self.outcome(), QuorumOutcome::Pending)
257 }
258
259 fn is_quorum_achieved(&self) -> bool {
260 let good = self.good_responses();
261 if self.quorum_responses == 1 && good == self.quorum_responses {
262 return true;
263 }
264 if good < self.quorum_responses {
265 return false;
266 }
267 let chk0 = self.good[0].checksum;
268 let chk1 = self.good[1].checksum;
269 if chk0 == chk1 {
270 return true;
271 }
272 if good < 3 {
273 return false;
274 }
275 let chk2 = self.good[2].checksum;
276 chk1 == chk2 || chk0 == chk2
277 }
278
279 #[must_use]
289 pub fn error_response(&self) -> Option<&Msg> {
290 self.err_rsp.as_deref()
291 }
292
293 pub fn good_iter(&self) -> impl Iterator<Item = (&Msg, u32)> {
296 self.good.iter().map(|g| (g.msg.as_ref(), g.checksum))
297 }
298
299 #[must_use]
322 pub fn pick_response(&self) -> Option<&Msg> {
323 if !matches!(self.outcome(), QuorumOutcome::Achieved) {
324 return None;
325 }
326 match self.good.len() {
327 1 | 2 => Some(self.good[0].msg.as_ref()),
328 3 => {
329 let c0 = self.good[0].checksum;
330 let c1 = self.good[1].checksum;
331 let c2 = self.good[2].checksum;
332 if c0 == c1 {
333 Some(self.good[0].msg.as_ref())
334 } else if c1 == c2 {
335 Some(self.good[1].msg.as_ref())
336 } else if c0 == c2 {
337 Some(self.good[0].msg.as_ref())
338 } else {
339 None
340 }
341 }
342 _ => None,
343 }
344 }
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use crate::msg::{Msg, MsgType};

    /// Builds the read request every test hands to `ResponseMgr::new`.
    fn req() -> Msg {
        let mut request = Msg::new(1, MsgType::ReqRedisGet, true);
        request.flags_mut().is_read = true;
        request
    }

    /// Builds a response with the given id and error flag.
    fn rsp(id: u64, is_error: bool) -> Msg {
        let mut response = Msg::new(id, MsgType::RspRedisStatus, false);
        response.flags_mut().is_error = is_error;
        response
    }

    /// Shorthand for a manager without a name that expects `max` responses.
    fn tracker(max: u8) -> ResponseMgr {
        ResponseMgr::new(&req(), max, None)
    }

    // One expected response: a single good reply settles it.
    #[test]
    fn dc_one_one_good() {
        let mut mgr = ResponseMgr::new(&req(), 1, Some(String::from("dc1")));
        assert_eq!(mgr.outcome(), QuorumOutcome::Pending);

        mgr.submit_response(rsp(2, false), 1);

        assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
        assert!(mgr.pick_response().is_some());
    }

    // One expected response: a single error is fatal.
    #[test]
    fn dc_one_single_error() {
        let mut mgr = tracker(1);

        mgr.submit_response(rsp(2, true), 0);

        assert_eq!(mgr.outcome(), QuorumOutcome::Failed);
        assert!(mgr.error_response().is_some());
        assert!(mgr.pick_response().is_none());
    }

    // Two expected responses: both must arrive and agree.
    #[test]
    fn dc_quorum_two_matching() {
        let mut mgr = tracker(2);
        assert_eq!(mgr.quorum_responses(), 2);

        mgr.submit_response(rsp(2, false), 7);
        assert_eq!(mgr.outcome(), QuorumOutcome::Pending);

        mgr.submit_response(rsp(3, false), 7);
        assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
    }

    // Two expected responses that disagree leave nothing to break the tie.
    #[test]
    fn dc_quorum_two_mismatched_no_third_response() {
        let mut mgr = tracker(2);

        mgr.submit_response(rsp(2, false), 7);
        mgr.submit_response(rsp(3, false), 9);

        assert_eq!(mgr.outcome(), QuorumOutcome::Mismatched);
    }

    // Two expected responses: one error makes the quorum unreachable.
    #[test]
    fn dc_quorum_one_good_one_error_fails() {
        let mut mgr = tracker(2);

        mgr.submit_response(rsp(2, false), 7);
        mgr.submit_response(rsp(3, true), 0);

        assert_eq!(mgr.outcome(), QuorumOutcome::Failed);
    }

    // Three expected responses, all identical: the first one is picked.
    #[test]
    fn dc_safe_quorum_three_all_match() {
        let mut mgr = tracker(3);
        assert_eq!(mgr.quorum_responses(), 2);

        (2..=4).for_each(|id| mgr.submit_response(rsp(id, false), 11));

        assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
        let picked = mgr.pick_response().unwrap();
        assert_eq!(picked.id(), 2);
    }

    // Three expected responses: two agreeing replies settle it, and a later
    // dissenting reply does not change the verdict.
    #[test]
    fn dc_safe_quorum_two_match_one_dissent() {
        let mut mgr = tracker(3);

        mgr.submit_response(rsp(2, false), 5);
        mgr.submit_response(rsp(3, false), 5);
        assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);

        mgr.submit_response(rsp(4, false), 9);
        assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
    }

    // Three expected responses with three different checksums.
    #[test]
    fn dc_safe_quorum_three_disagreeing_mismatched() {
        let mut mgr = tracker(3);

        mgr.submit_response(rsp(2, false), 1);
        mgr.submit_response(rsp(3, false), 2);
        assert_eq!(mgr.outcome(), QuorumOutcome::Pending);

        mgr.submit_response(rsp(4, false), 3);
        assert_eq!(mgr.outcome(), QuorumOutcome::Mismatched);
        assert!(mgr.pick_response().is_none());
    }

    // Three expected responses: two errors already rule out a quorum.
    #[test]
    fn dc_safe_quorum_two_errors_fail_immediately() {
        let mut mgr = tracker(3);

        mgr.submit_response(rsp(2, true), 0);
        mgr.submit_response(rsp(3, true), 0);

        assert_eq!(mgr.outcome(), QuorumOutcome::Failed);
    }

    // Three expected responses, all errors.
    #[test]
    fn dc_safe_quorum_three_errors_fail() {
        let mut mgr = tracker(3);

        (2..=4).for_each(|id| mgr.submit_response(rsp(id, true), 0));

        assert_eq!(mgr.outcome(), QuorumOutcome::Failed);
    }

    // The dissenting reply arrives first; the pick must come from the majority.
    #[test]
    fn dc_safe_quorum_one_dissent_picks_majority() {
        let mut mgr = tracker(3);

        mgr.submit_response(rsp(2, false), 1);
        mgr.submit_response(rsp(3, false), 2);
        mgr.submit_response(rsp(4, false), 2);

        assert_eq!(mgr.outcome(), QuorumOutcome::Achieved);
        let picked = mgr.pick_response().unwrap();
        assert_eq!(picked.id(), 3);
    }

    // Good responses beyond the storage bound are silently discarded.
    #[test]
    fn excess_good_responses_are_dropped() {
        let mut mgr = tracker(3);

        (2..=10).for_each(|id| mgr.submit_response(rsp(id, false), 1));

        assert_eq!(usize::from(mgr.good_responses()), MAX_REPLICAS_PER_DC);
    }
}
478}