Skip to main content

oxirs_stream/
schema_validator.rs

1//! Stream message schema validation.
2//!
3//! Provides schema-driven validation of stream messages, including field type
4//! checking, required field enforcement, format validation (email, URI, date-time,
5//! UUID), strict unknown-field detection, and schema composition stubs (allOf/anyOf).
6//!
7//! Schemas are versioned and stored in a [`SchemaRegistry`].
8
9use std::collections::HashMap;
10
11// ─────────────────────────────────────────────────────────────────────────────
12// Field type
13// ─────────────────────────────────────────────────────────────────────────────
14
/// Kind of value a schema field is declared to hold.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FieldType {
    /// Text value.
    String,
    /// Whole-number value.
    Integer,
    /// Floating-point value.
    Float,
    /// `true` / `false` value.
    Boolean,
    /// Ordered list of values.
    Array,
    /// Nested map of values.
    Object,
}

impl FieldType {
    /// Lower-case label for this type, as embedded in validation error messages.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::String => "string",
            Self::Integer => "integer",
            Self::Float => "float",
            Self::Boolean => "boolean",
            Self::Array => "array",
            Self::Object => "object",
        }
    }
}
39
40// ─────────────────────────────────────────────────────────────────────────────
41// Format
42// ─────────────────────────────────────────────────────────────────────────────
43
/// String formats a field may additionally be required to match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FieldFormat {
    /// E-mail address, e.g. `user@example.com`.
    Email,
    /// URI, e.g. `https://example.com/path`.
    Uri,
    /// ISO 8601 / RFC 3339 timestamp, e.g. `2024-01-15T10:00:00Z`.
    DateTime,
    /// Hyphenated UUID of the shape `xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`.
    Uuid,
}
56
57impl FieldFormat {
58    /// Validate a string value against this format pattern.
59    pub fn validate(&self, value: &str) -> bool {
60        match self {
61            FieldFormat::Email => validate_email(value),
62            FieldFormat::Uri => validate_uri(value),
63            FieldFormat::DateTime => validate_datetime(value),
64            FieldFormat::Uuid => validate_uuid(value),
65        }
66    }
67
68    /// Short name used in error messages.
69    pub fn name(&self) -> &'static str {
70        match self {
71            FieldFormat::Email => "email",
72            FieldFormat::Uri => "uri",
73            FieldFormat::DateTime => "date-time",
74            FieldFormat::Uuid => "uuid",
75        }
76    }
77}
78
79// ─────────────────────────────────────────────────────────────────────────────
80// Field definition
81// ─────────────────────────────────────────────────────────────────────────────
82
83/// Definition of a single field within a schema.
84#[derive(Debug, Clone)]
85pub struct FieldDefinition {
86    /// Field name / key.
87    pub name: String,
88    /// Expected JSON-style type.
89    pub field_type: FieldType,
90    /// Whether the field must be present.
91    pub required: bool,
92    /// Optional string format constraint (applied when `field_type == String`).
93    pub format: Option<FieldFormat>,
94}
95
96impl FieldDefinition {
97    /// Create a new field definition.
98    pub fn new(
99        name: impl Into<String>,
100        field_type: FieldType,
101        required: bool,
102        format: Option<FieldFormat>,
103    ) -> Self {
104        Self {
105            name: name.into(),
106            field_type,
107            required,
108            format,
109        }
110    }
111}
112
113// ─────────────────────────────────────────────────────────────────────────────
114// Schema composition stubs
115// ─────────────────────────────────────────────────────────────────────────────
116
/// How a schema combines other schemas, which are referenced by name.
#[derive(Debug, Clone)]
pub enum SchemaComposition {
    /// Every named sub-schema must be satisfied.
    AllOf(Vec<String>),
    /// One or more of the named sub-schemas must be satisfied.
    AnyOf(Vec<String>),
}
125
126// ─────────────────────────────────────────────────────────────────────────────
127// Schema
128// ─────────────────────────────────────────────────────────────────────────────
129
130/// A versioned schema describing the expected structure of a stream message.
131#[derive(Debug, Clone)]
132pub struct Schema {
133    /// Schema name.
134    pub name: String,
135    /// Schema version (e.g. `"1.0"`).
136    pub version: String,
137    /// Field definitions.
138    pub fields: Vec<FieldDefinition>,
139    /// When `true`, fields not declared in `fields` cause validation errors.
140    pub strict_mode: bool,
141    /// Optional schema composition (allOf / anyOf stub).
142    pub composition: Option<SchemaComposition>,
143}
144
145impl Schema {
146    /// Build a new schema.
147    pub fn new(name: impl Into<String>, version: impl Into<String>) -> Self {
148        Self {
149            name: name.into(),
150            version: version.into(),
151            fields: Vec::new(),
152            strict_mode: false,
153            composition: None,
154        }
155    }
156
157    /// Add a field definition.
158    pub fn with_field(mut self, field: FieldDefinition) -> Self {
159        self.fields.push(field);
160        self
161    }
162
163    /// Enable strict mode (unknown fields are rejected).
164    pub fn strict(mut self) -> Self {
165        self.strict_mode = true;
166        self
167    }
168
169    /// Attach a composition rule.
170    pub fn with_composition(mut self, composition: SchemaComposition) -> Self {
171        self.composition = Some(composition);
172        self
173    }
174}
175
176// ─────────────────────────────────────────────────────────────────────────────
177// Typed message value (simplified JSON-like)
178// ─────────────────────────────────────────────────────────────────────────────
179
/// One value carried by a stream message, tagged with its type.
#[derive(Debug, Clone, PartialEq)]
pub enum MessageValue {
    /// Owned text.
    String(String),
    /// Signed 64-bit integer.
    Integer(i64),
    /// 64-bit floating-point number.
    Float(f64),
    /// Boolean flag.
    Boolean(bool),
    /// Ordered list of nested values.
    Array(Vec<MessageValue>),
    /// Nested values keyed by name.
    Object(HashMap<String, MessageValue>),
}
190
191impl MessageValue {
192    /// Return the `FieldType` that matches this value variant.
193    pub fn field_type(&self) -> FieldType {
194        match self {
195            MessageValue::String(_) => FieldType::String,
196            MessageValue::Integer(_) => FieldType::Integer,
197            MessageValue::Float(_) => FieldType::Float,
198            MessageValue::Boolean(_) => FieldType::Boolean,
199            MessageValue::Array(_) => FieldType::Array,
200            MessageValue::Object(_) => FieldType::Object,
201        }
202    }
203
204    /// Return the string value if this is a `String` variant.
205    pub fn as_str(&self) -> Option<&str> {
206        if let MessageValue::String(s) = self {
207            Some(s.as_str())
208        } else {
209            None
210        }
211    }
212}
213
214/// A stream message represented as a flat map of field names to values.
215pub type StreamMessage = HashMap<String, MessageValue>;
216
217// ─────────────────────────────────────────────────────────────────────────────
218// Validation result
219// ─────────────────────────────────────────────────────────────────────────────
220
/// One validation failure, attributed to a specific field.
#[derive(Debug, Clone, PartialEq)]
pub struct FieldError {
    /// Name of the offending field.
    pub field: String,
    /// Human-readable explanation of what went wrong.
    pub message: String,
}

