reddb_client/topology.rs
1//! Client-side topology consumer (issue #168, ADR 0008).
2//!
3//! Parses the canonical [`reddb_wire::topology::Topology`] payload —
4//! delivered either as raw gRPC bytes or as a base64-wrapped string
5//! field inside a RedWire HelloAck JSON envelope — and projects it
6//! into a [`ClusterMembership`] structure that downstream routing
7//! can read without caring about the wire encoding.
8//!
9//! # Merge rule (ADR 0008 §2)
10//!
11//! The seed URI a caller passes to `Reddb::connect("grpc://a,b,c")`
12//! is a *hint*, not a constraint. When the server advertises a
13//! topology:
14//!
15//! * The advertised primary always wins. The seed primary is
16//! discarded.
17//! * Replicas advertised by the server make the cut. Each carries
18//! the server's metadata (`region`, `healthy`, `lag_ms`,
19//! `last_applied_lsn`).
20//! * Replicas listed in the seed URI but absent from the
21//! advertisement are dropped — the operator decommissioned that
22//! node and the seed is stale.
23//! * Replicas in both lists are kept; advertised metadata wins on
24//! any field collision.
25//!
26//! # Refresh contract
27//!
28//! [`TopologyConsumer::should_refresh`] short-circuits when the
29//! observed epoch matches the current one. A higher-level driver
30//! (the future `HealthAwareRouter` in lane Q) is expected to:
31//!
32//! * Poll the [`Topology`] RPC at a configured interval (default
33//! 30s — see [`DEFAULT_REFRESH_INTERVAL`]).
34//! * Force a refresh on the next call after a connection-level
35//! error, regardless of timer state.
36//! * Skip the refresh when the previously observed epoch matches.
37//!
38//! [`RefreshScheduler`] captures the first two pieces with a
39//! pluggable clock so the 30s interval is testable without real
40//! waits.
41//!
42//! # Forward-compat (ADR 0008 §4)
43//!
//! Unknown wire version tags and malformed base64 are *not* hard
//! errors: they surface as the recoverable
//! [`ConsumeError::UnknownVersion`] / [`ConsumeError::MalformedEnvelope`]
//! variants, which callers downgrade to URI-only routing with a
//! one-line warning. Structurally malformed bodies (truncated, bad
//! UTF-8, oversized strings) bubble up as typed [`ConsumeError`]
//! variants — never panics.
50
51use std::time::Duration;
52
53use reddb_wire::topology::{
54 self as wire, decode_topology, Endpoint as WireEndpoint, ReplicaInfo, Topology,
55};
56
/// Default refresh interval for the topology poll loop.
///
/// ADR 0008 §1 picks 30s as the conservative default; operators can
/// override it per-deployment via [`RefreshScheduler::with_interval`].
/// [`RefreshScheduler::new`] builds its scheduler with this value.
pub const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
61
/// Seed addresses extracted from the connection URI.
///
/// `primary` is the host the caller dialled first. `replicas` is the
/// optional comma-separated tail (`grpc://a,b,c`). Both are kept as
/// the raw connection-string strings so the merge rule can match
/// them against advertised endpoint addresses cheaply.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UriSeed {
    /// Address the caller dialled first, exactly as written in the URI.
    pub primary: String,
    /// Remaining addresses from the URI; empty for a single-host seed.
    pub replicas: Vec<String>,
}
73
74impl UriSeed {
75 /// Single-host seed (no replicas listed in the URI).
76 pub fn single(primary: impl Into<String>) -> Self {
77 Self {
78 primary: primary.into(),
79 replicas: Vec::new(),
80 }
81 }
82
83 /// Multi-host seed straight from a parsed `grpc://a,b,c` URI.
84 pub fn cluster(primary: impl Into<String>, replicas: Vec<String>) -> Self {
85 Self {
86 primary: primary.into(),
87 replicas,
88 }
89 }
90}
91
/// The merged, route-ready view of the cluster.
///
/// The fields are the wire-canonical types from `reddb-wire` so a
/// future router can read them without translating shapes again.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClusterMembership {
    /// Primary endpoint taken from the advertised topology.
    pub primary: WireEndpoint,
    /// Advertised replicas with their server-side metadata.
    pub replicas: Vec<ReplicaInfo>,
    /// Epoch carried by the advertisement this view was built from.
    pub epoch: u64,
}
102
/// Decode + merge errors.
///
/// The unknown-version and malformed-envelope variants are
/// recoverable: the caller is expected to log a warning and fall
/// back to URI-only routing (ADR 0008 §4). The structural variants
/// (truncated, bad UTF-8, oversize strings) indicate a genuinely
/// broken peer. [`ConsumeError::is_recoverable`] encodes this split.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsumeError {
    /// Buffer shorter than the 5-byte version+length header.
    Truncated,
    /// Header declared more body bytes than the buffer carries.
    BodyLengthMismatch { declared: u32, available: usize },
    /// A length-prefixed string field was not valid UTF-8.
    InvalidUtf8,
    /// A length-prefixed string declared more bytes than the body
    /// has remaining.
    StringTooLong { declared: u32, remaining: usize },
    /// Recognised header but the version tag is past
    /// [`wire::MAX_KNOWN_TOPOLOGY_VERSION`]. Recoverable: drop the
    /// advertisement, fall back to URI-only routing.
    UnknownVersion,
    /// HelloAck `topology` field was not valid base64. Recoverable
    /// in the same way as [`Self::UnknownVersion`].
    MalformedEnvelope,
}
127
128impl std::fmt::Display for ConsumeError {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 match self {
131 Self::Truncated => write!(f, "topology blob truncated"),
132 Self::BodyLengthMismatch {
133 declared,
134 available,
135 } => write!(
136 f,
137 "topology body length mismatch (declared {declared}, available {available})"
138 ),
139 Self::InvalidUtf8 => write!(f, "topology string field is not valid UTF-8"),
140 Self::StringTooLong {
141 declared,
142 remaining,
143 } => write!(
144 f,
145 "topology string length {declared} exceeds remaining body {remaining}"
146 ),
147 Self::UnknownVersion => write!(
148 f,
149 "topology wire version tag past MAX_KNOWN_TOPOLOGY_VERSION; falling back to URI-only routing"
150 ),
151 Self::MalformedEnvelope => write!(
152 f,
153 "topology envelope (HelloAck base64) is malformed; falling back to URI-only routing"
154 ),
155 }
156 }
157}
158
// Marker impl: no methods are overridden, so `ConsumeError` reports no
// underlying source error.
impl std::error::Error for ConsumeError {}
160
161impl ConsumeError {
162 /// True when the caller should downgrade to URI-only routing
163 /// with a one-line warning (ADR 0008 §4) rather than treat the
164 /// error as a hard failure.
165 pub fn is_recoverable(&self) -> bool {
166 matches!(self, Self::UnknownVersion | Self::MalformedEnvelope)
167 }
168}
169
170impl From<wire::TopologyError> for ConsumeError {
171 fn from(e: wire::TopologyError) -> Self {
172 match e {
173 wire::TopologyError::Truncated => Self::Truncated,
174 wire::TopologyError::BodyLengthMismatch {
175 declared,
176 available,
177 } => Self::BodyLengthMismatch {
178 declared,
179 available,
180 },
181 wire::TopologyError::InvalidUtf8 => Self::InvalidUtf8,
182 wire::TopologyError::StringTooLong {
183 declared,
184 remaining,
185 } => Self::StringTooLong {
186 declared,
187 remaining,
188 },
189 }
190 }
191}
192
/// Stateless deep module — entry points are associated functions so
/// the routing driver can hold a `&Self` without capturing any state
/// the consumer doesn't actually need to keep across calls. State
/// (current epoch, last refresh) lives on the driver, not here.
///
/// The type is a unit struct: it has no fields and exists only as a
/// namespace for the decode/merge/refresh-decision functions.
#[derive(Debug, Default)]
pub struct TopologyConsumer;
199
200impl TopologyConsumer {
201 /// Apply the merge rule from ADR 0008 §2 against an already-
202 /// decoded payload. Pure, infallible, no I/O.
203 pub fn consume(payload: Topology, uri_seed: Option<UriSeed>) -> ClusterMembership {
204 let Topology {
205 epoch,
206 primary,
207 replicas,
208 } = payload;
209
210 // Merge: advertised replicas win on metadata; URI-only
211 // replicas are dropped (decommissioned). Membership we emit
212 // is exactly the advertised set, in advertised order — the
213 // seed only acted as a hint for the *initial* dial.
214 //
215 // We still walk `uri_seed` to keep the merge contract
216 // explicit: if a future variant of this rule wants to
217 // surface "URI replica X was dropped because it isn't in
218 // the advertisement", this is the spot. Today the merge is
219 // a no-op on the seed and we just keep the advertised list.
220 let _ = uri_seed;
221
222 ClusterMembership {
223 primary,
224 replicas,
225 epoch,
226 }
227 }
228
229 /// Decode raw canonical bytes (gRPC `TopologyReply.topology_bytes`)
230 /// and apply the merge.
231 ///
232 /// Recoverable variants (`UnknownVersion`) are surfaced as errors
233 /// for the caller to log; the caller is expected to fall back to
234 /// URI-only routing.
235 pub fn consume_bytes(
236 bytes: &[u8],
237 uri_seed: Option<UriSeed>,
238 ) -> Result<ClusterMembership, ConsumeError> {
239 match decode_topology(bytes)? {
240 Some(t) => Ok(Self::consume(t, uri_seed)),
241 None => Err(ConsumeError::UnknownVersion),
242 }
243 }
244
245 /// Decode the base64-wrapped HelloAck `topology` field and apply
246 /// the merge. Mirrors the gRPC path so a router can drive both
247 /// transports through one code path.
248 pub fn consume_hello_ack(
249 field: &str,
250 uri_seed: Option<UriSeed>,
251 ) -> Result<ClusterMembership, ConsumeError> {
252 // `decode_topology_from_hello_ack` collapses both "bad
253 // base64" and "unknown version tag" into `Ok(None)`. We
254 // can't tell them apart from here, so the recoverable
255 // variant we surface is the union — `MalformedEnvelope` —
256 // when the base64 layer rejected the input. To distinguish,
257 // we first try the base64 decode ourselves: if it succeeds
258 // and `decode_topology` reports unknown version, surface
259 // `UnknownVersion`; if base64 itself failed, surface
260 // `MalformedEnvelope`.
261 match decode_base64(field) {
262 None => Err(ConsumeError::MalformedEnvelope),
263 Some(bytes) => Self::consume_bytes(&bytes, uri_seed),
264 }
265 // (We deliberately don't call `decode_topology_from_hello_ack`
266 // here even though it exists — splitting the two stages lets
267 // us surface a precise recoverable variant.)
268 }
269
270 /// Refresh decision: skip when the observed epoch matches the
271 /// epoch we already applied. Strictly-greater is the canonical
272 /// "advance" condition; a lower observed epoch is treated as
273 /// stale and *also* skipped (the refresh loop will see the next
274 /// poll's payload).
275 ///
276 /// The refresh loop calls this with the raw observed epoch from
277 /// the just-decoded payload. Connection-level errors are out of
278 /// scope here — they force a refresh through a different
279 /// codepath ([`RefreshScheduler::force_now`]).
280 pub fn should_refresh(current_epoch: u64, observed_epoch: u64) -> bool {
281 observed_epoch > current_epoch
282 }
283}
284
285// --------------------------------------------------------------
286// Refresh scheduling — pluggable clock so the 30s interval is
287// testable without real waits.
288// --------------------------------------------------------------
289
/// Time source the [`RefreshScheduler`] reads from.
///
/// The production impl is [`SystemClock`], which reports time elapsed
/// since it was constructed; tests inject a monotonic-counter fake.
/// `Debug` is a supertrait so a type holding a boxed clock can still
/// derive `Debug`.
pub trait Clock: std::fmt::Debug {
    /// Current reading in milliseconds. The scheduler only compares
    /// differences between readings, so the origin is up to the impl.
    fn now_monotonic_ms(&self) -> u64;
}
296
/// Default real-time clock. Hides the `Instant` epoch so the trait
/// stays `dyn`-friendly.
#[derive(Debug)]
pub struct SystemClock {
    /// Instant captured at construction; readings are measured from it.
    origin: std::time::Instant,
}
303
304impl Default for SystemClock {
305 fn default() -> Self {
306 Self {
307 origin: std::time::Instant::now(),
308 }
309 }
310}
311
312impl Clock for SystemClock {
313 fn now_monotonic_ms(&self) -> u64 {
314 self.origin.elapsed().as_millis() as u64
315 }
316}
317
/// Drives the periodic-refresh + force-on-error rule.
///
/// Owns the "should I refresh now?" decision; the actual RPC dispatch
/// is the higher-level driver's job. Keeping this state machine
/// isolated lets the 30s interval get tested without sleeping.
#[derive(Debug)]
pub struct RefreshScheduler {
    /// Minimum time between timer-driven refreshes.
    interval: Duration,
    /// Time source; boxed so tests can inject a fake.
    clock: Box<dyn Clock + Send + Sync>,
    /// Clock reading stamped by [`Self::mark_refreshed`]; `None` until
    /// the first refresh has been marked.
    last_refresh_ms: Option<u64>,
    /// Force flag set by [`Self::force_now`]; cleared the next time
    /// [`Self::should_refresh_now`] returns true.
    force: bool,
}
332
333impl RefreshScheduler {
334 /// Build a scheduler with the default 30s interval and the real
335 /// system clock.
336 pub fn new() -> Self {
337 Self::with_interval(DEFAULT_REFRESH_INTERVAL)
338 }
339
340 /// Build a scheduler with a custom interval and the real system
341 /// clock.
342 pub fn with_interval(interval: Duration) -> Self {
343 Self::with_interval_and_clock(interval, Box::new(SystemClock::default()))
344 }
345
346 /// Build a scheduler with a custom interval *and* clock — the
347 /// hook tests inject a fake clock through.
348 pub fn with_interval_and_clock(
349 interval: Duration,
350 clock: Box<dyn Clock + Send + Sync>,
351 ) -> Self {
352 Self {
353 interval,
354 clock,
355 last_refresh_ms: None,
356 force: false,
357 }
358 }
359
360 /// Poll-loop hook: call once per loop iteration (or before each
361 /// dispatch). Returns true when a refresh is due.
362 ///
363 /// On `true`, the caller should dispatch the [`Topology`] RPC,
364 /// run [`TopologyConsumer::consume_bytes`], and call
365 /// [`Self::mark_refreshed`] with the resulting epoch.
366 pub fn should_refresh_now(&mut self) -> bool {
367 if self.force {
368 self.force = false;
369 return true;
370 }
371 let now = self.clock.now_monotonic_ms();
372 let interval_ms = self.interval.as_millis() as u64;
373 match self.last_refresh_ms {
374 None => true,
375 Some(last) => now.saturating_sub(last) >= interval_ms,
376 }
377 }
378
379 /// Stamp the last successful refresh. The next
380 /// [`Self::should_refresh_now`] returns true once
381 /// `interval` has elapsed.
382 pub fn mark_refreshed(&mut self) {
383 self.last_refresh_ms = Some(self.clock.now_monotonic_ms());
384 }
385
386 /// Set the force flag — the next call to
387 /// [`Self::should_refresh_now`] returns true regardless of
388 /// the timer. Used by the routing driver after a connection-
389 /// level error.
390 pub fn force_now(&mut self) {
391 self.force = true;
392 }
393}
394
/// `Default` delegates to [`RefreshScheduler::new`].
impl Default for RefreshScheduler {
    fn default() -> Self {
        Self::new()
    }
}
400
401// --------------------------------------------------------------
402// Internal: minimal base64 decoder reused so we can split the
403// "bad base64" vs "bad version" recoverable error variants.
404// Mirrors the alphabet used by the wire encoder. Kept private —
405// the wire crate exposes its own; this is a paste-equivalent so
406// we don't widen `reddb-wire`'s public surface for one branch.
407// --------------------------------------------------------------
408
/// Decode standard-alphabet base64 (`A-Z`, `a-z`, `0-9`, `+`, `/`).
///
/// Trailing `=` padding is optional: up to two pad characters are
/// stripped before decoding and unpadded input is accepted as well.
///
/// Returns `None` when the input cannot be base64:
///
/// * any character outside the alphabet (including an interior `=`
///   or whitespace),
/// * more than two trailing `=`,
/// * a data length of `1 (mod 4)` — a lone trailing sextet carries
///   only six bits and cannot encode a byte, so the input was cut
///   short.
///
/// `consume_hello_ack` maps `None` to the recoverable
/// `ConsumeError::MalformedEnvelope`, so invalid base64 has to be
/// rejected here rather than decoded into a shortened byte string.
fn decode_base64(input: &str) -> Option<Vec<u8>> {
    /// Value of one base64 character, or `None` if it is not in the
    /// alphabet.
    fn sextet(ch: u8) -> Option<u32> {
        let v = match ch {
            b'A'..=b'Z' => ch - b'A',
            b'a'..=b'z' => ch - b'a' + 26,
            b'0'..=b'9' => ch - b'0' + 52,
            b'+' => 62,
            b'/' => 63,
            _ => return None,
        };
        Some(u32::from(v))
    }

    let data = input.trim_end_matches('=');
    let pad_len = input.len() - data.len();
    if pad_len > 2 || data.len() % 4 == 1 {
        return None;
    }

    let mut out = Vec::with_capacity(data.len() * 3 / 4);
    // `buf` accumulates sextets; `bits` counts how many low bits of
    // `buf` are still waiting to be emitted. Older bits drift off the
    // top of the u32 on shift, which is harmless because only the low
    // `bits` bits are ever read.
    let mut buf = 0u32;
    let mut bits = 0u8;
    for ch in data.bytes() {
        buf = (buf << 6) | sextet(ch)?;
        bits += 6;
        if bits >= 8 {
            bits -= 8;
            out.push(((buf >> bits) & 0xFF) as u8);
        }
    }
    Some(out)
}
432
#[cfg(test)]
mod tests {
    use super::*;
    use reddb_wire::topology::{
        encode_topology, encode_topology_for_hello_ack, Endpoint as WireEndpoint, ReplicaInfo,
        Topology, TOPOLOGY_HEADER_SIZE, TOPOLOGY_WIRE_VERSION_V1,
    };
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    const PRIMARY_ADDR: &str = "primary.example.com:5050";
    const REPLICA_A_ADDR: &str = "replica-a.example.com:5050";
    const REPLICA_B_ADDR: &str = "replica-b.example.com:5050";
    const STALE_ADDR: &str = "replica-stale.example.com:5050";

