reddb_server/replication/topology_advertiser.rs
1//! Server-side `TopologyAdvertiser` (issue #167).
2//!
3//! Produces the canonical `Topology` payload (defined in
4//! `reddb-wire::topology`) from the live primary replica registry,
5//! gated by the `cluster:topology:read` capability per ADR 0008.
6//!
7//! ## Auth gate
8//!
9//! ADR 0008 §1–§3 specifies one capability — `cluster:topology:read` —
10//! evaluated against the principal that opened the connection.
11//! Authenticated principals get the capability by default, anonymous
12//! callers do not. The advertiser collapses to a *primary-only* payload
13//! when the gate denies (ADR 0008 §3) — a `Topology { primary, replicas: [] }`
14//! that still carries the write-path endpoint so unauthenticated
15//! bootstrap keeps working without leaking the replica fleet.
16//!
17//! ## Health + lag
18//!
//! * `healthy = (now_ms - last_seen_at_unix_ms) <= replica_timeout_ms`.
//!   A `last_seen` stamp that is ahead of `now_ms` (clock skew) is
//!   treated as fresh, so the replica stays healthy.
//! * `lag_ms = primary.current_lsn - replica.last_applied_lsn`, mapped
//!   to milliseconds via the recent commit-rate estimate. A replica
//!   that has caught up with the primary reports `0`. When the
//!   estimate cannot be produced (no commit history yet), the field
//!   reports `u32::MAX` per the issue spec — the consumer side treats
//!   that as "lag unknown, fall back to URI-only routing for this
//!   replica".
26//!
27//! ## Module shape
28//!
29//! `TopologyAdvertiser` is the deep module: callers pass in the
30//! replica snapshot, an auth context, the current epoch, the primary
31//! endpoint, and a configuration knob for lag/health. It returns a
32//! `Topology` ready for the wire encoder. The auth predicate is
33//! extracted as `TopologyAuthGate` so it can be unit-tested in
34//! isolation without booting the rest of the advertiser.
35
36use crate::auth::middleware::AuthResult;
37use crate::replication::primary::ReplicaState;
38use reddb_wire::topology::{Endpoint, ReplicaInfo, Topology};
39
/// Default replica heartbeat timeout, in milliseconds, used when an
/// operator hasn't configured one explicitly.
///
/// Sized as the `poll_interval_ms` default in `ReplicationConfig`
/// (100 ms) multiplied by a generous fudge factor: a replica whose
/// last ack is more than five seconds old is reported as
/// `healthy: false`. Operators tune this via
/// `LagConfig::replica_timeout_ms`.
pub const DEFAULT_REPLICA_TIMEOUT_MS: u128 = 5_000;
46
/// Name of the topology-read capability from ADR 0008 §1.
///
/// Kept as a single constant rather than scattered string literals so
/// a future capability-engine integration can grep for one symbol.
/// NOTE(review): nothing in the visible part of this file reads the
/// constant yet — the gate matches on `AuthResult` variants directly.
pub const TOPOLOGY_READ_CAPABILITY: &str = "cluster:topology:read";
52
53// ---------------------------------------------------------------
54// TopologyAuthGate
55// ---------------------------------------------------------------
56
57/// Predicate over the caller's auth context — answers "does this
58/// principal have `cluster:topology:read`?".
59///
60/// Extracted so the gate can be unit-tested in isolation, the way
61/// ADR 0008 §1 wants ("one capability, one check, one place to
62/// grep"). The advertiser composes this gate, never reimplements it.
63///
64/// Until the capability engine lands, the policy is the one approved
65/// in ADR 0008 §2: every authenticated principal carries the
66/// capability by default; anonymous and denied callers do not.
67pub struct TopologyAuthGate;
68
69impl TopologyAuthGate {
70 /// `true` if the principal carries `cluster:topology:read` and
71 /// should receive the full topology. `false` collapses the
72 /// advertiser output to primary-only per ADR 0008 §3.
73 pub fn allows(auth: &AuthResult) -> bool {
74 match auth {
75 AuthResult::Authenticated { .. } => true,
76 AuthResult::Anonymous => false,
77 AuthResult::Denied(_) => false,
78 }
79 }
80}
81
82// ---------------------------------------------------------------
83// LagConfig
84// ---------------------------------------------------------------
85
86/// Knobs for the lag/health computation. Kept as a small struct so
87/// the call sites (gRPC `topology` RPC, RedWire HelloAck builder)
88/// thread the same defaults without each one redeclaring constants.
89#[derive(Debug, Clone, Copy)]
90pub struct LagConfig {
91 /// Replica heartbeats older than this flip `healthy` to `false`.
92 pub replica_timeout_ms: u128,
93 /// Recent-progress estimate: how many WAL records the cluster
94 /// applies per millisecond on average. `None` → lag conversion
95 /// degrades gracefully to `u32::MAX` (issue spec: "if not
96 /// estimable").
97 pub records_per_ms: Option<f64>,
98 /// `now` in unix milliseconds. Threaded explicitly so tests can
99 /// pin a deterministic clock without reaching for a global.
100 pub now_unix_ms: u128,
101}
102
103impl LagConfig {
104 /// Sensible default for production callers — `replica_timeout`
105 /// at the module default, no progress estimate (so lag reports
106 /// `u32::MAX`), `now` from the system clock.
107 pub fn from_now() -> Self {
108 Self {
109 replica_timeout_ms: DEFAULT_REPLICA_TIMEOUT_MS,
110 records_per_ms: None,
111 now_unix_ms: crate::utils::now_unix_millis() as u128,
112 }
113 }
114}
115
116// ---------------------------------------------------------------
117// TopologyAdvertiser
118// ---------------------------------------------------------------
119
120/// Server-side advertiser. Zero-sized — all state is threaded
121/// through `advertise()`'s arguments so callers control the
122/// snapshot semantics.
123pub struct TopologyAdvertiser;
124
125impl TopologyAdvertiser {
126 /// Build the canonical `Topology` payload for the given caller.
127 ///
128 /// * `replicas` — snapshot of the primary's replica registry
129 /// (`PrimaryReplication::replica_snapshots()`).
130 /// * `auth` — caller's resolved auth context. Drives the
131 /// capability gate (ADR 0008 §1).
132 /// * `epoch` — registry-change epoch; clients use this to detect
133 /// stale advertisements.
134 /// * `primary_endpoint` — what the primary advertises about
135 /// itself. Always returned regardless of auth (ADR 0008 §3).
136 /// * `primary_current_lsn` — primary's current WAL LSN, used as
137 /// the reference for the per-replica lag computation.
138 /// * `lag` — knobs for the lag/health translation.
139 pub fn advertise(
140 replicas: &[ReplicaState],
141 auth: &AuthResult,
142 epoch: u64,
143 primary_endpoint: Endpoint,
144 primary_current_lsn: u64,
145 lag: &LagConfig,
146 ) -> Topology {
147 // ADR 0008 §3: anonymous / denied → primary-only.
148 if !TopologyAuthGate::allows(auth) {
149 return Topology {
150 epoch,
151 primary: primary_endpoint,
152 replicas: Vec::new(),
153 };
154 }
155
156 let infos = replicas
157 .iter()
158 .map(|r| replica_to_info(r, primary_current_lsn, lag))
159 .collect();
160 Topology {
161 epoch,
162 primary: primary_endpoint,
163 replicas: infos,
164 }
165 }
166}
167
168// ---------------------------------------------------------------
169// Internal helpers
170// ---------------------------------------------------------------
171
172fn replica_to_info(state: &ReplicaState, primary_lsn: u64, lag: &LagConfig) -> ReplicaInfo {
173 let healthy = is_healthy(state, lag);
174 let lag_ms = compute_lag_ms(state, primary_lsn, lag);
175 ReplicaInfo {
176 // The replica `id` doubles as the gRPC address handed in at
177 // registration time. Keeping the same field lets the consumer
178 // dial the replica directly without a second lookup table.
179 addr: state.id.clone(),
180 region: state
181 .region
182 .clone()
183 .unwrap_or_else(|| "unknown".to_string()),
184 healthy,
185 lag_ms,
186 last_applied_lsn: state.last_acked_lsn,
187 }
188}
189
190fn is_healthy(state: &ReplicaState, lag: &LagConfig) -> bool {
191 let last_seen = state.last_seen_at_unix_ms;
192 if lag.now_unix_ms < last_seen {
193 // Clock skew — treat the most recent ack as fresh rather
194 // than flag a healthy replica as stale.
195 return true;
196 }
197 (lag.now_unix_ms - last_seen) <= lag.replica_timeout_ms
198}
199
200fn compute_lag_ms(state: &ReplicaState, primary_lsn: u64, lag: &LagConfig) -> u32 {
201 let lag_records = primary_lsn.saturating_sub(state.last_acked_lsn);
202 if lag_records == 0 {
203 return 0;
204 }
205 let Some(rate) = lag.records_per_ms else {
206 // Issue spec: not estimable → u32::MAX.
207 return u32::MAX;
208 };
209 if rate <= 0.0 || !rate.is_finite() {
210 return u32::MAX;
211 }
212 let ms = (lag_records as f64) / rate;
213 if !ms.is_finite() || ms < 0.0 {
214 return u32::MAX;
215 }
216 if ms >= u32::MAX as f64 {
217 return u32::MAX;
218 }
219 ms.round() as u32
220}
221
222// ---------------------------------------------------------------
223// Tests
224// ---------------------------------------------------------------
225
#[cfg(test)]
mod tests {
    use super::*;
    use crate::auth::middleware::AuthSource;
    use crate::auth::Role;

