use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// Priority-based task queue bundled with per-task results and aggregate
/// execution metrics.
///
/// Each piece of state lives behind its own `Arc<Mutex<_>>`. Cloning a
/// scheduler clones only the `Arc`s, so every clone is a handle onto the
/// *same* queue, result map and metrics rather than an independent copy.
#[derive(Debug, Clone)]
pub struct TaskScheduler {
    /// Tasks that have been queued and not yet taken out again.
    tasks: Arc<Mutex<Vec<Task>>>,
    /// Recorded results, keyed by `TaskResult::task_id`.
    results: Arc<Mutex<HashMap<usize, TaskResult>>>,
    /// Aggregate execution metrics.
    metrics: Arc<Mutex<ExecutionMetrics>>,
}
/// A single unit of work that can be placed in the scheduler's queue.
#[derive(Debug, Clone)]
pub struct Task {
    /// Numeric identifier of the task.
    pub id: usize,
    /// Human-readable label.
    pub name: String,
    /// Kind of work the task represents.
    pub task_type: TaskType,
    /// Path the task refers to (not interpreted anywhere in this file).
    pub source: PathBuf,
    /// Scheduling priority; `TaskScheduler::get_next_task` hands out larger
    /// values first.
    pub priority: u8,
}
/// The kind of work a task performs.
///
/// This is a plain field-less enum, so besides `Debug` and `Clone` it is
/// also `Copy`, comparable with `==` and usable as a hash-map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskType {
    /// A parse task.
    Parse,
    /// A transform task.
    Transform,
    /// An analyze task.
    Analyze,
    /// A format task.
    Format,
}
/// Outcome recorded for one task.
#[derive(Debug, Clone)]
pub struct TaskResult {
    /// Identifier of the task this result belongs to; the scheduler stores
    /// results under this key.
    pub task_id: usize,
    /// `true` when the task succeeded.
    pub success: bool,
    /// Duration reported for the task.
    pub duration: Duration,
    /// Textual output, if any.
    pub output: Option<String>,
    /// Error description, if any.
    pub error: Option<String>,
}
/// Aggregate counters and timers describing a scheduler run.
///
/// `Default` yields all-zero counters and no peak-memory reading.
#[derive(Debug, Clone, Default)]
pub struct ExecutionMetrics {
    /// Total number of tasks; set by the caller, no method in this file
    /// increments it.
    pub total_tasks: usize,
    /// Number of completions recorded, successful or not.
    pub completed_tasks: usize,
    /// Number of recorded completions that were failures.
    pub failed_tasks: usize,
    /// Accumulated parse time in milliseconds.
    pub parse_time_ms: u64,
    /// Accumulated transform time in milliseconds.
    pub transform_time_ms: u64,
    /// Total time in milliseconds; set by the caller, not derived from the
    /// other timers in this file.
    pub total_time_ms: u64,
    /// Peak memory in megabytes, when a reading is available.
    pub peak_memory_mb: Option<u64>,
}
impl ExecutionMetrics {
    /// Adds `ms` milliseconds to the accumulated parse time.
    ///
    /// The total saturates at `u64::MAX`; an oversized sample can neither
    /// panic (debug builds) nor wrap around to a small value (release builds).
    pub fn add_parse_time(&mut self, ms: u64) {
        self.parse_time_ms = self.parse_time_ms.saturating_add(ms);
    }

    /// Adds `ms` milliseconds to the accumulated transform time.
    ///
    /// Saturates at `u64::MAX` in the same way as [`Self::add_parse_time`].
    pub fn add_transform_time(&mut self, ms: u64) {
        self.transform_time_ms = self.transform_time_ms.saturating_add(ms);
    }

    /// Records that one task finished.
    ///
    /// `completed_tasks` is always incremented; `failed_tasks` is incremented
    /// as well when `success` is `false`. Both counters saturate.
    pub fn record_completion(&mut self, success: bool) {
        self.completed_tasks = self.completed_tasks.saturating_add(1);
        if !success {
            self.failed_tasks = self.failed_tasks.saturating_add(1);
        }
    }

    /// Renders the counters and timers as a single human-readable line.
    pub fn summary(&self) -> String {
        format!(
            "Tasks: {}/{} completed, {} failed. Time: parse={}ms, transform={}ms, total={}ms",
            self.completed_tasks,
            self.total_tasks,
            self.failed_tasks,
            self.parse_time_ms,
            self.transform_time_ms,
            self.total_time_ms
        )
    }
}
impl TaskScheduler {
    /// Creates a scheduler with an empty task queue, an empty result map and
    /// default (all-zero) metrics.
    pub fn new() -> Self {
        Self {
            tasks: Arc::new(Mutex::new(Vec::new())),
            results: Arc::new(Mutex::new(HashMap::new())),
            metrics: Arc::new(Mutex::new(ExecutionMetrics::default())),
        }
    }

