# schemreg
[crates.io](https://crates.io/crates/schemreg) |
[docs.rs](https://docs.rs/schemreg) |
[CI](https://github.com/hupe1980/schemreg/actions/workflows/ci.yml) |
[License](LICENSE) |
[Rust 1.88.0](https://releases.rs/docs/1.88.0/)

Async schema registry client for **Confluent Schema Registry** (and Karapace / Apicurio) and **AWS Glue Schema Registry**, with:
- Zero-copy wire-format encode / decode (Confluent 5-byte header, Glue 18-byte header)
- Transparent in-memory caching with thundering-herd coalescing
- Pluggable backend via the `SchemaRegistryClient` trait
- Feature-gated: pull in only what you need
---
## Features
| Feature flag | What it enables |
|---|---|
| *(none)* | Core types, wire format helpers, traits, Glue wire format |
| `confluent` | Confluent HTTP client, encoder, decoder, TLS via rustls + webpki-roots |
| `glue` | AWS Glue SDK client (`AwsGlueSchemaRegistry`), ZLIB compression via flate2 |
| `avro` | Avro encode / decode via apache-avro, works with any `SchemaRegistryClient` |
| `native-tls-roots` | rustls-native-certs (implies `confluent`) |
| `aws-lc-rs` | AWS LC crypto backend instead of ring (implies `confluent`) |
---
## Quick start
### Confluent Schema Registry
```toml
# Cargo.toml
[dependencies]
schemreg = { version = "0.1", features = ["confluent"] }
tokio = { version = "1", features = ["full"] }
bytes = "1"
```
```rust
use std::sync::Arc;
use schemreg::{
    CachedSchemaRegistry, SchemaType, SubjectNameStrategy,
    confluent::{ConfluentSchemaEncoder, ConfluentSchemaRegistry},
    decoder::WireFormatDecoder,
    traits::SchemaEncoder,
};
use bytes::Bytes;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build the HTTP client.
    let registry = ConfluentSchemaRegistry::builder()
        .url("http://localhost:8081")
        // .basic_auth("user", "password")
        .build()?;

    // Wrap with a 1,000-entry LRU cache (default).
    let cached = Arc::new(CachedSchemaRegistry::new(registry));

    // Producer side: register schema on first send, then reuse cached ID.
    let encoder = ConfluentSchemaEncoder::builder()
        .registry(&*cached)
        .schema(r#"{"type":"record","name":"Order","fields":[{"name":"id","type":"string"}]}"#, SchemaType::Avro)
        .strategy(SubjectNameStrategy::TopicName)
        .build()?;
    let raw_bytes = Bytes::from_static(b"\x04\x08some-avro-payload");
    let framed = encoder.encode(raw_bytes, "orders", None, false).await?;

    // Consumer side: strip the header (uses the same cached registry for schema lookup).
    let decoder = WireFormatDecoder::confluent(Arc::clone(&cached));
    let msg = decoder.decode(framed).await?;
    println!("Decoded {} bytes", msg.payload.len());
    Ok(())
}
```
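The encoder above is configured with `SubjectNameStrategy::TopicName`. In Confluent's convention, that strategy derives the registry subject from the topic name plus a `-key` or `-value` suffix. The sketch below illustrates the three standard Confluent naming strategies using only the standard library. The `Strategy` enum and `subject_name` function are hypothetical names chosen for illustration; they are not part of `schemreg`, and the crate's own enum may offer different variants.

```rust
/// Hypothetical stand-in for a subject naming strategy (not the crate's type).
#[derive(Debug, Clone, Copy)]
enum Strategy {
    /// `<topic>-key` or `<topic>-value`
    TopicName,
    /// `<fully qualified record name>`
    RecordName,
    /// `<topic>-<fully qualified record name>`
    TopicRecordName,
}

/// Derive the registry subject following Confluent's naming conventions.
fn subject_name(strategy: Strategy, topic: &str, record: &str, is_key: bool) -> String {
    match strategy {
        Strategy::TopicName => {
            let suffix = if is_key { "key" } else { "value" };
            format!("{topic}-{suffix}")
        }
        Strategy::RecordName => record.to_string(),
        Strategy::TopicRecordName => format!("{topic}-{record}"),
    }
}

fn main() {
    for strategy in [Strategy::TopicName, Strategy::RecordName, Strategy::TopicRecordName] {
        let subject = subject_name(strategy, "orders", "com.example.Order", false);
        println!("{strategy:?}: {subject}");
    }
}
```

Under the topic-name convention, a value schema for the topic `orders` is registered under the subject `orders-value`.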
### AWS Glue Schema Registry
```toml
[dependencies]
schemreg = { version = "0.1", features = ["glue"] }
tokio = { version = "1", features = ["full"] }
bytes = "1"
```
```rust
use schemreg::{
    GlueCompression, GlueDataFormat, GlueSchemaVersionId,
    encode_glue_wire_format, decode_glue_wire_format,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let version_id: GlueSchemaVersionId =
        "550e8400-e29b-41d4-a716-446655440000".parse()?;

    // Encode: adds the 18-byte Glue header.
    let framed = encode_glue_wire_format(version_id, b"avro bytes", GlueCompression::None)?;

    // Decode: strips the header.
    let (id, payload) = decode_glue_wire_format(&framed)?;
    assert_eq!(id, version_id);
    assert_eq!(payload, b"avro bytes");
    Ok(())
}
```
With the `glue` feature and real AWS credentials you can use the high-level `AwsGlueSchemaRegistry` client:
```rust
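use aws_config::BehaviorVersion;
use schemreg::glue::AwsGlueSchemaRegistry;
#[tokio::main]
async fn main() {
    // Resolve region and credentials through the default AWS provider chain.
    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let registry = AwsGlueSchemaRegistry::from_config(&config);
}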
```
---
## Wire formats
### Confluent Schema Registry
```
Byte offset  0        1     2     3     4      5 …
            ┌────────┬────────────────────────┬──────────────────────┐
            │  0x00  │   schema_id (u32 BE)   │  payload (N bytes)   │
            └────────┴────────────────────────┴──────────────────────┘
              magic   └─────── 4 bytes ──────┘
```
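The layout above can be reproduced with a few lines of standard-library Rust. This is a minimal sketch for illustration only: `frame` and `unframe` are hypothetical helpers, not the crate's API, and they skip schema lookup entirely.

```rust
/// Magic byte that starts every Confluent-framed message.
const MAGIC: u8 = 0x00;
/// Header length: 1 magic byte + 4-byte big-endian schema ID.
const HEADER_LEN: usize = 5;

/// Prepend the 5-byte header to `payload` (hypothetical helper).
fn frame(schema_id: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
    out.push(MAGIC);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Split a framed message into `(schema_id, payload)` without copying the payload.
/// Returns `None` if the buffer is too short or the magic byte is wrong.
fn unframe(buf: &[u8]) -> Option<(u32, &[u8])> {
    if buf.len() < HEADER_LEN || buf[0] != MAGIC {
        return None;
    }
    let id = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]);
    Some((id, &buf[HEADER_LEN..]))
}

fn main() {
    let framed = frame(42, b"avro");
    // Schema ID 42 is 0x0000002A in big-endian order.
    assert_eq!(&framed[..5], &[0x00, 0x00, 0x00, 0x00, 0x2A]);
    let (id, payload) = unframe(&framed).expect("valid frame");
    println!("schema_id={id}, payload={} bytes", payload.len());
}
```

Returning a sub-slice of the input rather than a new buffer is what makes the decode side copy-free in this sketch.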
### AWS Glue Schema Registry
```
Byte offset  0        1        2                      18 …
            ┌────────┬────────┬──────────────────────┬──────────────────────┐
            │  0x03  │  comp  │  schema_version_id   │  payload (N bytes)   │
            └────────┴────────┴──────────────────────┴──────────────────────┘
             version    byte   └───── 16 bytes ─────┘
```
- **comp**: `0x00` = none, `0x05` = ZLIB
- **schema_version_id**: 128-bit UUID stored as big-endian bytes
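
The 18-byte Glue header can be sketched the same way with the standard library alone. The helper names `frame_glue` and `unframe_glue` are hypothetical and are not the crate's `encode_glue_wire_format` / `decode_glue_wire_format`; the UUID is modelled as a plain `u128`, and the sketch only writes uncompressed frames. A payload flagged as ZLIB is returned still compressed.

```rust
/// Header version byte used by the Glue wire format.
const HEADER_VERSION: u8 = 0x03;
/// Compression byte values from the list above.
const COMPRESSION_NONE: u8 = 0x00;
const COMPRESSION_ZLIB: u8 = 0x05;
/// 1 version byte + 1 compression byte + 16 UUID bytes.
const HEADER_LEN: usize = 18;

/// Prepend an uncompressed Glue header to `payload` (hypothetical helper).
fn frame_glue(version_id: u128, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
    out.push(HEADER_VERSION);
    out.push(COMPRESSION_NONE);
    out.extend_from_slice(&version_id.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Split a frame into `(schema_version_id, compression_byte, payload)`.
fn unframe_glue(buf: &[u8]) -> Result<(u128, u8, &[u8]), &'static str> {
    if buf.len() < HEADER_LEN {
        return Err("buffer shorter than the 18-byte header");
    }
    if buf[0] != HEADER_VERSION {
        return Err("unexpected header version byte");
    }
    let compression = buf[1];
    if compression != COMPRESSION_NONE && compression != COMPRESSION_ZLIB {
        return Err("unknown compression byte");
    }
    let mut id = [0u8; 16];
    id.copy_from_slice(&buf[2..HEADER_LEN]);
    Ok((u128::from_be_bytes(id), compression, &buf[HEADER_LEN..]))
}

fn main() {
    // 550e8400-e29b-41d4-a716-446655440000 written as a 128-bit integer.
    let id: u128 = 0x550e8400_e29b_41d4_a716_446655440000;
    let framed = frame_glue(id, b"avro bytes");
    let (decoded_id, compression, payload) = unframe_glue(&framed).expect("valid frame");
    assert_eq!(decoded_id, id);
    println!("compression={compression:#04x}, payload={} bytes", payload.len());
}
```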
---
## Custom backend
Any struct that implements `SchemaRegistryClient` can be used as the backend:
```rust
use schemreg::{Schema, SchemaId, SchemaReference, SchemaRegistryClient, SchemaType, SchemaVersion};
use schemreg::error::Result;

struct MyRegistry { /* ... */ }

impl SchemaRegistryClient for MyRegistry {
    async fn get_schema_by_id(&self, id: SchemaId) -> Result<Schema> {
        todo!()
    }
    async fn get_latest_schema(&self, subject: &str) -> Result<Schema> {
        todo!()
    }
    async fn get_schema_by_version(&self, subject: &str, version: SchemaVersion) -> Result<Schema> {
        todo!()
    }
    async fn register_schema(
        &self, subject: &str, schema: &str,
        schema_type: SchemaType, references: &[SchemaReference],
    ) -> Result<SchemaId> {
        todo!()
    }
}

// Wrap with cache transparently:
use schemreg::CachedSchemaRegistry;
let cached = CachedSchemaRegistry::new(MyRegistry { /* ... */ });
```
See [`examples/custom_backend.rs`](examples/custom_backend.rs) for a full working example.
---
## Cache management
`CachedSchemaRegistry` exposes the `AnySchemaCache` trait for lifecycle control:
```rust
use schemreg::{AnySchemaCache, CachedSchemaRegistry, SchemaId};
let cached = CachedSchemaRegistry::with_max_entries(my_registry, 512);
// Pre-warm for known schema IDs (avoids cold-miss latency on startup).
cached.warm_cache(&[1, 2, 3]).await?;
println!("cached: {}", cached.cache_len());
// Invalidate a single stale entry.
cached.invalidate(2);
// Wipe everything.
cached.clear_cache();
```
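The thundering-herd coalescing listed among the crate's features means that concurrent cache misses for the same schema ID should result in a single backend request. The sketch below shows that idea with threads and the standard library only. It is not the crate's implementation, which is async and LRU-bounded: `CoalescingCache`, `fetch`, and `get` are hypothetical names, `fetch` merely simulates a slow registry call, and nothing is ever evicted.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::thread;
use std::time::Duration;

/// Hypothetical cache that coalesces concurrent misses for the same ID.
struct CoalescingCache {
    slots: Mutex<HashMap<u32, Arc<OnceLock<String>>>>,
    fetches: AtomicUsize,
}

impl CoalescingCache {
    fn new() -> Self {
        Self { slots: Mutex::new(HashMap::new()), fetches: AtomicUsize::new(0) }
    }

    /// Stand-in for a slow registry round-trip; counts how often it runs.
    fn fetch(&self, id: u32) -> String {
        self.fetches.fetch_add(1, Ordering::SeqCst);
        thread::sleep(Duration::from_millis(50));
        format!("schema-{id}")
    }

    fn get(&self, id: u32) -> String {
        // Hold the map lock only long enough to find or create the slot.
        let slot = {
            let mut slots = self.slots.lock().unwrap();
            slots.entry(id).or_default().clone()
        };
        // The first caller runs `fetch`; concurrent callers block and reuse its result.
        slot.get_or_init(|| self.fetch(id)).clone()
    }
}

fn main() {
    let cache = Arc::new(CoalescingCache::new());
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || cache.get(7))
        })
        .collect();
    for handle in handles {
        assert_eq!(handle.join().unwrap(), "schema-7");
    }
    println!("backend fetches: {}", cache.fetches.load(Ordering::SeqCst));
}
```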
---
## Format-agnostic decoding
`WireFormatDecoder` auto-detects the wire format and dispatches to the correct backend:
```rust
use schemreg::decoder::WireFormatDecoder;
let decoder = WireFormatDecoder::new()
    .confluent(&cached_confluent_registry)
    .glue(&cached_glue_registry); // requires `glue` feature
let msg = decoder.decode(raw_bytes).await?;
println!("format: {:?}", msg.schema_format); // Avro / Protobuf / Json / Unknown
println!("payload: {} bytes", msg.payload.len());
```
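Because the two wire formats start with different bytes (`0x00` for Confluent, `0x03` for Glue), one plausible way to auto-detect the format is to inspect the first byte and the minimum header length. The sketch below assumes exactly that rule; the `WireFormat` enum and `detect` function are hypothetical and may not match how `WireFormatDecoder` decides internally.

```rust
/// Wire formats distinguishable by their leading byte (hypothetical enum).
#[derive(Debug, PartialEq, Eq)]
enum WireFormat {
    Confluent,
    Glue,
    Unknown,
}

/// Classify a buffer by its first byte and minimum header length.
fn detect(buf: &[u8]) -> WireFormat {
    match buf.first().copied() {
        // Confluent: magic byte 0x00 followed by a 4-byte schema ID.
        Some(0x00) if buf.len() >= 5 => WireFormat::Confluent,
        // Glue: version byte 0x03, compression byte, 16-byte UUID.
        Some(0x03) if buf.len() >= 18 => WireFormat::Glue,
        _ => WireFormat::Unknown,
    }
}

fn main() {
    let confluent: [u8; 6] = [0x00, 0x00, 0x00, 0x00, 0x2A, 0x01];
    let mut glue: Vec<u8> = vec![0x03, 0x00];
    glue.extend_from_slice(&[0u8; 16]);

    println!("{:?}", detect(&confluent));
    println!("{:?}", detect(&glue));
    println!("{:?}", detect(b"{\"not\":\"framed\"}"));
}
```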
---
## Examples
| Example | Description |
|---|---|
| [`confluent_encode_decode`](examples/confluent_encode_decode.rs) | Full encode→decode round-trip with an in-memory stub registry |
| [`avro_roundtrip`](examples/avro_roundtrip.rs) | End-to-end Avro encode → Confluent wire format → decode |
| [`glue_roundtrip`](examples/glue_roundtrip.rs) | Glue wire format encode / decode, optional ZLIB compression |
| [`custom_backend`](examples/custom_backend.rs) | Implementing `SchemaRegistryClient` + cache + `WireFormatDecoder` |
```bash
cargo run --example confluent_encode_decode --features confluent
cargo run --example avro_roundtrip --features avro
cargo run --example glue_roundtrip --features glue
cargo run --example custom_backend
```
---
## License
Licensed under either of:
- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT License ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>)
at your option.