use std::collections::{HashMap, HashSet};
use crate::core::types::MsgId;
use crate::msg::{ConsistencyLevel, DynErrorCode, Msg, MsgType};
/// Flags a response fragment as an error before its parent is coalesced.
///
/// Requests and messages whose fragment id is `0` are left untouched.
/// For a response fragment:
/// * integer, multibulk and status replies are accepted unchanged;
/// * any of the Redis error reply types marks the message as an error;
/// * every other type marks the message as an error and additionally
///   records [`DynErrorCode::BadFormat`].
pub fn redis_pre_coalesce(rsp: &mut Msg) {
    if rsp.is_request() || rsp.frag_id() == 0 {
        return;
    }

    // Classify first, mutate afterwards: (mark as error, record BadFormat).
    let (mark_error, bad_format) = match rsp.ty() {
        MsgType::RspRedisInteger | MsgType::RspRedisMultibulk | MsgType::RspRedisStatus => {
            (false, false)
        }
        MsgType::RspRedisError
        | MsgType::RspRedisErrorErr
        | MsgType::RspRedisErrorOom
        | MsgType::RspRedisErrorBusy
        | MsgType::RspRedisErrorNoauth
        | MsgType::RspRedisErrorLoading
        | MsgType::RspRedisErrorBusykey
        | MsgType::RspRedisErrorMisconf
        | MsgType::RspRedisErrorNoscript
        | MsgType::RspRedisErrorReadonly
        | MsgType::RspRedisErrorWrongtype
        | MsgType::RspRedisErrorExecabort
        | MsgType::RspRedisErrorMasterdown
        | MsgType::RspRedisErrorNoreplicas => (true, false),
        _ => (true, true),
    };

    if mark_error {
        rsp.set_is_error(true);
    }
    if bad_format {
        rsp.set_dyn_error_code(DynErrorCode::BadFormat);
    }
}
/// Adds the integer carried by a fragment response to its parent request.
///
/// The sum is only applied when `rsp` is a response with a non-zero fragment
/// id and type `RspRedisInteger`, and `parent` is a `DEL` or `EXISTS`
/// request. The addition saturates instead of overflowing. In every other
/// case `parent` is left unchanged.
pub fn accumulate_fragment_integer(parent: &mut Msg, rsp: &Msg) {
    let is_fragment_reply = !rsp.is_request() && rsp.frag_id() != 0;
    if !is_fragment_reply {
        return;
    }

    let summable = matches!(rsp.ty(), MsgType::RspRedisInteger)
        && matches!(parent.ty(), MsgType::ReqRedisDel | MsgType::ReqRedisExists);
    if summable {
        let total = parent.integer().saturating_add(rsp.integer());
        parent.set_integer(total);
    }
}
/// Marks a request as done once coalescing has finished without errors.
///
/// Nothing happens when `req` is not a request, or when either its
/// `is_error` or `is_ferror` flag is set.
pub fn redis_post_coalesce(req: &mut Msg) {
    let completed_cleanly =
        req.is_request() && !(req.flags().is_error || req.flags().is_ferror);
    if completed_cleanly {
        req.set_done(true);
    }
}
/// Collects the replies sent by replica peers for one request and decides,
/// according to the configured consistency level, which reply wins.
#[derive(Debug)]
pub struct CoalesceTracker {
/// Identifier of the request whose replies are tracked.
req_id: MsgId,
/// Consistency level; selects the evaluation strategy in `record_reply`.
consistency: ConsistencyLevel,
/// Number of targets given to `new`, clamped to the range `1..=255`.
expected: u8,
/// Known targets, keyed by peer index.
targets: HashMap<u32, TargetInfo>,
/// Latest reply recorded per peer index (a repeated reply replaces the
/// previous one).
received: HashMap<u32, ReplySlot>,
/// Set once an outcome (winner or error) has been produced; replies
/// recorded afterwards are dropped.
decided: bool,
}
/// Placement of one target peer.
#[derive(Debug, Clone)]
struct TargetInfo {
/// Name of the datacenter the peer belongs to.
dc: String,
/// `true` when `dc` equals the `local_dc` passed to `CoalesceTracker::new`.
is_local_dc: bool,
}
/// A reply recorded for one peer.
#[derive(Debug)]
struct ReplySlot {
/// Equality key computed from the reply by `reply_key`; replies with equal
/// keys count as the same vote.
eq_key: ReplyKey,
/// The reply itself; becomes `None` once it has been handed out as the
/// winner.
msg: Option<Msg>,
}
/// Value used to decide whether two replies agree: both the error flag and
/// the payload bytes must match.
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
struct ReplyKey {
/// Copy of the reply's `is_error` flag.
is_error: bool,
/// Readable bytes of all of the reply's mbufs, concatenated in order.
payload: Vec<u8>,
}
/// Result of recording a reply with `CoalesceTracker::record_reply`.
#[derive(Debug)]
pub enum CoalesceOutcome {
/// No decision was produced by this call: either more replies are needed,
/// or the tracker had already decided and the reply was dropped.
Pending,
/// A winning reply was selected.
Ready {
/// The reply chosen as the winner.
winner: Box<Msg>,
/// Peer indices whose recorded reply differs from the winner.
divergent_targets: Vec<u32>,
},
/// The consistency level could not be satisfied; the string describes why.
Error(String),
}
impl CoalesceTracker {
#[must_use]
pub fn new(
req_id: MsgId,
consistency: ConsistencyLevel,
targets: Vec<(u32, String)>,
local_dc: &str,
) -> Self {
let expected = u8::try_from(targets.len()).unwrap_or(u8::MAX).max(1);
let mut tmap: HashMap<u32, TargetInfo> = HashMap::with_capacity(targets.len());
for (idx, dc) in targets {
let is_local_dc = dc == local_dc;
tmap.insert(idx, TargetInfo { dc, is_local_dc });
}
Self {
req_id,
consistency,
expected,
targets: tmap,
received: HashMap::new(),
decided: false,
}
}
#[must_use]
pub fn req_id(&self) -> MsgId {
self.req_id
}
#[must_use]
pub fn expected(&self) -> u8 {
self.expected
}
#[must_use]
pub fn received_count(&self) -> u8 {
u8::try_from(self.received.len()).unwrap_or(u8::MAX)
}
#[must_use]
pub fn is_decided(&self) -> bool {
self.decided
}
pub fn record_reply(&mut self, source_peer_idx: u32, rsp: Msg) -> CoalesceOutcome {
if self.decided {
return CoalesceOutcome::Pending;
}
let key = reply_key(&rsp);
self.received.insert(
source_peer_idx,
ReplySlot {
eq_key: key,
msg: Some(rsp),
},
);
match self.consistency {
ConsistencyLevel::DcOne => self.evaluate_dc_one(source_peer_idx),
ConsistencyLevel::DcQuorum => self.evaluate_dc_quorum(),
ConsistencyLevel::DcSafeQuorum => self.evaluate_dc_safe_quorum(),
ConsistencyLevel::DcEachSafeQuorum => self.evaluate_dc_each_safe_quorum(),
}
}
fn evaluate_dc_one(&mut self, first_peer: u32) -> CoalesceOutcome {
self.decided = true;
let Some(slot) = self.received.get_mut(&first_peer) else {
return CoalesceOutcome::Error("dc_one: no recorded reply".into());
};
let Some(msg) = slot.msg.take() else {
return CoalesceOutcome::Error("dc_one: reply already consumed".into());
};
CoalesceOutcome::Ready {
winner: Box::new(msg),
divergent_targets: Vec::new(),
}
}
fn evaluate_dc_quorum(&mut self) -> CoalesceOutcome {
let local_targets: HashSet<u32> = self
.targets
.iter()
.filter(|(_, t)| t.is_local_dc)
.map(|(idx, _)| *idx)
.collect();
let local_count = if local_targets.is_empty() {
usize::from(self.expected)
} else {
local_targets.len()
};
let quorum = local_count / 2 + 1;
let votes = self.local_dc_votes(&local_targets);
if let Some(winner_key) = winning_key(&votes, quorum) {
return self.emit_winner(&winner_key);
}
if self.received_count() as usize >= local_count {
if let Some(winner_key) = plurality_key(&votes) {
return self.emit_winner(&winner_key);
}
self.decided = true;
return CoalesceOutcome::Error("dc_quorum: no replies in local dc".into());
}
CoalesceOutcome::Pending
}
fn evaluate_dc_safe_quorum(&mut self) -> CoalesceOutcome {
let local_targets: HashSet<u32> = self
.targets
.iter()
.filter(|(_, t)| t.is_local_dc)
.map(|(idx, _)| *idx)
.collect();
let local_count = if local_targets.is_empty() {
usize::from(self.expected)
} else {
local_targets.len()
};
let received_local: usize = self
.received
.keys()
.filter(|k| local_targets.is_empty() || local_targets.contains(k))
.count();
if received_local < local_count {
return CoalesceOutcome::Pending;
}
let votes = self.local_dc_votes(&local_targets);
if votes.len() == 1 {
let winner_key = votes
.into_keys()
.next()
.expect("invariant: votes.len() == 1");
return self.emit_winner(&winner_key);
}
self.decided = true;
CoalesceOutcome::Error("dc_safe_quorum: divergent replies".into())
}
fn evaluate_dc_each_safe_quorum(&mut self) -> CoalesceOutcome {
let mut per_dc: HashMap<String, Vec<u32>> = HashMap::new();
for (idx, info) in &self.targets {
per_dc.entry(info.dc.clone()).or_default().push(*idx);
}
let mut dcs_sorted: Vec<String> = per_dc.keys().cloned().collect();
dcs_sorted.sort();
let mut local_winner: Option<ReplyKey> = None;
let mut all_complete = true;
for dc in &dcs_sorted {
let idxs = per_dc
.get(dc)
.expect("invariant: dc was just enumerated from per_dc");
let received_dc: usize = idxs
.iter()
.filter(|i| self.received.contains_key(*i))
.count();
if received_dc < idxs.len() {
all_complete = false;
continue;
}
let mut dc_votes: HashMap<ReplyKey, Vec<u32>> = HashMap::new();
for i in idxs {
if let Some(slot) = self.received.get(i) {
dc_votes.entry(slot.eq_key.clone()).or_default().push(*i);
}
}
if dc_votes.len() != 1 {
self.decided = true;
return CoalesceOutcome::Error(format!(
"dc_each_safe_quorum: divergent replies in dc {dc}"
));
}
let dc_key = dc_votes
.into_keys()
.next()
.expect("invariant: dc_votes.len() == 1");
let is_local_dc = self.targets.get(&idxs[0]).is_some_and(|t| t.is_local_dc);
if is_local_dc {
local_winner = Some(dc_key);
}
}
if !all_complete {
return CoalesceOutcome::Pending;
}
let Some(winner_key) = local_winner.or_else(|| {
let mut idxs: Vec<u32> = self.received.keys().copied().collect();
idxs.sort_unstable();
idxs.first()
.and_then(|i| self.received.get(i))
.map(|s| s.eq_key.clone())
}) else {
self.decided = true;
return CoalesceOutcome::Error("dc_each_safe_quorum: no replies".into());
};
let mut divergent: Vec<u32> = Vec::new();
for (idx, slot) in &self.received {
if slot.eq_key != winner_key {
divergent.push(*idx);
}
}
divergent.sort_unstable();
let outcome = self.emit_winner(&winner_key);
if let CoalesceOutcome::Ready {
winner,
divergent_targets: emitted_div,
} = outcome
{
let mut combined: HashSet<u32> = emitted_div.into_iter().collect();
for d in divergent {
combined.insert(d);
}
let mut combined: Vec<u32> = combined.into_iter().collect();
combined.sort_unstable();
return CoalesceOutcome::Ready {
winner,
divergent_targets: combined,
};
}
outcome
}
fn local_dc_votes(&self, local_targets: &HashSet<u32>) -> HashMap<ReplyKey, Vec<u32>> {
let mut votes: HashMap<ReplyKey, Vec<u32>> = HashMap::new();
for (idx, slot) in &self.received {
if !local_targets.is_empty() && !local_targets.contains(idx) {
continue;
}
votes.entry(slot.eq_key.clone()).or_default().push(*idx);
}
votes
}
fn emit_winner(&mut self, winner_key: &ReplyKey) -> CoalesceOutcome {
if self.decided {
return CoalesceOutcome::Pending;
}
let mut winner_msg: Option<Msg> = None;
let mut divergent: Vec<u32> = Vec::new();
let mut idx_sorted: Vec<u32> = self.received.keys().copied().collect();
idx_sorted.sort_unstable();
for idx in idx_sorted {
let Some(slot) = self.received.get_mut(&idx) else {
continue;
};
if slot.eq_key == *winner_key {
if winner_msg.is_none() {
if let Some(m) = slot.msg.take() {
winner_msg = Some(m);
}
}
} else {
divergent.push(idx);
}
}
self.decided = true;
match winner_msg {
Some(winner) => CoalesceOutcome::Ready {
winner: Box::new(winner),
divergent_targets: divergent,
},
None => CoalesceOutcome::Error("coalesce: winner key has no surviving msg".into()),
}
}
}
/// Builds the equality key of a reply: its `is_error` flag plus the readable
/// bytes of all its mbufs, concatenated in order.
fn reply_key(rsp: &Msg) -> ReplyKey {
    // Append each buffer's bytes straight into the payload instead of
    // allocating a temporary `Vec` per mbuf.
    let mut payload: Vec<u8> = Vec::new();
    for buf in rsp.mbufs().iter() {
        payload.extend(buf.readable().iter().copied());
    }
    ReplyKey {
        is_error: rsp.flags().is_error,
        payload,
    }
}
/// Returns the reply key backed by the most peers among those with at least
/// `quorum` votes, or `None` when no key reaches `quorum`.
///
/// When several qualifying keys have the same number of votes, the key whose
/// voters include the lowest peer index wins. This keeps the result
/// independent of `HashMap` iteration order and matches the tie-break used by
/// `plurality_key`.
fn winning_key(votes: &HashMap<ReplyKey, Vec<u32>>, quorum: usize) -> Option<ReplyKey> {
    votes
        .iter()
        .filter(|(_, peers)| peers.len() >= quorum)
        .max_by_key(|(_, peers)| {
            let lowest_peer = peers.iter().copied().min().unwrap_or(u32::MAX);
            // More votes first; on equal votes the smaller peer index ranks higher.
            (peers.len(), std::cmp::Reverse(lowest_peer))
        })
        .map(|(key, _)| key.clone())
}
/// Returns the reply key with the most votes, or `None` when `votes` is
/// empty. Among keys with the same number of votes, the one whose voters
/// include the lowest peer index is preferred.
fn plurality_key(votes: &HashMap<ReplyKey, Vec<u32>>) -> Option<ReplyKey> {
    // Current leader as (key, vote count, lowest voting peer index).
    let mut leader: Option<(&ReplyKey, usize, u32)> = None;
    for (candidate, peers) in votes {
        let count = peers.len();
        let lowest_peer = peers.iter().copied().min().unwrap_or(u32::MAX);
        let replaces_leader = leader.map_or(true, |(_, best_count, best_lowest)| {
            count > best_count || (count == best_count && lowest_peer < best_lowest)
        });
        if replaces_leader {
            leader = Some((candidate, count, lowest_peer));
        }
    }
    leader.map(|(key, _, _)| key.clone())
}
/// Unit tests for [`CoalesceTracker`], one group per consistency level.
#[cfg(test)]
mod replica_coalesce_tests {
    use super::*;
    use crate::io::mbuf::MbufPool;
    use crate::msg::response::make_simple_redis;
    use crate::msg::{Msg, MsgType};

