// reddb_client/topology.rs

//! Client-side topology consumer (issue #168, ADR 0008).
//!
//! Parses the canonical [`reddb_wire::topology::Topology`] payload —
//! delivered either as raw gRPC bytes or as a base64-wrapped string
//! field inside a RedWire HelloAck JSON envelope — and projects it
//! into a [`ClusterMembership`] structure that downstream routing
//! can read without caring about the wire encoding.
//!
//! # Merge rule (ADR 0008 §2)
//!
//! The seed URI a caller passes to `Reddb::connect("grpc://a,b,c")`
//! is a *hint*, not a constraint. When the server advertises a
//! topology:
//!
//! * The advertised primary always wins; the seed primary is
//!   discarded.
//! * Every replica the server advertises is included, carrying the
//!   server's metadata (`region`, `healthy`, `lag_ms`,
//!   `last_applied_lsn`).
//! * Replicas listed in the seed URI but absent from the
//!   advertisement are dropped — the operator decommissioned that
//!   node and the seed is stale.
//! * Replicas in both lists are kept; advertised metadata wins on
//!   any field collision.
//!
//! # Refresh contract
//!
//! [`TopologyConsumer::should_refresh`] returns `true` only when the
//! observed epoch is strictly greater than the current one, so a
//! matching (or older) epoch short-circuits the refresh. A higher-level
//! driver (the future `HealthAwareRouter` in lane Q) is expected to:
//!
//! * Poll the `Topology` RPC at a configured interval (default
//!   30s — see [`DEFAULT_REFRESH_INTERVAL`]).
//! * Force a refresh on the next call after a connection-level
//!   error, regardless of timer state.
//! * Skip the refresh when the previously observed epoch matches.
//!
//! [`RefreshScheduler`] implements the first two rules with a
//! pluggable clock so the 30s interval is testable without real
//! waits.
//!
//! # Forward-compat (ADR 0008 §4)
//!
//! Unknown wire version tags and malformed base64 are not *hard*
//! errors: they surface as the recoverable
//! [`ConsumeError::UnknownVersion`] / [`ConsumeError::MalformedEnvelope`]
//! variants (see [`ConsumeError::is_recoverable`]), which callers
//! downgrade to URI-only routing with a one-line warning. Structurally
//! malformed bodies (truncated, bad UTF-8, oversized strings) bubble
//! up as non-recoverable [`ConsumeError`] variants — never panics.

51use std::time::Duration;
52
53use reddb_wire::topology::{
54    self as wire, decode_topology, Endpoint as WireEndpoint, ReplicaInfo, Topology,
55};
56
/// Default interval between topology polls.
///
/// ADR 0008 §1 settles on 30 seconds as the conservative default;
/// deployments that need a different cadence pass their own value to
/// [`RefreshScheduler::with_interval`].
pub const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_millis(30_000);
61
/// Seed addresses parsed out of the connection URI.
///
/// Both fields hold the raw connection-string text, so the merge rule
/// can compare them against advertised endpoint addresses without any
/// further parsing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UriSeed {
    /// The host the caller dialled first.
    pub primary: String,
    /// Remaining hosts from a comma-separated `grpc://a,b,c` URI;
    /// empty when the URI named a single host.
    pub replicas: Vec<String>,
}

impl UriSeed {
    /// Seed for a single-host URI: no replicas listed.
    pub fn single(primary: impl Into<String>) -> Self {
        Self::cluster(primary, Vec::new())
    }

