sklears_compose/execution/
tasks.rs

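//! Task definitions and management for the composable execution engine.
//!
//! This module defines the core task data structures ([`ExecutionTask`], its
//! metadata, resource requirements, configuration, and result types), a
//! fluent [`TaskBuilder`] for constructing tasks, and a [`TaskQueue`] for
//! holding tasks that are waiting to run.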
5
6use std::any::Any;
7use std::collections::HashMap;
8use std::time::{Duration, SystemTime};
9
10use sklears_core::error::{Result as SklResult, SklearsError};
11
12/// Core execution task structure
13#[derive(Debug)]
14pub struct ExecutionTask {
15    /// Unique task identifier
16    pub id: String,
17    /// Type of task
18    pub task_type: TaskType,
19    /// Task metadata
20    pub metadata: TaskMetadata,
21    /// Resource requirements
22    pub requirements: ResourceRequirements,
23    /// Input data for the task
24    pub input_data: Option<Box<dyn Any + Send + Sync>>,
25    /// Task configuration
26    pub configuration: TaskConfiguration,
27}
28
/// Categories of work the execution engine distinguishes between.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum TaskType {
    /// Work that is primarily computation.
    Computation,
    /// Work that performs I/O operations.
    IoOperation,
    /// Work that performs network operations.
    NetworkOperation,
    /// Any task that does not fit one of the built-in categories.
    Custom,
}
41
42/// Task metadata for tracking and management
43#[derive(Debug, Clone)]
44pub struct TaskMetadata {
45    /// Human-readable task name
46    pub name: String,
47    /// Task description
48    pub description: String,
49    /// Task priority level
50    pub priority: TaskPriority,
51    /// Estimated execution duration
52    pub estimated_duration: Option<Duration>,
53    /// Task deadline
54    pub deadline: Option<SystemTime>,
55    /// Task dependencies (task IDs)
56    pub dependencies: Vec<String>,
57    /// Custom tags for categorization
58    pub tags: Vec<String>,
59    /// Task creation timestamp
60    pub created_at: SystemTime,
61}
62
/// Priority levels a task can carry.
///
/// Variants are declared from least to most urgent, so the derived ordering
/// yields `Low < Normal < High < Critical`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TaskPriority {
    /// Lowest urgency.
    Low,
    /// Ordinary urgency.
    Normal,
    /// Elevated urgency.
    High,
    /// Highest urgency.
    Critical,
}
75
/// Resources a task asks for in order to run.
#[derive(Clone, Debug)]
pub struct ResourceRequirements {
    /// Number of CPU cores requested (a float, so fractional values can be
    /// expressed).
    pub cpu_cores: f64,
    /// Memory requested, in bytes.
    pub memory_bytes: u64,
    /// Disk space requested, in bytes.
    pub disk_bytes: u64,
    /// Network bandwidth requested, in bytes per second.
    pub network_bandwidth: u64,
    /// GPU memory requested, in bytes.
    pub gpu_memory_bytes: u64,
    /// Names of any additional special resources the task needs.
    pub special_resources: Vec<String>,
}
92
93impl Default for ResourceRequirements {
94    fn default() -> Self {
95        Self {
96            cpu_cores: 1.0,
97            memory_bytes: 100_000_000, // 100MB
98            disk_bytes: 0,
99            network_bandwidth: 0,
100            gpu_memory_bytes: 0,
101            special_resources: Vec::new(),
102        }
103    }
104}
105
106/// Task configuration parameters
107#[derive(Debug, Clone)]
108pub struct TaskConfiguration {
109    /// Configuration parameters
110    pub parameters: HashMap<String, ConfigValue>,
111    /// Environment variables
112    pub environment: HashMap<String, String>,
113    /// Working directory
114    pub working_directory: Option<String>,
115    /// Timeout configuration
116    pub timeout: Option<Duration>,
117}
118
119impl Default for TaskConfiguration {
120    fn default() -> Self {
121        Self {
122            parameters: HashMap::new(),
123            environment: HashMap::new(),
124            working_directory: None,
125            timeout: Some(Duration::from_secs(3600)), // 1 hour default
126        }
127    }
128}
129
/// A dynamically typed configuration value.
///
/// `List` and `Object` hold further `ConfigValue`s, so values can be nested.
#[derive(Clone, Debug)]
pub enum ConfigValue {
    /// Text.
    String(String),
    /// Signed 64-bit integer.
    Integer(i64),
    /// 64-bit floating-point number.
    Float(f64),
    /// `true` or `false`.
    Boolean(bool),
    /// A span of time.
    Duration(Duration),
    /// Ordered sequence of values.
    List(Vec<ConfigValue>),
    /// Nested configuration keyed by name.
    Object(HashMap<String, ConfigValue>),
}
148
149/// Task execution result
150#[derive(Debug)]
151pub struct TaskResult {
152    /// Task identifier
153    pub task_id: String,
154    /// Execution status
155    pub status: TaskStatus,
156    /// Execution time
157    pub execution_time: Duration,
158    /// Resource usage during execution
159    pub resource_usage: ResourceUsage,
160    /// Output data
161    pub output: Option<Box<dyn Any + Send + Sync>>,
162    /// Error information if failed
163    pub error: Option<TaskError>,
164}
165
/// Lifecycle state of a task.
///
/// Derives `Eq` and `Hash` in addition to `PartialEq`, matching the other
/// fieldless enums in this module, so statuses can be used as map keys or set
/// members (for example to count tasks per state).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TaskStatus {
    /// Task is pending execution
    Pending,
    /// Task is currently running
    Running,
    /// Task completed successfully
    Completed,
    /// Task failed with error
    Failed,
    /// Task was cancelled
    Cancelled,
    /// Task timed out
    TimedOut,
}
182
/// Resources consumed while a task ran.
#[derive(Clone, Debug)]
pub struct ResourceUsage {
    /// Peak CPU usage, as a percentage.
    pub cpu_percent: f64,
    /// Peak memory usage, in bytes.
    pub memory_bytes: u64,
    /// Total number of I/O operations.
    pub io_operations: u64,
    /// Total number of bytes transferred over the network.
    pub network_bytes: u64,
    /// Peak GPU memory usage, in bytes.
    pub gpu_memory_bytes: u64,
    /// How long the execution lasted.
    pub execution_duration: Duration,
}
199
200impl Default for ResourceUsage {
201    fn default() -> Self {
202        Self {
203            cpu_percent: 0.0,
204            memory_bytes: 0,
205            io_operations: 0,
206            network_bytes: 0,
207            gpu_memory_bytes: 0,
208            execution_duration: Duration::ZERO,
209        }
210    }
211}
212
/// Description of why a task failed.
#[derive(Clone, Debug)]
pub struct TaskError {
    /// Category or type name of the error.
    pub error_type: String,
    /// Message describing the error.
    pub message: String,
    /// Numeric error code, when one applies.
    pub code: Option<i32>,
    /// Stack trace text, when one is available.
    pub stack_trace: Option<String>,
    /// Suggestions for recovering from the error.
    pub recovery_suggestions: Vec<String>,
}
227
228/// Task builder for convenient task creation
229pub struct TaskBuilder {
230    task: ExecutionTask,
231}
232
233impl TaskBuilder {
234    /// Create a new task builder
235    #[must_use]
236    pub fn new(id: String, name: String) -> Self {
237        Self {
238            task: ExecutionTask {
239                id,
240                task_type: TaskType::Computation,
241                metadata: TaskMetadata {
242                    name,
243                    description: String::new(),
244                    priority: TaskPriority::Normal,
245                    estimated_duration: None,
246                    deadline: None,
247                    dependencies: Vec::new(),
248                    tags: Vec::new(),
249                    created_at: SystemTime::now(),
250                },
251                requirements: ResourceRequirements::default(),
252                input_data: None,
253                configuration: TaskConfiguration::default(),
254            },
255        }
256    }
257
258    /// Set task type
259    #[must_use]
260    pub fn task_type(mut self, task_type: TaskType) -> Self {
261        self.task.task_type = task_type;
262        self
263    }
264
265    /// Set task description
266    #[must_use]
267    pub fn description(mut self, description: String) -> Self {
268        self.task.metadata.description = description;
269        self
270    }
271
272    /// Set task priority
273    #[must_use]
274    pub fn priority(mut self, priority: TaskPriority) -> Self {
275        self.task.metadata.priority = priority;
276        self
277    }
278
279    /// Set estimated duration
280    #[must_use]
281    pub fn estimated_duration(mut self, duration: Duration) -> Self {
282        self.task.metadata.estimated_duration = Some(duration);
283        self
284    }
285
286    /// Set task deadline
287    #[must_use]
288    pub fn deadline(mut self, deadline: SystemTime) -> Self {
289        self.task.metadata.deadline = Some(deadline);
290        self
291    }
292
293    /// Add dependency
294    #[must_use]
295    pub fn dependency(mut self, task_id: String) -> Self {
296        self.task.metadata.dependencies.push(task_id);
297        self
298    }
299
300    /// Add tag
301    #[must_use]
302    pub fn tag(mut self, tag: String) -> Self {
303        self.task.metadata.tags.push(tag);
304        self
305    }
306
307    /// Set CPU requirements
308    #[must_use]
309    pub fn cpu_cores(mut self, cores: f64) -> Self {
310        self.task.requirements.cpu_cores = cores;
311        self
312    }
313
314    /// Set memory requirements
315    #[must_use]
316    pub fn memory_bytes(mut self, bytes: u64) -> Self {
317        self.task.requirements.memory_bytes = bytes;
318        self
319    }
320
321    /// Set disk requirements
322    #[must_use]
323    pub fn disk_bytes(mut self, bytes: u64) -> Self {
324        self.task.requirements.disk_bytes = bytes;
325        self
326    }
327
328    /// Set network bandwidth requirements
329    #[must_use]
330    pub fn network_bandwidth(mut self, bandwidth: u64) -> Self {
331        self.task.requirements.network_bandwidth = bandwidth;
332        self
333    }
334
335    /// Set GPU memory requirements
336    #[must_use]
337    pub fn gpu_memory_bytes(mut self, bytes: u64) -> Self {
338        self.task.requirements.gpu_memory_bytes = bytes;
339        self
340    }
341
342    /// Add configuration parameter
343    pub fn parameter<K: Into<String>>(mut self, key: K, value: ConfigValue) -> Self {
344        self.task.configuration.parameters.insert(key.into(), value);
345        self
346    }
347
348    /// Add environment variable
349    pub fn environment<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
350        self.task
351            .configuration
352            .environment
353            .insert(key.into(), value.into());
354        self
355    }
356
357    /// Set working directory
358    pub fn working_directory<P: Into<String>>(mut self, path: P) -> Self {
359        self.task.configuration.working_directory = Some(path.into());
360        self
361    }
362
363    /// Set task timeout
364    #[must_use]
365    pub fn timeout(mut self, timeout: Duration) -> Self {
366        self.task.configuration.timeout = Some(timeout);
367        self
368    }
369
370    /// Build the task
371    #[must_use]
372    pub fn build(self) -> ExecutionTask {
373        self.task
374    }
375}
376
377/// Task queue for managing pending tasks
378#[derive(Debug)]
379pub struct TaskQueue {
380    /// Queued tasks
381    tasks: Vec<ExecutionTask>,
382    /// Maximum queue size
383    max_size: Option<usize>,
384}
385
386impl TaskQueue {
387    /// Create a new task queue
388    #[must_use]
389    pub fn new() -> Self {
390        Self {
391            tasks: Vec::new(),
392            max_size: None,
393        }
394    }
395
396    /// Create a new task queue with maximum size
397    #[must_use]
398    pub fn with_capacity(max_size: usize) -> Self {
399        Self {
400            tasks: Vec::new(),
401            max_size: Some(max_size),
402        }
403    }
404
405    /// Add a task to the queue
406    pub fn enqueue(&mut self, task: ExecutionTask) -> SklResult<()> {
407        if let Some(max_size) = self.max_size {
408            if self.tasks.len() >= max_size {
409                return Err(SklearsError::InvalidInput("Task queue is full".to_string()));
410            }
411        }
412
413        self.tasks.push(task);
414        Ok(())
415    }
416
417    /// Remove and return the next task from the queue
418    pub fn dequeue(&mut self) -> Option<ExecutionTask> {
419        if self.tasks.is_empty() {
420            None
421        } else {
422            Some(self.tasks.remove(0))
423        }
424    }
425
426    /// Peek at the next task without removing it
427    #[must_use]
428    pub fn peek(&self) -> Option<&ExecutionTask> {
429        self.tasks.first()
430    }
431
432    /// Get the number of tasks in the queue
433    #[must_use]
434    pub fn len(&self) -> usize {
435        self.tasks.len()
436    }
437
438    /// Check if the queue is empty
439    #[must_use]
440    pub fn is_empty(&self) -> bool {
441        self.tasks.is_empty()
442    }
443
444    /// Clear all tasks from the queue
445    pub fn clear(&mut self) {
446        self.tasks.clear();
447    }
448
449    /// Get all tasks in the queue
450    #[must_use]
451    pub fn tasks(&self) -> &[ExecutionTask] {
452        &self.tasks
453    }
454
455    /// Sort tasks by priority
456    pub fn sort_by_priority(&mut self) {
457        self.tasks
458            .sort_by(|a, b| b.metadata.priority.cmp(&a.metadata.priority));
459    }
460
461    /// Filter tasks by predicate
462    pub fn filter<F>(&self, predicate: F) -> Vec<&ExecutionTask>
463    where
464        F: Fn(&ExecutionTask) -> bool,
465    {
466        self.tasks.iter().filter(|task| predicate(task)).collect()
467    }
468}
469
470impl Default for TaskQueue {
471    fn default() -> Self {
472        Self::new()
473    }
474}
475
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an otherwise-default task with the given id and priority.
    fn task_with_priority(id: &str, priority: TaskPriority) -> ExecutionTask {
        TaskBuilder::new(id.to_string(), format!("{} task", id))
            .priority(priority)
            .build()
    }

    /// Ids of the queued tasks, front of the queue first.
    fn queued_ids(queue: &TaskQueue) -> Vec<&str> {
        queue.tasks().iter().map(|task| task.id.as_str()).collect()
    }

    #[test]
    fn test_task_builder() {
        let task = TaskBuilder::new("task1".to_string(), "Test Task".to_string())
            .task_type(TaskType::Computation)
            .priority(TaskPriority::High)
            .cpu_cores(2.0)
            .memory_bytes(1024 * 1024 * 1024)
            .parameter("param1", ConfigValue::String("value1".to_string()))
            .build();

        assert_eq!(task.id, "task1");
        assert_eq!(task.metadata.name, "Test Task");
        assert_eq!(task.task_type, TaskType::Computation);
        assert_eq!(task.metadata.priority, TaskPriority::High);
        // Compare floats with a tolerance rather than with `==`.
        assert!((task.requirements.cpu_cores - 2.0).abs() < f64::EPSILON);
        assert_eq!(task.requirements.memory_bytes, 1024 * 1024 * 1024);
        assert!(task.configuration.parameters.contains_key("param1"));
    }

    #[test]
    fn test_task_builder_defaults() {
        let task = TaskBuilder::new("defaults".to_string(), "Defaults".to_string()).build();

        assert_eq!(task.task_type, TaskType::Computation);
        assert_eq!(task.metadata.priority, TaskPriority::Normal);
        assert!(task.metadata.description.is_empty());
        assert!(task.metadata.dependencies.is_empty());
        assert!(task.metadata.tags.is_empty());
        assert!(task.input_data.is_none());
        assert_eq!(task.requirements.memory_bytes, 100_000_000);
        assert_eq!(task.configuration.timeout, Some(Duration::from_secs(3600)));
    }

    #[test]
    fn test_task_queue() {
        let mut queue = TaskQueue::new();
        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);
        assert!(queue.peek().is_none());
        assert!(queue.dequeue().is_none());

        let task = TaskBuilder::new("task1".to_string(), "Test Task".to_string()).build();
        queue.enqueue(task).expect("unbounded queue accepts tasks");

        assert!(!queue.is_empty());
        assert_eq!(queue.len(), 1);
        // Peeking must not consume the task.
        assert_eq!(queue.peek().map(|task| task.id.as_str()), Some("task1"));
        assert_eq!(queue.len(), 1);

        let dequeued = queue.dequeue().expect("queue holds one task");
        assert_eq!(dequeued.id, "task1");
        assert!(queue.is_empty());
    }

    #[test]
    fn test_task_priority_ordering() {
        assert!(TaskPriority::Critical > TaskPriority::High);
        assert!(TaskPriority::High > TaskPriority::Normal);
        assert!(TaskPriority::Normal > TaskPriority::Low);
    }

    #[test]
    fn test_task_queue_capacity() {
        let mut queue = TaskQueue::with_capacity(1);
        let task1 = TaskBuilder::new("task1".to_string(), "Task 1".to_string()).build();
        let task2 = TaskBuilder::new("task2".to_string(), "Task 2".to_string()).build();

        assert!(queue.enqueue(task1).is_ok());
        assert!(queue.enqueue(task2).is_err());
        // The rejected task must not have been stored.
        assert_eq!(queued_ids(&queue), vec!["task1"]);
    }

    #[test]
    fn test_task_queue_priority_sorting() {
        let mut queue = TaskQueue::new();

        queue
            .enqueue(task_with_priority("low", TaskPriority::Low))
            .expect("unbounded queue accepts tasks");
        queue
            .enqueue(task_with_priority("high-a", TaskPriority::High))
            .expect("unbounded queue accepts tasks");
        queue
            .enqueue(task_with_priority("critical", TaskPriority::Critical))
            .expect("unbounded queue accepts tasks");
        queue
            .enqueue(task_with_priority("high-b", TaskPriority::High))
            .expect("unbounded queue accepts tasks");

        queue.sort_by_priority();

        // Highest priority first; equal priorities keep their enqueue order.
        assert_eq!(
            queued_ids(&queue),
            vec!["critical", "high-a", "high-b", "low"]
        );
    }

    #[test]
    fn test_task_queue_filter_and_clear() {
        let mut queue = TaskQueue::new();
        queue
            .enqueue(task_with_priority("a", TaskPriority::Low))
            .expect("unbounded queue accepts tasks");
        queue
            .enqueue(task_with_priority("b", TaskPriority::High))
            .expect("unbounded queue accepts tasks");

        let high = queue.filter(|task| task.metadata.priority == TaskPriority::High);
        assert_eq!(high.len(), 1);
        assert_eq!(high[0].id, "b");
        // Filtering only borrows; the queue keeps both tasks.
        assert_eq!(queue.len(), 2);

        queue.clear();
        assert!(queue.is_empty());
    }
}
561}