sklears_compose/execution/
engine.rs1use sklears_core::{
6 error::{Result as SklResult, SklearsError},
7 traits::Estimator,
8};
9use std::any::Any;
10use std::collections::{HashMap, VecDeque};
11use std::sync::{Arc, Mutex, RwLock};
12use std::time::{Duration, Instant};
13
14use super::config::{ExecutionEngineConfig, ResourceConstraints, StrategyConfig};
15
16pub struct ComposableExecutionEngine {
18 config: ExecutionEngineConfig,
20 strategies: HashMap<String, Box<dyn ExecutionStrategy>>,
22 resource_manager: Arc<ResourceManager>,
24 scheduler: Box<dyn TaskScheduler>,
26 context: ExecutionContext,
28 metrics: Arc<Mutex<ExecutionMetrics>>,
30}
31
32#[derive(Debug, Clone)]
34pub struct ExecutionContext {
35 pub id: String,
37 pub phase: ExecutionPhase,
39 pub metadata: HashMap<String, String>,
41 pub resource_usage: ResourceUsage,
43 pub start_time: Instant,
45}
46
47#[derive(Debug, Clone, PartialEq)]
49pub enum ExecutionPhase {
50 Initializing,
52 Planning,
54 Executing,
56 Monitoring,
58 Completed,
60 Failed,
62}
63
64#[derive(Debug, Clone)]
66pub struct ResourceUsage {
67 pub cpu_usage: f64,
69 pub memory_usage: u64,
71 pub active_tasks: usize,
73 pub io_ops: u64,
75}
76
77pub trait ExecutionStrategy: Send + Sync {
79 fn name(&self) -> &str;
81
82 fn execute(
84 &self,
85 task: &dyn ExecutableTask,
86 context: &ExecutionContext,
87 ) -> SklResult<ExecutionResult>;
88
89 fn can_handle(&self, task: &dyn ExecutableTask) -> bool;
91
92 fn config(&self) -> &StrategyConfig;
94
95 fn clone_strategy(&self) -> Box<dyn ExecutionStrategy>;
97}
98
99pub trait ExecutableTask: Send + Sync {
101 fn id(&self) -> &str;
103
104 fn task_type(&self) -> &str;
106
107 fn execute(&self) -> SklResult<TaskResult>;
109
110 fn resource_estimate(&self) -> ResourceEstimate;
112
113 fn dependencies(&self) -> Vec<String>;
115}
116
117#[derive(Debug)]
119pub struct TaskResult {
120 pub task_id: String,
122 pub status: TaskStatus,
124 pub data: Option<Box<dyn Any + Send>>,
126 pub duration: Duration,
128 pub resource_usage: ResourceUsage,
130}
131
132#[derive(Debug, Clone, PartialEq)]
134pub enum TaskStatus {
135 Pending,
137 Running,
139 Completed,
141 Failed,
143 Cancelled,
145}
146
147#[derive(Debug, Clone)]
149pub struct ResourceEstimate {
150 pub cpu_cores: f64,
152 pub memory_bytes: u64,
154 pub execution_time: Duration,
156 pub io_operations: u64,
158}
159
160#[derive(Debug)]
162pub struct ExecutionResult {
163 pub strategy_name: String,
165 pub task_result: TaskResult,
167 pub metadata: HashMap<String, String>,
169}
170
171pub trait TaskScheduler: Send + Sync {
173 fn schedule_task(&mut self, task: Box<dyn ExecutableTask>) -> SklResult<()>;
175
176 fn next_task(&mut self) -> Option<Box<dyn ExecutableTask>>;
178
179 fn queue_size(&self) -> usize;
181
182 fn set_config(&mut self, config: SchedulerConfig);
184}
185
186#[derive(Debug, Clone)]
188pub struct SchedulerConfig {
189 pub algorithm: SchedulingAlgorithm,
191 pub priority_weights: HashMap<String, f64>,
193 pub resource_aware: bool,
195}
196
197#[derive(Debug, Clone)]
199pub enum SchedulingAlgorithm {
200 FIFO,
202 Priority,
204 ShortestJobFirst,
206 ResourceAware,
208 DeadlineAware,
210}
211
212pub struct ResourceManager {
214 allocations: Arc<RwLock<HashMap<String, ResourceAllocation>>>,
216 constraints: ResourceConstraints,
218 monitor: Arc<ResourceMonitor>,
220}
221
222#[derive(Debug, Clone)]
224pub struct ResourceAllocation {
225 pub consumer_id: String,
227 pub cpu_cores: f64,
229 pub memory_bytes: u64,
231 pub allocated_at: Instant,
233}
234
235pub struct ResourceMonitor {
237 usage_history: Arc<RwLock<VecDeque<ResourceSnapshot>>>,
239 interval: Duration,
241}
242
243#[derive(Debug, Clone)]
245pub struct ResourceSnapshot {
246 pub timestamp: Instant,
248 pub cpu_usage: f64,
250 pub memory_usage: u64,
252 pub active_tasks: usize,
254 pub io_rate: f64,
256}
257
258#[derive(Debug, Default, Clone)]
260pub struct ExecutionMetrics {
261 pub tasks_executed: u64,
263 pub total_execution_time: Duration,
265 pub average_task_duration: Duration,
267 pub failed_tasks: u64,
269 pub resource_utilization: ResourceUtilization,
271}
272
273#[derive(Debug, Default, Clone)]
275pub struct ResourceUtilization {
276 pub avg_cpu_usage: f64,
278 pub peak_cpu_usage: f64,
280 pub avg_memory_usage: u64,
282 pub peak_memory_usage: u64,
284}
285
286impl ComposableExecutionEngine {
287 pub fn new(config: ExecutionEngineConfig) -> SklResult<Self> {
289 let resource_manager = Arc::new(ResourceManager::new(config.resource_constraints.clone()));
290 let scheduler = Box::new(DefaultTaskScheduler::new());
291 let context = ExecutionContext {
292 id: format!("ctx_{}", uuid::Uuid::new_v4()),
293 phase: ExecutionPhase::Initializing,
294 metadata: HashMap::new(),
295 resource_usage: ResourceUsage {
296 cpu_usage: 0.0,
297 memory_usage: 0,
298 active_tasks: 0,
299 io_ops: 0,
300 },
301 start_time: Instant::now(),
302 };
303
304 Ok(Self {
305 config,
306 strategies: HashMap::new(),
307 resource_manager,
308 scheduler,
309 context,
310 metrics: Arc::new(Mutex::new(ExecutionMetrics::default())),
311 })
312 }
313
314 pub fn register_strategy(&mut self, strategy: Box<dyn ExecutionStrategy>) {
316 let name = strategy.name().to_string();
317 self.strategies.insert(name, strategy);
318 }
319
320 pub fn execute_task(&mut self, task: Box<dyn ExecutableTask>) -> SklResult<ExecutionResult> {
322 let strategy_name = self.select_strategy(&*task)?;
324
325 if let Some(strategy) = self.strategies.get(&strategy_name) {
326 self.context.phase = ExecutionPhase::Executing;
328
329 let result = strategy.execute(&*task, &self.context)?;
331
332 self.update_metrics(&result);
334
335 self.context.phase = ExecutionPhase::Completed;
337
338 Ok(result)
339 } else {
340 Err(SklearsError::InvalidInput(format!(
341 "Strategy '{strategy_name}' not found"
342 )))
343 }
344 }
345
346 fn select_strategy(&self, task: &dyn ExecutableTask) -> SklResult<String> {
348 let capable_strategies: Vec<_> = self
350 .strategies
351 .iter()
352 .filter(|(_, strategy)| strategy.can_handle(task))
353 .collect();
354
355 if capable_strategies.is_empty() {
356 return Err(SklearsError::InvalidInput(
357 "No capable strategy found for task".to_string(),
358 ));
359 }
360
361 Ok(capable_strategies[0].0.clone())
364 }
365
366 fn update_metrics(&self, result: &ExecutionResult) {
368 if let Ok(mut metrics) = self.metrics.lock() {
369 metrics.tasks_executed += 1;
370 metrics.total_execution_time += result.task_result.duration;
371
372 if result.task_result.status == TaskStatus::Failed {
373 metrics.failed_tasks += 1;
374 }
375
376 metrics.average_task_duration = Duration::from_nanos(
378 metrics.total_execution_time.as_nanos() as u64 / metrics.tasks_executed,
379 );
380 }
381 }
382
383 #[must_use]
385 pub fn metrics(&self) -> ExecutionMetrics {
386 self.metrics
387 .lock()
388 .unwrap_or_else(|e| e.into_inner())
389 .clone()
390 }
391
392 #[must_use]
394 pub fn config(&self) -> &ExecutionEngineConfig {
395 &self.config
396 }
397}
398
399impl ResourceManager {
400 #[must_use]
402 pub fn new(constraints: ResourceConstraints) -> Self {
403 Self {
404 allocations: Arc::new(RwLock::new(HashMap::new())),
405 constraints,
406 monitor: Arc::new(ResourceMonitor::new(Duration::from_secs(1))),
407 }
408 }
409
410 pub fn allocate_resources(
412 &self,
413 consumer_id: String,
414 request: ResourceRequest,
415 ) -> SklResult<ResourceAllocation> {
416 let allocation = ResourceAllocation {
417 consumer_id: consumer_id.clone(),
418 cpu_cores: request.cpu_cores,
419 memory_bytes: request.memory_bytes,
420 allocated_at: Instant::now(),
421 };
422
423 if let Ok(mut allocations) = self.allocations.write() {
424 allocations.insert(consumer_id, allocation.clone());
425 }
426
427 Ok(allocation)
428 }
429}
430
431#[derive(Debug, Clone)]
433pub struct ResourceRequest {
434 pub cpu_cores: f64,
436 pub memory_bytes: u64,
438 pub priority: u32,
440}
441
442impl ResourceMonitor {
443 #[must_use]
445 pub fn new(interval: Duration) -> Self {
446 Self {
447 usage_history: Arc::new(RwLock::new(VecDeque::new())),
448 interval,
449 }
450 }
451}
452
453pub struct DefaultTaskScheduler {
455 queue: VecDeque<Box<dyn ExecutableTask>>,
457 config: SchedulerConfig,
459}
460
461impl Default for DefaultTaskScheduler {
462 fn default() -> Self {
463 Self::new()
464 }
465}
466
467impl DefaultTaskScheduler {
468 #[must_use]
470 pub fn new() -> Self {
471 Self {
472 queue: VecDeque::new(),
473 config: SchedulerConfig {
474 algorithm: SchedulingAlgorithm::FIFO,
475 priority_weights: HashMap::new(),
476 resource_aware: false,
477 },
478 }
479 }
480}
481
482impl TaskScheduler for DefaultTaskScheduler {
483 fn schedule_task(&mut self, task: Box<dyn ExecutableTask>) -> SklResult<()> {
484 self.queue.push_back(task);
485 Ok(())
486 }
487
488 fn next_task(&mut self) -> Option<Box<dyn ExecutableTask>> {
489 self.queue.pop_front()
490 }
491
492 fn queue_size(&self) -> usize {
493 self.queue.len()
494 }
495
496 fn set_config(&mut self, config: SchedulerConfig) {
497 self.config = config;
498 }
499}