use std::collections::BTreeMap;
use crate::replication::bookmark::CausalBookmark;
use crate::replication::election::{Member, MemberKind, VotingState};
/// How a replica participates in the replication topology.
///
/// Defaults to [`ReplicaClass::Voting`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ReplicaClass {
    /// A member that must stream directly from the primary (it may not cascade).
    #[default]
    Voting,
    /// A non-voting data replica that is allowed to stream from an intermediate node.
    AsyncReadReplica,
}
impl ReplicaClass {
    /// Classifies a cluster member.
    ///
    /// A data member that is not a voter becomes [`ReplicaClass::AsyncReadReplica`];
    /// every other member (any voter, or any non-data kind) is treated as
    /// [`ReplicaClass::Voting`].
    pub fn from_member(member: &Member) -> Self {
        // Both facts are read up front, in the same order as a tuple match would.
        let is_data = matches!(member.kind, MemberKind::Data);
        let is_voter = member.is_voter();
        if is_data && !is_voter {
            ReplicaClass::AsyncReadReplica
        } else {
            ReplicaClass::Voting
        }
    }

    /// Returns `true` if this class may stream from an intermediate replica
    /// instead of the primary.
    pub fn may_cascade(self) -> bool {
        match self {
            ReplicaClass::AsyncReadReplica => true,
            ReplicaClass::Voting => false,
        }
    }
}
/// A replica that another replica has asked to stream from instead of the primary.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CascadeUpstream {
    /// Identifier of the upstream node.
    pub node_id: String,
    /// Address used to reach the upstream node (not validated here).
    pub addr: String,
}

impl CascadeUpstream {
    /// Builds an upstream descriptor from anything convertible into `String`.
    pub fn new(node_id: impl Into<String>, addr: impl Into<String>) -> Self {
        let node_id: String = node_id.into();
        let addr: String = addr.into();
        Self { node_id, addr }
    }
}
/// The source a replica should pull its replication stream from.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum UpstreamChoice {
    /// Stream directly from the primary.
    Primary,
    /// Stream from another replica acting as an intermediate relay.
    Intermediate(CascadeUpstream),
}

impl UpstreamChoice {
    /// Returns `true` when the stream is routed through an intermediate replica.
    pub fn is_cascade(&self) -> bool {
        match self {
            UpstreamChoice::Primary => false,
            UpstreamChoice::Intermediate(_) => true,
        }
    }
}
/// Why a requested cascade upstream was rejected and the primary chosen instead.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CascadeRefusal {
    /// The replica's class does not allow cascading, so it must stream from the primary.
    VotingMemberDirectOnly,
    /// The requested upstream is the replica itself.
    SelfReference,
}

impl CascadeRefusal {
    /// Kebab-case label for this refusal.
    pub fn as_str(self) -> &'static str {
        match self {
            CascadeRefusal::SelfReference => "self-reference",
            CascadeRefusal::VotingMemberDirectOnly => "voting-member-direct-only",
        }
    }
}
/// Decides where the node `self_node_id` of class `class` should stream from.
///
/// The result pairs the chosen upstream with an optional refusal reason:
/// - no `requested` upstream: stream from the primary, no refusal;
/// - `class` may not cascade: primary, refused with
///   [`CascadeRefusal::VotingMemberDirectOnly`];
/// - the requested upstream's `node_id` equals `self_node_id`: primary, refused with
///   [`CascadeRefusal::SelfReference`];
/// - otherwise: stream from (a clone of) the requested upstream, no refusal.
///
/// The class check is applied before the self-reference check.
pub fn plan_upstream(
    self_node_id: &str,
    class: ReplicaClass,
    requested: Option<&CascadeUpstream>,
) -> (UpstreamChoice, Option<CascadeRefusal>) {
    match requested {
        None => (UpstreamChoice::Primary, None),
        Some(_) if !class.may_cascade() => (
            UpstreamChoice::Primary,
            Some(CascadeRefusal::VotingMemberDirectOnly),
        ),
        Some(candidate) if candidate.node_id == self_node_id => {
            (UpstreamChoice::Primary, Some(CascadeRefusal::SelfReference))
        }
        Some(candidate) => (UpstreamChoice::Intermediate(candidate.clone()), None),
    }
}
/// Replication progress for one downstream replica as tracked by a relay.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DownstreamSlot {
    /// Identifier of the downstream replica.
    pub id: String,
    /// Highest LSN the downstream has acknowledged.
    pub confirmed_lsn: u64,
    /// Highest LSN sent to the downstream.
    pub sent_lsn: u64,
}
/// Progress tracker for a node that re-streams the replication log to
/// downstream (cascading) replicas.
///
/// It records the highest LSN this node has applied itself plus one
/// [`DownstreamSlot`] per registered downstream, keyed by downstream id.
#[derive(Clone, Debug)]
pub struct CascadeRelay {
    /// Identifier of the relaying node itself.
    node_id: String,
    /// Highest LSN applied locally; never decreases.
    self_applied_lsn: u64,
    /// Per-downstream progress keyed by downstream id (sorted, since this is a `BTreeMap`).
    downstream: BTreeMap<String, DownstreamSlot>,
}

