// subx_cli/core/parallel/scheduler.rs

//! Task scheduler for parallel processing
use super::{Task, TaskResult, TaskStatus};
use crate::Result;
use crate::config::{Config, OverflowStrategy};
use crate::core::parallel::config::ParallelConfig;
use crate::error::SubXError;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use tokio::sync::{Semaphore, oneshot};

struct PendingTask {
    task: Box<dyn Task + Send + Sync>,
    result_sender: oneshot::Sender<TaskResult>,
    task_id: String,
    priority: TaskPriority,
}

/// RAII guard that removes a task entry from `active_tasks` when dropped.
///
/// This ensures the scheduler's active task map stays consistent even if
/// the awaiting future is cancelled, panics, or returns through an early
/// error path before reaching the explicit cleanup site.
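///
/// A minimal sketch of the guard's effect (`info` is a placeholder
/// `TaskInfo`; not a doctest since the type is module-private):
///
/// ```ignore
/// let active_tasks = Arc::new(Mutex::new(HashMap::new()));
/// active_tasks.lock().unwrap().insert("t1".to_string(), info);
/// {
///     let _guard = ActiveTaskGuard {
///         active_tasks: Arc::clone(&active_tasks),
///         task_id: "t1".to_string(),
///     };
/// } // `_guard` drops here and removes the "t1" entry.
/// assert!(!active_tasks.lock().unwrap().contains_key("t1"));
/// ```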
struct ActiveTaskGuard {
    active_tasks: Arc<Mutex<std::collections::HashMap<String, TaskInfo>>>,
    task_id: String,
}

impl Drop for ActiveTaskGuard {
    fn drop(&mut self) {
        if let Ok(mut active) = self.active_tasks.lock() {
            active.remove(&self.task_id);
        }
    }
}

impl PartialEq for PendingTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl Eq for PendingTask {}

impl PartialOrd for PendingTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for PendingTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.priority.cmp(&other.priority)
    }
}

/// Priority levels for task execution scheduling.
///
/// Determines the execution order of tasks in the queue, with higher
/// priority tasks being processed before lower priority ones.
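///
/// The derived `Ord` follows the discriminant values, so higher-numbered
/// variants compare greater. A small illustrative check (the import path
/// is an assumption about how the crate exposes this type):
///
/// ```ignore
/// use subx_cli::core::parallel::scheduler::TaskPriority;
///
/// assert!(TaskPriority::Critical > TaskPriority::High);
/// assert!(TaskPriority::High > TaskPriority::Normal);
/// assert!(TaskPriority::Normal > TaskPriority::Low);
/// ```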
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
    /// Low priority for background operations
    Low = 0,
    /// Normal priority for standard operations
    Normal = 1,
    /// High priority for user-initiated operations
    High = 2,
    /// Critical priority for system operations
    Critical = 3,
}

/// Information about an active task in the scheduler.
///
/// Contains runtime information about a task currently being processed
/// or queued for execution.
#[derive(Debug, Clone)]
pub struct TaskInfo {
    /// Unique identifier for the task
    pub task_id: String,
    /// Type of task being executed
    pub task_type: String,
    /// Current status of the task
    pub status: TaskStatus,
    /// When the task started execution
    pub start_time: std::time::Instant,
    /// Current progress percentage (0.0 to 1.0)
    pub progress: f32,
}

/// Scheduler to manage and execute tasks in parallel
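///
/// A hedged usage sketch; `my_task` stands in for any boxed `Task`
/// implementor, and the import paths are assumptions about this crate's
/// re-exports:
///
/// ```ignore
/// use subx_cli::core::parallel::scheduler::TaskScheduler;
///
/// #[tokio::main]
/// async fn main() -> subx_cli::Result<()> {
///     // Build a scheduler from the default configuration.
///     let scheduler = TaskScheduler::new()?;
///     // Submit a task and await its result.
///     let result = scheduler.submit_task(my_task).await?;
///     println!("{:?}", result);
///     Ok(())
/// }
/// ```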
pub struct TaskScheduler {
    /// Parallel processing configuration
    _config: ParallelConfig,
    /// Optional load balancer for dynamic worker adjustment
    load_balancer: Option<crate::core::parallel::load_balancer::LoadBalancer>,
    /// Task execution timeout setting
    task_timeout: std::time::Duration,
    /// Worker thread idle timeout setting
    worker_idle_timeout: std::time::Duration,
    task_queue: Arc<Mutex<VecDeque<PendingTask>>>,
    semaphore: Arc<Semaphore>,
    active_tasks: Arc<Mutex<std::collections::HashMap<String, TaskInfo>>>,
    scheduler_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}

impl TaskScheduler {
    /// Create a new scheduler based on configuration
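    ///
    /// A minimal construction sketch (assumes the default `Config` carries a
    /// valid parallel section):
    ///
    /// ```ignore
    /// use subx_cli::config::Config;
    ///
    /// let config = Config::default();
    /// let scheduler = TaskScheduler::new_with_config(&config)?;
    /// assert_eq!(scheduler.get_queue_size(), 0);
    /// ```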
    pub fn new_with_config(app_config: &Config) -> Result<Self> {
        let config = ParallelConfig::from_app_config(app_config);
        config.validate()?;
        let semaphore = Arc::new(Semaphore::new(config.max_concurrent_jobs));
        let task_queue = Arc::new(Mutex::new(VecDeque::new()));
        let active_tasks = Arc::new(Mutex::new(std::collections::HashMap::new()));

        // Read timeout settings from general configuration
        let general = &app_config.general;
        let scheduler = Self {
            _config: config.clone(),
            task_queue: task_queue.clone(),
            semaphore: semaphore.clone(),
            active_tasks: active_tasks.clone(),
            scheduler_handle: Arc::new(Mutex::new(None)),
            load_balancer: if config.auto_balance_workers {
                Some(crate::core::parallel::load_balancer::LoadBalancer::new())
            } else {
                None
            },
            task_timeout: std::time::Duration::from_secs(general.task_timeout_seconds),
            worker_idle_timeout: std::time::Duration::from_secs(
                general.worker_idle_timeout_seconds,
            ),
        };

        // Start background scheduler loop
        scheduler.start_scheduler_loop();
        Ok(scheduler)
    }

    /// Create a new scheduler with default settings (intended for tests)
    pub fn new_with_defaults() -> Self {
        // Use the default application configuration.
        let default_app_config = Config::default();
        let config = ParallelConfig::from_app_config(&default_app_config);
        // Defaults are assumed valid, so validation errors are deliberately ignored.
        let _ = config.validate();
        let semaphore = Arc::new(Semaphore::new(config.max_concurrent_jobs));
        let task_queue = Arc::new(Mutex::new(VecDeque::new()));
        let active_tasks = Arc::new(Mutex::new(std::collections::HashMap::new()));

        let general = &default_app_config.general;
        let scheduler = Self {
            _config: config.clone(),
            task_queue: task_queue.clone(),
            semaphore: semaphore.clone(),
            active_tasks: active_tasks.clone(),
            scheduler_handle: Arc::new(Mutex::new(None)),
            load_balancer: if config.auto_balance_workers {
                Some(crate::core::parallel::load_balancer::LoadBalancer::new())
            } else {
                None
            },
            task_timeout: std::time::Duration::from_secs(general.task_timeout_seconds),
            worker_idle_timeout: std::time::Duration::from_secs(
                general.worker_idle_timeout_seconds,
            ),
        };

        // Start background scheduler loop
        scheduler.start_scheduler_loop();
        scheduler
    }

    /// Create a new scheduler with default configuration
    pub fn new() -> Result<Self> {
        let default_config = Config::default();
        Self::new_with_config(&default_config)
    }

    /// Start the background scheduler loop
    fn start_scheduler_loop(&self) {
        let task_queue = Arc::clone(&self.task_queue);
        let semaphore = Arc::clone(&self.semaphore);
        let active_tasks = Arc::clone(&self.active_tasks);
        let config = self._config.clone();
        let task_timeout = self.task_timeout;
        let worker_idle_timeout = self.worker_idle_timeout;

        let handle = tokio::spawn(async move {
            // Idle check timer
            let mut last_active = std::time::Instant::now();
            loop {
                // End scheduler after idle timeout
                let has_pending = {
                    let q = task_queue.lock().unwrap();
                    !q.is_empty()
                };
                let has_active = {
                    let a = active_tasks.lock().unwrap();
                    !a.is_empty()
                };
                if has_pending || has_active {
                    last_active = std::time::Instant::now();
                } else if last_active.elapsed() > worker_idle_timeout {
                    break;
                }
                // Try to get a semaphore permit and a task from the queue
                if let Ok(permit) = semaphore.clone().try_acquire_owned() {
                    let pending = {
                        let mut queue = task_queue.lock().unwrap();
                        // select next task by priority or FIFO
                        if config.enable_task_priorities {
                            // find highest priority task
                            if let Some((idx, _)) =
                                queue.iter().enumerate().max_by_key(|(_, t)| t.priority)
                            {
                                queue.remove(idx)
                            } else {
                                None
                            }
                        } else {
                            queue.pop_front()
                        }
                    };
                    if let Some(p) = pending {
                        // Update task status to running
                        {
                            let mut active = active_tasks.lock().unwrap();
                            if let Some(info) = active.get_mut(&p.task_id) {
                                info.status = TaskStatus::Running;
                            }
                        }

                        let task_id = p.task_id.clone();
                        let active_tasks_clone = Arc::clone(&active_tasks);

                        // Spawn the actual task execution
                        tokio::spawn(async move {
                            // Task execution timeout handling
                            let result = match tokio::time::timeout(task_timeout, p.task.execute())
                                .await
                            {
                                Ok(res) => res,
                                Err(_) => TaskResult::Failed("Task execution timeout".to_string()),
                            };

                            // Update task status
                            {
                                let mut at = active_tasks_clone.lock().unwrap();
                                if let Some(info) = at.get_mut(&task_id) {
                                    info.status = TaskStatus::Completed(result.clone());
                                    info.progress = 1.0;
                                }
                            }

                            // Send result back
                            let _ = p.result_sender.send(result);

                            // Release the permit
                            drop(permit);
                        });
                    } else {
                        // No tasks in queue, release permit and wait a bit
                        drop(permit);
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                    }
                } else {
                    // No permits available, wait a bit
                    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                }
            }
        });

        // Store the handle
        *self.scheduler_handle.lock().unwrap() = Some(handle);
    }

    /// Submit a task with normal priority
    pub async fn submit_task(&self, task: Box<dyn Task + Send + Sync>) -> Result<TaskResult> {
        self.submit_task_with_priority(task, TaskPriority::Normal)
            .await
    }

    /// Submit a task with specified priority
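    ///
    /// Sketch of a prioritized submission; `urgent_task` is a hypothetical
    /// boxed `Task` implementor:
    ///
    /// ```ignore
    /// // Critical tasks are dequeued ahead of lower-priority work when
    /// // `enable_task_priorities` is set in the configuration.
    /// let result = scheduler
    ///     .submit_task_with_priority(urgent_task, TaskPriority::Critical)
    ///     .await?;
    /// ```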
    pub async fn submit_task_with_priority(
        &self,
        task: Box<dyn Task + Send + Sync>,
        priority: TaskPriority,
    ) -> Result<TaskResult> {
        let task_id = task.task_id();
        let task_type = task.task_type().to_string();
        let (tx, rx) = oneshot::channel();

        // Register task info
        {
            let mut active = self.active_tasks.lock().unwrap();
            active.insert(
                task_id.clone(),
                TaskInfo {
                    task_id: task_id.clone(),
                    task_type,
                    status: TaskStatus::Pending,
                    start_time: std::time::Instant::now(),
                    progress: 0.0,
                },
            );
        }

        // RAII guard ensures the active_tasks entry is removed on any exit
        // path (normal completion, early return, cancellation, or panic).
        let _guard = ActiveTaskGuard {
            active_tasks: Arc::clone(&self.active_tasks),
            task_id: task_id.clone(),
        };

        // Handle queue overflow strategy before enqueuing
        let pending = PendingTask {
            task,
            result_sender: tx,
            task_id: task_id.clone(),
            priority,
        };
        if self.get_queue_size() >= self._config.task_queue_size {
            match self._config.queue_overflow_strategy {
                OverflowStrategy::Block => {
                    // Block until space available
                    while self.get_queue_size() >= self._config.task_queue_size {
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                    }
                }
                OverflowStrategy::DropOldest => {
                    let evicted_id = {
                        let mut q = self.task_queue.lock().unwrap();
                        if let Some(evicted) = q.pop_front() {
                            let id = evicted.task_id.clone();
                            let _ = evicted.result_sender.send(TaskResult::Failed(
                                "Task dropped due to queue overflow".to_string(),
                            ));
                            Some(id)
                        } else {
                            None
                        }
                    };
                    if let Some(id) = evicted_id {
                        // Clean up the evicted task from active_tasks as its
                        // original submitter may still be awaiting the guard
                        // drop; removing here prevents a stale entry window.
                        let mut active = self.active_tasks.lock().unwrap();
                        active.remove(&id);
                    }
                }
                OverflowStrategy::Reject => {
                    return Err(SubXError::parallel_processing(
                        "Task queue is full".to_string(),
                    ));
                }
                OverflowStrategy::Drop => {
                    // Drop the new task - return Failed result
                    return Ok(TaskResult::Failed(
                        "Task dropped due to queue overflow".to_string(),
                    ));
                }
                OverflowStrategy::Expand => {
                    // Allow queue to expand beyond limits
                    // No action needed, task will be added below
                }
            }
        }
        // Enqueue task according to priority setting
        {
            let mut q = self.task_queue.lock().unwrap();
            if self._config.enable_task_priorities {
                let pos = q
                    .iter()
                    .position(|t| t.priority < pending.priority)
                    .unwrap_or(q.len());
                q.insert(pos, pending);
            } else {
                q.push_back(pending);
            }
        }

        // Ensure the scheduler loop is alive; restart it if it exited due
        // to the idle timeout so queued tasks continue to be drained.
        self.ensure_scheduler_running();

        // Await result. The `_guard` defined above will clean up the
        // active_tasks entry automatically when this function returns,
        // regardless of which branch is taken.
        let result = rx.await.map_err(|_| {
            crate::error::SubXError::parallel_processing("Task execution interrupted".to_string())
        })?;

        Ok(result)
    }

    /// Restart the background scheduler loop if it has exited.
    ///
    /// The scheduler loop voluntarily terminates after the configured idle
    /// timeout. When new work is submitted afterwards we must bring it back
    /// up, otherwise queued tasks would never be drained.
    fn ensure_scheduler_running(&self) {
        let needs_restart = {
            let handle = self.scheduler_handle.lock().unwrap();
            match handle.as_ref() {
                Some(h) => h.is_finished(),
                None => true,
            }
        };
        if needs_restart {
            self.start_scheduler_loop();
        }
    }

    /// Retained for API compatibility; the background scheduler loop now
    /// performs all dispatching, so this method is a no-op.
    #[allow(dead_code)]
    async fn try_execute_next_task(&self) {}

    /// Submit multiple tasks and await all results
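    ///
    /// Batch submission sketch; `task_a` and `task_b` are hypothetical boxed
    /// `Task` implementors. Results arrive in submission order, one per task
    /// that was actually enqueued:
    ///
    /// ```ignore
    /// let tasks: Vec<Box<dyn Task + Send + Sync>> = vec![task_a, task_b];
    /// let results = scheduler.submit_batch_tasks(tasks).await;
    /// assert_eq!(results.len(), 2);
    /// ```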
    pub async fn submit_batch_tasks(
        &self,
        tasks: Vec<Box<dyn Task + Send + Sync>>,
    ) -> Vec<TaskResult> {
        let mut receivers = Vec::new();

        // First add all tasks to the queue
        for task in tasks {
            let task_id = task.task_id();
            let task_type = task.task_type().to_string();
            let (tx, rx) = oneshot::channel();

            // Register task information
            {
                let mut active = self.active_tasks.lock().unwrap();
                active.insert(
                    task_id.clone(),
                    TaskInfo {
                        task_id: task_id.clone(),
                        task_type,
                        status: TaskStatus::Pending,
                        start_time: std::time::Instant::now(),
                        progress: 0.0,
                    },
                );
            }

            // Enqueue each task with overflow and priority handling
            let pending = PendingTask {
                task,
                result_sender: tx,
                task_id: task_id.clone(),
                priority: TaskPriority::Normal,
            };
            if self.get_queue_size() >= self._config.task_queue_size {
                match self._config.queue_overflow_strategy {
                    OverflowStrategy::Block => {
                        // Block until space available
                        while self.get_queue_size() >= self._config.task_queue_size {
                            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                        }
                    }
                    OverflowStrategy::DropOldest => {
                        let evicted_id = {
                            let mut q = self.task_queue.lock().unwrap();
                            if let Some(evicted) = q.pop_front() {
                                let id = evicted.task_id.clone();
                                let _ = evicted.result_sender.send(TaskResult::Failed(
                                    "Task dropped due to queue overflow".to_string(),
                                ));
                                Some(id)
                            } else {
                                None
                            }
                        };
                        if let Some(id) = evicted_id {
                            let mut active = self.active_tasks.lock().unwrap();
                            active.remove(&id);
                        }
                    }
                    OverflowStrategy::Reject => {
                        // Abandon the remainder of the batch when the queue is
                        // full; results for tasks already enqueued are discarded
                        // because their receivers are dropped here.
                        return Vec::new();
                    }
                    OverflowStrategy::Drop => {
                        // Drop the current task and continue with the others
                        continue;
                    }
                    OverflowStrategy::Expand => {
                        // Allow queue to expand beyond limits
                        // No action needed, task will be added below
                    }
                }
            }
            // Insert task according to priority setting
            {
                let mut q = self.task_queue.lock().unwrap();
                if self._config.enable_task_priorities {
                    let pos = q
                        .iter()
                        .position(|t| t.priority < pending.priority)
                        .unwrap_or(q.len());
                    q.insert(pos, pending);
                } else {
                    q.push_back(pending);
                }
            }

            receivers.push((task_id, rx));
        }

        // Ensure the scheduler loop is alive after enqueuing the batch.
        self.ensure_scheduler_running();

        // Wait for all results
        let mut results = Vec::new();
        for (task_id, rx) in receivers {
            match rx.await {
                Ok(result) => results.push(result),
                Err(_) => {
                    results.push(TaskResult::Failed("Task execution interrupted".to_string()))
                }
            }

            // Clean up task information
            {
                let mut active = self.active_tasks.lock().unwrap();
                active.remove(&task_id);
            }
        }

        results
    }

    /// Get number of tasks waiting in queue
    pub fn get_queue_size(&self) -> usize {
        self.task_queue.lock().unwrap().len()
    }

    /// Get number of active workers
    pub fn get_active_workers(&self) -> usize {
        self._config.max_concurrent_jobs - self.semaphore.available_permits()
    }

    /// Get status of a specific task
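    ///
    /// Lookup sketch (`"mock_test"` is an illustrative task id):
    ///
    /// ```ignore
    /// if let Some(info) = scheduler.get_task_status("mock_test") {
    ///     println!("{} is {:?}", info.task_id, info.status);
    /// }
    /// ```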
    pub fn get_task_status(&self, task_id: &str) -> Option<TaskInfo> {
        self.active_tasks.lock().unwrap().get(task_id).cloned()
    }

    /// List all active tasks
    pub fn list_active_tasks(&self) -> Vec<TaskInfo> {
        self.active_tasks
            .lock()
            .unwrap()
            .values()
            .cloned()
            .collect()
    }
}

impl Clone for TaskScheduler {
    fn clone(&self) -> Self {
        Self {
            _config: self._config.clone(),
            task_queue: Arc::clone(&self.task_queue),
            semaphore: Arc::clone(&self.semaphore),
            active_tasks: Arc::clone(&self.active_tasks),
            scheduler_handle: Arc::clone(&self.scheduler_handle),
            load_balancer: self.load_balancer.clone(),
            task_timeout: self.task_timeout,
            worker_idle_timeout: self.worker_idle_timeout,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{Task, TaskPriority, TaskResult, TaskScheduler};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use tokio::time::Duration;
    use uuid::Uuid;

    struct MockTask {
        name: String,
        duration: Duration,
    }

    #[async_trait::async_trait]
    impl Task for MockTask {
        async fn execute(&self) -> TaskResult {
            tokio::time::sleep(self.duration).await;
            TaskResult::Success(format!("Task completed: {}", self.name))
        }
        fn task_type(&self) -> &'static str {
            "mock"
        }
        fn task_id(&self) -> String {
            format!("mock_{}", self.name)
        }
    }

    struct CounterTask {
        counter: Arc<AtomicUsize>,
    }
    impl CounterTask {
        fn new(counter: Arc<AtomicUsize>) -> Self {
            Self { counter }
        }
    }
    #[async_trait::async_trait]
    impl Task for CounterTask {
        async fn execute(&self) -> TaskResult {
            self.counter.fetch_add(1, Ordering::SeqCst);
            TaskResult::Success("Counter task completed".to_string())
        }
        fn task_type(&self) -> &'static str {
            "counter"
        }
        fn task_id(&self) -> String {
            Uuid::now_v7().to_string()
        }
    }

    struct OrderTask {
        name: String,
        order: Arc<Mutex<Vec<String>>>,
    }
    impl OrderTask {
        fn new(name: &str, order: Arc<Mutex<Vec<String>>>) -> Self {
            Self {
                name: name.to_string(),
                order,
            }
        }
    }
    #[async_trait::async_trait]
    impl Task for OrderTask {
        async fn execute(&self) -> TaskResult {
            let mut v = self.order.lock().unwrap();
            v.push(self.name.clone());
            TaskResult::Success(format!("Order task completed: {}", self.name))
        }
        fn task_type(&self) -> &'static str {
            "order"
        }
        fn task_id(&self) -> String {
            format!("order_{}", self.name)
        }
    }

    #[tokio::test]
    async fn test_task_scheduler_basic() {
        let scheduler = TaskScheduler::new_with_defaults();
        let task = Box::new(MockTask {
            name: "test".to_string(),
            duration: Duration::from_millis(10),
        });
        let result = scheduler.submit_task(task).await.unwrap();
        assert!(matches!(result, TaskResult::Success(_)));
    }

    #[test]
    fn counter_task_id_is_uuidv7() {
        let counter = Arc::new(AtomicUsize::new(0));
        let task = CounterTask::new(counter);
        let id = task.task_id();
        let parsed = Uuid::parse_str(&id).expect("task_id must be a valid UUID");
        assert_eq!(parsed.get_version_num(), 7);
    }

    #[tokio::test]
    async fn test_concurrent_task_execution() {
        let scheduler = TaskScheduler::new_with_defaults();
        let counter = Arc::new(AtomicUsize::new(0));

        // Test single task
        let task = Box::new(CounterTask::new(counter.clone()));
        let result = scheduler.submit_task(task).await.unwrap();
        assert!(matches!(result, TaskResult::Success(_)));
        assert_eq!(counter.load(Ordering::SeqCst), 1);

        // Test multiple tasks - serial execution
        for _ in 0..4 {
            let task = Box::new(CounterTask::new(counter.clone()));
            let _result = scheduler.submit_task(task).await.unwrap();
        }
        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    #[tokio::test]
    async fn test_task_priority_ordering() {
        let scheduler = TaskScheduler::new_with_defaults();
        let order = Arc::new(Mutex::new(Vec::new()));

        // Create tasks with different priorities
        let tasks = vec![
            (TaskPriority::Low, "low"),
            (TaskPriority::High, "high"),
            (TaskPriority::Normal, "normal"),
            (TaskPriority::Critical, "critical"),
        ];

        let mut handles = Vec::new();
        for (prio, name) in tasks {
            let task = Box::new(OrderTask::new(name, order.clone()));
            let scheduler_clone = scheduler.clone();
            let handle = tokio::spawn(async move {
                scheduler_clone
                    .submit_task_with_priority(task, prio)
                    .await
                    .unwrap()
            });
            handles.push(handle);
        }

        // Wait for all tasks to complete
        for handle in handles {
            let _ = handle.await.unwrap();
        }

        let v = order.lock().unwrap();
        assert_eq!(v.len(), 4);
        // Priority scheduling influences dequeue order, but execution can
        // interleave, so only completion of every task is asserted here
        assert!(v.contains(&"critical".to_string()));
        assert!(v.contains(&"high".to_string()));
        assert!(v.contains(&"normal".to_string()));
        assert!(v.contains(&"low".to_string()));
    }

    #[tokio::test]
    async fn test_queue_and_active_workers_metrics() {
        let scheduler = TaskScheduler::new_with_defaults();

        // Check initial state
        assert_eq!(scheduler.get_queue_size(), 0);
        assert_eq!(scheduler.get_active_workers(), 0);

        // Submit a longer task
        let task = Box::new(MockTask {
            name: "long_task".to_string(),
            duration: Duration::from_millis(100),
        });

        let handle = {
            let scheduler_clone = scheduler.clone();
            tokio::spawn(async move { scheduler_clone.submit_task(task).await })
        };

        // Give the task a moment to start running
        tokio::time::sleep(Duration::from_millis(20)).await;

        // Complete task
        let _result = handle.await.unwrap().unwrap();

        // Check final state
        assert_eq!(scheduler.get_queue_size(), 0);
    }

    #[tokio::test]
    async fn test_continuous_scheduling() {
        let scheduler = TaskScheduler::new_with_defaults();
        let counter = Arc::new(AtomicUsize::new(0));

        // Submit multiple tasks to queue
        let mut handles = Vec::new();
        for i in 0..10 {
            let task = Box::new(CounterTask::new(counter.clone()));
            let scheduler_clone = scheduler.clone();
            let handle =
                tokio::spawn(async move { scheduler_clone.submit_task(task).await.unwrap() });
            handles.push(handle);

            // Delayed submission to test continuous scheduling
            if i % 3 == 0 {
                tokio::time::sleep(Duration::from_millis(5)).await;
            }
        }

        // Wait for all tasks to complete
        for handle in handles {
            let result = handle.await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        // Verify all tasks were executed
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }

    #[tokio::test]
    async fn test_batch_task_execution() {
        let scheduler = TaskScheduler::new_with_defaults();
        let counter = Arc::new(AtomicUsize::new(0));

        // Use batch submission to test concurrent execution
        let mut tasks: Vec<Box<dyn Task + Send + Sync>> = Vec::new();
        for _ in 0..3 {
            // Reduce task count to simplify test
            tasks.push(Box::new(CounterTask::new(counter.clone())));
        }

        let results = scheduler.submit_batch_tasks(tasks).await;
        assert_eq!(results.len(), 3);
        assert_eq!(counter.load(Ordering::SeqCst), 3);
        for result in results {
            assert!(matches!(result, TaskResult::Success(_)));
        }
    }

    #[tokio::test]
    async fn test_high_concurrency_stress() {
        let scheduler = TaskScheduler::new_with_defaults();
        let counter = Arc::new(AtomicUsize::new(0));

        // Create large number of tasks
        let mut handles = Vec::new();
        for i in 0..50 {
            let task = Box::new(CounterTask::new(counter.clone()));
            let scheduler_clone = scheduler.clone();
            let priority = match i % 4 {
                0 => TaskPriority::Low,
                1 => TaskPriority::Normal,
                2 => TaskPriority::High,
                3 => TaskPriority::Critical,
                _ => TaskPriority::Normal,
            };

            let handle = tokio::spawn(async move {
                scheduler_clone
                    .submit_task_with_priority(task, priority)
                    .await
                    .unwrap()
            });
            handles.push(handle);

            // Interleaved submission to simulate real usage scenarios
            if i % 5 == 0 {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
        }

        // Wait for all tasks to complete
        for handle in handles {
            let result = handle.await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        // Verify all tasks were executed
        assert_eq!(counter.load(Ordering::SeqCst), 50);

        // Check final state
        assert_eq!(scheduler.get_queue_size(), 0);
        assert_eq!(scheduler.get_active_workers(), 0);
    }

    #[tokio::test]
    async fn test_mixed_batch_and_individual_tasks() {
        let scheduler = TaskScheduler::new_with_defaults();
        let counter = Arc::new(AtomicUsize::new(0));

        // First submit some individual tasks
        let mut individual_handles = Vec::new();
        for _ in 0..3 {
            let task = Box::new(CounterTask::new(counter.clone()));
            let scheduler_clone = scheduler.clone();
            let handle =
                tokio::spawn(async move { scheduler_clone.submit_task(task).await.unwrap() });
            individual_handles.push(handle);
        }

        // Then submit batch tasks
        let mut batch_tasks: Vec<Box<dyn Task + Send + Sync>> = Vec::new();
        for _ in 0..4 {
            batch_tasks.push(Box::new(CounterTask::new(counter.clone())));
        }

        let batch_handle = {
            let scheduler_clone = scheduler.clone();
            tokio::spawn(async move { scheduler_clone.submit_batch_tasks(batch_tasks).await })
        };

        // Submit more individual tasks during batch execution
        let mut more_individual_handles = Vec::new();
        for _ in 0..2 {
            let task = Box::new(CounterTask::new(counter.clone()));
            let scheduler_clone = scheduler.clone();
            let handle =
                tokio::spawn(async move { scheduler_clone.submit_task(task).await.unwrap() });
            more_individual_handles.push(handle);
        }

        // Wait for all tasks to complete
        for handle in individual_handles {
            let result = handle.await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        let batch_results = batch_handle.await.unwrap();
        assert_eq!(batch_results.len(), 4);
        for result in batch_results {
            assert!(matches!(result, TaskResult::Success(_)));
        }

        for handle in more_individual_handles {
            let result = handle.await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        // Verify total count is correct (3 + 4 + 2 = 9)
        assert_eq!(counter.load(Ordering::SeqCst), 9);
    }

    /// Test task scheduling strategies with different priorities
    #[tokio::test]
    async fn test_task_scheduling_strategies() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        struct PriorityTask {
            id: String,
            // Recorded at construction but never read back; silence the
            // dead_code lint rather than remove it from the test fixture.
            #[allow(dead_code)]
            priority: TaskPriority,
            counter: Arc<AtomicUsize>,
            execution_order: Arc<Mutex<Vec<String>>>,
        }

        #[async_trait::async_trait]
        impl Task for PriorityTask {
            async fn execute(&self) -> TaskResult {
                self.counter.fetch_add(1, Ordering::SeqCst);
                self.execution_order.lock().unwrap().push(self.id.clone());
                // Longer delay to make priority effects more visible
                tokio::time::sleep(Duration::from_millis(50)).await;
                TaskResult::Success(format!("Priority task {} completed", self.id))
            }
            fn task_type(&self) -> &'static str {
                "priority"
            }
            fn task_id(&self) -> String {
                self.id.clone()
            }
        }

        let scheduler = TaskScheduler::new_with_defaults();
        let counter = Arc::new(AtomicUsize::new(0));
        let execution_order = Arc::new(Mutex::new(Vec::new()));

        // Submit tasks with different priorities
        let priorities = vec![
            ("low", TaskPriority::Low),
            ("high", TaskPriority::High),
            ("critical", TaskPriority::Critical),
            ("normal", TaskPriority::Normal),
        ];

        for (id, priority) in priorities {
            let task = PriorityTask {
                id: id.to_string(),
                priority,
                counter: Arc::clone(&counter),
                execution_order: Arc::clone(&execution_order),
            };

            scheduler
                .submit_task_with_priority(Box::new(task), priority)
                .await
                .unwrap();
        }

        // Wait for all tasks to complete
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify all tasks were executed
        let final_count = counter.load(Ordering::SeqCst);
        assert_eq!(final_count, 4, "All 4 tasks should have been executed");

        // Verify execution order respects priority (highest first)
        let order = execution_order.lock().unwrap();
        println!("Task execution order: {:?}", *order);

        // Check that all tasks were executed, but don't strictly enforce ordering
        // since concurrent execution can vary
        assert!(
            order.contains(&"critical".to_string()),
            "Critical task should have been executed"
        );
        assert!(
            order.contains(&"low".to_string()),
            "Low task should have been executed"
        );
        assert!(
            order.contains(&"high".to_string()),
            "High task should have been executed"
        );
        assert!(
            order.contains(&"normal".to_string()),
            "Normal task should have been executed"
        );
    }

    /// Test load balancing across multiple workers
    #[tokio::test]
    async fn test_load_balancing() {
        let scheduler = TaskScheduler::new_with_defaults();
        let task_counter = Arc::new(AtomicUsize::new(0));

        // Submit multiple tasks concurrently
        for _i in 0..10 {
            let task = CounterTask::new(Arc::clone(&task_counter));
            let result = scheduler.submit_task(Box::new(task)).await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        // Verify all tasks were processed
        let final_count = task_counter.load(Ordering::SeqCst);
        assert_eq!(final_count, 10);

        // Check scheduler queue is empty
        assert_eq!(scheduler.get_queue_size(), 0);
    }

    /// Test task priority handling
    #[tokio::test]
    async fn test_task_priority_processing() {
        let scheduler = TaskScheduler::new_with_defaults();

        // Test priority comparison
        assert!(TaskPriority::Critical > TaskPriority::High);
        assert!(TaskPriority::High > TaskPriority::Normal);
        assert!(TaskPriority::Normal > TaskPriority::Low);

        // Submit tasks with different priorities and verify they're handled
        let high_task = MockTask {
            name: "high_priority".to_string(),
            duration: Duration::from_millis(5),
        };

        let low_task = MockTask {
            name: "low_priority".to_string(),
            duration: Duration::from_millis(5),
        };

        let high_result = scheduler
            .submit_task_with_priority(Box::new(high_task), TaskPriority::High)
            .await
            .unwrap();
        let low_result = scheduler
            .submit_task_with_priority(Box::new(low_task), TaskPriority::Low)
            .await
            .unwrap();

        assert!(matches!(high_result, TaskResult::Success(_)));
        assert!(matches!(low_result, TaskResult::Success(_)));
    }

    /// Test scheduler state management
    #[tokio::test]
    async fn test_scheduler_state_management() {
        let scheduler = TaskScheduler::new_with_defaults();

        // Initial state
        assert_eq!(scheduler.get_queue_size(), 0);
        assert_eq!(scheduler.get_active_workers(), 0);

        // Submit a task
        let task = MockTask {
            name: "state_test".to_string(),
            duration: Duration::from_millis(50),
        };

        let result = scheduler.submit_task(Box::new(task)).await.unwrap();

        // submit_task awaits completion, so the task has already finished here;
        // the short sleep just lets the scheduler loop settle.
        tokio::time::sleep(Duration::from_millis(5)).await;

        assert!(matches!(result, TaskResult::Success(_)));

        // State should return to initial
        assert_eq!(scheduler.get_queue_size(), 0);
    }

    /// Test scheduler overflow strategies
    #[tokio::test]
    async fn test_overflow_strategy_handling() {
        let scheduler = TaskScheduler::new_with_defaults();

        // Submit many long-running tasks to potentially trigger overflow
        for i in 0..20 {
            let task = MockTask {
                name: format!("overflow_test_{}", i),
                duration: Duration::from_millis(20),
            };

            match scheduler.submit_task(Box::new(task)).await {
                Ok(result) => {
                    assert!(matches!(result, TaskResult::Success(_)));
                }
                Err(_) => {
                    // Some tasks might be rejected due to overflow, which is acceptable
                    break;
                }
            }
        }

        // Wait for tasks to complete
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Scheduler should recover to normal state
        assert_eq!(scheduler.get_queue_size(), 0);
    }

    /// Test concurrent task submission and execution
    #[tokio::test]
    async fn test_concurrent_task_submission() {
        let scheduler = TaskScheduler::new_with_defaults();
        let completion_counter = Arc::new(AtomicUsize::new(0));
        let mut submission_handles = Vec::new();

        // Submit tasks concurrently from multiple threads
        for _i in 0..8 {
            let scheduler_clone = scheduler.clone();
            let counter_clone = Arc::clone(&completion_counter);

            let submission_handle = tokio::spawn(async move {
                let task = CounterTask::new(counter_clone);
                scheduler_clone.submit_task(Box::new(task)).await.unwrap()
            });

            submission_handles.push(submission_handle);
        }

        // Wait for all concurrent submissions to complete
        for handle in submission_handles {
            let result = handle.await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        // Verify all tasks completed
        let final_count = completion_counter.load(Ordering::SeqCst);
        assert_eq!(final_count, 8);
    }

    /// Test scheduler performance metrics
    #[tokio::test]
    async fn test_scheduler_performance_metrics() {
        let scheduler = TaskScheduler::new_with_defaults();
        let start_time = std::time::Instant::now();
        let task_count = 5;

        // Submit multiple tasks
        for i in 0..task_count {
            let task = MockTask {
                name: format!("perf_test_{}", i),
                duration: Duration::from_millis(10),
            };
            let result = scheduler.submit_task(Box::new(task)).await.unwrap();
            assert!(matches!(result, TaskResult::Success(_)));
        }

        let total_time = start_time.elapsed();

        // Verify reasonable performance (tasks should complete in reasonable time)
        assert!(
            total_time < Duration::from_millis(500),
            "Tasks took too long: {:?}",
            total_time
        );

        // Verify final state
        assert_eq!(scheduler.get_queue_size(), 0);
        assert_eq!(scheduler.get_active_workers(), 0);
    }

    /// Verify that `ActiveTaskGuard` removes its entry from `active_tasks`
    /// when dropped, independently of any explicit cleanup code path.
    #[tokio::test]
    async fn test_active_task_guard_cleanup() {
        use super::{ActiveTaskGuard, TaskInfo};
        use std::collections::HashMap;

        let active_tasks = Arc::new(Mutex::new(HashMap::<String, TaskInfo>::new()));
        let task_id = "guard_test_task".to_string();

        active_tasks.lock().unwrap().insert(
            task_id.clone(),
            TaskInfo {
                task_id: task_id.clone(),
                task_type: "mock".to_string(),
                status: crate::core::parallel::TaskStatus::Pending,
                start_time: std::time::Instant::now(),
                progress: 0.0,
            },
        );
        assert!(active_tasks.lock().unwrap().contains_key(&task_id));

        {
            let _guard = ActiveTaskGuard {
                active_tasks: Arc::clone(&active_tasks),
                task_id: task_id.clone(),
            };
            // Guard is alive here; entry still present.
            assert!(active_tasks.lock().unwrap().contains_key(&task_id));
        }

        // After the guard drops, the entry must be gone.
        assert!(!active_tasks.lock().unwrap().contains_key(&task_id));
    }

    /// Verify that the `DropOldest` overflow strategy delivers a
    /// `TaskResult::Failed` back to the evicted submitter rather than
    /// silently dropping its oneshot sender.
    #[tokio::test]
    async fn test_drop_oldest_sends_failed() {
        use crate::config::{Config, OverflowStrategy};

        let mut config = Config::default();
        config.parallel.task_queue_size = 1;
        config.general.max_concurrent_jobs = 1;
        config.parallel.overflow_strategy = OverflowStrategy::DropOldest;
        config.parallel.enable_task_priorities = false;
        config.parallel.auto_balance_workers = false;

        let scheduler = TaskScheduler::new_with_config(&config).unwrap();

        // Occupy the single worker with a slow task so the queue fills.
        let blocker = Box::new(MockTask {
            name: "blocker".to_string(),
            duration: Duration::from_millis(300),
        });
        let blocker_scheduler = scheduler.clone();
        let blocker_handle =
            tokio::spawn(async move { blocker_scheduler.submit_task(blocker).await });

        // Give the blocker a moment to start running.
        tokio::time::sleep(Duration::from_millis(30)).await;

        // Fill the queue slot.
        let first = Box::new(MockTask {
            name: "first_queued".to_string(),
            duration: Duration::from_millis(50),
        });
        let first_scheduler = scheduler.clone();
        let first_handle = tokio::spawn(async move { first_scheduler.submit_task(first).await });

        // Ensure the first queued task is actually enqueued before the second.
        tokio::time::sleep(Duration::from_millis(30)).await;

        // Submitting a second task should evict the first with DropOldest.
        let second = Box::new(MockTask {
            name: "second_queued".to_string(),
            duration: Duration::from_millis(10),
        });
        let second_scheduler = scheduler.clone();
        let second_handle = tokio::spawn(async move { second_scheduler.submit_task(second).await });

        // The evicted (first) task must receive a Failed result.
        let first_result = first_handle.await.unwrap().unwrap();
        match first_result {
            TaskResult::Failed(msg) => {
                assert!(
                    msg.contains("overflow"),
                    "expected overflow-related failure message, got: {}",
                    msg
                );
            }
            other => panic!("expected Failed for evicted task, got {:?}", other),
        }

        // The blocker and second task should still complete successfully.
        let blocker_result = blocker_handle.await.unwrap().unwrap();
        assert!(matches!(blocker_result, TaskResult::Success(_)));
        let second_result = second_handle.await.unwrap().unwrap();
        assert!(matches!(second_result, TaskResult::Success(_)));
    }

    /// Verify that submitting a task after the scheduler loop has exited
    /// due to idle timeout transparently restarts the loop and completes
    /// the new task.
    #[tokio::test]
    async fn test_scheduler_restart_after_idle() {
        let mut scheduler = TaskScheduler::new_with_defaults();

        // Abort the default scheduler loop so we can restart it with a
        // short idle timeout.
        {
            let mut handle = scheduler.scheduler_handle.lock().unwrap();
            if let Some(h) = handle.take() {
                h.abort();
            }
        }
        // Allow the aborted task to be fully cancelled before we continue.
        tokio::time::sleep(Duration::from_millis(30)).await;

        scheduler.worker_idle_timeout = Duration::from_millis(100);
        scheduler.start_scheduler_loop();

        // Submit and complete a task normally.
        let t1 = Box::new(MockTask {
            name: "before_idle".to_string(),
            duration: Duration::from_millis(10),
        });
        let r1 = scheduler.submit_task(t1).await.unwrap();
        assert!(matches!(r1, TaskResult::Success(_)));

        // Wait well past the idle timeout so the loop exits on its own.
        tokio::time::sleep(Duration::from_millis(350)).await;

        let loop_finished = {
            let handle = scheduler.scheduler_handle.lock().unwrap();
            handle.as_ref().map(|h| h.is_finished()).unwrap_or(true)
        };
        assert!(
            loop_finished,
            "scheduler loop should have exited after idle timeout"
        );

        // Submitting again must restart the loop and run the task.
        let t2 = Box::new(MockTask {
            name: "after_idle".to_string(),
            duration: Duration::from_millis(10),
        });
        let r2 = scheduler.submit_task(t2).await.unwrap();
        assert!(matches!(r2, TaskResult::Success(_)));

        // The handle should now point at a running loop again.
        let still_running = {
            let handle = scheduler.scheduler_handle.lock().unwrap();
            handle.as_ref().map(|h| !h.is_finished()).unwrap_or(false)
        };
        assert!(
            still_running,
            "scheduler loop should be running after restart"
        );
    }
}
1333}