kotoba_workflow/
optimization.rs

//! Workflow Optimization - Phase 3
//!
//! Provides optimization of workflow execution.
//! Supports cost-based optimization, parallel-execution optimization, query optimization, and more.
5
6use std::collections::{HashMap, HashSet, VecDeque};
7use std::sync::Arc;
8use serde::{Deserialize, Serialize};
9
10use crate::ir::{WorkflowIR, WorkflowStrategyOp, ActivityIR};
11
12/// 最適化戦略
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub enum OptimizationStrategy {
15    /// コストベース最適化
16    CostBased {
17        cost_model: CostModel,
18        budget_limit: Option<f64>,
19    },
20    /// パフォーマンスベース最適化
21    PerformanceBased {
22        target_throughput: f64,
23        target_latency: std::time::Duration,
24    },
25    /// リソースベース最適化
26    ResourceBased {
27        max_parallelism: usize,
28        resource_limits: HashMap<String, f64>,
29    },
30    /// 品質ベース最適化
31    QualityBased {
32        reliability_target: f64,
33        quality_metrics: HashMap<String, f64>,
34    },
35}
36
37/// コストモデル
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct CostModel {
40    /// CPUコスト係数
41    pub cpu_cost_factor: f64,
42    /// メモリコスト係数
43    pub memory_cost_factor: f64,
44    /// IOコスト係数
45    pub io_cost_factor: f64,
46    /// ネットワークコスト係数
47    pub network_cost_factor: f64,
48    /// Activityごとのコスト
49    pub activity_costs: HashMap<String, ActivityCost>,
50}
51
52/// Activityコスト情報
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct ActivityCost {
55    pub base_cost: f64,
56    pub cpu_usage: f64,
57    pub memory_usage: f64,
58    pub io_operations: f64,
59    pub network_usage: f64,
60    pub estimated_duration: std::time::Duration,
61}
62
63/// 最適化結果
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct OptimizationResult {
66    pub original_workflow: WorkflowIR,
67    pub optimized_workflow: WorkflowIR,
68    pub estimated_cost: f64,
69    pub estimated_duration: std::time::Duration,
70    pub improvements: Vec<OptimizationImprovement>,
71    pub applied_optimizations: Vec<String>,
72}
73
74/// 最適化改善点
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct OptimizationImprovement {
77    pub improvement_type: String,
78    pub description: String,
79    pub cost_savings: f64,
80    pub duration_reduction: std::time::Duration,
81    pub confidence: f64, // 0.0 to 1.0
82}
83
84/// 並列実行計画
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct ParallelExecutionPlan {
87    pub stages: Vec<ExecutionStage>,
88    pub dependencies: HashMap<String, Vec<String>>,
89    pub resource_requirements: HashMap<String, ResourceRequirement>,
90    pub estimated_parallelism: usize,
91}
92
93/// 実行ステージ
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct ExecutionStage {
96    pub stage_id: String,
97    pub activities: Vec<String>,
98    pub execution_mode: ExecutionMode,
99    pub estimated_duration: std::time::Duration,
100    pub resource_usage: ResourceUsage,
101}
102
103/// 実行モード
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub enum ExecutionMode {
106    Sequential,
107    Parallel,
108    Conditional,
109    Loop,
110}
111
112/// リソース要件
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct ResourceRequirement {
115    pub cpu_cores: usize,
116    pub memory_mb: usize,
117    pub io_bandwidth: f64,
118    pub network_bandwidth: f64,
119}
120
121/// リソース使用量
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct ResourceUsage {
124    pub cpu_usage: f64,
125    pub memory_usage: f64,
126    pub io_usage: f64,
127    pub network_usage: f64,
128}
129
130/// ワークフロー最適化エンジン
131pub struct WorkflowOptimizer {
132    cost_model: CostModel,
133    resource_manager: ResourceManager,
134    optimization_rules: Vec<Box<dyn OptimizationRule>>,
135}
136
137#[async_trait::async_trait]
138pub trait OptimizationRule: Send + Sync {
139    async fn apply(&self, workflow: &WorkflowIR, context: &OptimizationContext) -> Option<OptimizationResult>;
140    fn name(&self) -> &str;
141    fn priority(&self) -> i32; // 低いほど優先度が高い
142}
143
144/// 最適化コンテキスト
145#[derive(Debug, Clone)]
146pub struct OptimizationContext {
147    pub available_resources: HashMap<String, f64>,
148    pub historical_performance: HashMap<String, ActivityPerformance>,
149    pub target_constraints: OptimizationStrategy,
150    pub execution_history: Vec<WorkflowExecution>,
151}
152
153/// Activityパフォーマンス情報
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct ActivityPerformance {
156    pub activity_name: String,
157    pub avg_execution_time: std::time::Duration,
158    pub success_rate: f64,
159    pub resource_usage: ResourceUsage,
160    pub cost_per_execution: f64,
161    pub sample_size: usize,
162}
163
164/// ワークフロー実行履歴
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct WorkflowExecution {
167    pub workflow_id: String,
168    pub execution_time: std::time::Duration,
169    pub cost: f64,
170    pub success: bool,
171    pub resource_usage: ResourceUsage,
172}
173
174/// リソースマネージャー
175pub struct ResourceManager {
176    available_resources: HashMap<String, f64>,
177    allocated_resources: HashMap<String, f64>,
178}
179
180impl ResourceManager {
181    pub fn new(available_resources: HashMap<String, f64>) -> Self {
182        Self {
183            available_resources,
184            allocated_resources: HashMap::new(),
185        }
186    }
187
188    /// リソースが利用可能かチェック
189    pub fn check_resource_availability(&self, requirements: &ResourceRequirement) -> bool {
190        let available_cpu = self.available_resources.get("cpu").unwrap_or(&0.0) - self.allocated_resources.get("cpu").unwrap_or(&0.0);
191        let available_memory = self.available_resources.get("memory").unwrap_or(&0.0) - self.allocated_resources.get("memory").unwrap_or(&0.0);
192
193        available_cpu >= requirements.cpu_cores as f64 && available_memory >= requirements.memory_mb as f64
194    }
195
196    /// リソースを割り当て
197    pub fn allocate_resources(&mut self, requirements: &ResourceRequirement) -> bool {
198        if !self.check_resource_availability(requirements) {
199            return false;
200        }
201
202        *self.allocated_resources.entry("cpu".to_string()).or_insert(0.0) += requirements.cpu_cores as f64;
203        *self.allocated_resources.entry("memory".to_string()).or_insert(0.0) += requirements.memory_mb as f64;
204
205        true
206    }
207
208    /// リソースを解放
209    pub fn release_resources(&mut self, requirements: &ResourceRequirement) {
210        if let Some(cpu) = self.allocated_resources.get_mut("cpu") {
211            *cpu = (*cpu - requirements.cpu_cores as f64).max(0.0);
212        }
213        if let Some(memory) = self.allocated_resources.get_mut("memory") {
214            *memory = (*memory - requirements.memory_mb as f64).max(0.0);
215        }
216    }
217}
218
219impl WorkflowOptimizer {
220    pub fn new(cost_model: CostModel, resource_manager: ResourceManager) -> Self {
221        Self {
222            cost_model,
223            resource_manager,
224            optimization_rules: Vec::new(),
225        }
226    }
227
228    /// 最適化ルールを追加
229    pub fn add_rule(&mut self, rule: Box<dyn OptimizationRule>) {
230        self.optimization_rules.push(rule);
231        self.optimization_rules.sort_by_key(|r| r.priority());
232    }
233
234    /// ワークフローを最適化
235    pub async fn optimize_workflow(
236        &self,
237        workflow: &WorkflowIR,
238        context: &OptimizationContext,
239    ) -> Result<OptimizationResult, OptimizationError> {
240        let mut best_result = None;
241        let mut best_score = f64::INFINITY;
242
243        // 各最適化ルールを適用
244        for rule in &self.optimization_rules {
245            if let Some(result) = rule.apply(workflow, context).await {
246                let score = self.calculate_optimization_score(&result);
247                if score < best_score {
248                    best_score = score;
249                    best_result = Some(result);
250                }
251            }
252        }
253
254        best_result.ok_or(OptimizationError::NoOptimizationPossible)
255    }
256
257    /// 並列実行計画を生成
258    pub async fn generate_parallel_plan(
259        &self,
260        workflow: &WorkflowIR,
261        max_parallelism: usize,
262    ) -> Result<ParallelExecutionPlan, OptimizationError> {
263        let mut stages = Vec::new();
264        let mut dependencies = HashMap::new();
265        let mut processed = HashSet::new();
266        let mut queue = VecDeque::new();
267
268        // 依存関係のないActivityから開始
269        queue.push_back(workflow.strategy.clone());
270
271        while !queue.is_empty() {
272            let mut current_stage_activities = Vec::new();
273            let mut current_dependencies: HashMap<String, Vec<String>> = HashMap::new();
274
275            // 現在のステージで実行可能なActivityを収集
276            let batch_size = std::cmp::min(queue.len(), max_parallelism);
277            for _ in 0..batch_size {
278                if let Some(strategy) = queue.pop_front() {
279                    match strategy {
280                        WorkflowStrategyOp::Activity { activity_ref, .. } => {
281                            if !processed.contains(&activity_ref) {
282                                current_stage_activities.push(activity_ref.clone());
283                                processed.insert(activity_ref);
284                            }
285                        }
286                        WorkflowStrategyOp::Seq { strategies } => {
287                            for strategy in strategies {
288                                queue.push_back(*strategy);
289                            }
290                        }
291                        WorkflowStrategyOp::Parallel { branches, .. } => {
292                            for branch in branches {
293                                queue.push_back(*branch);
294                            }
295                        }
296                        _ => {
297                            // その他の戦略は後で処理
298                            queue.push_back(strategy);
299                        }
300                    }
301                }
302            }
303
304            if !current_stage_activities.is_empty() {
305                let stage = ExecutionStage {
306                    stage_id: format!("stage_{}", stages.len()),
307                    activities: current_stage_activities,
308                    execution_mode: ExecutionMode::Parallel,
309                    estimated_duration: std::time::Duration::from_secs(1), // TODO: 実際の推定時間を計算
310                    resource_usage: ResourceUsage {
311                        cpu_usage: 1.0,
312                        memory_usage: 100.0,
313                        io_usage: 0.0,
314                        network_usage: 0.0,
315                    },
316                };
317                stages.push(stage);
318            }
319        }
320
321        Ok(ParallelExecutionPlan {
322            stages,
323            dependencies,
324            resource_requirements: HashMap::new(), // TODO: リソース要件を計算
325            estimated_parallelism: max_parallelism,
326        })
327    }
328
329    /// コストを見積もり
330    pub fn estimate_cost(&self, workflow: &WorkflowIR) -> f64 {
331        let mut total_cost = 0.0;
332
333        self.traverse_workflow(&workflow.strategy, &mut |activity_ref| {
334            if let Some(activity_cost) = self.cost_model.activity_costs.get(activity_ref) {
335                total_cost += activity_cost.base_cost
336                    + activity_cost.cpu_usage * self.cost_model.cpu_cost_factor
337                    + activity_cost.memory_usage * self.cost_model.memory_cost_factor
338                    + activity_cost.io_operations * self.cost_model.io_cost_factor
339                    + activity_cost.network_usage * self.cost_model.network_cost_factor;
340            }
341        });
342
343        total_cost
344    }
345
346    /// 実行時間を推定
347    pub fn estimate_duration(&self, workflow: &WorkflowIR) -> std::time::Duration {
348        let mut total_duration = std::time::Duration::from_secs(0);
349
350        self.traverse_workflow(&workflow.strategy, &mut |activity_ref| {
351            if let Some(activity_cost) = self.cost_model.activity_costs.get(activity_ref) {
352                total_duration += activity_cost.estimated_duration;
353            }
354        });
355
356        total_duration
357    }
358
359    /// ワークフローをトラバースしてActivityを処理
360    fn traverse_workflow<F>(&self, strategy: &WorkflowStrategyOp, processor: &mut F)
361    where
362        F: FnMut(&str),
363    {
364        match strategy {
365            WorkflowStrategyOp::Activity { activity_ref, .. } => {
366                processor(activity_ref);
367            }
368            WorkflowStrategyOp::Seq { strategies } => {
369                for strategy in strategies {
370                    self.traverse_workflow(strategy, processor);
371                }
372            }
373            WorkflowStrategyOp::Parallel { branches, .. } => {
374                for branch in branches {
375                    self.traverse_workflow(branch, processor);
376                }
377            }
378            WorkflowStrategyOp::Decision { conditions, default_branch } => {
379                for branch in conditions {
380                    self.traverse_workflow(&branch.branch, processor);
381                }
382                if let Some(default_branch) = default_branch {
383                    self.traverse_workflow(default_branch, processor);
384                }
385            }
386            WorkflowStrategyOp::Wait { .. } => {
387                // Waitはコストなし
388            }
389            WorkflowStrategyOp::Saga { main_flow, compensation } => {
390                self.traverse_workflow(main_flow, processor);
391                self.traverse_workflow(compensation, processor);
392            }
393            WorkflowStrategyOp::SubWorkflow { .. } => {
394                // TODO: サブワークフローのコストを計算
395            }
396            _ => {}
397        }
398    }
399
400    /// 最適化結果のスコアを計算(低いほど良い)
401    fn calculate_optimization_score(&self, result: &OptimizationResult) -> f64 {
402        // コスト削減と時間削減のバランスを考慮したスコア
403        let cost_weight = 0.6;
404        let time_weight = 0.4;
405
406        let cost_score = result.improvements.iter()
407            .map(|imp| imp.cost_savings * imp.confidence)
408            .sum::<f64>();
409
410        let time_score = result.improvements.iter()
411            .map(|imp| imp.duration_reduction.as_secs_f64() * imp.confidence)
412            .sum::<f64>();
413
414        -(cost_score * cost_weight + time_score * time_weight) // 負の値にして最大化問題を最小化問題に変換
415    }
416}
417
/// Parallel-execution optimization rule.
pub struct ParallelExecutionRule {
    /// Parallelism limit passed on to the strategy rewrite.
    max_parallelism: usize,
}

