1use crate::error::{ClusterError, Result};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use serde::{Deserialize, Serialize};
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::{Duration, SystemTime};
18
/// Identifier of a [`Workflow`]; a plain alias for [`uuid::Uuid`].
pub type WorkflowId = uuid::Uuid;
21
/// A workflow definition: an ordered list of steps plus the variables they start with.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Workflow {
    /// Unique identifier; `WorkflowEngine::register_workflow` stores the workflow under this key.
    pub id: WorkflowId,
    /// Workflow name; it appears in the `WorkflowStarted` history message of each execution.
    pub name: String,
    /// Version label of this definition.
    pub version: String,
    /// Optional free-text description.
    pub description: Option<String>,
    /// Steps that make up the workflow.
    pub steps: Vec<WorkflowStep>,
    /// Named JSON values; copied into `WorkflowExecution::context` when an execution starts.
    pub variables: HashMap<String, serde_json::Value>,
    /// Time the definition was created.
    pub created_at: SystemTime,
    /// Time the definition was last modified.
    /// NOTE(review): nothing in the code visible here changes this after construction.
    pub updated_at: SystemTime,
}
42
/// A single step of a [`Workflow`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStep {
    /// Step identifier.
    /// NOTE(review): the `String` lists in `depends_on` and [`StepType`] presumably refer to
    /// these ids — confirm against the code that executes steps.
    pub id: String,
    /// Step name.
    pub name: String,
    /// Kind of step and its kind-specific settings.
    pub step_type: StepType,
    /// Steps this one depends on (presumably by `id` — TODO confirm).
    pub depends_on: Vec<String>,
    /// Optional retry settings for the step.
    pub retry: Option<RetryConfig>,
    /// Optional time limit for the step.
    pub timeout: Option<Duration>,
}
59
/// The kinds of step a workflow can contain.
///
/// Serialized in serde's internally tagged form: the variant name is stored in a `"type"`
/// field next to the variant's own fields.
///
/// NOTE(review): this file only declares the variants; how each one is executed is defined
/// elsewhere, so the per-variant notes below describe the data, not the runtime behavior.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum StepType {
    /// A task built from a named template.
    Task {
        /// Name of the task template.
        task_template: String,
        /// JSON parameters for the task.
        parameters: HashMap<String, serde_json::Value>,
    },
    /// A branch on a condition.
    Condition {
        /// Condition expression (syntax is not defined in this file).
        condition: String,
        /// Steps for the "then" branch (presumably step ids — TODO confirm).
        then_steps: Vec<String>,
        /// Optional steps for the "else" branch.
        else_steps: Option<Vec<String>>,
    },
    /// A loop over a list of JSON items.
    Loop {
        /// Iterator name (presumably the variable bound to the current item — TODO confirm).
        iterator: String,
        /// Items to loop over.
        items: Vec<serde_json::Value>,
        /// Steps forming the loop body.
        body_steps: Vec<String>,
    },
    /// Several branches, each a list of steps.
    Parallel {
        /// One inner list per branch.
        branches: Vec<Vec<String>>,
    },
    /// A wait of a fixed duration.
    Wait {
        /// Length of the wait.
        duration: Duration,
    },
    /// A named checkpoint.
    Checkpoint {
        /// Checkpoint name.
        name: String,
    },
}
105
/// Retry settings attached to a [`WorkflowStep`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum number of attempts.
    /// NOTE(review): whether this counts the first attempt is not visible here — confirm.
    pub max_attempts: u32,
    /// How the delay between attempts is chosen.
    pub backoff: BackoffStrategy,
    /// Conditions under which a retry applies (presumably error names or patterns — TODO confirm).
    pub retry_on: Vec<String>,
}
116
/// Delay policy between retry attempts.
///
/// NOTE(review): the code that turns these settings into an actual delay is not in this
/// file; the field notes describe the apparent intent only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackoffStrategy {
    /// The same delay before every retry.
    Fixed {
        /// Delay between attempts.
        delay: Duration,
    },
    /// A delay scaled by `multiplier` (presumably per attempt) and bounded by `max`.
    Exponential {
        /// Starting delay.
        initial: Duration,
        /// Scale factor.
        multiplier: f64,
        /// Upper bound on the delay.
        max: Duration,
    },
    /// A delay grown by `increment` (presumably per attempt) and bounded by `max`.
    Linear {
        /// Starting delay.
        initial: Duration,
        /// Amount added to the delay.
        increment: Duration,
        /// Upper bound on the delay.
        max: Duration,
    },
}
144
/// State of one run of a [`Workflow`], as tracked by [`WorkflowEngine`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowExecution {
    /// Execution identifier; the key under which the engine stores this record.
    pub id: uuid::Uuid,
    /// Id of the workflow being run.
    pub workflow_id: WorkflowId,
    /// Current lifecycle state.
    pub status: WorkflowStatus,
    /// Step currently in progress, if any. The engine copies it into the `step_id` of
    /// pause, resume and cancel events.
    pub current_step: Option<String>,
    /// Step ids reported through `WorkflowEngine::complete_step`, in call order.
    pub completed_steps: Vec<String>,
    /// Step ids reported through `WorkflowEngine::fail_step`, in call order.
    pub failed_steps: Vec<String>,
    /// Execution variables; initialized from a clone of `Workflow::variables`.
    pub context: HashMap<String, serde_json::Value>,
    /// Time the execution was created by `WorkflowEngine::start_execution`.
    pub started_at: SystemTime,
    /// Time the execution ended; `None` while it has not ended.
    pub completed_at: Option<SystemTime>,
    /// Append-only log of events; the first entry is the `WorkflowStarted` event.
    pub history: Vec<ExecutionEvent>,
}
169
/// Lifecycle state of a [`WorkflowExecution`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WorkflowStatus {
    /// Not yet running.
    /// NOTE(review): `WorkflowEngine::start_execution` creates executions directly in
    /// `Running`; nothing visible in this file assigns `Pending`.
    Pending,
    /// In progress; the initial state given by `WorkflowEngine::start_execution`.
    Running,
    /// Suspended by `WorkflowEngine::pause_execution`.
    Paused,
    /// Finished successfully.
    Completed,
    /// Finished with a failure.
    Failed,
    /// Stopped by `WorkflowEngine::cancel_execution`.
    Cancelled,
}
186
/// One entry in `WorkflowExecution::history`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionEvent {
    /// What happened.
    pub event_type: EventType,
    /// Step the event relates to; `None` for the `WorkflowStarted` event.
    pub step_id: Option<String>,
    /// Time the event was recorded.
    pub timestamp: SystemTime,
    /// Human-readable detail; for `StepFailed` events this is the error text passed to
    /// `WorkflowEngine::fail_step`.
    pub message: String,
}
199
/// Kind of an [`ExecutionEvent`].
///
/// NOTE(review): the engine methods visible in this file record only some of these
/// variants; the others are presumably emitted by code elsewhere — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventType {
    /// An execution was created.
    WorkflowStarted,
    /// A step began.
    StepStarted,
    /// A step finished successfully.
    StepCompleted,
    /// A step failed.
    StepFailed,
    /// A step is being retried.
    StepRetrying,
    /// The execution finished successfully.
    WorkflowCompleted,
    /// The execution finished with a failure.
    WorkflowFailed,
    /// The execution was paused.
    WorkflowPaused,
    /// The execution was resumed.
    WorkflowResumed,
    /// The execution was cancelled.
    WorkflowCancelled,
}
224
/// In-memory registry of workflows, templates and executions.
///
/// All state sits behind `Arc`-wrapped concurrent containers, and every method takes `&self`.
pub struct WorkflowEngine {
    /// Registered workflows, keyed by `Workflow::id`.
    workflows: Arc<DashMap<WorkflowId, Workflow>>,
    /// Executions keyed by execution id; each record has its own lock.
    executions: Arc<DashMap<uuid::Uuid, RwLock<WorkflowExecution>>>,
    /// Registered templates, keyed by `WorkflowTemplate::name`.
    templates: Arc<DashMap<String, WorkflowTemplate>>,
    /// Execution ids awaiting processing, pushed at the back.
    /// NOTE(review): nothing visible in this file pops from the queue — the consumer
    /// presumably lives elsewhere.
    queue: Arc<RwLock<VecDeque<uuid::Uuid>>>,
    /// Aggregate counters returned by `get_stats`.
    stats: Arc<RwLock<WorkflowStats>>,
}
238
/// A reusable blueprint from which `WorkflowEngine::create_from_template` builds a [`Workflow`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowTemplate {
    /// Template name; the key used to register and look up the template, and the name
    /// given to workflows created from it.
    pub name: String,
    /// Version label; copied to workflows created from the template.
    pub version: String,
    /// Parameters the template declares.
    pub parameters: Vec<TemplateParameter>,
    /// Steps copied into workflows created from the template.
    pub steps: Vec<WorkflowStep>,
}
251
/// Declaration of one parameter accepted by a [`WorkflowTemplate`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemplateParameter {
    /// Parameter name; matched against the keys of the parameter map given to
    /// `WorkflowEngine::create_from_template`.
    pub name: String,
    /// Type name as free text (the tests in this file use `"string"`).
    /// NOTE(review): no code visible here validates values against it.
    pub param_type: String,
    /// When `true`, `WorkflowEngine::create_from_template` fails if the caller omits the
    /// parameter.
    pub required: bool,
    /// Default value for the parameter, if any.
    pub default: Option<serde_json::Value>,
    /// Optional free-text description.
    pub description: Option<String>,
}
266
/// Aggregate counters maintained by [`WorkflowEngine`]; all zero by default.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct WorkflowStats {
    /// Number of registered workflows, refreshed on each `register_workflow` call.
    pub total_workflows: usize,
    /// Number of executions ever started; incremented by `start_execution`.
    pub total_executions: u64,
    /// Number of executions in `WorkflowStatus::Running`, as of the engine's last recount.
    pub running_executions: usize,
    /// Number of executions that completed.
    /// NOTE(review): not updated by any code visible in this file.
    pub completed_executions: u64,
    /// Number of executions that failed.
    /// NOTE(review): not updated by any code visible in this file.
    pub failed_executions: u64,
    /// Average execution time.
    /// NOTE(review): not updated by any code visible in this file.
    pub average_execution_time: Duration,
}
283
284impl WorkflowEngine {
285 pub fn new() -> Self {
287 Self {
288 workflows: Arc::new(DashMap::new()),
289 executions: Arc::new(DashMap::new()),
290 templates: Arc::new(DashMap::new()),
291 queue: Arc::new(RwLock::new(VecDeque::new())),
292 stats: Arc::new(RwLock::new(WorkflowStats::default())),
293 }
294 }
295
296 pub fn register_workflow(&self, workflow: Workflow) -> Result<WorkflowId> {
298 let id = workflow.id;
299 self.workflows.insert(id, workflow);
300
301 let mut stats = self.stats.write();
302 stats.total_workflows = self.workflows.len();
303
304 Ok(id)
305 }
306
307 pub fn register_template(&self, template: WorkflowTemplate) -> Result<()> {
309 self.templates.insert(template.name.clone(), template);
310 Ok(())
311 }
312
313 pub fn create_from_template(
315 &self,
316 template_name: &str,
317 parameters: HashMap<String, serde_json::Value>,
318 ) -> Result<Workflow> {
319 let template = self
320 .templates
321 .get(template_name)
322 .ok_or_else(|| ClusterError::WorkflowNotFound(template_name.to_string()))?;
323
324 for param in &template.parameters {
326 if param.required && !parameters.contains_key(¶m.name) {
327 return Err(ClusterError::InvalidConfiguration(format!(
328 "Missing required parameter: {}",
329 param.name
330 )));
331 }
332 }
333
334 let workflow = Workflow {
335 id: uuid::Uuid::new_v4(),
336 name: template.name.clone(),
337 version: template.version.clone(),
338 description: None,
339 steps: template.steps.clone(),
340 variables: parameters,
341 created_at: SystemTime::now(),
342 updated_at: SystemTime::now(),
343 };
344
345 Ok(workflow)
346 }
347
348 pub fn start_execution(&self, workflow_id: WorkflowId) -> Result<uuid::Uuid> {
350 let workflow = self
351 .workflows
352 .get(&workflow_id)
353 .ok_or_else(|| ClusterError::WorkflowNotFound(workflow_id.to_string()))?;
354
355 let execution_id = uuid::Uuid::new_v4();
356 let execution = WorkflowExecution {
357 id: execution_id,
358 workflow_id,
359 status: WorkflowStatus::Running,
360 current_step: None,
361 completed_steps: Vec::new(),
362 failed_steps: Vec::new(),
363 context: workflow.variables.clone(),
364 started_at: SystemTime::now(),
365 completed_at: None,
366 history: vec![ExecutionEvent {
367 event_type: EventType::WorkflowStarted,
368 step_id: None,
369 timestamp: SystemTime::now(),
370 message: format!("Started workflow execution: {}", workflow.name),
371 }],
372 };
373
374 self.executions.insert(execution_id, RwLock::new(execution));
375 self.queue.write().push_back(execution_id);
376
377 let mut stats = self.stats.write();
378 stats.total_executions += 1;
379 stats.running_executions = self.count_running_executions();
380
381 Ok(execution_id)
382 }
383
384 pub fn pause_execution(&self, execution_id: uuid::Uuid) -> Result<()> {
386 let execution = self
387 .executions
388 .get(&execution_id)
389 .ok_or_else(|| ClusterError::WorkflowNotFound(execution_id.to_string()))?;
390
391 let mut exec = execution.write();
392 if exec.status != WorkflowStatus::Running {
393 return Err(ClusterError::InvalidOperation(format!(
394 "Cannot pause workflow in status {:?}",
395 exec.status
396 )));
397 }
398
399 exec.status = WorkflowStatus::Paused;
400 let current_step = exec.current_step.clone();
401 exec.history.push(ExecutionEvent {
402 event_type: EventType::WorkflowPaused,
403 step_id: current_step,
404 timestamp: SystemTime::now(),
405 message: "Workflow paused".to_string(),
406 });
407
408 Ok(())
409 }
410
411 pub fn resume_execution(&self, execution_id: uuid::Uuid) -> Result<()> {
413 let execution = self
414 .executions
415 .get(&execution_id)
416 .ok_or_else(|| ClusterError::WorkflowNotFound(execution_id.to_string()))?;
417
418 let mut exec = execution.write();
419 if exec.status != WorkflowStatus::Paused {
420 return Err(ClusterError::InvalidOperation(format!(
421 "Cannot resume workflow in status {:?}",
422 exec.status
423 )));
424 }
425
426 exec.status = WorkflowStatus::Running;
427 let current_step = exec.current_step.clone();
428 exec.history.push(ExecutionEvent {
429 event_type: EventType::WorkflowResumed,
430 step_id: current_step,
431 timestamp: SystemTime::now(),
432 message: "Workflow resumed".to_string(),
433 });
434
435 self.queue.write().push_back(execution_id);
436
437 Ok(())
438 }
439
440 pub fn cancel_execution(&self, execution_id: uuid::Uuid) -> Result<()> {
442 let execution = self
443 .executions
444 .get(&execution_id)
445 .ok_or_else(|| ClusterError::WorkflowNotFound(execution_id.to_string()))?;
446
447 let mut exec = execution.write();
448 exec.status = WorkflowStatus::Cancelled;
449 exec.completed_at = Some(SystemTime::now());
450 let current_step = exec.current_step.clone();
451 exec.history.push(ExecutionEvent {
452 event_type: EventType::WorkflowCancelled,
453 step_id: current_step,
454 timestamp: SystemTime::now(),
455 message: "Workflow cancelled".to_string(),
456 });
457
458 let mut stats = self.stats.write();
459 stats.running_executions = self.count_running_executions();
460
461 Ok(())
462 }
463
464 pub fn complete_step(&self, execution_id: uuid::Uuid, step_id: String) -> Result<()> {
466 let execution = self
467 .executions
468 .get(&execution_id)
469 .ok_or_else(|| ClusterError::WorkflowNotFound(execution_id.to_string()))?;
470
471 let mut exec = execution.write();
472 exec.completed_steps.push(step_id.clone());
473 exec.history.push(ExecutionEvent {
474 event_type: EventType::StepCompleted,
475 step_id: Some(step_id),
476 timestamp: SystemTime::now(),
477 message: "Step completed successfully".to_string(),
478 });
479
480 Ok(())
481 }
482
483 pub fn fail_step(
485 &self,
486 execution_id: uuid::Uuid,
487 step_id: String,
488 error: String,
489 ) -> Result<()> {
490 let execution = self
491 .executions
492 .get(&execution_id)
493 .ok_or_else(|| ClusterError::WorkflowNotFound(execution_id.to_string()))?;
494
495 let mut exec = execution.write();
496 exec.failed_steps.push(step_id.clone());
497 exec.history.push(ExecutionEvent {
498 event_type: EventType::StepFailed,
499 step_id: Some(step_id),
500 timestamp: SystemTime::now(),
501 message: error,
502 });
503
504 Ok(())
505 }
506
507 pub fn get_execution(&self, execution_id: uuid::Uuid) -> Option<WorkflowExecution> {
509 self.executions.get(&execution_id).map(|e| e.read().clone())
510 }
511
512 pub fn list_executions(&self, workflow_id: WorkflowId) -> Vec<WorkflowExecution> {
514 self.executions
515 .iter()
516 .filter(|entry| entry.value().read().workflow_id == workflow_id)
517 .map(|entry| entry.value().read().clone())
518 .collect()
519 }
520
521 pub fn list_running_executions(&self) -> Vec<WorkflowExecution> {
523 self.executions
524 .iter()
525 .filter(|entry| entry.value().read().status == WorkflowStatus::Running)
526 .map(|entry| entry.value().read().clone())
527 .collect()
528 }
529
530 fn count_running_executions(&self) -> usize {
531 self.executions
532 .iter()
533 .filter(|entry| entry.value().read().status == WorkflowStatus::Running)
534 .count()
535 }
536
537 pub fn get_stats(&self) -> WorkflowStats {
539 self.stats.read().clone()
540 }
541}
542
543impl Default for WorkflowEngine {
544 fn default() -> Self {
545 Self::new()
546 }
547}
548
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Builds a step-less workflow with a fresh id.
    fn sample_workflow(description: Option<&str>) -> Workflow {
        let now = SystemTime::now();
        Workflow {
            id: uuid::Uuid::new_v4(),
            name: "test-workflow".to_string(),
            version: "1.0.0".to_string(),
            description: description.map(str::to_string),
            steps: vec![],
            variables: HashMap::new(),
            created_at: now,
            updated_at: now,
        }
    }

    /// Builds a template with a single required `input` parameter.
    fn sample_template() -> WorkflowTemplate {
        WorkflowTemplate {
            name: "test-template".to_string(),
            version: "1.0.0".to_string(),
            parameters: vec![TemplateParameter {
                name: "input".to_string(),
                param_type: "string".to_string(),
                required: true,
                default: None,
                description: Some("Input parameter".to_string()),
            }],
            steps: vec![],
        }
    }

    /// Registers a sample workflow and starts one execution of it.
    fn engine_with_running_execution() -> (WorkflowEngine, uuid::Uuid) {
        let engine = WorkflowEngine::new();
        let workflow_id = engine
            .register_workflow(sample_workflow(None))
            .expect("workflow registration should succeed");
        let execution_id = engine
            .start_execution(workflow_id)
            .expect("workflow execution should start");
        (engine, execution_id)
    }

    #[test]
    fn test_workflow_creation() {
        let workflow = sample_workflow(Some("Test workflow"));
        let expected_id = workflow.id;

        let engine = WorkflowEngine::new();
        let registered_id = engine
            .register_workflow(workflow)
            .expect("workflow registration should succeed");

        assert_eq!(registered_id, expected_id);
        assert_eq!(engine.get_stats().total_workflows, 1);
    }

    #[test]
    fn test_workflow_execution() {
        let (engine, execution_id) = engine_with_running_execution();

        let execution = engine
            .get_execution(execution_id)
            .expect("execution should exist");
        assert_eq!(execution.status, WorkflowStatus::Running);
        assert_eq!(execution.history.len(), 1);
    }

    #[test]
    fn test_start_unknown_workflow_fails() {
        let engine = WorkflowEngine::new();
        assert!(engine.start_execution(uuid::Uuid::new_v4()).is_err());
    }

    #[test]
    fn test_workflow_pause_resume() {
        let (engine, execution_id) = engine_with_running_execution();

        // A failed pause/resume must fail the test instead of being silently dropped.
        engine
            .pause_execution(execution_id)
            .expect("pausing a running execution should succeed");
        let execution = engine
            .get_execution(execution_id)
            .expect("execution should exist after pause");
        assert_eq!(execution.status, WorkflowStatus::Paused);

        engine
            .resume_execution(execution_id)
            .expect("resuming a paused execution should succeed");
        let execution = engine
            .get_execution(execution_id)
            .expect("execution should exist after resume");
        assert_eq!(execution.status, WorkflowStatus::Running);
    }

    #[test]
    fn test_pause_and_resume_reject_wrong_state() {
        let (engine, execution_id) = engine_with_running_execution();

        // Resuming something that is not paused is rejected.
        assert!(engine.resume_execution(execution_id).is_err());

        engine
            .pause_execution(execution_id)
            .expect("pausing a running execution should succeed");

        // Pausing twice is rejected and leaves the execution paused.
        assert!(engine.pause_execution(execution_id).is_err());
        let execution = engine
            .get_execution(execution_id)
            .expect("execution should still exist");
        assert_eq!(execution.status, WorkflowStatus::Paused);
    }

    #[test]
    fn test_template_creation() {
        let engine = WorkflowEngine::new();
        engine
            .register_template(sample_template())
            .expect("template registration should succeed");

        let mut params = HashMap::new();
        params.insert("input".to_string(), serde_json::json!("test-value"));

        let workflow = engine
            .create_from_template("test-template", params)
            .expect("workflow should be created from template");
        assert_eq!(workflow.name, "test-template");
        assert_eq!(
            workflow.variables.get("input"),
            Some(&serde_json::json!("test-value"))
        );
    }

    #[test]
    fn test_template_missing_required_parameter() {
        let engine = WorkflowEngine::new();
        engine
            .register_template(sample_template())
            .expect("template registration should succeed");

        assert!(engine
            .create_from_template("test-template", HashMap::new())
            .is_err());
    }

    #[test]
    fn test_unknown_template_fails() {
        let engine = WorkflowEngine::new();
        assert!(engine
            .create_from_template("no-such-template", HashMap::new())
            .is_err());
    }
}