morph_cli/core/execution/
scheduler.rs1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6pub struct TaskScheduler {
7 tasks: Arc<Mutex<Vec<Task>>>,
8 results: Arc<Mutex<HashMap<usize, TaskResult>>>,
9 metrics: Arc<Mutex<ExecutionMetrics>>,
10}
11
12#[derive(Debug, Clone)]
13pub struct Task {
14 pub id: usize,
15 pub name: String,
16 pub task_type: TaskType,
17 pub source: PathBuf,
18 pub priority: u8,
19}
20
/// Category label carried by a `Task`.
///
/// The variants are plain tags with no payload; nothing in this file branches
/// on them.
#[derive(Debug, Clone)]
pub enum TaskType {
    Parse,
    Transform,
    Analyze,
    Format,
}
28
/// Outcome recorded for one task.
#[derive(Debug, Clone)]
pub struct TaskResult {
    /// Key under which `TaskScheduler::record_result` stores this result.
    pub task_id: usize,
    /// Whether the task is reported as having succeeded.
    pub success: bool,
    /// Duration reported for the task (presumably its run time; confirm with the producer).
    pub duration: Duration,
    /// Optional output text.
    pub output: Option<String>,
    /// Optional error text.
    pub error: Option<String>,
}
37
/// Counters and timings describing a scheduler run.
///
/// `Default` yields all-zero counters and `peak_memory_mb == None`.
#[derive(Debug, Clone, Default)]
pub struct ExecutionMetrics {
    /// Denominator printed by `summary`; no method of this type modifies it.
    pub total_tasks: usize,
    /// Incremented on every `record_completion` call, successful or not.
    pub completed_tasks: usize,
    /// Incremented by `record_completion` when `success` is `false`.
    pub failed_tasks: usize,
    /// Milliseconds accumulated through `add_parse_time`.
    pub parse_time_ms: u64,
    /// Milliseconds accumulated through `add_transform_time`.
    pub transform_time_ms: u64,
    /// Printed by `summary`; no method of this type modifies it.
    pub total_time_ms: u64,
    /// Optional memory figure (presumably a peak in megabytes; confirm with the producer).
    /// Not included in `summary`.
    pub peak_memory_mb: Option<u64>,
}
48
49impl ExecutionMetrics {
50 pub fn add_parse_time(&mut self, ms: u64) {
51 self.parse_time_ms += ms;
52 }
53
54 pub fn add_transform_time(&mut self, ms: u64) {
55 self.transform_time_ms += ms;
56 }
57
58 pub fn record_completion(&mut self, success: bool) {
59 self.completed_tasks += 1;
60 if !success {
61 self.failed_tasks += 1;
62 }
63 }
64
65 pub fn summary(&self) -> String {
66 format!(
67 "Tasks: {}/{} completed, {} failed. Time: parse={}ms, transform={}ms, total={}ms",
68 self.completed_tasks,
69 self.total_tasks,
70 self.failed_tasks,
71 self.parse_time_ms,
72 self.transform_time_ms,
73 self.total_time_ms
74 )
75 }
76}
77
78impl TaskScheduler {
79 pub fn new() -> Self {
80 Self {
81 tasks: Arc::new(Mutex::new(Vec::new())),
82 results: Arc::new(Mutex::new(HashMap::new())),
83 metrics: Arc::new(Mutex::new(ExecutionMetrics::default())),
84 }
85 }
86
87 pub fn add_task(&mut self, task: Task) {
88 let mut tasks = self.tasks.lock().unwrap();
89 tasks.push(task);
90 }
91
92 pub fn add_tasks(&mut self, tasks: Vec<Task>) {
93 let mut queue = self.tasks.lock().unwrap();
94 queue.extend(tasks);
95 }
96
97 pub fn get_next_task(&self) -> Option<Task> {
98 let mut tasks = self.tasks.lock().unwrap();
99 tasks.sort_by_key(|t| t.priority);
100 tasks.pop()
101 }
102
103 pub fn get_tasks(&self) -> Vec<Task> {
104 let tasks = self.tasks.lock().unwrap();
105 tasks.clone()
106 }
107
108 pub fn record_result(&self, result: TaskResult) {
109 let mut results = self.results.lock().unwrap();
110 results.insert(result.task_id, result);
111 }
112
113 pub fn get_result(&self, task_id: usize) -> Option<TaskResult> {
114 let results = self.results.lock().unwrap();
115 results.get(&task_id).cloned()
116 }
117
118 pub fn get_metrics(&self) -> ExecutionMetrics {
119 let metrics = self.metrics.lock().unwrap();
120 metrics.clone()
121 }
122
123 pub fn update_metrics(&self, metrics: ExecutionMetrics) {
124 let mut current = self.metrics.lock().unwrap();
125 *current = metrics;
126 }
127
128 pub fn task_count(&self) -> usize {
129 let tasks = self.tasks.lock().unwrap();
130 tasks.len()
131 }
132
133 pub fn completed_count(&self) -> usize {
134 let results = self.results.lock().unwrap();
135 results.len()
136 }
137
138 pub fn is_empty(&self) -> bool {
139 let tasks = self.tasks.lock().unwrap();
140 tasks.is_empty()
141 }
142
143 pub fn clear(&mut self) {
144 let mut tasks = self.tasks.lock().unwrap();
145 tasks.clear();
146 drop(tasks);
147
148 let mut results = self.results.lock().unwrap();
149 results.clear();
150 }
151}
152
153impl Default for TaskScheduler {
154 fn default() -> Self {
155 Self::new()
156 }
157}
158
/// Collects settings ahead of constructing a `TaskScheduler`.
///
/// NOTE(review): `SchedulerBuilder::build` currently returns
/// `TaskScheduler::new()` and reads neither field.
pub struct SchedulerBuilder {
    /// Requested concurrency limit; `new` initialises it from `num_cpus()`.
    max_concurrent: usize,
    /// Requested priority threshold; `new` initialises it to 0.
    priority_threshold: u8,
}
163
164impl SchedulerBuilder {
165 pub fn new() -> Self {
166 Self {
167 max_concurrent: num_cpus(),
168 priority_threshold: 0,
169 }
170 }
171
172 pub fn max_concurrent(mut self, max: usize) -> Self {
173 self.max_concurrent = max;
174 self
175 }
176
177 pub fn priority_threshold(mut self, threshold: u8) -> Self {
178 self.priority_threshold = threshold;
179 self
180 }
181
182 pub fn build(self) -> TaskScheduler {
183 TaskScheduler::new()
184 }
185}
186
187impl Default for SchedulerBuilder {
188 fn default() -> Self {
189 Self::new()
190 }
191}
192
/// Returns the parallelism reported by `std::thread::available_parallelism`,
/// or 1 when that query fails.
fn num_cpus() -> usize {
    match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    }
}
198
199pub fn schedule_tasks(tasks: Vec<Task>, max_workers: usize) -> Vec<Vec<Task>> {
200 let chunk_size = (tasks.len() / max_workers.max(1)).max(1);
201
202 tasks
203 .chunks(chunk_size)
204 .map(|chunk| chunk.to_vec())
205 .collect()
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a priority-0 `Parse` task whose name and source path are
    /// derived from `id`.
    fn create_test_task(id: usize) -> Task {
        let name = format!("Task {}", id);
        let source = PathBuf::from(format!("file{}.js", id));
        Task {
            id,
            name,
            task_type: TaskType::Parse,
            source,
            priority: 0,
        }
    }

    // A new scheduler starts with an empty queue.
    #[test]
    fn test_task_scheduler_new() {
        let scheduler = TaskScheduler::new();
        assert!(scheduler.is_empty());
        assert_eq!(scheduler.task_count(), 0);
    }

    // Adding one task grows the queue by one.
    #[test]
    fn test_add_task() {
        let mut scheduler = TaskScheduler::new();
        scheduler.add_task(create_test_task(1));
        assert_eq!(scheduler.task_count(), 1);
    }

    // Adding a batch queues every task in it.
    #[test]
    fn test_add_tasks() {
        let batch: Vec<Task> = (1..=5).map(create_test_task).collect();
        let mut scheduler = TaskScheduler::new();
        scheduler.add_tasks(batch);
        assert_eq!(scheduler.task_count(), 5);
    }

    // A non-empty queue hands out a task.
    #[test]
    fn test_get_next_task() {
        let mut scheduler = TaskScheduler::new();
        for id in [1, 2] {
            scheduler.add_task(create_test_task(id));
        }

        assert!(scheduler.get_next_task().is_some());
    }

    // A recorded result can be read back by task id.
    #[test]
    fn test_record_result() {
        let scheduler = TaskScheduler::new();
        scheduler.record_result(TaskResult {
            task_id: 1,
            success: true,
            duration: Duration::from_millis(100),
            output: Some("done".to_string()),
            error: None,
        });

        let stored = scheduler.get_result(1);
        assert!(stored.is_some());
        assert!(stored.unwrap().success);
    }

    // Metrics of a new scheduler start at zero.
    #[test]
    fn test_metrics() {
        let scheduler = TaskScheduler::new();
        assert_eq!(scheduler.get_metrics().total_tasks, 0);
    }

    // Parse and transform times accumulate separately.
    #[test]
    fn test_execution_metrics_add_times() {
        let mut metrics = ExecutionMetrics::default();
        metrics.add_parse_time(100);
        metrics.add_transform_time(200);
        assert_eq!(metrics.parse_time_ms, 100);
        assert_eq!(metrics.transform_time_ms, 200);
    }

    // Completions are always counted; failures only when `success` is false.
    #[test]
    fn test_execution_metrics_record_completion() {
        let mut metrics = ExecutionMetrics {
            total_tasks: 5,
            ..ExecutionMetrics::default()
        };

        metrics.record_completion(true);
        assert_eq!(metrics.completed_tasks, 1);

        metrics.record_completion(false);
        assert_eq!(metrics.failed_tasks, 1);
    }

    // Batching keeps every task and stays within the expected batch count.
    #[test]
    fn test_schedule_tasks() {
        let tasks: Vec<Task> = (0..10).map(create_test_task).collect();
        let chunks = schedule_tasks(tasks, 3);

        assert!(chunks.len() <= 4);
        let total = chunks.iter().map(Vec::len).sum::<usize>();
        assert_eq!(total, 10);
    }

    // The builder produces an empty scheduler.
    #[test]
    fn test_scheduler_builder() {
        let builder = SchedulerBuilder::new()
            .max_concurrent(8)
            .priority_threshold(5);
        let scheduler = builder.build();

        assert!(scheduler.is_empty());
    }

    // Clearing removes queued tasks.
    #[test]
    fn test_clear_scheduler() {
        let mut scheduler = TaskScheduler::new();
        scheduler.add_task(create_test_task(1));
        scheduler.clear();
        assert!(scheduler.is_empty());
    }
}
325}