Skip to main content

allsource_core/domain/entities/
schema.rs

1use crate::error::Result;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use uuid::Uuid;
6
7/// Compatibility mode for schema evolution
8///
9/// Defines how schema changes are validated when registering new versions.
10#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
12pub enum CompatibilityMode {
13    /// No compatibility checking
14    None,
15    /// New schema must be backward compatible (can read old data)
16    #[default]
17    Backward,
18    /// New schema must be forward compatible (old readers can read new data)
19    Forward,
20    /// New schema must be both backward and forward compatible
21    Full,
22}
23
24impl CompatibilityMode {
25    /// Check if this mode requires backward compatibility
26    pub fn requires_backward(&self) -> bool {
27        matches!(self, Self::Backward | Self::Full)
28    }
29
30    /// Check if this mode requires forward compatibility
31    pub fn requires_forward(&self) -> bool {
32        matches!(self, Self::Forward | Self::Full)
33    }
34
35    /// Check if any compatibility is required
36    pub fn requires_compatibility(&self) -> bool {
37        !matches!(self, Self::None)
38    }
39}
40
41/// Domain Entity: Schema
42///
43/// Represents a versioned schema definition for event validation.
44/// Schemas define the structure and validation rules for event payloads.
45///
46/// Domain Rules:
47/// - Subject cannot be empty
48/// - Version starts at 1 and increments
49/// - Schema must be valid JSON
50/// - Once registered, schemas are immutable
51/// - New versions must respect compatibility mode
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct Schema {
54    id: Uuid,
55    subject: String,
56    version: u32,
57    schema_definition: JsonValue,
58    created_at: DateTime<Utc>,
59    description: Option<String>,
60    tags: Vec<String>,
61    compatibility_mode: CompatibilityMode,
62}
63
64impl Schema {
65    /// Create a new schema with validation
66    ///
67    /// # Arguments
68    /// * `subject` - The subject/topic this schema applies to (e.g., "order.placed")
69    /// * `version` - Version number (must be >= 1)
70    /// * `schema_definition` - JSON Schema definition
71    /// * `compatibility_mode` - How to validate future schema changes
72    pub fn new(
73        subject: String,
74        version: u32,
75        schema_definition: JsonValue,
76        compatibility_mode: CompatibilityMode,
77    ) -> Result<Self> {
78        Self::validate_subject(&subject)?;
79        Self::validate_version(version)?;
80        Self::validate_schema(&schema_definition)?;
81
82        Ok(Self {
83            id: Uuid::new_v4(),
84            subject,
85            version,
86            schema_definition,
87            created_at: Utc::now(),
88            description: None,
89            tags: Vec::new(),
90            compatibility_mode,
91        })
92    }
93
94    /// Create first version of a schema
95    pub fn new_v1(
96        subject: String,
97        schema_definition: JsonValue,
98        compatibility_mode: CompatibilityMode,
99    ) -> Result<Self> {
100        Self::new(subject, 1, schema_definition, compatibility_mode)
101    }
102
103    /// Reconstruct schema from storage (bypasses validation)
104    #[allow(clippy::too_many_arguments)]
105    pub fn reconstruct(
106        id: Uuid,
107        subject: String,
108        version: u32,
109        schema_definition: JsonValue,
110        created_at: DateTime<Utc>,
111        description: Option<String>,
112        tags: Vec<String>,
113        compatibility_mode: CompatibilityMode,
114    ) -> Self {
115        Self {
116            id,
117            subject,
118            version,
119            schema_definition,
120            created_at,
121            description,
122            tags,
123            compatibility_mode,
124        }
125    }
126
127    // Getters
128
129    pub fn id(&self) -> Uuid {
130        self.id
131    }
132
133    pub fn subject(&self) -> &str {
134        &self.subject
135    }
136
137    pub fn version(&self) -> u32 {
138        self.version
139    }
140
141    pub fn schema_definition(&self) -> &JsonValue {
142        &self.schema_definition
143    }
144
145    pub fn created_at(&self) -> DateTime<Utc> {
146        self.created_at
147    }
148
149    pub fn description(&self) -> Option<&str> {
150        self.description.as_deref()
151    }
152
153    pub fn tags(&self) -> &[String] {
154        &self.tags
155    }
156
157    pub fn compatibility_mode(&self) -> CompatibilityMode {
158        self.compatibility_mode
159    }
160
161    // Domain behavior methods
162
163    /// Add or update description
164    pub fn set_description(&mut self, description: String) -> Result<()> {
165        Self::validate_description(&description)?;
166        self.description = Some(description);
167        Ok(())
168    }
169
170    /// Clear description
171    pub fn clear_description(&mut self) {
172        self.description = None;
173    }
174
175    /// Add a tag
176    pub fn add_tag(&mut self, tag: String) -> Result<()> {
177        Self::validate_tag(&tag)?;
178
179        if self.tags.contains(&tag) {
180            return Err(crate::error::AllSourceError::InvalidInput(format!(
181                "Tag '{}' already exists",
182                tag
183            )));
184        }
185
186        self.tags.push(tag);
187        Ok(())
188    }
189
190    /// Remove a tag
191    pub fn remove_tag(&mut self, tag: &str) -> Result<()> {
192        let initial_len = self.tags.len();
193        self.tags.retain(|t| t != tag);
194
195        if self.tags.len() == initial_len {
196            return Err(crate::error::AllSourceError::InvalidInput(format!(
197                "Tag '{}' not found",
198                tag
199            )));
200        }
201
202        Ok(())
203    }
204
205    /// Check if schema has a specific tag
206    pub fn has_tag(&self, tag: &str) -> bool {
207        self.tags.iter().any(|t| t == tag)
208    }
209
210    /// Check if this schema is the first version
211    pub fn is_first_version(&self) -> bool {
212        self.version == 1
213    }
214
215    /// Check if this schema applies to a subject
216    pub fn applies_to(&self, subject: &str) -> bool {
217        self.subject == subject
218    }
219
220    /// Create next version of schema
221    pub fn create_next_version(&self, new_schema: JsonValue) -> Result<Schema> {
222        Schema::new(
223            self.subject.clone(),
224            self.version + 1,
225            new_schema,
226            self.compatibility_mode,
227        )
228    }
229
230    // Validation methods
231
232    fn validate_subject(subject: &str) -> Result<()> {
233        if subject.is_empty() {
234            return Err(crate::error::AllSourceError::InvalidInput(
235                "Schema subject cannot be empty".to_string(),
236            ));
237        }
238
239        if subject.len() > 256 {
240            return Err(crate::error::AllSourceError::InvalidInput(format!(
241                "Schema subject cannot exceed 256 characters, got {}",
242                subject.len()
243            )));
244        }
245
246        // Subject should follow similar naming as event types
247        if !subject
248            .chars()
249            .all(|c| c.is_lowercase() || c.is_numeric() || c == '.' || c == '_' || c == '-')
250        {
251            return Err(crate::error::AllSourceError::InvalidInput(format!(
252                "Schema subject '{}' must be lowercase with dots, underscores, or hyphens",
253                subject
254            )));
255        }
256
257        Ok(())
258    }
259
260    fn validate_version(version: u32) -> Result<()> {
261        if version == 0 {
262            return Err(crate::error::AllSourceError::InvalidInput(
263                "Schema version must be >= 1".to_string(),
264            ));
265        }
266
267        Ok(())
268    }
269
270    fn validate_schema(schema: &JsonValue) -> Result<()> {
271        if schema.is_null() {
272            return Err(crate::error::AllSourceError::InvalidInput(
273                "Schema definition cannot be null".to_string(),
274            ));
275        }
276
277        // Basic validation: should be an object with "type" property
278        if let Some(obj) = schema.as_object() {
279            if !obj.contains_key("type") && !obj.contains_key("$schema") {
280                return Err(crate::error::AllSourceError::InvalidInput(
281                    "Schema definition should contain 'type' or '$schema' property".to_string(),
282                ));
283            }
284        } else {
285            return Err(crate::error::AllSourceError::InvalidInput(
286                "Schema definition must be a JSON object".to_string(),
287            ));
288        }
289
290        Ok(())
291    }
292
293    fn validate_description(description: &str) -> Result<()> {
294        if description.len() > 1000 {
295            return Err(crate::error::AllSourceError::InvalidInput(format!(
296                "Schema description cannot exceed 1000 characters, got {}",
297                description.len()
298            )));
299        }
300        Ok(())
301    }
302
303    fn validate_tag(tag: &str) -> Result<()> {
304        if tag.is_empty() {
305            return Err(crate::error::AllSourceError::InvalidInput(
306                "Tag cannot be empty".to_string(),
307            ));
308        }
309
310        if tag.len() > 50 {
311            return Err(crate::error::AllSourceError::InvalidInput(format!(
312                "Tag cannot exceed 50 characters, got {}",
313                tag.len()
314            )));
315        }
316
317        if !tag
318            .chars()
319            .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
320        {
321            return Err(crate::error::AllSourceError::InvalidInput(format!(
322                "Tag '{}' must be alphanumeric with hyphens or underscores",
323                tag
324            )));
325        }
326
327        Ok(())
328    }
329}
330
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A small object schema that passes definition validation.
    fn valid_schema() -> JsonValue {
        json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "age": { "type": "number" }
            }
        })
    }

    /// Try to build a schema from the valid definition with no compatibility
    /// checking, varying only subject and version.
    fn try_build(subject: &str, version: u32) -> Result<Schema> {
        Schema::new(
            subject.to_string(),
            version,
            valid_schema(),
            CompatibilityMode::None,
        )
    }

    /// Try to build version 1 of "test.event" from an arbitrary definition.
    fn try_build_with(definition: JsonValue) -> Result<Schema> {
        Schema::new(
            "test.event".to_string(),
            1,
            definition,
            CompatibilityMode::None,
        )
    }

    /// Version 1 of "test.event" with the valid definition.
    fn test_event_schema() -> Schema {
        Schema::new_v1(
            "test.event".to_string(),
            valid_schema(),
            CompatibilityMode::None,
        )
        .unwrap()
    }

    #[test]
    fn test_create_schema() {
        let created = Schema::new(
            "user.created".to_string(),
            1,
            valid_schema(),
            CompatibilityMode::Backward,
        );
        assert!(created.is_ok());

        let schema = created.unwrap();
        assert_eq!(schema.subject(), "user.created");
        assert_eq!(schema.version(), 1);
        assert_eq!(schema.compatibility_mode(), CompatibilityMode::Backward);
    }

    #[test]
    fn test_create_v1_schema() {
        let created = Schema::new_v1(
            "order.placed".to_string(),
            valid_schema(),
            CompatibilityMode::Full,
        );
        assert!(created.is_ok());

        let schema = created.unwrap();
        assert_eq!(schema.version(), 1);
        assert!(schema.is_first_version());
    }

    #[test]
    fn test_reject_empty_subject() {
        assert!(try_build("", 1).is_err());
    }

    #[test]
    fn test_reject_too_long_subject() {
        let long_subject = "a".repeat(257);
        assert!(try_build(&long_subject, 1).is_err());
    }

    #[test]
    fn test_reject_invalid_subject_characters() {
        // Uppercase not allowed
        assert!(try_build("User.Created", 1).is_err());
    }

    #[test]
    fn test_accept_valid_subjects() {
        let subjects = [
            "user.created",
            "order_placed",
            "payment-processed",
            "event.v2.updated",
        ];

        for subject in subjects.iter() {
            let result = try_build(subject, 1);
            assert!(result.is_ok(), "Subject '{}' should be valid", subject);
        }
    }

    #[test]
    fn test_reject_zero_version() {
        // Version 0 is invalid
        assert!(try_build("test.event", 0).is_err());
    }

    #[test]
    fn test_reject_null_schema() {
        assert!(try_build_with(JsonValue::Null).is_err());
    }

    #[test]
    fn test_reject_non_object_schema() {
        assert!(try_build_with(json!("not an object")).is_err());
    }

    #[test]
    fn test_reject_schema_without_type() {
        // Missing "type"
        assert!(try_build_with(json!({"properties": {}})).is_err());
    }

    #[test]
    fn test_accept_schema_with_schema_property() {
        let definition = json!({"$schema": "http://json-schema.org/draft-07/schema#"});
        assert!(try_build_with(definition).is_ok());
    }

    #[test]
    fn test_set_description() {
        let mut schema = test_event_schema();
        assert!(schema.description().is_none());

        assert!(schema.set_description("Test schema".to_string()).is_ok());
        assert_eq!(schema.description(), Some("Test schema"));
    }

    #[test]
    fn test_reject_too_long_description() {
        let mut schema = test_event_schema();

        let long_desc = "a".repeat(1001);
        assert!(schema.set_description(long_desc).is_err());
    }

    #[test]
    fn test_clear_description() {
        let mut schema = test_event_schema();

        schema.set_description("Test".to_string()).unwrap();
        assert!(schema.description().is_some());

        schema.clear_description();
        assert!(schema.description().is_none());
    }

    #[test]
    fn test_add_tag() {
        let mut schema = test_event_schema();
        assert_eq!(schema.tags().len(), 0);

        assert!(schema.add_tag("production".to_string()).is_ok());
        assert_eq!(schema.tags().len(), 1);
        assert!(schema.has_tag("production"));
    }

    #[test]
    fn test_reject_duplicate_tag() {
        let mut schema = test_event_schema();

        schema.add_tag("test".to_string()).unwrap();
        assert!(schema.add_tag("test".to_string()).is_err());
    }

    #[test]
    fn test_remove_tag() {
        let mut schema = test_event_schema();
        schema.add_tag("tag1".to_string()).unwrap();
        schema.add_tag("tag2".to_string()).unwrap();

        assert!(schema.remove_tag("tag1").is_ok());
        assert_eq!(schema.tags().len(), 1);
        assert!(!schema.has_tag("tag1"));
        assert!(schema.has_tag("tag2"));
    }

    #[test]
    fn test_remove_nonexistent_tag() {
        let mut schema = test_event_schema();
        assert!(schema.remove_tag("nonexistent").is_err());
    }

    #[test]
    fn test_reject_invalid_tags() {
        let mut schema = test_event_schema();

        // Empty tag
        assert!(schema.add_tag("".to_string()).is_err());

        // Too long tag
        assert!(schema.add_tag("a".repeat(51)).is_err());

        // Invalid characters
        assert!(schema.add_tag("tag with spaces".to_string()).is_err());
        assert!(schema.add_tag("tag@invalid".to_string()).is_err());
    }

    #[test]
    fn test_accept_valid_tags() {
        let mut schema = test_event_schema();

        let valid_tags = ["production", "test-env", "v2_schema", "important123"];
        for tag in valid_tags.iter() {
            assert!(schema.add_tag(tag.to_string()).is_ok());
        }
    }

    #[test]
    fn test_is_first_version() {
        let schema_v1 = test_event_schema();
        let schema_v2 = try_build("test.event", 2).unwrap();

        assert!(schema_v1.is_first_version());
        assert!(!schema_v2.is_first_version());
    }

    #[test]
    fn test_applies_to() {
        let schema = Schema::new_v1(
            "user.created".to_string(),
            valid_schema(),
            CompatibilityMode::None,
        )
        .unwrap();

        assert!(schema.applies_to("user.created"));
        assert!(!schema.applies_to("order.placed"));
    }

    #[test]
    fn test_create_next_version() {
        let schema_v1 = Schema::new_v1(
            "test.event".to_string(),
            valid_schema(),
            CompatibilityMode::Backward,
        )
        .unwrap();

        // Same shape as the valid schema plus one new field ("email").
        let new_schema = json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "age": { "type": "number" },
                "email": { "type": "string" }
            }
        });

        let next = schema_v1.create_next_version(new_schema);
        assert!(next.is_ok());

        let schema_v2 = next.unwrap();
        assert_eq!(schema_v2.version(), 2);
        assert_eq!(schema_v2.subject(), "test.event");
        assert_eq!(schema_v2.compatibility_mode(), CompatibilityMode::Backward);
    }

    #[test]
    fn test_compatibility_mode_checks() {
        let backward = CompatibilityMode::Backward;
        assert!(backward.requires_backward());
        assert!(!backward.requires_forward());

        let forward = CompatibilityMode::Forward;
        assert!(!forward.requires_backward());
        assert!(forward.requires_forward());

        let full = CompatibilityMode::Full;
        assert!(full.requires_backward());
        assert!(full.requires_forward());

        let none = CompatibilityMode::None;
        assert!(!none.requires_backward());
        assert!(!none.requires_forward());
        assert!(!none.requires_compatibility());
    }

    #[test]
    fn test_serde_serialization() {
        let schema = Schema::new_v1(
            "test.event".to_string(),
            valid_schema(),
            CompatibilityMode::Backward,
        )
        .unwrap();

        // Should be able to serialize
        let serialized = serde_json::to_string(&schema);
        assert!(serialized.is_ok());

        // Should be able to deserialize
        let restored = serde_json::from_str::<Schema>(&serialized.unwrap());
        assert!(restored.is_ok());
    }
}