sklears_compose/execution/
strategies.rs1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::time::{Duration, Instant, SystemTime};
10
11use sklears_core::error::Result as SklResult;
12
13use super::config::{
14 CachingStrategy, LoadBalancingAlgorithm, LoadBalancingConfig, OptimizationLevel,
15 ParameterValue, PerformanceTuning, PrefetchingStrategy, StrategyConfig,
16 StrategyResourceAllocation,
17};
18use super::resources::ResourceUtilization;
19use super::tasks::{ExecutionTask, ResourceUsage, TaskResult, TaskStatus, TaskType};
20
/// Common interface for task execution strategies.
///
/// Implementors must be `Send + Sync`. The `execute_*` methods return boxed,
/// pinned, `Send` futures whose lifetime is tied to the `&self` borrow, so the
/// strategy must outlive the returned future.
pub trait ExecutionStrategy: Send + Sync {
    /// Returns the name of the strategy.
    fn name(&self) -> &str;

    /// Returns a human-readable description of the strategy.
    fn description(&self) -> &str;

    /// Executes a single task.
    ///
    /// The returned future resolves to the task's [`TaskResult`] or to an error.
    fn execute_task(
        &self,
        task: ExecutionTask,
    ) -> Pin<Box<dyn Future<Output = SklResult<TaskResult>> + Send + '_>>;

    /// Executes a batch of tasks.
    ///
    /// The returned future resolves to a vector of [`TaskResult`]s or to an
    /// error. The trait itself does not state whether results are returned in
    /// input order; that is up to the implementor.
    fn execute_batch(
        &self,
        tasks: Vec<ExecutionTask>,
    ) -> Pin<Box<dyn Future<Output = SklResult<Vec<TaskResult>>> + Send + '_>>;

    /// Returns the strategy's configuration by value.
    fn get_config(&self) -> StrategyConfig;

    /// Applies a new configuration to the strategy.
    ///
    /// # Errors
    ///
    /// Implementors may return an error if the configuration is rejected.
    fn update_config(&mut self, config: StrategyConfig) -> SklResult<()>;

    /// Returns the strategy's metrics by value.
    fn get_metrics(&self) -> StrategyMetrics;

