use std::time::Instant;
use super::replication::{ChannelId, ReplicaRole, SyncHeartbeat, SyncRequest};
/// Default per-request chunk ceiling for a `SyncRequest`: 2^18 bytes (256 KiB).
///
/// `tick` itself never reads this constant; it copies whatever the caller
/// supplies in `TickInputs::chunk_max_bytes`. NOTE(review): presumably callers
/// pass this value there when no override is configured — confirm at call sites.
pub const SYNC_REQUEST_CHUNK_MAX_DEFAULT: u32 = 1 << 18;
use super::replication_election::{elect, ElectionOutcome};
use super::replication_heartbeat::HeartbeatTracker;
use super::replication_state::TransitionSignal;
use crate::adapter::net::behavior::placement::NodeId;
/// A single message that `tick` wants delivered to one peer.
///
/// This module only builds these values; sending them is the caller's job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutboundMessage {
    /// Liveness and tail advertisement addressed to `target`.
    Heartbeat {
        /// Peer that should receive the heartbeat.
        target: NodeId,
        /// Heartbeat payload (channel, local tail, role, wall clock).
        msg: SyncHeartbeat,
    },
    /// Catch-up request addressed to the node believed to be leader.
    SyncRequest {
        /// Peer (the believed leader) that should serve the request.
        target: NodeId,
        /// Request payload describing where to resume and how much to send.
        msg: SyncRequest,
    },
}
/// A role change proposed by `tick` or `election_outcome`.
///
/// Nothing in this module applies it; the caller decides whether and when to
/// move the node into `target`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PendingTransition {
    /// Role the node should move into.
    pub target: ReplicaRole,
    /// Why the transition was proposed.
    pub signal: TransitionSignal,
}
/// Everything `tick` needs to decide one step, passed in by value so the step
/// itself stays free of hidden state.
pub struct TickInputs<'a> {
    /// This node's id; it is never sent a heartbeat or a `SyncRequest`.
    pub self_node_id: NodeId,
    /// Role this node currently holds.
    pub current_role: ReplicaRole,
    /// Channel being replicated; stamped on every outgoing message.
    pub channel_id: ChannelId,
    /// Local log tail. Advertised in heartbeats and used as `since_seq` when
    /// requesting a catch-up.
    pub tail_seq: u64,
    /// Replica set members. May contain this node and duplicates; both are skipped.
    pub replica_set: &'a [NodeId],
    /// Heartbeat bookkeeping consulted for leader liveness and the leader's tail.
    pub tracker: &'a HeartbeatTracker,
    /// Wall-clock value copied into outgoing heartbeats.
    /// NOTE(review): presumably Unix-epoch milliseconds — confirm with callers.
    pub wall_clock_ms: u64,
    /// Copied verbatim into `SyncRequest::chunk_max`.
    pub chunk_max_bytes: u32,
    /// Monotonic "now" handed to the tracker's silence check.
    pub now: Instant,
    /// Bandwidth class stamped on any `SyncRequest` this step emits.
    pub default_bandwidth_class: super::bandwidth::BandwidthClass,
}
/// Result of one `tick`: messages to send plus an optional role change.
///
/// `outbound` lists heartbeats first, in replica-set order, followed by at
/// most one `SyncRequest`.
#[derive(Debug, Default)]
pub struct StepOutcome {
    /// Messages the caller should deliver, in emission order.
    pub outbound: Vec<OutboundMessage>,
    /// Role change the caller should apply, if any.
    pub transition: Option<PendingTransition>,
}
pub fn tick(inputs: TickInputs<'_>) -> StepOutcome {
let mut outcome = StepOutcome::default();
if !inputs.current_role.emits_heartbeats() {
return outcome;
}
let mut seen: std::collections::BTreeSet<u64> = std::collections::BTreeSet::new();
for &peer in inputs.replica_set {
if peer == inputs.self_node_id {
continue;
}
if !seen.insert(peer) {
continue;
}
outcome.outbound.push(OutboundMessage::Heartbeat {
target: peer,
msg: SyncHeartbeat {
channel_id: inputs.channel_id,
tail_seq: inputs.tail_seq,
role: inputs.current_role,
wall_clock_ms: inputs.wall_clock_ms,
},
});
}
if inputs.current_role == ReplicaRole::Replica && inputs.tracker.is_leader_silent(inputs.now) {
outcome.transition = Some(PendingTransition {
target: ReplicaRole::Candidate,
signal: TransitionSignal::MissedHeartbeats,
});
return outcome;
}
if inputs.current_role == ReplicaRole::Replica {
if let Some(leader) = inputs
.tracker
.believed_leader()
.filter(|&l| l != inputs.self_node_id)
{
if let Some(peer) = inputs.tracker.peer_state(leader) {
if peer.tail_seq > inputs.tail_seq {
outcome.outbound.push(OutboundMessage::SyncRequest {
target: leader,
msg: SyncRequest {
channel_id: inputs.channel_id,
since_seq: inputs.tail_seq,
chunk_max: inputs.chunk_max_bytes,
request_id: 0,
class: inputs.default_bandwidth_class,
},
});
}
}
}
}
outcome
}
/// Runs `elect` over `replica_set` and converts its verdict into a proposed
/// role change.
///
/// `rtt_to` and `health_of` are forwarded unchanged to `elect`.
///
/// Returns `Leader`/`ElectionWon` when this node wins and
/// `Replica`/`ElectionLost` when a peer wins. Returns `None` when no replica
/// is eligible.
pub fn election_outcome<R, H>(
    self_node_id: NodeId,
    replica_set: &[NodeId],
    rtt_to: R,
    health_of: H,
) -> Option<PendingTransition>
where
    R: Fn(NodeId) -> Option<std::time::Duration>,
    H: Fn(NodeId) -> bool,
{
    let (target, signal) = match elect(replica_set, self_node_id, rtt_to, health_of) {
        ElectionOutcome::SelfWins => (ReplicaRole::Leader, TransitionSignal::ElectionWon),
        ElectionOutcome::PeerWins(_) => (ReplicaRole::Replica, TransitionSignal::ElectionLost),
        // No eligible replica: no transition is proposed.
        ElectionOutcome::NoEligibleReplica => return None,
    };
    Some(PendingTransition { target, signal })
}
/// Module-private predicates on `ReplicaRole` used by `tick`.
trait ReplicaRoleExt {
    /// Whether a node in this role sends heartbeats to its peers on each tick.
    fn emits_heartbeats(self) -> bool;
}

