Skip to main content

morph_cli/core/execution/
parallel.rs

1use std::collections::VecDeque;
2use std::sync::{Arc, Mutex};
3use std::thread;
4use std::time::Duration;
5
6pub struct WorkerPool {
7    workers: Vec<Worker>,
8    jobs: Arc<Mutex<VecDeque<Job>>>,
9    results: Arc<Mutex<Vec<JobResult>>>,
10    config: super::ExecutionConfig,
11}
12
13pub struct Job {
14    pub id: usize,
15    pub data: JobData,
16}
17
18pub enum JobData {
19    ParseFile(String),
20    TransformFile(String),
21    ScanDirectory(String),
22}
23
24pub struct JobResult {
25    pub id: usize,
26    pub success: bool,
27    pub output: Option<String>,
28    pub error: Option<String>,
29    pub duration_ms: u64,
30}
31
32impl WorkerPool {
33    pub fn new(config: super::ExecutionConfig) -> Self {
34        let jobs = Arc::new(Mutex::new(VecDeque::new()));
35        let results = Arc::new(Mutex::new(Vec::new()));
36
37        let mut workers = Vec::new();
38
39        for id in 0..config.max_workers {
40            let jobs = Arc::clone(&jobs);
41            let results = Arc::clone(&results);
42
43            let worker = Worker { _id: id, _handle: None };
44
45            if !config.sequential {
46                let handle = thread::spawn(move || {
47                    loop {
48                        let job = {
49                            let mut queue = jobs.lock().unwrap();
50                            queue.pop_front()
51                        };
52
53                        match job {
54                            Some(job) => {
55                                let result = execute_job(job);
56                                let mut results = results.lock().unwrap();
57                                results.push(result);
58                            }
59                            None => {
60                                thread::sleep(Duration::from_millis(10));
61                            }
62                        }
63                    }
64                });
65
66                workers.push(Worker {
67                    _id: id,
68                    _handle: Some(handle),
69                });
70            } else {
71                workers.push(worker);
72            }
73        }
74
75        Self {
76            workers,
77            jobs,
78            results,
79            config,
80        }
81    }
82
83    pub fn submit(&mut self, job: Job) {
84        let mut queue = self.jobs.lock().unwrap();
85        queue.push_back(job);
86    }
87
88    pub fn submit_many(&mut self, jobs: Vec<Job>) {
89        let mut queue = self.jobs.lock().unwrap();
90        for job in jobs {
91            queue.push_back(job);
92        }
93    }
94
95    pub fn wait_for_completion(&self) -> Vec<JobResult> {
96        loop {
97            let queue_len = {
98                let queue = self.jobs.lock().unwrap();
99                queue.len()
100            };
101
102            let results_len = {
103                let results = self.results.lock().unwrap();
104                results.len()
105            };
106
107            if queue_len == 0 && results_len > 0 {
108                let mut results = self.results.lock().unwrap();
109                return std::mem::take(&mut *results);
110            }
111
112            thread::sleep(Duration::from_millis(50));
113        }
114    }
115
116    pub fn drain_results(&self) -> Vec<JobResult> {
117        let mut results = self.results.lock().unwrap();
118        std::mem::take(&mut *results)
119    }
120
121    pub fn is_sequential(&self) -> bool {
122        self.config.sequential
123    }
124
125    pub fn worker_count(&self) -> usize {
126        self.config.max_workers
127    }
128}
129
130impl Drop for WorkerPool {
131    fn drop(&mut self) {
132        for _worker in &mut self.workers {
133            // Workers run indefinitely in this simple implementation
134            // In production, we'd use a shutdown signal
135        }
136    }
137}
138
139struct Worker {
140    _id: usize,
141    _handle: Option<thread::JoinHandle<()>>,
142}
143
144fn execute_job(job: Job) -> JobResult {
145    let start = std::time::Instant::now();
146
147    match job.data {
148        JobData::ParseFile(path) => {
149            // Simulated parse - in production, use actual parser
150            let output = format!("Parsed: {}", path);
151            JobResult {
152                id: job.id,
153                success: true,
154                output: Some(output),
155                error: None,
156                duration_ms: start.elapsed().as_millis() as u64,
157            }
158        }
159        JobData::TransformFile(path) => {
160            let output = format!("Transformed: {}", path);
161            JobResult {
162                id: job.id,
163                success: true,
164                output: Some(output),
165                error: None,
166                duration_ms: start.elapsed().as_millis() as u64,
167            }
168        }
169        JobData::ScanDirectory(path) => {
170            let output = format!("Scanned: {}", path);
171            JobResult {
172                id: job.id,
173                success: true,
174                output: Some(output),
175                error: None,
176                duration_ms: start.elapsed().as_millis() as u64,
177            }
178        }
179    }
180}
181
182pub struct ParallelExecutor {
183    pool: WorkerPool,
184    _config: super::ExecutionConfig,
185}
186
187impl ParallelExecutor {
188    pub fn new(config: super::ExecutionConfig) -> Self {
189        let pool = WorkerPool::new(config.clone());
190        Self { pool, _config: config }
191    }
192
193    pub fn execute_parse_jobs(&mut self, paths: Vec<String>) -> Vec<JobResult> {
194        let jobs: Vec<Job> = paths
195            .into_iter()
196            .enumerate()
197            .map(|(id, path)| Job {
198                id,
199                data: JobData::ParseFile(path),
200            })
201            .collect();
202
203        self.pool.submit_many(jobs);
204        self.pool.wait_for_completion()
205    }
206
207    pub fn execute_transform_jobs(&mut self, paths: Vec<String>) -> Vec<JobResult> {
208        let jobs: Vec<Job> = paths
209            .into_iter()
210            .enumerate()
211            .map(|(id, path)| Job {
212                id,
213                data: JobData::TransformFile(path),
214            })
215            .collect();
216
217        self.pool.submit_many(jobs);
218        self.pool.wait_for_completion()
219    }
220
221    pub fn is_parallel(&self) -> bool {
222        !self.pool.is_sequential()
223    }
224
225    pub fn worker_count(&self) -> usize {
226        self.pool.worker_count()
227    }
228}
229
230pub fn create_executor(jobs: usize, sequential: bool) -> ParallelExecutor {
231    let config = if sequential {
232        super::ExecutionConfig::sequential()
233    } else {
234        super::ExecutionConfig::with_workers(jobs)
235    };
236
237    ParallelExecutor::new(config)
238}
239
240#[cfg(test)]
241mod tests {
242    use crate::core::execution::{ExecutionConfig, ParallelExecutor, WorkerPool, create_executor};
243
244    #[allow(dead_code)]
245    fn num_cpus() -> usize {
246        std::thread::available_parallelism()
247            .map(|n| n.get())
248            .unwrap_or(1)
249    }
250
251    #[test]
252    fn test_worker_pool_sequential() {
253        let config = ExecutionConfig::sequential();
254        let pool = WorkerPool::new(config);
255        assert!(pool.is_sequential());
256        assert_eq!(pool.worker_count(), 1);
257    }
258
259    #[test]
260    fn test_worker_pool_parallel() {
261        let config = ExecutionConfig::with_workers(4);
262        let pool = WorkerPool::new(config);
263        assert!(!pool.is_sequential());
264        assert_eq!(pool.worker_count(), 4);
265    }
266
267    #[test]
268    fn test_execute_parse_jobs() {
269        let config = ExecutionConfig::sequential();
270        let mut executor = ParallelExecutor::new(config);
271
272        let paths = vec!["file1.js".to_string(), "file2.js".to_string()];
273        let results = executor.execute_parse_jobs(paths);
274
275        assert_eq!(results.len(), 2);
276        assert!(results.iter().all(|r| r.success));
277    }
278
279    #[test]
280    fn test_create_executor_sequential() {
281        let executor = create_executor(4, true);
282        assert!(!executor.is_parallel());
283    }
284
285    #[test]
286    fn test_create_executor_parallel() {
287        let executor = create_executor(4, false);
288        assert!(executor.is_parallel());
289    }
290}