    /// Request used as the template for every synthetic reply.
    fn req() -> Msg {
        Msg::new(1, MsgType::ReqRedisGet, true)
    }

    /// Builds a non-error reply carrying `payload`.
    fn ok_rsp(payload: &[u8]) -> Msg {
        let pool = MbufPool::default();
        make_simple_redis(&req(), &pool, payload)
    }

    /// Builds a reply carrying `payload` with the error flag set.
    fn err_rsp(payload: &[u8]) -> Msg {
        let pool = MbufPool::default();
        let mut m = make_simple_redis(&req(), &pool, payload);
        m.set_is_error(true);
        m
    }

    /// Assigns peer indices `0, 1, 2, ...` to the given datacenter names.
    fn targets(dcs: &[&str]) -> Vec<(u32, String)> {
        dcs.iter()
            .zip(0u32..)
            .map(|(dc, idx)| (idx, (*dc).to_string()))
            .collect()
    }

    /// Payload bytes of the winning reply; panics unless `out` is `Ready`.
    fn winner_payload(out: &CoalesceOutcome) -> Vec<u8> {
        match out {
            CoalesceOutcome::Ready { winner, .. } => winner
                .mbufs()
                .iter()
                .flat_map(|b| b.readable().to_vec())
                .collect(),
            other => panic!("expected Ready, got {other:?}"),
        }
    }