    /// Canonical advertisement used across the suite: epoch 7, one
    /// primary, one healthy and one unhealthy replica.
    fn sample_topology() -> Topology {
        let primary = WireEndpoint {
            addr: PRIMARY_ADDR.into(),
            region: "us-east-1".into(),
        };
        let healthy_replica = ReplicaInfo {
            addr: REPLICA_A_ADDR.into(),
            region: "us-east-1".into(),
            healthy: true,
            lag_ms: 12,
            last_applied_lsn: 4242,
        };
        let lagging_replica = ReplicaInfo {
            addr: REPLICA_B_ADDR.into(),
            region: "us-west-2".into(),
            healthy: false,
            lag_ms: 999,
            last_applied_lsn: 4100,
        };
        Topology {
            epoch: 7,
            primary,
            replicas: vec![healthy_replica, lagging_replica],
        }
    }

    /// Shared assertions for the two round-trip tests.
    fn assert_round_trip(membership: &ClusterMembership, expected: &Topology) {
        assert_eq!(membership.epoch, 7);
        assert_eq!(membership.primary, expected.primary);
        assert_eq!(membership.replicas, expected.replicas);
    }

    /// Wrap `body` in a v1 header (version tag + little-endian length).
    fn frame_v1(body: &[u8]) -> Vec<u8> {
        let mut framed = vec![TOPOLOGY_WIRE_VERSION_V1];
        framed.extend_from_slice(&(body.len() as u32).to_le_bytes());
        framed.extend_from_slice(body);
        framed
    }

