use bytes::Bytes;
use schemreg::{
GlueCompression, GlueSchemaVersionId, decode_glue_wire_format, decode_glue_wire_format_bytes,
decode_protobuf_message_indexes, decode_wire_format, decode_wire_format_bytes,
encode_glue_wire_format, encode_protobuf_wire_format, encode_wire_format,
};
fn glue_id(s: &str) -> GlueSchemaVersionId {
s.parse().expect("valid UUID")
}
const CONFLUENT_AVRO_GOLDEN: &[u8] = &[
0x00, 0x00, 0x00, 0x00, 0x01, b'h', b'e', b'l', b'l', b'o', ];
#[test]
fn confluent_avro_encoder_matches_golden() {
let encoded = encode_wire_format(1u32, b"hello");
assert_eq!(
encoded.as_ref(),
CONFLUENT_AVRO_GOLDEN,
"Confluent Avro encoder must produce the canonical byte sequence"
);
}
#[test]
fn confluent_avro_decoder_matches_golden() {
let (schema_id, payload) = decode_wire_format(CONFLUENT_AVRO_GOLDEN)
.expect("canonical Confluent Avro frame must decode without error");
assert_eq!(schema_id, 1u32, "decoded schema_id must be 1");
assert_eq!(payload, b"hello", "decoded payload must be b\"hello\"");
}
#[test]
fn confluent_avro_bytes_decoder_matches_golden() {
let data = Bytes::from_static(CONFLUENT_AVRO_GOLDEN);
let (schema_id, payload) = decode_wire_format_bytes(&data)
.expect("canonical Confluent Avro frame must decode via Bytes variant");
assert_eq!(schema_id, 1u32);
assert_eq!(&payload[..], b"hello");
}
const CONFLUENT_AVRO_MAX_ID_GOLDEN: &[u8] = &[0x00, 0xFF, 0xFF, 0xFF, 0xFF];
#[test]
fn confluent_avro_max_schema_id_golden() {
let encoded = encode_wire_format(u32::MAX, b"");
assert_eq!(
encoded.as_ref(),
CONFLUENT_AVRO_MAX_ID_GOLDEN,
"schema_id u32::MAX must encode as four 0xFF bytes"
);
let (id, payload) = decode_wire_format(CONFLUENT_AVRO_MAX_ID_GOLDEN).unwrap();
assert_eq!(id, u32::MAX);
assert!(payload.is_empty());
}
const CONFLUENT_AVRO_BIG_ENDIAN_GOLDEN: &[u8] = &[0x00, 0x00, 0x00, 0x01, 0x00, b'z'];
#[test]
fn confluent_avro_big_endian_byte_order_golden() {
let encoded = encode_wire_format(256u32, b"z");
assert_eq!(
encoded.as_ref(),
CONFLUENT_AVRO_BIG_ENDIAN_GOLDEN,
"schema_id 256 must occupy bytes [0x00, 0x00, 0x01, 0x00] in big-endian order"
);
}
const CONFLUENT_PROTO_GOLDEN: &[u8] = &[
0x00, 0x00, 0x00, 0x00, 0x07, 0x01, 0x00, b'p', b'r', b'o', b't', b'o', ];
#[test]
fn confluent_protobuf_encoder_matches_golden() {
let encoded = encode_protobuf_wire_format(7u32, &[0], b"proto");
assert_eq!(
encoded.as_ref(),
CONFLUENT_PROTO_GOLDEN,
"Confluent Protobuf encoder must produce the canonical byte sequence"
);
}
#[test]
fn confluent_protobuf_decoder_matches_golden() {
let (schema_id, after_header) = decode_wire_format(CONFLUENT_PROTO_GOLDEN)
.expect("canonical frame must have a valid Confluent header");
assert_eq!(schema_id, 7u32);
let (indexes, offset) = decode_protobuf_message_indexes(after_header)
.expect("canonical message-index must decode cleanly");
assert_eq!(indexes, vec![0i32], "message-index must be [0]");
assert_eq!(
&after_header[offset..],
b"proto",
"payload after message-index must be b\"proto\""
);
}
const CONFLUENT_PROTO_IDX1_GOLDEN: &[u8] = &[
0x00, 0x00, 0x00, 0x00, 0x2A, 0x01, 0x02, b'd', b'a', b't', b'a', ];
#[test]
fn confluent_protobuf_index_1_golden() {
let encoded = encode_protobuf_wire_format(42u32, &[1], b"data");
assert_eq!(encoded.as_ref(), CONFLUENT_PROTO_IDX1_GOLDEN);
let (id, after_header) = decode_wire_format(CONFLUENT_PROTO_IDX1_GOLDEN).unwrap();
assert_eq!(id, 42u32);
let (indexes, offset) = decode_protobuf_message_indexes(after_header).unwrap();
assert_eq!(indexes, vec![1i32]);
assert_eq!(&after_header[offset..], b"data");
}
const CONFLUENT_PROTO_NESTED_GOLDEN: &[u8] = &[
0x00, 0x00, 0x00, 0x00, 0x64, 0x02, 0x00, 0x02, b'n', b'e', b's', b't', ];
#[test]
fn confluent_protobuf_nested_path_golden() {
let encoded = encode_protobuf_wire_format(100u32, &[0, 1], b"nest");
assert_eq!(encoded.as_ref(), CONFLUENT_PROTO_NESTED_GOLDEN);
let (id, after_header) = decode_wire_format(CONFLUENT_PROTO_NESTED_GOLDEN).unwrap();
assert_eq!(id, 100u32);
let (indexes, offset) = decode_protobuf_message_indexes(after_header).unwrap();
assert_eq!(indexes, vec![0i32, 1i32]);
assert_eq!(&after_header[offset..], b"nest");
}
const CONFLUENT_PROTO_NEG_INDEX_GOLDEN: &[u8] = &[
0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, b'x',
];
#[test]
fn confluent_protobuf_negative_index_zigzag_golden() {
let encoded = encode_protobuf_wire_format(1u32, &[-1], b"x");
assert_eq!(encoded.as_ref(), CONFLUENT_PROTO_NEG_INDEX_GOLDEN);
let (_, after_header) = decode_wire_format(CONFLUENT_PROTO_NEG_INDEX_GOLDEN).unwrap();
let (indexes, _) = decode_protobuf_message_indexes(after_header).unwrap();
assert_eq!(indexes, vec![-1i32]);
}
const GLUE_UUID: &str = "550e8400-e29b-41d4-a716-446655440000";
const GLUE_UUID_BYTES: [u8; 16] = [
0x55, 0x0e, 0x84, 0x00, 0xe2, 0x9b, 0x41, 0xd4, 0xa7, 0x16, 0x44, 0x66, 0x55, 0x44, 0x00, 0x00, ];
const GLUE_NONE_GOLDEN: &[u8] = &[
0x03, 0x00, 0x55, 0x0e, 0x84, 0x00, 0xe2, 0x9b, 0x41, 0xd4, 0xa7, 0x16, 0x44, 0x66, 0x55, 0x44, 0x00, 0x00,
b'g', b'l', b'u', b'e', ];
#[test]
fn glue_none_encoder_matches_golden() {
let id = glue_id(GLUE_UUID);
let encoded = encode_glue_wire_format(id, b"glue", GlueCompression::None)
.expect("None-compression Glue encode must not fail");
assert_eq!(
encoded.as_ref(),
GLUE_NONE_GOLDEN,
"Glue encoder (None) must produce the canonical byte sequence"
);
}
#[test]
fn glue_none_decoder_matches_golden() {
let (version_id, payload) = decode_glue_wire_format(GLUE_NONE_GOLDEN)
.expect("canonical Glue frame must decode without error");
assert_eq!(
version_id,
glue_id(GLUE_UUID),
"decoded version UUID must match"
);
assert_eq!(payload, b"glue", "decoded payload must be b\"glue\"");
}
#[test]
fn glue_none_bytes_decoder_matches_golden() {
let data = Bytes::from_static(GLUE_NONE_GOLDEN);
let (version_id, payload) = decode_glue_wire_format_bytes(&data)
.expect("canonical Glue Bytes frame must decode without error");
assert_eq!(version_id, glue_id(GLUE_UUID));
assert_eq!(&payload[..], b"glue");
}
#[test]
fn glue_uuid_bytes_big_endian_in_header() {
let encoded = encode_glue_wire_format(glue_id(GLUE_UUID), b"", GlueCompression::None).unwrap();
assert_eq!(encoded[0], 0x03, "byte 0 must be header version 0x03");
assert_eq!(encoded[1], 0x00, "byte 1 must be compression=None (0x00)");
assert_eq!(
&encoded[2..18],
&GLUE_UUID_BYTES,
"bytes 2–17 must be the UUID in big-endian order"
);
}
#[test]
fn glue_header_version_byte_is_0x03() {
let encoded =
encode_glue_wire_format(glue_id(GLUE_UUID), b"payload", GlueCompression::None).unwrap();
assert_eq!(
encoded[0], 0x03,
"Glue header version byte must always be 0x03"
);
}
#[test]
fn glue_none_compression_byte_is_0x00() {
let encoded = encode_glue_wire_format(glue_id(GLUE_UUID), b"x", GlueCompression::None).unwrap();
assert_eq!(
encoded[1], 0x00,
"Glue None-compression indicator must be 0x00"
);
}
#[cfg(feature = "glue")]
#[test]
fn glue_zlib_compression_byte_is_0x05() {
let encoded = encode_glue_wire_format(glue_id(GLUE_UUID), b"compressed", GlueCompression::Zlib)
.expect("Zlib encoding requires glue feature");
assert_eq!(encoded[0], 0x03, "header version must be 0x03");
assert_eq!(encoded[1], 0x05, "ZLIB compression indicator must be 0x05");
assert_eq!(&encoded[2..18], &GLUE_UUID_BYTES);
}
#[cfg(feature = "glue")]
#[test]
fn glue_zlib_roundtrip_golden() {
let original = b"schema payload data that should be compressed";
let id = glue_id(GLUE_UUID);
let encoded = encode_glue_wire_format(id, original, GlueCompression::Zlib)
.expect("ZLIB encode must succeed");
assert_eq!(encoded[0], 0x03);
assert_eq!(encoded[1], 0x05);
assert_eq!(&encoded[2..18], &GLUE_UUID_BYTES);
let (decoded_id, decoded_payload) =
decode_glue_wire_format(&encoded).expect("ZLIB-compressed frame must decode");
assert_eq!(decoded_id, id);
assert_eq!(decoded_payload.as_slice(), original.as_ref());
}
#[test]
fn confluent_decoder_rejects_glue_frame() {
let glue_frame =
encode_glue_wire_format(glue_id(GLUE_UUID), b"data", GlueCompression::None).unwrap();
let err =
decode_wire_format(&glue_frame).expect_err("Confluent decoder must reject a Glue frame");
let msg = err.to_string();
assert!(
msg.contains("magic byte") || msg.contains("0x03"),
"error must reference the invalid magic byte: {msg}"
);
}
#[test]
fn glue_decoder_rejects_confluent_frame() {
let confluent_frame = encode_wire_format(42u32, b"data");
let err = decode_glue_wire_format(&confluent_frame)
.expect_err("Glue decoder must reject a Confluent frame");
let msg = err.to_string();
assert!(
msg.contains("too short") || msg.contains("header version") || msg.contains("0x03"),
"error must indicate rejection of a non-Glue frame: {msg}"
);
}