Skip to main content

morph_cli/core/execution/
scheduler.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6pub struct TaskScheduler {
7    tasks: Arc<Mutex<Vec<Task>>>,
8    results: Arc<Mutex<HashMap<usize, TaskResult>>>,
9    metrics: Arc<Mutex<ExecutionMetrics>>,
10}
11
12#[derive(Debug, Clone)]
13pub struct Task {
14    pub id: usize,
15    pub name: String,
16    pub task_type: TaskType,
17    pub source: PathBuf,
18    pub priority: u8,
19}
20
21#[derive(Debug, Clone)]
22pub enum TaskType {
23    Parse,
24    Transform,
25    Analyze,
26    Format,
27}
28
29#[derive(Debug, Clone)]
30pub struct TaskResult {
31    pub task_id: usize,
32    pub success: bool,
33    pub duration: Duration,
34    pub output: Option<String>,
35    pub error: Option<String>,
36}
37
38#[derive(Debug, Clone, Default)]
39pub struct ExecutionMetrics {
40    pub total_tasks: usize,
41    pub completed_tasks: usize,
42    pub failed_tasks: usize,
43    pub parse_time_ms: u64,
44    pub transform_time_ms: u64,
45    pub total_time_ms: u64,
46    pub peak_memory_mb: Option<u64>,
47}
48
49impl ExecutionMetrics {
50    pub fn add_parse_time(&mut self, ms: u64) {
51        self.parse_time_ms += ms;
52    }
53
54    pub fn add_transform_time(&mut self, ms: u64) {
55        self.transform_time_ms += ms;
56    }
57
58    pub fn record_completion(&mut self, success: bool) {
59        self.completed_tasks += 1;
60        if !success {
61            self.failed_tasks += 1;
62        }
63    }
64
65    pub fn summary(&self) -> String {
66        format!(
67            "Tasks: {}/{} completed, {} failed. Time: parse={}ms, transform={}ms, total={}ms",
68            self.completed_tasks,
69            self.total_tasks,
70            self.failed_tasks,
71            self.parse_time_ms,
72            self.transform_time_ms,
73            self.total_time_ms
74        )
75    }
76}
77
78impl TaskScheduler {
79    pub fn new() -> Self {
80        Self {
81            tasks: Arc::new(Mutex::new(Vec::new())),
82            results: Arc::new(Mutex::new(HashMap::new())),
83            metrics: Arc::new(Mutex::new(ExecutionMetrics::default())),
84        }
85    }
86
87    pub fn add_task(&mut self, task: Task) {
88        let mut tasks = self.tasks.lock().unwrap();
89        tasks.push(task);
90    }
91
92    pub fn add_tasks(&mut self, tasks: Vec<Task>) {
93        let mut queue = self.tasks.lock().unwrap();
94        queue.extend(tasks);
95    }
96
97    pub fn get_next_task(&self) -> Option<Task> {
98        let mut tasks = self.tasks.lock().unwrap();
99        tasks.sort_by_key(|t| t.priority);
100        tasks.pop()
101    }
102
103    pub fn get_tasks(&self) -> Vec<Task> {
104        let tasks = self.tasks.lock().unwrap();
105        tasks.clone()
106    }
107
108    pub fn record_result(&self, result: TaskResult) {
109        let mut results = self.results.lock().unwrap();
110        results.insert(result.task_id, result);
111    }
112
113    pub fn get_result(&self, task_id: usize) -> Option<TaskResult> {
114        let results = self.results.lock().unwrap();
115        results.get(&task_id).cloned()
116    }
117
118    pub fn get_metrics(&self) -> ExecutionMetrics {
119        let metrics = self.metrics.lock().unwrap();
120        metrics.clone()
121    }
122
123    pub fn update_metrics(&self, metrics: ExecutionMetrics) {
124        let mut current = self.metrics.lock().unwrap();
125        *current = metrics;
126    }
127
128    pub fn task_count(&self) -> usize {
129        let tasks = self.tasks.lock().unwrap();
130        tasks.len()
131    }
132
133    pub fn completed_count(&self) -> usize {
134        let results = self.results.lock().unwrap();
135        results.len()
136    }
137
138    pub fn is_empty(&self) -> bool {
139        let tasks = self.tasks.lock().unwrap();
140        tasks.is_empty()
141    }
142
143    pub fn clear(&mut self) {
144        let mut tasks = self.tasks.lock().unwrap();
145        tasks.clear();
146        drop(tasks);
147
148        let mut results = self.results.lock().unwrap();
149        results.clear();
150    }
151}
152
153impl Default for TaskScheduler {
154    fn default() -> Self {
155        Self::new()
156    }
157}
158
159pub struct SchedulerBuilder {
160    max_concurrent: usize,
161    priority_threshold: u8,
162}
163
164impl SchedulerBuilder {
165    pub fn new() -> Self {
166        Self {
167            max_concurrent: num_cpus(),
168            priority_threshold: 0,
169        }
170    }
171
172    pub fn max_concurrent(mut self, max: usize) -> Self {
173        self.max_concurrent = max;
174        self
175    }
176
177    pub fn priority_threshold(mut self, threshold: u8) -> Self {
178        self.priority_threshold = threshold;
179        self
180    }
181
182    pub fn build(self) -> TaskScheduler {
183        TaskScheduler::new()
184    }
185}
186
187impl Default for SchedulerBuilder {
188    fn default() -> Self {
189        Self::new()
190    }
191}
192
193fn num_cpus() -> usize {
194    std::thread::available_parallelism()
195        .map(|n| n.get())
196        .unwrap_or(1)
197}
198
199pub fn schedule_tasks(tasks: Vec<Task>, max_workers: usize) -> Vec<Vec<Task>> {
200    let chunk_size = (tasks.len() / max_workers.max(1)).max(1);
201
202    tasks
203        .chunks(chunk_size)
204        .map(|chunk| chunk.to_vec())
205        .collect()
206}
207
208#[cfg(test)]
209mod tests {
210    use super::*;
211
212    fn create_test_task(id: usize) -> Task {
213        Task {
214            id,
215            name: format!("Task {}", id),
216            task_type: TaskType::Parse,
217            source: PathBuf::from(format!("file{}.js", id)),
218            priority: 0,
219        }
220    }
221
222    #[test]
223    fn test_task_scheduler_new() {
224        let scheduler = TaskScheduler::new();
225        assert!(scheduler.is_empty());
226        assert_eq!(scheduler.task_count(), 0);
227    }
228
229    #[test]
230    fn test_add_task() {
231        let mut scheduler = TaskScheduler::new();
232        scheduler.add_task(create_test_task(1));
233        assert_eq!(scheduler.task_count(), 1);
234    }
235
236    #[test]
237    fn test_add_tasks() {
238        let mut scheduler = TaskScheduler::new();
239        let tasks: Vec<Task> = (1..=5).map(create_test_task).collect();
240        scheduler.add_tasks(tasks);
241        assert_eq!(scheduler.task_count(), 5);
242    }
243
244    #[test]
245    fn test_get_next_task() {
246        let mut scheduler = TaskScheduler::new();
247        scheduler.add_task(create_test_task(1));
248        scheduler.add_task(create_test_task(2));
249
250        let task = scheduler.get_next_task();
251        assert!(task.is_some());
252    }
253
254    #[test]
255    fn test_record_result() {
256        let scheduler = TaskScheduler::new();
257        let result = TaskResult {
258            task_id: 1,
259            success: true,
260            duration: Duration::from_millis(100),
261            output: Some("done".to_string()),
262            error: None,
263        };
264
265        scheduler.record_result(result);
266        let stored = scheduler.get_result(1);
267        assert!(stored.is_some());
268        assert!(stored.unwrap().success);
269    }
270
271    #[test]
272    fn test_metrics() {
273        let scheduler = TaskScheduler::new();
274        let metrics = scheduler.get_metrics();
275        assert_eq!(metrics.total_tasks, 0);
276    }
277
278    #[test]
279    fn test_execution_metrics_add_times() {
280        let mut metrics = ExecutionMetrics::default();
281        metrics.add_parse_time(100);
282        metrics.add_transform_time(200);
283        assert_eq!(metrics.parse_time_ms, 100);
284        assert_eq!(metrics.transform_time_ms, 200);
285    }
286
287    #[test]
288    fn test_execution_metrics_record_completion() {
289        let mut metrics = ExecutionMetrics::default();
290        metrics.total_tasks = 5;
291        metrics.record_completion(true);
292        assert_eq!(metrics.completed_tasks, 1);
293
294        metrics.record_completion(false);
295        assert_eq!(metrics.failed_tasks, 1);
296    }
297
298    #[test]
299    fn test_schedule_tasks() {
300        let tasks: Vec<Task> = (0..10).map(|i| create_test_task(i)).collect();
301        let chunks = schedule_tasks(tasks, 3);
302
303        assert!(chunks.len() <= 4);
304        let total: usize = chunks.iter().map(|c| c.len()).sum();
305        assert_eq!(total, 10);
306    }
307
308    #[test]
309    fn test_scheduler_builder() {
310        let scheduler = SchedulerBuilder::new()
311            .max_concurrent(8)
312            .priority_threshold(5)
313            .build();
314
315        assert!(scheduler.is_empty());
316    }
317
318    #[test]
319    fn test_clear_scheduler() {
320        let mut scheduler = TaskScheduler::new();
321        scheduler.add_task(create_test_task(1));
322        scheduler.clear();
323        assert!(scheduler.is_empty());
324    }
325}