#[cfg(feature = "schema-registry")]
mod client;
pub mod glue;
#[cfg(feature = "schema-registry")]
#[cfg_attr(docsrs, doc(cfg(feature = "schema-registry")))]
pub use client::{ConfluentSchemaRegistry, ConfluentSchemaRegistryBuilder};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use bytes::{BufMut, Bytes, BytesMut};
use parking_lot::RwLock;
use tokio::sync::{Mutex as AsyncMutex, oneshot};
use crate::error::{KrafkaError, Result};
pub type SchemaId = u32;
pub type SchemaVersion = i32;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum SchemaType {
Avro,
Protobuf,
Json,
}
impl SchemaType {
pub fn as_str(&self) -> &'static str {
match self {
Self::Avro => "AVRO",
Self::Protobuf => "PROTOBUF",
Self::Json => "JSON",
}
}
}
impl fmt::Display for SchemaType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
impl std::str::FromStr for SchemaType {
type Err = KrafkaError;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
if s.eq_ignore_ascii_case("AVRO") {
Ok(Self::Avro)
} else if s.eq_ignore_ascii_case("PROTOBUF") {
Ok(Self::Protobuf)
} else if s.eq_ignore_ascii_case("JSON") {
Ok(Self::Json)
} else {
Err(KrafkaError::schema_registry(format!(
"unknown schema type: '{s}'"
)))
}
}
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaReference {
pub name: String,
pub subject: String,
pub version: SchemaVersion,
}
impl SchemaReference {
pub fn new(
name: impl Into<String>,
subject: impl Into<String>,
version: SchemaVersion,
) -> Self {
Self {
name: name.into(),
subject: subject.into(),
version,
}
}
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Schema {
pub id: SchemaId,
pub schema_type: SchemaType,
pub schema: String,
pub version: Option<SchemaVersion>,
pub subject: Option<String>,
pub references: Vec<SchemaReference>,
}
impl Schema {
pub fn new(id: SchemaId, schema_type: SchemaType, schema: impl Into<String>) -> Self {
Self {
id,
schema_type,
schema: schema.into(),
version: None,
subject: None,
references: Vec::new(),
}
}
pub fn with_subject(mut self, subject: impl Into<String>, version: SchemaVersion) -> Self {
self.subject = Some(subject.into());
self.version = Some(version);
self
}
pub fn with_references(mut self, references: Vec<SchemaReference>) -> Self {
self.references = references;
self
}
}
const MAGIC_BYTE: u8 = 0x00;
const HEADER_SIZE: usize = 5;
pub fn encode_wire_format(schema_id: SchemaId, payload: &[u8]) -> Bytes {
let mut buf = BytesMut::with_capacity(HEADER_SIZE + payload.len());
buf.put_u8(MAGIC_BYTE);
buf.put_u32(schema_id);
buf.put_slice(payload);
buf.freeze()
}
pub fn decode_wire_format(data: &[u8]) -> Result<(SchemaId, &[u8])> {
let schema_id = validate_wire_header(data)?;
Ok((schema_id, &data[HEADER_SIZE..]))
}
pub fn decode_wire_format_bytes(data: &Bytes) -> Result<(SchemaId, Bytes)> {
let schema_id = validate_wire_header(data)?;
Ok((schema_id, data.slice(HEADER_SIZE..)))
}
fn validate_wire_header(data: &[u8]) -> Result<SchemaId> {
if data.len() < HEADER_SIZE {
return Err(KrafkaError::serialization(format!(
"wire format data too short: expected at least {HEADER_SIZE} bytes, got {}",
data.len()
)));
}
if data[0] != MAGIC_BYTE {
return Err(KrafkaError::serialization(format!(
"invalid wire format magic byte: expected 0x{MAGIC_BYTE:02X}, got 0x{:02X}",
data[0]
)));
}
Ok(u32::from_be_bytes([data[1], data[2], data[3], data[4]]))
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[non_exhaustive]
pub enum SubjectNameStrategy {
#[default]
TopicName,
RecordName,
TopicRecordName,
}
impl SubjectNameStrategy {
pub fn subject_name(
&self,
topic: &str,
record_name: Option<&str>,
is_key: bool,
) -> Result<String> {
match self {
Self::TopicName => {
let suffix = if is_key { "key" } else { "value" };
Ok(format!("{topic}-{suffix}"))
}
Self::RecordName => {
let name = record_name.ok_or_else(|| {
KrafkaError::config("RecordName strategy requires a record name")
})?;
Ok(name.to_string())
}
Self::TopicRecordName => {
let name = record_name.ok_or_else(|| {
KrafkaError::config("TopicRecordName strategy requires a record name")
})?;
Ok(format!("{topic}-{name}"))
}
}
}
}
pub trait SchemaRegistryClient: Send + Sync {
fn get_schema_by_id(
&self,
id: SchemaId,
) -> Pin<Box<dyn Future<Output = Result<Schema>> + Send + '_>>;
fn get_latest_schema(
&self,
subject: &str,
) -> Pin<Box<dyn Future<Output = Result<Schema>> + Send + '_>>;
fn get_schema_by_version(
&self,
subject: &str,
version: SchemaVersion,
) -> Pin<Box<dyn Future<Output = Result<Schema>> + Send + '_>>;
fn register_schema(
&self,
subject: &str,
schema: &str,
schema_type: SchemaType,
references: &[SchemaReference],
) -> Pin<Box<dyn Future<Output = Result<SchemaId>> + Send + '_>>;
}
pub struct CachedSchemaRegistry<C> {
inner: C,
cache: RwLock<HashMap<SchemaId, Schema>>,
insertion_order: RwLock<VecDeque<SchemaId>>,
max_entries: Option<usize>,
in_flight: AsyncMutex<HashMap<SchemaId, Vec<oneshot::Sender<Result<Schema>>>>>,
}
impl<C: SchemaRegistryClient> CachedSchemaRegistry<C> {
pub fn new(inner: C) -> Self {
Self {
inner,
cache: RwLock::new(HashMap::new()),
insertion_order: RwLock::new(VecDeque::new()),
max_entries: None,
in_flight: AsyncMutex::new(HashMap::new()),
}
}
pub fn with_capacity(inner: C, capacity: usize) -> Self {
Self {
inner,
cache: RwLock::new(HashMap::with_capacity(capacity)),
insertion_order: RwLock::new(VecDeque::with_capacity(capacity)),
max_entries: None,
in_flight: AsyncMutex::new(HashMap::new()),
}
}
pub fn with_max_entries(inner: C, max_entries: usize) -> Self {
let max_entries = max_entries.max(1);
Self {
inner,
cache: RwLock::new(HashMap::with_capacity(max_entries)),
insertion_order: RwLock::new(VecDeque::with_capacity(max_entries)),
max_entries: Some(max_entries),
in_flight: AsyncMutex::new(HashMap::new()),
}
}
pub fn inner(&self) -> &C {
&self.inner
}
pub fn cache_len(&self) -> usize {
self.cache.read().len()
}
pub fn cache_is_empty(&self) -> bool {
self.cache.read().is_empty()
}
pub fn clear_cache(&self) {
self.cache.write().clear();
self.insertion_order.write().clear();
}
fn insert_cache_entry(&self, id: SchemaId, schema: Schema) {
let mut cache = self.cache.write();
if let Some(existing) = cache.get_mut(&id) {
*existing = schema;
return;
}
if let Some(max_entries) = self.max_entries {
let mut insertion_order = self.insertion_order.write();
if cache.len() >= max_entries
&& let Some(evicted) = insertion_order.pop_front()
{
cache.remove(&evicted);
}
insertion_order.push_back(id);
}
cache.insert(id, schema);
}
}
impl<C> fmt::Debug for CachedSchemaRegistry<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CachedSchemaRegistry")
.field("cache_len", &self.cache.read().len())
.field("max_entries", &self.max_entries)
.finish()
}
}
impl<C: SchemaRegistryClient> SchemaRegistryClient for CachedSchemaRegistry<C> {
fn get_schema_by_id(
&self,
id: SchemaId,
) -> Pin<Box<dyn Future<Output = Result<Schema>> + Send + '_>> {
Box::pin(async move {
if let Some(schema) = self.cache.read().get(&id) {
return Ok(schema.clone());
}
let mut in_flight = self.in_flight.lock().await;
if let Some(schema) = self.cache.read().get(&id) {
return Ok(schema.clone());
}
if let Some(waiters) = in_flight.get_mut(&id) {
let (tx, rx) = oneshot::channel();
waiters.push(tx);
drop(in_flight);
return rx
.await
.map_err(|_| KrafkaError::invalid_state("schema lookup coalescer dropped"))?;
}
in_flight.insert(id, Vec::new());
drop(in_flight);
let result = self.inner.get_schema_by_id(id).await;
if let Ok(schema) = &result {
self.insert_cache_entry(id, schema.clone());
}
let waiters = self.in_flight.lock().await.remove(&id).unwrap_or_default();
for waiter in waiters {
let _ = waiter.send(result.clone());
}
result
})
}
fn get_latest_schema(
&self,
subject: &str,
) -> Pin<Box<dyn Future<Output = Result<Schema>> + Send + '_>> {
let subject = subject.to_string();
Box::pin(async move {
let schema = self.inner.get_latest_schema(&subject).await?;
self.insert_cache_entry(schema.id, schema.clone());
Ok(schema)
})
}
fn get_schema_by_version(
&self,
subject: &str,
version: SchemaVersion,
) -> Pin<Box<dyn Future<Output = Result<Schema>> + Send + '_>> {
let subject = subject.to_string();
Box::pin(async move {
let schema = self.inner.get_schema_by_version(&subject, version).await?;
self.insert_cache_entry(schema.id, schema.clone());
Ok(schema)
})
}
fn register_schema(
&self,
subject: &str,
schema: &str,
schema_type: SchemaType,
references: &[SchemaReference],
) -> Pin<Box<dyn Future<Output = Result<SchemaId>> + Send + '_>> {
let subject = subject.to_string();
let schema = schema.to_string();
let references = references.to_vec();
Box::pin(async move {
self.inner
.register_schema(&subject, &schema, schema_type, &references)
.await
})
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::sync::Notify;
fn ok<T, E: std::fmt::Display>(result: std::result::Result<T, E>) -> T {
match result {
Ok(value) => value,
Err(err) => unreachable!("expected Ok(..), got Err({err})"),
}
}
fn err<T, E: std::fmt::Display>(result: std::result::Result<T, E>) -> E {
match result {
Err(err) => err,
Ok(_) => unreachable!("expected Err(..), got Ok(..)"),
}
}
fn join_ok<T>(result: std::result::Result<T, tokio::task::JoinError>) -> T {
match result {
Ok(value) => value,
Err(err) => unreachable!("spawned task failed unexpectedly: {err}"),
}
}
#[test]
fn test_wire_format_roundtrip() {
let payload = b"hello world";
let encoded = encode_wire_format(42, payload);
let (id, decoded) = ok(decode_wire_format(&encoded));
assert_eq!(id, 42);
assert_eq!(decoded, payload);
}
#[test]
fn test_wire_format_empty_payload() {
let encoded = encode_wire_format(1, b"");
assert_eq!(encoded.len(), HEADER_SIZE);
let (id, payload) = ok(decode_wire_format(&encoded));
assert_eq!(id, 1);
assert!(payload.is_empty());
}
#[test]
fn test_wire_format_max_schema_id() {
let encoded = encode_wire_format(u32::MAX, b"data");
let (id, _) = ok(decode_wire_format(&encoded));
assert_eq!(id, u32::MAX);
}
#[test]
fn test_wire_format_header_bytes() {
let encoded = encode_wire_format(256, b"x");
assert_eq!(&encoded[..5], &[0x00, 0x00, 0x00, 0x01, 0x00]);
assert_eq!(&encoded[5..], b"x");
}
#[test]
fn test_wire_format_invalid_magic_byte() {
let data = [0x01, 0, 0, 0, 1, 0x42];
let result = decode_wire_format(&data);
assert!(result.is_err());
assert!(err(result).to_string().contains("magic byte"));
}
#[test]
fn test_wire_format_too_short() {
let result = decode_wire_format(&[0x00, 0, 0]);
assert!(result.is_err());
assert!(err(result).to_string().contains("too short"));
}
#[test]
fn test_wire_format_empty_data() {
let result = decode_wire_format(&[]);
assert!(result.is_err());
}
#[test]
fn test_subject_default_is_topic_name() {
assert_eq!(
SubjectNameStrategy::default(),
SubjectNameStrategy::TopicName
);
}
#[test]
fn test_subject_topic_name_key() {
let s = ok(SubjectNameStrategy::TopicName.subject_name("orders", None, true));
assert_eq!(s, "orders-key");
}
#[test]
fn test_subject_topic_name_value() {
let s = ok(SubjectNameStrategy::TopicName.subject_name("orders", None, false));
assert_eq!(s, "orders-value");
}
#[test]
fn test_subject_record_name() {
let s = ok(SubjectNameStrategy::RecordName.subject_name(
"orders",
Some("com.example.Order"),
false,
));
assert_eq!(s, "com.example.Order");
}
#[test]
fn test_subject_record_name_missing() {
let result = SubjectNameStrategy::RecordName.subject_name("orders", None, false);
assert!(result.is_err());
}
#[test]
fn test_subject_topic_record_name() {
let s =
ok(SubjectNameStrategy::TopicRecordName.subject_name("orders", Some("Order"), true));
assert_eq!(s, "orders-Order");
}
#[test]
fn test_subject_topic_record_name_missing() {
let result = SubjectNameStrategy::TopicRecordName.subject_name("orders", None, true);
assert!(result.is_err());
}
#[test]
fn test_schema_type_display() {
assert_eq!(SchemaType::Avro.to_string(), "AVRO");
assert_eq!(SchemaType::Protobuf.to_string(), "PROTOBUF");
assert_eq!(SchemaType::Json.to_string(), "JSON");
}
#[test]
fn test_schema_type_from_str() {
assert_eq!(ok("AVRO".parse::<SchemaType>()), SchemaType::Avro);
assert_eq!(ok("PROTOBUF".parse::<SchemaType>()), SchemaType::Protobuf);
assert_eq!(ok("JSON".parse::<SchemaType>()), SchemaType::Json);
}
#[test]
fn test_schema_type_from_str_unknown() {
let result = "XML".parse::<SchemaType>();
assert!(result.is_err());
assert!(err(result).to_string().contains("XML"));
}
#[test]
fn test_schema_new() {
let s = Schema::new(1, SchemaType::Avro, r#"{"type":"string"}"#);
assert_eq!(s.id, 1);
assert_eq!(s.schema_type, SchemaType::Avro);
assert_eq!(s.schema, r#"{"type":"string"}"#);
assert_eq!(s.version, None);
assert_eq!(s.subject, None);
assert!(s.references.is_empty());
}
#[test]
fn test_schema_with_subject() {
let s = Schema::new(1, SchemaType::Avro, "{}").with_subject("my-topic-value", 3);
assert_eq!(s.subject, Some("my-topic-value".to_string()));
assert_eq!(s.version, Some(3));
}
#[test]
fn test_schema_with_references() {
let refs = vec![SchemaReference::new("Ref", "ref-subject", 1)];
let s = Schema::new(1, SchemaType::Avro, "{}").with_references(refs.clone());
assert_eq!(s.references, refs);
}
#[test]
fn test_schema_reference_new() {
let r = SchemaReference::new("com.example.Address", "address-value", 2);
assert_eq!(r.name, "com.example.Address");
assert_eq!(r.subject, "address-value");
assert_eq!(r.version, 2);
}
struct MockRegistry {
get_by_id_calls: AtomicU32,
}
impl MockRegistry {
fn new() -> Self {
Self {
get_by_id_calls: AtomicU32::new(0),
}
}
fn get_by_id_call_count(&self) -> u32 {
self.get_by_id_calls.load(Ordering::SeqCst)
}
}
impl SchemaRegistryClient for MockRegistry {
fn get_schema_by_id(
&self,
id: SchemaId,
) -> Pin<Box<dyn Future<Output = Result<Schema>> + Send + '_>> {
self.get_by_id_calls.fetch_add(1, Ordering::SeqCst);
Box::pin(async move { Ok(Schema::new(id, SchemaType::Avro, r#"{"type":"string"}"#)) })
}
fn get_latest_schema(
&self,
subject: &str,
) -> Pin<Box<dyn Future<Output = Result<Schema>> + Send + '_>> {
let subject = subject.to_string();
Box::pin(async move {
Ok(Schema::new(100, SchemaType::Avro, r#"{"type":"string"}"#)
.with_subject(subject, 1))
})
}
fn get_schema_by_version(
&self,
subject: &str,
version: SchemaVersion,
) -> Pin<Box<dyn Future<Output = Result<Schema>> + Send + '_>> {
let subject = subject.to_string();
Box::pin(async move {
Ok(Schema::new(100, SchemaType::Avro, r#"{"type":"string"}"#)
.with_subject(subject, version))
})
}
fn register_schema(
&self,
_subject: &str,
_schema: &str,
_schema_type: SchemaType,
_references: &[SchemaReference],
) -> Pin<Box<dyn Future<Output = Result<SchemaId>> + Send + '_>> {
Box::pin(async { Ok(42) })
}
}
struct BlockingMockRegistry {
get_by_id_calls: AtomicU32,
started: Notify,
release: Notify,
}
impl BlockingMockRegistry {
fn new() -> Self {
Self {
get_by_id_calls: AtomicU32::new(0),
started: Notify::new(),
release: Notify::new(),
}
}
fn get_by_id_call_count(&self) -> u32 {
self.get_by_id_calls.load(Ordering::SeqCst)
}
async fn wait_started(&self) {
self.started.notified().await;
}
fn release(&self) {
self.release.notify_waiters();
}
}
impl SchemaRegistryClient for BlockingMockRegistry {
fn get_schema_by_id(
&self,
id: SchemaId,
) -> Pin<Box<dyn Future<Output = Result<Schema>> + Send + '_>> {
self.get_by_id_calls.fetch_add(1, Ordering::SeqCst);
Box::pin(async move {
self.started.notify_waiters();
self.release.notified().await;
Ok(Schema::new(id, SchemaType::Avro, r#"{"type":"string"}"#))
})
}
fn get_latest_schema(
&self,
subject: &str,
) -> Pin<Box<dyn Future<Output = Result<Schema>> + Send + '_>> {
let subject = subject.to_string();
Box::pin(async move {
Ok(Schema::new(100, SchemaType::Avro, r#"{"type":"string"}"#)
.with_subject(subject, 1))
})
}
fn get_schema_by_version(
&self,
subject: &str,
version: SchemaVersion,
) -> Pin<Box<dyn Future<Output = Result<Schema>> + Send + '_>> {
let subject = subject.to_string();
Box::pin(async move {
Ok(Schema::new(100, SchemaType::Avro, r#"{"type":"string"}"#)
.with_subject(subject, version))
})
}
fn register_schema(
&self,
_subject: &str,
_schema: &str,
_schema_type: SchemaType,
_references: &[SchemaReference],
) -> Pin<Box<dyn Future<Output = Result<SchemaId>> + Send + '_>> {
Box::pin(async { Ok(42) })
}
}
#[tokio::test]
async fn test_cache_miss_then_hit() {
let mock = MockRegistry::new();
let cached = CachedSchemaRegistry::new(mock);
let s1 = ok(cached.get_schema_by_id(1).await);
assert_eq!(cached.inner().get_by_id_call_count(), 1);
assert_eq!(cached.cache_len(), 1);
let s2 = ok(cached.get_schema_by_id(1).await);
assert_eq!(cached.inner().get_by_id_call_count(), 1);
assert_eq!(s1, s2);
}
#[tokio::test]
async fn test_cache_different_ids() {
let mock = MockRegistry::new();
let cached = CachedSchemaRegistry::new(mock);
ok(cached.get_schema_by_id(1).await);
ok(cached.get_schema_by_id(2).await);
assert_eq!(cached.inner().get_by_id_call_count(), 2);
assert_eq!(cached.cache_len(), 2);
ok(cached.get_schema_by_id(1).await);
ok(cached.get_schema_by_id(2).await);
assert_eq!(cached.inner().get_by_id_call_count(), 2);
}
#[tokio::test]
async fn test_cache_clear() {
let mock = MockRegistry::new();
let cached = CachedSchemaRegistry::new(mock);
ok(cached.get_schema_by_id(1).await);
assert_eq!(cached.cache_len(), 1);
cached.clear_cache();
assert_eq!(cached.cache_len(), 0);
ok(cached.get_schema_by_id(1).await);
assert_eq!(cached.inner().get_by_id_call_count(), 2);
}
#[tokio::test]
async fn test_cache_coalesces_concurrent_misses() {
let cached = Arc::new(CachedSchemaRegistry::new(BlockingMockRegistry::new()));
let first = {
let cached = cached.clone();
tokio::spawn(async move { ok(cached.get_schema_by_id(7).await) })
};
cached.inner().wait_started().await;
let second = {
let cached = cached.clone();
tokio::spawn(async move { ok(cached.get_schema_by_id(7).await) })
};
tokio::task::yield_now().await;
cached.inner().release();
let first_schema = join_ok(first.await);
let second_schema = join_ok(second.await);
assert_eq!(first_schema, second_schema);
assert_eq!(cached.inner().get_by_id_call_count(), 1);
}
#[tokio::test]
async fn test_cache_get_latest_populates_id_cache() {
let mock = MockRegistry::new();
let cached = CachedSchemaRegistry::new(mock);
let schema = ok(cached.get_latest_schema("test-value").await);
assert_eq!(cached.cache_len(), 1);
let by_id = ok(cached.get_schema_by_id(schema.id).await);
assert_eq!(cached.inner().get_by_id_call_count(), 0);
assert_eq!(by_id.id, schema.id);
}
#[tokio::test]
async fn test_cache_get_by_version_populates_id_cache() {
let mock = MockRegistry::new();
let cached = CachedSchemaRegistry::new(mock);
let schema = ok(cached.get_schema_by_version("test-value", 1).await);
assert_eq!(cached.cache_len(), 1);
let by_id = ok(cached.get_schema_by_id(schema.id).await);
assert_eq!(cached.inner().get_by_id_call_count(), 0);
assert_eq!(by_id.id, schema.id);
}
#[tokio::test]
async fn test_cache_register_forwards() {
let mock = MockRegistry::new();
let cached = CachedSchemaRegistry::new(mock);
let id = cached
.register_schema("test-value", "{}", SchemaType::Avro, &[])
.await;
let id = ok(id);
assert_eq!(id, 42);
}
#[test]
fn test_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<Schema>();
assert_send_sync::<SchemaReference>();
assert_send_sync::<SchemaType>();
assert_send_sync::<SubjectNameStrategy>();
assert_send_sync::<CachedSchemaRegistry<MockRegistry>>();
}
#[test]
fn test_object_safe() {
fn _assert_object_safe(_: &dyn SchemaRegistryClient) {}
}
#[test]
fn test_cached_debug() {
let cached = CachedSchemaRegistry::new(MockRegistry::new());
let debug = format!("{cached:?}");
assert!(debug.contains("cache_len"));
}
#[test]
fn test_wire_format_bytes_roundtrip() {
let payload = b"hello world";
let encoded = encode_wire_format(42, payload);
let (id, decoded) = ok(decode_wire_format_bytes(&encoded));
assert_eq!(id, 42);
assert_eq!(&decoded[..], payload);
}
#[test]
fn test_wire_format_bytes_empty_payload() {
let encoded = encode_wire_format(1, b"");
let (id, payload) = ok(decode_wire_format_bytes(&encoded));
assert_eq!(id, 1);
assert!(payload.is_empty());
}
#[test]
fn test_wire_format_bytes_invalid_magic() {
let data = Bytes::from_static(&[0x01, 0, 0, 0, 1, 0x42]);
let result = decode_wire_format_bytes(&data);
assert!(result.is_err());
assert!(err(result).to_string().contains("magic byte"));
}
#[test]
fn test_wire_format_bytes_too_short() {
let data = Bytes::from_static(&[0x00, 0, 0]);
let result = decode_wire_format_bytes(&data);
assert!(result.is_err());
assert!(err(result).to_string().contains("too short"));
}
#[test]
fn test_wire_format_bytes_zero_copy() {
let encoded = encode_wire_format(99, b"shared");
let (_, payload) = ok(decode_wire_format_bytes(&encoded));
assert_eq!(&payload[..], b"shared");
}
#[test]
fn test_schema_type_from_str_lowercase() {
assert_eq!(ok("avro".parse::<SchemaType>()), SchemaType::Avro);
assert_eq!(ok("protobuf".parse::<SchemaType>()), SchemaType::Protobuf);
assert_eq!(ok("json".parse::<SchemaType>()), SchemaType::Json);
}
#[test]
fn test_schema_type_from_str_mixed_case() {
assert_eq!(ok("Avro".parse::<SchemaType>()), SchemaType::Avro);
assert_eq!(ok("ProtobuF".parse::<SchemaType>()), SchemaType::Protobuf);
assert_eq!(ok("Json".parse::<SchemaType>()), SchemaType::Json);
}
#[tokio::test]
async fn test_cache_with_capacity() {
let mock = MockRegistry::new();
let cached = CachedSchemaRegistry::with_capacity(mock, 100);
assert_eq!(cached.cache_len(), 0);
ok(cached.get_schema_by_id(1).await);
assert_eq!(cached.cache_len(), 1);
}
#[tokio::test]
async fn test_cache_with_max_entries_evicts_oldest_entry() {
let mock = MockRegistry::new();
let cached = CachedSchemaRegistry::with_max_entries(mock, 1);
ok(cached.get_schema_by_id(1).await);
ok(cached.get_schema_by_id(2).await);
assert_eq!(cached.cache_len(), 1);
assert_eq!(cached.inner().get_by_id_call_count(), 2);
ok(cached.get_schema_by_id(1).await);
assert_eq!(cached.inner().get_by_id_call_count(), 3);
}
}