Skip to main content

reddb_server/replication/
topology_advertiser.rs

1//! Server-side `TopologyAdvertiser` (issue #167).
2//!
3//! Produces the canonical `Topology` payload (defined in
4//! `reddb-wire::topology`) from the live primary replica registry,
5//! gated by the `cluster:topology:read` capability per ADR 0008.
6//!
7//! ## Auth gate
8//!
9//! ADR 0008 §1–§3 specifies one capability — `cluster:topology:read` —
10//! evaluated against the principal that opened the connection.
11//! Authenticated principals get the capability by default, anonymous
12//! callers do not. The advertiser collapses to a *primary-only* payload
13//! when the gate denies (ADR 0008 §3) — a `Topology { primary, replicas: [] }`
14//! that still carries the write-path endpoint so unauthenticated
15//! bootstrap keeps working without leaking the replica fleet.
16//!
17//! ## Health + lag
18//!
19//! * `healthy = (now_ms - last_seen_at_unix_ms) <= replica_timeout_ms`
20//! * `lag_ms = primary.current_lsn - replica.last_applied_lsn` mapped
21//!   to milliseconds via the recent commit-rate estimate. When the
22//!   estimate cannot be produced (no commit history yet), the field
23//!   reports `u32::MAX` per the issue spec — the consumer side treats
24//!   that as "lag unknown, fall back to URI-only routing for this
25//!   replica".
26//!
27//! ## Module shape
28//!
29//! `TopologyAdvertiser` is the deep module: callers pass in the
30//! replica snapshot, an auth context, the current epoch, the primary
31//! endpoint, and a configuration knob for lag/health. It returns a
32//! `Topology` ready for the wire encoder. The auth predicate is
33//! extracted as `TopologyAuthGate` so it can be unit-tested in
34//! isolation without booting the rest of the advertiser.
35
36use crate::auth::middleware::AuthResult;
37use crate::replication::primary::ReplicaState;
38use reddb_wire::topology::{Endpoint, ReplicaInfo, Topology};
39
/// Fallback heartbeat timeout, in milliseconds, applied when the
/// operator has not configured one explicitly.
///
/// A replica whose last ack is older than this is advertised with
/// `healthy: false`. Five seconds is a deliberately generous multiple
/// of the 100 ms `poll_interval_ms` default in `ReplicationConfig`.
/// Operators override it per call through `LagConfig`.
pub const DEFAULT_REPLICA_TIMEOUT_MS: u128 = 5 * 1_000;
46
/// Name of the capability that guards topology reads (ADR 0008 §1).
///
/// Centralised in one constant, instead of string literals spread
/// around the codebase, so a future capability-engine integration
/// has a single symbol to grep for.
pub const TOPOLOGY_READ_CAPABILITY: &str = "cluster:topology:read";
52
53// ---------------------------------------------------------------
54// TopologyAuthGate
55// ---------------------------------------------------------------
56
57/// Predicate over the caller's auth context — answers "does this
58/// principal have `cluster:topology:read`?".
59///
60/// Extracted so the gate can be unit-tested in isolation, the way
61/// ADR 0008 §1 wants ("one capability, one check, one place to
62/// grep"). The advertiser composes this gate, never reimplements it.
63///
64/// Until the capability engine lands, the policy is the one approved
65/// in ADR 0008 §2: every authenticated principal carries the
66/// capability by default; anonymous and denied callers do not.
67pub struct TopologyAuthGate;
68
69impl TopologyAuthGate {
70    /// `true` if the principal carries `cluster:topology:read` and
71    /// should receive the full topology. `false` collapses the
72    /// advertiser output to primary-only per ADR 0008 §3.
73    pub fn allows(auth: &AuthResult) -> bool {
74        match auth {
75            AuthResult::Authenticated { .. } => true,
76            AuthResult::Anonymous => false,
77            AuthResult::Denied(_) => false,
78        }
79    }
80}
81
82// ---------------------------------------------------------------
83// LagConfig
84// ---------------------------------------------------------------
85
86/// Knobs for the lag/health computation. Kept as a small struct so
87/// the call sites (gRPC `topology` RPC, RedWire HelloAck builder)
88/// thread the same defaults without each one redeclaring constants.
89#[derive(Debug, Clone, Copy)]
90pub struct LagConfig {
91    /// Replica heartbeats older than this flip `healthy` to `false`.
92    pub replica_timeout_ms: u128,
93    /// Recent-progress estimate: how many WAL records the cluster
94    /// applies per millisecond on average. `None` → lag conversion
95    /// degrades gracefully to `u32::MAX` (issue spec: "if not
96    /// estimable").
97    pub records_per_ms: Option<f64>,
98    /// `now` in unix milliseconds. Threaded explicitly so tests can
99    /// pin a deterministic clock without reaching for a global.
100    pub now_unix_ms: u128,
101}
102
103impl LagConfig {
104    /// Sensible default for production callers — `replica_timeout`
105    /// at the module default, no progress estimate (so lag reports
106    /// `u32::MAX`), `now` from the system clock.
107    pub fn from_now() -> Self {
108        Self {
109            replica_timeout_ms: DEFAULT_REPLICA_TIMEOUT_MS,
110            records_per_ms: None,
111            now_unix_ms: crate::utils::now_unix_millis() as u128,
112        }
113    }
114}
115
116// ---------------------------------------------------------------
117// TopologyAdvertiser
118// ---------------------------------------------------------------
119
120/// Server-side advertiser. Zero-sized — all state is threaded
121/// through `advertise()`'s arguments so callers control the
122/// snapshot semantics.
123pub struct TopologyAdvertiser;
124
125impl TopologyAdvertiser {
126    /// Build the canonical `Topology` payload for the given caller.
127    ///
128    /// * `replicas` — snapshot of the primary's replica registry
129    ///   (`PrimaryReplication::replica_snapshots()`).
130    /// * `auth` — caller's resolved auth context. Drives the
131    ///   capability gate (ADR 0008 §1).
132    /// * `epoch` — registry-change epoch; clients use this to detect
133    ///   stale advertisements.
134    /// * `primary_endpoint` — what the primary advertises about
135    ///   itself. Always returned regardless of auth (ADR 0008 §3).
136    /// * `primary_current_lsn` — primary's current WAL LSN, used as
137    ///   the reference for the per-replica lag computation.
138    /// * `lag` — knobs for the lag/health translation.
139    pub fn advertise(
140        replicas: &[ReplicaState],
141        auth: &AuthResult,
142        epoch: u64,
143        primary_endpoint: Endpoint,
144        primary_current_lsn: u64,
145        lag: &LagConfig,
146    ) -> Topology {
147        // ADR 0008 §3: anonymous / denied → primary-only.
148        if !TopologyAuthGate::allows(auth) {
149            return Topology {
150                epoch,
151                primary: primary_endpoint,
152                replicas: Vec::new(),
153            };
154        }
155
156        let infos = replicas
157            .iter()
158            .map(|r| replica_to_info(r, primary_current_lsn, lag))
159            .collect();
160        Topology {
161            epoch,
162            primary: primary_endpoint,
163            replicas: infos,
164        }
165    }
166}
167
168// ---------------------------------------------------------------
169// Internal helpers
170// ---------------------------------------------------------------
171
172fn replica_to_info(state: &ReplicaState, primary_lsn: u64, lag: &LagConfig) -> ReplicaInfo {
173    let healthy = is_healthy(state, lag);
174    let lag_ms = compute_lag_ms(state, primary_lsn, lag);
175    ReplicaInfo {
176        // The replica `id` doubles as the gRPC address handed in at
177        // registration time. Keeping the same field lets the consumer
178        // dial the replica directly without a second lookup table.
179        addr: state.id.clone(),
180        region: state
181            .region
182            .clone()
183            .unwrap_or_else(|| "unknown".to_string()),
184        healthy,
185        lag_ms,
186        last_applied_lsn: state.last_acked_lsn,
187        // Surface the replica's re-bootstrap state so the consumer's
188        // routing table can exclude it from causal reads (issue #837).
189        rebootstrapping: state.rebootstrapping,
190    }
191}
192
193fn is_healthy(state: &ReplicaState, lag: &LagConfig) -> bool {
194    let last_seen = state.last_seen_at_unix_ms;
195    if lag.now_unix_ms < last_seen {
196        // Clock skew — treat the most recent ack as fresh rather
197        // than flag a healthy replica as stale.
198        return true;
199    }
200    (lag.now_unix_ms - last_seen) <= lag.replica_timeout_ms
201}
202
203fn compute_lag_ms(state: &ReplicaState, primary_lsn: u64, lag: &LagConfig) -> u32 {
204    let lag_records = primary_lsn.saturating_sub(state.last_acked_lsn);
205    if lag_records == 0 {
206        return 0;
207    }
208    let Some(rate) = lag.records_per_ms else {
209        // Issue spec: not estimable → u32::MAX.
210        return u32::MAX;
211    };
212    if rate <= 0.0 || !rate.is_finite() {
213        return u32::MAX;
214    }
215    let ms = (lag_records as f64) / rate;
216    if !ms.is_finite() || ms < 0.0 {
217        return u32::MAX;
218    }
219    if ms >= u32::MAX as f64 {
220        return u32::MAX;
221    }
222    ms.round() as u32
223}
224
225// ---------------------------------------------------------------
226// Tests
227// ---------------------------------------------------------------
228
229#[cfg(test)]
230mod tests {
231    use super::*;
232    use crate::auth::middleware::AuthSource;
233    use crate::auth::Role;
234
235    // -----------------------------------------------------------
236    // Auth-context fixtures: the four canonical principals from
237    // ADR 0008 (anonymous, tenant, operator, admin). The gate is
238    // role-blind today — ADR §1 says one capability, not a role
239    // ladder — so all three authenticated principals collapse to
240    // "has capability". We still keep the fixtures distinct so
241    // future capability-engine integration can override per-role
242    // behaviour without rewriting the test matrix.
243    // -----------------------------------------------------------
244
245    fn anonymous() -> AuthResult {
246        AuthResult::Anonymous
247    }
248
249    fn tenant() -> AuthResult {
250        AuthResult::Authenticated {
251            username: "tenant-alice".into(),
252            role: Role::Read,
253            source: AuthSource::Password,
254        }
255    }
256
257    fn operator() -> AuthResult {
258        AuthResult::Authenticated {
259            username: "operator-bob".into(),
260            role: Role::Write,
261            source: AuthSource::Password,
262        }
263    }
264
265    fn admin() -> AuthResult {
266        AuthResult::Authenticated {
267            username: "admin-root".into(),
268            role: Role::Admin,
269            source: AuthSource::Password,
270        }
271    }
272
273    fn primary_ep() -> Endpoint {
274        Endpoint {
275            addr: "primary.example.com:5050".into(),
276            region: "us-east-1".into(),
277        }
278    }
279
280    fn replica(id: &str, region: Option<&str>, last_seen_offset_ms: i128) -> ReplicaState {
281        // last_seen_offset_ms is measured against `lag_now_ms()` —
282        // negative values mean "older than now", positive means
283        // "in the future" (clock skew test).
284        let now = lag_now_ms();
285        let last_seen = (now as i128 + last_seen_offset_ms).max(0) as u128;
286        ReplicaState {
287            id: id.to_string(),
288            last_acked_lsn: 100,
289            last_sent_lsn: 100,
290            last_durable_lsn: 100,
291            apply_error_count: 0,
292            divergence_count: 0,
293            connected_at_unix_ms: now,
294            last_seen_at_unix_ms: last_seen,
295            region: region.map(String::from),
296            rebootstrapping: false,
297        }
298    }
299
300    fn lag_now_ms() -> u128 {
301        // Pinned clock so health computations are deterministic.
302        1_700_000_000_000
303    }
304
305    fn lag_default() -> LagConfig {
306        LagConfig {
307            replica_timeout_ms: DEFAULT_REPLICA_TIMEOUT_MS,
308            records_per_ms: None,
309            now_unix_ms: lag_now_ms(),
310        }
311    }
312
313    // -----------------------------------------------------------
314    // Auth gate (predicate-only, separate from the advertiser).
315    // -----------------------------------------------------------
316
317    #[test]
318    fn topology_advertiser_gate_allows_authenticated() {
319        assert!(TopologyAuthGate::allows(&tenant()));
320        assert!(TopologyAuthGate::allows(&operator()));
321        assert!(TopologyAuthGate::allows(&admin()));
322    }
323
324    #[test]
325    fn topology_advertiser_gate_blocks_anonymous_and_denied() {
326        assert!(!TopologyAuthGate::allows(&anonymous()));
327        assert!(!TopologyAuthGate::allows(&AuthResult::Denied(
328            "nope".into()
329        )));
330    }
331
332    // -----------------------------------------------------------
333    // Auth × registry-shape matrix (issue spec §"Tests" first
334    // bullet). 4 principals × 3 shapes (empty, 1 replica,
335    // multi-region).
336    // -----------------------------------------------------------
337
338    fn shape_empty() -> Vec<ReplicaState> {
339        Vec::new()
340    }
341
342    fn shape_one() -> Vec<ReplicaState> {
343        vec![replica("replica-a:5050", Some("us-east-1"), -100)]
344    }
345
346    fn shape_multi_region() -> Vec<ReplicaState> {
347        vec![
348            replica("replica-a:5050", Some("us-east-1"), -100),
349            replica("replica-b:5050", Some("us-west-2"), -200),
350            replica("replica-c:5050", Some("eu-central-1"), -300),
351        ]
352    }
353
354    #[test]
355    fn topology_advertiser_anonymous_gets_primary_only() {
356        // ADR 0008 §3: every shape collapses to primary-only for
357        // the unauthenticated caller — including the multi-region
358        // case where the disclosure-leak risk is highest.
359        for shape in [shape_empty(), shape_one(), shape_multi_region()] {
360            let topo = TopologyAdvertiser::advertise(
361                &shape,
362                &anonymous(),
363                42,
364                primary_ep(),
365                500,
366                &lag_default(),
367            );
368            assert_eq!(topo.epoch, 42);
369            assert_eq!(topo.primary, primary_ep());
370            assert!(
371                topo.replicas.is_empty(),
372                "anonymous must never see replicas, got {:?}",
373                topo.replicas
374            );
375        }
376    }
377
378    #[test]
379    fn topology_advertiser_authenticated_gets_full_list() {
380        // All three authenticated principals see every replica —
381        // the gate is capability-driven, not role-driven (ADR §1).
382        for ctx in [tenant(), operator(), admin()] {
383            let topo = TopologyAdvertiser::advertise(
384                &shape_multi_region(),
385                &ctx,
386                7,
387                primary_ep(),
388                500,
389                &lag_default(),
390            );
391            assert_eq!(topo.epoch, 7);
392            assert_eq!(topo.replicas.len(), 3);
393            let regions: Vec<&str> = topo.replicas.iter().map(|r| r.region.as_str()).collect();
394            assert!(regions.contains(&"us-east-1"));
395            assert!(regions.contains(&"us-west-2"));
396            assert!(regions.contains(&"eu-central-1"));
397        }
398    }
399
400    #[test]
401    fn topology_advertiser_authenticated_empty_registry_returns_no_replicas() {
402        let topo = TopologyAdvertiser::advertise(
403            &shape_empty(),
404            &admin(),
405            1,
406            primary_ep(),
407            0,
408            &lag_default(),
409        );
410        assert!(topo.replicas.is_empty());
411        assert_eq!(topo.primary, primary_ep());
412    }
413
414    #[test]
415    fn topology_advertiser_denied_collapses_to_primary_only() {
416        let topo = TopologyAdvertiser::advertise(
417            &shape_multi_region(),
418            &AuthResult::Denied("invalid token".into()),
419            9,
420            primary_ep(),
421            500,
422            &lag_default(),
423        );
424        assert!(topo.replicas.is_empty());
425    }
426
427    // -----------------------------------------------------------
428    // Health: ack window flips healthy.
429    // -----------------------------------------------------------
430
431    #[test]
432    fn topology_advertiser_recent_ack_is_healthy() {
433        let mut shape = shape_one();
434        shape[0].last_seen_at_unix_ms = lag_now_ms() - 100;
435        let topo =
436            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 100, &lag_default());
437        assert!(topo.replicas[0].healthy);
438    }
439
440    #[test]
441    fn topology_advertiser_stale_ack_is_unhealthy() {
442        let mut shape = shape_one();
443        // Older than the timeout — flip to unhealthy.
444        shape[0].last_seen_at_unix_ms = lag_now_ms() - DEFAULT_REPLICA_TIMEOUT_MS - 1;
445        let topo =
446            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 100, &lag_default());
447        assert!(!topo.replicas[0].healthy);
448    }
449
450    // -----------------------------------------------------------
451    // Lag: degrades gracefully to u32::MAX when no estimate.
452    // -----------------------------------------------------------
453
454    #[test]
455    fn topology_advertiser_lag_unknown_reports_u32_max() {
456        let mut shape = shape_one();
457        shape[0].last_acked_lsn = 50;
458        let topo =
459            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag_default());
460        assert_eq!(topo.replicas[0].lag_ms, u32::MAX);
461    }
462
463    #[test]
464    fn topology_advertiser_lag_zero_when_replica_caught_up() {
465        let mut shape = shape_one();
466        shape[0].last_acked_lsn = 500;
467        let topo =
468            TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag_default());
469        assert_eq!(topo.replicas[0].lag_ms, 0);
470    }
471
472    #[test]
473    fn topology_advertiser_lag_uses_progress_estimate_when_provided() {
474        let mut shape = shape_one();
475        shape[0].last_acked_lsn = 400;
476        let lag = LagConfig {
477            records_per_ms: Some(10.0), // 10 records/ms → 100 records = 10 ms
478            ..lag_default()
479        };
480        let topo = TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag);
481        assert_eq!(topo.replicas[0].lag_ms, 10);
482    }
483
484    #[test]
485    fn topology_advertiser_lag_zero_rate_falls_back_to_u32_max() {
486        // A degenerate rate (0 or negative) cannot produce a finite
487        // ms estimate; the advertiser must not divide by zero.
488        let mut shape = shape_one();
489        shape[0].last_acked_lsn = 50;
490        let lag = LagConfig {
491            records_per_ms: Some(0.0),
492            ..lag_default()
493        };
494        let topo = TopologyAdvertiser::advertise(&shape, &admin(), 1, primary_ep(), 500, &lag);
495        assert_eq!(topo.replicas[0].lag_ms, u32::MAX);
496    }
497
498    // -----------------------------------------------------------
499    // Epoch monotonicity: the advertiser is a pure function of the
500    // epoch the caller passes — registry-change accounting belongs
501    // to PrimaryReplication. The spec's "register, unregister, ack
502    // timeout flipping healthy" assertions reduce to "the caller
503    // is expected to bump the epoch on those events; the advertiser
504    // surfaces what it gets". We pin the contract here so a future
505    // refactor doesn't accidentally swallow the epoch.
506    // -----------------------------------------------------------
507
508    #[test]
509    fn topology_advertiser_propagates_epoch_verbatim() {
510        for epoch in [0, 1, 42, u64::MAX] {
511            let topo = TopologyAdvertiser::advertise(
512                &shape_one(),
513                &admin(),
514                epoch,
515                primary_ep(),
516                100,
517                &lag_default(),
518            );
519            assert_eq!(topo.epoch, epoch);
520        }
521    }
522
523    // -----------------------------------------------------------
524    // Both transports invoke the advertiser; bytes match the
525    // canonical encoder (#166). This is a server-internal sanity
526    // check — the byte-for-byte round-trip across transports is
527    // pinned by `reddb-grpc-proto::topology_tests` and
528    // `reddb-wire::topology::tests`.
529    // -----------------------------------------------------------
530
531    #[test]
532    fn topology_advertiser_output_round_trips_through_canonical_encoder() {
533        use reddb_wire::topology::{decode_topology, encode_topology};
534        let topo = TopologyAdvertiser::advertise(
535            &shape_multi_region(),
536            &admin(),
537            13,
538            primary_ep(),
539            500,
540            &lag_default(),
541        );
542        let bytes = encode_topology(&topo);
543        let decoded = decode_topology(&bytes).expect("decode").expect("v1");
544        assert_eq!(decoded, topo);
545    }
546
547    #[test]
548    fn topology_advertiser_output_round_trips_through_hello_ack_wrapper() {
549        use reddb_wire::topology::{decode_topology_from_hello_ack, encode_topology_for_hello_ack};
550        let topo = TopologyAdvertiser::advertise(
551            &shape_multi_region(),
552            &operator(),
553            21,
554            primary_ep(),
555            500,
556            &lag_default(),
557        );
558        let field = encode_topology_for_hello_ack(&topo);
559        let decoded = decode_topology_from_hello_ack(&field)
560            .expect("decode")
561            .expect("v1");
562        assert_eq!(decoded, topo);
563    }
564}