1use crate::error::Result;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use uuid::Uuid;
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
12pub enum CompatibilityMode {
13 None,
15 Backward,
17 Forward,
19 Full,
21}
22
23impl Default for CompatibilityMode {
24 fn default() -> Self {
25 Self::Backward
26 }
27}
28
29impl CompatibilityMode {
30 pub fn requires_backward(&self) -> bool {
32 matches!(self, Self::Backward | Self::Full)
33 }
34
35 pub fn requires_forward(&self) -> bool {
37 matches!(self, Self::Forward | Self::Full)
38 }
39
40 pub fn requires_compatibility(&self) -> bool {
42 !matches!(self, Self::None)
43 }
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct Schema {
59 id: Uuid,
60 subject: String,
61 version: u32,
62 schema_definition: JsonValue,
63 created_at: DateTime<Utc>,
64 description: Option<String>,
65 tags: Vec<String>,
66 compatibility_mode: CompatibilityMode,
67}
68
69impl Schema {
70 pub fn new(
78 subject: String,
79 version: u32,
80 schema_definition: JsonValue,
81 compatibility_mode: CompatibilityMode,
82 ) -> Result<Self> {
83 Self::validate_subject(&subject)?;
84 Self::validate_version(version)?;
85 Self::validate_schema(&schema_definition)?;
86
87 Ok(Self {
88 id: Uuid::new_v4(),
89 subject,
90 version,
91 schema_definition,
92 created_at: Utc::now(),
93 description: None,
94 tags: Vec::new(),
95 compatibility_mode,
96 })
97 }
98
99 pub fn new_v1(
101 subject: String,
102 schema_definition: JsonValue,
103 compatibility_mode: CompatibilityMode,
104 ) -> Result<Self> {
105 Self::new(subject, 1, schema_definition, compatibility_mode)
106 }
107
108 #[allow(clippy::too_many_arguments)]
110 pub fn reconstruct(
111 id: Uuid,
112 subject: String,
113 version: u32,
114 schema_definition: JsonValue,
115 created_at: DateTime<Utc>,
116 description: Option<String>,
117 tags: Vec<String>,
118 compatibility_mode: CompatibilityMode,
119 ) -> Self {
120 Self {
121 id,
122 subject,
123 version,
124 schema_definition,
125 created_at,
126 description,
127 tags,
128 compatibility_mode,
129 }
130 }
131
132 pub fn id(&self) -> Uuid {
135 self.id
136 }
137
138 pub fn subject(&self) -> &str {
139 &self.subject
140 }
141
142 pub fn version(&self) -> u32 {
143 self.version
144 }
145
146 pub fn schema_definition(&self) -> &JsonValue {
147 &self.schema_definition
148 }
149
150 pub fn created_at(&self) -> DateTime<Utc> {
151 self.created_at
152 }
153
154 pub fn description(&self) -> Option<&str> {
155 self.description.as_deref()
156 }
157
158 pub fn tags(&self) -> &[String] {
159 &self.tags
160 }
161
162 pub fn compatibility_mode(&self) -> CompatibilityMode {
163 self.compatibility_mode
164 }
165
166 pub fn set_description(&mut self, description: String) -> Result<()> {
170 Self::validate_description(&description)?;
171 self.description = Some(description);
172 Ok(())
173 }
174
175 pub fn clear_description(&mut self) {
177 self.description = None;
178 }
179
180 pub fn add_tag(&mut self, tag: String) -> Result<()> {
182 Self::validate_tag(&tag)?;
183
184 if self.tags.contains(&tag) {
185 return Err(crate::error::AllSourceError::InvalidInput(format!(
186 "Tag '{}' already exists",
187 tag
188 )));
189 }
190
191 self.tags.push(tag);
192 Ok(())
193 }
194
195 pub fn remove_tag(&mut self, tag: &str) -> Result<()> {
197 let initial_len = self.tags.len();
198 self.tags.retain(|t| t != tag);
199
200 if self.tags.len() == initial_len {
201 return Err(crate::error::AllSourceError::InvalidInput(format!(
202 "Tag '{}' not found",
203 tag
204 )));
205 }
206
207 Ok(())
208 }
209
210 pub fn has_tag(&self, tag: &str) -> bool {
212 self.tags.iter().any(|t| t == tag)
213 }
214
215 pub fn is_first_version(&self) -> bool {
217 self.version == 1
218 }
219
220 pub fn applies_to(&self, subject: &str) -> bool {
222 self.subject == subject
223 }
224
225 pub fn create_next_version(&self, new_schema: JsonValue) -> Result<Schema> {
227 Schema::new(
228 self.subject.clone(),
229 self.version + 1,
230 new_schema,
231 self.compatibility_mode,
232 )
233 }
234
235 fn validate_subject(subject: &str) -> Result<()> {
238 if subject.is_empty() {
239 return Err(crate::error::AllSourceError::InvalidInput(
240 "Schema subject cannot be empty".to_string(),
241 ));
242 }
243
244 if subject.len() > 256 {
245 return Err(crate::error::AllSourceError::InvalidInput(format!(
246 "Schema subject cannot exceed 256 characters, got {}",
247 subject.len()
248 )));
249 }
250
251 if !subject
253 .chars()
254 .all(|c| c.is_lowercase() || c.is_numeric() || c == '.' || c == '_' || c == '-')
255 {
256 return Err(crate::error::AllSourceError::InvalidInput(format!(
257 "Schema subject '{}' must be lowercase with dots, underscores, or hyphens",
258 subject
259 )));
260 }
261
262 Ok(())
263 }
264
265 fn validate_version(version: u32) -> Result<()> {
266 if version == 0 {
267 return Err(crate::error::AllSourceError::InvalidInput(
268 "Schema version must be >= 1".to_string(),
269 ));
270 }
271
272 Ok(())
273 }
274
275 fn validate_schema(schema: &JsonValue) -> Result<()> {
276 if schema.is_null() {
277 return Err(crate::error::AllSourceError::InvalidInput(
278 "Schema definition cannot be null".to_string(),
279 ));
280 }
281
282 if let Some(obj) = schema.as_object() {
284 if !obj.contains_key("type") && !obj.contains_key("$schema") {
285 return Err(crate::error::AllSourceError::InvalidInput(
286 "Schema definition should contain 'type' or '$schema' property".to_string(),
287 ));
288 }
289 } else {
290 return Err(crate::error::AllSourceError::InvalidInput(
291 "Schema definition must be a JSON object".to_string(),
292 ));
293 }
294
295 Ok(())
296 }
297
298 fn validate_description(description: &str) -> Result<()> {
299 if description.len() > 1000 {
300 return Err(crate::error::AllSourceError::InvalidInput(format!(
301 "Schema description cannot exceed 1000 characters, got {}",
302 description.len()
303 )));
304 }
305 Ok(())
306 }
307
308 fn validate_tag(tag: &str) -> Result<()> {
309 if tag.is_empty() {
310 return Err(crate::error::AllSourceError::InvalidInput(
311 "Tag cannot be empty".to_string(),
312 ));
313 }
314
315 if tag.len() > 50 {
316 return Err(crate::error::AllSourceError::InvalidInput(format!(
317 "Tag cannot exceed 50 characters, got {}",
318 tag.len()
319 )));
320 }
321
322 if !tag
323 .chars()
324 .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
325 {
326 return Err(crate::error::AllSourceError::InvalidInput(format!(
327 "Tag '{}' must be alphanumeric with hyphens or underscores",
328 tag
329 )));
330 }
331
332 Ok(())
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Subject used by tests that do not care about the subject itself.
    const EVENT: &str = "test.event";

    /// A small object schema that passes `Schema::new` validation.
    fn valid_schema() -> JsonValue {
        json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "age": { "type": "number" }
            }
        })
    }

    /// Builds a valid version-1 schema, panicking if construction fails.
    fn v1_schema(subject: &str, mode: CompatibilityMode) -> Schema {
        Schema::new_v1(subject.to_string(), valid_schema(), mode).unwrap()
    }

    /// Attempts to build a schema with `CompatibilityMode::None`.
    fn try_new(subject: &str, version: u32, definition: JsonValue) -> crate::error::Result<Schema> {
        Schema::new(
            subject.to_string(),
            version,
            definition,
            CompatibilityMode::None,
        )
    }

    #[test]
    fn test_create_schema() {
        let created = Schema::new(
            "user.created".to_string(),
            1,
            valid_schema(),
            CompatibilityMode::Backward,
        );
        assert!(created.is_ok());

        let created = created.unwrap();
        assert_eq!(created.subject(), "user.created");
        assert_eq!(created.version(), 1);
        assert_eq!(created.compatibility_mode(), CompatibilityMode::Backward);
    }

    #[test]
    fn test_create_v1_schema() {
        let created = Schema::new_v1(
            "order.placed".to_string(),
            valid_schema(),
            CompatibilityMode::Full,
        );
        assert!(created.is_ok());

        let created = created.unwrap();
        assert_eq!(created.version(), 1);
        assert!(created.is_first_version());
    }

    #[test]
    fn test_reject_empty_subject() {
        assert!(try_new("", 1, valid_schema()).is_err());
    }

    #[test]
    fn test_reject_too_long_subject() {
        let too_long = "a".repeat(257);
        assert!(try_new(&too_long, 1, valid_schema()).is_err());
    }

    #[test]
    fn test_reject_invalid_subject_characters() {
        // Uppercase letters are not allowed.
        assert!(try_new("User.Created", 1, valid_schema()).is_err());
    }

    #[test]
    fn test_accept_valid_subjects() {
        let accepted = [
            "user.created",
            "order_placed",
            "payment-processed",
            "event.v2.updated",
        ];

        for subject in accepted.iter() {
            let outcome = try_new(subject, 1, valid_schema());
            assert!(outcome.is_ok(), "Subject '{}' should be valid", subject);
        }
    }

    #[test]
    fn test_reject_zero_version() {
        assert!(try_new(EVENT, 0, valid_schema()).is_err());
    }

    #[test]
    fn test_reject_null_schema() {
        assert!(try_new(EVENT, 1, JsonValue::Null).is_err());
    }

    #[test]
    fn test_reject_non_object_schema() {
        assert!(try_new(EVENT, 1, json!("not an object")).is_err());
    }

    #[test]
    fn test_reject_schema_without_type() {
        // An object with neither `type` nor `$schema`.
        assert!(try_new(EVENT, 1, json!({"properties": {}})).is_err());
    }

    #[test]
    fn test_accept_schema_with_schema_property() {
        let definition = json!({"$schema": "http://json-schema.org/draft-07/schema#"});
        assert!(try_new(EVENT, 1, definition).is_ok());
    }

    #[test]
    fn test_set_description() {
        let mut schema = v1_schema(EVENT, CompatibilityMode::None);
        assert!(schema.description().is_none());

        let outcome = schema.set_description("Test schema".to_string());
        assert!(outcome.is_ok());
        assert_eq!(schema.description(), Some("Test schema"));
    }

    #[test]
    fn test_reject_too_long_description() {
        let mut schema = v1_schema(EVENT, CompatibilityMode::None);

        let outcome = schema.set_description("a".repeat(1001));
        assert!(outcome.is_err());
    }

    #[test]
    fn test_clear_description() {
        let mut schema = v1_schema(EVENT, CompatibilityMode::None);

        schema.set_description("Test".to_string()).unwrap();
        assert!(schema.description().is_some());

        schema.clear_description();
        assert!(schema.description().is_none());
    }

    #[test]
    fn test_add_tag() {
        let mut schema = v1_schema(EVENT, CompatibilityMode::None);
        assert_eq!(schema.tags().len(), 0);

        let outcome = schema.add_tag("production".to_string());
        assert!(outcome.is_ok());
        assert_eq!(schema.tags().len(), 1);
        assert!(schema.has_tag("production"));
    }

    #[test]
    fn test_reject_duplicate_tag() {
        let mut schema = v1_schema(EVENT, CompatibilityMode::None);

        schema.add_tag("test".to_string()).unwrap();
        let second_attempt = schema.add_tag("test".to_string());
        assert!(second_attempt.is_err());
    }

    #[test]
    fn test_remove_tag() {
        let mut schema = v1_schema(EVENT, CompatibilityMode::None);
        schema.add_tag("tag1".to_string()).unwrap();
        schema.add_tag("tag2".to_string()).unwrap();

        let outcome = schema.remove_tag("tag1");
        assert!(outcome.is_ok());
        assert_eq!(schema.tags().len(), 1);
        assert!(!schema.has_tag("tag1"));
        assert!(schema.has_tag("tag2"));
    }

    #[test]
    fn test_remove_nonexistent_tag() {
        let mut schema = v1_schema(EVENT, CompatibilityMode::None);

        let outcome = schema.remove_tag("nonexistent");
        assert!(outcome.is_err());
    }

    #[test]
    fn test_reject_invalid_tags() {
        let mut schema = v1_schema(EVENT, CompatibilityMode::None);

        // Empty tag.
        assert!(schema.add_tag(String::new()).is_err());

        // Over the 50-character limit.
        assert!(schema.add_tag("a".repeat(51)).is_err());

        // Characters outside alphanumerics, '-' and '_'.
        assert!(schema.add_tag("tag with spaces".to_string()).is_err());
        assert!(schema.add_tag("tag@invalid".to_string()).is_err());
    }

    #[test]
    fn test_accept_valid_tags() {
        let mut schema = v1_schema(EVENT, CompatibilityMode::None);

        let accepted = ["production", "test-env", "v2_schema", "important123"];
        for tag in accepted.iter() {
            assert!(schema.add_tag(tag.to_string()).is_ok());
        }
    }

    #[test]
    fn test_is_first_version() {
        let first = v1_schema(EVENT, CompatibilityMode::None);
        let second = try_new(EVENT, 2, valid_schema()).unwrap();

        assert!(first.is_first_version());
        assert!(!second.is_first_version());
    }

    #[test]
    fn test_applies_to() {
        let schema = v1_schema("user.created", CompatibilityMode::None);

        assert!(schema.applies_to("user.created"));
        assert!(!schema.applies_to("order.placed"));
    }

    #[test]
    fn test_create_next_version() {
        let first = v1_schema(EVENT, CompatibilityMode::Backward);

        let extended_definition = json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "age": { "type": "number" },
                "email": { "type": "string" }
            }
        });

        let second = first.create_next_version(extended_definition);
        assert!(second.is_ok());

        let second = second.unwrap();
        assert_eq!(second.version(), 2);
        assert_eq!(second.subject(), "test.event");
        assert_eq!(second.compatibility_mode(), CompatibilityMode::Backward);
    }

    #[test]
    fn test_compatibility_mode_checks() {
        let backward = CompatibilityMode::Backward;
        assert!(backward.requires_backward());
        assert!(!backward.requires_forward());

        let forward = CompatibilityMode::Forward;
        assert!(!forward.requires_backward());
        assert!(forward.requires_forward());

        let full = CompatibilityMode::Full;
        assert!(full.requires_backward());
        assert!(full.requires_forward());

        let none = CompatibilityMode::None;
        assert!(!none.requires_backward());
        assert!(!none.requires_forward());
        assert!(!none.requires_compatibility());
    }

    #[test]
    fn test_serde_serialization() {
        let schema = v1_schema(EVENT, CompatibilityMode::Backward);

        // Serialize to a JSON string...
        let encoded = serde_json::to_string(&schema);
        assert!(encoded.is_ok());

        // ...and make sure that string parses back into a `Schema`.
        let decoded = serde_json::from_str::<Schema>(&encoded.unwrap());
        assert!(decoded.is_ok());
    }
}