mecha10_core/
schema.rs

1//! Schema Registry
2//!
3//! Provides versioned message schema storage and retrieval.
4//! Enables schema evolution and documentation of message types.
5//!
6//! # Architecture
7//!
8//! - Stores schemas in Redis with versioning
9//! - Each schema has a name and version number
10//! - Schemas stored as JSON Schema format
11//! - Supports listing all schemas and retrieving specific versions
12//!
13//! # Example
14//!
15//! ```rust
16//! use mecha10::prelude::*;
17//! use mecha10::schema::{SchemaRegistryExt, MessageSchema};
18//!
19//! # async fn example(ctx: &Context) -> Result<()> {
20//! // Register a schema
21//! let schema = MessageSchema::new("LaserScan", 1)
22//!     .with_description("2D laser scan data")
23//!     .with_field("timestamp", "integer", true)
24//!     .with_field("ranges", "array", true)
25//!     .with_field("intensities", "array", false);
26//!
27//! ctx.register_schema(&schema).await?;
28//!
29//! // Get latest schema
30//! let schema = ctx.get_latest_schema("LaserScan").await?;
31//!
32//! // Get specific version
33//! let schema_v1 = ctx.get_schema("LaserScan", 1).await?;
34//!
35//! // List all schemas
36//! let all_schemas = ctx.list_schemas().await?;
37//!
38//! # Ok(())
39//! # }
40//! ```
41
42use crate::context::Context;
43use crate::error::{Mecha10Error, Result};
44use serde::{Deserialize, Serialize};
45use std::collections::HashMap;
46
47/// Message schema definition
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct MessageSchema {
50    /// Schema name (typically the message type name)
51    pub name: String,
52
53    /// Schema version (incremental)
54    pub version: u32,
55
56    /// Human-readable description
57    #[serde(default)]
58    pub description: String,
59
60    /// Schema fields definition
61    pub fields: Vec<SchemaField>,
62
63    /// Additional metadata
64    #[serde(default)]
65    pub metadata: HashMap<String, String>,
66
67    /// When this schema was registered (Unix timestamp)
68    #[serde(default)]
69    pub registered_at: u64,
70}
71
72/// Schema field definition
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct SchemaField {
75    /// Field name
76    pub name: String,
77
78    /// Field type (string, integer, number, boolean, array, object)
79    pub field_type: String,
80
81    /// Whether this field is required
82    pub required: bool,
83
84    /// Field description
85    #[serde(default)]
86    pub description: String,
87
88    /// For array types, the item type
89    #[serde(skip_serializing_if = "Option::is_none")]
90    pub items: Option<String>,
91}
92
93impl MessageSchema {
94    /// Create a new schema
95    pub fn new(name: impl Into<String>, version: u32) -> Self {
96        Self {
97            name: name.into(),
98            version,
99            description: String::new(),
100            fields: Vec::new(),
101            metadata: HashMap::new(),
102            registered_at: current_timestamp(),
103        }
104    }
105
106    /// Set description
107    pub fn with_description(mut self, description: impl Into<String>) -> Self {
108        self.description = description.into();
109        self
110    }
111
112    /// Add a field
113    pub fn with_field(mut self, name: impl Into<String>, field_type: impl Into<String>, required: bool) -> Self {
114        self.fields.push(SchemaField {
115            name: name.into(),
116            field_type: field_type.into(),
117            required,
118            description: String::new(),
119            items: None,
120        });
121        self
122    }
123
124    /// Add an array field with item type
125    pub fn with_array_field(mut self, name: impl Into<String>, items: impl Into<String>, required: bool) -> Self {
126        self.fields.push(SchemaField {
127            name: name.into(),
128            field_type: "array".to_string(),
129            required,
130            description: String::new(),
131            items: Some(items.into()),
132        });
133        self
134    }
135
136    /// Add metadata
137    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
138        self.metadata.insert(key.into(), value.into());
139        self
140    }
141
142    /// Get the schema key for Redis
143    fn redis_key(&self) -> String {
144        format!("mecha10:schemas:{}:{}", self.name, self.version)
145    }
146
147    /// Get the latest version key for Redis
148    fn redis_latest_key(name: &str) -> String {
149        format!("mecha10:schemas:{}:latest", name)
150    }
151
152    /// Get the schema list key for Redis
153    fn redis_list_key() -> String {
154        "mecha10:schemas:list".to_string()
155    }
156}
157
158/// Schema registry extension trait for Context
159pub trait SchemaRegistryExt {
160    /// Register a schema in the registry
161    ///
162    /// # Arguments
163    ///
164    /// * `schema` - Schema to register
165    ///
166    /// # Example
167    ///
168    /// ```rust
169    /// use mecha10::prelude::*;
170    /// use mecha10::schema::{SchemaRegistryExt, MessageSchema};
171    ///
172    /// # async fn example(ctx: &Context) -> Result<()> {
173    /// let schema = MessageSchema::new("LaserScan", 1)
174    ///     .with_description("2D laser scan data")
175    ///     .with_field("timestamp", "integer", true)
176    ///     .with_field("ranges", "array", true);
177    ///
178    /// ctx.register_schema(&schema).await?;
179    /// # Ok(())
180    /// # }
181    /// ```
182    fn register_schema(&self, schema: &MessageSchema) -> impl std::future::Future<Output = Result<()>> + Send;
183
184    /// Get a specific version of a schema
185    ///
186    /// # Arguments
187    ///
188    /// * `name` - Schema name
189    /// * `version` - Schema version
190    fn get_schema(
191        &self,
192        name: &str,
193        version: u32,
194    ) -> impl std::future::Future<Output = Result<Option<MessageSchema>>> + Send;
195
196    /// Get the latest version of a schema
197    ///
198    /// # Arguments
199    ///
200    /// * `name` - Schema name
201    fn get_latest_schema(&self, name: &str) -> impl std::future::Future<Output = Result<Option<MessageSchema>>> + Send;
202
203    /// List all registered schemas (returns latest version of each)
204    fn list_schemas(&self) -> impl std::future::Future<Output = Result<Vec<MessageSchema>>> + Send;
205
206    /// Get all versions of a schema
207    ///
208    /// # Arguments
209    ///
210    /// * `name` - Schema name
211    fn get_schema_versions(&self, name: &str) -> impl std::future::Future<Output = Result<Vec<MessageSchema>>> + Send;
212
213    /// Delete a schema version
214    ///
215    /// # Arguments
216    ///
217    /// * `name` - Schema name
218    /// * `version` - Schema version (if None, deletes all versions)
219    fn delete_schema(&self, name: &str, version: Option<u32>) -> impl std::future::Future<Output = Result<()>> + Send;
220}
221
222impl SchemaRegistryExt for Context {
223    async fn register_schema(&self, schema: &MessageSchema) -> Result<()> {
224        #[cfg(feature = "messaging")]
225        {
226            use redis::AsyncCommands;
227
228            let redis_url = Context::get_redis_url()?;
229            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
230                message: format!("Failed to connect to Redis: {}", e),
231                suggestion: "Ensure Redis is running".to_string(),
232            })?;
233
234            let mut conn =
235                client
236                    .get_multiplexed_async_connection()
237                    .await
238                    .map_err(|e| Mecha10Error::MessagingError {
239                        message: format!("Failed to get Redis connection: {}", e),
240                        suggestion: "Ensure Redis is running".to_string(),
241                    })?;
242
243            // Serialize schema
244            let schema_json = serde_json::to_string(&schema)
245                .map_err(|e| Mecha10Error::Other(format!("Failed to serialize schema: {}", e)))?;
246
247            // Store schema with version
248            let key = schema.redis_key();
249            conn.set::<_, _, ()>(&key, &schema_json)
250                .await
251                .map_err(|e| Mecha10Error::MessagingError {
252                    message: format!("Failed to register schema: {}", e),
253                    suggestion: "Check Redis connection".to_string(),
254                })?;
255
256            // Update latest version pointer
257            let latest_key = MessageSchema::redis_latest_key(&schema.name);
258            conn.set::<_, _, ()>(&latest_key, schema.version)
259                .await
260                .map_err(|e| Mecha10Error::MessagingError {
261                    message: format!("Failed to update latest version: {}", e),
262                    suggestion: "Check Redis connection".to_string(),
263                })?;
264
265            // Add to schema list
266            let list_key = MessageSchema::redis_list_key();
267            conn.sadd::<_, _, ()>(&list_key, &schema.name)
268                .await
269                .map_err(|e| Mecha10Error::MessagingError {
270                    message: format!("Failed to update schema list: {}", e),
271                    suggestion: "Check Redis connection".to_string(),
272                })?;
273
274            Ok(())
275        }
276
277        #[cfg(not(feature = "messaging"))]
278        {
279            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
280        }
281    }
282
283    async fn get_schema(&self, name: &str, version: u32) -> Result<Option<MessageSchema>> {
284        #[cfg(feature = "messaging")]
285        {
286            use redis::AsyncCommands;
287
288            let redis_url = Context::get_redis_url()?;
289            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
290                message: format!("Failed to connect to Redis: {}", e),
291                suggestion: "Ensure Redis is running".to_string(),
292            })?;
293
294            let mut conn =
295                client
296                    .get_multiplexed_async_connection()
297                    .await
298                    .map_err(|e| Mecha10Error::MessagingError {
299                        message: format!("Failed to get Redis connection: {}", e),
300                        suggestion: "Ensure Redis is running".to_string(),
301                    })?;
302
303            let key = format!("mecha10:schemas:{}:{}", name, version);
304            let json: Option<String> = conn.get(&key).await.map_err(|e| Mecha10Error::MessagingError {
305                message: format!("Failed to get schema: {}", e),
306                suggestion: "Check Redis connection".to_string(),
307            })?;
308
309            if let Some(json) = json {
310                let schema = serde_json::from_str::<MessageSchema>(&json)
311                    .map_err(|e| Mecha10Error::Other(format!("Failed to parse schema: {}", e)))?;
312                Ok(Some(schema))
313            } else {
314                Ok(None)
315            }
316        }
317
318        #[cfg(not(feature = "messaging"))]
319        {
320            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
321        }
322    }
323
324    async fn get_latest_schema(&self, name: &str) -> Result<Option<MessageSchema>> {
325        #[cfg(feature = "messaging")]
326        {
327            use redis::AsyncCommands;
328
329            let redis_url = Context::get_redis_url()?;
330            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
331                message: format!("Failed to connect to Redis: {}", e),
332                suggestion: "Ensure Redis is running".to_string(),
333            })?;
334
335            let mut conn =
336                client
337                    .get_multiplexed_async_connection()
338                    .await
339                    .map_err(|e| Mecha10Error::MessagingError {
340                        message: format!("Failed to get Redis connection: {}", e),
341                        suggestion: "Ensure Redis is running".to_string(),
342                    })?;
343
344            // Get latest version number
345            let latest_key = MessageSchema::redis_latest_key(name);
346            let version: Option<u32> = conn.get(&latest_key).await.map_err(|e| Mecha10Error::MessagingError {
347                message: format!("Failed to get latest version: {}", e),
348                suggestion: "Check Redis connection".to_string(),
349            })?;
350
351            if let Some(version) = version {
352                self.get_schema(name, version).await
353            } else {
354                Ok(None)
355            }
356        }
357
358        #[cfg(not(feature = "messaging"))]
359        {
360            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
361        }
362    }
363
364    async fn list_schemas(&self) -> Result<Vec<MessageSchema>> {
365        #[cfg(feature = "messaging")]
366        {
367            use redis::AsyncCommands;
368
369            let redis_url = Context::get_redis_url()?;
370            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
371                message: format!("Failed to connect to Redis: {}", e),
372                suggestion: "Ensure Redis is running".to_string(),
373            })?;
374
375            let mut conn =
376                client
377                    .get_multiplexed_async_connection()
378                    .await
379                    .map_err(|e| Mecha10Error::MessagingError {
380                        message: format!("Failed to get Redis connection: {}", e),
381                        suggestion: "Ensure Redis is running".to_string(),
382                    })?;
383
384            // Get all schema names
385            let list_key = MessageSchema::redis_list_key();
386            let names: Vec<String> = conn
387                .smembers(&list_key)
388                .await
389                .map_err(|e| Mecha10Error::MessagingError {
390                    message: format!("Failed to get schema list: {}", e),
391                    suggestion: "Check Redis connection".to_string(),
392                })?;
393
394            // Get latest version of each schema
395            let mut schemas = Vec::new();
396            for name in names {
397                if let Some(schema) = self.get_latest_schema(&name).await? {
398                    schemas.push(schema);
399                }
400            }
401
402            Ok(schemas)
403        }
404
405        #[cfg(not(feature = "messaging"))]
406        {
407            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
408        }
409    }
410
411    async fn get_schema_versions(&self, name: &str) -> Result<Vec<MessageSchema>> {
412        #[cfg(feature = "messaging")]
413        {
414            use redis::AsyncCommands;
415
416            let redis_url = Context::get_redis_url()?;
417            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
418                message: format!("Failed to connect to Redis: {}", e),
419                suggestion: "Ensure Redis is running".to_string(),
420            })?;
421
422            let mut conn =
423                client
424                    .get_multiplexed_async_connection()
425                    .await
426                    .map_err(|e| Mecha10Error::MessagingError {
427                        message: format!("Failed to get Redis connection: {}", e),
428                        suggestion: "Ensure Redis is running".to_string(),
429                    })?;
430
431            // Scan for all versions of this schema
432            let pattern = format!("mecha10:schemas:{}:*", name);
433            let keys: Vec<String> = conn.keys(&pattern).await.map_err(|e| Mecha10Error::MessagingError {
434                message: format!("Failed to scan for schema versions: {}", e),
435                suggestion: "Check Redis connection".to_string(),
436            })?;
437
438            let mut schemas = Vec::new();
439            for key in keys {
440                // Skip the 'latest' key
441                if key.ends_with(":latest") {
442                    continue;
443                }
444
445                let json: String = conn.get(&key).await.map_err(|e| Mecha10Error::MessagingError {
446                    message: format!("Failed to get schema: {}", e),
447                    suggestion: "Check Redis connection".to_string(),
448                })?;
449
450                if let Ok(schema) = serde_json::from_str::<MessageSchema>(&json) {
451                    schemas.push(schema);
452                }
453            }
454
455            // Sort by version
456            schemas.sort_by_key(|s| s.version);
457
458            Ok(schemas)
459        }
460
461        #[cfg(not(feature = "messaging"))]
462        {
463            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
464        }
465    }
466
467    async fn delete_schema(&self, name: &str, version: Option<u32>) -> Result<()> {
468        #[cfg(feature = "messaging")]
469        {
470            use redis::AsyncCommands;
471
472            let redis_url = Context::get_redis_url()?;
473            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
474                message: format!("Failed to connect to Redis: {}", e),
475                suggestion: "Ensure Redis is running".to_string(),
476            })?;
477
478            let mut conn =
479                client
480                    .get_multiplexed_async_connection()
481                    .await
482                    .map_err(|e| Mecha10Error::MessagingError {
483                        message: format!("Failed to get Redis connection: {}", e),
484                        suggestion: "Ensure Redis is running".to_string(),
485                    })?;
486
487            if let Some(version) = version {
488                // Delete specific version
489                let key = format!("mecha10:schemas:{}:{}", name, version);
490                conn.del::<_, ()>(&key)
491                    .await
492                    .map_err(|e| Mecha10Error::MessagingError {
493                        message: format!("Failed to delete schema: {}", e),
494                        suggestion: "Check Redis connection".to_string(),
495                    })?;
496            } else {
497                // Delete all versions
498                let pattern = format!("mecha10:schemas:{}:*", name);
499                let keys: Vec<String> = conn.keys(&pattern).await.map_err(|e| Mecha10Error::MessagingError {
500                    message: format!("Failed to scan for schemas: {}", e),
501                    suggestion: "Check Redis connection".to_string(),
502                })?;
503
504                for key in keys {
505                    conn.del::<_, ()>(&key)
506                        .await
507                        .map_err(|e| Mecha10Error::MessagingError {
508                            message: format!("Failed to delete schema: {}", e),
509                            suggestion: "Check Redis connection".to_string(),
510                        })?;
511                }
512
513                // Remove from schema list
514                let list_key = MessageSchema::redis_list_key();
515                conn.srem::<_, _, ()>(&list_key, name)
516                    .await
517                    .map_err(|e| Mecha10Error::MessagingError {
518                        message: format!("Failed to update schema list: {}", e),
519                        suggestion: "Check Redis connection".to_string(),
520                    })?;
521            }
522
523            Ok(())
524        }
525
526        #[cfg(not(feature = "messaging"))]
527        {
528            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
529        }
530    }
531}
532
533/// Get current Unix timestamp in seconds
534fn current_timestamp() -> u64 {
535    std::time::SystemTime::now()
536        .duration_since(std::time::UNIX_EPOCH)
537        .unwrap()
538        .as_secs()
539}
540
541// ============================================================================
542// Message Versioning and Compatibility
543// ============================================================================
544
545/// Schema compatibility mode
546#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
547pub enum CompatibilityMode {
548    /// No compatibility checking
549    None,
550
551    /// New schema can read data written by old schema (backward compatible)
552    #[default]
553    Backward,
554
555    /// Old schema can read data written by new schema (forward compatible)
556    Forward,
557
558    /// Both backward and forward compatible
559    Full,
560
561    /// No schema changes allowed
562    Strict,
563}
564
565/// Schema evolution operation
566#[derive(Debug, Clone, Serialize, Deserialize)]
567pub enum SchemaEvolution {
568    /// Field added (name, type, optional/required, default_value)
569    FieldAdded {
570        name: String,
571        field_type: String,
572        required: bool,
573        default_value: Option<serde_json::Value>,
574    },
575
576    /// Field removed
577    FieldRemoved { name: String },
578
579    /// Field made optional (was required)
580    FieldMadeOptional { name: String },
581
582    /// Field made required (was optional) - requires default value
583    FieldMadeRequired {
584        name: String,
585        default_value: serde_json::Value,
586    },
587
588    /// Field type changed
589    FieldTypeChanged {
590        name: String,
591        old_type: String,
592        new_type: String,
593    },
594
595    /// Field renamed
596    FieldRenamed { old_name: String, new_name: String },
597}
598
599/// Schema compatibility check result
600#[derive(Debug, Clone, Serialize, Deserialize)]
601pub struct CompatibilityCheck {
602    /// Whether schemas are compatible
603    pub compatible: bool,
604
605    /// List of incompatibilities found
606    pub incompatibilities: Vec<String>,
607
608    /// List of evolutions detected
609    pub evolutions: Vec<SchemaEvolution>,
610
611    /// Warnings (non-breaking but notable changes)
612    pub warnings: Vec<String>,
613}
614
615impl CompatibilityCheck {
616    /// Check if two schemas are compatible
617    pub fn check(old: &MessageSchema, new: &MessageSchema, mode: CompatibilityMode) -> Self {
618        let mut incompatibilities = Vec::new();
619        let mut evolutions = Vec::new();
620        let mut warnings = Vec::new();
621
622        // Build field maps for easier comparison
623        let old_fields: HashMap<String, &SchemaField> = old.fields.iter().map(|f| (f.name.clone(), f)).collect();
624        let new_fields: HashMap<String, &SchemaField> = new.fields.iter().map(|f| (f.name.clone(), f)).collect();
625
626        match mode {
627            CompatibilityMode::None => {
628                // No checking required
629                return Self {
630                    compatible: true,
631                    incompatibilities,
632                    evolutions,
633                    warnings,
634                };
635            }
636            CompatibilityMode::Strict => {
637                // No changes allowed
638                if old.fields.len() != new.fields.len() {
639                    incompatibilities.push("Field count changed".to_string());
640                }
641                for (name, old_field) in &old_fields {
642                    if let Some(new_field) = new_fields.get(name) {
643                        if old_field.field_type != new_field.field_type {
644                            incompatibilities.push(format!("Field '{}' type changed", name));
645                        }
646                        if old_field.required != new_field.required {
647                            incompatibilities.push(format!("Field '{}' required status changed", name));
648                        }
649                    } else {
650                        incompatibilities.push(format!("Field '{}' removed", name));
651                    }
652                }
653                for name in new_fields.keys() {
654                    if !old_fields.contains_key(name) {
655                        incompatibilities.push(format!("Field '{}' added", name));
656                    }
657                }
658            }
659            CompatibilityMode::Backward => {
660                // New schema can read old data
661                // - Can add optional fields
662                // - Can make required fields optional
663                // - Cannot remove required fields
664                // - Cannot add required fields
665                // - Cannot change field types
666
667                for (name, old_field) in &old_fields {
668                    if let Some(new_field) = new_fields.get(name) {
669                        // Field exists in both
670                        if old_field.field_type != new_field.field_type {
671                            incompatibilities.push(format!(
672                                "Field '{}' type changed from {} to {}",
673                                name, old_field.field_type, new_field.field_type
674                            ));
675                            evolutions.push(SchemaEvolution::FieldTypeChanged {
676                                name: name.clone(),
677                                old_type: old_field.field_type.clone(),
678                                new_type: new_field.field_type.clone(),
679                            });
680                        }
681
682                        if old_field.required && !new_field.required {
683                            // Field made optional - OK for backward compatibility
684                            evolutions.push(SchemaEvolution::FieldMadeOptional { name: name.clone() });
685                        } else if !old_field.required && new_field.required {
686                            // Field made required - breaks backward compatibility
687                            incompatibilities.push(format!("Field '{}' made required (was optional)", name));
688                            evolutions.push(SchemaEvolution::FieldMadeRequired {
689                                name: name.clone(),
690                                default_value: serde_json::Value::Null,
691                            });
692                        }
693                    } else {
694                        // Field removed
695                        if old_field.required {
696                            incompatibilities.push(format!("Required field '{}' removed", name));
697                        } else {
698                            warnings.push(format!("Optional field '{}' removed", name));
699                        }
700                        evolutions.push(SchemaEvolution::FieldRemoved { name: name.clone() });
701                    }
702                }
703
704                // Check for added fields
705                for (name, new_field) in &new_fields {
706                    if !old_fields.contains_key(name) {
707                        if new_field.required {
708                            incompatibilities.push(format!("Required field '{}' added", name));
709                        } else {
710                            warnings.push(format!("Optional field '{}' added", name));
711                        }
712                        evolutions.push(SchemaEvolution::FieldAdded {
713                            name: name.clone(),
714                            field_type: new_field.field_type.clone(),
715                            required: new_field.required,
716                            default_value: None,
717                        });
718                    }
719                }
720            }
721            CompatibilityMode::Forward => {
722                // Old schema can read new data
723                // - Can remove optional fields
724                // - Can make optional fields required
725                // - Cannot add required fields
726                // - Cannot remove required fields
727                // - Cannot change field types
728
729                for (name, old_field) in &old_fields {
730                    if let Some(new_field) = new_fields.get(name) {
731                        if old_field.field_type != new_field.field_type {
732                            incompatibilities.push(format!("Field '{}' type changed", name));
733                            evolutions.push(SchemaEvolution::FieldTypeChanged {
734                                name: name.clone(),
735                                old_type: old_field.field_type.clone(),
736                                new_type: new_field.field_type.clone(),
737                            });
738                        }
739                    } else if old_field.required {
740                        incompatibilities.push(format!("Required field '{}' removed", name));
741                        evolutions.push(SchemaEvolution::FieldRemoved { name: name.clone() });
742                    }
743                }
744
745                for (name, new_field) in &new_fields {
746                    if !old_fields.contains_key(name) && new_field.required {
747                        incompatibilities.push(format!("Required field '{}' added", name));
748                        evolutions.push(SchemaEvolution::FieldAdded {
749                            name: name.clone(),
750                            field_type: new_field.field_type.clone(),
751                            required: new_field.required,
752                            default_value: None,
753                        });
754                    }
755                }
756            }
757            CompatibilityMode::Full => {
758                // Both backward and forward compatible
759                // - Can only add/remove optional fields
760                // - Cannot change field types
761                // - Cannot change required status
762
763                for (name, old_field) in &old_fields {
764                    if let Some(new_field) = new_fields.get(name) {
765                        if old_field.field_type != new_field.field_type {
766                            incompatibilities.push(format!("Field '{}' type changed", name));
767                        }
768                        if old_field.required != new_field.required {
769                            incompatibilities.push(format!("Field '{}' required status changed", name));
770                        }
771                    } else if old_field.required {
772                        incompatibilities.push(format!("Required field '{}' removed", name));
773                    }
774                }
775
776                for (name, new_field) in &new_fields {
777                    if !old_fields.contains_key(name) && new_field.required {
778                        incompatibilities.push(format!("Required field '{}' added", name));
779                    }
780                }
781            }
782        }
783
784        Self {
785            compatible: incompatibilities.is_empty(),
786            incompatibilities,
787            evolutions,
788            warnings,
789        }
790    }
791}
792
793/// Message version metadata
794#[derive(Debug, Clone, Serialize, Deserialize)]
795pub struct MessageVersionInfo {
796    /// Schema name
797    pub name: String,
798
799    /// Current version being used
800    pub current_version: u32,
801
802    /// Latest available version
803    pub latest_version: u32,
804
805    /// Whether current version is deprecated
806    pub deprecated: bool,
807
808    /// Deprecation message if applicable
809    pub deprecation_message: Option<String>,
810
811    /// Minimum supported version
812    pub min_supported_version: u32,
813
814    /// Compatibility mode for this message type
815    pub compatibility_mode: CompatibilityMode,
816}
817
818/// Extension trait for versioned schema operations
819pub trait SchemaVersioningExt: SchemaRegistryExt {
820    /// Check compatibility between two schema versions
821    fn check_compatibility(
822        &self,
823        name: &str,
824        old_version: u32,
825        new_version: u32,
826        mode: CompatibilityMode,
827    ) -> impl std::future::Future<Output = Result<CompatibilityCheck>> + Send;
828
829    /// Register a new schema version with compatibility check
830    fn register_versioned_schema(
831        &self,
832        schema: &MessageSchema,
833        mode: CompatibilityMode,
834    ) -> impl std::future::Future<Output = Result<CompatibilityCheck>> + Send;
835
836    /// Get version info for a message type
837    fn get_version_info(
838        &self,
839        name: &str,
840    ) -> impl std::future::Future<Output = Result<Option<MessageVersionInfo>>> + Send;
841
842    /// Mark a schema version as deprecated
843    fn deprecate_version(
844        &self,
845        name: &str,
846        version: u32,
847        message: String,
848    ) -> impl std::future::Future<Output = Result<()>> + Send;
849
850    /// Check if using a deprecated version
851    fn is_version_deprecated(&self, name: &str, version: u32)
852        -> impl std::future::Future<Output = Result<bool>> + Send;
853}
854
855impl SchemaVersioningExt for Context {
856    async fn check_compatibility(
857        &self,
858        name: &str,
859        old_version: u32,
860        new_version: u32,
861        mode: CompatibilityMode,
862    ) -> Result<CompatibilityCheck> {
863        let old_schema = self
864            .get_schema(name, old_version)
865            .await?
866            .ok_or_else(|| Mecha10Error::Other(format!("Schema {} v{} not found", name, old_version)))?;
867
868        let new_schema = self
869            .get_schema(name, new_version)
870            .await?
871            .ok_or_else(|| Mecha10Error::Other(format!("Schema {} v{} not found", name, new_version)))?;
872
873        Ok(CompatibilityCheck::check(&old_schema, &new_schema, mode))
874    }
875
876    async fn register_versioned_schema(
877        &self,
878        schema: &MessageSchema,
879        mode: CompatibilityMode,
880    ) -> Result<CompatibilityCheck> {
881        // Get previous version if exists
882        if schema.version > 1 {
883            let prev_version = schema.version - 1;
884            let check = self
885                .check_compatibility(&schema.name, prev_version, schema.version, mode)
886                .await?;
887
888            if !check.compatible {
889                return Err(Mecha10Error::Other(format!(
890                    "Schema compatibility check failed: {:?}",
891                    check.incompatibilities
892                )));
893            }
894
895            // Register the schema
896            self.register_schema(schema).await?;
897
898            Ok(check)
899        } else {
900            // First version - no compatibility check needed
901            self.register_schema(schema).await?;
902
903            Ok(CompatibilityCheck {
904                compatible: true,
905                incompatibilities: Vec::new(),
906                evolutions: Vec::new(),
907                warnings: Vec::new(),
908            })
909        }
910    }
911
912    async fn get_version_info(&self, name: &str) -> Result<Option<MessageVersionInfo>> {
913        let latest = self.get_latest_schema(name).await?;
914
915        if let Some(latest_schema) = latest {
916            // Check for deprecation metadata
917            let deprecated = latest_schema.metadata.contains_key("deprecated");
918            let deprecation_message = latest_schema.metadata.get("deprecation_message").cloned();
919            let min_supported = latest_schema
920                .metadata
921                .get("min_supported_version")
922                .and_then(|v| v.parse().ok())
923                .unwrap_or(1);
924
925            let compatibility_mode = latest_schema
926                .metadata
927                .get("compatibility_mode")
928                .and_then(|m| serde_json::from_str(m).ok())
929                .unwrap_or(CompatibilityMode::Backward);
930
931            Ok(Some(MessageVersionInfo {
932                name: name.to_string(),
933                current_version: latest_schema.version,
934                latest_version: latest_schema.version,
935                deprecated,
936                deprecation_message,
937                min_supported_version: min_supported,
938                compatibility_mode,
939            }))
940        } else {
941            Ok(None)
942        }
943    }
944
945    async fn deprecate_version(&self, name: &str, version: u32, message: String) -> Result<()> {
946        let mut schema = self
947            .get_schema(name, version)
948            .await?
949            .ok_or_else(|| Mecha10Error::Other(format!("Schema {} v{} not found", name, version)))?;
950
951        schema.metadata.insert("deprecated".to_string(), "true".to_string());
952        schema.metadata.insert("deprecation_message".to_string(), message);
953
954        self.register_schema(&schema).await
955    }
956
957    async fn is_version_deprecated(&self, name: &str, version: u32) -> Result<bool> {
958        let schema = self
959            .get_schema(name, version)
960            .await?
961            .ok_or_else(|| Mecha10Error::Other(format!("Schema {} v{} not found", name, version)))?;
962
963        Ok(schema.metadata.contains_key("deprecated"))
964    }
965}