Skip to main content

allframe_core/cqrs/
event_versioning.rs

//! Event versioning and upcasting for schema evolution.
//!
//! This module provides automatic event versioning and migration, eliminating
//! the need for manual version checking and conversion code.
5
6use std::{any::TypeId, collections::HashMap, marker::PhantomData, sync::Arc};
7
8use tokio::sync::RwLock;
9
10use super::Event;
11
12/// Trait for versioned events
13pub trait VersionedEvent: Event {
14    /// Get the version number of this event
15    fn version(&self) -> u32;
16
17    /// Get the event type name (for serialization)
18    fn event_type(&self) -> &'static str;
19}
20
21/// Trait for upcasting events from one version to another
22pub trait Upcaster<From: Event, To: Event>: Send + Sync {
23    /// Convert an event from an older version to a newer version
24    fn upcast(&self, from: From) -> To;
25}
26
27/// Automatic upcaster using From trait
28pub struct AutoUpcaster<From: Event, To: Event> {
29    _phantom: PhantomData<(From, To)>,
30}
31
32impl<From: Event, To: Event> AutoUpcaster<From, To>
33where
34    To: std::convert::From<From>,
35{
36    /// Create a new automatic upcaster
37    pub fn new() -> Self {
38        Self {
39            _phantom: PhantomData,
40        }
41    }
42}
43
44impl<From: Event, To: Event> Default for AutoUpcaster<From, To>
45where
46    To: std::convert::From<From>,
47{
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl<From: Event, To: Event> Upcaster<From, To> for AutoUpcaster<From, To>
54where
55    To: std::convert::From<From>,
56{
57    fn upcast(&self, from: From) -> To {
58        from.into()
59    }
60}
61
62/// Type-erased upcaster for registry storage
63trait ErasedUpcaster<E: Event>: Send + Sync {
64    /// Upcast an event, returning the new version
65    #[allow(dead_code)]
66    fn upcast_erased(&self, event: Box<dyn std::any::Any>) -> Option<E>;
67}
68
69/// Wrapper for concrete upcasters
70struct UpcasterWrapper<From: Event, To: Event, U: Upcaster<From, To>> {
71    #[allow(dead_code)]
72    upcaster: Arc<U>,
73    _phantom: PhantomData<(From, To)>,
74}
75
76impl<From: Event, To: Event, U: Upcaster<From, To>> ErasedUpcaster<To>
77    for UpcasterWrapper<From, To, U>
78{
79    fn upcast_erased(&self, event: Box<dyn std::any::Any>) -> Option<To> {
80        match event.downcast::<From>() {
81            Ok(from_event) => Some(self.upcaster.upcast(*from_event)),
82            Err(_) => None,
83        }
84    }
85}
86
87/// Type alias for upcaster storage
88type UpcasterMap<E> = HashMap<(TypeId, TypeId), Box<dyn ErasedUpcaster<E>>>;
89
/// A single migration step for a named event type, from one schema version
/// to another.
///
/// This is plain data: it records that a step exists but carries no
/// conversion logic itself. It derives equality and hashing so that paths can
/// be compared in assertions and stored in sets or used as map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MigrationPath {
    /// Starting version
    pub from_version: u32,
    /// Target version
    pub to_version: u32,
    /// Event type name
    pub event_type: String,
}

