Skip to main content

allsource_core/domain/entities/
schema.rs

1use crate::error::Result;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use uuid::Uuid;
6
7/// Compatibility mode for schema evolution
8///
9/// Defines how schema changes are validated when registering new versions.
10#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
12pub enum CompatibilityMode {
13    /// No compatibility checking
14    None,
15    /// New schema must be backward compatible (can read old data)
16    #[default]
17    Backward,
18    /// New schema must be forward compatible (old readers can read new data)
19    Forward,
20    /// New schema must be both backward and forward compatible
21    Full,
22}
23
24impl CompatibilityMode {
25    /// Check if this mode requires backward compatibility
26    pub fn requires_backward(&self) -> bool {
27        matches!(self, Self::Backward | Self::Full)
28    }
29
30    /// Check if this mode requires forward compatibility
31    pub fn requires_forward(&self) -> bool {
32        matches!(self, Self::Forward | Self::Full)
33    }
34
35    /// Check if any compatibility is required
36    pub fn requires_compatibility(&self) -> bool {
37        !matches!(self, Self::None)
38    }
39}
40
41/// Domain Entity: Schema
42///
43/// Represents a versioned schema definition for event validation.
44/// Schemas define the structure and validation rules for event payloads.
45///
46/// Domain Rules:
47/// - Subject cannot be empty
48/// - Version starts at 1 and increments
49/// - Schema must be valid JSON
50/// - Once registered, schemas are immutable
51/// - New versions must respect compatibility mode
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct Schema {
54    id: Uuid,
55    subject: String,
56    version: u32,
57    schema_definition: JsonValue,
58    created_at: DateTime<Utc>,
59    description: Option<String>,
60    tags: Vec<String>,
61    compatibility_mode: CompatibilityMode,
62}
63
64impl Schema {
65    /// Create a new schema with validation
66    ///
67    /// # Arguments
68    /// * `subject` - The subject/topic this schema applies to (e.g., "order.placed")
69    /// * `version` - Version number (must be >= 1)
70    /// * `schema_definition` - JSON Schema definition
71    /// * `compatibility_mode` - How to validate future schema changes
72    pub fn new(
73        subject: String,
74        version: u32,
75        schema_definition: JsonValue,
76        compatibility_mode: CompatibilityMode,
77    ) -> Result<Self> {
78        Self::validate_subject(&subject)?;
79        Self::validate_version(version)?;
80        Self::validate_schema(&schema_definition)?;
81
82        Ok(Self {
83            id: Uuid::new_v4(),
84            subject,
85            version,
86            schema_definition,
87            created_at: Utc::now(),
88            description: None,
89            tags: Vec::new(),
90            compatibility_mode,
91        })
92    }
93
94    /// Create first version of a schema
95    pub fn new_v1(
96        subject: String,
97        schema_definition: JsonValue,
98        compatibility_mode: CompatibilityMode,
99    ) -> Result<Self> {
100        Self::new(subject, 1, schema_definition, compatibility_mode)
101    }
102
103    /// Reconstruct schema from storage (bypasses validation)
104    #[allow(clippy::too_many_arguments)]
105    pub fn reconstruct(
106        id: Uuid,
107        subject: String,
108        version: u32,
109        schema_definition: JsonValue,
110        created_at: DateTime<Utc>,
111        description: Option<String>,
112        tags: Vec<String>,
113        compatibility_mode: CompatibilityMode,
114    ) -> Self {
115        Self {
116            id,
117            subject,
118            version,
119            schema_definition,
120            created_at,
121            description,
122            tags,
123            compatibility_mode,
124        }
125    }
126
127    // Getters
128
129    pub fn id(&self) -> Uuid {
130        self.id
131    }
132
133    pub fn subject(&self) -> &str {
134        &self.subject
135    }
136
137    pub fn version(&self) -> u32 {
138        self.version
139    }
140
141    pub fn schema_definition(&self) -> &JsonValue {
142        &self.schema_definition
143    }
144
145    pub fn created_at(&self) -> DateTime<Utc> {
146        self.created_at
147    }
148
149    pub fn description(&self) -> Option<&str> {
150        self.description.as_deref()
151    }
152
153    pub fn tags(&self) -> &[String] {
154        &self.tags
155    }
156
157    pub fn compatibility_mode(&self) -> CompatibilityMode {
158        self.compatibility_mode
159    }
160
161    // Domain behavior methods
162
163    /// Add or update description
164    pub fn set_description(&mut self, description: String) -> Result<()> {
165        Self::validate_description(&description)?;
166        self.description = Some(description);
167        Ok(())
168    }
169
170    /// Clear description
171    pub fn clear_description(&mut self) {
172        self.description = None;
173    }
174
175    /// Add a tag
176    pub fn add_tag(&mut self, tag: String) -> Result<()> {
177        Self::validate_tag(&tag)?;
178
179        if self.tags.contains(&tag) {
180            return Err(crate::error::AllSourceError::InvalidInput(format!(
181                "Tag '{tag}' already exists"
182            )));
183        }
184
185        self.tags.push(tag);
186        Ok(())
187    }
188
189    /// Remove a tag
190    pub fn remove_tag(&mut self, tag: &str) -> Result<()> {
191        let initial_len = self.tags.len();
192        self.tags.retain(|t| t != tag);
193
194        if self.tags.len() == initial_len {
195            return Err(crate::error::AllSourceError::InvalidInput(format!(
196                "Tag '{tag}' not found"
197            )));
198        }
199
200        Ok(())
201    }
202
203    /// Check if schema has a specific tag
204    pub fn has_tag(&self, tag: &str) -> bool {
205        self.tags.iter().any(|t| t == tag)
206    }
207
208    /// Check if this schema is the first version
209    pub fn is_first_version(&self) -> bool {
210        self.version == 1
211    }
212
213    /// Check if this schema applies to a subject
214    pub fn applies_to(&self, subject: &str) -> bool {
215        self.subject == subject
216    }
217
218    /// Create next version of schema
219    pub fn create_next_version(&self, new_schema: JsonValue) -> Result<Schema> {
220        Schema::new(
221            self.subject.clone(),
222            self.version + 1,
223            new_schema,
224            self.compatibility_mode,
225        )
226    }
227
228    // Validation methods
229
230    fn validate_subject(subject: &str) -> Result<()> {
231        if subject.is_empty() {
232            return Err(crate::error::AllSourceError::InvalidInput(
233                "Schema subject cannot be empty".to_string(),
234            ));
235        }
236
237        if subject.len() > 256 {
238            return Err(crate::error::AllSourceError::InvalidInput(format!(
239                "Schema subject cannot exceed 256 characters, got {}",
240                subject.len()
241            )));
242        }
243
244        // Subject should follow similar naming as event types
245        if !subject
246            .chars()
247            .all(|c| c.is_lowercase() || c.is_numeric() || c == '.' || c == '_' || c == '-')
248        {
249            return Err(crate::error::AllSourceError::InvalidInput(format!(
250                "Schema subject '{subject}' must be lowercase with dots, underscores, or hyphens"
251            )));
252        }
253
254        Ok(())
255    }
256
257    fn validate_version(version: u32) -> Result<()> {
258        if version == 0 {
259            return Err(crate::error::AllSourceError::InvalidInput(
260                "Schema version must be >= 1".to_string(),
261            ));
262        }
263
264        Ok(())
265    }
266
267    fn validate_schema(schema: &JsonValue) -> Result<()> {
268        if schema.is_null() {
269            return Err(crate::error::AllSourceError::InvalidInput(
270                "Schema definition cannot be null".to_string(),
271            ));
272        }
273
274        // Basic validation: should be an object with "type" property
275        if let Some(obj) = schema.as_object() {
276            if !obj.contains_key("type") && !obj.contains_key("$schema") {
277                return Err(crate::error::AllSourceError::InvalidInput(
278                    "Schema definition should contain 'type' or '$schema' property".to_string(),
279                ));
280            }
281        } else {
282            return Err(crate::error::AllSourceError::InvalidInput(
283                "Schema definition must be a JSON object".to_string(),
284            ));
285        }
286
287        Ok(())
288    }
289
290    fn validate_description(description: &str) -> Result<()> {
291        if description.len() > 1000 {
292            return Err(crate::error::AllSourceError::InvalidInput(format!(
293                "Schema description cannot exceed 1000 characters, got {}",
294                description.len()
295            )));
296        }
297        Ok(())
298    }
299
300    fn validate_tag(tag: &str) -> Result<()> {
301        if tag.is_empty() {
302            return Err(crate::error::AllSourceError::InvalidInput(
303                "Tag cannot be empty".to_string(),
304            ));
305        }
306
307        if tag.len() > 50 {
308            return Err(crate::error::AllSourceError::InvalidInput(format!(
309                "Tag cannot exceed 50 characters, got {}",
310                tag.len()
311            )));
312        }
313
314        if !tag
315            .chars()
316            .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
317        {
318            return Err(crate::error::AllSourceError::InvalidInput(format!(
319                "Tag '{tag}' must be alphanumeric with hyphens or underscores"
320            )));
321        }
322
323        Ok(())
324    }
325}
326
327#[cfg(test)]
328mod tests {
329    use super::*;
330    use serde_json::json;
331
332    fn valid_schema() -> JsonValue {
333        json!({
334            "type": "object",
335            "properties": {
336                "name": { "type": "string" },
337                "age": { "type": "number" }
338            }
339        })
340    }
341
342    #[test]
343    fn test_create_schema() {
344        let schema = Schema::new(
345            "user.created".to_string(),
346            1,
347            valid_schema(),
348            CompatibilityMode::Backward,
349        );
350
351        assert!(schema.is_ok());
352        let schema = schema.unwrap();
353        assert_eq!(schema.subject(), "user.created");
354        assert_eq!(schema.version(), 1);
355        assert_eq!(schema.compatibility_mode(), CompatibilityMode::Backward);
356    }
357
358    #[test]
359    fn test_create_v1_schema() {
360        let schema = Schema::new_v1(
361            "order.placed".to_string(),
362            valid_schema(),
363            CompatibilityMode::Full,
364        );
365
366        assert!(schema.is_ok());
367        let schema = schema.unwrap();
368        assert_eq!(schema.version(), 1);
369        assert!(schema.is_first_version());
370    }
371
372    #[test]
373    fn test_reject_empty_subject() {
374        let result = Schema::new(String::new(), 1, valid_schema(), CompatibilityMode::None);
375
376        assert!(result.is_err());
377    }
378
379    #[test]
380    fn test_reject_too_long_subject() {
381        let long_subject = "a".repeat(257);
382        let result = Schema::new(long_subject, 1, valid_schema(), CompatibilityMode::None);
383
384        assert!(result.is_err());
385    }
386
387    #[test]
388    fn test_reject_invalid_subject_characters() {
389        let result = Schema::new(
390            "User.Created".to_string(), // Uppercase not allowed
391            1,
392            valid_schema(),
393            CompatibilityMode::None,
394        );
395
396        assert!(result.is_err());
397    }
398
399    #[test]
400    fn test_accept_valid_subjects() {
401        let subjects = vec![
402            "user.created",
403            "order_placed",
404            "payment-processed",
405            "event.v2.updated",
406        ];
407
408        for subject in subjects {
409            let result = Schema::new(
410                subject.to_string(),
411                1,
412                valid_schema(),
413                CompatibilityMode::None,
414            );
415            assert!(result.is_ok(), "Subject '{subject}' should be valid");
416        }
417    }
418
419    #[test]
420    fn test_reject_zero_version() {
421        let result = Schema::new(
422            "test.event".to_string(),
423            0, // Invalid
424            valid_schema(),
425            CompatibilityMode::None,
426        );
427
428        assert!(result.is_err());
429    }
430
431    #[test]
432    fn test_reject_null_schema() {
433        let result = Schema::new(
434            "test.event".to_string(),
435            1,
436            JsonValue::Null,
437            CompatibilityMode::None,
438        );
439
440        assert!(result.is_err());
441    }
442
443    #[test]
444    fn test_reject_non_object_schema() {
445        let result = Schema::new(
446            "test.event".to_string(),
447            1,
448            json!("not an object"),
449            CompatibilityMode::None,
450        );
451
452        assert!(result.is_err());
453    }
454
455    #[test]
456    fn test_reject_schema_without_type() {
457        let result = Schema::new(
458            "test.event".to_string(),
459            1,
460            json!({"properties": {}}), // Missing "type"
461            CompatibilityMode::None,
462        );
463
464        assert!(result.is_err());
465    }
466
467    #[test]
468    fn test_accept_schema_with_schema_property() {
469        let result = Schema::new(
470            "test.event".to_string(),
471            1,
472            json!({"$schema": "http://json-schema.org/draft-07/schema#"}),
473            CompatibilityMode::None,
474        );
475
476        assert!(result.is_ok());
477    }
478
479    #[test]
480    fn test_set_description() {
481        let mut schema = Schema::new_v1(
482            "test.event".to_string(),
483            valid_schema(),
484            CompatibilityMode::None,
485        )
486        .unwrap();
487
488        assert!(schema.description().is_none());
489
490        let result = schema.set_description("Test schema".to_string());
491        assert!(result.is_ok());
492        assert_eq!(schema.description(), Some("Test schema"));
493    }
494
495    #[test]
496    fn test_reject_too_long_description() {
497        let mut schema = Schema::new_v1(
498            "test.event".to_string(),
499            valid_schema(),
500            CompatibilityMode::None,
501        )
502        .unwrap();
503
504        let long_desc = "a".repeat(1001);
505        let result = schema.set_description(long_desc);
506        assert!(result.is_err());
507    }
508
509    #[test]
510    fn test_clear_description() {
511        let mut schema = Schema::new_v1(
512            "test.event".to_string(),
513            valid_schema(),
514            CompatibilityMode::None,
515        )
516        .unwrap();
517
518        schema.set_description("Test".to_string()).unwrap();
519        assert!(schema.description().is_some());
520
521        schema.clear_description();
522        assert!(schema.description().is_none());
523    }
524
525    #[test]
526    fn test_add_tag() {
527        let mut schema = Schema::new_v1(
528            "test.event".to_string(),
529            valid_schema(),
530            CompatibilityMode::None,
531        )
532        .unwrap();
533
534        assert_eq!(schema.tags().len(), 0);
535
536        let result = schema.add_tag("production".to_string());
537        assert!(result.is_ok());
538        assert_eq!(schema.tags().len(), 1);
539        assert!(schema.has_tag("production"));
540    }
541
542    #[test]
543    fn test_reject_duplicate_tag() {
544        let mut schema = Schema::new_v1(
545            "test.event".to_string(),
546            valid_schema(),
547            CompatibilityMode::None,
548        )
549        .unwrap();
550
551        schema.add_tag("test".to_string()).unwrap();
552        let result = schema.add_tag("test".to_string());
553        assert!(result.is_err());
554    }
555
556    #[test]
557    fn test_remove_tag() {
558        let mut schema = Schema::new_v1(
559            "test.event".to_string(),
560            valid_schema(),
561            CompatibilityMode::None,
562        )
563        .unwrap();
564
565        schema.add_tag("tag1".to_string()).unwrap();
566        schema.add_tag("tag2".to_string()).unwrap();
567
568        let result = schema.remove_tag("tag1");
569        assert!(result.is_ok());
570        assert_eq!(schema.tags().len(), 1);
571        assert!(!schema.has_tag("tag1"));
572        assert!(schema.has_tag("tag2"));
573    }
574
575    #[test]
576    fn test_remove_nonexistent_tag() {
577        let mut schema = Schema::new_v1(
578            "test.event".to_string(),
579            valid_schema(),
580            CompatibilityMode::None,
581        )
582        .unwrap();
583
584        let result = schema.remove_tag("nonexistent");
585        assert!(result.is_err());
586    }
587
588    #[test]
589    fn test_reject_invalid_tags() {
590        let mut schema = Schema::new_v1(
591            "test.event".to_string(),
592            valid_schema(),
593            CompatibilityMode::None,
594        )
595        .unwrap();
596
597        // Empty tag
598        assert!(schema.add_tag(String::new()).is_err());
599
600        // Too long tag
601        assert!(schema.add_tag("a".repeat(51)).is_err());
602
603        // Invalid characters
604        assert!(schema.add_tag("tag with spaces".to_string()).is_err());
605        assert!(schema.add_tag("tag@invalid".to_string()).is_err());
606    }
607
608    #[test]
609    fn test_accept_valid_tags() {
610        let mut schema = Schema::new_v1(
611            "test.event".to_string(),
612            valid_schema(),
613            CompatibilityMode::None,
614        )
615        .unwrap();
616
617        let valid_tags = vec!["production", "test-env", "v2_schema", "important123"];
618
619        for tag in valid_tags {
620            assert!(schema.add_tag(tag.to_string()).is_ok());
621        }
622    }
623
624    #[test]
625    fn test_is_first_version() {
626        let schema_v1 = Schema::new_v1(
627            "test.event".to_string(),
628            valid_schema(),
629            CompatibilityMode::None,
630        )
631        .unwrap();
632
633        let schema_v2 = Schema::new(
634            "test.event".to_string(),
635            2,
636            valid_schema(),
637            CompatibilityMode::None,
638        )
639        .unwrap();
640
641        assert!(schema_v1.is_first_version());
642        assert!(!schema_v2.is_first_version());
643    }
644
645    #[test]
646    fn test_applies_to() {
647        let schema = Schema::new_v1(
648            "user.created".to_string(),
649            valid_schema(),
650            CompatibilityMode::None,
651        )
652        .unwrap();
653
654        assert!(schema.applies_to("user.created"));
655        assert!(!schema.applies_to("order.placed"));
656    }
657
658    #[test]
659    fn test_create_next_version() {
660        let schema_v1 = Schema::new_v1(
661            "test.event".to_string(),
662            valid_schema(),
663            CompatibilityMode::Backward,
664        )
665        .unwrap();
666
667        let new_schema = json!({
668            "type": "object",
669            "properties": {
670                "name": { "type": "string" },
671                "age": { "type": "number" },
672                "email": { "type": "string" }  // New field
673            }
674        });
675
676        let schema_v2 = schema_v1.create_next_version(new_schema);
677        assert!(schema_v2.is_ok());
678
679        let schema_v2 = schema_v2.unwrap();
680        assert_eq!(schema_v2.version(), 2);
681        assert_eq!(schema_v2.subject(), "test.event");
682        assert_eq!(schema_v2.compatibility_mode(), CompatibilityMode::Backward);
683    }
684
685    #[test]
686    fn test_compatibility_mode_checks() {
687        assert!(CompatibilityMode::Backward.requires_backward());
688        assert!(!CompatibilityMode::Backward.requires_forward());
689
690        assert!(!CompatibilityMode::Forward.requires_backward());
691        assert!(CompatibilityMode::Forward.requires_forward());
692
693        assert!(CompatibilityMode::Full.requires_backward());
694        assert!(CompatibilityMode::Full.requires_forward());
695
696        assert!(!CompatibilityMode::None.requires_backward());
697        assert!(!CompatibilityMode::None.requires_forward());
698        assert!(!CompatibilityMode::None.requires_compatibility());
699    }
700
701    #[test]
702    fn test_serde_serialization() {
703        let schema = Schema::new_v1(
704            "test.event".to_string(),
705            valid_schema(),
706            CompatibilityMode::Backward,
707        )
708        .unwrap();
709
710        // Should be able to serialize
711        let json = serde_json::to_string(&schema);
712        assert!(json.is_ok());
713
714        // Should be able to deserialize
715        let deserialized = serde_json::from_str::<Schema>(&json.unwrap());
716        assert!(deserialized.is_ok());
717    }
718}