1use super::{Task, TaskResult, TaskStatus};
3use crate::Result;
4use crate::config::{Config, OverflowStrategy};
5use crate::core::parallel::config::ParallelConfig;
6use crate::error::SubXError;
7use std::collections::VecDeque;
8use std::sync::{Arc, Mutex};
9use tokio::sync::{Semaphore, oneshot};
10
11struct PendingTask {
12 task: Box<dyn Task + Send + Sync>,
13 result_sender: oneshot::Sender<TaskResult>,
14 task_id: String,
15 priority: TaskPriority,
16}
17
18struct ActiveTaskGuard {
24 active_tasks: Arc<Mutex<std::collections::HashMap<String, TaskInfo>>>,
25 task_id: String,
26}
27
28impl Drop for ActiveTaskGuard {
29 fn drop(&mut self) {
30 if let Ok(mut active) = self.active_tasks.lock() {
31 active.remove(&self.task_id);
32 }
33 }
34}
35
36impl PartialEq for PendingTask {
37 fn eq(&self, other: &Self) -> bool {
38 self.priority == other.priority
39 }
40}
41
42impl Eq for PendingTask {}
43
44impl PartialOrd for PendingTask {
45 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
46 Some(self.cmp(other))
47 }
48}
49
50impl Ord for PendingTask {
51 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
52 self.priority.cmp(&other.priority)
53 }
54}
55
/// Scheduling priority of a task.
///
/// The explicit discriminants ascend with urgency, so the derived ordering ranks
/// `Low < Normal < High < Critical`.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub enum TaskPriority {
    /// Lowest urgency.
    Low = 0,
    /// Priority used by `TaskScheduler::submit_task` and batch submissions.
    Normal = 1,
    /// Ranked above `Normal`.
    High = 2,
    /// Highest urgency.
    Critical = 3,
}
71
72#[derive(Debug, Clone)]
77pub struct TaskInfo {
78 pub task_id: String,
80 pub task_type: String,
82 pub status: TaskStatus,
84 pub start_time: std::time::Instant,
86 pub progress: f32,
88}
89
90pub struct TaskScheduler {
92 _config: ParallelConfig,
94 load_balancer: Option<crate::core::parallel::load_balancer::LoadBalancer>,
96 task_timeout: std::time::Duration,
98 worker_idle_timeout: std::time::Duration,
100 task_queue: Arc<Mutex<VecDeque<PendingTask>>>,
101 semaphore: Arc<Semaphore>,
102 active_tasks: Arc<Mutex<std::collections::HashMap<String, TaskInfo>>>,
103 scheduler_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
104}
105
106impl TaskScheduler {
107 pub fn new_with_config(app_config: &Config) -> Result<Self> {
109 let config = ParallelConfig::from_app_config(app_config);
110 config.validate()?;
111 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_jobs));
112 let task_queue = Arc::new(Mutex::new(VecDeque::new()));
113 let active_tasks = Arc::new(Mutex::new(std::collections::HashMap::new()));
114
115 let general = &app_config.general;
117 let scheduler = Self {
118 _config: config.clone(),
119 task_queue: task_queue.clone(),
120 semaphore: semaphore.clone(),
121 active_tasks: active_tasks.clone(),
122 scheduler_handle: Arc::new(Mutex::new(None)),
123 load_balancer: if config.auto_balance_workers {
124 Some(crate::core::parallel::load_balancer::LoadBalancer::new())
125 } else {
126 None
127 },
128 task_timeout: std::time::Duration::from_secs(general.task_timeout_seconds),
129 worker_idle_timeout: std::time::Duration::from_secs(
130 general.worker_idle_timeout_seconds,
131 ),
132 };
133
134 scheduler.start_scheduler_loop();
136 Ok(scheduler)
137 }
138
139 pub fn new_with_defaults() -> Self {
141 let default_app_config = Config::default();
143 let config = ParallelConfig::from_app_config(&default_app_config);
144 let _ = config.validate();
145 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_jobs));
146 let task_queue = Arc::new(Mutex::new(VecDeque::new()));
147 let active_tasks = Arc::new(Mutex::new(std::collections::HashMap::new()));
148
149 let general = &default_app_config.general;
150 let scheduler = Self {
151 _config: config.clone(),
152 task_queue: task_queue.clone(),
153 semaphore: semaphore.clone(),
154 active_tasks: active_tasks.clone(),
155 scheduler_handle: Arc::new(Mutex::new(None)),
156 load_balancer: if config.auto_balance_workers {
157 Some(crate::core::parallel::load_balancer::LoadBalancer::new())
158 } else {
159 None
160 },
161 task_timeout: std::time::Duration::from_secs(general.task_timeout_seconds),
162 worker_idle_timeout: std::time::Duration::from_secs(
163 general.worker_idle_timeout_seconds,
164 ),
165 };
166
167 scheduler.start_scheduler_loop();
169 scheduler
170 }
171
172 pub fn new() -> Result<Self> {
174 let default_config = Config::default();
175 Self::new_with_config(&default_config)
176 }
177
178 fn start_scheduler_loop(&self) {
180 let task_queue = Arc::clone(&self.task_queue);
181 let semaphore = Arc::clone(&self.semaphore);
182 let active_tasks = Arc::clone(&self.active_tasks);
183 let config = self._config.clone();
184 let task_timeout = self.task_timeout;
185 let worker_idle_timeout = self.worker_idle_timeout;
186
187 let handle = tokio::spawn(async move {
188 let mut last_active = std::time::Instant::now();
190 loop {
191 let has_pending = {
193 let q = task_queue.lock().unwrap();
194 !q.is_empty()
195 };
196 let has_active = {
197 let a = active_tasks.lock().unwrap();
198 !a.is_empty()
199 };
200 if has_pending || has_active {
201 last_active = std::time::Instant::now();
202 } else if last_active.elapsed() > worker_idle_timeout {
203 break;
204 }
205 if let Ok(permit) = semaphore.clone().try_acquire_owned() {
207 let pending = {
208 let mut queue = task_queue.lock().unwrap();
209 if config.enable_task_priorities {
211 if let Some((idx, _)) =
213 queue.iter().enumerate().max_by_key(|(_, t)| t.priority)
214 {
215 queue.remove(idx)
216 } else {
217 None
218 }
219 } else {
220 queue.pop_front()
221 }
222 };
223 if let Some(p) = pending {
224 {
226 let mut active = active_tasks.lock().unwrap();
227 if let Some(info) = active.get_mut(&p.task_id) {
228 info.status = TaskStatus::Running;
229 }
230 }
231
232 let task_id = p.task_id.clone();
233 let active_tasks_clone = Arc::clone(&active_tasks);
234
235 tokio::spawn(async move {
237 let result = match tokio::time::timeout(task_timeout, p.task.execute())
239 .await
240 {
241 Ok(res) => res,
242 Err(_) => TaskResult::Failed("Task execution timeout".to_string()),
243 };
244
245 {
247 let mut at = active_tasks_clone.lock().unwrap();
248 if let Some(info) = at.get_mut(&task_id) {
249 info.status = TaskStatus::Completed(result.clone());
250 info.progress = 1.0;
251 }
252 }
253
254 let _ = p.result_sender.send(result);
256
257 drop(permit);
259 });
260 } else {
261 drop(permit);
263 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
264 }
265 } else {
266 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
268 }
269 }
270 });
271
272 *self.scheduler_handle.lock().unwrap() = Some(handle);
274 }
275
276 pub async fn submit_task(&self, task: Box<dyn Task + Send + Sync>) -> Result<TaskResult> {
278 self.submit_task_with_priority(task, TaskPriority::Normal)
279 .await
280 }
281
282 pub async fn submit_task_with_priority(
284 &self,
285 task: Box<dyn Task + Send + Sync>,
286 priority: TaskPriority,
287 ) -> Result<TaskResult> {
288 let task_id = task.task_id();
289 let task_type = task.task_type().to_string();
290 let (tx, rx) = oneshot::channel();
291
292 {
294 let mut active = self.active_tasks.lock().unwrap();
295 active.insert(
296 task_id.clone(),
297 TaskInfo {
298 task_id: task_id.clone(),
299 task_type,
300 status: TaskStatus::Pending,
301 start_time: std::time::Instant::now(),
302 progress: 0.0,
303 },
304 );
305 }
306
307 let _guard = ActiveTaskGuard {
310 active_tasks: Arc::clone(&self.active_tasks),
311 task_id: task_id.clone(),
312 };
313
314 let pending = PendingTask {
316 task,
317 result_sender: tx,
318 task_id: task_id.clone(),
319 priority,
320 };
321 if self.get_queue_size() >= self._config.task_queue_size {
322 match self._config.queue_overflow_strategy {
323 OverflowStrategy::Block => {
324 while self.get_queue_size() >= self._config.task_queue_size {
326 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
327 }
328 }
329 OverflowStrategy::DropOldest => {
330 let evicted_id = {
331 let mut q = self.task_queue.lock().unwrap();
332 if let Some(evicted) = q.pop_front() {
333 let id = evicted.task_id.clone();
334 let _ = evicted.result_sender.send(TaskResult::Failed(
335 "Task dropped due to queue overflow".to_string(),
336 ));
337 Some(id)
338 } else {
339 None
340 }
341 };
342 if let Some(id) = evicted_id {
343 let mut active = self.active_tasks.lock().unwrap();
347 active.remove(&id);
348 }
349 }
350 OverflowStrategy::Reject => {
351 return Err(SubXError::parallel_processing(
352 "Task queue is full".to_string(),
353 ));
354 }
355 OverflowStrategy::Drop => {
356 return Ok(TaskResult::Failed(
358 "Task dropped due to queue overflow".to_string(),
359 ));
360 }
361 OverflowStrategy::Expand => {
362 }
365 }
366 }
367 {
369 let mut q = self.task_queue.lock().unwrap();
370 if self._config.enable_task_priorities {
371 let pos = q
372 .iter()
373 .position(|t| t.priority < pending.priority)
374 .unwrap_or(q.len());
375 q.insert(pos, pending);
376 } else {
377 q.push_back(pending);
378 }
379 }
380
381 self.ensure_scheduler_running();
384
385 let result = rx.await.map_err(|_| {
389 crate::error::SubXError::parallel_processing("Task execution interrupted".to_string())
390 })?;
391
392 Ok(result)
393 }
394
395 fn ensure_scheduler_running(&self) {
401 let needs_restart = {
402 let handle = self.scheduler_handle.lock().unwrap();
403 match handle.as_ref() {
404 Some(h) => h.is_finished(),
405 None => true,
406 }
407 };
408 if needs_restart {
409 self.start_scheduler_loop();
410 }
411 }
412
413 async fn try_execute_next_task(&self) {
414 }
417
418 pub async fn submit_batch_tasks(
420 &self,
421 tasks: Vec<Box<dyn Task + Send + Sync>>,
422 ) -> Vec<TaskResult> {
423 let mut receivers = Vec::new();
424
425 for task in tasks {
427 let task_id = task.task_id();
428 let task_type = task.task_type().to_string();
429 let (tx, rx) = oneshot::channel();
430
431 {
433 let mut active = self.active_tasks.lock().unwrap();
434 active.insert(
435 task_id.clone(),
436 TaskInfo {
437 task_id: task_id.clone(),
438 task_type,
439 status: TaskStatus::Pending,
440 start_time: std::time::Instant::now(),
441 progress: 0.0,
442 },
443 );
444 }
445
446 let pending = PendingTask {
448 task,
449 result_sender: tx,
450 task_id: task_id.clone(),
451 priority: TaskPriority::Normal,
452 };
453 if self.get_queue_size() >= self._config.task_queue_size {
454 match self._config.queue_overflow_strategy {
455 OverflowStrategy::Block => {
456 while self.get_queue_size() >= self._config.task_queue_size {
458 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
459 }
460 }
461 OverflowStrategy::DropOldest => {
462 let evicted_id = {
463 let mut q = self.task_queue.lock().unwrap();
464 if let Some(evicted) = q.pop_front() {
465 let id = evicted.task_id.clone();
466 let _ = evicted.result_sender.send(TaskResult::Failed(
467 "Task dropped due to queue overflow".to_string(),
468 ));
469 Some(id)
470 } else {
471 None
472 }
473 };
474 if let Some(id) = evicted_id {
475 let mut active = self.active_tasks.lock().unwrap();
476 active.remove(&id);
477 }
478 }
479 OverflowStrategy::Reject => {
480 return Vec::new();
482 }
483 OverflowStrategy::Drop => {
484 continue;
486 }
487 OverflowStrategy::Expand => {
488 }
491 }
492 }
493 {
495 let mut q = self.task_queue.lock().unwrap();
496 if self._config.enable_task_priorities {
497 let pos = q
498 .iter()
499 .position(|t| t.priority < pending.priority)
500 .unwrap_or(q.len());
501 q.insert(pos, pending);
502 } else {
503 q.push_back(pending);
504 }
505 }
506
507 receivers.push((task_id, rx));
508 }
509
510 self.ensure_scheduler_running();
512
513 let mut results = Vec::new();
515 for (task_id, rx) in receivers {
516 match rx.await {
517 Ok(result) => results.push(result),
518 Err(_) => {
519 results.push(TaskResult::Failed("Task execution interrupted".to_string()))
520 }
521 }
522
523 {
525 let mut active = self.active_tasks.lock().unwrap();
526 active.remove(&task_id);
527 }
528 }
529
530 results
531 }
532
533 pub fn get_queue_size(&self) -> usize {
535 self.task_queue.lock().unwrap().len()
536 }
537
538 pub fn get_active_workers(&self) -> usize {
540 self._config.max_concurrent_jobs - self.semaphore.available_permits()
541 }
542
543 pub fn get_task_status(&self, task_id: &str) -> Option<TaskInfo> {
545 self.active_tasks.lock().unwrap().get(task_id).cloned()
546 }
547
548 pub fn list_active_tasks(&self) -> Vec<TaskInfo> {
550 self.active_tasks
551 .lock()
552 .unwrap()
553 .values()
554 .cloned()
555 .collect()
556 }
557}
558
559impl Clone for TaskScheduler {
560 fn clone(&self) -> Self {
561 Self {
562 _config: self._config.clone(),
563 task_queue: Arc::clone(&self.task_queue),
564 semaphore: Arc::clone(&self.semaphore),
565 active_tasks: Arc::clone(&self.active_tasks),
566 scheduler_handle: Arc::clone(&self.scheduler_handle),
567 load_balancer: self.load_balancer.clone(),
568 task_timeout: self.task_timeout,
569 worker_idle_timeout: self.worker_idle_timeout,
570 }
571 }
572}
573
574#[cfg(test)]
575mod tests {
576 use super::{Task, TaskPriority, TaskResult, TaskScheduler};
577 use std::sync::atomic::{AtomicUsize, Ordering};
578 use std::sync::{Arc, Mutex};
579 use tokio::time::Duration;
580 use uuid::Uuid;
581
582 struct MockTask {
583 name: String,
584 duration: Duration,
585 }
586
587 #[async_trait::async_trait]
588 impl Task for MockTask {
589 async fn execute(&self) -> TaskResult {
590 tokio::time::sleep(self.duration).await;
591 TaskResult::Success(format!("Task completed: {}", self.name))
592 }
593 fn task_type(&self) -> &'static str {
594 "mock"
595 }
596 fn task_id(&self) -> String {
597 format!("mock_{}", self.name)
598 }
599 }
600
601 struct CounterTask {
602 counter: Arc<AtomicUsize>,
603 }
604 impl CounterTask {
605 fn new(counter: Arc<AtomicUsize>) -> Self {
606 Self { counter }
607 }
608 }
609 #[async_trait::async_trait]
610 impl Task for CounterTask {
611 async fn execute(&self) -> TaskResult {
612 self.counter.fetch_add(1, Ordering::SeqCst);
613 TaskResult::Success("Counter task completed".to_string())
614 }
615 fn task_type(&self) -> &'static str {
616 "counter"
617 }
618 fn task_id(&self) -> String {
619 Uuid::now_v7().to_string()
620 }
621 }
622
623 struct OrderTask {
624 name: String,
625 order: Arc<Mutex<Vec<String>>>,
626 }
627 impl OrderTask {
628 fn new(name: &str, order: Arc<Mutex<Vec<String>>>) -> Self {
629 Self {
630 name: name.to_string(),
631 order,
632 }
633 }
634 }
635 #[async_trait::async_trait]
636 impl Task for OrderTask {
637 async fn execute(&self) -> TaskResult {
638 let mut v = self.order.lock().unwrap();
639 v.push(self.name.clone());
640 TaskResult::Success(format!("Order task completed: {}", self.name))
641 }
642 fn task_type(&self) -> &'static str {
643 "order"
644 }
645 fn task_id(&self) -> String {
646 format!("order_{}", self.name)
647 }
648 }
649
650 #[tokio::test]
651 async fn test_task_scheduler_basic() {
652 let scheduler = TaskScheduler::new_with_defaults();
653 let task = Box::new(MockTask {
654 name: "test".to_string(),
655 duration: Duration::from_millis(10),
656 });
657 let result = scheduler.submit_task(task).await.unwrap();
658 assert!(matches!(result, TaskResult::Success(_)));
659 }
660
661 #[test]
662 fn counter_task_id_is_uuidv7() {
663 let counter = Arc::new(AtomicUsize::new(0));
664 let task = CounterTask::new(counter);
665 let id = task.task_id();
666 let parsed = Uuid::parse_str(&id).expect("task_id must be a valid UUID");
667 assert_eq!(parsed.get_version_num(), 7);
668 }
669
670 #[tokio::test]
671 async fn test_concurrent_task_execution() {
672 let scheduler = TaskScheduler::new_with_defaults();
673 let counter = Arc::new(AtomicUsize::new(0));
674
675 let task = Box::new(CounterTask::new(counter.clone()));
677 let result = scheduler.submit_task(task).await.unwrap();
678 assert!(matches!(result, TaskResult::Success(_)));
679 assert_eq!(counter.load(Ordering::SeqCst), 1);
680
681 for _ in 0..4 {
683 let task = Box::new(CounterTask::new(counter.clone()));
684 let _result = scheduler.submit_task(task).await.unwrap();
685 }
686 assert_eq!(counter.load(Ordering::SeqCst), 5);
687 }
688
689 #[tokio::test]
690 async fn test_task_priority_ordering() {
691 let scheduler = TaskScheduler::new_with_defaults();
692 let order = Arc::new(Mutex::new(Vec::new()));
693
694 let tasks = vec![
696 (TaskPriority::Low, "low"),
697 (TaskPriority::High, "high"),
698 (TaskPriority::Normal, "normal"),
699 (TaskPriority::Critical, "critical"),
700 ];
701
702 let mut handles = Vec::new();
703 for (prio, name) in tasks {
704 let task = Box::new(OrderTask::new(name, order.clone()));
705 let scheduler_clone = scheduler.clone();
706 let handle = tokio::spawn(async move {
707 scheduler_clone
708 .submit_task_with_priority(task, prio)
709 .await
710 .unwrap()
711 });
712 handles.push(handle);
713 }
714
715 for handle in handles {
717 let _ = handle.await.unwrap();
718 }
719
720 let v = order.lock().unwrap();
721 assert_eq!(v.len(), 4);
722 assert!(v.contains(&"critical".to_string()));
724 assert!(v.contains(&"high".to_string()));
725 assert!(v.contains(&"normal".to_string()));
726 assert!(v.contains(&"low".to_string()));
727 }
728
729 #[tokio::test]
730 async fn test_queue_and_active_workers_metrics() {
731 let scheduler = TaskScheduler::new_with_defaults();
732
733 assert_eq!(scheduler.get_queue_size(), 0);
735 assert_eq!(scheduler.get_active_workers(), 0);
736
737 let task = Box::new(MockTask {
739 name: "long_task".to_string(),
740 duration: Duration::from_millis(100),
741 });
742
743 let handle = {
744 let scheduler_clone = scheduler.clone();
745 tokio::spawn(async move { scheduler_clone.submit_task(task).await })
746 };
747
748 tokio::time::sleep(Duration::from_millis(20)).await;
750
751 let _result = handle.await.unwrap().unwrap();
753
754 assert_eq!(scheduler.get_queue_size(), 0);
756 }
757
758 #[tokio::test]
759 async fn test_continuous_scheduling() {
760 let scheduler = TaskScheduler::new_with_defaults();
761 let counter = Arc::new(AtomicUsize::new(0));
762
763 let mut handles = Vec::new();
765 for i in 0..10 {
766 let task = Box::new(CounterTask::new(counter.clone()));
767 let scheduler_clone = scheduler.clone();
768 let handle =
769 tokio::spawn(async move { scheduler_clone.submit_task(task).await.unwrap() });
770 handles.push(handle);
771
772 if i % 3 == 0 {
774 tokio::time::sleep(Duration::from_millis(5)).await;
775 }
776 }
777
778 for handle in handles {
780 let result = handle.await.unwrap();
781 assert!(matches!(result, TaskResult::Success(_)));
782 }
783
784 assert_eq!(counter.load(Ordering::SeqCst), 10);
786 }
787
788 #[tokio::test]
789 async fn test_batch_task_execution() {
790 let scheduler = TaskScheduler::new_with_defaults();
791 let counter = Arc::new(AtomicUsize::new(0));
792
793 let mut tasks: Vec<Box<dyn Task + Send + Sync>> = Vec::new();
795 for _ in 0..3 {
796 tasks.push(Box::new(CounterTask::new(counter.clone())));
798 }
799
800 let results = scheduler.submit_batch_tasks(tasks).await;
801 assert_eq!(results.len(), 3);
802 assert_eq!(counter.load(Ordering::SeqCst), 3);
803 for result in results {
804 assert!(matches!(result, TaskResult::Success(_)));
805 }
806 }
807
808 #[tokio::test]
809 async fn test_high_concurrency_stress() {
810 let scheduler = TaskScheduler::new_with_defaults();
811 let counter = Arc::new(AtomicUsize::new(0));
812
813 let mut handles = Vec::new();
815 for i in 0..50 {
816 let task = Box::new(CounterTask::new(counter.clone()));
817 let scheduler_clone = scheduler.clone();
818 let priority = match i % 4 {
819 0 => TaskPriority::Low,
820 1 => TaskPriority::Normal,
821 2 => TaskPriority::High,
822 3 => TaskPriority::Critical,
823 _ => TaskPriority::Normal,
824 };
825
826 let handle = tokio::spawn(async move {
827 scheduler_clone
828 .submit_task_with_priority(task, priority)
829 .await
830 .unwrap()
831 });
832 handles.push(handle);
833
834 if i % 5 == 0 {
836 tokio::time::sleep(Duration::from_millis(1)).await;
837 }
838 }
839
840 for handle in handles {
842 let result = handle.await.unwrap();
843 assert!(matches!(result, TaskResult::Success(_)));
844 }
845
846 assert_eq!(counter.load(Ordering::SeqCst), 50);
848
849 assert_eq!(scheduler.get_queue_size(), 0);
851 assert_eq!(scheduler.get_active_workers(), 0);
852 }
853
854 #[tokio::test]
855 async fn test_mixed_batch_and_individual_tasks() {
856 let scheduler = TaskScheduler::new_with_defaults();
857 let counter = Arc::new(AtomicUsize::new(0));
858
859 let mut individual_handles = Vec::new();
861 for _ in 0..3 {
862 let task = Box::new(CounterTask::new(counter.clone()));
863 let scheduler_clone = scheduler.clone();
864 let handle =
865 tokio::spawn(async move { scheduler_clone.submit_task(task).await.unwrap() });
866 individual_handles.push(handle);
867 }
868
869 let mut batch_tasks: Vec<Box<dyn Task + Send + Sync>> = Vec::new();
871 for _ in 0..4 {
872 batch_tasks.push(Box::new(CounterTask::new(counter.clone())));
873 }
874
875 let batch_handle = {
876 let scheduler_clone = scheduler.clone();
877 tokio::spawn(async move { scheduler_clone.submit_batch_tasks(batch_tasks).await })
878 };
879
880 let mut more_individual_handles = Vec::new();
882 for _ in 0..2 {
883 let task = Box::new(CounterTask::new(counter.clone()));
884 let scheduler_clone = scheduler.clone();
885 let handle =
886 tokio::spawn(async move { scheduler_clone.submit_task(task).await.unwrap() });
887 more_individual_handles.push(handle);
888 }
889
890 for handle in individual_handles {
892 let result = handle.await.unwrap();
893 assert!(matches!(result, TaskResult::Success(_)));
894 }
895
896 let batch_results = batch_handle.await.unwrap();
897 assert_eq!(batch_results.len(), 4);
898 for result in batch_results {
899 assert!(matches!(result, TaskResult::Success(_)));
900 }
901
902 for handle in more_individual_handles {
903 let result = handle.await.unwrap();
904 assert!(matches!(result, TaskResult::Success(_)));
905 }
906
907 assert_eq!(counter.load(Ordering::SeqCst), 9);
909 }
910
911 #[tokio::test]
913 async fn test_task_scheduling_strategies() {
914 use std::sync::Arc;
915 use std::sync::atomic::{AtomicUsize, Ordering};
916
917 struct PriorityTask {
918 id: String,
919 priority: TaskPriority,
920 counter: Arc<AtomicUsize>,
921 execution_order: Arc<Mutex<Vec<String>>>,
922 }
923
924 #[async_trait::async_trait]
925 impl Task for PriorityTask {
926 async fn execute(&self) -> TaskResult {
927 self.counter.fetch_add(1, Ordering::SeqCst);
928 self.execution_order.lock().unwrap().push(self.id.clone());
929 tokio::time::sleep(Duration::from_millis(50)).await;
931 TaskResult::Success(format!("Priority task {} completed", self.id))
932 }
933 fn task_type(&self) -> &'static str {
934 "priority"
935 }
936 fn task_id(&self) -> String {
937 self.id.clone()
938 }
939 }
940
941 let scheduler = TaskScheduler::new_with_defaults();
942 let counter = Arc::new(AtomicUsize::new(0));
943 let execution_order = Arc::new(Mutex::new(Vec::new()));
944
945 let priorities = vec![
947 ("low", TaskPriority::Low),
948 ("high", TaskPriority::High),
949 ("critical", TaskPriority::Critical),
950 ("normal", TaskPriority::Normal),
951 ];
952
953 for (id, priority) in priorities {
954 let task = PriorityTask {
955 id: id.to_string(),
956 priority,
957 counter: Arc::clone(&counter),
958 execution_order: Arc::clone(&execution_order),
959 };
960
961 scheduler
962 .submit_task_with_priority(Box::new(task), priority)
963 .await
964 .unwrap();
965 }
966
967 tokio::time::sleep(Duration::from_millis(200)).await;
969
970 let final_count = counter.load(Ordering::SeqCst);
972 assert_eq!(final_count, 4, "All 4 tasks should have been executed");
973
974 let order = execution_order.lock().unwrap();
976 println!("Task execution order: {:?}", *order);
977
978 assert!(
981 order.contains(&"critical".to_string()),
982 "Critical task should have been executed"
983 );
984 assert!(
985 order.contains(&"low".to_string()),
986 "Low task should have been executed"
987 );
988 assert!(
989 order.contains(&"high".to_string()),
990 "High task should have been executed"
991 );
992 assert!(
993 order.contains(&"normal".to_string()),
994 "Normal task should have been executed"
995 );
996 }
997
998 #[tokio::test]
1000 async fn test_load_balancing() {
1001 let scheduler = TaskScheduler::new_with_defaults();
1002 let task_counter = Arc::new(AtomicUsize::new(0));
1003
1004 for _i in 0..10 {
1006 let task = CounterTask::new(Arc::clone(&task_counter));
1007 let result = scheduler.submit_task(Box::new(task)).await.unwrap();
1008 assert!(matches!(result, TaskResult::Success(_)));
1009 }
1010
1011 let final_count = task_counter.load(Ordering::SeqCst);
1013 assert_eq!(final_count, 10);
1014
1015 assert_eq!(scheduler.get_queue_size(), 0);
1017 }
1018
1019 #[tokio::test]
1021 async fn test_task_priority_processing() {
1022 let scheduler = TaskScheduler::new_with_defaults();
1023
1024 assert!(TaskPriority::Critical > TaskPriority::High);
1026 assert!(TaskPriority::High > TaskPriority::Normal);
1027 assert!(TaskPriority::Normal > TaskPriority::Low);
1028
1029 let high_task = MockTask {
1031 name: "high_priority".to_string(),
1032 duration: Duration::from_millis(5),
1033 };
1034
1035 let low_task = MockTask {
1036 name: "low_priority".to_string(),
1037 duration: Duration::from_millis(5),
1038 };
1039
1040 let high_result = scheduler
1041 .submit_task_with_priority(Box::new(high_task), TaskPriority::High)
1042 .await
1043 .unwrap();
1044 let low_result = scheduler
1045 .submit_task_with_priority(Box::new(low_task), TaskPriority::Low)
1046 .await
1047 .unwrap();
1048
1049 assert!(matches!(high_result, TaskResult::Success(_)));
1050 assert!(matches!(low_result, TaskResult::Success(_)));
1051 }
1052
1053 #[tokio::test]
1055 async fn test_scheduler_state_management() {
1056 let scheduler = TaskScheduler::new_with_defaults();
1057
1058 assert_eq!(scheduler.get_queue_size(), 0);
1060 assert_eq!(scheduler.get_active_workers(), 0);
1061
1062 let task = MockTask {
1064 name: "state_test".to_string(),
1065 duration: Duration::from_millis(50),
1066 };
1067
1068 let result = scheduler.submit_task(Box::new(task)).await.unwrap();
1069
1070 tokio::time::sleep(Duration::from_millis(5)).await;
1072
1073 assert!(matches!(result, TaskResult::Success(_)));
1075
1076 assert_eq!(scheduler.get_queue_size(), 0);
1078 }
1079
1080 #[tokio::test]
1082 async fn test_overflow_strategy_handling() {
1083 let scheduler = TaskScheduler::new_with_defaults();
1084
1085 for i in 0..20 {
1087 let task = MockTask {
1088 name: format!("overflow_test_{}", i),
1089 duration: Duration::from_millis(20),
1090 };
1091
1092 match scheduler.submit_task(Box::new(task)).await {
1093 Ok(result) => {
1094 assert!(matches!(result, TaskResult::Success(_)));
1095 }
1096 Err(_) => {
1097 break;
1099 }
1100 }
1101 }
1102
1103 tokio::time::sleep(Duration::from_millis(100)).await;
1105
1106 assert_eq!(scheduler.get_queue_size(), 0);
1108 }
1109
1110 #[tokio::test]
1112 async fn test_concurrent_task_submission() {
1113 let scheduler = TaskScheduler::new_with_defaults();
1114 let completion_counter = Arc::new(AtomicUsize::new(0));
1115 let mut submission_handles = Vec::new();
1116
1117 for _i in 0..8 {
1119 let scheduler_clone = scheduler.clone();
1120 let counter_clone = Arc::clone(&completion_counter);
1121
1122 let submission_handle = tokio::spawn(async move {
1123 let task = CounterTask::new(counter_clone);
1124 scheduler_clone.submit_task(Box::new(task)).await.unwrap()
1125 });
1126
1127 submission_handles.push(submission_handle);
1128 }
1129
1130 for handle in submission_handles {
1132 let result = handle.await.unwrap();
1133 assert!(matches!(result, TaskResult::Success(_)));
1134 }
1135
1136 let final_count = completion_counter.load(Ordering::SeqCst);
1138 assert_eq!(final_count, 8);
1139 }
1140
1141 #[tokio::test]
1143 async fn test_scheduler_performance_metrics() {
1144 let scheduler = TaskScheduler::new_with_defaults();
1145 let start_time = std::time::Instant::now();
1146 let task_count = 5;
1147
1148 for i in 0..task_count {
1150 let task = MockTask {
1151 name: format!("perf_test_{}", i),
1152 duration: Duration::from_millis(10),
1153 };
1154 let result = scheduler.submit_task(Box::new(task)).await.unwrap();
1155 assert!(matches!(result, TaskResult::Success(_)));
1156 }
1157
1158 let total_time = start_time.elapsed();
1159
1160 assert!(
1162 total_time < Duration::from_millis(500),
1163 "Tasks took too long: {:?}",
1164 total_time
1165 );
1166
1167 assert_eq!(scheduler.get_queue_size(), 0);
1169 assert_eq!(scheduler.get_active_workers(), 0);
1170 }
1171
1172 #[tokio::test]
1175 async fn test_active_task_guard_cleanup() {
1176 use super::{ActiveTaskGuard, TaskInfo};
1177 use std::collections::HashMap;
1178
1179 let active_tasks = Arc::new(Mutex::new(HashMap::<String, TaskInfo>::new()));
1180 let task_id = "guard_test_task".to_string();
1181
1182 active_tasks.lock().unwrap().insert(
1183 task_id.clone(),
1184 TaskInfo {
1185 task_id: task_id.clone(),
1186 task_type: "mock".to_string(),
1187 status: crate::core::parallel::TaskStatus::Pending,
1188 start_time: std::time::Instant::now(),
1189 progress: 0.0,
1190 },
1191 );
1192 assert!(active_tasks.lock().unwrap().contains_key(&task_id));
1193
1194 {
1195 let _guard = ActiveTaskGuard {
1196 active_tasks: Arc::clone(&active_tasks),
1197 task_id: task_id.clone(),
1198 };
1199 assert!(active_tasks.lock().unwrap().contains_key(&task_id));
1201 }
1202
1203 assert!(!active_tasks.lock().unwrap().contains_key(&task_id));
1205 }
1206
1207 #[tokio::test]
1211 async fn test_drop_oldest_sends_failed() {
1212 use crate::config::{Config, OverflowStrategy};
1213
1214 let mut config = Config::default();
1215 config.parallel.task_queue_size = 1;
1216 config.general.max_concurrent_jobs = 1;
1217 config.parallel.overflow_strategy = OverflowStrategy::DropOldest;
1218 config.parallel.enable_task_priorities = false;
1219 config.parallel.auto_balance_workers = false;
1220
1221 let scheduler = TaskScheduler::new_with_config(&config).unwrap();
1222
1223 let blocker = Box::new(MockTask {
1225 name: "blocker".to_string(),
1226 duration: Duration::from_millis(300),
1227 });
1228 let blocker_scheduler = scheduler.clone();
1229 let blocker_handle =
1230 tokio::spawn(async move { blocker_scheduler.submit_task(blocker).await });
1231
1232 tokio::time::sleep(Duration::from_millis(30)).await;
1234
1235 let first = Box::new(MockTask {
1237 name: "first_queued".to_string(),
1238 duration: Duration::from_millis(50),
1239 });
1240 let first_scheduler = scheduler.clone();
1241 let first_handle = tokio::spawn(async move { first_scheduler.submit_task(first).await });
1242
1243 tokio::time::sleep(Duration::from_millis(30)).await;
1245
1246 let second = Box::new(MockTask {
1248 name: "second_queued".to_string(),
1249 duration: Duration::from_millis(10),
1250 });
1251 let second_scheduler = scheduler.clone();
1252 let second_handle = tokio::spawn(async move { second_scheduler.submit_task(second).await });
1253
1254 let first_result = first_handle.await.unwrap().unwrap();
1256 match first_result {
1257 TaskResult::Failed(msg) => {
1258 assert!(
1259 msg.contains("overflow"),
1260 "expected overflow-related failure message, got: {}",
1261 msg
1262 );
1263 }
1264 other => panic!("expected Failed for evicted task, got {:?}", other),
1265 }
1266
1267 let blocker_result = blocker_handle.await.unwrap().unwrap();
1269 assert!(matches!(blocker_result, TaskResult::Success(_)));
1270 let second_result = second_handle.await.unwrap().unwrap();
1271 assert!(matches!(second_result, TaskResult::Success(_)));
1272 }
1273
1274 #[tokio::test]
1278 async fn test_scheduler_restart_after_idle() {
1279 let mut scheduler = TaskScheduler::new_with_defaults();
1280
1281 {
1284 let mut handle = scheduler.scheduler_handle.lock().unwrap();
1285 if let Some(h) = handle.take() {
1286 h.abort();
1287 }
1288 }
1289 tokio::time::sleep(Duration::from_millis(30)).await;
1291
1292 scheduler.worker_idle_timeout = Duration::from_millis(100);
1293 scheduler.start_scheduler_loop();
1294
1295 let t1 = Box::new(MockTask {
1297 name: "before_idle".to_string(),
1298 duration: Duration::from_millis(10),
1299 });
1300 let r1 = scheduler.submit_task(t1).await.unwrap();
1301 assert!(matches!(r1, TaskResult::Success(_)));
1302
1303 tokio::time::sleep(Duration::from_millis(350)).await;
1305
1306 let loop_finished = {
1307 let handle = scheduler.scheduler_handle.lock().unwrap();
1308 handle.as_ref().map(|h| h.is_finished()).unwrap_or(true)
1309 };
1310 assert!(
1311 loop_finished,
1312 "scheduler loop should have exited after idle timeout"
1313 );
1314
1315 let t2 = Box::new(MockTask {
1317 name: "after_idle".to_string(),
1318 duration: Duration::from_millis(10),
1319 });
1320 let r2 = scheduler.submit_task(t2).await.unwrap();
1321 assert!(matches!(r2, TaskResult::Success(_)));
1322
1323 let still_running = {
1325 let handle = scheduler.scheduler_handle.lock().unwrap();
1326 handle.as_ref().map(|h| !h.is_finished()).unwrap_or(false)
1327 };
1328 assert!(
1329 still_running,
1330 "scheduler loop should be running after restart"
1331 );
1332 }
1333}