/// Version tag that `encode_topology` writes as the first byte of every blob.
pub const TOPOLOGY_WIRE_VERSION_V1: u8 = 0x01;
/// Highest version tag `decode_topology` will parse; a blob carrying a greater
/// tag decodes to `Ok(None)` instead of an error.
pub const MAX_KNOWN_TOPOLOGY_VERSION: u8 = TOPOLOGY_WIRE_VERSION_V1;
/// Header size in bytes: a 1-byte version tag followed by a 4-byte
/// little-endian body length.
pub const TOPOLOGY_HEADER_SIZE: usize = 1 + 4;
/// In-memory form of a topology blob: an epoch, one primary endpoint and a
/// list of replicas.
///
/// Converted to and from the wire format by `encode_topology` and
/// `decode_topology`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Topology {
/// First field of the encoded body, written as a little-endian `u64`.
pub epoch: u64,
/// Written after the epoch as two length-prefixed strings (address, then region).
pub primary: Endpoint,
/// Written as a little-endian `u32` count followed by one record per replica.
pub replicas: Vec<ReplicaInfo>,
}
/// Address/region pair describing the primary of a `Topology`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Endpoint {
/// Address string. The test fixture uses a `host:port` form, but no format
/// is enforced by the codec in this file.
pub addr: String,
/// Region label (e.g. `us-east-1` in the test fixture); encoded as an opaque string.
pub region: String,
}
/// One replica record of a `Topology`.
///
/// On the wire each record is: address string, region string, one health
/// byte, `lag_ms` as a little-endian `u32`, then `last_applied_lsn` as a
/// little-endian `u64`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaInfo {
/// Address string, encoded with a 4-byte little-endian length prefix.
pub addr: String,
/// Region label, encoded with a 4-byte little-endian length prefix.
pub region: String,
/// Encoded as a single byte (`1` for true, `0` for false); any non-zero
/// byte decodes to `true`.
pub healthy: bool,
/// Presumably the replication lag in milliseconds (inferred from the name;
/// confirm with the producer of this value).
pub lag_ms: u32,
/// Presumably the last log sequence number applied by this replica
/// (inferred from the name; confirm with the producer of this value).
pub last_applied_lsn: u64,
}
/// Errors produced while decoding a topology blob.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopologyError {
    /// The input ended before a required field could be read: either the blob
    /// is shorter than the 5-byte header, or a fixed-width field inside the
    /// body is cut short.
    Truncated,
    /// The header declares more body bytes than the input actually carries.
    BodyLengthMismatch { declared: u32, available: usize },
    /// A length-prefixed string field does not contain valid UTF-8.
    InvalidUtf8,
    /// A string's length prefix points past the end of the body.
    StringTooLong { declared: u32, remaining: usize },
}

impl std::fmt::Display for TopologyError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // `Truncated` is raised for a short header *and* for short body
            // fields, so the message must not blame the header alone.
            Self::Truncated => write!(
                f,
                "topology blob truncated (header or body field cut short)"
            ),
            Self::BodyLengthMismatch {
                declared,
                available,
            } => write!(
                f,
                "topology body length mismatch: declared {declared}, available {available}"
            ),
            Self::InvalidUtf8 => write!(f, "topology string field is not valid UTF-8"),
            Self::StringTooLong {
                declared,
                remaining,
            } => write!(
                f,
                "topology string length {declared} exceeds remaining body bytes {remaining}"
            ),
        }
    }
}

