reddb_server/replication/topology_advertiser.rs
1//! Server-side `TopologyAdvertiser` (issue #167).
2//!
3//! Produces the canonical `Topology` payload (defined in
4//! `reddb-wire::topology`) from the live primary replica registry,
5//! gated by the `cluster:topology:read` capability per ADR 0008.
6//!
7//! ## Auth gate
8//!
9//! ADR 0008 §1–§3 specifies one capability — `cluster:topology:read` —
10//! evaluated against the principal that opened the connection.
11//! Authenticated principals get the capability by default, anonymous
12//! callers do not. The advertiser collapses to a *primary-only* payload
13//! when the gate denies (ADR 0008 §3) — a `Topology { primary, replicas: [] }`
14//! that still carries the write-path endpoint so unauthenticated
15//! bootstrap keeps working without leaking the replica fleet.
16//!
17//! ## Health + lag
18//!
//! * `healthy = (now_ms - last_seen_at_unix_ms) <= replica_timeout_ms`.
//!   A heartbeat stamped in the future (clock skew) counts as fresh, so
//!   the replica stays `healthy`.
//! * `lag_ms = primary.current_lsn - replica.last_applied_lsn`, mapped
//!   to milliseconds via the recent commit-rate estimate. A replica
//!   that has caught up always reports `0`. When the estimate cannot
//!   be produced (no commit history yet), a lagging replica reports
//!   `u32::MAX` per the issue spec — the consumer side treats that as
//!   "lag unknown, fall back to URI-only routing for this replica".
26//!
27//! ## Module shape
28//!
29//! `TopologyAdvertiser` is the deep module: callers pass in the
30//! replica snapshot, an auth context, the current epoch, the primary
31//! endpoint, and a configuration knob for lag/health. It returns a
32//! `Topology` ready for the wire encoder. The auth predicate is
33//! extracted as `TopologyAuthGate` so it can be unit-tested in
34//! isolation without booting the rest of the advertiser.
35
36use crate::auth::middleware::AuthResult;
37use crate::replication::primary::ReplicaState;
38use reddb_wire::topology::{Endpoint, ReplicaInfo, Topology};
39
/// Fallback heartbeat window, in milliseconds, for callers that have no
/// operator-supplied value: a replica whose last ack is older than this
/// is advertised with `healthy: false`.
///
/// Five seconds is on the order of the `poll_interval_ms` default in
/// `ReplicationConfig` (100 ms) scaled by a generous fudge factor.
/// Operators override it through `LagConfig::replica_timeout_ms`.
pub const DEFAULT_REPLICA_TIMEOUT_MS: u128 = 5 * 1_000;