    /// Reports the strategy's current health.
    fn health_check(&self) -> StrategyHealth;
}
53
/// Execution metrics reported by a strategy through
/// [`ExecutionStrategy::get_metrics`].
#[derive(Debug, Clone)]
pub struct StrategyMetrics {
    /// Number of tasks executed.
    pub tasks_executed: u64,
    /// Total time spent executing tasks.
    pub total_execution_time: Duration,
    /// Average execution time per task.
    pub average_execution_time: Duration,
    /// Success rate. The `Default` value is `1.0`, and the health checks in
    /// this file compare it against fractional thresholds such as `0.95`.
    pub success_rate: f64,
    /// Number of errors recorded.
    pub error_count: u64,
    /// Resource utilization associated with these metrics.
    pub resource_utilization: ResourceUtilization,
    /// Time at which these metrics were last updated.
    pub last_updated: SystemTime,
}
72
73impl Default for StrategyMetrics {
74 fn default() -> Self {
75 Self {
76 tasks_executed: 0,
77 total_execution_time: Duration::ZERO,
78 average_execution_time: Duration::ZERO,
79 success_rate: 1.0,
80 error_count: 0,
81 resource_utilization: ResourceUtilization::default(),
82 last_updated: SystemTime::now(),
83 }
84 }
85}
86
/// Health status reported by [`ExecutionStrategy::health_check`].
#[derive(Debug, Clone, PartialEq)]
pub enum StrategyHealth {
    /// The strategy is operating normally.
    Healthy,
    /// The strategy is operating with reduced quality; `reason` explains why.
    Degraded { reason: String },
    /// The strategy is not operating acceptably; `reason` explains why.
    Unhealthy { reason: String },
    /// The health of the strategy could not be determined.
    Unknown,
}
99
/// Execution strategy that runs the tasks of a batch one after another.
pub struct SequentialStrategy {
    /// Current configuration; replaced wholesale by `update_config`.
    config: StrategyConfig,
    /// Metrics returned by `get_metrics`.
    // NOTE(review): no code visible in this file mutates `metrics` after
    // construction - confirm whether it is updated elsewhere.
    metrics: StrategyMetrics,
    /// Instant captured when the strategy was constructed.
    start_time: Instant,
}
106
107impl SequentialStrategy {
108 #[must_use]
109 pub fn new() -> Self {
110 let config = StrategyConfig {
111 name: "sequential".to_string(),
112 parameters: HashMap::new(),
113 resource_allocation: StrategyResourceAllocation {
114 cpu_cores: 1.0,
115 memory_bytes: 100_000_000, priority: 1,
117 },
118 performance_tuning: PerformanceTuning {
119 optimization_level: OptimizationLevel::Low,
120 prefetching: PrefetchingStrategy::None,
121 caching: CachingStrategy::None,
122 load_balancing: LoadBalancingConfig {
123 enabled: false,
124 algorithm: LoadBalancingAlgorithm::RoundRobin,
125 rebalance_threshold: 0.8,
126 min_load_difference: 0.1,
127 },
128 },
129 };
130
131 Self {
132 config,
133 metrics: StrategyMetrics::default(),
134 start_time: Instant::now(),
135 }
136 }
137
138 async fn execute_task_impl(&self, task: ExecutionTask) -> SklResult<TaskResult> {
139 let start_time = Instant::now();
140
141 match task.task_type {
143 TaskType::Computation => {
144 tokio::time::sleep(Duration::from_millis(10)).await;
146 Ok(TaskResult {
147 task_id: task.id,
148 status: TaskStatus::Completed,
149 execution_time: start_time.elapsed(),
150 resource_usage: ResourceUsage::default(),
151 output: None,
152 error: None,
153 })
154 }
155 TaskType::IoOperation => {
156 tokio::time::sleep(Duration::from_millis(50)).await;
158 Ok(TaskResult {
159 task_id: task.id,
160 status: TaskStatus::Completed,
161 execution_time: start_time.elapsed(),
162 resource_usage: ResourceUsage::default(),
163 output: None,
164 error: None,
165 })
166 }
167 TaskType::NetworkOperation => {
168 tokio::time::sleep(Duration::from_millis(100)).await;
170 Ok(TaskResult {
171 task_id: task.id,
172 status: TaskStatus::Completed,
173 execution_time: start_time.elapsed(),
174 resource_usage: ResourceUsage::default(),
175 output: None,
176 error: None,
177 })
178 }
179 TaskType::Custom => {
180 tokio::time::sleep(Duration::from_millis(25)).await;
182 Ok(TaskResult {
183 task_id: task.id,
184 status: TaskStatus::Completed,
185 execution_time: start_time.elapsed(),
186 resource_usage: ResourceUsage::default(),
187 output: None,
188 error: None,
189 })
190 }
191 }
192 }
193}
194
195impl ExecutionStrategy for SequentialStrategy {
196 fn name(&self) -> &'static str {
197 "sequential"
198 }
199
200 fn description(&self) -> &'static str {
201 "Sequential execution strategy - executes tasks one by one"
202 }
203
204 fn execute_task(
205 &self,
206 task: ExecutionTask,
207 ) -> Pin<Box<dyn Future<Output = SklResult<TaskResult>> + Send + '_>> {
208 Box::pin(self.execute_task_impl(task))
209 }
210
211 fn execute_batch(
212 &self,
213 tasks: Vec<ExecutionTask>,
214 ) -> Pin<Box<dyn Future<Output = SklResult<Vec<TaskResult>>> + Send + '_>> {
215 Box::pin(async move {
216 let mut results = Vec::with_capacity(tasks.len());
217
218 for task in tasks {
219 let result = self.execute_task_impl(task).await?;
220 results.push(result);
221 }
222
223 Ok(results)
224 })
225 }
226
227 fn get_config(&self) -> StrategyConfig {
228 self.config.clone()
229 }
230
231 fn update_config(&mut self, config: StrategyConfig) -> SklResult<()> {
232 self.config = config;
233 Ok(())
234 }
235
236 fn get_metrics(&self) -> StrategyMetrics {
237 self.metrics.clone()
238 }
239
240 fn health_check(&self) -> StrategyHealth {
241 let uptime = self.start_time.elapsed();
242
243 if uptime > Duration::from_secs(24 * 3600) && self.metrics.success_rate > 0.95 {
244 StrategyHealth::Healthy
245 } else if self.metrics.success_rate > 0.8 {
246 StrategyHealth::Degraded {
247 reason: "Lower than optimal success rate".to_string(),
248 }
249 } else {
250 StrategyHealth::Unhealthy {
251 reason: "Poor success rate".to_string(),
252 }
253 }
254 }
255}
256
/// Execution strategy that runs the tasks of a batch concurrently, up to a
/// configurable limit.
pub struct ParallelStrategy {
    /// Current configuration; replaced by `update_config`.
    config: StrategyConfig,
    /// Metrics returned by `get_metrics`.
    // NOTE(review): no code visible in this file mutates `metrics` after
    // construction - confirm whether it is updated elsewhere.
    metrics: StrategyMetrics,
    /// Concurrency limit used by `execute_batch`.
    max_concurrent_tasks: usize,
    /// Instant captured when the strategy was constructed.
    start_time: Instant,
}
264
265impl ParallelStrategy {
266 #[must_use]
267 pub fn new(max_concurrent_tasks: usize) -> Self {
268 let config = StrategyConfig {
269 name: "parallel".to_string(),
270 parameters: HashMap::from([(
271 "max_concurrent_tasks".to_string(),
272 ParameterValue::Integer(max_concurrent_tasks as i64),
273 )]),
274 resource_allocation: StrategyResourceAllocation {
275 cpu_cores: max_concurrent_tasks as f64,
276 memory_bytes: max_concurrent_tasks as u64 * 100_000_000, priority: 2,
278 },
279 performance_tuning: PerformanceTuning {
280 optimization_level: OptimizationLevel::High,
281 prefetching: PrefetchingStrategy::Adaptive,
282 caching: CachingStrategy::LRU,
283 load_balancing: LoadBalancingConfig {
284 enabled: true,
285 algorithm: LoadBalancingAlgorithm::LeastLoaded,
286 rebalance_threshold: 0.7,
287 min_load_difference: 0.2,
288 },
289 },
290 };
291
292 Self {
293 config,
294 metrics: StrategyMetrics::default(),
295 max_concurrent_tasks,
296 start_time: Instant::now(),
297 }
298 }
299
300 async fn execute_task_impl(&self, task: ExecutionTask) -> SklResult<TaskResult> {
301 let start_time = Instant::now();
302
303 match task.task_type {
304 TaskType::Computation => {
305 tokio::time::sleep(Duration::from_millis(5)).await; }
307 TaskType::IoOperation => {
308 tokio::time::sleep(Duration::from_millis(25)).await;
309 }
310 TaskType::NetworkOperation => {
311 tokio::time::sleep(Duration::from_millis(50)).await;
312 }
313 TaskType::Custom => {
314 tokio::time::sleep(Duration::from_millis(12)).await;
315 }
316 }
317
318 Ok(TaskResult {
319 task_id: task.id,
320 status: TaskStatus::Completed,
321 execution_time: start_time.elapsed(),
322 resource_usage: ResourceUsage::default(),
323 output: None,
324 error: None,
325 })
326 }
327}
328
329impl ExecutionStrategy for ParallelStrategy {
330 fn name(&self) -> &'static str {
331 "parallel"
332 }
333
334 fn description(&self) -> &'static str {
335 "Parallel execution strategy - executes tasks concurrently"
336 }
337
338 fn execute_task(
339 &self,
340 task: ExecutionTask,
341 ) -> Pin<Box<dyn Future<Output = SklResult<TaskResult>> + Send + '_>> {
342 Box::pin(self.execute_task_impl(task))
343 }
344
345 fn execute_batch(
346 &self,
347 tasks: Vec<ExecutionTask>,
348 ) -> Pin<Box<dyn Future<Output = SklResult<Vec<TaskResult>>> + Send + '_>> {
349 Box::pin(async move {
350 use futures::stream::{self, StreamExt};
351
352 let results: Result<Vec<_>, _> = stream::iter(tasks)
353 .map(|task| self.execute_task_impl(task))
354 .buffer_unordered(self.max_concurrent_tasks)
355 .collect::<Vec<_>>()
356 .await
357 .into_iter()
358 .collect();
359
360 results
361 })
362 }
363
364 fn get_config(&self) -> StrategyConfig {
365 self.config.clone()
366 }
367
368 fn update_config(&mut self, config: StrategyConfig) -> SklResult<()> {
369 if let Some(ParameterValue::Integer(max_tasks)) =
371 config.parameters.get("max_concurrent_tasks")
372 {
373 self.max_concurrent_tasks = *max_tasks as usize;
374 }
375
376 self.config = config;
377 Ok(())
378 }
379
380 fn get_metrics(&self) -> StrategyMetrics {
381 self.metrics.clone()
382 }
383
384 fn health_check(&self) -> StrategyHealth {
385 if self.metrics.resource_utilization.cpu_percent > 90.0 {
386 StrategyHealth::Degraded {
387 reason: "High CPU utilization".to_string(),
388 }
389 } else if self.metrics.success_rate > 0.95 {
390 StrategyHealth::Healthy
391 } else {
392 StrategyHealth::Unhealthy {
393 reason: "Poor success rate".to_string(),
394 }
395 }
396 }
397}
398
399impl Default for SequentialStrategy {
400 fn default() -> Self {
401 Self::new()
402 }
403}
404
405impl Default for ParallelStrategy {
406 fn default() -> Self {
407 Self::new(4) }
409}