1use crate::{DatabasePool, Result};
4use chrono::{DateTime, Utc};
5use oxify_model::{Workflow, WorkflowId};
6use serde::{Deserialize, Serialize};
7use sqlx::Row;
8use uuid::Uuid;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct WorkflowVersion {
13 pub id: Uuid,
15 pub workflow_id: WorkflowId,
17 pub version: i32,
19 pub description: Option<String>,
21 pub workflow: Workflow,
23 pub created_at: DateTime<Utc>,
25 pub created_by: Option<String>,
27}
28
29#[derive(Clone)]
31pub struct WorkflowVersionStore {
32 pool: DatabasePool,
33}
34
35impl WorkflowVersionStore {
36 pub fn new(pool: DatabasePool) -> Self {
38 Self { pool }
39 }
40
41 pub async fn save_version(
43 &self,
44 workflow: &Workflow,
45 description: Option<String>,
46 created_by: Option<String>,
47 ) -> Result<Uuid> {
48 let id = Uuid::new_v4();
49 let workflow_id = workflow.metadata.id;
50 let version = self.get_next_version(&workflow_id).await?;
51 let definition = serde_json::to_string(workflow)?;
52
53 sqlx::query(
54 r"
55 INSERT INTO workflow_versions
56 (id, workflow_id, version, description, definition, created_by)
57 VALUES (?, ?, ?, ?, ?, ?)
58 ",
59 )
60 .bind(id.to_string())
61 .bind(workflow_id.to_string())
62 .bind(version)
63 .bind(description)
64 .bind(definition)
65 .bind(created_by)
66 .execute(self.pool.pool())
67 .await?;
68
69 Ok(id)
70 }
71
72 pub async fn get_versions(&self, workflow_id: &WorkflowId) -> Result<Vec<WorkflowVersion>> {
74 let rows = sqlx::query(
75 r"
76 SELECT id, workflow_id, version, description, definition, created_at, created_by
77 FROM workflow_versions
78 WHERE workflow_id = ?
79 ORDER BY version DESC
80 ",
81 )
82 .bind(workflow_id.to_string())
83 .fetch_all(self.pool.pool())
84 .await?;
85
86 let versions = rows
87 .into_iter()
88 .filter_map(|row| {
89 let id_str: String = row.get("id");
90 let workflow_id_str: String = row.get("workflow_id");
91 let definition: String = row.get("definition");
92 let created_at_str: String = row.get("created_at");
93
94 let id = Uuid::parse_str(&id_str).ok()?;
95 let workflow_id = Uuid::parse_str(&workflow_id_str).ok()?;
96 let workflow: Workflow = serde_json::from_str(&definition).ok()?;
97 let created_at = DateTime::parse_from_rfc3339(&created_at_str)
98 .map(|dt| dt.with_timezone(&Utc))
99 .ok()?;
100
101 Some(WorkflowVersion {
102 id,
103 workflow_id,
104 version: row.get("version"),
105 description: row.get("description"),
106 workflow,
107 created_at,
108 created_by: row.get("created_by"),
109 })
110 })
111 .collect();
112
113 Ok(versions)
114 }
115
116 pub async fn get_version(
118 &self,
119 workflow_id: &WorkflowId,
120 version: i32,
121 ) -> Result<Option<WorkflowVersion>> {
122 let row = sqlx::query(
123 r"
124 SELECT id, workflow_id, version, description, definition, created_at, created_by
125 FROM workflow_versions
126 WHERE workflow_id = ? AND version = ?
127 ",
128 )
129 .bind(workflow_id.to_string())
130 .bind(version)
131 .fetch_optional(self.pool.pool())
132 .await?;
133
134 match row {
135 Some(row) => {
136 let id_str: String = row.get("id");
137 let workflow_id_str: String = row.get("workflow_id");
138 let definition: String = row.get("definition");
139 let created_at_str: String = row.get("created_at");
140
141 let id = Uuid::parse_str(&id_str)
142 .map_err(|e| crate::StorageError::validation(format!("Invalid UUID: {}", e)))?;
143 let workflow_id = Uuid::parse_str(&workflow_id_str)
144 .map_err(|e| crate::StorageError::validation(format!("Invalid UUID: {}", e)))?;
145 let workflow: Workflow = serde_json::from_str(&definition)?;
146 let created_at = DateTime::parse_from_rfc3339(&created_at_str)
147 .map(|dt| dt.with_timezone(&Utc))
148 .map_err(|e| crate::StorageError::validation(format!("Invalid date: {}", e)))?;
149
150 Ok(Some(WorkflowVersion {
151 id,
152 workflow_id,
153 version: row.get("version"),
154 description: row.get("description"),
155 workflow,
156 created_at,
157 created_by: row.get("created_by"),
158 }))
159 }
160 None => Ok(None),
161 }
162 }
163
164 async fn get_next_version(&self, workflow_id: &WorkflowId) -> Result<i32> {
166 let row = sqlx::query(
167 r"
168 SELECT COALESCE(MAX(version), 0) + 1 as next_version
169 FROM workflow_versions
170 WHERE workflow_id = ?
171 ",
172 )
173 .bind(workflow_id.to_string())
174 .fetch_one(self.pool.pool())
175 .await?;
176
177 let next_version: i32 = row.get("next_version");
178 Ok(next_version)
179 }
180
181 pub async fn get_latest_version(&self, workflow_id: &WorkflowId) -> Result<Option<i32>> {
183 let row = sqlx::query(
184 r"
185 SELECT MAX(version) as latest_version
186 FROM workflow_versions
187 WHERE workflow_id = ?
188 ",
189 )
190 .bind(workflow_id.to_string())
191 .fetch_one(self.pool.pool())
192 .await?;
193
194 let latest_version: Option<i32> = row.get("latest_version");
195 Ok(latest_version)
196 }
197
198 pub async fn delete_all_versions(&self, workflow_id: &WorkflowId) -> Result<u64> {
200 let result = sqlx::query(
201 r"
202 DELETE FROM workflow_versions
203 WHERE workflow_id = ?
204 ",
205 )
206 .bind(workflow_id.to_string())
207 .execute(self.pool.pool())
208 .await?;
209
210 Ok(result.rows_affected())
211 }
212
213 pub async fn compare_versions(
215 &self,
216 workflow_id: &WorkflowId,
217 version1: i32,
218 version2: i32,
219 ) -> Result<Option<VersionComparison>> {
220 let v1 = self.get_version(workflow_id, version1).await?;
221 let v2 = self.get_version(workflow_id, version2).await?;
222
223 match (v1, v2) {
224 (Some(v1), Some(v2)) => {
225 let comparison = VersionComparison {
226 version1: v1.version,
227 version2: v2.version,
228 nodes_added: v2.workflow.nodes.len() as i32 - v1.workflow.nodes.len() as i32,
229 nodes_removed: 0, edges_added: v2.workflow.edges.len() as i32 - v1.workflow.edges.len() as i32,
231 edges_removed: 0, name_changed: v1.workflow.metadata.name != v2.workflow.metadata.name,
233 description_changed: v1.workflow.metadata.description
234 != v2.workflow.metadata.description,
235 };
236 Ok(Some(comparison))
237 }
238 _ => Ok(None),
239 }
240 }
241
242 pub async fn rollback_to_version(
246 &self,
247 workflow_id: &WorkflowId,
248 target_version: i32,
249 rollback_description: Option<String>,
250 rolled_back_by: Option<String>,
251 ) -> Result<bool> {
252 let version = self
254 .get_version(workflow_id, target_version)
255 .await?
256 .ok_or_else(|| {
257 crate::StorageError::not_found(
258 crate::ResourceType::WorkflowVersion,
259 crate::ResourceId::Composite(vec![
260 workflow_id.to_string(),
261 target_version.to_string(),
262 ]),
263 )
264 })?;
265
266 let workflow_json = serde_json::to_string(&version.workflow)?;
268 let now = Utc::now().to_rfc3339();
269
270 let result = sqlx::query(
271 r"
272 UPDATE workflows
273 SET name = ?,
274 description = ?,
275 definition = ?,
276 updated_at = ?
277 WHERE id = ?
278 ",
279 )
280 .bind(&version.workflow.metadata.name)
281 .bind(&version.workflow.metadata.description)
282 .bind(workflow_json)
283 .bind(now)
284 .bind(workflow_id.to_string())
285 .execute(self.pool.pool())
286 .await?;
287
288 if result.rows_affected() == 0 {
289 return Ok(false);
290 }
291
292 let description = rollback_description
294 .unwrap_or_else(|| format!("Rolled back to version {target_version}"));
295
296 self.save_version(&version.workflow, Some(description), rolled_back_by)
297 .await?;
298
299 Ok(true)
300 }
301
302 pub async fn delete_versions_before(
306 &self,
307 workflow_id: &WorkflowId,
308 before_version: i32,
309 ) -> Result<u64> {
310 let result = sqlx::query(
311 r"
312 DELETE FROM workflow_versions
313 WHERE workflow_id = ? AND version < ?
314 ",
315 )
316 .bind(workflow_id.to_string())
317 .bind(before_version)
318 .execute(self.pool.pool())
319 .await?;
320
321 Ok(result.rows_affected())
322 }
323}
324
325#[derive(Debug, Clone, Serialize, Deserialize)]
327pub struct VersionComparison {
328 pub version1: i32,
329 pub version2: i32,
330 pub nodes_added: i32,
331 pub nodes_removed: i32,
332 pub edges_added: i32,
333 pub edges_removed: i32,
334 pub name_changed: bool,
335 pub description_changed: bool,
336}
337
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a pool against `DATABASE_URL`, falling back to an in-memory
    /// SQLite database when the variable is not set.
    #[allow(dead_code)]
    async fn setup_test_pool() -> Result<DatabasePool> {
        let database_url =
            std::env::var("DATABASE_URL").unwrap_or_else(|_| "sqlite::memory:".to_string());

        DatabasePool::new(crate::DatabaseConfig {
            database_url,
            ..Default::default()
        })
        .await
    }

    /// Saves two versions of one workflow and checks that both are listed and
    /// that the first one can still be fetched with its original name.
    #[tokio::test]
    #[ignore]
    async fn test_version_history() -> Result<()> {
        let pool = setup_test_pool().await?;
        pool.migrate().await?;
        let store = WorkflowVersionStore::new(pool);

        let original = Workflow::new("Test Workflow".to_string());
        let workflow_id = original.metadata.id;

        // First save becomes version 1.
        let first_id = store
            .save_version(&original, Some("Initial version".to_string()), None)
            .await?;
        assert_ne!(first_id, Uuid::nil());

        let history = store.get_versions(&workflow_id).await?;
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].version, 1);

        // Second save of the renamed workflow extends the history.
        let mut renamed = original.clone();
        renamed.metadata.name = "Updated Workflow".to_string();
        store
            .save_version(&renamed, Some("Updated name".to_string()), None)
            .await?;

        let history = store.get_versions(&workflow_id).await?;
        assert_eq!(history.len(), 2);

        // Version 1 must still carry the original name.
        let first = store.get_version(&workflow_id, 1).await?;
        assert!(first.is_some());
        let first_name = first.map(|v| v.workflow.metadata.name);
        assert_eq!(first_name.as_deref(), Some("Test Workflow"));

        Ok(())
    }
}