/// Sorts `input` in ascending order with a multi-threaded counting sort.
///
/// The input is split into contiguous chunks, one per worker thread. Each
/// worker builds a histogram of its chunk, the histograms are merged, and the
/// output buffer is then filled run-by-run (again in parallel) from the merged
/// counts.
///
/// `max_val` is the largest value that may appear in `input`; every element
/// must lie in `0..=max_val`. Memory use is proportional to
/// `(max_val + 1) * worker_count`, so this is only appropriate for small,
/// dense value ranges.
///
/// Returns a new vector holding the elements of `input` in sorted order. An
/// empty input yields an empty vector.
///
/// # Panics
///
/// Panics if any element of `input` is negative or greater than `max_val`, or
/// if `max_val + 1` overflows `usize`.
pub fn overclocked_parallel_sort(input: &[i32], max_val: usize) -> Vec<i32> {
    // Upper bound on the length of a single fill job, so that one very long
    // run of equal values is still spread across several workers.
    const MAX_JOB_LEN: usize = 500_000;

    if input.is_empty() {
        return Vec::new();
    }

    let bucket_count = max_val
        .checked_add(1)
        .expect("max_val + 1 overflows usize");

    // Never use more workers than there are elements: surplus workers would
    // only allocate a histogram and then do nothing.
    let workers = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4)
        .min(input.len());
    // Ceiling division, so `chunks` yields at most `workers` pieces.
    let chunk_len = (input.len() + workers - 1) / workers;

    // Phase 1: per-chunk histograms, merged into a single table of counts.
    let totals = std::thread::scope(|s| {
        // Collect eagerly so every worker is spawned before the first join.
        let handles: Vec<_> = input
            .chunks(chunk_len)
            .map(|chunk| {
                s.spawn(move || {
                    let mut histogram = vec![0usize; bucket_count];
                    for &x in chunk {
                        let slot = usize::try_from(x)
                            .ok()
                            .and_then(|v| histogram.get_mut(v));
                        match slot {
                            Some(count) => *count += 1,
                            None => panic!(
                                "value {x} is outside the supported range 0..={max_val}"
                            ),
                        }
                    }
                    histogram
                })
            })
            .collect();

        let mut totals = vec![0usize; bucket_count];
        for handle in handles {
            let histogram = match handle.join() {
                Ok(histogram) => histogram,
                // Re-raise the worker's own panic so the caller sees its message.
                Err(payload) => std::panic::resume_unwind(payload),
            };
            for (total, count) in totals.iter_mut().zip(histogram) {
                *total += count;
            }
        }
        totals
    });

    // Phase 2: carve the output into disjoint mutable pieces, one or more per
    // distinct value (starting at 0), and let workers fill them.
    let mut result = vec![0i32; input.len()];
    {
        let mut jobs: Vec<(&mut [i32], i32)> = Vec::new();
        let mut remaining: &mut [i32] = &mut result;
        for (val, &count) in totals.iter().enumerate() {
            if count == 0 {
                continue;
            }
            // The counts sum to `input.len()`, so this split is always in bounds.
            let (run, rest) = std::mem::take(&mut remaining).split_at_mut(count);
            remaining = rest;
            let val = i32::try_from(val)
                .expect("a value with a non-zero count originated from an i32");
            jobs.extend(run.chunks_mut(MAX_JOB_LEN).map(|piece| (piece, val)));
        }

        let fill_workers = workers.min(jobs.len());
        // Shared work queue: each worker repeatedly pulls the next piece.
        let queue = std::sync::Mutex::new(jobs.into_iter());
        std::thread::scope(|s| {
            for _ in 0..fill_workers {
                s.spawn(|| loop {
                    // The guard is a temporary of this statement, so the lock
                    // is released before the (potentially long) fill begins.
                    let next = queue
                        .lock()
                        .unwrap_or_else(|poisoned| poisoned.into_inner())
                        .next();
                    match next {
                        Some((piece, val)) => piece.fill(val),
                        None => break,
                    }
                });
            }
        });
    }
    result
}