1use crate::error::Result;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use uuid::Uuid;
6
/// Compatibility policy attached to a [`Schema`].
///
/// Serialized by serde as `SCREAMING_SNAKE_CASE` strings (`"NONE"`,
/// `"BACKWARD"`, `"FORWARD"`, `"FULL"`). [`CompatibilityMode::Backward`] is the
/// value produced by `Default`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum CompatibilityMode {
    /// No compatibility requirement: `requires_backward`, `requires_forward`
    /// and `requires_compatibility` all return `false` for this mode.
    None,
    /// Backward compatibility is required. This is the default mode.
    #[default]
    Backward,
    /// Forward compatibility is required.
    Forward,
    /// Both backward and forward compatibility are required.
    Full,
}
23
24impl CompatibilityMode {
25 pub fn requires_backward(&self) -> bool {
27 matches!(self, Self::Backward | Self::Full)
28 }
29
30 pub fn requires_forward(&self) -> bool {
32 matches!(self, Self::Forward | Self::Full)
33 }
34
35 pub fn requires_compatibility(&self) -> bool {
37 !matches!(self, Self::None)
38 }
39}
40
/// A versioned schema definition registered under a subject.
///
/// All fields are private; they are read through the accessor methods and
/// changed only through the validating mutators on `Schema`.
///
/// NOTE(review): the derived `Deserialize` fills the fields directly, so the
/// checks performed by `Schema::new` are not re-run on deserialized values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Schema {
    /// Unique identifier; `Schema::new` assigns a random v4 UUID.
    id: Uuid,
    /// Subject name the schema is registered under.
    subject: String,
    /// Version number; `Schema::new` rejects `0`.
    version: u32,
    /// The schema body as a JSON value; `Schema::new` requires a JSON object
    /// containing a `type` or `$schema` key.
    schema_definition: JsonValue,
    /// Creation timestamp in UTC; `Schema::new` uses the current time.
    created_at: DateTime<Utc>,
    /// Optional free-text description; `None` until one is set.
    description: Option<String>,
    /// Tags attached to the schema; `add_tag` refuses duplicates.
    tags: Vec<String>,
    /// Compatibility policy associated with this schema.
    compatibility_mode: CompatibilityMode,
}
63
64impl Schema {
65 pub fn new(
73 subject: String,
74 version: u32,
75 schema_definition: JsonValue,
76 compatibility_mode: CompatibilityMode,
77 ) -> Result<Self> {
78 Self::validate_subject(&subject)?;
79 Self::validate_version(version)?;
80 Self::validate_schema(&schema_definition)?;
81
82 Ok(Self {
83 id: Uuid::new_v4(),
84 subject,
85 version,
86 schema_definition,
87 created_at: Utc::now(),
88 description: None,
89 tags: Vec::new(),
90 compatibility_mode,
91 })
92 }
93
94 pub fn new_v1(
96 subject: String,
97 schema_definition: JsonValue,
98 compatibility_mode: CompatibilityMode,
99 ) -> Result<Self> {
100 Self::new(subject, 1, schema_definition, compatibility_mode)
101 }
102
103 #[allow(clippy::too_many_arguments)]
105 pub fn reconstruct(
106 id: Uuid,
107 subject: String,
108 version: u32,
109 schema_definition: JsonValue,
110 created_at: DateTime<Utc>,
111 description: Option<String>,
112 tags: Vec<String>,
113 compatibility_mode: CompatibilityMode,
114 ) -> Self {
115 Self {
116 id,
117 subject,
118 version,
119 schema_definition,
120 created_at,
121 description,
122 tags,
123 compatibility_mode,
124 }
125 }
126
127 pub fn id(&self) -> Uuid {
130 self.id
131 }
132
133 pub fn subject(&self) -> &str {
134 &self.subject
135 }
136
137 pub fn version(&self) -> u32 {
138 self.version
139 }
140
141 pub fn schema_definition(&self) -> &JsonValue {
142 &self.schema_definition
143 }
144
145 pub fn created_at(&self) -> DateTime<Utc> {
146 self.created_at
147 }
148
149 pub fn description(&self) -> Option<&str> {
150 self.description.as_deref()
151 }
152
153 pub fn tags(&self) -> &[String] {
154 &self.tags
155 }
156
157 pub fn compatibility_mode(&self) -> CompatibilityMode {
158 self.compatibility_mode
159 }
160
161 pub fn set_description(&mut self, description: String) -> Result<()> {
165 Self::validate_description(&description)?;
166 self.description = Some(description);
167 Ok(())
168 }
169
170 pub fn clear_description(&mut self) {
172 self.description = None;
173 }
174
175 pub fn add_tag(&mut self, tag: String) -> Result<()> {
177 Self::validate_tag(&tag)?;
178
179 if self.tags.contains(&tag) {
180 return Err(crate::error::AllSourceError::InvalidInput(format!(
181 "Tag '{tag}' already exists"
182 )));
183 }
184
185 self.tags.push(tag);
186 Ok(())
187 }
188
189 pub fn remove_tag(&mut self, tag: &str) -> Result<()> {
191 let initial_len = self.tags.len();
192 self.tags.retain(|t| t != tag);
193
194 if self.tags.len() == initial_len {
195 return Err(crate::error::AllSourceError::InvalidInput(format!(
196 "Tag '{tag}' not found"
197 )));
198 }
199
200 Ok(())
201 }
202
203 pub fn has_tag(&self, tag: &str) -> bool {
205 self.tags.iter().any(|t| t == tag)
206 }
207
208 pub fn is_first_version(&self) -> bool {
210 self.version == 1
211 }
212
213 pub fn applies_to(&self, subject: &str) -> bool {
215 self.subject == subject
216 }
217
218 pub fn create_next_version(&self, new_schema: JsonValue) -> Result<Schema> {
220 Schema::new(
221 self.subject.clone(),
222 self.version + 1,
223 new_schema,
224 self.compatibility_mode,
225 )
226 }
227
228 fn validate_subject(subject: &str) -> Result<()> {
231 if subject.is_empty() {
232 return Err(crate::error::AllSourceError::InvalidInput(
233 "Schema subject cannot be empty".to_string(),
234 ));
235 }
236
237 if subject.len() > 256 {
238 return Err(crate::error::AllSourceError::InvalidInput(format!(
239 "Schema subject cannot exceed 256 characters, got {}",
240 subject.len()
241 )));
242 }
243
244 if !subject
246 .chars()
247 .all(|c| c.is_lowercase() || c.is_numeric() || c == '.' || c == '_' || c == '-')
248 {
249 return Err(crate::error::AllSourceError::InvalidInput(format!(
250 "Schema subject '{subject}' must be lowercase with dots, underscores, or hyphens"
251 )));
252 }
253
254 Ok(())
255 }
256
257 fn validate_version(version: u32) -> Result<()> {
258 if version == 0 {
259 return Err(crate::error::AllSourceError::InvalidInput(
260 "Schema version must be >= 1".to_string(),
261 ));
262 }
263
264 Ok(())
265 }
266
267 fn validate_schema(schema: &JsonValue) -> Result<()> {
268 if schema.is_null() {
269 return Err(crate::error::AllSourceError::InvalidInput(
270 "Schema definition cannot be null".to_string(),
271 ));
272 }
273
274 if let Some(obj) = schema.as_object() {
276 if !obj.contains_key("type") && !obj.contains_key("$schema") {
277 return Err(crate::error::AllSourceError::InvalidInput(
278 "Schema definition should contain 'type' or '$schema' property".to_string(),
279 ));
280 }
281 } else {
282 return Err(crate::error::AllSourceError::InvalidInput(
283 "Schema definition must be a JSON object".to_string(),
284 ));
285 }
286
287 Ok(())
288 }
289
290 fn validate_description(description: &str) -> Result<()> {
291 if description.len() > 1000 {
292 return Err(crate::error::AllSourceError::InvalidInput(format!(
293 "Schema description cannot exceed 1000 characters, got {}",
294 description.len()
295 )));
296 }
297 Ok(())
298 }
299
300 fn validate_tag(tag: &str) -> Result<()> {
301 if tag.is_empty() {
302 return Err(crate::error::AllSourceError::InvalidInput(
303 "Tag cannot be empty".to_string(),
304 ));
305 }
306
307 if tag.len() > 50 {
308 return Err(crate::error::AllSourceError::InvalidInput(format!(
309 "Tag cannot exceed 50 characters, got {}",
310 tag.len()
311 )));
312 }
313
314 if !tag
315 .chars()
316 .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
317 {
318 return Err(crate::error::AllSourceError::InvalidInput(format!(
319 "Tag '{tag}' must be alphanumeric with hyphens or underscores"
320 )));
321 }
322
323 Ok(())
324 }
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A small object schema that passes definition validation.
    fn valid_schema() -> JsonValue {
        json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "age": { "type": "number" }
            }
        })
    }

    /// Builds version 1 of a schema for `subject` using `valid_schema()`.
    fn v1(subject: &str, mode: CompatibilityMode) -> Schema {
        Schema::new_v1(subject.to_string(), valid_schema(), mode)
            .expect("fixture schema should be valid")
    }

    /// Returns `true` when `Schema::new` refuses the given inputs.
    fn rejects(subject: &str, version: u32, definition: JsonValue) -> bool {
        Schema::new(
            subject.to_string(),
            version,
            definition,
            CompatibilityMode::None,
        )
        .is_err()
    }

    #[test]
    fn test_create_schema() {
        let schema = Schema::new(
            "user.created".to_string(),
            1,
            valid_schema(),
            CompatibilityMode::Backward,
        )
        .expect("valid inputs should produce a schema");

        assert_eq!(schema.subject(), "user.created");
        assert_eq!(schema.version(), 1);
        assert_eq!(schema.compatibility_mode(), CompatibilityMode::Backward);
    }

    #[test]
    fn test_create_v1_schema() {
        let schema = Schema::new_v1(
            "order.placed".to_string(),
            valid_schema(),
            CompatibilityMode::Full,
        )
        .expect("valid inputs should produce a schema");

        assert_eq!(schema.version(), 1);
        assert!(schema.is_first_version());
    }

    #[test]
    fn test_reject_empty_subject() {
        assert!(rejects("", 1, valid_schema()));
    }

    #[test]
    fn test_reject_too_long_subject() {
        // One character over the 256 limit.
        let too_long = "a".repeat(257);
        assert!(rejects(&too_long, 1, valid_schema()));
    }

    #[test]
    fn test_reject_invalid_subject_characters() {
        // Uppercase letters are not allowed in subjects.
        assert!(rejects("User.Created", 1, valid_schema()));
    }

    #[test]
    fn test_accept_valid_subjects() {
        let subjects = [
            "user.created",
            "order_placed",
            "payment-processed",
            "event.v2.updated",
        ];

        for subject in subjects.iter() {
            assert!(
                !rejects(subject, 1, valid_schema()),
                "Subject '{subject}' should be valid"
            );
        }
    }

    #[test]
    fn test_reject_zero_version() {
        assert!(rejects("test.event", 0, valid_schema()));
    }

    #[test]
    fn test_reject_null_schema() {
        assert!(rejects("test.event", 1, JsonValue::Null));
    }

    #[test]
    fn test_reject_non_object_schema() {
        assert!(rejects("test.event", 1, json!("not an object")));
    }

    #[test]
    fn test_reject_schema_without_type() {
        // An object with neither `type` nor `$schema`.
        assert!(rejects("test.event", 1, json!({"properties": {}})));
    }

    #[test]
    fn test_accept_schema_with_schema_property() {
        let definition = json!({"$schema": "http://json-schema.org/draft-07/schema#"});
        assert!(!rejects("test.event", 1, definition));
    }

    #[test]
    fn test_set_description() {
        let mut schema = v1("test.event", CompatibilityMode::None);
        assert_eq!(schema.description(), None);

        schema
            .set_description("Test schema".to_string())
            .expect("short description should be accepted");
        assert_eq!(schema.description(), Some("Test schema"));
    }

    #[test]
    fn test_reject_too_long_description() {
        let mut schema = v1("test.event", CompatibilityMode::None);

        // One character over the 1000 limit.
        assert!(schema.set_description("a".repeat(1001)).is_err());
    }

    #[test]
    fn test_clear_description() {
        let mut schema = v1("test.event", CompatibilityMode::None);

        schema
            .set_description("Test".to_string())
            .expect("short description should be accepted");
        assert!(schema.description().is_some());

        schema.clear_description();
        assert!(schema.description().is_none());
    }

    #[test]
    fn test_add_tag() {
        let mut schema = v1("test.event", CompatibilityMode::None);
        assert!(schema.tags().is_empty());

        schema
            .add_tag("production".to_string())
            .expect("valid tag should be accepted");
        assert_eq!(schema.tags().len(), 1);
        assert!(schema.has_tag("production"));
    }

    #[test]
    fn test_reject_duplicate_tag() {
        let mut schema = v1("test.event", CompatibilityMode::None);

        schema
            .add_tag("test".to_string())
            .expect("first insertion should succeed");
        assert!(schema.add_tag("test".to_string()).is_err());
    }

    #[test]
    fn test_remove_tag() {
        let mut schema = v1("test.event", CompatibilityMode::None);
        for tag in ["tag1", "tag2"].iter() {
            schema
                .add_tag(tag.to_string())
                .expect("valid tag should be accepted");
        }

        schema
            .remove_tag("tag1")
            .expect("existing tag should be removable");
        assert_eq!(schema.tags().len(), 1);
        assert!(!schema.has_tag("tag1"));
        assert!(schema.has_tag("tag2"));
    }

    #[test]
    fn test_remove_nonexistent_tag() {
        let mut schema = v1("test.event", CompatibilityMode::None);
        assert!(schema.remove_tag("nonexistent").is_err());
    }

    #[test]
    fn test_reject_invalid_tags() {
        let mut schema = v1("test.event", CompatibilityMode::None);

        // Empty, too long (51 chars), and two tags with forbidden characters.
        let invalid_tags = vec![
            String::new(),
            "a".repeat(51),
            "tag with spaces".to_string(),
            "tag@invalid".to_string(),
        ];

        for tag in invalid_tags {
            assert!(schema.add_tag(tag).is_err());
        }
    }

    #[test]
    fn test_accept_valid_tags() {
        let mut schema = v1("test.event", CompatibilityMode::None);

        for tag in ["production", "test-env", "v2_schema", "important123"].iter() {
            assert!(schema.add_tag(tag.to_string()).is_ok());
        }
    }

    #[test]
    fn test_is_first_version() {
        let first = v1("test.event", CompatibilityMode::None);
        let second = Schema::new(
            "test.event".to_string(),
            2,
            valid_schema(),
            CompatibilityMode::None,
        )
        .expect("version 2 should be accepted");

        assert!(first.is_first_version());
        assert!(!second.is_first_version());
    }

    #[test]
    fn test_applies_to() {
        let schema = v1("user.created", CompatibilityMode::None);

        assert!(schema.applies_to("user.created"));
        assert!(!schema.applies_to("order.placed"));
    }

    #[test]
    fn test_create_next_version() {
        let first = v1("test.event", CompatibilityMode::Backward);

        let extended_definition = json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "age": { "type": "number" },
                "email": { "type": "string" }
            }
        });

        let second = first
            .create_next_version(extended_definition)
            .expect("next version should be created");

        assert_eq!(second.version(), 2);
        assert_eq!(second.subject(), "test.event");
        assert_eq!(second.compatibility_mode(), CompatibilityMode::Backward);
    }

    #[test]
    fn test_compatibility_mode_checks() {
        let backward = CompatibilityMode::Backward;
        assert!(backward.requires_backward());
        assert!(!backward.requires_forward());

        let forward = CompatibilityMode::Forward;
        assert!(!forward.requires_backward());
        assert!(forward.requires_forward());

        let full = CompatibilityMode::Full;
        assert!(full.requires_backward());
        assert!(full.requires_forward());

        let none = CompatibilityMode::None;
        assert!(!none.requires_backward());
        assert!(!none.requires_forward());
        assert!(!none.requires_compatibility());
    }

    #[test]
    fn test_serde_serialization() {
        let schema = v1("test.event", CompatibilityMode::Backward);

        let encoded = serde_json::to_string(&schema).expect("schema should serialize to JSON");
        assert!(serde_json::from_str::<Schema>(&encoded).is_ok());
    }
}