    /// Seed for a multi-host URI such as `grpc://a,b,c`, where `primary`
    /// is the first host and `replicas` the rest.
    pub fn cluster(primary: impl Into<String>, replicas: Vec<String>) -> Self {
        let primary = primary.into();
        Self { primary, replicas }
    }
}
91
92/// The merged, route-ready view of the cluster.
93///
94/// The fields are the wire-canonical types from `reddb-wire` so a
95/// future router can read them without translating shapes again.
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct ClusterMembership {
98    pub primary: WireEndpoint,
99    pub replicas: Vec<ReplicaInfo>,
100    pub epoch: u64,
101}
102
/// Failures from decoding a topology advertisement.
///
/// Two flavours exist. `UnknownVersion` and `MalformedEnvelope` are
/// recoverable: the caller logs a warning and falls back to URI-only
/// routing (ADR 0008 §4). The structural variants (truncated, bad UTF-8,
/// oversize strings) mean the peer sent a genuinely broken payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsumeError {
    /// Buffer shorter than the 5-byte version+length header.
    Truncated,
    /// Header declared more body bytes than the buffer carries.
    BodyLengthMismatch { declared: u32, available: usize },
    /// A length-prefixed string field was not valid UTF-8.
    InvalidUtf8,
    /// A length-prefixed string declared more bytes than the body
    /// has remaining.
    StringTooLong { declared: u32, remaining: usize },
    /// Recognised header, but the version tag is past
    /// [`wire::MAX_KNOWN_TOPOLOGY_VERSION`]. Recoverable: drop the
    /// advertisement and fall back to URI-only routing.
    UnknownVersion,
    /// The HelloAck `topology` field was not valid base64. Recoverable
    /// in the same way as [`Self::UnknownVersion`].
    MalformedEnvelope,
}

impl std::fmt::Display for ConsumeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Truncated => f.write_str("topology blob truncated"),
            Self::InvalidUtf8 => f.write_str("topology string field is not valid UTF-8"),
            Self::UnknownVersion => f.write_str(
                "topology wire version tag past MAX_KNOWN_TOPOLOGY_VERSION; falling back to URI-only routing",
            ),
            Self::MalformedEnvelope => f.write_str(
                "topology envelope (HelloAck base64) is malformed; falling back to URI-only routing",
            ),
            Self::BodyLengthMismatch {
                declared,
                available,
            } => write!(
                f,
                "topology body length mismatch (declared {declared}, available {available})"
            ),
            Self::StringTooLong {
                declared,
                remaining,
            } => write!(
                f,
                "topology string length {declared} exceeds remaining body {remaining}"
            ),
        }
    }
}

impl std::error::Error for ConsumeError {}