impl std::error::Error for TopologyError {}
pub fn encode_topology(topology: &Topology) -> Vec<u8> {
let mut body = Vec::with_capacity(estimate_body_size(topology));
body.extend_from_slice(&topology.epoch.to_le_bytes());
write_str(&mut body, &topology.primary.addr);
write_str(&mut body, &topology.primary.region);
body.extend_from_slice(&(topology.replicas.len() as u32).to_le_bytes());
for r in &topology.replicas {
write_str(&mut body, &r.addr);
write_str(&mut body, &r.region);
body.push(if r.healthy { 1 } else { 0 });
body.extend_from_slice(&r.lag_ms.to_le_bytes());
body.extend_from_slice(&r.last_applied_lsn.to_le_bytes());
}
let mut out = Vec::with_capacity(TOPOLOGY_HEADER_SIZE + body.len());
out.push(TOPOLOGY_WIRE_VERSION_V1);
out.extend_from_slice(&(body.len() as u32).to_le_bytes());
out.extend_from_slice(&body);
out
}
/// Parses a topology blob produced by `encode_topology`.
///
/// Returns `Ok(Some(topology))` for a well-formed blob with a known version
/// tag, and `Ok(None)` when the tag is greater than
/// `MAX_KNOWN_TOPOLOGY_VERSION` (the body is skipped, not interpreted).
/// Bytes after the declared body length are ignored.
///
/// # Errors
///
/// * `TopologyError::Truncated` if the input is shorter than the header or a
///   fixed-width body field is cut short.
/// * `TopologyError::BodyLengthMismatch` if the header declares more body
///   bytes than are present.
/// * `TopologyError::StringTooLong` / `TopologyError::InvalidUtf8` for a bad
///   string field.
pub fn decode_topology(bytes: &[u8]) -> Result<Option<Topology>, TopologyError> {
    // Smallest possible encoded replica: two empty strings (4-byte length
    // prefix each), one health byte, a u32 lag and a u64 LSN.
    const MIN_REPLICA_WIRE_SIZE: usize = 4 + 4 + 1 + 4 + 8;

    if bytes.len() < TOPOLOGY_HEADER_SIZE {
        return Err(TopologyError::Truncated);
    }
    let version = bytes[0];
    let declared_len = u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
    let body = &bytes[TOPOLOGY_HEADER_SIZE..];
    if (body.len() as u64) < declared_len as u64 {
        return Err(TopologyError::BodyLengthMismatch {
            declared: declared_len,
            available: body.len(),
        });
    }
    let body = &body[..declared_len as usize];

    // The frame is validated before the version check so that a damaged blob
    // is reported as an error even when its version is unknown.
    if version > MAX_KNOWN_TOPOLOGY_VERSION {
        return Ok(None);
    }

    let mut cur = Cursor::new(body);
    let epoch = cur.read_u64()?;
    let primary_addr = cur.read_str()?;
    let primary_region = cur.read_str()?;
    let replica_count = cur.read_u32()? as usize;

    // `replica_count` comes straight off the wire. Preallocating it verbatim
    // would let a few hostile bytes request a multi-gigabyte buffer, so the
    // reservation is capped by how many records the remaining bytes can hold.
    let max_possible = cur.remaining() / MIN_REPLICA_WIRE_SIZE;
    let mut replicas = Vec::with_capacity(replica_count.min(max_possible));
    for _ in 0..replica_count {
        let addr = cur.read_str()?;
        let region = cur.read_str()?;
        let healthy = cur.read_u8()? != 0;
        let lag_ms = cur.read_u32()?;
        let last_applied_lsn = cur.read_u64()?;
        replicas.push(ReplicaInfo {
            addr,
            region,
            healthy,
            lag_ms,
            last_applied_lsn,
        });
    }

    Ok(Some(Topology {
        epoch,
        primary: Endpoint {
            addr: primary_addr,
            region: primary_region,
        },
        replicas,
    }))
}
fn estimate_body_size(t: &Topology) -> usize {
let endpoint = |e: &Endpoint| 4 + e.addr.len() + 4 + e.region.len();
let mut n = 8 + endpoint(&t.primary) + 4;
for r in &t.replicas {
n += 4 + r.addr.len() + 4 + r.region.len() + 1 + 4 + 8;
}
n
}
/// Appends `s` to `buf` as a little-endian `u32` byte length followed by the
/// UTF-8 bytes of the string.
///
/// The length is narrowed with `as u32`; a string longer than `u32::MAX`
/// bytes would get a wrapped prefix.
fn write_str(buf: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let prefix = (payload.len() as u32).to_le_bytes();
    buf.extend_from_slice(&prefix);
    buf.extend_from_slice(payload);
}
/// Forward-only reader over a borrowed byte slice, used by `decode_topology`.
struct Cursor<'a> {
    /// Bytes being decoded.
    buf: &'a [u8],
    /// Offset of the next unread byte; never exceeds `buf.len()`.
    pos: usize,
}