    /// Locks the task queue.
    ///
    /// Panics with a descriptive message if the mutex is poisoned.
    fn lock_tasks(&self) -> std::sync::MutexGuard<'_, Vec<Task>> {
        self.tasks.lock().expect("task queue mutex poisoned")
    }

    /// Locks the result map.
    ///
    /// Panics with a descriptive message if the mutex is poisoned.
    fn lock_results(&self) -> std::sync::MutexGuard<'_, HashMap<usize, TaskResult>> {
        self.results.lock().expect("task result mutex poisoned")
    }

    /// Locks the metrics.
    ///
    /// Panics with a descriptive message if the mutex is poisoned.
    fn lock_metrics(&self) -> std::sync::MutexGuard<'_, ExecutionMetrics> {
        self.metrics
            .lock()
            .expect("execution metrics mutex poisoned")
    }

    /// Appends `task` to the queue.
    ///
    /// # Panics
    /// Panics if the task-queue mutex is poisoned.
    pub fn add_task(&mut self, task: Task) {
        self.lock_tasks().push(task);
    }

    /// Appends every task in `tasks` to the queue, preserving their order.
    ///
    /// # Panics
    /// Panics if the task-queue mutex is poisoned.
    pub fn add_tasks(&mut self, tasks: Vec<Task>) {
        self.lock_tasks().extend(tasks);
    }

    /// Removes and returns the queued task with the highest `priority`, or
    /// `None` when the queue is empty.
    ///
    /// Tasks of equal priority are returned in the order they were queued
    /// (first in, first out). The relative order of the tasks left in the
    /// queue is not changed.
    ///
    /// # Panics
    /// Panics if the task-queue mutex is poisoned.
    pub fn get_next_task(&self) -> Option<Task> {
        let mut queue = self.lock_tasks();
        // `max_by_key` keeps the *last* maximum it encounters, so the queue is
        // walked back to front: the last maximum seen is then the one that
        // was queued earliest.
        let index = queue
            .iter()
            .enumerate()
            .rev()
            .max_by_key(|(_, task)| task.priority)
            .map(|(index, _)| index)?;
        Some(queue.remove(index))
    }

    /// Returns a snapshot (clone) of the queued tasks in queue order.
    ///
    /// # Panics
    /// Panics if the task-queue mutex is poisoned.
    pub fn get_tasks(&self) -> Vec<Task> {
        self.lock_tasks().clone()
    }

    /// Stores `result` under its `task_id`, replacing any earlier result
    /// recorded for the same id.
    ///
    /// # Panics
    /// Panics if the result mutex is poisoned.
    pub fn record_result(&self, result: TaskResult) {
        self.lock_results().insert(result.task_id, result);
    }

    /// Returns a clone of the result recorded for `task_id`, if there is one.
    ///
    /// # Panics
    /// Panics if the result mutex is poisoned.
    pub fn get_result(&self, task_id: usize) -> Option<TaskResult> {
        self.lock_results().get(&task_id).cloned()
    }

    /// Returns a snapshot (clone) of the current metrics.
    ///
    /// # Panics
    /// Panics if the metrics mutex is poisoned.
    pub fn get_metrics(&self) -> ExecutionMetrics {
        self.lock_metrics().clone()
    }

    /// Replaces the stored metrics wholesale with `metrics`.
    ///
    /// # Panics
    /// Panics if the metrics mutex is poisoned.
    pub fn update_metrics(&self, metrics: ExecutionMetrics) {
        *self.lock_metrics() = metrics;
    }

    /// Number of tasks currently queued.
    ///
    /// # Panics
    /// Panics if the task-queue mutex is poisoned.
    pub fn task_count(&self) -> usize {
        self.lock_tasks().len()
    }

    /// Number of results currently stored (one per distinct task id).
    ///
    /// # Panics
    /// Panics if the result mutex is poisoned.
    pub fn completed_count(&self) -> usize {
        self.lock_results().len()
    }

    /// Returns `true` when no tasks are queued.
    ///
    /// # Panics
    /// Panics if the task-queue mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        self.lock_tasks().is_empty()
    }

    /// Empties the task queue and the result map. Metrics are left untouched.
    ///
    /// The two locks are taken one after the other, never at the same time.
    ///
    /// # Panics
    /// Panics if either mutex is poisoned.
    pub fn clear(&mut self) {
        self.lock_tasks().clear();
        self.lock_results().clear();
    }
}
impl Default for TaskScheduler {
fn default() -> Self {
Self::new()
}
}
/// Collects configuration values before a `TaskScheduler` is created.
pub struct SchedulerBuilder {
    /// Requested limit on concurrently running tasks.
    max_concurrent: usize,
    /// Requested priority threshold.
    priority_threshold: u8,
}
impl SchedulerBuilder {
    /// Starts a builder whose `max_concurrent` is whatever `num_cpus()`
    /// reports and whose `priority_threshold` is `0`.
    pub fn new() -> Self {
        let max_concurrent = num_cpus();
        Self {
            max_concurrent,
            priority_threshold: 0,
        }
    }