impl ConsumeError {
    /// Whether the caller should downgrade to URI-only routing with a
    /// one-line warning (ADR 0008 §4) instead of treating the error as
    /// a hard failure.
    ///
    /// Spelled out variant-by-variant so adding a new variant forces an
    /// explicit recoverability decision at compile time.
    pub fn is_recoverable(&self) -> bool {
        match self {
            Self::UnknownVersion | Self::MalformedEnvelope => true,
            Self::Truncated
            | Self::BodyLengthMismatch { .. }
            | Self::InvalidUtf8
            | Self::StringTooLong { .. } => false,
        }
    }
}
169
170impl From<wire::TopologyError> for ConsumeError {
171    fn from(e: wire::TopologyError) -> Self {
172        match e {
173            wire::TopologyError::Truncated => Self::Truncated,
174            wire::TopologyError::BodyLengthMismatch {
175                declared,
176                available,
177            } => Self::BodyLengthMismatch {
178                declared,
179                available,
180            },
181            wire::TopologyError::InvalidUtf8 => Self::InvalidUtf8,
182            wire::TopologyError::StringTooLong {
183                declared,
184                remaining,
185            } => Self::StringTooLong {
186                declared,
187                remaining,
188            },
189        }
190    }
191}
192
/// Stateless entry point for decoding and merging topology
/// advertisements.
///
/// Everything is an associated function, so a routing driver can hold a
/// `&Self` without the consumer capturing state it has no need to keep
/// between calls. Cross-call state (current epoch, last refresh) belongs
/// to the driver.
#[derive(Default, Debug)]
pub struct TopologyConsumer;
199
200impl TopologyConsumer {
201    /// Apply the merge rule from ADR 0008 §2 against an already-
202    /// decoded payload. Pure, infallible, no I/O.
203    pub fn consume(payload: Topology, uri_seed: Option<UriSeed>) -> ClusterMembership {
204        let Topology {
205            epoch,
206            primary,
207            replicas,
208        } = payload;
209
210        // Merge: advertised replicas win on metadata; URI-only
211        // replicas are dropped (decommissioned). Membership we emit
212        // is exactly the advertised set, in advertised order — the
213        // seed only acted as a hint for the *initial* dial.
214        //
215        // We still walk `uri_seed` to keep the merge contract
216        // explicit: if a future variant of this rule wants to
217        // surface "URI replica X was dropped because it isn't in
218        // the advertisement", this is the spot. Today the merge is
219        // a no-op on the seed and we just keep the advertised list.
220        let _ = uri_seed;
221
222        ClusterMembership {
223            primary,
224            replicas,
225            epoch,
226        }
227    }
228
229    /// Decode raw canonical bytes (gRPC `TopologyReply.topology_bytes`)
230    /// and apply the merge.
231    ///
232    /// Recoverable variants (`UnknownVersion`) are surfaced as errors
233    /// for the caller to log; the caller is expected to fall back to
234    /// URI-only routing.
235    pub fn consume_bytes(
236        bytes: &[u8],
237        uri_seed: Option<UriSeed>,
238    ) -> Result<ClusterMembership, ConsumeError> {
239        match decode_topology(bytes)? {
240            Some(t) => Ok(Self::consume(t, uri_seed)),
241            None => Err(ConsumeError::UnknownVersion),
242        }
243    }
244
245    /// Decode the base64-wrapped HelloAck `topology` field and apply
246    /// the merge. Mirrors the gRPC path so a router can drive both
247    /// transports through one code path.
248    pub fn consume_hello_ack(
249        field: &str,
250        uri_seed: Option<UriSeed>,
251    ) -> Result<ClusterMembership, ConsumeError> {
252        // `decode_topology_from_hello_ack` collapses both "bad
253        // base64" and "unknown version tag" into `Ok(None)`. We
254        // can't tell them apart from here, so the recoverable
255        // variant we surface is the union — `MalformedEnvelope` —
256        // when the base64 layer rejected the input. To distinguish,
257        // we first try the base64 decode ourselves: if it succeeds
258        // and `decode_topology` reports unknown version, surface
259        // `UnknownVersion`; if base64 itself failed, surface
260        // `MalformedEnvelope`.
261        match decode_base64(field) {
262            None => Err(ConsumeError::MalformedEnvelope),
263            Some(bytes) => Self::consume_bytes(&bytes, uri_seed),
264        }
265        // (We deliberately don't call `decode_topology_from_hello_ack`
266        // here even though it exists — splitting the two stages lets
267        // us surface a precise recoverable variant.)
268    }
269
270    /// Refresh decision: skip when the observed epoch matches the
271    /// epoch we already applied. Strictly-greater is the canonical
272    /// "advance" condition; a lower observed epoch is treated as
273    /// stale and *also* skipped (the refresh loop will see the next
274    /// poll's payload).
275    ///
276    /// The refresh loop calls this with the raw observed epoch from
277    /// the just-decoded payload. Connection-level errors are out of
278    /// scope here — they force a refresh through a different
279    /// codepath ([`RefreshScheduler::force_now`]).
280    pub fn should_refresh(current_epoch: u64, observed_epoch: u64) -> bool {
281        observed_epoch > current_epoch
282    }
283}
284
// --------------------------------------------------------------
// Refresh scheduling — pluggable clock so the 30s interval is
// testable without real waits.
// --------------------------------------------------------------

/// Time source consulted by [`RefreshScheduler`].
///
/// Production code uses [`SystemClock`]; tests substitute a fake they
/// can advance by hand. Readings are milliseconds from an
/// implementation-chosen origin and are expected not to go backwards.
pub trait Clock: std::fmt::Debug {
    /// Current reading, in milliseconds.
    fn now_monotonic_ms(&self) -> u64;
}