    /// Divergent peer indices; panics unless `out` is `Ready`.
    fn divergent(out: &CoalesceOutcome) -> Vec<u32> {
        match out {
            CoalesceOutcome::Ready {
                divergent_targets, ..
            } => divergent_targets.clone(),
            other => panic!("expected Ready, got {other:?}"),
        }
    }

    #[test]
    fn dc_one_first_reply_wins() {
        let mut t = CoalesceTracker::new(1, ConsistencyLevel::DcOne, targets(&["dc1"]), "dc1");
        let out = t.record_reply(0, ok_rsp(b"+OK\r\n"));
        assert_eq!(winner_payload(&out), b"+OK\r\n");
        assert!(divergent(&out).is_empty());
        assert!(t.is_decided());
    }

    #[test]
    fn dc_one_late_reply_dropped() {
        let mut t =
            CoalesceTracker::new(1, ConsistencyLevel::DcOne, targets(&["dc1", "dc1"]), "dc1");
        let out = t.record_reply(0, ok_rsp(b"$1\r\na\r\n"));
        assert!(matches!(out, CoalesceOutcome::Ready { .. }));
        let out2 = t.record_reply(1, ok_rsp(b"$1\r\nb\r\n"));
        assert!(matches!(out2, CoalesceOutcome::Pending));
    }