    // ---- round-trip on both transports ----

    #[test]
    fn parse_round_trip_grpc_bytes() {
        let expected = sample_topology();
        let wire_bytes = encode_topology(&expected);
        let membership = TopologyConsumer::consume_bytes(&wire_bytes, None).expect("consume");
        assert_round_trip(&membership, &expected);
    }

    #[test]
    fn parse_round_trip_hello_ack_field() {
        let expected = sample_topology();
        let envelope = encode_topology_for_hello_ack(&expected);
        let membership = TopologyConsumer::consume_hello_ack(&envelope, None).expect("consume");
        assert_round_trip(&membership, &expected);
    }

    #[test]
    fn fixture_byte_stable_across_runs() {
        // Acceptance: the canonical encoder is deterministic for the
        // same fixture (#166 §4 pinned this; here we assert the
        // consumer side doesn't perturb it).
        let topology = sample_topology();
        let first = encode_topology(&topology);
        let second = encode_topology(&topology);
        assert_eq!(first, second);
        // The bytes inside the HelloAck base64 wrapper must equal the
        // gRPC bytes exactly.
        let envelope = encode_topology_for_hello_ack(&topology);
        let unwrapped = decode_base64(&envelope).expect("base64");
        assert_eq!(unwrapped, first);
    }

    // ---- merge rule ----

