use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use parking_lot::{Mutex, RwLock};
use super::replication::ReplicaRole;
use super::replication_config::ReplicationConfig;
use super::replication_metrics::{ChannelMetricsAtomic, ReplicationMetricsRegistry};
use super::replication_state::{StateTransition, StateTransitionError, TransitionSignal};
use crate::adapter::net::MeshNode;
use crate::error::AdapterError;
/// Destination for the chain-tag side effects of a role change.
///
/// `ReplicationCoordinator::transition_to` calls `announce_chain` when the role
/// moves Idle→Replica or Candidate→Leader, and `withdraw_chain` on any
/// transition that ends in Idle. What an announcement or withdrawal does is up
/// to the implementor.
#[async_trait::async_trait]
pub trait ChainTagSink: Send + Sync {
    /// Announces the chain identified by `origin_hash`, reporting `tip_seq` as
    /// its current tip sequence.
    async fn announce_chain(&self, origin_hash: u64, tip_seq: u64) -> Result<(), AdapterError>;

    /// Withdraws the announcement for the chain identified by `origin_hash`.
    async fn withdraw_chain(&self, origin_hash: u64) -> Result<(), AdapterError>;
}
// Adapter that lets a `MeshNode` be used as the coordinator's tag sink.
//
// Both methods forward through the path-qualified `MeshNode::…` form rather
// than `self.…`; presumably this targets `MeshNode`'s own methods of the same
// name (defined outside this file) — confirm before changing the call form.
#[async_trait::async_trait]
impl ChainTagSink for MeshNode {
    async fn announce_chain(&self, origin_hash: u64, tip_seq: u64) -> Result<(), AdapterError> {
        MeshNode::announce_chain(self, origin_hash, tip_seq).await
    }

