1use std::any::Any;
7use std::collections::HashMap;
8use std::time::{Duration, SystemTime};
9
10use sklears_core::error::{Result as SklResult, SklearsError};
11
/// A unit of work: identity, category, metadata, resource requirements,
/// optional input payload and configuration bundled together.
///
/// Only `Debug` is derived. The type-erased payload in `input_data`
/// (`Box<dyn Any + Send + Sync>`) is not `Clone`, so a task cannot be cloned.
#[derive(Debug)]
pub struct ExecutionTask {
    /// Identifier of the task, supplied by the caller.
    pub id: String,
    /// Category of work the task performs.
    pub task_type: TaskType,
    /// Descriptive and scheduling-related information (name, priority, ...).
    pub metadata: TaskMetadata,
    /// Resources requested for running the task.
    pub requirements: ResourceRequirements,
    /// Optional type-erased input payload; the consumer must downcast it to
    /// the concrete type it expects.
    pub input_data: Option<Box<dyn Any + Send + Sync>>,
    /// Parameters, environment variables, working directory and timeout.
    pub configuration: TaskConfiguration,
}
28
/// Broad category of work a task performs.
///
/// The enum is field-less and derives `Eq` and `Hash`, so it can be compared
/// and used as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TaskType {
    /// Compute-oriented work.
    Computation,
    /// I/O-oriented work.
    IoOperation,
    /// Network-oriented work.
    NetworkOperation,
    /// Work that does not fit the other categories; the variant carries no
    /// payload describing it further.
    Custom,
}
41
/// Descriptive and scheduling-related information attached to a task.
#[derive(Debug, Clone)]
pub struct TaskMetadata {
    /// Name of the task.
    pub name: String,
    /// Free-form description of the task.
    pub description: String,
    /// Priority of the task.
    pub priority: TaskPriority,
    /// Expected run time, when known.
    pub estimated_duration: Option<Duration>,
    /// Point in time associated with the task as its deadline, if any.
    /// NOTE(review): nothing in this file enforces the deadline.
    pub deadline: Option<SystemTime>,
    /// Identifiers of tasks this one depends on — presumably task `id`
    /// values; nothing in this file validates or resolves them.
    pub dependencies: Vec<String>,
    /// Free-form labels attached to the task.
    pub tags: Vec<String>,
    /// Wall-clock time recorded as the task's creation time.
    pub created_at: SystemTime,
}
62
/// Priority level of a task.
///
/// `PartialOrd`/`Ord` are derived, so the ordering follows the declaration
/// order below: `Low < Normal < High < Critical`. Reordering the variants
/// would silently change every priority comparison.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum TaskPriority {
    /// Lowest priority.
    Low,
    /// Priority between `Low` and `High`.
    Normal,
    /// Priority above `Normal`.
    High,
    /// Highest priority.
    Critical,
}
75
/// Resources a task asks for.
///
/// The byte-denominated fields follow their names; the unit of
/// `network_bandwidth` is not specified in this file.
#[derive(Debug, Clone)]
pub struct ResourceRequirements {
    /// Number of CPU cores requested; a float, so fractional values are
    /// representable.
    pub cpu_cores: f64,
    /// Memory requested, in bytes.
    pub memory_bytes: u64,
    /// Disk space requested, in bytes.
    pub disk_bytes: u64,
    /// Network bandwidth requested (unit not specified here).
    pub network_bandwidth: u64,
    /// GPU memory requested, in bytes.
    pub gpu_memory_bytes: u64,
    /// Names of any additional resources the task needs.
    pub special_resources: Vec<String>,
}

