oxify_storage/
workflow_store.rs

1//! Workflow storage implementation for SQLite
2
3use crate::{DatabasePool, Result, StorageError};
4use chrono::{DateTime, Utc};
5use oxify_model::{Workflow, WorkflowId};
6use serde::{Deserialize, Serialize};
7use sqlx::Row;
8use uuid::Uuid;
9
10/// Workflow storage layer
11#[derive(Clone)]
12pub struct WorkflowStore {
13    pool: DatabasePool,
14}
15
16impl WorkflowStore {
17    /// Create a new workflow store
18    pub fn new(pool: DatabasePool) -> Self {
19        Self { pool }
20    }
21
22    /// Create a new workflow
23    #[tracing::instrument(skip(self, workflow), fields(workflow_id = %workflow.metadata.id, workflow_name = %workflow.metadata.name))]
24    pub async fn create(&self, workflow: &Workflow) -> Result<WorkflowId> {
25        let id = workflow.metadata.id.to_string();
26        let name = &workflow.metadata.name;
27        let description = workflow.metadata.description.as_ref();
28        let definition = serde_json::to_string(workflow)?;
29        let tags = serde_json::to_string(&workflow.metadata.tags)?;
30
31        sqlx::query(
32            r#"
33            INSERT INTO workflows (id, name, description, definition, tags)
34            VALUES (?, ?, ?, ?, ?)
35            "#,
36        )
37        .bind(&id)
38        .bind(name)
39        .bind(description)
40        .bind(&definition)
41        .bind(&tags)
42        .execute(self.pool.pool())
43        .await?;
44
45        Ok(workflow.metadata.id)
46    }
47
48    /// Get a workflow by ID
49    #[tracing::instrument(skip(self), fields(workflow_id = %id))]
50    pub async fn get(&self, id: &WorkflowId) -> Result<Option<Workflow>> {
51        let id_str = id.to_string();
52        let row = sqlx::query(
53            r#"
54            SELECT id, name, description, created_at, updated_at, version, definition, tags
55            FROM workflows
56            WHERE id = ?
57            "#,
58        )
59        .bind(&id_str)
60        .fetch_optional(self.pool.pool())
61        .await?;
62
63        match row {
64            Some(row) => {
65                let definition_str: String = row.get("definition");
66                let workflow: Workflow = serde_json::from_str(&definition_str)?;
67                Ok(Some(workflow))
68            }
69            None => Ok(None),
70        }
71    }
72
73    /// List all workflows
74    pub async fn list(&self) -> Result<Vec<Workflow>> {
75        let rows = sqlx::query(
76            r#"
77            SELECT id, name, description, created_at, updated_at, version, definition, tags
78            FROM workflows
79            ORDER BY created_at DESC
80            "#,
81        )
82        .fetch_all(self.pool.pool())
83        .await?;
84
85        let workflows: Vec<Workflow> = rows
86            .into_iter()
87            .filter_map(|row| {
88                let definition_str: String = row.get("definition");
89                serde_json::from_str(&definition_str).ok()
90            })
91            .collect();
92
93        Ok(workflows)
94    }
95
96    /// List workflows with pagination
97    pub async fn list_paginated(&self, limit: i64, offset: i64) -> Result<Vec<Workflow>> {
98        let rows = sqlx::query(
99            r#"
100            SELECT id, name, description, created_at, updated_at, version, definition, tags
101            FROM workflows
102            ORDER BY created_at DESC
103            LIMIT ? OFFSET ?
104            "#,
105        )
106        .bind(limit)
107        .bind(offset)
108        .fetch_all(self.pool.pool())
109        .await?;
110
111        let workflows: Vec<Workflow> = rows
112            .into_iter()
113            .filter_map(|row| {
114                let definition_str: String = row.get("definition");
115                serde_json::from_str(&definition_str).ok()
116            })
117            .collect();
118
119        Ok(workflows)
120    }
121
122    /// Count total workflows
123    pub async fn count(&self) -> Result<i64> {
124        let row = sqlx::query("SELECT COUNT(*) as count FROM workflows")
125            .fetch_one(self.pool.pool())
126            .await?;
127
128        let count: i64 = row.get("count");
129        Ok(count)
130    }
131
132    /// Update a workflow
133    #[tracing::instrument(skip(self, workflow), fields(workflow_id = %id, workflow_name = %workflow.metadata.name))]
134    pub async fn update(&self, id: &WorkflowId, workflow: &Workflow) -> Result<bool> {
135        let id_str = id.to_string();
136        let name = &workflow.metadata.name;
137        let description = workflow.metadata.description.as_ref();
138        let definition = serde_json::to_string(workflow)?;
139        let tags = serde_json::to_string(&workflow.metadata.tags)?;
140
141        let result = sqlx::query(
142            r#"
143            UPDATE workflows
144            SET name = ?, description = ?, definition = ?, tags = ?, version = version + 1, updated_at = datetime('now')
145            WHERE id = ?
146            "#,
147        )
148        .bind(name)
149        .bind(description)
150        .bind(&definition)
151        .bind(&tags)
152        .bind(&id_str)
153        .execute(self.pool.pool())
154        .await?;
155
156        Ok(result.rows_affected() > 0)
157    }
158
159    /// Delete a workflow
160    #[tracing::instrument(skip(self), fields(workflow_id = %id))]
161    pub async fn delete(&self, id: &WorkflowId) -> Result<bool> {
162        let id_str = id.to_string();
163        let result = sqlx::query(
164            r#"
165            DELETE FROM workflows
166            WHERE id = ?
167            "#,
168        )
169        .bind(&id_str)
170        .execute(self.pool.pool())
171        .await?;
172
173        Ok(result.rows_affected() > 0)
174    }
175
176    /// Search workflows by name (case-insensitive)
177    pub async fn search(&self, query: &str) -> Result<Vec<Workflow>> {
178        let pattern = format!("%{query}%");
179
180        let rows = sqlx::query(
181            r#"
182            SELECT id, name, description, created_at, updated_at, version, definition, tags
183            FROM workflows
184            WHERE name LIKE ? OR description LIKE ?
185            ORDER BY created_at DESC
186            "#,
187        )
188        .bind(&pattern)
189        .bind(&pattern)
190        .fetch_all(self.pool.pool())
191        .await?;
192
193        let workflows: Vec<Workflow> = rows
194            .into_iter()
195            .filter_map(|row| {
196                let definition_str: String = row.get("definition");
197                serde_json::from_str(&definition_str).ok()
198            })
199            .collect();
200
201        Ok(workflows)
202    }
203
204    // ==================== Bulk Operations ====================
205
206    /// Bulk create multiple workflows
207    /// Returns a list of (id, success, error_message) tuples
208    pub async fn bulk_create(&self, workflows: &[Workflow]) -> Result<Vec<BulkOperationResult>> {
209        let mut results = Vec::with_capacity(workflows.len());
210
211        for workflow in workflows {
212            let result = match self.create(workflow).await {
213                Ok(id) => BulkOperationResult {
214                    id,
215                    success: true,
216                    error: None,
217                },
218                Err(e) => BulkOperationResult {
219                    id: workflow.metadata.id,
220                    success: false,
221                    error: Some(e.to_string()),
222                },
223            };
224            results.push(result);
225        }
226
227        Ok(results)
228    }
229
230    /// Bulk delete multiple workflows by IDs
231    /// Returns a list of (id, success, error_message) tuples
232    pub async fn bulk_delete(&self, ids: &[WorkflowId]) -> Result<Vec<BulkOperationResult>> {
233        let mut results = Vec::with_capacity(ids.len());
234
235        for id in ids {
236            let result = match self.delete(id).await {
237                Ok(deleted) => BulkOperationResult {
238                    id: *id,
239                    success: deleted,
240                    error: if deleted {
241                        None
242                    } else {
243                        Some("Workflow not found".to_string())
244                    },
245                },
246                Err(e) => BulkOperationResult {
247                    id: *id,
248                    success: false,
249                    error: Some(e.to_string()),
250                },
251            };
252            results.push(result);
253        }
254
255        Ok(results)
256    }
257
258    /// Bulk export workflows to JSON format
259    pub async fn bulk_export(&self, ids: Option<&[WorkflowId]>) -> Result<WorkflowExport> {
260        let workflows = match ids {
261            Some(ids) => {
262                let mut workflows = Vec::with_capacity(ids.len());
263                for id in ids {
264                    if let Some(workflow) = self.get(id).await? {
265                        workflows.push(workflow);
266                    }
267                }
268                workflows
269            }
270            None => self.list().await?,
271        };
272
273        Ok(WorkflowExport {
274            version: "1.0".to_string(),
275            exported_at: Utc::now(),
276            count: workflows.len(),
277            workflows,
278        })
279    }
280
281    /// Bulk import workflows from export format
282    /// Returns import results with statistics
283    pub async fn bulk_import(
284        &self,
285        export: &WorkflowExport,
286        options: ImportOptions,
287    ) -> Result<ImportResult> {
288        let mut result = ImportResult {
289            total: export.workflows.len(),
290            imported: 0,
291            skipped: 0,
292            failed: 0,
293            errors: Vec::new(),
294        };
295
296        for workflow in &export.workflows {
297            // Check if workflow already exists
298            let existing = self.get(&workflow.metadata.id).await?;
299
300            if existing.is_some() {
301                match options.on_conflict {
302                    ConflictStrategy::Skip => {
303                        result.skipped += 1;
304                        continue;
305                    }
306                    ConflictStrategy::Replace => {
307                        match self.update(&workflow.metadata.id, workflow).await {
308                            Ok(_) => result.imported += 1,
309                            Err(e) => {
310                                result.failed += 1;
311                                result.errors.push(ImportError {
312                                    workflow_id: workflow.metadata.id,
313                                    workflow_name: workflow.metadata.name.clone(),
314                                    error: e.to_string(),
315                                });
316                            }
317                        }
318                    }
319                    ConflictStrategy::CreateNew => {
320                        // Create with new ID
321                        let mut new_workflow = workflow.clone();
322                        new_workflow.metadata.id = Uuid::new_v4();
323                        match self.create(&new_workflow).await {
324                            Ok(_) => result.imported += 1,
325                            Err(e) => {
326                                result.failed += 1;
327                                result.errors.push(ImportError {
328                                    workflow_id: workflow.metadata.id,
329                                    workflow_name: workflow.metadata.name.clone(),
330                                    error: e.to_string(),
331                                });
332                            }
333                        }
334                    }
335                    ConflictStrategy::Fail => {
336                        return Err(StorageError::ConstraintViolation(format!(
337                            "Workflow {} already exists",
338                            workflow.metadata.id
339                        )));
340                    }
341                }
342            } else {
343                match self.create(workflow).await {
344                    Ok(_) => result.imported += 1,
345                    Err(e) => {
346                        result.failed += 1;
347                        result.errors.push(ImportError {
348                            workflow_id: workflow.metadata.id,
349                            workflow_name: workflow.metadata.name.clone(),
350                            error: e.to_string(),
351                        });
352                    }
353                }
354            }
355        }
356
357        Ok(result)
358    }
359
360    /// List workflows by tag
361    /// Note: tags are stored as JSON array in SQLite
362    pub async fn list_by_tag(&self, tag: &str) -> Result<Vec<Workflow>> {
363        // Search for tag in JSON array using LIKE (simple approach)
364        let pattern = format!("%\"{tag}\"%");
365
366        let rows = sqlx::query(
367            r#"
368            SELECT id, name, description, created_at, updated_at, version, definition, tags
369            FROM workflows
370            WHERE tags LIKE ?
371            ORDER BY created_at DESC
372            "#,
373        )
374        .bind(&pattern)
375        .fetch_all(self.pool.pool())
376        .await?;
377
378        let workflows: Vec<Workflow> = rows
379            .into_iter()
380            .filter_map(|row| {
381                let definition_str: String = row.get("definition");
382                serde_json::from_str(&definition_str).ok()
383            })
384            .collect();
385
386        Ok(workflows)
387    }
388
389    /// List workflows by multiple tags (AND logic - must have all tags)
390    pub async fn list_by_tags(&self, tags: &[String]) -> Result<Vec<Workflow>> {
391        // Get all workflows and filter in memory for SQLite compatibility
392        let all_workflows = self.list().await?;
393
394        let workflows: Vec<Workflow> = all_workflows
395            .into_iter()
396            .filter(|w| tags.iter().all(|tag| w.metadata.tags.contains(tag)))
397            .collect();
398
399        Ok(workflows)
400    }
401
402    /// Get workflow IDs only (for lighter queries)
403    pub async fn list_ids(&self) -> Result<Vec<WorkflowId>> {
404        let rows = sqlx::query("SELECT id FROM workflows ORDER BY created_at DESC")
405            .fetch_all(self.pool.pool())
406            .await?;
407
408        let ids: Vec<WorkflowId> = rows
409            .into_iter()
410            .filter_map(|r| {
411                let id_str: String = r.get("id");
412                Uuid::parse_str(&id_str).ok()
413            })
414            .collect();
415
416        Ok(ids)
417    }
418
419    /// Check if a workflow exists
420    pub async fn exists(&self, id: &WorkflowId) -> Result<bool> {
421        let id_str = id.to_string();
422        let row = sqlx::query("SELECT 1 FROM workflows WHERE id = ? LIMIT 1")
423            .bind(&id_str)
424            .fetch_optional(self.pool.pool())
425            .await?;
426
427        Ok(row.is_some())
428    }
429
430    /// Get multiple workflows by IDs
431    pub async fn get_many(&self, ids: &[WorkflowId]) -> Result<Vec<Workflow>> {
432        // For SQLite, we query each ID individually
433        // This could be optimized with IN clause and parameter binding
434        let mut workflows = Vec::with_capacity(ids.len());
435
436        for id in ids {
437            if let Some(workflow) = self.get(id).await? {
438                workflows.push(workflow);
439            }
440        }
441
442        Ok(workflows)
443    }
444}
445
446/// Result of a bulk operation for a single item
447#[derive(Debug, Clone, Serialize, Deserialize)]
448pub struct BulkOperationResult {
449    pub id: Uuid,
450    pub success: bool,
451    pub error: Option<String>,
452}
453
454/// Workflow export format
455#[derive(Debug, Clone, Serialize, Deserialize)]
456pub struct WorkflowExport {
457    pub version: String,
458    pub exported_at: DateTime<Utc>,
459    pub count: usize,
460    pub workflows: Vec<Workflow>,
461}
462
463/// Import options
464#[derive(Debug, Clone, Default)]
465pub struct ImportOptions {
466    pub on_conflict: ConflictStrategy,
467}
468
469/// Strategy for handling conflicts during import
470#[derive(Debug, Clone, Copy, Default)]
471pub enum ConflictStrategy {
472    /// Skip existing workflows
473    #[default]
474    Skip,
475    /// Replace existing workflows
476    Replace,
477    /// Create new workflows with new IDs
478    CreateNew,
479    /// Fail if any workflow exists
480    Fail,
481}
482
483/// Import result with statistics
484#[derive(Debug, Clone, Serialize, Deserialize)]
485pub struct ImportResult {
486    pub total: usize,
487    pub imported: usize,
488    pub skipped: usize,
489    pub failed: usize,
490    pub errors: Vec<ImportError>,
491}
492
493/// Import error for a single workflow
494#[derive(Debug, Clone, Serialize, Deserialize)]
495pub struct ImportError {
496    pub workflow_id: Uuid,
497    pub workflow_name: String,
498    pub error: String,
499}
500
501#[cfg(test)]
502mod tests {
503    use super::*;
504    use oxify_model::{Node, NodeKind};
505
506    async fn setup_test_pool() -> Result<DatabasePool> {
507        let config = crate::DatabaseConfig {
508            database_url: std::env::var("DATABASE_URL")
509                .unwrap_or_else(|_| "sqlite::memory:".to_string()),
510            ..Default::default()
511        };
512        DatabasePool::new(config).await
513    }
514
515    #[tokio::test]
516    #[ignore] // Requires database
517    async fn test_workflow_crud() -> Result<()> {
518        let pool = setup_test_pool().await?;
519        pool.migrate().await?;
520
521        let store = WorkflowStore::new(pool);
522
523        // Create test workflow
524        let mut workflow = Workflow::new("Test Workflow".to_string());
525        workflow.add_node(Node::new("Start".to_string(), NodeKind::Start));
526
527        // Create
528        let id = store.create(&workflow).await?;
529        assert_eq!(id, workflow.metadata.id);
530
531        // Get
532        let fetched = store.get(&id).await?;
533        assert!(fetched.is_some());
534        assert_eq!(
535            fetched.as_ref().map(|w| w.metadata.name.as_str()),
536            Some("Test Workflow")
537        );
538
539        // Update
540        let mut updated = workflow.clone();
541        updated.metadata.name = "Updated Workflow".to_string();
542        let result = store.update(&id, &updated).await?;
543        assert!(result);
544
545        // Delete
546        let result = store.delete(&id).await?;
547        assert!(result);
548
549        // Verify deleted
550        let fetched = store.get(&id).await?;
551        assert!(fetched.is_none());
552
553        Ok(())
554    }
555}