1pub mod migration;
7pub mod rollback;
8
9use crate::engine::WorkflowDefinition;
10use crate::error::{Result, WorkflowError};
11use chrono::{DateTime, Utc};
12use dashmap::DashMap;
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16
17pub use migration::{MigrationPlan, MigrationStep, WorkflowMigration};
18pub use rollback::{RollbackManager, RollbackPoint};
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct WorkflowVersion {
23 pub version: String,
25 pub definition: WorkflowDefinition,
27 pub metadata: VersionMetadata,
29 pub previous_version: Option<String>,
31 pub migration_notes: Option<String>,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct VersionMetadata {
38 pub created_at: DateTime<Utc>,
40 pub author: String,
42 pub changelog: Vec<ChangelogEntry>,
44 pub breaking_changes: Vec<String>,
46 pub deprecations: Vec<String>,
48 pub tags: Vec<String>,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct ChangelogEntry {
55 pub change_type: ChangeType,
57 pub description: String,
59 pub affected_components: Vec<String>,
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
65pub enum ChangeType {
66 Feature,
68 Fix,
70 Performance,
72 Breaking,
74 Deprecation,
76 Documentation,
78 Refactor,
80}
81
82#[derive(Debug, Clone, Copy, PartialEq, Eq)]
84pub enum VersionComparison {
85 Less,
87 Equal,
89 Greater,
91}
92
93pub struct WorkflowVersionManager {
95 versions: Arc<DashMap<String, HashMap<String, WorkflowVersion>>>,
96 migration: WorkflowMigration,
97 rollback: RollbackManager,
98}
99
100impl WorkflowVersionManager {
101 pub fn new() -> Self {
103 Self {
104 versions: Arc::new(DashMap::new()),
105 migration: WorkflowMigration::new(),
106 rollback: RollbackManager::new(),
107 }
108 }
109
110 pub fn register_version(&self, workflow_id: String, version: WorkflowVersion) -> Result<()> {
112 Self::validate_version(&version.version)?;
114
115 let mut workflow_versions = self.versions.entry(workflow_id.clone()).or_default();
116
117 if workflow_versions.contains_key(&version.version) {
118 return Err(WorkflowError::versioning(format!(
119 "Version {} already exists for workflow {}",
120 version.version, workflow_id
121 )));
122 }
123
124 workflow_versions.insert(version.version.clone(), version);
125
126 Ok(())
127 }
128
129 pub fn get_version(&self, workflow_id: &str, version: &str) -> Option<WorkflowVersion> {
131 self.versions
132 .get(workflow_id)
133 .and_then(|entry| entry.get(version).cloned())
134 }
135
136 pub fn get_latest_version(&self, workflow_id: &str) -> Option<WorkflowVersion> {
138 self.versions.get(workflow_id).and_then(|entry| {
139 entry
140 .values()
141 .max_by(|a, b| Self::compare_versions(&a.version, &b.version))
142 .cloned()
143 })
144 }
145
146 pub fn list_versions(&self, workflow_id: &str) -> Vec<WorkflowVersion> {
148 self.versions
149 .get(workflow_id)
150 .map(|entry| {
151 let mut versions: Vec<WorkflowVersion> = entry.values().cloned().collect();
152 versions.sort_by(|a, b| Self::compare_versions(&a.version, &b.version));
153 versions
154 })
155 .unwrap_or_default()
156 }
157
158 pub fn is_compatible(&self, version1: &str, version2: &str) -> Result<bool> {
160 let (major1, minor1, _) = Self::parse_version(version1)?;
161 let (major2, minor2, _) = Self::parse_version(version2)?;
162
163 Ok(major1 == major2 && minor1 <= minor2)
165 }
166
167 pub fn migrate(
169 &self,
170 workflow_id: &str,
171 from_version: &str,
172 to_version: &str,
173 ) -> Result<WorkflowDefinition> {
174 let from = self
175 .get_version(workflow_id, from_version)
176 .ok_or_else(|| WorkflowError::not_found(from_version))?;
177
178 let to = self
179 .get_version(workflow_id, to_version)
180 .ok_or_else(|| WorkflowError::not_found(to_version))?;
181
182 self.migration.migrate(from.definition, to.definition)
183 }
184
185 pub fn create_rollback_point(&self, workflow_id: String, version: String) -> Result<String> {
187 let workflow_version = self
188 .get_version(&workflow_id, &version)
189 .ok_or_else(|| WorkflowError::not_found(&version))?;
190
191 self.rollback
192 .create_rollback_point(workflow_id, workflow_version.definition)
193 }
194
195 pub fn rollback(&self, rollback_id: &str) -> Result<WorkflowDefinition> {
197 self.rollback.rollback(rollback_id)
198 }
199
200 fn validate_version(version: &str) -> Result<()> {
202 Self::parse_version(version).map(|_| ())
203 }
204
205 fn parse_version(version: &str) -> Result<(u32, u32, u32)> {
207 let parts: Vec<&str> = version
208 .split('-')
209 .next()
210 .ok_or_else(|| WorkflowError::versioning("Invalid version format"))?
211 .split('.')
212 .collect();
213
214 if parts.len() != 3 {
215 return Err(WorkflowError::versioning(
216 "Version must have 3 parts (major.minor.patch)",
217 ));
218 }
219
220 let major = parts[0]
221 .parse::<u32>()
222 .map_err(|_| WorkflowError::versioning("Invalid major version"))?;
223
224 let minor = parts[1]
225 .parse::<u32>()
226 .map_err(|_| WorkflowError::versioning("Invalid minor version"))?;
227
228 let patch = parts[2]
229 .parse::<u32>()
230 .map_err(|_| WorkflowError::versioning("Invalid patch version"))?;
231
232 Ok((major, minor, patch))
233 }
234
235 fn compare_versions(v1: &str, v2: &str) -> std::cmp::Ordering {
237 let Ok((major1, minor1, patch1)) = Self::parse_version(v1) else {
238 return std::cmp::Ordering::Equal;
239 };
240
241 let Ok((major2, minor2, patch2)) = Self::parse_version(v2) else {
242 return std::cmp::Ordering::Equal;
243 };
244
245 match major1.cmp(&major2) {
246 std::cmp::Ordering::Equal => match minor1.cmp(&minor2) {
247 std::cmp::Ordering::Equal => patch1.cmp(&patch2),
248 other => other,
249 },
250 other => other,
251 }
252 }
253
254 pub fn has_breaking_changes(&self, workflow_id: &str, from: &str, to: &str) -> Result<bool> {
256 let from_version = self
257 .get_version(workflow_id, from)
258 .ok_or_else(|| WorkflowError::not_found(from))?;
259
260 let to_version = self
261 .get_version(workflow_id, to)
262 .ok_or_else(|| WorkflowError::not_found(to))?;
263
264 Ok(!to_version.metadata.breaking_changes.is_empty()
265 && Self::compare_versions(&from_version.version, &to_version.version)
266 == std::cmp::Ordering::Less)
267 }
268}
269
270impl Default for WorkflowVersionManager {
271 fn default() -> Self {
272 Self::new()
273 }
274}
275
276#[cfg(test)]
277mod tests {
278 use super::*;
279
280 #[test]
281 fn test_version_parsing() {
282 assert!(WorkflowVersionManager::parse_version("1.0.0").is_ok());
283 assert!(WorkflowVersionManager::parse_version("1.2.3").is_ok());
284 assert!(WorkflowVersionManager::parse_version("invalid").is_err());
285 }
286
287 #[test]
288 fn test_version_comparison() {
289 use std::cmp::Ordering;
290
291 assert_eq!(
292 WorkflowVersionManager::compare_versions("1.0.0", "1.0.0"),
293 Ordering::Equal
294 );
295 assert_eq!(
296 WorkflowVersionManager::compare_versions("1.0.0", "2.0.0"),
297 Ordering::Less
298 );
299 assert_eq!(
300 WorkflowVersionManager::compare_versions("2.0.0", "1.0.0"),
301 Ordering::Greater
302 );
303 assert_eq!(
304 WorkflowVersionManager::compare_versions("1.0.0", "1.1.0"),
305 Ordering::Less
306 );
307 }
308
309 #[test]
310 fn test_version_compatibility() {
311 let manager = WorkflowVersionManager::new();
312
313 assert!(
314 manager
315 .is_compatible("1.0.0", "1.1.0")
316 .expect("Check failed")
317 );
318 assert!(
319 !manager
320 .is_compatible("1.0.0", "2.0.0")
321 .expect("Check failed")
322 );
323 }
324
325 #[test]
326 fn test_register_version() {
327 use crate::dag::WorkflowDag;
328
329 let manager = WorkflowVersionManager::new();
330
331 let version = WorkflowVersion {
332 version: "1.0.0".to_string(),
333 definition: WorkflowDefinition {
334 id: "test".to_string(),
335 name: "Test".to_string(),
336 description: None,
337 version: "1.0.0".to_string(),
338 dag: WorkflowDag::new(),
339 },
340 metadata: VersionMetadata {
341 created_at: Utc::now(),
342 author: "test".to_string(),
343 changelog: vec![],
344 breaking_changes: vec![],
345 deprecations: vec![],
346 tags: vec![],
347 },
348 previous_version: None,
349 migration_notes: None,
350 };
351
352 assert!(
353 manager
354 .register_version("test-workflow".to_string(), version)
355 .is_ok()
356 );
357 }
358
359 #[test]
360 fn test_get_latest_version() {
361 use crate::dag::WorkflowDag;
362
363 let manager = WorkflowVersionManager::new();
364
365 let v1 = WorkflowVersion {
366 version: "1.0.0".to_string(),
367 definition: WorkflowDefinition {
368 id: "test".to_string(),
369 name: "Test".to_string(),
370 description: None,
371 version: "1.0.0".to_string(),
372 dag: WorkflowDag::new(),
373 },
374 metadata: VersionMetadata {
375 created_at: Utc::now(),
376 author: "test".to_string(),
377 changelog: vec![],
378 breaking_changes: vec![],
379 deprecations: vec![],
380 tags: vec![],
381 },
382 previous_version: None,
383 migration_notes: None,
384 };
385
386 let v2 = WorkflowVersion {
387 version: "2.0.0".to_string(),
388 definition: WorkflowDefinition {
389 id: "test".to_string(),
390 name: "Test".to_string(),
391 description: None,
392 version: "2.0.0".to_string(),
393 dag: WorkflowDag::new(),
394 },
395 metadata: VersionMetadata {
396 created_at: Utc::now(),
397 author: "test".to_string(),
398 changelog: vec![],
399 breaking_changes: vec![],
400 deprecations: vec![],
401 tags: vec![],
402 },
403 previous_version: Some("1.0.0".to_string()),
404 migration_notes: None,
405 };
406
407 manager
408 .register_version("test".to_string(), v1)
409 .expect("Failed");
410 manager
411 .register_version("test".to_string(), v2)
412 .expect("Failed");
413
414 let latest = manager.get_latest_version("test").expect("Not found");
415 assert_eq!(latest.version, "2.0.0");
416 }
417}