impl Default for ResourceRequirements {
    /// Baseline request: one CPU core, 100_000_000 bytes of memory, and
    /// nothing else (no disk, network, GPU memory or special resources).
    fn default() -> Self {
        let special_resources = Vec::new();

        ResourceRequirements {
            cpu_cores: 1.0,
            memory_bytes: 100_000_000,
            disk_bytes: 0,
            network_bandwidth: 0,
            gpu_memory_bytes: 0,
            special_resources,
        }
    }
}
105
106#[derive(Debug, Clone)]
108pub struct TaskConfiguration {
109 pub parameters: HashMap<String, ConfigValue>,
111 pub environment: HashMap<String, String>,
113 pub working_directory: Option<String>,
115 pub timeout: Option<Duration>,
117}
118
119impl Default for TaskConfiguration {
120 fn default() -> Self {
121 Self {
122 parameters: HashMap::new(),
123 environment: HashMap::new(),
124 working_directory: None,
125 timeout: Some(Duration::from_secs(3600)), }
127 }
128}
129
/// A dynamically typed configuration value.
///
/// The type is recursive: `List` and `Object` contain further `ConfigValue`s,
/// so arbitrarily nested structures can be represented.
#[derive(Debug, Clone)]
pub enum ConfigValue {
    /// Text value.
    String(String),
    /// Signed 64-bit integer value.
    Integer(i64),
    /// 64-bit floating-point value.
    Float(f64),
    /// Boolean value.
    Boolean(bool),
    /// Time-span value.
    Duration(Duration),
    /// Ordered sequence of values.
    List(Vec<ConfigValue>),
    /// String-keyed map of values.
    Object(HashMap<String, ConfigValue>),
}
148
/// Outcome of a task: final status, timing, resource usage and either an
/// output payload or an error description.
///
/// Only `Debug` is derived; the type-erased `output` payload is not `Clone`.
#[derive(Debug)]
pub struct TaskResult {
    /// Identifier of the task this result belongs to — presumably the `id`
    /// of the corresponding `ExecutionTask`.
    pub task_id: String,
    /// Status reported for the task.
    pub status: TaskStatus,
    /// Time the execution took.
    /// NOTE(review): `ResourceUsage::execution_duration` carries a similar
    /// value; this file does not show how the two relate.
    pub execution_time: Duration,
    /// Resources recorded for the execution.
    pub resource_usage: ResourceUsage,
    /// Optional type-erased output payload; the consumer must downcast it
    /// to the concrete type it expects.
    pub output: Option<Box<dyn Any + Send + Sync>>,
    /// Error details, if any were recorded.
    pub error: Option<TaskError>,
}
165
/// Lifecycle state of a task, as carried by `TaskResult::status`.
///
/// `Eq` and `Hash` are derived together with `PartialEq`, matching the other
/// field-less enums in this module (`TaskType`, `TaskPriority`), so a status
/// can be used as a `HashMap`/`HashSet` key or be part of a hashed key.
///
/// Variant meanings follow their names; this file does not show where each
/// state is assigned.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TaskStatus {
    /// The task has not started yet.
    Pending,
    /// The task is currently executing.
    Running,
    /// The task finished successfully.
    Completed,
    /// The task finished with an error.
    Failed,
    /// The task was cancelled.
    Cancelled,
    /// The task exceeded its time limit.
    TimedOut,
}
182
/// Resources recorded for one task execution.
///
/// `Default` is derived: every numeric field is zero and
/// `execution_duration` is `Duration::ZERO` (the `Default` value of
/// `Duration`), i.e. "nothing used yet".
#[derive(Debug, Clone, Default)]
pub struct ResourceUsage {
    /// CPU utilisation as a percentage.
    /// NOTE(review): the scale (0–100 overall vs. per core) is not
    /// specified in this file.
    pub cpu_percent: f64,
    /// Memory used, in bytes.
    pub memory_bytes: u64,
    /// Number of I/O operations performed.
    pub io_operations: u64,
    /// Network traffic, in bytes.
    pub network_bytes: u64,
    /// GPU memory used, in bytes.
    pub gpu_memory_bytes: u64,
    /// Duration of the execution.
    pub execution_duration: Duration,
}
212
/// Description of a task failure, as carried by `TaskResult::error`.
///
/// This is a plain data record; in this file it does not implement
/// `std::error::Error` or `Display`.
#[derive(Debug, Clone)]
pub struct TaskError {
    /// Free-form classification of the error.
    pub error_type: String,
    /// Error message.
    pub message: String,
    /// Optional numeric error code.
    pub code: Option<i32>,
    /// Optional stack trace, as text.
    pub stack_trace: Option<String>,
    /// Suggestions for recovering from the error; may be empty.
    pub recovery_suggestions: Vec<String>,
}
227
228pub struct TaskBuilder {
230 task: ExecutionTask,
231}
232
233impl TaskBuilder {
234 #[must_use]
236 pub fn new(id: String, name: String) -> Self {
237 Self {
238 task: ExecutionTask {
239 id,
240 task_type: TaskType::Computation,
241 metadata: TaskMetadata {
242 name,
243 description: String::new(),
244 priority: TaskPriority::Normal,
245 estimated_duration: None,
246 deadline: None,
247 dependencies: Vec::new(),
248 tags: Vec::new(),
249 created_at: SystemTime::now(),
250 },
251 requirements: ResourceRequirements::default(),
252 input_data: None,
253 configuration: TaskConfiguration::default(),
254 },
255 }
256 }
257
258 #[must_use]
260 pub fn task_type(mut self, task_type: TaskType) -> Self {
261 self.task.task_type = task_type;
262 self
263 }
264
265 #[must_use]
267 pub fn description(mut self, description: String) -> Self {
268 self.task.metadata.description = description;
269 self
270 }
271
272 #[must_use]
274 pub fn priority(mut self, priority: TaskPriority) -> Self {
275 self.task.metadata.priority = priority;
276 self
277 }
278
279 #[must_use]
281 pub fn estimated_duration(mut self, duration: Duration) -> Self {
282 self.task.metadata.estimated_duration = Some(duration);
283 self
284 }
285
286 #[must_use]
288 pub fn deadline(mut self, deadline: SystemTime) -> Self {
289 self.task.metadata.deadline = Some(deadline);
290 self
291 }
292
293 #[must_use]
295 pub fn dependency(mut self, task_id: String) -> Self {
296 self.task.metadata.dependencies.push(task_id);
297 self
298 }
299
300 #[must_use]
302 pub fn tag(mut self, tag: String) -> Self {
303 self.task.metadata.tags.push(tag);
304 self
305 }
306
307 #[must_use]
309 pub fn cpu_cores(mut self, cores: f64) -> Self {
310 self.task.requirements.cpu_cores = cores;
311 self
312 }
313
314 #[must_use]
316 pub fn memory_bytes(mut self, bytes: u64) -> Self {
317 self.task.requirements.memory_bytes = bytes;
318 self
319 }
320
321 #[must_use]
323 pub fn disk_bytes(mut self, bytes: u64) -> Self {
324 self.task.requirements.disk_bytes = bytes;
325 self
326 }
327
328 #[must_use]
330 pub fn network_bandwidth(mut self, bandwidth: u64) -> Self {
331 self.task.requirements.network_bandwidth = bandwidth;
332 self
333 }
334
335 #[must_use]
337 pub fn gpu_memory_bytes(mut self, bytes: u64) -> Self {
338 self.task.requirements.gpu_memory_bytes = bytes;
339 self
340 }
341
342 pub fn parameter<K: Into<String>>(mut self, key: K, value: ConfigValue) -> Self {
344 self.task.configuration.parameters.insert(key.into(), value);
345 self
346 }
347
348 pub fn environment<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
350 self.task
351 .configuration
352 .environment
353 .insert(key.into(), value.into());
354 self
355 }
356
357 pub fn working_directory<P: Into<String>>(mut self, path: P) -> Self {
359 self.task.configuration.working_directory = Some(path.into());
360 self
361 }
362
363 #[must_use]
365 pub fn timeout(mut self, timeout: Duration) -> Self {
366 self.task.configuration.timeout = Some(timeout);
367 self
368 }
369
370 #[must_use]
372 pub fn build(self) -> ExecutionTask {
373 self.task
374 }
375}
376
/// A queue of [`ExecutionTask`]s with an optional upper bound on its length.
#[derive(Debug)]
pub struct TaskQueue {
    /// Queued tasks; index 0 is the front (the next task to be dequeued),
    /// new tasks are pushed at the back.
    tasks: Vec<ExecutionTask>,
    /// Maximum number of tasks the queue accepts; `None` means unbounded.
    max_size: Option<usize>,
}
385
386impl TaskQueue {
387 #[must_use]
389 pub fn new() -> Self {
390 Self {
391 tasks: Vec::new(),
392 max_size: None,
393 }
394 }
395
396 #[must_use]
398 pub fn with_capacity(max_size: usize) -> Self {
399 Self {
400 tasks: Vec::new(),
401 max_size: Some(max_size),
402 }
403 }
404
405 pub fn enqueue(&mut self, task: ExecutionTask) -> SklResult<()> {
407 if let Some(max_size) = self.max_size {
408 if self.tasks.len() >= max_size {
409 return Err(SklearsError::InvalidInput("Task queue is full".to_string()));
410 }
411 }
412
413 self.tasks.push(task);
414 Ok(())
415 }
416
417 pub fn dequeue(&mut self) -> Option<ExecutionTask> {
419 if self.tasks.is_empty() {
420 None
421 } else {
422 Some(self.tasks.remove(0))
423 }
424 }
425
426 #[must_use]
428 pub fn peek(&self) -> Option<&ExecutionTask> {
429 self.tasks.first()
430 }
431
432 #[must_use]
434 pub fn len(&self) -> usize {
435 self.tasks.len()
436 }
437
438 #[must_use]
440 pub fn is_empty(&self) -> bool {
441 self.tasks.is_empty()
442 }
443
444 pub fn clear(&mut self) {
446 self.tasks.clear();
447 }
448
449 #[must_use]
451 pub fn tasks(&self) -> &[ExecutionTask] {
452 &self.tasks
453 }
454
455 pub fn sort_by_priority(&mut self) {
457 self.tasks
458 .sort_by(|a, b| b.metadata.priority.cmp(&a.metadata.priority));
459 }
460
461 pub fn filter<F>(&self, predicate: F) -> Vec<&ExecutionTask>
463 where
464 F: Fn(&ExecutionTask) -> bool,
465 {
466 self.tasks.iter().filter(|task| predicate(task)).collect()
467 }
468}
469
470impl Default for TaskQueue {
471 fn default() -> Self {
472 Self::new()
473 }
474}
475
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a task that differs from the builder defaults only in priority.
    fn task_with_priority(id: &str, name: &str, priority: TaskPriority) -> ExecutionTask {
        TaskBuilder::new(id.to_string(), name.to_string())
            .priority(priority)
            .build()
    }

    /// The builder stores every value it is given.
    #[test]
    fn test_task_builder() {
        let one_gib: u64 = 1024 * 1024 * 1024;

        let built = TaskBuilder::new("task1".to_string(), "Test Task".to_string())
            .task_type(TaskType::Computation)
            .priority(TaskPriority::High)
            .cpu_cores(2.0)
            .memory_bytes(one_gib)
            .parameter(
                "param1".to_string(),
                ConfigValue::String("value1".to_string()),
            )
            .build();

        assert_eq!(built.id, "task1");
        assert_eq!(built.metadata.name, "Test Task");
        assert_eq!(built.task_type, TaskType::Computation);
        assert_eq!(built.metadata.priority, TaskPriority::High);
        assert_eq!(built.requirements.cpu_cores, 2.0);
        assert_eq!(built.requirements.memory_bytes, one_gib);
        assert!(built.configuration.parameters.contains_key("param1"));
    }

    /// Enqueue followed by dequeue returns the task and empties the queue.
    #[test]
    fn test_task_queue() {
        let mut queue = TaskQueue::new();
        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);

        queue
            .enqueue(TaskBuilder::new("task1".to_string(), "Test Task".to_string()).build())
            .unwrap();
        assert!(!queue.is_empty());
        assert_eq!(queue.len(), 1);

        let front = queue.dequeue().unwrap();
        assert_eq!(front.id, "task1");
        assert!(queue.is_empty());
    }

    /// Each priority level compares greater than the one below it.
    #[test]
    fn test_task_priority_ordering() {
        let ascending = [
            TaskPriority::Low,
            TaskPriority::Normal,
            TaskPriority::High,
            TaskPriority::Critical,
        ];

        for pair in ascending.windows(2) {
            assert!(pair[1] > pair[0]);
        }
    }

    /// A queue limited to one task rejects the second one.
    #[test]
    fn test_task_queue_capacity() {
        let mut queue = TaskQueue::with_capacity(1);
        let first = TaskBuilder::new("task1".to_string(), "Task 1".to_string()).build();
        let second = TaskBuilder::new("task2".to_string(), "Task 2".to_string()).build();

        assert!(queue.enqueue(first).is_ok());
        assert!(queue.enqueue(second).is_err());
    }

    /// Sorting puts the highest priority at the front.
    #[test]
    fn test_task_queue_priority_sorting() {
        let mut queue = TaskQueue::new();

        let submissions = vec![
            ("low", "Low Task", TaskPriority::Low),
            ("high", "High Task", TaskPriority::High),
            ("critical", "Critical Task", TaskPriority::Critical),
        ];
        for (id, name, priority) in submissions {
            queue
                .enqueue(task_with_priority(id, name, priority))
                .unwrap();
        }

        queue.sort_by_priority();

        let order: Vec<&str> = queue.tasks().iter().map(|t| t.id.as_str()).collect();
        assert_eq!(order[0], "critical");
        assert_eq!(order[1], "high");
        assert_eq!(order[2], "low");
    }
}