Skip to main content

mockforge_contracts/contract_drift/
mqtt_kafka_contracts.rs

1//! MQTT and Kafka contract implementations for protocol-agnostic contract drift detection
2//!
3//! This module provides `MqttContract` and `KafkaContract` structs that implement the
4//! `ProtocolContract` trait for MQTT and Kafka protocols, enabling drift detection and
5//! analysis for topic-based messaging systems.
6
7use crate::contract_drift::protocol_contracts::{
8    ContractError, ContractOperation, ContractRequest, OperationType, ProtocolContract,
9    ValidationError, ValidationResult,
10};
11use jsonschema::{self, Draft, Validator as JSONSchema};
12use mockforge_foundation::contract_diff_types::{
13    ContractDiffResult, Mismatch, MismatchSeverity, MismatchType,
14};
15use mockforge_foundation::protocol::Protocol;
16use serde_json::Value;
17use std::collections::HashMap;
18
19// ============================================================================
20// MQTT Contract
21// ============================================================================
22
23// MqttTopicSchema re-exported from foundation.
24pub use mockforge_foundation::protocol_contract_types::MqttTopicSchema;
25
26/// MQTT contract implementation
27///
28/// Defines topic schemas for MQTT messaging, enabling schema validation
29/// and drift detection for IoT and pub/sub systems.
30pub struct MqttContract {
31    /// Unique identifier for this contract
32    contract_id: String,
33    /// Contract version
34    version: String,
35    /// Map of topic names to topic schemas
36    topics: HashMap<String, MqttTopicSchema>,
37    /// Compiled JSON schemas for validation (cached)
38    schema_cache: HashMap<String, JSONSchema>,
39    /// Cached contract operations for quick lookup
40    operations_cache: HashMap<String, ContractOperation>,
41    /// Contract metadata
42    metadata: HashMap<String, String>,
43}
44
45impl MqttContract {
46    /// Create a new MQTT contract
47    pub fn new(contract_id: String, version: String) -> Self {
48        Self {
49            contract_id,
50            version,
51            topics: HashMap::new(),
52            schema_cache: HashMap::new(),
53            operations_cache: HashMap::new(),
54            metadata: HashMap::new(),
55        }
56    }
57
58    /// Add a topic schema to the contract
59    pub fn add_topic(&mut self, topic_schema: MqttTopicSchema) -> Result<(), ContractError> {
60        let topic_name = topic_schema.topic.clone();
61
62        // Compile and cache the JSON schema for validation
63        let schema = jsonschema::options()
64            .with_draft(Draft::Draft7)
65            .build(&topic_schema.schema)
66            .map_err(|e| ContractError::SchemaValidation(format!("Invalid JSON schema: {}", e)))?;
67        self.schema_cache.insert(topic_name.clone(), schema);
68
69        // Add to topics
70        self.topics.insert(topic_name.clone(), topic_schema.clone());
71
72        // Cache the contract operation
73        let operation = ContractOperation {
74            id: topic_name.clone(),
75            name: topic_name.clone(),
76            operation_type: OperationType::MqttTopic {
77                topic: topic_name.clone(),
78                qos: topic_schema.qos,
79            },
80            input_schema: Some(topic_schema.schema.clone()),
81            output_schema: Some(topic_schema.schema.clone()), // MQTT is one-way, but schema applies to both publish/subscribe
82            metadata: {
83                let mut meta = HashMap::new();
84                if let Some(retained) = topic_schema.retained {
85                    meta.insert("retained".to_string(), retained.to_string());
86                }
87                if let Some(ref desc) = topic_schema.description {
88                    meta.insert("description".to_string(), desc.clone());
89                }
90                meta
91            },
92        };
93        self.operations_cache.insert(topic_name, operation);
94
95        Ok(())
96    }
97
98    /// Remove a topic from the contract
99    pub fn remove_topic(&mut self, topic_name: &str) {
100        if self.topics.remove(topic_name).is_some() {
101            self.schema_cache.remove(topic_name);
102            self.operations_cache.remove(topic_name);
103        }
104    }
105
106    /// Compare two MQTT contracts and detect differences
107    fn diff_contracts(&self, other: &MqttContract) -> Result<ContractDiffResult, ContractError> {
108        let mut mismatches = Vec::new();
109
110        // Collect all topic names
111        let all_topics: std::collections::HashSet<String> =
112            self.topics.keys().chain(other.topics.keys()).cloned().collect();
113
114        // Check for removed topics (breaking change)
115        for topic_name in &all_topics {
116            if self.topics.contains_key(topic_name) && !other.topics.contains_key(topic_name) {
117                mismatches.push(Mismatch {
118                    mismatch_type: MismatchType::EndpointNotFound,
119                    path: topic_name.clone(),
120                    method: None,
121                    expected: Some(format!("Topic {} should exist", topic_name)),
122                    actual: Some("Topic removed".to_string()),
123                    description: format!("Topic {} was removed", topic_name),
124                    severity: MismatchSeverity::Critical,
125                    confidence: 1.0,
126                    context: HashMap::new(),
127                });
128            }
129        }
130
131        // Check for added topics (non-breaking)
132        for topic_name in &all_topics {
133            if !self.topics.contains_key(topic_name) && other.topics.contains_key(topic_name) {
134                mismatches.push(Mismatch {
135                    mismatch_type: MismatchType::UnexpectedField,
136                    path: topic_name.clone(),
137                    method: None,
138                    expected: None,
139                    actual: Some(format!("New topic {}", topic_name)),
140                    description: format!("New topic {} was added", topic_name),
141                    severity: MismatchSeverity::Low,
142                    confidence: 1.0,
143                    context: HashMap::new(),
144                });
145            }
146        }
147
148        // Compare topic schemas for topics that exist in both
149        for topic_name in all_topics
150            .intersection(&self.topics.keys().cloned().collect::<std::collections::HashSet<_>>())
151        {
152            if let (Some(old_topic), Some(new_topic)) =
153                (self.topics.get(topic_name), other.topics.get(topic_name))
154            {
155                // Compare QoS changes
156                if old_topic.qos != new_topic.qos {
157                    mismatches.push(Mismatch {
158                        mismatch_type: MismatchType::SchemaMismatch,
159                        path: format!("{}.qos", topic_name),
160                        method: None,
161                        expected: old_topic.qos.map(|q| format!("QoS: {}", q)),
162                        actual: new_topic.qos.map(|q| format!("QoS: {}", q)),
163                        description: format!(
164                            "QoS changed for topic {}: {:?} -> {:?}",
165                            topic_name, old_topic.qos, new_topic.qos
166                        ),
167                        severity: MismatchSeverity::Medium,
168                        confidence: 1.0,
169                        context: HashMap::new(),
170                    });
171                }
172
173                // Detect schema format and compare schemas
174                let old_format = Self::detect_schema_format(&old_topic.schema);
175                let new_format = Self::detect_schema_format(&new_topic.schema);
176
177                // Check for schema format changes (breaking change)
178                if old_format != new_format {
179                    let mut context = HashMap::new();
180                    context.insert("is_additive".to_string(), serde_json::json!(false));
181                    context.insert("is_breaking".to_string(), serde_json::json!(true));
182                    context.insert(
183                        "change_category".to_string(),
184                        serde_json::json!("schema_format_changed"),
185                    );
186                    context.insert("topic".to_string(), serde_json::json!(topic_name));
187                    context.insert("old_format".to_string(), serde_json::json!(old_format));
188                    context.insert("new_format".to_string(), serde_json::json!(new_format));
189
190                    mismatches.push(Mismatch {
191                        mismatch_type: MismatchType::SchemaMismatch,
192                        path: format!("{}.schema_format", topic_name),
193                        method: None,
194                        expected: Some(format!("Schema format: {}", old_format)),
195                        actual: Some(format!("Schema format: {}", new_format)),
196                        description: format!(
197                            "Schema format changed from {} to {} for topic {}",
198                            old_format, new_format, topic_name
199                        ),
200                        severity: MismatchSeverity::High,
201                        confidence: 1.0,
202                        context,
203                    });
204                }
205
206                // Compare schemas based on format
207                let schema_mismatches = match (old_format.as_str(), new_format.as_str()) {
208                    ("json_schema", "json_schema") => {
209                        Self::compare_json_schemas(&old_topic.schema, &new_topic.schema, topic_name)
210                    }
211                    ("avro", "avro") => {
212                        Self::compare_avro_schemas(&old_topic.schema, &new_topic.schema, topic_name)
213                            .unwrap_or_else(|_| Vec::new()) // Fallback to empty on error
214                    }
215                    ("json_shape", "json_shape") => Self::compare_json_shape_schemas(
216                        &old_topic.schema,
217                        &new_topic.schema,
218                        topic_name,
219                    ),
220                    _ => Vec::new(), // Different formats already handled above
221                };
222                mismatches.extend(schema_mismatches);
223            }
224        }
225
226        let matches = mismatches.is_empty();
227        let confidence = if matches { 1.0 } else { 0.8 };
228
229        Ok(ContractDiffResult {
230            matches,
231            confidence,
232            mismatches,
233            recommendations: Vec::new(),
234            corrections: Vec::new(),
235            metadata: mockforge_foundation::contract_diff_types::DiffMetadata {
236                analyzed_at: chrono::Utc::now(),
237                request_source: "mqtt_contract_diff".to_string(),
238                contract_version: Some(self.version.clone()),
239                contract_format: "mqtt_schema".to_string(),
240                endpoint_path: "".to_string(),
241                http_method: "".to_string(),
242                request_count: 1,
243                llm_provider: None,
244                llm_model: None,
245            },
246        })
247    }
248
249    /// Detect the schema format (JSON Schema, Avro, or JSON-shape)
250    fn detect_schema_format(schema: &Value) -> String {
251        // Check for Avro schema indicators
252        if schema.get("type").and_then(|v| v.as_str()) == Some("record")
253            || schema.get("fields").is_some()
254        {
255            return "avro".to_string();
256        }
257
258        // Check for JSON Schema indicators
259        if schema.get("$schema").is_some()
260            || (schema.get("type").is_some() && schema.get("properties").is_some())
261            || schema.get("required").is_some()
262        {
263            return "json_schema".to_string();
264        }
265
266        // Check for JSON-shape (simple object with type strings)
267        if let Some(obj) = schema.as_object() {
268            let all_strings = obj.values().all(|v| {
269                v.as_str().is_some()
270                    || (v.is_object() && v.get("type").and_then(|t| t.as_str()).is_some())
271            });
272            if all_strings && !obj.is_empty() {
273                return "json_shape".to_string();
274            }
275        }
276
277        // Default to JSON Schema if unclear
278        "json_schema".to_string()
279    }
280
281    /// Compare Avro schemas and identify differences
282    fn compare_avro_schemas(
283        old_schema: &Value,
284        new_schema: &Value,
285        path_prefix: &str,
286    ) -> Result<Vec<Mismatch>, ContractError> {
287        let mut mismatches = Vec::new();
288
289        // Extract fields from Avro schema
290        let old_fields = old_schema.get("fields").and_then(|v| v.as_array()).ok_or_else(|| {
291            ContractError::SchemaValidation("Invalid Avro schema: missing fields".to_string())
292        })?;
293        let new_fields = new_schema.get("fields").and_then(|v| v.as_array()).ok_or_else(|| {
294            ContractError::SchemaValidation("Invalid Avro schema: missing fields".to_string())
295        })?;
296
297        // Build field maps by name
298        let old_fields_map: HashMap<String, &Value> = old_fields
299            .iter()
300            .filter_map(|f| {
301                f.get("name").and_then(|n| n.as_str()).map(|name| (name.to_string(), f))
302            })
303            .collect();
304        let new_fields_map: HashMap<String, &Value> = new_fields
305            .iter()
306            .filter_map(|f| {
307                f.get("name").and_then(|n| n.as_str()).map(|name| (name.to_string(), f))
308            })
309            .collect();
310
311        // Check for removed fields (breaking change)
312        for field_name in old_fields_map.keys() {
313            if !new_fields_map.contains_key(field_name) {
314                let mut context = HashMap::new();
315                context.insert("is_additive".to_string(), serde_json::json!(false));
316                context.insert("is_breaking".to_string(), serde_json::json!(true));
317                context.insert("change_category".to_string(), serde_json::json!("field_removed"));
318                context.insert("field_name".to_string(), serde_json::json!(field_name));
319                context.insert("schema_format".to_string(), serde_json::json!("avro"));
320
321                mismatches.push(Mismatch {
322                    mismatch_type: MismatchType::EndpointNotFound,
323                    path: format!("{}.{}", path_prefix, field_name),
324                    method: None,
325                    expected: Some(format!("Field {} should exist", field_name)),
326                    actual: Some("Field removed".to_string()),
327                    description: format!("Avro field {} was removed", field_name),
328                    severity: MismatchSeverity::High,
329                    confidence: 1.0,
330                    context,
331                });
332            }
333        }
334
335        // Check for added fields
336        for (field_name, new_field) in &new_fields_map {
337            if !old_fields_map.contains_key(field_name) {
338                // In Avro, fields without defaults are required
339                let has_default = new_field.get("default").is_some();
340                let is_required = !has_default;
341
342                let mut context = HashMap::new();
343                context.insert("is_additive".to_string(), serde_json::json!(!is_required));
344                context.insert("is_breaking".to_string(), serde_json::json!(is_required));
345                context.insert(
346                    "change_category".to_string(),
347                    serde_json::json!(if is_required {
348                        "required_field_added"
349                    } else {
350                        "field_added"
351                    }),
352                );
353                context.insert("field_name".to_string(), serde_json::json!(field_name));
354                context.insert("schema_format".to_string(), serde_json::json!("avro"));
355                context.insert("has_default".to_string(), serde_json::json!(has_default));
356
357                mismatches.push(Mismatch {
358                    mismatch_type: if is_required {
359                        MismatchType::MissingRequiredField
360                    } else {
361                        MismatchType::UnexpectedField
362                    },
363                    path: format!("{}.{}", path_prefix, field_name),
364                    method: None,
365                    expected: None,
366                    actual: Some(format!(
367                        "New Avro field {} ({})",
368                        field_name,
369                        if is_required { "required" } else { "optional" }
370                    )),
371                    description: format!(
372                        "New Avro field {} was added ({})",
373                        field_name,
374                        if is_required {
375                            "required - breaking"
376                        } else {
377                            "optional - additive"
378                        }
379                    ),
380                    severity: if is_required {
381                        MismatchSeverity::High
382                    } else {
383                        MismatchSeverity::Low
384                    },
385                    confidence: 1.0,
386                    context,
387                });
388            } else {
389                // Check for type changes
390                let old_field = old_fields_map[field_name];
391                let old_type = old_field.get("type");
392                let new_type = new_field.get("type");
393
394                if old_type != new_type {
395                    let mut context = HashMap::new();
396                    context.insert("is_additive".to_string(), serde_json::json!(false));
397                    context.insert("is_breaking".to_string(), serde_json::json!(true));
398                    context.insert(
399                        "change_category".to_string(),
400                        serde_json::json!("field_type_changed"),
401                    );
402                    context.insert("field_name".to_string(), serde_json::json!(field_name));
403                    context.insert("schema_format".to_string(), serde_json::json!("avro"));
404                    context.insert("old_type".to_string(), serde_json::json!(old_type));
405                    context.insert("new_type".to_string(), serde_json::json!(new_type));
406
407                    mismatches.push(Mismatch {
408                        mismatch_type: MismatchType::TypeMismatch,
409                        path: format!("{}.{}", path_prefix, field_name),
410                        method: None,
411                        expected: Some(format!("Type: {:?}", old_type)),
412                        actual: Some(format!("Type: {:?}", new_type)),
413                        description: format!("Avro field {} type changed", field_name),
414                        severity: MismatchSeverity::High,
415                        confidence: 1.0,
416                        context,
417                    });
418                }
419            }
420        }
421
422        Ok(mismatches)
423    }
424
425    /// Compare JSON-shape schemas (simplified format)
426    fn compare_json_shape_schemas(
427        old_schema: &Value,
428        new_schema: &Value,
429        path_prefix: &str,
430    ) -> Vec<Mismatch> {
431        let mut mismatches = Vec::new();
432
433        if let (Some(old_obj), Some(new_obj)) = (old_schema.as_object(), new_schema.as_object()) {
434            // Check for removed properties (breaking)
435            for (prop_name, _) in old_obj {
436                if !new_obj.contains_key(prop_name) {
437                    let mut context = HashMap::new();
438                    context.insert("is_additive".to_string(), serde_json::json!(false));
439                    context.insert("is_breaking".to_string(), serde_json::json!(true));
440                    context.insert(
441                        "change_category".to_string(),
442                        serde_json::json!("property_removed"),
443                    );
444                    context.insert("field_name".to_string(), serde_json::json!(prop_name));
445                    context.insert("schema_format".to_string(), serde_json::json!("json_shape"));
446
447                    mismatches.push(Mismatch {
448                        mismatch_type: MismatchType::UnexpectedField,
449                        path: format!("{}.{}", path_prefix, prop_name),
450                        method: None,
451                        expected: Some(format!("Property {} should exist", prop_name)),
452                        actual: Some("Property removed".to_string()),
453                        description: format!("Property {} was removed", prop_name),
454                        severity: MismatchSeverity::High,
455                        confidence: 1.0,
456                        context,
457                    });
458                }
459            }
460
461            // Check for added properties (additive)
462            for (prop_name, _) in new_obj {
463                if !old_obj.contains_key(prop_name) {
464                    let mut context = HashMap::new();
465                    context.insert("is_additive".to_string(), serde_json::json!(true));
466                    context.insert("is_breaking".to_string(), serde_json::json!(false));
467                    context
468                        .insert("change_category".to_string(), serde_json::json!("property_added"));
469                    context.insert("field_name".to_string(), serde_json::json!(prop_name));
470                    context.insert("schema_format".to_string(), serde_json::json!("json_shape"));
471
472                    mismatches.push(Mismatch {
473                        mismatch_type: MismatchType::UnexpectedField,
474                        path: format!("{}.{}", path_prefix, prop_name),
475                        method: None,
476                        expected: None,
477                        actual: Some(format!("New property {}", prop_name)),
478                        description: format!("New property {} was added", prop_name),
479                        severity: MismatchSeverity::Low,
480                        confidence: 1.0,
481                        context,
482                    });
483                } else {
484                    // Check for type changes
485                    let old_type = old_obj[prop_name]
486                        .as_str()
487                        .or_else(|| old_obj[prop_name].get("type").and_then(|t| t.as_str()));
488                    let new_type = new_obj[prop_name]
489                        .as_str()
490                        .or_else(|| new_obj[prop_name].get("type").and_then(|t| t.as_str()));
491
492                    if old_type != new_type {
493                        let mut context = HashMap::new();
494                        context.insert("is_additive".to_string(), serde_json::json!(false));
495                        context.insert("is_breaking".to_string(), serde_json::json!(true));
496                        context.insert(
497                            "change_category".to_string(),
498                            serde_json::json!("property_type_changed"),
499                        );
500                        context.insert("field_name".to_string(), serde_json::json!(prop_name));
501                        context
502                            .insert("schema_format".to_string(), serde_json::json!("json_shape"));
503                        context.insert("old_type".to_string(), serde_json::json!(old_type));
504                        context.insert("new_type".to_string(), serde_json::json!(new_type));
505
506                        mismatches.push(Mismatch {
507                            mismatch_type: MismatchType::TypeMismatch,
508                            path: format!("{}.{}", path_prefix, prop_name),
509                            method: None,
510                            expected: old_type.map(|t| format!("Type: {}", t)),
511                            actual: new_type.map(|t| format!("Type: {}", t)),
512                            description: format!("Property {} type changed", prop_name),
513                            severity: MismatchSeverity::High,
514                            confidence: 1.0,
515                            context,
516                        });
517                    }
518                }
519            }
520        }
521
522        mismatches
523    }
524
525    /// Compare two JSON schemas and identify differences (shared with Kafka)
526    fn compare_json_schemas(
527        old_schema: &Value,
528        new_schema: &Value,
529        path_prefix: &str,
530    ) -> Vec<Mismatch> {
531        let mut mismatches = Vec::new();
532
533        // Check for required fields changes
534        if let (Some(old_required), Some(new_required)) = (
535            old_schema.get("required").and_then(|v| v.as_array()),
536            new_schema.get("required").and_then(|v| v.as_array()),
537        ) {
538            let old_required_set: std::collections::HashSet<&str> =
539                old_required.iter().filter_map(|v| v.as_str()).collect();
540            let new_required_set: std::collections::HashSet<&str> =
541                new_required.iter().filter_map(|v| v.as_str()).collect();
542
543            // Check for newly required fields (breaking change)
544            for new_req in new_required_set.difference(&old_required_set) {
545                let mut context = HashMap::new();
546                context.insert("is_additive".to_string(), serde_json::json!(false));
547                context.insert("is_breaking".to_string(), serde_json::json!(true));
548                context.insert(
549                    "change_category".to_string(),
550                    serde_json::json!("required_field_added"),
551                );
552                context.insert("field_name".to_string(), serde_json::json!(new_req));
553                context.insert("schema_format".to_string(), serde_json::json!("json_schema"));
554
555                mismatches.push(Mismatch {
556                    mismatch_type: MismatchType::MissingRequiredField,
557                    path: format!("{}.{}", path_prefix, new_req),
558                    method: None,
559                    expected: Some(format!("Field {} should be optional", new_req)),
560                    actual: Some(format!("Field {} is now required", new_req)),
561                    description: format!("Field {} became required", new_req),
562                    severity: MismatchSeverity::Critical,
563                    confidence: 1.0,
564                    context,
565                });
566            }
567
568            // Check for removed required fields (additive - field is now optional)
569            for removed_req in old_required_set.difference(&new_required_set) {
570                let mut context = HashMap::new();
571                context.insert("is_additive".to_string(), serde_json::json!(true));
572                context.insert("is_breaking".to_string(), serde_json::json!(false));
573                context.insert(
574                    "change_category".to_string(),
575                    serde_json::json!("required_field_removed"),
576                );
577                context.insert("field_name".to_string(), serde_json::json!(removed_req));
578                context.insert("schema_format".to_string(), serde_json::json!("json_schema"));
579
580                mismatches.push(Mismatch {
581                    mismatch_type: MismatchType::UnexpectedField,
582                    path: format!("{}.{}", path_prefix, removed_req),
583                    method: None,
584                    expected: Some(format!("Field {} was required", removed_req)),
585                    actual: Some(format!("Field {} is now optional", removed_req)),
586                    description: format!("Field {} is no longer required", removed_req),
587                    severity: MismatchSeverity::Low,
588                    confidence: 1.0,
589                    context,
590                });
591            }
592        }
593
594        // Check for property type changes
595        if let (Some(old_props), Some(new_props)) = (
596            old_schema.get("properties").and_then(|v| v.as_object()),
597            new_schema.get("properties").and_then(|v| v.as_object()),
598        ) {
599            for (prop_name, new_prop_schema) in new_props {
600                if let Some(old_prop_schema) = old_props.get(prop_name) {
601                    if let (Some(old_type), Some(new_type)) = (
602                        old_prop_schema.get("type").and_then(|v| v.as_str()),
603                        new_prop_schema.get("type").and_then(|v| v.as_str()),
604                    ) {
605                        if old_type != new_type {
606                            let mut context = HashMap::new();
607                            context.insert("is_additive".to_string(), serde_json::json!(false));
608                            context.insert("is_breaking".to_string(), serde_json::json!(true));
609                            context.insert(
610                                "change_category".to_string(),
611                                serde_json::json!("property_type_changed"),
612                            );
613                            context.insert("field_name".to_string(), serde_json::json!(prop_name));
614                            context.insert("old_type".to_string(), serde_json::json!(old_type));
615                            context.insert("new_type".to_string(), serde_json::json!(new_type));
616                            context.insert(
617                                "schema_format".to_string(),
618                                serde_json::json!("json_schema"),
619                            );
620
621                            mismatches.push(Mismatch {
622                                mismatch_type: MismatchType::TypeMismatch,
623                                path: format!("{}.{}", path_prefix, prop_name),
624                                method: None,
625                                expected: Some(format!("Type: {}", old_type)),
626                                actual: Some(format!("Type: {}", new_type)),
627                                description: format!(
628                                    "Property {} type changed from {} to {}",
629                                    prop_name, old_type, new_type
630                                ),
631                                severity: MismatchSeverity::High,
632                                confidence: 1.0,
633                                context,
634                            });
635                        }
636                    }
637                }
638            }
639
640            // Check for removed properties (breaking change)
641            for prop_name in old_props.keys() {
642                if !new_props.contains_key(prop_name) {
643                    let mut context = HashMap::new();
644                    context.insert("is_additive".to_string(), serde_json::json!(false));
645                    context.insert("is_breaking".to_string(), serde_json::json!(true));
646                    context.insert(
647                        "change_category".to_string(),
648                        serde_json::json!("property_removed"),
649                    );
650                    context.insert("field_name".to_string(), serde_json::json!(prop_name));
651                    context.insert("schema_format".to_string(), serde_json::json!("json_schema"));
652
653                    mismatches.push(Mismatch {
654                        mismatch_type: MismatchType::UnexpectedField,
655                        path: format!("{}.{}", path_prefix, prop_name),
656                        method: None,
657                        expected: Some(format!("Property {} should exist", prop_name)),
658                        actual: Some("Property removed".to_string()),
659                        description: format!("Property {} was removed", prop_name),
660                        severity: MismatchSeverity::High,
661                        confidence: 1.0,
662                        context,
663                    });
664                }
665            }
666
667            // Check for new properties (additive change)
668            for prop_name in new_props.keys() {
669                if !old_props.contains_key(prop_name) {
670                    let mut context = HashMap::new();
671                    context.insert("is_additive".to_string(), serde_json::json!(true));
672                    context.insert("is_breaking".to_string(), serde_json::json!(false));
673                    context
674                        .insert("change_category".to_string(), serde_json::json!("property_added"));
675                    context.insert("field_name".to_string(), serde_json::json!(prop_name));
676                    context.insert("schema_format".to_string(), serde_json::json!("json_schema"));
677
678                    mismatches.push(Mismatch {
679                        mismatch_type: MismatchType::UnexpectedField,
680                        path: format!("{}.{}", path_prefix, prop_name),
681                        method: None,
682                        expected: None,
683                        actual: Some(format!("New property {}", prop_name)),
684                        description: format!("New property {} was added", prop_name),
685                        severity: MismatchSeverity::Low,
686                        confidence: 1.0,
687                        context,
688                    });
689                }
690            }
691        }
692
693        mismatches
694    }
695
696    /// Validate a message against a topic schema
697    fn validate_message_against_schema(
698        &self,
699        topic_name: &str,
700        message: &Value,
701    ) -> Result<ValidationResult, ContractError> {
702        let schema = self
703            .schema_cache
704            .get(topic_name)
705            .ok_or_else(|| ContractError::OperationNotFound(topic_name.to_string()))?;
706
707        // Use iter_errors for validation
708        let mut validation_errors = Vec::new();
709        for error in schema.iter_errors(message) {
710            validation_errors.push(ValidationError {
711                message: error.to_string(),
712                path: Some(error.instance_path.to_string()),
713                code: Some("SCHEMA_VALIDATION_ERROR".to_string()),
714            });
715        }
716
717        Ok(ValidationResult {
718            valid: validation_errors.is_empty(),
719            errors: validation_errors,
720            warnings: Vec::new(),
721        })
722    }
723}
724
725#[async_trait::async_trait]
726impl ProtocolContract for MqttContract {
727    fn protocol(&self) -> Protocol {
728        Protocol::Mqtt
729    }
730
731    fn contract_id(&self) -> &str {
732        &self.contract_id
733    }
734
735    fn version(&self) -> &str {
736        &self.version
737    }
738
739    fn operations(&self) -> Vec<ContractOperation> {
740        self.operations_cache.values().cloned().collect()
741    }
742
743    fn get_operation(&self, operation_id: &str) -> Option<&ContractOperation> {
744        self.operations_cache.get(operation_id)
745    }
746
747    async fn diff(
748        &self,
749        other: &dyn ProtocolContract,
750    ) -> Result<ContractDiffResult, ContractError> {
751        if other.protocol() != Protocol::Mqtt {
752            return Err(ContractError::UnsupportedProtocol(other.protocol()));
753        }
754
755        Err(ContractError::Other(
756            "Direct comparison of MqttContract instances requires type information. \
757             Use MqttContract::diff_contracts() for comparing two MqttContract instances."
758                .to_string(),
759        ))
760    }
761
762    async fn validate(
763        &self,
764        operation_id: &str,
765        request: &ContractRequest,
766    ) -> Result<ValidationResult, ContractError> {
767        // Parse the message payload as JSON
768        let message: Value = serde_json::from_slice(&request.payload)
769            .map_err(|e| ContractError::SchemaValidation(format!("Invalid JSON: {}", e)))?;
770
771        // Validate against the topic schema
772        self.validate_message_against_schema(operation_id, &message)
773    }
774
775    fn get_schema(&self, operation_id: &str) -> Option<Value> {
776        self.topics.get(operation_id).map(|t| t.schema.clone())
777    }
778
779    fn to_json(&self) -> Result<Value, ContractError> {
780        let topics: Vec<Value> = self
781            .topics
782            .values()
783            .map(|topic| {
784                serde_json::json!({
785                    "topic": topic.topic,
786                    "qos": topic.qos,
787                    "schema": topic.schema,
788                    "retained": topic.retained,
789                    "description": topic.description,
790                    "example": topic.example,
791                })
792            })
793            .collect();
794
795        Ok(serde_json::json!({
796            "contract_id": self.contract_id,
797            "version": self.version,
798            "protocol": "mqtt",
799            "topics": topics,
800            "metadata": self.metadata,
801        }))
802    }
803}
804
805/// Helper function to compare two MqttContract instances
806pub fn diff_mqtt_contracts(
807    old_contract: &MqttContract,
808    new_contract: &MqttContract,
809) -> Result<ContractDiffResult, ContractError> {
810    old_contract.diff_contracts(new_contract)
811}
812
813// ============================================================================
814// Kafka Contract
815// ============================================================================
816
817// SchemaFormat, KafkaTopicSchema, TopicSchema, EvolutionRules re-exported from foundation.
818pub use mockforge_foundation::protocol_contract_types::{
819    EvolutionRules, KafkaTopicSchema, SchemaFormat, TopicSchema,
820};
821
822/// Kafka contract implementation
823///
824/// Defines topic schemas for Kafka messaging, enabling schema validation
825/// and drift detection for event streaming systems.
826pub struct KafkaContract {
827    /// Unique identifier for this contract
828    contract_id: String,
829    /// Contract version
830    version: String,
831    /// Map of topic names to topic schemas
832    topics: HashMap<String, KafkaTopicSchema>,
833    /// Compiled JSON schemas for validation (cached)
834    schema_cache: HashMap<String, (Option<JSONSchema>, JSONSchema)>, // (key_schema, value_schema)
835    /// Cached contract operations for quick lookup
836    operations_cache: HashMap<String, ContractOperation>,
837    /// Contract metadata
838    metadata: HashMap<String, String>,
839}
840
841impl KafkaContract {
842    /// Create a new Kafka contract
843    pub fn new(contract_id: String, version: String) -> Self {
844        Self {
845            contract_id,
846            version,
847            topics: HashMap::new(),
848            schema_cache: HashMap::new(),
849            operations_cache: HashMap::new(),
850            metadata: HashMap::new(),
851        }
852    }
853
854    /// Add a topic schema to the contract
855    pub fn add_topic(&mut self, topic_schema: KafkaTopicSchema) -> Result<(), ContractError> {
856        let topic_name = topic_schema.topic.clone();
857
858        // Compile and cache JSON schemas for validation
859        // Note: Avro and Protobuf schemas would need additional processing
860        let value_schema = match topic_schema.value_schema.format {
861            SchemaFormat::Json => jsonschema::options()
862                .with_draft(Draft::Draft7)
863                .build(&topic_schema.value_schema.schema)
864                .map_err(|e| {
865                    ContractError::SchemaValidation(format!("Invalid JSON schema: {}", e))
866                })?,
867            SchemaFormat::Avro | SchemaFormat::Protobuf => {
868                // Fallback to permissive validation for non-JSON schema formats.
869                // This preserves contract registration while explicit Avro/Protobuf
870                // validation can be layered in when parser support is available.
871                jsonschema::options()
872                    .with_draft(Draft::Draft7)
873                    .build(&serde_json::json!({}))
874                    .map_err(|e| {
875                        ContractError::SchemaValidation(format!(
876                            "Failed to build fallback schema for {:?}: {}",
877                            topic_schema.value_schema.format, e
878                        ))
879                    })?
880            }
881        };
882
883        let key_schema = if let Some(ref key_schema_def) = topic_schema.key_schema {
884            match key_schema_def.format {
885                SchemaFormat::Json => Some(
886                    jsonschema::options()
887                        .with_draft(Draft::Draft7)
888                        .build(&key_schema_def.schema)
889                        .map_err(|e| {
890                        ContractError::SchemaValidation(format!("Invalid JSON schema: {}", e))
891                    })?,
892                ),
893                SchemaFormat::Avro | SchemaFormat::Protobuf => Some(
894                    jsonschema::options()
895                        .with_draft(Draft::Draft7)
896                        .build(&serde_json::json!({}))
897                        .map_err(|e| {
898                        ContractError::SchemaValidation(format!(
899                            "Failed to build fallback key schema for {:?}: {}",
900                            key_schema_def.format, e
901                        ))
902                    })?,
903                ),
904            }
905        } else {
906            None
907        };
908
909        self.schema_cache.insert(topic_name.clone(), (key_schema, value_schema));
910
911        // Add to topics
912        self.topics.insert(topic_name.clone(), topic_schema.clone());
913
914        // Cache the contract operation
915        let operation = ContractOperation {
916            id: topic_name.clone(),
917            name: topic_name.clone(),
918            operation_type: OperationType::KafkaTopic {
919                topic: topic_name.clone(),
920                key_schema: topic_schema.key_schema.as_ref().and_then(|s| s.schema_id.clone()),
921                value_schema: topic_schema.value_schema.schema_id.clone(),
922            },
923            input_schema: Some(serde_json::json!({
924                "key": topic_schema.key_schema.as_ref().map(|s| s.schema.clone()),
925                "value": topic_schema.value_schema.schema.clone(),
926            })),
927            output_schema: Some(serde_json::json!({
928                "key": topic_schema.key_schema.as_ref().map(|s| s.schema.clone()),
929                "value": topic_schema.value_schema.schema.clone(),
930            })),
931            metadata: {
932                let mut meta = HashMap::new();
933                if let Some(partitions) = topic_schema.partitions {
934                    meta.insert("partitions".to_string(), partitions.to_string());
935                }
936                if let Some(ref desc) = topic_schema.description {
937                    meta.insert("description".to_string(), desc.clone());
938                }
939                meta
940            },
941        };
942        self.operations_cache.insert(topic_name, operation);
943
944        Ok(())
945    }
946
947    /// Remove a topic from the contract
948    pub fn remove_topic(&mut self, topic_name: &str) {
949        if self.topics.remove(topic_name).is_some() {
950            self.schema_cache.remove(topic_name);
951            self.operations_cache.remove(topic_name);
952        }
953    }
954
955    /// Compare two Kafka contracts and detect differences
956    fn diff_contracts(&self, other: &KafkaContract) -> Result<ContractDiffResult, ContractError> {
957        let mut mismatches = Vec::new();
958
959        // Collect all topic names
960        let all_topics: std::collections::HashSet<String> =
961            self.topics.keys().chain(other.topics.keys()).cloned().collect();
962
963        // Check for removed topics (breaking change)
964        for topic_name in &all_topics {
965            if self.topics.contains_key(topic_name) && !other.topics.contains_key(topic_name) {
966                mismatches.push(Mismatch {
967                    mismatch_type: MismatchType::EndpointNotFound,
968                    path: topic_name.clone(),
969                    method: None,
970                    expected: Some(format!("Topic {} should exist", topic_name)),
971                    actual: Some("Topic removed".to_string()),
972                    description: format!("Topic {} was removed", topic_name),
973                    severity: MismatchSeverity::Critical,
974                    confidence: 1.0,
975                    context: HashMap::new(),
976                });
977            }
978        }
979
980        // Check for added topics (non-breaking)
981        for topic_name in &all_topics {
982            if !self.topics.contains_key(topic_name) && other.topics.contains_key(topic_name) {
983                mismatches.push(Mismatch {
984                    mismatch_type: MismatchType::UnexpectedField,
985                    path: topic_name.clone(),
986                    method: None,
987                    expected: None,
988                    actual: Some(format!("New topic {}", topic_name)),
989                    description: format!("New topic {} was added", topic_name),
990                    severity: MismatchSeverity::Low,
991                    confidence: 1.0,
992                    context: HashMap::new(),
993                });
994            }
995        }
996
997        // Compare topic schemas for topics that exist in both
998        for topic_name in all_topics
999            .intersection(&self.topics.keys().cloned().collect::<std::collections::HashSet<_>>())
1000        {
1001            if let (Some(old_topic), Some(new_topic)) =
1002                (self.topics.get(topic_name), other.topics.get(topic_name))
1003            {
1004                // Compare key schema changes
1005                if old_topic.key_schema.is_some() != new_topic.key_schema.is_some() {
1006                    mismatches.push(Mismatch {
1007                        mismatch_type: MismatchType::SchemaMismatch,
1008                        path: format!("{}.key_schema", topic_name),
1009                        method: None,
1010                        expected: Some(if old_topic.key_schema.is_some() {
1011                            "Key schema should exist".to_string()
1012                        } else {
1013                            "Key schema should not exist".to_string()
1014                        }),
1015                        actual: Some(if new_topic.key_schema.is_some() {
1016                            "Key schema added".to_string()
1017                        } else {
1018                            "Key schema removed".to_string()
1019                        }),
1020                        description: format!(
1021                            "Key schema presence changed for topic {}",
1022                            topic_name
1023                        ),
1024                        severity: MismatchSeverity::High,
1025                        confidence: 1.0,
1026                        context: HashMap::new(),
1027                    });
1028                } else if let (Some(old_key), Some(new_key)) =
1029                    (&old_topic.key_schema, &new_topic.key_schema)
1030                {
1031                    if old_key.schema != new_key.schema {
1032                        let key_mismatches = MqttContract::compare_json_schemas(
1033                            &old_key.schema,
1034                            &new_key.schema,
1035                            &format!("{}.key", topic_name),
1036                        );
1037                        mismatches.extend(key_mismatches);
1038                    }
1039                }
1040
1041                // Compare value schema changes
1042                if old_topic.value_schema.schema != new_topic.value_schema.schema {
1043                    let value_mismatches = MqttContract::compare_json_schemas(
1044                        &old_topic.value_schema.schema,
1045                        &new_topic.value_schema.schema,
1046                        &format!("{}.value", topic_name),
1047                    );
1048                    mismatches.extend(value_mismatches);
1049                }
1050
1051                // Check evolution rules compliance
1052                if let Some(ref evolution_rules) = new_topic.evolution_rules {
1053                    // Check if changes violate evolution rules
1054                    let has_breaking_changes = mismatches.iter().any(|m| {
1055                        matches!(m.severity, MismatchSeverity::Critical | MismatchSeverity::High)
1056                    });
1057
1058                    if has_breaking_changes && !evolution_rules.allow_backward_compatible {
1059                        mismatches.push(Mismatch {
1060                            mismatch_type: MismatchType::SchemaMismatch,
1061                            path: format!("{}.evolution_rules", topic_name),
1062                            method: None,
1063                            expected: Some("Backward compatible changes only".to_string()),
1064                            actual: Some("Breaking changes detected".to_string()),
1065                            description: format!(
1066                                "Topic {} has breaking changes but evolution rules require backward compatibility",
1067                                topic_name
1068                            ),
1069                            severity: MismatchSeverity::High,
1070                            confidence: 1.0,
1071                            context: HashMap::new(),
1072                        });
1073                    }
1074                }
1075            }
1076        }
1077
1078        let matches = mismatches.is_empty();
1079        let confidence = if matches { 1.0 } else { 0.8 };
1080
1081        Ok(ContractDiffResult {
1082            matches,
1083            confidence,
1084            mismatches,
1085            recommendations: Vec::new(),
1086            corrections: Vec::new(),
1087            metadata: mockforge_foundation::contract_diff_types::DiffMetadata {
1088                analyzed_at: chrono::Utc::now(),
1089                request_source: "kafka_contract_diff".to_string(),
1090                contract_version: Some(self.version.clone()),
1091                contract_format: "kafka_schema".to_string(),
1092                endpoint_path: "".to_string(),
1093                http_method: "".to_string(),
1094                request_count: 1,
1095                llm_provider: None,
1096                llm_model: None,
1097            },
1098        })
1099    }
1100
1101    /// Validate a message against a topic schema
1102    fn validate_message_against_schema(
1103        &self,
1104        topic_name: &str,
1105        key: Option<&Value>,
1106        value: &Value,
1107    ) -> Result<ValidationResult, ContractError> {
1108        let (key_schema_opt, value_schema) = self
1109            .schema_cache
1110            .get(topic_name)
1111            .ok_or_else(|| ContractError::OperationNotFound(topic_name.to_string()))?;
1112
1113        let mut validation_errors = Vec::new();
1114
1115        // Validate key if present and schema exists
1116        if let (Some(key_value), Some(key_schema)) = (key, key_schema_opt) {
1117            for error in key_schema.iter_errors(key_value) {
1118                validation_errors.push(ValidationError {
1119                    message: format!("Key validation error: {}", error),
1120                    path: Some(format!("{}.key{}", topic_name, error.instance_path)),
1121                    code: Some("KEY_SCHEMA_VALIDATION_ERROR".to_string()),
1122                });
1123            }
1124        }
1125
1126        // Validate value
1127        for error in value_schema.iter_errors(value) {
1128            validation_errors.push(ValidationError {
1129                message: format!("Value validation error: {}", error),
1130                path: Some(format!("{}.value{}", topic_name, error.instance_path)),
1131                code: Some("VALUE_SCHEMA_VALIDATION_ERROR".to_string()),
1132            });
1133        }
1134
1135        Ok(ValidationResult {
1136            valid: validation_errors.is_empty(),
1137            errors: validation_errors,
1138            warnings: Vec::new(),
1139        })
1140    }
1141}
1142
1143#[async_trait::async_trait]
1144impl ProtocolContract for KafkaContract {
1145    fn protocol(&self) -> Protocol {
1146        Protocol::Kafka
1147    }
1148
1149    fn contract_id(&self) -> &str {
1150        &self.contract_id
1151    }
1152
1153    fn version(&self) -> &str {
1154        &self.version
1155    }
1156
1157    fn operations(&self) -> Vec<ContractOperation> {
1158        self.operations_cache.values().cloned().collect()
1159    }
1160
1161    fn get_operation(&self, operation_id: &str) -> Option<&ContractOperation> {
1162        self.operations_cache.get(operation_id)
1163    }
1164
1165    async fn diff(
1166        &self,
1167        other: &dyn ProtocolContract,
1168    ) -> Result<ContractDiffResult, ContractError> {
1169        if other.protocol() != Protocol::Kafka {
1170            return Err(ContractError::UnsupportedProtocol(other.protocol()));
1171        }
1172
1173        Err(ContractError::Other(
1174            "Direct comparison of KafkaContract instances requires type information. \
1175             Use KafkaContract::diff_contracts() for comparing two KafkaContract instances."
1176                .to_string(),
1177        ))
1178    }
1179
1180    async fn validate(
1181        &self,
1182        operation_id: &str,
1183        request: &ContractRequest,
1184    ) -> Result<ValidationResult, ContractError> {
1185        // Parse the message payload
1186        // For Kafka, the payload might contain both key and value
1187        // For simplicity, we'll assume the payload is the value and key is in metadata
1188        let value: Value = serde_json::from_slice(&request.payload)
1189            .map_err(|e| ContractError::SchemaValidation(format!("Invalid JSON: {}", e)))?;
1190
1191        let key = request.metadata.get("key").and_then(|k| serde_json::from_str::<Value>(k).ok());
1192
1193        // Validate against the topic schema
1194        self.validate_message_against_schema(operation_id, key.as_ref(), &value)
1195    }
1196
1197    fn get_schema(&self, operation_id: &str) -> Option<Value> {
1198        self.topics.get(operation_id).map(|topic| {
1199            serde_json::json!({
1200                "key": topic.key_schema.as_ref().map(|s| s.schema.clone()),
1201                "value": topic.value_schema.schema.clone(),
1202            })
1203        })
1204    }
1205
1206    fn to_json(&self) -> Result<Value, ContractError> {
1207        let topics: Vec<Value> = self
1208            .topics
1209            .values()
1210            .map(|topic| {
1211                serde_json::json!({
1212                    "topic": topic.topic,
1213                    "key_schema": topic.key_schema.as_ref().map(|s| {
1214                        serde_json::json!({
1215                            "format": s.format,
1216                            "schema": s.schema,
1217                            "schema_id": s.schema_id,
1218                            "version": s.version,
1219                        })
1220                    }),
1221                    "value_schema": {
1222                        "format": topic.value_schema.format,
1223                        "schema": topic.value_schema.schema,
1224                        "schema_id": topic.value_schema.schema_id,
1225                        "version": topic.value_schema.version,
1226                    },
1227                    "partitions": topic.partitions,
1228                    "replication_factor": topic.replication_factor,
1229                    "description": topic.description,
1230                    "evolution_rules": topic.evolution_rules,
1231                })
1232            })
1233            .collect();
1234
1235        Ok(serde_json::json!({
1236            "contract_id": self.contract_id,
1237            "version": self.version,
1238            "protocol": "kafka",
1239            "topics": topics,
1240            "metadata": self.metadata,
1241        }))
1242    }
1243}
1244
1245/// Helper function to compare two KafkaContract instances
1246pub fn diff_kafka_contracts(
1247    old_contract: &KafkaContract,
1248    new_contract: &KafkaContract,
1249) -> Result<ContractDiffResult, ContractError> {
1250    old_contract.diff_contracts(new_contract)
1251}
1252
1253#[cfg(test)]
1254mod tests {
1255    use super::*;
1256
1257    #[test]
1258    fn test_mqtt_contract_creation() {
1259        let contract = MqttContract::new("test-contract".to_string(), "1.0.0".to_string());
1260        assert_eq!(contract.contract_id(), "test-contract");
1261        assert_eq!(contract.version(), "1.0.0");
1262    }
1263
1264    #[test]
1265    fn test_kafka_contract_creation() {
1266        let contract = KafkaContract::new("test-contract".to_string(), "1.0.0".to_string());
1267        assert_eq!(contract.contract_id(), "test-contract");
1268        assert_eq!(contract.version(), "1.0.0");
1269    }
1270}