    #[test]
    fn dc_quorum_three_all_agree() {
        let mut t = CoalesceTracker::new(
            1,
            ConsistencyLevel::DcQuorum,
            targets(&["dc1", "dc1", "dc1"]),
            "dc1",
        );
        assert!(matches!(
            t.record_reply(0, ok_rsp(b"$1\r\na\r\n")),
            CoalesceOutcome::Pending
        ));
        // Two of three is already a majority.
        let out = t.record_reply(1, ok_rsp(b"$1\r\na\r\n"));
        assert_eq!(winner_payload(&out), b"$1\r\na\r\n");
        assert!(divergent(&out).is_empty());
    }

    #[test]
    fn dc_quorum_one_divergent_repaired() {
        let mut t = CoalesceTracker::new(
            1,
            ConsistencyLevel::DcQuorum,
            targets(&["dc1", "dc1", "dc1"]),
            "dc1",
        );
        assert!(matches!(
            t.record_reply(0, ok_rsp(b"$2\r\nv1\r\n")),
            CoalesceOutcome::Pending
        ));
        assert!(matches!(
            t.record_reply(1, ok_rsp(b"$2\r\nv2\r\n")),
            CoalesceOutcome::Pending
        ));
        let out = t.record_reply(2, ok_rsp(b"$2\r\nv2\r\n"));
        assert_eq!(winner_payload(&out), b"$2\r\nv2\r\n");
        assert_eq!(divergent(&out), vec![0]);
    }

