use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use rayon::prelude::*;
use crate::backend::refprop::RefpropBackend;
use crate::converter::{Converter, UnitSystem};
use crate::error::*;
use crate::fluid::Fluid;
use crate::properties::*;
use crate::sys::find_dll_in_dir;
use crate::traits::FluidApi;
/// Copies the shared library `original` into `dest_dir` under a name made
/// unique by `index`, returning the path of the copy.
///
/// The copy is named `<stem>_<index>.<ext>`, or `<stem>_<index>` when the
/// original has no (or an empty) extension. A stem that is missing or not
/// valid UTF-8 is replaced by `refprop`; an extension that is not valid UTF-8
/// is treated as absent.
///
/// # Errors
/// Returns `RefpropError::CalculationFailed` when the file cannot be copied.
fn copy_dll(original: &Path, dest_dir: &Path, index: usize) -> Result<PathBuf> {
    let stem = original
        .file_stem()
        .and_then(|name| name.to_str())
        .unwrap_or("refprop");
    let file_name = match original.extension().and_then(|name| name.to_str()) {
        Some(ext) if !ext.is_empty() => format!("{stem}_{index}.{ext}"),
        _ => format!("{stem}_{index}"),
    };
    let dest = dest_dir.join(file_name);
    match fs::copy(original, &dest) {
        Ok(_) => Ok(dest),
        Err(e) => Err(RefpropError::CalculationFailed(format!(
            "Failed to copy REFPROP library to {}: {e}",
            dest.display()
        ))),
    }
}
/// A pool of REFPROP backends for one fluid, each behind its own mutex, used
/// to evaluate batches of state points in parallel.
///
/// The constructors give every worker its own copy of the REFPROP shared
/// library inside a private temporary directory; the `Drop` implementation
/// clears the workers and then removes that directory.
pub struct ParallelFluid {
/// One mutex-guarded backend per worker. Single-point queries lock worker 0;
/// batch queries lock one worker per input chunk.
workers: Vec<Mutex<RefpropBackend>>,
/// Converts between the caller's unit system and the values exchanged with
/// the backends.
conv: Converter,
/// Directory holding the per-worker library copies.
temp_dir: PathBuf,
/// Number of workers (at least 1); the constructors create exactly this many
/// entries in `workers`.
n_workers: usize,
}
impl ParallelFluid {
pub fn new(fluid_name: &str) -> Result<Self> {
Self::with_units(fluid_name, UnitSystem::refprop())
}
pub fn with_units(fluid_name: &str, units: UnitSystem) -> Result<Self> {
let n = num_cpus::get().max(1);
Self::with_units_and_workers(fluid_name, units, n)
}
pub fn with_workers(fluid_name: &str, n_workers: usize) -> Result<Self> {
Self::with_units_and_workers(fluid_name, UnitSystem::refprop(), n_workers)
}
pub fn with_units_and_workers(
fluid_name: &str,
units: UnitSystem,
n_workers: usize,
) -> Result<Self> {
let n_workers = n_workers.max(1);
Fluid::load_dotenv();
let refprop_path = Fluid::find_refprop_path()?;
let data_dir = Path::new(&refprop_path);
let original_dll = find_dll_in_dir(data_dir).ok_or_else(|| {
RefpropError::LibraryNotFound(format!(
"No REFPROP shared library found in {}",
data_dir.display()
))
})?;
let temp_dir = Self::create_temp_dir()?;
let mut workers = Vec::with_capacity(n_workers);
for i in 0..n_workers {
let dll_copy = copy_dll(&original_dll, &temp_dir, i)?;
let backend =
RefpropBackend::new_isolated(fluid_name, &refprop_path, &dll_copy)?;
workers.push(Mutex::new(backend));
}
let mm = workers[0]
.lock()
.map_err(|_| {
RefpropError::CalculationFailed("Worker lock poisoned during init".into())
})?
.molar_mass_mix_direct();
let conv = Converter::new(units, mm);
Ok(Self {
workers,
conv,
temp_dir,
n_workers,
})
}
pub fn mixture(components: &[(&str, f64)]) -> Result<Self> {
Self::mixture_with_units(components, UnitSystem::refprop())
}
pub fn mixture_with_units(
components: &[(&str, f64)],
units: UnitSystem,
) -> Result<Self> {
let n = num_cpus::get().max(1);
Self::mixture_with_units_and_workers(components, units, n)
}
pub fn mixture_with_units_and_workers(
components: &[(&str, f64)],
units: UnitSystem,
n_workers: usize,
) -> Result<Self> {
let n_workers = n_workers.max(1);
Fluid::load_dotenv();
let refprop_path = Fluid::find_refprop_path()?;
let data_dir = Path::new(&refprop_path);
let original_dll = find_dll_in_dir(data_dir).ok_or_else(|| {
RefpropError::LibraryNotFound(format!(
"No REFPROP shared library found in {}",
data_dir.display()
))
})?;
let temp_dir = Self::create_temp_dir()?;
let mut workers = Vec::with_capacity(n_workers);
for i in 0..n_workers {
let dll_copy = copy_dll(&original_dll, &temp_dir, i)?;
let backend = RefpropBackend::new_mixture_isolated(
components,
&refprop_path,
&dll_copy,
)?;
workers.push(Mutex::new(backend));
}
let mm = workers[0]
.lock()
.map_err(|_| {
RefpropError::CalculationFailed("Worker lock poisoned during init".into())
})?
.molar_mass_mix_direct();
let conv = Converter::new(units, mm);
Ok(Self {
workers,
conv,
temp_dir,
n_workers,
})
}
pub fn worker_count(&self) -> usize {
self.n_workers
}
pub fn converter(&self) -> &Converter {
&self.conv
}
pub fn get(
&self,
output: &str,
key1: &str,
val1: f64,
key2: &str,
val2: f64,
) -> Result<f64> {
let v1 = self.conv.input_to_rp(key1, val1)?;
let v2 = self.conv.input_to_rp(key2, val2)?;
let guard = self.lock_worker(0)?;
let raw = guard.get_direct(output, key1, v1, key2, v2)?;
Ok(self.conv.output_from_rp(output, raw))
}
pub fn props_tp(&self, t: f64, p: f64) -> Result<ThermoProp> {
let guard = self.lock_worker(0)?;
let raw = guard.props_tp_direct(self.conv.t_to_rp(t), self.conv.p_to_rp(p))?;
Ok(self.convert_thermo(raw))
}
pub fn props_ph(&self, p: f64, h: f64) -> Result<ThermoProp> {
let guard = self.lock_worker(0)?;
let raw = guard.props_ph_direct(self.conv.p_to_rp(p), self.conv.h_to_rp(h))?;
Ok(self.convert_thermo(raw))
}
pub fn props_ps(&self, p: f64, s: f64) -> Result<ThermoProp> {
let guard = self.lock_worker(0)?;
let raw = guard.props_ps_direct(self.conv.p_to_rp(p), self.conv.s_to_rp(s))?;
Ok(self.convert_thermo(raw))
}
pub fn props_tq(&self, t: f64, q: f64) -> Result<ThermoProp> {
let guard = self.lock_worker(0)?;
let raw = guard.props_tq_direct(self.conv.t_to_rp(t), self.conv.q_to_rp(q)?)?;
Ok(self.convert_thermo(raw))
}
pub fn props_pq(&self, p: f64, q: f64) -> Result<ThermoProp> {
let guard = self.lock_worker(0)?;
let raw = guard.props_pq_direct(self.conv.p_to_rp(p), self.conv.q_to_rp(q)?)?;
Ok(self.convert_thermo(raw))
}
pub fn props_td(&self, t: f64, d: f64) -> Result<ThermoProp> {
let guard = self.lock_worker(0)?;
let raw = guard.props_td_direct(self.conv.t_to_rp(t), self.conv.d_to_rp(d))?;
Ok(self.convert_thermo(raw))
}
pub fn props_th(&self, t: f64, h: f64) -> Result<ThermoProp> {
let guard = self.lock_worker(0)?;
let raw = guard.props_th_direct(self.conv.t_to_rp(t), self.conv.h_to_rp(h))?;
Ok(self.convert_thermo(raw))
}
pub fn props_ts(&self, t: f64, s: f64) -> Result<ThermoProp> {
let guard = self.lock_worker(0)?;
let raw = guard.props_ts_direct(self.conv.t_to_rp(t), self.conv.s_to_rp(s))?;
Ok(self.convert_thermo(raw))
}
pub fn props_pd(&self, p: f64, d: f64) -> Result<ThermoProp> {
let guard = self.lock_worker(0)?;
let raw = guard.props_pd_direct(self.conv.p_to_rp(p), self.conv.d_to_rp(d))?;
Ok(self.convert_thermo(raw))
}
pub fn props_dh(&self, d: f64, h: f64) -> Result<ThermoProp> {
let guard = self.lock_worker(0)?;
let raw = guard.props_dh_direct(self.conv.d_to_rp(d), self.conv.h_to_rp(h))?;
Ok(self.convert_thermo(raw))
}
pub fn props_ds(&self, d: f64, s: f64) -> Result<ThermoProp> {
let guard = self.lock_worker(0)?;
let raw = guard.props_ds_direct(self.conv.d_to_rp(d), self.conv.s_to_rp(s))?;
Ok(self.convert_thermo(raw))
}
pub fn props_hs(&self, h: f64, s: f64) -> Result<ThermoProp> {
let guard = self.lock_worker(0)?;
let raw = guard.props_hs_direct(self.conv.h_to_rp(h), self.conv.s_to_rp(s))?;
Ok(self.convert_thermo(raw))
}
pub fn saturation_p(&self, p: f64) -> Result<SaturationProps> {
let guard = self.lock_worker(0)?;
let raw = guard.saturation_p_direct(self.conv.p_to_rp(p))?;
Ok(self.convert_sat(raw))
}
pub fn saturation_t(&self, t: f64) -> Result<SaturationProps> {
let guard = self.lock_worker(0)?;
let raw = guard.saturation_t_direct(self.conv.t_to_rp(t))?;
Ok(self.convert_sat(raw))
}
pub fn transport(&self, t: f64, d: f64) -> Result<TransportProps> {
let guard = self.lock_worker(0)?;
let raw = guard.transport_direct(self.conv.t_to_rp(t), self.conv.d_to_rp(d))?;
Ok(TransportProps {
viscosity: self.conv.eta_from_rp(raw.viscosity),
thermal_conductivity: self.conv.tcx_from_rp(raw.thermal_conductivity),
})
}
pub fn info(&self) -> Result<FluidInfo> {
let guard = self.lock_worker(0)?;
Ok(guard.fluid_info_direct())
}
pub fn critical_point(&self) -> Result<CriticalProps> {
let guard = self.lock_worker(0)?;
let raw = guard.critical_point_direct()?;
Ok(CriticalProps {
temperature: self.conv.t_from_rp(raw.temperature),
pressure: self.conv.p_from_rp(raw.pressure),
density: self.conv.d_from_rp(raw.density),
})
}
pub fn par_get(
&self,
output: &str,
key1: &str,
vals1: &[f64],
key2: &str,
vals2: &[f64],
) -> Result<Vec<f64>> {
if vals1.len() != vals2.len() {
return Err(RefpropError::InvalidInput(format!(
"vals1.len() ({}) != vals2.len() ({})",
vals1.len(),
vals2.len()
)));
}
if vals1.is_empty() {
return Ok(vec![]);
}
let inputs: Vec<(f64, f64)> = vals1.iter().copied().zip(vals2.iter().copied()).collect();
let chunk_size = (inputs.len() + self.n_workers - 1) / self.n_workers;
let results: Vec<Vec<Result<f64>>> = inputs
.chunks(chunk_size)
.enumerate()
.collect::<Vec<_>>()
.into_par_iter()
.map(|(worker_idx, chunk)| {
let guard = self.workers[worker_idx].lock().unwrap();
chunk
.iter()
.map(|(v1, v2)| {
let v1_rp = self.conv.input_to_rp(key1, *v1)?;
let v2_rp = self.conv.input_to_rp(key2, *v2)?;
let raw = guard.get_direct(output, key1, v1_rp, key2, v2_rp)?;
Ok(self.conv.output_from_rp(output, raw))
})
.collect()
})
.collect();
results.into_iter().flatten().collect()
}
pub fn par_props_tp(&self, inputs: &[(f64, f64)]) -> Vec<Result<ThermoProp>> {
self.par_flash(inputs, |guard, t, p| {
guard
.props_tp_direct(self.conv.t_to_rp(t), self.conv.p_to_rp(p))
.map(|raw| self.convert_thermo(raw))
})
}
pub fn par_props_ph(&self, inputs: &[(f64, f64)]) -> Vec<Result<ThermoProp>> {
self.par_flash(inputs, |guard, p, h| {
guard
.props_ph_direct(self.conv.p_to_rp(p), self.conv.h_to_rp(h))
.map(|raw| self.convert_thermo(raw))
})
}
pub fn par_props_ps(&self, inputs: &[(f64, f64)]) -> Vec<Result<ThermoProp>> {
self.par_flash(inputs, |guard, p, s| {
guard
.props_ps_direct(self.conv.p_to_rp(p), self.conv.s_to_rp(s))
.map(|raw| self.convert_thermo(raw))
})
}
pub fn par_props_tq(&self, inputs: &[(f64, f64)]) -> Vec<Result<ThermoProp>> {
self.par_flash(inputs, |guard, t, q| {
let q_rp = self.conv.q_to_rp(q)?;
guard
.props_tq_direct(self.conv.t_to_rp(t), q_rp)
.map(|raw| self.convert_thermo(raw))
})
}
pub fn par_props_pq(&self, inputs: &[(f64, f64)]) -> Vec<Result<ThermoProp>> {
self.par_flash(inputs, |guard, p, q| {
let q_rp = self.conv.q_to_rp(q)?;
guard
.props_pq_direct(self.conv.p_to_rp(p), q_rp)
.map(|raw| self.convert_thermo(raw))
})
}
fn lock_worker(
&self,
idx: usize,
) -> Result<std::sync::MutexGuard<'_, RefpropBackend>> {
self.workers[idx].lock().map_err(|_| {
RefpropError::CalculationFailed("Worker lock poisoned".into())
})
}
fn create_temp_dir() -> Result<PathBuf> {
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let dir = std::env::temp_dir().join(format!(
"refprop_pool_{}_{ts}",
std::process::id()
));
fs::create_dir_all(&dir).map_err(|e| {
RefpropError::CalculationFailed(format!(
"Failed to create temp directory {}: {e}",
dir.display()
))
})?;
Ok(dir)
}
fn par_flash<F>(&self, inputs: &[(f64, f64)], compute: F) -> Vec<Result<ThermoProp>>
where
F: Fn(&RefpropBackend, f64, f64) -> Result<ThermoProp> + Sync,
{
if inputs.is_empty() {
return vec![];
}
let chunk_size = (inputs.len() + self.n_workers - 1) / self.n_workers;
inputs
.chunks(chunk_size)
.enumerate()
.collect::<Vec<_>>()
.into_par_iter()
.map(|(worker_idx, chunk)| {
let guard = self.workers[worker_idx].lock().unwrap();
chunk
.iter()
.map(|(a, b)| compute(&guard, *a, *b))
.collect::<Vec<_>>()
})
.flatten()
.collect()
}
fn convert_sat(&self, raw: SaturationProps) -> SaturationProps {
SaturationProps {
temperature: self.conv.t_from_rp(raw.temperature),
pressure: self.conv.p_from_rp(raw.pressure),
density_liquid: self.conv.d_from_rp(raw.density_liquid),
density_vapor: self.conv.d_from_rp(raw.density_vapor),
}
}
fn convert_thermo(&self, raw: ThermoProp) -> ThermoProp {
ThermoProp {
temperature: self.conv.t_from_rp(raw.temperature),
pressure: self.conv.p_from_rp(raw.pressure),
density: self.conv.d_from_rp(raw.density),
enthalpy: self.conv.h_from_rp(raw.enthalpy),
entropy: self.conv.s_from_rp(raw.entropy),
cv: self.conv.s_from_rp(raw.cv),
cp: self.conv.s_from_rp(raw.cp),
sound_speed: raw.sound_speed,
quality: self.conv.q_from_rp(raw.quality),
internal_energy: self.conv.h_from_rp(raw.internal_energy),
}
}
}
impl FluidApi for ParallelFluid {
fn get(&self, output: &str, key1: &str, val1: f64, key2: &str, val2: f64) -> Result<f64> {
self.get(output, key1, val1, key2, val2)
}
fn props_tp(&self, t: f64, p: f64) -> Result<ThermoProp> { self.props_tp(t, p) }
fn props_ph(&self, p: f64, h: f64) -> Result<ThermoProp> { self.props_ph(p, h) }
fn props_ps(&self, p: f64, s: f64) -> Result<ThermoProp> { self.props_ps(p, s) }
fn props_td(&self, t: f64, d: f64) -> Result<ThermoProp> { self.props_td(t, d) }
fn props_th(&self, t: f64, h: f64) -> Result<ThermoProp> { self.props_th(t, h) }
fn props_ts(&self, t: f64, s: f64) -> Result<ThermoProp> { self.props_ts(t, s) }
fn props_pd(&self, p: f64, d: f64) -> Result<ThermoProp> { self.props_pd(p, d) }
fn props_dh(&self, d: f64, h: f64) -> Result<ThermoProp> { self.props_dh(d, h) }
fn props_ds(&self, d: f64, s: f64) -> Result<ThermoProp> { self.props_ds(d, s) }
fn props_hs(&self, h: f64, s: f64) -> Result<ThermoProp> { self.props_hs(h, s) }
fn props_tq(&self, t: f64, q: f64) -> Result<ThermoProp> { self.props_tq(t, q) }
fn props_pq(&self, p: f64, q: f64) -> Result<ThermoProp> { self.props_pq(p, q) }
fn saturation_p(&self, p: f64) -> Result<SaturationProps> { self.saturation_p(p) }
fn saturation_t(&self, t: f64) -> Result<SaturationProps> { self.saturation_t(t) }
fn transport(&self, t: f64, d: f64) -> Result<TransportProps> { self.transport(t, d) }
fn critical_point(&self) -> Result<CriticalProps> { self.critical_point() }
fn info(&self) -> Result<FluidInfo> { self.info() }
fn converter(&self) -> &Converter { self.converter() }
}
/// Tears the pool down: all workers are dropped first, then the temporary
/// directory holding the per-worker library copies is deleted.
impl Drop for ParallelFluid {
    fn drop(&mut self) {
        // Drop the backends before touching the directory — presumably this is
        // what releases the library copies stored in it.
        self.workers.drain(..).for_each(drop);
        // Best effort: `drop` cannot report a failure, so the result is ignored.
        let _ = fs::remove_dir_all(self.temp_dir.as_path());
    }
}