celers_protocol/
migration.rs

1//! Protocol version migration helpers
2//!
3//! This module provides utilities for migrating messages between different
4//! Celery protocol versions (v2 and v5).
5//!
6//! # Example
7//!
8//! ```
9//! use celers_protocol::migration::{ProtocolMigrator, MigrationStrategy};
10//! use celers_protocol::{ProtocolVersion, Message, TaskArgs};
11//! use uuid::Uuid;
12//!
13//! let task_id = Uuid::new_v4();
14//! let body = serde_json::to_vec(&TaskArgs::new()).unwrap();
15//! let msg = Message::new("tasks.add".to_string(), task_id, body);
16//!
17//! let migrator = ProtocolMigrator::new(MigrationStrategy::Conservative);
18//! let info = migrator.check_compatibility(&msg, ProtocolVersion::V5);
19//! assert!(info.is_compatible);
20//! ```
21
22use crate::{Message, ProtocolVersion};
23
24/// Migration strategy
25#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
26pub enum MigrationStrategy {
27    /// Conservative: Only migrate if fully compatible
28    #[default]
29    Conservative,
30    /// Permissive: Migrate with warnings for potential issues
31    Permissive,
32    /// Strict: Require exact feature parity
33    Strict,
34}
35
36/// Migration compatibility information
37#[derive(Debug, Clone)]
38pub struct CompatibilityInfo {
39    /// Whether the message is compatible with the target version
40    pub is_compatible: bool,
41    /// Protocol version being migrated from
42    pub from_version: ProtocolVersion,
43    /// Protocol version being migrated to
44    pub to_version: ProtocolVersion,
45    /// Any warnings or issues
46    pub warnings: Vec<String>,
47    /// Features that may not be supported
48    pub unsupported_features: Vec<String>,
49}
50
51/// Protocol migrator for version transitions
52pub struct ProtocolMigrator {
53    strategy: MigrationStrategy,
54}
55
56impl ProtocolMigrator {
57    /// Create a new protocol migrator with the given strategy
58    pub fn new(strategy: MigrationStrategy) -> Self {
59        Self { strategy }
60    }
61
62    /// Check if a message is compatible with a target protocol version
63    pub fn check_compatibility(
64        &self,
65        message: &Message,
66        target_version: ProtocolVersion,
67    ) -> CompatibilityInfo {
68        let from_version = ProtocolVersion::V2; // Default assumption
69        let mut warnings = Vec::new();
70        let unsupported_features = Vec::new();
71
72        // Check for features that may not be fully supported across versions
73        if message.has_group() && target_version == ProtocolVersion::V2 {
74            warnings
75                .push("Group ID is supported in v2 but may have limited functionality".to_string());
76        }
77
78        if message.has_parent() || message.has_root() {
79            warnings.push(
80                "Workflow tracking (parent/root) support varies between versions".to_string(),
81            );
82        }
83
84        // Check priority support
85        if message.properties.priority.is_some() {
86            warnings.push("Priority support may vary between broker implementations".to_string());
87        }
88
89        // Determine compatibility based on strategy
90        let is_compatible = match self.strategy {
91            MigrationStrategy::Conservative => {
92                warnings.is_empty() && unsupported_features.is_empty()
93            }
94            MigrationStrategy::Permissive => true, // Always allow migration
95            MigrationStrategy::Strict => {
96                warnings.is_empty()
97                    && unsupported_features.is_empty()
98                    && self.check_strict_compatibility(message, target_version)
99            }
100        };
101
102        CompatibilityInfo {
103            is_compatible,
104            from_version,
105            to_version: target_version,
106            warnings,
107            unsupported_features,
108        }
109    }
110
111    /// Migrate a message to a different protocol version
112    pub fn migrate(
113        &self,
114        message: Message,
115        target_version: ProtocolVersion,
116    ) -> Result<Message, MigrationError> {
117        let compat = self.check_compatibility(&message, target_version);
118
119        if !compat.is_compatible && self.strategy == MigrationStrategy::Conservative {
120            return Err(MigrationError::IncompatibleVersion {
121                from: compat.from_version,
122                to: target_version,
123                reasons: compat.warnings,
124            });
125        }
126
127        // For now, message structure is the same between v2 and v5
128        // In a real implementation, you might transform headers or properties
129        Ok(message)
130    }
131
132    fn check_strict_compatibility(&self, _message: &Message, _target: ProtocolVersion) -> bool {
133        // In strict mode, ensure all features are fully supported
134        // For now, return true as v2 and v5 are largely compatible
135        true
136    }
137
138    /// Get the current strategy
139    pub fn strategy(&self) -> MigrationStrategy {
140        self.strategy
141    }
142
143    /// Set a new strategy
144    pub fn set_strategy(&mut self, strategy: MigrationStrategy) {
145        self.strategy = strategy;
146    }
147}
148
149impl Default for ProtocolMigrator {
150    fn default() -> Self {
151        Self::new(MigrationStrategy::Conservative)
152    }
153}
154
155/// Migration error
156#[derive(Debug, Clone)]
157pub enum MigrationError {
158    /// Version incompatibility
159    IncompatibleVersion {
160        from: ProtocolVersion,
161        to: ProtocolVersion,
162        reasons: Vec<String>,
163    },
164    /// Feature not supported in target version
165    UnsupportedFeature {
166        feature: String,
167        version: ProtocolVersion,
168    },
169    /// Validation error
170    ValidationError(String),
171}
172
173impl std::fmt::Display for MigrationError {
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        match self {
176            MigrationError::IncompatibleVersion { from, to, reasons } => {
177                write!(
178                    f,
179                    "Incompatible migration from {} to {}: {}",
180                    from,
181                    to,
182                    reasons.join(", ")
183                )
184            }
185            MigrationError::UnsupportedFeature { feature, version } => {
186                write!(f, "Feature '{}' not supported in {}", feature, version)
187            }
188            MigrationError::ValidationError(msg) => write!(f, "Validation error: {}", msg),
189        }
190    }
191}
192
193impl std::error::Error for MigrationError {}
194
195/// Helper function to create a migration plan
196pub fn create_migration_plan(from: ProtocolVersion, to: ProtocolVersion) -> Vec<MigrationStep> {
197    let mut steps = Vec::new();
198
199    if from != to {
200        steps.push(MigrationStep {
201            description: format!("Migrate from {} to {}", from, to),
202            from_version: from,
203            to_version: to,
204            required: true,
205        });
206
207        // Add any intermediate steps if needed
208        if from == ProtocolVersion::V2 && to == ProtocolVersion::V5 {
209            steps.push(MigrationStep {
210                description: "Verify message format compatibility".to_string(),
211                from_version: from,
212                to_version: to,
213                required: true,
214            });
215
216            steps.push(MigrationStep {
217                description: "Update any version-specific headers".to_string(),
218                from_version: from,
219                to_version: to,
220                required: false,
221            });
222        }
223    }
224
225    steps
226}
227
228/// Migration step
229#[derive(Debug, Clone)]
230pub struct MigrationStep {
231    /// Description of the step
232    pub description: String,
233    /// Source version
234    pub from_version: ProtocolVersion,
235    /// Target version
236    pub to_version: ProtocolVersion,
237    /// Whether this step is required
238    pub required: bool,
239}
240
241#[cfg(test)]
242mod tests {
243    use super::*;
244    use crate::TaskArgs;
245    use uuid::Uuid;
246
247    #[test]
248    fn test_migrator_default() {
249        let migrator = ProtocolMigrator::default();
250        assert_eq!(migrator.strategy(), MigrationStrategy::Conservative);
251    }
252
253    #[test]
254    fn test_migrator_set_strategy() {
255        let mut migrator = ProtocolMigrator::default();
256        migrator.set_strategy(MigrationStrategy::Permissive);
257        assert_eq!(migrator.strategy(), MigrationStrategy::Permissive);
258    }
259
260    #[test]
261    fn test_check_compatibility_basic() {
262        let task_id = Uuid::new_v4();
263        let body = serde_json::to_vec(&TaskArgs::new()).unwrap();
264        let msg = Message::new("tasks.add".to_string(), task_id, body);
265
266        let migrator = ProtocolMigrator::new(MigrationStrategy::Conservative);
267        let info = migrator.check_compatibility(&msg, ProtocolVersion::V5);
268
269        assert!(info.is_compatible);
270        assert_eq!(info.to_version, ProtocolVersion::V5);
271    }
272
273    #[test]
274    fn test_check_compatibility_with_warnings() {
275        let task_id = Uuid::new_v4();
276        let body = serde_json::to_vec(&TaskArgs::new()).unwrap();
277        let msg = Message::new("tasks.add".to_string(), task_id, body)
278            .with_priority(5)
279            .with_group(Uuid::new_v4());
280
281        let migrator = ProtocolMigrator::new(MigrationStrategy::Permissive);
282        let info = migrator.check_compatibility(&msg, ProtocolVersion::V2);
283
284        assert!(info.is_compatible); // Permissive allows it
285        assert!(!info.warnings.is_empty());
286    }
287
288    #[test]
289    fn test_migrate_basic_message() {
290        let task_id = Uuid::new_v4();
291        let body = serde_json::to_vec(&TaskArgs::new()).unwrap();
292        let msg = Message::new("tasks.add".to_string(), task_id, body.clone());
293
294        let migrator = ProtocolMigrator::new(MigrationStrategy::Conservative);
295        let migrated = migrator.migrate(msg, ProtocolVersion::V5).unwrap();
296
297        assert_eq!(migrated.task_id(), task_id);
298        assert_eq!(migrated.body, body);
299    }
300
301    #[test]
302    fn test_migrate_permissive() {
303        let task_id = Uuid::new_v4();
304        let body = serde_json::to_vec(&TaskArgs::new()).unwrap();
305        let msg = Message::new("tasks.add".to_string(), task_id, body)
306            .with_priority(9)
307            .with_group(Uuid::new_v4());
308
309        let migrator = ProtocolMigrator::new(MigrationStrategy::Permissive);
310        let result = migrator.migrate(msg, ProtocolVersion::V5);
311
312        assert!(result.is_ok());
313    }
314
315    #[test]
316    fn test_create_migration_plan_same_version() {
317        let plan = create_migration_plan(ProtocolVersion::V2, ProtocolVersion::V2);
318        assert_eq!(plan.len(), 0);
319    }
320
321    #[test]
322    fn test_create_migration_plan_v2_to_v5() {
323        let plan = create_migration_plan(ProtocolVersion::V2, ProtocolVersion::V5);
324        assert!(!plan.is_empty());
325        assert!(plan.iter().any(|step| step.required));
326    }
327
328    #[test]
329    fn test_migration_error_display() {
330        let err = MigrationError::IncompatibleVersion {
331            from: ProtocolVersion::V2,
332            to: ProtocolVersion::V5,
333            reasons: vec!["test reason".to_string()],
334        };
335        assert!(err.to_string().contains("Incompatible migration"));
336
337        let err = MigrationError::UnsupportedFeature {
338            feature: "test_feature".to_string(),
339            version: ProtocolVersion::V2,
340        };
341        assert!(err.to_string().contains("not supported"));
342
343        let err = MigrationError::ValidationError("test error".to_string());
344        assert!(err.to_string().contains("Validation error"));
345    }
346
347    #[test]
348    fn test_compatibility_info_structure() {
349        let task_id = Uuid::new_v4();
350        let body = vec![1, 2, 3];
351        let msg = Message::new("tasks.test".to_string(), task_id, body);
352
353        let migrator = ProtocolMigrator::new(MigrationStrategy::Strict);
354        let info = migrator.check_compatibility(&msg, ProtocolVersion::V5);
355
356        assert_eq!(info.from_version, ProtocolVersion::V2);
357        assert_eq!(info.to_version, ProtocolVersion::V5);
358    }
359
360    #[test]
361    fn test_migration_strategy_equality() {
362        assert_eq!(
363            MigrationStrategy::Conservative,
364            MigrationStrategy::Conservative
365        );
366        assert_ne!(
367            MigrationStrategy::Conservative,
368            MigrationStrategy::Permissive
369        );
370    }
371
372    #[test]
373    fn test_migration_strategy_default() {
374        assert_eq!(
375            MigrationStrategy::default(),
376            MigrationStrategy::Conservative
377        );
378    }
379}