impl ParallelExecutionRule {
    /// Creates the rule with the given parallelism limit.
    pub fn new(max_parallelism: usize) -> Self {
        ParallelExecutionRule { max_parallelism }
    }
}
428
429#[async_trait::async_trait]
430impl OptimizationRule for ParallelExecutionRule {
431    async fn apply(&self, workflow: &WorkflowIR, context: &OptimizationContext) -> Option<OptimizationResult> {
432        // 並列実行可能な部分を特定
433        let mut parallelizable_activities = Vec::new();
434
435        Self::find_parallelizable_activities(&workflow.strategy, &mut parallelizable_activities);
436
437        if parallelizable_activities.len() < 2 {
438            return None;
439        }
440
441        // 並列実行を最適化
442        let optimized_strategy = Self::optimize_parallel_execution(&workflow.strategy, self.max_parallelism);
443
444        let improvements = vec![
445            OptimizationImprovement {
446                improvement_type: "parallel_execution".to_string(),
447                description: format!("Parallelized {} activities", parallelizable_activities.len()),
448                cost_savings: 0.0, // TODO: コスト削減を計算
449                duration_reduction: std::time::Duration::from_secs(10), // TODO: 実際の時間削減を計算
450                confidence: 0.8,
451            }
452        ];
453
454        let mut optimized_workflow = workflow.clone();
455        optimized_workflow.strategy = optimized_strategy;
456
457        Some(OptimizationResult {
458            original_workflow: workflow.clone(),
459            optimized_workflow,
460            estimated_cost: 0.0, // TODO: コストを計算
461            estimated_duration: std::time::Duration::from_secs(30), // TODO: 時間を計算
462            improvements,
463            applied_optimizations: vec!["parallel_execution".to_string()],
464        })
465    }
466
467    fn name(&self) -> &str {
468        "parallel_execution"
469    }
470
471    fn priority(&self) -> i32 {
472        1
473    }
474}
475
476impl ParallelExecutionRule {
477    fn find_parallelizable_activities(strategy: &WorkflowStrategyOp, activities: &mut Vec<String>) {
478        match strategy {
479            WorkflowStrategyOp::Activity { activity_ref, .. } => {
480                activities.push(activity_ref.clone());
481            }
482            WorkflowStrategyOp::Seq { strategies } => {
483                for strategy in strategies {
484                    Self::find_parallelizable_activities(strategy, activities);
485                }
486            }
487            _ => {
488                // 他の戦略は並列化が難しい
489            }
490        }
491    }
492
493    fn optimize_parallel_execution(strategy: &WorkflowStrategyOp, max_parallelism: usize) -> WorkflowStrategyOp {
494        match strategy {
495            WorkflowStrategyOp::Seq { strategies } => {
496                let mut optimized_strategies = Vec::new();
497
498                // 並列実行可能なActivityをグループ化
499                let mut parallel_group = Vec::new();
500                let mut sequential_part = Vec::new();
501
502                for strategy in strategies {
503                    if Self::is_parallelizable(strategy) {
504                        parallel_group.push(*strategy.clone());
505                    } else {
506                        if !parallel_group.is_empty() {
507                            if parallel_group.len() > 1 {
508                                optimized_strategies.push(WorkflowStrategyOp::Parallel {
509                                    branches: parallel_group.into_iter().map(Box::new).collect(),
510                                    completion_condition: crate::ir::CompletionCondition::All,
511                                });
512                            } else {
513                                optimized_strategies.extend(parallel_group);
514                            }
515                            parallel_group = Vec::new();
516                        }
517                        sequential_part.push(*strategy.clone());
518                    }
519                }
520
521                // 残りの並列グループを処理
522                if !parallel_group.is_empty() {
523                    if parallel_group.len() > 1 {
524                        optimized_strategies.push(WorkflowStrategyOp::Parallel {
525                            branches: parallel_group.into_iter().map(Box::new).collect(),
526                            completion_condition: crate::ir::CompletionCondition::All,
527                        });
528                    } else {
529                        optimized_strategies.extend(parallel_group);
530                    }
531                }
532
533                WorkflowStrategyOp::Seq {
534                    strategies: optimized_strategies.into_iter().map(Box::new).collect(),
535                }
536            }
537            _ => strategy.clone(),
538        }
539    }
540
541    fn is_parallelizable(strategy: &WorkflowStrategyOp) -> bool {
542        matches!(strategy, WorkflowStrategyOp::Activity { .. })
543    }
544}
545
/// Cost-based optimization rule.
pub struct CostBasedOptimizationRule {
    /// Cost threshold supplied at construction time.
    cost_threshold: f64,
}