    async fn withdraw_chain(&self, origin_hash: u64) -> Result<(), AdapterError> {
        MeshNode::withdraw_chain(self, origin_hash).await
    }
}
/// Notification handed to a `ReplicaTransitionObserver` after a role change.
///
/// Every variant carries the channel's `origin_hash` and `at`, an `Instant`
/// captured once the chain-tag side effect of the transition has succeeded.
/// At most one event is emitted per transition.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum ReplicaTransitionEvent {
    /// Emitted for Idle → Replica.
    BecameHolder { origin_hash: u64, at: Instant },
    /// Emitted for any transition into Idle other than Leader → Idle.
    Idled { origin_hash: u64, at: Instant },
    /// Emitted for Replica → Leader and Candidate → Leader.
    LeaderChanged { origin_hash: u64, at: Instant },
    /// Emitted for Leader → Replica.
    LeaderLost { origin_hash: u64, at: Instant },
    /// Emitted for Leader → Idle.
    LeaderLostAndIdled { origin_hash: u64, at: Instant },
    /// Emitted for Idle → Leader.
    BecameHolderAndLeader { origin_hash: u64, at: Instant },
}
/// Callback interface for role-change notifications.
///
/// `observe` is invoked synchronously from inside
/// `ReplicationCoordinator::transition_to`, on the task driving the
/// transition, so implementations should return promptly.
pub trait ReplicaTransitionObserver: Send + Sync + 'static {
    /// Receives the event describing the transition that just completed.
    fn observe(&self, event: ReplicaTransitionEvent);
}
/// Failure modes of `ReplicationCoordinator::transition_to`.
#[derive(Debug, thiserror::Error)]
pub enum CoordinatorError {
    /// The requested role change was rejected by `StateTransition::apply`;
    /// the coordinator's role is left unchanged.
    #[error("invalid state transition: {0}")]
    Transition(#[from] StateTransitionError),

    /// The sink's announce/withdraw call failed. The coordinator's role has
    /// already been updated by the time this is returned.
    #[error("chain-tag side-effect failed: {0}")]
    TagSink(#[source] AdapterError),
}
/// Identifies the channel a coordinator is responsible for.
#[derive(Debug, Clone)]
pub struct ChannelIdentity {
    /// Channel name; the coordinator uses it to look up per-channel metrics.
    pub channel_name: String,
    /// Hash passed to the tag sink and embedded in transition events.
    pub origin_hash: u64,
}
/// Per-channel replication role holder.
///
/// Tracks the local `ReplicaRole`, the highest tail sequence recorded so far,
/// and drives chain-tag announcements/withdrawals through a `ChainTagSink`
/// whenever the role changes.
pub struct ReplicationCoordinator {
    /// Channel this coordinator serves.
    channel: ChannelIdentity,
    /// Configuration supplied at construction; exposed via `config()`.
    config: ReplicationConfig,
    /// Receiver of announce/withdraw side effects.
    sink: Arc<dyn ChainTagSink>,
    /// Metrics handle obtained from the registry for this channel's name.
    metrics: Arc<ChannelMetricsAtomic>,
    /// Current role; locked only briefly, never across an `.await`.
    state: Mutex<ReplicaRole>,
    /// Highest sequence passed to `record_tail_seq`; starts at 0.
    tail_seq: AtomicU64,
    /// Async lock taken for the whole of `transition_to`, including the sink
    /// call, so transitions and their side effects do not interleave.
    transition_lock: tokio::sync::Mutex<()>,
    /// Optional observer notified after successful transitions.
    observer: RwLock<Option<Arc<dyn ReplicaTransitionObserver>>>,
}
impl ReplicationCoordinator {
    /// Creates a coordinator for `channel`.
    ///
    /// The coordinator starts in `ReplicaRole::Idle` with a tail sequence of 0
    /// and no transition observer. Metrics are obtained from `registry` using
    /// `channel.channel_name`.
    pub fn new(
        channel: ChannelIdentity,
        config: ReplicationConfig,
        sink: Arc<dyn ChainTagSink>,
        registry: &ReplicationMetricsRegistry,
    ) -> Self {
        let metrics = registry.for_channel(&channel.channel_name);
        Self {
            channel,
            config,
            sink,
            metrics,
            state: Mutex::new(ReplicaRole::Idle),
            tail_seq: AtomicU64::new(0),
            transition_lock: tokio::sync::Mutex::new(()),
            observer: RwLock::new(None),
        }
    }

    /// Installs (`Some`) or clears (`None`) the transition observer and
    /// returns whichever observer was installed before.
    pub fn set_transition_observer(
        &self,
        observer: Option<Arc<dyn ReplicaTransitionObserver>>,
    ) -> Option<Arc<dyn ReplicaTransitionObserver>> {
        let mut guard = self.observer.write();
        std::mem::replace(&mut *guard, observer)
    }

    /// Returns `true` when an observer is currently installed.
    pub fn has_transition_observer(&self) -> bool {
        self.observer.read().is_some()
    }

    /// Delivers `event` to the installed observer, if any.
    fn fire_transition(&self, event: ReplicaTransitionEvent) {
        // Clone the `Arc` in its own statement so the read guard is released
        // before the callback runs. Writing this as
        // `if let Some(o) = self.observer.read().clone() { .. }` would keep the
        // guard alive for the whole `if let` body, and an observer that calls
        // `set_transition_observer` from `observe` would then deadlock on the
        // non-reentrant lock.
        let observer = self.observer.read().clone();
        if let Some(observer) = observer {
            observer.observe(event);
        }
    }

    /// Returns the current role.
    pub fn role(&self) -> ReplicaRole {
        *self.state.lock()
    }

    /// Returns the highest tail sequence recorded so far (0 if none).
    pub fn tail_seq(&self) -> u64 {
        self.tail_seq.load(Ordering::Relaxed)
    }

    /// Records `seq` as the tail sequence if it is greater than the stored
    /// value; smaller or equal values leave the stored value unchanged.
    pub fn record_tail_seq(&self, seq: u64) {
        // Atomic max: the stored value can only move forward.
        self.tail_seq.fetch_max(seq, Ordering::Relaxed);
    }

    /// Returns the identity of the channel this coordinator serves.
    pub fn channel(&self) -> &ChannelIdentity {
        &self.channel
    }

    /// Returns the configuration supplied at construction.
    pub fn config(&self) -> &ReplicationConfig {
        &self.config
    }

    /// Returns this channel's metrics handle.
    pub fn metrics(&self) -> &ChannelMetricsAtomic {
        &self.metrics
    }

    /// Moves the coordinator to `target` in response to `signal`.
    ///
    /// Calls are serialized: the transition lock is held for the whole call,
    /// including the awaited sink side effect.
    ///
    /// Steps, in order:
    /// 1. Idle → Idle with `TransitionSignal::ChannelClose` is a no-op and
    ///    returns `Ok(None)` without touching metrics or the sink.
    /// 2. The change is validated with `StateTransition::apply`; on success the
    ///    role is updated immediately.
    /// 3. Metrics: the leader-change counter is bumped when the new role is
    ///    Leader, the election-thrash counter when the signal is
    ///    `MissedHeartbeats`.
    /// 4. Sink: Idle → Replica and Candidate → Leader announce the chain with
    ///    the current tail sequence; any transition into Idle withdraws it;
    ///    everything else has no sink side effect.
    /// 5. If the sink succeeded, at most one `ReplicaTransitionEvent` is
    ///    delivered to the observer.
    ///
    /// # Errors
    ///
    /// * `CoordinatorError::Transition` — the change was rejected; the role is
    ///   unchanged and no side effect ran.
    /// * `CoordinatorError::TagSink` — the sink call failed. The role has
    ///   **already** advanced and no observer event is delivered. For a failed
    ///   withdraw the announce-divergence counter is bumped and a warning is
    ///   logged.
    pub async fn transition_to(
        &self,
        target: ReplicaRole,
        signal: TransitionSignal,
    ) -> Result<Option<StateTransition>, CoordinatorError> {
        let _guard = self.transition_lock.lock().await;

        // Validate and commit under the (synchronous) state lock; the lock is
        // released at the end of this block, before any `.await`.
        let transition = {
            let mut state = self.state.lock();
            let from = *state;
            if from == ReplicaRole::Idle
                && target == ReplicaRole::Idle
                && signal == TransitionSignal::ChannelClose
            {
                return Ok(None);
            }
            let t = StateTransition::apply(from, target, signal)?;
            *state = target;
            t
        };

        if transition.to == ReplicaRole::Leader {
            self.metrics.incr_leader_change();
        }
        if matches!(transition.signal, TransitionSignal::MissedHeartbeats) {
            self.metrics.incr_election_thrash();
        }

        let origin = self.channel.origin_hash;
        let is_withdraw = transition.to == ReplicaRole::Idle;
        let result = match (transition.from, transition.to) {
            (ReplicaRole::Idle, ReplicaRole::Replica)
            | (ReplicaRole::Candidate, ReplicaRole::Leader) => {
                let tip = self.tail_seq.load(Ordering::Relaxed);
                self.sink.announce_chain(origin, tip).await
            }
            (_, ReplicaRole::Idle) => self.sink.withdraw_chain(origin).await,
            _ => Ok(()),
        };

        if let Err(e) = result {
            if is_withdraw {
                self.metrics.incr_announce_divergence();
                tracing::warn!(
                    origin = format!("{:#x}", origin),
                    from = ?transition.from,
                    error = %e,
                    "replication coordinator: state advanced to Idle but sink withdraw failed; \
                     advertised-vs-local divergence until next transition_to or cancel()",
                );
            }
            return Err(CoordinatorError::TagSink(e));
        }

        // The (from, to) pairs below are mutually exclusive, so a transition
        // yields at most one event. Arm order matters only for the two
        // into-Idle arms: Leader → Idle must be matched before the catch-all.
        let at = Instant::now();
        let event = match (transition.from, transition.to) {
            (ReplicaRole::Idle, ReplicaRole::Replica) => {
                Some(ReplicaTransitionEvent::BecameHolder {
                    origin_hash: origin,
                    at,
                })
            }
            (ReplicaRole::Idle, ReplicaRole::Leader) => {
                Some(ReplicaTransitionEvent::BecameHolderAndLeader {
                    origin_hash: origin,
                    at,
                })
            }
            (ReplicaRole::Leader, ReplicaRole::Idle) => {
                Some(ReplicaTransitionEvent::LeaderLostAndIdled {
                    origin_hash: origin,
                    at,
                })
            }
            (_, ReplicaRole::Idle) => Some(ReplicaTransitionEvent::Idled {
                origin_hash: origin,
                at,
            }),
            (ReplicaRole::Replica, ReplicaRole::Leader)
            | (ReplicaRole::Candidate, ReplicaRole::Leader) => {
                Some(ReplicaTransitionEvent::LeaderChanged {
                    origin_hash: origin,
                    at,
                })
            }
            (ReplicaRole::Leader, ReplicaRole::Replica) => {
                Some(ReplicaTransitionEvent::LeaderLost {
                    origin_hash: origin,
                    at,
                })
            }
            _ => None,
        };
        if let Some(event) = event {
            self.fire_transition(event);
        }

        Ok(Some(transition))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use parking_lot::Mutex as ParkingMutex;

    /// Sink double that records every successful call and can be armed to
    /// fail exactly one upcoming call (failed calls are not recorded).
    #[derive(Default)]
    struct RecorderSink {
        calls: ParkingMutex<Vec<SinkCall>>,
        fail_next: ParkingMutex<Option<AdapterError>>,
    }

    /// One recorded sink invocation.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum SinkCall {
        Announce { origin_hash: u64, tip_seq: u64 },
        Withdraw { origin_hash: u64 },
    }

    impl RecorderSink {
        /// Snapshot of the calls recorded so far, in order.
        fn calls(&self) -> Vec<SinkCall> {
            self.calls.lock().clone()
        }

        /// Makes the next announce/withdraw call return `err`.
        fn arm_failure(&self, err: AdapterError) {
            *self.fail_next.lock() = Some(err);
        }
    }

    #[async_trait::async_trait]
    impl ChainTagSink for RecorderSink {
        async fn announce_chain(&self, origin_hash: u64, tip_seq: u64) -> Result<(), AdapterError> {
            if let Some(err) = self.fail_next.lock().take() {
                return Err(err);
            }
            self.calls.lock().push(SinkCall::Announce {
                origin_hash,
                tip_seq,
            });
            Ok(())
        }

        async fn withdraw_chain(&self, origin_hash: u64) -> Result<(), AdapterError> {
            if let Some(err) = self.fail_next.lock().take() {
                return Err(err);
            }
            self.calls.lock().push(SinkCall::Withdraw { origin_hash });
            Ok(())
        }
    }

    /// Builds a coordinator wired to a fresh `RecorderSink` and registry.
    fn build_coordinator() -> (
        Arc<RecorderSink>,
        ReplicationMetricsRegistry,
        ReplicationCoordinator,
    ) {
        let sink = Arc::new(RecorderSink::default());
        let registry = ReplicationMetricsRegistry::new();
        let coordinator = ReplicationCoordinator::new(
            ChannelIdentity {
                channel_name: "payments/settlements".to_string(),
                origin_hash: 0xCAFE_BABE_DEAD_BEEF,
            },
            ReplicationConfig::new(),
            sink.clone() as Arc<dyn ChainTagSink>,
            &registry,
        );
        (sink, registry, coordinator)
    }

    #[tokio::test]
    async fn starts_in_idle_with_zero_tail() {
        let (_, _, c) = build_coordinator();
        assert_eq!(c.role(), ReplicaRole::Idle);
        assert_eq!(c.tail_seq(), 0);
    }

    #[tokio::test]
    async fn idle_to_replica_announces_chain() {
        let (sink, _, c) = build_coordinator();
        c.record_tail_seq(42);
        c.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .expect("valid transition");
        assert_eq!(c.role(), ReplicaRole::Replica);
        assert_eq!(
            sink.calls(),
            vec![SinkCall::Announce {
                origin_hash: 0xCAFE_BABE_DEAD_BEEF,
                tip_seq: 42,
            }],
        );
    }

    #[tokio::test]
    async fn candidate_to_leader_announces_chain() {
        let (sink, _, c) = build_coordinator();
        c.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
            .await
            .unwrap();
        c.record_tail_seq(999);
        c.transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
            .await
            .unwrap();
        assert_eq!(c.role(), ReplicaRole::Leader);
        let calls = sink.calls();
        assert_eq!(calls.len(), 2);
        assert!(matches!(calls[0], SinkCall::Announce { tip_seq: 0, .. }));
        assert!(matches!(calls[1], SinkCall::Announce { tip_seq: 999, .. }));
    }

    #[tokio::test]
    async fn candidate_does_not_emit_tag_side_effect() {
        let (sink, _, c) = build_coordinator();
        c.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .unwrap();
        let baseline = sink.calls().len();
        c.transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
            .await
            .unwrap();
        assert_eq!(sink.calls().len(), baseline, "Candidate must not emit tags");
    }

    #[tokio::test]
    async fn candidate_to_replica_no_tag_side_effect() {
        let (sink, _, c) = build_coordinator();
        c.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
            .await
            .unwrap();
        let baseline = sink.calls().len();
        c.transition_to(ReplicaRole::Replica, TransitionSignal::ElectionLost)
            .await
            .unwrap();
        assert_eq!(
            sink.calls().len(),
            baseline,
            "Candidate→Replica should not double-announce"
        );
    }

    #[tokio::test]
    async fn leader_to_idle_withdraws_chain() {
        let (sink, _, c) = build_coordinator();
        c.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Idle, TransitionSignal::GracefulRelinquish)
            .await
            .unwrap();
        let calls = sink.calls();
        let last = calls.last().expect("at least one call");
        assert_eq!(
            *last,
            SinkCall::Withdraw {
                origin_hash: 0xCAFE_BABE_DEAD_BEEF,
            },
            "graceful relinquish must withdraw the chain tag",
        );
    }

    #[tokio::test]
    async fn replica_to_idle_disk_pressure_withdraws() {
        let (sink, _, c) = build_coordinator();
        c.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Idle, TransitionSignal::DiskPressureWithdraw)
            .await
            .unwrap();
        let calls = sink.calls();
        assert_eq!(
            *calls.last().unwrap(),
            SinkCall::Withdraw {
                origin_hash: 0xCAFE_BABE_DEAD_BEEF,
            },
        );
    }

