schemreg
Async schema registry client for Confluent Schema Registry (and Karapace), Apicurio Registry v3, and AWS Glue Schema Registry, with:
- Zero-copy wire-format encode / decode (Confluent 5-byte header, Glue 18-byte header)
- Transparent in-memory caching with thundering-herd coalescing
- Pluggable backend via the SchemaRegistryClient trait
- Feature-gated: pull in only what you need
Features
| Feature | Enables |
|---|---|
| (none) | Core types, wire format helpers, traits, Glue wire format |
| confluent | Confluent HTTP client, encoder, decoder, TLS via rustls + webpki-roots |
| apicurio | Native Apicurio Registry v3 HTTP client (ApicurioSchemaRegistry) |
| glue | AWS Glue SDK client (AwsGlueSchemaRegistry), ZLIB compression via flate2 |
| avro | Avro encode / decode via apache-avro, works with any SchemaRegistryClient |
| json | JSON Schema encode / decode, works with any SchemaRegistryClient |
| native-tls-roots | rustls-native-certs (implies confluent) |
| aws-lc-rs | AWS LC crypto backend instead of ring (implies confluent) |
Quick start
Confluent Schema Registry
# Cargo.toml
[dependencies]
schemreg = { version = "0.3", features = ["confluent"] }
tokio = { version = "1", features = ["full"] }
bytes = "1"
use std::sync::Arc;
use ;
use bytes::Bytes;
async
AWS Glue Schema Registry
[dependencies]
schemreg = { version = "0.3", features = ["glue"] }
tokio = { version = "1", features = ["full"] }
= "1"
use ;
With the glue feature and real AWS credentials you can use the high-level AwsGlueSchemaRegistry client:
use aws_config::BehaviorVersion;
use AwsGlueSchemaRegistry;
let config = aws_config::load_defaults(/* … */).await;
let registry = AwsGlueSchemaRegistry::from_config(/* … */);
Wire formats
Confluent Schema Registry
| Byte offset | Field | Size |
|---|---|---|
| 0 | magic (0x00) | 1 byte |
| 1-4 | schema_id (u32 BE) | 4 bytes |
| 5 … | payload | N bytes |
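The 5-byte header can be exercised with the standard library alone. The following is a minimal sketch, not schemreg's own encoder or decoder: `encode_confluent` and `decode_confluent` are hypothetical helper names chosen for illustration, and the decoder hands back a borrowed slice of the input so the payload is not copied.

```rust
/// Magic byte that starts every Confluent-framed message.
const CONFLUENT_MAGIC: u8 = 0x00;
/// Header length: 1 magic byte + 4-byte big-endian schema ID.
const CONFLUENT_HEADER_LEN: usize = 5;

/// Prepend the 5-byte Confluent header to `payload` (hypothetical helper).
fn encode_confluent(schema_id: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(CONFLUENT_HEADER_LEN + payload.len());
    out.push(CONFLUENT_MAGIC);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Split a framed message into (schema_id, payload) without copying the payload.
/// Returns `None` if the buffer is shorter than the header or the magic byte is wrong.
fn decode_confluent(buf: &[u8]) -> Option<(u32, &[u8])> {
    if buf.len() < CONFLUENT_HEADER_LEN || buf[0] != CONFLUENT_MAGIC {
        return None;
    }
    let id = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]);
    Some((id, &buf[CONFLUENT_HEADER_LEN..]))
}
```

Calling `decode_confluent(&encode_confluent(42, b"hello"))` yields the schema ID 42 and the original payload bytes; the schema ID would then be resolved through a registry client.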
AWS Glue Schema Registry
| Byte offset | Field | Size |
|---|---|---|
| 0 | version (0x03) | 1 byte |
| 1 | comp | 1 byte |
| 2-17 | schema_version_id | 16 bytes |
| 18 … | payload | N bytes |
- comp: 0x00 = none, 0x05 = ZLIB
- schema_version_id: 128-bit UUID stored as big-endian bytes
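As an illustration of the 18-byte Glue header, here is a standard-library sketch that splits a message into its compression flag, raw UUID bytes, and payload. The names `GlueCompression` and `parse_glue_header` are hypothetical and not taken from the crate. The sketch does not decompress: with the ZLIB flag set, the returned payload is still compressed.

```rust
/// Header version byte that starts every Glue-framed message.
const GLUE_HEADER_VERSION: u8 = 0x03;
/// Header length: version byte + compression byte + 16-byte UUID.
const GLUE_HEADER_LEN: usize = 18;

/// Compression flag carried in byte 1 (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum GlueCompression {
    None,
    Zlib,
}

/// Split a Glue-framed message into (compression, schema_version_id, payload).
/// Returns `None` for short buffers, a wrong version byte, or an unknown flag.
fn parse_glue_header(buf: &[u8]) -> Option<(GlueCompression, [u8; 16], &[u8])> {
    if buf.len() < GLUE_HEADER_LEN || buf[0] != GLUE_HEADER_VERSION {
        return None;
    }
    let compression = match buf[1] {
        0x00 => GlueCompression::None,
        0x05 => GlueCompression::Zlib,
        _ => return None,
    };
    // Bytes 2..18 hold the UUID in big-endian (network) byte order.
    let mut schema_version_id = [0u8; 16];
    schema_version_id.copy_from_slice(&buf[2..GLUE_HEADER_LEN]);
    Some((compression, schema_version_id, &buf[GLUE_HEADER_LEN..]))
}
```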
Custom backend
Any struct that implements SchemaRegistryClient can be used as the backend:
use std::sync::Arc;
use ;
use Result;
// Wrap with cache transparently:
use CachedSchemaRegistry;
let cached = CachedSchemaRegistry::new(/* … */);
See examples/custom_backend.rs for a full working example.
Cache management
CachedSchemaRegistry exposes the AnySchemaCache trait for lifecycle control:
use CachedSchemaRegistry;
let cached = CachedSchemaRegistry::with_max_entries(/* … */);
// Pre-warm for known schema IDs (avoids cold-miss latency on startup).
cached.warm_cache(/* … */).await?;
println!(/* … */);
// Invalidate a single stale entry.
cached.invalidate(/* … */);
// Wipe everything.
cached.clear_cache();
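Thundering-herd coalescing means that concurrent cache misses for the same schema ID share a single upstream fetch. The sketch below shows the idea with threads and `std::sync::OnceLock` (Rust 1.70 or later). It is a blocking stand-in rather than the crate's async implementation, and `CoalescingCache`, `get_or_fetch`, and the `String` schema type are hypothetical names used only for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

/// Hypothetical blocking cache keyed by schema ID.
struct CoalescingCache {
    slots: Mutex<HashMap<u32, Arc<OnceLock<String>>>>,
}

impl CoalescingCache {
    fn new() -> Self {
        Self { slots: Mutex::new(HashMap::new()) }
    }

    /// Return the cached schema, running `fetch` at most once per slot even
    /// when many threads miss on the same ID at the same time.
    fn get_or_fetch(&self, id: u32, fetch: impl FnOnce() -> String) -> String {
        // Hold the map lock only long enough to find or create the slot.
        let slot = {
            let mut slots = self.slots.lock().unwrap();
            slots.entry(id).or_default().clone()
        };
        // The first caller runs `fetch`; concurrent callers block here
        // and then read the value it produced.
        slot.get_or_init(fetch).clone()
    }

    /// Drop a single entry so the next lookup fetches again.
    fn invalidate(&self, id: u32) {
        self.slots.lock().unwrap().remove(&id);
    }
}
```

Keeping the map lock separate from the per-entry `OnceLock` means a slow fetch for one ID does not block lookups for other IDs.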
Format-agnostic decoding
WireFormatDecoder auto-detects the wire format and dispatches to the correct backend:
use std::sync::Arc;
use WireFormatDecoder;
let decoder = WireFormatDecoder::new(/* … */)
    .with_confluent(/* … */)
    .with_glue(/* … */); // requires `glue` feature
let msg = decoder.decode(/* … */).await?;
println!(/* … */); // Avro / Protobuf / Json / Unknown
println!(/* … */);
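How WireFormatDecoder detects the format is not spelled out here. One plausible approach, sketched below as an assumption, is to dispatch on the first byte, since the documented headers start with 0x00 (Confluent) and 0x03 (Glue). `DetectedFormat` and `detect_wire_format` are hypothetical names, not part of the crate.

```rust
/// Which framing a buffer appears to use (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Eq)]
enum DetectedFormat {
    Confluent,
    Glue,
    Unknown,
}

/// Guess the wire format from the leading byte and the minimum header length.
/// Assumption: first-byte dispatch; the crate's real heuristic may differ.
fn detect_wire_format(buf: &[u8]) -> DetectedFormat {
    match buf.first().copied() {
        // Confluent: magic 0x00 followed by a 4-byte schema ID.
        Some(0x00) if buf.len() >= 5 => DetectedFormat::Confluent,
        // Glue: version 0x03, compression byte, 16-byte UUID.
        Some(0x03) if buf.len() >= 18 => DetectedFormat::Glue,
        _ => DetectedFormat::Unknown,
    }
}
```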
Examples
| Example | Description |
|---|---|
| confluent_encode_decode | Full encode → decode round-trip with an in-memory stub registry |
| avro_roundtrip | End-to-end Avro encode → Confluent wire format → decode |
| glue_roundtrip | Glue wire format encode / decode, optional ZLIB compression |
| custom_backend | Implementing SchemaRegistryClient + cache + WireFormatDecoder |
| apicurio_roundtrip | Apicurio Registry v3 encode → decode round-trip with a mock registry |
Full API surface: SchemaRegistryClient trait
All implementations (ConfluentSchemaRegistry, ApicurioSchemaRegistry, and any custom backend) expose the same methods through the SchemaRegistryClient trait. CachedSchemaRegistry adds transparent caching and delegates all calls to the inner client.
| Method | Description |
|---|---|
| get_schema_by_id(id) | Fetch schema by its globally unique integer ID |
| get_latest_schema(subject) | Fetch the most recently registered version under a subject |
| get_schema_by_version(subject, version) | Fetch a specific version under a subject |
| register_schema(subject, schema, type, refs) | Register a new schema (idempotent: returns existing ID if already registered) |
| check_compatibility(subject, schema, type, refs) | Test whether a schema is compatible with the currently registered version |
| check_compatible(subject, schema, type) | Convenience alias for check_compatibility with no schema references |
| delete_subject(subject, permanent) | Delete all versions of a subject (permanent = true for hard delete) |
| get_subjects() | List all registered subjects |
| get_versions(subject) | List all registered version numbers for a subject |
| health_check() | Probe the registry for connectivity (lightweight ping; backend-specific endpoint) |
| set_compatibility(subject, level) | Set the per-subject compatibility policy |
| get_compatibility(subject) | Get the current compatibility policy for a subject |
Compatibility levels
CompatibilityLevel supports all Confluent / Apicurio policies:
use CompatibilityLevel;
registry.set_compatibility(/* … */).await?;
let level = registry.get_compatibility(/* … */).await?;
Available variants: Backward, BackwardTransitive, Forward, ForwardTransitive, Full, FullTransitive, None.
12-Factor / environment variable configuration
Both ConfluentSchemaRegistryBuilder and ApicurioSchemaRegistryBuilder support from_env():
// Reads SCHEMA_REGISTRY_URL, SCHEMA_REGISTRY_USERNAME/PASSWORD or SCHEMA_REGISTRY_BEARER_TOKEN
let registry = ConfluentSchemaRegistryBuilder::from_env()?.build()?;
// Reads APICURIO_REGISTRY_URL, APICURIO_REGISTRY_USERNAME/PASSWORD or APICURIO_REGISTRY_BEARER_TOKEN
let registry = ApicurioSchemaRegistryBuilder::from_env()?.build()?;
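To make the environment lookup concrete, here is a small standard-library sketch of how the Confluent variables named above could be resolved. It is not the builder's actual logic: `Auth`, `RegistryEnvConfig`, and `config_from_env` are hypothetical, and the precedence (username and password before bearer token) is an assumption. The lookup is passed in as a closure so the function can be driven without touching the process environment.

```rust
/// Hypothetical credentials resolved from the environment.
#[derive(Debug, PartialEq, Eq)]
enum Auth {
    None,
    Basic { username: String, password: String },
    Bearer(String),
}

/// Hypothetical settings bundle for this sketch.
#[derive(Debug, PartialEq, Eq)]
struct RegistryEnvConfig {
    url: String,
    auth: Auth,
}

/// Resolve settings through `lookup`, which maps a variable name to its value.
/// Pass `|k| std::env::var(k).ok()` to read the real process environment.
fn config_from_env(
    lookup: impl Fn(&str) -> Option<String>,
) -> Result<RegistryEnvConfig, String> {
    let url = lookup("SCHEMA_REGISTRY_URL")
        .ok_or_else(|| "SCHEMA_REGISTRY_URL is not set".to_string())?;
    let auth = match (
        lookup("SCHEMA_REGISTRY_USERNAME"),
        lookup("SCHEMA_REGISTRY_PASSWORD"),
        lookup("SCHEMA_REGISTRY_BEARER_TOKEN"),
    ) {
        // Assumed precedence: a complete username/password pair wins.
        (Some(username), Some(password), _) => Auth::Basic { username, password },
        (_, _, Some(token)) => Auth::Bearer(token),
        _ => Auth::None,
    };
    Ok(RegistryEnvConfig { url, auth })
}
```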
Retry and resilience
schemreg has built-in retry logic for all HTTP requests; no extra configuration is needed:
| Scenario | Behavior |
|---|---|
| HTTP 429 (rate limited) | Retried; Retry-After header is honored (server-dictated delay) |
| HTTP 5xx (server error) | Retried with exponential backoff |
| Network errors | Retried (connection reset, timeout, DNS) |
| Max retries | 3 attempts; final error propagated |
| Backoff | 100 ms base, doubles per attempt, capped at 60 s |
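The backoff row can be read as a simple formula: delay = min(100 ms × 2^n, 60 s). The sketch below computes that schedule with the standard library. It is an illustration under assumptions, not the crate's code: `backoff_delay` is a hypothetical name, `n` is taken to be 0 for the first retry, and a `Retry-After` value is assumed to replace the computed delay outright.

```rust
use std::time::Duration;

/// Base delay before the first retry.
const BASE_DELAY: Duration = Duration::from_millis(100);
/// Upper bound on any computed delay.
const MAX_DELAY: Duration = Duration::from_secs(60);

/// Delay before retry number `retry` (0-based): base * 2^retry, capped at 60 s.
/// A server-supplied `Retry-After` value, when present, is used as-is (assumption).
fn backoff_delay(retry: u32, retry_after: Option<Duration>) -> Duration {
    if let Some(delay) = retry_after {
        return delay;
    }
    // Saturate instead of overflowing for very large retry counts.
    let factor = 2u32.checked_pow(retry).unwrap_or(u32::MAX);
    BASE_DELAY
        .checked_mul(factor)
        .unwrap_or(MAX_DELAY)
        .min(MAX_DELAY)
}
```

Under these assumptions the first three delays are 100 ms, 200 ms, and 400 ms, and the cap would only be reached from the eleventh retry onward, well beyond the documented retry limit.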
To set independent connection and request timeouts, or to cap idle pooled connections per host, use the builder:
use std::time::Duration;
use ConfluentSchemaRegistryBuilder;
let registry = ConfluentSchemaRegistryBuilder::default()
    .url(/* … */)
    .connect_timeout(/* … */) // TCP connection timeout
    .request_timeout(/* … */) // full request timeout
    .pool_max_idle_per_host(/* … */) // max idle connections per host
    .build()?;
License
Licensed under either of:
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT License (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.