#![warn(clippy::perf, clippy::style, missing_docs)]
#![allow(clippy::excessive_precision)]
use ganesh::swarms::{Particle, SwarmPositionInitializer};
use ganesh::{Point, Swarm};
#[cfg(feature = "python")]
use pyo3::PyErr;
#[cfg(feature = "mpi")]
/// MPI (Message Passing Interface) support utilities.
///
/// This module manages a process-global MPI [`Universe`] which is initialized
/// on demand by [`use_mpi`] and torn down by [`finalize_mpi`], along with
/// helpers for querying the world communicator and for partitioning buffers
/// evenly across ranks.
pub mod mpi {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::OnceLock;

    use mpi::environment::Universe;
    use mpi::topology::{Process, SimpleCommunicator};
    use mpi::traits::Communicator;
    use parking_lot::RwLock;

    // `AtomicBool::new` is a `const fn`, so a plain `static` suffices; the
    // previous `lazy_static!` wrapper added nothing.
    static USE_MPI: AtomicBool = AtomicBool::new(false);

    // Holds the MPI universe once `use_mpi` has initialized it. The inner
    // `Option` lets `finalize_mpi` drop the universe (finalizing MPI) while
    // keeping the `OnceLock` occupied so MPI is never initialized twice.
    static MPI_UNIVERSE: OnceLock<RwLock<Option<Universe>>> = OnceLock::new();

    /// The rank of the root process (rank `0`).
    pub const ROOT_RANK: i32 = 0;

    /// Returns `true` if MPI is active and the calling process is the root
    /// rank; returns `false` whenever MPI is unavailable or unused.
    pub fn is_root() -> bool {
        if let Some(world) = crate::mpi::get_world() {
            world.rank() == ROOT_RANK
        } else {
            false
        }
    }

    /// Returns the world communicator, or `None` if MPI has not been
    /// initialized (or was initialized with only a single process).
    pub fn get_world() -> Option<SimpleCommunicator> {
        if let Some(universe_lock) = MPI_UNIVERSE.get() {
            if let Some(universe) = &*universe_lock.read() {
                let world = universe.world();
                // A one-process "world" gains nothing from MPI; report it as
                // absent so callers fall back to non-MPI code paths.
                if world.size() == 1 {
                    return None;
                }
                return Some(world);
            }
        }
        None
    }

    /// Returns the rank of the calling process, or `None` if MPI is inactive.
    pub fn get_rank() -> Option<i32> {
        get_world().map(|w| w.rank())
    }

    /// Returns the number of MPI processes, or `None` if MPI is inactive.
    pub fn get_size() -> Option<i32> {
        get_world().map(|w| w.size())
    }

    /// Initializes MPI if `trigger` is `true` (a no-op otherwise).
    ///
    /// If the MPI world contains only a single process, MPI is immediately
    /// finalized again and a warning is printed, since there is nothing to
    /// parallelize over.
    pub fn use_mpi(trigger: bool) {
        if trigger {
            USE_MPI.store(true, Ordering::SeqCst);
            MPI_UNIVERSE.get_or_init(|| {
                // With rayon enabled, worker threads exist but only the main
                // thread issues MPI calls, which is exactly `Funneled`.
                #[cfg(feature = "rayon")]
                let threading = mpi::Threading::Funneled;
                #[cfg(not(feature = "rayon"))]
                let threading = mpi::Threading::Single;
                let (universe, _threading) =
                    mpi::initialize_with_threading(threading).unwrap();
                let world = universe.world();
                if world.size() == 1 {
                    eprintln!("Warning: MPI is enabled, but only one process is available. MPI will not be used, but single-CPU parallelism may still be used if enabled.");
                    // BUGFIX: do NOT call `finalize_mpi()` here. While this
                    // initializer closure is running, `MPI_UNIVERSE.get()`
                    // returns `None`, so the old `finalize_mpi()` call would
                    // panic on its `unwrap()`. Dropping `universe` when this
                    // closure returns finalizes MPI anyway.
                    USE_MPI.store(false, Ordering::SeqCst);
                    RwLock::new(None)
                } else {
                    RwLock::new(Some(universe))
                }
            });
        }
    }