/// Name of the capability defined in ADR 0008 §1.
///
/// Declared once here instead of being repeated as a string literal, so
/// a future capability-engine integration has a single symbol to grep.
pub const TOPOLOGY_READ_CAPABILITY: &str = "cluster:topology:read";
52
53// ---------------------------------------------------------------
54// TopologyAuthGate
55// ---------------------------------------------------------------
56
57/// Predicate over the caller's auth context — answers "does this
58/// principal have `cluster:topology:read`?".
59///
60/// Extracted so the gate can be unit-tested in isolation, the way
61/// ADR 0008 §1 wants ("one capability, one check, one place to
62/// grep"). The advertiser composes this gate, never reimplements it.
63///
64/// Until the capability engine lands, the policy is the one approved
65/// in ADR 0008 §2: every authenticated principal carries the
66/// capability by default; anonymous and denied callers do not.
67pub struct TopologyAuthGate;
68
69impl TopologyAuthGate {
70 /// `true` if the principal carries `cluster:topology:read` and
71 /// should receive the full topology. `false` collapses the
72 /// advertiser output to primary-only per ADR 0008 §3.
73 pub fn allows(auth: &AuthResult) -> bool {
74 match auth {
75 AuthResult::Authenticated { .. } => true,
76 AuthResult::Anonymous => false,
77 AuthResult::Denied(_) => false,
78 }
79 }
80}
81
82// ---------------------------------------------------------------
83// LagConfig
84// ---------------------------------------------------------------
85
86/// Knobs for the lag/health computation. Kept as a small struct so
87/// the call sites (gRPC `topology` RPC, RedWire HelloAck builder)
88/// thread the same defaults without each one redeclaring constants.
89#[derive(Debug, Clone, Copy)]
90pub struct LagConfig {
91 /// Replica heartbeats older than this flip `healthy` to `false`.
92 pub replica_timeout_ms: u128,
93 /// Recent-progress estimate: how many WAL records the cluster
94 /// applies per millisecond on average. `None` → lag conversion
95 /// degrades gracefully to `u32::MAX` (issue spec: "if not
96 /// estimable").
97 pub records_per_ms: Option<f64>,
98 /// `now` in unix milliseconds. Threaded explicitly so tests can
99 /// pin a deterministic clock without reaching for a global.
100 pub now_unix_ms: u128,
101}
102
103impl LagConfig {
104 /// Sensible default for production callers — `replica_timeout`
105 /// at the module default, no progress estimate (so lag reports
106 /// `u32::MAX`), `now` from the system clock.
107 pub fn from_now() -> Self {
108 Self {
109 replica_timeout_ms: DEFAULT_REPLICA_TIMEOUT_MS,
110 records_per_ms: None,
111 now_unix_ms: crate::utils::now_unix_millis() as u128,
112 }
113 }
114}
115
116// ---------------------------------------------------------------
117// TopologyAdvertiser
118// ---------------------------------------------------------------
119
120/// Server-side advertiser. Zero-sized — all state is threaded
121/// through `advertise()`'s arguments so callers control the
122/// snapshot semantics.
123pub struct TopologyAdvertiser;
124
125impl TopologyAdvertiser {
126 /// Build the canonical `Topology` payload for the given caller.
127 ///
128 /// * `replicas` — snapshot of the primary's replica registry
129 /// (`PrimaryReplication::replica_snapshots()`).
130 /// * `auth` — caller's resolved auth context. Drives the
131 /// capability gate (ADR 0008 §1).
132 /// * `epoch` — registry-change epoch; clients use this to detect
133 /// stale advertisements.
134 /// * `primary_endpoint` — what the primary advertises about
135 /// itself. Always returned regardless of auth (ADR 0008 §3).
136 /// * `primary_current_lsn` — primary's current WAL LSN, used as
137 /// the reference for the per-replica lag computation.
138 /// * `lag` — knobs for the lag/health translation.
139 pub fn advertise(
140 replicas: &[ReplicaState],
141 auth: &AuthResult,
142 epoch: u64,
143 primary_endpoint: Endpoint,
144 primary_current_lsn: u64,
145 lag: &LagConfig,
146 ) -> Topology {
147 // ADR 0008 §3: anonymous / denied → primary-only.
148 if !TopologyAuthGate::allows(auth) {
149 return Topology {
150 epoch,
151 primary: primary_endpoint,
152 replicas: Vec::new(),
153 };
154 }
155
156 let infos = replicas
157 .iter()
158 .map(|r| replica_to_info(r, primary_current_lsn, lag))
159 .collect();
160 Topology {
161 epoch,
162 primary: primary_endpoint,
163 replicas: infos,
164 }
165 }
166}
167
168// ---------------------------------------------------------------
169// Internal helpers
170// ---------------------------------------------------------------
171
172fn replica_to_info(state: &ReplicaState, primary_lsn: u64, lag: &LagConfig) -> ReplicaInfo {
173 let healthy = is_healthy(state, lag);
174 let lag_ms = compute_lag_ms(state, primary_lsn, lag);
175 ReplicaInfo {
176 // The replica `id` doubles as the gRPC address handed in at
177 // registration time. Keeping the same field lets the consumer
178 // dial the replica directly without a second lookup table.
179 addr: state.id.clone(),
180 region: state
181 .region
182 .clone()
183 .unwrap_or_else(|| "unknown".to_string()),
184 healthy,
185 lag_ms,
186 last_applied_lsn: state.last_acked_lsn,
187 // Surface the replica's re-bootstrap state so the consumer's
188 // routing table can exclude it from causal reads (issue #837).
189 rebootstrapping: state.rebootstrapping,
190 }
191}
192
193fn is_healthy(state: &ReplicaState, lag: &LagConfig) -> bool {
194 let last_seen = state.last_seen_at_unix_ms;
195 if lag.now_unix_ms < last_seen {
196 // Clock skew — treat the most recent ack as fresh rather
197 // than flag a healthy replica as stale.
198 return true;
199 }
200 (lag.now_unix_ms - last_seen) <= lag.replica_timeout_ms
201}
202
203fn compute_lag_ms(state: &ReplicaState, primary_lsn: u64, lag: &LagConfig) -> u32 {
204 let lag_records = primary_lsn.saturating_sub(state.last_acked_lsn);
205 if lag_records == 0 {
206 return 0;
207 }
208 let Some(rate) = lag.records_per_ms else {
209 // Issue spec: not estimable → u32::MAX.
210 return u32::MAX;
211 };
212 if rate <= 0.0 || !rate.is_finite() {
213 return u32::MAX;
214 }
215 let ms = (lag_records as f64) / rate;
216 if !ms.is_finite() || ms < 0.0 {
217 return u32::MAX;
218 }
219 if ms >= u32::MAX as f64 {
220 return u32::MAX;
221 }
222 ms.round() as u32
223}
224
225// ---------------------------------------------------------------
226// Tests
227// ---------------------------------------------------------------
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::auth::middleware::AuthSource;
    use crate::auth::Role;

    /// Pinned clock so health computations are deterministic.
    const PINNED_NOW_MS: u128 = 1_700_000_000_000;

    // -----------------------------------------------------------
    // Auth-context fixtures: the four canonical principals from
    // ADR 0008 (anonymous, tenant, operator, admin). The gate is
    // role-blind today — ADR §1 defines one capability, not a role
    // ladder — so every authenticated principal counts as "has
    // capability". The fixtures stay distinct so a later
    // capability-engine integration can vary behaviour per role
    // without rewriting the test matrix.
    // -----------------------------------------------------------

    fn password_principal(username: &'static str, role: Role) -> AuthResult {
        AuthResult::Authenticated {
            username: username.into(),
            role,
            source: AuthSource::Password,
        }
    }

    fn anonymous() -> AuthResult {
        AuthResult::Anonymous
    }

    fn tenant() -> AuthResult {
        password_principal("tenant-alice", Role::Read)
    }

    fn operator() -> AuthResult {
        password_principal("operator-bob", Role::Write)
    }

    fn admin() -> AuthResult {
        password_principal("admin-root", Role::Admin)
    }

    fn authenticated_principals() -> [AuthResult; 3] {
        [tenant(), operator(), admin()]
    }

    // -----------------------------------------------------------
    // Topology fixtures.
    // -----------------------------------------------------------

    fn primary_ep() -> Endpoint {
        Endpoint {
            addr: "primary.example.com:5050".into(),
            region: "us-east-1".into(),
        }
    }

    /// Build a replica whose heartbeat sits `last_seen_offset_ms` away
    /// from the pinned clock: negative is "older than now", positive is
    /// "in the future" (clock skew).
    fn replica(id: &str, region: Option<&str>, last_seen_offset_ms: i128) -> ReplicaState {
        let last_seen_at_unix_ms = (PINNED_NOW_MS as i128 + last_seen_offset_ms).max(0) as u128;
        ReplicaState {
            id: id.to_string(),
            last_acked_lsn: 100,
            last_sent_lsn: 100,
            last_durable_lsn: 100,
            apply_error_count: 0,
            divergence_count: 0,
            connected_at_unix_ms: PINNED_NOW_MS,
            last_seen_at_unix_ms,
            region: region.map(String::from),
            rebootstrapping: false,
        }
    }

    fn lag_default() -> LagConfig {
        LagConfig {
            replica_timeout_ms: DEFAULT_REPLICA_TIMEOUT_MS,
            records_per_ms: None,
            now_unix_ms: PINNED_NOW_MS,
        }
    }

    // Registry shapes for the auth × shape matrix (issue spec
    // §"Tests" first bullet): empty, one replica, multi-region.

    fn shape_empty() -> Vec<ReplicaState> {
        Vec::new()
    }

    fn shape_one() -> Vec<ReplicaState> {
        vec![replica("replica-a:5050", Some("us-east-1"), -100)]
    }

    fn shape_multi_region() -> Vec<ReplicaState> {
        [
            ("replica-a:5050", "us-east-1", -100),
            ("replica-b:5050", "us-west-2", -200),
            ("replica-c:5050", "eu-central-1", -300),
        ]
        .into_iter()
        .map(|(id, region, offset_ms)| replica(id, Some(region), offset_ms))
        .collect()
    }

    /// Run the advertiser with the shared primary endpoint and the
    /// default (pinned-clock, no-estimate) lag configuration.
    fn advertise_with_defaults(
        replicas: &[ReplicaState],
        auth: &AuthResult,
        epoch: u64,
        primary_lsn: u64,
    ) -> Topology {
        TopologyAdvertiser::advertise(
            replicas,
            auth,
            epoch,
            primary_ep(),
            primary_lsn,
            &lag_default(),
        )
    }

    /// Advertise the single-replica shape to the admin principal at
    /// epoch 1, after `tweak` has adjusted the replica.
    fn admin_view_of_single_replica(
        tweak: impl FnOnce(&mut ReplicaState),
        primary_lsn: u64,
        lag: &LagConfig,
    ) -> Topology {
        let mut registry = shape_one();
        tweak(&mut registry[0]);
        TopologyAdvertiser::advertise(&registry, &admin(), 1, primary_ep(), primary_lsn, lag)
    }

    // -----------------------------------------------------------
    // Auth gate (predicate-only, separate from the advertiser).
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_gate_allows_authenticated() {
        for principal in authenticated_principals() {
            assert!(TopologyAuthGate::allows(&principal));
        }
    }

    #[test]
    fn topology_advertiser_gate_blocks_anonymous_and_denied() {
        let denied = AuthResult::Denied("nope".into());
        assert!(!TopologyAuthGate::allows(&anonymous()));
        assert!(!TopologyAuthGate::allows(&denied));
    }

    // -----------------------------------------------------------
    // Auth × registry-shape matrix.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_anonymous_gets_primary_only() {
        // ADR 0008 §3: whatever the registry holds, the
        // unauthenticated caller sees primary-only — including the
        // multi-region case, where a leak would disclose the most.
        for registry in [shape_empty(), shape_one(), shape_multi_region()] {
            let topo = advertise_with_defaults(&registry, &anonymous(), 42, 500);
            assert_eq!(topo.epoch, 42);
            assert_eq!(topo.primary, primary_ep());
            assert!(
                topo.replicas.is_empty(),
                "anonymous must never see replicas, got {:?}",
                topo.replicas
            );
        }
    }

    #[test]
    fn topology_advertiser_authenticated_gets_full_list() {
        // Every authenticated principal sees every replica: the gate
        // is capability-driven, not role-driven (ADR §1).
        for principal in authenticated_principals() {
            let topo = advertise_with_defaults(&shape_multi_region(), &principal, 7, 500);
            assert_eq!(topo.epoch, 7);
            assert_eq!(topo.replicas.len(), 3);
            for expected in ["us-east-1", "us-west-2", "eu-central-1"] {
                assert!(topo
                    .replicas
                    .iter()
                    .any(|info| info.region.as_str() == expected));
            }
        }
    }

    #[test]
    fn topology_advertiser_authenticated_empty_registry_returns_no_replicas() {
        let topo = advertise_with_defaults(&shape_empty(), &admin(), 1, 0);
        assert!(topo.replicas.is_empty());
        assert_eq!(topo.primary, primary_ep());
    }

    #[test]
    fn topology_advertiser_denied_collapses_to_primary_only() {
        let denied = AuthResult::Denied("invalid token".into());
        let topo = advertise_with_defaults(&shape_multi_region(), &denied, 9, 500);
        assert!(topo.replicas.is_empty());
    }

    // -----------------------------------------------------------
    // Health: ack window flips healthy.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_recent_ack_is_healthy() {
        let topo = admin_view_of_single_replica(
            |r| r.last_seen_at_unix_ms = PINNED_NOW_MS - 100,
            100,
            &lag_default(),
        );
        assert!(topo.replicas[0].healthy);
    }

    #[test]
    fn topology_advertiser_stale_ack_is_unhealthy() {
        // One millisecond past the timeout — must flip to unhealthy.
        let topo = admin_view_of_single_replica(
            |r| r.last_seen_at_unix_ms = PINNED_NOW_MS - DEFAULT_REPLICA_TIMEOUT_MS - 1,
            100,
            &lag_default(),
        );
        assert!(!topo.replicas[0].healthy);
    }

    // -----------------------------------------------------------
    // Lag: degrades gracefully to u32::MAX when no estimate.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_lag_unknown_reports_u32_max() {
        let topo = admin_view_of_single_replica(|r| r.last_acked_lsn = 50, 500, &lag_default());
        assert_eq!(topo.replicas[0].lag_ms, u32::MAX);
    }

    #[test]
    fn topology_advertiser_lag_zero_when_replica_caught_up() {
        let topo = admin_view_of_single_replica(|r| r.last_acked_lsn = 500, 500, &lag_default());
        assert_eq!(topo.replicas[0].lag_ms, 0);
    }

    #[test]
    fn topology_advertiser_lag_uses_progress_estimate_when_provided() {
        // 10 records/ms → a gap of 100 records is 10 ms.
        let with_estimate = LagConfig {
            records_per_ms: Some(10.0),
            ..lag_default()
        };
        let topo = admin_view_of_single_replica(|r| r.last_acked_lsn = 400, 500, &with_estimate);
        assert_eq!(topo.replicas[0].lag_ms, 10);
    }

    #[test]
    fn topology_advertiser_lag_zero_rate_falls_back_to_u32_max() {
        // A degenerate rate (0 or negative) cannot yield a finite ms
        // estimate; the advertiser must not divide by zero.
        let zero_rate = LagConfig {
            records_per_ms: Some(0.0),
            ..lag_default()
        };
        let topo = admin_view_of_single_replica(|r| r.last_acked_lsn = 50, 500, &zero_rate);
        assert_eq!(topo.replicas[0].lag_ms, u32::MAX);
    }

    // -----------------------------------------------------------
    // Epoch: the advertiser is a pure function of the epoch it is
    // handed — registry-change accounting belongs to
    // PrimaryReplication. The spec's "register, unregister, ack
    // timeout flipping healthy" assertions therefore reduce to "the
    // caller bumps the epoch on those events; the advertiser
    // surfaces what it gets". Pinned here so a future refactor
    // cannot silently swallow the epoch.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_propagates_epoch_verbatim() {
        for epoch in [0, 1, 42, u64::MAX] {
            let topo = advertise_with_defaults(&shape_one(), &admin(), epoch, 100);
            assert_eq!(topo.epoch, epoch);
        }
    }

    // -----------------------------------------------------------
    // Both transports invoke the advertiser; bytes match the
    // canonical encoder (#166). These are server-internal sanity
    // checks — the byte-for-byte round-trip across transports is
    // pinned by `reddb-grpc-proto::topology_tests` and
    // `reddb-wire::topology::tests`.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_output_round_trips_through_canonical_encoder() {
        use reddb_wire::topology::{decode_topology, encode_topology};

        let topo = advertise_with_defaults(&shape_multi_region(), &admin(), 13, 500);
        let encoded = encode_topology(&topo);
        let decoded = decode_topology(&encoded).expect("decode").expect("v1");
        assert_eq!(decoded, topo);
    }

    #[test]
    fn topology_advertiser_output_round_trips_through_hello_ack_wrapper() {
        use reddb_wire::topology::{decode_topology_from_hello_ack, encode_topology_for_hello_ack};

        let topo = advertise_with_defaults(&shape_multi_region(), &operator(), 21, 500);
        let field = encode_topology_for_hello_ack(&topo);
        let decoded = decode_topology_from_hello_ack(&field)
            .expect("decode")
            .expect("v1");
        assert_eq!(decoded, topo);
    }
}