#![warn(clippy::perf, clippy::style, missing_docs)]
#![allow(clippy::excessive_precision)]
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
pub use std::f64;
#[cfg(feature = "python")]
use pyo3::PyErr;
#[cfg(feature = "mpi")]
#[cfg_attr(coverage_nightly, coverage(off))]
pub mod mpi {
use std::{
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
OnceLock,
},
};
use lazy_static::lazy_static;
use mpi::{
datatype::PartitionMut,
environment::Universe,
topology::{Process, SimpleCommunicator},
traits::{Communicator, CommunicatorCollectives, Equivalence},
};
use parking_lot::RwLock;
// Global on/off switch for MPI-backed execution, written by `use_mpi`/`finalize_mpi` and
// read by `using_mpi`.
// NOTE(review): `AtomicBool::new` is a `const fn`, so a plain `static` would work here
// without `lazy_static`.
lazy_static! {
static ref USE_MPI: AtomicBool = AtomicBool::new(false);
}
// Lazily created MPI universe. The `OnceLock` stays empty until `use_mpi(true)` runs; once
// set, an inner `None` means MPI is not in use (only one process was found, or
// `finalize_mpi` has dropped the universe).
static MPI_UNIVERSE: OnceLock<RwLock<Option<Universe>>> = OnceLock::new();
/// Rank treated as the root process; also reported by [`get_rank`] when MPI is not active.
pub const ROOT_RANK: i32 = 0;
/// Returns `true` if the calling process is the MPI root ([`ROOT_RANK`]).
///
/// When MPI is not active there is only one process, which counts as the root, so this
/// returns `true` in that case.
pub fn is_root() -> bool {
    match crate::mpi::get_world() {
        Some(world) => world.rank() == ROOT_RANK,
        None => true,
    }
}
/// Returns the MPI world communicator if MPI is currently active.
///
/// Returns `None` if [`use_mpi`] has never been called with `true`, if it found only a
/// single process, or after [`finalize_mpi`] has dropped the universe.
pub fn get_world() -> Option<SimpleCommunicator> {
    let universe_lock = MPI_UNIVERSE.get()?;
    let universe_slot = universe_lock.read();
    universe_slot.as_ref().map(|universe| universe.world())
}
/// Returns this process's rank in the MPI world communicator, or [`ROOT_RANK`] when MPI
/// is not active.
pub fn get_rank() -> i32 {
    get_world().map_or(ROOT_RANK, |world| world.rank())
}
/// Returns the number of processes in the MPI world communicator, or `1` when MPI is not
/// active.
pub fn get_size() -> i32 {
    get_world().map_or(1, |world| world.size())
}
/// Enables MPI-backed execution, initializing MPI on first use.
///
/// Calling this with `trigger == false` does nothing. With `trigger == true`:
///
/// - On the first call, MPI is initialized with `Threading::Funneled` when the `rayon`
///   feature is enabled and `Threading::Single` otherwise.
/// - Later calls reuse the result of the first call.
/// - If the world contains only one process, a warning is printed and the universe is
///   dropped again (rsmpi finalizes MPI when a `Universe` is dropped). MPI then stays
///   disabled.
///
/// After this returns, [`using_mpi`] reports `true` exactly when a multi-process universe
/// is stored. Repeated calls after a single-process run, or after [`finalize_mpi`],
/// therefore never report MPI as active without a world communicator.
///
/// # Panics
///
/// Panics if MPI cannot be initialized, e.g. because it was already initialized elsewhere
/// in the process.
pub fn use_mpi(trigger: bool) {
    if !trigger {
        return;
    }
    let universe_lock = MPI_UNIVERSE.get_or_init(|| {
        #[cfg(feature = "rayon")]
        let threading = mpi::Threading::Funneled;
        #[cfg(not(feature = "rayon"))]
        let threading = mpi::Threading::Single;
        let (universe, _threading) = mpi::initialize_with_threading(threading)
            .expect("MPI could not be initialized (was it already initialized elsewhere?)");
        // Bound in its own statement so the temporary world communicator is dropped
        // before the universe (and with it, MPI) may be finalized below.
        let n_processes = universe.world().size();
        if n_processes == 1 {
            eprintln!("Warning: MPI is enabled, but only one process is available. MPI will not be used, but single-CPU parallelism may still be used if enabled.");
            // Dropping the universe finalizes MPI.
            drop(universe);
            RwLock::new(None)
        } else {
            RwLock::new(Some(universe))
        }
    });
    // Derive the flag from the stored state rather than setting it unconditionally, so a
    // repeated call cannot claim MPI is active when no universe exists.
    let active = universe_lock.read().is_some();
    USE_MPI.store(active, Ordering::SeqCst);
}
/// Shuts MPI down (if it is active) and disables MPI-backed execution.
///
/// The stored `Universe` is dropped while the write lock is held; in rsmpi, dropping the
/// `Universe` is what finalizes MPI. Afterwards [`get_world`] returns `None` and
/// [`using_mpi`] returns `false`. The universe cannot be re-created later, because
/// `MPI_UNIVERSE` is a `OnceLock` that has already been initialized.
pub fn finalize_mpi() {
    if let Some(universe_lock) = MPI_UNIVERSE.get() {
        let mut universe_slot = universe_lock.write();
        if universe_slot.is_some() {
            *universe_slot = None;
        }
    }
    USE_MPI.store(false, Ordering::SeqCst);
}
/// Returns `true` if MPI-backed execution is currently enabled.
///
/// This reads the flag maintained by [`use_mpi`] and [`finalize_mpi`].
pub fn using_mpi() -> bool {
USE_MPI.load(Ordering::SeqCst)
}
/// Splits `total` items across `size` ranks as evenly as possible.
///
/// Returns the per-rank `(counts, displs)` arrays used by MPI variable-count
/// collectives. The first `total % size` ranks receive one extra item. Each item spans
/// `stride` buffer elements, so counts and displacements are measured in elements
/// (`items * stride`). Displacements are the exclusive prefix sums of the counts. If
/// `size == 0`, two empty vectors are returned.
///
/// # Panics
///
/// Panics if a per-rank element count or a displacement does not fit in an `i32` (the
/// MPI count type). Such values are never silently truncated.
fn counts_displs(size: usize, total: usize, stride: usize) -> (Vec<i32>, Vec<i32>) {
    if size == 0 {
        return (Vec::new(), Vec::new());
    }
    let base = total / size;
    let remainder = total % size;
    let mut counts = Vec::with_capacity(size);
    let mut displs = Vec::with_capacity(size);
    // Running offset in elements, kept as `usize` so that only values that are actually
    // stored must fit in an `i32`. The offset past the last rank is never stored.
    let mut offset: usize = 0;
    for rank in 0..size {
        let items = if rank < remainder { base + 1 } else { base };
        let elements = items
            .checked_mul(stride)
            .expect("per-rank element count overflowed usize");
        counts.push(i32::try_from(elements).expect("per-rank element count exceeds i32::MAX"));
        displs.push(i32::try_from(offset).expect("displacement exceeds i32::MAX"));
        // Saturation is safe: a saturated offset can never be stored without panicking above.
        offset = offset.saturating_add(elements);
    }
    (counts, displs)
}
/// Maps a global index to `(rank, local_index)`.
///
/// Uses the same block decomposition as [`counts_displs`] with a stride of one. The first
/// `total % size` ranks own `total / size + 1` consecutive indices each, and the remaining
/// ranks own `total / size` indices each.
///
/// # Panics
///
/// Panics if `size == 0`, if `total == 0`, or if `i_global >= total`.
#[inline]
fn rank_local_from_global(i_global: usize, size: usize, total: usize) -> (usize, usize) {
    assert!(size > 0, "Communicator must have at least one rank");
    assert!(total > 0, "Cannot map global indices when dataset is empty");
    assert!(
        i_global < total,
        "Global index {} out of bounds for {} events",
        i_global,
        total
    );
    let small_block = total / size;
    let n_big_blocks = total % size;
    // Indices below this boundary live in one of the leading, one-larger blocks.
    let big_region_end = n_big_blocks * (small_block + 1);
    match i_global.checked_sub(big_region_end) {
        None => (i_global / (small_block + 1), i_global % (small_block + 1)),
        // Reaching this arm implies `total >= size`, so `small_block > 0`.
        Some(offset) => (n_big_blocks + offset / small_block, offset % small_block),
    }
}
/// Even block decomposition of `total` items across a fixed number of ranks.
///
/// Rank `r` owns the contiguous global index range `displs[r]..displs[r] + counts[r]`.
/// When built with [`Partition::new`], the first `total % n_ranks` ranks own one extra item.
#[derive(Clone, Debug)]
pub struct Partition {
/// Number of items owned by each rank.
counts: Vec<i32>,
/// Global index of each rank's first item (exclusive prefix sum of `counts`).
displs: Vec<i32>,
/// Total number of items being partitioned.
total: usize,
}
impl Partition {
pub fn new(size: usize, total: usize) -> Self {
assert!(size > 0, "Communicator must have at least one rank");
let (counts, displs) = counts_displs(size, total, 1);
Self {
counts,
displs,
total,
}
}
pub fn total(&self) -> usize {
self.total
}
pub fn n_ranks(&self) -> usize {
self.counts.len()
}
pub fn len_for_rank(&self, rank: usize) -> usize {
self.counts[rank] as usize
}
pub fn start_for_rank(&self, rank: usize) -> usize {
self.displs[rank] as usize
}
pub fn range_for_rank(&self, rank: usize) -> Range<usize> {
let start = self.start_for_rank(rank);
start..start + self.len_for_rank(rank)
}
pub fn owner_of(&self, global_index: usize) -> (usize, usize) {
assert!(
self.total > 0,
"Cannot map global indices when dataset is empty"
);
rank_local_from_global(global_index, self.n_ranks(), self.total)
}
pub fn into_raw(self) -> (Vec<i32>, Vec<i32>) {
(self.counts, self.displs)
}
}
/// MPI collective helpers layered on top of an rsmpi communicator.
///
/// Global data is split across ranks with the even block decomposition described by
/// [`Partition`]: the first `total % size` ranks each own one extra item.
pub trait LadduMPI {
/// Returns the [`Process`] handle for [`ROOT_RANK`] on this communicator.
fn process_at_root(&self) -> Process<'_>;
/// Returns `true` if this process's rank on the communicator equals [`ROOT_RANK`].
fn is_root(&self) -> bool;
/// Gathers a block-partitioned buffer onto every rank.
///
/// `total` is the global number of items. `stride` (default `1`) is the number of
/// elements per item. The result has `total * stride` elements in global order.
/// `local` must hold exactly this rank's share of the partition.
///
/// # Panics
///
/// Panics if the effective stride is zero.
fn all_gather_partitioned<T: Equivalence + Default + Clone>(
&self,
local: &[T],
total: usize,
stride: Option<usize>,
) -> Vec<T>;
/// Gathers `local` from every rank into a buffer laid out by explicit per-rank element
/// `counts` and displacements `displs`. The result has `counts.iter().sum()` elements.
///
/// # Panics
///
/// Panics if `counts` and `displs` differ in length or do not have one entry per rank.
fn all_gather_with_counts<T: Equivalence + Default + Clone>(
&self,
local: &[T],
counts: &[i32],
displs: &[i32],
) -> Vec<T>;
/// Gathers the items for an arbitrary list of global indices onto every rank.
///
/// `global_indices` must be the same on every rank, because each rank derives the
/// receive layout from it. `local` must contain, in request order, the `stride`
/// elements for each requested index owned by this rank. Block `i` of the result
/// (`stride` elements) corresponds to `global_indices[i]`.
///
/// # Panics
///
/// Panics if the effective stride is zero, or if `global_indices` is non-empty and
/// `total == 0` or contains an out-of-range index.
fn all_gather_batched_partitioned<T: Equivalence + Default + Clone>(
&self,
local: &[T],
global_indices: &[usize],
total: usize,
stride: Option<usize>,
) -> Vec<T>;
/// Returns `(rank, local_index)` of `global_index` when `total` items are partitioned
/// across this communicator.
fn owner_of_global_index(&self, global_index: usize, total: usize) -> (i32, usize);
/// Returns the local indices of those `global_indices` that this rank owns, in input
/// order. Returns an empty vector when `total == 0`.
fn locals_from_globals(&self, global_indices: &[usize], total: usize) -> Vec<usize>;
/// Per-rank counts and displacements for partitioning `buf_len` single-element items.
fn get_counts_displs(&self, buf_len: usize) -> (Vec<i32>, Vec<i32>);
/// Builds the [`Partition`] of `total` items across this communicator's ranks.
fn partition(&self, total: usize) -> Partition;
/// Per-rank counts and displacements, in elements, for `unflattened_len` items that
/// each occupy `internal_len` contiguous elements.
fn get_flattened_counts_displs(
&self,
unflattened_len: usize,
internal_len: usize,
) -> (Vec<i32>, Vec<i32>);
}
impl LadduMPI for SimpleCommunicator {
fn process_at_root(&self) -> Process<'_> {
self.process_at_rank(crate::mpi::ROOT_RANK)
}
fn is_root(&self) -> bool {
self.rank() == crate::mpi::ROOT_RANK
}
fn all_gather_partitioned<T: Equivalence + Default + Clone>(
&self,
local: &[T],
total: usize,
stride: Option<usize>,
) -> Vec<T> {
let size = self.size() as usize;
let stride = stride.unwrap_or(1);
assert!(stride > 0, "Stride must be greater than zero");
let mut out = vec![T::default(); total * stride];
if total == 0 || size == 0 {
return out;
}
let (counts, displs) = counts_displs(size, total, stride);
{
let mut partition = PartitionMut::new(&mut out, counts, displs);
self.all_gather_varcount_into(local, &mut partition);
}
out
}
fn all_gather_with_counts<T: Equivalence + Default + Clone>(
&self,
local: &[T],
counts: &[i32],
displs: &[i32],
) -> Vec<T> {
assert_eq!(
counts.len(),
displs.len(),
"Counts and displacements must have the same length"
);
assert_eq!(
counts.len(),
self.size() as usize,
"Counts/displacements must match communicator size"
);
let total = counts.iter().map(|count| *count as usize).sum();
let mut out = vec![T::default(); total];
{
let mut partition = PartitionMut::new(&mut out, counts.to_vec(), displs.to_vec());
self.all_gather_varcount_into(local, &mut partition);
}
out
}
fn all_gather_batched_partitioned<T: Equivalence + Default + Clone>(
&self,
local: &[T],
global_indices: &[usize],
total: usize,
stride: Option<usize>,
) -> Vec<T> {
let size = self.size() as usize;
let stride = stride.unwrap_or(1);
assert!(stride > 0, "Stride must be greater than zero");
let n_indices = global_indices.len();
let mut gathered = vec![T::default(); n_indices * stride];
if n_indices == 0 || size == 0 {
return gathered;
}
assert!(
total > 0,
"Cannot gather batched data from an empty dataset"
);
let partition = Partition::new(size, total);
let mut locals_by_rank = vec![Vec::<usize>::new(); size];
let mut targets_by_rank = vec![Vec::<usize>::new(); size];
for (position, &global_index) in global_indices.iter().enumerate() {
let (rank, local_index) = partition.owner_of(global_index);
locals_by_rank[rank].push(local_index);
targets_by_rank[rank].push(position);
}
let mut counts = vec![0i32; size];
let mut displs = vec![0i32; size];
for rank in 0..size {
counts[rank] = (locals_by_rank[rank].len() * stride) as i32;
displs[rank] = if rank == 0 {
0
} else {
displs[rank - 1] + counts[rank - 1]
};
}
let expected_local = locals_by_rank[self.rank() as usize].len() * stride;
debug_assert_eq!(
local.len(),
expected_local,
"Local buffer length does not match expected gathered size for rank {}",
self.rank()
);
{
let mut partition =
PartitionMut::new(&mut gathered, counts.clone(), displs.clone());
self.all_gather_varcount_into(local, &mut partition);
}
let mut result = vec![T::default(); n_indices * stride];
for rank in 0..size {
let mut cursor = displs[rank] as usize;
for &target in &targets_by_rank[rank] {
let dst = target * stride;
result[dst..(stride + dst)]
.clone_from_slice(&gathered[cursor..(stride + cursor)]);
cursor += stride;
}
}
result
}
fn owner_of_global_index(&self, global_index: usize, total: usize) -> (i32, usize) {
let partition = Partition::new(self.size() as usize, total);
let (rank, local) = partition.owner_of(global_index);
(rank as i32, local)
}
fn locals_from_globals(&self, global_indices: &[usize], total: usize) -> Vec<usize> {
let partition = Partition::new(self.size() as usize, total);
let this_rank = self.rank() as usize;
let mut locals = Vec::new();
if total == 0 {
return locals;
}
for &global_index in global_indices {
let (rank, local_index) = partition.owner_of(global_index);
if rank == this_rank {
locals.push(local_index);
}
}
locals
}
fn get_counts_displs(&self, buf_len: usize) -> (Vec<i32>, Vec<i32>) {
self.partition(buf_len).into_raw()
}
fn partition(&self, total: usize) -> Partition {
Partition::new(self.size() as usize, total)
}
fn get_flattened_counts_displs(
&self,
unflattened_len: usize,
internal_len: usize,
) -> (Vec<i32>, Vec<i32>) {
let mut counts = vec![0; self.size() as usize];
let mut displs = vec![0; self.size() as usize];
let chunk_size = unflattened_len / self.size() as usize;
let surplus = unflattened_len % self.size() as usize;
for i in 0..self.size() as usize {
counts[i] = if i < surplus {
(chunk_size + 1) * internal_len
} else {
chunk_size * internal_len
} as i32;
displs[i] = if i == 0 {
0
} else {
displs[i - 1] + counts[i - 1]
};
}
(counts, displs)
}
}
}
use thiserror::Error;
pub mod amplitude;
pub mod data;
pub mod execution;
pub mod expression;
pub mod kinematics;
pub mod math;
pub mod parameters;
pub mod quantum;
pub mod reaction;
pub mod resources;
pub mod variables;
pub mod vectors;
/// Re-exports [`Amplitude`](crate::amplitude::Amplitude) and
/// [`Variable`](crate::variables::Variable) under a single `traits` path.
pub mod traits {
pub use crate::{amplitude::Amplitude, variables::Variable};
}
pub use amplitude::{Amplitude, AmplitudeID, AmplitudeSemanticField, AmplitudeSemanticKey};
#[cfg(feature = "execution-context-prototype")]
pub use crate::execution::{ExecutionContext, ScratchAllocator, ThreadPolicy};
pub use crate::{
data::{
BinnedDataset, Dataset, DatasetMetadata, DatasetReadOptions, Event, EventData, OwnedEvent,
},
execution::ThreadPoolManager,
expression::{CompiledExpression, CompiledExpressionNode, Evaluator, Expression},
kinematics::{DecayAngles, FrameAxes, RestFrame},
parameters::{Parameter, ParameterID, ParameterMap, Parameters},
quantum::{
allowed_projections, AllowedPartialWave, AngularMomentum, Channel, Charge, Frame, Isospin,
OrbitalAngularMomentum, Parity, PartialWave, ParticleProperties, Projection, Reflectivity,
RuleSet, SelectionRules, SpinState, Statistics,
},
reaction::{
Decay, Particle, ParticleGraph, ParticleSource, Reaction, ReactionTopology,
ResolvedTwoToTwo, TwoToTwoReaction,
},
resources::{
Cache, ComplexMatrixID, ComplexScalarID, ComplexVectorID, MatrixID, Resources, ScalarID,
VectorID,
},
variables::{
Angles, CosTheta, IntoP4Selection, Mandelstam, Mass, P4Selection, Phi, PolAngle,
PolMagnitude, Polarization,
},
vectors::{Vec3, Vec4},
};
/// The mathematical constant π (same value as [`std::f64::consts::PI`]).
pub const PI: f64 = std::f64::consts::PI;
/// Shorthand for `Result<T, LadduError>`.
pub type LadduResult<T> = Result<T, LadduError>;
/// Errors produced by laddu.
///
/// Variants that wrap another library's error use `#[error(transparent)]` and display that
/// error's message unchanged. Those wrappers also get a `From` conversion via `#[from]`.
#[derive(Error, Debug)]
pub enum LadduError {
/// An I/O error.
#[error(transparent)]
IOError(#[from] std::io::Error),
/// An error from the `parquet` crate.
#[error(transparent)]
ParquetError(#[from] parquet::errors::ParquetError),
/// An error from the `arrow` crate.
#[error(transparent)]
ArrowError(#[from] arrow::error::ArrowError),
/// A failed variable lookup reported by `shellexpand` (wrapping a `std::env::VarError`).
#[error(transparent)]
LookupError(#[from] shellexpand::LookupError<std::env::VarError>),
/// An amplitude with this name is already registered.
#[error("An amplitude by the name \"{name}\" is already registered!")]
RegistrationError {
/// Name of the amplitude.
name: String,
},
/// No amplitude is registered under the given name.
#[error("No registered amplitude with tag \"{name}\"!")]
AmplitudeNotFoundError {
/// The requested amplitude name.
name: String,
},
/// A string does not correspond to a valid value of the named kind of object.
#[error("Failed to parse string: \"{name}\" does not correspond to a valid \"{object}\"!")]
ParseError {
/// The string that failed to parse.
name: String,
/// Description of what the string was expected to represent.
object: String,
},
/// An error from the `bitcode` crate.
#[error(transparent)]
BitcodeError(#[from] bitcode::Error),
/// An error from the `serde_pickle` crate.
#[error(transparent)]
PickleError(#[from] serde_pickle::Error),
/// A conflict involving the named parameter.
#[error("Parameter \"{name}\" conflict: {reason}")]
ParameterConflict {
/// Name of the parameter.
name: String,
/// Human-readable description of the conflict.
reason: String,
},
/// The named parameter could not be registered.
#[error("Parameter \"{name}\" could not be registered: {reason}")]
UnregisteredParameter {
/// Name of the parameter.
name: String,
/// Human-readable reason for the failure.
reason: String,
},
/// Setting up an execution context failed.
#[error("Execution context setup failed: {reason}")]
ExecutionContextError {
/// Human-readable reason for the failure.
reason: String,
},
/// Building a rayon thread pool failed (only with the `rayon` feature).
#[cfg(feature = "rayon")]
#[error(transparent)]
ThreadPoolError(#[from] rayon::ThreadPoolBuildError),
/// A `numpy::FromVecError` (only with the `numpy` feature).
#[cfg(feature = "numpy")]
#[error(transparent)]
NumpyError(#[from] numpy::FromVecError),
/// A required column is missing from the dataset.
#[error("Required column \"{name}\" was not found in the dataset")]
MissingColumn {
/// Name of the missing column.
name: String,
},
/// A column has a data type that is not supported.
#[error("Column \"{name}\" has unsupported type \"{datatype}\"")]
InvalidColumnType {
/// Name of the column.
name: String,
/// Textual description of the column's data type.
datatype: String,
},
/// A length did not match the expected value.
#[error("{context} length mismatch: expected {expected}, received {actual}")]
LengthMismatch {
/// What was being checked; used as the message prefix.
context: String,
/// The expected length.
expected: usize,
/// The length that was actually received.
actual: usize,
},
/// The same name was provided more than once.
#[error("Duplicate {category} name \"{name}\" provided")]
DuplicateName {
/// Kind of item the name refers to.
category: &'static str,
/// The duplicated name.
name: String,
},
/// A name did not match any known item of the given category.
#[error("Unknown {category} name \"{name}\"")]
UnknownName {
/// Kind of item the name refers to.
category: &'static str,
/// The unrecognized name.
name: String,
},
/// A free-form error message. Cloning any `LadduError` produces this variant.
#[error("{0}")]
Custom(String),
}
/// Checks that a free-parameter vector has the expected length.
///
/// # Errors
///
/// Returns [`LadduError::LengthMismatch`] with context `"free parameter vector"` when
/// `input_len != expected_len`.
pub fn validate_free_parameter_len(input_len: usize, expected_len: usize) -> LadduResult<()> {
    if input_len == expected_len {
        Ok(())
    } else {
        Err(LadduError::LengthMismatch {
            context: String::from("free parameter vector"),
            expected: expected_len,
            actual: input_len,
        })
    }
}
impl Clone for LadduError {
fn clone(&self) -> Self {
let err_string = self.to_string();
LadduError::Custom(err_string)
}
}
#[cfg(feature = "python")]
impl From<LadduError> for PyErr {
    /// Maps each [`LadduError`] onto a Python exception type, using the error's `Display`
    /// output as the exception message:
    ///
    /// - lookup, registration, parse, validation, and numpy errors → `ValueError`
    /// - parquet, arrow, I/O, bitcode, and pickle errors → `IOError`
    /// - missing columns and unknown names → `KeyError`
    /// - execution-context, thread-pool, and custom errors → `RuntimeError`
    fn from(err: LadduError) -> Self {
        use pyo3::exceptions::{PyIOError, PyKeyError, PyRuntimeError, PyValueError};
        let message = err.to_string();
        match err {
            LadduError::LookupError(_)
            | LadduError::RegistrationError { .. }
            | LadduError::AmplitudeNotFoundError { .. }
            | LadduError::ParseError { .. }
            | LadduError::InvalidColumnType { .. }
            | LadduError::LengthMismatch { .. }
            | LadduError::DuplicateName { .. }
            | LadduError::ParameterConflict { .. }
            | LadduError::UnregisteredParameter { .. } => PyValueError::new_err(message),
            #[cfg(feature = "numpy")]
            LadduError::NumpyError(_) => PyValueError::new_err(message),
            LadduError::IOError(_)
            | LadduError::ParquetError(_)
            | LadduError::ArrowError(_)
            | LadduError::BitcodeError(_)
            | LadduError::PickleError(_) => PyIOError::new_err(message),
            LadduError::MissingColumn { .. } | LadduError::UnknownName { .. } => {
                PyKeyError::new_err(message)
            }
            LadduError::ExecutionContextError { .. } | LadduError::Custom(_) => {
                PyRuntimeError::new_err(message)
            }
            #[cfg(feature = "rayon")]
            LadduError::ThreadPoolError(_) => PyRuntimeError::new_err(message),
        }
    }
}