impl MigrationPath {
    /// Creates a migration path for `event_type` going from version `from`
    /// to version `to`.
    ///
    /// `event_type` accepts anything convertible into a `String`. The version
    /// numbers are stored as given; no ordering between them is checked.
    #[must_use]
    pub fn new(from: u32, to: u32, event_type: impl Into<String>) -> Self {
        Self {
            from_version: from,
            to_version: to,
            event_type: event_type.into(),
        }
    }
}
111
112/// Version registry for managing event versions and migrations
113pub struct VersionRegistry<E: Event> {
114    /// Registered upcasters by (from_type, to_type)
115    upcasters: Arc<RwLock<UpcasterMap<E>>>,
116    /// Migration paths by event type
117    migrations: Arc<RwLock<HashMap<String, Vec<MigrationPath>>>>,
118    _phantom: PhantomData<E>,
119}
120
121impl<E: Event> VersionRegistry<E> {
122    /// Create a new version registry
123    pub fn new() -> Self {
124        Self {
125            upcasters: Arc::new(RwLock::new(HashMap::new())),
126            migrations: Arc::new(RwLock::new(HashMap::new())),
127            _phantom: PhantomData,
128        }
129    }
130
131    /// Register an upcaster for converting from one event version to another
132    pub async fn register_upcaster<F: Event + 'static, U: Upcaster<F, E> + 'static>(
133        &self,
134        upcaster: U,
135    ) {
136        let from_type = TypeId::of::<F>();
137        let to_type = TypeId::of::<E>();
138
139        let wrapper = UpcasterWrapper {
140            upcaster: Arc::new(upcaster),
141            _phantom: PhantomData,
142        };
143
144        let mut upcasters = self.upcasters.write().await;
145        upcasters.insert((from_type, to_type), Box::new(wrapper));
146    }
147
148    /// Register a migration path
149    pub async fn register_migration(&self, path: MigrationPath) {
150        let mut migrations = self.migrations.write().await;
151        migrations
152            .entry(path.event_type.clone())
153            .or_insert_with(Vec::new)
154            .push(path);
155    }
156
157    /// Get all registered migration paths
158    pub async fn get_migrations(&self) -> Vec<MigrationPath> {
159        let migrations = self.migrations.read().await;
160        migrations.values().flatten().cloned().collect()
161    }
162
163    /// Get migrations for a specific event type
164    pub async fn get_migrations_for(&self, event_type: &str) -> Vec<MigrationPath> {
165        let migrations = self.migrations.read().await;
166        migrations.get(event_type).cloned().unwrap_or_default()
167    }
168
169    /// Check if an upcaster is registered
170    pub async fn has_upcaster<F: Event + 'static, T: Event + 'static>(&self) -> bool {
171        let from_type = TypeId::of::<F>();
172        let to_type = TypeId::of::<T>();
173        let upcasters = self.upcasters.read().await;
174        upcasters.contains_key(&(from_type, to_type))
175    }
176
177    /// Get number of registered upcasters
178    pub async fn upcaster_count(&self) -> usize {
179        self.upcasters.read().await.len()
180    }
181
182    /// Get number of registered migrations
183    pub async fn migration_count(&self) -> usize {
184        self.migrations.read().await.values().map(|v| v.len()).sum()
185    }
186}
187
188impl<E: Event> Default for VersionRegistry<E> {
189    fn default() -> Self {
190        Self::new()
191    }
192}
193
194impl<E: Event> Clone for VersionRegistry<E> {
195    fn clone(&self) -> Self {
196        Self {
197            upcasters: Arc::clone(&self.upcasters),
198            migrations: Arc::clone(&self.migrations),
199            _phantom: PhantomData,
200        }
201    }
202}
203
/// Sketch of a helper macro for defining versioned events.
///
/// No such macro is defined in this module. A real implementation would be a
/// proc macro that generates the boilerplate automatically; the snippet below
/// only illustrates the intended usage.
///
/// # Example
/// ```ignore
/// #[versioned_event(version = 2)]
/// #[migration(from = 1, via = "upgrade_v1_to_v2")]
/// struct UserCreated {
///     user_id: String,
///     email: String,
///     #[added(version = 2, default = "Unknown")]
///     name: String,
/// }
/// ```
220
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cqrs::EventTypeName;

    /// Version 1 of the sample event: no `name` field.
    #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
    struct UserCreatedV1 {
        user_id: String,
        email: String,
    }

    impl EventTypeName for UserCreatedV1 {}
    impl Event for UserCreatedV1 {}

    /// Version 2 of the sample event: adds `name`.
    #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
    struct UserCreatedV2 {
        user_id: String,
        email: String,
        name: String,
    }

    impl EventTypeName for UserCreatedV2 {}
    impl Event for UserCreatedV2 {}

    /// V1 -> V2 conversion used by `AutoUpcaster`; the new field gets a
    /// placeholder value.
    impl From<UserCreatedV1> for UserCreatedV2 {
        fn from(v1: UserCreatedV1) -> Self {
            let UserCreatedV1 { user_id, email } = v1;
            Self {
                user_id,
                email,
                name: "Unknown".to_string(),
            }
        }
    }

    /// Envelope enum used as the registry's target type in several tests.
    #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
    enum TestEvent {
        #[allow(dead_code)]
        V1(UserCreatedV1),
        V2(UserCreatedV2),
    }

    impl EventTypeName for TestEvent {}
    impl Event for TestEvent {}

    impl From<UserCreatedV2> for TestEvent {
        fn from(v2: UserCreatedV2) -> Self {
            Self::V2(v2)
        }
    }

    /// A fresh registry holds neither upcasters nor migrations.
    #[tokio::test]
    async fn test_registry_creation() {
        let registry = VersionRegistry::<TestEvent>::new();

        let upcasters = registry.upcaster_count().await;
        let migrations = registry.migration_count().await;

        assert_eq!(upcasters, 0);
        assert_eq!(migrations, 0);
    }

    /// Registering one upcaster makes it countable and discoverable.
    #[tokio::test]
    async fn test_upcaster_registration() {
        let registry = VersionRegistry::<UserCreatedV2>::new();

        // Automatic upcaster backed by the `From` impl above.
        let upcaster: AutoUpcaster<UserCreatedV1, UserCreatedV2> = AutoUpcaster::new();
        registry.register_upcaster(upcaster).await;

        assert_eq!(registry.upcaster_count().await, 1);

        let registered = registry
            .has_upcaster::<UserCreatedV1, UserCreatedV2>()
            .await;
        assert!(registered);
    }

    /// A single registered path is returned for its event type.
    #[tokio::test]
    async fn test_migration_path_registration() {
        let registry = VersionRegistry::<TestEvent>::new();

        registry
            .register_migration(MigrationPath::new(1, 2, "UserCreated"))
            .await;

        assert_eq!(registry.migration_count().await, 1);

        let found = registry.get_migrations_for("UserCreated").await;
        assert_eq!(found.len(), 1);

        let only = &found[0];
        assert_eq!(only.from_version, 1);
        assert_eq!(only.to_version, 2);
    }

    /// Several paths for the same event type accumulate.
    #[tokio::test]
    async fn test_multiple_migrations() {
        let registry = VersionRegistry::<TestEvent>::new();

        // Migration chain: v1 -> v2 -> v3
        for (from, to) in [(1, 2), (2, 3)] {
            registry
                .register_migration(MigrationPath::new(from, to, "UserCreated"))
                .await;
        }

        assert_eq!(registry.migration_count().await, 2);

        let found = registry.get_migrations_for("UserCreated").await;
        assert_eq!(found.len(), 2);
    }

    /// `AutoUpcaster` carries fields over and fills in the default name.
    #[tokio::test]
    async fn test_auto_upcaster() {
        let upcaster = AutoUpcaster::<UserCreatedV1, UserCreatedV2>::new();

        let original = UserCreatedV1 {
            user_id: "123".to_string(),
            email: "test@example.com".to_string(),
        };

        let upgraded = upcaster.upcast(original.clone());

        assert_eq!(upgraded.user_id, original.user_id);
        assert_eq!(upgraded.email, original.email);
        assert_eq!(upgraded.name, "Unknown");
    }

    /// `MigrationPath::new` stores its arguments unchanged.
    #[test]
    fn test_migration_path_creation() {
        let MigrationPath {
            from_version,
            to_version,
            event_type,
        } = MigrationPath::new(1, 2, "UserCreated");

        assert_eq!(from_version, 1);
        assert_eq!(to_version, 2);
        assert_eq!(event_type, "UserCreated");
    }
}