subx_cli/core/parallel/scheduler.rs

//! Task scheduler for parallel processing
use super::{Task, TaskResult, TaskStatus};
use crate::Result;
use crate::config::{Config, OverflowStrategy};
use crate::core::parallel::config::ParallelConfig;
use crate::error::SubXError;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use tokio::sync::{Semaphore, oneshot};

struct PendingTask {
    task: Box<dyn Task + Send + Sync>,
    result_sender: oneshot::Sender<TaskResult>,
    task_id: String,
    priority: TaskPriority,
}

impl PartialEq for PendingTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl Eq for PendingTask {}

impl PartialOrd for PendingTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for PendingTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.priority.cmp(&other.priority)
    }
}

/// Priority levels for task execution scheduling.
///
/// Determines the execution order of tasks in the queue, with higher
/// priority tasks being processed before lower priority ones.
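///
/// # Examples
///
/// A minimal sketch of how the priority levels compare; the derived `Ord`
/// follows the discriminant values. The import path assumes this module is
/// publicly re-exported, so the block is marked `ignore` rather than compiled
/// as a doctest.
///
/// ```ignore
/// use subx_cli::core::parallel::scheduler::TaskPriority;
///
/// assert!(TaskPriority::Critical > TaskPriority::High);
/// assert!(TaskPriority::High > TaskPriority::Normal);
/// assert!(TaskPriority::Normal > TaskPriority::Low);
/// ```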
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
    /// Low priority for background operations
    Low = 0,
    /// Normal priority for standard operations
    Normal = 1,
    /// High priority for user-initiated operations
    High = 2,
    /// Critical priority for system operations
    Critical = 3,
}

/// Information about an active task in the scheduler.
///
/// Contains runtime information about a task currently being processed
/// or queued for execution.
#[derive(Debug, Clone)]
pub struct TaskInfo {
    /// Unique identifier for the task
    pub task_id: String,
    /// Type of task being executed
    pub task_type: String,
    /// Current status of the task
    pub status: TaskStatus,
    /// When the task started execution
    pub start_time: std::time::Instant,
    /// Current progress percentage (0.0 to 1.0)
    pub progress: f32,
}

/// Scheduler to manage and execute tasks in parallel
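///
/// # Examples
///
/// A rough usage sketch, assuming `my_task` is a boxed value implementing the
/// [`Task`] trait (the test module below contains concrete mock implementations).
/// The constructor spawns the background scheduler loop with `tokio::spawn`, so
/// it must be called from within a Tokio runtime. Marked `ignore` because the
/// import paths and `my_task` are illustrative only.
///
/// ```ignore
/// use subx_cli::config::Config;
/// use subx_cli::core::parallel::scheduler::TaskScheduler;
///
/// // Inside an async context:
/// let scheduler = TaskScheduler::new_with_config(&Config::default())?;
/// let result = scheduler.submit_task(my_task).await?;
/// println!("task finished: {:?}", result);
/// ```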
pub struct TaskScheduler {
    /// Parallel processing configuration
    _config: ParallelConfig,
    /// Optional load balancer for dynamic worker adjustment
    load_balancer: Option<crate::core::parallel::load_balancer::LoadBalancer>,
    /// Task execution timeout setting
    task_timeout: std::time::Duration,
    /// Worker thread idle timeout setting
    worker_idle_timeout: std::time::Duration,
    task_queue: Arc<Mutex<VecDeque<PendingTask>>>,
    semaphore: Arc<Semaphore>,
    active_tasks: Arc<Mutex<std::collections::HashMap<String, TaskInfo>>>,
    scheduler_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}

impl TaskScheduler {
    /// Create a new scheduler based on configuration
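    ///
    /// Concurrency limits come from [`ParallelConfig::from_app_config`], while the
    /// task and worker-idle timeouts come from `general.task_timeout_seconds` and
    /// `general.worker_idle_timeout_seconds`. A hedged sketch (field names as read
    /// by this module; marked `ignore` so it is not compiled as a doctest):
    ///
    /// ```ignore
    /// let mut app_config = Config::default();
    /// app_config.general.task_timeout_seconds = 600; // 10-minute task timeout
    /// let scheduler = TaskScheduler::new_with_config(&app_config)?;
    /// ```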
    pub fn new_with_config(app_config: &Config) -> Result<Self> {
        let config = ParallelConfig::from_app_config(app_config);
        config.validate()?;
        let semaphore = Arc::new(Semaphore::new(config.max_concurrent_jobs));
        let task_queue = Arc::new(Mutex::new(VecDeque::new()));
        let active_tasks = Arc::new(Mutex::new(std::collections::HashMap::new()));

        // Read timeout settings from general configuration
        let general = &app_config.general;
        let scheduler = Self {
            _config: config.clone(),
            task_queue: task_queue.clone(),
            semaphore: semaphore.clone(),
            active_tasks: active_tasks.clone(),
            scheduler_handle: Arc::new(Mutex::new(None)),
            load_balancer: if config.auto_balance_workers {
                Some(crate::core::parallel::load_balancer::LoadBalancer::new())
            } else {
                None
            },
            task_timeout: std::time::Duration::from_secs(general.task_timeout_seconds),
            worker_idle_timeout: std::time::Duration::from_secs(
                general.worker_idle_timeout_seconds,
            ),
        };

        // Start background scheduler loop
        scheduler.start_scheduler_loop();
        Ok(scheduler)
    }

    /// Create a new scheduler with default settings (for testing)
    pub fn new_with_defaults() -> Self {
        // Use default application config for default testing
        let default_app_config = Config::default();
        let config = ParallelConfig::from_app_config(&default_app_config);
        let _ = config.validate();
        let semaphore = Arc::new(Semaphore::new(config.max_concurrent_jobs));
        let task_queue = Arc::new(Mutex::new(VecDeque::new()));
        let active_tasks = Arc::new(Mutex::new(std::collections::HashMap::new()));

        let general = &default_app_config.general;
        let scheduler = Self {
            _config: config.clone(),
            task_queue: task_queue.clone(),
            semaphore: semaphore.clone(),
            active_tasks: active_tasks.clone(),
            scheduler_handle: Arc::new(Mutex::new(None)),
            load_balancer: if config.auto_balance_workers {
                Some(crate::core::parallel::load_balancer::LoadBalancer::new())
            } else {
                None
            },
            task_timeout: std::time::Duration::from_secs(general.task_timeout_seconds),
            worker_idle_timeout: std::time::Duration::from_secs(
                general.worker_idle_timeout_seconds,
            ),
        };

        // Start background scheduler loop
        scheduler.start_scheduler_loop();
        scheduler
    }

    /// Create a new scheduler with default configuration
    pub fn new() -> Result<Self> {
        let default_config = Config::default();
        Self::new_with_config(&default_config)
    }

    /// Start the background scheduler loop
    fn start_scheduler_loop(&self) {
        let task_queue = Arc::clone(&self.task_queue);
        let semaphore = Arc::clone(&self.semaphore);
        let active_tasks = Arc::clone(&self.active_tasks);
        let config = self._config.clone();
        let task_timeout = self.task_timeout;
        let worker_idle_timeout = self.worker_idle_timeout;

        let handle = tokio::spawn(async move {
            // Idle check timer
            let mut last_active = std::time::Instant::now();
            loop {
                // End scheduler after idle timeout
                let has_pending = {
                    let q = task_queue.lock().unwrap();
                    !q.is_empty()
                };
                let has_active = {
                    let a = active_tasks.lock().unwrap();
                    !a.is_empty()
                };
                if has_pending || has_active {
                    last_active = std::time::Instant::now();
                } else if last_active.elapsed() > worker_idle_timeout {
                    break;
                }
                // Try to get a semaphore permit and a task from the queue
                if let Ok(permit) = semaphore.clone().try_acquire_owned() {
                    let pending = {
                        let mut queue = task_queue.lock().unwrap();
                        // select next task by priority or FIFO
                        if config.enable_task_priorities {
                            // find highest priority task
                            if let Some((idx, _)) =
                                queue.iter().enumerate().max_by_key(|(_, t)| t.priority)
                            {
                                queue.remove(idx)
                            } else {
                                None
                            }
                        } else {
                            queue.pop_front()
                        }
                    };
                    if let Some(p) = pending {
                        // Update task status to running
                        {
                            let mut active = active_tasks.lock().unwrap();
                            if let Some(info) = active.get_mut(&p.task_id) {
                                info.status = TaskStatus::Running;
                            }
                        }

                        let task_id = p.task_id.clone();
                        let active_tasks_clone = Arc::clone(&active_tasks);

                        // Spawn the actual task execution
                        tokio::spawn(async move {
                            // Task execution timeout handling
                            let result = match tokio::time::timeout(task_timeout, p.task.execute())
                                .await
                            {
                                Ok(res) => res,
                                Err(_) => TaskResult::Failed("Task execution timeout".to_string()),
                            };

                            // Update task status
                            {
                                let mut at = active_tasks_clone.lock().unwrap();
                                if let Some(info) = at.get_mut(&task_id) {
                                    info.status = TaskStatus::Completed(result.clone());
                                    info.progress = 1.0;
                                }
                            }

                            // Send result back
                            let _ = p.result_sender.send(result);

                            // Release the permit
                            drop(permit);
                        });
                    } else {
                        // No tasks in queue, release permit and wait a bit
                        drop(permit);
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                    }
                } else {
                    // No permits available, wait a bit
                    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                }
            }
        });

        // Store the handle
        *self.scheduler_handle.lock().unwrap() = Some(handle);
    }

    /// Submit a task with normal priority
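    ///
    /// Awaits the task's completion and returns its [`TaskResult`]. A rough
    /// sketch, assuming `my_task` implements [`Task`] (see the mock tasks in the
    /// test module); marked `ignore` as it is illustrative only:
    ///
    /// ```ignore
    /// let result = scheduler.submit_task(my_task).await?;
    /// assert!(matches!(result, TaskResult::Success(_)));
    /// ```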
    pub async fn submit_task(&self, task: Box<dyn Task + Send + Sync>) -> Result<TaskResult> {
        self.submit_task_with_priority(task, TaskPriority::Normal)
            .await
    }

    /// Submit a task with specified priority
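    ///
    /// Higher-priority tasks are dequeued first when `enable_task_priorities`
    /// is set in the parallel configuration. A hedged sketch (illustrative only,
    /// marked `ignore`):
    ///
    /// ```ignore
    /// let result = scheduler
    ///     .submit_task_with_priority(my_task, TaskPriority::High)
    ///     .await?;
    /// ```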
    pub async fn submit_task_with_priority(
        &self,
        task: Box<dyn Task + Send + Sync>,
        priority: TaskPriority,
    ) -> Result<TaskResult> {
        let task_id = task.task_id();
        let task_type = task.task_type().to_string();
        let (tx, rx) = oneshot::channel();

        // Register task info
        {
            let mut active = self.active_tasks.lock().unwrap();
            active.insert(
                task_id.clone(),
                TaskInfo {
                    task_id: task_id.clone(),
                    task_type,
                    status: TaskStatus::Pending,
                    start_time: std::time::Instant::now(),
                    progress: 0.0,
                },
            );
        }

        // Handle queue overflow strategy before enqueuing
        let pending = PendingTask {
            task,
            result_sender: tx,
            task_id: task_id.clone(),
            priority,
        };
        if self.get_queue_size() >= self._config.task_queue_size {
            match self._config.queue_overflow_strategy {
                OverflowStrategy::Block => {
                    // Block until space available
                    while self.get_queue_size() >= self._config.task_queue_size {
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                    }
                }
                OverflowStrategy::DropOldest => {
                    let mut q = self.task_queue.lock().unwrap();
                    q.pop_front();
                }
                OverflowStrategy::Reject => {
                    return Err(SubXError::parallel_processing(
                        "Task queue is full".to_string(),
                    ));
                }
                OverflowStrategy::Drop => {
                    // Drop the new task - return Failed result
                    return Ok(TaskResult::Failed(
                        "Task dropped due to queue overflow".to_string(),
                    ));
                }
                OverflowStrategy::Expand => {
                    // Allow queue to expand beyond limits
                    // No action needed, task will be added below
                }
            }
        }
        // Enqueue task according to priority setting
        {
            let mut q = self.task_queue.lock().unwrap();
            if self._config.enable_task_priorities {
                let pos = q
                    .iter()
                    .position(|t| t.priority < pending.priority)
                    .unwrap_or(q.len());
                q.insert(pos, pending);
            } else {
                q.push_back(pending);
            }
        }

        // Await result
        let result = rx.await.map_err(|_| {
            crate::error::SubXError::parallel_processing("Task execution interrupted".to_string())
        })?;

        // Clean up
        {
            let mut active = self.active_tasks.lock().unwrap();
            active.remove(&task_id);
        }
        Ok(result)
    }

    // No longer needed: scheduling is handled entirely by the background
    // scheduler loop. Kept as a no-op placeholder for compatibility.
    #[allow(dead_code)]
    async fn try_execute_next_task(&self) {}

    /// Submit multiple tasks and await all results
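    ///
    /// Results are collected in submission order; tasks whose result channel is
    /// dropped yield `TaskResult::Failed`. A rough sketch, assuming the boxed
    /// tasks implement [`Task`] (marked `ignore`, illustrative only):
    ///
    /// ```ignore
    /// let tasks: Vec<Box<dyn Task + Send + Sync>> = vec![task_a, task_b];
    /// let results = scheduler.submit_batch_tasks(tasks).await;
    /// assert_eq!(results.len(), 2);
    /// ```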
    pub async fn submit_batch_tasks(
        &self,
        tasks: Vec<Box<dyn Task + Send + Sync>>,
    ) -> Vec<TaskResult> {
        let mut receivers = Vec::new();

        // First add all tasks to the queue
        for task in tasks {
            let task_id = task.task_id();
            let task_type = task.task_type().to_string();
            let (tx, rx) = oneshot::channel();

            // Register task information
            {
                let mut active = self.active_tasks.lock().unwrap();
                active.insert(
                    task_id.clone(),
                    TaskInfo {
                        task_id: task_id.clone(),
                        task_type,
                        status: TaskStatus::Pending,
                        start_time: std::time::Instant::now(),
                        progress: 0.0,
                    },
                );
            }

            // Enqueue each task with overflow and priority handling
            let pending = PendingTask {
                task,
                result_sender: tx,
                task_id: task_id.clone(),
                priority: TaskPriority::Normal,
            };
            if self.get_queue_size() >= self._config.task_queue_size {
                match self._config.queue_overflow_strategy {
                    OverflowStrategy::Block => {
                        // Block until space available
                        while self.get_queue_size() >= self._config.task_queue_size {
                            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                        }
                    }
                    OverflowStrategy::DropOldest => {
                        let mut q = self.task_queue.lock().unwrap();
                        q.pop_front();
                    }
                    OverflowStrategy::Reject => {
                        // Reject entire batch when queue is full
                        return Vec::new();
                    }
                    OverflowStrategy::Drop => {
                        // Drop the current task, continue with others
                        continue;
                    }
                    OverflowStrategy::Expand => {
                        // Allow queue to expand beyond limits
                        // No action needed, task will be added below
                    }
                }
            }
            // Insert task according to priority setting
            {
                let mut q = self.task_queue.lock().unwrap();
                if self._config.enable_task_priorities {
                    let pos = q
                        .iter()
                        .position(|t| t.priority < pending.priority)
                        .unwrap_or(q.len());
                    q.insert(pos, pending);
                } else {
                    q.push_back(pending);
                }
            }

            receivers.push((task_id, rx));
        }

        // Wait for all results
        let mut results = Vec::new();
        for (task_id, rx) in receivers {
            match rx.await {
                Ok(result) => results.push(result),
                Err(_) => {
                    results.push(TaskResult::Failed("Task execution interrupted".to_string()))
                }
            }

            // Clean up task information
            {
                let mut active = self.active_tasks.lock().unwrap();
                active.remove(&task_id);
            }
        }

        results
    }

    /// Get number of tasks waiting in queue
    pub fn get_queue_size(&self) -> usize {
        self.task_queue.lock().unwrap().len()
    }

    /// Get number of active workers
    pub fn get_active_workers(&self) -> usize {
        self._config.max_concurrent_jobs - self.semaphore.available_permits()
    }

    /// Get status of a specific task
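    ///
    /// Returns `None` once the task has completed and been cleaned up. A small
    /// sketch (illustrative only, marked `ignore`; `"mock_test"` is a hypothetical
    /// task id):
    ///
    /// ```ignore
    /// if let Some(info) = scheduler.get_task_status("mock_test") {
    ///     println!("{} is {:?} ({:.0}%)", info.task_id, info.status, info.progress * 100.0);
    /// }
    /// ```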
    pub fn get_task_status(&self, task_id: &str) -> Option<TaskInfo> {
        self.active_tasks.lock().unwrap().get(task_id).cloned()
    }

    /// List all active tasks
    pub fn list_active_tasks(&self) -> Vec<TaskInfo> {
        self.active_tasks
            .lock()
            .unwrap()
            .values()
            .cloned()
            .collect()
    }
}

impl Clone for TaskScheduler {
    fn clone(&self) -> Self {
        Self {
            _config: self._config.clone(),
            task_queue: Arc::clone(&self.task_queue),
            semaphore: Arc::clone(&self.semaphore),
            active_tasks: Arc::clone(&self.active_tasks),
            scheduler_handle: Arc::clone(&self.scheduler_handle),
            load_balancer: self.load_balancer.clone(),
            task_timeout: self.task_timeout,
            worker_idle_timeout: self.worker_idle_timeout,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{Task, TaskPriority, TaskResult, TaskScheduler};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use tokio::time::Duration;
    use uuid::Uuid;

    struct MockTask {
        name: String,
        duration: Duration,
    }

    #[async_trait::async_trait]
    impl Task for MockTask {
        async fn execute(&self) -> TaskResult {
            tokio::time::sleep(self.duration).await;
            TaskResult::Success(format!("Task completed: {}", self.name))
        }
        fn task_type(&self) -> &'static str {
            "mock"
        }
        fn task_id(&self) -> String {
            format!("mock_{}", self.name)
        }
    }

    struct CounterTask {
        counter: Arc<AtomicUsize>,
    }
    impl CounterTask {
        fn new(counter: Arc<AtomicUsize>) -> Self {
            Self { counter }
        }
    }
    #[async_trait::async_trait]
    impl Task for CounterTask {
        async fn execute(&self) -> TaskResult {
            self.counter.fetch_add(1, Ordering::SeqCst);
            TaskResult::Success("Counter task completed".to_string())
        }
        fn task_type(&self) -> &'static str {
            "counter"
        }
        fn task_id(&self) -> String {
            Uuid::new_v4().to_string()
        }
    }

    struct OrderTask {
        name: String,
        order: Arc<Mutex<Vec<String>>>,
    }
    impl OrderTask {
        fn new(name: &str, order: Arc<Mutex<Vec<String>>>) -> Self {
            Self {
                name: name.to_string(),
                order,
            }
        }
    }
    #[async_trait::async_trait]
    impl Task for OrderTask {
        async fn execute(&self) -> TaskResult {
            let mut v = self.order.lock().unwrap();
            v.push(self.name.clone());
            TaskResult::Success(format!("Order task completed: {}", self.name))
        }
        fn task_type(&self) -> &'static str {
            "order"
        }
        fn task_id(&self) -> String {
            format!("order_{}", self.name)
        }
    }

    #[tokio::test]
    async fn test_task_scheduler_basic() {
        let scheduler = TaskScheduler::new_with_defaults();
        let task = Box::new(MockTask {
            name: "test".to_string(),
            duration: Duration::from_millis(10),
        });
        let result = scheduler.submit_task(task).await.unwrap();
        assert!(matches!(result, TaskResult::Success(_)));
    }

    #[tokio::test]
    async fn test_concurrent_task_execution() {
        let scheduler = TaskScheduler::new_with_defaults();
        let counter = Arc::new(AtomicUsize::new(0));

        // Test single task
        let task = Box::new(CounterTask::new(counter.clone()));
        let result = scheduler.submit_task(task).await.unwrap();
        assert!(matches!(result, TaskResult::Success(_)));
        assert_eq!(counter.load(Ordering::SeqCst), 1);

        // Test multiple tasks - serial execution
        for _ in 0..4 {
            let task = Box::new(CounterTask::new(counter.clone()));
            let _result = scheduler.submit_task(task).await.unwrap();
        }
        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    #[tokio::test]
    async fn test_task_priority_ordering() {
        let scheduler = TaskScheduler::new_with_defaults();
        let order = Arc::new(Mutex::new(Vec::new()));

        // Create tasks with different priorities
        let tasks = vec![
            (TaskPriority::Low, "low"),
            (TaskPriority::High, "high"),
            (TaskPriority::Normal, "normal"),
            (TaskPriority::Critical, "critical"),
        ];

        let mut handles = Vec::new();
        for (prio, name) in tasks {
            let task = Box::new(OrderTask::new(name, order.clone()));
            let scheduler_clone = scheduler.clone();
            let handle = tokio::spawn(async move {
                scheduler_clone
                    .submit_task_with_priority(task, prio)
                    .await
                    .unwrap()
            });
            handles.push(handle);
        }

        // Wait for all tasks to complete
        for handle in handles {
            let _ = handle.await.unwrap();
        }

        let v = order.lock().unwrap();
        assert_eq!(v.len(), 4);
        // All four priorities should have completed; exact order depends on timing
        assert!(v.contains(&"critical".to_string()));
        assert!(v.contains(&"high".to_string()));
        assert!(v.contains(&"normal".to_string()));
        assert!(v.contains(&"low".to_string()));
    }

    #[tokio::test]
    async fn test_queue_and_active_workers_metrics() {
        let scheduler = TaskScheduler::new_with_defaults();

        // Check initial state
        assert_eq!(scheduler.get_queue_size(), 0);
        assert_eq!(scheduler.get_active_workers(), 0);

        // Submit a longer task
        let task = Box::new(MockTask {
            name: "long_task".to_string(),
            duration: Duration::from_millis(100),
        });

        let handle = {
            let scheduler_clone = scheduler.clone();
            tokio::spawn(async move { scheduler_clone.submit_task(task).await })
        };

        // Wait a short time and check metrics
        tokio::time::sleep(Duration::from_millis(20)).await;

        // Complete task
        let _result = handle.await.unwrap().unwrap();

        // Check final state
        assert_eq!(scheduler.get_queue_size(), 0);
    }

    #[tokio::test]
    async fn test_continuous_scheduling() {
        let scheduler = TaskScheduler::new_with_defaults();
        let counter = Arc::new(AtomicUsize::new(0));

        // Submit multiple tasks to queue
        let mut handles = Vec::new();
        for i in 0..10 {
            let task = Box::new(CounterTask::new(counter.clone()));
            let scheduler_clone = scheduler.clone();
            let handle =
                tokio::spawn(async move { scheduler_clone.submit_task(task).await.unwrap() });
            handles.push(handle);

            // Delayed submission to test continuous scheduling
            if i % 3 == 0 {
                tokio::time::sleep(Duration::from_millis(5)).await;
            }
        }

        // Wait for all tasks to complete
        for handle in handles {
            let result = handle.await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        // Verify all tasks were executed
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }

    #[tokio::test]
    async fn test_batch_task_execution() {
        let scheduler = TaskScheduler::new_with_defaults();
        let counter = Arc::new(AtomicUsize::new(0));

        // Use batch submission to test concurrent execution
        let mut tasks: Vec<Box<dyn Task + Send + Sync>> = Vec::new();
        for _ in 0..3 {
            // Reduce task count to simplify test
            tasks.push(Box::new(CounterTask::new(counter.clone())));
        }

        let results = scheduler.submit_batch_tasks(tasks).await;
        assert_eq!(results.len(), 3);
        assert_eq!(counter.load(Ordering::SeqCst), 3);
        for result in results {
            assert!(matches!(result, TaskResult::Success(_)));
        }
    }

    #[tokio::test]
    async fn test_high_concurrency_stress() {
        let scheduler = TaskScheduler::new_with_defaults();
        let counter = Arc::new(AtomicUsize::new(0));

        // Create large number of tasks
        let mut handles = Vec::new();
        for i in 0..50 {
            let task = Box::new(CounterTask::new(counter.clone()));
            let scheduler_clone = scheduler.clone();
            let priority = match i % 4 {
                0 => TaskPriority::Low,
                1 => TaskPriority::Normal,
                2 => TaskPriority::High,
                3 => TaskPriority::Critical,
                _ => TaskPriority::Normal,
            };

            let handle = tokio::spawn(async move {
                scheduler_clone
                    .submit_task_with_priority(task, priority)
                    .await
                    .unwrap()
            });
            handles.push(handle);

            // Interleaved submission to simulate real usage scenarios
            if i % 5 == 0 {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
        }

        // Wait for all tasks to complete
        for handle in handles {
            let result = handle.await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        // Verify all tasks were executed
        assert_eq!(counter.load(Ordering::SeqCst), 50);

        // Check final state
        assert_eq!(scheduler.get_queue_size(), 0);
        assert_eq!(scheduler.get_active_workers(), 0);
    }

    #[tokio::test]
    async fn test_mixed_batch_and_individual_tasks() {
        let scheduler = TaskScheduler::new_with_defaults();
        let counter = Arc::new(AtomicUsize::new(0));

        // First submit some individual tasks
        let mut individual_handles = Vec::new();
        for _ in 0..3 {
            let task = Box::new(CounterTask::new(counter.clone()));
            let scheduler_clone = scheduler.clone();
            let handle =
                tokio::spawn(async move { scheduler_clone.submit_task(task).await.unwrap() });
            individual_handles.push(handle);
        }

        // Then submit batch tasks
        let mut batch_tasks: Vec<Box<dyn Task + Send + Sync>> = Vec::new();
        for _ in 0..4 {
            batch_tasks.push(Box::new(CounterTask::new(counter.clone())));
        }

        let batch_handle = {
            let scheduler_clone = scheduler.clone();
            tokio::spawn(async move { scheduler_clone.submit_batch_tasks(batch_tasks).await })
        };

        // Submit more individual tasks during batch execution
        let mut more_individual_handles = Vec::new();
        for _ in 0..2 {
            let task = Box::new(CounterTask::new(counter.clone()));
            let scheduler_clone = scheduler.clone();
            let handle =
                tokio::spawn(async move { scheduler_clone.submit_task(task).await.unwrap() });
            more_individual_handles.push(handle);
        }

        // Wait for all tasks to complete
        for handle in individual_handles {
            let result = handle.await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        let batch_results = batch_handle.await.unwrap();
        assert_eq!(batch_results.len(), 4);
        for result in batch_results {
            assert!(matches!(result, TaskResult::Success(_)));
        }

        for handle in more_individual_handles {
            let result = handle.await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        // Verify total count is correct (3 + 4 + 2 = 9)
        assert_eq!(counter.load(Ordering::SeqCst), 9);
    }

    /// Test task scheduling strategies with different priorities
    #[tokio::test]
    async fn test_task_scheduling_strategies() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        struct PriorityTask {
            id: String,
            priority: TaskPriority,
            counter: Arc<AtomicUsize>,
            execution_order: Arc<Mutex<Vec<String>>>,
        }

        #[async_trait::async_trait]
        impl Task for PriorityTask {
            async fn execute(&self) -> TaskResult {
                self.counter.fetch_add(1, Ordering::SeqCst);
                self.execution_order.lock().unwrap().push(self.id.clone());
                // Longer delay to make priority effects more visible
                tokio::time::sleep(Duration::from_millis(50)).await;
                TaskResult::Success(format!("Priority task {} completed", self.id))
            }
            fn task_type(&self) -> &'static str {
                "priority"
            }
            fn task_id(&self) -> String {
                self.id.clone()
            }
        }

        let scheduler = TaskScheduler::new_with_defaults();
        let counter = Arc::new(AtomicUsize::new(0));
        let execution_order = Arc::new(Mutex::new(Vec::new()));

        // Submit tasks with different priorities
        let priorities = vec![
            ("low", TaskPriority::Low),
            ("high", TaskPriority::High),
            ("critical", TaskPriority::Critical),
            ("normal", TaskPriority::Normal),
        ];

        for (id, priority) in priorities {
            let task = PriorityTask {
                id: id.to_string(),
                priority,
                counter: Arc::clone(&counter),
                execution_order: Arc::clone(&execution_order),
            };

            scheduler
                .submit_task_with_priority(Box::new(task), priority)
                .await
                .unwrap();
        }

        // Wait for all tasks to complete
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify all tasks were executed
        let final_count = counter.load(Ordering::SeqCst);
        assert_eq!(final_count, 4, "All 4 tasks should have been executed");

        // Inspect the recorded execution order
        let order = execution_order.lock().unwrap();
        println!("Task execution order: {:?}", *order);

        // Check that all tasks were executed, but don't strictly enforce ordering
        // since concurrent execution can vary
        assert!(
            order.contains(&"critical".to_string()),
            "Critical task should have been executed"
        );
        assert!(
            order.contains(&"low".to_string()),
            "Low task should have been executed"
        );
        assert!(
            order.contains(&"high".to_string()),
            "High task should have been executed"
        );
        assert!(
            order.contains(&"normal".to_string()),
            "Normal task should have been executed"
        );
    }

    /// Test load balancing across multiple workers
    #[tokio::test]
    async fn test_load_balancing() {
        let scheduler = TaskScheduler::new_with_defaults();
        let task_counter = Arc::new(AtomicUsize::new(0));

        // Submit multiple tasks concurrently
        for _i in 0..10 {
            let task = CounterTask::new(Arc::clone(&task_counter));
            let result = scheduler.submit_task(Box::new(task)).await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        // Verify all tasks were processed
        let final_count = task_counter.load(Ordering::SeqCst);
        assert_eq!(final_count, 10);

        // Check scheduler queue is empty
        assert_eq!(scheduler.get_queue_size(), 0);
    }

    /// Test task priority handling
    #[tokio::test]
    async fn test_task_priority_processing() {
        let scheduler = TaskScheduler::new_with_defaults();

        // Test priority comparison
        assert!(TaskPriority::Critical > TaskPriority::High);
        assert!(TaskPriority::High > TaskPriority::Normal);
        assert!(TaskPriority::Normal > TaskPriority::Low);

        // Submit tasks with different priorities and verify they're handled
        let high_task = MockTask {
            name: "high_priority".to_string(),
            duration: Duration::from_millis(5),
        };

        let low_task = MockTask {
            name: "low_priority".to_string(),
            duration: Duration::from_millis(5),
        };

        let high_result = scheduler
            .submit_task_with_priority(Box::new(high_task), TaskPriority::High)
            .await
            .unwrap();
        let low_result = scheduler
            .submit_task_with_priority(Box::new(low_task), TaskPriority::Low)
            .await
            .unwrap();

        assert!(matches!(high_result, TaskResult::Success(_)));
        assert!(matches!(low_result, TaskResult::Success(_)));
    }

    /// Test scheduler state management
    #[tokio::test]
    async fn test_scheduler_state_management() {
        let scheduler = TaskScheduler::new_with_defaults();

        // Initial state
        assert_eq!(scheduler.get_queue_size(), 0);
        assert_eq!(scheduler.get_active_workers(), 0);

        // Submit a task
        let task = MockTask {
            name: "state_test".to_string(),
            duration: Duration::from_millis(50),
        };

        let result = scheduler.submit_task(Box::new(task)).await.unwrap();

        // submit_task awaits completion, so the task has already finished here;
        // give the background scheduler a brief moment to settle
        tokio::time::sleep(Duration::from_millis(5)).await;

        assert!(matches!(result, TaskResult::Success(_)));

        // State should return to initial
        assert_eq!(scheduler.get_queue_size(), 0);
    }

    /// Test scheduler overflow strategies
    #[tokio::test]
    async fn test_overflow_strategy_handling() {
        let scheduler = TaskScheduler::new_with_defaults();

        // Submit many long-running tasks to potentially trigger overflow
        for i in 0..20 {
            let task = MockTask {
                name: format!("overflow_test_{}", i),
                duration: Duration::from_millis(20),
            };

            match scheduler.submit_task(Box::new(task)).await {
                Ok(result) => {
                    assert!(matches!(result, TaskResult::Success(_)));
                }
                Err(_) => {
                    // Some tasks might be rejected due to overflow, which is acceptable
                    break;
                }
            }
        }

        // Wait for tasks to complete
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Scheduler should recover to normal state
        assert_eq!(scheduler.get_queue_size(), 0);
    }

    /// Test concurrent task submission and execution
    #[tokio::test]
    async fn test_concurrent_task_submission() {
        let scheduler = TaskScheduler::new_with_defaults();
        let completion_counter = Arc::new(AtomicUsize::new(0));
        let mut submission_handles = Vec::new();

        // Submit tasks concurrently from multiple threads
        for _i in 0..8 {
            let scheduler_clone = scheduler.clone();
            let counter_clone = Arc::clone(&completion_counter);

            let submission_handle = tokio::spawn(async move {
                let task = CounterTask::new(counter_clone);
                scheduler_clone.submit_task(Box::new(task)).await.unwrap()
            });

            submission_handles.push(submission_handle);
        }

        // Wait for all concurrent submissions to complete
        for handle in submission_handles {
            let result = handle.await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        // Verify all tasks completed
        let final_count = completion_counter.load(Ordering::SeqCst);
        assert_eq!(final_count, 8);
    }

    /// Test scheduler performance metrics
    #[tokio::test]
    async fn test_scheduler_performance_metrics() {
        let scheduler = TaskScheduler::new_with_defaults();
        let start_time = std::time::Instant::now();
        let task_count = 5;

        // Submit multiple tasks
        for i in 0..task_count {
            let task = MockTask {
                name: format!("perf_test_{}", i),
                duration: Duration::from_millis(10),
            };
            let result = scheduler.submit_task(Box::new(task)).await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        let total_time = start_time.elapsed();

        // Verify reasonable performance (tasks should complete in reasonable time)
        assert!(
            total_time < Duration::from_millis(500),
            "Tasks took too long: {:?}",
            total_time
        );

        // Verify final state
        assert_eq!(scheduler.get_queue_size(), 0);
        assert_eq!(scheduler.get_active_workers(), 0);
    }
}