    #[test]
    fn merge_uri_only_replicas_dropped() {
        // The URI names three replicas but only two are advertised.
        // The URI-only one was decommissioned and must not survive.
        let seed = UriSeed::cluster(
            PRIMARY_ADDR.to_string(),
            vec![
                REPLICA_A_ADDR.into(),
                REPLICA_B_ADDR.into(),
                STALE_ADDR.into(),
            ],
        );
        let membership = TopologyConsumer::consume(sample_topology(), Some(seed));
        assert_eq!(
            membership.replicas.len(),
            2,
            "URI-only replica must be dropped"
        );
        assert!(
            membership.replicas.iter().all(|r| r.addr != STALE_ADDR),
            "stale URI replica must not appear in membership"
        );
    }

    #[test]
    fn merge_advertised_only_replicas_added() {
        // The URI names no replicas; the advertisement carries two.
        // Both must appear — the URI is a hint, not a constraint.
        let advertised = sample_topology();
        let seed = UriSeed::single(PRIMARY_ADDR);
        let membership = TopologyConsumer::consume(advertised.clone(), Some(seed));
        assert_eq!(membership.replicas.len(), 2);
        assert_eq!(membership.replicas, advertised.replicas);
    }

    #[test]
    fn merge_intersection_uses_advertised_metadata() {
        // A replica present in both lists must carry the advertised
        // metadata (region, healthy, lag_ms, last_applied_lsn), not
        // anything synthesised from the URI.
        let seed = UriSeed::cluster(PRIMARY_ADDR.to_string(), vec![REPLICA_A_ADDR.into()]);
        let membership = TopologyConsumer::consume(sample_topology(), Some(seed));
        let replica_a = membership
            .replicas
            .iter()
            .find(|r| r.addr == REPLICA_A_ADDR)
            .expect("advertised replica must be present");
        assert_eq!(replica_a.region, "us-east-1");
        assert!(replica_a.healthy);
        assert_eq!(replica_a.lag_ms, 12);
        assert_eq!(replica_a.last_applied_lsn, 4242);
    }