impl CostBasedOptimizationRule {
    /// Creates the rule with the given cost threshold.
    pub fn new(cost_threshold: f64) -> Self {
        CostBasedOptimizationRule { cost_threshold }
    }
}
556
557#[async_trait::async_trait]
558impl OptimizationRule for CostBasedOptimizationRule {
559    async fn apply(&self, workflow: &WorkflowIR, context: &OptimizationContext) -> Option<OptimizationResult> {
560        // TODO: コストベースの最適化を実装
561        // 例: 高コストのActivityをより効率的な代替手段に置き換え
562
563        Some(OptimizationResult {
564            original_workflow: workflow.clone(),
565            optimized_workflow: workflow.clone(), // TODO: 最適化されたワークフロー
566            estimated_cost: 0.0,
567            estimated_duration: std::time::Duration::from_secs(30),
568            improvements: vec![],
569            applied_optimizations: vec!["cost_based".to_string()],
570        })
571    }
572
573    fn name(&self) -> &str {
574        "cost_based"
575    }
576
577    fn priority(&self) -> i32 {
578        2
579    }
580}
581
582#[derive(Debug, thiserror::Error)]
583pub enum OptimizationError {
584    #[error("No optimization possible")]
585    NoOptimizationPossible,
586    #[error("Resource constraints violated")]
587    ResourceConstraintsViolated,
588    #[error("Invalid optimization strategy")]
589    InvalidStrategy,
590}