/// Real-time [`Clock`] backed by `std::time::Instant`.
///
/// Readings are milliseconds elapsed since construction, so the opaque
/// `Instant` never leaks through the `dyn`-friendly trait.
#[derive(Debug)]
pub struct SystemClock {
    /// Captured at construction; every reading is relative to it.
    origin: std::time::Instant,
}

impl Default for SystemClock {
    /// Start a clock whose zero point is "now".
    fn default() -> Self {
        let origin = std::time::Instant::now();
        Self { origin }
    }
}

impl Clock for SystemClock {
    fn now_monotonic_ms(&self) -> u64 {
        let elapsed = self.origin.elapsed();
        // `as_millis` is u128; the `as` cast keeps the low 64 bits,
        // which only matters after ~584 million years of uptime.
        elapsed.as_millis() as u64
    }
}
317
318/// Drives the periodic-refresh + force-on-error rule.
319///
320/// Owns the "should I refresh now?" decision; the actual RPC dispatch
321/// is the higher-level driver's job. Keeping this state machine
322/// isolated lets the 30s interval get tested without sleeping.
323#[derive(Debug)]
324pub struct RefreshScheduler {
325    interval: Duration,
326    clock: Box<dyn Clock + Send + Sync>,
327    last_refresh_ms: Option<u64>,
328    /// Force flag set by [`Self::force_now`]; cleared the next time
329    /// [`Self::should_refresh_now`] returns true.
330    force: bool,
331}
332
333impl RefreshScheduler {
334    /// Build a scheduler with the default 30s interval and the real
335    /// system clock.
336    pub fn new() -> Self {
337        Self::with_interval(DEFAULT_REFRESH_INTERVAL)
338    }
339
340    /// Build a scheduler with a custom interval and the real system
341    /// clock.
342    pub fn with_interval(interval: Duration) -> Self {
343        Self::with_interval_and_clock(interval, Box::new(SystemClock::default()))
344    }
345
346    /// Build a scheduler with a custom interval *and* clock — the
347    /// hook tests inject a fake clock through.
348    pub fn with_interval_and_clock(
349        interval: Duration,
350        clock: Box<dyn Clock + Send + Sync>,
351    ) -> Self {
352        Self {
353            interval,
354            clock,
355            last_refresh_ms: None,
356            force: false,
357        }
358    }
359
360    /// Poll-loop hook: call once per loop iteration (or before each
361    /// dispatch). Returns true when a refresh is due.
362    ///
363    /// On `true`, the caller should dispatch the [`Topology`] RPC,
364    /// run [`TopologyConsumer::consume_bytes`], and call
365    /// [`Self::mark_refreshed`] with the resulting epoch.
366    pub fn should_refresh_now(&mut self) -> bool {
367        if self.force {
368            self.force = false;
369            return true;
370        }
371        let now = self.clock.now_monotonic_ms();
372        let interval_ms = self.interval.as_millis() as u64;
373        match self.last_refresh_ms {
374            None => true,
375            Some(last) => now.saturating_sub(last) >= interval_ms,
376        }
377    }
378
379    /// Stamp the last successful refresh. The next
380    /// [`Self::should_refresh_now`] returns true once
381    /// `interval` has elapsed.
382    pub fn mark_refreshed(&mut self) {
383        self.last_refresh_ms = Some(self.clock.now_monotonic_ms());
384    }
385
386    /// Set the force flag — the next call to
387    /// [`Self::should_refresh_now`] returns true regardless of
388    /// the timer. Used by the routing driver after a connection-
389    /// level error.
390    pub fn force_now(&mut self) {
391        self.force = true;
392    }
393}
394
395impl Default for RefreshScheduler {
396    fn default() -> Self {
397        Self::new()
398    }
399}
400
// --------------------------------------------------------------
// Internal: minimal base64 decoder reused so we can split the
// "bad base64" vs "bad version" recoverable error variants.
// Mirrors the alphabet used by the wire encoder. Kept private —
// the wire crate exposes its own; this is a paste-equivalent so
// we don't widen `reddb-wire`'s public surface for one branch.
// --------------------------------------------------------------