    // -----------------------------------------------------------
    // Auth-context fixtures: the four canonical principals from
    // ADR 0008 (anonymous, tenant, operator, admin). The gate is
    // role-blind today — ADR §1 says one capability, not a role
    // ladder — so all three authenticated principals collapse to
    // "has capability". We still keep the fixtures distinct so
    // future capability-engine integration can override per-role
    // behaviour without rewriting the test matrix.
    // -----------------------------------------------------------

    fn anonymous() -> AuthResult {
        AuthResult::Anonymous
    }

    fn tenant() -> AuthResult {
        AuthResult::Authenticated {
            username: "tenant-alice".into(),
            role: Role::Read,
            source: AuthSource::Password,
        }
    }

    fn operator() -> AuthResult {
        AuthResult::Authenticated {
            username: "operator-bob".into(),
            role: Role::Write,
            source: AuthSource::Password,
        }
    }

    fn admin() -> AuthResult {
        AuthResult::Authenticated {
            username: "admin-root".into(),
            role: Role::Admin,
            source: AuthSource::Password,
        }
    }

    fn primary_ep() -> Endpoint {
        Endpoint {
            addr: "primary.example.com:5050".into(),
            region: "us-east-1".into(),
        }
    }

    /// Build a replica registry entry whose LSNs are all 100.
    ///
    /// `last_seen_offset_ms` is measured against `lag_now_ms()`:
    /// negative values mean "older than now", positive values mean
    /// "in the future" (used by the clock-skew test).
    fn replica(id: &str, region: Option<&str>, last_seen_offset_ms: i128) -> ReplicaState {
        let now = lag_now_ms();
        let last_seen = (now as i128 + last_seen_offset_ms).max(0) as u128;
        ReplicaState {
            id: id.to_string(),
            last_acked_lsn: 100,
            last_sent_lsn: 100,
            last_durable_lsn: 100,
            connected_at_unix_ms: now,
            last_seen_at_unix_ms: last_seen,
            region: region.map(String::from),
        }
    }

