1#[cfg(test)]
14use crate::EventMetadata;
15use crate::StreamEvent;
16use anyhow::{anyhow, Result};
17use chrono::{DateTime, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::Arc;
21use tokio::sync::RwLock;
22use tracing::{debug, info, warn};
23use uuid::Uuid;
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27pub enum SchemaFormat {
28 JsonSchema,
30 Avro,
32 Protobuf,
34 RdfSparql,
36 Custom { format_name: String },
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
42pub enum CompatibilityMode {
43 None,
45 Backward,
47 Forward,
49 Full,
51 Breaking,
53}
54
/// One version of a schema registered under a subject.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaDefinition {
    /// Unique identifier; `SchemaDefinition::new` generates a random v4 UUID.
    pub id: Uuid,
    /// Subject this schema belongs to (the registry's top-level key).
    pub subject: String,
    /// Version number within the subject; the registry assigns 1, 2, 3, ...
    pub version: u32,
    /// Format the schema content is written in.
    pub format: SchemaFormat,
    /// Raw schema text.
    pub schema_content: String,
    /// Optional human-readable title; `None` on a fresh definition.
    pub title: Option<String>,
    /// Optional free-text description; `None` on a fresh definition.
    pub description: Option<String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Timestamp (UTC) refreshed by `update_content` and `set_metadata`.
    pub updated_at: DateTime<Utc>,
    /// Compatibility policy for this version.
    pub compatibility: CompatibilityMode,
    /// Tags; `add_tag` keeps them free of duplicates.
    pub tags: Vec<String>,
    /// Arbitrary key/value annotations.
    pub metadata: HashMap<String, String>,
}
83
84impl SchemaDefinition {
85 pub fn new(
86 subject: String,
87 version: u32,
88 format: SchemaFormat,
89 schema_content: String,
90 ) -> Self {
91 let now = Utc::now();
92 Self {
93 id: Uuid::new_v4(),
94 subject,
95 version,
96 format,
97 schema_content,
98 title: None,
99 description: None,
100 created_at: now,
101 updated_at: now,
102 compatibility: CompatibilityMode::Backward,
103 tags: Vec::new(),
104 metadata: HashMap::new(),
105 }
106 }
107
108 pub fn update_content(&mut self, content: String) {
110 self.schema_content = content;
111 self.updated_at = Utc::now();
112 }
113
114 pub fn add_tag(&mut self, tag: String) {
116 if !self.tags.contains(&tag) {
117 self.tags.push(tag);
118 }
119 }
120
121 pub fn set_metadata(&mut self, key: String, value: String) {
123 self.metadata.insert(key, value);
124 self.updated_at = Utc::now();
125 }
126}
127
/// Outcome of validating one event against one schema version.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationResult {
    /// `true` when validation passed.
    pub is_valid: bool,
    /// Error messages; empty for results built with `success`.
    pub errors: Vec<String>,
    /// Non-fatal messages collected via `add_warning`.
    pub warnings: Vec<String>,
    /// Id of the schema the event was checked against.
    pub schema_id: Uuid,
    /// Version of the schema the event was checked against.
    pub schema_version: u32,
    /// Time (UTC) at which this result was constructed.
    pub validated_at: DateTime<Utc>,
}
144
145impl ValidationResult {
146 pub fn success(schema_id: Uuid, schema_version: u32) -> Self {
147 Self {
148 is_valid: true,
149 errors: Vec::new(),
150 warnings: Vec::new(),
151 schema_id,
152 schema_version,
153 validated_at: Utc::now(),
154 }
155 }
156
157 pub fn failure(schema_id: Uuid, schema_version: u32, errors: Vec<String>) -> Self {
158 Self {
159 is_valid: false,
160 errors,
161 warnings: Vec::new(),
162 schema_id,
163 schema_version,
164 validated_at: Utc::now(),
165 }
166 }
167
168 pub fn add_warning(&mut self, warning: String) {
169 self.warnings.push(warning);
170 }
171}
172
/// Settings for a `SchemaRegistry`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaRegistryConfig {
    /// When `false`, `validate_event` returns a success result immediately
    /// without looking up a schema.
    pub enable_validation: bool,
    /// Not read by any code visible in this file.
    // NOTE(review): presumably turns warnings into failures — confirm.
    pub strict_mode: bool,
    /// Enables the id-keyed schema cache used by `get_schema_by_id`.
    pub enable_caching: bool,
    /// Not read by any code visible in this file; the default is 3600.
    // NOTE(review): presumably a cache entry lifetime — confirm where enforced.
    pub cache_ttl_seconds: u64,
    /// Optional external registry settings; not read in the visible code.
    pub external_registry: Option<ExternalRegistryConfig>,
    /// Mode applied by `register_schema` when the caller passes `None`.
    pub default_compatibility: CompatibilityMode,
    /// Not enforced by any code visible in this file; the default is 10.
    pub max_versions_per_subject: u32,
}
191
192impl Default for SchemaRegistryConfig {
193 fn default() -> Self {
194 Self {
195 enable_validation: true,
196 strict_mode: false,
197 enable_caching: true,
198 cache_ttl_seconds: 3600, external_registry: None,
200 default_compatibility: CompatibilityMode::Backward,
201 max_versions_per_subject: 10,
202 }
203 }
204}
205
/// Connection settings for an external schema registry.
///
/// None of these fields are read by code visible in this file; the notes
/// below are assumptions based on the field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalRegistryConfig {
    /// Free-form identifier of the registry kind.
    pub registry_type: String,
    /// Registry endpoint.
    pub url: String,
    /// Optional credentials.
    pub auth: Option<RegistryAuth>,
    // NOTE(review): presumably toggles periodic synchronization — confirm.
    pub enable_sync: bool,
    // NOTE(review): presumably the period between synchronizations — confirm.
    pub sync_interval_seconds: u64,
}
220
221#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct RegistryAuth {
224 pub auth_type: String,
226 pub username: Option<String>,
228 pub password: Option<String>,
230 pub token: Option<String>,
232}
233
/// In-memory schema registry with per-subject versioning, an optional
/// id-keyed cache, and validation statistics.
///
/// All state sits behind `Arc<RwLock<..>>`, so every method takes `&self`.
pub struct SchemaRegistry {
    /// Settings fixed at construction time.
    config: SchemaRegistryConfig,
    /// Subject -> (version -> definition).
    schemas: Arc<RwLock<HashMap<String, HashMap<u32, SchemaDefinition>>>>,
    /// Schema id -> definition; filled only when `config.enable_caching` is set.
    schema_cache: Arc<RwLock<HashMap<Uuid, SchemaDefinition>>>,
    /// Subject -> latest version number.
    latest_versions: Arc<RwLock<HashMap<String, u32>>>,
    /// Counters updated by `validate_event` and `get_schema_by_id`.
    validation_stats: Arc<RwLock<ValidationStats>>,
}
247
/// Counters collected by a `SchemaRegistry`; every field starts at zero.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ValidationStats {
    /// Validation attempts started while validation was enabled, including
    /// attempts that later returned an error.
    pub total_validations: u64,
    /// Validations whose result was valid.
    pub successful_validations: u64,
    /// Validations whose result was invalid.
    pub failed_validations: u64,
    /// Sum of the warning counts of all produced results.
    pub warnings_count: u64,
    /// Aggregate validation duration in milliseconds, maintained by
    /// `validate_event`; see that method for the exact update rule.
    pub validation_time_ms: f64,
    /// By-id lookups answered from the cache.
    pub cache_hits: u64,
    /// By-id lookups that found the schema by scanning storage instead of the
    /// cache; lookups that find nothing are not counted.
    pub cache_misses: u64,
}
259
260impl SchemaRegistry {
261 pub fn new(config: SchemaRegistryConfig) -> Self {
263 Self {
264 config,
265 schemas: Arc::new(RwLock::new(HashMap::new())),
266 schema_cache: Arc::new(RwLock::new(HashMap::new())),
267 latest_versions: Arc::new(RwLock::new(HashMap::new())),
268 validation_stats: Arc::new(RwLock::new(ValidationStats::default())),
269 }
270 }
271
272 pub async fn register_schema(
274 &self,
275 subject: String,
276 format: SchemaFormat,
277 schema_content: String,
278 compatibility: Option<CompatibilityMode>,
279 ) -> Result<SchemaDefinition> {
280 let mut schemas = self.schemas.write().await;
281 let mut latest_versions = self.latest_versions.write().await;
282
283 let subject_schemas = schemas.entry(subject.clone()).or_insert_with(HashMap::new);
285 let next_version = latest_versions.get(&subject).map(|v| v + 1).unwrap_or(1);
286
287 if next_version > 1 {
289 let latest_version = next_version - 1;
290 if let Some(existing_schema) = subject_schemas.get(&latest_version) {
291 self.check_compatibility(existing_schema, &schema_content, format.clone())
292 .await?;
293 }
294 }
295
296 let mut schema =
298 SchemaDefinition::new(subject.clone(), next_version, format, schema_content);
299
300 if let Some(compat) = compatibility {
301 schema.compatibility = compat;
302 } else {
303 schema.compatibility = self.config.default_compatibility.clone();
304 }
305
306 subject_schemas.insert(next_version, schema.clone());
308 latest_versions.insert(subject.clone(), next_version);
309
310 if self.config.enable_caching {
312 let mut cache = self.schema_cache.write().await;
313 cache.insert(schema.id, schema.clone());
314 }
315
316 info!(
317 "Registered schema for subject '{}' version {} with ID {}",
318 subject, next_version, schema.id
319 );
320
321 Ok(schema)
322 }
323
324 pub async fn get_schema(
326 &self,
327 subject: &str,
328 version: Option<u32>,
329 ) -> Result<Option<SchemaDefinition>> {
330 let schemas = self.schemas.read().await;
331
332 if let Some(subject_schemas) = schemas.get(subject) {
333 let version = if let Some(v) = version {
334 v
335 } else {
336 let latest_versions = self.latest_versions.read().await;
338 *latest_versions
339 .get(subject)
340 .ok_or_else(|| anyhow!("No schemas found for subject: {}", subject))?
341 };
342
343 Ok(subject_schemas.get(&version).cloned())
344 } else {
345 Ok(None)
346 }
347 }
348
349 pub async fn get_schema_by_id(&self, schema_id: &Uuid) -> Result<Option<SchemaDefinition>> {
351 if self.config.enable_caching {
353 let cache = self.schema_cache.read().await;
354 if let Some(schema) = cache.get(schema_id) {
355 let mut stats = self.validation_stats.write().await;
356 stats.cache_hits += 1;
357 return Ok(Some(schema.clone()));
358 }
359 }
360
361 let schemas = self.schemas.read().await;
363 for subject_schemas in schemas.values() {
364 for schema in subject_schemas.values() {
365 if &schema.id == schema_id {
366 if self.config.enable_caching {
368 let mut cache = self.schema_cache.write().await;
369 cache.insert(*schema_id, schema.clone());
370 }
371
372 let mut stats = self.validation_stats.write().await;
373 stats.cache_misses += 1;
374 return Ok(Some(schema.clone()));
375 }
376 }
377 }
378
379 Ok(None)
380 }
381
382 pub async fn list_schemas(&self, subject: &str) -> Result<Vec<SchemaDefinition>> {
384 let schemas = self.schemas.read().await;
385
386 if let Some(subject_schemas) = schemas.get(subject) {
387 let mut schemas: Vec<SchemaDefinition> = subject_schemas.values().cloned().collect();
388 schemas.sort_by(|a, b| a.version.cmp(&b.version));
389 Ok(schemas)
390 } else {
391 Ok(Vec::new())
392 }
393 }
394
395 pub async fn list_subjects(&self) -> Result<Vec<String>> {
397 let schemas = self.schemas.read().await;
398 Ok(schemas.keys().cloned().collect())
399 }
400
401 pub async fn validate_event(
403 &self,
404 event: &StreamEvent,
405 subject: Option<&str>,
406 ) -> Result<ValidationResult> {
407 if !self.config.enable_validation {
408 return Ok(ValidationResult::success(Uuid::new_v4(), 1));
409 }
410
411 let start_time = std::time::Instant::now();
412 let mut stats = self.validation_stats.write().await;
413 stats.total_validations += 1;
414 drop(stats);
415
416 let event_subject = subject
418 .map(|s| s.to_string())
419 .or_else(|| self.extract_subject_from_event(event))
420 .ok_or_else(|| anyhow!("Cannot determine subject for validation"))?;
421
422 let schema = self
424 .get_schema(&event_subject, None)
425 .await?
426 .ok_or_else(|| anyhow!("No schema found for subject: {}", event_subject))?;
427
428 let validation_result = match schema.format {
430 SchemaFormat::JsonSchema => self.validate_with_json_schema(event, &schema).await?,
431 SchemaFormat::RdfSparql => self.validate_with_rdf_schema(event, &schema).await?,
432 SchemaFormat::Avro => self.validate_with_avro_schema(event, &schema).await?,
433 _ => {
434 warn!("Validation not implemented for format: {:?}", schema.format);
435 ValidationResult::success(schema.id, schema.version)
436 }
437 };
438
439 let elapsed = start_time.elapsed();
441 let mut stats = self.validation_stats.write().await;
442 stats.validation_time_ms = (stats.validation_time_ms + elapsed.as_millis() as f64) / 2.0;
443
444 if validation_result.is_valid {
445 stats.successful_validations += 1;
446 } else {
447 stats.failed_validations += 1;
448 }
449
450 stats.warnings_count += validation_result.warnings.len() as u64;
451
452 debug!(
453 "Validated event against schema {} ({}ms): {}",
454 schema.id,
455 elapsed.as_millis(),
456 if validation_result.is_valid {
457 "VALID"
458 } else {
459 "INVALID"
460 }
461 );
462
463 Ok(validation_result)
464 }
465
466 fn extract_subject_from_event(&self, event: &StreamEvent) -> Option<String> {
468 match event {
470 StreamEvent::TripleAdded { metadata, .. } => metadata
471 .properties
472 .get("subject")
473 .cloned()
474 .or_else(|| Some("rdf.triple.added".to_string())),
475 StreamEvent::TripleRemoved { metadata, .. } => metadata
476 .properties
477 .get("subject")
478 .cloned()
479 .or_else(|| Some("rdf.triple.removed".to_string())),
480 StreamEvent::SparqlUpdate { metadata, .. } => metadata
481 .properties
482 .get("subject")
483 .cloned()
484 .or_else(|| Some("sparql.update".to_string())),
485 StreamEvent::TransactionBegin { metadata, .. } => metadata
486 .properties
487 .get("subject")
488 .cloned()
489 .or_else(|| Some("transaction.begin".to_string())),
490 StreamEvent::TransactionCommit { metadata, .. } => metadata
491 .properties
492 .get("subject")
493 .cloned()
494 .or_else(|| Some("transaction.commit".to_string())),
495 _ => Some(format!("stream.event.{:?}", std::mem::discriminant(event))),
496 }
497 }
498
499 async fn check_compatibility(
501 &self,
502 existing_schema: &SchemaDefinition,
503 new_schema_content: &str,
504 new_format: SchemaFormat,
505 ) -> Result<()> {
506 if existing_schema.compatibility == CompatibilityMode::None {
507 return Ok(());
508 }
509
510 if existing_schema.format != new_format {
511 return Err(anyhow!(
512 "Schema format changed from {:?} to {:?}",
513 existing_schema.format,
514 new_format
515 ));
516 }
517
518 match new_format {
520 SchemaFormat::JsonSchema => {
521 self.check_json_schema_compatibility(existing_schema, new_schema_content)
522 .await
523 }
524 SchemaFormat::RdfSparql => {
525 self.check_rdf_schema_compatibility(existing_schema, new_schema_content)
526 .await
527 }
528 _ => {
529 warn!(
530 "Compatibility checking not implemented for format: {:?}",
531 new_format
532 );
533 Ok(())
534 }
535 }
536 }
537
538 async fn validate_with_json_schema(
540 &self,
541 _event: &StreamEvent,
542 schema: &SchemaDefinition,
543 ) -> Result<ValidationResult> {
544 debug!("Validating with JSON schema: {}", schema.id);
547 Ok(ValidationResult::success(schema.id, schema.version))
548 }
549
550 async fn validate_with_rdf_schema(
552 &self,
553 event: &StreamEvent,
554 schema: &SchemaDefinition,
555 ) -> Result<ValidationResult> {
556 match event {
558 StreamEvent::TripleAdded {
559 subject,
560 predicate,
561 object: _,
562 ..
563 } => {
564 let mut errors = Vec::new();
565
566 if !subject.starts_with("http://") && !subject.starts_with("https://") {
568 errors.push(format!("Invalid subject URI: {subject}"));
569 }
570
571 if !predicate.starts_with("http://") && !predicate.starts_with("https://") {
572 errors.push(format!("Invalid predicate URI: {predicate}"));
573 }
574
575 if errors.is_empty() {
576 Ok(ValidationResult::success(schema.id, schema.version))
577 } else {
578 Ok(ValidationResult::failure(schema.id, schema.version, errors))
579 }
580 }
581 _ => Ok(ValidationResult::success(schema.id, schema.version)),
582 }
583 }
584
585 async fn validate_with_avro_schema(
587 &self,
588 _event: &StreamEvent,
589 schema: &SchemaDefinition,
590 ) -> Result<ValidationResult> {
591 debug!("Validating with Avro schema: {}", schema.id);
594 Ok(ValidationResult::success(schema.id, schema.version))
595 }
596
    /// Placeholder JSON-schema compatibility check.
    ///
    /// Both arguments are ignored and every change is accepted: the function
    /// always returns `Ok(())`.
    async fn check_json_schema_compatibility(
        &self,
        _existing_schema: &SchemaDefinition,
        _new_schema_content: &str,
    ) -> Result<()> {
        Ok(())
    }
607
    /// Placeholder RDF schema compatibility check.
    ///
    /// Both arguments are ignored and every change is accepted: the function
    /// always returns `Ok(())`.
    async fn check_rdf_schema_compatibility(
        &self,
        _existing_schema: &SchemaDefinition,
        _new_schema_content: &str,
    ) -> Result<()> {
        Ok(())
    }
617
618 pub async fn get_validation_stats(&self) -> ValidationStats {
620 let stats = self.validation_stats.read().await;
621 (*stats).clone()
622 }
623
624 pub async fn delete_schema(&self, subject: &str, version: Option<u32>) -> Result<bool> {
626 let mut schemas = self.schemas.write().await;
627 let mut latest_versions = self.latest_versions.write().await;
628
629 if let Some(subject_schemas) = schemas.get_mut(subject) {
630 if let Some(version) = version {
631 let removed = subject_schemas.remove(&version).is_some();
633
634 if let Some(latest) = latest_versions.get(subject) {
636 if *latest == version {
637 let new_latest = subject_schemas.keys().max().cloned();
638 if let Some(new_latest) = new_latest {
639 latest_versions.insert(subject.to_string(), new_latest);
640 } else {
641 latest_versions.remove(subject);
642 schemas.remove(subject);
643 }
644 }
645 }
646
647 Ok(removed)
648 } else {
649 schemas.remove(subject);
651 latest_versions.remove(subject);
652 Ok(true)
653 }
654 } else {
655 Ok(false)
656 }
657 }
658}
659
660#[cfg(test)]
661mod tests {
662 use super::*;
663 use chrono::Utc;
664 use std::collections::HashMap;
665
666 #[tokio::test]
667 async fn test_schema_registration() -> Result<()> {
668 let config = SchemaRegistryConfig::default();
669 let registry = SchemaRegistry::new(config);
670
671 let schema_content = r#"
672 {
673 "type": "object",
674 "properties": {
675 "subject": {"type": "string"},
676 "predicate": {"type": "string"},
677 "object": {"type": "string"}
678 },
679 "required": ["subject", "predicate", "object"]
680 }"#;
681
682 let schema = registry
683 .register_schema(
684 "rdf.triple.added".to_string(),
685 SchemaFormat::JsonSchema,
686 schema_content.to_string(),
687 None,
688 )
689 .await?;
690
691 assert_eq!(schema.subject, "rdf.triple.added");
692 assert_eq!(schema.version, 1);
693 assert_eq!(schema.format, SchemaFormat::JsonSchema);
694
695 Ok(())
696 }
697
698 #[tokio::test]
699 async fn test_schema_retrieval() -> Result<()> {
700 let config = SchemaRegistryConfig::default();
701 let registry = SchemaRegistry::new(config);
702
703 let schema_content = r#"{"type": "object"}"#;
704 let registered_schema = registry
705 .register_schema(
706 "test.subject".to_string(),
707 SchemaFormat::JsonSchema,
708 schema_content.to_string(),
709 None,
710 )
711 .await?;
712
713 let retrieved = registry
715 .get_schema("test.subject", Some(1))
716 .await?
717 .expect("Schema should exist");
718
719 assert_eq!(retrieved.id, registered_schema.id);
720 assert_eq!(retrieved.version, 1);
721
722 let retrieved_by_id = registry
724 .get_schema_by_id(®istered_schema.id)
725 .await?
726 .expect("Schema should exist");
727
728 assert_eq!(retrieved_by_id.id, registered_schema.id);
729
730 Ok(())
731 }
732
733 #[tokio::test]
734 async fn test_event_validation() -> Result<()> {
735 let config = SchemaRegistryConfig::default();
736 let registry = SchemaRegistry::new(config);
737
738 let schema_content = "RDF Triple Schema";
740 registry
741 .register_schema(
742 "rdf.triple.added".to_string(),
743 SchemaFormat::RdfSparql,
744 schema_content.to_string(),
745 None,
746 )
747 .await?;
748
749 let event = StreamEvent::TripleAdded {
751 subject: "https://example.org/subject".to_string(),
752 predicate: "https://example.org/predicate".to_string(),
753 object: "\"Test Object\"".to_string(),
754 graph: None,
755 metadata: EventMetadata {
756 event_id: "test_event_1".to_string(),
757 timestamp: Utc::now(),
758 source: "test".to_string(),
759 user: None,
760 context: None,
761 caused_by: None,
762 version: "1.0".to_string(),
763 properties: HashMap::new(),
764 checksum: None,
765 },
766 };
767
768 let validation_result = registry
769 .validate_event(&event, Some("rdf.triple.added"))
770 .await?;
771
772 assert!(validation_result.is_valid);
773 assert!(validation_result.errors.is_empty());
774
775 Ok(())
776 }
777
778 #[tokio::test]
779 async fn test_schema_versioning() -> Result<()> {
780 let config = SchemaRegistryConfig::default();
781 let registry = SchemaRegistry::new(config);
782
783 let subject = "test.versioning".to_string();
784
785 let _v1 = registry
787 .register_schema(
788 subject.clone(),
789 SchemaFormat::JsonSchema,
790 r#"{"type": "object", "properties": {"name": {"type": "string"}}}"#.to_string(),
791 None,
792 )
793 .await?;
794
795 let _v2 = registry
797 .register_schema(
798 subject.clone(),
799 SchemaFormat::JsonSchema,
800 r#"{"type": "object", "properties": {"name": {"type": "string"}, "age": {"type": "integer"}}}"#.to_string(),
801 None,
802 )
803 .await?;
804
805 let schemas = registry.list_schemas(&subject).await?;
807 assert_eq!(schemas.len(), 2);
808 assert_eq!(schemas[0].version, 1);
809 assert_eq!(schemas[1].version, 2);
810
811 let latest = registry.get_schema(&subject, None).await?.unwrap();
813 assert_eq!(latest.version, 2);
814
815 Ok(())
816 }
817}