// reddb_server/replication/topology_advertiser.rs

//! Server-side `TopologyAdvertiser` (issue #167).
//!
//! Produces the canonical `Topology` payload (defined in
//! `reddb-wire::topology`) from the live primary replica registry,
//! gated by the `cluster:topology:read` capability per ADR 0008.
//!
//! ## Auth gate
//!
//! ADR 0008 §1–§3 specifies one capability — `cluster:topology:read` —
//! evaluated against the principal that opened the connection.
//! Authenticated principals get the capability by default, anonymous
//! callers do not. The advertiser collapses to a *primary-only* payload
//! when the gate denies (ADR 0008 §3) — a `Topology { primary, replicas: [] }`
//! that still carries the write-path endpoint so unauthenticated
//! bootstrap keeps working without leaking the replica fleet.
//!
//! ## Health + lag
//!
//! * `healthy = (now_ms - last_seen_at_unix_ms) <= replica_timeout_ms`
//! * `lag_ms = primary.current_lsn - replica.last_applied_lsn` mapped
//!   to milliseconds via the recent commit-rate estimate. When the
//!   estimate cannot be produced (no commit history yet), the field
//!   reports `u32::MAX` per the issue spec — the consumer side treats
//!   that as "lag unknown, fall back to URI-only routing for this
//!   replica".
//!
//! ## Module shape
//!
//! `TopologyAdvertiser` is the deep module: callers pass in the
//! replica snapshot, an auth context, the current epoch, the primary
//! endpoint, and a configuration knob for lag/health. It returns a
//! `Topology` ready for the wire encoder. The auth predicate is
//! extracted as `TopologyAuthGate` so it can be unit-tested in
//! isolation without booting the rest of the advertiser.

use crate::auth::middleware::AuthResult;
use crate::replication::primary::ReplicaState;
use reddb_wire::topology::{Endpoint, ReplicaInfo, Topology};

/// Default replica heartbeat timeout, in milliseconds, used when an
/// operator hasn't configured one explicitly.
///
/// Matches the order of the `poll_interval_ms` default in
/// `ReplicationConfig` (100 ms) multiplied by a generous fudge
/// factor — five seconds without an ack flips a replica to
/// `healthy: false`. Operators tune this via `LagConfig`.
pub const DEFAULT_REPLICA_TIMEOUT_MS: u128 = 5_000;

/// Capability name from ADR 0008 §1.
///
/// Kept as a constant rather than scattered string literals so a
/// future capability-engine integration can grep for one symbol.
pub const TOPOLOGY_READ_CAPABILITY: &str = "cluster:topology:read";