impl<'a> Cursor<'a> {
    /// Starts reading at the beginning of `buf`.
    fn new(buf: &'a [u8]) -> Self {
        Cursor { buf, pos: 0 }
    }

    /// Number of bytes not yet consumed.
    fn remaining(&self) -> usize {
        self.buf.len() - self.pos
    }

    /// Consumes exactly `n` bytes, or returns `None` and leaves the cursor
    /// untouched when fewer than `n` bytes remain.
    fn take(&mut self, n: usize) -> Option<&'a [u8]> {
        let buf: &'a [u8] = self.buf;
        let end = self.pos.checked_add(n)?;
        let chunk = buf.get(self.pos..end)?;
        self.pos = end;
        Some(chunk)
    }

    /// Reads one byte; `Truncated` if the buffer is exhausted.
    fn read_u8(&mut self) -> Result<u8, TopologyError> {
        match self.take(1) {
            Some(chunk) => Ok(chunk[0]),
            None => Err(TopologyError::Truncated),
        }
    }

    /// Reads a little-endian `u32`; `Truncated` if fewer than 4 bytes remain.
    fn read_u32(&mut self) -> Result<u32, TopologyError> {
        let chunk = self.take(4).ok_or(TopologyError::Truncated)?;
        let mut raw = [0u8; 4];
        raw.copy_from_slice(chunk);
        Ok(u32::from_le_bytes(raw))
    }

    /// Reads a little-endian `u64`; `Truncated` if fewer than 8 bytes remain.
    fn read_u64(&mut self) -> Result<u64, TopologyError> {
        let chunk = self.take(8).ok_or(TopologyError::Truncated)?;
        let mut raw = [0u8; 8];
        raw.copy_from_slice(chunk);
        Ok(u64::from_le_bytes(raw))
    }

    /// Reads a string stored as a little-endian `u32` byte length followed by
    /// that many UTF-8 bytes.
    ///
    /// Fails with `Truncated` if the length prefix is cut short, with
    /// `StringTooLong` if the prefix exceeds the bytes left after it, and with
    /// `InvalidUtf8` if the payload is not valid UTF-8. The length prefix is
    /// consumed even when `StringTooLong` is returned, and the payload is
    /// consumed even when `InvalidUtf8` is returned.
    fn read_str(&mut self) -> Result<String, TopologyError> {
        let declared = self.read_u32()?;
        let remaining = self.remaining();
        let payload = self
            .take(declared as usize)
            .ok_or(TopologyError::StringTooLong {
                declared,
                remaining,
            })?;
        match std::str::from_utf8(payload) {
            Ok(text) => Ok(text.to_string()),
            Err(_) => Err(TopologyError::InvalidUtf8),
        }
    }
}
/// Encodes `topology` with `encode_topology` and returns the bytes as padded,
/// standard-alphabet base64 text.
pub fn encode_topology_for_hello_ack(topology: &Topology) -> String {
    let wire = encode_topology(topology);
    base64_encode(&wire)
}
/// Decodes a base64 text field back into a `Topology`.
///
/// Text that is not valid base64 yields `Ok(None)` rather than an error;
/// otherwise the decoded bytes are handed to `decode_topology` and its result
/// (including its errors) is returned unchanged.
pub fn decode_topology_from_hello_ack(field: &str) -> Result<Option<Topology>, TopologyError> {
    match base64_decode(field) {
        Some(wire) => decode_topology(&wire),
        None => Ok(None),
    }
}
/// Standard base64 alphabet (`A-Z`, `a-z`, `0-9`, `+`, `/`), indexed by 6-bit value.
const B64_ALPHA: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encodes `input` as base64 using the standard alphabet, padding the final
/// group with `=` so the output length is always a multiple of four.
fn base64_encode(input: &[u8]) -> String {
    // Picks the alphabet character for the 6 bits of `group` starting at `shift`.
    let symbol = |group: u32, shift: u32| B64_ALPHA[((group >> shift) & 0x3F) as usize] as char;

    let mut encoded = String::with_capacity(input.len().div_ceil(3) * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into the top of a 24-bit group; missing
        // trailing bytes stay zero.
        let group = chunk
            .iter()
            .enumerate()
            .fold(0u32, |acc, (i, &byte)| acc | (u32::from(byte) << (16 - 8 * i)));

        encoded.push(symbol(group, 18));
        encoded.push(symbol(group, 12));
        encoded.push(if chunk.len() > 1 { symbol(group, 6) } else { '=' });
        encoded.push(if chunk.len() > 2 { symbol(group, 0) } else { '=' });
    }
    encoded
}
/// Decodes standard-alphabet base64 text, with or without trailing `=` padding.
///
/// Returns `None` when the text cannot be base64:
/// * it contains a character outside `A-Z a-z 0-9 + /` (including `=` anywhere
///   but the end),
/// * it ends with more than two `=` characters, or
/// * the unpadded length leaves a single dangling symbol (length ≡ 1 mod 4),
///   which carries only 6 bits and can never complete a byte.
///
/// Leftover low bits in the final symbol are not required to be zero.
fn base64_decode(input: &str) -> Option<Vec<u8>> {
    let trimmed = input.trim_end_matches('=');

    // Reject shapes no encoder can produce instead of silently dropping data.
    let padding = input.len() - trimmed.len();
    if padding > 2 || trimmed.len() % 4 == 1 {
        return None;
    }

    let mut out = Vec::with_capacity(trimmed.len() * 3 / 4);
    let mut acc = 0u32;
    let mut bits = 0u8;
    for ch in trimmed.bytes() {
        let sextet: u32 = match ch {
            b'A'..=b'Z' => (ch - b'A') as u32,
            b'a'..=b'z' => (ch - b'a' + 26) as u32,
            b'0'..=b'9' => (ch - b'0' + 52) as u32,
            b'+' => 62,
            b'/' => 63,
            _ => return None,
        };
        acc = (acc << 6) | sextet;
        bits += 6;
        if bits >= 8 {
            bits -= 8;
            out.push(((acc >> bits) & 0xFF) as u8);
            // Keep only the bits that have not been emitted yet.
            acc &= (1u32 << bits) - 1;
        }
    }
    Some(out)
}
#[cfg(test)]
mod tests {
use super::*;
// Shared sample topology: one primary plus a healthy and an unhealthy replica.
fn fixture() -> Topology {
    let replica = |addr: &str, region: &str, healthy: bool, lag_ms: u32, last_applied_lsn: u64| {
        ReplicaInfo {
            addr: addr.to_string(),
            region: region.to_string(),
            healthy,
            lag_ms,
            last_applied_lsn,
        }
    };

    let primary = Endpoint {
        addr: "primary.example.com:5050".to_string(),
        region: "us-east-1".to_string(),
    };

    Topology {
        epoch: 0xDEAD_BEEF_CAFE_BABE,
        primary,
        replicas: vec![
            replica("replica-a.example.com:5050", "us-east-1", true, 12, 4242),
            replica("replica-b.example.com:5050", "us-west-2", false, 999, 4100),
        ],
    }
}
#[test]
fn round_trip_v1() {
    // Encoding then decoding the fixture must give back an equal value.
    let original = fixture();
    let wire = encode_topology(&original);
    let outcome = decode_topology(&wire).expect("decode");
    let restored = outcome.expect("v1 known");
    assert_eq!(restored, original);
}
#[test]
fn empty_replicas_round_trip() {
    // A topology with no replicas must survive the round trip unchanged.
    let primary = Endpoint {
        addr: String::from("p:5050"),
        region: String::from("r"),
    };
    let original = Topology {
        epoch: 1,
        primary,
        replicas: Vec::new(),
    };
    let wire = encode_topology(&original);
    let restored = decode_topology(&wire).expect("decode").expect("v1");
    assert_eq!(restored, original);
}
#[test]
fn unknown_version_tag_returns_none() {
    // Overwrite the version byte with a tag above the highest known one.
    let mut wire = encode_topology(&fixture());
    wire[0] = 0xFE;

    let decoded = decode_topology(&wire).expect("decode");
    assert!(
        decoded.is_none(),
        "unknown version tag must drop cleanly, got {decoded:?}"
    );
}
#[test]
fn truncated_header_errors() {
    // Two bytes cannot hold the 5-byte header.
    let outcome = decode_topology(&[0x01, 0x00]);
    assert!(matches!(outcome, Err(TopologyError::Truncated)));
}
#[test]
fn body_length_mismatch_errors() {
    // Header declares u32::MAX body bytes but only one byte follows.
    let mut wire = vec![0x01];
    wire.extend_from_slice(&u32::MAX.to_le_bytes());
    wire.push(0x00);

    let outcome = decode_topology(&wire);
    assert!(matches!(
        outcome,
        Err(TopologyError::BodyLengthMismatch { .. })
    ));
}
#[test]
fn version_tag_is_pinned_to_0x01() {
    // Guards against an accidental change of the on-wire version byte.
    let pinned: u8 = 0x01;
    assert_eq!(TOPOLOGY_WIRE_VERSION_V1, pinned);
}
#[test]
fn hello_ack_round_trip_via_base64() {
    // The base64 text form must round-trip to an equal topology.
    let original = fixture();
    let text = encode_topology_for_hello_ack(&original);
    let outcome = decode_topology_from_hello_ack(&text).expect("decode");
    let restored = outcome.expect("v1 known");
    assert_eq!(restored, original);
}
#[test]
fn hello_ack_inner_bytes_match_grpc_bytes() {
    // The base64 field must wrap exactly the bytes `encode_topology` emits.
    let topology = fixture();
    let text = encode_topology_for_hello_ack(&topology);
    let unwrapped = base64_decode(&text).expect("base64");
    let canonical = encode_topology(&topology);
    assert_eq!(unwrapped, canonical);
}
#[test]
fn hello_ack_unknown_version_tag_drops_cleanly() {
    // Same as the raw-bytes case, but routed through the base64 wrapper.
    let mut wire = encode_topology(&fixture());
    wire[0] = 0x99;
    let text = base64_encode(&wire);

    let outcome = decode_topology_from_hello_ack(&text);
    let decoded = outcome.expect("decode");
    assert!(decoded.is_none());
}
#[test]
fn hello_ack_malformed_base64_drops_cleanly() {
    // Text outside the base64 alphabet is ignored rather than reported.
    let outcome = decode_topology_from_hello_ack("@not base64@");
    let decoded = outcome.expect("decode");
    assert!(decoded.is_none());
}
#[test]
fn old_hello_ack_without_topology_field_is_backwards_compat() {
    // A HelloAck payload that has no `topology` key at all.
    let json = br#"{"version":1,"auth":"bearer","features":3,"server":"reddb/0.2.9"}"#;
    let parsed: serde_json_check::Value =
        serde_json_check::from_slice(json).expect("valid JSON");

    // Only attempt a decode when the field is present.
    let topology = parsed
        .find_string("topology")
        .and_then(|field| decode_topology_from_hello_ack(&field).expect("decode"));

    assert!(
        topology.is_none(),
        "an old HelloAck without `topology` must produce None"
    );
}
// Minimal JSON reader used only by the tests: it understands objects and
// strings and skips every other value. Strings are returned raw, i.e. escape
// sequences are kept as written and not decoded.
mod serde_json_check {
    /// The subset of JSON values this reader distinguishes.
    pub enum Value {
        /// Key/value pairs in document order.
        Object(Vec<(String, Value)>),
        /// Raw (un-decoded) contents of a string literal.
        String(String),
        /// Any other value (number, array, literal); its content is discarded.
        Other,
    }

