use serde::{Deserialize, Serialize};
/// First byte of every binary-encoded envelope; `RecordEnvelope::decode` uses it to
/// select the binary decoder.
pub const BINARY_MAGIC: u8 = 0xB0;
/// First byte that makes `RecordEnvelope::decode` treat the buffer as a JSON envelope
/// (the opening brace of a JSON object).
pub const JSON_MAGIC: u8 = b'{';
/// Flags-byte bit that carries the envelope's `shareable` field.
const FLAG_SHAREABLE: u8 = 0b0000_0001;
/// Flags-byte bit set when the stored payload bytes are a zstd-compressed frame.
const FLAG_ZSTD_PAYLOAD: u8 = 0b0000_0010;
/// Every flags-byte bit other than `FLAG_SHAREABLE` and `FLAG_ZSTD_PAYLOAD`; the binary
/// decoder rejects an envelope that has any of these bits set.
const FLAG_RESERVED_MASK: u8 = !(FLAG_SHAREABLE | FLAG_ZSTD_PAYLOAD);
/// zstd compression level used by `RecordEnvelope::encode_binary_smallest`.
pub const DEFAULT_ZSTD_LEVEL: i32 = 3;
/// A record payload together with a sharing flag and a list of tags.
///
/// The serde derives provide the JSON form; the `encode_binary*` methods provide a
/// compact, CRC-protected binary form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RecordEnvelope {
/// Sharing flag; stored as bit 0 of the flags byte in the binary encoding.
// NOTE(review): what "shareable" means to consumers is not visible in this file.
pub shareable: bool,
/// Tags attached to the record. In the binary encoding each tag is written as a
/// u16 length prefix followed by its UTF-8 bytes.
pub tags: Vec<String>,
/// Opaque payload bytes. The binary encoding may store them zstd-compressed.
pub payload: Vec<u8>,
}
/// Errors returned when decoding a [`RecordEnvelope`] from bytes.
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum EnvelopeError {
/// The buffer started with the JSON magic byte but `serde_json` could not parse it
/// into an envelope.
#[error("invalid JSON envelope: {0}")]
Json(#[from] serde_json::Error),
/// The buffer ended before a field could be read in full.
#[error("truncated envelope: expected {expected} bytes, got {got}")]
Truncated {
/// Buffer length that would have been needed to complete the failed read.
expected: usize,
/// Actual buffer length.
got: usize,
},
/// The first byte was neither [`JSON_MAGIC`] nor [`BINARY_MAGIC`]; carries that byte.
#[error("invalid envelope magic: 0x{0:02x}")]
InvalidMagic(u8),
/// The CRC stored at the end of a binary envelope does not match the CRC computed
/// over the bytes that precede it.
#[error("binary envelope CRC mismatch: header {expected:08x}, computed {got:08x}")]
CrcMismatch {
/// CRC value read from the envelope.
expected: u32,
/// CRC value computed by the decoder.
got: u32,
},
/// The flags byte had at least one reserved bit set; carries the whole flags byte.
#[error("reserved binary envelope flags set: 0x{0:02x}")]
ReservedFlagsSet(u8),
/// The payload was flagged as zstd-compressed but could not be decompressed;
/// carries the decompressor's error rendered as a string.
#[error("zstd payload decompression failed: {0}")]
ZstdDecompress(String),
/// A tag in a binary envelope was not valid UTF-8.
#[error("binary envelope: tag {index} is not UTF-8: {source}")]
TagNotUtf8 {
/// Zero-based position of the offending tag.
index: usize,
/// The UTF-8 validation error. thiserror treats a field named `source` as the
/// underlying cause of the error.
source: std::str::Utf8Error,
},
}
impl RecordEnvelope {
pub fn new(shareable: bool, tags: Vec<String>, payload: Vec<u8>) -> Self {
Self {
shareable,
tags,
payload,
}
}
pub fn encode(&self) -> Vec<u8> {
serde_json::to_vec(self).expect("RecordEnvelope is always serialisable")
}
pub fn encode_binary(&self) -> Vec<u8> {
self.encode_binary_inner(None)
}
pub fn encode_binary_zstd(&self, level: i32) -> Vec<u8> {
self.encode_binary_inner(Some(level))
}
pub fn encode_binary_smallest(&self) -> Vec<u8> {
let plain = self.encode_binary();
let compressed = self.encode_binary_zstd(DEFAULT_ZSTD_LEVEL);
if compressed.len() < plain.len() {
compressed
} else {
plain
}
}
fn encode_binary_inner(&self, zstd_level: Option<i32>) -> Vec<u8> {
let (payload_bytes, zstd_flag) = match zstd_level {
Some(level) => match zstd::bulk::compress(&self.payload, level) {
Ok(compressed) => (std::borrow::Cow::Owned(compressed), FLAG_ZSTD_PAYLOAD),
Err(_) => (std::borrow::Cow::Borrowed(self.payload.as_slice()), 0),
},
None => (std::borrow::Cow::Borrowed(self.payload.as_slice()), 0),
};
let mut tag_bytes_total = 0usize;
for t in &self.tags {
tag_bytes_total += 2 + t.len();
}
let total = 1 + 1 + 2 + tag_bytes_total + 4 + payload_bytes.len() + 4;
let mut buf = Vec::with_capacity(total);
buf.push(BINARY_MAGIC);
let flags = if self.shareable { FLAG_SHAREABLE } else { 0 } | zstd_flag;
buf.push(flags);
let tag_count = self.tags.len() as u16;
buf.extend_from_slice(&tag_count.to_le_bytes());
for t in &self.tags {
let tag_bytes = t.as_bytes();
let tag_len = tag_bytes.len() as u16;
buf.extend_from_slice(&tag_len.to_le_bytes());
buf.extend_from_slice(tag_bytes);
}
let payload_len = payload_bytes.len() as u32;
buf.extend_from_slice(&payload_len.to_le_bytes());
buf.extend_from_slice(&payload_bytes);
let crc = crc32c::crc32c(&buf);
buf.extend_from_slice(&crc.to_le_bytes());
buf
}
pub fn decode(buf: &[u8]) -> Result<Self, EnvelopeError> {
match buf.first().copied() {
None => Err(EnvelopeError::Truncated {
expected: 1,
got: 0,
}),
Some(JSON_MAGIC) => Ok(serde_json::from_slice(buf)?),
Some(BINARY_MAGIC) => Self::decode_binary(buf),
Some(other) => Err(EnvelopeError::InvalidMagic(other)),
}
}
pub fn decode_binary(buf: &[u8]) -> Result<Self, EnvelopeError> {
let mut cur = Cursor::new(buf);
let magic = cur.read_u8()?;
if magic != BINARY_MAGIC {
return Err(EnvelopeError::InvalidMagic(magic));
}
let flags = cur.read_u8()?;
if flags & FLAG_RESERVED_MASK != 0 {
return Err(EnvelopeError::ReservedFlagsSet(flags));
}
let shareable = flags & FLAG_SHAREABLE != 0;
let zstd_payload = flags & FLAG_ZSTD_PAYLOAD != 0;
let tag_count = cur.read_u16_le()? as usize;
let mut tags = Vec::with_capacity(tag_count);
for i in 0..tag_count {
let tag_len = cur.read_u16_le()? as usize;
let bytes = cur.read_bytes(tag_len)?;
let s = std::str::from_utf8(bytes).map_err(|e| EnvelopeError::TagNotUtf8 {
index: i,
source: e,
})?;
tags.push(s.to_string());
}
let payload_len = cur.read_u32_le()? as usize;
let payload_bytes = cur.read_bytes(payload_len)?.to_vec();
let body_end = cur.pos;
let stored_crc = cur.read_u32_le()?;
let computed_crc = crc32c::crc32c(&buf[..body_end]);
if stored_crc != computed_crc {
return Err(EnvelopeError::CrcMismatch {
expected: stored_crc,
got: computed_crc,
});
}
let payload = if zstd_payload {
zstd::bulk::decompress(&payload_bytes, ZSTD_MAX_DECOMPRESSED_SIZE)
.map_err(|e| EnvelopeError::ZstdDecompress(e.to_string()))?
} else {
payload_bytes
};
Ok(Self {
shareable,
tags,
payload,
})
}
}
/// Output capacity, in bytes (64 MiB), handed to `zstd::bulk::decompress` when a
/// binary envelope's payload is flagged as compressed.
const ZSTD_MAX_DECOMPRESSED_SIZE: usize = 64 * 1024 * 1024;
/// Forward-only reader over a byte slice, used by the binary envelope decoder.
struct Cursor<'a> {
    /// The complete input buffer.
    buf: &'a [u8],
    /// Offset of the next unread byte; advanced only by successful reads.
    pos: usize,
}