    /// Finalizes MPI by dropping the global universe (if it exists).
    ///
    /// Safe to call even if MPI was never initialized or has already been
    /// finalized.
    pub fn finalize_mpi() {
        if using_mpi() {
            // `if let` rather than `unwrap()` keeps this safe to call at any
            // point, including before initialization has completed.
            if let Some(universe) = MPI_UNIVERSE.get() {
                *universe.write() = None;
            }
        }
    }

    /// Returns `true` if MPI use has been requested via [`use_mpi`].
    pub fn using_mpi() -> bool {
        USE_MPI.load(Ordering::SeqCst)
    }

    /// Convenience methods on MPI communicators used throughout `laddu`.
    pub trait LadduMPI {
        /// The [`Process`] at the root rank.
        fn process_at_root(&self) -> Process<'_>;
        /// Whether the calling process is the root rank.
        fn is_root(&self) -> bool;
        /// Counts and displacements for scattering/gathering a buffer of
        /// `buf_len` elements as evenly as possible across all ranks.
        fn get_counts_displs(&self, buf_len: usize) -> (Vec<i32>, Vec<i32>);
        /// Counts and displacements for a flattened buffer of
        /// `unflattened_len` items, each `internal_len` elements long.
        fn get_flattened_counts_displs(
            &self,
            unflattened_len: usize,
            internal_len: usize,
        ) -> (Vec<i32>, Vec<i32>);
    }

    impl LadduMPI for SimpleCommunicator {
        fn process_at_root(&self) -> Process<'_> {
            self.process_at_rank(crate::mpi::ROOT_RANK)
        }

        fn is_root(&self) -> bool {
            self.rank() == crate::mpi::ROOT_RANK
        }

        fn get_counts_displs(&self, buf_len: usize) -> (Vec<i32>, Vec<i32>) {
            let mut counts = vec![0; self.size() as usize];
            let mut displs = vec![0; self.size() as usize];
            let chunk_size = buf_len / self.size() as usize;
            // The first `surplus` ranks each take one extra element.
            let surplus = buf_len % self.size() as usize;
            for i in 0..self.size() as usize {
                counts[i] = if i < surplus {
                    chunk_size + 1
                } else {
                    chunk_size
                } as i32;
                // Each displacement is the running sum of the earlier counts.
                displs[i] = if i == 0 {
                    0
                } else {
                    displs[i - 1] + counts[i - 1]
                };
            }
            (counts, displs)
        }

