1use crate::{DatabasePool, Result, StorageError};
4use chrono::{DateTime, Utc};
5use oxify_model::{Workflow, WorkflowId};
6use serde::{Deserialize, Serialize};
7use sqlx::Row;
8use uuid::Uuid;
9
/// Persistence layer for [`Workflow`] records, backed by the `workflows`
/// table accessed through a shared [`DatabasePool`].
///
/// Cloning is cheap: only the pool handle is duplicated.
#[derive(Clone)]
pub struct WorkflowStore {
    // Shared connection pool; all queries go through `self.pool.pool()`.
    pool: DatabasePool,
}
15
16impl WorkflowStore {
    /// Creates a store that issues all queries through the given pool.
    pub fn new(pool: DatabasePool) -> Self {
        Self { pool }
    }
21
22 #[tracing::instrument(skip(self, workflow), fields(workflow_id = %workflow.metadata.id, workflow_name = %workflow.metadata.name))]
24 pub async fn create(&self, workflow: &Workflow) -> Result<WorkflowId> {
25 let id = workflow.metadata.id.to_string();
26 let name = &workflow.metadata.name;
27 let description = workflow.metadata.description.as_ref();
28 let definition = serde_json::to_string(workflow)?;
29 let tags = serde_json::to_string(&workflow.metadata.tags)?;
30
31 sqlx::query(
32 r#"
33 INSERT INTO workflows (id, name, description, definition, tags)
34 VALUES (?, ?, ?, ?, ?)
35 "#,
36 )
37 .bind(&id)
38 .bind(name)
39 .bind(description)
40 .bind(&definition)
41 .bind(&tags)
42 .execute(self.pool.pool())
43 .await?;
44
45 Ok(workflow.metadata.id)
46 }
47
48 #[tracing::instrument(skip(self), fields(workflow_id = %id))]
50 pub async fn get(&self, id: &WorkflowId) -> Result<Option<Workflow>> {
51 let id_str = id.to_string();
52 let row = sqlx::query(
53 r#"
54 SELECT id, name, description, created_at, updated_at, version, definition, tags
55 FROM workflows
56 WHERE id = ?
57 "#,
58 )
59 .bind(&id_str)
60 .fetch_optional(self.pool.pool())
61 .await?;
62
63 match row {
64 Some(row) => {
65 let definition_str: String = row.get("definition");
66 let workflow: Workflow = serde_json::from_str(&definition_str)?;
67 Ok(Some(workflow))
68 }
69 None => Ok(None),
70 }
71 }
72
73 pub async fn list(&self) -> Result<Vec<Workflow>> {
75 let rows = sqlx::query(
76 r#"
77 SELECT id, name, description, created_at, updated_at, version, definition, tags
78 FROM workflows
79 ORDER BY created_at DESC
80 "#,
81 )
82 .fetch_all(self.pool.pool())
83 .await?;
84
85 let workflows: Vec<Workflow> = rows
86 .into_iter()
87 .filter_map(|row| {
88 let definition_str: String = row.get("definition");
89 serde_json::from_str(&definition_str).ok()
90 })
91 .collect();
92
93 Ok(workflows)
94 }
95
96 pub async fn list_paginated(&self, limit: i64, offset: i64) -> Result<Vec<Workflow>> {
98 let rows = sqlx::query(
99 r#"
100 SELECT id, name, description, created_at, updated_at, version, definition, tags
101 FROM workflows
102 ORDER BY created_at DESC
103 LIMIT ? OFFSET ?
104 "#,
105 )
106 .bind(limit)
107 .bind(offset)
108 .fetch_all(self.pool.pool())
109 .await?;
110
111 let workflows: Vec<Workflow> = rows
112 .into_iter()
113 .filter_map(|row| {
114 let definition_str: String = row.get("definition");
115 serde_json::from_str(&definition_str).ok()
116 })
117 .collect();
118
119 Ok(workflows)
120 }
121
122 pub async fn count(&self) -> Result<i64> {
124 let row = sqlx::query("SELECT COUNT(*) as count FROM workflows")
125 .fetch_one(self.pool.pool())
126 .await?;
127
128 let count: i64 = row.get("count");
129 Ok(count)
130 }
131
132 #[tracing::instrument(skip(self, workflow), fields(workflow_id = %id, workflow_name = %workflow.metadata.name))]
134 pub async fn update(&self, id: &WorkflowId, workflow: &Workflow) -> Result<bool> {
135 let id_str = id.to_string();
136 let name = &workflow.metadata.name;
137 let description = workflow.metadata.description.as_ref();
138 let definition = serde_json::to_string(workflow)?;
139 let tags = serde_json::to_string(&workflow.metadata.tags)?;
140
141 let result = sqlx::query(
142 r#"
143 UPDATE workflows
144 SET name = ?, description = ?, definition = ?, tags = ?, version = version + 1, updated_at = datetime('now')
145 WHERE id = ?
146 "#,
147 )
148 .bind(name)
149 .bind(description)
150 .bind(&definition)
151 .bind(&tags)
152 .bind(&id_str)
153 .execute(self.pool.pool())
154 .await?;
155
156 Ok(result.rows_affected() > 0)
157 }
158
159 #[tracing::instrument(skip(self), fields(workflow_id = %id))]
161 pub async fn delete(&self, id: &WorkflowId) -> Result<bool> {
162 let id_str = id.to_string();
163 let result = sqlx::query(
164 r#"
165 DELETE FROM workflows
166 WHERE id = ?
167 "#,
168 )
169 .bind(&id_str)
170 .execute(self.pool.pool())
171 .await?;
172
173 Ok(result.rows_affected() > 0)
174 }
175
176 pub async fn search(&self, query: &str) -> Result<Vec<Workflow>> {
178 let pattern = format!("%{query}%");
179
180 let rows = sqlx::query(
181 r#"
182 SELECT id, name, description, created_at, updated_at, version, definition, tags
183 FROM workflows
184 WHERE name LIKE ? OR description LIKE ?
185 ORDER BY created_at DESC
186 "#,
187 )
188 .bind(&pattern)
189 .bind(&pattern)
190 .fetch_all(self.pool.pool())
191 .await?;
192
193 let workflows: Vec<Workflow> = rows
194 .into_iter()
195 .filter_map(|row| {
196 let definition_str: String = row.get("definition");
197 serde_json::from_str(&definition_str).ok()
198 })
199 .collect();
200
201 Ok(workflows)
202 }
203
204 pub async fn bulk_create(&self, workflows: &[Workflow]) -> Result<Vec<BulkOperationResult>> {
209 let mut results = Vec::with_capacity(workflows.len());
210
211 for workflow in workflows {
212 let result = match self.create(workflow).await {
213 Ok(id) => BulkOperationResult {
214 id,
215 success: true,
216 error: None,
217 },
218 Err(e) => BulkOperationResult {
219 id: workflow.metadata.id,
220 success: false,
221 error: Some(e.to_string()),
222 },
223 };
224 results.push(result);
225 }
226
227 Ok(results)
228 }
229
230 pub async fn bulk_delete(&self, ids: &[WorkflowId]) -> Result<Vec<BulkOperationResult>> {
233 let mut results = Vec::with_capacity(ids.len());
234
235 for id in ids {
236 let result = match self.delete(id).await {
237 Ok(deleted) => BulkOperationResult {
238 id: *id,
239 success: deleted,
240 error: if deleted {
241 None
242 } else {
243 Some("Workflow not found".to_string())
244 },
245 },
246 Err(e) => BulkOperationResult {
247 id: *id,
248 success: false,
249 error: Some(e.to_string()),
250 },
251 };
252 results.push(result);
253 }
254
255 Ok(results)
256 }
257
258 pub async fn bulk_export(&self, ids: Option<&[WorkflowId]>) -> Result<WorkflowExport> {
260 let workflows = match ids {
261 Some(ids) => {
262 let mut workflows = Vec::with_capacity(ids.len());
263 for id in ids {
264 if let Some(workflow) = self.get(id).await? {
265 workflows.push(workflow);
266 }
267 }
268 workflows
269 }
270 None => self.list().await?,
271 };
272
273 Ok(WorkflowExport {
274 version: "1.0".to_string(),
275 exported_at: Utc::now(),
276 count: workflows.len(),
277 workflows,
278 })
279 }
280
    /// Imports every workflow from `export`, resolving id collisions
    /// according to `options.on_conflict`.
    ///
    /// Returns aggregate counters plus one [`ImportError`] per failed
    /// item. Items are processed sequentially and non-transactionally:
    /// under `ConflictStrategy::Fail`, items imported before the first
    /// collision remain in the database when the run aborts.
    pub async fn bulk_import(
        &self,
        export: &WorkflowExport,
        options: ImportOptions,
    ) -> Result<ImportResult> {
        let mut result = ImportResult {
            total: export.workflows.len(),
            imported: 0,
            skipped: 0,
            failed: 0,
            errors: Vec::new(),
        };

        for workflow in &export.workflows {
            // Collision check by primary key; a database error here aborts
            // the whole import (it is not a per-item failure).
            let existing = self.get(&workflow.metadata.id).await?;

            if existing.is_some() {
                match options.on_conflict {
                    ConflictStrategy::Skip => {
                        // Keep the existing row untouched.
                        result.skipped += 1;
                        continue;
                    }
                    ConflictStrategy::Replace => {
                        // NOTE(review): update() returning Ok(false) (row
                        // deleted between the get and the update) still
                        // counts as imported — confirm that is acceptable.
                        match self.update(&workflow.metadata.id, workflow).await {
                            Ok(_) => result.imported += 1,
                            Err(e) => {
                                result.failed += 1;
                                result.errors.push(ImportError {
                                    workflow_id: workflow.metadata.id,
                                    workflow_name: workflow.metadata.name.clone(),
                                    error: e.to_string(),
                                });
                            }
                        }
                    }
                    ConflictStrategy::CreateNew => {
                        // Insert a copy under a freshly generated id; errors
                        // are reported against the ORIGINAL id from the
                        // export, and the new id is not surfaced to callers.
                        let mut new_workflow = workflow.clone();
                        new_workflow.metadata.id = Uuid::new_v4();
                        match self.create(&new_workflow).await {
                            Ok(_) => result.imported += 1,
                            Err(e) => {
                                result.failed += 1;
                                result.errors.push(ImportError {
                                    workflow_id: workflow.metadata.id,
                                    workflow_name: workflow.metadata.name.clone(),
                                    error: e.to_string(),
                                });
                            }
                        }
                    }
                    ConflictStrategy::Fail => {
                        // Abort the entire import on the first collision.
                        return Err(StorageError::ConstraintViolation(format!(
                            "Workflow {} already exists",
                            workflow.metadata.id
                        )));
                    }
                }
            } else {
                // No collision: plain insert.
                match self.create(workflow).await {
                    Ok(_) => result.imported += 1,
                    Err(e) => {
                        result.failed += 1;
                        result.errors.push(ImportError {
                            workflow_id: workflow.metadata.id,
                            workflow_name: workflow.metadata.name.clone(),
                            error: e.to_string(),
                        });
                    }
                }
            }
        }

        Ok(result)
    }
359
360 pub async fn list_by_tag(&self, tag: &str) -> Result<Vec<Workflow>> {
363 let pattern = format!("%\"{tag}\"%");
365
366 let rows = sqlx::query(
367 r#"
368 SELECT id, name, description, created_at, updated_at, version, definition, tags
369 FROM workflows
370 WHERE tags LIKE ?
371 ORDER BY created_at DESC
372 "#,
373 )
374 .bind(&pattern)
375 .fetch_all(self.pool.pool())
376 .await?;
377
378 let workflows: Vec<Workflow> = rows
379 .into_iter()
380 .filter_map(|row| {
381 let definition_str: String = row.get("definition");
382 serde_json::from_str(&definition_str).ok()
383 })
384 .collect();
385
386 Ok(workflows)
387 }
388
389 pub async fn list_by_tags(&self, tags: &[String]) -> Result<Vec<Workflow>> {
391 let all_workflows = self.list().await?;
393
394 let workflows: Vec<Workflow> = all_workflows
395 .into_iter()
396 .filter(|w| tags.iter().all(|tag| w.metadata.tags.contains(tag)))
397 .collect();
398
399 Ok(workflows)
400 }
401
402 pub async fn list_ids(&self) -> Result<Vec<WorkflowId>> {
404 let rows = sqlx::query("SELECT id FROM workflows ORDER BY created_at DESC")
405 .fetch_all(self.pool.pool())
406 .await?;
407
408 let ids: Vec<WorkflowId> = rows
409 .into_iter()
410 .filter_map(|r| {
411 let id_str: String = r.get("id");
412 Uuid::parse_str(&id_str).ok()
413 })
414 .collect();
415
416 Ok(ids)
417 }
418
419 pub async fn exists(&self, id: &WorkflowId) -> Result<bool> {
421 let id_str = id.to_string();
422 let row = sqlx::query("SELECT 1 FROM workflows WHERE id = ? LIMIT 1")
423 .bind(&id_str)
424 .fetch_optional(self.pool.pool())
425 .await?;
426
427 Ok(row.is_some())
428 }
429
430 pub async fn get_many(&self, ids: &[WorkflowId]) -> Result<Vec<Workflow>> {
432 let mut workflows = Vec::with_capacity(ids.len());
435
436 for id in ids {
437 if let Some(workflow) = self.get(id).await? {
438 workflows.push(workflow);
439 }
440 }
441
442 Ok(workflows)
443 }
444}
445
/// Per-item outcome of a bulk create/delete operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkOperationResult {
    // Id the operation targeted (for creates, the id that was inserted).
    pub id: Uuid,
    // Whether this individual operation succeeded.
    pub success: bool,
    // Failure description when `success` is false.
    pub error: Option<String>,
}
453
/// Envelope produced by `WorkflowStore::bulk_export` and consumed by
/// `WorkflowStore::bulk_import`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowExport {
    // Export format version; currently always "1.0".
    pub version: String,
    // Timestamp at which the export was taken.
    pub exported_at: DateTime<Utc>,
    // Number of workflows at export time (mirrors `workflows.len()`).
    pub count: usize,
    // The exported workflow definitions.
    pub workflows: Vec<Workflow>,
}
462
/// Options controlling `WorkflowStore::bulk_import` behavior.
#[derive(Debug, Clone, Default)]
pub struct ImportOptions {
    // How to handle an imported workflow whose id already exists
    // (defaults to `ConflictStrategy::Skip`).
    pub on_conflict: ConflictStrategy,
}
468
/// Strategy applied when an imported workflow's id already exists.
#[derive(Debug, Clone, Copy, Default)]
pub enum ConflictStrategy {
    /// Leave the existing workflow untouched; count the item as skipped.
    #[default]
    Skip,
    /// Overwrite the existing workflow with the imported definition.
    Replace,
    /// Import under a freshly generated id, keeping the existing workflow.
    CreateNew,
    /// Abort the entire import with a constraint-violation error.
    Fail,
}
482
/// Aggregate counters and per-item errors for one bulk import run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportResult {
    // Number of workflows present in the export envelope.
    pub total: usize,
    // Items successfully created or replaced.
    pub imported: usize,
    // Items skipped due to `ConflictStrategy::Skip`.
    pub skipped: usize,
    // Items whose create/update errored.
    pub failed: usize,
    // One entry per failed item.
    pub errors: Vec<ImportError>,
}
492
/// Details for a single workflow that failed to import.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportError {
    // Id of the workflow as it appeared in the export.
    pub workflow_id: Uuid,
    // Workflow name, for human-readable error reporting.
    pub workflow_name: String,
    // Stringified underlying error.
    pub error: String,
}
500
#[cfg(test)]
mod tests {
    use super::*;
    use oxify_model::{Node, NodeKind};