    #[test]
    fn merge_with_no_seed_keeps_full_advertisement() {
        let advertised = sample_topology();
        let membership = TopologyConsumer::consume(advertised.clone(), None);
        assert_eq!(membership.primary, advertised.primary);
        assert_eq!(membership.replicas, advertised.replicas);
        assert_eq!(membership.epoch, advertised.epoch);
    }

    // ---- refresh decision ----

    #[test]
    fn should_refresh_skips_same_epoch() {
        assert!(!TopologyConsumer::should_refresh(7, 7));
    }

    #[test]
    fn should_refresh_advances_on_higher_epoch() {
        assert!(TopologyConsumer::should_refresh(7, 8));
    }

    #[test]
    fn should_refresh_treats_lower_epoch_as_stale() {
        // The peer is behind us; skip and let the next poll catch up.
        assert!(!TopologyConsumer::should_refresh(7, 6));
    }

    // ---- malformed / adversarial fixtures ----

    #[test]
    fn malformed_truncated_blob_returns_typed_error() {
        let short = [0x01, 0x00];
        let err = TopologyConsumer::consume_bytes(&short, None).unwrap_err();
        assert!(matches!(err, ConsumeError::Truncated));
        assert!(!err.is_recoverable());
    }

    #[test]
    fn malformed_body_length_mismatch_returns_typed_error() {
        let blob = [0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x00];
        let err = TopologyConsumer::consume_bytes(&blob, None).unwrap_err();
        assert!(matches!(err, ConsumeError::BodyLengthMismatch { .. }));
        assert!(!err.is_recoverable());
    }

