use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::time::Instant;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use super::file::RedexFile;
use super::replication::{
ChannelId, ReplicaRole, SyncHeartbeat, SyncNack, SyncRequest, SyncResponse,
};
use super::replication_budget::BandwidthBudget;
use super::replication_catchup::{apply_sync_response, handle_sync_request, SyncRequestOutcome};
use super::replication_coordinator::{ChannelIdentity, CoordinatorError, ReplicationCoordinator};
use super::replication_heartbeat::HeartbeatTracker;
use super::replication_step::{
election_outcome, tick, OutboundMessage, TickInputs, SYNC_REQUEST_CHUNK_MAX_DEFAULT,
};
use crate::adapter::net::behavior::placement::NodeId;
use crate::error::AdapterError;
use std::time::Duration;
/// Outbound transport for replication protocol messages.
///
/// The runtime in this module calls these methods to deliver heartbeats,
/// catch-up sync requests, sync responses and sync NACKs to one specific
/// peer. An `Err` means the message could not be handed off; the runtime
/// logs such failures and keeps going (and refunds the bandwidth budget when
/// a `SyncResponse` fails to send).
#[async_trait::async_trait]
pub trait ReplicationDispatcher: Send + Sync {
    /// Sends a `SyncHeartbeat` to `target`.
    async fn send_heartbeat(&self, target: NodeId, msg: SyncHeartbeat) -> Result<(), AdapterError>;
    /// Sends a catch-up `SyncRequest` to `target`.
    async fn send_sync_request(&self, target: NodeId, msg: SyncRequest)
    -> Result<(), AdapterError>;
    /// Sends a `SyncResponse` (a chunk of log events) back to the requester `target`.
    async fn send_sync_response(
        &self,
        target: NodeId,
        msg: SyncResponse,
    ) -> Result<(), AdapterError>;
    /// Sends a `SyncNack` rejecting a `SyncRequest` from `target`.
    async fn send_sync_nack(&self, target: NodeId, msg: SyncNack) -> Result<(), AdapterError>;
}
/// Returns the measured round-trip time to a peer, or `None` if unknown.
/// The runtime hands this to `election_outcome` when choosing a leader.
pub type RttLookup = Arc<dyn Fn(NodeId) -> Option<Duration> + Send + Sync>;
/// Routes an inbound replication message to the runtime serving `channel_id`.
///
/// On failure the message is handed back as `Err(inbound)`, so the caller
/// keeps ownership and can decide what to do with it.
pub trait ReplicationInboundRouter: Send + Sync {
    fn try_route(&self, channel_id: ChannelId, inbound: Inbound) -> Result<(), Inbound>;
}
/// `MeshNode` sends every replication message the same way. It serializes
/// the message with `to_bytes()` and ships the bytes over the REDEX
/// subprotocol through `send_redex_payload`.
#[async_trait::async_trait]
impl ReplicationDispatcher for crate::adapter::net::MeshNode {
    async fn send_heartbeat(&self, target: NodeId, msg: SyncHeartbeat) -> Result<(), AdapterError> {
        let encoded = msg.to_bytes();
        send_redex_payload(self, target, encoded).await
    }

    async fn send_sync_request(
        &self,
        target: NodeId,
        msg: SyncRequest,
    ) -> Result<(), AdapterError> {
        let encoded = msg.to_bytes();
        send_redex_payload(self, target, encoded).await
    }

    async fn send_sync_response(
        &self,
        target: NodeId,
        msg: SyncResponse,
    ) -> Result<(), AdapterError> {
        let encoded = msg.to_bytes();
        send_redex_payload(self, target, encoded).await
    }