        fn get_flattened_counts_displs(
            &self,
            unflattened_len: usize,
            internal_len: usize,
        ) -> (Vec<i32>, Vec<i32>) {
            let mut counts = vec![0; self.size() as usize];
            let mut displs = vec![0; self.size() as usize];
            let chunk_size = unflattened_len / self.size() as usize;
            let surplus = unflattened_len % self.size() as usize;
            for i in 0..self.size() as usize {
                // Scale per-rank item counts by the inner length so counts
                // and displacements refer to flattened elements.
                counts[i] = if i < surplus {
                    (chunk_size + 1) * internal_len
                } else {
                    chunk_size * internal_len
                } as i32;
                displs[i] = if i == 0 {
                    0
                } else {
                    displs[i - 1] + counts[i - 1]
                };
            }
            (counts, displs)
        }
    }
}
use thiserror::Error;
pub mod amplitudes;
pub mod data;
pub mod resources;
pub mod utils;
/// A convenience module re-exporting the crate's most commonly used traits
/// for easy glob-importing (`use laddu::traits::*;`).
pub mod traits {
/// The core trait implemented by all amplitudes.
pub use crate::amplitudes::Amplitude;
/// The trait implemented by event-level variables.
pub use crate::utils::variables::Variable;
/// The trait for types that can be saved to and loaded from disk.
pub use crate::ReadWrite;
}
pub use crate::data::{open, BinnedDataset, Dataset, Event};
pub use crate::resources::{
Cache, ComplexMatrixID, ComplexScalarID, ComplexVectorID, MatrixID, ParameterID, Parameters,
Resources, ScalarID, VectorID,
};
pub use crate::utils::enums::{Channel, Frame, Sign};
pub use crate::utils::variables::{
Angles, CosTheta, Mandelstam, Mass, Phi, PolAngle, PolMagnitude, Polarization,
};
pub use crate::utils::vectors::{Vec3, Vec4};
pub use amplitudes::{
constant, parameter, AmplitudeID, Evaluator, Expression, Manager, Model, ParameterLike,
};
pub use ganesh::{Bound, Ensemble, Status};
pub use nalgebra::DVector;
pub use num::Complex;
#[cfg(not(feature = "f32"))]
/// The floating-point type used throughout this crate (`f64` by default).
pub type Float = f64;
#[cfg(feature = "f32")]
/// The floating-point type used throughout this crate (`f32` when the `f32`
/// feature is enabled).
pub type Float = f32;
#[cfg(not(feature = "f32"))]
/// The mathematical constant π at the precision of [`Float`].
pub const PI: Float = std::f64::consts::PI;
#[cfg(feature = "f32")]
/// The mathematical constant π at the precision of [`Float`].
pub const PI: Float = std::f32::consts::PI;
#[derive(Error, Debug)]
/// The error type returned by fallible operations throughout this crate.
pub enum LadduError {
/// A standard I/O error.
#[error("IO Error: {0}")]
IOError(#[from] std::io::Error),
/// An error raised while reading or writing Parquet files.
#[error("Parquet Error: {0}")]
ParquetError(#[from] parquet::errors::ParquetError),
/// An error raised by the Arrow in-memory format layer.
#[error("Arrow Error: {0}")]
ArrowError(#[from] arrow::error::ArrowError),
/// A shell-expansion lookup failure (e.g. an undefined `$VAR`) while
/// resolving a file path.
#[error("Failed to expand path: {0}")]
LookupError(#[from] shellexpand::LookupError<std::env::VarError>),
/// An amplitude with the given name has already been registered.
#[error("An amplitude by the name \"{name}\" is already registered by this manager!")]
RegistrationError {
/// The name of the amplitude that was already registered.
name: String,
},
/// No amplitude with the given name has been registered.
#[error("No registered amplitude with name \"{name}\"!")]
AmplitudeNotFoundError {
/// The name that could not be found.
name: String,
},
/// A string could not be parsed into the requested kind of object.
#[error("Failed to parse string: \"{name}\" does not correspond to a valid \"{object}\"!")]
ParseError {
/// The string that failed to parse.
name: String,
/// The kind of object the string was expected to name.
object: String,
},
/// A `bincode` serialization error.
#[error("Encoder error: {0}")]
EncodeError(#[from] bincode::error::EncodeError),
/// A `bincode` deserialization error.
#[error("Decoder error: {0}")]
DecodeError(#[from] bincode::error::DecodeError),
/// An error raised while converting to/from Python pickle format.
#[error("Pickle conversion error: {0}")]
PickleError(#[from] serde_pickle::Error),
#[cfg(feature = "rayon")]
/// A failure while building a rayon thread pool.
#[error("Error building thread pool: {0}")]
ThreadPoolError(#[from] rayon::ThreadPoolBuildError),
#[cfg(feature = "numpy")]
/// An error raised while converting data to a NumPy array.
#[error("Numpy error: {0}")]
NumpyError(#[from] numpy::FromVecError),
/// A catch-all error carrying a custom message.
#[error("{0}")]
Custom(String),
}
impl Clone for LadduError {
fn clone(&self) -> Self {
let err_string = self.to_string();
LadduError::Custom(err_string)
}
}
#[cfg(feature = "python")]
impl From<LadduError> for PyErr {
    /// Maps each error category onto the closest-matching Python exception.
    fn from(err: LadduError) -> Self {
        use pyo3::exceptions::*;
        // Render the message first; the match below consumes `err`.
        let msg = err.to_string();
        match err {
            // Bad user input / failed lookups become `ValueError`s.
            LadduError::LookupError(_)
            | LadduError::RegistrationError { .. }
            | LadduError::AmplitudeNotFoundError { .. }
            | LadduError::ParseError { .. } => PyValueError::new_err(msg),
            // File-format and I/O failures become `IOError`s.
            LadduError::ParquetError(_)
            | LadduError::ArrowError(_)
            | LadduError::IOError(_)
            | LadduError::EncodeError(_)
            | LadduError::DecodeError(_)
            | LadduError::PickleError(_) => PyIOError::new_err(msg),
            // Everything else surfaces as a generic `Exception`.
            LadduError::Custom(_) => PyException::new_err(msg),
            #[cfg(feature = "rayon")]
            LadduError::ThreadPoolError(_) => PyException::new_err(msg),
            #[cfg(feature = "numpy")]
            LadduError::NumpyError(_) => PyException::new_err(msg),
        }
    }
}
use serde::{de::DeserializeOwned, Serialize};
use std::{
fmt::Debug,
fs::File,
io::{BufReader, BufWriter},
path::Path,
};
/// A trait for types that can be serialized to and deserialized from disk
/// via `serde_pickle` (producing Python-compatible pickle files).
pub trait ReadWrite: Serialize + DeserializeOwned {
    /// Creates a placeholder ("null") value of this type for use when a
    /// concrete value is not yet available.
    fn create_null() -> Self;

    /// Serializes `self` to `file_path` as a pickle file.
    ///
    /// The path may contain `~` or environment variables; it is expanded
    /// with `shellexpand` before use.
    ///
    /// # Errors
    /// Returns a [`LadduError`] if path expansion, file creation,
    /// serialization, or flushing the output fails.
    fn save_as<T: AsRef<str>>(&self, file_path: T) -> Result<(), LadduError> {
        use std::io::Write;
        let expanded_path = shellexpand::full(file_path.as_ref())?;
        let file_path = Path::new(expanded_path.as_ref());
        let file = File::create(file_path)?;
        let mut writer = BufWriter::new(file);
        serde_pickle::to_writer(&mut writer, self, Default::default())?;
        // BUGFIX: `BufWriter`'s `Drop` flushes but silently swallows any
        // error; flush explicitly so write failures reach the caller.
        writer.flush()?;
        Ok(())
    }

    /// Deserializes a value of this type from the pickle file at `file_path`.
    ///
    /// The path is shell-expanded and canonicalized before opening.
    ///
    /// # Errors
    /// Returns a [`LadduError`] if path resolution, opening the file, or
    /// deserialization fails.
    fn load_from<T: AsRef<str>>(file_path: T) -> Result<Self, LadduError> {
        let file_path = Path::new(&*shellexpand::full(file_path.as_ref())?).canonicalize()?;
        let file = File::open(file_path)?;
        let reader = BufReader::new(file);
        serde_pickle::from_reader(reader, Default::default()).map_err(LadduError::from)
    }
}
impl ReadWrite for Status {
    /// A default (empty) optimizer status.
    fn create_null() -> Self {
        Self::default()
    }
}
impl ReadWrite for Ensemble {
    /// An ensemble built from an empty set of walkers.
    fn create_null() -> Self {
        Self::new(Vec::new())
    }
}
impl ReadWrite for Point {
    /// A default (empty) point.
    fn create_null() -> Self {
        Self::default()
    }
}
impl ReadWrite for Particle {
    /// A default (empty) swarm particle.
    fn create_null() -> Self {
        Self::default()
    }
}
impl ReadWrite for Swarm {
    /// A placeholder swarm with zero particles in zero dimensions.
    fn create_null() -> Self {
        let init = SwarmPositionInitializer::Zero {
            n_particles: 0,
            n_dimensions: 0,
        };
        Self::new(init)
    }
}
impl ReadWrite for Model {
    /// A placeholder model with a default manager and a default expression.
    fn create_null() -> Self {
        Self {
            manager: Manager::default(),
            expression: Expression::default(),
        }
    }
}