53// ---------------------------------------------------------------
54// TopologyAuthGate
55// ---------------------------------------------------------------
56
57/// Predicate over the caller's auth context — answers "does this
58/// principal have `cluster:topology:read`?".
59///
60/// Extracted so the gate can be unit-tested in isolation, the way
61/// ADR 0008 §1 wants ("one capability, one check, one place to
62/// grep"). The advertiser composes this gate, never reimplements it.
63///
64/// Until the capability engine lands, the policy is the one approved
65/// in ADR 0008 §2: every authenticated principal carries the
66/// capability by default; anonymous and denied callers do not.
67pub struct TopologyAuthGate;
68
69impl TopologyAuthGate {
70    /// `true` if the principal carries `cluster:topology:read` and
71    /// should receive the full topology. `false` collapses the
72    /// advertiser output to primary-only per ADR 0008 §3.
73    pub fn allows(auth: &AuthResult) -> bool {
74        match auth {
75            AuthResult::Authenticated { .. } => true,
76            AuthResult::Anonymous => false,
77            AuthResult::Denied(_) => false,
78        }
79    }
80}
81
82// ---------------------------------------------------------------
83// LagConfig
84// ---------------------------------------------------------------
85
86/// Knobs for the lag/health computation. Kept as a small struct so
87/// the call sites (gRPC `topology` RPC, RedWire HelloAck builder)
88/// thread the same defaults without each one redeclaring constants.
89#[derive(Debug, Clone, Copy)]
90pub struct LagConfig {
91    /// Replica heartbeats older than this flip `healthy` to `false`.
92    pub replica_timeout_ms: u128,
93    /// Recent-progress estimate: how many WAL records the cluster
94    /// applies per millisecond on average. `None` → lag conversion
95    /// degrades gracefully to `u32::MAX` (issue spec: "if not
96    /// estimable").
97    pub records_per_ms: Option<f64>,
98    /// `now` in unix milliseconds. Threaded explicitly so tests can
99    /// pin a deterministic clock without reaching for a global.
100    pub now_unix_ms: u128,
101}
102
103impl LagConfig {
104    /// Sensible default for production callers — `replica_timeout`
105    /// at the module default, no progress estimate (so lag reports
106    /// `u32::MAX`), `now` from the system clock.
107    pub fn from_now() -> Self {
108        Self {
109            replica_timeout_ms: DEFAULT_REPLICA_TIMEOUT_MS,
110            records_per_ms: None,
111            now_unix_ms: crate::utils::now_unix_millis() as u128,
112        }
113    }
114}
115
116// ---------------------------------------------------------------
117// TopologyAdvertiser
118// ---------------------------------------------------------------
119
120/// Server-side advertiser. Zero-sized — all state is threaded
121/// through `advertise()`'s arguments so callers control the
122/// snapshot semantics.
123pub struct TopologyAdvertiser;
124
125impl TopologyAdvertiser {
126    /// Build the canonical `Topology` payload for the given caller.
127    ///
128    /// * `replicas` — snapshot of the primary's replica registry
129    ///   (`PrimaryReplication::replica_snapshots()`).
130    /// * `auth` — caller's resolved auth context. Drives the
131    ///   capability gate (ADR 0008 §1).
132    /// * `epoch` — registry-change epoch; clients use this to detect
133    ///   stale advertisements.
134    /// * `primary_endpoint` — what the primary advertises about
135    ///   itself. Always returned regardless of auth (ADR 0008 §3).
136    /// * `primary_current_lsn` — primary's current WAL LSN, used as
137    ///   the reference for the per-replica lag computation.
138    /// * `lag` — knobs for the lag/health translation.
139    pub fn advertise(
140        replicas: &[ReplicaState],
141        auth: &AuthResult,
142        epoch: u64,
143        primary_endpoint: Endpoint,
144        primary_current_lsn: u64,
145        lag: &LagConfig,
146    ) -> Topology {
147        // ADR 0008 §3: anonymous / denied → primary-only.
148        if !TopologyAuthGate::allows(auth) {
149            return Topology {
150                epoch,
151                primary: primary_endpoint,
152                replicas: Vec::new(),
153            };
154        }
155
156        let infos = replicas
157            .iter()
158            .map(|r| replica_to_info(r, primary_current_lsn, lag))
159            .collect();
160        Topology {
161            epoch,
162            primary: primary_endpoint,
163            replicas: infos,
164        }
165    }
166}
167
168// ---------------------------------------------------------------
169// Internal helpers
170// ---------------------------------------------------------------
171
172fn replica_to_info(state: &ReplicaState, primary_lsn: u64, lag: &LagConfig) -> ReplicaInfo {
173    let healthy = is_healthy(state, lag);
174    let lag_ms = compute_lag_ms(state, primary_lsn, lag);
175    ReplicaInfo {
176        // The replica `id` doubles as the gRPC address handed in at
177        // registration time. Keeping the same field lets the consumer
178        // dial the replica directly without a second lookup table.
179        addr: state.id.clone(),
180        region: state
181            .region
182            .clone()
183            .unwrap_or_else(|| "unknown".to_string()),
184        healthy,
185        lag_ms,
186        last_applied_lsn: state.last_acked_lsn,
187    }
188}
189
190fn is_healthy(state: &ReplicaState, lag: &LagConfig) -> bool {
191    let last_seen = state.last_seen_at_unix_ms;
192    if lag.now_unix_ms < last_seen {
193        // Clock skew — treat the most recent ack as fresh rather
194        // than flag a healthy replica as stale.
195        return true;
196    }
197    (lag.now_unix_ms - last_seen) <= lag.replica_timeout_ms
198}
199
200fn compute_lag_ms(state: &ReplicaState, primary_lsn: u64, lag: &LagConfig) -> u32 {
201    let lag_records = primary_lsn.saturating_sub(state.last_acked_lsn);
202    if lag_records == 0 {
203        return 0;
204    }
205    let Some(rate) = lag.records_per_ms else {
206        // Issue spec: not estimable → u32::MAX.
207        return u32::MAX;
208    };
209    if rate <= 0.0 || !rate.is_finite() {
210        return u32::MAX;
211    }
212    let ms = (lag_records as f64) / rate;
213    if !ms.is_finite() || ms < 0.0 {
214        return u32::MAX;
215    }
216    if ms >= u32::MAX as f64 {
217        return u32::MAX;
218    }
219    ms.round() as u32
220}
221
// ---------------------------------------------------------------
// Tests
// ---------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::auth::middleware::AuthSource;
    use crate::auth::Role;

    // -----------------------------------------------------------
    // Auth-context fixtures: the four canonical principals from
    // ADR 0008 (anonymous, tenant, operator, admin). The gate is
    // role-blind today — ADR §1 says one capability, not a role
    // ladder — so all three authenticated principals collapse to
    // "has capability". We still keep the fixtures distinct so
    // future capability-engine integration can override per-role
    // behaviour without rewriting the test matrix.
    // -----------------------------------------------------------

    fn anonymous() -> AuthResult {
        AuthResult::Anonymous
    }

    fn tenant() -> AuthResult {
        AuthResult::Authenticated {
            username: "tenant-alice".into(),
            role: Role::Read,
            source: AuthSource::Password,
        }
    }

    fn operator() -> AuthResult {
        AuthResult::Authenticated {
            username: "operator-bob".into(),
            role: Role::Write,
            source: AuthSource::Password,
        }
    }

    fn admin() -> AuthResult {
        AuthResult::Authenticated {
            username: "admin-root".into(),
            role: Role::Admin,
            source: AuthSource::Password,
        }
    }

    fn primary_ep() -> Endpoint {
        Endpoint {
            addr: "primary.example.com:5050".into(),
            region: "us-east-1".into(),
        }
    }

    fn replica(id: &str, region: Option<&str>, last_seen_offset_ms: i128) -> ReplicaState {
        // last_seen_offset_ms is measured against `lag_now_ms()` —
        // negative values mean "older than now", positive means
        // "in the future" (clock skew test).
        let now = lag_now_ms();
        let last_seen = (now as i128 + last_seen_offset_ms).max(0) as u128;
        ReplicaState {
            id: id.to_string(),
            last_acked_lsn: 100,
            last_sent_lsn: 100,
            last_durable_lsn: 100,
            connected_at_unix_ms: now,
            last_seen_at_unix_ms: last_seen,
            region: region.map(String::from),
        }
    }

    fn lag_now_ms() -> u128 {
        // Pinned clock so health computations are deterministic.
        1_700_000_000_000
    }

    fn lag_default() -> LagConfig {
        LagConfig {
            replica_timeout_ms: DEFAULT_REPLICA_TIMEOUT_MS,
            records_per_ms: None,
            now_unix_ms: lag_now_ms(),
        }
    }

    // -----------------------------------------------------------
    // Auth gate (predicate-only, separate from the advertiser).
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_gate_allows_authenticated() {
        assert!(TopologyAuthGate::allows(&tenant()));
        assert!(TopologyAuthGate::allows(&operator()));
        assert!(TopologyAuthGate::allows(&admin()));
    }

    #[test]
    fn topology_advertiser_gate_blocks_anonymous_and_denied() {
        assert!(!TopologyAuthGate::allows(&anonymous()));
        assert!(!TopologyAuthGate::allows(&AuthResult::Denied(
            "nope".into()
        )));
    }

    // -----------------------------------------------------------
    // Auth × registry-shape matrix (issue spec §"Tests" first
    // bullet). 4 principals × 3 shapes (empty, 1 replica,
    // multi-region).
    // -----------------------------------------------------------

    fn shape_empty() -> Vec<ReplicaState> {
        Vec::new()
    }

    fn shape_one() -> Vec<ReplicaState> {
        vec![replica("replica-a:5050", Some("us-east-1"), -100)]
    }

    fn shape_multi_region() -> Vec<ReplicaState> {
        vec![
            replica("replica-a:5050", Some("us-east-1"), -100),
            replica("replica-b:5050", Some("us-west-2"), -200),
            replica("replica-c:5050", Some("eu-central-1"), -300),
        ]
    }

    #[test]
    fn topology_advertiser_anonymous_gets_primary_only() {
        // ADR 0008 §3: every shape collapses to primary-only for
        // the unauthenticated caller — including the multi-region
        // case where the disclosure-leak risk is highest.
        for shape in [shape_empty(), shape_one(), shape_multi_region()] {
            let topo = TopologyAdvertiser::advertise(
                &shape,
                &anonymous(),
                42,
                primary_ep(),
                500,
                &lag_default(),
            );
            assert_eq!(topo.epoch, 42);
            assert_eq!(topo.primary, primary_ep());
            assert!(
                topo.replicas.is_empty(),
                "anonymous must never see replicas, got {:?}",
                topo.replicas
            );
        }
    }

    #[test]
    fn topology_advertiser_authenticated_gets_full_list() {
        // All three authenticated principals see every replica —
        // the gate is capability-driven, not role-driven (ADR §1).
        for ctx in [tenant(), operator(), admin()] {
            let topo = TopologyAdvertiser::advertise(
                &shape_multi_region(),
                &ctx,
                7,
                primary_ep(),
                500,
                &lag_default(),
            );
            assert_eq!(topo.epoch, 7);
            assert_eq!(topo.replicas.len(), 3);
            let regions: Vec<&str> = topo.replicas.iter().map(|r| r.region.as_str()).collect();
            assert!(regions.contains(&"us-east-1"));
            assert!(regions.contains(&"us-west-2"));
            assert!(regions.contains(&"eu-central-1"));
        }
    }

    #[test]
    fn topology_advertiser_authenticated_empty_registry_returns_no_replicas() {
        let topo = TopologyAdvertiser::advertise(
            &shape_empty(),
            &admin(),
            1,
            primary_ep(),
            0,
            &lag_default(),
        );
        assert!(topo.replicas.is_empty());
        assert_eq!(topo.primary, primary_ep());
    }

    #[test]
    fn topology_advertiser_denied_collapses_to_primary_only() {
        let topo = TopologyAdvertiser::advertise(
            &shape_multi_region(),
            &AuthResult::Denied("invalid token".into()),
            9,
            primary_ep(),
            500,
            &lag_default(),
        );
        assert!(topo.replicas.is_empty());
    }

    // -----------------------------------------------------------
    // Health: ack window flips healthy.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_recent_ack_is_healthy() {
        let mut shape = shape_one();
        shape[0].last_seen_at_unix_ms = lag_now_ms() - 100;
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 100, &lag_default());
        assert!(topo.replicas[0].healthy);
    }

    #[test]
    fn topology_advertiser_stale_ack_is_unhealthy() {
        let mut shape = shape_one();
        // Older than the timeout — flip to unhealthy.
        shape[0].last_seen_at_unix_ms = lag_now_ms() - DEFAULT_REPLICA_TIMEOUT_MS - 1;
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 100, &lag_default());
        assert!(!topo.replicas[0].healthy);
    }

    #[test]
    fn topology_advertiser_future_ack_is_healthy_under_clock_skew() {
        // A heartbeat stamped after the advertiser's `now` means the
        // clocks disagree; the replica must still be reported healthy.
        let shape = vec![replica("replica-a:5050", Some("us-east-1"), 10_000)];
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 100, &lag_default());
        assert!(topo.replicas[0].healthy);
    }

    // -----------------------------------------------------------
    // Lag: degrades gracefully to u32::MAX when no estimate.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_lag_unknown_reports_u32_max() {
        let mut shape = shape_one();
        shape[0].last_acked_lsn = 50;
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag_default());
        assert_eq!(topo.replicas[0].lag_ms, u32::MAX);
    }

    #[test]
    fn topology_advertiser_lag_zero_when_replica_caught_up() {
        let mut shape = shape_one();
        shape[0].last_acked_lsn = 500;
        let topo =
            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag_default());
        assert_eq!(topo.replicas[0].lag_ms, 0);
    }

    #[test]
    fn topology_advertiser_lag_uses_progress_estimate_when_provided() {
        let mut shape = shape_one();
        shape[0].last_acked_lsn = 400;
        let lag = LagConfig {
            records_per_ms: Some(10.0), // 10 records/ms → 100 records = 10 ms
            ..lag_default()
        };
        let topo = TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag);
        assert_eq!(topo.replicas[0].lag_ms, 10);
    }

    #[test]
    fn topology_advertiser_lag_zero_rate_falls_back_to_u32_max() {
        // A degenerate rate (zero, negative, NaN or infinite) cannot
        // produce a usable ms estimate; the advertiser must not
        // divide by zero and must report the sentinel instead.
        let mut shape = shape_one();
        shape[0].last_acked_lsn = 50;
        for rate in [0.0, -1.0, f64::NAN, f64::INFINITY] {
            let lag = LagConfig {
                records_per_ms: Some(rate),
                ..lag_default()
            };
            let topo = TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag);
            assert_eq!(topo.replicas[0].lag_ms, u32::MAX, "rate = {}", rate);
        }
    }

    // -----------------------------------------------------------
    // Epoch monotonicity: the advertiser is a pure function of the
    // epoch the caller passes — registry-change accounting belongs
    // to PrimaryReplication. The spec's "register, unregister, ack
    // timeout flipping healthy" assertions reduce to "the caller
    // is expected to bump the epoch on those events; the advertiser
    // surfaces what it gets". We pin the contract here so a future
    // refactor doesn't accidentally swallow the epoch.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_propagates_epoch_verbatim() {
        for epoch in [0, 1, 42, u64::MAX] {
            let topo = TopologyAdvertiser::advertise(
                &shape_one(),
                &admin(),
                epoch,
                primary_ep(),
                100,
                &lag_default(),
            );
            assert_eq!(topo.epoch, epoch);
        }
    }

    // -----------------------------------------------------------
    // Both transports invoke the advertiser; bytes match the
    // canonical encoder (#166). This is a server-internal sanity
    // check — the byte-for-byte round-trip across transports is
    // pinned by `reddb-grpc-proto::topology_tests` and
    // `reddb-wire::topology::tests`.
    // -----------------------------------------------------------

    #[test]
    fn topology_advertiser_output_round_trips_through_canonical_encoder() {
        use reddb_wire::topology::{decode_topology, encode_topology};
        let topo = TopologyAdvertiser::advertise(
            &shape_multi_region(),
            &admin(),
            13,
            primary_ep(),
            500,
            &lag_default(),
        );
        let bytes = encode_topology(&topo);
        let decoded = decode_topology(&bytes).expect("decode").expect("v1");
        assert_eq!(decoded, topo);
    }

    #[test]
    fn topology_advertiser_output_round_trips_through_hello_ack_wrapper() {
        use reddb_wire::topology::{decode_topology_from_hello_ack, encode_topology_for_hello_ack};
        let topo = TopologyAdvertiser::advertise(
            &shape_multi_region(),
            &operator(),
            21,
            primary_ep(),
            500,
            &lag_default(),
        );
        let field = encode_topology_for_hello_ack(&topo);
        let decoded = decode_topology_from_hello_ack(&field)
            .expect("decode")
            .expect("v1");
        assert_eq!(decoded, topo);
    }
}