#[cfg(feature = "parallel")]
use rayon::prelude::*;
use crate::tile::{GateDecision, GateThresholds, TileReport, TileZero, WorkerTile, SyndromeDelta};
use crate::error::{Result, RuQuError};
/// Configuration consumed by [`ParallelFabric::new`].
#[derive(Clone, Debug)]
pub struct ParallelConfig {
/// Thread count requested for rayon's global pool when the `parallel`
/// feature is enabled. `0` skips pool configuration entirely.
pub num_threads: usize,
/// Passed to rayon's `with_min_len` when worker tiles are ticked in the
/// `parallel` build; not read by the sequential build.
pub chunk_size: usize,
/// NOTE(review): no code visible in this file reads this flag — confirm
/// where (or whether) it takes effect.
pub work_stealing: bool,
/// Gate thresholds; a clone is handed to the coordinator tile on construction.
pub thresholds: GateThresholds,
}
impl Default for ParallelConfig {
fn default() -> Self {
Self {
num_threads: 0, chunk_size: 16, work_stealing: true,
thresholds: GateThresholds::default(),
}
}
}
impl ParallelConfig {
pub fn low_latency() -> Self {
Self {
num_threads: 4,
chunk_size: 64, work_stealing: false, thresholds: GateThresholds::default(),
}
}
pub fn high_throughput() -> Self {
Self {
num_threads: 0, chunk_size: 8, work_stealing: true,
thresholds: GateThresholds::default(),
}
}
}
/// A set of worker tiles plus one coordinator tile, together with the
/// configuration they were built from and running execution statistics.
pub struct ParallelFabric {
// Worker tiles; the constructor creates one per id in `1..=255`.
workers: Vec<WorkerTile>,
// Tile that merges the per-worker reports into a single gate decision.
coordinator: TileZero,
// Configuration supplied at construction time.
config: ParallelConfig,
// Statistics updated on every processed syndrome.
stats: ParallelStats,
}
/// Running counters describing the work done by a [`ParallelFabric`].
///
/// All fields start at zero (`Default`) and return to zero on
/// `ParallelFabric::reset_stats`.
#[derive(Clone, Copy, Debug, Default)]
pub struct ParallelStats {
/// Total worker ticks executed; grows by one per worker tile (255 in
/// total) for every processed syndrome.
pub total_processed: u64,
/// Number of syndromes processed (one batch of worker ticks each).
pub batches: u64,
/// Running mean of the wall-clock time per batch, in nanoseconds
/// (integer division, so it is rounded down).
pub avg_batch_time_ns: u64,
/// Highest throughput recorded for a single batch, in worker ticks per
/// second; `0.0` until a measurement has been recorded.
pub peak_throughput: f64,
}
impl ParallelFabric {
    /// Creates a fabric with 255 worker tiles (ids `1..=255`) and a
    /// coordinator tile built from a clone of `config.thresholds`.
    ///
    /// With the `parallel` feature enabled and a non-zero
    /// `config.num_threads`, this also asks rayon to build its global thread
    /// pool with that many threads.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub fn new(config: ParallelConfig) -> Result<Self> {
        #[cfg(feature = "parallel")]
        {
            if config.num_threads > 0 {
                // Best effort on purpose: rayon refuses to build the global
                // pool a second time, and a fabric created after the pool
                // exists should simply run on the pool that is already there.
                rayon::ThreadPoolBuilder::new()
                    .num_threads(config.num_threads)
                    .build_global()
                    .ok();
            }
        }

        let workers: Vec<WorkerTile> = (1..=255u8).map(WorkerTile::new).collect();
        let coordinator = TileZero::with_random_key(config.thresholds.clone());