    async fn send_sync_nack(&self, target: NodeId, msg: SyncNack) -> Result<(), AdapterError> {
        let encoded = msg.to_bytes();
        send_redex_payload(self, target, encoded).await
    }
}
/// Sends an already-encoded REDEX replication payload to peer `target`.
///
/// # Errors
/// Returns `AdapterError::Connection` when the mesh has no address for
/// `target`. Otherwise it propagates the result of `send_subprotocol`.
async fn send_redex_payload(
    mesh: &crate::adapter::net::MeshNode,
    target: NodeId,
    payload: Vec<u8>,
) -> Result<(), AdapterError> {
    let Some(peer_addr) = mesh.peer_addr(target) else {
        return Err(AdapterError::Connection(format!(
            "replication: peer {target:#x} unknown"
        )));
    };
    let subprotocol = super::replication::SUBPROTOCOL_REDEX;
    mesh.send_subprotocol(peer_addr, subprotocol, &payload)
        .await
}
/// An event delivered to a replication runtime's inbox.
///
/// Each protocol variant carries the sending peer's `NodeId` and the decoded
/// message. `Shutdown` tells the runtime loop to move the coordinator to
/// `Idle` and exit.
#[derive(Debug, Clone)]
pub enum Inbound {
    /// Liveness/role/tail announcement from a peer.
    Heartbeat {
        from: NodeId,
        msg: SyncHeartbeat,
    },
    /// A peer asking this node (as leader) for log events.
    SyncRequest {
        from: NodeId,
        msg: SyncRequest,
    },
    /// A chunk of log events answering one of our `SyncRequest`s.
    SyncResponse {
        from: NodeId,
        msg: SyncResponse,
    },
    /// A rejection of one of our `SyncRequest`s.
    SyncNack {
        from: NodeId,
        msg: SyncNack,
    },
    /// Stop the runtime task.
    Shutdown,
}
/// Static configuration and environment hooks for one replication runtime.
pub struct RuntimeInputs {
    /// Identity of the replicated channel.
    pub channel: ChannelIdentity,
    /// Wire id of the channel; inbound messages carrying a different id are dropped.
    pub channel_id: ChannelId,
    /// This node's id.
    pub self_node_id: NodeId,
    /// Members of the replica set. Inbound events from nodes outside it are
    /// dropped. It may include this node; self is filtered out where needed.
    pub replica_set: Vec<NodeId>,
    /// Heartbeat cadence in milliseconds. It drives the tick interval and the `HeartbeatTracker`.
    pub heartbeat_ms: u64,
    /// Returns wall-clock milliseconds, passed to `tick` as `wall_clock_ms`.
    pub wall_clock_provider: Arc<dyn Fn() -> u64 + Send + Sync>,
    /// Returns the local log tail sequence.
    pub tail_provider: Arc<dyn Fn() -> u64 + Send + Sync>,
    /// Peer RTT source consulted during elections.
    pub rtt_lookup: RttLookup,
    /// Local log. It is read to serve `SyncRequest`s and written when applying `SyncResponse`s.
    pub file: RedexFile,
    /// Bandwidth class passed to `tick` for outbound sync requests.
    pub default_bandwidth_class: super::bandwidth::BandwidthClass,
    /// Passed to `BandwidthBudget::try_consume_with_class`; presumably the
    /// share of the budget available to background-class traffic (TODO confirm).
    pub background_fraction: f32,
}
/// Owner-side handle to a running replication runtime task.
///
/// Events are routed to one of two bounded inboxes. `Shutdown`,
/// `SyncResponse` and `SyncNack` go to the priority inbox, which the runtime
/// loop polls first. Dropping the handle aborts the task if it is still owned.
pub struct ReplicationRuntimeHandle {
    /// Regular inbox (heartbeats and sync requests).
    inbox: mpsc::Sender<Inbound>,
    /// Priority inbox, polled ahead of `inbox` by the runtime loop.
    priority_inbox: mpsc::Sender<Inbound>,
    /// Join handle of the runtime task; `None` once `cancel` or `drop` took it.
    task: Mutex<Option<JoinHandle<()>>>,
    /// Coordinator shared with the runtime task.
    coordinator: Arc<ReplicationCoordinator>,
    /// Set once `cancel` has finished waiting for the task.
    stopped: AtomicBool,
}
/// Returns `true` for events that belong on the priority inbox.
///
/// `Shutdown` and the replies to our own sync requests (`SyncResponse`,
/// `SyncNack`) go to the priority inbox. The runtime loop polls that inbox
/// ahead of the regular one, so a backlog of heartbeats and sync requests
/// cannot delay these events.
#[inline]
fn is_priority_event(event: &Inbound) -> bool {
    match event {
        Inbound::Heartbeat { .. } | Inbound::SyncRequest { .. } => false,
        Inbound::Shutdown | Inbound::SyncResponse { .. } | Inbound::SyncNack { .. } => true,
    }
}
impl ReplicationRuntimeHandle {
pub fn coordinator(&self) -> &Arc<ReplicationCoordinator> {
&self.coordinator
}
pub async fn dispatch(&self, event: Inbound) -> Result<(), AdapterError> {
let sender = if is_priority_event(&event) {
&self.priority_inbox
} else {
&self.inbox
};
sender
.send(event)
.await
.map_err(|_| AdapterError::Transient("replication runtime task exited".to_string()))
}
pub fn try_dispatch(&self, event: Inbound) -> Result<(), Inbound> {
let sender = if is_priority_event(&event) {
&self.priority_inbox
} else {
&self.inbox
};
sender.try_send(event).map_err(|e| e.into_inner())
}
pub async fn cancel(&self) {
let handle = self.task.lock().take();
if let Some(h) = handle {
match self.priority_inbox.try_send(Inbound::Shutdown) {
Ok(()) | Err(mpsc::error::TrySendError::Closed(_)) => {
let _ = h.await;
}
Err(mpsc::error::TrySendError::Full(_)) => {
h.abort();
let _ = h.await;
}
}
self.stopped.store(true, AtomicOrdering::Release);
}
}
pub fn is_stopped(&self) -> bool {
self.stopped.load(AtomicOrdering::Acquire)
}
}
#[derive(Debug, Default)]
pub struct CatchupBackoff {
entries: std::collections::HashMap<NodeId, BackoffEntry>,
}
#[derive(Debug, Default, Clone, Copy)]
struct BackoffEntry {
consecutive_empty: u32,
backoff_until: Option<Instant>,
}
pub const CATCHUP_BACKOFF_THRESHOLD: u32 = 3;
pub const CATCHUP_BACKOFF_INITIAL: Duration = Duration::from_secs(1);
pub const CATCHUP_BACKOFF_CAP: Duration = Duration::from_secs(30);
#[derive(Debug, Default)]
pub struct OutstandingRequests {
entries: std::collections::HashMap<(NodeId, u64), Instant>,
}
pub const REQUEST_TTL: Duration = Duration::from_secs(30);
pub const REQUEST_REGISTRY_SOFT_CAP: usize = 256;
impl OutstandingRequests {
pub fn new() -> Self {
Self {
entries: std::collections::HashMap::new(),
}
}
pub fn record(&mut self, leader: NodeId, request_id: u64, now: Instant) {
if self.entries.len() >= REQUEST_REGISTRY_SOFT_CAP {
self.entries
.retain(|_, &mut inserted| now.saturating_duration_since(inserted) < REQUEST_TTL);
}
self.entries.insert((leader, request_id), now);
}
pub fn take(&mut self, leader: NodeId, request_id: u64, now: Instant) -> bool {
match self.entries.remove(&(leader, request_id)) {
Some(inserted) => now.saturating_duration_since(inserted) < REQUEST_TTL,
None => false,
}
}
pub fn clear_leader(&mut self, leader: NodeId) {
self.entries.retain(|(l, _), _| *l != leader);
}
}
impl CatchupBackoff {
pub fn new() -> Self {
Self {
entries: std::collections::HashMap::new(),
}
}
pub fn record_empty(&mut self, leader: NodeId, now: Instant) {
let entry = self.entries.entry(leader).or_default();
entry.consecutive_empty = entry.consecutive_empty.saturating_add(1);
if entry.consecutive_empty > CATCHUP_BACKOFF_THRESHOLD {
let shift = entry
.consecutive_empty
.saturating_sub(CATCHUP_BACKOFF_THRESHOLD + 1)
.min(20);
let multiplier: u32 = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
let backoff = CATCHUP_BACKOFF_INITIAL
.saturating_mul(multiplier)
.min(CATCHUP_BACKOFF_CAP);
entry.backoff_until = Some(now + backoff);
}
}
pub fn record_progress(&mut self, leader: NodeId) {
self.entries.remove(&leader);
}
pub fn is_in_backoff(&self, leader: NodeId, now: Instant) -> bool {
self.entries
.get(&leader)
.and_then(|e| e.backoff_until)
.is_some_and(|until| now < until)
}
pub fn gc_expired(&mut self, now: Instant, cap: Duration) {
self.entries.retain(|_, e| match e.backoff_until {
Some(until) => now.saturating_duration_since(until) < cap,
None => true,
});
}
}
impl Drop for ReplicationRuntimeHandle {
    // Aborts the runtime task unless `cancel` already took ownership of it.
    // The slot is emptied under a non-blocking `try_lock`; the abort itself
    // needs no lock.
    fn drop(&mut self) {
        let task = self.task.try_lock().and_then(|mut slot| slot.take());
        if let Some(task) = task {
            task.abort();
        }
    }
}
/// Capacity of the regular inbox (heartbeats and sync requests).
pub const RUNTIME_INBOX_CAPACITY: usize = 1024;
/// Mutable state shared by the tick and inbound handlers of one runtime.
struct RuntimeState {
    /// Per-peer heartbeat observations (role, tail, liveness) and the believed leader.
    tracker: Arc<Mutex<HeartbeatTracker>>,
    /// Bandwidth budget charged when serving `SyncRequest`s. It is supplied
    /// by the caller of `spawn_replication_runtime`.
    budget: Arc<Mutex<BandwidthBudget>>,
    /// Per-leader catch-up backoff.
    backoff: Arc<Mutex<CatchupBackoff>>,
    /// Tokens of `SyncRequest`s awaiting a response or NACK.
    outstanding: Arc<Mutex<OutstandingRequests>>,
}
/// Capacity of the priority inbox (shutdown, sync responses, sync NACKs).
pub const RUNTIME_PRIORITY_INBOX_CAPACITY: usize = 128;
/// Spawns the replication runtime task for one channel and returns its handle.
///
/// The task gets a fresh heartbeat tracker, a fresh catch-up backoff and a
/// fresh outstanding-request registry. `budget` stays shared with the caller.
/// This must be called from within a Tokio runtime, because it uses `tokio::spawn`.
pub fn spawn_replication_runtime(
    inputs: RuntimeInputs,
    coordinator: Arc<ReplicationCoordinator>,
    dispatcher: Arc<dyn ReplicationDispatcher>,
    budget: Arc<Mutex<BandwidthBudget>>,
) -> ReplicationRuntimeHandle {
    let tracker = HeartbeatTracker::new(inputs.heartbeat_ms);
    let state = RuntimeState {
        tracker: Arc::new(Mutex::new(tracker)),
        budget,
        backoff: Arc::new(Mutex::new(CatchupBackoff::new())),
        outstanding: Arc::new(Mutex::new(OutstandingRequests::new())),
    };

    let (inbox, inbox_rx) = mpsc::channel::<Inbound>(RUNTIME_INBOX_CAPACITY);
    let (priority_inbox, priority_rx) =
        mpsc::channel::<Inbound>(RUNTIME_PRIORITY_INBOX_CAPACITY);

    let task = tokio::spawn(run(
        inputs,
        Arc::clone(&coordinator),
        dispatcher,
        state,
        inbox_rx,
        priority_rx,
    ));

    ReplicationRuntimeHandle {
        inbox,
        priority_inbox,
        task: Mutex::new(Some(task)),
        coordinator,
        stopped: AtomicBool::new(false),
    }
}
async fn run(
inputs: RuntimeInputs,
coordinator: Arc<ReplicationCoordinator>,
dispatcher: Arc<dyn ReplicationDispatcher>,
state: RuntimeState,
mut inbox: mpsc::Receiver<Inbound>,
mut priority_inbox: mpsc::Receiver<Inbound>,
) {
let heartbeat_interval = Duration::from_millis(inputs.heartbeat_ms);
let mut interval = tokio::time::interval(heartbeat_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
interval.tick().await;
loop {
tokio::select! {
biased;
event = priority_inbox.recv() => {
match event {
Some(Inbound::Shutdown) | None => {
let _ = coordinator
.transition_to(
ReplicaRole::Idle,
super::replication_state::TransitionSignal::ChannelClose,
)
.await;
return;
}
Some(event) => {
on_inbound(&inputs, &coordinator, &dispatcher, &state, event).await;
}
}
}
_ = interval.tick() => {
on_tick(&inputs, &coordinator, &dispatcher, &state).await;
}
event = inbox.recv() => {
match event {
Some(Inbound::Shutdown) | None => {
let _ = coordinator
.transition_to(
ReplicaRole::Idle,
super::replication_state::TransitionSignal::ChannelClose,
)
.await;
return;
}
Some(event) => {
on_inbound(&inputs, &coordinator, &dispatcher, &state, event).await;
}
}
}
}
}
}
/// Replication lag sampled on a tick, tagged with the role that observed it.
#[derive(Debug)]
enum LagObservation {
    /// As leader: the largest `peer_lag` across the other replicas.
    Leader(Duration),
    /// As replica: the `peer_lag` of the believed leader.
    Replica(Duration),
    /// No applicable sample (Candidate/Idle role, or no lag data yet).
    None,
}
/// Samples replication lag from the heartbeat tracker for the current role.
///
/// A leader reports the worst (largest) lag among the other replicas. A
/// replica reports the lag of its believed leader. Candidates and idle nodes
/// report nothing, as does any role with no tracker data.
fn observe_lag(
    role: ReplicaRole,
    replica_set: &[NodeId],
    self_node_id: NodeId,
    tracker: &HeartbeatTracker,
    now: Instant,
) -> LagObservation {
    match role {
        ReplicaRole::Leader => replica_set
            .iter()
            .copied()
            .filter(|&peer| peer != self_node_id)
            .filter_map(|peer| tracker.peer_lag(peer, now))
            .max()
            .map_or(LagObservation::None, LagObservation::Leader),
        ReplicaRole::Replica => tracker
            .believed_leader()
            .and_then(|leader| tracker.peer_lag(leader, now))
            .map_or(LagObservation::None, LagObservation::Replica),
        ReplicaRole::Candidate | ReplicaRole::Idle => LagObservation::None,
    }
}
/// Forgets the believed leader and invalidates every token issued to it.
///
/// The tracker lock is released before the outstanding-request lock is
/// taken, so the two locks are never held at the same time.
fn clear_leader_belief_and_tokens(
    tracker: &Arc<Mutex<HeartbeatTracker>>,
    outstanding: &Arc<Mutex<OutstandingRequests>>,
) {
    let mut tracker_guard = tracker.lock();
    let previous_leader = tracker_guard.believed_leader();
    tracker_guard.clear_believed_leader();
    drop(tracker_guard);

    if let Some(previous_leader) = previous_leader {
        outstanding.lock().clear_leader(previous_leader);
    }
}
/// Periodic work for one replication runtime, run on every heartbeat interval.
///
/// In order, this:
/// 1. garbage-collects long-expired catch-up backoff entries;
/// 2. records the tail seq to advertise on the coordinator. As Leader, the
///    local tail is capped at the highest tail reported by any other peer;
/// 3. runs the `tick` step function and samples replication lag, both
///    against a single tracker lock;
/// 4. sends the heartbeats and sync requests produced by `tick`. Sync
///    requests to leaders in backoff are skipped; the rest are tagged with a
///    random token recorded in `outstanding`;
/// 5. applies any role transition `tick` requested. If that lands in
///    `Candidate`, it immediately runs an election among healthy peers.
///
/// Send failures are logged at trace level and otherwise ignored.
async fn on_tick(
    inputs: &RuntimeInputs,
    coordinator: &Arc<ReplicationCoordinator>,
    dispatcher: &Arc<dyn ReplicationDispatcher>,
    state: &RuntimeState,
) {
    // The bandwidth budget is not used on the tick path.
    let RuntimeState {
        tracker,
        budget: _,
        backoff,
        outstanding,
    } = state;
    let now = tokio::time::Instant::now().into_std();
    // Forget backoff entries whose window ended at least twice the cap ago.
    backoff
        .lock()
        .gc_expired(now, CATCHUP_BACKOFF_CAP.saturating_mul(2));
    let tail_seq = (inputs.tail_provider)();
    // As Leader, advertise no more than the highest tail any other peer has
    // reported. In every other role, advertise the local tail unchanged.
    let advertised_tail = if coordinator.role() == ReplicaRole::Leader {
        let max_peer_tail = tracker
            .lock()
            .peer_tail_seqs()
            .into_iter()
            .filter(|(id, _)| *id != inputs.self_node_id)
            .map(|(_, t)| t)
            .max();
        match max_peer_tail {
            Some(p) => tail_seq.min(p),
            None => tail_seq,
        }
    } else {
        tail_seq
    };
    coordinator.record_tail_seq(advertised_tail);
    let wall_clock_ms = (inputs.wall_clock_provider)();
    // Run the step function and sample lag under one tracker lock, so both
    // see the same snapshot. The guard is dropped at the end of this block,
    // before any await below.
    let (outcome, lag_observation) = {
        let t = tracker.lock();
        let current_role = coordinator.role();
        let outcome = tick(TickInputs {
            self_node_id: inputs.self_node_id,
            current_role,
            channel_id: inputs.channel_id,
            tail_seq,
            replica_set: &inputs.replica_set,
            tracker: &t,
            wall_clock_ms,
            chunk_max_bytes: SYNC_REQUEST_CHUNK_MAX_DEFAULT,
            now,
            default_bandwidth_class: inputs.default_bandwidth_class,
        });
        let lag = observe_lag(
            current_role,
            &inputs.replica_set,
            inputs.self_node_id,
            &t,
            now,
        );
        (outcome, lag)
    };
    match lag_observation {
        LagObservation::Leader(d) => coordinator.metrics().record_leader_lag(d),
        LagObservation::Replica(d) => coordinator.metrics().record_replica_lag(d),
        LagObservation::None => {}
    }
    for msg in outcome.outbound {
        match msg {
            OutboundMessage::Heartbeat { target, msg } => {
                if let Err(e) = dispatcher.send_heartbeat(target, msg).await {
                    tracing::trace!(target=?target, error=?e, "replication: heartbeat send failed");
                }
            }
            OutboundMessage::SyncRequest { target, mut msg } => {
                // The leader is in catch-up backoff; skip this request.
                if backoff.lock().is_in_backoff(target, now) {
                    tracing::trace!(
                        target = target,
                        "replication: skipping SyncRequest — leader is in catchup backoff"
                    );
                    continue;
                }
                // Tag the request with a fresh random token. Only a reply
                // that carries this token back is accepted later.
                let mut id_bytes = [0u8; 8];
                if getrandom::fill(&mut id_bytes).is_err() {
                    tracing::trace!(
                        target = target,
                        "replication: getrandom failure; skipping SyncRequest this tick"
                    );
                    continue;
                }
                let token = u64::from_le_bytes(id_bytes);
                msg.request_id = token;
                outstanding.lock().record(target, token, now);
                if let Err(e) = dispatcher.send_sync_request(target, msg).await {
                    tracing::trace!(target=?target, error=?e, "replication: sync_request send failed");
                }
            }
        }
    }
    if let Some(pending) = outcome.transition {
        if let Err(e) = coordinator
            .transition_to(pending.target, pending.signal)
            .await
        {
            tracing::warn!(error=?e, "replication: transition_to({:?}, {:?}) failed", pending.target, pending.signal);
            return;
        }
        // Entering Candidate: hold the election right away. Self always counts
        // as healthy; other peers must be in the tracker's healthy set.
        if pending.target == ReplicaRole::Candidate {
            let healthy = tracker.lock().healthy_peers(now);
            let elect = election_outcome(
                inputs.self_node_id,
                &inputs.replica_set,
                inputs.rtt_lookup.as_ref(),
                |peer| peer == inputs.self_node_id || healthy.contains(&peer),
            );
            if let Some(pt) = elect {
                // Whatever the outcome, the old leader belief is stale. Every
                // arm drops it, along with its outstanding tokens.
                match coordinator.transition_to(pt.target, pt.signal).await {
                    Ok(_) => {
                        clear_leader_belief_and_tokens(tracker, outstanding);
                    }
                    Err(CoordinatorError::TagSink(e)) => {
                        tracing::warn!(
                            error = ?e,
                            target = ?pt.target,
                            "replication: post-election chain-tag side-effect failed; state advanced"
                        );
                        clear_leader_belief_and_tokens(tracker, outstanding);
                    }
                    Err(CoordinatorError::Transition(e)) => {
                        tracing::warn!(
                            error = ?e,
                            target = ?pt.target,
                            "replication: post-election transition rejected; state moved out from under us"
                        );
                        clear_leader_belief_and_tokens(tracker, outstanding);
                    }
                }
            }
        }
    }
}
#[allow(clippy::too_many_arguments)]
async fn on_inbound(
inputs: &RuntimeInputs,
coordinator: &Arc<ReplicationCoordinator>,
dispatcher: &Arc<dyn ReplicationDispatcher>,
state: &RuntimeState,
event: Inbound,
) {
let RuntimeState {
tracker,
budget,
backoff,
outstanding,
} = state;
let from_node = match &event {
Inbound::Shutdown => None,
Inbound::Heartbeat { from, .. } => Some(*from),
Inbound::SyncRequest { from, .. } => Some(*from),
Inbound::SyncResponse { from, .. } => Some(*from),
Inbound::SyncNack { from, .. } => Some(*from),
};
if let Some(from) = from_node {
if !inputs.replica_set.contains(&from) {
tracing::trace!(
from = from,
channel = ?inputs.channel_id,
"replication: dropping inbound from peer not in replica_set"
);
return;
}
}
match event {
Inbound::Shutdown => {
unreachable!("Shutdown is filtered in the main loop");
}
Inbound::Heartbeat { from, msg } => {
if msg.channel_id != inputs.channel_id {
tracing::trace!(
from = from,
"replication: dropping heartbeat for wrong channel"
);
return;
}
tracker.lock().record_heartbeat(
from,
msg.role,
msg.tail_seq,
tokio::time::Instant::now().into_std(),
);
if msg.role == ReplicaRole::Leader
&& coordinator.role() == ReplicaRole::Leader
&& from != inputs.self_node_id
{
let local_tail = (inputs.tail_provider)();
let peer_tail = msg.tail_seq;
let local_wins = local_tail > peer_tail
|| (local_tail == peer_tail && inputs.self_node_id < from);
if !local_wins {
tracing::warn!(
from = from,
peer_tail = peer_tail,
local_tail = local_tail,
local = inputs.self_node_id,
"replication: peer-leader observed; conceding via Leader → Replica"
);
let _ = coordinator
.transition_to(
ReplicaRole::Replica,
super::replication_state::TransitionSignal::PeerLeaderObserved,
)
.await;
}
}
}
Inbound::SyncRequest { from, msg } => {
if msg.channel_id != inputs.channel_id {
tracing::trace!(
from = from,
"replication: dropping SyncRequest for wrong channel"
);
return;
}
if coordinator.role() != ReplicaRole::Leader {
let nack = SyncNack {
channel_id: inputs.channel_id,
since_seq: msg.since_seq,
error_code: super::replication::SyncNackError::NotLeader,
leader_first_retained_seq: 0,
request_id: msg.request_id,
detail: String::new(),
};
if let Err(e) = dispatcher.send_sync_nack(from, nack).await {
tracing::trace!(from = from, error = ?e, "replication: NotLeader NACK send failed");
}
return;
}
match handle_sync_request(&inputs.file, &msg, inputs.channel_id) {
SyncRequestOutcome::Response(resp) => {
let byte_estimate = estimate_response_bytes(&resp);
let admitted = {
let mut bb = budget.lock();
bb.try_consume_with_class(
byte_estimate,
msg.class,
tokio::time::Instant::now().into_std(),
inputs.background_fraction,
)
};
if !admitted {
let nack = SyncNack {
channel_id: inputs.channel_id,
since_seq: msg.since_seq,
error_code: super::replication::SyncNackError::Backpressure,
leader_first_retained_seq: 0,
request_id: msg.request_id,
detail: String::new(),
};
if let Err(e) = dispatcher.send_sync_nack(from, nack).await {
tracing::trace!(from = from, error = ?e, "replication: Backpressure NACK send failed");
}
return;
}
if coordinator.role() != ReplicaRole::Leader {
let nack = SyncNack {
channel_id: inputs.channel_id,
since_seq: msg.since_seq,
error_code: super::replication::SyncNackError::NotLeader,
leader_first_retained_seq: 0,
request_id: msg.request_id,
detail: String::new(),
};
if let Err(e) = dispatcher.send_sync_nack(from, nack).await {
tracing::trace!(from = from, error = ?e, "replication: post-op NotLeader NACK send failed");
}
return;
}
coordinator.metrics().incr_sync_bytes(byte_estimate);
if let Err(e) = dispatcher.send_sync_response(from, resp).await {
{
let mut bb = budget.lock();
bb.refund(byte_estimate);
}
tracing::trace!(from = from, error = ?e, "replication: SyncResponse send failed");
}
}
SyncRequestOutcome::Nack {
error_code,
leader_first_retained_seq,
detail,
} => {
let nack = SyncNack {
channel_id: inputs.channel_id,
since_seq: msg.since_seq,
error_code,
leader_first_retained_seq,
request_id: msg.request_id,
detail,
};
if let Err(e) = dispatcher.send_sync_nack(from, nack).await {
tracing::trace!(from = from, error = ?e, "replication: SyncNack send failed");
}
}
}
}
Inbound::SyncResponse { from, msg } => {
if msg.channel_id != inputs.channel_id {
tracing::trace!(
from = from,
"replication: dropping SyncResponse for wrong channel"
);
return;
}
{
let now = tokio::time::Instant::now().into_std();
if !outstanding.lock().take(from, msg.request_id, now) {
tracing::trace!(
from = from,
request_id = msg.request_id,
"replication: dropping SyncResponse with unknown request_id"
);
return;
}
}
let leader_belief = tracker.lock().believed_leader();
if leader_belief != Some(from) {
tracing::trace!(
from = from,
believed_leader = ?leader_belief,
"replication: dropping SyncResponse from non-leader peer"
);
return;
}
if coordinator.role() != ReplicaRole::Replica {
tracing::trace!(
from = from,
"replication: SyncResponse received in role {:?}; ignoring",
coordinator.role(),
);
return;
}
let pre_apply_tail = inputs.file.next_seq();
match apply_sync_response(&inputs.file, &msg, inputs.channel_id) {
Ok(new_tail) => {
coordinator.record_tail_seq(new_tail);
if new_tail > pre_apply_tail {
backoff.lock().record_progress(from);
} else {
let now = tokio::time::Instant::now().into_std();
let strike = {
let t = tracker.lock();
let peer = t.peer_state(from);
let lag = t.peer_lag(from, now);
let fresh_window = std::time::Duration::from_millis(
t.heartbeat_ms().saturating_mul(t.miss_threshold() as u64),
);
matches!(
(peer, lag),
(Some(p), Some(elapsed))
if elapsed < fresh_window && p.tail_seq > new_tail
)
};
if strike {
backoff.lock().record_empty(from, now);
}
}
tracing::trace!(
from = from,
new_tail = new_tail,
"replication: applied chunk"
);
}
Err(super::replication_catchup::ApplyError::AppendFailed(detail)) => {
handle_disk_pressure(coordinator, &inputs.file, &detail, from).await;
}
Err(super::replication_catchup::ApplyError::GapBeforeChunk {
first_seq,
local_next,
divergence_suspected,
}) => {
coordinator.metrics().incr_skip_ahead();
match inputs.file.skip_to(first_seq) {
Ok(()) => {
debug_assert!(first_seq > local_next);
if divergence_suspected {
tracing::warn!(
from = from,
from_seq = local_next,
to_seq = first_seq,
gap = first_seq.saturating_sub(local_next),
"replication: skip-ahead crossed leader's retained range — \
divergent log suspected (split-brain post-mortem)"
);
} else {
tracing::warn!(
from = from,
from_seq = local_next,
to_seq = first_seq,
gap = first_seq.saturating_sub(local_next),
"replication: skip-ahead — leader trimmed past local tail"
);
}
match apply_sync_response(&inputs.file, &msg, inputs.channel_id) {
Ok(new_tail) => {
coordinator.record_tail_seq(new_tail);
tracing::trace!(
from = from,
new_tail = new_tail,
"replication: applied chunk after skip-ahead"
);
}
Err(e) => {
tracing::warn!(
from = from,
error = ?e,
"replication: apply after skip-ahead failed"
);
}
}
}
Err(e) => {
tracing::warn!(
from = from,
first_seq = first_seq,
error = %e,
"replication: skip_to rejected; falling back to heartbeat-cycle recovery"
);
}
}
}
Err(e) => {
tracing::warn!(
from = from,
error = ?e,
"replication: apply_sync_response failed"
);
}
}
}
Inbound::SyncNack { from, msg } => {
if msg.channel_id != inputs.channel_id {
tracing::trace!(
from = from,
"replication: dropping SyncNack for wrong channel"
);
return;
}
let leader_belief = tracker.lock().believed_leader();
if leader_belief != Some(from) {
tracing::trace!(
from = from,
believed_leader = ?leader_belief,
"replication: dropping SyncNack from non-leader peer"
);
return;
}
{
let now = tokio::time::Instant::now().into_std();
if !outstanding.lock().take(from, msg.request_id, now) {
tracing::trace!(
from = from,
request_id = msg.request_id,
"replication: dropping SyncNack with unknown request_id"
);
return;
}
}
use super::replication::SyncNackError;
match msg.error_code {
SyncNackError::NotLeader => {
clear_leader_belief_and_tokens(tracker, outstanding);
tracing::trace!(
from = from,
"replication: NACK NotLeader — cleared believed leader"
);
}
SyncNackError::BadRange => {
coordinator.metrics().incr_skip_ahead();
let target = if msg.leader_first_retained_seq > 0 {
msg.leader_first_retained_seq
} else {
msg.since_seq.saturating_add(1)
};
match inputs.file.skip_to(target) {
Ok(()) => tracing::warn!(
from = from,
since_seq = msg.since_seq,
leader_first_retained_seq = msg.leader_first_retained_seq,
target = target,
"replication: NACK BadRange — local tail skipped to leader's first-retained seq"
),
Err(e) => tracing::trace!(
from = from,
error = %e,
"replication: NACK BadRange — skip_to rejected, falling back to heartbeat retry"
),
}
}
SyncNackError::Backpressure => {
tracing::trace!(
from = from,
"replication: NACK Backpressure — deferring next request"
);
}
SyncNackError::ChannelClosed => {
tracing::warn!(
from = from,
"replication: NACK ChannelClosed — withdrawing role"
);
let _ = coordinator
.transition_to(
ReplicaRole::Idle,
super::replication_state::TransitionSignal::ChannelClose,
)
.await;
}
}
}
}
}
/// Reacts to a local append failure while applying replicated data.
///
/// It always bumps the under-capacity metric, then follows the configured
/// `on_under_capacity` policy. `EvictOldest` sweeps `file`'s retention and
/// keeps the current role. `Withdraw` moves this node to `Idle`, using the
/// withdraw signal that matches its current role.
async fn handle_disk_pressure(
    coordinator: &Arc<ReplicationCoordinator>,
    file: &super::file::RedexFile,
    detail: &str,
    from: NodeId,
) {
    use super::replication_config::UnderCapacity;
    use super::replication_state::TransitionSignal;

    coordinator.metrics().incr_under_capacity();
    // Read the policy into a local so no borrow of the config outlives this line.
    let policy = coordinator.config().on_under_capacity;
    match policy {
        UnderCapacity::EvictOldest => {
            tracing::warn!(
                from = from,
                detail = detail,
                "replication: disk pressure → sweeping retention"
            );
            file.sweep_retention();
        }
        UnderCapacity::Withdraw => {
            tracing::warn!(
                from = from,
                detail = detail,
                "replication: disk pressure → withdrawing role"
            );
            let signal = match coordinator.role() {
                ReplicaRole::Leader => TransitionSignal::LeaderDiskPressureWithdraw,
                ReplicaRole::Candidate => TransitionSignal::CandidateDiskPressureWithdraw,
                ReplicaRole::Replica => TransitionSignal::DiskPressureWithdraw,
                // Already idle: there is no role to withdraw from.
                ReplicaRole::Idle => TransitionSignal::ChannelClose,
            };
            let withdrawn = coordinator.transition_to(ReplicaRole::Idle, signal).await;
            if let Err(e) = withdrawn {
                tracing::warn!(
                    error=?e,
                    "replication: disk-pressure withdraw transition failed"
                );
            }
        }
    }
}
/// Approximate wire size of `resp`, used to charge the bandwidth budget.
///
/// The estimate is a fixed 47-byte allowance for the response envelope plus,
/// for each event, its payload length and 12 bytes (8 + 4) of framing. The
/// framing is presumably a u64 seq and a u32 length prefix (TODO confirm
/// against the codec).
fn estimate_response_bytes(resp: &SyncResponse) -> u64 {
    const ENVELOPE_BYTES: u64 = 47;
    const PER_EVENT_FRAMING_BYTES: u64 = 8 + 4;
    resp.events
        .iter()
        .map(|ev| PER_EVENT_FRAMING_BYTES + ev.payload.len() as u64)
        .fold(ENVELOPE_BYTES, |total, event_bytes| total + event_bytes)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::channel::ChannelName;
use crate::adapter::net::redex::replication::ReplicaRole;
use crate::adapter::net::redex::replication_config::ReplicationConfig;
use crate::adapter::net::redex::replication_coordinator::ChainTagSink;
use crate::adapter::net::redex::replication_metrics::ReplicationMetricsRegistry;
use parking_lot::Mutex as ParkingMutex;
#[derive(Default)]
struct NoopTagSink;
#[async_trait::async_trait]
impl ChainTagSink for NoopTagSink {
async fn announce_chain(
&self,
_origin_hash: u64,
_tip_seq: u64,
) -> Result<(), AdapterError> {
Ok(())
}
async fn withdraw_chain(&self, _origin_hash: u64) -> Result<(), AdapterError> {
Ok(())
}
}
#[derive(Default)]
struct RecorderDispatcher {
heartbeats: ParkingMutex<Vec<(NodeId, SyncHeartbeat)>>,
sync_requests: ParkingMutex<Vec<(NodeId, SyncRequest)>>,
sync_responses: ParkingMutex<Vec<(NodeId, SyncResponse)>>,
sync_nacks: ParkingMutex<Vec<(NodeId, SyncNack)>>,
}
#[async_trait::async_trait]
impl ReplicationDispatcher for RecorderDispatcher {
async fn send_heartbeat(
&self,
target: NodeId,
msg: SyncHeartbeat,
) -> Result<(), AdapterError> {
self.heartbeats.lock().push((target, msg));
Ok(())
}
async fn send_sync_request(
&self,
target: NodeId,
msg: SyncRequest,
) -> Result<(), AdapterError> {
self.sync_requests.lock().push((target, msg));
Ok(())
}
async fn send_sync_response(
&self,
target: NodeId,
msg: SyncResponse,
) -> Result<(), AdapterError> {
self.sync_responses.lock().push((target, msg));
Ok(())
}
async fn send_sync_nack(&self, target: NodeId, msg: SyncNack) -> Result<(), AdapterError> {
self.sync_nacks.lock().push((target, msg));
Ok(())
}
}
fn channel_id_for(name: &str) -> ChannelId {
let cn = ChannelName::new(name).unwrap();
ChannelId::from_name(&cn)
}
fn build_file_for_tests() -> RedexFile {
use crate::adapter::net::redex::config::RedexFileConfig;
use crate::adapter::net::redex::manager::Redex;
let r = Redex::new();
let cn = ChannelName::new("test/runtime").unwrap();
r.open_file(&cn, RedexFileConfig::default()).unwrap()
}
fn build_inputs(self_id: NodeId, replicas: Vec<NodeId>, hb_ms: u64) -> RuntimeInputs {
RuntimeInputs {
channel: ChannelIdentity {
channel_name: "test/runtime".to_string(),
origin_hash: 0xCAFE_BABE,
},
channel_id: channel_id_for("test/runtime"),
self_node_id: self_id,
replica_set: replicas,
heartbeat_ms: hb_ms,
wall_clock_provider: Arc::new(|| 1_700_000_000_000),
tail_provider: Arc::new(|| 42),
rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
file: build_file_for_tests(),
default_bandwidth_class: Default::default(),
background_fraction: 0.3,
}
}
fn build_coordinator(
self_id: NodeId,
replicas: Vec<NodeId>,
) -> (Arc<ReplicationCoordinator>, ReplicationMetricsRegistry) {
let _ = (self_id, replicas);
let registry = ReplicationMetricsRegistry::new();
let sink: Arc<dyn ChainTagSink> = Arc::new(NoopTagSink);
let coordinator = ReplicationCoordinator::new(
ChannelIdentity {
channel_name: "test/runtime".to_string(),
origin_hash: 0xCAFE_BABE,
},
ReplicationConfig::new(),
sink,
®istry,
);
(Arc::new(coordinator), registry)
}
fn build_budget() -> Arc<Mutex<BandwidthBudget>> {
let now = Instant::now();
Arc::new(Mutex::new(BandwidthBudget::new(0.5, 10_000_000, now)))
}
fn build_backoff() -> Arc<Mutex<CatchupBackoff>> {
Arc::new(Mutex::new(CatchupBackoff::new()))
}
fn build_state(
tracker: Arc<Mutex<HeartbeatTracker>>,
budget: Arc<Mutex<BandwidthBudget>>,
) -> RuntimeState {
RuntimeState {
tracker,
budget,
backoff: Arc::new(Mutex::new(CatchupBackoff::new())),
outstanding: Arc::new(Mutex::new(OutstandingRequests::new())),
}
}
#[tokio::test]
async fn leader_emits_heartbeat_to_peers_each_tick() {
let inputs = build_inputs(0x10, vec![0x10, 0x20, 0x30], 100);
let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20, 0x30]);
coordinator
.transition_to(
ReplicaRole::Replica,
super::super::replication_state::TransitionSignal::CapabilitySelected,
)
.await
.unwrap();
coordinator
.transition_to(
ReplicaRole::Candidate,
super::super::replication_state::TransitionSignal::MissedHeartbeats,
)
.await
.unwrap();
coordinator
.transition_to(
ReplicaRole::Leader,
super::super::replication_state::TransitionSignal::ElectionWon,
)
.await
.unwrap();
let dispatcher = Arc::new(RecorderDispatcher::default());
let handle =
spawn_replication_runtime(inputs, coordinator, dispatcher.clone(), build_budget());
tokio::time::sleep(Duration::from_millis(350)).await;
let heartbeats = dispatcher.heartbeats.lock().clone();
assert!(
heartbeats.len() >= 2,
"expected ≥ 2 heartbeats over 350ms at 100ms cadence; got {}",
heartbeats.len(),
);
for (target, msg) in &heartbeats {
assert!(*target == 0x20 || *target == 0x30);
assert_eq!(msg.role, ReplicaRole::Leader);
assert_eq!(msg.tail_seq, 42);
}
handle.cancel().await;
assert!(handle.is_stopped());
}
#[tokio::test]
async fn inbound_heartbeat_records_into_tracker() {
let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
let cid = inputs.channel_id;
let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
coordinator
.transition_to(
ReplicaRole::Replica,
super::super::replication_state::TransitionSignal::CapabilitySelected,
)
.await
.unwrap();
let dispatcher = Arc::new(RecorderDispatcher::default());
let handle =
spawn_replication_runtime(inputs, coordinator, dispatcher.clone(), build_budget());
handle
.dispatch(Inbound::Heartbeat {
from: 0x20,
msg: SyncHeartbeat {
channel_id: cid,
tail_seq: 99,
role: ReplicaRole::Leader,
wall_clock_ms: 1_700_000_000_000,
},
})
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(150)).await;
let _heartbeats = dispatcher.heartbeats.lock().clone();
handle.cancel().await;
}
#[tokio::test]
async fn peer_leader_observation_demotes_loser_to_replica() {
let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
let cid = inputs.channel_id;
let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
for (role, signal) in [
(
ReplicaRole::Replica,
super::super::replication_state::TransitionSignal::CapabilitySelected,
),
(
ReplicaRole::Candidate,
super::super::replication_state::TransitionSignal::MissedHeartbeats,
),
(
ReplicaRole::Leader,
super::super::replication_state::TransitionSignal::ElectionWon,
),
] {
coordinator.transition_to(role, signal).await.unwrap();
}
assert_eq!(coordinator.role(), ReplicaRole::Leader);
let dispatcher = Arc::new(RecorderDispatcher::default());
let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
let budget = build_budget();
on_inbound(
&inputs,
&coordinator,
&(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
&build_state(tracker.clone(), budget.clone()),
Inbound::Heartbeat {
from: 0x20,
msg: SyncHeartbeat {
channel_id: cid,
tail_seq: 99,
role: ReplicaRole::Leader,
wall_clock_ms: 0,
},
},
)
.await;
assert_eq!(
coordinator.role(),
ReplicaRole::Replica,
"Leader with lower tail must concede on PeerLeaderObserved"
);
}
/// When two Leaders advertise the same tail, the lower node id keeps leadership.
#[tokio::test]
async fn peer_leader_tail_tie_lower_node_id_wins() {
    use super::super::replication_state::TransitionSignal;

    let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
    let chan = inputs.channel_id;
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);

    // Promote self (0x10) to Leader through the legal state path.
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();
    coordinator
        .transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
        .await
        .unwrap();
    coordinator
        .transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let outbound: Arc<dyn ReplicationDispatcher> = recorder.clone();
    let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
    let state = build_state(tracker.clone(), build_budget());

    // Peer 0x20 (higher id) also claims Leader. Tail 42 is presumably equal to
    // self's tail as built by `build_inputs` (the test name describes a tie).
    let rival_heartbeat = Inbound::Heartbeat {
        from: 0x20,
        msg: SyncHeartbeat {
            channel_id: chan,
            tail_seq: 42,
            role: ReplicaRole::Leader,
            wall_clock_ms: 0,
        },
    };
    on_inbound(&inputs, &coordinator, &outbound, &state, rival_heartbeat).await;

    assert_eq!(
        coordinator.role(),
        ReplicaRole::Leader,
        "tail-tie tiebreak: lower node id keeps Leader"
    );
}
/// A Replica that hears a Leader once and then nothing must elect itself.
#[tokio::test]
async fn replica_with_silent_leader_runs_election_and_promotes_self() {
    use super::super::replication_state::TransitionSignal;

    // The last `build_inputs` argument is presumably the heartbeat interval in
    // ms; 50 keeps leader-silence detection well inside the 500ms wait below.
    let inputs = build_inputs(0x10, vec![0x10, 0x20], 50);
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let runtime = spawn_replication_runtime(
        inputs,
        coordinator.clone(),
        recorder.clone(),
        build_budget(),
    );

    // A single Leader heartbeat from 0x20, followed by silence.
    let lone_heartbeat = Inbound::Heartbeat {
        from: 0x20,
        msg: SyncHeartbeat {
            channel_id: channel_id_for("test/runtime"),
            tail_seq: 99,
            role: ReplicaRole::Leader,
            wall_clock_ms: 1_700_000_000_000,
        },
    };
    runtime.dispatch(lone_heartbeat).await.unwrap();

    tokio::time::sleep(Duration::from_millis(500)).await;
    assert_eq!(coordinator.role(), ReplicaRole::Leader);
    runtime.cancel().await;
}
/// Test `ChainTagSink` whose `announce_chain` succeeds a fixed number of times
/// and then fails every later call with `AdapterError::Transient`.
struct FailingAfterNAnnounceSink {
    // Number of `announce_chain` calls still allowed to succeed; each success
    // decrements it, and once it reaches 0 announces start failing.
    remaining: ParkingMutex<usize>,
}
#[async_trait::async_trait]
impl ChainTagSink for FailingAfterNAnnounceSink {
    /// Spends one unit of the success budget. Once the budget is exhausted,
    /// every further announce returns a transient error.
    async fn announce_chain(
        &self,
        _origin_hash: u64,
        _tip_seq: u64,
    ) -> Result<(), AdapterError> {
        let mut budget = self.remaining.lock();
        match budget.checked_sub(1) {
            Some(left) => {
                *budget = left;
                Ok(())
            }
            None => Err(AdapterError::Transient(
                "simulated sink failure".to_string(),
            )),
        }
    }