    /// Builds a pool from `DATABASE_URL`, defaulting to in-memory SQLite.
    async fn setup_test_pool() -> Result<DatabasePool> {
        let config = crate::DatabaseConfig {
            database_url: std::env::var("DATABASE_URL")
                .unwrap_or_else(|_| "sqlite::memory:".to_string()),
            ..Default::default()
        };
        DatabasePool::new(config).await
    }

    /// End-to-end create → get → update → delete round trip.
    /// Ignored by default: requires a migrated database.
    #[tokio::test]
    #[ignore]
    async fn test_workflow_crud() -> Result<()> {
        let pool = setup_test_pool().await?;
        pool.migrate().await?;
        let store = WorkflowStore::new(pool);

        let mut workflow = Workflow::new("Test Workflow".to_string());
        workflow.add_node(Node::new("Start".to_string(), NodeKind::Start));

        // Create, then read back by the returned id.
        let id = store.create(&workflow).await?;
        assert_eq!(id, workflow.metadata.id);

        let fetched = store.get(&id).await?;
        assert!(fetched.is_some());
        assert_eq!(
            fetched.as_ref().map(|w| w.metadata.name.as_str()),
            Some("Test Workflow")
        );

        // Update the same row under a new name.
        let mut renamed = workflow.clone();
        renamed.metadata.name = "Updated Workflow".to_string();
        assert!(store.update(&id, &renamed).await?);

        // Delete and confirm the row is gone.
        assert!(store.delete(&id).await?);
        assert!(store.get(&id).await?.is_none());

        Ok(())
    }
}