    #[tokio::test]
    async fn channel_close_from_idle_is_idempotent_noop() {
        let (sink, registry, c) = build_coordinator();
        let result = c
            .transition_to(ReplicaRole::Idle, TransitionSignal::ChannelClose)
            .await
            .unwrap();
        assert!(result.is_none(), "idempotent close must return None");
        assert!(sink.calls().is_empty());
        let snapshot = registry.snapshot();
        assert_eq!(snapshot.channels.len(), 1);
        let c_metrics = &snapshot.channels[0];
        assert_eq!(c_metrics.leader_changes_total, 0);
        assert_eq!(c_metrics.election_thrash_total, 0);
    }

    #[tokio::test]
    async fn channel_close_from_active_state_withdraws() {
        let (sink, _, c) = build_coordinator();
        c.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Idle, TransitionSignal::ChannelClose)
            .await
            .unwrap();
        let calls = sink.calls();
        assert!(matches!(*calls.last().unwrap(), SinkCall::Withdraw { .. }));
    }

    #[tokio::test]
    async fn invalid_transition_does_not_mutate_state() {
        let (sink, _, c) = build_coordinator();
        let err = c
            .transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
            .await
            .expect_err("Idle→Leader must reject");
        assert!(matches!(err, CoordinatorError::Transition(_)));
        assert_eq!(c.role(), ReplicaRole::Idle, "state must not advance");
        assert!(sink.calls().is_empty(), "no side-effect on rejection");
    }

    #[tokio::test]
    async fn record_tail_seq_is_monotonic() {
        let (_, _, c) = build_coordinator();
        c.record_tail_seq(100);
        assert_eq!(c.tail_seq(), 100);
        // A lower sequence must not move the tail backwards.
        c.record_tail_seq(50);
        assert_eq!(c.tail_seq(), 100);
        c.record_tail_seq(200);
        assert_eq!(c.tail_seq(), 200);
    }

    #[tokio::test]
    async fn metric_increments_on_leader_entry() {
        let (_, registry, c) = build_coordinator();
        c.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
            .await
            .unwrap();
        let snap = registry.snapshot();
        let row = snap.channel("payments/settlements").unwrap();
        assert_eq!(row.leader_changes_total, 1);
        assert_eq!(row.election_thrash_total, 1, "MissedHeartbeats triggered");
    }

    #[tokio::test]
    async fn metric_increments_on_repeat_leader_entries() {
        let (_, registry, c) = build_coordinator();
        c.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Idle, TransitionSignal::GracefulRelinquish)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
            .await
            .unwrap();
        c.transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
            .await
            .unwrap();
        let snap = registry.snapshot();
        let row = snap.channel("payments/settlements").unwrap();
        assert_eq!(row.leader_changes_total, 2);
    }

    #[tokio::test]
    async fn concurrent_transitions_serialize_chain_tag_side_effects() {
        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};

        /// Sink that sleeps inside each call and tracks how many calls were
        /// in flight at once, so overlapping side effects become observable.
        struct BarrierSink {
            calls: tokio::sync::Mutex<Vec<SinkCall>>,
            in_flight: AtomicUsize,
            max_in_flight: AtomicUsize,
        }

        #[async_trait::async_trait]
        impl ChainTagSink for BarrierSink {
            async fn announce_chain(
                &self,
                origin_hash: u64,
                tip_seq: u64,
            ) -> Result<(), AdapterError> {
                let n = self.in_flight.fetch_add(1, AtomicOrdering::SeqCst) + 1;
                self.max_in_flight.fetch_max(n, AtomicOrdering::SeqCst);
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                self.calls.lock().await.push(SinkCall::Announce {
                    origin_hash,
                    tip_seq,
                });
                self.in_flight.fetch_sub(1, AtomicOrdering::SeqCst);
                Ok(())
            }

            async fn withdraw_chain(&self, origin_hash: u64) -> Result<(), AdapterError> {
                let n = self.in_flight.fetch_add(1, AtomicOrdering::SeqCst) + 1;
                self.max_in_flight.fetch_max(n, AtomicOrdering::SeqCst);
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                self.calls
                    .lock()
                    .await
                    .push(SinkCall::Withdraw { origin_hash });
                self.in_flight.fetch_sub(1, AtomicOrdering::SeqCst);
                Ok(())
            }
        }

        let sink = Arc::new(BarrierSink {
            calls: tokio::sync::Mutex::new(Vec::new()),
            in_flight: AtomicUsize::new(0),
            max_in_flight: AtomicUsize::new(0),
        });
        let registry = ReplicationMetricsRegistry::new();
        let coord = Arc::new(ReplicationCoordinator::new(
            ChannelIdentity {
                channel_name: "concurrent/serialize".to_string(),
                origin_hash: 0xC0FFEE,
            },
            ReplicationConfig::new(),
            sink.clone() as Arc<dyn ChainTagSink>,
            &registry,
        ));

        let c1 = coord.clone();
        let t1 = tokio::spawn(async move {
            c1.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
                .await
        });
        let c2 = coord.clone();
        let t2 = tokio::spawn(async move {
            // Start slightly later so T1 is already inside its sink call.
            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
            c2.transition_to(ReplicaRole::Idle, TransitionSignal::DiskPressureWithdraw)
                .await
        });

        let (r1, r2) = tokio::join!(t1, t2);
        r1.unwrap().expect("T1 transition succeeds");
        r2.unwrap().expect("T2 transition succeeds");

        let max_concurrent = sink.max_in_flight.load(AtomicOrdering::SeqCst);
        assert_eq!(
            max_concurrent, 1,
            "transition_lock must serialize sink calls (observed max in-flight = {max_concurrent})"
        );
        let calls = sink.calls.lock().await.clone();
        assert_eq!(calls.len(), 2);
        assert!(matches!(calls[0], SinkCall::Announce { .. }));
        assert!(matches!(calls[1], SinkCall::Withdraw { .. }));
    }

    #[tokio::test]
    async fn tag_sink_failure_surfaces_but_state_mutated() {
        let (sink, _, c) = build_coordinator();
        c.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .unwrap();
        sink.arm_failure(AdapterError::Transient(
            "simulated network blip".to_string(),
        ));
        let err = c
            .transition_to(ReplicaRole::Idle, TransitionSignal::DiskPressureWithdraw)
            .await
            .expect_err("must surface sink failure");
        assert!(matches!(err, CoordinatorError::TagSink(_)));
        assert_eq!(c.role(), ReplicaRole::Idle);
    }

    #[tokio::test]
    async fn tag_sink_failure_bumps_divergence_counter() {
        use std::sync::atomic::Ordering as AtomicOrdering;

        let (sink, _, c) = build_coordinator();
        c.transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .unwrap();
        let before = c
            .metrics()
            .announce_divergence_total
            .load(AtomicOrdering::Relaxed);
        sink.arm_failure(AdapterError::Transient(
            "simulated network blip".to_string(),
        ));
        let _ = c
            .transition_to(ReplicaRole::Idle, TransitionSignal::DiskPressureWithdraw)
            .await
            .expect_err("must surface sink failure");
        let after = c
            .metrics()
            .announce_divergence_total
            .load(AtomicOrdering::Relaxed);
        assert_eq!(
            after,
            before + 1,
            "announce_divergence_total must bump by exactly 1 on the failed withdraw"
        );
    }
}