use anyhow::Result;
use danube_client::{DanubeClient, SchemaRegistryClient, SubType};
use jsonschema::Validator;
use serde::{Deserialize, Serialize};
/// JSON payload type used by this consumer.
///
/// Incoming message payloads are deserialized into this struct, and a sample
/// instance is serialized and checked against the registered schema before
/// the consumer starts.
#[derive(Debug, Serialize, Deserialize)]
struct MyMessage {
    /// String-valued field of the payload.
    field1: String,
    /// 32-bit signed integer field of the payload.
    field2: i32,
}
async fn validate_struct_against_registry<T: Serialize>(
schema_client: &SchemaRegistryClient,
subject: &str,
sample: &T,
) -> Result<u32> {
println!("🔍 Fetching schema from registry for subject: {}", subject);
let schema_response = schema_client.get_latest_schema(subject).await?;
println!("📋 Retrieved schema version: {}", schema_response.version);
println!("📋 Schema type: {}", schema_response.schema_type);
let schema_def: serde_json::Value = serde_json::from_slice(&schema_response.schema_definition)?;
println!("🔨 Compiling JSON Schema validator...");
let validator =
Validator::new(&schema_def).map_err(|e| anyhow::anyhow!("Invalid JSON schema: {}", e))?;
println!("✅ Validating struct against schema...");
let sample_json = serde_json::to_value(sample)?;
if let Err(_validation_error) = validator.validate(&sample_json) {
eprintln!(
"\n❌ VALIDATION FAILED: Struct does not match schema v{}",
schema_response.version
);
eprintln!(" The consumer struct definition is incompatible with the registered schema.");
eprintln!("\n Validation errors:");
for error in validator.iter_errors(&sample_json) {
eprintln!(" - {}", error);
}
eprintln!("\n 💡 Fix: Update the MyMessage struct to match the schema in the registry.");
return Err(anyhow::anyhow!(
"Struct validation failed - consumer cannot start"
));
}
println!(
"✅ Struct validated successfully against schema v{}",
schema_response.version
);
Ok(schema_response.version)
}
/// Runs the validated JSON consumer example.
///
/// Steps, in order:
/// 1. Builds a `DanubeClient` for the broker at `http://127.0.0.1:6650`.
/// 2. Validates a sample `MyMessage` against the schema registered under the
///    `my-app-events` subject; any failure aborts before subscribing.
/// 3. Creates an exclusive consumer on `/default/test_topic`, subscribes, and
///    processes messages until the receive stream yields `None`.
///
/// Each payload is decoded as `MyMessage`. Successfully decoded messages are
/// printed and acknowledged; payloads that fail to decode are reported on
/// stderr and left unacknowledged.
#[tokio::main]
async fn main() -> Result<()> {
    // Fixed settings for this example.
    let topic = "/default/test_topic";
    let consumer_name = "cons_json_validated";
    let subscription_name = "subs_json_validated";

    println!("🚀 Starting validated JSON consumer example\n");
    let client = DanubeClient::builder()
        .service_url("http://127.0.0.1:6650")
        .build()
        .await?;

    println!("📝 Step 1: Validating consumer struct against schema registry\n");
    let schema_client = client.schema();
    // Sample instance used only for the schema check.
    let sample = MyMessage {
        field1: "validation_test".to_string(),
        field2: 0,
    };
    let schema_version =
        validate_struct_against_registry(&schema_client, "my-app-events", &sample).await?;
    println!(
        "\n✅ Consumer validated against schema version: {}",
        schema_version
    );
    println!(" Safe to proceed with typed deserialization\n");

    println!("📝 Step 2: Creating consumer and subscribing to topic\n");
    let builder = client
        .new_consumer()
        .with_topic(topic)
        .with_consumer_name(consumer_name)
        .with_subscription(subscription_name)
        .with_subscription_type(SubType::Exclusive);
    let mut consumer = builder.build()?;
    consumer.subscribe().await?;
    println!("✅ Consumer {} subscribed to {}", consumer_name, topic);

    println!("\n📝 Step 3: Listening for messages...\n");
    let mut message_stream = consumer.receive().await?;
    while let Some(incoming) = message_stream.recv().await {
        // Decode first; on failure report and move on without acknowledging.
        let decoded = match serde_json::from_slice::<MyMessage>(&incoming.payload) {
            Ok(value) => value,
            Err(decode_err) => {
                eprintln!("❌ Deserialization failed: {}", decode_err);
                eprintln!(
                    " This might indicate schema drift - check schema version {}",
                    schema_version
                );
                eprintln!(" Message will NOT be acknowledged (will retry or go to DLQ)");
                continue;
            }
        };

        println!("📥 Received valid message:");
        println!(" field1: {}", decoded.field1);
        println!(" field2: {}", decoded.field2);
        println!(" Message ID: {:?}", incoming.msg_id);
        println!();

        // An acknowledgement failure is logged but does not stop the loop.
        match consumer.ack(&incoming).await {
            Ok(_) => {}
            Err(ack_err) => eprintln!("❌ Failed to acknowledge message: {}", ack_err),
        }
    }

    Ok(())
}