allsource_core/domain/entities/
schema.rs

1use crate::error::Result;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use uuid::Uuid;
6
7/// Compatibility mode for schema evolution
8///
9/// Defines how schema changes are validated when registering new versions.
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
12pub enum CompatibilityMode {
13    /// No compatibility checking
14    None,
15    /// New schema must be backward compatible (can read old data)
16    Backward,
17    /// New schema must be forward compatible (old readers can read new data)
18    Forward,
19    /// New schema must be both backward and forward compatible
20    Full,
21}
22
23impl Default for CompatibilityMode {
24    fn default() -> Self {
25        Self::Backward
26    }
27}
28
29impl CompatibilityMode {
30    /// Check if this mode requires backward compatibility
31    pub fn requires_backward(&self) -> bool {
32        matches!(self, Self::Backward | Self::Full)
33    }
34
35    /// Check if this mode requires forward compatibility
36    pub fn requires_forward(&self) -> bool {
37        matches!(self, Self::Forward | Self::Full)
38    }
39
40    /// Check if any compatibility is required
41    pub fn requires_compatibility(&self) -> bool {
42        !matches!(self, Self::None)
43    }
44}
45
46/// Domain Entity: Schema
47///
48/// Represents a versioned schema definition for event validation.
49/// Schemas define the structure and validation rules for event payloads.
50///
51/// Domain Rules:
52/// - Subject cannot be empty
53/// - Version starts at 1 and increments
54/// - Schema must be valid JSON
55/// - Once registered, schemas are immutable
56/// - New versions must respect compatibility mode
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct Schema {
59    id: Uuid,
60    subject: String,
61    version: u32,
62    schema_definition: JsonValue,
63    created_at: DateTime<Utc>,
64    description: Option<String>,
65    tags: Vec<String>,
66    compatibility_mode: CompatibilityMode,
67}
68
69impl Schema {
70    /// Create a new schema with validation
71    ///
72    /// # Arguments
73    /// * `subject` - The subject/topic this schema applies to (e.g., "order.placed")
74    /// * `version` - Version number (must be >= 1)
75    /// * `schema_definition` - JSON Schema definition
76    /// * `compatibility_mode` - How to validate future schema changes
77    pub fn new(
78        subject: String,
79        version: u32,
80        schema_definition: JsonValue,
81        compatibility_mode: CompatibilityMode,
82    ) -> Result<Self> {
83        Self::validate_subject(&subject)?;
84        Self::validate_version(version)?;
85        Self::validate_schema(&schema_definition)?;
86
87        Ok(Self {
88            id: Uuid::new_v4(),
89            subject,
90            version,
91            schema_definition,
92            created_at: Utc::now(),
93            description: None,
94            tags: Vec::new(),
95            compatibility_mode,
96        })
97    }
98
99    /// Create first version of a schema
100    pub fn new_v1(
101        subject: String,
102        schema_definition: JsonValue,
103        compatibility_mode: CompatibilityMode,
104    ) -> Result<Self> {
105        Self::new(subject, 1, schema_definition, compatibility_mode)
106    }
107
108    /// Reconstruct schema from storage (bypasses validation)
109    #[allow(clippy::too_many_arguments)]
110    pub fn reconstruct(
111        id: Uuid,
112        subject: String,
113        version: u32,
114        schema_definition: JsonValue,
115        created_at: DateTime<Utc>,
116        description: Option<String>,
117        tags: Vec<String>,
118        compatibility_mode: CompatibilityMode,
119    ) -> Self {
120        Self {
121            id,
122            subject,
123            version,
124            schema_definition,
125            created_at,
126            description,
127            tags,
128            compatibility_mode,
129        }
130    }
131
132    // Getters
133
134    pub fn id(&self) -> Uuid {
135        self.id
136    }
137
138    pub fn subject(&self) -> &str {
139        &self.subject
140    }
141
142    pub fn version(&self) -> u32 {
143        self.version
144    }
145
146    pub fn schema_definition(&self) -> &JsonValue {
147        &self.schema_definition
148    }
149
150    pub fn created_at(&self) -> DateTime<Utc> {
151        self.created_at
152    }
153
154    pub fn description(&self) -> Option<&str> {
155        self.description.as_deref()
156    }
157
158    pub fn tags(&self) -> &[String] {
159        &self.tags
160    }
161
162    pub fn compatibility_mode(&self) -> CompatibilityMode {
163        self.compatibility_mode
164    }
165
166    // Domain behavior methods
167
168    /// Add or update description
169    pub fn set_description(&mut self, description: String) -> Result<()> {
170        Self::validate_description(&description)?;
171        self.description = Some(description);
172        Ok(())
173    }
174
175    /// Clear description
176    pub fn clear_description(&mut self) {
177        self.description = None;
178    }
179
180    /// Add a tag
181    pub fn add_tag(&mut self, tag: String) -> Result<()> {
182        Self::validate_tag(&tag)?;
183
184        if self.tags.contains(&tag) {
185            return Err(crate::error::AllSourceError::InvalidInput(format!(
186                "Tag '{}' already exists",
187                tag
188            )));
189        }
190
191        self.tags.push(tag);
192        Ok(())
193    }
194
195    /// Remove a tag
196    pub fn remove_tag(&mut self, tag: &str) -> Result<()> {
197        let initial_len = self.tags.len();
198        self.tags.retain(|t| t != tag);
199
200        if self.tags.len() == initial_len {
201            return Err(crate::error::AllSourceError::InvalidInput(format!(
202                "Tag '{}' not found",
203                tag
204            )));
205        }
206
207        Ok(())
208    }
209
210    /// Check if schema has a specific tag
211    pub fn has_tag(&self, tag: &str) -> bool {
212        self.tags.iter().any(|t| t == tag)
213    }
214
215    /// Check if this schema is the first version
216    pub fn is_first_version(&self) -> bool {
217        self.version == 1
218    }
219
220    /// Check if this schema applies to a subject
221    pub fn applies_to(&self, subject: &str) -> bool {
222        self.subject == subject
223    }
224
225    /// Create next version of schema
226    pub fn create_next_version(&self, new_schema: JsonValue) -> Result<Schema> {
227        Schema::new(
228            self.subject.clone(),
229            self.version + 1,
230            new_schema,
231            self.compatibility_mode,
232        )
233    }
234
235    // Validation methods
236
237    fn validate_subject(subject: &str) -> Result<()> {
238        if subject.is_empty() {
239            return Err(crate::error::AllSourceError::InvalidInput(
240                "Schema subject cannot be empty".to_string(),
241            ));
242        }
243
244        if subject.len() > 256 {
245            return Err(crate::error::AllSourceError::InvalidInput(format!(
246                "Schema subject cannot exceed 256 characters, got {}",
247                subject.len()
248            )));
249        }
250
251        // Subject should follow similar naming as event types
252        if !subject
253            .chars()
254            .all(|c| c.is_lowercase() || c.is_numeric() || c == '.' || c == '_' || c == '-')
255        {
256            return Err(crate::error::AllSourceError::InvalidInput(format!(
257                "Schema subject '{}' must be lowercase with dots, underscores, or hyphens",
258                subject
259            )));
260        }
261
262        Ok(())
263    }
264
265    fn validate_version(version: u32) -> Result<()> {
266        if version == 0 {
267            return Err(crate::error::AllSourceError::InvalidInput(
268                "Schema version must be >= 1".to_string(),
269            ));
270        }
271
272        Ok(())
273    }
274
275    fn validate_schema(schema: &JsonValue) -> Result<()> {
276        if schema.is_null() {
277            return Err(crate::error::AllSourceError::InvalidInput(
278                "Schema definition cannot be null".to_string(),
279            ));
280        }
281
282        // Basic validation: should be an object with "type" property
283        if let Some(obj) = schema.as_object() {
284            if !obj.contains_key("type") && !obj.contains_key("$schema") {
285                return Err(crate::error::AllSourceError::InvalidInput(
286                    "Schema definition should contain 'type' or '$schema' property".to_string(),
287                ));
288            }
289        } else {
290            return Err(crate::error::AllSourceError::InvalidInput(
291                "Schema definition must be a JSON object".to_string(),
292            ));
293        }
294
295        Ok(())
296    }
297
298    fn validate_description(description: &str) -> Result<()> {
299        if description.len() > 1000 {
300            return Err(crate::error::AllSourceError::InvalidInput(format!(
301                "Schema description cannot exceed 1000 characters, got {}",
302                description.len()
303            )));
304        }
305        Ok(())
306    }
307
308    fn validate_tag(tag: &str) -> Result<()> {
309        if tag.is_empty() {
310            return Err(crate::error::AllSourceError::InvalidInput(
311                "Tag cannot be empty".to_string(),
312            ));
313        }
314
315        if tag.len() > 50 {
316            return Err(crate::error::AllSourceError::InvalidInput(format!(
317                "Tag cannot exceed 50 characters, got {}",
318                tag.len()
319            )));
320        }
321
322        if !tag
323            .chars()
324            .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
325        {
326            return Err(crate::error::AllSourceError::InvalidInput(format!(
327                "Tag '{}' must be alphanumeric with hyphens or underscores",
328                tag
329            )));
330        }
331
332        Ok(())
333    }
334}
335
336#[cfg(test)]
337mod tests {
338    use super::*;
339    use serde_json::json;
340
341    fn valid_schema() -> JsonValue {
342        json!({
343            "type": "object",
344            "properties": {
345                "name": { "type": "string" },
346                "age": { "type": "number" }
347            }
348        })
349    }
350
351    #[test]
352    fn test_create_schema() {
353        let schema = Schema::new(
354            "user.created".to_string(),
355            1,
356            valid_schema(),
357            CompatibilityMode::Backward,
358        );
359
360        assert!(schema.is_ok());
361        let schema = schema.unwrap();
362        assert_eq!(schema.subject(), "user.created");
363        assert_eq!(schema.version(), 1);
364        assert_eq!(schema.compatibility_mode(), CompatibilityMode::Backward);
365    }
366
367    #[test]
368    fn test_create_v1_schema() {
369        let schema = Schema::new_v1(
370            "order.placed".to_string(),
371            valid_schema(),
372            CompatibilityMode::Full,
373        );
374
375        assert!(schema.is_ok());
376        let schema = schema.unwrap();
377        assert_eq!(schema.version(), 1);
378        assert!(schema.is_first_version());
379    }
380
381    #[test]
382    fn test_reject_empty_subject() {
383        let result = Schema::new("".to_string(), 1, valid_schema(), CompatibilityMode::None);
384
385        assert!(result.is_err());
386    }
387
388    #[test]
389    fn test_reject_too_long_subject() {
390        let long_subject = "a".repeat(257);
391        let result = Schema::new(long_subject, 1, valid_schema(), CompatibilityMode::None);
392
393        assert!(result.is_err());
394    }
395
396    #[test]
397    fn test_reject_invalid_subject_characters() {
398        let result = Schema::new(
399            "User.Created".to_string(), // Uppercase not allowed
400            1,
401            valid_schema(),
402            CompatibilityMode::None,
403        );
404
405        assert!(result.is_err());
406    }
407
408    #[test]
409    fn test_accept_valid_subjects() {
410        let subjects = vec![
411            "user.created",
412            "order_placed",
413            "payment-processed",
414            "event.v2.updated",
415        ];
416
417        for subject in subjects {
418            let result = Schema::new(
419                subject.to_string(),
420                1,
421                valid_schema(),
422                CompatibilityMode::None,
423            );
424            assert!(result.is_ok(), "Subject '{}' should be valid", subject);
425        }
426    }
427
428    #[test]
429    fn test_reject_zero_version() {
430        let result = Schema::new(
431            "test.event".to_string(),
432            0, // Invalid
433            valid_schema(),
434            CompatibilityMode::None,
435        );
436
437        assert!(result.is_err());
438    }
439
440    #[test]
441    fn test_reject_null_schema() {
442        let result = Schema::new(
443            "test.event".to_string(),
444            1,
445            JsonValue::Null,
446            CompatibilityMode::None,
447        );
448
449        assert!(result.is_err());
450    }
451
452    #[test]
453    fn test_reject_non_object_schema() {
454        let result = Schema::new(
455            "test.event".to_string(),
456            1,
457            json!("not an object"),
458            CompatibilityMode::None,
459        );
460
461        assert!(result.is_err());
462    }
463
464    #[test]
465    fn test_reject_schema_without_type() {
466        let result = Schema::new(
467            "test.event".to_string(),
468            1,
469            json!({"properties": {}}), // Missing "type"
470            CompatibilityMode::None,
471        );
472
473        assert!(result.is_err());
474    }
475
476    #[test]
477    fn test_accept_schema_with_schema_property() {
478        let result = Schema::new(
479            "test.event".to_string(),
480            1,
481            json!({"$schema": "http://json-schema.org/draft-07/schema#"}),
482            CompatibilityMode::None,
483        );
484
485        assert!(result.is_ok());
486    }
487
488    #[test]
489    fn test_set_description() {
490        let mut schema = Schema::new_v1(
491            "test.event".to_string(),
492            valid_schema(),
493            CompatibilityMode::None,
494        )
495        .unwrap();
496
497        assert!(schema.description().is_none());
498
499        let result = schema.set_description("Test schema".to_string());
500        assert!(result.is_ok());
501        assert_eq!(schema.description(), Some("Test schema"));
502    }
503
504    #[test]
505    fn test_reject_too_long_description() {
506        let mut schema = Schema::new_v1(
507            "test.event".to_string(),
508            valid_schema(),
509            CompatibilityMode::None,
510        )
511        .unwrap();
512
513        let long_desc = "a".repeat(1001);
514        let result = schema.set_description(long_desc);
515        assert!(result.is_err());
516    }
517
518    #[test]
519    fn test_clear_description() {
520        let mut schema = Schema::new_v1(
521            "test.event".to_string(),
522            valid_schema(),
523            CompatibilityMode::None,
524        )
525        .unwrap();
526
527        schema.set_description("Test".to_string()).unwrap();
528        assert!(schema.description().is_some());
529
530        schema.clear_description();
531        assert!(schema.description().is_none());
532    }
533
534    #[test]
535    fn test_add_tag() {
536        let mut schema = Schema::new_v1(
537            "test.event".to_string(),
538            valid_schema(),
539            CompatibilityMode::None,
540        )
541        .unwrap();
542
543        assert_eq!(schema.tags().len(), 0);
544
545        let result = schema.add_tag("production".to_string());
546        assert!(result.is_ok());
547        assert_eq!(schema.tags().len(), 1);
548        assert!(schema.has_tag("production"));
549    }
550
551    #[test]
552    fn test_reject_duplicate_tag() {
553        let mut schema = Schema::new_v1(
554            "test.event".to_string(),
555            valid_schema(),
556            CompatibilityMode::None,
557        )
558        .unwrap();
559
560        schema.add_tag("test".to_string()).unwrap();
561        let result = schema.add_tag("test".to_string());
562        assert!(result.is_err());
563    }
564
565    #[test]
566    fn test_remove_tag() {
567        let mut schema = Schema::new_v1(
568            "test.event".to_string(),
569            valid_schema(),
570            CompatibilityMode::None,
571        )
572        .unwrap();
573
574        schema.add_tag("tag1".to_string()).unwrap();
575        schema.add_tag("tag2".to_string()).unwrap();
576
577        let result = schema.remove_tag("tag1");
578        assert!(result.is_ok());
579        assert_eq!(schema.tags().len(), 1);
580        assert!(!schema.has_tag("tag1"));
581        assert!(schema.has_tag("tag2"));
582    }
583
584    #[test]
585    fn test_remove_nonexistent_tag() {
586        let mut schema = Schema::new_v1(
587            "test.event".to_string(),
588            valid_schema(),
589            CompatibilityMode::None,
590        )
591        .unwrap();
592
593        let result = schema.remove_tag("nonexistent");
594        assert!(result.is_err());
595    }
596
597    #[test]
598    fn test_reject_invalid_tags() {
599        let mut schema = Schema::new_v1(
600            "test.event".to_string(),
601            valid_schema(),
602            CompatibilityMode::None,
603        )
604        .unwrap();
605
606        // Empty tag
607        assert!(schema.add_tag("".to_string()).is_err());
608
609        // Too long tag
610        assert!(schema.add_tag("a".repeat(51)).is_err());
611
612        // Invalid characters
613        assert!(schema.add_tag("tag with spaces".to_string()).is_err());
614        assert!(schema.add_tag("tag@invalid".to_string()).is_err());
615    }
616
617    #[test]
618    fn test_accept_valid_tags() {
619        let mut schema = Schema::new_v1(
620            "test.event".to_string(),
621            valid_schema(),
622            CompatibilityMode::None,
623        )
624        .unwrap();
625
626        let valid_tags = vec!["production", "test-env", "v2_schema", "important123"];
627
628        for tag in valid_tags {
629            assert!(schema.add_tag(tag.to_string()).is_ok());
630        }
631    }
632
633    #[test]
634    fn test_is_first_version() {
635        let schema_v1 = Schema::new_v1(
636            "test.event".to_string(),
637            valid_schema(),
638            CompatibilityMode::None,
639        )
640        .unwrap();
641
642        let schema_v2 = Schema::new(
643            "test.event".to_string(),
644            2,
645            valid_schema(),
646            CompatibilityMode::None,
647        )
648        .unwrap();
649
650        assert!(schema_v1.is_first_version());
651        assert!(!schema_v2.is_first_version());
652    }
653
654    #[test]
655    fn test_applies_to() {
656        let schema = Schema::new_v1(
657            "user.created".to_string(),
658            valid_schema(),
659            CompatibilityMode::None,
660        )
661        .unwrap();
662
663        assert!(schema.applies_to("user.created"));
664        assert!(!schema.applies_to("order.placed"));
665    }
666
667    #[test]
668    fn test_create_next_version() {
669        let schema_v1 = Schema::new_v1(
670            "test.event".to_string(),
671            valid_schema(),
672            CompatibilityMode::Backward,
673        )
674        .unwrap();
675
676        let new_schema = json!({
677            "type": "object",
678            "properties": {
679                "name": { "type": "string" },
680                "age": { "type": "number" },
681                "email": { "type": "string" }  // New field
682            }
683        });
684
685        let schema_v2 = schema_v1.create_next_version(new_schema);
686        assert!(schema_v2.is_ok());
687
688        let schema_v2 = schema_v2.unwrap();
689        assert_eq!(schema_v2.version(), 2);
690        assert_eq!(schema_v2.subject(), "test.event");
691        assert_eq!(schema_v2.compatibility_mode(), CompatibilityMode::Backward);
692    }
693
694    #[test]
695    fn test_compatibility_mode_checks() {
696        assert!(CompatibilityMode::Backward.requires_backward());
697        assert!(!CompatibilityMode::Backward.requires_forward());
698
699        assert!(!CompatibilityMode::Forward.requires_backward());
700        assert!(CompatibilityMode::Forward.requires_forward());
701
702        assert!(CompatibilityMode::Full.requires_backward());
703        assert!(CompatibilityMode::Full.requires_forward());
704
705        assert!(!CompatibilityMode::None.requires_backward());
706        assert!(!CompatibilityMode::None.requires_forward());
707        assert!(!CompatibilityMode::None.requires_compatibility());
708    }
709
710    #[test]
711    fn test_serde_serialization() {
712        let schema = Schema::new_v1(
713            "test.event".to_string(),
714            valid_schema(),
715            CompatibilityMode::Backward,
716        )
717        .unwrap();
718
719        // Should be able to serialize
720        let json = serde_json::to_string(&schema);
721        assert!(json.is_ok());
722
723        // Should be able to deserialize
724        let deserialized = serde_json::from_str::<Schema>(&json.unwrap());
725        assert!(deserialized.is_ok());
726    }
727}