    #[test]
    fn dc_quorum_all_divergent_picks_plurality() {
        let mut t = CoalesceTracker::new(
            1,
            ConsistencyLevel::DcQuorum,
            targets(&["dc1", "dc1", "dc1"]),
            "dc1",
        );
        let _ = t.record_reply(0, ok_rsp(b"$1\r\na\r\n"));
        let _ = t.record_reply(1, ok_rsp(b"$1\r\nb\r\n"));
        let out = t.record_reply(2, ok_rsp(b"$1\r\nc\r\n"));
        // Three-way tie: the reply of the lowest peer index wins.
        assert_eq!(winner_payload(&out), b"$1\r\na\r\n");
        assert_eq!(divergent(&out), vec![1, 2]);
    }

    #[test]
    fn dc_quorum_error_and_value_do_not_match() {
        let mut t = CoalesceTracker::new(
            1,
            ConsistencyLevel::DcQuorum,
            targets(&["dc1", "dc1", "dc1"]),
            "dc1",
        );
        let _ = t.record_reply(0, ok_rsp(b"$1\r\nx\r\n"));
        // Same bytes, but flagged as an error: must count as a different vote.
        let _ = t.record_reply(1, err_rsp(b"$1\r\nx\r\n"));
        let out = t.record_reply(2, ok_rsp(b"$1\r\ny\r\n"));
        assert_eq!(winner_payload(&out), b"$1\r\nx\r\n");
        assert_eq!(divergent(&out), vec![1, 2]);
        let CoalesceOutcome::Ready { winner, .. } = out else {
            panic!("expected Ready");
        };
        assert!(!winner.flags().is_error);
    }

