1use crate::{Message, ProtocolVersion};
23
24#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
26pub enum MigrationStrategy {
27 #[default]
29 Conservative,
30 Permissive,
32 Strict,
34}
35
36#[derive(Debug, Clone)]
38pub struct CompatibilityInfo {
39 pub is_compatible: bool,
41 pub from_version: ProtocolVersion,
43 pub to_version: ProtocolVersion,
45 pub warnings: Vec<String>,
47 pub unsupported_features: Vec<String>,
49}
50
51pub struct ProtocolMigrator {
53 strategy: MigrationStrategy,
54}
55
56impl ProtocolMigrator {
57 pub fn new(strategy: MigrationStrategy) -> Self {
59 Self { strategy }
60 }
61
62 pub fn check_compatibility(
64 &self,
65 message: &Message,
66 target_version: ProtocolVersion,
67 ) -> CompatibilityInfo {
68 let from_version = ProtocolVersion::V2; let mut warnings = Vec::new();
70 let unsupported_features = Vec::new();
71
72 if message.has_group() && target_version == ProtocolVersion::V2 {
74 warnings
75 .push("Group ID is supported in v2 but may have limited functionality".to_string());
76 }
77
78 if message.has_parent() || message.has_root() {
79 warnings.push(
80 "Workflow tracking (parent/root) support varies between versions".to_string(),
81 );
82 }
83
84 if message.properties.priority.is_some() {
86 warnings.push("Priority support may vary between broker implementations".to_string());
87 }
88
89 let is_compatible = match self.strategy {
91 MigrationStrategy::Conservative => {
92 warnings.is_empty() && unsupported_features.is_empty()
93 }
94 MigrationStrategy::Permissive => true, MigrationStrategy::Strict => {
96 warnings.is_empty()
97 && unsupported_features.is_empty()
98 && self.check_strict_compatibility(message, target_version)
99 }
100 };
101
102 CompatibilityInfo {
103 is_compatible,
104 from_version,
105 to_version: target_version,
106 warnings,
107 unsupported_features,
108 }
109 }
110
111 pub fn migrate(
113 &self,
114 message: Message,
115 target_version: ProtocolVersion,
116 ) -> Result<Message, MigrationError> {
117 let compat = self.check_compatibility(&message, target_version);
118
119 if !compat.is_compatible && self.strategy == MigrationStrategy::Conservative {
120 return Err(MigrationError::IncompatibleVersion {
121 from: compat.from_version,
122 to: target_version,
123 reasons: compat.warnings,
124 });
125 }
126
127 Ok(message)
130 }
131
132 fn check_strict_compatibility(&self, _message: &Message, _target: ProtocolVersion) -> bool {
133 true
136 }
137
138 pub fn strategy(&self) -> MigrationStrategy {
140 self.strategy
141 }
142
143 pub fn set_strategy(&mut self, strategy: MigrationStrategy) {
145 self.strategy = strategy;
146 }
147}
148
149impl Default for ProtocolMigrator {
150 fn default() -> Self {
151 Self::new(MigrationStrategy::Conservative)
152 }
153}
154
155#[derive(Debug, Clone)]
157pub enum MigrationError {
158 IncompatibleVersion {
160 from: ProtocolVersion,
161 to: ProtocolVersion,
162 reasons: Vec<String>,
163 },
164 UnsupportedFeature {
166 feature: String,
167 version: ProtocolVersion,
168 },
169 ValidationError(String),
171}
172
173impl std::fmt::Display for MigrationError {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 match self {
176 MigrationError::IncompatibleVersion { from, to, reasons } => {
177 write!(
178 f,
179 "Incompatible migration from {} to {}: {}",
180 from,
181 to,
182 reasons.join(", ")
183 )
184 }
185 MigrationError::UnsupportedFeature { feature, version } => {
186 write!(f, "Feature '{}' not supported in {}", feature, version)
187 }
188 MigrationError::ValidationError(msg) => write!(f, "Validation error: {}", msg),
189 }
190 }
191}
192
193impl std::error::Error for MigrationError {}
194
195pub fn create_migration_plan(from: ProtocolVersion, to: ProtocolVersion) -> Vec<MigrationStep> {
197 let mut steps = Vec::new();
198
199 if from != to {
200 steps.push(MigrationStep {
201 description: format!("Migrate from {} to {}", from, to),
202 from_version: from,
203 to_version: to,
204 required: true,
205 });
206
207 if from == ProtocolVersion::V2 && to == ProtocolVersion::V5 {
209 steps.push(MigrationStep {
210 description: "Verify message format compatibility".to_string(),
211 from_version: from,
212 to_version: to,
213 required: true,
214 });
215
216 steps.push(MigrationStep {
217 description: "Update any version-specific headers".to_string(),
218 from_version: from,
219 to_version: to,
220 required: false,
221 });
222 }
223 }
224
225 steps
226}
227
228#[derive(Debug, Clone)]
230pub struct MigrationStep {
231 pub description: String,
233 pub from_version: ProtocolVersion,
235 pub to_version: ProtocolVersion,
237 pub required: bool,
239}
240
241#[cfg(test)]
242mod tests {
243 use super::*;
244 use crate::TaskArgs;
245 use uuid::Uuid;
246
247 #[test]
248 fn test_migrator_default() {
249 let migrator = ProtocolMigrator::default();
250 assert_eq!(migrator.strategy(), MigrationStrategy::Conservative);
251 }
252
253 #[test]
254 fn test_migrator_set_strategy() {
255 let mut migrator = ProtocolMigrator::default();
256 migrator.set_strategy(MigrationStrategy::Permissive);
257 assert_eq!(migrator.strategy(), MigrationStrategy::Permissive);
258 }
259
260 #[test]
261 fn test_check_compatibility_basic() {
262 let task_id = Uuid::new_v4();
263 let body = serde_json::to_vec(&TaskArgs::new()).unwrap();
264 let msg = Message::new("tasks.add".to_string(), task_id, body);
265
266 let migrator = ProtocolMigrator::new(MigrationStrategy::Conservative);
267 let info = migrator.check_compatibility(&msg, ProtocolVersion::V5);
268
269 assert!(info.is_compatible);
270 assert_eq!(info.to_version, ProtocolVersion::V5);
271 }
272
273 #[test]
274 fn test_check_compatibility_with_warnings() {
275 let task_id = Uuid::new_v4();
276 let body = serde_json::to_vec(&TaskArgs::new()).unwrap();
277 let msg = Message::new("tasks.add".to_string(), task_id, body)
278 .with_priority(5)
279 .with_group(Uuid::new_v4());
280
281 let migrator = ProtocolMigrator::new(MigrationStrategy::Permissive);
282 let info = migrator.check_compatibility(&msg, ProtocolVersion::V2);
283
284 assert!(info.is_compatible); assert!(!info.warnings.is_empty());
286 }
287
288 #[test]
289 fn test_migrate_basic_message() {
290 let task_id = Uuid::new_v4();
291 let body = serde_json::to_vec(&TaskArgs::new()).unwrap();
292 let msg = Message::new("tasks.add".to_string(), task_id, body.clone());
293
294 let migrator = ProtocolMigrator::new(MigrationStrategy::Conservative);
295 let migrated = migrator.migrate(msg, ProtocolVersion::V5).unwrap();
296
297 assert_eq!(migrated.task_id(), task_id);
298 assert_eq!(migrated.body, body);
299 }
300
301 #[test]
302 fn test_migrate_permissive() {
303 let task_id = Uuid::new_v4();
304 let body = serde_json::to_vec(&TaskArgs::new()).unwrap();
305 let msg = Message::new("tasks.add".to_string(), task_id, body)
306 .with_priority(9)
307 .with_group(Uuid::new_v4());
308
309 let migrator = ProtocolMigrator::new(MigrationStrategy::Permissive);
310 let result = migrator.migrate(msg, ProtocolVersion::V5);
311
312 assert!(result.is_ok());
313 }
314
315 #[test]
316 fn test_create_migration_plan_same_version() {
317 let plan = create_migration_plan(ProtocolVersion::V2, ProtocolVersion::V2);
318 assert_eq!(plan.len(), 0);
319 }
320
321 #[test]
322 fn test_create_migration_plan_v2_to_v5() {
323 let plan = create_migration_plan(ProtocolVersion::V2, ProtocolVersion::V5);
324 assert!(!plan.is_empty());
325 assert!(plan.iter().any(|step| step.required));
326 }
327
328 #[test]
329 fn test_migration_error_display() {
330 let err = MigrationError::IncompatibleVersion {
331 from: ProtocolVersion::V2,
332 to: ProtocolVersion::V5,
333 reasons: vec!["test reason".to_string()],
334 };
335 assert!(err.to_string().contains("Incompatible migration"));
336
337 let err = MigrationError::UnsupportedFeature {
338 feature: "test_feature".to_string(),
339 version: ProtocolVersion::V2,
340 };
341 assert!(err.to_string().contains("not supported"));
342
343 let err = MigrationError::ValidationError("test error".to_string());
344 assert!(err.to_string().contains("Validation error"));
345 }
346
347 #[test]
348 fn test_compatibility_info_structure() {
349 let task_id = Uuid::new_v4();
350 let body = vec![1, 2, 3];
351 let msg = Message::new("tasks.test".to_string(), task_id, body);
352
353 let migrator = ProtocolMigrator::new(MigrationStrategy::Strict);
354 let info = migrator.check_compatibility(&msg, ProtocolVersion::V5);
355
356 assert_eq!(info.from_version, ProtocolVersion::V2);
357 assert_eq!(info.to_version, ProtocolVersion::V5);
358 }
359
360 #[test]
361 fn test_migration_strategy_equality() {
362 assert_eq!(
363 MigrationStrategy::Conservative,
364 MigrationStrategy::Conservative
365 );
366 assert_ne!(
367 MigrationStrategy::Conservative,
368 MigrationStrategy::Permissive
369 );
370 }
371
372 #[test]
373 fn test_migration_strategy_default() {
374 assert_eq!(
375 MigrationStrategy::default(),
376 MigrationStrategy::Conservative
377 );
378 }
379}