schemreg 0.1.0

Async Confluent + AWS Glue schema registry client: wire format, traits, caching, HTTP

πŸ—‚οΈ schemreg

License: MIT OR Apache-2.0 | MSRV: 1.88

Async schema registry client for Confluent Schema Registry (and compatible registries such as Karapace and Apicurio) and AWS Glue Schema Registry, with:

  • ⚑ Zero-copy wire-format encode / decode (Confluent 5-byte header, Glue 18-byte header)
  • 🚀 Transparent in-memory caching with thundering-herd coalescing
  • 🔌 Pluggable backend via the SchemaRegistryClient trait
  • 🎯 Feature-gated: pull in only what you need

✨ Features

| Feature | Enables |
|---|---|
| (none) | 🔧 Core types, wire format helpers, traits, Glue wire format |
| confluent | 🌐 Confluent HTTP client, encoder, decoder, TLS via rustls + webpki-roots |
| glue | ☁️ AWS Glue SDK client (AwsGlueSchemaRegistry), ZLIB compression via flate2 |
| avro | 🪶 Avro encode / decode via apache-avro, works with any SchemaRegistryClient |
| native-tls-roots | 🔒 Platform root certificates via rustls-native-certs (implies confluent) |
| aws-lc-rs | 🔑 AWS-LC crypto backend instead of ring (implies confluent) |

🚀 Quick start

🌐 Confluent Schema Registry

# Cargo.toml
[dependencies]
schemreg = { version = "0.1", features = ["confluent"] }
tokio = { version = "1", features = ["full"] }
bytes = "1"

use std::sync::Arc;
use schemreg::{
    CachedSchemaRegistry, SchemaType, SubjectNameStrategy,
    confluent::{ConfluentSchemaEncoder, ConfluentSchemaRegistry},
    decoder::WireFormatDecoder,
    traits::SchemaEncoder,
};
use bytes::Bytes;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build the HTTP client.
    let registry = ConfluentSchemaRegistry::builder()
        .url("http://localhost:8081")
        // .basic_auth("user", "password")
        .build()?;

    // Wrap with an in-memory LRU cache (1,000 entries by default).
    let cached = Arc::new(CachedSchemaRegistry::new(registry));

    // Producer side: register schema on first send, then reuse cached ID.
    let encoder = ConfluentSchemaEncoder::builder()
        .registry(&*cached)
        .schema(r#"{"type":"record","name":"Order","fields":[{"name":"id","type":"string"}]}"#, SchemaType::Avro)
        .strategy(SubjectNameStrategy::TopicName)
        .build()?;

    // Placeholder bytes standing in for an already-serialized Avro record.
    let raw_bytes = Bytes::from_static(b"\x04\x08some-avro-payload");
    let framed = encoder.encode(raw_bytes, "orders", None, false).await?;

    // Consumer side: strip the header (uses the same cached registry for schema lookup).
    let decoder = WireFormatDecoder::confluent(Arc::clone(&cached));
    let msg = decoder.decode(framed).await?;

    println!("Decoded {} bytes", msg.payload.len());
    Ok(())
}
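SubjectNameStrategy::TopicName presumably follows Confluent's TopicNameStrategy, under which the producer above would register its schema under the subject orders-value. The std-only sketch below shows Confluent's three standard subject-naming conventions. SubjectStrategy and subject_name are hypothetical names, not schemreg API, and it is an assumption that the last argument to encode is a key/value flag.

```rust
/// Hypothetical stand-in for schemreg's SubjectNameStrategy; the variants
/// mirror Confluent's three standard strategies.
#[derive(Debug, Clone, Copy)]
enum SubjectStrategy {
    TopicName,       // "<topic>-key" or "<topic>-value"
    RecordName,      // "<fully.qualified.RecordName>"
    TopicRecordName, // "<topic>-<fully.qualified.RecordName>"
}

/// Derives the registry subject for a message using Confluent's naming rules.
fn subject_name(strategy: SubjectStrategy, topic: &str, record_name: &str, is_key: bool) -> String {
    let suffix = if is_key { "key" } else { "value" };
    match strategy {
        SubjectStrategy::TopicName => format!("{topic}-{suffix}"),
        // Record-based strategies ignore whether the schema is for the key or the value.
        SubjectStrategy::RecordName => record_name.to_string(),
        SubjectStrategy::TopicRecordName => format!("{topic}-{record_name}"),
    }
}
```

TopicName ties one schema lineage to each topic, while the record-based strategies let a single topic carry several record types.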

☁️ AWS Glue Schema Registry

# Cargo.toml
[dependencies]
schemreg = { version = "0.1", features = ["glue"] }
tokio = { version = "1", features = ["full"] }
bytes = "1"

use schemreg::{
    GlueCompression, GlueSchemaVersionId,
    encode_glue_wire_format, decode_glue_wire_format,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let version_id: GlueSchemaVersionId =
        "550e8400-e29b-41d4-a716-446655440000".parse()?;

    // Encode β€” adds the 18-byte Glue header.
    let framed = encode_glue_wire_format(version_id, b"avro bytes", GlueCompression::None)?;

    // Decode β€” strips the header.
    let (id, payload) = decode_glue_wire_format(&framed)?;
    assert_eq!(id, version_id);
    assert_eq!(payload, b"avro bytes");
    Ok(())
}
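The GlueSchemaVersionId above is parsed from a canonical hyphenated UUID string, and the wire format stores it as 16 big-endian bytes, so the conversion amounts to reading the 32 hex digits two at a time. A std-only sketch of that step; hex_val and parse_uuid_bytes are hypothetical helpers, not the crate's FromStr implementation.

```rust
/// Value of one ASCII hex digit, or None for anything else.
fn hex_val(c: u8) -> Option<u8> {
    (c as char).to_digit(16).map(|d| d as u8)
}

/// Hypothetical helper: parses a canonical hyphenated UUID (8-4-4-4-12 hex
/// digits) into its 16 bytes, most significant byte first.
fn parse_uuid_bytes(s: &str) -> Result<[u8; 16], String> {
    let groups: Vec<&str> = s.split('-').collect();
    let lens: Vec<usize> = groups.iter().map(|g| g.len()).collect();
    if lens != [8usize, 4, 4, 4, 12] {
        return Err(format!("not a hyphenated UUID: {s:?}"));
    }
    // Exactly 32 bytes remain once the four hyphens are removed.
    let digits = groups.concat().into_bytes();
    let mut out = [0u8; 16];
    for (i, byte) in out.iter_mut().enumerate() {
        let hi = hex_val(digits[2 * i]).ok_or("non-hex digit in UUID")?;
        let lo = hex_val(digits[2 * i + 1]).ok_or("non-hex digit in UUID")?;
        *byte = (hi << 4) | lo;
    }
    Ok(out)
}
```

For "550e8400-e29b-41d4-a716-446655440000" this yields bytes starting 0x55, 0x0e, 0x84, 0x00, which is the order they appear in the Glue header.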

With the glue feature enabled, the aws-config crate added as a dependency, and valid AWS credentials available, you can use the high-level AwsGlueSchemaRegistry client:

use aws_config::BehaviorVersion;
use schemreg::glue::AwsGlueSchemaRegistry;

#[tokio::main]
async fn main() {
    // Load region and credentials from the default AWS provider chain.
    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let registry = AwsGlueSchemaRegistry::from_config(&config);
}

🔬 Wire formats

🌐 Confluent Schema Registry

Byte offset  0        1      2      3      4        5 …
             ┌────────┬─────────────────────────────┬──────────────────────┐
             │ 0x00   │     schema_id (u32 BE)      │  payload (N bytes)   │
             └────────┴─────────────────────────────┴──────────────────────┘
             magic    │←──────── 4 bytes ──────────→│
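The layout above maps directly onto a few lines of std Rust. This is a sketch of the framing logic only, not schemreg's encoder; encode_confluent and decode_confluent are hypothetical names. Decoding hands back a borrowed slice of the input rather than a copy, which is the sense in which this kind of decoding can be zero-copy.

```rust
/// Magic byte that opens every Confluent-framed message.
const CONFLUENT_MAGIC: u8 = 0x00;
/// 1 magic byte + 4-byte big-endian schema ID.
const CONFLUENT_HEADER_LEN: usize = 5;

/// Prepends the 5-byte Confluent header to an already-serialized payload.
fn encode_confluent(schema_id: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(CONFLUENT_HEADER_LEN + payload.len());
    out.push(CONFLUENT_MAGIC);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Splits a framed message into (schema_id, payload) without copying the payload.
fn decode_confluent(frame: &[u8]) -> Result<(u32, &[u8]), String> {
    if frame.len() < CONFLUENT_HEADER_LEN {
        return Err(format!("frame too short: {} bytes", frame.len()));
    }
    if frame[0] != CONFLUENT_MAGIC {
        return Err(format!("unexpected magic byte 0x{:02x}", frame[0]));
    }
    let id = u32::from_be_bytes([frame[1], frame[2], frame[3], frame[4]]);
    Ok((id, &frame[CONFLUENT_HEADER_LEN..]))
}
```

For example, schema ID 42 with payload b"hello" frames to 00 00 00 00 2a followed by the five payload bytes.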

☁️ AWS Glue Schema Registry

Byte offset  0        1        2                     18 …
             ┌────────┬────────┬─────────────────────┬──────────────────────┐
             │ 0x03   │  comp  │  schema_version_id  │  payload (N bytes)   │
             └────────┴────────┴─────────────────────┴──────────────────────┘
             version  │byte    │←───── 16 bytes ────→│
  • comp: 0x00 = none, 0x05 = ZLIB
  • schema_version_id: 128-bit UUID stored as big-endian bytes
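A std-only sketch of the same 18-byte layout, with the compression byte surfaced as a flag. Decompression is deliberately left out (per the feature table, the crate uses flate2 for ZLIB behind the glue feature); GlueHeader, encode_glue and decode_glue are hypothetical names, and the byte values are taken from the diagram above.

```rust
const GLUE_HEADER_VERSION: u8 = 0x03;
const GLUE_COMPRESSION_NONE: u8 = 0x00;
const GLUE_COMPRESSION_ZLIB: u8 = 0x05;
/// Version byte + compression byte + 16-byte schema version UUID.
const GLUE_HEADER_LEN: usize = 18;

/// Header fields parsed from a Glue-framed message.
#[derive(Debug, PartialEq)]
struct GlueHeader {
    compressed: bool,
    schema_version_id: [u8; 16],
}

/// Prepends the Glue header; the caller passes already-compressed bytes if `compressed`.
fn encode_glue(schema_version_id: [u8; 16], compressed: bool, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(GLUE_HEADER_LEN + payload.len());
    out.push(GLUE_HEADER_VERSION);
    out.push(if compressed { GLUE_COMPRESSION_ZLIB } else { GLUE_COMPRESSION_NONE });
    out.extend_from_slice(&schema_version_id);
    out.extend_from_slice(payload);
    out
}

/// Parses the header and returns the (possibly still compressed) payload slice.
fn decode_glue(frame: &[u8]) -> Result<(GlueHeader, &[u8]), String> {
    if frame.len() < GLUE_HEADER_LEN {
        return Err(format!("frame too short: {} bytes", frame.len()));
    }
    if frame[0] != GLUE_HEADER_VERSION {
        return Err(format!("unexpected header version 0x{:02x}", frame[0]));
    }
    let compressed = match frame[1] {
        GLUE_COMPRESSION_NONE => false,
        GLUE_COMPRESSION_ZLIB => true,
        other => return Err(format!("unknown compression byte 0x{other:02x}")),
    };
    let mut schema_version_id = [0u8; 16];
    schema_version_id.copy_from_slice(&frame[2..GLUE_HEADER_LEN]);
    Ok((GlueHeader { compressed, schema_version_id }, &frame[GLUE_HEADER_LEN..]))
}
```

Rejecting unknown compression bytes, rather than treating them as uncompressed, avoids handing garbage to the Avro/Protobuf/JSON decoder.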

🔌 Custom backend

Any struct that implements SchemaRegistryClient can be used as the backend:

use schemreg::{Schema, SchemaId, SchemaReference, SchemaRegistryClient, SchemaType, SchemaVersion};
use schemreg::error::Result;

struct MyRegistry { /* ... */ }

impl SchemaRegistryClient for MyRegistry {
    async fn get_schema_by_id(&self, id: SchemaId) -> Result<Schema> {
        todo!()
    }
    async fn get_latest_schema(&self, subject: &str) -> Result<Schema> {
        todo!()
    }
    async fn get_schema_by_version(&self, subject: &str, version: SchemaVersion) -> Result<Schema> {
        todo!()
    }
    async fn register_schema(
        &self, subject: &str, schema: &str,
        schema_type: SchemaType, references: &[SchemaReference],
    ) -> Result<SchemaId> {
        todo!()
    }
}

// Wrap with the cache transparently:
use schemreg::CachedSchemaRegistry;

fn main() {
    let cached = CachedSchemaRegistry::new(MyRegistry { /* ... */ });
}

See examples/custom_backend.rs for a full working example.
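For a sense of what the todo!() bodies might do in a simple stub, here is a std-only, synchronous sketch of the bookkeeping. It mirrors Confluent's behaviour of returning the existing ID when identical schema text is registered again, but simplifies it: schemas are compared as raw strings, and references and schema types are ignored. InMemoryStore and its methods are hypothetical; wiring them into the async trait methods (for example behind a Mutex) is not shown.

```rust
use std::collections::HashMap;

/// Hypothetical synchronous bookkeeping for an in-memory stub registry.
#[derive(Default)]
struct InMemoryStore {
    next_id: u32,
    by_id: HashMap<u32, String>,         // schema ID -> schema text
    ids_by_text: HashMap<String, u32>,   // schema text -> schema ID (dedup)
    versions: HashMap<String, Vec<u32>>, // subject -> schema IDs, version 1 first
}

impl InMemoryStore {
    /// Registers `schema` under `subject`, reusing the existing ID for identical text.
    fn register(&mut self, subject: &str, schema: &str) -> u32 {
        let existing = self.ids_by_text.get(schema).copied();
        let id = match existing {
            Some(id) => id,
            None => {
                self.next_id += 1;
                let id = self.next_id;
                self.by_id.insert(id, schema.to_string());
                self.ids_by_text.insert(schema.to_string(), id);
                id
            }
        };
        // Only add a new version if this subject doesn't already have the schema.
        let versions = self.versions.entry(subject.to_string()).or_default();
        if !versions.contains(&id) {
            versions.push(id);
        }
        id
    }

    fn get_by_id(&self, id: u32) -> Option<&str> {
        self.by_id.get(&id).map(String::as_str)
    }

    /// Versions are 1-based, as in Confluent's REST API.
    fn get_by_version(&self, subject: &str, version: usize) -> Option<&str> {
        let id = *self.versions.get(subject)?.get(version.checked_sub(1)?)?;
        self.get_by_id(id)
    }

    fn get_latest(&self, subject: &str) -> Option<&str> {
        let id = *self.versions.get(subject)?.last()?;
        self.get_by_id(id)
    }
}
```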


⚑ Cache management

CachedSchemaRegistry implements the AnySchemaCache trait, which provides cache lifecycle control:

use schemreg::{AnySchemaCache, CachedSchemaRegistry, SchemaId};

let cached = CachedSchemaRegistry::with_max_entries(my_registry, 512);

// Pre-warm the cache with known schema IDs (avoids cold-miss latency at startup).
cached.warm_cache(&[1, 2, 3]).await?;

println!("cached: {}", cached.cache_len());

// Invalidate a single stale entry.
cached.invalidate(2);

// Wipe everything.
cached.clear_cache();
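Thundering-herd coalescing means that concurrent cache misses for the same schema ID should result in a single registry request rather than one per caller. How CachedSchemaRegistry does this in async code isn't shown here; the following is a std-only, thread-based sketch of the idea using one OnceLock per key. CoalescingCache and get_or_fetch are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

/// Hypothetical synchronous cache: concurrent misses for the same key share one fetch.
struct CoalescingCache<V> {
    slots: Mutex<HashMap<u32, Arc<OnceLock<V>>>>,
}

impl<V: Clone> CoalescingCache<V> {
    fn new() -> Self {
        Self { slots: Mutex::new(HashMap::new()) }
    }

    fn get_or_fetch(&self, id: u32, fetch: impl FnOnce(u32) -> V) -> V {
        // Hold the map lock only long enough to find or create the per-key slot.
        let slot = {
            let mut slots = self.slots.lock().unwrap();
            Arc::clone(slots.entry(id).or_insert_with(|| Arc::new(OnceLock::new())))
        };
        // OnceLock runs exactly one initializer; other callers block until it finishes.
        slot.get_or_init(|| fetch(id)).clone()
    }

    fn invalidate(&self, id: u32) {
        self.slots.lock().unwrap().remove(&id);
    }
}
```

Because the outer Mutex is released before fetching, misses for different IDs still proceed in parallel. Unlike a production cache, this sketch has no size bound (no LRU eviction) and would keep whatever fetch returns, including errors if V were a Result.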

πŸ” Format-agnostic decoding

WireFormatDecoder auto-detects the wire format and dispatches to the correct backend:

use schemreg::decoder::WireFormatDecoder;

let decoder = WireFormatDecoder::new()
    .confluent(&cached_confluent_registry)
    .glue(&cached_glue_registry);        // requires `glue` feature

let msg = decoder.decode(raw_bytes).await?;
println!("format: {:?}", msg.schema_format);  // Avro / Protobuf / Json / Unknown
println!("payload: {} bytes", msg.payload.len());
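Since the Confluent magic byte (0x00) and the Glue header version byte (0x03) differ, the first byte of a message is enough to tell the two formats apart. A std-only sketch of that dispatch; detect_format and DetectedFormat are hypothetical, and whether WireFormatDecoder uses exactly this rule is an assumption.

```rust
/// Wire formats distinguishable by the first byte of a message.
#[derive(Debug, PartialEq)]
enum DetectedFormat {
    Confluent, // magic byte 0x00, 5-byte header
    Glue,      // header version byte 0x03, 18-byte header
}

/// Hypothetical first-byte dispatch that also checks the minimum header length.
fn detect_format(frame: &[u8]) -> Option<DetectedFormat> {
    match *frame.first()? {
        0x00 if frame.len() >= 5 => Some(DetectedFormat::Confluent),
        0x03 if frame.len() >= 18 => Some(DetectedFormat::Glue),
        _ => None,
    }
}
```

A bare JSON payload sent without any header starts with `{` (0x7b) and is rejected, as is a Glue-looking frame shorter than 18 bytes.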

📖 Examples

| Example | Description |
|---|---|
| confluent_encode_decode | 🌐 Full encode → decode round-trip with an in-memory stub registry |
| avro_roundtrip | 🪶 End-to-end Avro encode → Confluent wire format → decode |
| glue_roundtrip | ☁️ Glue wire format encode / decode, optional ZLIB compression |
| custom_backend | 🔌 Implementing SchemaRegistryClient + cache + WireFormatDecoder |

cargo run --example confluent_encode_decode --features confluent
cargo run --example avro_roundtrip --features avro
cargo run --example glue_roundtrip --features glue
cargo run --example custom_backend

📄 License

Licensed under either of:

  • Apache License, Version 2.0
  • MIT license

at your option.