#[cfg(not(feature = "confluent"))]
fn main() {
eprintln!(
"This example requires the `confluent` Cargo feature.\n\
Run with: cargo run --example confluent_encode_decode --features confluent"
);
}
#[cfg(feature = "confluent")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use std::collections::HashMap;
use std::sync::Arc;
use bytes::Bytes;
use schemreg::{
CachedSchemaRegistry, Schema, SchemaId, SchemaReference, SchemaRegistryClient, SchemaType,
SubjectNameStrategy, confluent::ConfluentSchemaEncoder, decoder::WireFormatDecoder,
error::SchemaRegError, traits::SchemaEncoder,
};
init_tracing();
#[derive(Default)]
struct StubRegistry {
next_id: std::sync::atomic::AtomicU32,
by_id: parking_lot::RwLock<HashMap<SchemaId, Schema>>,
by_subject: parking_lot::RwLock<HashMap<String, SchemaId>>,
}
impl SchemaRegistryClient for StubRegistry {
async fn get_schema_by_id(&self, id: SchemaId) -> schemreg::error::Result<Arc<Schema>> {
self.by_id
.read()
.get(&id)
.map(|s| Arc::new(s.clone()))
.ok_or_else(|| SchemaRegError::invalid_state(format!("schema {id} not found")))
}
async fn get_latest_schema(&self, subject: &str) -> schemreg::error::Result<Arc<Schema>> {
let id = self
.by_subject
.read()
.get(subject)
.copied()
.ok_or_else(|| {
SchemaRegError::invalid_state(format!("subject {subject} not found"))
})?;
self.get_schema_by_id(id).await
}
async fn get_schema_by_version(
&self,
subject: &str,
_version: schemreg::SchemaVersion,
) -> schemreg::error::Result<Arc<Schema>> {
self.get_latest_schema(subject).await
}
async fn register_schema(
&self,
subject: &str,
schema: &str,
schema_type: SchemaType,
_references: &[SchemaReference],
) -> schemreg::error::Result<SchemaId> {
if let Some(&id) = self.by_subject.read().get(subject) {
return Ok(id);
}
let raw_id = self
.next_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+ 1;
let id = SchemaId::from(raw_id);
let s = Schema::new(id, schema_type, schema).with_subject(subject, 1i32);
self.by_id.write().insert(id, s);
self.by_subject.write().insert(subject.to_string(), id);
println!(" Registered schema -> id={id} subject={subject}");
Ok(id)
}
}
let cached = CachedSchemaRegistry::new(StubRegistry::default());
const AVRO_ORDER_SCHEMA: &str = r#"{
"type": "record",
"name": "Order",
"namespace": "com.example",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount_usd", "type": "double"},
{"name": "currency", "type": "string"}
]
}"#;
let cached = std::sync::Arc::new(cached);
let encoder = ConfluentSchemaEncoder::builder()
.registry(&*cached)
.schema(AVRO_ORDER_SCHEMA, SchemaType::Avro)
.strategy(SubjectNameStrategy::TopicName)
.build()?;
let decoder = WireFormatDecoder::confluent(std::sync::Arc::clone(&cached));
let topic = "orders";
let raw_payload = Bytes::from_static(b"\x0eorder-1");
println!("\n=== Confluent encode -> decode round-trip ===\n");
println!(
"Raw payload: {} bytes {:02X?}",
raw_payload.len(),
&raw_payload[..]
);
let framed = encoder
.encode(
raw_payload.clone(),
topic,
None,
schemreg::EncodeTarget::Value,
)
.await?;
let schema_id = u32::from_be_bytes([framed[1], framed[2], framed[3], framed[4]]);
println!(
"Framed: {} bytes magic=0x{:02X} schema_id={}",
framed.len(),
framed[0],
schema_id,
);
let msg = decoder.decode(framed.clone()).await?;
println!(
"Decoded: {} bytes format={:?}",
msg.payload.len(),
msg.schema_format
);
assert_eq!(
msg.payload, raw_payload,
"decoded bytes must match original payload"
);
println!("Round-trip verified.");
let framed2 = encoder
.encode(
raw_payload.clone(),
topic,
None,
schemreg::EncodeTarget::Value,
)
.await?;
assert_eq!(framed, framed2, "same schema ID must be reused");
println!("Schema ID cached - no extra registry call.");
let plain = Bytes::from_static(b"plain text, no framing");
let pass = decoder.decode(plain.clone()).await?;
assert_eq!(pass.payload, plain);
println!("Non-Confluent payload passed through unchanged.");
println!("\nCache stats: {} schemas cached.", cached.cache_len());
println!("\n=== All assertions passed ===\n");
Ok(())
}
#[cfg(feature = "confluent")]
fn init_tracing() {
let _ = tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "schemreg=debug".parse().unwrap()),
)
.try_init();
}