use crate::ai_contract_diff::{ContractDiffResult, Mismatch, MismatchSeverity, MismatchType};
use crate::contract_drift::protocol_contracts::{
ContractError, ContractOperation, ContractRequest, OperationType, ProtocolContract,
ValidationError, ValidationResult,
};
use crate::protocol_abstraction::Protocol;
use jsonschema::{self, Draft, Validator as JSONSchema};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// Declaration of a single MQTT topic: the topic name plus the JSON document
/// describing its payloads and optional MQTT delivery hints.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttTopicSchema {
    /// MQTT topic name; doubles as the operation id inside `MqttContract`.
    pub topic: String,
    /// Declared Quality-of-Service level, if specified.
    pub qos: Option<u8>,
    /// Payload schema; compiled as a Draft-7 JSON Schema by
    /// `MqttContract::add_topic`.
    pub schema: Value,
    /// MQTT `retained` flag for the topic, if specified.
    pub retained: Option<bool>,
    /// Human-readable description of the topic.
    pub description: Option<String>,
    /// Example payload, if provided.
    pub example: Option<Value>,
}
/// MQTT protocol contract: a versioned collection of topic declarations with
/// pre-compiled JSON Schema validators and derived `ContractOperation`s.
pub struct MqttContract {
    // Stable identifier for this contract.
    contract_id: String,
    // Contract version string (reported in diff metadata).
    version: String,
    // Topic name -> declared topic schema.
    topics: HashMap<String, MqttTopicSchema>,
    // Topic name -> compiled Draft-7 validator; kept in sync with `topics`.
    schema_cache: HashMap<String, JSONSchema>,
    // Topic name -> derived operation; kept in sync with `topics`.
    operations_cache: HashMap<String, ContractOperation>,
    // Free-form contract metadata, emitted by `to_json`.
    metadata: HashMap<String, String>,
}
impl MqttContract {
pub fn new(contract_id: String, version: String) -> Self {
Self {
contract_id,
version,
topics: HashMap::new(),
schema_cache: HashMap::new(),
operations_cache: HashMap::new(),
metadata: HashMap::new(),
}
}
/// Register a topic on the contract.
///
/// Compiles `topic_schema.schema` as a Draft-7 JSON Schema (cached for later
/// validation), derives a `ContractOperation` for the topic, and stores the
/// declaration. Re-adding an existing topic replaces all cached entries.
///
/// # Errors
///
/// Returns `ContractError::SchemaValidation` if the schema does not compile.
pub fn add_topic(&mut self, topic_schema: MqttTopicSchema) -> Result<(), ContractError> {
    let topic_name = topic_schema.topic.clone();
    let schema = jsonschema::options()
        .with_draft(Draft::Draft7)
        .build(&topic_schema.schema)
        .map_err(|e| ContractError::SchemaValidation(format!("Invalid JSON schema: {}", e)))?;
    self.schema_cache.insert(topic_name.clone(), schema);
    // Build the operation metadata from the declaration's optional fields.
    let mut meta = HashMap::new();
    if let Some(retained) = topic_schema.retained {
        meta.insert("retained".to_string(), retained.to_string());
    }
    if let Some(ref desc) = topic_schema.description {
        meta.insert("description".to_string(), desc.clone());
    }
    let operation = ContractOperation {
        id: topic_name.clone(),
        name: topic_name.clone(),
        operation_type: OperationType::MqttTopic {
            topic: topic_name.clone(),
            qos: topic_schema.qos,
        },
        // The topic's payload schema serves as both input and output schema.
        input_schema: Some(topic_schema.schema.clone()),
        output_schema: Some(topic_schema.schema.clone()),
        metadata: meta,
    };
    self.operations_cache.insert(topic_name.clone(), operation);
    // Move (rather than clone) the declaration into the map: the operation was
    // derived above, so nothing reads `topic_schema` after this point.
    self.topics.insert(topic_name, topic_schema);
    Ok(())
}
/// Remove a topic and its cached validator/operation. No-op when the topic is
/// not registered.
pub fn remove_topic(&mut self, topic_name: &str) {
    if self.topics.remove(topic_name).is_none() {
        return;
    }
    self.schema_cache.remove(topic_name);
    self.operations_cache.remove(topic_name);
}
fn diff_contracts(&self, other: &MqttContract) -> Result<ContractDiffResult, ContractError> {
let mut mismatches = Vec::new();
let all_topics: std::collections::HashSet<String> =
self.topics.keys().chain(other.topics.keys()).cloned().collect();
for topic_name in &all_topics {
if self.topics.contains_key(topic_name) && !other.topics.contains_key(topic_name) {
mismatches.push(Mismatch {
mismatch_type: MismatchType::EndpointNotFound,
path: topic_name.clone(),
method: None,
expected: Some(format!("Topic {} should exist", topic_name)),
actual: Some("Topic removed".to_string()),
description: format!("Topic {} was removed", topic_name),
severity: MismatchSeverity::Critical,
confidence: 1.0,
context: HashMap::new(),
});
}
}
for topic_name in &all_topics {
if !self.topics.contains_key(topic_name) && other.topics.contains_key(topic_name) {
mismatches.push(Mismatch {
mismatch_type: MismatchType::UnexpectedField,
path: topic_name.clone(),
method: None,
expected: None,
actual: Some(format!("New topic {}", topic_name)),
description: format!("New topic {} was added", topic_name),
severity: MismatchSeverity::Low,
confidence: 1.0,
context: HashMap::new(),
});
}
}
for topic_name in all_topics
.intersection(&self.topics.keys().cloned().collect::<std::collections::HashSet<_>>())
{
if let (Some(old_topic), Some(new_topic)) =
(self.topics.get(topic_name), other.topics.get(topic_name))
{
if old_topic.qos != new_topic.qos {
mismatches.push(Mismatch {
mismatch_type: MismatchType::SchemaMismatch,
path: format!("{}.qos", topic_name),
method: None,
expected: old_topic.qos.map(|q| format!("QoS: {}", q)),
actual: new_topic.qos.map(|q| format!("QoS: {}", q)),
description: format!(
"QoS changed for topic {}: {:?} -> {:?}",
topic_name, old_topic.qos, new_topic.qos
),
severity: MismatchSeverity::Medium,
confidence: 1.0,
context: HashMap::new(),
});
}
let old_format = Self::detect_schema_format(&old_topic.schema);
let new_format = Self::detect_schema_format(&new_topic.schema);
if old_format != new_format {
let mut context = HashMap::new();
context.insert("is_additive".to_string(), serde_json::json!(false));
context.insert("is_breaking".to_string(), serde_json::json!(true));
context.insert(
"change_category".to_string(),
serde_json::json!("schema_format_changed"),
);
context.insert("topic".to_string(), serde_json::json!(topic_name));
context.insert("old_format".to_string(), serde_json::json!(old_format));
context.insert("new_format".to_string(), serde_json::json!(new_format));
mismatches.push(Mismatch {
mismatch_type: MismatchType::SchemaMismatch,
path: format!("{}.schema_format", topic_name),
method: None,
expected: Some(format!("Schema format: {}", old_format)),
actual: Some(format!("Schema format: {}", new_format)),
description: format!(
"Schema format changed from {} to {} for topic {}",
old_format, new_format, topic_name
),
severity: MismatchSeverity::High,
confidence: 1.0,
context,
});
}
let schema_mismatches = match (old_format.as_str(), new_format.as_str()) {
("json_schema", "json_schema") => {
Self::compare_json_schemas(&old_topic.schema, &new_topic.schema, topic_name)
}
("avro", "avro") => {
Self::compare_avro_schemas(&old_topic.schema, &new_topic.schema, topic_name)
.unwrap_or_else(|_| Vec::new()) }
("json_shape", "json_shape") => Self::compare_json_shape_schemas(
&old_topic.schema,
&new_topic.schema,
topic_name,
),
_ => Vec::new(), };
mismatches.extend(schema_mismatches);
}
}
let matches = mismatches.is_empty();
let confidence = if matches { 1.0 } else { 0.8 };
Ok(ContractDiffResult {
matches,
confidence,
mismatches,
recommendations: Vec::new(),
corrections: Vec::new(),
metadata: crate::ai_contract_diff::DiffMetadata {
analyzed_at: chrono::Utc::now(),
request_source: "mqtt_contract_diff".to_string(),
contract_version: Some(self.version.clone()),
contract_format: "mqtt_schema".to_string(),
endpoint_path: "".to_string(),
http_method: "".to_string(),
request_count: 1,
llm_provider: None,
llm_model: None,
},
})
}
fn detect_schema_format(schema: &Value) -> String {
if schema.get("type").and_then(|v| v.as_str()) == Some("record")
|| schema.get("fields").is_some()
{
return "avro".to_string();
}
if schema.get("$schema").is_some()
|| (schema.get("type").is_some() && schema.get("properties").is_some())
|| schema.get("required").is_some()
{
return "json_schema".to_string();
}
if let Some(obj) = schema.as_object() {
let all_strings = obj.values().all(|v| {
v.as_str().is_some()
|| (v.is_object() && v.get("type").and_then(|t| t.as_str()).is_some())
});
if all_strings && !obj.is_empty() {
return "json_shape".to_string();
}
}
"json_schema".to_string()
}
/// Compare two Avro record schemas field-by-field.
///
/// Emitted mismatches (paths prefixed with `path_prefix`):
/// - removed field -> `EndpointNotFound`, High severity (breaking);
/// - added field without a `default` -> `MissingRequiredField`, High (breaking);
/// - added field with a `default` -> `UnexpectedField`, Low (additive);
/// - changed `type` value -> `TypeMismatch`, High (breaking).
///
/// Only the top-level `fields` array is inspected; field types are compared by
/// raw JSON equality of their `type` values (no recursion into nested records).
///
/// # Errors
///
/// Returns `ContractError::SchemaValidation` if either schema lacks a `fields`
/// array.
fn compare_avro_schemas(
    old_schema: &Value,
    new_schema: &Value,
    path_prefix: &str,
) -> Result<Vec<Mismatch>, ContractError> {
    let mut mismatches = Vec::new();
    let old_fields = old_schema.get("fields").and_then(|v| v.as_array()).ok_or_else(|| {
        ContractError::SchemaValidation("Invalid Avro schema: missing fields".to_string())
    })?;
    let new_fields = new_schema.get("fields").and_then(|v| v.as_array()).ok_or_else(|| {
        ContractError::SchemaValidation("Invalid Avro schema: missing fields".to_string())
    })?;
    // Index fields by name; entries without a string `name` are skipped.
    let old_fields_map: HashMap<String, &Value> = old_fields
        .iter()
        .filter_map(|f| {
            f.get("name").and_then(|n| n.as_str()).map(|name| (name.to_string(), f))
        })
        .collect();
    let new_fields_map: HashMap<String, &Value> = new_fields
        .iter()
        .filter_map(|f| {
            f.get("name").and_then(|n| n.as_str()).map(|name| (name.to_string(), f))
        })
        .collect();
    // Fields present in the old schema but missing from the new one: breaking.
    for field_name in old_fields_map.keys() {
        if !new_fields_map.contains_key(field_name) {
            let mut context = HashMap::new();
            context.insert("is_additive".to_string(), serde_json::json!(false));
            context.insert("is_breaking".to_string(), serde_json::json!(true));
            context.insert("change_category".to_string(), serde_json::json!("field_removed"));
            context.insert("field_name".to_string(), serde_json::json!(field_name));
            context.insert("schema_format".to_string(), serde_json::json!("avro"));
            mismatches.push(Mismatch {
                mismatch_type: MismatchType::EndpointNotFound,
                path: format!("{}.{}", path_prefix, field_name),
                method: None,
                expected: Some(format!("Field {} should exist", field_name)),
                actual: Some("Field removed".to_string()),
                description: format!("Avro field {} was removed", field_name),
                severity: MismatchSeverity::High,
                confidence: 1.0,
                context,
            });
        }
    }
    // New-side fields: either additions (whose severity depends on whether a
    // default exists) or possible type changes on fields present in both.
    for (field_name, new_field) in &new_fields_map {
        if !old_fields_map.contains_key(field_name) {
            // A field added without a default cannot be filled in for old
            // messages, so it is treated as required and breaking.
            let has_default = new_field.get("default").is_some();
            let is_required = !has_default;
            let mut context = HashMap::new();
            context.insert("is_additive".to_string(), serde_json::json!(!is_required));
            context.insert("is_breaking".to_string(), serde_json::json!(is_required));
            context.insert(
                "change_category".to_string(),
                serde_json::json!(if is_required {
                    "required_field_added"
                } else {
                    "field_added"
                }),
            );
            context.insert("field_name".to_string(), serde_json::json!(field_name));
            context.insert("schema_format".to_string(), serde_json::json!("avro"));
            context.insert("has_default".to_string(), serde_json::json!(has_default));
            mismatches.push(Mismatch {
                mismatch_type: if is_required {
                    MismatchType::MissingRequiredField
                } else {
                    MismatchType::UnexpectedField
                },
                path: format!("{}.{}", path_prefix, field_name),
                method: None,
                expected: None,
                actual: Some(format!(
                    "New Avro field {} ({})",
                    field_name,
                    if is_required { "required" } else { "optional" }
                )),
                description: format!(
                    "New Avro field {} was added ({})",
                    field_name,
                    if is_required {
                        "required - breaking"
                    } else {
                        "optional - additive"
                    }
                ),
                severity: if is_required {
                    MismatchSeverity::High
                } else {
                    MismatchSeverity::Low
                },
                confidence: 1.0,
                context,
            });
        } else {
            // Field exists on both sides: flag any change to its `type` value.
            let old_field = old_fields_map[field_name];
            let old_type = old_field.get("type");
            let new_type = new_field.get("type");
            if old_type != new_type {
                let mut context = HashMap::new();
                context.insert("is_additive".to_string(), serde_json::json!(false));
                context.insert("is_breaking".to_string(), serde_json::json!(true));
                context.insert(
                    "change_category".to_string(),
                    serde_json::json!("field_type_changed"),
                );
                context.insert("field_name".to_string(), serde_json::json!(field_name));
                context.insert("schema_format".to_string(), serde_json::json!("avro"));
                context.insert("old_type".to_string(), serde_json::json!(old_type));
                context.insert("new_type".to_string(), serde_json::json!(new_type));
                mismatches.push(Mismatch {
                    mismatch_type: MismatchType::TypeMismatch,
                    path: format!("{}.{}", path_prefix, field_name),
                    method: None,
                    expected: Some(format!("Type: {:?}", old_type)),
                    actual: Some(format!("Type: {:?}", new_type)),
                    description: format!("Avro field {} type changed", field_name),
                    severity: MismatchSeverity::High,
                    confidence: 1.0,
                    context,
                });
            }
        }
    }
    Ok(mismatches)
}
/// Compare two "json_shape" schemas: flat objects mapping property name to a
/// type (either a bare type string, or an object carrying a string `type`).
///
/// Emitted mismatches (paths prefixed with `path_prefix`):
/// - removed property -> `UnexpectedField`, High severity (breaking);
/// - added property -> `UnexpectedField`, Low (additive);
/// - type change -> `TypeMismatch`, High (breaking).
///
/// If either input is not a JSON object, no mismatches are reported.
fn compare_json_shape_schemas(
    old_schema: &Value,
    new_schema: &Value,
    path_prefix: &str,
) -> Vec<Mismatch> {
    let mut mismatches = Vec::new();
    if let (Some(old_obj), Some(new_obj)) = (old_schema.as_object(), new_schema.as_object()) {
        // Properties that disappeared from the new schema.
        for (prop_name, _) in old_obj {
            if !new_obj.contains_key(prop_name) {
                let mut context = HashMap::new();
                context.insert("is_additive".to_string(), serde_json::json!(false));
                context.insert("is_breaking".to_string(), serde_json::json!(true));
                context.insert(
                    "change_category".to_string(),
                    serde_json::json!("property_removed"),
                );
                context.insert("field_name".to_string(), serde_json::json!(prop_name));
                context.insert("schema_format".to_string(), serde_json::json!("json_shape"));
                mismatches.push(Mismatch {
                    mismatch_type: MismatchType::UnexpectedField,
                    path: format!("{}.{}", path_prefix, prop_name),
                    method: None,
                    expected: Some(format!("Property {} should exist", prop_name)),
                    actual: Some("Property removed".to_string()),
                    description: format!("Property {} was removed", prop_name),
                    severity: MismatchSeverity::High,
                    confidence: 1.0,
                    context,
                });
            }
        }
        // New-side properties: additions, or type changes on shared properties.
        for (prop_name, _) in new_obj {
            if !old_obj.contains_key(prop_name) {
                let mut context = HashMap::new();
                context.insert("is_additive".to_string(), serde_json::json!(true));
                context.insert("is_breaking".to_string(), serde_json::json!(false));
                context
                    .insert("change_category".to_string(), serde_json::json!("property_added"));
                context.insert("field_name".to_string(), serde_json::json!(prop_name));
                context.insert("schema_format".to_string(), serde_json::json!("json_shape"));
                mismatches.push(Mismatch {
                    mismatch_type: MismatchType::UnexpectedField,
                    path: format!("{}.{}", path_prefix, prop_name),
                    method: None,
                    expected: None,
                    actual: Some(format!("New property {}", prop_name)),
                    description: format!("New property {} was added", prop_name),
                    severity: MismatchSeverity::Low,
                    confidence: 1.0,
                    context,
                });
            } else {
                // A type may be given either as a bare string ("int") or as an
                // object ({"type": "int"}); accept both spellings.
                let old_type = old_obj[prop_name]
                    .as_str()
                    .or_else(|| old_obj[prop_name].get("type").and_then(|t| t.as_str()));
                let new_type = new_obj[prop_name]
                    .as_str()
                    .or_else(|| new_obj[prop_name].get("type").and_then(|t| t.as_str()));
                if old_type != new_type {
                    let mut context = HashMap::new();
                    context.insert("is_additive".to_string(), serde_json::json!(false));
                    context.insert("is_breaking".to_string(), serde_json::json!(true));
                    context.insert(
                        "change_category".to_string(),
                        serde_json::json!("property_type_changed"),
                    );
                    context.insert("field_name".to_string(), serde_json::json!(prop_name));
                    context
                        .insert("schema_format".to_string(), serde_json::json!("json_shape"));
                    context.insert("old_type".to_string(), serde_json::json!(old_type));
                    context.insert("new_type".to_string(), serde_json::json!(new_type));
                    mismatches.push(Mismatch {
                        mismatch_type: MismatchType::TypeMismatch,
                        path: format!("{}.{}", path_prefix, prop_name),
                        method: None,
                        expected: old_type.map(|t| format!("Type: {}", t)),
                        actual: new_type.map(|t| format!("Type: {}", t)),
                        description: format!("Property {} type changed", prop_name),
                        severity: MismatchSeverity::High,
                        confidence: 1.0,
                        context,
                    });
                }
            }
        }
    }
    mismatches
}
/// Compare two JSON Schema documents at the top level.
///
/// Only the `required` arrays and the immediate `properties` map (matching on
/// string `type` values) are inspected; nested schemas are not recursed into.
/// Emitted mismatches (paths prefixed with `path_prefix`):
/// - newly required field -> `MissingRequiredField`, Critical (breaking);
/// - field no longer required -> `UnexpectedField`, Low (additive);
/// - property `type` change -> `TypeMismatch`, High (breaking);
/// - removed property -> `UnexpectedField`, High (breaking);
/// - added property -> `UnexpectedField`, Low (additive).
fn compare_json_schemas(
    old_schema: &Value,
    new_schema: &Value,
    path_prefix: &str,
) -> Vec<Mismatch> {
    let mut mismatches = Vec::new();
    // Required-field comparison runs only when both sides declare `required`.
    if let (Some(old_required), Some(new_required)) = (
        old_schema.get("required").and_then(|v| v.as_array()),
        new_schema.get("required").and_then(|v| v.as_array()),
    ) {
        let old_required_set: std::collections::HashSet<&str> =
            old_required.iter().filter_map(|v| v.as_str()).collect();
        let new_required_set: std::collections::HashSet<&str> =
            new_required.iter().filter_map(|v| v.as_str()).collect();
        // Fields that became required: old producers may omit them -> breaking.
        for new_req in new_required_set.difference(&old_required_set) {
            let mut context = HashMap::new();
            context.insert("is_additive".to_string(), serde_json::json!(false));
            context.insert("is_breaking".to_string(), serde_json::json!(true));
            context.insert(
                "change_category".to_string(),
                serde_json::json!("required_field_added"),
            );
            context.insert("field_name".to_string(), serde_json::json!(new_req));
            context.insert("schema_format".to_string(), serde_json::json!("json_schema"));
            mismatches.push(Mismatch {
                mismatch_type: MismatchType::MissingRequiredField,
                path: format!("{}.{}", path_prefix, new_req),
                method: None,
                expected: Some(format!("Field {} should be optional", new_req)),
                actual: Some(format!("Field {} is now required", new_req)),
                description: format!("Field {} became required", new_req),
                severity: MismatchSeverity::Critical,
                confidence: 1.0,
                context,
            });
        }
        // Fields that stopped being required: relaxation, additive.
        for removed_req in old_required_set.difference(&new_required_set) {
            let mut context = HashMap::new();
            context.insert("is_additive".to_string(), serde_json::json!(true));
            context.insert("is_breaking".to_string(), serde_json::json!(false));
            context.insert(
                "change_category".to_string(),
                serde_json::json!("required_field_removed"),
            );
            context.insert("field_name".to_string(), serde_json::json!(removed_req));
            context.insert("schema_format".to_string(), serde_json::json!("json_schema"));
            mismatches.push(Mismatch {
                mismatch_type: MismatchType::UnexpectedField,
                path: format!("{}.{}", path_prefix, removed_req),
                method: None,
                expected: Some(format!("Field {} was required", removed_req)),
                actual: Some(format!("Field {} is now optional", removed_req)),
                description: format!("Field {} is no longer required", removed_req),
                severity: MismatchSeverity::Low,
                confidence: 1.0,
                context,
            });
        }
    }
    // Property comparison runs only when both sides declare `properties`.
    if let (Some(old_props), Some(new_props)) = (
        old_schema.get("properties").and_then(|v| v.as_object()),
        new_schema.get("properties").and_then(|v| v.as_object()),
    ) {
        // Shared properties: flag changed string `type` values. Properties
        // without a string `type` on either side are skipped.
        for (prop_name, new_prop_schema) in new_props {
            if let Some(old_prop_schema) = old_props.get(prop_name) {
                if let (Some(old_type), Some(new_type)) = (
                    old_prop_schema.get("type").and_then(|v| v.as_str()),
                    new_prop_schema.get("type").and_then(|v| v.as_str()),
                ) {
                    if old_type != new_type {
                        let mut context = HashMap::new();
                        context.insert("is_additive".to_string(), serde_json::json!(false));
                        context.insert("is_breaking".to_string(), serde_json::json!(true));
                        context.insert(
                            "change_category".to_string(),
                            serde_json::json!("property_type_changed"),
                        );
                        context.insert("field_name".to_string(), serde_json::json!(prop_name));
                        context.insert("old_type".to_string(), serde_json::json!(old_type));
                        context.insert("new_type".to_string(), serde_json::json!(new_type));
                        context.insert(
                            "schema_format".to_string(),
                            serde_json::json!("json_schema"),
                        );
                        mismatches.push(Mismatch {
                            mismatch_type: MismatchType::TypeMismatch,
                            path: format!("{}.{}", path_prefix, prop_name),
                            method: None,
                            expected: Some(format!("Type: {}", old_type)),
                            actual: Some(format!("Type: {}", new_type)),
                            description: format!(
                                "Property {} type changed from {} to {}",
                                prop_name, old_type, new_type
                            ),
                            severity: MismatchSeverity::High,
                            confidence: 1.0,
                            context,
                        });
                    }
                }
            }
        }
        // Properties removed from the new schema: breaking.
        for prop_name in old_props.keys() {
            if !new_props.contains_key(prop_name) {
                let mut context = HashMap::new();
                context.insert("is_additive".to_string(), serde_json::json!(false));
                context.insert("is_breaking".to_string(), serde_json::json!(true));
                context.insert(
                    "change_category".to_string(),
                    serde_json::json!("property_removed"),
                );
                context.insert("field_name".to_string(), serde_json::json!(prop_name));
                context.insert("schema_format".to_string(), serde_json::json!("json_schema"));
                mismatches.push(Mismatch {
                    mismatch_type: MismatchType::UnexpectedField,
                    path: format!("{}.{}", path_prefix, prop_name),
                    method: None,
                    expected: Some(format!("Property {} should exist", prop_name)),
                    actual: Some("Property removed".to_string()),
                    description: format!("Property {} was removed", prop_name),
                    severity: MismatchSeverity::High,
                    confidence: 1.0,
                    context,
                });
            }
        }
        // Properties added in the new schema: additive.
        for prop_name in new_props.keys() {
            if !old_props.contains_key(prop_name) {
                let mut context = HashMap::new();
                context.insert("is_additive".to_string(), serde_json::json!(true));
                context.insert("is_breaking".to_string(), serde_json::json!(false));
                context
                    .insert("change_category".to_string(), serde_json::json!("property_added"));
                context.insert("field_name".to_string(), serde_json::json!(prop_name));
                context.insert("schema_format".to_string(), serde_json::json!("json_schema"));
                mismatches.push(Mismatch {
                    mismatch_type: MismatchType::UnexpectedField,
                    path: format!("{}.{}", path_prefix, prop_name),
                    method: None,
                    expected: None,
                    actual: Some(format!("New property {}", prop_name)),
                    description: format!("New property {} was added", prop_name),
                    severity: MismatchSeverity::Low,
                    confidence: 1.0,
                    context,
                });
            }
        }
    }
    mismatches
}
fn validate_message_against_schema(
&self,
topic_name: &str,
message: &Value,
) -> Result<ValidationResult, ContractError> {
let schema = self
.schema_cache
.get(topic_name)
.ok_or_else(|| ContractError::OperationNotFound(topic_name.to_string()))?;
let mut validation_errors = Vec::new();
for error in schema.iter_errors(message) {
validation_errors.push(ValidationError {
message: error.to_string(),
path: Some(error.instance_path.to_string()),
code: Some("SCHEMA_VALIDATION_ERROR".to_string()),
});
}
Ok(ValidationResult {
valid: validation_errors.is_empty(),
errors: validation_errors,
warnings: Vec::new(),
})
}
}
#[async_trait::async_trait]
impl ProtocolContract for MqttContract {
    /// This contract describes the MQTT protocol.
    fn protocol(&self) -> Protocol {
        Protocol::Mqtt
    }
    /// Stable contract identifier.
    fn contract_id(&self) -> &str {
        &self.contract_id
    }
    /// Contract version string.
    fn version(&self) -> &str {
        &self.version
    }
    /// One operation per registered topic (order is unspecified).
    fn operations(&self) -> Vec<ContractOperation> {
        self.operations_cache.values().cloned().collect()
    }
    /// Look up the operation for a topic name (topic names double as ids).
    fn get_operation(&self, operation_id: &str) -> Option<&ContractOperation> {
        self.operations_cache.get(operation_id)
    }
    /// Trait-object diffing is intentionally unsupported: after checking that
    /// the protocols match, this always errors and points callers at the typed
    /// `diff_mqtt_contracts` / `MqttContract::diff_contracts` path instead.
    async fn diff(
        &self,
        other: &dyn ProtocolContract,
    ) -> Result<ContractDiffResult, ContractError> {
        if other.protocol() != Protocol::Mqtt {
            return Err(ContractError::UnsupportedProtocol(other.protocol()));
        }
        Err(ContractError::Other(
            "Direct comparison of MqttContract instances requires type information. \
            Use MqttContract::diff_contracts() for comparing two MqttContract instances."
                .to_string(),
        ))
    }
    /// Parse the request payload as JSON and validate it against the schema
    /// cached for `operation_id` (the topic name).
    async fn validate(
        &self,
        operation_id: &str,
        request: &ContractRequest,
    ) -> Result<ValidationResult, ContractError> {
        let message: Value = serde_json::from_slice(&request.payload)
            .map_err(|e| ContractError::SchemaValidation(format!("Invalid JSON: {}", e)))?;
        self.validate_message_against_schema(operation_id, &message)
    }
    /// Raw (uncompiled) schema declared for a topic, if registered.
    fn get_schema(&self, operation_id: &str) -> Option<Value> {
        self.topics.get(operation_id).map(|t| t.schema.clone())
    }
    /// Serialize the contract (topics and metadata) to a JSON document.
    fn to_json(&self) -> Result<Value, ContractError> {
        let topics: Vec<Value> = self
            .topics
            .values()
            .map(|topic| {
                serde_json::json!({
                    "topic": topic.topic,
                    "qos": topic.qos,
                    "schema": topic.schema,
                    "retained": topic.retained,
                    "description": topic.description,
                    "example": topic.example,
                })
            })
            .collect();
        Ok(serde_json::json!({
            "contract_id": self.contract_id,
            "version": self.version,
            "protocol": "mqtt",
            "topics": topics,
            "metadata": self.metadata,
        }))
    }
}
pub fn diff_mqtt_contracts(
old_contract: &MqttContract,
new_contract: &MqttContract,
) -> Result<ContractDiffResult, ContractError> {
old_contract.diff_contracts(new_contract)
}
/// Wire format of a Kafka topic schema. Serialized in lowercase
/// ("json" / "avro" / "protobuf").
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum SchemaFormat {
    Json,
    Avro,
    Protobuf,
}
/// Declaration of a single Kafka topic: an optional key schema, a mandatory
/// value schema, and optional topic-level settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicSchema {
    /// Kafka topic name; doubles as the operation id inside `KafkaContract`.
    pub topic: String,
    /// Schema for the message key, when keys are structured.
    pub key_schema: Option<TopicSchema>,
    /// Schema for the message value.
    pub value_schema: TopicSchema,
    /// Partition count, if declared.
    pub partitions: Option<u32>,
    /// Replication factor, if declared.
    pub replication_factor: Option<u16>,
    /// Human-readable description of the topic.
    pub description: Option<String>,
    /// Schema-evolution policy consulted while diffing; `None` means no policy.
    pub evolution_rules: Option<EvolutionRules>,
}
/// A single schema definition plus registry-style bookkeeping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicSchema {
    /// Declared wire format of `schema`.
    pub format: SchemaFormat,
    /// The schema document itself, interpreted according to `format`.
    pub schema: Value,
    /// External schema-registry identifier, if any.
    pub schema_id: Option<String>,
    /// Schema version label, if any.
    pub version: Option<String>,
}
/// Schema-evolution policy flags consulted by `KafkaContract::diff_contracts`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EvolutionRules {
    /// When `false`, breaking (Critical/High severity) changes on the topic
    /// produce an additional evolution-rules violation mismatch.
    pub allow_backward_compatible: bool,
    /// Whether forward-compatible evolution is permitted.
    /// NOTE(review): not currently consulted by the diff logic — confirm.
    pub allow_forward_compatible: bool,
    /// Whether schema changes must bump the version.
    /// NOTE(review): not currently consulted by the diff logic — confirm.
    pub require_version_bump: bool,
}
impl Default for EvolutionRules {
fn default() -> Self {
Self {
allow_backward_compatible: true,
allow_forward_compatible: false,
require_version_bump: true,
}
}
}
/// Kafka protocol contract: versioned topic declarations with pre-compiled
/// (key, value) JSON Schema validators and derived `ContractOperation`s.
pub struct KafkaContract {
    // Stable identifier for this contract.
    contract_id: String,
    // Contract version string (reported in diff metadata).
    version: String,
    // Topic name -> declared topic schema.
    topics: HashMap<String, KafkaTopicSchema>,
    // Topic name -> (optional key validator, value validator); kept in sync
    // with `topics`.
    schema_cache: HashMap<String, (Option<JSONSchema>, JSONSchema)>,
    // Topic name -> derived operation; kept in sync with `topics`.
    operations_cache: HashMap<String, ContractOperation>,
    // Free-form contract metadata, emitted by `to_json`.
    metadata: HashMap<String, String>,
}
impl KafkaContract {
pub fn new(contract_id: String, version: String) -> Self {
Self {
contract_id,
version,
topics: HashMap::new(),
schema_cache: HashMap::new(),
operations_cache: HashMap::new(),
metadata: HashMap::new(),
}
}
pub fn add_topic(&mut self, topic_schema: KafkaTopicSchema) -> Result<(), ContractError> {
let topic_name = topic_schema.topic.clone();
let value_schema = match topic_schema.value_schema.format {
SchemaFormat::Json => jsonschema::options()
.with_draft(Draft::Draft7)
.build(&topic_schema.value_schema.schema)
.map_err(|e| {
ContractError::SchemaValidation(format!("Invalid JSON schema: {}", e))
})?,
SchemaFormat::Avro | SchemaFormat::Protobuf => {
jsonschema::options()
.with_draft(Draft::Draft7)
.build(&serde_json::json!({}))
.map_err(|e| {
ContractError::SchemaValidation(format!(
"Failed to build fallback schema for {:?}: {}",
topic_schema.value_schema.format, e
))
})?
}
};
let key_schema = if let Some(ref key_schema_def) = topic_schema.key_schema {
match key_schema_def.format {
SchemaFormat::Json => Some(
jsonschema::options()
.with_draft(Draft::Draft7)
.build(&key_schema_def.schema)
.map_err(|e| {
ContractError::SchemaValidation(format!("Invalid JSON schema: {}", e))
})?,
),
SchemaFormat::Avro | SchemaFormat::Protobuf => Some(
jsonschema::options()
.with_draft(Draft::Draft7)
.build(&serde_json::json!({}))
.map_err(|e| {
ContractError::SchemaValidation(format!(
"Failed to build fallback key schema for {:?}: {}",
key_schema_def.format, e
))
})?,
),
}
} else {
None
};
self.schema_cache.insert(topic_name.clone(), (key_schema, value_schema));
self.topics.insert(topic_name.clone(), topic_schema.clone());
let operation = ContractOperation {
id: topic_name.clone(),
name: topic_name.clone(),
operation_type: OperationType::KafkaTopic {
topic: topic_name.clone(),
key_schema: topic_schema.key_schema.as_ref().and_then(|s| s.schema_id.clone()),
value_schema: topic_schema.value_schema.schema_id.clone(),
},
input_schema: Some(serde_json::json!({
"key": topic_schema.key_schema.as_ref().map(|s| s.schema.clone()),
"value": topic_schema.value_schema.schema.clone(),
})),
output_schema: Some(serde_json::json!({
"key": topic_schema.key_schema.as_ref().map(|s| s.schema.clone()),
"value": topic_schema.value_schema.schema.clone(),
})),
metadata: {
let mut meta = HashMap::new();
if let Some(partitions) = topic_schema.partitions {
meta.insert("partitions".to_string(), partitions.to_string());
}
if let Some(ref desc) = topic_schema.description {
meta.insert("description".to_string(), desc.clone());
}
meta
},
};
self.operations_cache.insert(topic_name, operation);
Ok(())
}
/// Remove a topic and its cached validators/operation. No-op when the topic
/// is not registered.
pub fn remove_topic(&mut self, topic_name: &str) {
    let removed = self.topics.remove(topic_name);
    if removed.is_some() {
        self.schema_cache.remove(topic_name);
        self.operations_cache.remove(topic_name);
    }
}
/// Diff this Kafka contract (old) against `other` (new).
///
/// Reports removed topics (breaking), added topics (additive), key-schema
/// presence changes, key/value schema differences (compared with JSON-Schema
/// semantics via `MqttContract::compare_json_schemas`), and — when the new
/// topic declares evolution rules that forbid breaking changes — a violation
/// entry if that topic accumulated breaking changes.
fn diff_contracts(&self, other: &KafkaContract) -> Result<ContractDiffResult, ContractError> {
    let mut mismatches = Vec::new();
    // Topics dropped in `other`: breaking removals.
    for topic_name in self.topics.keys() {
        if !other.topics.contains_key(topic_name) {
            mismatches.push(Mismatch {
                mismatch_type: MismatchType::EndpointNotFound,
                path: topic_name.clone(),
                method: None,
                expected: Some(format!("Topic {} should exist", topic_name)),
                actual: Some("Topic removed".to_string()),
                description: format!("Topic {} was removed", topic_name),
                severity: MismatchSeverity::Critical,
                confidence: 1.0,
                context: HashMap::new(),
            });
        }
    }
    // Topics only present in `other`: additive, low severity.
    for topic_name in other.topics.keys() {
        if !self.topics.contains_key(topic_name) {
            mismatches.push(Mismatch {
                mismatch_type: MismatchType::UnexpectedField,
                path: topic_name.clone(),
                method: None,
                expected: None,
                actual: Some(format!("New topic {}", topic_name)),
                description: format!("New topic {} was added", topic_name),
                severity: MismatchSeverity::Low,
                confidence: 1.0,
                context: HashMap::new(),
            });
        }
    }
    // Topics present on both sides.
    for (topic_name, old_topic) in &self.topics {
        if let Some(new_topic) = other.topics.get(topic_name) {
            // Remember where this topic's entries start so the evolution-rules
            // check below only considers *this* topic's changes. (Previously
            // the whole accumulated list was scanned, so a breaking change on
            // one topic incorrectly flagged every later strict-ruled topic.)
            let topic_start = mismatches.len();
            if old_topic.key_schema.is_some() != new_topic.key_schema.is_some() {
                // Key schema appeared or disappeared: breaking either way.
                mismatches.push(Mismatch {
                    mismatch_type: MismatchType::SchemaMismatch,
                    path: format!("{}.key_schema", topic_name),
                    method: None,
                    expected: Some(if old_topic.key_schema.is_some() {
                        "Key schema should exist".to_string()
                    } else {
                        "Key schema should not exist".to_string()
                    }),
                    actual: Some(if new_topic.key_schema.is_some() {
                        "Key schema added".to_string()
                    } else {
                        "Key schema removed".to_string()
                    }),
                    description: format!(
                        "Key schema presence changed for topic {}",
                        topic_name
                    ),
                    severity: MismatchSeverity::High,
                    confidence: 1.0,
                    context: HashMap::new(),
                });
            } else if let (Some(old_key), Some(new_key)) =
                (&old_topic.key_schema, &new_topic.key_schema)
            {
                if old_key.schema != new_key.schema {
                    mismatches.extend(MqttContract::compare_json_schemas(
                        &old_key.schema,
                        &new_key.schema,
                        &format!("{}.key", topic_name),
                    ));
                }
            }
            if old_topic.value_schema.schema != new_topic.value_schema.schema {
                mismatches.extend(MqttContract::compare_json_schemas(
                    &old_topic.value_schema.schema,
                    &new_topic.value_schema.schema,
                    &format!("{}.value", topic_name),
                ));
            }
            if let Some(ref evolution_rules) = new_topic.evolution_rules {
                // Only this topic's mismatches count as its breaking changes.
                let has_breaking_changes = mismatches[topic_start..].iter().any(|m| {
                    matches!(m.severity, MismatchSeverity::Critical | MismatchSeverity::High)
                });
                if has_breaking_changes && !evolution_rules.allow_backward_compatible {
                    mismatches.push(Mismatch {
                        mismatch_type: MismatchType::SchemaMismatch,
                        path: format!("{}.evolution_rules", topic_name),
                        method: None,
                        expected: Some("Backward compatible changes only".to_string()),
                        actual: Some("Breaking changes detected".to_string()),
                        description: format!(
                            "Topic {} has breaking changes but evolution rules require backward compatibility",
                            topic_name
                        ),
                        severity: MismatchSeverity::High,
                        confidence: 1.0,
                        context: HashMap::new(),
                    });
                }
            }
        }
    }
    let matches = mismatches.is_empty();
    // Heuristic confidence: an exact match is certain, otherwise slightly lower.
    let confidence = if matches { 1.0 } else { 0.8 };
    Ok(ContractDiffResult {
        matches,
        confidence,
        mismatches,
        recommendations: Vec::new(),
        corrections: Vec::new(),
        metadata: crate::ai_contract_diff::DiffMetadata {
            analyzed_at: chrono::Utc::now(),
            request_source: "kafka_contract_diff".to_string(),
            contract_version: Some(self.version.clone()),
            contract_format: "kafka_schema".to_string(),
            endpoint_path: "".to_string(),
            http_method: "".to_string(),
            request_count: 1,
            llm_provider: None,
            llm_model: None,
        },
    })
}
fn validate_message_against_schema(
&self,
topic_name: &str,
key: Option<&Value>,
value: &Value,
) -> Result<ValidationResult, ContractError> {
let (key_schema_opt, value_schema) = self
.schema_cache
.get(topic_name)
.ok_or_else(|| ContractError::OperationNotFound(topic_name.to_string()))?;
let mut validation_errors = Vec::new();
if let (Some(key_value), Some(key_schema)) = (key, key_schema_opt) {
for error in key_schema.iter_errors(key_value) {
validation_errors.push(ValidationError {
message: format!("Key validation error: {}", error),
path: Some(format!("{}.key{}", topic_name, error.instance_path)),
code: Some("KEY_SCHEMA_VALIDATION_ERROR".to_string()),
});
}
}
for error in value_schema.iter_errors(value) {
validation_errors.push(ValidationError {
message: format!("Value validation error: {}", error),
path: Some(format!("{}.value{}", topic_name, error.instance_path)),
code: Some("VALUE_SCHEMA_VALIDATION_ERROR".to_string()),
});
}
Ok(ValidationResult {
valid: validation_errors.is_empty(),
errors: validation_errors,
warnings: Vec::new(),
})
}
}
#[async_trait::async_trait]
impl ProtocolContract for KafkaContract {
    /// This contract describes the Kafka protocol.
    fn protocol(&self) -> Protocol {
        Protocol::Kafka
    }
    /// Stable contract identifier.
    fn contract_id(&self) -> &str {
        &self.contract_id
    }
    /// Contract version string.
    fn version(&self) -> &str {
        &self.version
    }
    /// One operation per registered topic (order is unspecified).
    fn operations(&self) -> Vec<ContractOperation> {
        self.operations_cache.values().cloned().collect()
    }
    /// Look up the operation for a topic name (topic names double as ids).
    fn get_operation(&self, operation_id: &str) -> Option<&ContractOperation> {
        self.operations_cache.get(operation_id)
    }
    /// Trait-object diffing is intentionally unsupported: after checking that
    /// the protocols match, this always errors and points callers at the typed
    /// `diff_kafka_contracts` / `KafkaContract::diff_contracts` path instead.
    async fn diff(
        &self,
        other: &dyn ProtocolContract,
    ) -> Result<ContractDiffResult, ContractError> {
        if other.protocol() != Protocol::Kafka {
            return Err(ContractError::UnsupportedProtocol(other.protocol()));
        }
        Err(ContractError::Other(
            "Direct comparison of KafkaContract instances requires type information. \
            Use KafkaContract::diff_contracts() for comparing two KafkaContract instances."
                .to_string(),
        ))
    }
    /// Parse the request payload as the message value and validate it (plus an
    /// optional key taken from the `"key"` metadata entry) against the cached
    /// schemas for `operation_id` (the topic name). A `key` metadata value
    /// that is not valid JSON is silently ignored.
    async fn validate(
        &self,
        operation_id: &str,
        request: &ContractRequest,
    ) -> Result<ValidationResult, ContractError> {
        let value: Value = serde_json::from_slice(&request.payload)
            .map_err(|e| ContractError::SchemaValidation(format!("Invalid JSON: {}", e)))?;
        let key = request.metadata.get("key").and_then(|k| serde_json::from_str::<Value>(k).ok());
        self.validate_message_against_schema(operation_id, key.as_ref(), &value)
    }
    /// Combined `{key, value}` raw schema pair for a topic, if registered
    /// (`key` is `null` when no key schema was declared).
    fn get_schema(&self, operation_id: &str) -> Option<Value> {
        self.topics.get(operation_id).map(|topic| {
            serde_json::json!({
                "key": topic.key_schema.as_ref().map(|s| s.schema.clone()),
                "value": topic.value_schema.schema.clone(),
            })
        })
    }
    /// Serialize the contract (topics, schemas, metadata) to a JSON document.
    fn to_json(&self) -> Result<Value, ContractError> {
        let topics: Vec<Value> = self
            .topics
            .values()
            .map(|topic| {
                serde_json::json!({
                    "topic": topic.topic,
                    // Use the closure's binding instead of re-unwrapping the
                    // Option for every field, as the original did.
                    "key_schema": topic.key_schema.as_ref().map(|s| {
                        serde_json::json!({
                            "format": s.format,
                            "schema": s.schema,
                            "schema_id": s.schema_id,
                            "version": s.version,
                        })
                    }),
                    "value_schema": {
                        "format": topic.value_schema.format,
                        "schema": topic.value_schema.schema,
                        "schema_id": topic.value_schema.schema_id,
                        "version": topic.value_schema.version,
                    },
                    "partitions": topic.partitions,
                    "replication_factor": topic.replication_factor,
                    "description": topic.description,
                    "evolution_rules": topic.evolution_rules,
                })
            })
            .collect();
        Ok(serde_json::json!({
            "contract_id": self.contract_id,
            "version": self.version,
            "protocol": "kafka",
            "topics": topics,
            "metadata": self.metadata,
        }))
    }
}
pub fn diff_kafka_contracts(
old_contract: &KafkaContract,
new_contract: &KafkaContract,
) -> Result<ContractDiffResult, ContractError> {
old_contract.diff_contracts(new_contract)
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Constructor smoke test: id and version are stored and exposed through
    /// the `ProtocolContract` accessors.
    #[test]
    fn test_mqtt_contract_creation() {
        let contract = MqttContract::new("test-contract".to_string(), "1.0.0".to_string());
        assert_eq!(contract.contract_id(), "test-contract");
        assert_eq!(contract.version(), "1.0.0");
    }
    /// Same smoke test for the Kafka contract.
    #[test]
    fn test_kafka_contract_creation() {
        let contract = KafkaContract::new("test-contract".to_string(), "1.0.0".to_string());
        assert_eq!(contract.contract_id(), "test-contract");
        assert_eq!(contract.version(), "1.0.0");
    }
}