Skip to main content

reddb_wire/
topology.rs

1//! Canonical `Topology` payload — shared by both transports.
2//!
3//! ADR 0008 (`docs/adr/0008-topology-advertisement-security.md`)
4//! settled the security model and the schema-evolution rule:
5//!
6//!   * New optional fields land under a versioned envelope.
7//!   * Old parsers ignore unknown fields cleanly. Unknown version
8//!     tags are dropped (no panic), the consumer falls back to
9//!     URI-only routing.
10//!   * Schema-version bumps are reserved for changes a naive
11//!     optional field cannot express (a removed field, a renegotiated
12//!     meaning, a framing change). They are not the default move.
13//!
14//! Wire encoding (single shape across RedWire HelloAck + gRPC
15//! `Topology` RPC):
16//!
17//! ```text
18//! ┌────────────────────────────────────────────────────────────┐
19//! │ u8   version_tag        currently 0x01                     │
20//! │ u32  body_length (LE)   bytes that follow                  │
21//! │ ... body ...            version-specific payload encoding  │
22//! └────────────────────────────────────────────────────────────┘
23//! ```
24//!
25//! The header (tag + length) is fixed across versions so a parser
26//! that does not recognise the tag can still skip the entire blob
27//! cleanly.
28//!
29//! Body for `0x01`: a flat little-endian struct dump where every
30//! string is `u32 len` + utf-8 bytes:
31//!
32//! ```text
33//! u64  epoch
34//! str  primary.addr
35//! str  primary.region
36//! u32  replicas.len
37//! foreach replica:
38//!   str  addr
39//!   str  region
40//!   u8   healthy   (0 / 1)
41//!   u32  lag_ms
42//!   u64  last_applied_lsn
43//! ```
44//!
45//! No serde dependency: a hex dump stays readable, no extra crate
46//! pulled into `reddb-wire`, and the format matches the rest of the
47//! RedWire codec discipline.
48
49/// Wire version tag for the initial schema.
50///
51/// Bumping this is reserved for genuinely breaking changes (removed
52/// field, renegotiated meaning, framing change). Additive evolution
53/// (new optional fields) keeps this byte stable — see ADR 0008 §4.
54pub const TOPOLOGY_WIRE_VERSION_V1: u8 = 0x01;
55
56/// Highest tag the current parser understands. A `Topology` blob
57/// stamped with anything else is ignored cleanly so an older client
58/// can keep its URI-only routing fallback.
59pub const MAX_KNOWN_TOPOLOGY_VERSION: u8 = TOPOLOGY_WIRE_VERSION_V1;
60
61/// Header size (version tag + u32 body length).
62pub const TOPOLOGY_HEADER_SIZE: usize = 1 + 4;
63
64/// Canonical topology payload — the same shape both transports
65/// carry. New optional fields go in here as `Option<…>` /
66/// `Vec<…>` and ride the existing version tag; only a genuinely
67/// breaking change earns a new tag.
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct Topology {
70    pub epoch: u64,
71    pub primary: Endpoint,
72    pub replicas: Vec<ReplicaInfo>,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct Endpoint {
77    pub addr: String,
78    pub region: String,
79}
80
81#[derive(Debug, Clone, PartialEq, Eq)]
82pub struct ReplicaInfo {
83    pub addr: String,
84    pub region: String,
85    pub healthy: bool,
86    pub lag_ms: u32,
87    pub last_applied_lsn: u64,
88}
89
90/// Decode-side errors. Distinct from "unknown version tag", which
91/// is reported as `Ok(None)` on `decode_topology` so the consumer
92/// can fall back to URI-only routing without branching on an
93/// error variant.
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub enum TopologyError {
96    Truncated,
97    BodyLengthMismatch { declared: u32, available: usize },
98    InvalidUtf8,
99    StringTooLong { declared: u32, remaining: usize },
100}
101
102impl std::fmt::Display for TopologyError {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        match self {
105            Self::Truncated => write!(f, "topology blob truncated (< 5-byte header)"),
106            Self::BodyLengthMismatch {
107                declared,
108                available,
109            } => write!(
110                f,
111                "topology body length mismatch: declared {declared}, available {available}"
112            ),
113            Self::InvalidUtf8 => write!(f, "topology string field is not valid UTF-8"),
114            Self::StringTooLong {
115                declared,
116                remaining,
117            } => write!(
118                f,
119                "topology string length {declared} exceeds remaining body bytes {remaining}"
120            ),
121        }
122    }
123}
124
125impl std::error::Error for TopologyError {}
126
127/// Encode `topology` to the canonical version-tagged byte string.
128/// Same bytes consumed by both RedWire HelloAck (after base64
129/// embedding in the JSON envelope) and the gRPC `TopologyReply`
130/// (carried directly as a `bytes` field).
131pub fn encode_topology(topology: &Topology) -> Vec<u8> {
132    let mut body = Vec::with_capacity(estimate_body_size(topology));
133    body.extend_from_slice(&topology.epoch.to_le_bytes());
134    write_str(&mut body, &topology.primary.addr);
135    write_str(&mut body, &topology.primary.region);
136    body.extend_from_slice(&(topology.replicas.len() as u32).to_le_bytes());
137    for r in &topology.replicas {
138        write_str(&mut body, &r.addr);
139        write_str(&mut body, &r.region);
140        body.push(if r.healthy { 1 } else { 0 });
141        body.extend_from_slice(&r.lag_ms.to_le_bytes());
142        body.extend_from_slice(&r.last_applied_lsn.to_le_bytes());
143    }
144
145    let mut out = Vec::with_capacity(TOPOLOGY_HEADER_SIZE + body.len());
146    out.push(TOPOLOGY_WIRE_VERSION_V1);
147    out.extend_from_slice(&(body.len() as u32).to_le_bytes());
148    out.extend_from_slice(&body);
149    out
150}
151
152/// Decode a topology blob.
153///
154/// Returns:
155/// * `Ok(Some(Topology))` — recognised version tag, body parsed.
156/// * `Ok(None)` — unknown version tag. The consumer is expected to
157///   fall back to URI-only routing rather than treat this as an
158///   error (ADR 0008 §4: unknown fields are dropped, not rejected).
159/// * `Err(TopologyError)` — recognised tag, but the body was
160///   structurally malformed (truncated, invalid UTF-8, …).
161pub fn decode_topology(bytes: &[u8]) -> Result<Option<Topology>, TopologyError> {
162    if bytes.len() < TOPOLOGY_HEADER_SIZE {
163        return Err(TopologyError::Truncated);
164    }
165    let version = bytes[0];
166    let declared_len = u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
167    let body = &bytes[TOPOLOGY_HEADER_SIZE..];
168    if (body.len() as u64) < declared_len as u64 {
169        return Err(TopologyError::BodyLengthMismatch {
170            declared: declared_len,
171            available: body.len(),
172        });
173    }
174    let body = &body[..declared_len as usize];
175
176    if version > MAX_KNOWN_TOPOLOGY_VERSION {
177        // Forward-compat: unknown version tag, drop cleanly.
178        return Ok(None);
179    }
180
181    // version == 0x01
182    let mut cur = Cursor::new(body);
183    let epoch = cur.read_u64()?;
184    let primary_addr = cur.read_str()?;
185    let primary_region = cur.read_str()?;
186    let replica_count = cur.read_u32()? as usize;
187    let mut replicas = Vec::with_capacity(replica_count);
188    for _ in 0..replica_count {
189        let addr = cur.read_str()?;
190        let region = cur.read_str()?;
191        let healthy = cur.read_u8()? != 0;
192        let lag_ms = cur.read_u32()?;
193        let last_applied_lsn = cur.read_u64()?;
194        replicas.push(ReplicaInfo {
195            addr,
196            region,
197            healthy,
198            lag_ms,
199            last_applied_lsn,
200        });
201    }
202    Ok(Some(Topology {
203        epoch,
204        primary: Endpoint {
205            addr: primary_addr,
206            region: primary_region,
207        },
208        replicas,
209    }))
210}
211
212fn estimate_body_size(t: &Topology) -> usize {
213    let endpoint = |e: &Endpoint| 4 + e.addr.len() + 4 + e.region.len();
214    let mut n = 8 + endpoint(&t.primary) + 4;
215    for r in &t.replicas {
216        n += 4 + r.addr.len() + 4 + r.region.len() + 1 + 4 + 8;
217    }
218    n
219}
220
221fn write_str(buf: &mut Vec<u8>, s: &str) {
222    buf.extend_from_slice(&(s.len() as u32).to_le_bytes());
223    buf.extend_from_slice(s.as_bytes());
224}
225
226struct Cursor<'a> {
227    buf: &'a [u8],
228    pos: usize,
229}
230
231impl<'a> Cursor<'a> {
232    fn new(buf: &'a [u8]) -> Self {
233        Self { buf, pos: 0 }
234    }
235
236    fn remaining(&self) -> usize {
237        self.buf.len() - self.pos
238    }
239
240    fn read_u8(&mut self) -> Result<u8, TopologyError> {
241        if self.remaining() < 1 {
242            return Err(TopologyError::Truncated);
243        }
244        let v = self.buf[self.pos];
245        self.pos += 1;
246        Ok(v)
247    }
248
249    fn read_u32(&mut self) -> Result<u32, TopologyError> {
250        if self.remaining() < 4 {
251            return Err(TopologyError::Truncated);
252        }
253        let bytes = &self.buf[self.pos..self.pos + 4];
254        self.pos += 4;
255        Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
256    }
257
258    fn read_u64(&mut self) -> Result<u64, TopologyError> {
259        if self.remaining() < 8 {
260            return Err(TopologyError::Truncated);
261        }
262        let bytes = &self.buf[self.pos..self.pos + 8];
263        self.pos += 8;
264        Ok(u64::from_le_bytes([
265            bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
266        ]))
267    }
268
269    fn read_str(&mut self) -> Result<String, TopologyError> {
270        let len = self.read_u32()?;
271        if (len as usize) > self.remaining() {
272            return Err(TopologyError::StringTooLong {
273                declared: len,
274                remaining: self.remaining(),
275            });
276        }
277        let bytes = &self.buf[self.pos..self.pos + len as usize];
278        self.pos += len as usize;
279        let s = std::str::from_utf8(bytes)
280            .map_err(|_| TopologyError::InvalidUtf8)?
281            .to_string();
282        Ok(s)
283    }
284}
285
286// ---------------------------------------------------------------
287// HelloAck embedding.
288//
289// The HelloAck payload is a JSON object. To carry the binary
290// topology blob inside it without breaking the existing JSON-only
291// parser (which deserialises the whole payload with `serde_json`),
292// the canonical bytes are base64-encoded under a new `topology`
293// key. Old parsers ignore unknown keys cleanly; new parsers extract
294// the field and run `decode_topology` on the decoded bytes.
295// ---------------------------------------------------------------
296
297/// Base64-encode the canonical topology bytes for embedding inside
298/// a HelloAck JSON payload as a string field.
299///
300/// The caller (server-side HelloAck builder) is expected to insert
301/// the resulting string under the JSON key `"topology"`; the client
302/// extracts that key, base64-decodes it, and runs `decode_topology`.
303pub fn encode_topology_for_hello_ack(topology: &Topology) -> String {
304    base64_encode(&encode_topology(topology))
305}
306
307/// Decode the base64 string carried in HelloAck JSON `topology`
308/// field back into a `Topology`.
309///
310/// Mirrors `decode_topology`'s three-state contract:
311/// * `Ok(Some(_))` — recognised version tag, body parsed.
312/// * `Ok(None)` — base64 decode failed *or* the version tag is
313///   unknown. Both cases collapse to "fall back to URI-only
314///   routing"; the consumer does not branch on which one it was.
315/// * `Err(_)` — recognised version tag with a malformed body.
316pub fn decode_topology_from_hello_ack(field: &str) -> Result<Option<Topology>, TopologyError> {
317    let Some(bytes) = base64_decode(field) else {
318        // Malformed base64 is treated as "unknown encoding, drop
319        // cleanly" so the client falls back to URI-only routing —
320        // same posture as an unknown version tag (ADR 0008 §4).
321        return Ok(None);
322    };
323    decode_topology(&bytes)
324}
325
326const B64_ALPHA: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
327
328fn base64_encode(input: &[u8]) -> String {
329    let mut out = String::with_capacity(input.len().div_ceil(3) * 4);
330    let chunks = input.chunks_exact(3);
331    let rem = chunks.remainder();
332    for c in chunks {
333        let n = ((c[0] as u32) << 16) | ((c[1] as u32) << 8) | (c[2] as u32);
334        out.push(B64_ALPHA[((n >> 18) & 0x3F) as usize] as char);
335        out.push(B64_ALPHA[((n >> 12) & 0x3F) as usize] as char);
336        out.push(B64_ALPHA[((n >> 6) & 0x3F) as usize] as char);
337        out.push(B64_ALPHA[(n & 0x3F) as usize] as char);
338    }
339    match rem {
340        [a] => {
341            let n = (*a as u32) << 16;
342            out.push(B64_ALPHA[((n >> 18) & 0x3F) as usize] as char);
343            out.push(B64_ALPHA[((n >> 12) & 0x3F) as usize] as char);
344            out.push('=');
345            out.push('=');
346        }
347        [a, b] => {
348            let n = ((*a as u32) << 16) | ((*b as u32) << 8);
349            out.push(B64_ALPHA[((n >> 18) & 0x3F) as usize] as char);
350            out.push(B64_ALPHA[((n >> 12) & 0x3F) as usize] as char);
351            out.push(B64_ALPHA[((n >> 6) & 0x3F) as usize] as char);
352            out.push('=');
353        }
354        _ => {}
355    }
356    out
357}
358
359fn base64_decode(input: &str) -> Option<Vec<u8>> {
360    let trimmed = input.trim_end_matches('=');
361    let mut out = Vec::with_capacity(trimmed.len() * 3 / 4);
362    let mut buf = 0u32;
363    let mut bits = 0u8;
364    for ch in trimmed.bytes() {
365        let v: u32 = match ch {
366            b'A'..=b'Z' => (ch - b'A') as u32,
367            b'a'..=b'z' => (ch - b'a' + 26) as u32,
368            b'0'..=b'9' => (ch - b'0' + 52) as u32,
369            b'+' => 62,
370            b'/' => 63,
371            _ => return None,
372        };
373        buf = (buf << 6) | v;
374        bits += 6;
375        if bits >= 8 {
376            bits -= 8;
377            out.push(((buf >> bits) & 0xFF) as u8);
378        }
379    }
380    Some(out)
381}
382
383#[cfg(test)]
384mod tests {
385    use super::*;
386
387    fn fixture() -> Topology {
388        Topology {
389            epoch: 0xDEAD_BEEF_CAFE_BABE,
390            primary: Endpoint {
391                addr: "primary.example.com:5050".into(),
392                region: "us-east-1".into(),
393            },
394            replicas: vec![
395                ReplicaInfo {
396                    addr: "replica-a.example.com:5050".into(),
397                    region: "us-east-1".into(),
398                    healthy: true,
399                    lag_ms: 12,
400                    last_applied_lsn: 4242,
401                },
402                ReplicaInfo {
403                    addr: "replica-b.example.com:5050".into(),
404                    region: "us-west-2".into(),
405                    healthy: false,
406                    lag_ms: 999,
407                    last_applied_lsn: 4100,
408                },
409            ],
410        }
411    }
412
413    #[test]
414    fn round_trip_v1() {
415        let t = fixture();
416        let bytes = encode_topology(&t);
417        let decoded = decode_topology(&bytes).expect("decode").expect("v1 known");
418        assert_eq!(decoded, t);
419    }
420
421    #[test]
422    fn empty_replicas_round_trip() {
423        let t = Topology {
424            epoch: 1,
425            primary: Endpoint {
426                addr: "p:5050".into(),
427                region: "r".into(),
428            },
429            replicas: vec![],
430        };
431        let bytes = encode_topology(&t);
432        let decoded = decode_topology(&bytes).expect("decode").expect("v1");
433        assert_eq!(decoded, t);
434    }
435
436    #[test]
437    fn unknown_version_tag_returns_none() {
438        // Forward-compat invariant from ADR 0008 §4: an unknown
439        // version tag must be ignored, not rejected. The consumer
440        // falls back to URI-only routing.
441        let mut bytes = encode_topology(&fixture());
442        bytes[0] = 0xFE; // bumped past MAX_KNOWN_TOPOLOGY_VERSION
443        let decoded = decode_topology(&bytes).expect("decode");
444        assert!(
445            decoded.is_none(),
446            "unknown version tag must drop cleanly, got {decoded:?}"
447        );
448    }
449
450    #[test]
451    fn truncated_header_errors() {
452        assert!(matches!(
453            decode_topology(&[0x01, 0x00]),
454            Err(TopologyError::Truncated)
455        ));
456    }
457
458    #[test]
459    fn body_length_mismatch_errors() {
460        // Declared body length larger than buffer.
461        let bytes = vec![0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x00];
462        assert!(matches!(
463            decode_topology(&bytes),
464            Err(TopologyError::BodyLengthMismatch { .. })
465        ));
466    }
467
468    #[test]
469    fn version_tag_is_pinned_to_0x01() {
470        // Sentinel against an accidental schema bump. ADR 0008 §4
471        // reserves the bump for genuinely breaking changes; a PR
472        // adding an optional field must NOT touch this value.
473        assert_eq!(TOPOLOGY_WIRE_VERSION_V1, 0x01);
474    }
475
476    #[test]
477    fn hello_ack_round_trip_via_base64() {
478        // The HelloAck embedding shape: encode → base64-string →
479        // decode. Same canonical bytes both transports carry, just
480        // base64-wrapped so the JSON envelope stays valid.
481        let t = fixture();
482        let field = encode_topology_for_hello_ack(&t);
483        let decoded = decode_topology_from_hello_ack(&field)
484            .expect("decode")
485            .expect("v1 known");
486        assert_eq!(decoded, t);
487    }
488
489    #[test]
490    fn hello_ack_inner_bytes_match_grpc_bytes() {
491        // The acceptance criterion (#166 §4): same bytes consumed
492        // by both transports. Round-trip via the HelloAck base64
493        // wrapper and assert the decoded inner payload is byte-for-
494        // byte equivalent to the canonical encoding.
495        let t = fixture();
496        let canonical = encode_topology(&t);
497        let field = encode_topology_for_hello_ack(&t);
498        let recovered = base64_decode(&field).expect("base64");
499        assert_eq!(recovered, canonical);
500    }
501
502    #[test]
503    fn hello_ack_unknown_version_tag_drops_cleanly() {
504        // A HelloAck whose topology field carries a future version
505        // tag must not panic — the client falls back to URI-only
506        // routing.
507        let mut bytes = encode_topology(&fixture());
508        bytes[0] = 0x99;
509        let field = base64_encode(&bytes);
510        let decoded = decode_topology_from_hello_ack(&field).expect("decode");
511        assert!(decoded.is_none());
512    }
513
514    #[test]
515    fn hello_ack_malformed_base64_drops_cleanly() {
516        // A garbled base64 field is treated like an unknown version
517        // tag: drop, fall back to URI-only routing, never panic.
518        let decoded = decode_topology_from_hello_ack("@not base64@").expect("decode");
519        assert!(decoded.is_none());
520    }
521
522    #[test]
523    fn old_hello_ack_without_topology_field_is_backwards_compat() {
524        // Backwards-compat (#166 acceptance criterion §6): an
525        // old-style HelloAck JSON payload — no `topology` key —
526        // still parses cleanly into a Topology slot of `None` on
527        // the consumer side. We model that here by extracting the
528        // JSON value the way the client will (look for the key,
529        // run our decoder if present), and checking the absence
530        // path resolves to "no topology, fall back to URI-only".
531        let json = br#"{"version":1,"auth":"bearer","features":3,"server":"reddb/0.2.9"}"#;
532        let v: serde_json_check::Value = serde_json_check::from_slice(json).expect("valid JSON");
533        let topo_field = v.find_string("topology");
534        let topology = match topo_field {
535            None => None,
536            Some(s) => decode_topology_from_hello_ack(&s).expect("decode"),
537        };
538        assert!(
539            topology.is_none(),
540            "an old HelloAck without `topology` must produce None"
541        );
542    }
543
544    /// Tiny JSON probe used only by the backwards-compat test.
545    /// `reddb-wire` has no JSON dep — keep this scoped to test
546    /// code so we do not pull serde into the production build.
547    mod serde_json_check {
548        pub enum Value {
549            Object(Vec<(String, Value)>),
550            String(String),
551            Other,
552        }
553
554        impl Value {
555            pub fn find_string(&self, key: &str) -> Option<String> {
556                match self {
557                    Value::Object(map) => map.iter().find_map(|(k, v)| {
558                        if k == key {
559                            if let Value::String(s) = v {
560                                Some(s.clone())
561                            } else {
562                                None
563                            }
564                        } else {
565                            None
566                        }
567                    }),
568                    _ => None,
569                }
570            }
571        }
572
573        pub fn from_slice(bytes: &[u8]) -> Result<Value, &'static str> {
574            let s = std::str::from_utf8(bytes).map_err(|_| "utf8")?;
575            let mut p = Parser { src: s, pos: 0 };
576            p.skip_ws();
577            let v = p.parse_value()?;
578            Ok(v)
579        }
580
581        struct Parser<'a> {
582            src: &'a str,
583            pos: usize,
584        }
585
586        impl<'a> Parser<'a> {
587            fn rest(&self) -> &'a str {
588                &self.src[self.pos..]
589            }
590            fn bump(&mut self, n: usize) {
591                self.pos += n;
592            }
593            fn skip_ws(&mut self) {
594                while let Some(c) = self.rest().chars().next() {
595                    if c.is_whitespace() {
596                        self.bump(c.len_utf8());
597                    } else {
598                        break;
599                    }
600                }
601            }
602            fn parse_value(&mut self) -> Result<Value, &'static str> {
603                self.skip_ws();
604                let head = self.rest().chars().next().ok_or("eof")?;
605                match head {
606                    '{' => self.parse_object(),
607                    '"' => self.parse_string().map(Value::String),
608                    _ => {
609                        // Skip primitives (numbers, true/false/null,
610                        // arrays). The probe only cares about object
611                        // keys at the top level, so coarse skipping
612                        // is enough for our fixture.
613                        self.skip_until_top_level_comma_or_close();
614                        Ok(Value::Other)
615                    }
616                }
617            }
618            fn skip_until_top_level_comma_or_close(&mut self) {
619                let mut depth = 0i32;
620                while let Some(c) = self.rest().chars().next() {
621                    match c {
622                        '"' => {
623                            let _ = self.parse_string();
624                            continue;
625                        }
626                        '{' | '[' => {
627                            depth += 1;
628                            self.bump(1);
629                        }
630                        '}' | ']' => {
631                            if depth == 0 {
632                                return;
633                            }
634                            depth -= 1;
635                            self.bump(1);
636                        }
637                        ',' if depth == 0 => return,
638                        _ => self.bump(c.len_utf8()),
639                    }
640                }
641            }
642            fn parse_object(&mut self) -> Result<Value, &'static str> {
643                self.bump(1); // '{'
644                let mut map = Vec::new();
645                loop {
646                    self.skip_ws();
647                    if self.rest().starts_with('}') {
648                        self.bump(1);
649                        return Ok(Value::Object(map));
650                    }
651                    let key = self.parse_string()?;
652                    self.skip_ws();
653                    if !self.rest().starts_with(':') {
654                        return Err("expected ':'");
655                    }
656                    self.bump(1);
657                    let val = self.parse_value()?;
658                    map.push((key, val));
659                    self.skip_ws();
660                    match self.rest().chars().next() {
661                        Some(',') => {
662                            self.bump(1);
663                            continue;
664                        }
665                        Some('}') => {
666                            self.bump(1);
667                            return Ok(Value::Object(map));
668                        }
669                        _ => return Err("expected ',' or '}'"),
670                    }
671                }
672            }
673            fn parse_string(&mut self) -> Result<String, &'static str> {
674                if !self.rest().starts_with('"') {
675                    return Err("expected '\"'");
676                }
677                self.bump(1);
678                let start = self.pos;
679                while let Some(c) = self.rest().chars().next() {
680                    if c == '"' {
681                        let s = self.src[start..self.pos].to_string();
682                        self.bump(1);
683                        return Ok(s);
684                    }
685                    if c == '\\' {
686                        self.bump(c.len_utf8());
687                    }
688                    self.bump(c.len_utf8());
689                }
690                Err("unterminated string")
691            }
692        }
693    }
694
695    #[test]
696    fn header_layout_first_byte_is_version_then_le_length() {
697        // Pinning the layout: byte 0 is the tag, bytes 1..5 are the
698        // little-endian body length. A consumer that sees an unknown
699        // tag still knows how many bytes to skip — that is the only
700        // forward-compat invariant the header has to carry.
701        let t = fixture();
702        let bytes = encode_topology(&t);
703        assert_eq!(bytes[0], TOPOLOGY_WIRE_VERSION_V1);
704        let declared = u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
705        assert_eq!(declared as usize, bytes.len() - TOPOLOGY_HEADER_SIZE);
706    }
707}