1use std::collections::{HashMap, HashSet, VecDeque};
7use std::sync::Arc;
8use serde::{Deserialize, Serialize};
9
10use crate::ir::{WorkflowIR, WorkflowStrategyOp, ActivityIR};
11
/// Objective that drives an optimization run.
///
/// Each variant carries the constraints relevant to its objective; the active
/// strategy reaches rules through `OptimizationContext::target_constraints`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OptimizationStrategy {
    /// Optimize for cost using `cost_model`, optionally capped by `budget_limit`.
    CostBased {
        cost_model: CostModel,
        budget_limit: Option<f64>,
    },
    /// Optimize toward throughput and latency targets.
    PerformanceBased {
        target_throughput: f64,
        target_latency: std::time::Duration,
    },
    /// Optimize within resource ceilings: a parallelism cap and per-resource
    /// limits keyed by resource name.
    ResourceBased {
        max_parallelism: usize,
        resource_limits: HashMap<String, f64>,
    },
    /// Optimize toward reliability/quality targets keyed by metric name.
    QualityBased {
        reliability_target: f64,
        quality_metrics: HashMap<String, f64>,
    },
}
36
/// Per-resource cost factors plus per-activity cost data, consumed by
/// `WorkflowOptimizer::estimate_cost`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostModel {
    /// Multiplier applied to an activity's `cpu_usage`.
    pub cpu_cost_factor: f64,
    /// Multiplier applied to an activity's `memory_usage`.
    pub memory_cost_factor: f64,
    /// Multiplier applied to an activity's `io_operations`.
    pub io_cost_factor: f64,
    /// Multiplier applied to an activity's `network_usage`.
    pub network_cost_factor: f64,
    /// Cost data keyed by activity reference; activities missing from this map
    /// contribute zero cost/duration to the estimates.
    pub activity_costs: HashMap<String, ActivityCost>,
}
51
/// Modeled cost profile of a single activity.
///
/// The usage fields are multiplied by the matching `CostModel` factors and
/// added to `base_cost` when estimating workflow cost.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActivityCost {
    /// Fixed cost incurred per execution, independent of resource usage.
    pub base_cost: f64,
    pub cpu_usage: f64,
    pub memory_usage: f64,
    pub io_operations: f64,
    pub network_usage: f64,
    /// Expected wall-clock time of one execution; summed by
    /// `WorkflowOptimizer::estimate_duration`.
    pub estimated_duration: std::time::Duration,
}
62
/// Outcome of applying an `OptimizationRule` to a workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationResult {
    /// The workflow as it was before optimization.
    pub original_workflow: WorkflowIR,
    /// The rewritten workflow produced by the rule.
    pub optimized_workflow: WorkflowIR,
    pub estimated_cost: f64,
    pub estimated_duration: std::time::Duration,
    /// Individual improvements; these feed the score used to pick the best
    /// result in `WorkflowOptimizer::optimize_workflow`.
    pub improvements: Vec<OptimizationImprovement>,
    /// Names of the optimizations that were applied (rule identifiers).
    pub applied_optimizations: Vec<String>,
}
73
/// A single claimed improvement within an `OptimizationResult`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationImprovement {
    /// Machine-readable kind, e.g. "parallel_execution".
    pub improvement_type: String,
    /// Human-readable summary of what changed.
    pub description: String,
    pub cost_savings: f64,
    pub duration_reduction: std::time::Duration,
    /// Weight in [0, 1] applied to the savings when scoring results.
    pub confidence: f64,
}
83
/// Staged execution plan produced by `WorkflowOptimizer::generate_parallel_plan`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelExecutionPlan {
    /// Ordered stages; activities within a stage may run concurrently.
    pub stages: Vec<ExecutionStage>,
    /// Activity-id -> prerequisite activity-ids.
    /// NOTE(review): currently always returned empty by the generator.
    pub dependencies: HashMap<String, Vec<String>>,
    /// Per-activity resource requirements.
    /// NOTE(review): currently always returned empty by the generator.
    pub resource_requirements: HashMap<String, ResourceRequirement>,
    /// Upper bound on concurrent activities (the generator echoes its
    /// `max_parallelism` argument here).
    pub estimated_parallelism: usize,
}
92
/// One scheduling unit of a `ParallelExecutionPlan`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionStage {
    /// Identifier of the form "stage_N", assigned in creation order.
    pub stage_id: String,
    /// Activity references executed in this stage.
    pub activities: Vec<String>,
    pub execution_mode: ExecutionMode,
    pub estimated_duration: std::time::Duration,
    pub resource_usage: ResourceUsage,
}
102
/// How the activities of a stage are executed relative to each other.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExecutionMode {
    Sequential,
    Parallel,
    Conditional,
    Loop,
}
111
/// Resources an activity (or stage) needs in order to run.
///
/// NOTE(review): units for the bandwidth fields are not established anywhere
/// in this module — confirm against the producer of these values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceRequirement {
    pub cpu_cores: usize,
    pub memory_mb: usize,
    pub io_bandwidth: f64,
    pub network_bandwidth: f64,
}
120
/// Observed or estimated resource consumption of an execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    pub cpu_usage: f64,
    pub memory_usage: f64,
    pub io_usage: f64,
    pub network_usage: f64,
}
129
/// Applies registered `OptimizationRule`s to workflows and estimates
/// workflow cost/duration from a `CostModel`.
pub struct WorkflowOptimizer {
    // Drives estimate_cost / estimate_duration.
    cost_model: CostModel,
    // NOTE(review): not read by any method in this module; presumably intended
    // for rules that need resource checks — confirm before removing.
    resource_manager: ResourceManager,
    // Kept sorted by ascending priority (see add_rule).
    optimization_rules: Vec<Box<dyn OptimizationRule>>,
}
136
/// A pluggable optimization pass over a workflow.
#[async_trait::async_trait]
pub trait OptimizationRule: Send + Sync {
    /// Attempts to improve `workflow`; returns `None` when the rule does not
    /// apply or cannot improve anything.
    async fn apply(&self, workflow: &WorkflowIR, context: &OptimizationContext) -> Option<OptimizationResult>;
    /// Stable identifier of the rule.
    fn name(&self) -> &str;
    /// Ordering key; rules are sorted ascending before being applied.
    fn priority(&self) -> i32;
}
143
/// Environment handed to each `OptimizationRule::apply` call.
#[derive(Debug, Clone)]
pub struct OptimizationContext {
    /// Resource capacity keyed by resource name.
    pub available_resources: HashMap<String, f64>,
    /// Past per-activity performance keyed by activity name.
    pub historical_performance: HashMap<String, ActivityPerformance>,
    /// The objective the current run is optimizing toward.
    pub target_constraints: OptimizationStrategy,
    /// Past workflow runs available for analysis.
    pub execution_history: Vec<WorkflowExecution>,
}
152
/// Aggregated historical performance of a single activity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActivityPerformance {
    pub activity_name: String,
    pub avg_execution_time: std::time::Duration,
    /// Fraction of executions that succeeded.
    pub success_rate: f64,
    pub resource_usage: ResourceUsage,
    pub cost_per_execution: f64,
    /// Number of executions the aggregates were computed from.
    pub sample_size: usize,
}
163
/// Record of one past workflow run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowExecution {
    pub workflow_id: String,
    pub execution_time: std::time::Duration,
    pub cost: f64,
    pub success: bool,
    pub resource_usage: ResourceUsage,
}
173
/// Tracks a pool of named resources (e.g. "cpu", "memory") and how much of
/// each is currently allocated.
pub struct ResourceManager {
    // Total capacity per resource name.
    available_resources: HashMap<String, f64>,
    // Amount currently handed out per resource name.
    allocated_resources: HashMap<String, f64>,
}
179
180impl ResourceManager {
181 pub fn new(available_resources: HashMap<String, f64>) -> Self {
182 Self {
183 available_resources,
184 allocated_resources: HashMap::new(),
185 }
186 }
187
188 pub fn check_resource_availability(&self, requirements: &ResourceRequirement) -> bool {
190 let available_cpu = self.available_resources.get("cpu").unwrap_or(&0.0) - self.allocated_resources.get("cpu").unwrap_or(&0.0);
191 let available_memory = self.available_resources.get("memory").unwrap_or(&0.0) - self.allocated_resources.get("memory").unwrap_or(&0.0);
192
193 available_cpu >= requirements.cpu_cores as f64 && available_memory >= requirements.memory_mb as f64
194 }
195
196 pub fn allocate_resources(&mut self, requirements: &ResourceRequirement) -> bool {
198 if !self.check_resource_availability(requirements) {
199 return false;
200 }
201
202 *self.allocated_resources.entry("cpu".to_string()).or_insert(0.0) += requirements.cpu_cores as f64;
203 *self.allocated_resources.entry("memory".to_string()).or_insert(0.0) += requirements.memory_mb as f64;
204
205 true
206 }
207
208 pub fn release_resources(&mut self, requirements: &ResourceRequirement) {
210 if let Some(cpu) = self.allocated_resources.get_mut("cpu") {
211 *cpu = (*cpu - requirements.cpu_cores as f64).max(0.0);
212 }
213 if let Some(memory) = self.allocated_resources.get_mut("memory") {
214 *memory = (*memory - requirements.memory_mb as f64).max(0.0);
215 }
216 }
217}
218
219impl WorkflowOptimizer {
220 pub fn new(cost_model: CostModel, resource_manager: ResourceManager) -> Self {
221 Self {
222 cost_model,
223 resource_manager,
224 optimization_rules: Vec::new(),
225 }
226 }
227
228 pub fn add_rule(&mut self, rule: Box<dyn OptimizationRule>) {
230 self.optimization_rules.push(rule);
231 self.optimization_rules.sort_by_key(|r| r.priority());
232 }
233
234 pub async fn optimize_workflow(
236 &self,
237 workflow: &WorkflowIR,
238 context: &OptimizationContext,
239 ) -> Result<OptimizationResult, OptimizationError> {
240 let mut best_result = None;
241 let mut best_score = f64::INFINITY;
242
243 for rule in &self.optimization_rules {
245 if let Some(result) = rule.apply(workflow, context).await {
246 let score = self.calculate_optimization_score(&result);
247 if score < best_score {
248 best_score = score;
249 best_result = Some(result);
250 }
251 }
252 }
253
254 best_result.ok_or(OptimizationError::NoOptimizationPossible)
255 }
256
257 pub async fn generate_parallel_plan(
259 &self,
260 workflow: &WorkflowIR,
261 max_parallelism: usize,
262 ) -> Result<ParallelExecutionPlan, OptimizationError> {
263 let mut stages = Vec::new();
264 let mut dependencies = HashMap::new();
265 let mut processed = HashSet::new();
266 let mut queue = VecDeque::new();
267
268 queue.push_back(workflow.strategy.clone());
270
271 while !queue.is_empty() {
272 let mut current_stage_activities = Vec::new();
273 let mut current_dependencies: HashMap<String, Vec<String>> = HashMap::new();
274
275 let batch_size = std::cmp::min(queue.len(), max_parallelism);
277 for _ in 0..batch_size {
278 if let Some(strategy) = queue.pop_front() {
279 match strategy {
280 WorkflowStrategyOp::Activity { activity_ref, .. } => {
281 if !processed.contains(&activity_ref) {
282 current_stage_activities.push(activity_ref.clone());
283 processed.insert(activity_ref);
284 }
285 }
286 WorkflowStrategyOp::Seq { strategies } => {
287 for strategy in strategies {
288 queue.push_back(*strategy);
289 }
290 }
291 WorkflowStrategyOp::Parallel { branches, .. } => {
292 for branch in branches {
293 queue.push_back(*branch);
294 }
295 }
296 _ => {
297 queue.push_back(strategy);
299 }
300 }
301 }
302 }
303
304 if !current_stage_activities.is_empty() {
305 let stage = ExecutionStage {
306 stage_id: format!("stage_{}", stages.len()),
307 activities: current_stage_activities,
308 execution_mode: ExecutionMode::Parallel,
309 estimated_duration: std::time::Duration::from_secs(1), resource_usage: ResourceUsage {
311 cpu_usage: 1.0,
312 memory_usage: 100.0,
313 io_usage: 0.0,
314 network_usage: 0.0,
315 },
316 };
317 stages.push(stage);
318 }
319 }
320
321 Ok(ParallelExecutionPlan {
322 stages,
323 dependencies,
324 resource_requirements: HashMap::new(), estimated_parallelism: max_parallelism,
326 })
327 }
328
329 pub fn estimate_cost(&self, workflow: &WorkflowIR) -> f64 {
331 let mut total_cost = 0.0;
332
333 self.traverse_workflow(&workflow.strategy, &mut |activity_ref| {
334 if let Some(activity_cost) = self.cost_model.activity_costs.get(activity_ref) {
335 total_cost += activity_cost.base_cost
336 + activity_cost.cpu_usage * self.cost_model.cpu_cost_factor
337 + activity_cost.memory_usage * self.cost_model.memory_cost_factor
338 + activity_cost.io_operations * self.cost_model.io_cost_factor
339 + activity_cost.network_usage * self.cost_model.network_cost_factor;
340 }
341 });
342
343 total_cost
344 }
345
346 pub fn estimate_duration(&self, workflow: &WorkflowIR) -> std::time::Duration {
348 let mut total_duration = std::time::Duration::from_secs(0);
349
350 self.traverse_workflow(&workflow.strategy, &mut |activity_ref| {
351 if let Some(activity_cost) = self.cost_model.activity_costs.get(activity_ref) {
352 total_duration += activity_cost.estimated_duration;
353 }
354 });
355
356 total_duration
357 }
358
359 fn traverse_workflow<F>(&self, strategy: &WorkflowStrategyOp, processor: &mut F)
361 where
362 F: FnMut(&str),
363 {
364 match strategy {
365 WorkflowStrategyOp::Activity { activity_ref, .. } => {
366 processor(activity_ref);
367 }
368 WorkflowStrategyOp::Seq { strategies } => {
369 for strategy in strategies {
370 self.traverse_workflow(strategy, processor);
371 }
372 }
373 WorkflowStrategyOp::Parallel { branches, .. } => {
374 for branch in branches {
375 self.traverse_workflow(branch, processor);
376 }
377 }
378 WorkflowStrategyOp::Decision { conditions, default_branch } => {
379 for branch in conditions {
380 self.traverse_workflow(&branch.branch, processor);
381 }
382 if let Some(default_branch) = default_branch {
383 self.traverse_workflow(default_branch, processor);
384 }
385 }
386 WorkflowStrategyOp::Wait { .. } => {
387 }
389 WorkflowStrategyOp::Saga { main_flow, compensation } => {
390 self.traverse_workflow(main_flow, processor);
391 self.traverse_workflow(compensation, processor);
392 }
393 WorkflowStrategyOp::SubWorkflow { .. } => {
394 }
396 _ => {}
397 }
398 }
399
400 fn calculate_optimization_score(&self, result: &OptimizationResult) -> f64 {
402 let cost_weight = 0.6;
404 let time_weight = 0.4;
405
406 let cost_score = result.improvements.iter()
407 .map(|imp| imp.cost_savings * imp.confidence)
408 .sum::<f64>();
409
410 let time_score = result.improvements.iter()
411 .map(|imp| imp.duration_reduction.as_secs_f64() * imp.confidence)
412 .sum::<f64>();
413
414 -(cost_score * cost_weight + time_score * time_weight) }
416}
417
/// Optimization rule that groups consecutive independent activities for
/// parallel execution.
pub struct ParallelExecutionRule {
    // Parallelism cap passed to the optimizer helpers.
    max_parallelism: usize,
}
422
423impl ParallelExecutionRule {
424 pub fn new(max_parallelism: usize) -> Self {
425 Self { max_parallelism }
426 }
427}
428
429#[async_trait::async_trait]
430impl OptimizationRule for ParallelExecutionRule {
431 async fn apply(&self, workflow: &WorkflowIR, context: &OptimizationContext) -> Option<OptimizationResult> {
432 let mut parallelizable_activities = Vec::new();
434
435 Self::find_parallelizable_activities(&workflow.strategy, &mut parallelizable_activities);
436
437 if parallelizable_activities.len() < 2 {
438 return None;
439 }
440
441 let optimized_strategy = Self::optimize_parallel_execution(&workflow.strategy, self.max_parallelism);
443
444 let improvements = vec![
445 OptimizationImprovement {
446 improvement_type: "parallel_execution".to_string(),
447 description: format!("Parallelized {} activities", parallelizable_activities.len()),
448 cost_savings: 0.0, duration_reduction: std::time::Duration::from_secs(10), confidence: 0.8,
451 }
452 ];
453
454 let mut optimized_workflow = workflow.clone();
455 optimized_workflow.strategy = optimized_strategy;
456
457 Some(OptimizationResult {
458 original_workflow: workflow.clone(),
459 optimized_workflow,
460 estimated_cost: 0.0, estimated_duration: std::time::Duration::from_secs(30), improvements,
463 applied_optimizations: vec!["parallel_execution".to_string()],
464 })
465 }
466
467 fn name(&self) -> &str {
468 "parallel_execution"
469 }
470
471 fn priority(&self) -> i32 {
472 1
473 }
474}
475
476impl ParallelExecutionRule {
477 fn find_parallelizable_activities(strategy: &WorkflowStrategyOp, activities: &mut Vec<String>) {
478 match strategy {
479 WorkflowStrategyOp::Activity { activity_ref, .. } => {
480 activities.push(activity_ref.clone());
481 }
482 WorkflowStrategyOp::Seq { strategies } => {
483 for strategy in strategies {
484 Self::find_parallelizable_activities(strategy, activities);
485 }
486 }
487 _ => {
488 }
490 }
491 }
492
493 fn optimize_parallel_execution(strategy: &WorkflowStrategyOp, max_parallelism: usize) -> WorkflowStrategyOp {
494 match strategy {
495 WorkflowStrategyOp::Seq { strategies } => {
496 let mut optimized_strategies = Vec::new();
497
498 let mut parallel_group = Vec::new();
500 let mut sequential_part = Vec::new();
501
502 for strategy in strategies {
503 if Self::is_parallelizable(strategy) {
504 parallel_group.push(*strategy.clone());
505 } else {
506 if !parallel_group.is_empty() {
507 if parallel_group.len() > 1 {
508 optimized_strategies.push(WorkflowStrategyOp::Parallel {
509 branches: parallel_group.into_iter().map(Box::new).collect(),
510 completion_condition: crate::ir::CompletionCondition::All,
511 });
512 } else {
513 optimized_strategies.extend(parallel_group);
514 }
515 parallel_group = Vec::new();
516 }
517 sequential_part.push(*strategy.clone());
518 }
519 }
520
521 if !parallel_group.is_empty() {
523 if parallel_group.len() > 1 {
524 optimized_strategies.push(WorkflowStrategyOp::Parallel {
525 branches: parallel_group.into_iter().map(Box::new).collect(),
526 completion_condition: crate::ir::CompletionCondition::All,
527 });
528 } else {
529 optimized_strategies.extend(parallel_group);
530 }
531 }
532
533 WorkflowStrategyOp::Seq {
534 strategies: optimized_strategies.into_iter().map(Box::new).collect(),
535 }
536 }
537 _ => strategy.clone(),
538 }
539 }
540
541 fn is_parallelizable(strategy: &WorkflowStrategyOp) -> bool {
542 matches!(strategy, WorkflowStrategyOp::Activity { .. })
543 }
544}
545
/// Cost-driven optimization rule.
pub struct CostBasedOptimizationRule {
    // NOTE(review): not read by the current (placeholder) apply implementation.
    cost_threshold: f64,
}
550
551impl CostBasedOptimizationRule {
552 pub fn new(cost_threshold: f64) -> Self {
553 Self { cost_threshold }
554 }
555}
556
557#[async_trait::async_trait]
558impl OptimizationRule for CostBasedOptimizationRule {
559 async fn apply(&self, workflow: &WorkflowIR, context: &OptimizationContext) -> Option<OptimizationResult> {
560 Some(OptimizationResult {
564 original_workflow: workflow.clone(),
565 optimized_workflow: workflow.clone(), estimated_cost: 0.0,
567 estimated_duration: std::time::Duration::from_secs(30),
568 improvements: vec![],
569 applied_optimizations: vec!["cost_based".to_string()],
570 })
571 }
572
573 fn name(&self) -> &str {
574 "cost_based"
575 }
576
577 fn priority(&self) -> i32 {
578 2
579 }
580}
581
/// Errors produced by `WorkflowOptimizer` operations.
#[derive(Debug, thiserror::Error)]
pub enum OptimizationError {
    /// No registered rule returned a result for the workflow.
    #[error("No optimization possible")]
    NoOptimizationPossible,
    #[error("Resource constraints violated")]
    ResourceConstraintsViolated,
    #[error("Invalid optimization strategy")]
    InvalidStrategy,
}