impl<'a> Cursor<'a> {
    /// Starts reading at the beginning of `buf`.
    fn new(buf: &'a [u8]) -> Self {
        Self { buf, pos: 0 }
    }

    /// Checks that `n` more bytes are available without consuming them.
    ///
    /// Returns [`EnvelopeError::Truncated`] otherwise, with `expected` set to the
    /// buffer length the read would have required (saturated at `usize::MAX`).
    fn ensure(&self, n: usize) -> Result<(), EnvelopeError> {
        // `n` can originate from an untrusted length prefix, so `pos + n` must not be
        // allowed to overflow: that would panic in debug builds and wrap past this
        // bounds check in release builds.
        match self.pos.checked_add(n) {
            Some(end) if end <= self.buf.len() => Ok(()),
            _ => Err(EnvelopeError::Truncated {
                expected: self.pos.saturating_add(n),
                got: self.buf.len(),
            }),
        }
    }

    /// Reads exactly `N` bytes into a fixed-size array.
    fn read_array<const N: usize>(&mut self) -> Result<[u8; N], EnvelopeError> {
        let bytes = self.read_bytes(N)?;
        let mut out = [0u8; N];
        out.copy_from_slice(bytes);
        Ok(out)
    }

    /// Reads one byte.
    fn read_u8(&mut self) -> Result<u8, EnvelopeError> {
        let [byte] = self.read_array::<1>()?;
        Ok(byte)
    }

