1use crate::error::Result;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use uuid::Uuid;
6
7#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
12pub enum CompatibilityMode {
13 None,
15 #[default]
17 Backward,
18 Forward,
20 Full,
22}
23
24impl CompatibilityMode {
25 pub fn requires_backward(&self) -> bool {
27 matches!(self, Self::Backward | Self::Full)
28 }
29
30 pub fn requires_forward(&self) -> bool {
32 matches!(self, Self::Forward | Self::Full)
33 }
34
35 pub fn requires_compatibility(&self) -> bool {
37 !matches!(self, Self::None)
38 }
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct Schema {
54 id: Uuid,
55 subject: String,
56 version: u32,
57 schema_definition: JsonValue,
58 created_at: DateTime<Utc>,
59 description: Option<String>,
60 tags: Vec<String>,
61 compatibility_mode: CompatibilityMode,
62}
63
64impl Schema {
65 pub fn new(
73 subject: String,
74 version: u32,
75 schema_definition: JsonValue,
76 compatibility_mode: CompatibilityMode,
77 ) -> Result<Self> {
78 Self::validate_subject(&subject)?;
79 Self::validate_version(version)?;
80 Self::validate_schema(&schema_definition)?;
81
82 Ok(Self {
83 id: Uuid::new_v4(),
84 subject,
85 version,
86 schema_definition,
87 created_at: Utc::now(),
88 description: None,
89 tags: Vec::new(),
90 compatibility_mode,
91 })
92 }
93
94 pub fn new_v1(
96 subject: String,
97 schema_definition: JsonValue,
98 compatibility_mode: CompatibilityMode,
99 ) -> Result<Self> {
100 Self::new(subject, 1, schema_definition, compatibility_mode)
101 }
102
103 #[allow(clippy::too_many_arguments)]
105 pub fn reconstruct(
106 id: Uuid,
107 subject: String,
108 version: u32,
109 schema_definition: JsonValue,
110 created_at: DateTime<Utc>,
111 description: Option<String>,
112 tags: Vec<String>,
113 compatibility_mode: CompatibilityMode,
114 ) -> Self {
115 Self {
116 id,
117 subject,
118 version,
119 schema_definition,
120 created_at,
121 description,
122 tags,
123 compatibility_mode,
124 }
125 }
126
127 pub fn id(&self) -> Uuid {
130 self.id
131 }
132
133 pub fn subject(&self) -> &str {
134 &self.subject
135 }
136
137 pub fn version(&self) -> u32 {
138 self.version
139 }
140
141 pub fn schema_definition(&self) -> &JsonValue {
142 &self.schema_definition
143 }
144
145 pub fn created_at(&self) -> DateTime<Utc> {
146 self.created_at
147 }
148
149 pub fn description(&self) -> Option<&str> {
150 self.description.as_deref()
151 }
152
153 pub fn tags(&self) -> &[String] {
154 &self.tags
155 }
156
157 pub fn compatibility_mode(&self) -> CompatibilityMode {
158 self.compatibility_mode
159 }
160
161 pub fn set_description(&mut self, description: String) -> Result<()> {
165 Self::validate_description(&description)?;
166 self.description = Some(description);
167 Ok(())
168 }
169
170 pub fn clear_description(&mut self) {
172 self.description = None;
173 }
174
175 pub fn add_tag(&mut self, tag: String) -> Result<()> {
177 Self::validate_tag(&tag)?;
178
179 if self.tags.contains(&tag) {
180 return Err(crate::error::AllSourceError::InvalidInput(format!(
181 "Tag '{}' already exists",
182 tag
183 )));
184 }
185
186 self.tags.push(tag);
187 Ok(())
188 }
189
190 pub fn remove_tag(&mut self, tag: &str) -> Result<()> {
192 let initial_len = self.tags.len();
193 self.tags.retain(|t| t != tag);
194
195 if self.tags.len() == initial_len {
196 return Err(crate::error::AllSourceError::InvalidInput(format!(
197 "Tag '{}' not found",
198 tag
199 )));
200 }
201
202 Ok(())
203 }
204
205 pub fn has_tag(&self, tag: &str) -> bool {
207 self.tags.iter().any(|t| t == tag)
208 }
209
210 pub fn is_first_version(&self) -> bool {
212 self.version == 1
213 }
214
215 pub fn applies_to(&self, subject: &str) -> bool {
217 self.subject == subject
218 }
219
220 pub fn create_next_version(&self, new_schema: JsonValue) -> Result<Schema> {
222 Schema::new(
223 self.subject.clone(),
224 self.version + 1,
225 new_schema,
226 self.compatibility_mode,
227 )
228 }
229
230 fn validate_subject(subject: &str) -> Result<()> {
233 if subject.is_empty() {
234 return Err(crate::error::AllSourceError::InvalidInput(
235 "Schema subject cannot be empty".to_string(),
236 ));
237 }
238
239 if subject.len() > 256 {
240 return Err(crate::error::AllSourceError::InvalidInput(format!(
241 "Schema subject cannot exceed 256 characters, got {}",
242 subject.len()
243 )));
244 }
245
246 if !subject
248 .chars()
249 .all(|c| c.is_lowercase() || c.is_numeric() || c == '.' || c == '_' || c == '-')
250 {
251 return Err(crate::error::AllSourceError::InvalidInput(format!(
252 "Schema subject '{}' must be lowercase with dots, underscores, or hyphens",
253 subject
254 )));
255 }
256
257 Ok(())
258 }
259
260 fn validate_version(version: u32) -> Result<()> {
261 if version == 0 {
262 return Err(crate::error::AllSourceError::InvalidInput(
263 "Schema version must be >= 1".to_string(),
264 ));
265 }
266
267 Ok(())
268 }
269
270 fn validate_schema(schema: &JsonValue) -> Result<()> {
271 if schema.is_null() {
272 return Err(crate::error::AllSourceError::InvalidInput(
273 "Schema definition cannot be null".to_string(),
274 ));
275 }
276
277 if let Some(obj) = schema.as_object() {
279 if !obj.contains_key("type") && !obj.contains_key("$schema") {
280 return Err(crate::error::AllSourceError::InvalidInput(
281 "Schema definition should contain 'type' or '$schema' property".to_string(),
282 ));
283 }
284 } else {
285 return Err(crate::error::AllSourceError::InvalidInput(
286 "Schema definition must be a JSON object".to_string(),
287 ));
288 }
289
290 Ok(())
291 }
292
293 fn validate_description(description: &str) -> Result<()> {
294 if description.len() > 1000 {
295 return Err(crate::error::AllSourceError::InvalidInput(format!(
296 "Schema description cannot exceed 1000 characters, got {}",
297 description.len()
298 )));
299 }
300 Ok(())
301 }
302
303 fn validate_tag(tag: &str) -> Result<()> {
304 if tag.is_empty() {
305 return Err(crate::error::AllSourceError::InvalidInput(
306 "Tag cannot be empty".to_string(),
307 ));
308 }
309
310 if tag.len() > 50 {
311 return Err(crate::error::AllSourceError::InvalidInput(format!(
312 "Tag cannot exceed 50 characters, got {}",
313 tag.len()
314 )));
315 }
316
317 if !tag
318 .chars()
319 .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
320 {
321 return Err(crate::error::AllSourceError::InvalidInput(format!(
322 "Tag '{}' must be alphanumeric with hyphens or underscores",
323 tag
324 )));
325 }
326
327 Ok(())
328 }
329}
330
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A small JSON document that passes `Schema`'s definition validation.
    fn valid_schema() -> JsonValue {
        json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "age": { "type": "number" }
            }
        })
    }

    /// Version-1 schema for `test.event` with `CompatibilityMode::None`,
    /// shared by the tests that only need "some valid schema".
    fn test_event_schema() -> Schema {
        Schema::new_v1(
            "test.event".to_string(),
            valid_schema(),
            CompatibilityMode::None,
        )
        .expect("fixture schema must be valid")
    }

    #[test]
    fn test_create_schema() {
        let schema = Schema::new(
            "user.created".to_string(),
            1,
            valid_schema(),
            CompatibilityMode::Backward,
        );

        assert!(schema.is_ok());
        let schema = schema.unwrap();
        assert_eq!(schema.subject(), "user.created");
        assert_eq!(schema.version(), 1);
        assert_eq!(schema.compatibility_mode(), CompatibilityMode::Backward);
        assert_eq!(schema.schema_definition(), &valid_schema());
        assert!(schema.description().is_none());
        assert!(schema.tags().is_empty());
    }

    #[test]
    fn test_create_v1_schema() {
        let schema = Schema::new_v1(
            "order.placed".to_string(),
            valid_schema(),
            CompatibilityMode::Full,
        );

        assert!(schema.is_ok());
        let schema = schema.unwrap();
        assert_eq!(schema.version(), 1);
        assert!(schema.is_first_version());
    }

    #[test]
    fn test_reconstruct_preserves_fields() {
        let id = Uuid::new_v4();
        let created_at = Utc::now();

        let schema = Schema::reconstruct(
            id,
            "user.created".to_string(),
            3,
            valid_schema(),
            created_at,
            Some("Stored schema".to_string()),
            vec!["production".to_string()],
            CompatibilityMode::Forward,
        );

        assert_eq!(schema.id(), id);
        assert_eq!(schema.subject(), "user.created");
        assert_eq!(schema.version(), 3);
        assert_eq!(schema.schema_definition(), &valid_schema());
        assert_eq!(schema.created_at(), created_at);
        assert_eq!(schema.description(), Some("Stored schema"));
        assert_eq!(schema.tags().len(), 1);
        assert!(schema.has_tag("production"));
        assert_eq!(schema.compatibility_mode(), CompatibilityMode::Forward);
    }

    #[test]
    fn test_reject_empty_subject() {
        let result = Schema::new("".to_string(), 1, valid_schema(), CompatibilityMode::None);

        assert!(result.is_err());
    }

    #[test]
    fn test_reject_too_long_subject() {
        let long_subject = "a".repeat(257);
        let result = Schema::new(long_subject, 1, valid_schema(), CompatibilityMode::None);

        assert!(result.is_err());
    }

    #[test]
    fn test_accept_max_length_subject() {
        // Exactly at the limit must still be accepted.
        let subject = "a".repeat(256);
        let result = Schema::new(subject, 1, valid_schema(), CompatibilityMode::None);

        assert!(result.is_ok());
    }

    #[test]
    fn test_reject_invalid_subject_characters() {
        // Uppercase letters are not allowed in subjects.
        let result = Schema::new(
            "User.Created".to_string(),
            1,
            valid_schema(),
            CompatibilityMode::None,
        );

        assert!(result.is_err());
    }

    #[test]
    fn test_accept_valid_subjects() {
        let subjects = [
            "user.created",
            "order_placed",
            "payment-processed",
            "event.v2.updated",
        ];

        for subject in subjects {
            let result = Schema::new(
                subject.to_string(),
                1,
                valid_schema(),
                CompatibilityMode::None,
            );
            assert!(result.is_ok(), "Subject '{}' should be valid", subject);
        }
    }

    #[test]
    fn test_reject_zero_version() {
        let result = Schema::new(
            "test.event".to_string(),
            0,
            valid_schema(),
            CompatibilityMode::None,
        );

        assert!(result.is_err());
    }

    #[test]
    fn test_reject_null_schema() {
        let result = Schema::new(
            "test.event".to_string(),
            1,
            JsonValue::Null,
            CompatibilityMode::None,
        );

        assert!(result.is_err());
    }

    #[test]
    fn test_reject_non_object_schema() {
        let result = Schema::new(
            "test.event".to_string(),
            1,
            json!("not an object"),
            CompatibilityMode::None,
        );

        assert!(result.is_err());
    }

    #[test]
    fn test_reject_schema_without_type() {
        // Neither "type" nor "$schema" is present.
        let result = Schema::new(
            "test.event".to_string(),
            1,
            json!({"properties": {}}),
            CompatibilityMode::None,
        );

        assert!(result.is_err());
    }

    #[test]
    fn test_accept_schema_with_schema_property() {
        let result = Schema::new(
            "test.event".to_string(),
            1,
            json!({"$schema": "http://json-schema.org/draft-07/schema#"}),
            CompatibilityMode::None,
        );

        assert!(result.is_ok());
    }

    #[test]
    fn test_set_description() {
        let mut schema = test_event_schema();

        assert!(schema.description().is_none());

        let result = schema.set_description("Test schema".to_string());
        assert!(result.is_ok());
        assert_eq!(schema.description(), Some("Test schema"));
    }

    #[test]
    fn test_reject_too_long_description() {
        let mut schema = test_event_schema();

        let long_desc = "a".repeat(1001);
        let result = schema.set_description(long_desc);
        assert!(result.is_err());
        // A rejected description must not be stored.
        assert!(schema.description().is_none());
    }

    #[test]
    fn test_accept_max_length_description() {
        let mut schema = test_event_schema();

        let result = schema.set_description("a".repeat(1000));
        assert!(result.is_ok());
    }

    #[test]
    fn test_clear_description() {
        let mut schema = test_event_schema();

        schema.set_description("Test".to_string()).unwrap();
        assert!(schema.description().is_some());

        schema.clear_description();
        assert!(schema.description().is_none());
    }

    #[test]
    fn test_add_tag() {
        let mut schema = test_event_schema();

        assert_eq!(schema.tags().len(), 0);

        let result = schema.add_tag("production".to_string());
        assert!(result.is_ok());
        assert_eq!(schema.tags().len(), 1);
        assert!(schema.has_tag("production"));
    }

    #[test]
    fn test_reject_duplicate_tag() {
        let mut schema = test_event_schema();

        schema.add_tag("test".to_string()).unwrap();
        let result = schema.add_tag("test".to_string());
        assert!(result.is_err());
        // The failed insert must not have added a second copy.
        assert_eq!(schema.tags().len(), 1);
    }

    #[test]
    fn test_remove_tag() {
        let mut schema = test_event_schema();

        schema.add_tag("tag1".to_string()).unwrap();
        schema.add_tag("tag2".to_string()).unwrap();

        let result = schema.remove_tag("tag1");
        assert!(result.is_ok());
        assert_eq!(schema.tags().len(), 1);
        assert!(!schema.has_tag("tag1"));
        assert!(schema.has_tag("tag2"));
    }

    #[test]
    fn test_remove_nonexistent_tag() {
        let mut schema = test_event_schema();

        let result = schema.remove_tag("nonexistent");
        assert!(result.is_err());
    }

    #[test]
    fn test_reject_invalid_tags() {
        let mut schema = test_event_schema();

        // Empty tag.
        assert!(schema.add_tag("".to_string()).is_err());

        // One character over the limit.
        assert!(schema.add_tag("a".repeat(51)).is_err());

        // Characters outside the allowed set.
        assert!(schema.add_tag("tag with spaces".to_string()).is_err());
        assert!(schema.add_tag("tag@invalid".to_string()).is_err());

        // None of the rejected tags may have been stored.
        assert!(schema.tags().is_empty());
    }

    #[test]
    fn test_accept_valid_tags() {
        let mut schema = test_event_schema();

        let valid_tags = ["production", "test-env", "v2_schema", "important123"];

        for tag in valid_tags {
            assert!(
                schema.add_tag(tag.to_string()).is_ok(),
                "Tag '{}' should be valid",
                tag
            );
        }
        assert_eq!(schema.tags().len(), valid_tags.len());
    }

    #[test]
    fn test_accept_max_length_tag() {
        let mut schema = test_event_schema();

        assert!(schema.add_tag("a".repeat(50)).is_ok());
    }

    #[test]
    fn test_is_first_version() {
        let schema_v1 = test_event_schema();

        let schema_v2 = Schema::new(
            "test.event".to_string(),
            2,
            valid_schema(),
            CompatibilityMode::None,
        )
        .unwrap();

        assert!(schema_v1.is_first_version());
        assert!(!schema_v2.is_first_version());
    }

    #[test]
    fn test_applies_to() {
        let schema = Schema::new_v1(
            "user.created".to_string(),
            valid_schema(),
            CompatibilityMode::None,
        )
        .unwrap();

        assert!(schema.applies_to("user.created"));
        assert!(!schema.applies_to("order.placed"));
    }

    #[test]
    fn test_create_next_version() {
        let schema_v1 = Schema::new_v1(
            "test.event".to_string(),
            valid_schema(),
            CompatibilityMode::Backward,
        )
        .unwrap();

        let new_schema = json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "age": { "type": "number" },
                "email": { "type": "string" }
            }
        });

        let schema_v2 = schema_v1.create_next_version(new_schema.clone());
        assert!(schema_v2.is_ok());

        let schema_v2 = schema_v2.unwrap();
        assert_eq!(schema_v2.version(), 2);
        assert_eq!(schema_v2.subject(), "test.event");
        assert_eq!(schema_v2.compatibility_mode(), CompatibilityMode::Backward);
        assert_eq!(schema_v2.schema_definition(), &new_schema);
        assert!(!schema_v2.is_first_version());
    }

    #[test]
    fn test_create_next_version_rejects_invalid_definition() {
        let schema_v1 = test_event_schema();

        assert!(schema_v1.create_next_version(JsonValue::Null).is_err());
    }

    #[test]
    fn test_compatibility_mode_checks() {
        assert!(CompatibilityMode::Backward.requires_backward());
        assert!(!CompatibilityMode::Backward.requires_forward());
        assert!(CompatibilityMode::Backward.requires_compatibility());

        assert!(!CompatibilityMode::Forward.requires_backward());
        assert!(CompatibilityMode::Forward.requires_forward());
        assert!(CompatibilityMode::Forward.requires_compatibility());

        assert!(CompatibilityMode::Full.requires_backward());
        assert!(CompatibilityMode::Full.requires_forward());
        assert!(CompatibilityMode::Full.requires_compatibility());

        assert!(!CompatibilityMode::None.requires_backward());
        assert!(!CompatibilityMode::None.requires_forward());
        assert!(!CompatibilityMode::None.requires_compatibility());
    }

    #[test]
    fn test_compatibility_mode_default_and_serde_names() {
        assert_eq!(CompatibilityMode::default(), CompatibilityMode::Backward);

        let cases = [
            (CompatibilityMode::None, "\"NONE\""),
            (CompatibilityMode::Backward, "\"BACKWARD\""),
            (CompatibilityMode::Forward, "\"FORWARD\""),
            (CompatibilityMode::Full, "\"FULL\""),
        ];

        for (mode, wire) in cases {
            assert_eq!(serde_json::to_string(&mode).unwrap(), wire);
            assert_eq!(
                serde_json::from_str::<CompatibilityMode>(wire).unwrap(),
                mode
            );
        }
    }

    #[test]
    fn test_serde_serialization() {
        let mut schema = Schema::new_v1(
            "test.event".to_string(),
            valid_schema(),
            CompatibilityMode::Backward,
        )
        .unwrap();
        schema.set_description("Round trip".to_string()).unwrap();
        schema.add_tag("production".to_string()).unwrap();

        let encoded = serde_json::to_string(&schema);
        assert!(encoded.is_ok());

        let decoded = serde_json::from_str::<Schema>(&encoded.unwrap());
        assert!(decoded.is_ok());

        // The round trip must preserve every field, not merely succeed.
        let decoded = decoded.unwrap();
        assert_eq!(decoded.id(), schema.id());
        assert_eq!(decoded.subject(), schema.subject());
        assert_eq!(decoded.version(), schema.version());
        assert_eq!(decoded.schema_definition(), schema.schema_definition());
        assert_eq!(decoded.created_at(), schema.created_at());
        assert_eq!(decoded.description(), schema.description());
        assert_eq!(decoded.tags(), schema.tags());
        assert_eq!(decoded.compatibility_mode(), schema.compatibility_mode());
    }
}