impl CascadeRelay {
    /// Creates a relay for `node_id` with an applied LSN of 0 and no downstreams.
    pub fn new(node_id: impl Into<String>) -> Self {
        let node_id = node_id.into();
        Self {
            node_id,
            self_applied_lsn: 0,
            downstream: BTreeMap::new(),
        }
    }

    /// Identifier of this relaying node.
    pub fn node_id(&self) -> &str {
        self.node_id.as_str()
    }

    /// Records that this node has applied up to `lsn`.
    ///
    /// Lower values than the current one are ignored, so the applied LSN never
    /// moves backwards.
    pub fn record_self_applied(&mut self, lsn: u64) {
        if lsn > self.self_applied_lsn {
            self.self_applied_lsn = lsn;
        }
    }

    /// Highest LSN applied by this node so far.
    pub fn self_applied_lsn(&self) -> u64 {
        self.self_applied_lsn
    }

    /// Registers downstream `id` starting at `start_lsn` and returns its confirmed LSN.
    ///
    /// Idempotent: re-registering an existing downstream never lowers its confirmed
    /// or sent position; it can only raise them to `start_lsn`.
    pub fn register_downstream(&mut self, id: impl Into<String>, start_lsn: u64) -> u64 {
        let key: String = id.into();
        let slot = self
            .downstream
            .entry(key.clone())
            .or_insert_with(move || DownstreamSlot {
                id: key,
                confirmed_lsn: start_lsn,
                sent_lsn: start_lsn,
            });
        Self::advance_confirmed(slot, start_lsn);
        slot.confirmed_lsn
    }

    /// Removes downstream `id`; returns whether it had been registered.
    pub fn unregister_downstream(&mut self, id: &str) -> bool {
        matches!(self.downstream.remove(id), Some(_))
    }

    /// Records that downstream `id` has acknowledged up to `lsn`.
    ///
    /// Unknown ids are ignored; an ack below the current confirmed LSN changes nothing.
    pub fn record_downstream_ack(&mut self, id: &str, lsn: u64) {
        let Some(slot) = self.downstream.get_mut(id) else {
            return;
        };
        Self::advance_confirmed(slot, lsn);
    }

    /// Records that records up to `lsn` have been sent to downstream `id`.
    ///
    /// Unknown ids are ignored; the sent LSN only moves forward.
    pub fn note_forwarded(&mut self, id: &str, lsn: u64) {
        if let Some(slot) = self.downstream.get_mut(id) {
            slot.sent_lsn = lsn.max(slot.sent_lsn);
        }
    }

    /// Ids of all registered downstreams, in sorted order.
    pub fn downstream_ids(&self) -> Vec<String> {
        self.downstream.keys().cloned().collect::<Vec<_>>()
    }

    /// Progress slot for downstream `id`, if registered.
    pub fn downstream_slot(&self, id: &str) -> Option<&DownstreamSlot> {
        self.downstream.get(id)
    }

    /// Number of registered downstreams.
    pub fn downstream_count(&self) -> usize {
        self.downstream.len()
    }

    /// Minimum of this node's applied LSN and every downstream's confirmed LSN.
    ///
    /// The slowest downstream therefore holds this value back; with no downstreams
    /// it equals the applied LSN.
    pub fn upstream_confirmed_lsn(&self) -> u64 {
        self.downstream
            .values()
            .map(|slot| slot.confirmed_lsn)
            .fold(self.self_applied_lsn, u64::min)
    }

    /// [`Self::upstream_confirmed_lsn`] stamped with `term` as a [`CausalBookmark`].
    pub fn upstream_confirmed_bookmark(&self, term: u64) -> CausalBookmark {
        let lsn = self.upstream_confirmed_lsn();
        CausalBookmark::new(term, lsn)
    }

    /// Downstream `id`'s confirmed LSN capped at this node's applied LSN,
    /// or `None` if `id` is not registered.
    pub fn downstream_visible_frontier(&self, id: &str) -> Option<u64> {
        let applied = self.self_applied_lsn;
        self.downstream
            .get(id)
            .map(|slot| applied.min(slot.confirmed_lsn))
    }