/// Decode standard-alphabet base64 (RFC 4648 §4).
///
/// Returns `None` for anything that is not a well-formed encoding.
///
/// Accepted: padded input (`QQ==`) and unpadded input (`QQ`).
///
/// Rejected:
///
/// * characters outside `A-Z a-z 0-9 + /`;
/// * more than two `=`;
/// * padding that does not complete a 4-character quantum;
/// * a dangling single character (6 bits cannot form a byte);
/// * non-zero leftover bits in the final character.
///
/// Rejecting these, rather than silently dropping trailing bits, keeps
/// a corrupted HelloAck field on the recoverable `MalformedEnvelope`
/// path. Otherwise a few garbage bytes would reach the topology decoder
/// and come back as a hard structural error.
fn decode_base64(input: &str) -> Option<Vec<u8>> {
    let trimmed = input.trim_end_matches('=');
    let padding = input.len() - trimmed.len();
    // Padded input must be whole 4-char quanta with at most two pad
    // characters; unpadded input is accepted as-is.
    if padding > 2 || (padding > 0 && input.len() % 4 != 0) {
        return None;
    }
    // One leftover sextet carries only 6 bits — no encoder emits it.
    if trimmed.len() % 4 == 1 {
        return None;
    }

    let mut out = Vec::with_capacity(trimmed.len() * 3 / 4);
    let mut buf = 0u32;
    let mut bits = 0u8;
    for ch in trimmed.bytes() {
        let v: u32 = match ch {
            b'A'..=b'Z' => (ch - b'A') as u32,
            b'a'..=b'z' => (ch - b'a' + 26) as u32,
            b'0'..=b'9' => (ch - b'0' + 52) as u32,
            b'+' => 62,
            b'/' => 63,
            _ => return None,
        };
        // Only the low `bits` bits of `buf` are still pending; older
        // bits shifting off the top of the u32 were already emitted.
        buf = (buf << 6) | v;
        bits += 6;
        if bits >= 8 {
            bits -= 8;
            out.push(((buf >> bits) & 0xFF) as u8);
        }
    }

    // Canonical encoders zero the unused low bits of the final character
    // (RFC 4648 §3.5); anything else indicates corruption.
    if bits > 0 && (buf & ((1u32 << bits) - 1)) != 0 {
        return None;
    }
    Some(out)
}
432
#[cfg(test)]
mod tests {
    use super::*;
    use reddb_wire::topology::{
        encode_topology, encode_topology_for_hello_ack, Endpoint as WireEndpoint, ReplicaInfo,
        Topology, TOPOLOGY_HEADER_SIZE, TOPOLOGY_WIRE_VERSION_V1,
    };
    use std::sync::{Arc, Mutex};

    const PRIMARY_ADDR: &str = "primary.example.com:5050";
    const REPLICA_A_ADDR: &str = "replica-a.example.com:5050";
    const REPLICA_B_ADDR: &str = "replica-b.example.com:5050";
    const STALE_ADDR: &str = "replica-stale.example.com:5050";

    /// Epoch-7 advertisement: one primary plus a healthy and an
    /// unhealthy replica in different regions.
    fn fixture() -> Topology {
        let healthy = ReplicaInfo {
            addr: REPLICA_A_ADDR.into(),
            region: "us-east-1".into(),
            healthy: true,
            lag_ms: 12,
            last_applied_lsn: 4242,
        };
        let lagging = ReplicaInfo {
            addr: REPLICA_B_ADDR.into(),
            region: "us-west-2".into(),
            healthy: false,
            lag_ms: 999,
            last_applied_lsn: 4100,
        };
        Topology {
            epoch: 7,
            primary: WireEndpoint {
                addr: PRIMARY_ADDR.into(),
                region: "us-east-1".into(),
            },
            replicas: vec![healthy, lagging],
        }
    }