    fn lag_now_ms() -> u128 {
        // Pinned clock so health computations are deterministic.
        1_700_000_000_000
    }

    fn lag_default() -> LagConfig {
        LagConfig {
            replica_timeout_ms: DEFAULT_REPLICA_TIMEOUT_MS,
            records_per_ms: None,
            now_unix_ms: lag_now_ms(),
        }
    }

    // -----------------------------------------------------------
    // Auth gate (predicate-only, separate from the advertiser).
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_gate_allows_authenticated() {
        assert!(TopologyAuthGate::allows(&tenant()));
        assert!(TopologyAuthGate::allows(&operator()));
        assert!(TopologyAuthGate::allows(&admin()));
    }

    #[test]
    fn topology_advertiser_gate_blocks_anonymous_and_denied() {
        assert!(!TopologyAuthGate::allows(&anonymous()));
        assert!(!TopologyAuthGate::allows(&AuthResult::Denied(
            "nope".into()
        )));
    }

    // -----------------------------------------------------------
    // Auth × registry-shape matrix (issue spec §"Tests" first
    // bullet). 4 principals × 3 shapes (empty, 1 replica,
    // multi-region).
    // -----------------------------------------------------------

    fn shape_empty() -> Vec<ReplicaState> {
        Vec::new()
    }

