use crate::error::{IntegrateError, IntegrateResult};
use crate::monte_carlo::{ErrorEstimationMethod, MonteCarloOptions, MonteCarloResult};
use crate::IntegrateFloat;
use scirs2_core::ndarray::{Array1, ArrayView1};
use scirs2_core::random::prelude::*;
use scirs2_core::random::uniform::SampleUniform;
use scirs2_core::random::{Distribution, Uniform};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use scirs2_core::num_threads;
#[cfg(feature = "parallel")]
use scirs2_core::parallel_ops::ThreadPoolBuilder;
use scirs2_core::parallel_ops::{
IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
};
/// Configuration for parallel Monte Carlo integration.
///
/// Mirrors the sequential `MonteCarloOptions` fields and adds the
/// parallel-execution knobs (`n_threads`, `batch_size`, `use_chunking`).
#[derive(Debug, Clone)]
pub struct ParallelMonteCarloOptions<F: IntegrateFloat> {
    // Total number of sample points to draw.
    pub n_samples: usize,
    // Base RNG seed; each worker derives its own seed from this plus its
    // chunk/batch index (a default of 42 is used when `None`).
    pub seed: Option<u64>,
    // Method used to estimate the standard error of the result.
    pub error_method: ErrorEstimationMethod,
    // Use antithetic variates (pairs `x` and `a + b - x`) for variance reduction.
    pub use_antithetic: bool,
    // Number of worker threads; `None` lets the thread pool pick its default.
    pub n_threads: Option<usize>,
    // Samples per work unit when `use_chunking` is enabled.
    pub batch_size: usize,
    // `true`: split work into `batch_size`-sized chunks; `false`: one
    // contiguous batch per thread.
    pub use_chunking: bool,
    // Marker tying the options to the float type `F` without storing one.
    pub phantom: PhantomData<F>,
}
impl<F: IntegrateFloat> Default for ParallelMonteCarloOptions<F> {
fn default() -> Self {
Self {
n_samples: 10000,
seed: None,
error_method: ErrorEstimationMethod::StandardError,
use_antithetic: false,
n_threads: None, batch_size: 1000, use_chunking: true,
phantom: PhantomData,
}
}
}
impl<F: IntegrateFloat> From<MonteCarloOptions<F>> for ParallelMonteCarloOptions<F> {
fn from(opts: MonteCarloOptions<F>) -> Self {
Self {
n_samples: opts.n_samples,
seed: opts.seed,
error_method: opts.error_method,
use_antithetic: opts.use_antithetic,
n_threads: None,
batch_size: 1000,
use_chunking: true,
phantom: PhantomData,
}
}
}
#[cfg(feature = "parallel")]
#[allow(dead_code)]
/// Parallel Monte Carlo integration of `f` over the hyper-rectangle `ranges`.
///
/// Work runs on a dedicated rayon thread pool; depending on
/// `options.use_chunking` the sample budget is processed either as fixed-size
/// chunks or as one contiguous batch per thread. Results are deterministic
/// for a fixed seed and thread/chunk configuration.
///
/// # Errors
/// Returns `IntegrateError::ValueError` for empty `ranges`, a zero sample
/// count, or antithetic sampling with fewer than 2 samples, and
/// `IntegrateError::ComputationError` if the thread pool cannot be created.
pub fn parallel_monte_carlo<F, Func>(
    f: Func,
    ranges: &[(F, F)],
    options: Option<ParallelMonteCarloOptions<F>>,
) -> IntegrateResult<MonteCarloResult<F>>
where
    F: IntegrateFloat + Send + Sync + SampleUniform,
    Func: Fn(ArrayView1<F>) -> F + Sync + Send,
    scirs2_core::random::StandardNormal: Distribution<F>,
{
    let opts = options.unwrap_or_default();
    if ranges.is_empty() {
        return Err(IntegrateError::ValueError(
            "Integration ranges cannot be empty".to_string(),
        ));
    }
    if opts.n_samples == 0 {
        return Err(IntegrateError::ValueError(
            "Number of samples must be positive".to_string(),
        ));
    }
    // Antithetic sampling consumes samples in pairs; fewer than 2 samples
    // would leave zero evaluations and an undefined estimate downstream.
    if opts.use_antithetic && opts.n_samples < 2 {
        return Err(IntegrateError::ValueError(
            "Antithetic sampling requires at least 2 samples".to_string(),
        ));
    }
    // Build the pool once; the previous two branches differed only in
    // whether `num_threads` was set.
    let mut builder = ThreadPoolBuilder::new();
    if let Some(n_threads) = opts.n_threads {
        builder = builder.num_threads(n_threads);
    }
    let pool = builder.build().map_err(|e| {
        IntegrateError::ComputationError(format!("Failed to create thread pool: {e}"))
    })?;
    pool.install(|| {
        if opts.use_chunking {
            parallel_monte_carlo_chunked(&f, ranges, &opts)
        } else {
            parallel_monte_carlo_batched(&f, ranges, &opts)
        }
    })
}
#[cfg(feature = "parallel")]
#[allow(dead_code)]
/// Chunked parallel Monte Carlo: the sample budget is split into
/// `opts.batch_size`-sized work units that rayon schedules across the pool.
///
/// Each chunk seeds its own RNG from `opts.seed` (default 42) plus the chunk
/// index, so results are reproducible regardless of scheduling order. The
/// chunk layout is computed arithmetically instead of materializing a vector
/// of every sample index (the previous version allocated O(n_samples) just
/// to count chunk sizes).
fn parallel_monte_carlo_chunked<F, Func>(
    f: &Func,
    ranges: &[(F, F)],
    opts: &ParallelMonteCarloOptions<F>,
) -> IntegrateResult<MonteCarloResult<F>>
where
    F: IntegrateFloat + Send + Sync + SampleUniform,
    Func: Fn(ArrayView1<F>) -> F + Sync + Send,
    scirs2_core::random::StandardNormal: Distribution<F>,
{
    let n_dims = ranges.len();
    // Antithetic sampling works in pairs, so round the budget down to even.
    let n_actual_samples = if opts.use_antithetic {
        opts.n_samples / 2 * 2
    } else {
        opts.n_samples
    };
    // Volume of the integration hyper-rectangle.
    let mut volume = F::one();
    for &(a, b) in ranges {
        volume *= b - a;
    }
    // Guard against a zero batch size (slice::chunks(0) would panic), and
    // keep chunks even-sized under antithetic sampling so no chunk silently
    // drops its odd trailing sample.
    let chunk_size = if opts.use_antithetic {
        opts.batch_size.max(2).div_ceil(2) * 2
    } else {
        opts.batch_size.max(1)
    };
    let n_chunks = n_actual_samples.div_ceil(chunk_size);
    let results: Vec<_> = (0..n_chunks)
        .into_par_iter()
        .map(|chunk_idx| {
            // Number of samples in this chunk (the last one may be short).
            let start = chunk_idx * chunk_size;
            let chunk_len = chunk_size.min(n_actual_samples - start);
            // Deterministic per-chunk RNG: base seed + chunk index.
            let thread_seed = opts.seed.unwrap_or(42) + chunk_idx as u64;
            let mut rng = StdRng::seed_from_u64(thread_seed);
            let distributions: Vec<_> = ranges
                .iter()
                .map(|&(a, b)| Uniform::new_inclusive(a, b).expect("Operation failed"))
                .collect();
            let mut thread_sum = F::zero();
            let mut thread_sum_sq = F::zero();
            let mut thread_n_evals = 0;
            let mut point = Array1::zeros(n_dims);
            if opts.use_antithetic {
                for _ in 0..(chunk_len / 2) {
                    for (i, dist) in distributions.iter().enumerate() {
                        point[i] = dist.sample(&mut rng);
                    }
                    let value = f(point.view());
                    thread_sum += value;
                    thread_sum_sq += value * value;
                    thread_n_evals += 1;
                    // Mirror the point: x -> a + b - x in every dimension.
                    for (i, &(a, b)) in ranges.iter().enumerate() {
                        point[i] = a + b - point[i];
                    }
                    let antithetic_value = f(point.view());
                    thread_sum += antithetic_value;
                    thread_sum_sq += antithetic_value * antithetic_value;
                    thread_n_evals += 1;
                }
            } else {
                for _ in 0..chunk_len {
                    for (i, dist) in distributions.iter().enumerate() {
                        point[i] = dist.sample(&mut rng);
                    }
                    let value = f(point.view());
                    thread_sum += value;
                    thread_sum_sq += value * value;
                    thread_n_evals += 1;
                }
            }
            (thread_sum, thread_sum_sq, thread_n_evals)
        })
        .collect();
    // Reduce per-chunk partial sums into global totals.
    let (total_sum, total_sum_sq, total_n_evals) = results.into_iter().fold(
        (F::zero(), F::zero(), 0),
        |(sum, sum_sq, n), (thread_sum, thread_sum_sq, thread_n)| {
            (sum + thread_sum, sum_sq + thread_sum_sq, n + thread_n)
        },
    );
    compute_final_result(
        total_sum,
        total_sum_sq,
        total_n_evals,
        volume,
        &opts.error_method,
    )
}
#[cfg(feature = "parallel")]
#[allow(dead_code)]
/// Batched parallel Monte Carlo: the sample budget is split into one
/// contiguous batch per available thread.
///
/// Per-batch RNGs are seeded from `opts.seed` (default 42) plus the batch
/// index for reproducibility. Partial sums are combined with a lock-free
/// map/reduce instead of the previous shared `Arc<Mutex<..>>` accumulators.
///
/// Fixes over the previous version:
/// - `end_sample - start_sample` could underflow (debug-build panic) when
///   `batch_idx * samples_per_batch` exceeded the total sample count, e.g.
///   5 samples on 4 threads; the start index is now clamped.
/// - Under antithetic sampling, odd-sized batches silently dropped their
///   trailing sample; batches are now kept even-sized.
fn parallel_monte_carlo_batched<F, Func>(
    f: &Func,
    ranges: &[(F, F)],
    opts: &ParallelMonteCarloOptions<F>,
) -> IntegrateResult<MonteCarloResult<F>>
where
    F: IntegrateFloat + Send + Sync + SampleUniform,
    Func: Fn(ArrayView1<F>) -> F + Sync + Send,
    scirs2_core::random::StandardNormal: Distribution<F>,
{
    let n_dims = ranges.len();
    // Antithetic sampling works in pairs, so round the budget down to even.
    let n_actual_samples = if opts.use_antithetic {
        opts.n_samples / 2 * 2
    } else {
        opts.n_samples
    };
    // Volume of the integration hyper-rectangle.
    let mut volume = F::one();
    for &(a, b) in ranges {
        volume *= b - a;
    }
    let n_threads = num_threads();
    let samples_per_batch = if opts.use_antithetic {
        // Round up to even so no batch ends on a half pair.
        n_actual_samples.div_ceil(n_threads).div_ceil(2) * 2
    } else {
        n_actual_samples.div_ceil(n_threads)
    };
    let (total_sum, total_sum_sq, total_n_evals) = (0..n_threads)
        .into_par_iter()
        .map(|batch_idx| {
            // Clamp both ends so a surplus thread cannot underflow below.
            let start_sample = (batch_idx * samples_per_batch).min(n_actual_samples);
            let end_sample = ((batch_idx + 1) * samples_per_batch).min(n_actual_samples);
            let batch_samples = end_sample - start_sample;
            if batch_samples == 0 {
                // More threads than work: contribute nothing.
                return (F::zero(), F::zero(), 0usize);
            }
            // Deterministic per-batch RNG: base seed + batch index.
            let thread_seed = opts.seed.unwrap_or(42) + batch_idx as u64;
            let mut rng = StdRng::seed_from_u64(thread_seed);
            let distributions: Vec<_> = ranges
                .iter()
                .map(|&(a, b)| Uniform::new_inclusive(a, b).expect("Operation failed"))
                .collect();
            let mut batch_sum = F::zero();
            let mut batch_sum_sq = F::zero();
            let mut batch_n_evals = 0usize;
            let mut point = Array1::zeros(n_dims);
            if opts.use_antithetic {
                for _ in 0..(batch_samples / 2) {
                    for (i, dist) in distributions.iter().enumerate() {
                        point[i] = dist.sample(&mut rng);
                    }
                    let value = f(point.view());
                    batch_sum += value;
                    batch_sum_sq += value * value;
                    batch_n_evals += 1;
                    // Mirror the point: x -> a + b - x in every dimension.
                    for (i, &(a, b)) in ranges.iter().enumerate() {
                        point[i] = a + b - point[i];
                    }
                    let antithetic_value = f(point.view());
                    batch_sum += antithetic_value;
                    batch_sum_sq += antithetic_value * antithetic_value;
                    batch_n_evals += 1;
                }
            } else {
                for _ in 0..batch_samples {
                    for (i, dist) in distributions.iter().enumerate() {
                        point[i] = dist.sample(&mut rng);
                    }
                    let value = f(point.view());
                    batch_sum += value;
                    batch_sum_sq += value * value;
                    batch_n_evals += 1;
                }
            }
            (batch_sum, batch_sum_sq, batch_n_evals)
        })
        .reduce(
            || (F::zero(), F::zero(), 0usize),
            |(s1, q1, n1), (s2, q2, n2)| (s1 + s2, q1 + q2, n1 + n2),
        );
    compute_final_result(
        total_sum,
        total_sum_sq,
        total_n_evals,
        volume,
        &opts.error_method,
    )
}
#[allow(dead_code)]
fn compute_final_result<F: IntegrateFloat>(
sum: F,
sum_sq: F,
n_evals: usize,
volume: F,
error_method: &ErrorEstimationMethod,
) -> IntegrateResult<MonteCarloResult<F>> {
let mean = sum / F::from_usize(n_evals).expect("Operation failed");
let integral_value = mean * volume;
let std_error = match error_method {
ErrorEstimationMethod::StandardError => {
let variance = (sum_sq - sum * sum / F::from_usize(n_evals).expect("Operation failed"))
/ F::from_usize(n_evals - 1).expect("Operation failed");
(variance / F::from_usize(n_evals).expect("Operation failed")).sqrt() * volume
}
ErrorEstimationMethod::BatchMeans => {
let variance = (sum_sq - sum * sum / F::from_usize(n_evals).expect("Operation failed"))
/ F::from_usize(n_evals - 1).expect("Operation failed");
(variance / F::from_usize(n_evals).expect("Operation failed")).sqrt() * volume
}
};
Ok(MonteCarloResult {
value: integral_value,
std_error,
n_evals,
})
}
#[cfg(feature = "parallel")]
#[allow(dead_code)]
/// Adaptive parallel Monte Carlo: keeps drawing batches until the estimated
/// standard error drops to `target_variance` or `max_samples` evaluations
/// have been spent.
///
/// Successive batches are merged as a sample-count-weighted mean. Each batch
/// now runs with a distinct derived seed: previously the same seed was
/// reused every iteration, so every "additional" batch re-evaluated exactly
/// the same sample points and contributed no new information while still
/// shrinking the reported error.
pub fn adaptive_parallel_monte_carlo<F, Func>(
    f: Func,
    ranges: &[(F, F)],
    target_variance: F,
    max_samples: usize,
    options: Option<ParallelMonteCarloOptions<F>>,
) -> IntegrateResult<MonteCarloResult<F>>
where
    F: IntegrateFloat + Send + Sync + SampleUniform,
    Func: Fn(ArrayView1<F>) -> F + Sync + Send,
    scirs2_core::random::StandardNormal: Distribution<F>,
{
    // Large odd stride keeps per-chunk seeds (base + chunk index) from
    // colliding across iterations.
    const SEED_STRIDE: u64 = 0x9E37_79B9_7F4A_7C15;
    let opts = options.unwrap_or_default();
    // Spend at most a quarter of the budget up front, but at least 1 sample
    // so the first run cannot fail validation on tiny budgets.
    let initial_samples = opts.n_samples.min(max_samples / 4).max(1);
    let mut current_opts = opts.clone();
    current_opts.n_samples = initial_samples;
    let mut result = parallel_monte_carlo(&f, ranges, Some(current_opts.clone()))?;
    while result.std_error > target_variance && result.n_evals < max_samples {
        let remaining_samples = max_samples - result.n_evals;
        let next_batch_size = remaining_samples.min(initial_samples);
        // Antithetic sampling needs at least one full pair; stop rather than
        // request a degenerate batch.
        if next_batch_size == 0 || (current_opts.use_antithetic && next_batch_size < 2) {
            break;
        }
        current_opts.n_samples = next_batch_size;
        current_opts.seed = Some(current_opts.seed.unwrap_or(42).wrapping_add(SEED_STRIDE));
        let additional_result = parallel_monte_carlo(&f, ranges, Some(current_opts.clone()))?;
        let total_evals = result.n_evals + additional_result.n_evals;
        let n1 = F::from_usize(result.n_evals).expect("Operation failed");
        let n2 = F::from_usize(additional_result.n_evals).expect("Operation failed");
        let n_total = F::from_usize(total_evals).expect("Operation failed");
        // Sample-count-weighted mean of the two independent estimates.
        let combined_value = (result.value * n1 + additional_result.value * n2) / n_total;
        // Variance of the weighted mean of two independent estimates:
        // Var = (n1^2 SE1^2 + n2^2 SE2^2) / (n1 + n2)^2.
        // (The previous formula, (n1 SE1^2 + n2 SE2^2) / N, overstates the
        // error by roughly the number of merged batches.)
        let combined_variance = (result.std_error * result.std_error * n1 * n1
            + additional_result.std_error * additional_result.std_error * n2 * n2)
            / (n_total * n_total);
        result = MonteCarloResult {
            value: combined_value,
            std_error: combined_variance.sqrt(),
            n_evals: total_evals,
        };
    }
    Ok(result)
}
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
/// Sequential fallback used when the `parallel` feature is disabled:
/// forwards the shared option fields to the plain Monte Carlo integrator.
pub fn parallel_monte_carlo<F, Func>(
    f: Func,
    ranges: &[(F, F)],
    options: Option<ParallelMonteCarloOptions<F>>,
) -> IntegrateResult<MonteCarloResult<F>>
where
    F: IntegrateFloat + Send + Sync + SampleUniform,
    Func: Fn(ArrayView1<F>) -> F + Sync + Send,
    scirs2_core::random::StandardNormal: Distribution<F>,
{
    // Translate the parallel options into their sequential counterpart,
    // dropping the parallel-only knobs (threads, batching, chunking).
    let regular_opts = match options {
        Some(parallel_opts) => Some(crate::monte_carlo::MonteCarloOptions {
            n_samples: parallel_opts.n_samples,
            seed: parallel_opts.seed,
            error_method: parallel_opts.error_method,
            use_antithetic: parallel_opts.use_antithetic,
            phantom: PhantomData,
        }),
        None => None,
    };
    crate::monte_carlo::monte_carlo(f, ranges, regular_opts)
}
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
/// Sequential fallback for adaptive integration when the `parallel` feature
/// is disabled.
///
/// There is no adaptive loop here: the full `max_samples` budget is spent in
/// a single sequential run as the best effort toward `target_variance`
/// (which is therefore not consulted).
pub fn adaptive_parallel_monte_carlo<F, Func>(
    f: Func,
    ranges: &[(F, F)],
    target_variance: F,
    max_samples: usize,
    options: Option<ParallelMonteCarloOptions<F>>,
) -> IntegrateResult<MonteCarloResult<F>>
where
    F: IntegrateFloat + Send + Sync + SampleUniform,
    Func: Fn(ArrayView1<F>) -> F + Sync + Send,
    scirs2_core::random::StandardNormal: Distribution<F>,
{
    // Not used in the sequential fallback (no adaptive loop).
    let _ = target_variance;
    // Previously a `None` options value was forwarded as `None`, so the
    // sequential integrator used its own default sample count and the
    // `max_samples` budget was silently ignored. Always build explicit
    // options so the budget is honored.
    let opts = options.unwrap_or_default();
    let regular_opts = crate::monte_carlo::MonteCarloOptions {
        n_samples: max_samples,
        seed: opts.seed,
        error_method: opts.error_method,
        use_antithetic: opts.use_antithetic,
        phantom: PhantomData,
    };
    crate::monte_carlo::monte_carlo(f, ranges, Some(regular_opts))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monte_carlo::{ErrorEstimationMethod, MonteCarloOptions};
    use approx::assert_relative_eq;
    use scirs2_core::ndarray::ArrayView1;
    use std::marker::PhantomData;

    /// ∫₀¹ x dx = 1/2 with an explicit thread count, batch size, and seed.
    #[test]
    #[cfg(feature = "parallel")]
    fn test_parallel_monte_carlo_simple() {
        let f = |x: ArrayView1<f64>| x[0];
        let ranges = [(0.0, 1.0)];
        let options = ParallelMonteCarloOptions {
            n_samples: 100000,
            n_threads: Some(2),
            batch_size: 5000,
            seed: Some(42),
            ..Default::default()
        };
        let result = parallel_monte_carlo(f, &ranges, Some(options)).expect("Operation failed");
        assert_relative_eq!(result.value, 0.5, epsilon = 0.01);
        assert!(result.std_error > 0.0);
        // Every requested sample should have been evaluated.
        assert_eq!(result.n_evals, 100000);
    }

    /// 2-D case: ∫₀¹∫₀¹ x·y dx dy = 1/4, exercising the chunked code path.
    #[test]
    #[cfg(feature = "parallel")]
    fn test_parallel_monte_carlo_multidimensional() {
        let f = |x: ArrayView1<f64>| x[0] * x[1];
        let ranges = [(0.0, 1.0), (0.0, 1.0)];
        let options = ParallelMonteCarloOptions {
            n_samples: 50000,
            n_threads: Some(4),
            use_chunking: true,
            seed: Some(123),
            ..Default::default()
        };
        let result = parallel_monte_carlo(f, &ranges, Some(options)).expect("Operation failed");
        assert_relative_eq!(result.value, 0.25, epsilon = 0.02);
        assert!(result.std_error > 0.0);
    }

    /// ∫₀¹ x² dx = 1/3 with antithetic variates; evaluations must come in pairs.
    #[test]
    #[cfg(feature = "parallel")]
    fn test_parallel_monte_carlo_antithetic() {
        let f = |x: ArrayView1<f64>| x[0] * x[0];
        let ranges = [(0.0, 1.0)];
        let options = ParallelMonteCarloOptions {
            n_samples: 10000,
            use_antithetic: true,
            n_threads: Some(2),
            seed: Some(456),
            ..Default::default()
        };
        let result = parallel_monte_carlo(f, &ranges, Some(options)).expect("Operation failed");
        assert_relative_eq!(result.value, 1.0 / 3.0, epsilon = 0.02);
        // Antithetic sampling evaluates mirrored pairs, so the count is even.
        assert_eq!(result.n_evals % 2, 0);
    }

    /// `From<MonteCarloOptions>` carries over shared fields and fills the
    /// parallel-only knobs with defaults.
    #[test]
    fn test_parallel_options_conversion() {
        let regular_opts = MonteCarloOptions {
            n_samples: 5000,
            seed: Some(789),
            error_method: ErrorEstimationMethod::BatchMeans,
            use_antithetic: true,
            phantom: PhantomData,
        };
        let parallel_opts: ParallelMonteCarloOptions<f64> = regular_opts.into();
        assert_eq!(parallel_opts.n_samples, 5000);
        assert_eq!(parallel_opts.seed, Some(789));
        assert_eq!(
            parallel_opts.error_method,
            ErrorEstimationMethod::BatchMeans
        );
        assert!(parallel_opts.use_antithetic);
        // Parallel-only field must take its documented default.
        assert_eq!(parallel_opts.batch_size, 1000);
    }

    /// Adaptive run on ∫₀¹ sin(πx) dx = 2/π: either the error target is met
    /// or the full sample budget is consumed.
    #[test]
    #[cfg(feature = "parallel")]
    fn test_adaptive_parallel_monte_carlo() {
        let f = |x: ArrayView1<f64>| (x[0] * std::f64::consts::PI).sin();
        let ranges = [(0.0, 1.0)];
        let target_variance = 0.001;
        let max_samples = 100000;
        let options = ParallelMonteCarloOptions {
            n_samples: 5000,
            n_threads: Some(2),
            seed: Some(999),
            ..Default::default()
        };
        let result =
            adaptive_parallel_monte_carlo(f, &ranges, target_variance, max_samples, Some(options))
                .expect("Operation failed");
        assert!(result.std_error <= target_variance || result.n_evals >= max_samples);
        assert_relative_eq!(result.value, 2.0 / std::f64::consts::PI, epsilon = 0.05);
    }
}