    impl Value {
        /// Returns the first string value stored under `key`, if `self` is an
        /// object. Entries whose key matches but whose value is not a string
        /// are skipped.
        pub fn find_string(&self, key: &str) -> Option<String> {
            match self {
                Value::Object(map) => map.iter().find_map(|(k, v)| match v {
                    Value::String(s) if k == key => Some(s.clone()),
                    _ => None,
                }),
                _ => None,
            }
        }
    }

    /// Parses the first JSON value in `bytes`. Anything after that value is
    /// not inspected.
    pub fn from_slice(bytes: &[u8]) -> Result<Value, &'static str> {
        let s = std::str::from_utf8(bytes).map_err(|_| "utf8")?;
        let mut p = Parser { src: s, pos: 0 };
        p.skip_ws();
        let v = p.parse_value()?;
        Ok(v)
    }

    struct Parser<'a> {
        src: &'a str,
        /// Byte offset of the next unread character; always on a char boundary.
        pos: usize,
    }

    impl<'a> Parser<'a> {
        /// The unread tail of the input.
        fn rest(&self) -> &'a str {
            &self.src[self.pos..]
        }

        fn bump(&mut self, n: usize) {
            self.pos += n;
        }

        fn skip_ws(&mut self) {
            while let Some(c) = self.rest().chars().next() {
                if c.is_whitespace() {
                    self.bump(c.len_utf8());
                } else {
                    break;
                }
            }
        }

        fn parse_value(&mut self) -> Result<Value, &'static str> {
            self.skip_ws();
            let head = self.rest().chars().next().ok_or("eof")?;
            match head {
                '{' => self.parse_object(),
                '"' => self.parse_string().map(Value::String),
                _ => {
                    self.skip_until_top_level_comma_or_close();
                    Ok(Value::Other)
                }
            }
        }

        /// Skips a value the reader does not model, stopping in front of the
        /// `,`, `}` or `]` that ends it at nesting depth zero.
        fn skip_until_top_level_comma_or_close(&mut self) {
            let mut depth = 0i32;
            while let Some(c) = self.rest().chars().next() {
                match c {
                    '"' => {
                        // Skip the whole literal so brackets inside strings
                        // are not counted. On an unterminated string the
                        // parser is left at end of input, which ends the loop.
                        let _ = self.parse_string();
                        continue;
                    }
                    '{' | '[' => {
                        depth += 1;
                        self.bump(1);
                    }
                    '}' | ']' => {
                        if depth == 0 {
                            return;
                        }
                        depth -= 1;
                        self.bump(1);
                    }
                    ',' if depth == 0 => return,
                    _ => self.bump(c.len_utf8()),
                }
            }
        }

        fn parse_object(&mut self) -> Result<Value, &'static str> {
            // Consume the opening '{'.
            self.bump(1);
            let mut map = Vec::new();
            loop {
                self.skip_ws();
                if self.rest().starts_with('}') {
                    self.bump(1);
                    return Ok(Value::Object(map));
                }
                let key = self.parse_string()?;
                self.skip_ws();
                if !self.rest().starts_with(':') {
                    return Err("expected ':'");
                }
                self.bump(1);
                let val = self.parse_value()?;
                map.push((key, val));
                self.skip_ws();
                match self.rest().chars().next() {
                    Some(',') => {
                        self.bump(1);
                        continue;
                    }
                    Some('}') => {
                        self.bump(1);
                        return Ok(Value::Object(map));
                    }
                    _ => return Err("expected ',' or '}'"),
                }
            }
        }

        /// Reads a string literal and returns its raw contents (escapes are
        /// kept verbatim). On an unterminated literal the parser is moved to
        /// the end of the input and an error is returned.
        fn parse_string(&mut self) -> Result<String, &'static str> {
            if !self.rest().starts_with('"') {
                return Err("expected '\"'");
            }
            self.bump(1);
            let start = self.pos;

            let mut chars = self.rest().char_indices();
            while let Some((offset, c)) = chars.next() {
                match c {
                    '"' => {
                        let end = start + offset;
                        self.pos = end + 1;
                        return Ok(self.src[start..end].to_string());
                    }
                    '\\' => {
                        // Skip the escaped character whatever its UTF-8
                        // width. A trailing backslash simply ends the
                        // iteration and is reported as unterminated below.
                        chars.next();
                    }
                    _ => {}
                }
            }

            self.pos = self.src.len();
            Err("unterminated string")
        }
    }
}
#[test]
fn header_layout_first_byte_is_version_then_le_length() {
    // Split the encoded blob at the header boundary and check both halves.
    let wire = encode_topology(&fixture());
    let (header, body) = wire.split_at(TOPOLOGY_HEADER_SIZE);

    assert_eq!(header[0], TOPOLOGY_WIRE_VERSION_V1);
    let declared = u32::from_le_bytes([header[1], header[2], header[3], header[4]]);
    assert_eq!(declared as usize, body.len());
}
}