    fn shape_one() -> Vec<ReplicaState> {
        vec![replica("replica-a:5050", Some("us-east-1"), -100)]
    }

    fn shape_multi_region() -> Vec<ReplicaState> {
        vec![
            replica("replica-a:5050", Some("us-east-1"), -100),
            replica("replica-b:5050", Some("us-west-2"), -200),
            replica("replica-c:5050", Some("eu-central-1"), -300),
        ]
    }

    #[test]
    fn topology_advertiser_anonymous_gets_primary_only() {
        // ADR 0008 §3: every shape collapses to primary-only for
        // the unauthenticated caller — including the multi-region
        // case where the disclosure-leak risk is highest.
        for shape in [shape_empty(), shape_one(), shape_multi_region()] {
            let topo = TopologyAdvertiser::advertise(
                &shape,
                &anonymous(),
                42,
                primary_ep(),
                500,
                &lag_default(),
            );
            assert_eq!(topo.epoch, 42);
            assert_eq!(topo.primary, primary_ep());
            assert!(
                topo.replicas.is_empty(),
                "anonymous must never see replicas, got {:?}",
                topo.replicas
            );
        }
    }

    #[test]
    fn topology_advertiser_authenticated_gets_full_list() {
        // All three authenticated principals see every replica —
        // the gate is capability-driven, not role-driven (ADR §1).
        for ctx in [tenant(), operator(), admin()] {
            let topo = TopologyAdvertiser::advertise(
                &shape_multi_region(),
                &ctx,
                7,
                primary_ep(),
                500,
                &lag_default(),
            );
            assert_eq!(topo.epoch, 7);
            assert_eq!(topo.replicas.len(), 3);
            let regions: Vec<&str> = topo.replicas.iter().map(|r| r.region.as_str()).collect();
            assert!(regions.contains(&"us-east-1"));
            assert!(regions.contains(&"us-west-2"));
            assert!(regions.contains(&"eu-central-1"));
        }
    }

    #[test]
    fn topology_advertiser_authenticated_single_replica_is_listed() {
        // Completes the matrix: the one-replica shape for every
        // authenticated principal.
        for ctx in [tenant(), operator(), admin()] {
            let topo = TopologyAdvertiser::advertise(
                &shape_one(),
                &ctx,
                3,
                primary_ep(),
                100,
                &lag_default(),
            );
            assert_eq!(topo.primary, primary_ep());
            assert_eq!(topo.replicas.len(), 1);
            assert_eq!(topo.replicas[0].region, "us-east-1");
        }
    }