    /// Sets the concurrency limit and returns the builder for chaining.
    pub fn max_concurrent(self, max: usize) -> Self {
        Self {
            max_concurrent: max,
            ..self
        }
    }

    /// Sets the priority threshold and returns the builder for chaining.
    pub fn priority_threshold(self, threshold: u8) -> Self {
        Self {
            priority_threshold: threshold,
            ..self
        }
    }

    /// Consumes the builder and returns a fresh `TaskScheduler`.
    ///
    /// NOTE(review): the scheduler is created with `TaskScheduler::new()`;
    /// neither `max_concurrent` nor `priority_threshold` is passed on, so
    /// both settings currently have no effect on the result.
    pub fn build(self) -> TaskScheduler {
        TaskScheduler::new()
    }
}
impl Default for SchedulerBuilder {
fn default() -> Self {
Self::new()
}
}
/// Returns the value reported by `std::thread::available_parallelism`, or `1`
/// when that query fails.
fn num_cpus() -> usize {
    match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    }
}
/// Splits `tasks` into at most `max_workers` contiguous batches.
///
/// Batches preserve the input order and their sizes differ by at most one;
/// the larger batches come first. A `max_workers` of `0` is treated as `1`.
/// No empty batch is ever produced, so fewer than `max_workers` batches are
/// returned when there are fewer tasks than workers, and an empty input
/// yields an empty result.
///
/// Tasks are moved into their batch, not cloned.
pub fn schedule_tasks(tasks: Vec<Task>, max_workers: usize) -> Vec<Vec<Task>> {
    let workers = max_workers.max(1);
    // Every batch gets `base` tasks; the first `extra` batches get one more.
    let base = tasks.len() / workers;
    let extra = tasks.len() % workers;

    let mut pending = tasks.into_iter();
    (0..workers)
        .map(|slot| {
            let size = base + usize::from(slot < extra);
            pending.by_ref().take(size).collect::<Vec<Task>>()
        })
        // With more workers than tasks the trailing batches would be empty;
        // stop at the first one.
        .take_while(|batch| !batch.is_empty())
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Parse` task with the given id and priority.
    fn create_prioritized_task(id: usize, priority: u8) -> Task {
        Task {
            id,
            name: format!("Task {}", id),
            task_type: TaskType::Parse,
            source: PathBuf::from(format!("file{}.js", id)),
            priority,
        }
    }

    /// Builds a priority-0 `Parse` task with the given id.
    fn create_test_task(id: usize) -> Task {
        create_prioritized_task(id, 0)
    }

    #[test]
    fn test_task_scheduler_new() {
        let scheduler = TaskScheduler::new();
        assert!(scheduler.is_empty());
        assert_eq!(scheduler.task_count(), 0);
        assert_eq!(scheduler.completed_count(), 0);
    }

    #[test]
    fn test_task_scheduler_default() {
        let scheduler = TaskScheduler::default();
        assert!(scheduler.is_empty());
        assert!(scheduler.get_next_task().is_none());
    }

    #[test]
    fn test_add_task() {
        let mut scheduler = TaskScheduler::new();
        scheduler.add_task(create_test_task(1));
        assert_eq!(scheduler.task_count(), 1);
        assert!(!scheduler.is_empty());
    }

    #[test]
    fn test_add_tasks() {
        let mut scheduler = TaskScheduler::new();
        let tasks: Vec<Task> = (1..=5).map(create_test_task).collect();
        scheduler.add_tasks(tasks);
        assert_eq!(scheduler.task_count(), 5);
        // Before anything is taken out, the queue reflects insertion order.
        let ids: Vec<usize> = scheduler.get_tasks().iter().map(|t| t.id).collect();
        assert_eq!(ids, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_get_next_task() {
        let mut scheduler = TaskScheduler::new();
        scheduler.add_task(create_prioritized_task(1, 1));
        scheduler.add_task(create_prioritized_task(2, 9));
        scheduler.add_task(create_prioritized_task(3, 4));
        // Larger priority values are handed out first.
        assert_eq!(scheduler.get_next_task().map(|t| t.id), Some(2));
        assert_eq!(scheduler.get_next_task().map(|t| t.id), Some(3));
        assert_eq!(scheduler.get_next_task().map(|t| t.id), Some(1));
        assert!(scheduler.get_next_task().is_none());
        assert!(scheduler.is_empty());
    }

    #[test]
    fn test_record_result() {
        let scheduler = TaskScheduler::new();
        let result = TaskResult {
            task_id: 1,
            success: true,
            duration: Duration::from_millis(100),
            output: Some("done".to_string()),
            error: None,
        };
        scheduler.record_result(result);
        assert_eq!(scheduler.completed_count(), 1);

        let stored = scheduler.get_result(1).expect("result for task 1 was just recorded");
        assert!(stored.success);
        assert_eq!(stored.duration, Duration::from_millis(100));
        assert_eq!(stored.output.as_deref(), Some("done"));
        assert!(stored.error.is_none());
        assert!(scheduler.get_result(2).is_none());
    }

    #[test]
    fn test_metrics() {
        let scheduler = TaskScheduler::new();
        let mut metrics = scheduler.get_metrics();
        assert_eq!(metrics.total_tasks, 0);

        metrics.total_tasks = 3;
        scheduler.update_metrics(metrics);
        assert_eq!(scheduler.get_metrics().total_tasks, 3);
    }

    #[test]
    fn test_execution_metrics_add_times() {
        let mut metrics = ExecutionMetrics::default();
        metrics.add_parse_time(100);
        metrics.add_transform_time(200);
        assert_eq!(metrics.parse_time_ms, 100);
        assert_eq!(metrics.transform_time_ms, 200);

        metrics.add_parse_time(1);
        metrics.add_transform_time(2);
        assert_eq!(metrics.parse_time_ms, 101);
        assert_eq!(metrics.transform_time_ms, 202);
    }

    #[test]
    fn test_execution_metrics_record_completion() {
        let mut metrics = ExecutionMetrics {
            total_tasks: 5,
            ..Default::default()
        };
        metrics.record_completion(true);
        assert_eq!(metrics.completed_tasks, 1);
        assert_eq!(metrics.failed_tasks, 0);
        metrics.record_completion(false);
        assert_eq!(metrics.completed_tasks, 2);
        assert_eq!(metrics.failed_tasks, 1);
    }

    #[test]
    fn test_execution_metrics_summary() {
        let mut metrics = ExecutionMetrics {
            total_tasks: 3,
            total_time_ms: 42,
            ..Default::default()
        };
        metrics.add_parse_time(10);
        metrics.add_transform_time(20);
        metrics.record_completion(true);
        metrics.record_completion(false);
        assert_eq!(
            metrics.summary(),
            "Tasks: 2/3 completed, 1 failed. Time: parse=10ms, transform=20ms, total=42ms"
        );
    }

    #[test]
    fn test_schedule_tasks() {
        let tasks: Vec<Task> = (0..10).map(create_test_task).collect();
        let chunks = schedule_tasks(tasks, 3);
        assert!(chunks.len() <= 4);
        assert!(chunks.iter().all(|chunk| !chunk.is_empty()));
        // Every task appears exactly once and in its original order.
        let ids: Vec<usize> = chunks.iter().flatten().map(|t| t.id).collect();
        assert_eq!(ids, (0..10).collect::<Vec<_>>());
    }

    #[test]
    fn test_schedule_tasks_edge_cases() {
        assert!(schedule_tasks(Vec::new(), 3).is_empty());
        // Zero workers is treated like a single worker.
        let single = schedule_tasks((0..3).map(create_test_task).collect(), 0);
        assert_eq!(single.len(), 1);
        assert_eq!(single[0].len(), 3);
    }

    #[test]
    fn test_scheduler_builder() {
        let scheduler = SchedulerBuilder::new()
            .max_concurrent(8)
            .priority_threshold(5)
            .build();
        assert!(scheduler.is_empty());
        assert!(SchedulerBuilder::default().build().is_empty());
    }

    #[test]
    fn test_clear_scheduler() {
        let mut scheduler = TaskScheduler::new();
        scheduler.add_task(create_test_task(1));
        scheduler.record_result(TaskResult {
            task_id: 1,
            success: false,
            duration: Duration::from_millis(1),
            output: None,
            error: Some("boom".to_string()),
        });
        scheduler.clear();
        assert!(scheduler.is_empty());
        assert_eq!(scheduler.completed_count(), 0);
        assert!(scheduler.get_result(1).is_none());
    }
}