use std::{
cmp::Ordering,
os::raw::{c_char, c_double, c_int, c_void},
ptr,
string::FromUtf8Error,
sync::RwLock,
thread::{self, ThreadId},
};
use conv::ConvUtil;
use once_cell::sync::Lazy;
use crate::{attribute::AppNum, ffi};
use crate::{attribute::UniverseSize, traits::FromRaw};
use crate::{
topology::traits::AnyCommunicator,
topology::{Communicator, InterCommunicator, SimpleCommunicator},
traits::AsRaw,
};
use crate::{with_uninitialized, with_uninitialized2};
pub(crate) struct UniverseState {
#[allow(unused)]
pub main_thread: ThreadId,
}
pub(crate) static UNIVERSE_STATE: Lazy<RwLock<Option<UniverseState>>> =
Lazy::new(|| RwLock::new(None));
pub struct Universe {
buffer: Option<Vec<u8>>,
}
impl Universe {
pub fn world(&self) -> SimpleCommunicator {
SimpleCommunicator::world()
}
pub fn size(&self) -> Option<usize> {
self.world()
.get_attr::<UniverseSize>()
.map(|s| usize::try_from(s).expect("universe size must be non-negative"))
}
pub fn appnum(&self) -> Option<isize> {
self.world().get_attr::<AppNum>().map(isize::from)
}
pub fn buffer_size(&self) -> usize {
self.buffer.as_ref().map_or(0, Vec::len)
}
pub fn set_buffer_size(&mut self, size: usize) {
self.detach_buffer();
if size > 0 {
let mut buffer = vec![0; size];
unsafe {
ffi::MPI_Buffer_attach(
buffer.as_mut_ptr() as _,
buffer
.len()
.value_as()
.expect("Buffer length exceeds the range of a C int."),
);
}
self.buffer = Some(buffer);
}
}
pub fn detach_buffer(&mut self) {
if let Some(buffer) = self.buffer.take() {
let mut addr: *const c_void = ptr::null();
let addr_ptr: *mut *const c_void = &mut addr;
let mut size: c_int = 0;
unsafe {
ffi::MPI_Buffer_detach(addr_ptr as *mut c_void, &mut size);
assert_eq!(addr, buffer.as_ptr() as _);
}
assert_eq!(
size,
buffer
.len()
.value_as()
.expect("Buffer length exceeds the range of a C int.")
);
}
}
pub fn disconnect_parent(&mut self) {
if let Some(parent) = self.world().parent() {
let _p = unsafe { InterCommunicator::from_raw(parent.as_raw()) };
}
}
fn free_attribute_keys(&mut self) {
let mut comm_attrs = crate::attribute::COMM_ATTRS.write().unwrap();
for (_, v) in comm_attrs.drain() {
let mut k = v.as_raw();
unsafe { ffi::MPI_Comm_free_keyval(&mut k) };
}
}
}
impl Drop for Universe {
fn drop(&mut self) {
let mut _universe_state = UNIVERSE_STATE
.write()
.expect("rsmpi internal error: UNIVERSE_STATE lock poisoned");
self.detach_buffer();
self.disconnect_parent();
self.free_attribute_keys();
unsafe {
ffi::MPI_Finalize();
}
}
}
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum Threading {
Single,
Funneled,
Serialized,
Multiple,
}
impl Threading {
fn as_raw(self) -> c_int {
match self {
Threading::Single => unsafe { ffi::RSMPI_THREAD_SINGLE },
Threading::Funneled => unsafe { ffi::RSMPI_THREAD_FUNNELED },
Threading::Serialized => unsafe { ffi::RSMPI_THREAD_SERIALIZED },
Threading::Multiple => unsafe { ffi::RSMPI_THREAD_MULTIPLE },
}
}
}
impl PartialOrd<Threading> for Threading {
fn partial_cmp(&self, other: &Threading) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Threading {
fn cmp(&self, other: &Threading) -> Ordering {
self.as_raw().cmp(&other.as_raw())
}
}
impl From<c_int> for Threading {
fn from(i: c_int) -> Threading {
if i == unsafe { ffi::RSMPI_THREAD_SINGLE } {
return Threading::Single;
} else if i == unsafe { ffi::RSMPI_THREAD_FUNNELED } {
return Threading::Funneled;
} else if i == unsafe { ffi::RSMPI_THREAD_SERIALIZED } {
return Threading::Serialized;
} else if i == unsafe { ffi::RSMPI_THREAD_MULTIPLE } {
return Threading::Multiple;
}
panic!("Unknown threading level: {}", i)
}
}
pub(crate) fn is_initialized() -> bool {
unsafe { with_uninitialized(|initialized| ffi::MPI_Initialized(initialized)).1 != 0 }
}
#[allow(unused)]
pub(crate) fn is_finalized() -> bool {
unsafe { with_uninitialized(|finalized| ffi::MPI_Finalized(finalized)).1 != 0 }
}
pub fn initialize() -> Option<Universe> {
initialize_with_threading(Threading::Single).map(|x| x.0)
}
pub fn initialize_with_threading(threading: Threading) -> Option<(Universe, Threading)> {
let mut universe_state = UNIVERSE_STATE
.write()
.expect("rsmpi internal error: UNIVERSE_STATE lock poisoned");
if is_initialized() {
return None;
}
let (_, provided) = unsafe {
with_uninitialized(|provided| {
ffi::MPI_Init_thread(
ptr::null_mut(),
ptr::null_mut(),
threading.as_raw(),
provided,
)
})
};
*universe_state = Some(UniverseState {
main_thread: thread::current().id(),
});
Some((Universe { buffer: None }, provided.into()))
}
pub fn threading_support() -> Threading {
unsafe {
with_uninitialized(|threading| ffi::MPI_Query_thread(threading))
.1
.into()
}
}
pub fn version() -> (c_int, c_int) {
let (_, version, subversion) = unsafe {
with_uninitialized2(|version, subversion| ffi::MPI_Get_version(version, subversion))
};
(version, subversion)
}
pub fn library_version() -> Result<String, FromUtf8Error> {
let bufsize = unsafe { ffi::RSMPI_MAX_LIBRARY_VERSION_STRING }
.value_as()
.unwrap_or_else(|_| {
panic!(
"MPI_MAX_LIBRARY_SIZE ({}) cannot be expressed as a usize.",
unsafe { ffi::RSMPI_MAX_LIBRARY_VERSION_STRING }
)
});
let mut buf = vec![0u8; bufsize];
let mut len: c_int = 0;
unsafe {
ffi::MPI_Get_library_version(buf.as_mut_ptr() as *mut c_char, &mut len);
}
buf.truncate(len.value_as().unwrap_or_else(|_| {
panic!(
"Length of library version string ({}) cannot \
be expressed as a usize.",
len
)
}));
String::from_utf8(buf)
}
pub fn processor_name() -> Result<String, FromUtf8Error> {
let bufsize = unsafe { ffi::RSMPI_MAX_PROCESSOR_NAME }
.value_as()
.unwrap_or_else(|_| {
panic!(
"MPI_MAX_LIBRARY_SIZE ({}) \
cannot be expressed as a \
usize.",
unsafe { ffi::RSMPI_MAX_PROCESSOR_NAME }
)
});
let mut buf = vec![0u8; bufsize];
let mut len: c_int = 0;
unsafe {
ffi::MPI_Get_processor_name(buf.as_mut_ptr() as *mut c_char, &mut len);
}
buf.truncate(len.value_as().unwrap_or_else(|_| {
panic!(
"Length of processor name string ({}) cannot be \
expressed as a usize.",
len
)
}));
String::from_utf8(buf)
}
pub fn time() -> c_double {
unsafe { ffi::RSMPI_Wtime() }
}
pub fn time_resolution() -> c_double {
unsafe { ffi::RSMPI_Wtick() }
}