    #[test]
    fn topology_advertiser_authenticated_empty_registry_returns_no_replicas() {
        let topo = TopologyAdvertiser::advertise(
            &shape_empty(),
            &admin(),
            1,
            primary_ep(),
            0,
            &lag_default(),
        );
        assert!(topo.replicas.is_empty());
        assert_eq!(topo.primary, primary_ep());
    }

    #[test]
    fn topology_advertiser_denied_collapses_to_primary_only() {
        let topo = TopologyAdvertiser::advertise(
            &shape_multi_region(),
            &AuthResult::Denied("invalid token".into()),
            9,
            primary_ep(),
            500,
            &lag_default(),
        );
        assert!(topo.replicas.is_empty());
    }

    // -----------------------------------------------------------
    // Registry → wire field mapping.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_maps_registry_fields_onto_replica_info() {
        // A replica registered without a region is advertised as
        // "unknown"; the id is surfaced as the dial address and the
        // acked LSN as `last_applied_lsn`.
        let shape = vec![replica("replica-x:5050", None, -100)];
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 100, &lag_default());
        let info = &topo.replicas[0];
        assert_eq!(info.addr, "replica-x:5050");
        assert_eq!(info.region, "unknown");
        assert_eq!(info.last_applied_lsn, 100);
    }

    // -----------------------------------------------------------
    // Health: ack window flips healthy.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_recent_ack_is_healthy() {
        let mut shape = shape_one();
        shape[0].last_seen_at_unix_ms = lag_now_ms() - 100;
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 100, &lag_default());
        assert!(topo.replicas[0].healthy);
    }

    #[test]
    fn topology_advertiser_ack_exactly_at_timeout_is_healthy() {
        // The health window is inclusive: an ack that is exactly
        // `replica_timeout_ms` old still counts as fresh.
        let mut shape = shape_one();
        shape[0].last_seen_at_unix_ms = lag_now_ms() - DEFAULT_REPLICA_TIMEOUT_MS;
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 100, &lag_default());
        assert!(topo.replicas[0].healthy);
    }

    #[test]
    fn topology_advertiser_stale_ack_is_unhealthy() {
        let mut shape = shape_one();
        // Older than the timeout — flip to unhealthy.
        shape[0].last_seen_at_unix_ms = lag_now_ms() - DEFAULT_REPLICA_TIMEOUT_MS - 1;
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 100, &lag_default());
        assert!(!topo.replicas[0].healthy);
    }

    #[test]
    fn topology_advertiser_future_ack_is_treated_as_healthy() {
        // Clock skew: the last-seen stamp is ahead of the pinned
        // `now`. The advertiser must treat it as fresh instead of
        // underflowing or flagging the replica as stale.
        let shape = vec![replica("replica-a:5050", Some("us-east-1"), 10_000)];
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 100, &lag_default());
        assert!(topo.replicas[0].healthy);
    }

    // -----------------------------------------------------------
    // Lag: degrades gracefully to u32::MAX when no estimate.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_lag_unknown_reports_u32_max() {
        let mut shape = shape_one();
        shape[0].last_acked_lsn = 50;
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag_default());
        assert_eq!(topo.replicas[0].lag_ms, u32::MAX);
    }

    #[test]
    fn topology_advertiser_lag_zero_when_replica_caught_up() {
        let mut shape = shape_one();
        shape[0].last_acked_lsn = 500;
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag_default());
        assert_eq!(topo.replicas[0].lag_ms, 0);
    }

    #[test]
    fn topology_advertiser_lag_zero_when_replica_is_ahead_of_primary() {
        // A snapshot can race the primary's LSN read; the deficit
        // saturates at zero rather than wrapping around.
        let mut shape = shape_one();
        shape[0].last_acked_lsn = 600;
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag_default());
        assert_eq!(topo.replicas[0].lag_ms, 0);
    }

    #[test]
    fn topology_advertiser_lag_uses_progress_estimate_when_provided() {
        let mut shape = shape_one();
        shape[0].last_acked_lsn = 400;
        let lag = LagConfig {
            records_per_ms: Some(10.0), // 10 records/ms → 100 records = 10 ms
            ..lag_default()
        };
        let topo = TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag);
        assert_eq!(topo.replicas[0].lag_ms, 10);
    }

    #[test]
    fn topology_advertiser_lag_zero_rate_falls_back_to_u32_max() {
        // A zero rate cannot produce a finite ms estimate; the
        // advertiser must not divide by zero.
        let mut shape = shape_one();
        shape[0].last_acked_lsn = 50;
        let lag = LagConfig {
            records_per_ms: Some(0.0),
            ..lag_default()
        };
        let topo = TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag);
        assert_eq!(topo.replicas[0].lag_ms, u32::MAX);
    }

    #[test]
    fn topology_advertiser_lag_degenerate_rates_fall_back_to_u32_max() {
        // Negative and non-finite rates are as unusable as zero.
        for rate in [-1.0, f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
            let mut shape = shape_one();
            shape[0].last_acked_lsn = 50;
            let lag = LagConfig {
                records_per_ms: Some(rate),
                ..lag_default()
            };
            let topo =
                TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag);
            assert_eq!(
                topo.replicas[0].lag_ms,
                u32::MAX,
                "rate {:?} must report unknown lag",
                rate
            );
        }
    }

    #[test]
    fn topology_advertiser_lag_estimate_saturates_at_u32_max() {
        // 500 records at 1e-9 records/ms is ~5e11 ms, far beyond
        // what the wire field can carry.
        let mut shape = shape_one();
        shape[0].last_acked_lsn = 0;
        let lag = LagConfig {
            records_per_ms: Some(1e-9),
            ..lag_default()
        };
        let topo = TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag);
        assert_eq!(topo.replicas[0].lag_ms, u32::MAX);
    }

    // -----------------------------------------------------------
    // Epoch monotonicity: the advertiser is a pure function of the
    // epoch the caller passes — registry-change accounting belongs
    // to PrimaryReplication. The spec's "register, unregister, ack
    // timeout flipping healthy" assertions reduce to "the caller
    // is expected to bump the epoch on those events; the advertiser
    // surfaces what it gets". We pin the contract here so a future
    // refactor doesn't accidentally swallow the epoch.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_propagates_epoch_verbatim() {
        for epoch in [0, 1, 42, u64::MAX] {
            let topo = TopologyAdvertiser::advertise(
                &shape_one(),
                &admin(),
                epoch,
                primary_ep(),
                100,
                &lag_default(),
            );
            assert_eq!(topo.epoch, epoch);
        }
    }

    // -----------------------------------------------------------
    // Both transports invoke the advertiser; bytes match the
    // canonical encoder (#166). This is a server-internal sanity
    // check — the byte-for-byte round-trip across transports is
    // pinned by `reddb-grpc-proto::topology_tests` and
    // `reddb-wire::topology::tests`.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_output_round_trips_through_canonical_encoder() {
        use reddb_wire::topology::{decode_topology, encode_topology};
        let topo = TopologyAdvertiser::advertise(
            &shape_multi_region(),
            &admin(),
            13,
            primary_ep(),
            500,
            &lag_default(),
        );
        let bytes = encode_topology(&topo);
        let decoded = decode_topology(&bytes).expect("decode").expect("v1");
        assert_eq!(decoded, topo);
    }

    #[test]
    fn topology_advertiser_output_round_trips_through_hello_ack_wrapper() {
        use reddb_wire::topology::{decode_topology_from_hello_ack, encode_topology_for_hello_ack};
        let topo = TopologyAdvertiser::advertise(
            &shape_multi_region(),
            &operator(),
            21,
            primary_ep(),
            500,
            &lag_default(),
        );
        let field = encode_topology_for_hello_ack(&topo);
        let decoded = decode_topology_from_hello_ack(&field)
            .expect("decode")
            .expect("v1");
        assert_eq!(decoded, topo);
    }
}
558}