impl ReplicaRoleExt for ReplicaRole {
    fn emits_heartbeats(self) -> bool {
        // Only Leader and Replica announce themselves; every other role
        // (e.g. Idle, Candidate) stays quiet.
        self == ReplicaRole::Leader || self == ReplicaRole::Replica
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::channel::ChannelName;
    use std::time::Duration;

    /// Derives the `ChannelId` for a (valid) channel name.
    fn channel_id_for(name: &str) -> ChannelId {
        let cn = ChannelName::new(name).unwrap();
        ChannelId::from_name(&cn)
    }

    fn t0() -> Instant {
        Instant::now()
    }

    /// `base` advanced by `ms` milliseconds.
    fn at(base: Instant, ms: u64) -> Instant {
        base + Duration::from_millis(ms)
    }

    fn empty_tracker() -> HeartbeatTracker {
        HeartbeatTracker::new(500)
    }

    /// Tracker that has heard exactly one `Leader` heartbeat from `leader`,
    /// advertising `leader_tail`, at the returned base instant. Tests choose
    /// `now` relative to that base to make the leader look fresh or silent.
    fn tracker_with_leader_heartbeat(
        leader: NodeId,
        leader_tail: u64,
    ) -> (HeartbeatTracker, Instant) {
        let mut tracker = empty_tracker();
        let base = t0();
        tracker.record_heartbeat(leader, ReplicaRole::Leader, leader_tail, base);
        (tracker, base)
    }

    #[test]
    fn idle_emits_no_heartbeats() {
        let tracker = empty_tracker();
        let inputs = TickInputs {
            self_node_id: 0x1,
            current_role: ReplicaRole::Idle,
            channel_id: channel_id_for("test/idle"),
            tail_seq: 0,
            replica_set: &[0x1, 0x2, 0x3],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: 0,
            now: t0(),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert!(outcome.outbound.is_empty());
        assert!(outcome.transition.is_none());
    }

    #[test]
    fn candidate_emits_no_heartbeats() {
        let tracker = empty_tracker();
        let inputs = TickInputs {
            self_node_id: 0x1,
            current_role: ReplicaRole::Candidate,
            channel_id: channel_id_for("test/candidate"),
            tail_seq: 0,
            replica_set: &[0x1, 0x2, 0x3],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: 0,
            now: t0(),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert!(outcome.outbound.is_empty());
        assert!(outcome.transition.is_none());
    }

    #[test]
    fn leader_emits_to_every_other_replica() {
        let tracker = empty_tracker();
        let cid = channel_id_for("test/leader");
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Leader,
            channel_id: cid,
            tail_seq: 42,
            replica_set: &[0x10, 0x20, 0x30, 0x40],
            tracker: &tracker,
            wall_clock_ms: 1_700_000_000_000,
            chunk_max_bytes: 0,
            now: t0(),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert_eq!(outcome.outbound.len(), 3);
        for (i, out) in outcome.outbound.iter().enumerate() {
            let OutboundMessage::Heartbeat { target, msg } = out else {
                panic!("expected Heartbeat, got {out:?}");
            };
            assert_eq!(*target, [0x20, 0x30, 0x40][i]);
            assert_eq!(msg.channel_id, cid);
            assert_eq!(msg.tail_seq, 42);
            assert_eq!(msg.role, ReplicaRole::Leader);
            assert_eq!(msg.wall_clock_ms, 1_700_000_000_000);
        }
        assert!(outcome.transition.is_none());
    }

    #[test]
    fn replica_emits_to_every_other_replica() {
        let tracker = empty_tracker();
        let cid = channel_id_for("test/replica");
        let inputs = TickInputs {
            self_node_id: 0x20,
            current_role: ReplicaRole::Replica,
            channel_id: cid,
            tail_seq: 99,
            replica_set: &[0x10, 0x20, 0x30],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: 0,
            now: t0(),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert_eq!(outcome.outbound.len(), 2);
        let targets: Vec<NodeId> = outcome
            .outbound
            .iter()
            .map(|m| match m {
                OutboundMessage::Heartbeat { target, .. } => *target,
                OutboundMessage::SyncRequest { .. } => {
                    panic!("expected only heartbeats; got SyncRequest")
                }
            })
            .collect();
        assert_eq!(targets, vec![0x10, 0x30]);
        assert!(outcome.transition.is_none());
    }

    #[test]
    fn solo_node_in_replica_set_emits_no_heartbeats() {
        let tracker = empty_tracker();
        let inputs = TickInputs {
            self_node_id: 0x1,
            current_role: ReplicaRole::Leader,
            channel_id: channel_id_for("test/solo"),
            tail_seq: 0,
            replica_set: &[0x1],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: 0,
            now: t0(),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert!(outcome.outbound.is_empty());
    }

    #[test]
    fn self_skipped_in_emission() {
        let tracker = empty_tracker();
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Leader,
            channel_id: channel_id_for("test/self_skip"),
            tail_seq: 0,
            replica_set: &[0x10, 0x10, 0x20, 0x10],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: 0,
            now: t0(),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert_eq!(outcome.outbound.len(), 1);
    }

    #[test]
    fn duplicate_peers_receive_a_single_heartbeat() {
        let tracker = empty_tracker();
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Leader,
            channel_id: channel_id_for("test/dedup"),
            tail_seq: 0,
            // Repeated peers (and self) must collapse to one heartbeat each,
            // in first-seen order.
            replica_set: &[0x20, 0x10, 0x30, 0x20, 0x30, 0x20],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: 0,
            now: t0(),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        let targets: Vec<NodeId> = outcome
            .outbound
            .iter()
            .map(|m| match m {
                OutboundMessage::Heartbeat { target, .. } => *target,
                OutboundMessage::SyncRequest { .. } => {
                    panic!("leader must not emit SyncRequest")
                }
            })
            .collect();
        assert_eq!(targets, vec![0x20, 0x30]);
        assert!(outcome.transition.is_none());
    }

    #[test]
    fn replica_with_silent_leader_requests_candidate_transition() {
        let leader_id = 0x42;
        let (tracker, base) = tracker_with_leader_heartbeat(leader_id, 0);
        let now = at(base, 2_000);
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Replica,
            channel_id: channel_id_for("test/silent_leader"),
            tail_seq: 0,
            replica_set: &[0x10, leader_id],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: 0,
            now,
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert_eq!(outcome.outbound.len(), 1);
        assert_eq!(
            outcome.transition,
            Some(PendingTransition {
                target: ReplicaRole::Candidate,
                signal: TransitionSignal::MissedHeartbeats,
            }),
        );
    }

    #[test]
    fn replica_with_fresh_leader_does_not_request_transition() {
        let leader_id = 0x42;
        let (tracker, base) = tracker_with_leader_heartbeat(leader_id, 0);
        let now = at(base, 100);
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Replica,
            channel_id: channel_id_for("test/fresh"),
            tail_seq: 0,
            replica_set: &[0x10, leader_id],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: 0,
            now,
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert!(outcome.transition.is_none());
    }

    #[test]
    fn leader_with_silent_peers_does_not_request_self_transition() {
        let mut tracker = empty_tracker();
        let base = t0();
        tracker.record_heartbeat(0x20, ReplicaRole::Replica, 0, base);
        let now = at(base, 60_000);
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Leader,
            channel_id: channel_id_for("test/leader_silent_peers"),
            tail_seq: 0,
            replica_set: &[0x10, 0x20],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: 0,
            now,
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert!(outcome.transition.is_none());
    }

    #[test]
    fn candidate_with_silent_leader_does_not_request_double_transition() {
        let leader_id = 0x42;
        let (tracker, base) = tracker_with_leader_heartbeat(leader_id, 0);
        let now = at(base, 60_000);
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Candidate,
            channel_id: channel_id_for("test/candidate_silent"),
            tail_seq: 0,
            replica_set: &[0x10, leader_id],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: 0,
            now,
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert!(outcome.transition.is_none());
    }

    #[test]
    fn election_self_wins_yields_leader_transition() {
        // Equal RTT and health everywhere; the expectation that self (0x10)
        // wins relies on `elect`'s tie-break.
        let pt = election_outcome(
            0x10,
            &[0x10, 0x20],
            |_| Some(Duration::from_millis(100)),
            |_| true,
        );
        assert_eq!(
            pt,
            Some(PendingTransition {
                target: ReplicaRole::Leader,
                signal: TransitionSignal::ElectionWon,
            }),
        );
    }

    #[test]
    fn election_peer_wins_yields_replica_transition() {
        // Self (0x99) reports unhealthy, so only the peer is eligible.
        let pt = election_outcome(
            0x99,
            &[0x10, 0x99],
            |_| Some(Duration::from_millis(5)),
            |node| node != 0x99,
        );
        assert_eq!(
            pt,
            Some(PendingTransition {
                target: ReplicaRole::Replica,
                signal: TransitionSignal::ElectionLost,
            }),
        );
    }

    #[test]
    fn election_no_eligible_yields_no_transition() {
        let pt = election_outcome(0x10, &[0x10, 0x20, 0x30], |_| None, |_| false);
        assert!(pt.is_none());
    }

    #[test]
    fn emission_is_deterministic_across_calls() {
        let tracker = empty_tracker();
        let cid = channel_id_for("test/deterministic");
        let mk_inputs = || TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Leader,
            channel_id: cid,
            tail_seq: 7,
            replica_set: &[0x10, 0x20, 0x30],
            tracker: &tracker,
            wall_clock_ms: 1234,
            chunk_max_bytes: 0,
            now: t0(),
            default_bandwidth_class: Default::default(),
        };
        let a = tick(mk_inputs());
        let b = tick(mk_inputs());
        assert_eq!(a.outbound, b.outbound);
        assert_eq!(a.transition, b.transition);
    }

    #[test]
    fn heartbeat_carries_current_tail_seq_value() {
        let tracker = empty_tracker();
        let inputs = TickInputs {
            self_node_id: 0x1,
            current_role: ReplicaRole::Leader,
            channel_id: channel_id_for("test/tail"),
            tail_seq: u64::MAX - 1,
            replica_set: &[0x1, 0x2],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: 0,
            now: t0(),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        let OutboundMessage::Heartbeat { msg, .. } = &outcome.outbound[0] else {
            panic!("expected Heartbeat at index 0");
        };
        assert_eq!(msg.tail_seq, u64::MAX - 1);
    }

    #[test]
    fn replica_behind_leader_emits_sync_request() {
        let leader_id = 0x42;
        let (tracker, base) = tracker_with_leader_heartbeat(leader_id, 100);
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Replica,
            channel_id: channel_id_for("test/lag"),
            tail_seq: 10,
            replica_set: &[0x10, leader_id],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: SYNC_REQUEST_CHUNK_MAX_DEFAULT,
            now: at(base, 100),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert_eq!(outcome.outbound.len(), 2);
        let sync_req = outcome.outbound.iter().find_map(|m| match m {
            OutboundMessage::SyncRequest { target, msg } => Some((*target, msg.clone())),
            _ => None,
        });
        let (target, msg) = sync_req.expect("expected one SyncRequest");
        assert_eq!(target, leader_id);
        assert_eq!(msg.since_seq, 10);
        assert_eq!(msg.chunk_max, SYNC_REQUEST_CHUNK_MAX_DEFAULT);
        assert_eq!(msg.channel_id, channel_id_for("test/lag"));
        assert!(outcome.transition.is_none());
    }

    #[test]
    fn replica_caught_up_emits_no_sync_request() {
        let leader_id = 0x42;
        let (tracker, base) = tracker_with_leader_heartbeat(leader_id, 100);
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Replica,
            channel_id: channel_id_for("test/caught_up"),
            tail_seq: 100,
            replica_set: &[0x10, leader_id],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: SYNC_REQUEST_CHUNK_MAX_DEFAULT,
            now: at(base, 100),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert_eq!(outcome.outbound.len(), 1);
        assert!(matches!(
            outcome.outbound[0],
            OutboundMessage::Heartbeat { .. }
        ));
    }

    #[test]
    fn replica_with_no_believed_leader_emits_no_sync_request() {
        let tracker = empty_tracker();
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Replica,
            channel_id: channel_id_for("test/no_leader"),
            tail_seq: 0,
            replica_set: &[0x10, 0x42],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: SYNC_REQUEST_CHUNK_MAX_DEFAULT,
            now: t0(),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert!(outcome
            .outbound
            .iter()
            .all(|m| matches!(m, OutboundMessage::Heartbeat { .. })));
    }

    #[test]
    fn leader_does_not_emit_sync_request_even_if_peer_advertises_higher_tail() {
        let peer = 0x20;
        let mut tracker = empty_tracker();
        let base = t0();
        tracker.record_heartbeat(peer, ReplicaRole::Replica, 999, base);
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Leader,
            channel_id: channel_id_for("test/leader_no_request"),
            tail_seq: 0,
            replica_set: &[0x10, peer],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: SYNC_REQUEST_CHUNK_MAX_DEFAULT,
            now: at(base, 100),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert!(outcome
            .outbound
            .iter()
            .all(|m| matches!(m, OutboundMessage::Heartbeat { .. })));
    }

    #[test]
    fn replica_with_silent_leader_skips_sync_request_when_electing() {
        let leader_id = 0x42;
        // Leader is ahead (100 vs 10) but silent: the election wins over syncing.
        let (tracker, base) = tracker_with_leader_heartbeat(leader_id, 100);
        let now = at(base, 2_000);
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Replica,
            channel_id: channel_id_for("test/skip_during_election"),
            tail_seq: 10,
            replica_set: &[0x10, leader_id],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: SYNC_REQUEST_CHUNK_MAX_DEFAULT,
            now,
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert!(outcome
            .outbound
            .iter()
            .all(|m| matches!(m, OutboundMessage::Heartbeat { .. })));
        assert_eq!(
            outcome.transition,
            Some(PendingTransition {
                target: ReplicaRole::Candidate,
                signal: TransitionSignal::MissedHeartbeats,
            }),
        );
    }

    #[test]
    fn replica_with_self_as_believed_leader_emits_no_sync_request() {
        // The tracker believes *we* (0x10) are the leader, with a higher tail.
        let (tracker, base) = tracker_with_leader_heartbeat(0x10, 99);
        let inputs = TickInputs {
            self_node_id: 0x10,
            current_role: ReplicaRole::Replica,
            channel_id: channel_id_for("test/self_loop"),
            tail_seq: 0,
            replica_set: &[0x10, 0x42],
            tracker: &tracker,
            wall_clock_ms: 0,
            chunk_max_bytes: SYNC_REQUEST_CHUNK_MAX_DEFAULT,
            now: at(base, 100),
            default_bandwidth_class: Default::default(),
        };
        let outcome = tick(inputs);
        assert!(
            outcome
                .outbound
                .iter()
                .all(|m| !matches!(m, OutboundMessage::SyncRequest { .. })),
            "self-as-leader must not produce a SyncRequest"
        );
    }
}