use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
pub struct WorkerPool {
workers: Vec<Worker>,
jobs: Arc<Mutex<VecDeque<Job>>>,
results: Arc<Mutex<Vec<JobResult>>>,
config: super::ExecutionConfig,
}
pub struct Job {
pub id: usize,
pub data: JobData,
}
pub enum JobData {
ParseFile(String),
TransformFile(String),
ScanDirectory(String),
}
pub struct JobResult {
pub id: usize,
pub success: bool,
pub output: Option<String>,
pub error: Option<String>,
pub duration_ms: u64,
}
impl WorkerPool {
pub fn new(config: super::ExecutionConfig) -> Self {
let jobs = Arc::new(Mutex::new(VecDeque::new()));
let results = Arc::new(Mutex::new(Vec::new()));
let mut workers = Vec::new();
for id in 0..config.max_workers {
let jobs = Arc::clone(&jobs);
let results = Arc::clone(&results);
let worker = Worker { _id: id, _handle: None };
if !config.sequential {
let handle = thread::spawn(move || {
loop {
let job = {
let mut queue = jobs.lock().unwrap();
queue.pop_front()
};
match job {
Some(job) => {
let result = execute_job(job);
let mut results = results.lock().unwrap();
results.push(result);
}
None => {
thread::sleep(Duration::from_millis(10));
}
}
}
});
workers.push(Worker {
_id: id,
_handle: Some(handle),
});
} else {
workers.push(worker);
}
}
Self {
workers,
jobs,
results,
config,
}
}
pub fn submit(&mut self, job: Job) {
let mut queue = self.jobs.lock().unwrap();
queue.push_back(job);
}
pub fn submit_many(&mut self, jobs: Vec<Job>) {
let mut queue = self.jobs.lock().unwrap();
for job in jobs {
queue.push_back(job);
}
}
pub fn wait_for_completion(&self) -> Vec<JobResult> {
loop {
let queue_len = {
let queue = self.jobs.lock().unwrap();
queue.len()
};
let results_len = {
let results = self.results.lock().unwrap();
results.len()
};
if queue_len == 0 && results_len > 0 {
let mut results = self.results.lock().unwrap();
return std::mem::take(&mut *results);
}
thread::sleep(Duration::from_millis(50));
}
}
pub fn drain_results(&self) -> Vec<JobResult> {
let mut results = self.results.lock().unwrap();
std::mem::take(&mut *results)
}
pub fn is_sequential(&self) -> bool {
self.config.sequential
}
pub fn worker_count(&self) -> usize {
self.config.max_workers
}
}
impl Drop for WorkerPool {
fn drop(&mut self) {
for _worker in &mut self.workers {
}
}
}
struct Worker {
_id: usize,
_handle: Option<thread::JoinHandle<()>>,
}
fn execute_job(job: Job) -> JobResult {
let start = std::time::Instant::now();
match job.data {
JobData::ParseFile(path) => {
let output = format!("Parsed: {}", path);
JobResult {
id: job.id,
success: true,
output: Some(output),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
}
}
JobData::TransformFile(path) => {
let output = format!("Transformed: {}", path);
JobResult {
id: job.id,
success: true,
output: Some(output),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
}
}
JobData::ScanDirectory(path) => {
let output = format!("Scanned: {}", path);
JobResult {
id: job.id,
success: true,
output: Some(output),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
}
}
}
}
pub struct ParallelExecutor {
pool: WorkerPool,
_config: super::ExecutionConfig,
}
impl ParallelExecutor {
pub fn new(config: super::ExecutionConfig) -> Self {
let pool = WorkerPool::new(config.clone());
Self { pool, _config: config }
}
pub fn execute_parse_jobs(&mut self, paths: Vec<String>) -> Vec<JobResult> {
let jobs: Vec<Job> = paths
.into_iter()
.enumerate()
.map(|(id, path)| Job {
id,
data: JobData::ParseFile(path),
})
.collect();
self.pool.submit_many(jobs);
self.pool.wait_for_completion()
}
pub fn execute_transform_jobs(&mut self, paths: Vec<String>) -> Vec<JobResult> {
let jobs: Vec<Job> = paths
.into_iter()
.enumerate()
.map(|(id, path)| Job {
id,
data: JobData::TransformFile(path),
})
.collect();
self.pool.submit_many(jobs);
self.pool.wait_for_completion()
}
pub fn is_parallel(&self) -> bool {
!self.pool.is_sequential()
}
pub fn worker_count(&self) -> usize {
self.pool.worker_count()
}
}
pub fn create_executor(jobs: usize, sequential: bool) -> ParallelExecutor {
let config = if sequential {
super::ExecutionConfig::sequential()
} else {
super::ExecutionConfig::with_workers(jobs)
};
ParallelExecutor::new(config)
}
#[cfg(test)]
mod tests {
use crate::core::execution::{ExecutionConfig, ParallelExecutor, WorkerPool, create_executor};
#[allow(dead_code)]
fn num_cpus() -> usize {
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
}
#[test]
fn test_worker_pool_sequential() {
let config = ExecutionConfig::sequential();
let pool = WorkerPool::new(config);
assert!(pool.is_sequential());
assert_eq!(pool.worker_count(), 1);
}
#[test]
fn test_worker_pool_parallel() {
let config = ExecutionConfig::with_workers(4);
let pool = WorkerPool::new(config);
assert!(!pool.is_sequential());
assert_eq!(pool.worker_count(), 4);
}
#[test]
fn test_execute_parse_jobs() {
let config = ExecutionConfig::sequential();
let mut executor = ParallelExecutor::new(config);
let paths = vec!["file1.js".to_string(), "file2.js".to_string()];
let results = executor.execute_parse_jobs(paths);
assert_eq!(results.len(), 2);
assert!(results.iter().all(|r| r.success));
}
#[test]
fn test_create_executor_sequential() {
let executor = create_executor(4, true);
assert!(!executor.is_parallel());
}
#[test]
fn test_create_executor_parallel() {
let executor = create_executor(4, false);
assert!(executor.is_parallel());
}
}