    /// Prefix `body` with a v1 header: the version tag, then the body
    /// length as a little-endian u32.
    fn frame_v1(body: &[u8]) -> Vec<u8> {
        let mut framed = Vec::with_capacity(TOPOLOGY_HEADER_SIZE + body.len());
        framed.push(TOPOLOGY_WIRE_VERSION_V1);
        framed.extend_from_slice(&(body.len() as u32).to_le_bytes());
        framed.extend_from_slice(body);
        framed
    }

    /// The membership must be a verbatim projection of the fixture.
    fn assert_mirrors_fixture(membership: &ClusterMembership, advertised: &Topology) {
        assert_eq!(membership.epoch, 7);
        assert_eq!(membership.primary, advertised.primary);
        assert_eq!(membership.replicas, advertised.replicas);
    }

    // ---- round-trip on both transports ----

    #[test]
    fn parse_round_trip_grpc_bytes() {
        let advertised = fixture();
        let wire_bytes = encode_topology(&advertised);
        let membership =
            TopologyConsumer::consume_bytes(&wire_bytes, None).expect("consume");
        assert_mirrors_fixture(&membership, &advertised);
    }

    #[test]
    fn parse_round_trip_hello_ack_field() {
        let advertised = fixture();
        let envelope_field = encode_topology_for_hello_ack(&advertised);
        let membership =
            TopologyConsumer::consume_hello_ack(&envelope_field, None).expect("consume");
        assert_mirrors_fixture(&membership, &advertised);
    }

    #[test]
    fn fixture_byte_stable_across_runs() {
        // Acceptance: the same fixture encodes to identical bytes every
        // time (#166 §4 pinned this; here we check the consumer keeps
        // it), and the HelloAck base64 wrapper carries exactly the gRPC
        // bytes.
        let advertised = fixture();
        let first = encode_topology(&advertised);
        assert_eq!(first, encode_topology(&advertised));

        let unwrapped =
            decode_base64(&encode_topology_for_hello_ack(&advertised)).expect("base64");
        assert_eq!(unwrapped, first);
    }

    // ---- merge rule ----

    #[test]
    fn merge_uri_only_replicas_dropped() {
        // The URI names three replicas but only two are advertised; the
        // URI-only one was decommissioned and must vanish.
        let seed = UriSeed::cluster(
            PRIMARY_ADDR.to_string(),
            vec![REPLICA_A_ADDR.into(), REPLICA_B_ADDR.into(), STALE_ADDR.into()],
        );
        let membership = TopologyConsumer::consume(fixture(), Some(seed));

        assert_eq!(
            membership.replicas.len(),
            2,
            "URI-only replica must be dropped"
        );
        let stale_survived = membership.replicas.iter().any(|r| r.addr == STALE_ADDR);
        assert!(
            !stale_survived,
            "stale URI replica must not appear in membership"
        );
    }

    #[test]
    fn merge_advertised_only_replicas_added() {
        // The URI names no replicas; both advertised ones still appear,
        // because the URI is a hint, not a constraint.
        let advertised = fixture();
        let membership = TopologyConsumer::consume(
            advertised.clone(),
            Some(UriSeed::single(PRIMARY_ADDR)),
        );
        assert_eq!(membership.replicas.len(), 2);
        assert_eq!(membership.replicas, advertised.replicas);
    }

    #[test]
    fn merge_intersection_uses_advertised_metadata() {
        // A replica present in both lists carries the *advertised*
        // metadata, never anything synthesised from the URI.
        let seed = UriSeed::cluster(PRIMARY_ADDR.to_string(), vec![REPLICA_A_ADDR.into()]);
        let membership = TopologyConsumer::consume(fixture(), Some(seed));

        let merged = membership
            .replicas
            .iter()
            .find(|r| r.addr == REPLICA_A_ADDR)
            .expect("advertised replica must be present");
        assert_eq!(merged.region, "us-east-1");
        assert!(merged.healthy);
        assert_eq!(merged.lag_ms, 12);
        assert_eq!(merged.last_applied_lsn, 4242);
    }

