1use crate::contract_drift::protocol_contracts::{
8 ContractError, ContractOperation, ContractRequest, OperationType, ProtocolContract,
9 ValidationError, ValidationResult,
10};
11use jsonschema::{self, Draft, Validator as JSONSchema};
12use mockforge_foundation::contract_diff_types::{
13 ContractDiffResult, Mismatch, MismatchSeverity, MismatchType,
14};
15use mockforge_foundation::protocol::Protocol;
16use serde_json::Value;
17use std::collections::HashMap;
18
19pub use mockforge_foundation::protocol_contract_types::MqttTopicSchema;
25
/// Contract describing a set of MQTT topics and the JSON schema of their payloads.
pub struct MqttContract {
    /// Identifier of this contract.
    contract_id: String,
    /// Version string of this contract.
    version: String,
    /// Topic definitions, keyed by topic name.
    topics: HashMap<String, MqttTopicSchema>,
    /// Compiled JSON Schema validators, keyed by topic name.
    schema_cache: HashMap<String, JSONSchema>,
    /// Pre-built `ContractOperation` for each topic, keyed by topic name.
    operations_cache: HashMap<String, ContractOperation>,
    /// Free-form string metadata; emitted under `"metadata"` by `to_json`.
    metadata: HashMap<String, String>,
}
44
45impl MqttContract {
46 pub fn new(contract_id: String, version: String) -> Self {
48 Self {
49 contract_id,
50 version,
51 topics: HashMap::new(),
52 schema_cache: HashMap::new(),
53 operations_cache: HashMap::new(),
54 metadata: HashMap::new(),
55 }
56 }
57
58 pub fn add_topic(&mut self, topic_schema: MqttTopicSchema) -> Result<(), ContractError> {
60 let topic_name = topic_schema.topic.clone();
61
62 let schema = jsonschema::options()
64 .with_draft(Draft::Draft7)
65 .build(&topic_schema.schema)
66 .map_err(|e| ContractError::SchemaValidation(format!("Invalid JSON schema: {}", e)))?;
67 self.schema_cache.insert(topic_name.clone(), schema);
68
69 self.topics.insert(topic_name.clone(), topic_schema.clone());
71
72 let operation = ContractOperation {
74 id: topic_name.clone(),
75 name: topic_name.clone(),
76 operation_type: OperationType::MqttTopic {
77 topic: topic_name.clone(),
78 qos: topic_schema.qos,
79 },
80 input_schema: Some(topic_schema.schema.clone()),
81 output_schema: Some(topic_schema.schema.clone()), metadata: {
83 let mut meta = HashMap::new();
84 if let Some(retained) = topic_schema.retained {
85 meta.insert("retained".to_string(), retained.to_string());
86 }
87 if let Some(ref desc) = topic_schema.description {
88 meta.insert("description".to_string(), desc.clone());
89 }
90 meta
91 },
92 };
93 self.operations_cache.insert(topic_name, operation);
94
95 Ok(())
96 }
97
98 pub fn remove_topic(&mut self, topic_name: &str) {
100 if self.topics.remove(topic_name).is_some() {
101 self.schema_cache.remove(topic_name);
102 self.operations_cache.remove(topic_name);
103 }
104 }
105
106 fn diff_contracts(&self, other: &MqttContract) -> Result<ContractDiffResult, ContractError> {
108 let mut mismatches = Vec::new();
109
110 let all_topics: std::collections::HashSet<String> =
112 self.topics.keys().chain(other.topics.keys()).cloned().collect();
113
114 for topic_name in &all_topics {
116 if self.topics.contains_key(topic_name) && !other.topics.contains_key(topic_name) {
117 mismatches.push(Mismatch {
118 mismatch_type: MismatchType::EndpointNotFound,
119 path: topic_name.clone(),
120 method: None,
121 expected: Some(format!("Topic {} should exist", topic_name)),
122 actual: Some("Topic removed".to_string()),
123 description: format!("Topic {} was removed", topic_name),
124 severity: MismatchSeverity::Critical,
125 confidence: 1.0,
126 context: HashMap::new(),
127 });
128 }
129 }
130
131 for topic_name in &all_topics {
133 if !self.topics.contains_key(topic_name) && other.topics.contains_key(topic_name) {
134 mismatches.push(Mismatch {
135 mismatch_type: MismatchType::UnexpectedField,
136 path: topic_name.clone(),
137 method: None,
138 expected: None,
139 actual: Some(format!("New topic {}", topic_name)),
140 description: format!("New topic {} was added", topic_name),
141 severity: MismatchSeverity::Low,
142 confidence: 1.0,
143 context: HashMap::new(),
144 });
145 }
146 }
147
148 for topic_name in all_topics
150 .intersection(&self.topics.keys().cloned().collect::<std::collections::HashSet<_>>())
151 {
152 if let (Some(old_topic), Some(new_topic)) =
153 (self.topics.get(topic_name), other.topics.get(topic_name))
154 {
155 if old_topic.qos != new_topic.qos {
157 mismatches.push(Mismatch {
158 mismatch_type: MismatchType::SchemaMismatch,
159 path: format!("{}.qos", topic_name),
160 method: None,
161 expected: old_topic.qos.map(|q| format!("QoS: {}", q)),
162 actual: new_topic.qos.map(|q| format!("QoS: {}", q)),
163 description: format!(
164 "QoS changed for topic {}: {:?} -> {:?}",
165 topic_name, old_topic.qos, new_topic.qos
166 ),
167 severity: MismatchSeverity::Medium,
168 confidence: 1.0,
169 context: HashMap::new(),
170 });
171 }
172
173 let old_format = Self::detect_schema_format(&old_topic.schema);
175 let new_format = Self::detect_schema_format(&new_topic.schema);
176
177 if old_format != new_format {
179 let mut context = HashMap::new();
180 context.insert("is_additive".to_string(), serde_json::json!(false));
181 context.insert("is_breaking".to_string(), serde_json::json!(true));
182 context.insert(
183 "change_category".to_string(),
184 serde_json::json!("schema_format_changed"),
185 );
186 context.insert("topic".to_string(), serde_json::json!(topic_name));
187 context.insert("old_format".to_string(), serde_json::json!(old_format));
188 context.insert("new_format".to_string(), serde_json::json!(new_format));
189
190 mismatches.push(Mismatch {
191 mismatch_type: MismatchType::SchemaMismatch,
192 path: format!("{}.schema_format", topic_name),
193 method: None,
194 expected: Some(format!("Schema format: {}", old_format)),
195 actual: Some(format!("Schema format: {}", new_format)),
196 description: format!(
197 "Schema format changed from {} to {} for topic {}",
198 old_format, new_format, topic_name
199 ),
200 severity: MismatchSeverity::High,
201 confidence: 1.0,
202 context,
203 });
204 }
205
206 let schema_mismatches = match (old_format.as_str(), new_format.as_str()) {
208 ("json_schema", "json_schema") => {
209 Self::compare_json_schemas(&old_topic.schema, &new_topic.schema, topic_name)
210 }
211 ("avro", "avro") => {
212 Self::compare_avro_schemas(&old_topic.schema, &new_topic.schema, topic_name)
213 .unwrap_or_else(|_| Vec::new()) }
215 ("json_shape", "json_shape") => Self::compare_json_shape_schemas(
216 &old_topic.schema,
217 &new_topic.schema,
218 topic_name,
219 ),
220 _ => Vec::new(), };
222 mismatches.extend(schema_mismatches);
223 }
224 }
225
226 let matches = mismatches.is_empty();
227 let confidence = if matches { 1.0 } else { 0.8 };
228
229 Ok(ContractDiffResult {
230 matches,
231 confidence,
232 mismatches,
233 recommendations: Vec::new(),
234 corrections: Vec::new(),
235 metadata: mockforge_foundation::contract_diff_types::DiffMetadata {
236 analyzed_at: chrono::Utc::now(),
237 request_source: "mqtt_contract_diff".to_string(),
238 contract_version: Some(self.version.clone()),
239 contract_format: "mqtt_schema".to_string(),
240 endpoint_path: "".to_string(),
241 http_method: "".to_string(),
242 request_count: 1,
243 llm_provider: None,
244 llm_model: None,
245 },
246 })
247 }
248
249 fn detect_schema_format(schema: &Value) -> String {
251 if schema.get("type").and_then(|v| v.as_str()) == Some("record")
253 || schema.get("fields").is_some()
254 {
255 return "avro".to_string();
256 }
257
258 if schema.get("$schema").is_some()
260 || (schema.get("type").is_some() && schema.get("properties").is_some())
261 || schema.get("required").is_some()
262 {
263 return "json_schema".to_string();
264 }
265
266 if let Some(obj) = schema.as_object() {
268 let all_strings = obj.values().all(|v| {
269 v.as_str().is_some()
270 || (v.is_object() && v.get("type").and_then(|t| t.as_str()).is_some())
271 });
272 if all_strings && !obj.is_empty() {
273 return "json_shape".to_string();
274 }
275 }
276
277 "json_schema".to_string()
279 }
280
281 fn compare_avro_schemas(
283 old_schema: &Value,
284 new_schema: &Value,
285 path_prefix: &str,
286 ) -> Result<Vec<Mismatch>, ContractError> {
287 let mut mismatches = Vec::new();
288
289 let old_fields = old_schema.get("fields").and_then(|v| v.as_array()).ok_or_else(|| {
291 ContractError::SchemaValidation("Invalid Avro schema: missing fields".to_string())
292 })?;
293 let new_fields = new_schema.get("fields").and_then(|v| v.as_array()).ok_or_else(|| {
294 ContractError::SchemaValidation("Invalid Avro schema: missing fields".to_string())
295 })?;
296
297 let old_fields_map: HashMap<String, &Value> = old_fields
299 .iter()
300 .filter_map(|f| {
301 f.get("name").and_then(|n| n.as_str()).map(|name| (name.to_string(), f))
302 })
303 .collect();
304 let new_fields_map: HashMap<String, &Value> = new_fields
305 .iter()
306 .filter_map(|f| {
307 f.get("name").and_then(|n| n.as_str()).map(|name| (name.to_string(), f))
308 })
309 .collect();
310
311 for field_name in old_fields_map.keys() {
313 if !new_fields_map.contains_key(field_name) {
314 let mut context = HashMap::new();
315 context.insert("is_additive".to_string(), serde_json::json!(false));
316 context.insert("is_breaking".to_string(), serde_json::json!(true));
317 context.insert("change_category".to_string(), serde_json::json!("field_removed"));
318 context.insert("field_name".to_string(), serde_json::json!(field_name));
319 context.insert("schema_format".to_string(), serde_json::json!("avro"));
320
321 mismatches.push(Mismatch {
322 mismatch_type: MismatchType::EndpointNotFound,
323 path: format!("{}.{}", path_prefix, field_name),
324 method: None,
325 expected: Some(format!("Field {} should exist", field_name)),
326 actual: Some("Field removed".to_string()),
327 description: format!("Avro field {} was removed", field_name),
328 severity: MismatchSeverity::High,
329 confidence: 1.0,
330 context,
331 });
332 }
333 }
334
335 for (field_name, new_field) in &new_fields_map {
337 if !old_fields_map.contains_key(field_name) {
338 let has_default = new_field.get("default").is_some();
340 let is_required = !has_default;
341
342 let mut context = HashMap::new();
343 context.insert("is_additive".to_string(), serde_json::json!(!is_required));
344 context.insert("is_breaking".to_string(), serde_json::json!(is_required));
345 context.insert(
346 "change_category".to_string(),
347 serde_json::json!(if is_required {
348 "required_field_added"
349 } else {
350 "field_added"
351 }),
352 );
353 context.insert("field_name".to_string(), serde_json::json!(field_name));
354 context.insert("schema_format".to_string(), serde_json::json!("avro"));
355 context.insert("has_default".to_string(), serde_json::json!(has_default));
356
357 mismatches.push(Mismatch {
358 mismatch_type: if is_required {
359 MismatchType::MissingRequiredField
360 } else {
361 MismatchType::UnexpectedField
362 },
363 path: format!("{}.{}", path_prefix, field_name),
364 method: None,
365 expected: None,
366 actual: Some(format!(
367 "New Avro field {} ({})",
368 field_name,
369 if is_required { "required" } else { "optional" }
370 )),
371 description: format!(
372 "New Avro field {} was added ({})",
373 field_name,
374 if is_required {
375 "required - breaking"
376 } else {
377 "optional - additive"
378 }
379 ),
380 severity: if is_required {
381 MismatchSeverity::High
382 } else {
383 MismatchSeverity::Low
384 },
385 confidence: 1.0,
386 context,
387 });
388 } else {
389 let old_field = old_fields_map[field_name];
391 let old_type = old_field.get("type");
392 let new_type = new_field.get("type");
393
394 if old_type != new_type {
395 let mut context = HashMap::new();
396 context.insert("is_additive".to_string(), serde_json::json!(false));
397 context.insert("is_breaking".to_string(), serde_json::json!(true));
398 context.insert(
399 "change_category".to_string(),
400 serde_json::json!("field_type_changed"),
401 );
402 context.insert("field_name".to_string(), serde_json::json!(field_name));
403 context.insert("schema_format".to_string(), serde_json::json!("avro"));
404 context.insert("old_type".to_string(), serde_json::json!(old_type));
405 context.insert("new_type".to_string(), serde_json::json!(new_type));
406
407 mismatches.push(Mismatch {
408 mismatch_type: MismatchType::TypeMismatch,
409 path: format!("{}.{}", path_prefix, field_name),
410 method: None,
411 expected: Some(format!("Type: {:?}", old_type)),
412 actual: Some(format!("Type: {:?}", new_type)),
413 description: format!("Avro field {} type changed", field_name),
414 severity: MismatchSeverity::High,
415 confidence: 1.0,
416 context,
417 });
418 }
419 }
420 }
421
422 Ok(mismatches)
423 }
424
425 fn compare_json_shape_schemas(
427 old_schema: &Value,
428 new_schema: &Value,
429 path_prefix: &str,
430 ) -> Vec<Mismatch> {
431 let mut mismatches = Vec::new();
432
433 if let (Some(old_obj), Some(new_obj)) = (old_schema.as_object(), new_schema.as_object()) {
434 for (prop_name, _) in old_obj {
436 if !new_obj.contains_key(prop_name) {
437 let mut context = HashMap::new();
438 context.insert("is_additive".to_string(), serde_json::json!(false));
439 context.insert("is_breaking".to_string(), serde_json::json!(true));
440 context.insert(
441 "change_category".to_string(),
442 serde_json::json!("property_removed"),
443 );
444 context.insert("field_name".to_string(), serde_json::json!(prop_name));
445 context.insert("schema_format".to_string(), serde_json::json!("json_shape"));
446
447 mismatches.push(Mismatch {
448 mismatch_type: MismatchType::UnexpectedField,
449 path: format!("{}.{}", path_prefix, prop_name),
450 method: None,
451 expected: Some(format!("Property {} should exist", prop_name)),
452 actual: Some("Property removed".to_string()),
453 description: format!("Property {} was removed", prop_name),
454 severity: MismatchSeverity::High,
455 confidence: 1.0,
456 context,
457 });
458 }
459 }
460
461 for (prop_name, _) in new_obj {
463 if !old_obj.contains_key(prop_name) {
464 let mut context = HashMap::new();
465 context.insert("is_additive".to_string(), serde_json::json!(true));
466 context.insert("is_breaking".to_string(), serde_json::json!(false));
467 context
468 .insert("change_category".to_string(), serde_json::json!("property_added"));
469 context.insert("field_name".to_string(), serde_json::json!(prop_name));
470 context.insert("schema_format".to_string(), serde_json::json!("json_shape"));
471
472 mismatches.push(Mismatch {
473 mismatch_type: MismatchType::UnexpectedField,
474 path: format!("{}.{}", path_prefix, prop_name),
475 method: None,
476 expected: None,
477 actual: Some(format!("New property {}", prop_name)),
478 description: format!("New property {} was added", prop_name),
479 severity: MismatchSeverity::Low,
480 confidence: 1.0,
481 context,
482 });
483 } else {
484 let old_type = old_obj[prop_name]
486 .as_str()
487 .or_else(|| old_obj[prop_name].get("type").and_then(|t| t.as_str()));
488 let new_type = new_obj[prop_name]
489 .as_str()
490 .or_else(|| new_obj[prop_name].get("type").and_then(|t| t.as_str()));
491
492 if old_type != new_type {
493 let mut context = HashMap::new();
494 context.insert("is_additive".to_string(), serde_json::json!(false));
495 context.insert("is_breaking".to_string(), serde_json::json!(true));
496 context.insert(
497 "change_category".to_string(),
498 serde_json::json!("property_type_changed"),
499 );
500 context.insert("field_name".to_string(), serde_json::json!(prop_name));
501 context
502 .insert("schema_format".to_string(), serde_json::json!("json_shape"));
503 context.insert("old_type".to_string(), serde_json::json!(old_type));
504 context.insert("new_type".to_string(), serde_json::json!(new_type));
505
506 mismatches.push(Mismatch {
507 mismatch_type: MismatchType::TypeMismatch,
508 path: format!("{}.{}", path_prefix, prop_name),
509 method: None,
510 expected: old_type.map(|t| format!("Type: {}", t)),
511 actual: new_type.map(|t| format!("Type: {}", t)),
512 description: format!("Property {} type changed", prop_name),
513 severity: MismatchSeverity::High,
514 confidence: 1.0,
515 context,
516 });
517 }
518 }
519 }
520 }
521
522 mismatches
523 }
524
525 fn compare_json_schemas(
527 old_schema: &Value,
528 new_schema: &Value,
529 path_prefix: &str,
530 ) -> Vec<Mismatch> {
531 let mut mismatches = Vec::new();
532
533 if let (Some(old_required), Some(new_required)) = (
535 old_schema.get("required").and_then(|v| v.as_array()),
536 new_schema.get("required").and_then(|v| v.as_array()),
537 ) {
538 let old_required_set: std::collections::HashSet<&str> =
539 old_required.iter().filter_map(|v| v.as_str()).collect();
540 let new_required_set: std::collections::HashSet<&str> =
541 new_required.iter().filter_map(|v| v.as_str()).collect();
542
543 for new_req in new_required_set.difference(&old_required_set) {
545 let mut context = HashMap::new();
546 context.insert("is_additive".to_string(), serde_json::json!(false));
547 context.insert("is_breaking".to_string(), serde_json::json!(true));
548 context.insert(
549 "change_category".to_string(),
550 serde_json::json!("required_field_added"),
551 );
552 context.insert("field_name".to_string(), serde_json::json!(new_req));
553 context.insert("schema_format".to_string(), serde_json::json!("json_schema"));
554
555 mismatches.push(Mismatch {
556 mismatch_type: MismatchType::MissingRequiredField,
557 path: format!("{}.{}", path_prefix, new_req),
558 method: None,
559 expected: Some(format!("Field {} should be optional", new_req)),
560 actual: Some(format!("Field {} is now required", new_req)),
561 description: format!("Field {} became required", new_req),
562 severity: MismatchSeverity::Critical,
563 confidence: 1.0,
564 context,
565 });
566 }
567
568 for removed_req in old_required_set.difference(&new_required_set) {
570 let mut context = HashMap::new();
571 context.insert("is_additive".to_string(), serde_json::json!(true));
572 context.insert("is_breaking".to_string(), serde_json::json!(false));
573 context.insert(
574 "change_category".to_string(),
575 serde_json::json!("required_field_removed"),
576 );
577 context.insert("field_name".to_string(), serde_json::json!(removed_req));
578 context.insert("schema_format".to_string(), serde_json::json!("json_schema"));
579
580 mismatches.push(Mismatch {
581 mismatch_type: MismatchType::UnexpectedField,
582 path: format!("{}.{}", path_prefix, removed_req),
583 method: None,
584 expected: Some(format!("Field {} was required", removed_req)),
585 actual: Some(format!("Field {} is now optional", removed_req)),
586 description: format!("Field {} is no longer required", removed_req),
587 severity: MismatchSeverity::Low,
588 confidence: 1.0,
589 context,
590 });
591 }
592 }
593
594 if let (Some(old_props), Some(new_props)) = (
596 old_schema.get("properties").and_then(|v| v.as_object()),
597 new_schema.get("properties").and_then(|v| v.as_object()),
598 ) {
599 for (prop_name, new_prop_schema) in new_props {
600 if let Some(old_prop_schema) = old_props.get(prop_name) {
601 if let (Some(old_type), Some(new_type)) = (
602 old_prop_schema.get("type").and_then(|v| v.as_str()),
603 new_prop_schema.get("type").and_then(|v| v.as_str()),
604 ) {
605 if old_type != new_type {
606 let mut context = HashMap::new();
607 context.insert("is_additive".to_string(), serde_json::json!(false));
608 context.insert("is_breaking".to_string(), serde_json::json!(true));
609 context.insert(
610 "change_category".to_string(),
611 serde_json::json!("property_type_changed"),
612 );
613 context.insert("field_name".to_string(), serde_json::json!(prop_name));
614 context.insert("old_type".to_string(), serde_json::json!(old_type));
615 context.insert("new_type".to_string(), serde_json::json!(new_type));
616 context.insert(
617 "schema_format".to_string(),
618 serde_json::json!("json_schema"),
619 );
620
621 mismatches.push(Mismatch {
622 mismatch_type: MismatchType::TypeMismatch,
623 path: format!("{}.{}", path_prefix, prop_name),
624 method: None,
625 expected: Some(format!("Type: {}", old_type)),
626 actual: Some(format!("Type: {}", new_type)),
627 description: format!(
628 "Property {} type changed from {} to {}",
629 prop_name, old_type, new_type
630 ),
631 severity: MismatchSeverity::High,
632 confidence: 1.0,
633 context,
634 });
635 }
636 }
637 }
638 }
639
640 for prop_name in old_props.keys() {
642 if !new_props.contains_key(prop_name) {
643 let mut context = HashMap::new();
644 context.insert("is_additive".to_string(), serde_json::json!(false));
645 context.insert("is_breaking".to_string(), serde_json::json!(true));
646 context.insert(
647 "change_category".to_string(),
648 serde_json::json!("property_removed"),
649 );
650 context.insert("field_name".to_string(), serde_json::json!(prop_name));
651 context.insert("schema_format".to_string(), serde_json::json!("json_schema"));
652
653 mismatches.push(Mismatch {
654 mismatch_type: MismatchType::UnexpectedField,
655 path: format!("{}.{}", path_prefix, prop_name),
656 method: None,
657 expected: Some(format!("Property {} should exist", prop_name)),
658 actual: Some("Property removed".to_string()),
659 description: format!("Property {} was removed", prop_name),
660 severity: MismatchSeverity::High,
661 confidence: 1.0,
662 context,
663 });
664 }
665 }
666
667 for prop_name in new_props.keys() {
669 if !old_props.contains_key(prop_name) {
670 let mut context = HashMap::new();
671 context.insert("is_additive".to_string(), serde_json::json!(true));
672 context.insert("is_breaking".to_string(), serde_json::json!(false));
673 context
674 .insert("change_category".to_string(), serde_json::json!("property_added"));
675 context.insert("field_name".to_string(), serde_json::json!(prop_name));
676 context.insert("schema_format".to_string(), serde_json::json!("json_schema"));
677
678 mismatches.push(Mismatch {
679 mismatch_type: MismatchType::UnexpectedField,
680 path: format!("{}.{}", path_prefix, prop_name),
681 method: None,
682 expected: None,
683 actual: Some(format!("New property {}", prop_name)),
684 description: format!("New property {} was added", prop_name),
685 severity: MismatchSeverity::Low,
686 confidence: 1.0,
687 context,
688 });
689 }
690 }
691 }
692
693 mismatches
694 }
695
696 fn validate_message_against_schema(
698 &self,
699 topic_name: &str,
700 message: &Value,
701 ) -> Result<ValidationResult, ContractError> {
702 let schema = self
703 .schema_cache
704 .get(topic_name)
705 .ok_or_else(|| ContractError::OperationNotFound(topic_name.to_string()))?;
706
707 let mut validation_errors = Vec::new();
709 for error in schema.iter_errors(message) {
710 validation_errors.push(ValidationError {
711 message: error.to_string(),
712 path: Some(error.instance_path.to_string()),
713 code: Some("SCHEMA_VALIDATION_ERROR".to_string()),
714 });
715 }
716
717 Ok(ValidationResult {
718 valid: validation_errors.is_empty(),
719 errors: validation_errors,
720 warnings: Vec::new(),
721 })
722 }
723}
724
725#[async_trait::async_trait]
726impl ProtocolContract for MqttContract {
727 fn protocol(&self) -> Protocol {
728 Protocol::Mqtt
729 }
730
731 fn contract_id(&self) -> &str {
732 &self.contract_id
733 }
734
735 fn version(&self) -> &str {
736 &self.version
737 }
738
739 fn operations(&self) -> Vec<ContractOperation> {
740 self.operations_cache.values().cloned().collect()
741 }
742
743 fn get_operation(&self, operation_id: &str) -> Option<&ContractOperation> {
744 self.operations_cache.get(operation_id)
745 }
746
747 async fn diff(
748 &self,
749 other: &dyn ProtocolContract,
750 ) -> Result<ContractDiffResult, ContractError> {
751 if other.protocol() != Protocol::Mqtt {
752 return Err(ContractError::UnsupportedProtocol(other.protocol()));
753 }
754
755 Err(ContractError::Other(
756 "Direct comparison of MqttContract instances requires type information. \
757 Use MqttContract::diff_contracts() for comparing two MqttContract instances."
758 .to_string(),
759 ))
760 }
761
762 async fn validate(
763 &self,
764 operation_id: &str,
765 request: &ContractRequest,
766 ) -> Result<ValidationResult, ContractError> {
767 let message: Value = serde_json::from_slice(&request.payload)
769 .map_err(|e| ContractError::SchemaValidation(format!("Invalid JSON: {}", e)))?;
770
771 self.validate_message_against_schema(operation_id, &message)
773 }
774
775 fn get_schema(&self, operation_id: &str) -> Option<Value> {
776 self.topics.get(operation_id).map(|t| t.schema.clone())
777 }
778
779 fn to_json(&self) -> Result<Value, ContractError> {
780 let topics: Vec<Value> = self
781 .topics
782 .values()
783 .map(|topic| {
784 serde_json::json!({
785 "topic": topic.topic,
786 "qos": topic.qos,
787 "schema": topic.schema,
788 "retained": topic.retained,
789 "description": topic.description,
790 "example": topic.example,
791 })
792 })
793 .collect();
794
795 Ok(serde_json::json!({
796 "contract_id": self.contract_id,
797 "version": self.version,
798 "protocol": "mqtt",
799 "topics": topics,
800 "metadata": self.metadata,
801 }))
802 }
803}
804
/// Compares two MQTT contracts, treating `old_contract` as the baseline and
/// `new_contract` as the candidate.
///
/// Public entry point that delegates to the private `MqttContract::diff_contracts`.
pub fn diff_mqtt_contracts(
    old_contract: &MqttContract,
    new_contract: &MqttContract,
) -> Result<ContractDiffResult, ContractError> {
    old_contract.diff_contracts(new_contract)
}
812
813pub use mockforge_foundation::protocol_contract_types::{
819 EvolutionRules, KafkaTopicSchema, SchemaFormat, TopicSchema,
820};
821
/// Contract describing a set of Kafka topics and their key/value schemas.
pub struct KafkaContract {
    /// Identifier of this contract.
    contract_id: String,
    /// Version string of this contract.
    version: String,
    /// Topic definitions, keyed by topic name.
    topics: HashMap<String, KafkaTopicSchema>,
    /// Compiled validators per topic name, as `(key validator, value validator)`.
    /// The key validator is `None` when the topic declares no key schema.
    schema_cache: HashMap<String, (Option<JSONSchema>, JSONSchema)>,
    /// Pre-built `ContractOperation` for each topic, keyed by topic name.
    operations_cache: HashMap<String, ContractOperation>,
    /// Free-form string metadata attached to the contract.
    metadata: HashMap<String, String>,
}
840
841impl KafkaContract {
842 pub fn new(contract_id: String, version: String) -> Self {
844 Self {
845 contract_id,
846 version,
847 topics: HashMap::new(),
848 schema_cache: HashMap::new(),
849 operations_cache: HashMap::new(),
850 metadata: HashMap::new(),
851 }
852 }
853
854 pub fn add_topic(&mut self, topic_schema: KafkaTopicSchema) -> Result<(), ContractError> {
856 let topic_name = topic_schema.topic.clone();
857
858 let value_schema = match topic_schema.value_schema.format {
861 SchemaFormat::Json => jsonschema::options()
862 .with_draft(Draft::Draft7)
863 .build(&topic_schema.value_schema.schema)
864 .map_err(|e| {
865 ContractError::SchemaValidation(format!("Invalid JSON schema: {}", e))
866 })?,
867 SchemaFormat::Avro | SchemaFormat::Protobuf => {
868 jsonschema::options()
872 .with_draft(Draft::Draft7)
873 .build(&serde_json::json!({}))
874 .map_err(|e| {
875 ContractError::SchemaValidation(format!(
876 "Failed to build fallback schema for {:?}: {}",
877 topic_schema.value_schema.format, e
878 ))
879 })?
880 }
881 };
882
883 let key_schema = if let Some(ref key_schema_def) = topic_schema.key_schema {
884 match key_schema_def.format {
885 SchemaFormat::Json => Some(
886 jsonschema::options()
887 .with_draft(Draft::Draft7)
888 .build(&key_schema_def.schema)
889 .map_err(|e| {
890 ContractError::SchemaValidation(format!("Invalid JSON schema: {}", e))
891 })?,
892 ),
893 SchemaFormat::Avro | SchemaFormat::Protobuf => Some(
894 jsonschema::options()
895 .with_draft(Draft::Draft7)
896 .build(&serde_json::json!({}))
897 .map_err(|e| {
898 ContractError::SchemaValidation(format!(
899 "Failed to build fallback key schema for {:?}: {}",
900 key_schema_def.format, e
901 ))
902 })?,
903 ),
904 }
905 } else {
906 None
907 };
908
909 self.schema_cache.insert(topic_name.clone(), (key_schema, value_schema));
910
911 self.topics.insert(topic_name.clone(), topic_schema.clone());
913
914 let operation = ContractOperation {
916 id: topic_name.clone(),
917 name: topic_name.clone(),
918 operation_type: OperationType::KafkaTopic {
919 topic: topic_name.clone(),
920 key_schema: topic_schema.key_schema.as_ref().and_then(|s| s.schema_id.clone()),
921 value_schema: topic_schema.value_schema.schema_id.clone(),
922 },
923 input_schema: Some(serde_json::json!({
924 "key": topic_schema.key_schema.as_ref().map(|s| s.schema.clone()),
925 "value": topic_schema.value_schema.schema.clone(),
926 })),
927 output_schema: Some(serde_json::json!({
928 "key": topic_schema.key_schema.as_ref().map(|s| s.schema.clone()),
929 "value": topic_schema.value_schema.schema.clone(),
930 })),
931 metadata: {
932 let mut meta = HashMap::new();
933 if let Some(partitions) = topic_schema.partitions {
934 meta.insert("partitions".to_string(), partitions.to_string());
935 }
936 if let Some(ref desc) = topic_schema.description {
937 meta.insert("description".to_string(), desc.clone());
938 }
939 meta
940 },
941 };
942 self.operations_cache.insert(topic_name, operation);
943
944 Ok(())
945 }
946
947 pub fn remove_topic(&mut self, topic_name: &str) {
949 if self.topics.remove(topic_name).is_some() {
950 self.schema_cache.remove(topic_name);
951 self.operations_cache.remove(topic_name);
952 }
953 }
954
955 fn diff_contracts(&self, other: &KafkaContract) -> Result<ContractDiffResult, ContractError> {
957 let mut mismatches = Vec::new();
958
959 let all_topics: std::collections::HashSet<String> =
961 self.topics.keys().chain(other.topics.keys()).cloned().collect();
962
963 for topic_name in &all_topics {
965 if self.topics.contains_key(topic_name) && !other.topics.contains_key(topic_name) {
966 mismatches.push(Mismatch {
967 mismatch_type: MismatchType::EndpointNotFound,
968 path: topic_name.clone(),
969 method: None,
970 expected: Some(format!("Topic {} should exist", topic_name)),
971 actual: Some("Topic removed".to_string()),
972 description: format!("Topic {} was removed", topic_name),
973 severity: MismatchSeverity::Critical,
974 confidence: 1.0,
975 context: HashMap::new(),
976 });
977 }
978 }
979
980 for topic_name in &all_topics {
982 if !self.topics.contains_key(topic_name) && other.topics.contains_key(topic_name) {
983 mismatches.push(Mismatch {
984 mismatch_type: MismatchType::UnexpectedField,
985 path: topic_name.clone(),
986 method: None,
987 expected: None,
988 actual: Some(format!("New topic {}", topic_name)),
989 description: format!("New topic {} was added", topic_name),
990 severity: MismatchSeverity::Low,
991 confidence: 1.0,
992 context: HashMap::new(),
993 });
994 }
995 }
996
997 for topic_name in all_topics
999 .intersection(&self.topics.keys().cloned().collect::<std::collections::HashSet<_>>())
1000 {
1001 if let (Some(old_topic), Some(new_topic)) =
1002 (self.topics.get(topic_name), other.topics.get(topic_name))
1003 {
1004 if old_topic.key_schema.is_some() != new_topic.key_schema.is_some() {
1006 mismatches.push(Mismatch {
1007 mismatch_type: MismatchType::SchemaMismatch,
1008 path: format!("{}.key_schema", topic_name),
1009 method: None,
1010 expected: Some(if old_topic.key_schema.is_some() {
1011 "Key schema should exist".to_string()
1012 } else {
1013 "Key schema should not exist".to_string()
1014 }),
1015 actual: Some(if new_topic.key_schema.is_some() {
1016 "Key schema added".to_string()
1017 } else {
1018 "Key schema removed".to_string()
1019 }),
1020 description: format!(
1021 "Key schema presence changed for topic {}",
1022 topic_name
1023 ),
1024 severity: MismatchSeverity::High,
1025 confidence: 1.0,
1026 context: HashMap::new(),
1027 });
1028 } else if let (Some(old_key), Some(new_key)) =
1029 (&old_topic.key_schema, &new_topic.key_schema)
1030 {
1031 if old_key.schema != new_key.schema {
1032 let key_mismatches = MqttContract::compare_json_schemas(
1033 &old_key.schema,
1034 &new_key.schema,
1035 &format!("{}.key", topic_name),
1036 );
1037 mismatches.extend(key_mismatches);
1038 }
1039 }
1040
1041 if old_topic.value_schema.schema != new_topic.value_schema.schema {
1043 let value_mismatches = MqttContract::compare_json_schemas(
1044 &old_topic.value_schema.schema,
1045 &new_topic.value_schema.schema,
1046 &format!("{}.value", topic_name),
1047 );
1048 mismatches.extend(value_mismatches);
1049 }
1050
1051 if let Some(ref evolution_rules) = new_topic.evolution_rules {
1053 let has_breaking_changes = mismatches.iter().any(|m| {
1055 matches!(m.severity, MismatchSeverity::Critical | MismatchSeverity::High)
1056 });
1057
1058 if has_breaking_changes && !evolution_rules.allow_backward_compatible {
1059 mismatches.push(Mismatch {
1060 mismatch_type: MismatchType::SchemaMismatch,
1061 path: format!("{}.evolution_rules", topic_name),
1062 method: None,
1063 expected: Some("Backward compatible changes only".to_string()),
1064 actual: Some("Breaking changes detected".to_string()),
1065 description: format!(
1066 "Topic {} has breaking changes but evolution rules require backward compatibility",
1067 topic_name
1068 ),
1069 severity: MismatchSeverity::High,
1070 confidence: 1.0,
1071 context: HashMap::new(),
1072 });
1073 }
1074 }
1075 }
1076 }
1077
1078 let matches = mismatches.is_empty();
1079 let confidence = if matches { 1.0 } else { 0.8 };
1080
1081 Ok(ContractDiffResult {
1082 matches,
1083 confidence,
1084 mismatches,
1085 recommendations: Vec::new(),
1086 corrections: Vec::new(),
1087 metadata: mockforge_foundation::contract_diff_types::DiffMetadata {
1088 analyzed_at: chrono::Utc::now(),
1089 request_source: "kafka_contract_diff".to_string(),
1090 contract_version: Some(self.version.clone()),
1091 contract_format: "kafka_schema".to_string(),
1092 endpoint_path: "".to_string(),
1093 http_method: "".to_string(),
1094 request_count: 1,
1095 llm_provider: None,
1096 llm_model: None,
1097 },
1098 })
1099 }
1100
1101 fn validate_message_against_schema(
1103 &self,
1104 topic_name: &str,
1105 key: Option<&Value>,
1106 value: &Value,
1107 ) -> Result<ValidationResult, ContractError> {
1108 let (key_schema_opt, value_schema) = self
1109 .schema_cache
1110 .get(topic_name)
1111 .ok_or_else(|| ContractError::OperationNotFound(topic_name.to_string()))?;
1112
1113 let mut validation_errors = Vec::new();
1114
1115 if let (Some(key_value), Some(key_schema)) = (key, key_schema_opt) {
1117 for error in key_schema.iter_errors(key_value) {
1118 validation_errors.push(ValidationError {
1119 message: format!("Key validation error: {}", error),
1120 path: Some(format!("{}.key{}", topic_name, error.instance_path)),
1121 code: Some("KEY_SCHEMA_VALIDATION_ERROR".to_string()),
1122 });
1123 }
1124 }
1125
1126 for error in value_schema.iter_errors(value) {
1128 validation_errors.push(ValidationError {
1129 message: format!("Value validation error: {}", error),
1130 path: Some(format!("{}.value{}", topic_name, error.instance_path)),
1131 code: Some("VALUE_SCHEMA_VALIDATION_ERROR".to_string()),
1132 });
1133 }
1134
1135 Ok(ValidationResult {
1136 valid: validation_errors.is_empty(),
1137 errors: validation_errors,
1138 warnings: Vec::new(),
1139 })
1140 }
1141}
1142
1143#[async_trait::async_trait]
1144impl ProtocolContract for KafkaContract {
1145 fn protocol(&self) -> Protocol {
1146 Protocol::Kafka
1147 }
1148
1149 fn contract_id(&self) -> &str {
1150 &self.contract_id
1151 }
1152
1153 fn version(&self) -> &str {
1154 &self.version
1155 }
1156
1157 fn operations(&self) -> Vec<ContractOperation> {
1158 self.operations_cache.values().cloned().collect()
1159 }
1160
1161 fn get_operation(&self, operation_id: &str) -> Option<&ContractOperation> {
1162 self.operations_cache.get(operation_id)
1163 }
1164
1165 async fn diff(
1166 &self,
1167 other: &dyn ProtocolContract,
1168 ) -> Result<ContractDiffResult, ContractError> {
1169 if other.protocol() != Protocol::Kafka {
1170 return Err(ContractError::UnsupportedProtocol(other.protocol()));
1171 }
1172
1173 Err(ContractError::Other(
1174 "Direct comparison of KafkaContract instances requires type information. \
1175 Use KafkaContract::diff_contracts() for comparing two KafkaContract instances."
1176 .to_string(),
1177 ))
1178 }
1179
1180 async fn validate(
1181 &self,
1182 operation_id: &str,
1183 request: &ContractRequest,
1184 ) -> Result<ValidationResult, ContractError> {
1185 let value: Value = serde_json::from_slice(&request.payload)
1189 .map_err(|e| ContractError::SchemaValidation(format!("Invalid JSON: {}", e)))?;
1190
1191 let key = request.metadata.get("key").and_then(|k| serde_json::from_str::<Value>(k).ok());
1192
1193 self.validate_message_against_schema(operation_id, key.as_ref(), &value)
1195 }
1196
1197 fn get_schema(&self, operation_id: &str) -> Option<Value> {
1198 self.topics.get(operation_id).map(|topic| {
1199 serde_json::json!({
1200 "key": topic.key_schema.as_ref().map(|s| s.schema.clone()),
1201 "value": topic.value_schema.schema.clone(),
1202 })
1203 })
1204 }
1205
1206 fn to_json(&self) -> Result<Value, ContractError> {
1207 let topics: Vec<Value> = self
1208 .topics
1209 .values()
1210 .map(|topic| {
1211 serde_json::json!({
1212 "topic": topic.topic,
1213 "key_schema": topic.key_schema.as_ref().map(|s| {
1214 serde_json::json!({
1215 "format": s.format,
1216 "schema": s.schema,
1217 "schema_id": s.schema_id,
1218 "version": s.version,
1219 })
1220 }),
1221 "value_schema": {
1222 "format": topic.value_schema.format,
1223 "schema": topic.value_schema.schema,
1224 "schema_id": topic.value_schema.schema_id,
1225 "version": topic.value_schema.version,
1226 },
1227 "partitions": topic.partitions,
1228 "replication_factor": topic.replication_factor,
1229 "description": topic.description,
1230 "evolution_rules": topic.evolution_rules,
1231 })
1232 })
1233 .collect();
1234
1235 Ok(serde_json::json!({
1236 "contract_id": self.contract_id,
1237 "version": self.version,
1238 "protocol": "kafka",
1239 "topics": topics,
1240 "metadata": self.metadata,
1241 }))
1242 }
1243}
1244
1245pub fn diff_kafka_contracts(
1247 old_contract: &KafkaContract,
1248 new_contract: &KafkaContract,
1249) -> Result<ContractDiffResult, ContractError> {
1250 old_contract.diff_contracts(new_contract)
1251}
1252
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed MQTT contract reports the id and version it was given.
    #[test]
    fn test_mqtt_contract_creation() {
        let id = "test-contract".to_string();
        let version = "1.0.0".to_string();

        let mqtt = MqttContract::new(id, version);

        assert_eq!(mqtt.contract_id(), "test-contract");
        assert_eq!(mqtt.version(), "1.0.0");
    }

    /// A freshly constructed Kafka contract reports the id and version it was given.
    #[test]
    fn test_kafka_contract_creation() {
        let id = "test-contract".to_string();
        let version = "1.0.0".to_string();

        let kafka = KafkaContract::new(id, version);

        assert_eq!(kafka.contract_id(), "test-contract");
        assert_eq!(kafka.version(), "1.0.0");
    }
}