Skip to main content

reddb_wire/
topology.rs

1//! Canonical `Topology` payload — shared by both transports.
2//!
3//! ADR 0008 (`.red/adr/0008-topology-advertisement-security.md`)
4//! settled the security model and the schema-evolution rule:
5//!
6//!   * New optional fields land under a versioned envelope.
7//!   * Old parsers ignore unknown fields cleanly. Unknown version
8//!     tags are dropped (no panic), the consumer falls back to
9//!     URI-only routing.
10//!   * Schema-version bumps are reserved for changes a naive
11//!     optional field cannot express (a removed field, a renegotiated
12//!     meaning, a framing change). They are not the default move.
13//!
14//! Wire encoding (single shape across RedWire HelloAck + gRPC
15//! `Topology` RPC):
16//!
17//! ```text
18//! ┌────────────────────────────────────────────────────────────┐
19//! │ u8   version_tag        currently 0x01                     │
20//! │ u32  body_length (LE)   bytes that follow                  │
21//! │ ... body ...            version-specific payload encoding  │
22//! └────────────────────────────────────────────────────────────┘
23//! ```
24//!
25//! The header (tag + length) is fixed across versions so a parser
26//! that does not recognise the tag can still skip the entire blob
27//! cleanly.
28//!
29//! Body for `0x01`: a flat little-endian struct dump where every
30//! string is `u32 len` + utf-8 bytes:
31//!
32//! ```text
33//! u64  epoch
34//! str  primary.addr
35//! str  primary.region
36//! u32  replicas.len
37//! foreach replica:
38//!   str  addr
39//!   str  region
40//!   u8   healthy   (0 / 1)
41//!   u32  lag_ms
42//!   u64  last_applied_lsn
43//! ```
44//!
45//! No serde dependency: a hex dump stays readable, no extra crate
46//! pulled into `reddb-wire`, and the format matches the rest of the
47//! RedWire codec discipline.
48
49/// Wire version tag for the initial schema.
50///
51/// Bumping this is reserved for genuinely breaking changes (removed
52/// field, renegotiated meaning, framing change). Additive evolution
53/// (new optional fields) keeps this byte stable — see ADR 0008 §4.
54pub const TOPOLOGY_WIRE_VERSION_V1: u8 = 0x01;
55
56/// Highest tag the current parser understands. A `Topology` blob
57/// stamped with anything else is ignored cleanly so an older client
58/// can keep its URI-only routing fallback.
59pub const MAX_KNOWN_TOPOLOGY_VERSION: u8 = TOPOLOGY_WIRE_VERSION_V1;
60
61/// Header size (version tag + u32 body length).
62pub const TOPOLOGY_HEADER_SIZE: usize = 1 + 4;
63
64/// Canonical topology payload — the same shape both transports
65/// carry. New optional fields go in here as `Option<…>` /
66/// `Vec<…>` and ride the existing version tag; only a genuinely
67/// breaking change earns a new tag.
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct Topology {
70    pub epoch: u64,
71    pub primary: Endpoint,
72    pub replicas: Vec<ReplicaInfo>,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct Endpoint {
77    pub addr: String,
78    pub region: String,
79}
80
81#[derive(Debug, Clone, PartialEq, Eq)]
82pub struct ReplicaInfo {
83    pub addr: String,
84    pub region: String,
85    pub healthy: bool,
86    pub lag_ms: u32,
87    pub last_applied_lsn: u64,
88    /// `true` while this replica is re-bootstrapping (loading a fresh
89    /// snapshot). Its `last_applied_lsn` describes data it is about to
90    /// discard, so a causal reader must treat it as ineligible for
91    /// bookmark reads even when the advertised frontier covers the
92    /// bookmark (issue #837). Non-causal reads are unaffected.
93    ///
94    /// Additive field (ADR 0008 §4): it rides the existing `0x01`
95    /// version tag as a trailing per-replica flag block appended after
96    /// the replica records. An old decoder stops after the records and
97    /// ignores the trailing bytes (defaulting every replica to
98    /// `false`); a new decoder reading old bytes finds no trailing
99    /// block and likewise defaults to `false`. No version bump.
100    pub rebootstrapping: bool,
101}
102
103/// Decode-side errors. Distinct from "unknown version tag", which
104/// is reported as `Ok(None)` on `decode_topology` so the consumer
105/// can fall back to URI-only routing without branching on an
106/// error variant.
107#[derive(Debug, Clone, PartialEq, Eq)]
108pub enum TopologyError {
109    Truncated,
110    BodyLengthMismatch { declared: u32, available: usize },
111    InvalidUtf8,
112    StringTooLong { declared: u32, remaining: usize },
113}
114
115impl std::fmt::Display for TopologyError {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        match self {
118            Self::Truncated => write!(f, "topology blob truncated (< 5-byte header)"),
119            Self::BodyLengthMismatch {
120                declared,
121                available,
122            } => write!(
123                f,
124                "topology body length mismatch: declared {declared}, available {available}"
125            ),
126            Self::InvalidUtf8 => write!(f, "topology string field is not valid UTF-8"),
127            Self::StringTooLong {
128                declared,
129                remaining,
130            } => write!(
131                f,
132                "topology string length {declared} exceeds remaining body bytes {remaining}"
133            ),
134        }
135    }
136}
137
138impl std::error::Error for TopologyError {}
139
140/// Encode `topology` to the canonical version-tagged byte string.
141/// Same bytes consumed by both RedWire HelloAck (after base64
142/// embedding in the JSON envelope) and the gRPC `TopologyReply`
143/// (carried directly as a `bytes` field).
144pub fn encode_topology(topology: &Topology) -> Vec<u8> {
145    let mut body = Vec::with_capacity(estimate_body_size(topology));
146    body.extend_from_slice(&topology.epoch.to_le_bytes());
147    write_str(&mut body, &topology.primary.addr);
148    write_str(&mut body, &topology.primary.region);
149    body.extend_from_slice(&(topology.replicas.len() as u32).to_le_bytes());
150    for r in &topology.replicas {
151        write_str(&mut body, &r.addr);
152        write_str(&mut body, &r.region);
153        body.push(if r.healthy { 1 } else { 0 });
154        body.extend_from_slice(&r.lag_ms.to_le_bytes());
155        body.extend_from_slice(&r.last_applied_lsn.to_le_bytes());
156    }
157    // Additive trailing block (ADR 0008 §4): one `rebootstrapping`
158    // byte per replica, in advertised order, appended after the
159    // records. Placed at the tail rather than interleaved into each
160    // record so an old decoder — which stops after `replicas.len`
161    // records — skips it cleanly instead of mis-framing every field.
162    for r in &topology.replicas {
163        body.push(if r.rebootstrapping { 1 } else { 0 });
164    }
165
166    let mut out = Vec::with_capacity(TOPOLOGY_HEADER_SIZE + body.len());
167    out.push(TOPOLOGY_WIRE_VERSION_V1);
168    out.extend_from_slice(&(body.len() as u32).to_le_bytes());
169    out.extend_from_slice(&body);
170    out
171}
172
173/// Decode a topology blob.
174///
175/// Returns:
176/// * `Ok(Some(Topology))` — recognised version tag, body parsed.
177/// * `Ok(None)` — unknown version tag. The consumer is expected to
178///   fall back to URI-only routing rather than treat this as an
179///   error (ADR 0008 §4: unknown fields are dropped, not rejected).
180/// * `Err(TopologyError)` — recognised tag, but the body was
181///   structurally malformed (truncated, invalid UTF-8, …).
182pub fn decode_topology(bytes: &[u8]) -> Result<Option<Topology>, TopologyError> {
183    if bytes.len() < TOPOLOGY_HEADER_SIZE {
184        return Err(TopologyError::Truncated);
185    }
186    let version = bytes[0];
187    let declared_len = u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
188    let body = &bytes[TOPOLOGY_HEADER_SIZE..];
189    if (body.len() as u64) < declared_len as u64 {
190        return Err(TopologyError::BodyLengthMismatch {
191            declared: declared_len,
192            available: body.len(),
193        });
194    }
195    let body = &body[..declared_len as usize];
196
197    if version > MAX_KNOWN_TOPOLOGY_VERSION {
198        // Forward-compat: unknown version tag, drop cleanly.
199        return Ok(None);
200    }
201
202    // version == 0x01
203    let mut cur = Cursor::new(body);
204    let epoch = cur.read_u64()?;
205    let primary_addr = cur.read_str()?;
206    let primary_region = cur.read_str()?;
207    let replica_count = cur.read_u32()? as usize;
208    let mut replicas = Vec::with_capacity(replica_count);
209    for _ in 0..replica_count {
210        let addr = cur.read_str()?;
211        let region = cur.read_str()?;
212        let healthy = cur.read_u8()? != 0;
213        let lag_ms = cur.read_u32()?;
214        let last_applied_lsn = cur.read_u64()?;
215        replicas.push(ReplicaInfo {
216            addr,
217            region,
218            healthy,
219            lag_ms,
220            last_applied_lsn,
221            // Default until the trailing block is read below. A
222            // pre-#837 advertisement carries no trailing block, so
223            // every replica stays `false`.
224            rebootstrapping: false,
225        });
226    }
227    // Additive trailing block: a `rebootstrapping` byte per replica.
228    // Present only when the producer is post-#837. When the producer
229    // is older the cursor has no bytes left here and every flag keeps
230    // its `false` default. A partial/truncated block (fewer bytes than
231    // replicas) leaves the unread tail `false` rather than erroring —
232    // forward-compat posture per ADR 0008 §4.
233    for r in replicas.iter_mut() {
234        if cur.remaining() == 0 {
235            break;
236        }
237        r.rebootstrapping = cur.read_u8()? != 0;
238    }
239    Ok(Some(Topology {
240        epoch,
241        primary: Endpoint {
242            addr: primary_addr,
243            region: primary_region,
244        },
245        replicas,
246    }))
247}
248
249fn estimate_body_size(t: &Topology) -> usize {
250    let endpoint = |e: &Endpoint| 4 + e.addr.len() + 4 + e.region.len();
251    let mut n = 8 + endpoint(&t.primary) + 4;
252    for r in &t.replicas {
253        n += 4 + r.addr.len() + 4 + r.region.len() + 1 + 4 + 8;
254    }
255    // Trailing `rebootstrapping` flag block: one byte per replica.
256    n += t.replicas.len();
257    n
258}
259
260fn write_str(buf: &mut Vec<u8>, s: &str) {
261    buf.extend_from_slice(&(s.len() as u32).to_le_bytes());
262    buf.extend_from_slice(s.as_bytes());
263}
264
265struct Cursor<'a> {
266    buf: &'a [u8],
267    pos: usize,
268}
269
270impl<'a> Cursor<'a> {
271    fn new(buf: &'a [u8]) -> Self {
272        Self { buf, pos: 0 }
273    }
274
275    fn remaining(&self) -> usize {
276        self.buf.len() - self.pos
277    }
278
279    fn read_u8(&mut self) -> Result<u8, TopologyError> {
280        if self.remaining() < 1 {
281            return Err(TopologyError::Truncated);
282        }
283        let v = self.buf[self.pos];
284        self.pos += 1;
285        Ok(v)
286    }
287
288    fn read_u32(&mut self) -> Result<u32, TopologyError> {
289        if self.remaining() < 4 {
290            return Err(TopologyError::Truncated);
291        }
292        let bytes = &self.buf[self.pos..self.pos + 4];
293        self.pos += 4;
294        Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
295    }
296
297    fn read_u64(&mut self) -> Result<u64, TopologyError> {
298        if self.remaining() < 8 {
299            return Err(TopologyError::Truncated);
300        }
301        let bytes = &self.buf[self.pos..self.pos + 8];
302        self.pos += 8;
303        Ok(u64::from_le_bytes([
304            bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
305        ]))
306    }
307
308    fn read_str(&mut self) -> Result<String, TopologyError> {
309        let len = self.read_u32()?;
310        if (len as usize) > self.remaining() {
311            return Err(TopologyError::StringTooLong {
312                declared: len,
313                remaining: self.remaining(),
314            });
315        }
316        let bytes = &self.buf[self.pos..self.pos + len as usize];
317        self.pos += len as usize;
318        let s = std::str::from_utf8(bytes)
319            .map_err(|_| TopologyError::InvalidUtf8)?
320            .to_string();
321        Ok(s)
322    }
323}
324
325// ---------------------------------------------------------------
326// HelloAck embedding.
327//
328// The HelloAck payload is a JSON object. To carry the binary
329// topology blob inside it without breaking the existing JSON-only
330// parser (which deserialises the whole payload with `serde_json`),
331// the canonical bytes are base64-encoded under a new `topology`
332// key. Old parsers ignore unknown keys cleanly; new parsers extract
333// the field and run `decode_topology` on the decoded bytes.
334// ---------------------------------------------------------------
335
336/// Base64-encode the canonical topology bytes for embedding inside
337/// a HelloAck JSON payload as a string field.
338///
339/// The caller (server-side HelloAck builder) is expected to insert
340/// the resulting string under the JSON key `"topology"`; the client
341/// extracts that key, base64-decodes it, and runs `decode_topology`.
342pub fn encode_topology_for_hello_ack(topology: &Topology) -> String {
343    base64_encode(&encode_topology(topology))
344}
345
346/// Decode the base64 string carried in HelloAck JSON `topology`
347/// field back into a `Topology`.
348///
349/// Mirrors `decode_topology`'s three-state contract:
350/// * `Ok(Some(_))` — recognised version tag, body parsed.
351/// * `Ok(None)` — base64 decode failed *or* the version tag is
352///   unknown. Both cases collapse to "fall back to URI-only
353///   routing"; the consumer does not branch on which one it was.
354/// * `Err(_)` — recognised version tag with a malformed body.
355pub fn decode_topology_from_hello_ack(field: &str) -> Result<Option<Topology>, TopologyError> {
356    let Some(bytes) = base64_decode(field) else {
357        // Malformed base64 is treated as "unknown encoding, drop
358        // cleanly" so the client falls back to URI-only routing —
359        // same posture as an unknown version tag (ADR 0008 §4).
360        return Ok(None);
361    };
362    decode_topology(&bytes)
363}
364
365const B64_ALPHA: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
366
367fn base64_encode(input: &[u8]) -> String {
368    let mut out = String::with_capacity(input.len().div_ceil(3) * 4);
369    let chunks = input.chunks_exact(3);
370    let rem = chunks.remainder();
371    for c in chunks {
372        let n = ((c[0] as u32) << 16) | ((c[1] as u32) << 8) | (c[2] as u32);
373        out.push(B64_ALPHA[((n >> 18) & 0x3F) as usize] as char);
374        out.push(B64_ALPHA[((n >> 12) & 0x3F) as usize] as char);
375        out.push(B64_ALPHA[((n >> 6) & 0x3F) as usize] as char);
376        out.push(B64_ALPHA[(n & 0x3F) as usize] as char);
377    }
378    match rem {
379        [a] => {
380            let n = (*a as u32) << 16;
381            out.push(B64_ALPHA[((n >> 18) & 0x3F) as usize] as char);
382            out.push(B64_ALPHA[((n >> 12) & 0x3F) as usize] as char);
383            out.push('=');
384            out.push('=');
385        }
386        [a, b] => {
387            let n = ((*a as u32) << 16) | ((*b as u32) << 8);
388            out.push(B64_ALPHA[((n >> 18) & 0x3F) as usize] as char);
389            out.push(B64_ALPHA[((n >> 12) & 0x3F) as usize] as char);
390            out.push(B64_ALPHA[((n >> 6) & 0x3F) as usize] as char);
391            out.push('=');
392        }
393        _ => {}
394    }
395    out
396}
397
398fn base64_decode(input: &str) -> Option<Vec<u8>> {
399    let trimmed = input.trim_end_matches('=');
400    let mut out = Vec::with_capacity(trimmed.len() * 3 / 4);
401    let mut buf = 0u32;
402    let mut bits = 0u8;
403    for ch in trimmed.bytes() {
404        let v: u32 = match ch {
405            b'A'..=b'Z' => (ch - b'A') as u32,
406            b'a'..=b'z' => (ch - b'a' + 26) as u32,
407            b'0'..=b'9' => (ch - b'0' + 52) as u32,
408            b'+' => 62,
409            b'/' => 63,
410            _ => return None,
411        };
412        buf = (buf << 6) | v;
413        bits += 6;
414        if bits >= 8 {
415            bits -= 8;
416            out.push(((buf >> bits) & 0xFF) as u8);
417        }
418    }
419    Some(out)
420}
421
422#[cfg(test)]
423mod tests {
424    use super::*;
425
426    fn fixture() -> Topology {
427        Topology {
428            epoch: 0xDEAD_BEEF_CAFE_BABE,
429            primary: Endpoint {
430                addr: "primary.example.com:5050".into(),
431                region: "us-east-1".into(),
432            },
433            replicas: vec![
434                ReplicaInfo {
435                    addr: "replica-a.example.com:5050".into(),
436                    region: "us-east-1".into(),
437                    healthy: true,
438                    lag_ms: 12,
439                    last_applied_lsn: 4242,
440                    rebootstrapping: false,
441                },
442                ReplicaInfo {
443                    addr: "replica-b.example.com:5050".into(),
444                    region: "us-west-2".into(),
445                    healthy: false,
446                    lag_ms: 999,
447                    last_applied_lsn: 4100,
448                    rebootstrapping: true,
449                },
450            ],
451        }
452    }
453
454    #[test]
455    fn round_trip_v1() {
456        let t = fixture();
457        let bytes = encode_topology(&t);
458        let decoded = decode_topology(&bytes).expect("decode").expect("v1 known");
459        assert_eq!(decoded, t);
460    }
461
462    #[test]
463    fn empty_replicas_round_trip() {
464        let t = Topology {
465            epoch: 1,
466            primary: Endpoint {
467                addr: "p:5050".into(),
468                region: "r".into(),
469            },
470            replicas: vec![],
471        };
472        let bytes = encode_topology(&t);
473        let decoded = decode_topology(&bytes).expect("decode").expect("v1");
474        assert_eq!(decoded, t);
475    }
476
477    #[test]
478    fn unknown_version_tag_returns_none() {
479        // Forward-compat invariant from ADR 0008 §4: an unknown
480        // version tag must be ignored, not rejected. The consumer
481        // falls back to URI-only routing.
482        let mut bytes = encode_topology(&fixture());
483        bytes[0] = 0xFE; // bumped past MAX_KNOWN_TOPOLOGY_VERSION
484        let decoded = decode_topology(&bytes).expect("decode");
485        assert!(
486            decoded.is_none(),
487            "unknown version tag must drop cleanly, got {decoded:?}"
488        );
489    }
490
491    #[test]
492    fn truncated_header_errors() {
493        assert!(matches!(
494            decode_topology(&[0x01, 0x00]),
495            Err(TopologyError::Truncated)
496        ));
497    }
498
499    #[test]
500    fn body_length_mismatch_errors() {
501        // Declared body length larger than buffer.
502        let bytes = vec![0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x00];
503        assert!(matches!(
504            decode_topology(&bytes),
505            Err(TopologyError::BodyLengthMismatch { .. })
506        ));
507    }
508
509    #[test]
510    fn version_tag_is_pinned_to_0x01() {
511        // Sentinel against an accidental schema bump. ADR 0008 §4
512        // reserves the bump for genuinely breaking changes; a PR
513        // adding an optional field must NOT touch this value.
514        assert_eq!(TOPOLOGY_WIRE_VERSION_V1, 0x01);
515    }
516
517    #[test]
518    fn rebootstrapping_flag_round_trips_per_replica() {
519        // The additive #837 flag survives a full encode/decode and is
520        // carried independently per replica.
521        let t = fixture();
522        let decoded = decode_topology(&encode_topology(&t))
523            .expect("decode")
524            .expect("v1 known");
525        assert!(!decoded.replicas[0].rebootstrapping);
526        assert!(decoded.replicas[1].rebootstrapping);
527    }
528
529    #[test]
530    fn legacy_blob_without_trailing_block_defaults_rebootstrapping_false() {
531        // Backward-compat (ADR 0008 §4): a pre-#837 producer emits no
532        // trailing flag block. Synthesise that blob by encoding, then
533        // truncating the trailing per-replica flag bytes off the body
534        // and fixing the declared length. The decoder must still parse
535        // and default every replica to `false`.
536        let t = fixture();
537        let full = encode_topology(&t);
538        let trailing = t.replicas.len();
539        let body_len = (full.len() - TOPOLOGY_HEADER_SIZE - trailing) as u32;
540        let mut legacy = Vec::new();
541        legacy.push(TOPOLOGY_WIRE_VERSION_V1);
542        legacy.extend_from_slice(&body_len.to_le_bytes());
543        legacy.extend_from_slice(&full[TOPOLOGY_HEADER_SIZE..full.len() - trailing]);
544        let decoded = decode_topology(&legacy).expect("decode").expect("v1 known");
545        assert_eq!(decoded.replicas.len(), 2);
546        assert!(decoded.replicas.iter().all(|r| !r.rebootstrapping));
547        // Every other field is preserved verbatim — only the new flag
548        // is absent.
549        assert_eq!(decoded.replicas[0].last_applied_lsn, 4242);
550        assert_eq!(decoded.replicas[1].lag_ms, 999);
551    }
552
553    #[test]
554    fn hello_ack_round_trip_via_base64() {
555        // The HelloAck embedding shape: encode → base64-string →
556        // decode. Same canonical bytes both transports carry, just
557        // base64-wrapped so the JSON envelope stays valid.
558        let t = fixture();
559        let field = encode_topology_for_hello_ack(&t);
560        let decoded = decode_topology_from_hello_ack(&field)
561            .expect("decode")
562            .expect("v1 known");
563        assert_eq!(decoded, t);
564    }
565
566    #[test]
567    fn hello_ack_inner_bytes_match_grpc_bytes() {
568        // The acceptance criterion (#166 §4): same bytes consumed
569        // by both transports. Round-trip via the HelloAck base64
570        // wrapper and assert the decoded inner payload is byte-for-
571        // byte equivalent to the canonical encoding.
572        let t = fixture();
573        let canonical = encode_topology(&t);
574        let field = encode_topology_for_hello_ack(&t);
575        let recovered = base64_decode(&field).expect("base64");
576        assert_eq!(recovered, canonical);
577    }
578
579    #[test]
580    fn hello_ack_unknown_version_tag_drops_cleanly() {
581        // A HelloAck whose topology field carries a future version
582        // tag must not panic — the client falls back to URI-only
583        // routing.
584        let mut bytes = encode_topology(&fixture());
585        bytes[0] = 0x99;
586        let field = base64_encode(&bytes);
587        let decoded = decode_topology_from_hello_ack(&field).expect("decode");
588        assert!(decoded.is_none());
589    }
590
591    #[test]
592    fn hello_ack_malformed_base64_drops_cleanly() {
593        // A garbled base64 field is treated like an unknown version
594        // tag: drop, fall back to URI-only routing, never panic.
595        let decoded = decode_topology_from_hello_ack("@not base64@").expect("decode");
596        assert!(decoded.is_none());
597    }
598
599    #[test]
600    fn old_hello_ack_without_topology_field_is_backwards_compat() {
601        // Backwards-compat (#166 acceptance criterion §6): an
602        // old-style HelloAck JSON payload — no `topology` key —
603        // still parses cleanly into a Topology slot of `None` on
604        // the consumer side. We model that here by extracting the
605        // JSON value the way the client will (look for the key,
606        // run our decoder if present), and checking the absence
607        // path resolves to "no topology, fall back to URI-only".
608        let json = br#"{"version":1,"auth":"bearer","features":3,"server":"reddb/0.2.9"}"#;
609        let v: serde_json_check::Value = serde_json_check::from_slice(json).expect("valid JSON");
610        let topo_field = v.find_string("topology");
611        let topology = match topo_field {
612            None => None,
613            Some(s) => decode_topology_from_hello_ack(&s).expect("decode"),
614        };
615        assert!(
616            topology.is_none(),
617            "an old HelloAck without `topology` must produce None"
618        );
619    }
620
621    /// Tiny JSON probe used only by the backwards-compat test.
622    /// `reddb-wire` has no JSON dep — keep this scoped to test
623    /// code so we do not pull serde into the production build.
624    mod serde_json_check {
625        pub enum Value {
626            Object(Vec<(String, Value)>),
627            String(String),
628            Other,
629        }
630
631        impl Value {
632            pub fn find_string(&self, key: &str) -> Option<String> {
633                match self {
634                    Value::Object(map) => map.iter().find_map(|(k, v)| {
635                        if k == key {
636                            if let Value::String(s) = v {
637                                Some(s.clone())
638                            } else {
639                                None
640                            }
641                        } else {
642                            None
643                        }
644                    }),
645                    _ => None,
646                }
647            }
648        }
649
650        pub fn from_slice(bytes: &[u8]) -> Result<Value, &'static str> {
651            let s = std::str::from_utf8(bytes).map_err(|_| "utf8")?;
652            let mut p = Parser { src: s, pos: 0 };
653            p.skip_ws();
654            let v = p.parse_value()?;
655            Ok(v)
656        }
657
658        struct Parser<'a> {
659            src: &'a str,
660            pos: usize,
661        }
662
663        impl<'a> Parser<'a> {
664            fn rest(&self) -> &'a str {
665                &self.src[self.pos..]
666            }
667            fn bump(&mut self, n: usize) {
668                self.pos += n;
669            }
670            fn skip_ws(&mut self) {
671                while let Some(c) = self.rest().chars().next() {
672                    if c.is_whitespace() {
673                        self.bump(c.len_utf8());
674                    } else {
675                        break;
676                    }
677                }
678            }
679            fn parse_value(&mut self) -> Result<Value, &'static str> {
680                self.skip_ws();
681                let head = self.rest().chars().next().ok_or("eof")?;
682                match head {
683                    '{' => self.parse_object(),
684                    '"' => self.parse_string().map(Value::String),
685                    _ => {
686                        // Skip primitives (numbers, true/false/null,
687                        // arrays). The probe only cares about object
688                        // keys at the top level, so coarse skipping
689                        // is enough for our fixture.
690                        self.skip_until_top_level_comma_or_close();
691                        Ok(Value::Other)
692                    }
693                }
694            }
695            fn skip_until_top_level_comma_or_close(&mut self) {
696                let mut depth = 0i32;
697                while let Some(c) = self.rest().chars().next() {
698                    match c {
699                        '"' => {
700                            let _ = self.parse_string();
701                            continue;
702                        }
703                        '{' | '[' => {
704                            depth += 1;
705                            self.bump(1);
706                        }
707                        '}' | ']' => {
708                            if depth == 0 {
709                                return;
710                            }
711                            depth -= 1;
712                            self.bump(1);
713                        }
714                        ',' if depth == 0 => return,
715                        _ => self.bump(c.len_utf8()),
716                    }
717                }
718            }
719            fn parse_object(&mut self) -> Result<Value, &'static str> {
720                self.bump(1); // '{'
721                let mut map = Vec::new();
722                loop {
723                    self.skip_ws();
724                    if self.rest().starts_with('}') {
725                        self.bump(1);
726                        return Ok(Value::Object(map));
727                    }
728                    let key = self.parse_string()?;
729                    self.skip_ws();
730                    if !self.rest().starts_with(':') {
731                        return Err("expected ':'");
732                    }
733                    self.bump(1);
734                    let val = self.parse_value()?;
735                    map.push((key, val));
736                    self.skip_ws();
737                    match self.rest().chars().next() {
738                        Some(',') => {
739                            self.bump(1);
740                            continue;
741                        }
742                        Some('}') => {
743                            self.bump(1);
744                            return Ok(Value::Object(map));
745                        }
746                        _ => return Err("expected ',' or '}'"),
747                    }
748                }
749            }
750            fn parse_string(&mut self) -> Result<String, &'static str> {
751                if !self.rest().starts_with('"') {
752                    return Err("expected '\"'");
753                }
754                self.bump(1);
755                let start = self.pos;
756                while let Some(c) = self.rest().chars().next() {
757                    if c == '"' {
758                        let s = self.src[start..self.pos].to_string();
759                        self.bump(1);
760                        return Ok(s);
761                    }
762                    if c == '\\' {
763                        self.bump(c.len_utf8());
764                    }
765                    self.bump(c.len_utf8());
766                }
767                Err("unterminated string")
768            }
769        }
770    }
771
772    #[test]
773    fn header_layout_first_byte_is_version_then_le_length() {
774        // Pinning the layout: byte 0 is the tag, bytes 1..5 are the
775        // little-endian body length. A consumer that sees an unknown
776        // tag still knows how many bytes to skip — that is the only
777        // forward-compat invariant the header has to carry.
778        let t = fixture();
779        let bytes = encode_topology(&t);
780        assert_eq!(bytes[0], TOPOLOGY_WIRE_VERSION_V1);
781        let declared = u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
782        assert_eq!(declared as usize, bytes.len() - TOPOLOGY_HEADER_SIZE);
783    }
784}