    #[test]
    fn merge_with_no_seed_keeps_full_advertisement() {
        let advertised = fixture();
        let membership = TopologyConsumer::consume(advertised.clone(), None);
        assert_eq!(membership.primary, advertised.primary);
        assert_eq!(membership.replicas, advertised.replicas);
        assert_eq!(membership.epoch, advertised.epoch);
    }

    // ---- refresh decision ----

    #[test]
    fn should_refresh_skips_same_epoch() {
        let (current, observed) = (7, 7);
        assert!(!TopologyConsumer::should_refresh(current, observed));
    }

    #[test]
    fn should_refresh_advances_on_higher_epoch() {
        let (current, observed) = (7, 8);
        assert!(TopologyConsumer::should_refresh(current, observed));
    }

    #[test]
    fn should_refresh_treats_lower_epoch_as_stale() {
        // A lower observed epoch means the peer lags behind us; skip it
        // and let the next poll pick up the real advancement.
        let (current, observed) = (7, 6);
        assert!(!TopologyConsumer::should_refresh(current, observed));
    }

    // ---- malformed / adversarial fixtures ----

    #[test]
    fn malformed_truncated_blob_returns_typed_error() {
        let err = TopologyConsumer::consume_bytes(&[0x01, 0x00], None).unwrap_err();
        assert_eq!(err, ConsumeError::Truncated);
        assert!(!err.is_recoverable());
    }

    #[test]
    fn malformed_body_length_mismatch_returns_typed_error() {
        let oversized_header = [0x01_u8, 0xFF, 0xFF, 0xFF, 0xFF, 0x00];
        let err = TopologyConsumer::consume_bytes(&oversized_header, None).unwrap_err();
        assert!(matches!(err, ConsumeError::BodyLengthMismatch { .. }));
        assert!(!err.is_recoverable());
    }

    #[test]
    fn unknown_version_tag_is_recoverable() {
        // ADR 0008 §4 forward-compat: an unknown version tag becomes a
        // recoverable typed error so the caller can warn and fall back
        // to URI-only routing.
        let mut encoded = encode_topology(&fixture());
        encoded[0] = 0xFE; // past MAX_KNOWN_TOPOLOGY_VERSION
        let err = TopologyConsumer::consume_bytes(&encoded, None).unwrap_err();
        assert_eq!(err, ConsumeError::UnknownVersion);
        assert!(err.is_recoverable());
    }

    #[test]
    fn malformed_envelope_base64_is_recoverable() {
        // Bad base64 in the HelloAck field gets the same treatment as an
        // unknown version tag: drop, fall back, never panic.
        let err = TopologyConsumer::consume_hello_ack("@not base64@", None).unwrap_err();
        assert_eq!(err, ConsumeError::MalformedEnvelope);
        assert!(err.is_recoverable());
    }

    #[test]
    fn oversize_string_field_returns_typed_error() {
        // Adversarial: a primary.addr length prefix (0xFFFF_FFFF) that
        // overruns the body must yield a typed error, not a panic.
        let epoch = 0u64.to_le_bytes();
        let bogus_len = u32::MAX.to_le_bytes();
        let body: Vec<u8> = [&epoch[..], &bogus_len[..]].concat();

        let framed = frame_v1(&body);
        assert_eq!(framed.len(), TOPOLOGY_HEADER_SIZE + body.len());

        let err = TopologyConsumer::consume_bytes(&framed, None).unwrap_err();
        assert!(
            matches!(err, ConsumeError::StringTooLong { .. }),
            "expected StringTooLong, got {err:?}"
        );
        assert!(!err.is_recoverable());
    }

