oxify_storage/
workflow_version_store.rs

1//! Workflow version history storage
2
3use crate::{DatabasePool, Result};
4use chrono::{DateTime, Utc};
5use oxify_model::{Workflow, WorkflowId};
6use serde::{Deserialize, Serialize};
7use sqlx::Row;
8use uuid::Uuid;
9
10/// Workflow version record
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct WorkflowVersion {
13    /// Version record ID
14    pub id: Uuid,
15    /// Workflow ID
16    pub workflow_id: WorkflowId,
17    /// Version number
18    pub version: i32,
19    /// Version description/change message
20    pub description: Option<String>,
21    /// Workflow snapshot at this version
22    pub workflow: Workflow,
23    /// When this version was created
24    pub created_at: DateTime<Utc>,
25    /// User who created this version (if available)
26    pub created_by: Option<String>,
27}
28
29/// Workflow version storage layer
30#[derive(Clone)]
31pub struct WorkflowVersionStore {
32    pool: DatabasePool,
33}
34
35impl WorkflowVersionStore {
36    /// Create a new workflow version store
37    pub fn new(pool: DatabasePool) -> Self {
38        Self { pool }
39    }
40
41    /// Save a new version of a workflow
42    pub async fn save_version(
43        &self,
44        workflow: &Workflow,
45        description: Option<String>,
46        created_by: Option<String>,
47    ) -> Result<Uuid> {
48        let id = Uuid::new_v4();
49        let workflow_id = workflow.metadata.id;
50        let version = self.get_next_version(&workflow_id).await?;
51        let definition = serde_json::to_string(workflow)?;
52
53        sqlx::query(
54            r"
55            INSERT INTO workflow_versions
56            (id, workflow_id, version, description, definition, created_by)
57            VALUES (?, ?, ?, ?, ?, ?)
58            ",
59        )
60        .bind(id.to_string())
61        .bind(workflow_id.to_string())
62        .bind(version)
63        .bind(description)
64        .bind(definition)
65        .bind(created_by)
66        .execute(self.pool.pool())
67        .await?;
68
69        Ok(id)
70    }
71
72    /// Get version history for a workflow
73    pub async fn get_versions(&self, workflow_id: &WorkflowId) -> Result<Vec<WorkflowVersion>> {
74        let rows = sqlx::query(
75            r"
76            SELECT id, workflow_id, version, description, definition, created_at, created_by
77            FROM workflow_versions
78            WHERE workflow_id = ?
79            ORDER BY version DESC
80            ",
81        )
82        .bind(workflow_id.to_string())
83        .fetch_all(self.pool.pool())
84        .await?;
85
86        let versions = rows
87            .into_iter()
88            .filter_map(|row| {
89                let id_str: String = row.get("id");
90                let workflow_id_str: String = row.get("workflow_id");
91                let definition: String = row.get("definition");
92                let created_at_str: String = row.get("created_at");
93
94                let id = Uuid::parse_str(&id_str).ok()?;
95                let workflow_id = Uuid::parse_str(&workflow_id_str).ok()?;
96                let workflow: Workflow = serde_json::from_str(&definition).ok()?;
97                let created_at = DateTime::parse_from_rfc3339(&created_at_str)
98                    .map(|dt| dt.with_timezone(&Utc))
99                    .ok()?;
100
101                Some(WorkflowVersion {
102                    id,
103                    workflow_id,
104                    version: row.get("version"),
105                    description: row.get("description"),
106                    workflow,
107                    created_at,
108                    created_by: row.get("created_by"),
109                })
110            })
111            .collect();
112
113        Ok(versions)
114    }
115
116    /// Get a specific version of a workflow
117    pub async fn get_version(
118        &self,
119        workflow_id: &WorkflowId,
120        version: i32,
121    ) -> Result<Option<WorkflowVersion>> {
122        let row = sqlx::query(
123            r"
124            SELECT id, workflow_id, version, description, definition, created_at, created_by
125            FROM workflow_versions
126            WHERE workflow_id = ? AND version = ?
127            ",
128        )
129        .bind(workflow_id.to_string())
130        .bind(version)
131        .fetch_optional(self.pool.pool())
132        .await?;
133
134        match row {
135            Some(row) => {
136                let id_str: String = row.get("id");
137                let workflow_id_str: String = row.get("workflow_id");
138                let definition: String = row.get("definition");
139                let created_at_str: String = row.get("created_at");
140
141                let id = Uuid::parse_str(&id_str)
142                    .map_err(|e| crate::StorageError::validation(format!("Invalid UUID: {}", e)))?;
143                let workflow_id = Uuid::parse_str(&workflow_id_str)
144                    .map_err(|e| crate::StorageError::validation(format!("Invalid UUID: {}", e)))?;
145                let workflow: Workflow = serde_json::from_str(&definition)?;
146                let created_at = DateTime::parse_from_rfc3339(&created_at_str)
147                    .map(|dt| dt.with_timezone(&Utc))
148                    .map_err(|e| crate::StorageError::validation(format!("Invalid date: {}", e)))?;
149
150                Ok(Some(WorkflowVersion {
151                    id,
152                    workflow_id,
153                    version: row.get("version"),
154                    description: row.get("description"),
155                    workflow,
156                    created_at,
157                    created_by: row.get("created_by"),
158                }))
159            }
160            None => Ok(None),
161        }
162    }
163
164    /// Get the next version number for a workflow
165    async fn get_next_version(&self, workflow_id: &WorkflowId) -> Result<i32> {
166        let row = sqlx::query(
167            r"
168            SELECT COALESCE(MAX(version), 0) + 1 as next_version
169            FROM workflow_versions
170            WHERE workflow_id = ?
171            ",
172        )
173        .bind(workflow_id.to_string())
174        .fetch_one(self.pool.pool())
175        .await?;
176
177        let next_version: i32 = row.get("next_version");
178        Ok(next_version)
179    }
180
181    /// Get the latest version number for a workflow
182    pub async fn get_latest_version(&self, workflow_id: &WorkflowId) -> Result<Option<i32>> {
183        let row = sqlx::query(
184            r"
185            SELECT MAX(version) as latest_version
186            FROM workflow_versions
187            WHERE workflow_id = ?
188            ",
189        )
190        .bind(workflow_id.to_string())
191        .fetch_one(self.pool.pool())
192        .await?;
193
194        let latest_version: Option<i32> = row.get("latest_version");
195        Ok(latest_version)
196    }
197
198    /// Delete all versions for a workflow
199    pub async fn delete_all_versions(&self, workflow_id: &WorkflowId) -> Result<u64> {
200        let result = sqlx::query(
201            r"
202            DELETE FROM workflow_versions
203            WHERE workflow_id = ?
204            ",
205        )
206        .bind(workflow_id.to_string())
207        .execute(self.pool.pool())
208        .await?;
209
210        Ok(result.rows_affected())
211    }
212
213    /// Compare two versions of a workflow
214    pub async fn compare_versions(
215        &self,
216        workflow_id: &WorkflowId,
217        version1: i32,
218        version2: i32,
219    ) -> Result<Option<VersionComparison>> {
220        let v1 = self.get_version(workflow_id, version1).await?;
221        let v2 = self.get_version(workflow_id, version2).await?;
222
223        match (v1, v2) {
224            (Some(v1), Some(v2)) => {
225                let comparison = VersionComparison {
226                    version1: v1.version,
227                    version2: v2.version,
228                    nodes_added: v2.workflow.nodes.len() as i32 - v1.workflow.nodes.len() as i32,
229                    nodes_removed: 0, // Would need more sophisticated diff
230                    edges_added: v2.workflow.edges.len() as i32 - v1.workflow.edges.len() as i32,
231                    edges_removed: 0, // Would need more sophisticated diff
232                    name_changed: v1.workflow.metadata.name != v2.workflow.metadata.name,
233                    description_changed: v1.workflow.metadata.description
234                        != v2.workflow.metadata.description,
235                };
236                Ok(Some(comparison))
237            }
238            _ => Ok(None),
239        }
240    }
241
242    /// Rollback a workflow to a specific version
243    /// This updates the main workflow definition and creates a new version record
244    /// Returns true if the rollback was successful
245    pub async fn rollback_to_version(
246        &self,
247        workflow_id: &WorkflowId,
248        target_version: i32,
249        rollback_description: Option<String>,
250        rolled_back_by: Option<String>,
251    ) -> Result<bool> {
252        // Get the target version
253        let version = self
254            .get_version(workflow_id, target_version)
255            .await?
256            .ok_or_else(|| {
257                crate::StorageError::not_found(
258                    crate::ResourceType::WorkflowVersion,
259                    crate::ResourceId::Composite(vec![
260                        workflow_id.to_string(),
261                        target_version.to_string(),
262                    ]),
263                )
264            })?;
265
266        // Update the main workflows table with the version's definition
267        let workflow_json = serde_json::to_string(&version.workflow)?;
268        let now = Utc::now().to_rfc3339();
269
270        let result = sqlx::query(
271            r"
272            UPDATE workflows
273            SET name = ?,
274                description = ?,
275                definition = ?,
276                updated_at = ?
277            WHERE id = ?
278            ",
279        )
280        .bind(&version.workflow.metadata.name)
281        .bind(&version.workflow.metadata.description)
282        .bind(workflow_json)
283        .bind(now)
284        .bind(workflow_id.to_string())
285        .execute(self.pool.pool())
286        .await?;
287
288        if result.rows_affected() == 0 {
289            return Ok(false);
290        }
291
292        // Create a new version record to track the rollback
293        let description = rollback_description
294            .unwrap_or_else(|| format!("Rolled back to version {target_version}"));
295
296        self.save_version(&version.workflow, Some(description), rolled_back_by)
297            .await?;
298
299        Ok(true)
300    }
301
302    /// Delete versions older than the specified version
303    /// Useful for cleaning up old version history
304    /// Returns the number of versions deleted
305    pub async fn delete_versions_before(
306        &self,
307        workflow_id: &WorkflowId,
308        before_version: i32,
309    ) -> Result<u64> {
310        let result = sqlx::query(
311            r"
312            DELETE FROM workflow_versions
313            WHERE workflow_id = ? AND version < ?
314            ",
315        )
316        .bind(workflow_id.to_string())
317        .bind(before_version)
318        .execute(self.pool.pool())
319        .await?;
320
321        Ok(result.rows_affected())
322    }
323}
324
325/// Comparison between two workflow versions
326#[derive(Debug, Clone, Serialize, Deserialize)]
327pub struct VersionComparison {
328    pub version1: i32,
329    pub version2: i32,
330    pub nodes_added: i32,
331    pub nodes_removed: i32,
332    pub edges_added: i32,
333    pub edges_removed: i32,
334    pub name_changed: bool,
335    pub description_changed: bool,
336}
337
338#[cfg(test)]
339mod tests {
340    use super::*;
341
342    #[allow(dead_code)]
343    async fn setup_test_pool() -> Result<DatabasePool> {
344        let config = crate::DatabaseConfig {
345            database_url: std::env::var("DATABASE_URL")
346                .unwrap_or_else(|_| "sqlite::memory:".to_string()),
347            ..Default::default()
348        };
349        DatabasePool::new(config).await
350    }
351
352    #[tokio::test]
353    #[ignore] // Requires database
354    async fn test_version_history() -> Result<()> {
355        let pool = setup_test_pool().await?;
356        pool.migrate().await?;
357
358        let store = WorkflowVersionStore::new(pool);
359
360        // Create test workflow
361        let workflow = Workflow::new("Test Workflow".to_string());
362        let workflow_id = workflow.metadata.id;
363
364        // Save first version
365        let v1_id = store
366            .save_version(&workflow, Some("Initial version".to_string()), None)
367            .await?;
368        assert_ne!(v1_id, Uuid::nil());
369
370        // Get versions
371        let versions = store.get_versions(&workflow_id).await?;
372        assert_eq!(versions.len(), 1);
373        assert_eq!(versions[0].version, 1);
374
375        // Save second version
376        let mut workflow_v2 = workflow.clone();
377        workflow_v2.metadata.name = "Updated Workflow".to_string();
378        store
379            .save_version(&workflow_v2, Some("Updated name".to_string()), None)
380            .await?;
381
382        // Get versions again
383        let versions = store.get_versions(&workflow_id).await?;
384        assert_eq!(versions.len(), 2);
385
386        // Get specific version
387        let v1 = store.get_version(&workflow_id, 1).await?;
388        assert!(v1.is_some());
389        assert_eq!(v1.unwrap().workflow.metadata.name, "Test Workflow");
390
391        Ok(())
392    }
393}