impl FieldError {
    /// Builds an error from anything convertible into owned strings.
    fn new(field: impl Into<String>, message: impl Into<String>) -> Self {
        let (field, message) = (field.into(), message.into());
        FieldError { field, message }
    }
}
238
239/// Result of validating a single message against a schema.
240#[derive(Debug, Clone)]
241pub struct ValidationResult {
242    /// `true` iff the message passed all checks.
243    pub is_valid: bool,
244    /// Per-field errors (empty on success).
245    pub errors: Vec<FieldError>,
246}
247
248impl ValidationResult {
249    fn ok() -> Self {
250        Self {
251            is_valid: true,
252            errors: Vec::new(),
253        }
254    }
255
256    fn with_errors(errors: Vec<FieldError>) -> Self {
257        Self {
258            is_valid: errors.is_empty(),
259            errors,
260        }
261    }
262
263    /// Number of field errors.
264    pub fn error_count(&self) -> usize {
265        self.errors.len()
266    }
267}
268
269// ─────────────────────────────────────────────────────────────────────────────
270// Validator
271// ─────────────────────────────────────────────────────────────────────────────
272
273/// Validates stream messages against a [`Schema`].
274pub struct SchemaValidator;
275
276impl SchemaValidator {
277    /// Validate `message` against `schema`.
278    ///
279    /// Checks, in order:
280    /// 1. Required field presence
281    /// 2. Field type correctness
282    /// 3. String format constraints
283    /// 4. Unknown field detection (strict mode)
284    pub fn validate(schema: &Schema, message: &StreamMessage) -> ValidationResult {
285        let mut errors: Vec<FieldError> = Vec::new();
286
287        // Build a set of known field names for fast lookup.
288        let known: std::collections::HashSet<&str> =
289            schema.fields.iter().map(|f| f.name.as_str()).collect();
290
291        // Check required fields and type / format correctness.
292        for field_def in &schema.fields {
293            match message.get(&field_def.name) {
294                None => {
295                    if field_def.required {
296                        errors.push(FieldError::new(
297                            &field_def.name,
298                            format!("required field '{}' is missing", field_def.name),
299                        ));
300                    }
301                }
302                Some(value) => {
303                    // Type check
304                    if value.field_type() != field_def.field_type {
305                        errors.push(FieldError::new(
306                            &field_def.name,
307                            format!(
308                                "expected type '{}', found '{}'",
309                                field_def.field_type.name(),
310                                value.field_type().name()
311                            ),
312                        ));
313                    } else if let Some(fmt) = &field_def.format {
314                        // Format check (only applicable to strings)
315                        if let Some(s) = value.as_str() {
316                            if !fmt.validate(s) {
317                                errors.push(FieldError::new(
318                                    &field_def.name,
319                                    format!("value '{}' does not match format '{}'", s, fmt.name()),
320                                ));
321                            }
322                        }
323                    }
324                }
325            }
326        }
327
328        // Strict mode: unknown fields
329        if schema.strict_mode {
330            for key in message.keys() {
331                if !known.contains(key.as_str()) {
332                    errors.push(FieldError::new(
333                        key,
334                        format!("unknown field '{}' not allowed in strict mode", key),
335                    ));
336                }
337            }
338        }
339
340        if errors.is_empty() {
341            ValidationResult::ok()
342        } else {
343            ValidationResult::with_errors(errors)
344        }
345    }
346}
347
348// ─────────────────────────────────────────────────────────────────────────────
349// Schema registry
350// ─────────────────────────────────────────────────────────────────────────────
351
/// Hashable lookup key for the registry: a schema's name paired with its version.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct RegistryKey {
    /// Schema name.
    name: String,
    /// Schema version label.
    version: String,
}
358
359/// Registry for named, versioned schemas.
360pub struct SchemaRegistry {
361    schemas: HashMap<RegistryKey, Schema>,
362}
363
364impl SchemaRegistry {
365    /// Create an empty registry.
366    pub fn new() -> Self {
367        Self {
368            schemas: HashMap::new(),
369        }
370    }
371
372    /// Register a schema. Overwrites any existing entry with the same name+version.
373    pub fn register(&mut self, schema: Schema) {
374        let key = RegistryKey {
375            name: schema.name.clone(),
376            version: schema.version.clone(),
377        };
378        self.schemas.insert(key, schema);
379    }
380
381    /// Look up a schema by name and version.
382    pub fn lookup(&self, name: &str, version: &str) -> Option<&Schema> {
383        let key = RegistryKey {
384            name: name.to_string(),
385            version: version.to_string(),
386        };
387        self.schemas.get(&key)
388    }
389
390    /// Validate a message using a registered schema.
391    ///
392    /// Returns `None` if the schema is not found.
393    pub fn validate(
394        &self,
395        schema_name: &str,
396        schema_version: &str,
397        message: &StreamMessage,
398    ) -> Option<ValidationResult> {
399        let schema = self.lookup(schema_name, schema_version)?;
400        Some(SchemaValidator::validate(schema, message))
401    }
402
403    /// Number of schemas registered.
404    pub fn schema_count(&self) -> usize {
405        self.schemas.len()
406    }
407
408    /// List all registered (name, version) pairs.
409    pub fn list(&self) -> Vec<(String, String)> {
410        let mut pairs: Vec<(String, String)> = self
411            .schemas
412            .keys()
413            .map(|k| (k.name.clone(), k.version.clone()))
414            .collect();
415        pairs.sort();
416        pairs
417    }
418}
419
420impl Default for SchemaRegistry {
421    fn default() -> Self {
422        Self::new()
423    }
424}
425
426// ─────────────────────────────────────────────────────────────────────────────
427// Format validation helpers
428// ─────────────────────────────────────────────────────────────────────────────
429
/// Lightweight e-mail shape check: `<local>@<label>.<label>[.<label>…]`.
///
/// Accepts a value only when:
/// * it contains exactly one `@`,
/// * the part before `@` is non-empty,
/// * the part after `@` has at least two dot-separated labels, none empty
///   (so no leading, trailing or doubled dots),
/// * it contains no whitespace.
///
/// This is a structural sanity check, not a full RFC 5322 parser; quoted
/// local parts, for instance, are not supported.
fn validate_email(value: &str) -> bool {
    let (local, domain) = match value.split_once('@') {
        Some(parts) => parts,
        None => return false,
    };

    // A second '@' would otherwise end up inside the domain unnoticed.
    if local.is_empty() || domain.contains('@') || value.chars().any(char::is_whitespace) {
        return false;
    }

    // Require <label>.<label> at minimum, and reject empty labels such as "a..b".
    domain.split('.').count() >= 2 && domain.split('.').all(|label| !label.is_empty())
}
440
/// Lightweight URI check: the value must start with one of the recognised
/// scheme prefixes and have at least one character after it.
///
/// Recognised prefixes are `http://`, `https://`, `ftp://`, `urn:` and
/// `mailto:`. The remainder is required to be non-empty relative to the
/// prefix that actually matched, so short values such as `http://a` or
/// `urn:a:b` are accepted while a bare `https://` is not.
fn validate_uri(value: &str) -> bool {
    const SCHEME_PREFIXES: [&str; 5] = ["http://", "https://", "ftp://", "urn:", "mailto:"];

    SCHEME_PREFIXES
        .iter()
        .any(|prefix| matches!(value.strip_prefix(*prefix), Some(rest) if !rest.is_empty()))
}
446
/// Lightweight ISO 8601 / RFC 3339 check on the leading `YYYY-MM-DDTHH:MM:SS`.
///
/// The first 19 bytes must have:
/// * `-` at offsets 4 and 7, `T` or a space at offset 10, `:` at 13 and 16;
/// * ASCII digits everywhere else;
/// * month 01-12, day 01-31, hour 00-23, minute 00-59, second 00-60
///   (60 allows a leap second).
///
/// Day-of-month is not checked against the month or leap years, and anything
/// after the 19th byte (fractional seconds, `Z`, numeric offset) is accepted
/// without inspection.
fn validate_datetime(value: &str) -> bool {
    /// Parses a run of ASCII digits; `None` if any byte is not a digit.
    fn component(digits: &[u8]) -> Option<u32> {
        digits.iter().try_fold(0u32, |acc, &b| {
            if b.is_ascii_digit() {
                Some(acc * 10 + u32::from(b - b'0'))
            } else {
                None
            }
        })
    }

    let bytes = value.as_bytes();
    // Minimal pattern: 19 bytes for "YYYY-MM-DDTHH:MM:SS"; also makes the
    // fixed-offset indexing below safe.
    if bytes.len() < 19 {
        return false;
    }

    let separators_ok = bytes[4] == b'-'
        && bytes[7] == b'-'
        && matches!(bytes[10], b'T' | b' ')
        && bytes[13] == b':'
        && bytes[16] == b':';
    if !separators_ok {
        return false;
    }

    let within = |digits: &[u8], lo: u32, hi: u32| {
        matches!(component(digits), Some(n) if (lo..=hi).contains(&n))
    };

    component(&bytes[0..4]).is_some()
        && within(&bytes[5..7], 1, 12)
        && within(&bytes[8..10], 1, 31)
        && within(&bytes[11..13], 0, 23)
        && within(&bytes[14..16], 0, 59)
        && within(&bytes[17..19], 0, 60)
}
461
/// Checks for the hyphenated UUID layout
/// `xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`: exactly 36 bytes, hyphens at
/// offsets 8, 13, 18 and 23, and an ASCII hex digit (either case) in every
/// other position.
fn validate_uuid(value: &str) -> bool {
    const HYPHEN_AT: [usize; 4] = [8, 13, 18, 23];

    let raw = value.as_bytes();
    if raw.len() != 36 {
        return false;
    }

    // Single pass: each position is either a mandatory hyphen or a hex digit.
    raw.iter().enumerate().all(|(idx, &byte)| {
        if HYPHEN_AT.contains(&idx) {
            byte == b'-'
        } else {
            byte.is_ascii_hexdigit()
        }
    })
}
477
478// ─────────────────────────────────────────────────────────────────────────────
479// Tests
480// ─────────────────────────────────────────────────────────────────────────────
481
482#[cfg(test)]
483mod tests {
484    use super::*;
485
486    // ── helpers ──────────────────────────────────────────────────────────────
487
488    fn string_field(name: &str, required: bool) -> FieldDefinition {
489        FieldDefinition::new(name, FieldType::String, required, None)
490    }
491
492    fn int_field(name: &str, required: bool) -> FieldDefinition {
493        FieldDefinition::new(name, FieldType::Integer, required, None)
494    }
495
496    fn email_field(name: &str) -> FieldDefinition {
497        FieldDefinition::new(name, FieldType::String, true, Some(FieldFormat::Email))
498    }
499
500    fn uri_field(name: &str) -> FieldDefinition {
501        FieldDefinition::new(name, FieldType::String, true, Some(FieldFormat::Uri))
502    }
503
504    fn datetime_field(name: &str) -> FieldDefinition {
505        FieldDefinition::new(name, FieldType::String, true, Some(FieldFormat::DateTime))
506    }
507
508    fn uuid_field(name: &str) -> FieldDefinition {
509        FieldDefinition::new(name, FieldType::String, true, Some(FieldFormat::Uuid))
510    }
511
512    fn basic_schema() -> Schema {
513        Schema::new("event", "1.0")
514            .with_field(string_field("id", true))
515            .with_field(int_field("count", false))
516    }
517
518    fn msg(pairs: &[(&str, MessageValue)]) -> StreamMessage {
519        pairs
520            .iter()
521            .map(|(k, v)| (k.to_string(), v.clone()))
522            .collect()
523    }
524
525    // ── required field presence ──────────────────────────────────────────────
526
527    #[test]
528    fn test_valid_message_passes() {
529        let schema = basic_schema();
530        let message = msg(&[("id", MessageValue::String("abc".into()))]);
531        let result = SchemaValidator::validate(&schema, &message);
532        assert!(result.is_valid);
533        assert!(result.errors.is_empty());
534    }
535
536    #[test]
537    fn test_missing_required_field_fails() {
538        let schema = basic_schema();
539        let message = msg(&[]);
540        let result = SchemaValidator::validate(&schema, &message);
541        assert!(!result.is_valid);
542        assert_eq!(result.errors.len(), 1);
543        assert_eq!(result.errors[0].field, "id");
544    }
545
546    #[test]
547    fn test_optional_field_may_be_absent() {
548        let schema = basic_schema();
549        let message = msg(&[("id", MessageValue::String("x".into()))]);
550        let result = SchemaValidator::validate(&schema, &message);
551        assert!(result.is_valid);
552    }
553
554    // ── type checking ────────────────────────────────────────────────────────
555
556    #[test]
557    fn test_type_mismatch_integer_vs_string() {
558        let schema = Schema::new("s", "1").with_field(int_field("n", true));
559        let message = msg(&[("n", MessageValue::String("not-a-number".into()))]);
560        let result = SchemaValidator::validate(&schema, &message);
561        assert!(!result.is_valid);
562        assert!(result.errors[0].message.contains("integer"));
563    }
564
565    #[test]
566    fn test_boolean_type_correct() {
567        let schema = Schema::new("s", "1").with_field(FieldDefinition::new(
568            "flag",
569            FieldType::Boolean,
570            true,
571            None,
572        ));
573        let message = msg(&[("flag", MessageValue::Boolean(true))]);
574        let result = SchemaValidator::validate(&schema, &message);
575        assert!(result.is_valid);
576    }
577
578    #[test]
579    fn test_float_type_accepted() {
580        let schema = Schema::new("s", "1").with_field(FieldDefinition::new(
581            "temp",
582            FieldType::Float,
583            true,
584            None,
585        ));
586        let message = msg(&[("temp", MessageValue::Float(36.6))]);
587        let result = SchemaValidator::validate(&schema, &message);
588        assert!(result.is_valid);
589    }
590
591    #[test]
592    fn test_array_type_accepted() {
593        let schema = Schema::new("s", "1").with_field(FieldDefinition::new(
594            "tags",
595            FieldType::Array,
596            true,
597            None,
598        ));
599        let message = msg(&[(
600            "tags",
601            MessageValue::Array(vec![MessageValue::String("a".into())]),
602        )]);
603        let result = SchemaValidator::validate(&schema, &message);
604        assert!(result.is_valid);
605    }
606
607    #[test]
608    fn test_object_type_accepted() {
609        let schema = Schema::new("s", "1").with_field(FieldDefinition::new(
610            "meta",
611            FieldType::Object,
612            true,
613            None,
614        ));
615        let inner: HashMap<String, MessageValue> =
616            [("k".to_string(), MessageValue::Integer(1))].into();
617        let message = msg(&[("meta", MessageValue::Object(inner))]);
618        let result = SchemaValidator::validate(&schema, &message);
619        assert!(result.is_valid);
620    }
621
622    // ── format validation ────────────────────────────────────────────────────
623
624    #[test]
625    fn test_valid_email_passes() {
626        let schema = Schema::new("s", "1").with_field(email_field("email"));
627        let message = msg(&[("email", MessageValue::String("user@example.com".into()))]);
628        let result = SchemaValidator::validate(&schema, &message);
629        assert!(result.is_valid, "{:?}", result.errors);
630    }
631
632    #[test]
633    fn test_invalid_email_fails() {
634        let schema = Schema::new("s", "1").with_field(email_field("email"));
635        let message = msg(&[("email", MessageValue::String("not-an-email".into()))]);
636        let result = SchemaValidator::validate(&schema, &message);
637        assert!(!result.is_valid);
638        assert!(result.errors[0].message.contains("email"));
639    }
640
641    #[test]
642    fn test_valid_uri_passes() {
643        let schema = Schema::new("s", "1").with_field(uri_field("url"));
644        let message = msg(&[(
645            "url",
646            MessageValue::String("https://example.com/path".into()),
647        )]);
648        let result = SchemaValidator::validate(&schema, &message);
649        assert!(result.is_valid);
650    }
651
652    #[test]
653    fn test_invalid_uri_fails() {
654        let schema = Schema::new("s", "1").with_field(uri_field("url"));
655        let message = msg(&[("url", MessageValue::String("not-a-uri".into()))]);
656        let result = SchemaValidator::validate(&schema, &message);
657        assert!(!result.is_valid);
658    }
659
660    #[test]
661    fn test_valid_datetime_passes() {
662        let schema = Schema::new("s", "1").with_field(datetime_field("ts"));
663        let message = msg(&[("ts", MessageValue::String("2024-01-15T10:30:00Z".into()))]);
664        let result = SchemaValidator::validate(&schema, &message);
665        assert!(result.is_valid);
666    }
667
668    #[test]
669    fn test_invalid_datetime_fails() {
670        let schema = Schema::new("s", "1").with_field(datetime_field("ts"));
671        let message = msg(&[("ts", MessageValue::String("not-a-date".into()))]);
672        let result = SchemaValidator::validate(&schema, &message);
673        assert!(!result.is_valid);
674    }
675
676    #[test]
677    fn test_valid_uuid_passes() {
678        let schema = Schema::new("s", "1").with_field(uuid_field("id"));
679        let message = msg(&[(
680            "id",
681            MessageValue::String("550e8400-e29b-41d4-a716-446655440000".into()),
682        )]);
683        let result = SchemaValidator::validate(&schema, &message);
684        assert!(result.is_valid);
685    }
686
687    #[test]
688    fn test_invalid_uuid_fails() {
689        let schema = Schema::new("s", "1").with_field(uuid_field("id"));
690        let message = msg(&[("id", MessageValue::String("not-a-uuid".into()))]);
691        let result = SchemaValidator::validate(&schema, &message);
692        assert!(!result.is_valid);
693    }
694
695    // ── strict mode ──────────────────────────────────────────────────────────
696
697    #[test]
698    fn test_strict_mode_rejects_unknown_field() {
699        let schema = Schema::new("s", "1")
700            .with_field(string_field("id", true))
701            .strict();
702        let message = msg(&[
703            ("id", MessageValue::String("x".into())),
704            ("extra", MessageValue::Boolean(false)),
705        ]);
706        let result = SchemaValidator::validate(&schema, &message);
707        assert!(!result.is_valid);
708        assert!(result.errors.iter().any(|e| e.field == "extra"));
709    }
710
711    #[test]
712    fn test_non_strict_allows_unknown_field() {
713        let schema = Schema::new("s", "1").with_field(string_field("id", true));
714        let message = msg(&[
715            ("id", MessageValue::String("x".into())),
716            ("extra", MessageValue::Boolean(false)),
717        ]);
718        let result = SchemaValidator::validate(&schema, &message);
719        assert!(result.is_valid);
720    }
721
722    // ── schema composition stubs ─────────────────────────────────────────────
723
724    #[test]
725    fn test_all_of_composition_stored() {
726        let schema = Schema::new("s", "1")
727            .with_composition(SchemaComposition::AllOf(vec!["base".into(), "ext".into()]));
728        assert!(matches!(
729            schema.composition,
730            Some(SchemaComposition::AllOf(_))
731        ));
732    }
733
734    #[test]
735    fn test_any_of_composition_stored() {
736        let schema =
737            Schema::new("s", "1").with_composition(SchemaComposition::AnyOf(vec!["opt1".into()]));
738        assert!(matches!(
739            schema.composition,
740            Some(SchemaComposition::AnyOf(_))
741        ));
742    }
743
744    // ── schema registry ──────────────────────────────────────────────────────
745
746    #[test]
747    fn test_registry_register_and_lookup() {
748        let mut registry = SchemaRegistry::new();
749        registry.register(basic_schema());
750        assert!(registry.lookup("event", "1.0").is_some());
751        assert!(registry.lookup("event", "2.0").is_none());
752    }
753
754    #[test]
755    fn test_registry_validate_via_registry() {
756        let mut registry = SchemaRegistry::new();
757        registry.register(basic_schema());
758        let message = msg(&[("id", MessageValue::String("abc".into()))]);
759        let result = registry
760            .validate("event", "1.0", &message)
761            .expect("schema found");
762        assert!(result.is_valid);
763    }
764
765    #[test]
766    fn test_registry_returns_none_for_missing_schema() {
767        let registry = SchemaRegistry::new();
768        let message = msg(&[]);
769        let result = registry.validate("nonexistent", "1.0", &message);
770        assert!(result.is_none());
771    }
772
773    #[test]
774    fn test_registry_schema_count() {
775        let mut registry = SchemaRegistry::new();
776        assert_eq!(registry.schema_count(), 0);
777        registry.register(basic_schema());
778        assert_eq!(registry.schema_count(), 1);
779    }
780
781    #[test]
782    fn test_registry_list_all() {
783        let mut registry = SchemaRegistry::new();
784        registry.register(Schema::new("a", "1.0"));
785        registry.register(Schema::new("b", "2.0"));
786        let list = registry.list();
787        assert_eq!(list.len(), 2);
788    }
789
790    #[test]
791    fn test_registry_overwrite_same_version() {
792        let mut registry = SchemaRegistry::new();
793        registry.register(Schema::new("ev", "1.0").with_field(string_field("x", true)));
794        registry.register(Schema::new("ev", "1.0").with_field(string_field("y", true)));
795        let schema = registry.lookup("ev", "1.0").expect("exists");
796        assert_eq!(schema.fields.len(), 1);
797        assert_eq!(schema.fields[0].name, "y");
798    }
799
800    // ── multiple errors ──────────────────────────────────────────────────────
801
802    #[test]
803    fn test_multiple_required_fields_missing() {
804        let schema = Schema::new("s", "1")
805            .with_field(string_field("a", true))
806            .with_field(string_field("b", true));
807        let message = msg(&[]);
808        let result = SchemaValidator::validate(&schema, &message);
809        assert!(!result.is_valid);
810        assert_eq!(result.error_count(), 2);
811    }
812
813    #[test]
814    fn test_type_error_and_missing_field() {
815        let schema = Schema::new("s", "1")
816            .with_field(string_field("name", true))
817            .with_field(int_field("count", true));
818        // 'name' present with wrong type, 'count' missing
819        let message = msg(&[("name", MessageValue::Integer(42))]);
820        let result = SchemaValidator::validate(&schema, &message);
821        assert!(!result.is_valid);
822        assert_eq!(result.error_count(), 2);
823    }
824
825    // ── field type names ─────────────────────────────────────────────────────
826
827    #[test]
828    fn test_field_type_names() {
829        assert_eq!(FieldType::String.name(), "string");
830        assert_eq!(FieldType::Integer.name(), "integer");
831        assert_eq!(FieldType::Float.name(), "float");
832        assert_eq!(FieldType::Boolean.name(), "boolean");
833        assert_eq!(FieldType::Array.name(), "array");
834        assert_eq!(FieldType::Object.name(), "object");
835    }
836
837    // ── message value helpers ─────────────────────────────────────────────────
838
839    #[test]
840    fn test_message_value_field_type_all_variants() {
841        assert_eq!(
842            MessageValue::String("s".into()).field_type(),
843            FieldType::String
844        );
845        assert_eq!(MessageValue::Integer(1).field_type(), FieldType::Integer);
846        assert_eq!(MessageValue::Float(1.0).field_type(), FieldType::Float);
847        assert_eq!(MessageValue::Boolean(true).field_type(), FieldType::Boolean);
848        assert_eq!(MessageValue::Array(vec![]).field_type(), FieldType::Array);
849        let empty: HashMap<String, MessageValue> = HashMap::new();
850        assert_eq!(MessageValue::Object(empty).field_type(), FieldType::Object);
851    }
852
853    #[test]
854    fn test_message_value_as_str() {
855        let v = MessageValue::String("hello".into());
856        assert_eq!(v.as_str(), Some("hello"));
857        assert_eq!(MessageValue::Integer(0).as_str(), None);
858    }
859
860    // ── format helpers ───────────────────────────────────────────────────────
861
862    #[test]
863    fn test_email_missing_at_sign_invalid() {
864        assert!(!validate_email("nodomain"));
865    }
866
867    #[test]
868    fn test_email_missing_tld_invalid() {
869        assert!(!validate_email("user@nodot"));
870    }
871
872    #[test]
873    fn test_uri_ftp_valid() {
874        assert!(validate_uri("ftp://files.example.com/pub/data.txt"));
875    }
876
877    #[test]
878    fn test_uuid_wrong_length_invalid() {
879        assert!(!validate_uuid("550e8400-e29b-41d4-a716-44665544000"));
880    }
881
882    #[test]
883    fn test_datetime_too_short_invalid() {
884        assert!(!validate_datetime("2024-01-15"));
885    }
886
887    #[test]
888    fn test_validation_result_ok_has_no_errors() {
889        let r = ValidationResult::ok();
890        assert!(r.is_valid);
891        assert_eq!(r.error_count(), 0);
892    }
893
894    #[test]
895    fn test_field_format_names() {
896        assert_eq!(FieldFormat::Email.name(), "email");
897        assert_eq!(FieldFormat::Uri.name(), "uri");
898        assert_eq!(FieldFormat::DateTime.name(), "date-time");
899        assert_eq!(FieldFormat::Uuid.name(), "uuid");
900    }
901
902    #[test]
903    fn test_schema_strict_flag() {
904        let schema = Schema::new("s", "1").strict();
905        assert!(schema.strict_mode);
906    }
907
908    #[test]
909    fn test_registry_default() {
910        let registry = SchemaRegistry::default();
911        assert_eq!(registry.schema_count(), 0);
912    }
913
914    #[test]
915    fn test_all_of_sub_schema_names_stored() {
916        let composition = SchemaComposition::AllOf(vec!["a".into(), "b".into(), "c".into()]);
917        if let SchemaComposition::AllOf(names) = composition {
918            assert_eq!(names.len(), 3);
919        }
920    }
921
922    #[test]
923    fn test_any_of_sub_schema_names_stored() {
924        let composition = SchemaComposition::AnyOf(vec!["x".into()]);
925        if let SchemaComposition::AnyOf(names) = composition {
926            assert_eq!(names.len(), 1);
927        }
928    }
929
930    #[test]
931    fn test_strict_mode_with_multiple_unknown_fields() {
932        let schema = Schema::new("s", "1")
933            .with_field(string_field("id", true))
934            .strict();
935        let message = msg(&[
936            ("id", MessageValue::String("v".into())),
937            ("x", MessageValue::Integer(1)),
938            ("y", MessageValue::Integer(2)),
939        ]);
940        let result = SchemaValidator::validate(&schema, &message);
941        assert!(!result.is_valid);
942        assert_eq!(result.error_count(), 2);
943    }
944
945    #[test]
946    fn test_empty_schema_empty_message_valid() {
947        let schema = Schema::new("empty", "1.0");
948        let message: StreamMessage = HashMap::new();
949        let result = SchemaValidator::validate(&schema, &message);
950        assert!(result.is_valid);
951    }
952
953    #[test]
954    fn test_optional_field_present_with_wrong_type() {
955        let schema = Schema::new("s", "1").with_field(int_field("n", false));
956        let message = msg(&[("n", MessageValue::Boolean(true))]);
957        let result = SchemaValidator::validate(&schema, &message);
958        assert!(!result.is_valid);
959        assert_eq!(result.errors[0].field, "n");
960    }
961
962    #[test]
963    fn test_field_error_new() {
964        let err = FieldError::new("field_x", "something went wrong");
965        assert_eq!(err.field, "field_x");
966        assert_eq!(err.message, "something went wrong");
967    }
968
969    #[test]
970    fn test_schema_name_and_version() {
971        let schema = Schema::new("orders", "2.5");
972        assert_eq!(schema.name, "orders");
973        assert_eq!(schema.version, "2.5");
974    }
975
976    #[test]
977    fn test_schema_fields_count() {
978        let schema = Schema::new("s", "1")
979            .with_field(string_field("a", true))
980            .with_field(int_field("b", false))
981            .with_field(string_field("c", true));
982        assert_eq!(schema.fields.len(), 3);
983    }
984
985    #[test]
986    fn test_validate_email_with_subdomain() {
987        let schema = Schema::new("s", "1").with_field(email_field("email"));
988        let message = msg(&[("email", MessageValue::String("user@sub.example.com".into()))]);
989        let result = SchemaValidator::validate(&schema, &message);
990        assert!(result.is_valid);
991    }
992
993    #[test]
994    fn test_validate_https_uri_valid() {
995        assert!(validate_uri("https://example.com/path?q=1"));
996    }
997
998    #[test]
999    fn test_validate_ftp_uri_valid() {
1000        assert!(validate_uri("ftp://files.example.com/data"));
1001    }
1002
1003    #[test]
1004    fn test_uuid_valid_uppercase_hex() {
1005        // UUIDs are case-insensitive in practice; validate_uuid checks is_ascii_hexdigit
1006        assert!(validate_uuid("550E8400-E29B-41D4-A716-446655440000"));
1007    }
1008
1009    #[test]
1010    fn test_datetime_with_space_separator() {
1011        // Some systems emit "YYYY-MM-DD HH:MM:SS"
1012        assert!(validate_datetime("2024-03-15 08:30:00"));
1013    }
1014
1015    #[test]
1016    fn test_strict_mode_empty_message_no_unknown() {
1017        let schema = Schema::new("s", "1")
1018            .with_field(string_field("id", false))
1019            .strict();
1020        // No fields present — no unknown fields, optional field absent.
1021        let message: StreamMessage = HashMap::new();
1022        let result = SchemaValidator::validate(&schema, &message);
1023        assert!(result.is_valid);
1024    }
1025
1026    #[test]
1027    fn test_registry_list_sorted() {
1028        let mut registry = SchemaRegistry::new();
1029        registry.register(Schema::new("z_schema", "1.0"));
1030        registry.register(Schema::new("a_schema", "1.0"));
1031        let list = registry.list();
1032        assert_eq!(list[0].0, "a_schema");
1033        assert_eq!(list[1].0, "z_schema");
1034    }
1035}