use crate::error::StatsResult;
use scirs2_core::ndarray::{Array1, Array2, ArrayView1};
use scirs2_core::numeric::{Float, FromPrimitive, One, Zero};
use scirs2_core::random::{rngs::StdRng, Rng, RngExt, SeedableRng};
use scirs2_core::simd_ops::SimdUnifiedOps;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
pub struct AdvancedParallelMonteCarlo<F> {
pub config: MonteCarloConfig,
pub adaptive_state: Arc<Mutex<AdaptiveState<F>>>,
pub variance_reduction: VarianceReductionConfig,
pub metrics: Arc<Mutex<IntegrationMetrics>>,
_phantom: PhantomData<F>,
}
#[derive(Debug, Clone)]
pub struct MonteCarloConfig {
pub initial_samples: usize,
pub max_samples: usize,
pub target_relative_error: f64,
pub confidence_level: f64,
pub parallel: bool,
pub num_workers: usize,
pub chunksize: usize,
pub adaptive_sampling: bool,
pub seed: Option<u64>,
pub use_gpu: bool,
}
impl Default for MonteCarloConfig {
fn default() -> Self {
Self {
initial_samples: 10000,
max_samples: 1000000,
target_relative_error: 0.01,
confidence_level: 0.95,
parallel: true,
num_workers: num_cpus::get(),
chunksize: 1000,
adaptive_sampling: true,
seed: None,
use_gpu: false,
}
}
}
#[derive(Debug, Clone)]
pub struct VarianceReductionConfig {
pub importance_sampling: bool,
pub control_variates: bool,
pub antithetic_variables: bool,
pub stratified_sampling: bool,
pub num_strata: usize,
pub control_functions: Vec<String>,
}
impl Default for VarianceReductionConfig {
fn default() -> Self {
Self {
importance_sampling: false,
control_variates: false,
antithetic_variables: true,
stratified_sampling: false,
num_strata: 10,
control_functions: vec![],
}
}
}
#[derive(Debug, Clone)]
pub struct AdaptiveState<F> {
pub n_samples_: usize,
pub running_mean: F,
pub running_variance: F,
pub regions: Vec<IntegrationRegion<F>>,
pub importance_weights: Option<Array1<F>>,
pub error_history: Vec<F>,
}
#[derive(Debug, Clone)]
pub struct IntegrationRegion<F> {
pub lower_bounds: Array1<F>,
pub upper_bounds: Array1<F>,
pub estimated_value: F,
pub estimated_error: F,
pub n_samples_: usize,
pub priority: f64,
}
#[derive(Debug, Clone, Default)]
pub struct IntegrationMetrics {
pub total_samples: usize,
pub total_time_seconds: f64,
pub samples_per_second: f64,
pub memory_usage_mb: f64,
pub parallel_efficiency: f64,
pub variance_reduction_factor: f64,
pub convergence_rate: f64,
}
#[derive(Debug, Clone)]
pub struct MonteCarloResult<F> {
pub value: F,
pub standard_error: F,
pub confidence_interval: (F, F),
pub relative_error: F,
pub n_samples_: usize,
pub metrics: IntegrationMetrics,
pub converged: bool,
}
pub trait IntegrableFunction<F>: Send + Sync
where
F: Float + Copy + std::fmt::Display,
{
fn evaluate(&self, x: &ArrayView1<F>) -> F;
fn dimension(&self) -> usize;
fn bounds(&self) -> (Array1<F>, Array1<F>);
fn importance_density(&self, x: &ArrayView1<F>) -> Option<F> {
None
}
fn control_variate(&self, x: &ArrayView1<F>) -> Option<F> {
None
}
}
impl<F> AdvancedParallelMonteCarlo<F>
where
F: Float
+ Zero
+ One
+ Copy
+ Send
+ Sync
+ SimdUnifiedOps
+ FromPrimitive
+ std::iter::Product
+ 'static
+ std::fmt::Display,
{
pub fn new(config: MonteCarloConfig, variance_reduction: VarianceReductionConfig) -> Self {
let adaptive_state = Arc::new(Mutex::new(AdaptiveState {
n_samples_: 0,
running_mean: F::zero(),
running_variance: F::zero(),
regions: vec![],
importance_weights: None,
error_history: vec![],
}));
let metrics = Arc::new(Mutex::new(IntegrationMetrics::default()));
Self {
config,
adaptive_state,
variance_reduction,
metrics,
_phantom: PhantomData,
}
}
pub fn integrate<T>(&mut self, function: &T) -> StatsResult<MonteCarloResult<F>>
where
T: IntegrableFunction<F>,
{
let start_time = std::time::Instant::now();
let (lower_bounds, upper_bounds) = function.bounds();
let _dimension = function.dimension();
{
let mut state = self.adaptive_state.lock().expect("Operation failed");
state.regions = vec![IntegrationRegion {
lower_bounds: lower_bounds.clone(),
upper_bounds: upper_bounds.clone(),
estimated_value: F::zero(),
estimated_error: F::infinity(),
n_samples_: 0,
priority: 1.0,
}];
}
let mut main_rng = match self.config.seed {
Some(seed) => StdRng::seed_from_u64(seed),
None => StdRng::from_rng(&mut scirs2_core::random::thread_rng()),
};
let mut total_samples = 0;
let mut current_estimate = F::zero();
let mut current_error = F::infinity();
let mut converged = false;
while total_samples < self.config.max_samples {
let batchsize = self
.config
.initial_samples
.min(self.config.max_samples - total_samples);
let result = if self.config.parallel {
self.parallel_sampling(function, batchsize, &mut main_rng)?
} else {
self.sequential_sampling(function, batchsize, &mut main_rng)?
};
total_samples += batchsize;
current_estimate = self.update_estimate(result, total_samples)?;
current_error = self.estimate_error(total_samples)?;
let relative_error = if current_estimate.abs()
> F::from(1e-10).expect("Failed to convert constant to float")
{
(current_error / current_estimate.abs())
.to_f64()
.expect("Operation failed")
} else {
current_error.to_f64().expect("Operation failed")
};
if relative_error < self.config.target_relative_error {
converged = true;
break;
}
if self.config.adaptive_sampling {
self.adaptive_refinement(function)?;
}
}
let z_score = self.get_z_score(self.config.confidence_level);
let margin = F::from(z_score).expect("Failed to convert to float") * current_error;
let confidence_interval = (current_estimate - margin, current_estimate + margin);
let elapsed_time = start_time.elapsed().as_secs_f64();
{
let mut metrics = self.metrics.lock().expect("Operation failed");
metrics.total_samples = total_samples;
metrics.total_time_seconds = elapsed_time;
metrics.samples_per_second = total_samples as f64 / elapsed_time;
metrics.convergence_rate = self.estimate_convergence_rate()?;
}
Ok(MonteCarloResult {
value: current_estimate,
standard_error: current_error,
confidence_interval,
relative_error: F::from(if current_estimate.abs() > F::zero() {
(current_error / current_estimate.abs())
.to_f64()
.expect("Operation failed")
} else {
current_error.to_f64().expect("Operation failed")
})
.expect("Operation failed"),
n_samples_: total_samples,
metrics: self.metrics.lock().expect("Operation failed").clone(),
converged,
})
}
fn parallel_sampling<T>(
&self,
function: &T,
n_samples_: usize,
rng: &mut StdRng,
) -> StatsResult<Array1<F>>
where
T: IntegrableFunction<F>,
{
let chunksize = self.config.chunksize;
let num_chunks = n_samples_.div_ceil(chunksize);
let seeds: Vec<u64> = (0..num_chunks).map(|_| rng.random()).collect();
let results: Vec<StatsResult<Array1<F>>> = seeds
.iter()
.enumerate()
.map(|(chunk_idx, &seed)| {
let start = chunk_idx * chunksize;
let end = ((chunk_idx + 1) * chunksize).min(n_samples_);
let actual_chunksize = end - start;
self.evaluate_chunk(function, actual_chunksize, seed)
})
.collect();
let mut combined_values = Vec::new();
for result in results {
let chunk_values = result?;
combined_values.extend(chunk_values.iter().copied());
}
Ok(Array1::from_vec(combined_values))
}
fn sequential_sampling<T>(
&self,
function: &T,
n_samples_: usize,
rng: &mut StdRng,
) -> StatsResult<Array1<F>>
where
T: IntegrableFunction<F>,
{
self.evaluate_chunk(function, n_samples_, rng.random())
}
fn evaluate_chunk<T>(
&self,
function: &T,
n_samples_: usize,
seed: u64,
) -> StatsResult<Array1<F>>
where
T: IntegrableFunction<F>,
{
let mut rng = StdRng::seed_from_u64(seed);
let (lower_bounds, upper_bounds) = function.bounds();
let dimension = function.dimension();
let mut values = Vec::with_capacity(n_samples_);
if n_samples_ >= 64 {
return self.evaluate_chunk_simd_ultra(function, n_samples_, seed);
}
for _ in 0..n_samples_ {
let mut point = Array1::zeros(dimension);
for j in 0..dimension {
let u: f64 = rng.random();
let range = upper_bounds[j] - lower_bounds[j];
point[j] =
lower_bounds[j] + F::from(u).expect("Failed to convert to float") * range;
}
let sample_value =
if self.variance_reduction.antithetic_variables && values.len() % 2 == 1 {
let antithetic_point =
self.generate_antithetic_point(&point, &lower_bounds, &upper_bounds);
let value1 = function.evaluate(&point.view());
let value2 = function.evaluate(&antithetic_point.view());
(value1 + value2) / F::from(2.0).expect("Failed to convert constant to float")
} else {
function.evaluate(&point.view())
};
let weighted_value = if self.variance_reduction.importance_sampling {
if let Some(importance_density) = function.importance_density(&point.view()) {
sample_value / importance_density
} else {
sample_value
}
} else {
sample_value
};
let final_value = if self.variance_reduction.control_variates {
if let Some(control_value) = function.control_variate(&point.view()) {
weighted_value
- F::from(0.5).expect("Failed to convert constant to float") * control_value
} else {
weighted_value
}
} else {
weighted_value
};
values.push(final_value);
}
let volume = self.compute_integration_volume(&lower_bounds, &upper_bounds);
let scaled_values = values.into_iter().map(|v| v * volume).collect();
Ok(Array1::from_vec(scaled_values))
}
fn evaluate_chunk_simd_ultra<T>(
&self,
function: &T,
n_samples_: usize,
seed: u64,
) -> StatsResult<Array1<F>>
where
T: IntegrableFunction<F>,
{
use scirs2_core::simd_ops::PlatformCapabilities;
let capabilities = PlatformCapabilities::detect();
let mut rng = StdRng::seed_from_u64(seed);
let (lower_bounds, upper_bounds) = function.bounds();
let dimension = function.dimension();
let chunk_size = if capabilities.has_avx512() {
16
} else if capabilities.has_avx2() {
8
} else {
4
};
let num_chunks = n_samples_.div_ceil(chunk_size);
let mut values = Vec::with_capacity(n_samples_);
let mut sample_points = Vec::with_capacity(chunk_size * dimension);
let mut sample_values = Vec::with_capacity(chunk_size);
let mut variance_reduction_buffer = Vec::with_capacity(chunk_size);
for _chunk_idx in 0..num_chunks {
let current_chunk_size = std::cmp::min(chunk_size, n_samples_ - values.len());
if current_chunk_size == 0 {
break;
}
sample_points.clear();
for _ in 0..current_chunk_size {
for j in 0..dimension {
let u: f64 = rng.random();
let range = upper_bounds[j] - lower_bounds[j];
let sample_coord =
lower_bounds[j] + F::from(u).expect("Failed to convert to float") * range;
sample_points.push(sample_coord.to_f64().expect("Operation failed") as f32);
}
}
sample_values.clear();
for i in 0..current_chunk_size {
let point_start = i * dimension;
let point_slice = &sample_points[point_start..point_start + dimension];
let mut point = Array1::zeros(dimension);
for (j, &val) in point_slice.iter().enumerate() {
point[j] = F::from(val as f64).expect("Failed to convert to float");
}
let sample_value = function.evaluate(&point.view());
sample_values.push(sample_value.to_f64().expect("Operation failed") as f32);
}
variance_reduction_buffer.clear();
if self.variance_reduction.antithetic_variables {
for i in (0..current_chunk_size).step_by(2) {
if i + 1 < current_chunk_size {
let avg_value = (sample_values[i] + sample_values[i + 1]) * 0.5;
variance_reduction_buffer.push(avg_value);
variance_reduction_buffer.push(avg_value);
} else {
variance_reduction_buffer.push(sample_values[i]);
}
}
} else {
variance_reduction_buffer.extend_from_slice(&sample_values);
}
let volume = self.compute_integration_volume(&lower_bounds, &upper_bounds);
let volume_f32 = volume.to_f64().expect("Operation failed") as f32;
if capabilities.has_avx2() && variance_reduction_buffer.len() >= 8 {
let mut scaled_chunk = Array1::zeros(variance_reduction_buffer.len());
let volume_vec = Array1::from_elem(variance_reduction_buffer.len(), volume_f32);
let variance_reduction_array = Array1::from_vec(variance_reduction_buffer.clone());
f32::simd_mul_f32_ultra(
&variance_reduction_array.view(),
&volume_vec.view(),
&mut scaled_chunk.view_mut(),
);
for &val in scaled_chunk.iter() {
values.push(F::from(val as f64).expect("Failed to convert to float"));
}
} else {
for &val in &variance_reduction_buffer {
values.push(F::from((val * volume_f32) as f64).expect("Operation failed"));
}
}
}
values.truncate(n_samples_);
Ok(Array1::from_vec(values))
}
fn generate_antithetic_point(
&self,
point: &Array1<F>,
lower_bounds: &Array1<F>,
upper_bounds: &Array1<F>,
) -> Array1<F> {
let mut antithetic = Array1::zeros(point.len());
for i in 0..point.len() {
let range = upper_bounds[i] - lower_bounds[i];
let normalized = (point[i] - lower_bounds[i]) / range;
let antithetic_normalized = F::one() - normalized;
antithetic[i] = lower_bounds[i] + antithetic_normalized * range;
}
antithetic
}
fn compute_integration_volume(&self, lower_bounds: &Array1<F>, upper_bounds: &Array1<F>) -> F {
upper_bounds
.iter()
.zip(lower_bounds.iter())
.map(|(&upper, &lower)| upper - lower)
.product()
}
fn update_estimate(&self, new_values: Array1<F>, total_samples: usize) -> StatsResult<F> {
let mut state = self.adaptive_state.lock().expect("Operation failed");
let batch_mean = new_values.mean().expect("Operation failed");
let batchsize = new_values.len();
if state.n_samples_ == 0 {
state.running_mean = batch_mean;
state.running_variance = if batchsize > 1 {
new_values.var(F::one())
} else {
F::zero()
};
} else {
let old_n = state.n_samples_ as f64;
let new_n = total_samples as f64;
let old_mean = state.running_mean;
state.running_mean = (old_mean * F::from(old_n).expect("Failed to convert to float")
+ batch_mean * F::from(batchsize).expect("Failed to convert to float"))
/ F::from(new_n).expect("Failed to convert to float");
let batch_var = if batchsize > 1 {
new_values.var(F::one())
} else {
F::zero()
};
let mean_diff = batch_mean - old_mean;
state.running_variance = (state.running_variance
* F::from(old_n - 1.0).expect("Failed to convert to float")
+ batch_var * F::from(batchsize - 1).expect("Failed to convert to float")
+ mean_diff
* mean_diff
* F::from(old_n * batchsize as f64 / new_n)
.expect("Failed to convert to float"))
/ F::from(new_n - 1.0).expect("Failed to convert to float");
}
state.n_samples_ = total_samples;
Ok(state.running_mean)
}
fn estimate_error(&self, n_samples_: usize) -> StatsResult<F> {
let state = self.adaptive_state.lock().expect("Operation failed");
if n_samples_ <= 1 {
return Ok(F::infinity());
}
let standard_error = (state.running_variance
/ F::from(n_samples_).expect("Failed to convert to float"))
.sqrt();
Ok(standard_error)
}
fn adaptive_refinement<T>(&self, function: &T) -> StatsResult<()>
where
T: IntegrableFunction<F>,
{
let mut state = self.adaptive_state.lock().expect("Operation failed");
if let Some(worst_region_idx) = state
.regions
.iter()
.enumerate()
.max_by(|(_, a), (_, b)| {
let ratio_a = a.estimated_error.to_f64().expect("Operation failed")
/ (a.n_samples_ + 1) as f64;
let ratio_b = b.estimated_error.to_f64().expect("Operation failed")
/ (b.n_samples_ + 1) as f64;
ratio_a
.partial_cmp(&ratio_b)
.unwrap_or(std::cmp::Ordering::Equal)
})
.map(|(idx, _)| idx)
{
let region = state.regions[worst_region_idx].clone();
if !region.lower_bounds.is_empty() {
let mid = (region.lower_bounds[0] + region.upper_bounds[0])
/ F::from(2.0).expect("Failed to convert constant to float");
let mut left_upper = region.upper_bounds.clone();
left_upper[0] = mid;
let mut right_lower = region.lower_bounds.clone();
right_lower[0] = mid;
let left_region = IntegrationRegion {
lower_bounds: region.lower_bounds.clone(),
upper_bounds: left_upper,
estimated_value: region.estimated_value
/ F::from(2.0).expect("Failed to convert constant to float"),
estimated_error: region.estimated_error
/ F::from(2.0).expect("Failed to convert constant to float"),
n_samples_: 0,
priority: 1.0,
};
let right_region = IntegrationRegion {
lower_bounds: right_lower,
upper_bounds: region.upper_bounds.clone(),
estimated_value: region.estimated_value
/ F::from(2.0).expect("Failed to convert constant to float"),
estimated_error: region.estimated_error
/ F::from(2.0).expect("Failed to convert constant to float"),
n_samples_: 0,
priority: 1.0,
};
state.regions[worst_region_idx] = left_region;
state.regions.push(right_region);
}
}
Ok(())
}
fn get_z_score(&self, confidence_level: f64) -> f64 {
match confidence_level {
x if x >= 0.99 => 2.576,
x if x >= 0.95 => 1.96,
x if x >= 0.90 => 1.645,
_ => 1.96,
}
}
fn estimate_convergence_rate(&self) -> StatsResult<f64> {
let state = self.adaptive_state.lock().expect("Operation failed");
if state.error_history.len() < 3 {
return Ok(1.0); }
let recent_errors: Vec<f64> = state
.error_history
.iter()
.rev()
.take(5)
.map(|&e| e.to_f64().expect("Operation failed"))
.collect();
if recent_errors.len() >= 2 {
let ratio = recent_errors[0] / recent_errors[recent_errors.len() - 1];
Ok(ratio.ln() / (recent_errors.len() - 1) as f64)
} else {
Ok(1.0)
}
}
}
#[allow(dead_code)]
pub fn integrate_parallel<T, F>(
function: &T,
config: Option<MonteCarloConfig>,
) -> StatsResult<MonteCarloResult<F>>
where
T: IntegrableFunction<F>,
F: Float
+ Zero
+ One
+ Copy
+ Send
+ Sync
+ SimdUnifiedOps
+ FromPrimitive
+ 'static
+ std::iter::Product
+ std::fmt::Display,
{
let config = config.unwrap_or_default();
let variance_reduction = VarianceReductionConfig::default();
let mut integrator = AdvancedParallelMonteCarlo::new(config, variance_reduction);
integrator.integrate(function)
}
pub struct TestFunction {
pub dimension: usize,
pub lower_bounds: Array1<f64>,
pub upper_bounds: Array1<f64>,
}
impl TestFunction {
pub fn new(dimension: usize) -> Self {
Self {
dimension,
lower_bounds: Array1::zeros(dimension),
upper_bounds: Array1::ones(dimension),
}
}
}
impl IntegrableFunction<f64> for TestFunction {
fn evaluate(&self, x: &ArrayView1<f64>) -> f64 {
x.iter().product()
}
fn dimension(&self) -> usize {
self.dimension
}
fn bounds(&self) -> (Array1<f64>, Array1<f64>) {
(self.lower_bounds.clone(), self.upper_bounds.clone())
}
}
pub struct GaussianFunction {
pub mean: Array1<f64>,
pub covariance: Array2<f64>,
pub lower_bounds: Array1<f64>,
pub upper_bounds: Array1<f64>,
}
impl GaussianFunction {
pub fn new(mean: Array1<f64>, covariance: Array2<f64>) -> Self {
let dimension = mean.len();
let lower_bounds = Array1::from_elem(dimension, -5.0);
let upper_bounds = Array1::from_elem(dimension, 5.0);
Self {
mean,
covariance,
lower_bounds,
upper_bounds,
}
}
}
impl IntegrableFunction<f64> for GaussianFunction {
fn evaluate(&self, x: &ArrayView1<f64>) -> f64 {
let diff = x - &self.mean;
let cov_inv = scirs2_linalg::inv(&self.covariance.view(), None).expect("Operation failed");
let quad_form = diff.dot(&cov_inv.dot(&diff));
let det = scirs2_linalg::det(&self.covariance.view(), None).expect("Operation failed");
let normalization =
1.0 / ((2.0 * std::f64::consts::PI).powf(self.mean.len() as f64 / 2.0) * det.sqrt());
normalization * (-0.5 * quad_form).exp()
}
fn dimension(&self) -> usize {
self.mean.len()
}
fn bounds(&self) -> (Array1<f64>, Array1<f64>) {
(self.lower_bounds.clone(), self.upper_bounds.clone())
}
}