    #[test]
    fn dc_safe_quorum_three_all_agree() {
        let mut t = CoalesceTracker::new(
            1,
            ConsistencyLevel::DcSafeQuorum,
            targets(&["dc1", "dc1", "dc1"]),
            "dc1",
        );
        assert!(matches!(
            t.record_reply(0, ok_rsp(b"$1\r\nz\r\n")),
            CoalesceOutcome::Pending
        ));
        assert!(matches!(
            t.record_reply(1, ok_rsp(b"$1\r\nz\r\n")),
            CoalesceOutcome::Pending
        ));
        let out = t.record_reply(2, ok_rsp(b"$1\r\nz\r\n"));
        assert_eq!(winner_payload(&out), b"$1\r\nz\r\n");
        assert!(divergent(&out).is_empty());
    }

    #[test]
    fn dc_safe_quorum_divergence_errors() {
        let mut t = CoalesceTracker::new(
            1,
            ConsistencyLevel::DcSafeQuorum,
            targets(&["dc1", "dc1", "dc1"]),
            "dc1",
        );
        let _ = t.record_reply(0, ok_rsp(b"$1\r\na\r\n"));
        let _ = t.record_reply(1, ok_rsp(b"$1\r\na\r\n"));
        let out = t.record_reply(2, ok_rsp(b"$1\r\nb\r\n"));
        assert!(matches!(out, CoalesceOutcome::Error(_)), "{out:?}");
        assert!(t.is_decided());
    }

    #[test]
    fn dc_each_safe_quorum_two_dcs_all_agree() {
        let mut t = CoalesceTracker::new(
            1,
            ConsistencyLevel::DcEachSafeQuorum,
            targets(&["dc1", "dc1", "dc2", "dc2"]),
            "dc1",
        );
        let mut last = CoalesceOutcome::Pending;
        for idx in 0..4 {
            last = t.record_reply(idx, ok_rsp(b"$1\r\nq\r\n"));
            if idx < 3 {
                // No decision until every datacenter has fully answered.
                assert!(matches!(last, CoalesceOutcome::Pending), "{last:?}");
            }
        }
        assert!(t.is_decided());
        assert_eq!(winner_payload(&last), b"$1\r\nq\r\n");
        assert!(divergent(&last).is_empty());
    }