    /// Withdrawals never fail in this test double.
    async fn withdraw_chain(&self, _origin_hash: u64) -> Result<(), AdapterError> {
        Ok(())
    }
}
/// Even when the chain-tag sink starts failing announces, a Replica whose
/// leader goes silent must still finish the election and end up as Leader.
#[tokio::test]
async fn post_election_tag_sink_failure_does_not_strand_candidate() {
    let inputs = build_inputs(0x10, vec![0x10, 0x20], 50);
    let cid = inputs.channel_id;
    let registry = ReplicationMetricsRegistry::new();
    // Exactly one announce succeeds; every later announce fails transiently.
    let sink: Arc<dyn ChainTagSink> = Arc::new(FailingAfterNAnnounceSink {
        remaining: ParkingMutex::new(1),
    });
    let coordinator = Arc::new(ReplicationCoordinator::new(
        ChannelIdentity {
            channel_name: "test/runtime".to_string(),
            origin_hash: 0xCAFE_BABE,
        },
        ReplicationConfig::new(),
        sink,
        // The coordinator only borrows the registry.
        &registry,
    ));
    coordinator
        .transition_to(
            ReplicaRole::Replica,
            super::super::replication_state::TransitionSignal::CapabilitySelected,
        )
        .await
        .unwrap();
    let dispatcher = Arc::new(RecorderDispatcher::default());
    let handle = spawn_replication_runtime(
        inputs,
        coordinator.clone(),
        dispatcher.clone(),
        build_budget(),
    );
    // One leader heartbeat, then silence, so the runtime runs an election.
    handle
        .dispatch(Inbound::Heartbeat {
            from: 0x20,
            msg: SyncHeartbeat {
                channel_id: cid,
                tail_seq: 99,
                role: ReplicaRole::Leader,
                wall_clock_ms: 1_700_000_000_000,
            },
        })
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_millis(500)).await;
    assert_eq!(coordinator.role(), ReplicaRole::Leader);
    handle.cancel().await;
}
/// Cancelling the runtime stops the task and drops the coordinator back to Idle.
#[tokio::test]
async fn shutdown_drives_idle_transition() {
    use super::super::replication_state::TransitionSignal;

    let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let runtime = spawn_replication_runtime(
        inputs,
        coordinator.clone(),
        recorder,
        build_budget(),
    );

    runtime.cancel().await;

    assert!(runtime.is_stopped());
    assert_eq!(coordinator.role(), ReplicaRole::Idle);
}
/// A Leader's tick must advertise the highest peer-confirmed tail rather than
/// its own not-yet-replicated local tail.
#[tokio::test]
async fn leader_on_tick_clamps_advertised_tail_to_max_peer_tail() {
    use super::super::replication_state::TransitionSignal;

    const CHANNEL_NAME: &str = "test/runtime";
    let me: NodeId = 0x10;
    let peer: NodeId = 0x20;

    // The local tail is 100, while the only peer will report 50.
    let inputs = RuntimeInputs {
        channel: ChannelIdentity {
            channel_name: CHANNEL_NAME.to_string(),
            origin_hash: 0xCAFE_BABE,
        },
        channel_id: channel_id_for(CHANNEL_NAME),
        self_node_id: me,
        replica_set: vec![me, peer],
        heartbeat_ms: 100,
        wall_clock_provider: Arc::new(|| 1_700_000_000_000),
        tail_provider: Arc::new(|| 100),
        rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
        file: build_file_for_tests(),
        default_bandwidth_class: Default::default(),
        background_fraction: 0.3,
    };

    let (coordinator, _registry) = build_coordinator(me, vec![me, peer]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();
    coordinator
        .transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
        .await
        .unwrap();
    coordinator
        .transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
        .await
        .unwrap();

    // The peer confirms tail 50 via heartbeat.
    let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
    tracker
        .lock()
        .record_heartbeat(peer, ReplicaRole::Replica, 50, Instant::now());

    let outbound: Arc<dyn ReplicationDispatcher> = Arc::new(RecorderDispatcher::default());
    let state = build_state(tracker, build_budget());
    on_tick(&inputs, &coordinator, &outbound, &state).await;

    assert_eq!(
        coordinator.tail_seq(),
        50,
        "leader must advertise the highest peer-confirmed tail (50), \
         not the pre-replication local tail (100); pre-fix this would \
         be 100 and a crash here would lose the 50 un-replicated events \
         while failover discovery still thought tip_seq=100",
    );
}
/// With no peer heartbeats recorded, a Leader's tick advertises its raw local tail.
#[tokio::test]
async fn leader_on_tick_falls_back_to_local_tail_with_no_peers() {
    use super::super::replication_state::TransitionSignal;

    const CHANNEL_NAME: &str = "test/runtime";
    let me: NodeId = 0x10;
    let peer: NodeId = 0x20;

    // The local tail is 77, and no peer ever reports anything.
    let inputs = RuntimeInputs {
        channel: ChannelIdentity {
            channel_name: CHANNEL_NAME.to_string(),
            origin_hash: 0xCAFE_BABE,
        },
        channel_id: channel_id_for(CHANNEL_NAME),
        self_node_id: me,
        replica_set: vec![me, peer],
        heartbeat_ms: 100,
        wall_clock_provider: Arc::new(|| 1_700_000_000_000),
        tail_provider: Arc::new(|| 77),
        rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
        file: build_file_for_tests(),
        default_bandwidth_class: Default::default(),
        background_fraction: 0.3,
    };

    let (coordinator, _registry) = build_coordinator(me, vec![me, peer]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();
    coordinator
        .transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
        .await
        .unwrap();
    coordinator
        .transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
        .await
        .unwrap();

    let silent_tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
    let outbound: Arc<dyn ReplicationDispatcher> = Arc::new(RecorderDispatcher::default());
    let state = build_state(silent_tracker, build_budget());
    on_tick(&inputs, &coordinator, &outbound, &state).await;

    assert_eq!(
        coordinator.tail_seq(),
        77,
        "no peer heartbeats → no clamp, raw local tail is advertised",
    );
}
/// Empty SyncResponses from a peer whose last heartbeat is stale must never
/// count toward catch-up backoff, however many of them arrive.
#[tokio::test]
async fn empty_response_does_not_strike_on_stale_heartbeat() {
    use super::super::replication_state::TransitionSignal;

    let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
    let chan = inputs.channel_id;
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let outbound: Arc<dyn ReplicationDispatcher> = recorder.clone();
    let budget = build_budget();
    let backoff = build_backoff();
    let outstanding = Arc::new(Mutex::new(OutstandingRequests::new()));
    let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));

    // The leader's last heartbeat is backdated by 10 seconds.
    let ten_seconds_ago = Instant::now() - Duration::from_secs(10);
    tracker
        .lock()
        .record_heartbeat(0x20, ReplicaRole::Leader, 200, ten_seconds_ago);

    // Builders for the repeated pieces: an empty response and a runtime state
    // that shares the same tracker/budget/backoff/outstanding handles.
    let empty_response = || Inbound::SyncResponse {
        from: 0x20,
        msg: SyncResponse {
            channel_id: chan,
            first_seq: 0,
            leader_first_retained_seq: 0,
            events: Vec::new(),
            request_id: 0,
        },
    };
    let runtime_state = || RuntimeState {
        tracker: tracker.clone(),
        budget: budget.clone(),
        backoff: backoff.clone(),
        outstanding: outstanding.clone(),
    };

    // A single empty response matching an outstanding request.
    outstanding.lock().record(0x20, 0, Instant::now());
    on_inbound(&inputs, &coordinator, &outbound, &runtime_state(), empty_response()).await;
    assert!(
        !backoff.lock().is_in_backoff(0x20, Instant::now()),
        "stale-heartbeat empty must NOT engage backoff"
    );

    // Enough further empties to cross the threshold if they were counted.
    for _ in 0..=CATCHUP_BACKOFF_THRESHOLD {
        outstanding.lock().record(0x20, 0, Instant::now());
        on_inbound(&inputs, &coordinator, &outbound, &runtime_state(), empty_response()).await;
    }
    assert!(
        !backoff.lock().is_in_backoff(0x20, Instant::now()),
        "accumulated stale-heartbeat empties must NEVER engage backoff",
    );
}
/// Backoff engages only after more than `CATCHUP_BACKOFF_THRESHOLD` empties,
/// and a single progress report clears it.
#[test]
fn catchup_backoff_threshold_and_reset() {
    let at = Instant::now();
    let peer = 0x20;
    let mut backoff = CatchupBackoff::new();

    // Exactly the threshold number of empties: still below the trip point.
    for _ in 0..CATCHUP_BACKOFF_THRESHOLD {
        backoff.record_empty(peer, at);
    }
    assert!(
        !backoff.is_in_backoff(peer, at),
        "backoff must not engage before the threshold is crossed"
    );

    // One more empty crosses it.
    backoff.record_empty(peer, at);
    assert!(
        backoff.is_in_backoff(peer, at),
        "backoff must engage once the empty count crosses the threshold"
    );

    backoff.record_progress(peer);
    assert!(
        !backoff.is_in_backoff(peer, at),
        "record_progress must clear the backoff window"
    );
}
/// Shutdown travels on the priority lane, so it completes promptly even when the
/// regular inbox has been flooded past capacity.
#[tokio::test]
async fn priority_lane_drains_under_low_priority_saturation() {
    use super::super::replication_state::TransitionSignal;

    let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
    let chan = inputs.channel_id;
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let runtime = spawn_replication_runtime(
        inputs,
        coordinator.clone(),
        recorder,
        build_budget(),
    );

    // Offer twice the inbox capacity of low-priority heartbeats; rejections are ignored.
    let noise = || Inbound::Heartbeat {
        from: 0x20,
        msg: SyncHeartbeat {
            channel_id: chan,
            tail_seq: 0,
            role: ReplicaRole::Replica,
            wall_clock_ms: 0,
        },
    };
    for _ in 0..2 * RUNTIME_INBOX_CAPACITY {
        let _ = runtime.try_dispatch(noise());
    }

    let shutdown = tokio::time::timeout(Duration::from_secs(2), runtime.cancel()).await;
    assert!(
        shutdown.is_ok(),
        "shutdown on priority lane must drain under low-priority saturation"
    );
    assert_eq!(
        coordinator.role(),
        ReplicaRole::Idle,
        "graceful Idle transition must complete via priority-lane Shutdown"
    );
}
/// Pushing more than `RUNTIME_INBOX_CAPACITY` events through `try_dispatch`
/// must make it reject at least one of them instead of accepting everything.
#[tokio::test]
async fn try_dispatch_returns_event_on_full_buffer() {
    let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(
            ReplicaRole::Replica,
            super::super::replication_state::TransitionSignal::CapabilitySelected,
        )
        .await
        .unwrap();
    let dispatcher = Arc::new(RecorderDispatcher::default());
    let handle = spawn_replication_runtime(inputs, coordinator, dispatcher, build_budget());
    let cid = channel_id_for("test/runtime");
    // There is no `.await` between spawning and this loop. On the default
    // current-thread test runtime, the runtime task therefore cannot drain the
    // inbox yet, and the surplus events beyond its capacity must be rejected.
    let mut rejected = 0usize;
    for _ in 0..RUNTIME_INBOX_CAPACITY + 10 {
        let event = Inbound::Heartbeat {
            from: 0x20,
            msg: SyncHeartbeat {
                channel_id: cid,
                tail_seq: 0,
                role: ReplicaRole::Replica,
                wall_clock_ms: 0,
            },
        };
        if handle.try_dispatch(event).is_err() {
            rejected += 1;
        }
    }
    assert!(
        rejected > 0,
        "try_dispatch must reject events once the inbox is full"
    );
    handle.cancel().await;
}
/// `cancel()` must complete within a bounded time even when the inbox is full.
#[tokio::test]
async fn cancel_with_full_inbox_does_not_hang() {
    use super::super::replication_state::TransitionSignal;

    let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let runtime = spawn_replication_runtime(inputs, coordinator, recorder, build_budget());

    // Fill the inbox up to its capacity with heartbeats.
    let chan = channel_id_for("test/runtime");
    let filler = || Inbound::Heartbeat {
        from: 0x20,
        msg: SyncHeartbeat {
            channel_id: chan,
            tail_seq: 0,
            role: ReplicaRole::Replica,
            wall_clock_ms: 0,
        },
    };
    for _ in 0..RUNTIME_INBOX_CAPACITY {
        let _ = runtime.try_dispatch(filler());
    }

    let bounded_cancel = tokio::time::timeout(Duration::from_secs(2), runtime.cancel());
    bounded_cancel
        .await
        .expect("cancel() must not hang on full inbox");
    assert!(runtime.is_stopped());
}
/// Dropping the runtime handle (without `cancel`) must tear the task down, which
/// is observed as the task releasing its reference to the dispatcher.
#[tokio::test]
async fn dropping_handle_aborts_task() {
    let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);

    // The test keeps `recorder`; the runtime receives its own `Arc` clone, so
    // the strong count shows whether the task still holds it.
    let recorder = Arc::new(RecorderDispatcher::default());
    let task_ref: Arc<dyn ReplicationDispatcher> = recorder.clone();
    let runtime = spawn_replication_runtime(inputs, coordinator, task_ref, build_budget());

    tokio::time::sleep(Duration::from_millis(20)).await;
    assert!(Arc::strong_count(&recorder) >= 2);

    drop(runtime);

    // Poll up to 20 times at 10ms intervals for the Arc to be released.
    let mut released = false;
    for _ in 0..20 {
        tokio::time::sleep(Duration::from_millis(10)).await;
        if Arc::strong_count(&recorder) == 1 {
            released = true;
            break;
        }
    }
    assert!(
        released,
        "task did not release dispatcher Arc after handle drop; strong_count = {}",
        Arc::strong_count(&recorder)
    );
}
/// Once the runtime has been cancelled, `dispatch` reports an error.
#[tokio::test]
async fn dispatch_after_cancel_errors() {
    let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    let recorder = Arc::new(RecorderDispatcher::default());
    let runtime = spawn_replication_runtime(inputs, coordinator, recorder, build_budget());

    runtime.cancel().await;

    let late_heartbeat = Inbound::Heartbeat {
        from: 0x20,
        msg: SyncHeartbeat {
            channel_id: channel_id_for("test/runtime"),
            tail_seq: 0,
            role: ReplicaRole::Replica,
            wall_clock_ms: 0,
        },
    };
    let outcome = runtime.dispatch(late_heartbeat).await;
    assert!(outcome.is_err(), "dispatch must error after cancel");
}
/// A Leader heartbeat addressed to a different channel must be ignored. The role
/// may legitimately be Replica or Leader afterwards (via self-election), but
/// nothing else is accepted.
#[tokio::test]
async fn channel_id_mismatch_drops_heartbeat() {
    use super::super::replication_state::TransitionSignal;

    let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let runtime = spawn_replication_runtime(
        inputs,
        coordinator.clone(),
        recorder,
        build_budget(),
    );

    let foreign_heartbeat = Inbound::Heartbeat {
        from: 0x20,
        msg: SyncHeartbeat {
            channel_id: channel_id_for("test/wrong_channel"),
            tail_seq: 99,
            role: ReplicaRole::Leader,
            wall_clock_ms: 0,
        },
    };
    runtime.dispatch(foreign_heartbeat).await.unwrap();

    tokio::time::sleep(Duration::from_millis(500)).await;
    let role = coordinator.role();
    assert!(
        matches!(role, ReplicaRole::Replica | ReplicaRole::Leader),
        "expected Replica or Leader; got {role:?}",
    );
    runtime.cancel().await;
}
/// An Idle node reports no lag observation.
#[test]
fn observe_lag_idle_emits_none() {
    let empty_tracker = HeartbeatTracker::new(500);
    let other = observe_lag(
        ReplicaRole::Idle,
        &[0x10, 0x20],
        0x10,
        &empty_tracker,
        Instant::now(),
    );
    assert!(
        matches!(other, LagObservation::None),
        "expected None for Idle, got {other:?}"
    );
}
/// A Candidate reports no lag observation.
#[test]
fn observe_lag_candidate_emits_none() {
    let empty_tracker = HeartbeatTracker::new(500);
    let other = observe_lag(
        ReplicaRole::Candidate,
        &[0x10, 0x20],
        0x10,
        &empty_tracker,
        Instant::now(),
    );
    assert!(
        matches!(other, LagObservation::None),
        "expected None for Candidate, got {other:?}"
    );
}
/// A Leader whose peers have never heartbeated has nothing to measure.
#[test]
fn observe_lag_leader_with_no_peer_observations_emits_none() {
    let silent_tracker = HeartbeatTracker::new(500);
    let other = observe_lag(
        ReplicaRole::Leader,
        &[0x10, 0x20, 0x30],
        0x10,
        &silent_tracker,
        Instant::now(),
    );
    assert!(
        matches!(other, LagObservation::None),
        "expected None when peers have not heartbeated, got {other:?}"
    );
}
/// A Leader reports the lag of its stalest replica.
#[test]
fn observe_lag_leader_picks_worst_replica() {
    let start = Instant::now();
    let mut tracker = HeartbeatTracker::new(500);
    // 0x20 was last heard at t=0 and 0x30 at t=100ms, so at t=1000ms the
    // worst lag is 0x20's 1000ms.
    tracker.record_heartbeat(0x20, ReplicaRole::Replica, 0, start);
    tracker.record_heartbeat(
        0x30,
        ReplicaRole::Replica,
        0,
        start + Duration::from_millis(100),
    );
    let other = observe_lag(
        ReplicaRole::Leader,
        &[0x10, 0x20, 0x30],
        0x10,
        &tracker,
        start + Duration::from_millis(1000),
    );
    if let LagObservation::Leader(worst) = other {
        assert_eq!(worst, Duration::from_millis(1000));
    } else {
        panic!("expected Leader(1000ms), got {other:?}");
    }
}
/// A Replica reports the time since it last heard from its believed leader.
#[test]
fn observe_lag_replica_emits_believed_leader_lag() {
    let start = Instant::now();
    let mut tracker = HeartbeatTracker::new(500);
    tracker.record_heartbeat(0x42, ReplicaRole::Leader, 99, start);
    let other = observe_lag(
        ReplicaRole::Replica,
        &[0x10, 0x42],
        0x10,
        &tracker,
        start + Duration::from_millis(250),
    );
    if let LagObservation::Replica(since_leader) = other {
        assert_eq!(since_leader, Duration::from_millis(250));
    } else {
        panic!("expected Replica(250ms), got {other:?}");
    }
}
/// A Replica that has never heard from a leader reports no lag.
#[test]
fn observe_lag_replica_with_no_believed_leader_emits_none() {
    let empty_tracker = HeartbeatTracker::new(500);
    let other = observe_lag(
        ReplicaRole::Replica,
        &[0x10, 0x42],
        0x10,
        &empty_tracker,
        Instant::now(),
    );
    assert!(
        matches!(other, LagObservation::None),
        "expected None for replica with no believed leader, got {other:?}"
    );
}
/// The Leader's own id in the replica set is not treated as a lagging peer;
/// only 0x20's heartbeat age is reported.
#[test]
fn observe_lag_leader_skips_self_in_replica_set() {
    let start = Instant::now();
    let mut tracker = HeartbeatTracker::new(500);
    tracker.record_heartbeat(0x20, ReplicaRole::Replica, 0, start);
    let other = observe_lag(
        ReplicaRole::Leader,
        &[0x10, 0x20],
        0x10,
        &tracker,
        start + Duration::from_millis(500),
    );
    if let LagObservation::Leader(worst) = other {
        assert_eq!(worst, Duration::from_millis(500));
    } else {
        panic!("expected Leader(500ms), got {other:?}");
    }
}
/// Builds a coordinator for channel `"test/runtime"` with a no-op tag sink and a
/// config whose under-capacity behaviour is `policy`.
///
/// Also returns the channel's metrics handle, taken from the same registry the
/// coordinator was built with, so tests can read counters such as
/// `under_capacity_total` after driving the coordinator.
fn build_coordinator_with_policy(
    policy: super::super::replication_config::UnderCapacity,
) -> (
    Arc<ReplicationCoordinator>,
    Arc<super::super::replication_metrics::ChannelMetricsAtomic>,
) {
    let registry = ReplicationMetricsRegistry::new();
    let sink: Arc<dyn ChainTagSink> = Arc::new(NoopTagSink);
    let config = ReplicationConfig::new().with_on_under_capacity(policy);
    // The coordinator only borrows the registry; it is queried again below.
    let coordinator = ReplicationCoordinator::new(
        ChannelIdentity {
            channel_name: "test/runtime".to_string(),
            origin_hash: 0xCAFE_BABE,
        },
        config,
        sink,
        &registry,
    );
    let metrics = registry.for_channel("test/runtime");
    (Arc::new(coordinator), metrics)
}
/// Under the `Withdraw` policy, disk pressure takes a Replica to Idle and
/// counts one under-capacity event.
#[tokio::test]
async fn disk_pressure_withdraw_drives_idle_transition() {
    use super::super::replication_config::UnderCapacity;
    use super::super::replication_state::TransitionSignal;
    use std::sync::atomic::Ordering;

    let (coord, metrics) = build_coordinator_with_policy(UnderCapacity::Withdraw);
    coord
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();
    assert_eq!(coord.role(), ReplicaRole::Replica);

    let scratch = build_file_for_tests();
    handle_disk_pressure(&coord, &scratch, "test detail", 0x20).await;

    assert_eq!(coord.role(), ReplicaRole::Idle, "Withdraw flips to Idle");
    let pressure_events = metrics.under_capacity_total.load(Ordering::Relaxed);
    assert_eq!(pressure_events, 1, "Withdraw bumps under_capacity_total");
}
/// Under the `EvictOldest` policy, disk pressure keeps the Replica role, runs
/// the retention sweep, and still counts one under-capacity event.
#[tokio::test]
async fn disk_pressure_evict_oldest_keeps_role_and_sweeps() {
    use super::super::replication_config::UnderCapacity;
    use super::super::replication_state::TransitionSignal;
    use crate::adapter::net::redex::config::RedexFileConfig;
    use crate::adapter::net::redex::manager::Redex;
    use std::sync::atomic::Ordering;

    let (coord, metrics) = build_coordinator_with_policy(UnderCapacity::EvictOldest);
    coord
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    // A file capped at one retained event, loaded with five.
    let redex = Redex::new();
    let name = ChannelName::new("test/runtime").unwrap();
    let capped = RedexFileConfig::default().with_retention_max_events(1);
    let file = redex.open_file(&name, capped).unwrap();
    for payload in (0..5).map(|i| format!("event-{i}")) {
        file.append(payload.as_bytes()).unwrap();
    }
    assert_eq!(file.len(), 5);

    handle_disk_pressure(&coord, &file, "test detail", 0x20).await;

    assert_eq!(
        coord.role(),
        ReplicaRole::Replica,
        "EvictOldest preserves Replica role"
    );
    assert_eq!(file.len(), 1, "retention sweep dropped to cap of 1");
    let pressure_events = metrics.under_capacity_total.load(Ordering::Relaxed);
    assert_eq!(
        pressure_events, 1,
        "EvictOldest also bumps under_capacity_total"
    );
}
/// Under the `Withdraw` policy, disk pressure on a Leader must also end in Idle.
#[tokio::test]
async fn disk_pressure_withdraw_from_leader_picks_channel_close_signal() {
    use super::super::replication_config::UnderCapacity;
    use super::super::replication_state::TransitionSignal;

    let (coord, _metrics) = build_coordinator_with_policy(UnderCapacity::Withdraw);

    // Climb to Leader through the legal state path.
    for (role, signal) in [
        (ReplicaRole::Replica, TransitionSignal::CapabilitySelected),
        (ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats),
        (ReplicaRole::Leader, TransitionSignal::ElectionWon),
    ] {
        coord.transition_to(role, signal).await.unwrap();
    }
    assert_eq!(coord.role(), ReplicaRole::Leader);

    let scratch = build_file_for_tests();
    handle_disk_pressure(&coord, &scratch, "test detail", 0x20).await;

    assert_eq!(
        coord.role(),
        ReplicaRole::Idle,
        "Leader disk-pressure must withdraw to Idle"
    );
}
/// Disk pressure on a coordinator that never left Idle leaves it Idle and
/// still records one under-capacity event.
#[tokio::test]
async fn disk_pressure_withdraw_is_idempotent_on_idle_already() {
    use super::super::replication_config::UnderCapacity;
    use std::sync::atomic::Ordering;

    let (coord, metrics) = build_coordinator_with_policy(UnderCapacity::Withdraw);
    let scratch = build_file_for_tests();

    handle_disk_pressure(&coord, &scratch, "test detail", 0x20).await;

    assert_eq!(coord.role(), ReplicaRole::Idle);
    let pressure_events = metrics.under_capacity_total.load(Ordering::Relaxed);
    assert_eq!(pressure_events, 1,);
}
/// A SyncRequest that lands after the node has stopped being Leader must be
/// answered with a single NotLeader NACK and no SyncResponse.
#[tokio::test]
async fn sync_request_post_op_role_flip_emits_notleader_nack() {
    use super::super::replication::SyncNackError;
    use super::super::replication_state::TransitionSignal;

    let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
    let chan = inputs.channel_id;
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();
    coordinator
        .transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
        .await
        .unwrap();
    coordinator
        .transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let outbound: Arc<dyn ReplicationDispatcher> = recorder.clone();
    let budget = build_budget();
    let request = Inbound::SyncRequest {
        from: 0x20,
        msg: SyncRequest {
            channel_id: chan,
            since_seq: 0,
            chunk_max: 1024,
            request_id: 0,
            class: Default::default(),
        },
    };

    // Relinquish leadership before the request is processed.
    coordinator
        .transition_to(ReplicaRole::Idle, TransitionSignal::GracefulRelinquish)
        .await
        .unwrap();

    let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
    let state = build_state(tracker.clone(), budget.clone());
    on_inbound(&inputs, &coordinator, &outbound, &state, request).await;

    let nacks = recorder.sync_nacks.lock().clone();
    assert_eq!(nacks.len(), 1, "expected one NotLeader NACK");
    let (target, nack) = &nacks[0];
    assert_eq!(*target, 0x20);
    assert_eq!(nack.error_code, SyncNackError::NotLeader);
    assert_eq!(nack.channel_id, chan);
    assert!(
        recorder.sync_responses.lock().is_empty(),
        "no SyncResponse must ship when role isn't Leader"
    );
}
/// A SyncRequest for another channel is silently dropped, producing neither a
/// NACK nor a response.
#[tokio::test]
async fn sync_request_with_wrong_channel_id_is_dropped() {
    use super::super::replication_state::TransitionSignal;

    let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();
    coordinator
        .transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
        .await
        .unwrap();
    coordinator
        .transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let outbound: Arc<dyn ReplicationDispatcher> = recorder.clone();
    let budget = build_budget();
    let misaddressed = Inbound::SyncRequest {
        from: 0x20,
        msg: SyncRequest {
            channel_id: channel_id_for("test/wrong_channel"),
            since_seq: 0,
            chunk_max: 1024,
            request_id: 0,
            class: Default::default(),
        },
    };
    let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
    let state = build_state(tracker.clone(), budget.clone());
    on_inbound(&inputs, &coordinator, &outbound, &state, misaddressed).await;

    assert!(
        recorder.sync_nacks.lock().is_empty(),
        "no NACK on wrong-channel — silently dropped"
    );
    assert!(
        recorder.sync_responses.lock().is_empty(),
        "no SyncResponse on wrong-channel"
    );
}
/// Traffic from a node outside the replica set must not touch local state:
/// no tail advance from its SyncResponse, and no believed-leader from its heartbeat.
#[tokio::test]
async fn inbound_from_non_replica_set_peer_is_dropped() {
    use super::super::replication_state::TransitionSignal;

    let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
    let chan = inputs.channel_id;
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let outbound: Arc<dyn ReplicationDispatcher> = recorder.clone();
    let budget = build_budget();
    let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
    // Each delivery gets a freshly built state that shares the same tracker/budget.
    let fresh_state = || build_state(tracker.clone(), budget.clone());
    let outsider = 0x99;

    let next_before = inputs.file.next_seq();
    let stray_response = Inbound::SyncResponse {
        from: outsider,
        msg: SyncResponse {
            channel_id: chan,
            first_seq: 0,
            leader_first_retained_seq: 0,
            events: Vec::new(),
            request_id: 0,
        },
    };
    on_inbound(&inputs, &coordinator, &outbound, &fresh_state(), stray_response).await;
    assert_eq!(
        inputs.file.next_seq(),
        next_before,
        "out-of-set peer must not advance local tail"
    );

    let stray_heartbeat = Inbound::Heartbeat {
        from: outsider,
        msg: SyncHeartbeat {
            channel_id: chan,
            tail_seq: 7,
            role: ReplicaRole::Leader,
            wall_clock_ms: 0,
        },
    };
    on_inbound(&inputs, &coordinator, &outbound, &fresh_state(), stray_heartbeat).await;
    assert!(
        tracker.lock().believed_leader().is_none(),
        "out-of-set heartbeat must not seed believed_leader"
    );
}
/// A SyncResponse from a replica-set member that is not the believed leader
/// must not advance the local tail.
#[tokio::test]
async fn sync_response_from_non_leader_replica_peer_is_dropped() {
    use super::super::replication_state::TransitionSignal;

    let inputs = build_inputs(0x10, vec![0x10, 0x20, 0x30], 60_000);
    let chan = inputs.channel_id;
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20, 0x30]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let outbound: Arc<dyn ReplicationDispatcher> = recorder.clone();
    let budget = build_budget();

    // 0x20 is established as the believed leader; 0x30 is just another member.
    let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
    tracker
        .lock()
        .record_heartbeat(0x20, ReplicaRole::Leader, 0, Instant::now());
    assert_eq!(tracker.lock().believed_leader(), Some(0x20));

    let next_before = inputs.file.next_seq();
    let sideways_response = Inbound::SyncResponse {
        from: 0x30,
        msg: SyncResponse {
            channel_id: chan,
            first_seq: 0,
            leader_first_retained_seq: 0,
            events: Vec::new(),
            request_id: 0,
        },
    };
    let state = build_state(tracker.clone(), budget.clone());
    on_inbound(&inputs, &coordinator, &outbound, &state, sideways_response).await;

    assert_eq!(
        inputs.file.next_seq(),
        next_before,
        "non-leader replica_set peer must not advance local tail via SyncResponse"
    );
}
/// `is_stopped` is false on a fresh runtime and true after `cancel().await`.
/// A second `cancel` is harmless.
#[tokio::test]
async fn is_stopped_is_false_before_first_cancel_completes() {
    let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    let recorder = Arc::new(RecorderDispatcher::default());
    let runtime = spawn_replication_runtime(inputs, coordinator, recorder, build_budget());

    assert!(
        !runtime.is_stopped(),
        "fresh runtime must report not stopped"
    );

    runtime.cancel().await;
    assert!(
        runtime.is_stopped(),
        "post-cancel().await runtime must report stopped"
    );

    // Cancelling again keeps the runtime stopped.
    runtime.cancel().await;
    assert!(runtime.is_stopped());
}
/// A NotLeader NACK from the believed leader makes the replica forget it.
#[tokio::test]
async fn sync_nack_notleader_clears_believed_leader() {
    use super::super::replication::SyncNackError;
    use super::super::replication_state::TransitionSignal;

    let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
    let chan = inputs.channel_id;
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let outbound: Arc<dyn ReplicationDispatcher> = recorder.clone();
    let budget = build_budget();

    let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
    tracker
        .lock()
        .record_heartbeat(0x20, ReplicaRole::Leader, 99, Instant::now());
    assert_eq!(tracker.lock().believed_leader(), Some(0x20));

    // An in-flight request (id 0) to 0x20 that the NACK answers.
    let outstanding = Arc::new(Mutex::new(OutstandingRequests::new()));
    outstanding.lock().record(0x20, 0, Instant::now());

    let refusal = Inbound::SyncNack {
        from: 0x20,
        msg: SyncNack {
            channel_id: chan,
            since_seq: 0,
            error_code: SyncNackError::NotLeader,
            leader_first_retained_seq: 0,
            detail: String::new(),
            request_id: 0,
        },
    };
    let state = RuntimeState {
        tracker: tracker.clone(),
        budget: budget.clone(),
        backoff: build_backoff(),
        outstanding: outstanding.clone(),
    };
    on_inbound(&inputs, &coordinator, &outbound, &state, refusal).await;

    assert!(
        tracker.lock().believed_leader().is_none(),
        "NACK NotLeader must clear the cached believed leader"
    );
}
/// A BadRange NACK carrying the leader's retained floor makes the replica jump
/// its local tail to that floor and count one skip-ahead.
#[tokio::test]
async fn sync_nack_badrange_skips_local_tail() {
    use super::super::replication::SyncNackError;
    use super::super::replication_state::TransitionSignal;
    use std::sync::atomic::Ordering;

    let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
    let chan = inputs.channel_id;
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let outbound: Arc<dyn ReplicationDispatcher> = recorder.clone();
    let budget = build_budget();

    let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
    tracker
        .lock()
        .record_heartbeat(0x20, ReplicaRole::Leader, 41, Instant::now());
    let next_before = inputs.file.next_seq();

    let outstanding = Arc::new(Mutex::new(OutstandingRequests::new()));
    outstanding.lock().record(0x20, 0, Instant::now());

    // The leader no longer holds seq 42; its oldest retained event is 100.
    let out_of_range = Inbound::SyncNack {
        from: 0x20,
        msg: SyncNack {
            channel_id: chan,
            since_seq: 42,
            error_code: SyncNackError::BadRange,
            leader_first_retained_seq: 100,
            detail: String::new(),
            request_id: 0,
        },
    };
    let state = RuntimeState {
        tracker: tracker.clone(),
        budget: budget.clone(),
        backoff: build_backoff(),
        outstanding: outstanding.clone(),
    };
    on_inbound(&inputs, &coordinator, &outbound, &state, out_of_range).await;

    let next_after = inputs.file.next_seq();
    assert!(
        next_after > next_before,
        "BadRange must advance local next_seq (got {}, baseline {})",
        next_after,
        next_before
    );
    assert_eq!(
        next_after, 100,
        "BadRange with leader_first_retained_seq must jump local tail to the floor"
    );
    let skips = coordinator
        .metrics()
        .skip_ahead_total
        .load(Ordering::Relaxed);
    assert_eq!(skips, 1);
}
/// Forcing the coordinator to Idle (ChannelClose) while the runtime is mid-way
/// toward an election must not be undone by the runtime: the role stays Idle.
#[tokio::test]
async fn post_election_failed_transition_preserves_believed_leader() {
    use super::super::replication_state::TransitionSignal;

    let inputs = build_inputs(0x10, vec![0x10, 0x20], 50);
    let chan = inputs.channel_id;
    let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    let recorder = Arc::new(RecorderDispatcher::default());
    let runtime = spawn_replication_runtime(
        inputs,
        coordinator.clone(),
        recorder.clone(),
        build_budget(),
    );

    let leader_heartbeat = Inbound::Heartbeat {
        from: 0x20,
        msg: SyncHeartbeat {
            channel_id: chan,
            tail_seq: 99,
            role: ReplicaRole::Leader,
            wall_clock_ms: 1_700_000_000_000,
        },
    };
    runtime.dispatch(leader_heartbeat).await.unwrap();

    // Let the runtime ingest the heartbeat, then close the channel out from under it.
    tokio::time::sleep(Duration::from_millis(20)).await;
    coordinator
        .transition_to(ReplicaRole::Idle, TransitionSignal::ChannelClose)
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(300)).await;
    assert_eq!(coordinator.role(), ReplicaRole::Idle);
    runtime.cancel().await;
}
/// A Replica's catch-up SyncRequest must carry `inputs.default_bandwidth_class`
/// rather than the type's default class.
#[tokio::test]
async fn replica_on_tick_stamps_inputs_default_bandwidth_class_on_sync_request() {
    use super::super::replication_state::TransitionSignal;
    use crate::adapter::net::redex::bandwidth::BandwidthClass;

    const CHANNEL_NAME: &str = "test/runtime";
    let me: NodeId = 0x10;
    let leader: NodeId = 0x20;

    // The local tail is 0 and traffic is configured as Background.
    let inputs = RuntimeInputs {
        channel: ChannelIdentity {
            channel_name: CHANNEL_NAME.to_string(),
            origin_hash: 0xCAFE_BABE,
        },
        channel_id: channel_id_for(CHANNEL_NAME),
        self_node_id: me,
        replica_set: vec![me, leader],
        heartbeat_ms: 100,
        wall_clock_provider: Arc::new(|| 1_700_000_000_000),
        tail_provider: Arc::new(|| 0),
        rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
        file: build_file_for_tests(),
        default_bandwidth_class: BandwidthClass::Background,
        background_fraction: 0.3,
    };

    let (coordinator, _registry) = build_coordinator(me, vec![me, leader]);
    coordinator
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    // The leader reports tail 50, so the replica has something to catch up on.
    let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
    tracker
        .lock()
        .record_heartbeat(leader, ReplicaRole::Leader, 50, Instant::now());

    let recorder = Arc::new(RecorderDispatcher::default());
    let outbound: Arc<dyn ReplicationDispatcher> = recorder.clone();
    let state = build_state(tracker.clone(), build_budget());
    on_tick(&inputs, &coordinator, &outbound, &state).await;

    let sent = recorder.sync_requests.lock().clone();
    assert_eq!(sent.len(), 1, "expected exactly one catchup SyncRequest");
    let (target, req) = &sent[0];
    assert_eq!(*target, leader);
    assert_eq!(
        req.class,
        BandwidthClass::Background,
        "emitted SyncRequest must carry inputs.default_bandwidth_class, \
         not Default::default()",
    );
}
#[tokio::test]
async fn leader_serve_path_uses_class_aware_admission_for_background_request() {
use crate::adapter::net::redex::bandwidth::BandwidthClass;
let inputs = RuntimeInputs {
channel: ChannelIdentity {
channel_name: "test/runtime".to_string(),
origin_hash: 0xCAFE_BABE,
},
channel_id: channel_id_for("test/runtime"),
self_node_id: 0x10,
replica_set: vec![0x10, 0x20],
heartbeat_ms: 60_000,
wall_clock_provider: Arc::new(|| 1_700_000_000_000),
tail_provider: Arc::new(|| 0),
rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
file: build_file_for_tests(),
default_bandwidth_class: BandwidthClass::Foreground,
background_fraction: 0.3,
};
let cid = inputs.channel_id;
let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
for (role, signal) in [
(
ReplicaRole::Replica,
super::super::replication_state::TransitionSignal::CapabilitySelected,
),
(
ReplicaRole::Candidate,
super::super::replication_state::TransitionSignal::MissedHeartbeats,
),
(
ReplicaRole::Leader,
super::super::replication_state::TransitionSignal::ElectionWon,
),
] {
coordinator.transition_to(role, signal).await.unwrap();
}
let budget = Arc::new(Mutex::new(BandwidthBudget::new(1.0, 100, Instant::now())));
let dispatcher = Arc::new(RecorderDispatcher::default());
let event = Inbound::SyncRequest {
from: 0x20,
msg: SyncRequest {
channel_id: cid,
since_seq: 0,
chunk_max: 1024,
request_id: 0,
class: BandwidthClass::Background,
},
};
let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
on_inbound(
&inputs,
&coordinator,
&(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
&build_state(tracker.clone(), budget.clone()),
event,
)
.await;
let nacks = dispatcher.sync_nacks.lock().clone();
assert_eq!(
nacks.len(),
1,
"Background admission must be denied by the class-aware reserve gate; \
pre-fix this admitted via the class-blind try_consume path",
);
let (target, nack) = &nacks[0];
assert_eq!(*target, 0x20);
assert_eq!(
nack.error_code,
super::super::replication::SyncNackError::Backpressure,
"denial must surface as Backpressure NACK so the replica backs off",
);
assert!(
dispatcher.sync_responses.lock().is_empty(),
"denied requests must not leak a SyncResponse",
);
}
#[tokio::test]
async fn leader_serve_path_admits_foreground_under_tight_budget() {
use crate::adapter::net::redex::bandwidth::BandwidthClass;
let inputs = RuntimeInputs {
channel: ChannelIdentity {
channel_name: "test/runtime".to_string(),
origin_hash: 0xCAFE_BABE,
},
channel_id: channel_id_for("test/runtime"),
self_node_id: 0x10,
replica_set: vec![0x10, 0x20],
heartbeat_ms: 60_000,
wall_clock_provider: Arc::new(|| 1_700_000_000_000),
tail_provider: Arc::new(|| 0),
rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
file: build_file_for_tests(),
default_bandwidth_class: BandwidthClass::Foreground,
background_fraction: 0.3,
};
let cid = inputs.channel_id;
let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
for (role, signal) in [
(
ReplicaRole::Replica,
super::super::replication_state::TransitionSignal::CapabilitySelected,
),
(
ReplicaRole::Candidate,
super::super::replication_state::TransitionSignal::MissedHeartbeats,
),
(
ReplicaRole::Leader,
super::super::replication_state::TransitionSignal::ElectionWon,
),
] {
coordinator.transition_to(role, signal).await.unwrap();
}
let budget = Arc::new(Mutex::new(BandwidthBudget::new(1.0, 100, Instant::now())));
let dispatcher = Arc::new(RecorderDispatcher::default());
let event = Inbound::SyncRequest {
from: 0x20,
msg: SyncRequest {
channel_id: cid,
since_seq: 0,
chunk_max: 1024,
request_id: 0,
class: BandwidthClass::Foreground,
},
};
let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
on_inbound(
&inputs,
&coordinator,
&(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
&build_state(tracker.clone(), budget.clone()),
event,
)
.await;
assert!(
dispatcher.sync_nacks.lock().is_empty(),
"Foreground under the same budget must admit (available >= cost)",
);
assert_eq!(
dispatcher.sync_responses.lock().len(),
1,
"Foreground admit must ship a SyncResponse",
);
}
}