    #[test]
    fn unknown_version_tag_is_recoverable() {
        // ADR 0008 §4 forward-compat: an unknown version tag means
        // "fall back to URI-only routing", reported as a recoverable
        // typed error so the caller can log before downgrading.
        let mut blob = encode_topology(&sample_topology());
        blob[0] = 0xFE; // past MAX_KNOWN_TOPOLOGY_VERSION
        let err = TopologyConsumer::consume_bytes(&blob, None).unwrap_err();
        assert!(matches!(err, ConsumeError::UnknownVersion));
        assert!(err.is_recoverable());
    }

    #[test]
    fn malformed_envelope_base64_is_recoverable() {
        // Bad base64 in the HelloAck `topology` field gets the same
        // treatment as an unknown version: drop, fall back, no panic.
        let err = TopologyConsumer::consume_hello_ack("@not base64@", None).unwrap_err();
        assert!(matches!(err, ConsumeError::MalformedEnvelope));
        assert!(err.is_recoverable());
    }

    #[test]
    fn oversize_string_field_returns_typed_error() {
        // Adversarial: primary.addr claims 0xFFFF_FFFF bytes while the
        // body ends right after the length prefix. The decoder must
        // answer with a typed error, not a panic.
        let mut body = Vec::new();
        body.extend_from_slice(&0u64.to_le_bytes()); // epoch
        body.extend_from_slice(&u32::MAX.to_le_bytes()); // primary.addr len
        let blob = frame_v1(&body);
        assert_eq!(blob.len(), TOPOLOGY_HEADER_SIZE + body.len());
        let err = TopologyConsumer::consume_bytes(&blob, None).unwrap_err();
        assert!(
            matches!(err, ConsumeError::StringTooLong { .. }),
            "expected StringTooLong, got {err:?}"
        );
        assert!(!err.is_recoverable());
    }

