1use super::{Task, TaskResult, TaskStatus};
3use crate::Result;
4use crate::config::{Config, OverflowStrategy};
5use crate::core::parallel::config::ParallelConfig;
6use crate::error::SubXError;
7use std::collections::VecDeque;
8use std::sync::{Arc, Mutex};
9use tokio::sync::{Semaphore, oneshot};
10
11struct PendingTask {
12 task: Box<dyn Task + Send + Sync>,
13 result_sender: oneshot::Sender<TaskResult>,
14 task_id: String,
15 priority: TaskPriority,
16}
17
18impl PartialEq for PendingTask {
19 fn eq(&self, other: &Self) -> bool {
20 self.priority == other.priority
21 }
22}
23
24impl Eq for PendingTask {}
25
26impl PartialOrd for PendingTask {
27 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
28 Some(self.cmp(other))
29 }
30}
31
32impl Ord for PendingTask {
33 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
34 self.priority.cmp(&other.priority)
35 }
36}
37
38#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
43pub enum TaskPriority {
44 Low = 0,
46 Normal = 1,
48 High = 2,
50 Critical = 3,
52}
53
54#[derive(Debug, Clone)]
59pub struct TaskInfo {
60 pub task_id: String,
62 pub task_type: String,
64 pub status: TaskStatus,
66 pub start_time: std::time::Instant,
68 pub progress: f32,
70}
71
72pub struct TaskScheduler {
74 _config: ParallelConfig,
76 load_balancer: Option<crate::core::parallel::load_balancer::LoadBalancer>,
78 task_timeout: std::time::Duration,
80 worker_idle_timeout: std::time::Duration,
82 task_queue: Arc<Mutex<VecDeque<PendingTask>>>,
83 semaphore: Arc<Semaphore>,
84 active_tasks: Arc<Mutex<std::collections::HashMap<String, TaskInfo>>>,
85 scheduler_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
86}
87
88impl TaskScheduler {
89 pub fn new_with_config(app_config: &Config) -> Result<Self> {
91 let config = ParallelConfig::from_app_config(app_config);
92 config.validate()?;
93 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_jobs));
94 let task_queue = Arc::new(Mutex::new(VecDeque::new()));
95 let active_tasks = Arc::new(Mutex::new(std::collections::HashMap::new()));
96
97 let general = &app_config.general;
99 let scheduler = Self {
100 _config: config.clone(),
101 task_queue: task_queue.clone(),
102 semaphore: semaphore.clone(),
103 active_tasks: active_tasks.clone(),
104 scheduler_handle: Arc::new(Mutex::new(None)),
105 load_balancer: if config.auto_balance_workers {
106 Some(crate::core::parallel::load_balancer::LoadBalancer::new())
107 } else {
108 None
109 },
110 task_timeout: std::time::Duration::from_secs(general.task_timeout_seconds),
111 worker_idle_timeout: std::time::Duration::from_secs(
112 general.worker_idle_timeout_seconds,
113 ),
114 };
115
116 scheduler.start_scheduler_loop();
118 Ok(scheduler)
119 }
120
121 pub fn new_with_defaults() -> Self {
123 let default_app_config = Config::default();
125 let config = ParallelConfig::from_app_config(&default_app_config);
126 let _ = config.validate();
127 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_jobs));
128 let task_queue = Arc::new(Mutex::new(VecDeque::new()));
129 let active_tasks = Arc::new(Mutex::new(std::collections::HashMap::new()));
130
131 let general = &default_app_config.general;
132 let scheduler = Self {
133 _config: config.clone(),
134 task_queue: task_queue.clone(),
135 semaphore: semaphore.clone(),
136 active_tasks: active_tasks.clone(),
137 scheduler_handle: Arc::new(Mutex::new(None)),
138 load_balancer: if config.auto_balance_workers {
139 Some(crate::core::parallel::load_balancer::LoadBalancer::new())
140 } else {
141 None
142 },
143 task_timeout: std::time::Duration::from_secs(general.task_timeout_seconds),
144 worker_idle_timeout: std::time::Duration::from_secs(
145 general.worker_idle_timeout_seconds,
146 ),
147 };
148
149 scheduler.start_scheduler_loop();
151 scheduler
152 }
153
154 pub fn new() -> Result<Self> {
156 let default_config = Config::default();
157 Self::new_with_config(&default_config)
158 }
159
160 fn start_scheduler_loop(&self) {
162 let task_queue = Arc::clone(&self.task_queue);
163 let semaphore = Arc::clone(&self.semaphore);
164 let active_tasks = Arc::clone(&self.active_tasks);
165 let config = self._config.clone();
166 let task_timeout = self.task_timeout;
167 let worker_idle_timeout = self.worker_idle_timeout;
168
169 let handle = tokio::spawn(async move {
170 let mut last_active = std::time::Instant::now();
172 loop {
173 let has_pending = {
175 let q = task_queue.lock().unwrap();
176 !q.is_empty()
177 };
178 let has_active = {
179 let a = active_tasks.lock().unwrap();
180 !a.is_empty()
181 };
182 if has_pending || has_active {
183 last_active = std::time::Instant::now();
184 } else if last_active.elapsed() > worker_idle_timeout {
185 break;
186 }
187 if let Ok(permit) = semaphore.clone().try_acquire_owned() {
189 let pending = {
190 let mut queue = task_queue.lock().unwrap();
191 if config.enable_task_priorities {
193 if let Some((idx, _)) =
195 queue.iter().enumerate().max_by_key(|(_, t)| t.priority)
196 {
197 queue.remove(idx)
198 } else {
199 None
200 }
201 } else {
202 queue.pop_front()
203 }
204 };
205 if let Some(p) = pending {
206 {
208 let mut active = active_tasks.lock().unwrap();
209 if let Some(info) = active.get_mut(&p.task_id) {
210 info.status = TaskStatus::Running;
211 }
212 }
213
214 let task_id = p.task_id.clone();
215 let active_tasks_clone = Arc::clone(&active_tasks);
216
217 tokio::spawn(async move {
219 let result = match tokio::time::timeout(task_timeout, p.task.execute())
221 .await
222 {
223 Ok(res) => res,
224 Err(_) => TaskResult::Failed("Task execution timeout".to_string()),
225 };
226
227 {
229 let mut at = active_tasks_clone.lock().unwrap();
230 if let Some(info) = at.get_mut(&task_id) {
231 info.status = TaskStatus::Completed(result.clone());
232 info.progress = 1.0;
233 }
234 }
235
236 let _ = p.result_sender.send(result);
238
239 drop(permit);
241 });
242 } else {
243 drop(permit);
245 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
246 }
247 } else {
248 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
250 }
251 }
252 });
253
254 *self.scheduler_handle.lock().unwrap() = Some(handle);
256 }
257
258 pub async fn submit_task(&self, task: Box<dyn Task + Send + Sync>) -> Result<TaskResult> {
260 self.submit_task_with_priority(task, TaskPriority::Normal)
261 .await
262 }
263
264 pub async fn submit_task_with_priority(
266 &self,
267 task: Box<dyn Task + Send + Sync>,
268 priority: TaskPriority,
269 ) -> Result<TaskResult> {
270 let task_id = task.task_id();
271 let task_type = task.task_type().to_string();
272 let (tx, rx) = oneshot::channel();
273
274 {
276 let mut active = self.active_tasks.lock().unwrap();
277 active.insert(
278 task_id.clone(),
279 TaskInfo {
280 task_id: task_id.clone(),
281 task_type,
282 status: TaskStatus::Pending,
283 start_time: std::time::Instant::now(),
284 progress: 0.0,
285 },
286 );
287 }
288
289 let pending = PendingTask {
291 task,
292 result_sender: tx,
293 task_id: task_id.clone(),
294 priority,
295 };
296 if self.get_queue_size() >= self._config.task_queue_size {
297 match self._config.queue_overflow_strategy {
298 OverflowStrategy::Block => {
299 while self.get_queue_size() >= self._config.task_queue_size {
301 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
302 }
303 }
304 OverflowStrategy::DropOldest => {
305 let mut q = self.task_queue.lock().unwrap();
306 q.pop_front();
307 }
308 OverflowStrategy::Reject => {
309 return Err(SubXError::parallel_processing(
310 "Task queue is full".to_string(),
311 ));
312 }
313 OverflowStrategy::Drop => {
314 return Ok(TaskResult::Failed(
316 "Task dropped due to queue overflow".to_string(),
317 ));
318 }
319 OverflowStrategy::Expand => {
320 }
323 }
324 }
325 {
327 let mut q = self.task_queue.lock().unwrap();
328 if self._config.enable_task_priorities {
329 let pos = q
330 .iter()
331 .position(|t| t.priority < pending.priority)
332 .unwrap_or(q.len());
333 q.insert(pos, pending);
334 } else {
335 q.push_back(pending);
336 }
337 }
338
339 let result = rx.await.map_err(|_| {
341 crate::error::SubXError::parallel_processing("Task execution interrupted".to_string())
342 })?;
343
344 {
346 let mut active = self.active_tasks.lock().unwrap();
347 active.remove(&task_id);
348 }
349 Ok(result)
350 }
351
352 async fn try_execute_next_task(&self) {
353 }
356
357 pub async fn submit_batch_tasks(
359 &self,
360 tasks: Vec<Box<dyn Task + Send + Sync>>,
361 ) -> Vec<TaskResult> {
362 let mut receivers = Vec::new();
363
364 for task in tasks {
366 let task_id = task.task_id();
367 let task_type = task.task_type().to_string();
368 let (tx, rx) = oneshot::channel();
369
370 {
372 let mut active = self.active_tasks.lock().unwrap();
373 active.insert(
374 task_id.clone(),
375 TaskInfo {
376 task_id: task_id.clone(),
377 task_type,
378 status: TaskStatus::Pending,
379 start_time: std::time::Instant::now(),
380 progress: 0.0,
381 },
382 );
383 }
384
385 let pending = PendingTask {
387 task,
388 result_sender: tx,
389 task_id: task_id.clone(),
390 priority: TaskPriority::Normal,
391 };
392 if self.get_queue_size() >= self._config.task_queue_size {
393 match self._config.queue_overflow_strategy {
394 OverflowStrategy::Block => {
395 while self.get_queue_size() >= self._config.task_queue_size {
397 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
398 }
399 }
400 OverflowStrategy::DropOldest => {
401 let mut q = self.task_queue.lock().unwrap();
402 q.pop_front();
403 }
404 OverflowStrategy::Reject => {
405 return Vec::new();
407 }
408 OverflowStrategy::Drop => {
409 continue;
411 }
412 OverflowStrategy::Expand => {
413 }
416 }
417 }
418 {
420 let mut q = self.task_queue.lock().unwrap();
421 if self._config.enable_task_priorities {
422 let pos = q
423 .iter()
424 .position(|t| t.priority < pending.priority)
425 .unwrap_or(q.len());
426 q.insert(pos, pending);
427 } else {
428 q.push_back(pending);
429 }
430 }
431
432 receivers.push((task_id, rx));
433 }
434
435 let mut results = Vec::new();
437 for (task_id, rx) in receivers {
438 match rx.await {
439 Ok(result) => results.push(result),
440 Err(_) => {
441 results.push(TaskResult::Failed("Task execution interrupted".to_string()))
442 }
443 }
444
445 {
447 let mut active = self.active_tasks.lock().unwrap();
448 active.remove(&task_id);
449 }
450 }
451
452 results
453 }
454
455 pub fn get_queue_size(&self) -> usize {
457 self.task_queue.lock().unwrap().len()
458 }
459
460 pub fn get_active_workers(&self) -> usize {
462 self._config.max_concurrent_jobs - self.semaphore.available_permits()
463 }
464
465 pub fn get_task_status(&self, task_id: &str) -> Option<TaskInfo> {
467 self.active_tasks.lock().unwrap().get(task_id).cloned()
468 }
469
470 pub fn list_active_tasks(&self) -> Vec<TaskInfo> {
472 self.active_tasks
473 .lock()
474 .unwrap()
475 .values()
476 .cloned()
477 .collect()
478 }
479}
480
481impl Clone for TaskScheduler {
482 fn clone(&self) -> Self {
483 Self {
484 _config: self._config.clone(),
485 task_queue: Arc::clone(&self.task_queue),
486 semaphore: Arc::clone(&self.semaphore),
487 active_tasks: Arc::clone(&self.active_tasks),
488 scheduler_handle: Arc::clone(&self.scheduler_handle),
489 load_balancer: self.load_balancer.clone(),
490 task_timeout: self.task_timeout,
491 worker_idle_timeout: self.worker_idle_timeout,
492 }
493 }
494}
495
496#[cfg(test)]
497mod tests {
498 use super::{Task, TaskPriority, TaskResult, TaskScheduler};
499 use std::sync::atomic::{AtomicUsize, Ordering};
500 use std::sync::{Arc, Mutex};
501 use tokio::time::Duration;
502 use uuid::Uuid;
503
504 struct MockTask {
505 name: String,
506 duration: Duration,
507 }
508
509 #[async_trait::async_trait]
510 impl Task for MockTask {
511 async fn execute(&self) -> TaskResult {
512 tokio::time::sleep(self.duration).await;
513 TaskResult::Success(format!("Task completed: {}", self.name))
514 }
515 fn task_type(&self) -> &'static str {
516 "mock"
517 }
518 fn task_id(&self) -> String {
519 format!("mock_{}", self.name)
520 }
521 }
522
523 struct CounterTask {
524 counter: Arc<AtomicUsize>,
525 }
526 impl CounterTask {
527 fn new(counter: Arc<AtomicUsize>) -> Self {
528 Self { counter }
529 }
530 }
531 #[async_trait::async_trait]
532 impl Task for CounterTask {
533 async fn execute(&self) -> TaskResult {
534 self.counter.fetch_add(1, Ordering::SeqCst);
535 TaskResult::Success("Counter task completed".to_string())
536 }
537 fn task_type(&self) -> &'static str {
538 "counter"
539 }
540 fn task_id(&self) -> String {
541 Uuid::new_v4().to_string()
542 }
543 }
544
545 struct OrderTask {
546 name: String,
547 order: Arc<Mutex<Vec<String>>>,
548 }
549 impl OrderTask {
550 fn new(name: &str, order: Arc<Mutex<Vec<String>>>) -> Self {
551 Self {
552 name: name.to_string(),
553 order,
554 }
555 }
556 }
557 #[async_trait::async_trait]
558 impl Task for OrderTask {
559 async fn execute(&self) -> TaskResult {
560 let mut v = self.order.lock().unwrap();
561 v.push(self.name.clone());
562 TaskResult::Success(format!("Order task completed: {}", self.name))
563 }
564 fn task_type(&self) -> &'static str {
565 "order"
566 }
567 fn task_id(&self) -> String {
568 format!("order_{}", self.name)
569 }
570 }
571
572 #[tokio::test]
573 async fn test_task_scheduler_basic() {
574 let scheduler = TaskScheduler::new_with_defaults();
575 let task = Box::new(MockTask {
576 name: "test".to_string(),
577 duration: Duration::from_millis(10),
578 });
579 let result = scheduler.submit_task(task).await.unwrap();
580 assert!(matches!(result, TaskResult::Success(_)));
581 }
582
583 #[tokio::test]
584 async fn test_concurrent_task_execution() {
585 let scheduler = TaskScheduler::new_with_defaults();
586 let counter = Arc::new(AtomicUsize::new(0));
587
588 let task = Box::new(CounterTask::new(counter.clone()));
590 let result = scheduler.submit_task(task).await.unwrap();
591 assert!(matches!(result, TaskResult::Success(_)));
592 assert_eq!(counter.load(Ordering::SeqCst), 1);
593
594 for _ in 0..4 {
596 let task = Box::new(CounterTask::new(counter.clone()));
597 let _result = scheduler.submit_task(task).await.unwrap();
598 }
599 assert_eq!(counter.load(Ordering::SeqCst), 5);
600 }
601
602 #[tokio::test]
603 async fn test_task_priority_ordering() {
604 let scheduler = TaskScheduler::new_with_defaults();
605 let order = Arc::new(Mutex::new(Vec::new()));
606
607 let tasks = vec![
609 (TaskPriority::Low, "low"),
610 (TaskPriority::High, "high"),
611 (TaskPriority::Normal, "normal"),
612 (TaskPriority::Critical, "critical"),
613 ];
614
615 let mut handles = Vec::new();
616 for (prio, name) in tasks {
617 let task = Box::new(OrderTask::new(name, order.clone()));
618 let scheduler_clone = scheduler.clone();
619 let handle = tokio::spawn(async move {
620 scheduler_clone
621 .submit_task_with_priority(task, prio)
622 .await
623 .unwrap()
624 });
625 handles.push(handle);
626 }
627
628 for handle in handles {
630 let _ = handle.await.unwrap();
631 }
632
633 let v = order.lock().unwrap();
634 assert_eq!(v.len(), 4);
635 assert!(v.contains(&"critical".to_string()));
637 assert!(v.contains(&"high".to_string()));
638 assert!(v.contains(&"normal".to_string()));
639 assert!(v.contains(&"low".to_string()));
640 }
641
642 #[tokio::test]
643 async fn test_queue_and_active_workers_metrics() {
644 let scheduler = TaskScheduler::new_with_defaults();
645
646 assert_eq!(scheduler.get_queue_size(), 0);
648 assert_eq!(scheduler.get_active_workers(), 0);
649
650 let task = Box::new(MockTask {
652 name: "long_task".to_string(),
653 duration: Duration::from_millis(100),
654 });
655
656 let handle = {
657 let scheduler_clone = scheduler.clone();
658 tokio::spawn(async move { scheduler_clone.submit_task(task).await })
659 };
660
661 tokio::time::sleep(Duration::from_millis(20)).await;
663
664 let _result = handle.await.unwrap().unwrap();
666
667 assert_eq!(scheduler.get_queue_size(), 0);
669 }
670
671 #[tokio::test]
672 async fn test_continuous_scheduling() {
673 let scheduler = TaskScheduler::new_with_defaults();
674 let counter = Arc::new(AtomicUsize::new(0));
675
676 let mut handles = Vec::new();
678 for i in 0..10 {
679 let task = Box::new(CounterTask::new(counter.clone()));
680 let scheduler_clone = scheduler.clone();
681 let handle =
682 tokio::spawn(async move { scheduler_clone.submit_task(task).await.unwrap() });
683 handles.push(handle);
684
685 if i % 3 == 0 {
687 tokio::time::sleep(Duration::from_millis(5)).await;
688 }
689 }
690
691 for handle in handles {
693 let result = handle.await.unwrap();
694 assert!(matches!(result, TaskResult::Success(_)));
695 }
696
697 assert_eq!(counter.load(Ordering::SeqCst), 10);
699 }
700
701 #[tokio::test]
702 async fn test_batch_task_execution() {
703 let scheduler = TaskScheduler::new_with_defaults();
704 let counter = Arc::new(AtomicUsize::new(0));
705
706 let mut tasks: Vec<Box<dyn Task + Send + Sync>> = Vec::new();
708 for _ in 0..3 {
709 tasks.push(Box::new(CounterTask::new(counter.clone())));
711 }
712
713 let results = scheduler.submit_batch_tasks(tasks).await;
714 assert_eq!(results.len(), 3);
715 assert_eq!(counter.load(Ordering::SeqCst), 3);
716 for result in results {
717 assert!(matches!(result, TaskResult::Success(_)));
718 }
719 }
720
721 #[tokio::test]
722 async fn test_high_concurrency_stress() {
723 let scheduler = TaskScheduler::new_with_defaults();
724 let counter = Arc::new(AtomicUsize::new(0));
725
726 let mut handles = Vec::new();
728 for i in 0..50 {
729 let task = Box::new(CounterTask::new(counter.clone()));
730 let scheduler_clone = scheduler.clone();
731 let priority = match i % 4 {
732 0 => TaskPriority::Low,
733 1 => TaskPriority::Normal,
734 2 => TaskPriority::High,
735 3 => TaskPriority::Critical,
736 _ => TaskPriority::Normal,
737 };
738
739 let handle = tokio::spawn(async move {
740 scheduler_clone
741 .submit_task_with_priority(task, priority)
742 .await
743 .unwrap()
744 });
745 handles.push(handle);
746
747 if i % 5 == 0 {
749 tokio::time::sleep(Duration::from_millis(1)).await;
750 }
751 }
752
753 for handle in handles {
755 let result = handle.await.unwrap();
756 assert!(matches!(result, TaskResult::Success(_)));
757 }
758
759 assert_eq!(counter.load(Ordering::SeqCst), 50);
761
762 assert_eq!(scheduler.get_queue_size(), 0);
764 assert_eq!(scheduler.get_active_workers(), 0);
765 }
766
767 #[tokio::test]
768 async fn test_mixed_batch_and_individual_tasks() {
769 let scheduler = TaskScheduler::new_with_defaults();
770 let counter = Arc::new(AtomicUsize::new(0));
771
772 let mut individual_handles = Vec::new();
774 for _ in 0..3 {
775 let task = Box::new(CounterTask::new(counter.clone()));
776 let scheduler_clone = scheduler.clone();
777 let handle =
778 tokio::spawn(async move { scheduler_clone.submit_task(task).await.unwrap() });
779 individual_handles.push(handle);
780 }
781
782 let mut batch_tasks: Vec<Box<dyn Task + Send + Sync>> = Vec::new();
784 for _ in 0..4 {
785 batch_tasks.push(Box::new(CounterTask::new(counter.clone())));
786 }
787
788 let batch_handle = {
789 let scheduler_clone = scheduler.clone();
790 tokio::spawn(async move { scheduler_clone.submit_batch_tasks(batch_tasks).await })
791 };
792
793 let mut more_individual_handles = Vec::new();
795 for _ in 0..2 {
796 let task = Box::new(CounterTask::new(counter.clone()));
797 let scheduler_clone = scheduler.clone();
798 let handle =
799 tokio::spawn(async move { scheduler_clone.submit_task(task).await.unwrap() });
800 more_individual_handles.push(handle);
801 }
802
803 for handle in individual_handles {
805 let result = handle.await.unwrap();
806 assert!(matches!(result, TaskResult::Success(_)));
807 }
808
809 let batch_results = batch_handle.await.unwrap();
810 assert_eq!(batch_results.len(), 4);
811 for result in batch_results {
812 assert!(matches!(result, TaskResult::Success(_)));
813 }
814
815 for handle in more_individual_handles {
816 let result = handle.await.unwrap();
817 assert!(matches!(result, TaskResult::Success(_)));
818 }
819
820 assert_eq!(counter.load(Ordering::SeqCst), 9);
822 }
823
824 #[tokio::test]
826 async fn test_task_scheduling_strategies() {
827 use std::sync::Arc;
828 use std::sync::atomic::{AtomicUsize, Ordering};
829
830 struct PriorityTask {
831 id: String,
832 priority: TaskPriority,
833 counter: Arc<AtomicUsize>,
834 execution_order: Arc<Mutex<Vec<String>>>,
835 }
836
837 #[async_trait::async_trait]
838 impl Task for PriorityTask {
839 async fn execute(&self) -> TaskResult {
840 self.counter.fetch_add(1, Ordering::SeqCst);
841 self.execution_order.lock().unwrap().push(self.id.clone());
842 tokio::time::sleep(Duration::from_millis(50)).await;
844 TaskResult::Success(format!("Priority task {} completed", self.id))
845 }
846 fn task_type(&self) -> &'static str {
847 "priority"
848 }
849 fn task_id(&self) -> String {
850 self.id.clone()
851 }
852 }
853
854 let scheduler = TaskScheduler::new_with_defaults();
855 let counter = Arc::new(AtomicUsize::new(0));
856 let execution_order = Arc::new(Mutex::new(Vec::new()));
857
858 let priorities = vec![
860 ("low", TaskPriority::Low),
861 ("high", TaskPriority::High),
862 ("critical", TaskPriority::Critical),
863 ("normal", TaskPriority::Normal),
864 ];
865
866 for (id, priority) in priorities {
867 let task = PriorityTask {
868 id: id.to_string(),
869 priority,
870 counter: Arc::clone(&counter),
871 execution_order: Arc::clone(&execution_order),
872 };
873
874 scheduler
875 .submit_task_with_priority(Box::new(task), priority)
876 .await
877 .unwrap();
878 }
879
880 tokio::time::sleep(Duration::from_millis(200)).await;
882
883 let final_count = counter.load(Ordering::SeqCst);
885 assert_eq!(final_count, 4, "All 4 tasks should have been executed");
886
887 let order = execution_order.lock().unwrap();
889 println!("Task execution order: {:?}", *order);
890
891 assert!(
894 order.contains(&"critical".to_string()),
895 "Critical task should have been executed"
896 );
897 assert!(
898 order.contains(&"low".to_string()),
899 "Low task should have been executed"
900 );
901 assert!(
902 order.contains(&"high".to_string()),
903 "High task should have been executed"
904 );
905 assert!(
906 order.contains(&"normal".to_string()),
907 "Normal task should have been executed"
908 );
909 }
910
911 #[tokio::test]
913 async fn test_load_balancing() {
914 let scheduler = TaskScheduler::new_with_defaults();
915 let task_counter = Arc::new(AtomicUsize::new(0));
916
917 for _i in 0..10 {
919 let task = CounterTask::new(Arc::clone(&task_counter));
920 let result = scheduler.submit_task(Box::new(task)).await.unwrap();
921 assert!(matches!(result, TaskResult::Success(_)));
922 }
923
924 let final_count = task_counter.load(Ordering::SeqCst);
926 assert_eq!(final_count, 10);
927
928 assert_eq!(scheduler.get_queue_size(), 0);
930 }
931
932 #[tokio::test]
934 async fn test_task_priority_processing() {
935 let scheduler = TaskScheduler::new_with_defaults();
936
937 assert!(TaskPriority::Critical > TaskPriority::High);
939 assert!(TaskPriority::High > TaskPriority::Normal);
940 assert!(TaskPriority::Normal > TaskPriority::Low);
941
942 let high_task = MockTask {
944 name: "high_priority".to_string(),
945 duration: Duration::from_millis(5),
946 };
947
948 let low_task = MockTask {
949 name: "low_priority".to_string(),
950 duration: Duration::from_millis(5),
951 };
952
953 let high_result = scheduler
954 .submit_task_with_priority(Box::new(high_task), TaskPriority::High)
955 .await
956 .unwrap();
957 let low_result = scheduler
958 .submit_task_with_priority(Box::new(low_task), TaskPriority::Low)
959 .await
960 .unwrap();
961
962 assert!(matches!(high_result, TaskResult::Success(_)));
963 assert!(matches!(low_result, TaskResult::Success(_)));
964 }
965
966 #[tokio::test]
968 async fn test_scheduler_state_management() {
969 let scheduler = TaskScheduler::new_with_defaults();
970
971 assert_eq!(scheduler.get_queue_size(), 0);
973 assert_eq!(scheduler.get_active_workers(), 0);
974
975 let task = MockTask {
977 name: "state_test".to_string(),
978 duration: Duration::from_millis(50),
979 };
980
981 let result = scheduler.submit_task(Box::new(task)).await.unwrap();
982
983 tokio::time::sleep(Duration::from_millis(5)).await;
985
986 assert!(matches!(result, TaskResult::Success(_)));
988
989 assert_eq!(scheduler.get_queue_size(), 0);
991 }
992
993 #[tokio::test]
995 async fn test_overflow_strategy_handling() {
996 let scheduler = TaskScheduler::new_with_defaults();
997
998 for i in 0..20 {
1000 let task = MockTask {
1001 name: format!("overflow_test_{}", i),
1002 duration: Duration::from_millis(20),
1003 };
1004
1005 match scheduler.submit_task(Box::new(task)).await {
1006 Ok(result) => {
1007 assert!(matches!(result, TaskResult::Success(_)));
1008 }
1009 Err(_) => {
1010 break;
1012 }
1013 }
1014 }
1015
1016 tokio::time::sleep(Duration::from_millis(100)).await;
1018
1019 assert_eq!(scheduler.get_queue_size(), 0);
1021 }
1022
1023 #[tokio::test]
1025 async fn test_concurrent_task_submission() {
1026 let scheduler = TaskScheduler::new_with_defaults();
1027 let completion_counter = Arc::new(AtomicUsize::new(0));
1028 let mut submission_handles = Vec::new();
1029
1030 for _i in 0..8 {
1032 let scheduler_clone = scheduler.clone();
1033 let counter_clone = Arc::clone(&completion_counter);
1034
1035 let submission_handle = tokio::spawn(async move {
1036 let task = CounterTask::new(counter_clone);
1037 scheduler_clone.submit_task(Box::new(task)).await.unwrap()
1038 });
1039
1040 submission_handles.push(submission_handle);
1041 }
1042
1043 for handle in submission_handles {
1045 let result = handle.await.unwrap();
1046 assert!(matches!(result, TaskResult::Success(_)));
1047 }
1048
1049 let final_count = completion_counter.load(Ordering::SeqCst);
1051 assert_eq!(final_count, 8);
1052 }
1053
1054 #[tokio::test]
1056 async fn test_scheduler_performance_metrics() {
1057 let scheduler = TaskScheduler::new_with_defaults();
1058 let start_time = std::time::Instant::now();
1059 let task_count = 5;
1060
1061 for i in 0..task_count {
1063 let task = MockTask {
1064 name: format!("perf_test_{}", i),
1065 duration: Duration::from_millis(10),
1066 };
1067 let result = scheduler.submit_task(Box::new(task)).await.unwrap();
1068 assert!(matches!(result, TaskResult::Success(_)));
1069 }
1070
1071 let total_time = start_time.elapsed();
1072
1073 assert!(
1075 total_time < Duration::from_millis(500),
1076 "Tasks took too long: {:?}",
1077 total_time
1078 );
1079
1080 assert_eq!(scheduler.get_queue_size(), 0);
1082 assert_eq!(scheduler.get_active_workers(), 0);
1083 }
1084}