/// Version tag written as the first byte of every encoded topology blob.
pub const TOPOLOGY_WIRE_VERSION_V1: u8 = 0x01;

/// Highest version tag this build knows how to decode; `decode_topology`
/// returns `Ok(None)` for any tag above it.
pub const MAX_KNOWN_TOPOLOGY_VERSION: u8 = TOPOLOGY_WIRE_VERSION_V1;

/// Size in bytes of the fixed header: one version byte followed by a
/// little-endian `u32` body length.
pub const TOPOLOGY_HEADER_SIZE: usize = std::mem::size_of::<u8>() + std::mem::size_of::<u32>();
63
/// Cluster layout carried by the topology wire format: an epoch, the primary
/// endpoint, and the list of replicas.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Topology {
    /// Epoch value; encoded as the first 8 body bytes (little-endian).
    pub epoch: u64,
    /// Address and region of the primary.
    pub primary: Endpoint,
    /// Replica records, encoded and decoded in this order.
    pub replicas: Vec<ReplicaInfo>,
}
74
/// An addressable node: an address string plus a region label.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Endpoint {
    /// Node address (the tests in this file use `host:port` strings).
    pub addr: String,
    /// Region label (the tests in this file use names like `us-east-1`).
    pub region: String,
}
80
/// Per-replica record carried inside a [`Topology`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaInfo {
    /// Replica address (the tests in this file use `host:port` strings).
    pub addr: String,
    /// Region label of the replica.
    pub region: String,
    /// Health flag; encoded as a single byte (1 = true, 0 = false).
    pub healthy: bool,
    /// Presumably replication lag in milliseconds (TODO confirm); encoded as a little-endian `u32`.
    pub lag_ms: u32,
    /// Presumably the last applied log sequence number (TODO confirm); encoded as a little-endian `u64`.
    pub last_applied_lsn: u64,
    /// Encoded in a trailing block after all replica records (one byte per
    /// replica); the decoder leaves it `false` when that block is absent.
    pub rebootstrapping: bool,
}
102
/// Failures reported while decoding a topology blob.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopologyError {
    /// The input is shorter than the fixed header, or a fixed-width field
    /// read ran past the end of the body.
    Truncated,
    /// The header declares more body bytes than the input actually contains.
    BodyLengthMismatch { declared: u32, available: usize },
    /// A length-prefixed string field did not contain valid UTF-8.
    InvalidUtf8,
    /// A string's length prefix exceeds the bytes left in the body.
    StringTooLong { declared: u32, remaining: usize },
}
114
115impl std::fmt::Display for TopologyError {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 match self {
118 Self::Truncated => write!(f, "topology blob truncated (< 5-byte header)"),
119 Self::BodyLengthMismatch {
120 declared,
121 available,
122 } => write!(
123 f,
124 "topology body length mismatch: declared {declared}, available {available}"
125 ),
126 Self::InvalidUtf8 => write!(f, "topology string field is not valid UTF-8"),
127 Self::StringTooLong {
128 declared,
129 remaining,
130 } => write!(
131 f,
132 "topology string length {declared} exceeds remaining body bytes {remaining}"
133 ),
134 }
135 }
136}
137
// Relies on the trait's default methods; `Debug` and `Display` are already
// provided, so the type can be used as a `dyn std::error::Error`.
impl std::error::Error for TopologyError {}
139
140pub fn encode_topology(topology: &Topology) -> Vec<u8> {
145 let mut body = Vec::with_capacity(estimate_body_size(topology));
146 body.extend_from_slice(&topology.epoch.to_le_bytes());
147 write_str(&mut body, &topology.primary.addr);
148 write_str(&mut body, &topology.primary.region);
149 body.extend_from_slice(&(topology.replicas.len() as u32).to_le_bytes());
150 for r in &topology.replicas {
151 write_str(&mut body, &r.addr);
152 write_str(&mut body, &r.region);
153 body.push(if r.healthy { 1 } else { 0 });
154 body.extend_from_slice(&r.lag_ms.to_le_bytes());
155 body.extend_from_slice(&r.last_applied_lsn.to_le_bytes());
156 }
157 for r in &topology.replicas {
163 body.push(if r.rebootstrapping { 1 } else { 0 });
164 }
165
166 let mut out = Vec::with_capacity(TOPOLOGY_HEADER_SIZE + body.len());
167 out.push(TOPOLOGY_WIRE_VERSION_V1);
168 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
169 out.extend_from_slice(&body);
170 out
171}
172
173pub fn decode_topology(bytes: &[u8]) -> Result<Option<Topology>, TopologyError> {
183 if bytes.len() < TOPOLOGY_HEADER_SIZE {
184 return Err(TopologyError::Truncated);
185 }
186 let version = bytes[0];
187 let declared_len = u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
188 let body = &bytes[TOPOLOGY_HEADER_SIZE..];
189 if (body.len() as u64) < declared_len as u64 {
190 return Err(TopologyError::BodyLengthMismatch {
191 declared: declared_len,
192 available: body.len(),
193 });
194 }
195 let body = &body[..declared_len as usize];
196
197 if version > MAX_KNOWN_TOPOLOGY_VERSION {
198 return Ok(None);
200 }
201
202 let mut cur = Cursor::new(body);
204 let epoch = cur.read_u64()?;
205 let primary_addr = cur.read_str()?;
206 let primary_region = cur.read_str()?;
207 let replica_count = cur.read_u32()? as usize;
208 let mut replicas = Vec::with_capacity(replica_count);
209 for _ in 0..replica_count {
210 let addr = cur.read_str()?;
211 let region = cur.read_str()?;
212 let healthy = cur.read_u8()? != 0;
213 let lag_ms = cur.read_u32()?;
214 let last_applied_lsn = cur.read_u64()?;
215 replicas.push(ReplicaInfo {
216 addr,
217 region,
218 healthy,
219 lag_ms,
220 last_applied_lsn,
221 rebootstrapping: false,
225 });
226 }
227 for r in replicas.iter_mut() {
234 if cur.remaining() == 0 {
235 break;
236 }
237 r.rebootstrapping = cur.read_u8()? != 0;
238 }
239 Ok(Some(Topology {
240 epoch,
241 primary: Endpoint {
242 addr: primary_addr,
243 region: primary_region,
244 },
245 replicas,
246 }))
247}
248
249fn estimate_body_size(t: &Topology) -> usize {
250 let endpoint = |e: &Endpoint| 4 + e.addr.len() + 4 + e.region.len();
251 let mut n = 8 + endpoint(&t.primary) + 4;
252 for r in &t.replicas {
253 n += 4 + r.addr.len() + 4 + r.region.len() + 1 + 4 + 8;
254 }
255 n += t.replicas.len();
257 n
258}
259
/// Appends `s` to `buf` as a little-endian `u32` byte length followed by the
/// raw UTF-8 bytes.
///
/// The length is narrowed with `as u32`, so a string longer than `u32::MAX`
/// bytes would get a wrapped prefix.
fn write_str(buf: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let prefix = (payload.len() as u32).to_le_bytes();
    buf.extend_from_slice(&prefix);
    buf.extend_from_slice(payload);
}
264
/// Forward-only reader over a borrowed byte slice.
struct Cursor<'a> {
    /// The bytes being read.
    buf: &'a [u8],
    /// Offset of the next unread byte in `buf`.
    pos: usize,
}
269
270impl<'a> Cursor<'a> {
271 fn new(buf: &'a [u8]) -> Self {
272 Self { buf, pos: 0 }
273 }
274
275 fn remaining(&self) -> usize {
276 self.buf.len() - self.pos
277 }
278
279 fn read_u8(&mut self) -> Result<u8, TopologyError> {
280 if self.remaining() < 1 {
281 return Err(TopologyError::Truncated);
282 }
283 let v = self.buf[self.pos];
284 self.pos += 1;
285 Ok(v)
286 }
287
288 fn read_u32(&mut self) -> Result<u32, TopologyError> {
289 if self.remaining() < 4 {
290 return Err(TopologyError::Truncated);
291 }
292 let bytes = &self.buf[self.pos..self.pos + 4];
293 self.pos += 4;
294 Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
295 }
296
297 fn read_u64(&mut self) -> Result<u64, TopologyError> {
298 if self.remaining() < 8 {
299 return Err(TopologyError::Truncated);
300 }
301 let bytes = &self.buf[self.pos..self.pos + 8];
302 self.pos += 8;
303 Ok(u64::from_le_bytes([
304 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
305 ]))
306 }
307
308 fn read_str(&mut self) -> Result<String, TopologyError> {
309 let len = self.read_u32()?;
310 if (len as usize) > self.remaining() {
311 return Err(TopologyError::StringTooLong {
312 declared: len,
313 remaining: self.remaining(),
314 });
315 }
316 let bytes = &self.buf[self.pos..self.pos + len as usize];
317 self.pos += len as usize;
318 let s = std::str::from_utf8(bytes)
319 .map_err(|_| TopologyError::InvalidUtf8)?
320 .to_string();
321 Ok(s)
322 }
323}
324
325pub fn encode_topology_for_hello_ack(topology: &Topology) -> String {
343 base64_encode(&encode_topology(topology))
344}
345
346pub fn decode_topology_from_hello_ack(field: &str) -> Result<Option<Topology>, TopologyError> {
356 let Some(bytes) = base64_decode(field) else {
357 return Ok(None);
361 };
362 decode_topology(&bytes)
363}
364
/// Standard base64 alphabet, indexed by 6-bit value.
const B64_ALPHA: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encodes `input` as base64 using the standard alphabet with `=` padding.
fn base64_encode(input: &[u8]) -> String {
    let mut encoded = String::with_capacity(input.len().div_ceil(3) * 4);
    for group in input.chunks(3) {
        // Pack up to three bytes into the top of a 24-bit word; bytes missing
        // from a short final group count as zero.
        let packed = group
            .iter()
            .enumerate()
            .fold(0u32, |acc, (i, &byte)| acc | (u32::from(byte) << (16 - 8 * i)));

        // A group of k bytes carries k + 1 meaningful sextets; the rest of
        // the four output characters are padding.
        let significant = group.len() + 1;
        for slot in 0..4 {
            if slot < significant {
                let sextet = (packed >> (18 - 6 * slot)) & 0x3F;
                encoded.push(char::from(B64_ALPHA[sextet as usize]));
            } else {
                encoded.push('=');
            }
        }
    }
    encoded
}
397
/// Decodes standard-alphabet base64, returning `None` for malformed input.
///
/// Trailing `=` padding is optional, so unpadded input is accepted. Input is
/// rejected when it:
///
/// * contains a character outside the standard alphabet (including `=`
///   anywhere but the end),
/// * ends in more than two `=` characters, or
/// * has a data length of 1 modulo 4, which no byte sequence encodes to (the
///   dangling 6 bits cannot form a byte and would otherwise be dropped
///   silently).
fn base64_decode(input: &str) -> Option<Vec<u8>> {
    let data = input.trim_end_matches('=');
    let padding = input.len() - data.len();
    if padding > 2 || data.len() % 4 == 1 {
        return None;
    }

    let mut out = Vec::with_capacity(data.len() * 3 / 4);
    // `acc` collects sextets; only its low `bits` bits are still pending.
    // Older bits simply shift out of the top of the u32.
    let mut acc = 0u32;
    let mut bits = 0u32;
    for ch in data.bytes() {
        let sextet = match ch {
            b'A'..=b'Z' => ch - b'A',
            b'a'..=b'z' => ch - b'a' + 26,
            b'0'..=b'9' => ch - b'0' + 52,
            b'+' => 62,
            b'/' => 63,
            _ => return None,
        };
        acc = (acc << 6) | u32::from(sextet);
        bits += 6;
        if bits >= 8 {
            bits -= 8;
            out.push(((acc >> bits) & 0xFF) as u8);
        }
    }
    Some(out)
}
421
422#[cfg(test)]
423mod tests {
424 use super::*;
425
426 fn fixture() -> Topology {
427 Topology {
428 epoch: 0xDEAD_BEEF_CAFE_BABE,
429 primary: Endpoint {
430 addr: "primary.example.com:5050".into(),
431 region: "us-east-1".into(),
432 },
433 replicas: vec![
434 ReplicaInfo {
435 addr: "replica-a.example.com:5050".into(),
436 region: "us-east-1".into(),
437 healthy: true,
438 lag_ms: 12,
439 last_applied_lsn: 4242,
440 rebootstrapping: false,
441 },
442 ReplicaInfo {
443 addr: "replica-b.example.com:5050".into(),
444 region: "us-west-2".into(),
445 healthy: false,
446 lag_ms: 999,
447 last_applied_lsn: 4100,
448 rebootstrapping: true,
449 },
450 ],
451 }
452 }
453
454 #[test]
455 fn round_trip_v1() {
456 let t = fixture();
457 let bytes = encode_topology(&t);
458 let decoded = decode_topology(&bytes).expect("decode").expect("v1 known");
459 assert_eq!(decoded, t);
460 }
461
462 #[test]
463 fn empty_replicas_round_trip() {
464 let t = Topology {
465 epoch: 1,
466 primary: Endpoint {
467 addr: "p:5050".into(),
468 region: "r".into(),
469 },
470 replicas: vec![],
471 };
472 let bytes = encode_topology(&t);
473 let decoded = decode_topology(&bytes).expect("decode").expect("v1");
474 assert_eq!(decoded, t);
475 }
476
477 #[test]
478 fn unknown_version_tag_returns_none() {
479 let mut bytes = encode_topology(&fixture());
483 bytes[0] = 0xFE; let decoded = decode_topology(&bytes).expect("decode");
485 assert!(
486 decoded.is_none(),
487 "unknown version tag must drop cleanly, got {decoded:?}"
488 );
489 }
490
491 #[test]
492 fn truncated_header_errors() {
493 assert!(matches!(
494 decode_topology(&[0x01, 0x00]),
495 Err(TopologyError::Truncated)
496 ));
497 }
498
499 #[test]
500 fn body_length_mismatch_errors() {
501 let bytes = vec![0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x00];
503 assert!(matches!(
504 decode_topology(&bytes),
505 Err(TopologyError::BodyLengthMismatch { .. })
506 ));
507 }
508
509 #[test]
510 fn version_tag_is_pinned_to_0x01() {
511 assert_eq!(TOPOLOGY_WIRE_VERSION_V1, 0x01);
515 }
516
517 #[test]
518 fn rebootstrapping_flag_round_trips_per_replica() {
519 let t = fixture();
522 let decoded = decode_topology(&encode_topology(&t))
523 .expect("decode")
524 .expect("v1 known");
525 assert!(!decoded.replicas[0].rebootstrapping);
526 assert!(decoded.replicas[1].rebootstrapping);
527 }
528
529 #[test]
530 fn legacy_blob_without_trailing_block_defaults_rebootstrapping_false() {
531 let t = fixture();
537 let full = encode_topology(&t);
538 let trailing = t.replicas.len();
539 let body_len = (full.len() - TOPOLOGY_HEADER_SIZE - trailing) as u32;
540 let mut legacy = Vec::new();
541 legacy.push(TOPOLOGY_WIRE_VERSION_V1);
542 legacy.extend_from_slice(&body_len.to_le_bytes());
543 legacy.extend_from_slice(&full[TOPOLOGY_HEADER_SIZE..full.len() - trailing]);
544 let decoded = decode_topology(&legacy).expect("decode").expect("v1 known");
545 assert_eq!(decoded.replicas.len(), 2);
546 assert!(decoded.replicas.iter().all(|r| !r.rebootstrapping));
547 assert_eq!(decoded.replicas[0].last_applied_lsn, 4242);
550 assert_eq!(decoded.replicas[1].lag_ms, 999);
551 }
552
553 #[test]
554 fn hello_ack_round_trip_via_base64() {
555 let t = fixture();
559 let field = encode_topology_for_hello_ack(&t);
560 let decoded = decode_topology_from_hello_ack(&field)
561 .expect("decode")
562 .expect("v1 known");
563 assert_eq!(decoded, t);
564 }
565
566 #[test]
567 fn hello_ack_inner_bytes_match_grpc_bytes() {
568 let t = fixture();
573 let canonical = encode_topology(&t);
574 let field = encode_topology_for_hello_ack(&t);
575 let recovered = base64_decode(&field).expect("base64");
576 assert_eq!(recovered, canonical);
577 }
578
579 #[test]
580 fn hello_ack_unknown_version_tag_drops_cleanly() {
581 let mut bytes = encode_topology(&fixture());
585 bytes[0] = 0x99;
586 let field = base64_encode(&bytes);
587 let decoded = decode_topology_from_hello_ack(&field).expect("decode");
588 assert!(decoded.is_none());
589 }
590
591 #[test]
592 fn hello_ack_malformed_base64_drops_cleanly() {
593 let decoded = decode_topology_from_hello_ack("@not base64@").expect("decode");
596 assert!(decoded.is_none());
597 }
598
599 #[test]
600 fn old_hello_ack_without_topology_field_is_backwards_compat() {
601 let json = br#"{"version":1,"auth":"bearer","features":3,"server":"reddb/0.2.9"}"#;
609 let v: serde_json_check::Value = serde_json_check::from_slice(json).expect("valid JSON");
610 let topo_field = v.find_string("topology");
611 let topology = match topo_field {
612 None => None,
613 Some(s) => decode_topology_from_hello_ack(&s).expect("decode"),
614 };
615 assert!(
616 topology.is_none(),
617 "an old HelloAck without `topology` must produce None"
618 );
619 }
620
621 mod serde_json_check {
625 pub enum Value {
626 Object(Vec<(String, Value)>),
627 String(String),
628 Other,
629 }
630
631 impl Value {
632 pub fn find_string(&self, key: &str) -> Option<String> {
633 match self {
634 Value::Object(map) => map.iter().find_map(|(k, v)| {
635 if k == key {
636 if let Value::String(s) = v {
637 Some(s.clone())
638 } else {
639 None
640 }
641 } else {
642 None
643 }
644 }),
645 _ => None,
646 }
647 }
648 }
649
650 pub fn from_slice(bytes: &[u8]) -> Result<Value, &'static str> {
651 let s = std::str::from_utf8(bytes).map_err(|_| "utf8")?;
652 let mut p = Parser { src: s, pos: 0 };
653 p.skip_ws();
654 let v = p.parse_value()?;
655 Ok(v)
656 }
657
658 struct Parser<'a> {
659 src: &'a str,
660 pos: usize,
661 }
662
663 impl<'a> Parser<'a> {
664 fn rest(&self) -> &'a str {
665 &self.src[self.pos..]
666 }
667 fn bump(&mut self, n: usize) {
668 self.pos += n;
669 }
670 fn skip_ws(&mut self) {
671 while let Some(c) = self.rest().chars().next() {
672 if c.is_whitespace() {
673 self.bump(c.len_utf8());
674 } else {
675 break;
676 }
677 }
678 }
679 fn parse_value(&mut self) -> Result<Value, &'static str> {
680 self.skip_ws();
681 let head = self.rest().chars().next().ok_or("eof")?;
682 match head {
683 '{' => self.parse_object(),
684 '"' => self.parse_string().map(Value::String),
685 _ => {
686 self.skip_until_top_level_comma_or_close();
691 Ok(Value::Other)
692 }
693 }
694 }
695 fn skip_until_top_level_comma_or_close(&mut self) {
696 let mut depth = 0i32;
697 while let Some(c) = self.rest().chars().next() {
698 match c {
699 '"' => {
700 let _ = self.parse_string();
701 continue;
702 }
703 '{' | '[' => {
704 depth += 1;
705 self.bump(1);
706 }
707 '}' | ']' => {
708 if depth == 0 {
709 return;
710 }
711 depth -= 1;
712 self.bump(1);
713 }
714 ',' if depth == 0 => return,
715 _ => self.bump(c.len_utf8()),
716 }
717 }
718 }
719 fn parse_object(&mut self) -> Result<Value, &'static str> {
720 self.bump(1); let mut map = Vec::new();
722 loop {
723 self.skip_ws();
724 if self.rest().starts_with('}') {
725 self.bump(1);
726 return Ok(Value::Object(map));
727 }
728 let key = self.parse_string()?;
729 self.skip_ws();
730 if !self.rest().starts_with(':') {
731 return Err("expected ':'");
732 }
733 self.bump(1);
734 let val = self.parse_value()?;
735 map.push((key, val));
736 self.skip_ws();
737 match self.rest().chars().next() {
738 Some(',') => {
739 self.bump(1);
740 continue;
741 }
742 Some('}') => {
743 self.bump(1);
744 return Ok(Value::Object(map));
745 }
746 _ => return Err("expected ',' or '}'"),
747 }
748 }
749 }
750 fn parse_string(&mut self) -> Result<String, &'static str> {
751 if !self.rest().starts_with('"') {
752 return Err("expected '\"'");
753 }
754 self.bump(1);
755 let start = self.pos;
756 while let Some(c) = self.rest().chars().next() {
757 if c == '"' {
758 let s = self.src[start..self.pos].to_string();
759 self.bump(1);
760 return Ok(s);
761 }
762 if c == '\\' {
763 self.bump(c.len_utf8());
764 }
765 self.bump(c.len_utf8());
766 }
767 Err("unterminated string")
768 }
769 }
770 }
771
772 #[test]
773 fn header_layout_first_byte_is_version_then_le_length() {
774 let t = fixture();
779 let bytes = encode_topology(&t);
780 assert_eq!(bytes[0], TOPOLOGY_WIRE_VERSION_V1);
781 let declared = u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
782 assert_eq!(declared as usize, bytes.len() - TOPOLOGY_HEADER_SIZE);
783 }
784}