1use crate::error::{SchemaError, SchemaResult};
40use crate::types::{
41 SchemaType, ValidationLevel, ValidationReport, ValidationResult, ValidationRule,
42 ValidationRuleType,
43};
44use serde::Deserialize;
45use std::collections::HashMap;
46use tracing::{debug, warn};
47
/// Settings that control how a validation engine runs its rules.
#[derive(Clone, Debug)]
pub struct ValidationEngineConfig {
    /// Stop validating as soon as a rule fails at the `Error` level.
    pub fail_fast: bool,
    /// Whether warnings should be treated as errors.
    pub warnings_as_errors: bool,
    /// Maximum number of applicable rules evaluated for a single schema.
    pub max_rules_per_schema: usize,
}
58
59impl Default for ValidationEngineConfig {
60 fn default() -> Self {
61 Self {
62 fail_fast: false,
63 warnings_as_errors: false,
64 max_rules_per_schema: 100,
65 }
66 }
67}
68
69impl ValidationEngineConfig {
70 pub fn new() -> Self {
71 Self::default()
72 }
73
74 pub fn with_fail_fast(mut self, fail_fast: bool) -> Self {
75 self.fail_fast = fail_fast;
76 self
77 }
78
79 pub fn with_warnings_as_errors(mut self, warnings_as_errors: bool) -> Self {
80 self.warnings_as_errors = warnings_as_errors;
81 self
82 }
83}
84
85pub struct ValidationEngine {
87 config: ValidationEngineConfig,
89 global_rules: Vec<ValidationRule>,
91 subject_rules: HashMap<String, Vec<ValidationRule>>,
93}
94
95impl ValidationEngine {
96 pub fn new(config: ValidationEngineConfig) -> Self {
98 Self {
99 config,
100 global_rules: Vec::new(),
101 subject_rules: HashMap::new(),
102 }
103 }
104
105 pub fn add_rule(&mut self, rule: ValidationRule) {
107 self.global_rules.push(rule);
108 }
109
110 pub fn add_rules(&mut self, rules: impl IntoIterator<Item = ValidationRule>) {
112 self.global_rules.extend(rules);
113 }
114
115 pub fn add_subject_rule(&mut self, subject: &str, rule: ValidationRule) {
117 self.subject_rules
118 .entry(subject.to_string())
119 .or_default()
120 .push(rule);
121 }
122
123 pub fn remove_rule(&mut self, name: &str) -> bool {
125 let before = self.global_rules.len();
126 self.global_rules.retain(|r| r.name != name);
127
128 for rules in self.subject_rules.values_mut() {
129 rules.retain(|r| r.name != name);
130 }
131
132 self.global_rules.len() != before
133 }
134
135 pub fn rules(&self) -> &[ValidationRule] {
137 &self.global_rules
138 }
139
140 pub fn list_rules(&self) -> Vec<ValidationRule> {
142 self.global_rules.clone()
143 }
144
145 pub fn subject_rules(&self, subject: &str) -> Option<&[ValidationRule]> {
147 self.subject_rules.get(subject).map(|v| v.as_slice())
148 }
149
150 pub fn clear(&mut self) {
152 self.global_rules.clear();
153 self.subject_rules.clear();
154 }
155
156 pub fn validate(
158 &self,
159 schema_type: SchemaType,
160 subject: &str,
161 schema: &str,
162 ) -> SchemaResult<ValidationReport> {
163 let mut report = ValidationReport::new();
164 let mut rules_evaluated = 0;
165
166 let applicable_rules: Vec<&ValidationRule> = self
168 .global_rules
169 .iter()
170 .chain(self.subject_rules.get(subject).into_iter().flatten())
171 .filter(|r| r.applies(schema_type, subject))
172 .take(self.config.max_rules_per_schema)
173 .collect();
174
175 debug!(
176 "Validating schema for subject {} with {} applicable rules",
177 subject,
178 applicable_rules.len()
179 );
180
181 for rule in applicable_rules {
182 let result = self.execute_rule(rule, schema_type, schema)?;
183
184 if self.config.fail_fast && !result.passed && result.level == ValidationLevel::Error {
186 report.add_result(result);
187 return Ok(report);
188 }
189
190 report.add_result(result);
191 rules_evaluated += 1;
192 }
193
194 debug!(
195 "Validation complete: {} rules evaluated, {} errors, {} warnings",
196 rules_evaluated, report.summary.errors, report.summary.warnings
197 );
198
199 Ok(report)
200 }
201
202 fn execute_rule(
204 &self,
205 rule: &ValidationRule,
206 schema_type: SchemaType,
207 schema: &str,
208 ) -> SchemaResult<ValidationResult> {
209 match rule.rule_type {
210 ValidationRuleType::MaxSize => self.validate_max_size(rule, schema),
211 ValidationRuleType::NamingConvention => {
212 self.validate_naming_convention(rule, schema_type, schema)
213 }
214 ValidationRuleType::FieldRequired => {
215 self.validate_field_required(rule, schema_type, schema)
216 }
217 ValidationRuleType::FieldType => self.validate_field_type(rule, schema_type, schema),
218 ValidationRuleType::Regex => self.validate_regex(rule, schema),
219 ValidationRuleType::JsonSchema => self.validate_json_schema(rule, schema),
220 ValidationRuleType::Cel => {
221 warn!("CEL validation not yet implemented for rule: {}", rule.name);
223 Ok(ValidationResult::pass(&rule.name))
224 }
225 }
226 }
227
228 fn validate_max_size(
230 &self,
231 rule: &ValidationRule,
232 schema: &str,
233 ) -> SchemaResult<ValidationResult> {
234 #[derive(Deserialize)]
235 struct Config {
236 max_bytes: usize,
237 }
238
239 let config: Config = serde_json::from_str(&rule.config)
240 .map_err(|e| SchemaError::Validation(format!("Invalid max_size config: {}", e)))?;
241
242 let size = schema.len();
243 if size > config.max_bytes {
244 Ok(ValidationResult::fail(
245 &rule.name,
246 rule.level,
247 format!(
248 "Schema size {} bytes exceeds maximum {} bytes",
249 size, config.max_bytes
250 ),
251 ))
252 } else {
253 Ok(ValidationResult::pass(&rule.name))
254 }
255 }
256
257 fn validate_naming_convention(
259 &self,
260 rule: &ValidationRule,
261 schema_type: SchemaType,
262 schema: &str,
263 ) -> SchemaResult<ValidationResult> {
264 #[derive(Deserialize)]
265 struct Config {
266 pattern: String,
267 #[serde(default = "default_name_field")]
268 field: String,
269 }
270
271 fn default_name_field() -> String {
272 "name".to_string()
273 }
274
275 let config: Config = serde_json::from_str(&rule.config).map_err(|e| {
276 SchemaError::Validation(format!("Invalid naming_convention config: {}", e))
277 })?;
278
279 let regex = regex::Regex::new(&config.pattern)
280 .map_err(|e| SchemaError::Validation(format!("Invalid regex pattern: {}", e)))?;
281
282 let name = match schema_type {
284 SchemaType::Avro | SchemaType::Json => {
285 let parsed: serde_json::Value = serde_json::from_str(schema)
286 .map_err(|e| SchemaError::Validation(format!("Invalid JSON schema: {}", e)))?;
287 parsed
288 .get(&config.field)
289 .and_then(|v| v.as_str())
290 .map(|s| s.to_string())
291 }
292 SchemaType::Protobuf => {
293 extract_protobuf_name(schema)
295 }
296 };
297
298 match name {
299 Some(n) if regex.is_match(&n) => Ok(ValidationResult::pass(&rule.name)),
300 Some(n) => Ok(ValidationResult::fail(
301 &rule.name,
302 rule.level,
303 format!("Name '{}' does not match pattern '{}'", n, config.pattern),
304 )),
305 None => Ok(ValidationResult::fail(
306 &rule.name,
307 rule.level,
308 format!("Could not extract '{}' field from schema", config.field),
309 )),
310 }
311 }
312
313 fn validate_field_required(
315 &self,
316 rule: &ValidationRule,
317 schema_type: SchemaType,
318 schema: &str,
319 ) -> SchemaResult<ValidationResult> {
320 #[derive(Deserialize)]
321 struct Config {
322 field: String,
323 }
324
325 let config: Config = serde_json::from_str(&rule.config).map_err(|e| {
326 SchemaError::Validation(format!("Invalid field_required config: {}", e))
327 })?;
328
329 match schema_type {
330 SchemaType::Avro | SchemaType::Json => {
331 let parsed: serde_json::Value = serde_json::from_str(schema)
332 .map_err(|e| SchemaError::Validation(format!("Invalid JSON schema: {}", e)))?;
333
334 if has_field_recursive(&parsed, &config.field) {
335 Ok(ValidationResult::pass(&rule.name))
336 } else {
337 Ok(ValidationResult::fail(
338 &rule.name,
339 rule.level,
340 format!("Required field '{}' not found in schema", config.field),
341 ))
342 }
343 }
344 SchemaType::Protobuf => {
345 if schema.contains(&format!("{} ", config.field))
347 || schema.contains(&format!("{};", config.field))
348 {
349 Ok(ValidationResult::pass(&rule.name))
350 } else {
351 Ok(ValidationResult::fail(
352 &rule.name,
353 rule.level,
354 format!("Required field '{}' not found in schema", config.field),
355 ))
356 }
357 }
358 }
359 }
360
361 fn validate_field_type(
363 &self,
364 rule: &ValidationRule,
365 schema_type: SchemaType,
366 schema: &str,
367 ) -> SchemaResult<ValidationResult> {
368 #[derive(Deserialize)]
369 struct Config {
370 field: String,
371 #[serde(rename = "type")]
372 expected_type: String,
373 }
374
375 let config: Config = serde_json::from_str(&rule.config)
376 .map_err(|e| SchemaError::Validation(format!("Invalid field_type config: {}", e)))?;
377
378 match schema_type {
379 SchemaType::Avro => {
380 let parsed: serde_json::Value = serde_json::from_str(schema)
381 .map_err(|e| SchemaError::Validation(format!("Invalid Avro schema: {}", e)))?;
382
383 if let Some(field_type) = find_avro_field_type(&parsed, &config.field) {
384 if field_type == config.expected_type {
385 Ok(ValidationResult::pass(&rule.name))
386 } else {
387 Ok(ValidationResult::fail(
388 &rule.name,
389 rule.level,
390 format!(
391 "Field '{}' has type '{}', expected '{}'",
392 config.field, field_type, config.expected_type
393 ),
394 ))
395 }
396 } else {
397 Ok(ValidationResult::fail(
398 &rule.name,
399 rule.level,
400 format!("Field '{}' not found in schema", config.field),
401 ))
402 }
403 }
404 _ => {
405 Ok(ValidationResult::pass(&rule.name))
407 }
408 }
409 }
410
411 fn validate_regex(
413 &self,
414 rule: &ValidationRule,
415 schema: &str,
416 ) -> SchemaResult<ValidationResult> {
417 #[derive(Deserialize)]
418 struct Config {
419 pattern: String,
420 #[serde(default)]
421 must_match: bool,
422 }
423
424 let config: Config = serde_json::from_str(&rule.config)
425 .map_err(|e| SchemaError::Validation(format!("Invalid regex config: {}", e)))?;
426
427 let regex = regex::Regex::new(&config.pattern)
428 .map_err(|e| SchemaError::Validation(format!("Invalid regex pattern: {}", e)))?;
429
430 let matches = regex.is_match(schema);
431 let expected = config.must_match;
432
433 if matches == expected {
434 Ok(ValidationResult::pass(&rule.name))
435 } else if expected {
436 Ok(ValidationResult::fail(
437 &rule.name,
438 rule.level,
439 format!(
440 "Schema does not match required pattern '{}'",
441 config.pattern
442 ),
443 ))
444 } else {
445 Ok(ValidationResult::fail(
446 &rule.name,
447 rule.level,
448 format!("Schema matches forbidden pattern '{}'", config.pattern),
449 ))
450 }
451 }
452
453 fn validate_json_schema(
455 &self,
456 rule: &ValidationRule,
457 schema: &str,
458 ) -> SchemaResult<ValidationResult> {
459 #[cfg(feature = "json-schema")]
460 {
461 #[derive(Deserialize)]
462 struct Config {
463 schema: serde_json::Value,
464 }
465
466 let config: Config = serde_json::from_str(&rule.config).map_err(|e| {
467 SchemaError::Validation(format!("Invalid json_schema config: {}", e))
468 })?;
469
470 let instance: serde_json::Value = serde_json::from_str(schema)
472 .map_err(|e| SchemaError::Validation(format!("Invalid JSON in schema: {}", e)))?;
473
474 let validator = jsonschema::JSONSchema::compile(&config.schema).map_err(|e| {
476 SchemaError::Validation(format!("Invalid JSON Schema validator: {}", e))
477 })?;
478
479 if validator.is_valid(&instance) {
480 Ok(ValidationResult::pass(&rule.name))
481 } else {
482 let errors: Vec<String> = validator
483 .validate(&instance)
484 .err()
485 .into_iter()
486 .flatten()
487 .map(|e| e.to_string())
488 .take(3)
489 .collect();
490
491 Ok(ValidationResult::fail(
492 &rule.name,
493 rule.level,
494 format!("JSON Schema validation failed: {}", errors.join("; ")),
495 ))
496 }
497 }
498
499 #[cfg(not(feature = "json-schema"))]
500 {
501 let _ = (rule, schema); warn!("JSON Schema validation requires the 'json-schema' feature");
503 Ok(ValidationResult::pass(&rule.name))
504 }
505 }
506}
507
/// Returns the name of the first `message` declared in a protobuf schema.
///
/// The schema is scanned line by line; a line counts as a declaration when,
/// after trimming, it starts with the keyword `message` followed by
/// whitespace. The name is the run of ASCII letters, digits and underscores
/// that follows, so `message Foo{` yields `Foo` rather than `Foo{`. Lines
/// with no identifier after the keyword (e.g. `message {`) are skipped.
/// Returns `None` when no declaration is found.
fn extract_protobuf_name(schema: &str) -> Option<String> {
    schema.lines().find_map(|line| {
        let rest = line.trim().strip_prefix("message")?;

        // Require whitespace after the keyword so that identifiers such as
        // `messages_total` are not mistaken for a declaration.
        if !rest.starts_with(char::is_whitespace) {
            return None;
        }

        let rest = rest.trim_start();
        let end = rest
            .find(|c: char| !(c.is_ascii_alphanumeric() || c == '_'))
            .unwrap_or(rest.len());
        let name = &rest[..end];

        if name.is_empty() {
            None
        } else {
            Some(name.to_string())
        }
    })
}
522
523fn has_field_recursive(value: &serde_json::Value, field: &str) -> bool {
525 match value {
526 serde_json::Value::Object(map) => {
527 if map.contains_key(field) {
528 return true;
529 }
530 for v in map.values() {
531 if has_field_recursive(v, field) {
532 return true;
533 }
534 }
535 false
536 }
537 serde_json::Value::Array(arr) => arr.iter().any(|v| has_field_recursive(v, field)),
538 _ => false,
539 }
540}
541
542fn find_avro_field_type(schema: &serde_json::Value, field_name: &str) -> Option<String> {
544 if let Some(fields) = schema.get("fields").and_then(|f| f.as_array()) {
545 for field in fields {
546 if field.get("name").and_then(|n| n.as_str()) == Some(field_name) {
547 return field.get("type").map(|t| match t {
548 serde_json::Value::String(s) => s.clone(),
549 serde_json::Value::Object(o) => o
550 .get("type")
551 .and_then(|t| t.as_str())
552 .unwrap_or("complex")
553 .to_string(),
554 serde_json::Value::Array(_) => "union".to_string(),
555 _ => "unknown".to_string(),
556 });
557 }
558 }
559 }
560 None
561}
562
563pub mod presets {
565 use super::*;
566
567 pub fn max_size(max_bytes: usize) -> ValidationRule {
569 ValidationRule::new(
570 "max-schema-size",
571 ValidationRuleType::MaxSize,
572 format!(r#"{{"max_bytes": {}}}"#, max_bytes),
573 )
574 .with_description(format!("Schema must be smaller than {} bytes", max_bytes))
575 }
576
577 pub fn require_doc() -> ValidationRule {
579 ValidationRule::new(
580 "require-doc",
581 ValidationRuleType::FieldRequired,
582 r#"{"field": "doc"}"#,
583 )
584 .with_description("Schema must have a 'doc' field for documentation")
585 .with_schema_types(vec![SchemaType::Avro])
586 }
587
588 pub fn require_namespace() -> ValidationRule {
590 ValidationRule::new(
591 "require-namespace",
592 ValidationRuleType::FieldRequired,
593 r#"{"field": "namespace"}"#,
594 )
595 .with_description("Avro schema must have a namespace")
596 .with_schema_types(vec![SchemaType::Avro])
597 }
598
599 pub fn pascal_case_name() -> ValidationRule {
601 ValidationRule::new(
602 "pascal-case-name",
603 ValidationRuleType::NamingConvention,
604 r#"{"pattern": "^[A-Z][a-zA-Z0-9]*$", "field": "name"}"#,
605 )
606 .with_description("Schema name must be PascalCase")
607 .with_level(ValidationLevel::Warning)
608 }
609
610 pub fn forbid_pattern(name: &str, pattern: &str, description: &str) -> ValidationRule {
612 ValidationRule::new(
613 name,
614 ValidationRuleType::Regex,
615 format!(r#"{{"pattern": "{}", "must_match": false}}"#, pattern),
616 )
617 .with_description(description)
618 }
619
620 pub fn production_ruleset() -> Vec<ValidationRule> {
622 vec![
623 max_size(100 * 1024), require_doc(), require_namespace(), pascal_case_name(), ]
628 }
629}
630
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created engine has no global rules.
    #[test]
    fn test_validation_engine_creation() {
        let engine = ValidationEngine::new(ValidationEngineConfig::default());
        assert!(engine.rules().is_empty());
    }

    /// Adding and removing a global rule, including the reported return values.
    #[test]
    fn test_add_and_remove_rule() {
        let mut engine = ValidationEngine::new(ValidationEngineConfig::default());

        engine.add_rule(presets::max_size(1024));
        assert_eq!(engine.rules().len(), 1);

        // Removing an unknown rule reports that nothing changed.
        assert!(!engine.remove_rule("no-such-rule"));
        assert_eq!(engine.rules().len(), 1);

        assert!(engine.remove_rule("max-schema-size"));
        assert!(engine.rules().is_empty());
    }

    #[test]
    fn test_max_size_validation() {
        let mut engine = ValidationEngine::new(ValidationEngineConfig::default());
        engine.add_rule(presets::max_size(100));

        // Well below the 100-byte limit.
        let small_schema = r#"{"type":"string"}"#;
        let report = engine
            .validate(SchemaType::Avro, "test", small_schema)
            .unwrap();
        assert!(report.is_valid());

        // Twice the limit.
        let large_schema = "x".repeat(200);
        let report = engine
            .validate(SchemaType::Avro, "test", &large_schema)
            .unwrap();
        assert!(!report.is_valid());
    }

    #[test]
    fn test_field_required_validation() {
        let mut engine = ValidationEngine::new(ValidationEngineConfig::default());
        engine.add_rule(presets::require_doc());

        let with_doc = r#"{"type":"record","name":"User","doc":"A user","fields":[]}"#;
        let report = engine.validate(SchemaType::Avro, "test", with_doc).unwrap();
        assert!(report.is_valid());

        let without_doc = r#"{"type":"record","name":"User","fields":[]}"#;
        let report = engine
            .validate(SchemaType::Avro, "test", without_doc)
            .unwrap();
        assert!(!report.is_valid());
    }

    #[test]
    fn test_naming_convention_validation() {
        let mut engine = ValidationEngine::new(ValidationEngineConfig::default());
        engine.add_rule(
            ValidationRule::new(
                "pascal-case",
                ValidationRuleType::NamingConvention,
                r#"{"pattern": "^[A-Z][a-zA-Z0-9]*$"}"#,
            )
            .with_level(ValidationLevel::Error),
        );

        let pascal = r#"{"name":"UserEvent"}"#;
        let report = engine.validate(SchemaType::Avro, "test", pascal).unwrap();
        assert!(report.is_valid());

        let camel = r#"{"name":"userEvent"}"#;
        let report = engine.validate(SchemaType::Avro, "test", camel).unwrap();
        assert!(!report.is_valid());
    }

    #[test]
    fn test_regex_validation() {
        let mut engine = ValidationEngine::new(ValidationEngineConfig::default());
        engine.add_rule(presets::forbid_pattern(
            "no-ssn",
            r"ssn|social.?security",
            "Schema must not contain SSN fields",
        ));

        let clean = r#"{"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}"#;
        let report = engine.validate(SchemaType::Avro, "test", clean).unwrap();
        assert!(report.is_valid());

        let with_ssn =
            r#"{"type":"record","name":"User","fields":[{"name":"ssn","type":"string"}]}"#;
        let report = engine.validate(SchemaType::Avro, "test", with_ssn).unwrap();
        assert!(!report.is_valid());
    }

    #[test]
    fn test_field_type_validation() {
        let mut engine = ValidationEngine::new(ValidationEngineConfig::default());
        engine.add_rule(ValidationRule::new(
            "id-must-be-long",
            ValidationRuleType::FieldType,
            r#"{"field": "id", "type": "long"}"#,
        ));

        let correct = r#"{"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}"#;
        let report = engine.validate(SchemaType::Avro, "test", correct).unwrap();
        assert!(report.is_valid());

        let wrong = r#"{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}"#;
        let report = engine.validate(SchemaType::Avro, "test", wrong).unwrap();
        assert!(!report.is_valid());
    }

    /// A subject rule is registered for, and enforced on, its own subject only.
    #[test]
    fn test_subject_specific_rules() {
        let mut engine = ValidationEngine::new(ValidationEngineConfig::default());

        engine.add_subject_rule(
            "users-value",
            ValidationRule::new(
                "users-rule",
                ValidationRuleType::MaxSize,
                r#"{"max_bytes": 50}"#,
            ),
        );

        assert_eq!(engine.subject_rules("users-value").map(|r| r.len()), Some(1));
        assert!(engine.subject_rules("orders-value").is_none());
        assert!(engine.rules().is_empty());

        // Small enough for the 50-byte limit: valid for both subjects.
        let schema = r#"{"type":"string"}"#;

        let report = engine
            .validate(SchemaType::Avro, "orders-value", schema)
            .unwrap();
        assert!(report.is_valid());

        let report = engine
            .validate(SchemaType::Avro, "users-value", schema)
            .unwrap();
        assert!(report.is_valid());

        // Over the limit: only the subject that owns the rule is rejected.
        let oversized = "x".repeat(60);

        let report = engine
            .validate(SchemaType::Avro, "orders-value", &oversized)
            .unwrap();
        assert!(report.is_valid());

        let report = engine
            .validate(SchemaType::Avro, "users-value", &oversized)
            .unwrap();
        assert!(!report.is_valid());
    }

    /// With fail-fast enabled, evaluation stops after the first failing error.
    #[test]
    fn test_fail_fast() {
        let config = ValidationEngineConfig::default().with_fail_fast(true);
        let mut engine = ValidationEngine::new(config);

        engine.add_rule(ValidationRule::new(
            "rule1",
            ValidationRuleType::MaxSize,
            r#"{"max_bytes": 1}"#,
        ));
        engine.add_rule(ValidationRule::new(
            "rule2",
            ValidationRuleType::MaxSize,
            r#"{"max_bytes": 2}"#,
        ));

        // Three bytes violates both rules, but only the first is recorded.
        let schema = "xxx";
        let report = engine.validate(SchemaType::Avro, "test", schema).unwrap();

        assert_eq!(report.results.len(), 1);
    }

    #[test]
    fn test_production_ruleset() {
        let mut engine = ValidationEngine::new(ValidationEngineConfig::default());
        engine.add_rules(presets::production_ruleset());

        let schema = r#"{
            "type": "record",
            "name": "UserCreated",
            "namespace": "com.example.events",
            "doc": "Event emitted when a new user is created",
            "fields": [
                {"name": "userId", "type": "long", "doc": "Unique user ID"}
            ]
        }"#;

        let report = engine
            .validate(SchemaType::Avro, "users-value", schema)
            .unwrap();
        assert!(report.is_valid(), "Errors: {:?}", report.error_messages());
    }

    #[test]
    fn test_protobuf_name_extraction() {
        let proto = r#"
            syntax = "proto3";
            message UserEvent {
                int64 id = 1;
            }
        "#;

        let name = extract_protobuf_name(proto);
        assert_eq!(name, Some("UserEvent".to_string()));
    }
}