sklears_compose/execution/
engine.rs1use sklears_core::{
6 error::{Result as SklResult, SklearsError},
7 traits::Estimator,
8};
9use std::any::Any;
10use std::collections::{HashMap, VecDeque};
11use std::sync::{Arc, Mutex, RwLock};
12use std::time::{Duration, Instant};
13
14use super::config::{ExecutionEngineConfig, ResourceConstraints, StrategyConfig};
15
16pub struct ComposableExecutionEngine {
18 config: ExecutionEngineConfig,
20 strategies: HashMap<String, Box<dyn ExecutionStrategy>>,
22 resource_manager: Arc<ResourceManager>,
24 scheduler: Box<dyn TaskScheduler>,
26 context: ExecutionContext,
28 metrics: Arc<Mutex<ExecutionMetrics>>,
30}
31
32#[derive(Debug, Clone)]
34pub struct ExecutionContext {
35 pub id: String,
37 pub phase: ExecutionPhase,
39 pub metadata: HashMap<String, String>,
41 pub resource_usage: ResourceUsage,
43 pub start_time: Instant,
45}
46
/// Coarse lifecycle phase recorded in an execution context.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutionPhase {
    /// Initial phase of a newly created context.
    Initializing,
    /// Planning phase (presumably strategy/plan selection — confirm against callers).
    Planning,
    /// A strategy is running a task.
    Executing,
    /// Monitoring phase (semantics not established in this file).
    Monitoring,
    /// The most recent execution finished.
    Completed,
    /// The most recent execution failed.
    Failed,
}
63
/// Point-in-time resource usage figures.
#[derive(Debug, Clone)]
pub struct ResourceUsage {
    /// CPU usage (unit — fraction, percent or cores — is not established here; confirm).
    pub cpu_usage: f64,
    /// Memory usage (presumably bytes — confirm against producers).
    pub memory_usage: u64,
    /// Number of active tasks.
    pub active_tasks: usize,
    /// I/O operation count.
    pub io_ops: u64,
}
76
77pub trait ExecutionStrategy: Send + Sync {
79 fn name(&self) -> &str;
81
82 fn execute(
84 &self,
85 task: &dyn ExecutableTask,
86 context: &ExecutionContext,
87 ) -> SklResult<ExecutionResult>;
88
89 fn can_handle(&self, task: &dyn ExecutableTask) -> bool;
91
92 fn config(&self) -> &StrategyConfig;
94
95 fn clone_strategy(&self) -> Box<dyn ExecutionStrategy>;
97}
98
99pub trait ExecutableTask: Send + Sync {
101 fn id(&self) -> &str;
103
104 fn task_type(&self) -> &str;
106
107 fn execute(&self) -> SklResult<TaskResult>;
109
110 fn resource_estimate(&self) -> ResourceEstimate;
112
113 fn dependencies(&self) -> Vec<String>;
115}
116
117#[derive(Debug)]
119pub struct TaskResult {
120 pub task_id: String,
122 pub status: TaskStatus,
124 pub data: Option<Box<dyn Any + Send>>,
126 pub duration: Duration,
128 pub resource_usage: ResourceUsage,
130}
131
/// Status of a task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskStatus {
    /// Not started yet.
    Pending,
    /// Currently running.
    Running,
    /// Finished successfully.
    Completed,
    /// Finished unsuccessfully; the engine counts results with this status as failed tasks.
    Failed,
    /// Cancelled.
    Cancelled,
}
146
/// Resources a task expects to need, as reported by the task itself.
#[derive(Debug, Clone)]
pub struct ResourceEstimate {
    /// Estimated number of CPU cores (fractional values are representable).
    pub cpu_cores: f64,
    /// Estimated memory in bytes.
    pub memory_bytes: u64,
    /// Estimated execution time.
    pub execution_time: Duration,
    /// Estimated number of I/O operations.
    pub io_operations: u64,
}
159
160#[derive(Debug)]
162pub struct ExecutionResult {
163 pub strategy_name: String,
165 pub task_result: TaskResult,
167 pub metadata: HashMap<String, String>,
169}
170
171pub trait TaskScheduler: Send + Sync {
173 fn schedule_task(&mut self, task: Box<dyn ExecutableTask>) -> SklResult<()>;
175
176 fn next_task(&mut self) -> Option<Box<dyn ExecutableTask>>;
178
179 fn queue_size(&self) -> usize;
181
182 fn set_config(&mut self, config: SchedulerConfig);
184}
185
186#[derive(Debug, Clone)]
188pub struct SchedulerConfig {
189 pub algorithm: SchedulingAlgorithm,
191 pub priority_weights: HashMap<String, f64>,
193 pub resource_aware: bool,
195}
196
/// Scheduling algorithms selectable through a scheduler configuration.
#[derive(Debug, Clone)]
pub enum SchedulingAlgorithm {
    /// First in, first out.
    FIFO,
    /// Priority-based ordering.
    Priority,
    /// Shortest job first.
    ShortestJobFirst,
    /// Resource-aware ordering.
    ResourceAware,
    /// Deadline-aware ordering.
    DeadlineAware,
}
211
212pub struct ResourceManager {
214 allocations: Arc<RwLock<HashMap<String, ResourceAllocation>>>,
216 constraints: ResourceConstraints,
218 monitor: Arc<ResourceMonitor>,
220}
221
/// Record of resources granted to a single consumer.
#[derive(Debug, Clone)]
pub struct ResourceAllocation {
    /// Identifier of the consumer holding the allocation.
    pub consumer_id: String,
    /// Number of CPU cores granted (fractional values are representable).
    pub cpu_cores: f64,
    /// Memory granted, in bytes.
    pub memory_bytes: u64,
    /// Instant at which the allocation was created.
    pub allocated_at: Instant,
}
234
235pub struct ResourceMonitor {
237 usage_history: Arc<RwLock<VecDeque<ResourceSnapshot>>>,
239 interval: Duration,
241}
242
/// A single timestamped resource usage sample.
#[derive(Debug, Clone)]
pub struct ResourceSnapshot {
    /// Instant the sample refers to.
    pub timestamp: Instant,
    /// CPU usage (unit is not established in this file — confirm).
    pub cpu_usage: f64,
    /// Memory usage (presumably bytes — confirm).
    pub memory_usage: u64,
    /// Number of active tasks.
    pub active_tasks: usize,
    /// I/O rate (unit is not established in this file — confirm).
    pub io_rate: f64,
}
257
258#[derive(Debug, Default, Clone)]
260pub struct ExecutionMetrics {
261 pub tasks_executed: u64,
263 pub total_execution_time: Duration,
265 pub average_task_duration: Duration,
267 pub failed_tasks: u64,
269 pub resource_utilization: ResourceUtilization,
271}
272
/// Average and peak resource usage figures.
///
/// The derived `Default` starts every field at zero.
#[derive(Debug, Default, Clone)]
pub struct ResourceUtilization {
    /// Average CPU usage (unit is not established in this file — confirm).
    pub avg_cpu_usage: f64,
    /// Peak CPU usage (same unit as `avg_cpu_usage`).
    pub peak_cpu_usage: f64,
    /// Average memory usage (presumably bytes — confirm).
    pub avg_memory_usage: u64,
    /// Peak memory usage (same unit as `avg_memory_usage`).
    pub peak_memory_usage: u64,
}
285
286impl ComposableExecutionEngine {
287 pub fn new(config: ExecutionEngineConfig) -> SklResult<Self> {
289 let resource_manager = Arc::new(ResourceManager::new(config.resource_constraints.clone()));
290 let scheduler = Box::new(DefaultTaskScheduler::new());
291 let context = ExecutionContext {
292 id: format!("ctx_{}", uuid::Uuid::new_v4()),
293 phase: ExecutionPhase::Initializing,
294 metadata: HashMap::new(),
295 resource_usage: ResourceUsage {
296 cpu_usage: 0.0,
297 memory_usage: 0,
298 active_tasks: 0,
299 io_ops: 0,
300 },
301 start_time: Instant::now(),
302 };
303
304 Ok(Self {
305 config,
306 strategies: HashMap::new(),
307 resource_manager,
308 scheduler,
309 context,
310 metrics: Arc::new(Mutex::new(ExecutionMetrics::default())),
311 })
312 }
313
314 pub fn register_strategy(&mut self, strategy: Box<dyn ExecutionStrategy>) {
316 let name = strategy.name().to_string();
317 self.strategies.insert(name, strategy);
318 }
319
320 pub fn execute_task(&mut self, task: Box<dyn ExecutableTask>) -> SklResult<ExecutionResult> {
322 let strategy_name = self.select_strategy(&*task)?;
324
325 if let Some(strategy) = self.strategies.get(&strategy_name) {
326 self.context.phase = ExecutionPhase::Executing;
328
329 let result = strategy.execute(&*task, &self.context)?;
331
332 self.update_metrics(&result);
334
335 self.context.phase = ExecutionPhase::Completed;
337
338 Ok(result)
339 } else {
340 Err(SklearsError::InvalidInput(format!(
341 "Strategy '{strategy_name}' not found"
342 )))
343 }
344 }
345
346 fn select_strategy(&self, task: &dyn ExecutableTask) -> SklResult<String> {
348 let capable_strategies: Vec<_> = self
350 .strategies
351 .iter()
352 .filter(|(_, strategy)| strategy.can_handle(task))
353 .collect();
354
355 if capable_strategies.is_empty() {
356 return Err(SklearsError::InvalidInput(
357 "No capable strategy found for task".to_string(),
358 ));
359 }
360
361 Ok(capable_strategies[0].0.clone())
364 }
365
366 fn update_metrics(&self, result: &ExecutionResult) {
368 if let Ok(mut metrics) = self.metrics.lock() {
369 metrics.tasks_executed += 1;
370 metrics.total_execution_time += result.task_result.duration;
371
372 if result.task_result.status == TaskStatus::Failed {
373 metrics.failed_tasks += 1;
374 }
375
376 metrics.average_task_duration = Duration::from_nanos(
378 metrics.total_execution_time.as_nanos() as u64 / metrics.tasks_executed,
379 );
380 }
381 }
382
383 #[must_use]
385 pub fn metrics(&self) -> ExecutionMetrics {
386 self.metrics.lock().unwrap().clone()
387 }
388
389 #[must_use]
391 pub fn config(&self) -> &ExecutionEngineConfig {
392 &self.config
393 }
394}
395
396impl ResourceManager {
397 #[must_use]
399 pub fn new(constraints: ResourceConstraints) -> Self {
400 Self {
401 allocations: Arc::new(RwLock::new(HashMap::new())),
402 constraints,
403 monitor: Arc::new(ResourceMonitor::new(Duration::from_secs(1))),
404 }
405 }
406
407 pub fn allocate_resources(
409 &self,
410 consumer_id: String,
411 request: ResourceRequest,
412 ) -> SklResult<ResourceAllocation> {
413 let allocation = ResourceAllocation {
414 consumer_id: consumer_id.clone(),
415 cpu_cores: request.cpu_cores,
416 memory_bytes: request.memory_bytes,
417 allocated_at: Instant::now(),
418 };
419
420 if let Ok(mut allocations) = self.allocations.write() {
421 allocations.insert(consumer_id, allocation.clone());
422 }
423
424 Ok(allocation)
425 }
426}
427
/// Resources a consumer asks the resource manager for.
#[derive(Debug, Clone)]
pub struct ResourceRequest {
    /// Requested number of CPU cores (fractional values are representable).
    pub cpu_cores: f64,
    /// Requested memory in bytes.
    pub memory_bytes: u64,
    /// Priority of the request (ordering — higher vs. lower wins — is not established here).
    pub priority: u32,
}
438
439impl ResourceMonitor {
440 #[must_use]
442 pub fn new(interval: Duration) -> Self {
443 Self {
444 usage_history: Arc::new(RwLock::new(VecDeque::new())),
445 interval,
446 }
447 }
448}
449
450pub struct DefaultTaskScheduler {
452 queue: VecDeque<Box<dyn ExecutableTask>>,
454 config: SchedulerConfig,
456}
457
458impl Default for DefaultTaskScheduler {
459 fn default() -> Self {
460 Self::new()
461 }
462}
463
464impl DefaultTaskScheduler {
465 #[must_use]
467 pub fn new() -> Self {
468 Self {
469 queue: VecDeque::new(),
470 config: SchedulerConfig {
471 algorithm: SchedulingAlgorithm::FIFO,
472 priority_weights: HashMap::new(),
473 resource_aware: false,
474 },
475 }
476 }
477}
478
479impl TaskScheduler for DefaultTaskScheduler {
480 fn schedule_task(&mut self, task: Box<dyn ExecutableTask>) -> SklResult<()> {
481 self.queue.push_back(task);
482 Ok(())
483 }
484
485 fn next_task(&mut self) -> Option<Box<dyn ExecutableTask>> {
486 self.queue.pop_front()
487 }
488
489 fn queue_size(&self) -> usize {
490 self.queue.len()
491 }
492
493 fn set_config(&mut self, config: SchedulerConfig) {
494 self.config = config;
495 }
496}