        Ok(Self {
            workers,
            coordinator,
            config,
            stats: ParallelStats::default(),
        })
    }

    /// Ticks every worker tile with `syndrome` on rayon's thread pool, merges
    /// the resulting reports through the coordinator and records timing
    /// statistics for the batch.
    ///
    /// `config.chunk_size` is used as the minimum number of workers handled
    /// per rayon task.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    #[cfg(feature = "parallel")]
    pub fn process_parallel(&mut self, syndrome: &SyndromeDelta) -> Result<GateDecision> {
        use std::time::Instant;

        let start = Instant::now();
        let reports: Vec<TileReport> = self
            .workers
            .par_iter_mut()
            .with_min_len(self.config.chunk_size)
            .map(|worker| worker.tick(syndrome))
            .collect();
        let decision = self.coordinator.merge_reports(reports);
        self.record_batch(start.elapsed());
        Ok(decision)
    }

    /// Ticks every worker tile with `syndrome` one after another, merges the
    /// resulting reports through the coordinator and records timing
    /// statistics for the batch.
    ///
    /// This is the fallback used when the `parallel` feature is disabled.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    #[cfg(not(feature = "parallel"))]
    pub fn process_parallel(&mut self, syndrome: &SyndromeDelta) -> Result<GateDecision> {
        use std::time::Instant;

        let start = Instant::now();
        let reports: Vec<TileReport> = self
            .workers
            .iter_mut()
            .map(|worker| worker.tick(syndrome))
            .collect();
        let decision = self.coordinator.merge_reports(reports);
        self.record_batch(start.elapsed());
        Ok(decision)
    }

    /// Folds one finished batch (one tick of every worker) into the stats.
    fn record_batch(&mut self, elapsed: std::time::Duration) {
        let ticks = self.workers.len() as u64;
        // `as_nanos` yields a u128; clamp first so the narrowing saturates
        // instead of silently wrapping.
        let elapsed_ns = elapsed.as_nanos().min(u128::from(u64::MAX)) as u64;

        self.stats.total_processed += ticks;
        self.stats.batches += 1;

        // Widen to u128 so `mean * (batches - 1)` cannot overflow. The new
        // mean never exceeds max(previous mean, elapsed_ns), so it fits u64.
        let batches = u128::from(self.stats.batches);
        let previous_total = u128::from(self.stats.avg_batch_time_ns) * (batches - 1);
        self.stats.avg_batch_time_ns =
            ((previous_total + u128::from(elapsed_ns)) / batches) as u64;

        // A zero-length measurement would divide by zero and pin the peak at
        // infinity forever, so it is skipped.
        let seconds = elapsed.as_secs_f64();
        if seconds > 0.0 {
            let throughput = ticks as f64 / seconds;
            if throughput > self.stats.peak_throughput {
                self.stats.peak_throughput = throughput;
            }
        }
    }

    /// Processes `syndromes` in order and returns one decision per syndrome.
    ///
    /// A syndrome whose processing fails is recorded as
    /// [`GateDecision::Defer`] instead of aborting the rest of the batch.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub fn process_batch(&mut self, syndromes: &[SyndromeDelta]) -> Result<Vec<GateDecision>> {
        let mut decisions = Vec::with_capacity(syndromes.len());
        for syndrome in syndromes {
            let decision = self
                .process_parallel(syndrome)
                .unwrap_or(GateDecision::Defer);
            decisions.push(decision);
        }
        Ok(decisions)
    }

    /// Returns the statistics accumulated since construction or the last reset.
    pub fn stats(&self) -> &ParallelStats {
        &self.stats
    }

    /// Resets every statistic back to its default (zero) value.
    pub fn reset_stats(&mut self) {
        self.stats = ParallelStats::default();
    }

    /// Returns a shared reference to the coordinator tile.
    pub fn coordinator(&self) -> &TileZero {
        &self.coordinator
    }

    /// Returns an exclusive reference to the coordinator tile.
    pub fn coordinator_mut(&mut self) -> &mut TileZero {
        &mut self.coordinator
    }
}
/// Reduces tile reports to `(min_cut, max_shift, e_aggregate)` using rayon.
///
/// * `min_cut` is the smallest strictly positive `local_cut`, or `f64::MAX`
///   when no report has a positive cut.
/// * `max_shift` is the largest `shift_score`, never below `0.0`.
/// * `e_aggregate` is the geometric mean of the `e_value`s (each floored at
///   `1e-10`), computed in log2 space.
///
/// An empty slice yields `(f64::MAX, 0.0, 1.0)`.
#[cfg(feature = "parallel")]
pub fn parallel_aggregate(reports: &[TileReport]) -> (f64, f64, f64) {
    use rayon::prelude::*;

    if reports.is_empty() {
        return (f64::MAX, 0.0, 1.0);
    }

    // Non-positive cuts are dropped, which is equivalent to mapping them to
    // the reduction identity `f64::MAX`.
    let smallest_cut = reports
        .par_iter()
        .map(|report| report.local_cut)
        .filter(|cut| *cut > 0.0)
        .reduce(|| f64::MAX, f64::min);

    let largest_shift = reports
        .par_iter()
        .map(|report| report.shift_score)
        .reduce(|| 0.0, f64::max);

    let summed_logs = reports
        .par_iter()
        .map(|report| report.e_value.max(1e-10).log2())
        .sum::<f64>();
    let mean_log = summed_logs / reports.len() as f64;

    (smallest_cut, largest_shift, mean_log.exp2())
}
/// Reduces tile reports to `(min_cut, max_shift, e_aggregate)` in a single
/// sequential pass (used when the `parallel` feature is disabled).
///
/// * `min_cut` is the smallest strictly positive `local_cut`, or `f64::MAX`
///   when no report has a positive cut.
/// * `max_shift` is the largest `shift_score`, never below `0.0`.
/// * `e_aggregate` is the geometric mean of the `e_value`s (each floored at
///   `1e-10`), computed in log2 space.
///
/// An empty slice yields `(f64::MAX, 0.0, 1.0)`.
#[cfg(not(feature = "parallel"))]
pub fn parallel_aggregate(reports: &[TileReport]) -> (f64, f64, f64) {
    if reports.is_empty() {
        return (f64::MAX, 0.0, 1.0);
    }

    let (min_cut, max_shift, log_sum) = reports.iter().fold(
        (f64::MAX, 0.0_f64, 0.0_f64),
        |(cut, shift, logs), report| {
            // Only strictly positive cuts compete for the minimum.
            let cut = if report.local_cut > 0.0 && report.local_cut < cut {
                report.local_cut
            } else {
                cut
            };
            let shift = if report.shift_score > shift {
                report.shift_score
            } else {
                shift
            };
            (cut, shift, logs + f64::log2(report.e_value.max(1e-10)))
        },
    );

    let e_aggregate = f64::exp2(log_sum / reports.len() as f64);
    (min_cut, max_shift, e_aggregate)
}
#[cfg(test)]
mod tests {
    use super::*;