    /// Whether downstream `id`'s visible frontier is at or beyond
    /// `bookmark.commit_lsn()`. Unregistered ids yield `false`.
    ///
    /// Only the bookmark's commit LSN is compared; its term is not consulted here.
    pub fn downstream_can_serve(&self, id: &str, bookmark: &CausalBookmark) -> bool {
        match self.downstream_visible_frontier(id) {
            Some(frontier) => frontier >= bookmark.commit_lsn(),
            None => false,
        }
    }

    /// Selects entries of `available` whose LSN is strictly greater than
    /// `requested_since_lsn` and not greater than this node's applied LSN.
    ///
    /// Every entry is checked, so `available` need not be sorted; the result keeps
    /// the entries' original order.
    pub fn records_to_forward<'a, T>(
        &self,
        requested_since_lsn: u64,
        available: &'a [(u64, T)],
    ) -> Vec<&'a (u64, T)> {
        let ceiling = self.self_applied_lsn;
        let mut picked = Vec::new();
        for record in available {
            let lsn = record.0;
            let after_since = lsn > requested_since_lsn;
            let applied_here = lsn <= ceiling;
            if after_since && applied_here {
                picked.push(record);
            }
        }
        picked
    }

    /// Raises `slot.confirmed_lsn` to at least `lsn`, then lifts `sent_lsn` so it
    /// is never below `confirmed_lsn`.
    fn advance_confirmed(slot: &mut DownstreamSlot, lsn: u64) {
        slot.confirmed_lsn = slot.confirmed_lsn.max(lsn);
        slot.sent_lsn = slot.sent_lsn.max(slot.confirmed_lsn);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::election::Member;

    #[test]
    fn no_requested_source_streams_from_primary() {
        let (choice, refusal) = plan_upstream("r1", ReplicaClass::AsyncReadReplica, None);
        assert_eq!(choice, UpstreamChoice::Primary);
        assert!(refusal.is_none());
    }

    #[test]
    fn async_read_replica_cascades_from_intermediate() {
        let up = CascadeUpstream::new("inter", "http://inter:50051");
        let (choice, refusal) = plan_upstream("leaf", ReplicaClass::AsyncReadReplica, Some(&up));
        assert!(choice.is_cascade());
        assert_eq!(choice, UpstreamChoice::Intermediate(up));
        assert!(refusal.is_none());
    }

    #[test]
    fn voting_member_refuses_cascade_and_falls_back_to_primary() {
        let up = CascadeUpstream::new("inter", "http://inter:50051");
        let (choice, refusal) = plan_upstream("voter", ReplicaClass::Voting, Some(&up));
        assert_eq!(choice, UpstreamChoice::Primary);
        assert_eq!(refusal, Some(CascadeRefusal::VotingMemberDirectOnly));
    }

    #[test]
    fn node_refuses_to_cascade_from_itself() {
        let up = CascadeUpstream::new("self", "http://self:50051");
        let (choice, refusal) = plan_upstream("self", ReplicaClass::AsyncReadReplica, Some(&up));
        assert_eq!(choice, UpstreamChoice::Primary);
        assert_eq!(refusal, Some(CascadeRefusal::SelfReference));
    }

    #[test]
    fn class_from_member_keeps_voters_direct() {
        assert_eq!(
            ReplicaClass::from_member(&Member::data_voting("v")),
            ReplicaClass::Voting
        );
        assert_eq!(
            ReplicaClass::from_member(&Member::witness("w")),
            ReplicaClass::Voting
        );
        assert_eq!(
            ReplicaClass::from_member(&Member::data_catching_up("c")),
            ReplicaClass::AsyncReadReplica
        );
    }

    #[test]
    fn default_class_is_voting_and_only_async_replicas_cascade() {
        assert_eq!(ReplicaClass::default(), ReplicaClass::Voting);
        assert!(!ReplicaClass::Voting.may_cascade());
        assert!(ReplicaClass::AsyncReadReplica.may_cascade());
    }

    #[test]
    fn refusal_labels_are_stable() {
        assert_eq!(
            CascadeRefusal::VotingMemberDirectOnly.as_str(),
            "voting-member-direct-only"
        );
        assert_eq!(CascadeRefusal::SelfReference.as_str(), "self-reference");
    }

    #[test]
    fn relay_with_no_downstream_reports_own_applied_frontier() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(42);
        assert_eq!(relay.upstream_confirmed_lsn(), 42);
    }

    #[test]
    fn register_downstream_holds_slot_and_is_idempotent() {
        let mut relay = CascadeRelay::new("inter");
        assert_eq!(relay.register_downstream("leaf", 10), 10);
        relay.record_downstream_ack("leaf", 25);
        // Re-registering with an older start must not rewind the slot.
        assert_eq!(relay.register_downstream("leaf", 5), 25);
        assert_eq!(relay.downstream_count(), 1);
    }

    #[test]
    fn downstream_ids_are_listed_in_key_order() {
        let mut relay = CascadeRelay::new("inter");
        relay.register_downstream("b", 0);
        relay.register_downstream("a", 0);
        assert_eq!(relay.node_id(), "inter");
        assert_eq!(
            relay.downstream_ids(),
            vec!["a".to_string(), "b".to_string()]
        );
    }

    #[test]
    fn slow_leaf_pins_chain_retention_frontier() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(100);
        relay.register_downstream("fast", 0);
        relay.register_downstream("slow", 0);
        relay.record_downstream_ack("fast", 90);
        relay.record_downstream_ack("slow", 40);
        assert_eq!(relay.upstream_confirmed_lsn(), 40);
        relay.record_downstream_ack("slow", 95);
        assert_eq!(relay.upstream_confirmed_lsn(), 90);
        relay.record_downstream_ack("fast", 100);
        relay.record_downstream_ack("slow", 100);
        assert_eq!(relay.upstream_confirmed_lsn(), 100);
    }

    #[test]
    fn releasing_slow_leaf_unblocks_frontier() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(100);
        relay.register_downstream("slow", 0);
        relay.record_downstream_ack("slow", 10);
        assert_eq!(relay.upstream_confirmed_lsn(), 10);
        assert!(relay.unregister_downstream("slow"));
        assert_eq!(relay.upstream_confirmed_lsn(), 100);
        // A second removal reports that nothing was registered.
        assert!(!relay.unregister_downstream("slow"));
    }

    #[test]
    fn acks_and_forwards_are_monotonic() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(50);
        relay.register_downstream("leaf", 0);
        relay.record_downstream_ack("leaf", 30);
        // A stale (lower) ack must not rewind the confirmed position.
        relay.record_downstream_ack("leaf", 20);
        relay.note_forwarded("leaf", 45);
        // A stale (lower) forward must not rewind the sent position.
        relay.note_forwarded("leaf", 10);

        let slot = relay.downstream_slot("leaf").unwrap();
        assert_eq!(slot.confirmed_lsn, 30);
        assert_eq!(slot.sent_lsn, 45);

        // A stale (lower) local apply must not rewind the applied position.
        relay.record_self_applied(20);
        assert_eq!(relay.self_applied_lsn(), 50);
    }

    #[test]
    fn records_to_forward_bounds_by_since_and_self_applied() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(4);
        let available: Vec<(u64, &str)> =
            vec![(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f")];
        let picked = relay.records_to_forward(2, &available);
        let lsns: Vec<u64> = picked.iter().map(|(lsn, _)| *lsn).collect();
        assert_eq!(lsns, vec![3, 4]);
        relay.record_self_applied(6);
        let picked = relay.records_to_forward(2, &available);
        let lsns: Vec<u64> = picked.iter().map(|(lsn, _)| *lsn).collect();
        assert_eq!(lsns, vec![3, 4, 5, 6]);
    }

    #[test]
    fn records_to_forward_is_empty_when_since_reaches_ceiling() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(3);
        let available: Vec<(u64, &str)> = vec![(1, "a"), (2, "b"), (3, "c")];
        assert!(relay.records_to_forward(3, &available).is_empty());
        // The largest possible `since` must not overflow or select anything.
        assert!(relay.records_to_forward(u64::MAX, &available).is_empty());
    }

    #[test]
    fn visible_frontier_is_monotonically_non_increasing_down_chain() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(80);
        relay.register_downstream("leaf", 0);
        relay.record_downstream_ack("leaf", 60);
        assert_eq!(relay.downstream_visible_frontier("leaf"), Some(60));
        // An ack beyond what the relay itself applied is capped at the relay's frontier.
        relay.record_downstream_ack("leaf", 200);
        assert_eq!(relay.downstream_visible_frontier("leaf"), Some(80));
        assert_eq!(relay.downstream_visible_frontier("unknown"), None);
    }

    #[test]
    fn downstream_can_serve_bookmark_only_when_frontier_covers_it() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(100);
        relay.register_downstream("leaf", 0);
        relay.record_downstream_ack("leaf", 50);
        let within = CausalBookmark::new(1, 50);
        let beyond = CausalBookmark::new(1, 51);
        assert!(relay.downstream_can_serve("leaf", &within));
        assert!(!relay.downstream_can_serve("leaf", &beyond));
        assert!(!relay.downstream_can_serve("missing", &within));
    }

    #[test]
    fn upstream_confirmed_bookmark_stamps_term() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(100);
        relay.register_downstream("leaf", 0);
        relay.record_downstream_ack("leaf", 70);
        let bm = relay.upstream_confirmed_bookmark(7);
        assert_eq!(bm.term(), 7);
        assert_eq!(bm.commit_lsn(), 70);
    }
}