    #[test]
    fn invalid_utf8_string_returns_typed_error() {
        // primary.addr is two bytes of invalid UTF-8 (0xFF 0xFE). The
        // body stops there because the decoder rejects it first.
        let epoch = 0u64.to_le_bytes();
        let addr_len = 2u32.to_le_bytes();
        let bogus_utf8 = [0xFF_u8, 0xFE];
        let body: Vec<u8> = [&epoch[..], &addr_len[..], &bogus_utf8[..]].concat();

        let err = TopologyConsumer::consume_bytes(&frame_v1(&body), None).unwrap_err();
        assert!(
            matches!(err, ConsumeError::InvalidUtf8),
            "expected InvalidUtf8, got {err:?}"
        );
    }

    #[test]
    fn consume_does_not_panic_on_any_random_short_buffer() {
        // Smoke fuzz: every short buffer must produce Ok or a typed
        // Err — never a panic.
        let filler = [0xAA_u8; 9];
        for len in 0..=filler.len() {
            let _ = TopologyConsumer::consume_bytes(&filler[..len], None);
        }
    }

    // ---- fake-clock RefreshScheduler ----

    /// Hand-driven clock: time only moves when a test calls `advance`.
    #[derive(Debug, Default)]
    struct FakeClock {
        ms: Mutex<u64>,
    }

    impl FakeClock {
        fn advance(&self, by: Duration) {
            let mut now = self.ms.lock().unwrap();
            *now += by.as_millis() as u64;
        }
    }

    impl Clock for FakeClock {
        fn now_monotonic_ms(&self) -> u64 {
            *self.ms.lock().unwrap()
        }
    }

    // The scheduler owns a `Box<dyn Clock>` while the test keeps
    // advancing the same clock; making the shared `Arc` itself a `Clock`
    // gives both sides a handle.
    impl Clock for Arc<FakeClock> {
        fn now_monotonic_ms(&self) -> u64 {
            <FakeClock as Clock>::now_monotonic_ms(self)
        }
    }

    fn scheduler_with(clock: Arc<FakeClock>, interval: Duration) -> RefreshScheduler {
        RefreshScheduler::with_interval_and_clock(interval, Box::new(clock))
    }

    #[test]
    fn fake_clock_first_call_refreshes_immediately() {
        let mut scheduler =
            scheduler_with(Arc::new(FakeClock::default()), Duration::from_secs(30));
        assert!(scheduler.should_refresh_now(), "first call must refresh");
    }

    #[test]
    fn fake_clock_thirty_second_interval_holds_without_real_waits() {
        let clock = Arc::new(FakeClock::default());
        let mut scheduler = scheduler_with(Arc::clone(&clock), Duration::from_secs(30));
        assert!(scheduler.should_refresh_now());
        scheduler.mark_refreshed();

        // One millisecond short of the interval: stay quiet.
        clock.advance(Duration::from_millis(29_999));
        assert!(
            !scheduler.should_refresh_now(),
            "must not refresh before interval elapsed"
        );

        // Past the threshold: refresh.
        clock.advance(Duration::from_millis(2));
        assert!(
            scheduler.should_refresh_now(),
            "must refresh once interval has elapsed"
        );
    }

    #[test]
    fn fake_clock_force_now_overrides_interval() {
        let clock = Arc::new(FakeClock::default());
        let mut scheduler = scheduler_with(Arc::clone(&clock), Duration::from_secs(30));
        assert!(scheduler.should_refresh_now());
        scheduler.mark_refreshed();

        // Well inside the 30s window: normally a skip.
        clock.advance(Duration::from_millis(100));
        assert!(!scheduler.should_refresh_now());

        // A connection-level error forces the next call through.
        scheduler.force_now();
        assert!(
            scheduler.should_refresh_now(),
            "force_now must override the timer"
        );

        // The force is single-shot: afterwards the (unexpired) timer
        // governs again.
        scheduler.mark_refreshed();
        clock.advance(Duration::from_millis(100));
        assert!(!scheduler.should_refresh_now());
    }

    #[test]
    fn default_interval_is_thirty_seconds() {
        // Sentinel against a rebase that silently changes the
        // documented 30s default.
        assert_eq!(DEFAULT_REFRESH_INTERVAL, Duration::from_secs(30));
    }
}