    #[test]
    fn invalid_utf8_string_returns_typed_error() {
        // primary.addr is two bytes of invalid UTF-8 (0xFF 0xFE). A
        // real body would continue, but the decoder trips on the
        // string first.
        let mut body = Vec::new();
        body.extend_from_slice(&0u64.to_le_bytes()); // epoch
        body.extend_from_slice(&2u32.to_le_bytes()); // primary.addr len
        body.extend_from_slice(&[0xFF, 0xFE]); // bogus utf8
        let blob = frame_v1(&body);
        let err = TopologyConsumer::consume_bytes(&blob, None).unwrap_err();
        assert!(
            matches!(err, ConsumeError::InvalidUtf8),
            "expected InvalidUtf8, got {err:?}"
        );
    }

    #[test]
    fn consume_does_not_panic_on_any_random_short_buffer() {
        // Smoke fuzz: every short buffer must yield Ok or a typed
        // Err — never a panic.
        for len in 0..10usize {
            let junk = vec![0xAAu8; len];
            let _ = TopologyConsumer::consume_bytes(&junk, None);
        }
    }

    // ---- fake-clock RefreshScheduler ----

    /// Manually advanced clock; starts at 0 ms.
    #[derive(Debug, Default)]
    struct FakeClock {
        now_ms: AtomicU64,
    }

