use crate::auth::middleware::AuthResult;
use crate::replication::primary::ReplicaState;
use reddb_wire::topology::{Endpoint, ReplicaInfo, Topology};
/// Default replica liveness timeout in milliseconds (5 seconds).
///
/// [`LagConfig::from_now`] uses this as `replica_timeout_ms`; a replica whose
/// last-seen timestamp is older than this is reported as unhealthy.
pub const DEFAULT_REPLICA_TIMEOUT_MS: u128 = 5_000;
/// Capability string naming the right to read cluster topology.
///
/// NOTE(review): nothing in the visible code references this constant —
/// [`TopologyAuthGate::allows`] admits every authenticated principal without
/// consulting it. Confirm where (or whether) it is enforced.
pub const TOPOLOGY_READ_CAPABILITY: &str = "cluster:topology:read";
/// Stateless gate deciding whether a caller may see the replica list.
pub struct TopologyAuthGate;

impl TopologyAuthGate {
    /// Returns `true` only for [`AuthResult::Authenticated`], whatever the
    /// principal's username, role or source; anonymous and denied callers
    /// are refused.
    ///
    /// The match deliberately has no wildcard arm: a new `AuthResult` variant
    /// will fail to compile here until it is explicitly classified.
    pub fn allows(auth: &AuthResult) -> bool {
        match auth {
            AuthResult::Authenticated { .. } => true,
            AuthResult::Anonymous | AuthResult::Denied(_) => false,
        }
    }
}
/// Inputs used to derive per-replica health and lag figures.
#[derive(Debug, Clone, Copy)]
pub struct LagConfig {
    /// Maximum age, in milliseconds, of a replica's last-seen timestamp for
    /// the replica to still be reported as healthy.
    pub replica_timeout_ms: u128,
    /// Rate (records per millisecond) used to turn an LSN gap into a time
    /// estimate. With `None`, a replica that is behind reports `u32::MAX`.
    pub records_per_ms: Option<f64>,
    /// Reference "now" in milliseconds, compared against each replica's
    /// `last_seen_at_unix_ms`.
    pub now_unix_ms: u128,
}

