1use crate::context::Context;
43use crate::error::{Mecha10Error, Result};
44use serde::{Deserialize, Serialize};
45use std::collections::HashMap;
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct MessageSchema {
50 pub name: String,
52
53 pub version: u32,
55
56 #[serde(default)]
58 pub description: String,
59
60 pub fields: Vec<SchemaField>,
62
63 #[serde(default)]
65 pub metadata: HashMap<String, String>,
66
67 #[serde(default)]
69 pub registered_at: u64,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct SchemaField {
75 pub name: String,
77
78 pub field_type: String,
80
81 pub required: bool,
83
84 #[serde(default)]
86 pub description: String,
87
88 #[serde(skip_serializing_if = "Option::is_none")]
90 pub items: Option<String>,
91}
92
93impl MessageSchema {
94 pub fn new(name: impl Into<String>, version: u32) -> Self {
96 Self {
97 name: name.into(),
98 version,
99 description: String::new(),
100 fields: Vec::new(),
101 metadata: HashMap::new(),
102 registered_at: current_timestamp(),
103 }
104 }
105
106 pub fn with_description(mut self, description: impl Into<String>) -> Self {
108 self.description = description.into();
109 self
110 }
111
112 pub fn with_field(mut self, name: impl Into<String>, field_type: impl Into<String>, required: bool) -> Self {
114 self.fields.push(SchemaField {
115 name: name.into(),
116 field_type: field_type.into(),
117 required,
118 description: String::new(),
119 items: None,
120 });
121 self
122 }
123
124 pub fn with_array_field(mut self, name: impl Into<String>, items: impl Into<String>, required: bool) -> Self {
126 self.fields.push(SchemaField {
127 name: name.into(),
128 field_type: "array".to_string(),
129 required,
130 description: String::new(),
131 items: Some(items.into()),
132 });
133 self
134 }
135
136 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
138 self.metadata.insert(key.into(), value.into());
139 self
140 }
141
142 fn redis_key(&self) -> String {
144 format!("mecha10:schemas:{}:{}", self.name, self.version)
145 }
146
147 fn redis_latest_key(name: &str) -> String {
149 format!("mecha10:schemas:{}:latest", name)
150 }
151
152 fn redis_list_key() -> String {
154 "mecha10:schemas:list".to_string()
155 }
156}
157
158pub trait SchemaRegistryExt {
160 fn register_schema(&self, schema: &MessageSchema) -> impl std::future::Future<Output = Result<()>> + Send;
183
184 fn get_schema(
191 &self,
192 name: &str,
193 version: u32,
194 ) -> impl std::future::Future<Output = Result<Option<MessageSchema>>> + Send;
195
196 fn get_latest_schema(&self, name: &str) -> impl std::future::Future<Output = Result<Option<MessageSchema>>> + Send;
202
203 fn list_schemas(&self) -> impl std::future::Future<Output = Result<Vec<MessageSchema>>> + Send;
205
206 fn get_schema_versions(&self, name: &str) -> impl std::future::Future<Output = Result<Vec<MessageSchema>>> + Send;
212
213 fn delete_schema(&self, name: &str, version: Option<u32>) -> impl std::future::Future<Output = Result<()>> + Send;
220}
221
222impl SchemaRegistryExt for Context {
223 async fn register_schema(&self, schema: &MessageSchema) -> Result<()> {
224 #[cfg(feature = "messaging")]
225 {
226 use redis::AsyncCommands;
227
228 let redis_url = Context::get_redis_url()?;
229 let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
230 message: format!("Failed to connect to Redis: {}", e),
231 suggestion: "Ensure Redis is running".to_string(),
232 })?;
233
234 let mut conn =
235 client
236 .get_multiplexed_async_connection()
237 .await
238 .map_err(|e| Mecha10Error::MessagingError {
239 message: format!("Failed to get Redis connection: {}", e),
240 suggestion: "Ensure Redis is running".to_string(),
241 })?;
242
243 let schema_json = serde_json::to_string(&schema)
245 .map_err(|e| Mecha10Error::Other(format!("Failed to serialize schema: {}", e)))?;
246
247 let key = schema.redis_key();
249 conn.set::<_, _, ()>(&key, &schema_json)
250 .await
251 .map_err(|e| Mecha10Error::MessagingError {
252 message: format!("Failed to register schema: {}", e),
253 suggestion: "Check Redis connection".to_string(),
254 })?;
255
256 let latest_key = MessageSchema::redis_latest_key(&schema.name);
258 conn.set::<_, _, ()>(&latest_key, schema.version)
259 .await
260 .map_err(|e| Mecha10Error::MessagingError {
261 message: format!("Failed to update latest version: {}", e),
262 suggestion: "Check Redis connection".to_string(),
263 })?;
264
265 let list_key = MessageSchema::redis_list_key();
267 conn.sadd::<_, _, ()>(&list_key, &schema.name)
268 .await
269 .map_err(|e| Mecha10Error::MessagingError {
270 message: format!("Failed to update schema list: {}", e),
271 suggestion: "Check Redis connection".to_string(),
272 })?;
273
274 Ok(())
275 }
276
277 #[cfg(not(feature = "messaging"))]
278 {
279 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
280 }
281 }
282
283 async fn get_schema(&self, name: &str, version: u32) -> Result<Option<MessageSchema>> {
284 #[cfg(feature = "messaging")]
285 {
286 use redis::AsyncCommands;
287
288 let redis_url = Context::get_redis_url()?;
289 let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
290 message: format!("Failed to connect to Redis: {}", e),
291 suggestion: "Ensure Redis is running".to_string(),
292 })?;
293
294 let mut conn =
295 client
296 .get_multiplexed_async_connection()
297 .await
298 .map_err(|e| Mecha10Error::MessagingError {
299 message: format!("Failed to get Redis connection: {}", e),
300 suggestion: "Ensure Redis is running".to_string(),
301 })?;
302
303 let key = format!("mecha10:schemas:{}:{}", name, version);
304 let json: Option<String> = conn.get(&key).await.map_err(|e| Mecha10Error::MessagingError {
305 message: format!("Failed to get schema: {}", e),
306 suggestion: "Check Redis connection".to_string(),
307 })?;
308
309 if let Some(json) = json {
310 let schema = serde_json::from_str::<MessageSchema>(&json)
311 .map_err(|e| Mecha10Error::Other(format!("Failed to parse schema: {}", e)))?;
312 Ok(Some(schema))
313 } else {
314 Ok(None)
315 }
316 }
317
318 #[cfg(not(feature = "messaging"))]
319 {
320 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
321 }
322 }
323
324 async fn get_latest_schema(&self, name: &str) -> Result<Option<MessageSchema>> {
325 #[cfg(feature = "messaging")]
326 {
327 use redis::AsyncCommands;
328
329 let redis_url = Context::get_redis_url()?;
330 let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
331 message: format!("Failed to connect to Redis: {}", e),
332 suggestion: "Ensure Redis is running".to_string(),
333 })?;
334
335 let mut conn =
336 client
337 .get_multiplexed_async_connection()
338 .await
339 .map_err(|e| Mecha10Error::MessagingError {
340 message: format!("Failed to get Redis connection: {}", e),
341 suggestion: "Ensure Redis is running".to_string(),
342 })?;
343
344 let latest_key = MessageSchema::redis_latest_key(name);
346 let version: Option<u32> = conn.get(&latest_key).await.map_err(|e| Mecha10Error::MessagingError {
347 message: format!("Failed to get latest version: {}", e),
348 suggestion: "Check Redis connection".to_string(),
349 })?;
350
351 if let Some(version) = version {
352 self.get_schema(name, version).await
353 } else {
354 Ok(None)
355 }
356 }
357
358 #[cfg(not(feature = "messaging"))]
359 {
360 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
361 }
362 }
363
364 async fn list_schemas(&self) -> Result<Vec<MessageSchema>> {
365 #[cfg(feature = "messaging")]
366 {
367 use redis::AsyncCommands;
368
369 let redis_url = Context::get_redis_url()?;
370 let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
371 message: format!("Failed to connect to Redis: {}", e),
372 suggestion: "Ensure Redis is running".to_string(),
373 })?;
374
375 let mut conn =
376 client
377 .get_multiplexed_async_connection()
378 .await
379 .map_err(|e| Mecha10Error::MessagingError {
380 message: format!("Failed to get Redis connection: {}", e),
381 suggestion: "Ensure Redis is running".to_string(),
382 })?;
383
384 let list_key = MessageSchema::redis_list_key();
386 let names: Vec<String> = conn
387 .smembers(&list_key)
388 .await
389 .map_err(|e| Mecha10Error::MessagingError {
390 message: format!("Failed to get schema list: {}", e),
391 suggestion: "Check Redis connection".to_string(),
392 })?;
393
394 let mut schemas = Vec::new();
396 for name in names {
397 if let Some(schema) = self.get_latest_schema(&name).await? {
398 schemas.push(schema);
399 }
400 }
401
402 Ok(schemas)
403 }
404
405 #[cfg(not(feature = "messaging"))]
406 {
407 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
408 }
409 }
410
411 async fn get_schema_versions(&self, name: &str) -> Result<Vec<MessageSchema>> {
412 #[cfg(feature = "messaging")]
413 {
414 use redis::AsyncCommands;
415
416 let redis_url = Context::get_redis_url()?;
417 let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
418 message: format!("Failed to connect to Redis: {}", e),
419 suggestion: "Ensure Redis is running".to_string(),
420 })?;
421
422 let mut conn =
423 client
424 .get_multiplexed_async_connection()
425 .await
426 .map_err(|e| Mecha10Error::MessagingError {
427 message: format!("Failed to get Redis connection: {}", e),
428 suggestion: "Ensure Redis is running".to_string(),
429 })?;
430
431 let pattern = format!("mecha10:schemas:{}:*", name);
433 let keys: Vec<String> = conn.keys(&pattern).await.map_err(|e| Mecha10Error::MessagingError {
434 message: format!("Failed to scan for schema versions: {}", e),
435 suggestion: "Check Redis connection".to_string(),
436 })?;
437
438 let mut schemas = Vec::new();
439 for key in keys {
440 if key.ends_with(":latest") {
442 continue;
443 }
444
445 let json: String = conn.get(&key).await.map_err(|e| Mecha10Error::MessagingError {
446 message: format!("Failed to get schema: {}", e),
447 suggestion: "Check Redis connection".to_string(),
448 })?;
449
450 if let Ok(schema) = serde_json::from_str::<MessageSchema>(&json) {
451 schemas.push(schema);
452 }
453 }
454
455 schemas.sort_by_key(|s| s.version);
457
458 Ok(schemas)
459 }
460
461 #[cfg(not(feature = "messaging"))]
462 {
463 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
464 }
465 }
466
467 async fn delete_schema(&self, name: &str, version: Option<u32>) -> Result<()> {
468 #[cfg(feature = "messaging")]
469 {
470 use redis::AsyncCommands;
471
472 let redis_url = Context::get_redis_url()?;
473 let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
474 message: format!("Failed to connect to Redis: {}", e),
475 suggestion: "Ensure Redis is running".to_string(),
476 })?;
477
478 let mut conn =
479 client
480 .get_multiplexed_async_connection()
481 .await
482 .map_err(|e| Mecha10Error::MessagingError {
483 message: format!("Failed to get Redis connection: {}", e),
484 suggestion: "Ensure Redis is running".to_string(),
485 })?;
486
487 if let Some(version) = version {
488 let key = format!("mecha10:schemas:{}:{}", name, version);
490 conn.del::<_, ()>(&key)
491 .await
492 .map_err(|e| Mecha10Error::MessagingError {
493 message: format!("Failed to delete schema: {}", e),
494 suggestion: "Check Redis connection".to_string(),
495 })?;
496 } else {
497 let pattern = format!("mecha10:schemas:{}:*", name);
499 let keys: Vec<String> = conn.keys(&pattern).await.map_err(|e| Mecha10Error::MessagingError {
500 message: format!("Failed to scan for schemas: {}", e),
501 suggestion: "Check Redis connection".to_string(),
502 })?;
503
504 for key in keys {
505 conn.del::<_, ()>(&key)
506 .await
507 .map_err(|e| Mecha10Error::MessagingError {
508 message: format!("Failed to delete schema: {}", e),
509 suggestion: "Check Redis connection".to_string(),
510 })?;
511 }
512
513 let list_key = MessageSchema::redis_list_key();
515 conn.srem::<_, _, ()>(&list_key, name)
516 .await
517 .map_err(|e| Mecha10Error::MessagingError {
518 message: format!("Failed to update schema list: {}", e),
519 suggestion: "Check Redis connection".to_string(),
520 })?;
521 }
522
523 Ok(())
524 }
525
526 #[cfg(not(feature = "messaging"))]
527 {
528 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
529 }
530 }
531}
532
533fn current_timestamp() -> u64 {
535 std::time::SystemTime::now()
536 .duration_since(std::time::UNIX_EPOCH)
537 .unwrap()
538 .as_secs()
539}
540
541#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
547pub enum CompatibilityMode {
548 None,
550
551 #[default]
553 Backward,
554
555 Forward,
557
558 Full,
560
561 Strict,
563}
564
565#[derive(Debug, Clone, Serialize, Deserialize)]
567pub enum SchemaEvolution {
568 FieldAdded {
570 name: String,
571 field_type: String,
572 required: bool,
573 default_value: Option<serde_json::Value>,
574 },
575
576 FieldRemoved { name: String },
578
579 FieldMadeOptional { name: String },
581
582 FieldMadeRequired {
584 name: String,
585 default_value: serde_json::Value,
586 },
587
588 FieldTypeChanged {
590 name: String,
591 old_type: String,
592 new_type: String,
593 },
594
595 FieldRenamed { old_name: String, new_name: String },
597}
598
599#[derive(Debug, Clone, Serialize, Deserialize)]
601pub struct CompatibilityCheck {
602 pub compatible: bool,
604
605 pub incompatibilities: Vec<String>,
607
608 pub evolutions: Vec<SchemaEvolution>,
610
611 pub warnings: Vec<String>,
613}
614
615impl CompatibilityCheck {
616 pub fn check(old: &MessageSchema, new: &MessageSchema, mode: CompatibilityMode) -> Self {
618 let mut incompatibilities = Vec::new();
619 let mut evolutions = Vec::new();
620 let mut warnings = Vec::new();
621
622 let old_fields: HashMap<String, &SchemaField> = old.fields.iter().map(|f| (f.name.clone(), f)).collect();
624 let new_fields: HashMap<String, &SchemaField> = new.fields.iter().map(|f| (f.name.clone(), f)).collect();
625
626 match mode {
627 CompatibilityMode::None => {
628 return Self {
630 compatible: true,
631 incompatibilities,
632 evolutions,
633 warnings,
634 };
635 }
636 CompatibilityMode::Strict => {
637 if old.fields.len() != new.fields.len() {
639 incompatibilities.push("Field count changed".to_string());
640 }
641 for (name, old_field) in &old_fields {
642 if let Some(new_field) = new_fields.get(name) {
643 if old_field.field_type != new_field.field_type {
644 incompatibilities.push(format!("Field '{}' type changed", name));
645 }
646 if old_field.required != new_field.required {
647 incompatibilities.push(format!("Field '{}' required status changed", name));
648 }
649 } else {
650 incompatibilities.push(format!("Field '{}' removed", name));
651 }
652 }
653 for name in new_fields.keys() {
654 if !old_fields.contains_key(name) {
655 incompatibilities.push(format!("Field '{}' added", name));
656 }
657 }
658 }
659 CompatibilityMode::Backward => {
660 for (name, old_field) in &old_fields {
668 if let Some(new_field) = new_fields.get(name) {
669 if old_field.field_type != new_field.field_type {
671 incompatibilities.push(format!(
672 "Field '{}' type changed from {} to {}",
673 name, old_field.field_type, new_field.field_type
674 ));
675 evolutions.push(SchemaEvolution::FieldTypeChanged {
676 name: name.clone(),
677 old_type: old_field.field_type.clone(),
678 new_type: new_field.field_type.clone(),
679 });
680 }
681
682 if old_field.required && !new_field.required {
683 evolutions.push(SchemaEvolution::FieldMadeOptional { name: name.clone() });
685 } else if !old_field.required && new_field.required {
686 incompatibilities.push(format!("Field '{}' made required (was optional)", name));
688 evolutions.push(SchemaEvolution::FieldMadeRequired {
689 name: name.clone(),
690 default_value: serde_json::Value::Null,
691 });
692 }
693 } else {
694 if old_field.required {
696 incompatibilities.push(format!("Required field '{}' removed", name));
697 } else {
698 warnings.push(format!("Optional field '{}' removed", name));
699 }
700 evolutions.push(SchemaEvolution::FieldRemoved { name: name.clone() });
701 }
702 }
703
704 for (name, new_field) in &new_fields {
706 if !old_fields.contains_key(name) {
707 if new_field.required {
708 incompatibilities.push(format!("Required field '{}' added", name));
709 } else {
710 warnings.push(format!("Optional field '{}' added", name));
711 }
712 evolutions.push(SchemaEvolution::FieldAdded {
713 name: name.clone(),
714 field_type: new_field.field_type.clone(),
715 required: new_field.required,
716 default_value: None,
717 });
718 }
719 }
720 }
721 CompatibilityMode::Forward => {
722 for (name, old_field) in &old_fields {
730 if let Some(new_field) = new_fields.get(name) {
731 if old_field.field_type != new_field.field_type {
732 incompatibilities.push(format!("Field '{}' type changed", name));
733 evolutions.push(SchemaEvolution::FieldTypeChanged {
734 name: name.clone(),
735 old_type: old_field.field_type.clone(),
736 new_type: new_field.field_type.clone(),
737 });
738 }
739 } else if old_field.required {
740 incompatibilities.push(format!("Required field '{}' removed", name));
741 evolutions.push(SchemaEvolution::FieldRemoved { name: name.clone() });
742 }
743 }
744
745 for (name, new_field) in &new_fields {
746 if !old_fields.contains_key(name) && new_field.required {
747 incompatibilities.push(format!("Required field '{}' added", name));
748 evolutions.push(SchemaEvolution::FieldAdded {
749 name: name.clone(),
750 field_type: new_field.field_type.clone(),
751 required: new_field.required,
752 default_value: None,
753 });
754 }
755 }
756 }
757 CompatibilityMode::Full => {
758 for (name, old_field) in &old_fields {
764 if let Some(new_field) = new_fields.get(name) {
765 if old_field.field_type != new_field.field_type {
766 incompatibilities.push(format!("Field '{}' type changed", name));
767 }
768 if old_field.required != new_field.required {
769 incompatibilities.push(format!("Field '{}' required status changed", name));
770 }
771 } else if old_field.required {
772 incompatibilities.push(format!("Required field '{}' removed", name));
773 }
774 }
775
776 for (name, new_field) in &new_fields {
777 if !old_fields.contains_key(name) && new_field.required {
778 incompatibilities.push(format!("Required field '{}' added", name));
779 }
780 }
781 }
782 }
783
784 Self {
785 compatible: incompatibilities.is_empty(),
786 incompatibilities,
787 evolutions,
788 warnings,
789 }
790 }
791}
792
793#[derive(Debug, Clone, Serialize, Deserialize)]
795pub struct MessageVersionInfo {
796 pub name: String,
798
799 pub current_version: u32,
801
802 pub latest_version: u32,
804
805 pub deprecated: bool,
807
808 pub deprecation_message: Option<String>,
810
811 pub min_supported_version: u32,
813
814 pub compatibility_mode: CompatibilityMode,
816}
817
818pub trait SchemaVersioningExt: SchemaRegistryExt {
820 fn check_compatibility(
822 &self,
823 name: &str,
824 old_version: u32,
825 new_version: u32,
826 mode: CompatibilityMode,
827 ) -> impl std::future::Future<Output = Result<CompatibilityCheck>> + Send;
828
829 fn register_versioned_schema(
831 &self,
832 schema: &MessageSchema,
833 mode: CompatibilityMode,
834 ) -> impl std::future::Future<Output = Result<CompatibilityCheck>> + Send;
835
836 fn get_version_info(
838 &self,
839 name: &str,
840 ) -> impl std::future::Future<Output = Result<Option<MessageVersionInfo>>> + Send;
841
842 fn deprecate_version(
844 &self,
845 name: &str,
846 version: u32,
847 message: String,
848 ) -> impl std::future::Future<Output = Result<()>> + Send;
849
850 fn is_version_deprecated(&self, name: &str, version: u32)
852 -> impl std::future::Future<Output = Result<bool>> + Send;
853}
854
855impl SchemaVersioningExt for Context {
856 async fn check_compatibility(
857 &self,
858 name: &str,
859 old_version: u32,
860 new_version: u32,
861 mode: CompatibilityMode,
862 ) -> Result<CompatibilityCheck> {
863 let old_schema = self
864 .get_schema(name, old_version)
865 .await?
866 .ok_or_else(|| Mecha10Error::Other(format!("Schema {} v{} not found", name, old_version)))?;
867
868 let new_schema = self
869 .get_schema(name, new_version)
870 .await?
871 .ok_or_else(|| Mecha10Error::Other(format!("Schema {} v{} not found", name, new_version)))?;
872
873 Ok(CompatibilityCheck::check(&old_schema, &new_schema, mode))
874 }
875
876 async fn register_versioned_schema(
877 &self,
878 schema: &MessageSchema,
879 mode: CompatibilityMode,
880 ) -> Result<CompatibilityCheck> {
881 if schema.version > 1 {
883 let prev_version = schema.version - 1;
884 let check = self
885 .check_compatibility(&schema.name, prev_version, schema.version, mode)
886 .await?;
887
888 if !check.compatible {
889 return Err(Mecha10Error::Other(format!(
890 "Schema compatibility check failed: {:?}",
891 check.incompatibilities
892 )));
893 }
894
895 self.register_schema(schema).await?;
897
898 Ok(check)
899 } else {
900 self.register_schema(schema).await?;
902
903 Ok(CompatibilityCheck {
904 compatible: true,
905 incompatibilities: Vec::new(),
906 evolutions: Vec::new(),
907 warnings: Vec::new(),
908 })
909 }
910 }
911
912 async fn get_version_info(&self, name: &str) -> Result<Option<MessageVersionInfo>> {
913 let latest = self.get_latest_schema(name).await?;
914
915 if let Some(latest_schema) = latest {
916 let deprecated = latest_schema.metadata.contains_key("deprecated");
918 let deprecation_message = latest_schema.metadata.get("deprecation_message").cloned();
919 let min_supported = latest_schema
920 .metadata
921 .get("min_supported_version")
922 .and_then(|v| v.parse().ok())
923 .unwrap_or(1);
924
925 let compatibility_mode = latest_schema
926 .metadata
927 .get("compatibility_mode")
928 .and_then(|m| serde_json::from_str(m).ok())
929 .unwrap_or(CompatibilityMode::Backward);
930
931 Ok(Some(MessageVersionInfo {
932 name: name.to_string(),
933 current_version: latest_schema.version,
934 latest_version: latest_schema.version,
935 deprecated,
936 deprecation_message,
937 min_supported_version: min_supported,
938 compatibility_mode,
939 }))
940 } else {
941 Ok(None)
942 }
943 }
944
945 async fn deprecate_version(&self, name: &str, version: u32, message: String) -> Result<()> {
946 let mut schema = self
947 .get_schema(name, version)
948 .await?
949 .ok_or_else(|| Mecha10Error::Other(format!("Schema {} v{} not found", name, version)))?;
950
951 schema.metadata.insert("deprecated".to_string(), "true".to_string());
952 schema.metadata.insert("deprecation_message".to_string(), message);
953
954 self.register_schema(&schema).await
955 }
956
957 async fn is_version_deprecated(&self, name: &str, version: u32) -> Result<bool> {
958 let schema = self
959 .get_schema(name, version)
960 .await?
961 .ok_or_else(|| Mecha10Error::Other(format!("Schema {} v{} not found", name, version)))?;
962
963 Ok(schema.metadata.contains_key("deprecated"))
964 }
965}