use bytes::Bytes;
use schemreg::wire::DetectedWireFormat;
use schemreg::{decode_wire_format_bytes, detect_wire_format};
#[test]
fn confluent_empty_payload_is_unknown() {
let wf = detect_wire_format(&Bytes::new());
assert!(matches!(wf, DetectedWireFormat::Unknown), "{wf:?}");
}
#[test]
fn confluent_only_magic_byte_is_unknown() {
let buf = Bytes::from_static(&[0x00]);
let wf = detect_wire_format(&buf);
assert!(matches!(wf, DetectedWireFormat::InvalidConfluent), "{wf:?}");
}
#[test]
fn confluent_header_truncated_to_four_bytes() {
let buf = Bytes::from_static(&[0x00, 0x00, 0x00, 0x01]);
let wf = detect_wire_format(&buf);
assert!(matches!(wf, DetectedWireFormat::InvalidConfluent), "{wf:?}");
}
#[test]
fn confluent_five_byte_header_empty_payload_is_confluent() {
let buf = Bytes::from_static(&[0x00, 0x00, 0x00, 0x00, 0x01]);
let wf = detect_wire_format(&buf);
assert!(matches!(wf, DetectedWireFormat::Confluent { .. }), "{wf:?}");
}
#[test]
fn confluent_wrong_magic_byte_is_unknown() {
let mut buf = vec![0x01u8; 100];
buf[0] = 0x01; let wf = detect_wire_format(&Bytes::from(buf));
assert!(matches!(wf, DetectedWireFormat::Unknown), "{wf:?}");
}
#[test]
fn confluent_schema_id_zero() {
let buf = Bytes::from_static(&[0x00, 0x00, 0x00, 0x00, 0x00, b'x']);
let wf = detect_wire_format(&buf);
assert!(
matches!(wf, DetectedWireFormat::Confluent { schema_id, .. } if schema_id.as_u32() == 0),
"{wf:?}"
);
}
#[test]
fn confluent_schema_id_max_u32() {
let buf = Bytes::from_static(&[0x00, 0xFF, 0xFF, 0xFF, 0xFF, b'x']);
let wf = detect_wire_format(&buf);
assert!(
matches!(wf, DetectedWireFormat::Confluent { schema_id, .. } if schema_id.as_u32() == u32::MAX),
"{wf:?}"
);
}
#[test]
fn decode_wire_format_bytes_truncated_is_error() {
let buf = Bytes::from_static(&[0x00, 0x00]);
let result = decode_wire_format_bytes(&buf);
assert!(
result.is_err(),
"expected Err for truncated input, got {result:?}"
);
}
#[test]
fn decode_wire_format_bytes_header_only_empty_inner() {
let buf = Bytes::from_static(&[0x00, 0x00, 0x00, 0x00, 0x42]);
let (schema_id, inner) = decode_wire_format_bytes(&buf).expect("5-byte header is valid");
assert_eq!(schema_id.as_u32(), 0x42);
assert!(inner.is_empty());
}
#[test]
fn confluent_all_zeros_parses_schema_id_zero() {
let buf = Bytes::from(vec![0u8; 100]);
let wf = detect_wire_format(&buf);
assert!(
matches!(wf, DetectedWireFormat::Confluent { schema_id, .. } if schema_id.as_u32() == 0),
"{wf:?}"
);
}
#[cfg(feature = "glue")]
mod glue_adversarial {
use bytes::Bytes;
use schemreg::detect_wire_format;
use schemreg::wire::DetectedWireFormat;
#[test]
fn glue_only_version_byte_is_unknown() {
let buf = Bytes::from_static(&[0x03]);
let wf = detect_wire_format(&buf);
assert!(matches!(wf, DetectedWireFormat::InvalidGlue), "{wf:?}");
}
#[test]
fn glue_header_missing_compression_byte() {
let mut buf = vec![0x03u8];
buf.extend_from_slice(&[0xAAu8; 16]); assert_eq!(buf.len(), 17);
let wf = detect_wire_format(&Bytes::from(buf));
assert!(matches!(wf, DetectedWireFormat::InvalidGlue), "{wf:?}");
}
#[test]
fn glue_all_zeros_uuid_header() {
let mut buf = vec![0x03u8]; buf.extend_from_slice(&[0x00u8; 16]); buf.push(0x00); assert_eq!(buf.len(), 18);
let wf = detect_wire_format(&Bytes::from(buf));
assert!(matches!(wf, DetectedWireFormat::Glue { .. }), "{wf:?}");
}
#[test]
fn glue_wrong_version_byte_is_unknown() {
let mut buf = vec![0x04u8]; buf.extend_from_slice(&[0xBBu8; 17]);
let wf = detect_wire_format(&Bytes::from(buf));
assert!(matches!(wf, DetectedWireFormat::Unknown), "{wf:?}");
}
#[test]
fn glue_unknown_compression_byte_is_error() {
use schemreg::glue::decode_glue_wire_format;
let mut buf = vec![0x03u8]; buf.push(0x01); buf.extend_from_slice(&[0xAAu8; 16]); buf.extend_from_slice(b"payload");
let result = decode_glue_wire_format(&buf);
assert!(
result.is_err(),
"unknown compression byte 0x01 must be rejected: {result:?}"
);
let err_str = result.unwrap_err().to_string();
assert!(
err_str.contains("0x01"),
"error should mention the unknown byte: {err_str}"
);
}
#[test]
fn glue_compression_byte_0x02_is_error() {
use schemreg::glue::decode_glue_wire_format;
let mut buf = vec![0x03u8];
buf.push(0x02);
buf.extend_from_slice(&[0xBBu8; 16]);
buf.extend_from_slice(b"payload");
let result = decode_glue_wire_format(&buf);
assert!(
result.is_err(),
"unknown compression byte 0x02 must be rejected: {result:?}"
);
}
}
mod protobuf_adversarial {
use bytes::Bytes;
use schemreg::wire::decode_protobuf_message_indexes;
#[test]
fn protobuf_empty_index_array() {
let buf = Bytes::from_static(&[0x00]);
let (idxs, consumed) =
decode_protobuf_message_indexes(&buf).expect("empty index array is valid");
assert!(idxs.is_empty());
assert_eq!(consumed, 1);
}
#[test]
fn protobuf_overlong_varint_is_error() {
let buf = Bytes::from(vec![0x80u8; 20]);
let result = decode_protobuf_message_indexes(&buf);
assert!(
result.is_err(),
"overlong varint must be rejected: {result:?}"
);
}
#[test]
fn protobuf_index_count_but_no_index_data() {
let buf = Bytes::from_static(&[0x01]);
let result = decode_protobuf_message_indexes(&buf);
assert!(
result.is_err(),
"missing index data must be an error: {result:?}"
);
}
#[test]
fn protobuf_single_index_zero() {
let buf = Bytes::from_static(&[0x01, 0x00]);
let (idxs, consumed) =
decode_protobuf_message_indexes(&buf).expect("single index [0] is valid");
assert_eq!(idxs, vec![0]);
assert_eq!(consumed, 2);
}
#[test]
fn protobuf_large_index_value() {
let buf = Bytes::from_static(&[0x01, 0xAC, 0x02]);
let (idxs, consumed) =
decode_protobuf_message_indexes(&buf).expect("large index value is valid");
assert_eq!(idxs, vec![150]);
assert_eq!(consumed, 3);
}
#[test]
fn protobuf_message_index_count_over_limit_is_error() {
let buf = Bytes::from_static(&[0x81, 0x04]);
let result = decode_protobuf_message_indexes(&buf);
assert!(
result.is_err(),
"message-index count 513 (> 512 limit) must be rejected: {result:?}"
);
let err_str = result.unwrap_err().to_string();
assert!(
err_str.contains("513") || err_str.contains("512"),
"error should mention the limit or count: {err_str}"
);
}
}