sklears_compose/execution/
strategies.rs

1//! Execution strategies for the composable execution engine
2//!
3//! This module provides pluggable execution strategies that can be configured
4//! and used by the execution engine for different workload types.
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::time::{Duration, Instant, SystemTime};
10
11use sklears_core::error::Result as SklResult;
12
13use super::config::{
14    CachingStrategy, LoadBalancingAlgorithm, LoadBalancingConfig, OptimizationLevel,
15    ParameterValue, PerformanceTuning, PrefetchingStrategy, StrategyConfig,
16    StrategyResourceAllocation,
17};
18use super::resources::ResourceUtilization;
19use super::tasks::{ExecutionTask, ResourceUsage, TaskResult, TaskStatus, TaskType};
20
21/// Execution strategy trait for pluggable execution behaviors
22pub trait ExecutionStrategy: Send + Sync {
23    /// Strategy name
24    fn name(&self) -> &str;
25
26    /// Strategy description
27    fn description(&self) -> &str;
28
29    /// Execute a single task
30    fn execute_task(
31        &self,
32        task: ExecutionTask,
33    ) -> Pin<Box<dyn Future<Output = SklResult<TaskResult>> + Send + '_>>;
34
35    /// Execute multiple tasks
36    fn execute_batch(
37        &self,
38        tasks: Vec<ExecutionTask>,
39    ) -> Pin<Box<dyn Future<Output = SklResult<Vec<TaskResult>>> + Send + '_>>;
40
41    /// Get strategy configuration
42    fn get_config(&self) -> StrategyConfig;
43
44    /// Update strategy configuration
45    fn update_config(&mut self, config: StrategyConfig) -> SklResult<()>;
46
47    /// Get strategy metrics
48    fn get_metrics(&self) -> StrategyMetrics;
49
50    /// Check strategy health
51    fn health_check(&self) -> StrategyHealth;
52}
53
54/// Strategy metrics for monitoring and optimization
55#[derive(Debug, Clone)]
56pub struct StrategyMetrics {
57    /// Total tasks executed
58    pub tasks_executed: u64,
59    /// Total execution time
60    pub total_execution_time: Duration,
61    /// Average task execution time
62    pub average_execution_time: Duration,
63    /// Success rate (0.0-1.0)
64    pub success_rate: f64,
65    /// Error count
66    pub error_count: u64,
67    /// Resource utilization
68    pub resource_utilization: ResourceUtilization,
69    /// Last updated timestamp
70    pub last_updated: SystemTime,
71}
72
73impl Default for StrategyMetrics {
74    fn default() -> Self {
75        Self {
76            tasks_executed: 0,
77            total_execution_time: Duration::ZERO,
78            average_execution_time: Duration::ZERO,
79            success_rate: 1.0,
80            error_count: 0,
81            resource_utilization: ResourceUtilization::default(),
82            last_updated: SystemTime::now(),
83        }
84    }
85}
86
/// Health status reported by a strategy's `health_check`.
///
/// Equality is total (`Eq`): two values are equal when they are the same
/// variant and, for `Degraded` / `Unhealthy`, carry the same `reason` text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StrategyHealth {
    /// Strategy is healthy and operating normally.
    Healthy,
    /// Strategy is degraded but still operational.
    Degraded {
        /// Human-readable explanation of the degradation.
        reason: String,
    },
    /// Strategy is unhealthy and should not be used.
    Unhealthy {
        /// Human-readable explanation of why the strategy is unhealthy.
        reason: String,
    },
    /// Strategy health is unknown.
    Unknown,
}
99
100/// Sequential execution strategy - executes tasks one by one
101pub struct SequentialStrategy {
102    config: StrategyConfig,
103    metrics: StrategyMetrics,
104    start_time: Instant,
105}
106
107impl SequentialStrategy {
108    #[must_use]
109    pub fn new() -> Self {
110        let config = StrategyConfig {
111            name: "sequential".to_string(),
112            parameters: HashMap::new(),
113            resource_allocation: StrategyResourceAllocation {
114                cpu_cores: 1.0,
115                memory_bytes: 100_000_000, // 100MB
116                priority: 1,
117            },
118            performance_tuning: PerformanceTuning {
119                optimization_level: OptimizationLevel::Low,
120                prefetching: PrefetchingStrategy::None,
121                caching: CachingStrategy::None,
122                load_balancing: LoadBalancingConfig {
123                    enabled: false,
124                    algorithm: LoadBalancingAlgorithm::RoundRobin,
125                    rebalance_threshold: 0.8,
126                    min_load_difference: 0.1,
127                },
128            },
129        };
130
131        Self {
132            config,
133            metrics: StrategyMetrics::default(),
134            start_time: Instant::now(),
135        }
136    }
137
138    async fn execute_task_impl(&self, task: ExecutionTask) -> SklResult<TaskResult> {
139        let start_time = Instant::now();
140
141        // Simple task execution simulation
142        match task.task_type {
143            TaskType::Computation => {
144                // Simulate computation work
145                tokio::time::sleep(Duration::from_millis(10)).await;
146                Ok(TaskResult {
147                    task_id: task.id,
148                    status: TaskStatus::Completed,
149                    execution_time: start_time.elapsed(),
150                    resource_usage: ResourceUsage::default(),
151                    output: None,
152                    error: None,
153                })
154            }
155            TaskType::IoOperation => {
156                // Simulate I/O work
157                tokio::time::sleep(Duration::from_millis(50)).await;
158                Ok(TaskResult {
159                    task_id: task.id,
160                    status: TaskStatus::Completed,
161                    execution_time: start_time.elapsed(),
162                    resource_usage: ResourceUsage::default(),
163                    output: None,
164                    error: None,
165                })
166            }
167            TaskType::NetworkOperation => {
168                // Simulate network work
169                tokio::time::sleep(Duration::from_millis(100)).await;
170                Ok(TaskResult {
171                    task_id: task.id,
172                    status: TaskStatus::Completed,
173                    execution_time: start_time.elapsed(),
174                    resource_usage: ResourceUsage::default(),
175                    output: None,
176                    error: None,
177                })
178            }
179            TaskType::Custom => {
180                // Custom task handling
181                tokio::time::sleep(Duration::from_millis(25)).await;
182                Ok(TaskResult {
183                    task_id: task.id,
184                    status: TaskStatus::Completed,
185                    execution_time: start_time.elapsed(),
186                    resource_usage: ResourceUsage::default(),
187                    output: None,
188                    error: None,
189                })
190            }
191        }
192    }
193}
194
195impl ExecutionStrategy for SequentialStrategy {
196    fn name(&self) -> &'static str {
197        "sequential"
198    }
199
200    fn description(&self) -> &'static str {
201        "Sequential execution strategy - executes tasks one by one"
202    }
203
204    fn execute_task(
205        &self,
206        task: ExecutionTask,
207    ) -> Pin<Box<dyn Future<Output = SklResult<TaskResult>> + Send + '_>> {
208        Box::pin(self.execute_task_impl(task))
209    }
210
211    fn execute_batch(
212        &self,
213        tasks: Vec<ExecutionTask>,
214    ) -> Pin<Box<dyn Future<Output = SklResult<Vec<TaskResult>>> + Send + '_>> {
215        Box::pin(async move {
216            let mut results = Vec::with_capacity(tasks.len());
217
218            for task in tasks {
219                let result = self.execute_task_impl(task).await?;
220                results.push(result);
221            }
222
223            Ok(results)
224        })
225    }
226
227    fn get_config(&self) -> StrategyConfig {
228        self.config.clone()
229    }
230
231    fn update_config(&mut self, config: StrategyConfig) -> SklResult<()> {
232        self.config = config;
233        Ok(())
234    }
235
236    fn get_metrics(&self) -> StrategyMetrics {
237        self.metrics.clone()
238    }
239
240    fn health_check(&self) -> StrategyHealth {
241        let uptime = self.start_time.elapsed();
242
243        if uptime > Duration::from_secs(24 * 3600) && self.metrics.success_rate > 0.95 {
244            StrategyHealth::Healthy
245        } else if self.metrics.success_rate > 0.8 {
246            StrategyHealth::Degraded {
247                reason: "Lower than optimal success rate".to_string(),
248            }
249        } else {
250            StrategyHealth::Unhealthy {
251                reason: "Poor success rate".to_string(),
252            }
253        }
254    }
255}
256
257/// Parallel execution strategy - executes tasks concurrently
258pub struct ParallelStrategy {
259    config: StrategyConfig,
260    metrics: StrategyMetrics,
261    max_concurrent_tasks: usize,
262    start_time: Instant,
263}
264
265impl ParallelStrategy {
266    #[must_use]
267    pub fn new(max_concurrent_tasks: usize) -> Self {
268        let config = StrategyConfig {
269            name: "parallel".to_string(),
270            parameters: HashMap::from([(
271                "max_concurrent_tasks".to_string(),
272                ParameterValue::Integer(max_concurrent_tasks as i64),
273            )]),
274            resource_allocation: StrategyResourceAllocation {
275                cpu_cores: max_concurrent_tasks as f64,
276                memory_bytes: max_concurrent_tasks as u64 * 100_000_000, // 100MB per task
277                priority: 2,
278            },
279            performance_tuning: PerformanceTuning {
280                optimization_level: OptimizationLevel::High,
281                prefetching: PrefetchingStrategy::Adaptive,
282                caching: CachingStrategy::LRU,
283                load_balancing: LoadBalancingConfig {
284                    enabled: true,
285                    algorithm: LoadBalancingAlgorithm::LeastLoaded,
286                    rebalance_threshold: 0.7,
287                    min_load_difference: 0.2,
288                },
289            },
290        };
291
292        Self {
293            config,
294            metrics: StrategyMetrics::default(),
295            max_concurrent_tasks,
296            start_time: Instant::now(),
297        }
298    }
299
300    async fn execute_task_impl(&self, task: ExecutionTask) -> SklResult<TaskResult> {
301        let start_time = Instant::now();
302
303        match task.task_type {
304            TaskType::Computation => {
305                tokio::time::sleep(Duration::from_millis(5)).await; // Faster with parallelism
306            }
307            TaskType::IoOperation => {
308                tokio::time::sleep(Duration::from_millis(25)).await;
309            }
310            TaskType::NetworkOperation => {
311                tokio::time::sleep(Duration::from_millis(50)).await;
312            }
313            TaskType::Custom => {
314                tokio::time::sleep(Duration::from_millis(12)).await;
315            }
316        }
317
318        Ok(TaskResult {
319            task_id: task.id,
320            status: TaskStatus::Completed,
321            execution_time: start_time.elapsed(),
322            resource_usage: ResourceUsage::default(),
323            output: None,
324            error: None,
325        })
326    }
327}
328
329impl ExecutionStrategy for ParallelStrategy {
330    fn name(&self) -> &'static str {
331        "parallel"
332    }
333
334    fn description(&self) -> &'static str {
335        "Parallel execution strategy - executes tasks concurrently"
336    }
337
338    fn execute_task(
339        &self,
340        task: ExecutionTask,
341    ) -> Pin<Box<dyn Future<Output = SklResult<TaskResult>> + Send + '_>> {
342        Box::pin(self.execute_task_impl(task))
343    }
344
345    fn execute_batch(
346        &self,
347        tasks: Vec<ExecutionTask>,
348    ) -> Pin<Box<dyn Future<Output = SklResult<Vec<TaskResult>>> + Send + '_>> {
349        Box::pin(async move {
350            use futures::stream::{self, StreamExt};
351
352            let results: Result<Vec<_>, _> = stream::iter(tasks)
353                .map(|task| self.execute_task_impl(task))
354                .buffer_unordered(self.max_concurrent_tasks)
355                .collect::<Vec<_>>()
356                .await
357                .into_iter()
358                .collect();
359
360            results
361        })
362    }
363
364    fn get_config(&self) -> StrategyConfig {
365        self.config.clone()
366    }
367
368    fn update_config(&mut self, config: StrategyConfig) -> SklResult<()> {
369        // Update max_concurrent_tasks if provided in parameters
370        if let Some(ParameterValue::Integer(max_tasks)) =
371            config.parameters.get("max_concurrent_tasks")
372        {
373            self.max_concurrent_tasks = *max_tasks as usize;
374        }
375
376        self.config = config;
377        Ok(())
378    }
379
380    fn get_metrics(&self) -> StrategyMetrics {
381        self.metrics.clone()
382    }
383
384    fn health_check(&self) -> StrategyHealth {
385        if self.metrics.resource_utilization.cpu_percent > 90.0 {
386            StrategyHealth::Degraded {
387                reason: "High CPU utilization".to_string(),
388            }
389        } else if self.metrics.success_rate > 0.95 {
390            StrategyHealth::Healthy
391        } else {
392            StrategyHealth::Unhealthy {
393                reason: "Poor success rate".to_string(),
394            }
395        }
396    }
397}
398
399impl Default for SequentialStrategy {
400    fn default() -> Self {
401        Self::new()
402    }
403}
404
405impl Default for ParallelStrategy {
406    fn default() -> Self {
407        Self::new(4) // Default to 4 concurrent tasks
408    }
409}