    /// Reads a little-endian `u16`.
    fn read_u16_le(&mut self) -> Result<u16, EnvelopeError> {
        Ok(u16::from_le_bytes(self.read_array::<2>()?))
    }

    /// Reads a little-endian `u32`.
    fn read_u32_le(&mut self) -> Result<u32, EnvelopeError> {
        Ok(u32::from_le_bytes(self.read_array::<4>()?))
    }

    /// Returns the next `n` bytes, borrowed from the underlying buffer, and advances
    /// past them. The cursor does not move when the read fails.
    fn read_bytes(&mut self, n: usize) -> Result<&'a [u8], EnvelopeError> {
        self.ensure(n)?;
        // Cannot overflow or go out of bounds: `ensure` just validated `pos + n`.
        let end = self.pos + n;
        let slice = &self.buf[self.pos..end];
        self.pos = end;
        Ok(slice)
    }
}
/// Prefix shared by every key produced by [`envelope_key_for`].
pub const ENVELOPE_KEY_PREFIX: &str = "hansa:rec:";

/// Returns the key string for `record_id`: [`ENVELOPE_KEY_PREFIX`] followed by the id
/// written in decimal.
pub fn envelope_key_for(record_id: u64) -> String {
    let id_digits = record_id.to_string();
    let mut key = String::with_capacity(ENVELOPE_KEY_PREFIX.len() + id_digits.len());
    key.push_str(ENVELOPE_KEY_PREFIX);
    key.push_str(&id_digits);
    key
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shareable envelope with two ASCII tags and `payload` as its body.
    fn sample(payload: &str) -> RecordEnvelope {
        RecordEnvelope {
            shareable: true,
            tags: vec!["topic".into(), "crypto".into()],
            payload: payload.as_bytes().to_vec(),
        }
    }

    /// Number of bytes that precede the payload in the binary encoding of `env`:
    /// magic, flags, tag count, length-prefixed tags and the payload length field.
    fn binary_header_len(env: &RecordEnvelope) -> usize {
        1 + 1 + 2 + env.tags.iter().map(|t| 2 + t.len()).sum::<usize>() + 4
    }

    #[test]
    fn json_round_trip_back_compat() {
        let env = sample("hello world");
        let bytes = env.encode();
        assert_eq!(bytes[0], JSON_MAGIC);
        let back = RecordEnvelope::decode(&bytes).unwrap();
        assert_eq!(env, back);
    }

    #[test]
    fn binary_round_trip() {
        let env = sample("hello world");
        let bytes = env.encode_binary();
        assert_eq!(bytes[0], BINARY_MAGIC);
        let back = RecordEnvelope::decode(&bytes).unwrap();
        assert_eq!(env, back);
    }

    #[test]
    fn binary_round_trip_empty_tags_empty_payload() {
        let env = RecordEnvelope {
            shareable: false,
            tags: vec![],
            payload: vec![],
        };
        let back = RecordEnvelope::decode(&env.encode_binary()).unwrap();
        assert_eq!(env, back);
    }

    #[test]
    fn binary_round_trip_unicode_tags() {
        let env = RecordEnvelope {
            shareable: true,
            tags: vec!["café".into(), "日本語".into(), "🦀".into()],
            payload: b"unicode payload".to_vec(),
        };
        let back = RecordEnvelope::decode(&env.encode_binary()).unwrap();
        assert_eq!(env, back);
    }

    #[test]
    fn binary_round_trip_large_payload() {
        let env = RecordEnvelope {
            shareable: false,
            tags: vec!["x".into()],
            payload: vec![0xAB; 100_000],
        };
        let back = RecordEnvelope::decode(&env.encode_binary()).unwrap();
        assert_eq!(env, back);
    }

    /// JSON writes the payload as an array of numbers, so the binary form should be
    /// far smaller for a 1 KiB payload.
    #[test]
    fn binary_is_smaller_than_json_for_text_payloads() {
        let payload: Vec<u8> = (0..1024).map(|i| ((i % 26) as u8) + b'a').collect();
        let env = RecordEnvelope {
            shareable: true,
            tags: vec!["topic".into(), "skill:python".into()],
            payload,
        };
        let json_bytes = env.encode();
        let bin_bytes = env.encode_binary();
        assert!(
            bin_bytes.len() * 3 < json_bytes.len(),
            "expected binary < 1/3 JSON, got binary={} json={} ratio={:.2}x",
            bin_bytes.len(),
            json_bytes.len(),
            json_bytes.len() as f32 / bin_bytes.len() as f32,
        );
    }

    #[test]
    fn decode_rejects_garbage_first_byte() {
        let err = RecordEnvelope::decode(b"xyz garbage").unwrap_err();
        assert!(matches!(err, EnvelopeError::InvalidMagic(b'x')));
    }

    #[test]
    fn decode_rejects_empty_buffer() {
        let err = RecordEnvelope::decode(b"").unwrap_err();
        assert!(matches!(err, EnvelopeError::Truncated { .. }));
    }

    #[test]
    fn decode_rejects_truncated_binary() {
        let env = sample("hello");
        let bytes = env.encode_binary();
        // Drop half of the trailing CRC field.
        let err = RecordEnvelope::decode(&bytes[..bytes.len() - 2]).unwrap_err();
        assert!(matches!(err, EnvelopeError::Truncated { .. }));
    }

    #[test]
    fn decode_detects_crc_corruption() {
        let env = sample("hello");
        let mut bytes = env.encode_binary();
        // The payload is uncompressed, so it sits directly before the 4-byte CRC.
        let payload_start = bytes.len() - env.payload.len() - 4;
        bytes[payload_start] ^= 0xFF;
        let err = RecordEnvelope::decode(&bytes).unwrap_err();
        assert!(matches!(err, EnvelopeError::CrcMismatch { .. }));
    }

    #[test]
    fn decode_rejects_reserved_flags() {
        let env = sample("hi");
        let mut bytes = env.encode_binary();
        bytes[1] |= 0b1000_0000;
        let err = RecordEnvelope::decode(&bytes).unwrap_err();
        assert!(matches!(err, EnvelopeError::ReservedFlagsSet(_)));
    }

    #[test]
    fn binary_zstd_round_trip() {
        let env = sample("hello world this is a longer payload to be compressed");
        let bytes = env.encode_binary_zstd(DEFAULT_ZSTD_LEVEL);
        assert_eq!(bytes[0], BINARY_MAGIC);
        assert!(bytes[1] & FLAG_ZSTD_PAYLOAD != 0, "zstd flag missing");
        let back = RecordEnvelope::decode(&bytes).unwrap();
        assert_eq!(env, back);
    }

    #[test]
    fn binary_zstd_round_trip_large_compressible() {
        let env = RecordEnvelope {
            shareable: true,
            tags: vec!["doc".into()],
            payload: "lorem ipsum dolor sit amet ".repeat(2_000).into_bytes(),
        };
        let bytes = env.encode_binary_zstd(DEFAULT_ZSTD_LEVEL);
        let back = RecordEnvelope::decode(&bytes).unwrap();
        assert_eq!(env, back);
    }

    #[test]
    fn binary_zstd_beats_plain_on_text_payload() {
        let env = RecordEnvelope {
            shareable: false,
            tags: vec!["x".into()],
            payload: "the quick brown fox jumps over the lazy dog. "
                .repeat(500)
                .into_bytes(),
        };
        let plain = env.encode_binary();
        let zstd = env.encode_binary_zstd(DEFAULT_ZSTD_LEVEL);
        assert!(
            zstd.len() * 4 < plain.len(),
            "expected zstd < 1/4 plain, got zstd={} plain={}",
            zstd.len(),
            plain.len()
        );
    }

    #[test]
    fn binary_zstd_smallest_picks_shorter_per_payload() {
        // Highly repetitive text: the compressed encoding wins.
        let text_env = RecordEnvelope::new(true, vec![], "aaaaaaaaaa ".repeat(300).into_bytes());
        let text_smallest = text_env.encode_binary_smallest();
        let text_zstd = text_env.encode_binary_zstd(DEFAULT_ZSTD_LEVEL);
        assert_eq!(text_smallest, text_zstd);

        // Two bytes: zstd framing overhead makes the plain encoding win.
        let tiny_env = RecordEnvelope::new(true, vec![], b"hi".to_vec());
        let tiny_smallest = tiny_env.encode_binary_smallest();
        let tiny_plain = tiny_env.encode_binary();
        assert_eq!(tiny_smallest, tiny_plain);
    }

    #[test]
    fn binary_zstd_corrupt_frame_returns_error() {
        let env = sample("hello world payload");
        let mut bytes = env.encode_binary_zstd(DEFAULT_ZSTD_LEVEL);
        // Flip the last byte of the stored payload (the byte just before the CRC).
        let last_payload_byte = bytes.len() - 4 - 1;
        bytes[last_payload_byte] ^= 0xFF;
        let err = RecordEnvelope::decode(&bytes).unwrap_err();
        assert!(matches!(
            err,
            EnvelopeError::CrcMismatch { .. } | EnvelopeError::ZstdDecompress(_)
        ));
    }

    /// The CRC is verified before decompression, so a corrupted frame only reaches
    /// the decompressor when the CRC has been recomputed to match.
    #[test]
    fn binary_zstd_corrupt_frame_with_valid_crc_reports_decompress_error() {
        let env = sample("hello world payload");
        let mut bytes = env.encode_binary_zstd(DEFAULT_ZSTD_LEVEL);
        assert!(bytes[1] & FLAG_ZSTD_PAYLOAD != 0, "zstd flag missing");

        // Destroy the first byte of the zstd frame, then repair the CRC.
        let payload_start = binary_header_len(&env);
        bytes[payload_start] ^= 0xFF;
        let crc_start = bytes.len() - 4;
        let crc = crc32c::crc32c(&bytes[..crc_start]);
        bytes[crc_start..].copy_from_slice(&crc.to_le_bytes());

        let err = RecordEnvelope::decode(&bytes).unwrap_err();
        assert!(matches!(err, EnvelopeError::ZstdDecompress(_)));
    }

    #[test]
    fn binary_zstd_empty_payload_round_trip() {
        let env = RecordEnvelope {
            shareable: false,
            tags: vec!["e".into()],
            payload: vec![],
        };
        let bytes = env.encode_binary_zstd(DEFAULT_ZSTD_LEVEL);
        let back = RecordEnvelope::decode(&bytes).unwrap();
        assert_eq!(env, back);
    }

    #[test]
    fn envelope_key_is_predictable() {
        assert_eq!(envelope_key_for(42), "hansa:rec:42");
        assert_eq!(envelope_key_for(0), "hansa:rec:0");
    }

    /// `decode_binary` does no format sniffing: a JSON envelope is rejected on its
    /// first byte.
    #[test]
    fn decode_binary_rejects_json_envelope() {
        let env = sample("a");
        let json = env.encode();
        let err = RecordEnvelope::decode_binary(&json).unwrap_err();
        assert!(matches!(err, EnvelopeError::InvalidMagic(_)));
    }
}