1pub const TOPOLOGY_WIRE_VERSION_V1: u8 = 0x01;
55
56pub const MAX_KNOWN_TOPOLOGY_VERSION: u8 = TOPOLOGY_WIRE_VERSION_V1;
60
61pub const TOPOLOGY_HEADER_SIZE: usize = 1 + 4;
63
64#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct Topology {
70 pub epoch: u64,
71 pub primary: Endpoint,
72 pub replicas: Vec<ReplicaInfo>,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct Endpoint {
77 pub addr: String,
78 pub region: String,
79}
80
81#[derive(Debug, Clone, PartialEq, Eq)]
82pub struct ReplicaInfo {
83 pub addr: String,
84 pub region: String,
85 pub healthy: bool,
86 pub lag_ms: u32,
87 pub last_applied_lsn: u64,
88}
89
90#[derive(Debug, Clone, PartialEq, Eq)]
95pub enum TopologyError {
96 Truncated,
97 BodyLengthMismatch { declared: u32, available: usize },
98 InvalidUtf8,
99 StringTooLong { declared: u32, remaining: usize },
100}
101
102impl std::fmt::Display for TopologyError {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 match self {
105 Self::Truncated => write!(f, "topology blob truncated (< 5-byte header)"),
106 Self::BodyLengthMismatch {
107 declared,
108 available,
109 } => write!(
110 f,
111 "topology body length mismatch: declared {declared}, available {available}"
112 ),
113 Self::InvalidUtf8 => write!(f, "topology string field is not valid UTF-8"),
114 Self::StringTooLong {
115 declared,
116 remaining,
117 } => write!(
118 f,
119 "topology string length {declared} exceeds remaining body bytes {remaining}"
120 ),
121 }
122 }
123}
124
125impl std::error::Error for TopologyError {}
126
127pub fn encode_topology(topology: &Topology) -> Vec<u8> {
132 let mut body = Vec::with_capacity(estimate_body_size(topology));
133 body.extend_from_slice(&topology.epoch.to_le_bytes());
134 write_str(&mut body, &topology.primary.addr);
135 write_str(&mut body, &topology.primary.region);
136 body.extend_from_slice(&(topology.replicas.len() as u32).to_le_bytes());
137 for r in &topology.replicas {
138 write_str(&mut body, &r.addr);
139 write_str(&mut body, &r.region);
140 body.push(if r.healthy { 1 } else { 0 });
141 body.extend_from_slice(&r.lag_ms.to_le_bytes());
142 body.extend_from_slice(&r.last_applied_lsn.to_le_bytes());
143 }
144
145 let mut out = Vec::with_capacity(TOPOLOGY_HEADER_SIZE + body.len());
146 out.push(TOPOLOGY_WIRE_VERSION_V1);
147 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
148 out.extend_from_slice(&body);
149 out
150}
151
152pub fn decode_topology(bytes: &[u8]) -> Result<Option<Topology>, TopologyError> {
162 if bytes.len() < TOPOLOGY_HEADER_SIZE {
163 return Err(TopologyError::Truncated);
164 }
165 let version = bytes[0];
166 let declared_len = u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
167 let body = &bytes[TOPOLOGY_HEADER_SIZE..];
168 if (body.len() as u64) < declared_len as u64 {
169 return Err(TopologyError::BodyLengthMismatch {
170 declared: declared_len,
171 available: body.len(),
172 });
173 }
174 let body = &body[..declared_len as usize];
175
176 if version > MAX_KNOWN_TOPOLOGY_VERSION {
177 return Ok(None);
179 }
180
181 let mut cur = Cursor::new(body);
183 let epoch = cur.read_u64()?;
184 let primary_addr = cur.read_str()?;
185 let primary_region = cur.read_str()?;
186 let replica_count = cur.read_u32()? as usize;
187 let mut replicas = Vec::with_capacity(replica_count);
188 for _ in 0..replica_count {
189 let addr = cur.read_str()?;
190 let region = cur.read_str()?;
191 let healthy = cur.read_u8()? != 0;
192 let lag_ms = cur.read_u32()?;
193 let last_applied_lsn = cur.read_u64()?;
194 replicas.push(ReplicaInfo {
195 addr,
196 region,
197 healthy,
198 lag_ms,
199 last_applied_lsn,
200 });
201 }
202 Ok(Some(Topology {
203 epoch,
204 primary: Endpoint {
205 addr: primary_addr,
206 region: primary_region,
207 },
208 replicas,
209 }))
210}
211
212fn estimate_body_size(t: &Topology) -> usize {
213 let endpoint = |e: &Endpoint| 4 + e.addr.len() + 4 + e.region.len();
214 let mut n = 8 + endpoint(&t.primary) + 4;
215 for r in &t.replicas {
216 n += 4 + r.addr.len() + 4 + r.region.len() + 1 + 4 + 8;
217 }
218 n
219}
220
221fn write_str(buf: &mut Vec<u8>, s: &str) {
222 buf.extend_from_slice(&(s.len() as u32).to_le_bytes());
223 buf.extend_from_slice(s.as_bytes());
224}
225
226struct Cursor<'a> {
227 buf: &'a [u8],
228 pos: usize,
229}
230
231impl<'a> Cursor<'a> {
232 fn new(buf: &'a [u8]) -> Self {
233 Self { buf, pos: 0 }
234 }
235
236 fn remaining(&self) -> usize {
237 self.buf.len() - self.pos
238 }
239
240 fn read_u8(&mut self) -> Result<u8, TopologyError> {
241 if self.remaining() < 1 {
242 return Err(TopologyError::Truncated);
243 }
244 let v = self.buf[self.pos];
245 self.pos += 1;
246 Ok(v)
247 }
248
249 fn read_u32(&mut self) -> Result<u32, TopologyError> {
250 if self.remaining() < 4 {
251 return Err(TopologyError::Truncated);
252 }
253 let bytes = &self.buf[self.pos..self.pos + 4];
254 self.pos += 4;
255 Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
256 }
257
258 fn read_u64(&mut self) -> Result<u64, TopologyError> {
259 if self.remaining() < 8 {
260 return Err(TopologyError::Truncated);
261 }
262 let bytes = &self.buf[self.pos..self.pos + 8];
263 self.pos += 8;
264 Ok(u64::from_le_bytes([
265 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
266 ]))
267 }
268
269 fn read_str(&mut self) -> Result<String, TopologyError> {
270 let len = self.read_u32()?;
271 if (len as usize) > self.remaining() {
272 return Err(TopologyError::StringTooLong {
273 declared: len,
274 remaining: self.remaining(),
275 });
276 }
277 let bytes = &self.buf[self.pos..self.pos + len as usize];
278 self.pos += len as usize;
279 let s = std::str::from_utf8(bytes)
280 .map_err(|_| TopologyError::InvalidUtf8)?
281 .to_string();
282 Ok(s)
283 }
284}
285
286pub fn encode_topology_for_hello_ack(topology: &Topology) -> String {
304 base64_encode(&encode_topology(topology))
305}
306
307pub fn decode_topology_from_hello_ack(field: &str) -> Result<Option<Topology>, TopologyError> {
317 let Some(bytes) = base64_decode(field) else {
318 return Ok(None);
322 };
323 decode_topology(&bytes)
324}
325
326const B64_ALPHA: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
327
328fn base64_encode(input: &[u8]) -> String {
329 let mut out = String::with_capacity(input.len().div_ceil(3) * 4);
330 let chunks = input.chunks_exact(3);
331 let rem = chunks.remainder();
332 for c in chunks {
333 let n = ((c[0] as u32) << 16) | ((c[1] as u32) << 8) | (c[2] as u32);
334 out.push(B64_ALPHA[((n >> 18) & 0x3F) as usize] as char);
335 out.push(B64_ALPHA[((n >> 12) & 0x3F) as usize] as char);
336 out.push(B64_ALPHA[((n >> 6) & 0x3F) as usize] as char);
337 out.push(B64_ALPHA[(n & 0x3F) as usize] as char);
338 }
339 match rem {
340 [a] => {
341 let n = (*a as u32) << 16;
342 out.push(B64_ALPHA[((n >> 18) & 0x3F) as usize] as char);
343 out.push(B64_ALPHA[((n >> 12) & 0x3F) as usize] as char);
344 out.push('=');
345 out.push('=');
346 }
347 [a, b] => {
348 let n = ((*a as u32) << 16) | ((*b as u32) << 8);
349 out.push(B64_ALPHA[((n >> 18) & 0x3F) as usize] as char);
350 out.push(B64_ALPHA[((n >> 12) & 0x3F) as usize] as char);
351 out.push(B64_ALPHA[((n >> 6) & 0x3F) as usize] as char);
352 out.push('=');
353 }
354 _ => {}
355 }
356 out
357}
358
359fn base64_decode(input: &str) -> Option<Vec<u8>> {
360 let trimmed = input.trim_end_matches('=');
361 let mut out = Vec::with_capacity(trimmed.len() * 3 / 4);
362 let mut buf = 0u32;
363 let mut bits = 0u8;
364 for ch in trimmed.bytes() {
365 let v: u32 = match ch {
366 b'A'..=b'Z' => (ch - b'A') as u32,
367 b'a'..=b'z' => (ch - b'a' + 26) as u32,
368 b'0'..=b'9' => (ch - b'0' + 52) as u32,
369 b'+' => 62,
370 b'/' => 63,
371 _ => return None,
372 };
373 buf = (buf << 6) | v;
374 bits += 6;
375 if bits >= 8 {
376 bits -= 8;
377 out.push(((buf >> bits) & 0xFF) as u8);
378 }
379 }
380 Some(out)
381}
382
383#[cfg(test)]
384mod tests {
385 use super::*;
386
387 fn fixture() -> Topology {
388 Topology {
389 epoch: 0xDEAD_BEEF_CAFE_BABE,
390 primary: Endpoint {
391 addr: "primary.example.com:5050".into(),
392 region: "us-east-1".into(),
393 },
394 replicas: vec![
395 ReplicaInfo {
396 addr: "replica-a.example.com:5050".into(),
397 region: "us-east-1".into(),
398 healthy: true,
399 lag_ms: 12,
400 last_applied_lsn: 4242,
401 },
402 ReplicaInfo {
403 addr: "replica-b.example.com:5050".into(),
404 region: "us-west-2".into(),
405 healthy: false,
406 lag_ms: 999,
407 last_applied_lsn: 4100,
408 },
409 ],
410 }
411 }
412
413 #[test]
414 fn round_trip_v1() {
415 let t = fixture();
416 let bytes = encode_topology(&t);
417 let decoded = decode_topology(&bytes).expect("decode").expect("v1 known");
418 assert_eq!(decoded, t);
419 }
420
421 #[test]
422 fn empty_replicas_round_trip() {
423 let t = Topology {
424 epoch: 1,
425 primary: Endpoint {
426 addr: "p:5050".into(),
427 region: "r".into(),
428 },
429 replicas: vec![],
430 };
431 let bytes = encode_topology(&t);
432 let decoded = decode_topology(&bytes).expect("decode").expect("v1");
433 assert_eq!(decoded, t);
434 }
435
436 #[test]
437 fn unknown_version_tag_returns_none() {
438 let mut bytes = encode_topology(&fixture());
442 bytes[0] = 0xFE; let decoded = decode_topology(&bytes).expect("decode");
444 assert!(
445 decoded.is_none(),
446 "unknown version tag must drop cleanly, got {decoded:?}"
447 );
448 }
449
450 #[test]
451 fn truncated_header_errors() {
452 assert!(matches!(
453 decode_topology(&[0x01, 0x00]),
454 Err(TopologyError::Truncated)
455 ));
456 }
457
458 #[test]
459 fn body_length_mismatch_errors() {
460 let bytes = vec![0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x00];
462 assert!(matches!(
463 decode_topology(&bytes),
464 Err(TopologyError::BodyLengthMismatch { .. })
465 ));
466 }
467
468 #[test]
469 fn version_tag_is_pinned_to_0x01() {
470 assert_eq!(TOPOLOGY_WIRE_VERSION_V1, 0x01);
474 }
475
476 #[test]
477 fn hello_ack_round_trip_via_base64() {
478 let t = fixture();
482 let field = encode_topology_for_hello_ack(&t);
483 let decoded = decode_topology_from_hello_ack(&field)
484 .expect("decode")
485 .expect("v1 known");
486 assert_eq!(decoded, t);
487 }
488
489 #[test]
490 fn hello_ack_inner_bytes_match_grpc_bytes() {
491 let t = fixture();
496 let canonical = encode_topology(&t);
497 let field = encode_topology_for_hello_ack(&t);
498 let recovered = base64_decode(&field).expect("base64");
499 assert_eq!(recovered, canonical);
500 }
501
502 #[test]
503 fn hello_ack_unknown_version_tag_drops_cleanly() {
504 let mut bytes = encode_topology(&fixture());
508 bytes[0] = 0x99;
509 let field = base64_encode(&bytes);
510 let decoded = decode_topology_from_hello_ack(&field).expect("decode");
511 assert!(decoded.is_none());
512 }
513
514 #[test]
515 fn hello_ack_malformed_base64_drops_cleanly() {
516 let decoded = decode_topology_from_hello_ack("@not base64@").expect("decode");
519 assert!(decoded.is_none());
520 }
521
522 #[test]
523 fn old_hello_ack_without_topology_field_is_backwards_compat() {
524 let json = br#"{"version":1,"auth":"bearer","features":3,"server":"reddb/0.2.9"}"#;
532 let v: serde_json_check::Value = serde_json_check::from_slice(json).expect("valid JSON");
533 let topo_field = v.find_string("topology");
534 let topology = match topo_field {
535 None => None,
536 Some(s) => decode_topology_from_hello_ack(&s).expect("decode"),
537 };
538 assert!(
539 topology.is_none(),
540 "an old HelloAck without `topology` must produce None"
541 );
542 }
543
544 mod serde_json_check {
548 pub enum Value {
549 Object(Vec<(String, Value)>),
550 String(String),
551 Other,
552 }
553
554 impl Value {
555 pub fn find_string(&self, key: &str) -> Option<String> {
556 match self {
557 Value::Object(map) => map.iter().find_map(|(k, v)| {
558 if k == key {
559 if let Value::String(s) = v {
560 Some(s.clone())
561 } else {
562 None
563 }
564 } else {
565 None
566 }
567 }),
568 _ => None,
569 }
570 }
571 }
572
573 pub fn from_slice(bytes: &[u8]) -> Result<Value, &'static str> {
574 let s = std::str::from_utf8(bytes).map_err(|_| "utf8")?;
575 let mut p = Parser { src: s, pos: 0 };
576 p.skip_ws();
577 let v = p.parse_value()?;
578 Ok(v)
579 }
580
581 struct Parser<'a> {
582 src: &'a str,
583 pos: usize,
584 }
585
586 impl<'a> Parser<'a> {
587 fn rest(&self) -> &'a str {
588 &self.src[self.pos..]
589 }
590 fn bump(&mut self, n: usize) {
591 self.pos += n;
592 }
593 fn skip_ws(&mut self) {
594 while let Some(c) = self.rest().chars().next() {
595 if c.is_whitespace() {
596 self.bump(c.len_utf8());
597 } else {
598 break;
599 }
600 }
601 }
602 fn parse_value(&mut self) -> Result<Value, &'static str> {
603 self.skip_ws();
604 let head = self.rest().chars().next().ok_or("eof")?;
605 match head {
606 '{' => self.parse_object(),
607 '"' => self.parse_string().map(Value::String),
608 _ => {
609 self.skip_until_top_level_comma_or_close();
614 Ok(Value::Other)
615 }
616 }
617 }
618 fn skip_until_top_level_comma_or_close(&mut self) {
619 let mut depth = 0i32;
620 while let Some(c) = self.rest().chars().next() {
621 match c {
622 '"' => {
623 let _ = self.parse_string();
624 continue;
625 }
626 '{' | '[' => {
627 depth += 1;
628 self.bump(1);
629 }
630 '}' | ']' => {
631 if depth == 0 {
632 return;
633 }
634 depth -= 1;
635 self.bump(1);
636 }
637 ',' if depth == 0 => return,
638 _ => self.bump(c.len_utf8()),
639 }
640 }
641 }
642 fn parse_object(&mut self) -> Result<Value, &'static str> {
643 self.bump(1); let mut map = Vec::new();
645 loop {
646 self.skip_ws();
647 if self.rest().starts_with('}') {
648 self.bump(1);
649 return Ok(Value::Object(map));
650 }
651 let key = self.parse_string()?;
652 self.skip_ws();
653 if !self.rest().starts_with(':') {
654 return Err("expected ':'");
655 }
656 self.bump(1);
657 let val = self.parse_value()?;
658 map.push((key, val));
659 self.skip_ws();
660 match self.rest().chars().next() {
661 Some(',') => {
662 self.bump(1);
663 continue;
664 }
665 Some('}') => {
666 self.bump(1);
667 return Ok(Value::Object(map));
668 }
669 _ => return Err("expected ',' or '}'"),
670 }
671 }
672 }
673 fn parse_string(&mut self) -> Result<String, &'static str> {
674 if !self.rest().starts_with('"') {
675 return Err("expected '\"'");
676 }
677 self.bump(1);
678 let start = self.pos;
679 while let Some(c) = self.rest().chars().next() {
680 if c == '"' {
681 let s = self.src[start..self.pos].to_string();
682 self.bump(1);
683 return Ok(s);
684 }
685 if c == '\\' {
686 self.bump(c.len_utf8());
687 }
688 self.bump(c.len_utf8());
689 }
690 Err("unterminated string")
691 }
692 }
693 }
694
695 #[test]
696 fn header_layout_first_byte_is_version_then_le_length() {
697 let t = fixture();
702 let bytes = encode_topology(&t);
703 assert_eq!(bytes[0], TOPOLOGY_WIRE_VERSION_V1);
704 let declared = u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
705 assert_eq!(declared as usize, bytes.len() - TOPOLOGY_HEADER_SIZE);
706 }
707}