    impl FakeClock {
        fn new() -> Self {
            Self::default()
        }

        fn advance(&self, by: Duration) {
            self.now_ms
                .fetch_add(by.as_millis() as u64, Ordering::SeqCst);
        }
    }

    impl Clock for FakeClock {
        fn now_monotonic_ms(&self) -> u64 {
            self.now_ms.load(Ordering::SeqCst)
        }
    }

    /// Forwarding wrapper: the scheduler owns a `Box<dyn Clock>`, so
    /// the test keeps its own `Arc` handle to advance time.
    #[derive(Debug)]
    struct SharedClock(Arc<FakeClock>);

    impl Clock for SharedClock {
        fn now_monotonic_ms(&self) -> u64 {
            self.0.now_monotonic_ms()
        }
    }

    fn scheduler_with(clock: Arc<FakeClock>, interval: Duration) -> RefreshScheduler {
        RefreshScheduler::with_interval_and_clock(interval, Box::new(SharedClock(clock)))
    }

    #[test]
    fn fake_clock_first_call_refreshes_immediately() {
        let clock = Arc::new(FakeClock::new());
        let mut scheduler = scheduler_with(Arc::clone(&clock), Duration::from_secs(30));
        assert!(scheduler.should_refresh_now(), "first call must refresh");
    }

    #[test]
    fn fake_clock_thirty_second_interval_holds_without_real_waits() {
        let clock = Arc::new(FakeClock::new());
        let mut scheduler = scheduler_with(Arc::clone(&clock), Duration::from_secs(30));
        assert!(scheduler.should_refresh_now());
        scheduler.mark_refreshed();
        // 29.999s in: still inside the interval.
        clock.advance(Duration::from_millis(29_999));
        assert!(
            !scheduler.should_refresh_now(),
            "must not refresh before interval elapsed"
        );
        // 30.001s in: threshold crossed.
        clock.advance(Duration::from_millis(2));
        assert!(
            scheduler.should_refresh_now(),
            "must refresh once interval has elapsed"
        );
    }

    #[test]
    fn fake_clock_force_now_overrides_interval() {
        let clock = Arc::new(FakeClock::new());
        let mut scheduler = scheduler_with(Arc::clone(&clock), Duration::from_secs(30));
        assert!(scheduler.should_refresh_now());
        scheduler.mark_refreshed();
        // Well inside the 30s interval — the timer alone says skip.
        clock.advance(Duration::from_millis(100));
        assert!(!scheduler.should_refresh_now());
        // Connection-level error: the next call must refresh.
        scheduler.force_now();
        assert!(
            scheduler.should_refresh_now(),
            "force_now must override the timer"
        );
        // The flag is single-shot: afterwards the (unelapsed) timer
        // decides again.
        scheduler.mark_refreshed();
        clock.advance(Duration::from_millis(100));
        assert!(!scheduler.should_refresh_now());
    }

    #[test]
    fn default_interval_is_thirty_seconds() {
        // Sentinel against an accidental rebase that changes the
        // documented 30s default.
        assert_eq!(DEFAULT_REFRESH_INTERVAL, Duration::from_secs(30));
    }
}