    #[test]
    fn dc_each_safe_quorum_remote_dc_diverges_marks_divergent() {
        let mut t = CoalesceTracker::new(
            1,
            ConsistencyLevel::DcEachSafeQuorum,
            targets(&["dc1", "dc1", "dc2", "dc2"]),
            "dc1",
        );
        let _ = t.record_reply(0, ok_rsp(b"$1\r\na\r\n"));
        let _ = t.record_reply(1, ok_rsp(b"$1\r\na\r\n"));
        let _ = t.record_reply(2, ok_rsp(b"$1\r\nb\r\n"));
        let out = t.record_reply(3, ok_rsp(b"$1\r\nb\r\n"));
        // The local datacenter's reply wins; the remote peers are divergent.
        assert_eq!(winner_payload(&out), b"$1\r\na\r\n");
        assert_eq!(divergent(&out), vec![2, 3]);
    }

    #[test]
    fn dc_each_safe_quorum_intra_dc_divergence_errors() {
        let mut t = CoalesceTracker::new(
            1,
            ConsistencyLevel::DcEachSafeQuorum,
            targets(&["dc1", "dc1", "dc2", "dc2"]),
            "dc1",
        );
        let _ = t.record_reply(0, ok_rsp(b"$1\r\na\r\n"));
        let dc1_full = t.record_reply(1, ok_rsp(b"$1\r\nb\r\n"));
        assert!(
            matches!(dc1_full, CoalesceOutcome::Error(_)),
            "{dc1_full:?}"
        );
        assert!(t.is_decided());
        // Replies after the failure are dropped.
        let _ = t.record_reply(2, ok_rsp(b"$1\r\nq\r\n"));
        let late = t.record_reply(3, ok_rsp(b"$1\r\nq\r\n"));
        assert!(matches!(late, CoalesceOutcome::Pending), "{late:?}");
    }

    #[test]
    fn dc_quorum_two_targets_both_agree() {
        let mut t = CoalesceTracker::new(
            1,
            ConsistencyLevel::DcQuorum,
            targets(&["dc1", "dc1"]),
            "dc1",
        );
        // With two replicas the majority is two, so one reply is not enough.
        assert!(matches!(
            t.record_reply(0, ok_rsp(b"+OK\r\n")),
            CoalesceOutcome::Pending
        ));
        let out = t.record_reply(1, ok_rsp(b"+OK\r\n"));
        assert_eq!(winner_payload(&out), b"+OK\r\n");
        assert!(divergent(&out).is_empty());
    }

    #[test]
    fn duplicate_reply_overwrites() {
        let mut t = CoalesceTracker::new(
            1,
            ConsistencyLevel::DcQuorum,
            targets(&["dc1", "dc1", "dc1"]),
            "dc1",
        );
        assert!(matches!(
            t.record_reply(0, ok_rsp(b"$1\r\nx\r\n")),
            CoalesceOutcome::Pending
        ));
        // The second reply from peer 0 replaces its first one.
        assert!(matches!(
            t.record_reply(0, ok_rsp(b"$1\r\ny\r\n")),
            CoalesceOutcome::Pending
        ));
        let out = t.record_reply(1, ok_rsp(b"$1\r\ny\r\n"));
        assert!(t.is_decided());
        assert_eq!(winner_payload(&out), b"$1\r\ny\r\n");
        assert!(divergent(&out).is_empty());
    }

    #[test]
    fn after_decided_subsequent_reply_pending() {
        let mut t =
            CoalesceTracker::new(1, ConsistencyLevel::DcOne, targets(&["dc1", "dc1"]), "dc1");
        let _ = t.record_reply(0, ok_rsp(b"+OK\r\n"));
        let out = t.record_reply(1, ok_rsp(b"+ALSO\r\n"));
        assert!(matches!(out, CoalesceOutcome::Pending));
        assert!(t.is_decided());
    }
}