    // The default configuration has `num_threads == 0` and work stealing on.
    #[test]
    fn test_parallel_config_default() {
        let ParallelConfig {
            num_threads,
            work_stealing,
            ..
        } = ParallelConfig::default();
        assert_eq!(num_threads, 0);
        assert!(work_stealing);
    }

    // Construction succeeds and yields exactly 255 worker tiles.
    #[test]
    fn test_parallel_fabric_creation() {
        let fabric = ParallelFabric::new(ParallelConfig::default());
        assert!(fabric.is_ok());
        assert_eq!(fabric.unwrap().workers.len(), 255);
    }

    // Processing a single syndrome returns `Ok`.
    #[test]
    fn test_parallel_process() {
        let mut fabric = ParallelFabric::new(ParallelConfig::default()).unwrap();
        let syndrome = SyndromeDelta::new(1, 2, 100);
        assert!(fabric.process_parallel(&syndrome).is_ok());
    }

    // Ten reports with cuts 2..=20, shifts 0.05..=0.5 and a constant e-value
    // of 100 aggregate to (2.0, 0.5, 100.0).
    #[test]
    fn test_parallel_aggregate() {
        let mut reports: Vec<TileReport> = Vec::with_capacity(10);
        for i in 1..=10 {
            let mut report = TileReport::new(i);
            report.local_cut = i as f64 * 2.0;
            report.shift_score = i as f64 * 0.05;
            report.e_value = 100.0;
            reports.push(report);
        }

        let (min_cut, max_shift, e_agg) = parallel_aggregate(&reports);

        assert_eq!(min_cut, 2.0);
        assert!((max_shift - 0.5).abs() < 0.001);
        assert!((e_agg - 100.0).abs() < 0.001);
    }
}