impl LagConfig {
    /// Builds a config with the default timeout, no rate estimate, and the
    /// clock read from `crate::utils::now_unix_millis()`.
    pub fn from_now() -> Self {
        // Cast to match the `u128` field type.
        let now_unix_ms = crate::utils::now_unix_millis() as u128;
        Self {
            now_unix_ms,
            records_per_ms: None,
            replica_timeout_ms: DEFAULT_REPLICA_TIMEOUT_MS,
        }
    }
}
pub struct TopologyAdvertiser;
impl TopologyAdvertiser {
pub fn advertise(
replicas: &[ReplicaState],
auth: &AuthResult,
epoch: u64,
primary_endpoint: Endpoint,
primary_current_lsn: u64,
lag: &LagConfig,
) -> Topology {
if !TopologyAuthGate::allows(auth) {
return Topology {
epoch,
primary: primary_endpoint,
replicas: Vec::new(),
};
}
let infos = replicas
.iter()
.map(|r| replica_to_info(r, primary_current_lsn, lag))
.collect();
Topology {
epoch,
primary: primary_endpoint,
replicas: infos,
}
}
}
/// Converts the primary's bookkeeping for one replica into its advertised form.
///
/// The replica id is used as the advertised address, a missing region is
/// reported as `"unknown"`, and `last_applied_lsn` mirrors `last_acked_lsn`.
/// Health and lag come from [`is_healthy`] and [`compute_lag_ms`].
fn replica_to_info(state: &ReplicaState, primary_lsn: u64, lag: &LagConfig) -> ReplicaInfo {
    let region = match &state.region {
        Some(name) => name.clone(),
        None => "unknown".to_string(),
    };

    ReplicaInfo {
        addr: state.id.clone(),
        region,
        healthy: is_healthy(state, lag),
        lag_ms: compute_lag_ms(state, primary_lsn, lag),
        last_applied_lsn: state.last_acked_lsn,
    }
}
/// Reports whether the replica was seen within `lag.replica_timeout_ms` of
/// `lag.now_unix_ms` (inclusive).
///
/// A last-seen timestamp later than "now" is treated as healthy rather than
/// underflowing the subtraction (presumably to tolerate clock skew).
fn is_healthy(state: &ReplicaState, lag: &LagConfig) -> bool {
    match lag.now_unix_ms.checked_sub(state.last_seen_at_unix_ms) {
        Some(age_ms) => age_ms <= lag.replica_timeout_ms,
        // `last_seen` is ahead of `now`.
        None => true,
    }
}
/// Estimates how many milliseconds the replica is behind the primary.
///
/// * `0` when the replica has acknowledged `primary_lsn` or beyond.
/// * `u32::MAX` when the replica is behind but no usable rate is configured
///   (`None`, zero, negative, NaN or infinite), or when the estimate does not
///   fit below `u32::MAX`.
/// * Otherwise the LSN gap divided by `lag.records_per_ms`, rounded to the
///   nearest millisecond.
fn compute_lag_ms(state: &ReplicaState, primary_lsn: u64, lag: &LagConfig) -> u32 {
    const UNKNOWN_LAG_MS: u32 = u32::MAX;

    let records_behind = primary_lsn.saturating_sub(state.last_acked_lsn);
    if records_behind == 0 {
        return 0;
    }

    match lag.records_per_ms {
        Some(rate) if rate.is_finite() && rate > 0.0 => {
            let estimate_ms = records_behind as f64 / rate;
            let representable = estimate_ms.is_finite()
                && estimate_ms >= 0.0
                && estimate_ms < u32::MAX as f64;
            if representable {
                estimate_ms.round() as u32
            } else {
                UNKNOWN_LAG_MS
            }
        }
        _ => UNKNOWN_LAG_MS,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::auth::middleware::AuthSource;
    use crate::auth::Role;

    // ---- clock and lag fixtures -------------------------------------------

    /// Pinned reference clock so health checks are deterministic.
    fn lag_now_ms() -> u128 {
        1_700_000_000_000
    }

    /// Default timeout, no rate estimate, pinned clock.
    fn lag_default() -> LagConfig {
        LagConfig {
            now_unix_ms: lag_now_ms(),
            records_per_ms: None,
            replica_timeout_ms: DEFAULT_REPLICA_TIMEOUT_MS,
        }
    }

    // ---- auth fixtures ----------------------------------------------------

    /// Password-authenticated principal with the given name and role.
    fn password_user(username: &'static str, role: Role) -> AuthResult {
        AuthResult::Authenticated {
            username: username.into(),
            role,
            source: AuthSource::Password,
        }
    }

    fn anonymous() -> AuthResult {
        AuthResult::Anonymous
    }

    fn tenant() -> AuthResult {
        password_user("tenant-alice", Role::Read)
    }

    fn operator() -> AuthResult {
        password_user("operator-bob", Role::Write)
    }

    fn admin() -> AuthResult {
        password_user("admin-root", Role::Admin)
    }

    // ---- topology fixtures ------------------------------------------------

    fn primary_ep() -> Endpoint {
        Endpoint {
            region: "us-east-1".into(),
            addr: "primary.example.com:5050".into(),
        }
    }

    /// Replica at LSN 100 whose last-seen time is `last_seen_offset_ms` away
    /// from the pinned clock (clamped at zero).
    fn replica(id: &str, region: Option<&str>, last_seen_offset_ms: i128) -> ReplicaState {
        let now = lag_now_ms();
        let shifted = now as i128 + last_seen_offset_ms;
        ReplicaState {
            id: id.to_owned(),
            last_acked_lsn: 100,
            last_sent_lsn: 100,
            last_durable_lsn: 100,
            connected_at_unix_ms: now,
            last_seen_at_unix_ms: shifted.max(0) as u128,
            region: region.map(str::to_owned),
        }
    }

    fn shape_empty() -> Vec<ReplicaState> {
        Vec::new()
    }

    fn shape_one() -> Vec<ReplicaState> {
        vec![replica("replica-a:5050", Some("us-east-1"), -100)]
    }

    fn shape_multi_region() -> Vec<ReplicaState> {
        vec![
            replica("replica-a:5050", Some("us-east-1"), -100),
            replica("replica-b:5050", Some("us-west-2"), -200),
            replica("replica-c:5050", Some("eu-central-1"), -300),
        ]
    }

    /// `shape_one()` with `tweak` applied to its single replica.
    fn shape_one_with(tweak: impl FnOnce(&mut ReplicaState)) -> Vec<ReplicaState> {
        let mut shape = shape_one();
        tweak(&mut shape[0]);
        shape
    }

    /// Advertises `replicas` to the admin principal at epoch 1.
    fn advertise_to_admin(
        replicas: &[ReplicaState],
        primary_lsn: u64,
        lag: &LagConfig,
    ) -> Topology {
        TopologyAdvertiser::advertise(replicas, &admin(), 1, primary_ep(), primary_lsn, lag)
    }

    // ---- auth gate --------------------------------------------------------

    #[test]
    fn topology_advertiser_gate_allows_authenticated() {
        for principal in [tenant(), operator(), admin()] {
            assert!(TopologyAuthGate::allows(&principal));
        }
    }

    #[test]
    fn topology_advertiser_gate_blocks_anonymous_and_denied() {
        let denied = AuthResult::Denied("nope".into());
        assert!(!TopologyAuthGate::allows(&anonymous()));
        assert!(!TopologyAuthGate::allows(&denied));
    }

    // ---- replica visibility -----------------------------------------------

    #[test]
    fn topology_advertiser_anonymous_gets_primary_only() {
        for registry in [shape_empty(), shape_one(), shape_multi_region()] {
            let advertised = TopologyAdvertiser::advertise(
                &registry,
                &anonymous(),
                42,
                primary_ep(),
                500,
                &lag_default(),
            );
            assert_eq!(advertised.epoch, 42);
            assert_eq!(advertised.primary, primary_ep());
            assert!(
                advertised.replicas.is_empty(),
                "anonymous must never see replicas, got {:?}",
                advertised.replicas
            );
        }
    }

    #[test]
    fn topology_advertiser_authenticated_gets_full_list() {
        for principal in [tenant(), operator(), admin()] {
            let advertised = TopologyAdvertiser::advertise(
                &shape_multi_region(),
                &principal,
                7,
                primary_ep(),
                500,
                &lag_default(),
            );
            assert_eq!(advertised.epoch, 7);
            assert_eq!(advertised.replicas.len(), 3);

            let regions: Vec<&str> = advertised
                .replicas
                .iter()
                .map(|info| info.region.as_str())
                .collect();
            for expected in ["us-east-1", "us-west-2", "eu-central-1"] {
                assert!(regions.contains(&expected));
            }
        }
    }

    #[test]
    fn topology_advertiser_authenticated_empty_registry_returns_no_replicas() {
        let advertised = advertise_to_admin(&shape_empty(), 0, &lag_default());
        assert!(advertised.replicas.is_empty());
        assert_eq!(advertised.primary, primary_ep());
    }

    #[test]
    fn topology_advertiser_denied_collapses_to_primary_only() {
        let denied = AuthResult::Denied("invalid token".into());
        let advertised = TopologyAdvertiser::advertise(
            &shape_multi_region(),
            &denied,
            9,
            primary_ep(),
            500,
            &lag_default(),
        );
        assert!(advertised.replicas.is_empty());
    }

    // ---- health -----------------------------------------------------------

    #[test]
    fn topology_advertiser_recent_ack_is_healthy() {
        let registry = shape_one_with(|r| r.last_seen_at_unix_ms = lag_now_ms() - 100);
        let advertised = advertise_to_admin(&registry, 100, &lag_default());
        assert!(advertised.replicas[0].healthy);
    }

    #[test]
    fn topology_advertiser_stale_ack_is_unhealthy() {
        let registry = shape_one_with(|r| {
            r.last_seen_at_unix_ms = lag_now_ms() - DEFAULT_REPLICA_TIMEOUT_MS - 1
        });
        let advertised = advertise_to_admin(&registry, 100, &lag_default());
        assert!(!advertised.replicas[0].healthy);
    }

    // ---- lag --------------------------------------------------------------

    #[test]
    fn topology_advertiser_lag_unknown_reports_u32_max() {
        let registry = shape_one_with(|r| r.last_acked_lsn = 50);
        let advertised = advertise_to_admin(&registry, 500, &lag_default());
        assert_eq!(advertised.replicas[0].lag_ms, u32::MAX);
    }

    #[test]
    fn topology_advertiser_lag_zero_when_replica_caught_up() {
        let registry = shape_one_with(|r| r.last_acked_lsn = 500);
        let advertised = advertise_to_admin(&registry, 500, &lag_default());
        assert_eq!(advertised.replicas[0].lag_ms, 0);
    }

    #[test]
    fn topology_advertiser_lag_uses_progress_estimate_when_provided() {
        let registry = shape_one_with(|r| r.last_acked_lsn = 400);
        let with_rate = LagConfig {
            records_per_ms: Some(10.0),
            ..lag_default()
        };
        // 100 records behind at 10 records/ms.
        let advertised = advertise_to_admin(&registry, 500, &with_rate);
        assert_eq!(advertised.replicas[0].lag_ms, 10);
    }

    #[test]
    fn topology_advertiser_lag_zero_rate_falls_back_to_u32_max() {
        let registry = shape_one_with(|r| r.last_acked_lsn = 50);
        let zero_rate = LagConfig {
            records_per_ms: Some(0.0),
            ..lag_default()
        };
        let advertised = advertise_to_admin(&registry, 500, &zero_rate);
        assert_eq!(advertised.replicas[0].lag_ms, u32::MAX);
    }

    // ---- pass-through and wire encoding -------------------------------------

    #[test]
    fn topology_advertiser_propagates_epoch_verbatim() {
        for epoch in [0, 1, 42, u64::MAX] {
            let advertised = TopologyAdvertiser::advertise(
                &shape_one(),
                &admin(),
                epoch,
                primary_ep(),
                100,
                &lag_default(),
            );
            assert_eq!(advertised.epoch, epoch);
        }
    }

    #[test]
    fn topology_advertiser_output_round_trips_through_canonical_encoder() {
        use reddb_wire::topology::{decode_topology, encode_topology};

        let advertised = TopologyAdvertiser::advertise(
            &shape_multi_region(),
            &admin(),
            13,
            primary_ep(),
            500,
            &lag_default(),
        );
        let encoded = encode_topology(&advertised);
        let decoded = decode_topology(&encoded).expect("decode").expect("v1");
        assert_eq!(decoded, advertised);
    }

    #[test]
    fn topology_advertiser_output_round_trips_through_hello_ack_wrapper() {
        use reddb_wire::topology::{decode_topology_from_hello_ack, encode_topology_for_hello_ack};

        let advertised = TopologyAdvertiser::advertise(
            &shape_multi_region(),
            &operator(),
            21,
            primary_ep(),
            500,
            &lag_default(),
        );
        let field = encode_topology_for_hello_ack(&advertised);
        let decoded = decode_topology_from_hello_ack(&field)
            .expect("decode")
            .expect("v1");
        assert_eq!(decoded, advertised);
    }
}