morph_cli/core/execution/
parallel.rs1use std::collections::VecDeque;
2use std::sync::{Arc, Mutex};
3use std::thread;
4use std::time::Duration;
5
6pub struct WorkerPool {
7 workers: Vec<Worker>,
8 jobs: Arc<Mutex<VecDeque<Job>>>,
9 results: Arc<Mutex<Vec<JobResult>>>,
10 config: super::ExecutionConfig,
11}
12
13pub struct Job {
14 pub id: usize,
15 pub data: JobData,
16}
17
18pub enum JobData {
19 ParseFile(String),
20 TransformFile(String),
21 ScanDirectory(String),
22}
23
24pub struct JobResult {
25 pub id: usize,
26 pub success: bool,
27 pub output: Option<String>,
28 pub error: Option<String>,
29 pub duration_ms: u64,
30}
31
32impl WorkerPool {
33 pub fn new(config: super::ExecutionConfig) -> Self {
34 let jobs = Arc::new(Mutex::new(VecDeque::new()));
35 let results = Arc::new(Mutex::new(Vec::new()));
36
37 let mut workers = Vec::new();
38
39 for id in 0..config.max_workers {
40 let jobs = Arc::clone(&jobs);
41 let results = Arc::clone(&results);
42
43 let worker = Worker { _id: id, _handle: None };
44
45 if !config.sequential {
46 let handle = thread::spawn(move || {
47 loop {
48 let job = {
49 let mut queue = jobs.lock().unwrap();
50 queue.pop_front()
51 };
52
53 match job {
54 Some(job) => {
55 let result = execute_job(job);
56 let mut results = results.lock().unwrap();
57 results.push(result);
58 }
59 None => {
60 thread::sleep(Duration::from_millis(10));
61 }
62 }
63 }
64 });
65
66 workers.push(Worker {
67 _id: id,
68 _handle: Some(handle),
69 });
70 } else {
71 workers.push(worker);
72 }
73 }
74
75 Self {
76 workers,
77 jobs,
78 results,
79 config,
80 }
81 }
82
83 pub fn submit(&mut self, job: Job) {
84 let mut queue = self.jobs.lock().unwrap();
85 queue.push_back(job);
86 }
87
88 pub fn submit_many(&mut self, jobs: Vec<Job>) {
89 let mut queue = self.jobs.lock().unwrap();
90 for job in jobs {
91 queue.push_back(job);
92 }
93 }
94
95 pub fn wait_for_completion(&self) -> Vec<JobResult> {
96 loop {
97 let queue_len = {
98 let queue = self.jobs.lock().unwrap();
99 queue.len()
100 };
101
102 let results_len = {
103 let results = self.results.lock().unwrap();
104 results.len()
105 };
106
107 if queue_len == 0 && results_len > 0 {
108 let mut results = self.results.lock().unwrap();
109 return std::mem::take(&mut *results);
110 }
111
112 thread::sleep(Duration::from_millis(50));
113 }
114 }
115
116 pub fn drain_results(&self) -> Vec<JobResult> {
117 let mut results = self.results.lock().unwrap();
118 std::mem::take(&mut *results)
119 }
120
121 pub fn is_sequential(&self) -> bool {
122 self.config.sequential
123 }
124
125 pub fn worker_count(&self) -> usize {
126 self.config.max_workers
127 }
128}
129
130impl Drop for WorkerPool {
131 fn drop(&mut self) {
132 for _worker in &mut self.workers {
133 }
136 }
137}
138
139struct Worker {
140 _id: usize,
141 _handle: Option<thread::JoinHandle<()>>,
142}
143
144fn execute_job(job: Job) -> JobResult {
145 let start = std::time::Instant::now();
146
147 match job.data {
148 JobData::ParseFile(path) => {
149 let output = format!("Parsed: {}", path);
151 JobResult {
152 id: job.id,
153 success: true,
154 output: Some(output),
155 error: None,
156 duration_ms: start.elapsed().as_millis() as u64,
157 }
158 }
159 JobData::TransformFile(path) => {
160 let output = format!("Transformed: {}", path);
161 JobResult {
162 id: job.id,
163 success: true,
164 output: Some(output),
165 error: None,
166 duration_ms: start.elapsed().as_millis() as u64,
167 }
168 }
169 JobData::ScanDirectory(path) => {
170 let output = format!("Scanned: {}", path);
171 JobResult {
172 id: job.id,
173 success: true,
174 output: Some(output),
175 error: None,
176 duration_ms: start.elapsed().as_millis() as u64,
177 }
178 }
179 }
180}
181
182pub struct ParallelExecutor {
183 pool: WorkerPool,
184 _config: super::ExecutionConfig,
185}
186
187impl ParallelExecutor {
188 pub fn new(config: super::ExecutionConfig) -> Self {
189 let pool = WorkerPool::new(config.clone());
190 Self { pool, _config: config }
191 }
192
193 pub fn execute_parse_jobs(&mut self, paths: Vec<String>) -> Vec<JobResult> {
194 let jobs: Vec<Job> = paths
195 .into_iter()
196 .enumerate()
197 .map(|(id, path)| Job {
198 id,
199 data: JobData::ParseFile(path),
200 })
201 .collect();
202
203 self.pool.submit_many(jobs);
204 self.pool.wait_for_completion()
205 }
206
207 pub fn execute_transform_jobs(&mut self, paths: Vec<String>) -> Vec<JobResult> {
208 let jobs: Vec<Job> = paths
209 .into_iter()
210 .enumerate()
211 .map(|(id, path)| Job {
212 id,
213 data: JobData::TransformFile(path),
214 })
215 .collect();
216
217 self.pool.submit_many(jobs);
218 self.pool.wait_for_completion()
219 }
220
221 pub fn is_parallel(&self) -> bool {
222 !self.pool.is_sequential()
223 }
224
225 pub fn worker_count(&self) -> usize {
226 self.pool.worker_count()
227 }
228}
229
230pub fn create_executor(jobs: usize, sequential: bool) -> ParallelExecutor {
231 let config = if sequential {
232 super::ExecutionConfig::sequential()
233 } else {
234 super::ExecutionConfig::with_workers(jobs)
235 };
236
237 ParallelExecutor::new(config)
238}
239
240#[cfg(test)]
241mod tests {
242 use crate::core::execution::{ExecutionConfig, ParallelExecutor, WorkerPool, create_executor};
243
244 #[allow(dead_code)]
245 fn num_cpus() -> usize {
246 std::thread::available_parallelism()
247 .map(|n| n.get())
248 .unwrap_or(1)
249 }
250
251 #[test]
252 fn test_worker_pool_sequential() {
253 let config = ExecutionConfig::sequential();
254 let pool = WorkerPool::new(config);
255 assert!(pool.is_sequential());
256 assert_eq!(pool.worker_count(), 1);
257 }
258
259 #[test]
260 fn test_worker_pool_parallel() {
261 let config = ExecutionConfig::with_workers(4);
262 let pool = WorkerPool::new(config);
263 assert!(!pool.is_sequential());
264 assert_eq!(pool.worker_count(), 4);
265 }
266
267 #[test]
268 fn test_execute_parse_jobs() {
269 let config = ExecutionConfig::sequential();
270 let mut executor = ParallelExecutor::new(config);
271
272 let paths = vec!["file1.js".to_string(), "file2.js".to_string()];
273 let results = executor.execute_parse_jobs(paths);
274
275 assert_eq!(results.len(), 2);
276 assert!(results.iter().all(|r| r.success));
277 }
278
279 #[test]
280 fn test_create_executor_sequential() {
281 let executor = create_executor(4, true);
282 assert!(!executor.is_parallel());
283 }
284
285 #[test]
286 fn test_create_executor_parallel() {
287 let executor = create_executor(4, false);
288 assert!(executor.is_parallel());
289 }
290}