use crate::error::OptimizeError;
use crate::unconstrained::OptimizeResult;
use scirs2_core::ndarray::{Array1, Array2};
use scirs2_core::random::{Rng, RngExt};
use std::cmp::min;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::task::JoinHandle;
/// Configuration governing how asynchronous objective evaluations are
/// scheduled, timed out, and aggregated into generations.
#[derive(Debug, Clone)]
pub struct AsyncOptimizationConfig {
/// Maximum number of concurrent evaluation worker tasks.
pub max_workers: usize,
/// Per-evaluation timeout; `None` means wait indefinitely for each call.
pub evaluation_timeout: Option<Duration>,
/// How long the main loop waits for the next result before falling back
/// to `slow_evaluation_strategy`.
pub completion_timeout: Option<Duration>,
/// Policy applied when evaluations outlive `completion_timeout`.
pub slow_evaluation_strategy: SlowEvaluationStrategy,
/// Minimum completed evaluations before a generation may proceed with
/// partial results.
pub min_evaluations: usize,
}
/// Policy for generations whose evaluations run past the completion timeout.
#[derive(Debug, Clone)]
pub enum SlowEvaluationStrategy {
/// Keep waiting until every evaluation in the generation finishes.
WaitAll,
/// Abandon evaluations slower than `timeout`.
CancelSlow { timeout: Duration },
/// Proceed once at least `min_fraction` of the population has finished.
UsePartial { min_fraction: f64 },
}
/// A single candidate point dispatched to a worker for objective evaluation.
#[derive(Debug, Clone)]
pub struct EvaluationRequest {
/// Monotonically increasing request identifier.
pub id: usize,
/// Candidate solution to evaluate.
pub point: Array1<f64>,
/// Generation index the point belongs to.
pub generation: usize,
/// When the request was handed to the worker pool.
pub submitted_at: Instant,
}
/// Outcome of one completed objective evaluation.
#[derive(Debug, Clone)]
pub struct EvaluationResult {
/// Identifier of the originating [`EvaluationRequest`].
pub id: usize,
/// The evaluated candidate solution.
pub point: Array1<f64>,
/// Objective value at `point`.
pub value: f64,
/// Generation index the point belongs to.
pub generation: usize,
/// Wall-clock time the objective call took.
pub evaluation_time: Duration,
/// When the evaluation finished.
pub completed_at: Instant,
}
/// Aggregate statistics gathered over one `optimize` run.
#[derive(Debug, Clone)]
pub struct AsyncOptimizationStats {
/// Requests picked up by workers.
pub total_submitted: usize,
/// Evaluations that produced a value.
pub total_completed: usize,
/// Evaluations abandoned because they exceeded the evaluation timeout.
pub total_cancelled: usize,
/// Running arithmetic mean of completed evaluation times.
pub avg_evaluation_time: Duration,
/// Fastest completed evaluation (initialized to a huge sentinel so the
/// first completion always becomes the minimum).
pub min_evaluation_time: Duration,
/// Slowest completed evaluation.
pub max_evaluation_time: Duration,
/// Workers currently busy evaluating.
pub active_workers: usize,
/// Total wall-clock duration of the optimization run.
pub total_time: Duration,
}
/// Differential evolution optimizer whose objective evaluations run
/// concurrently on asynchronous worker tasks.
pub struct AsyncDifferentialEvolution {
/// Scheduling / timeout configuration for the evaluation workers.
config: AsyncOptimizationConfig,
/// Number of individuals per generation.
population_size: usize,
/// Number of decision variables per individual.
dimensions: usize,
/// Optional (lower, upper) box constraints, one entry per dimension.
bounds: Option<(Array1<f64>, Array1<f64>)>,
/// DE differential weight F applied to the donor difference vector.
mutation_factor: f64,
/// Per-component probability of taking the mutant gene during crossover.
crossover_probability: f64,
/// Maximum number of generations before giving up.
max_generations: usize,
/// Convergence threshold on the population fitness spread.
tolerance: f64,
}
impl Default for AsyncOptimizationConfig {
    /// Defaults: one worker per available CPU core (falling back to 4 if
    /// the parallelism query fails), a 300 s per-evaluation timeout, a
    /// 60 s completion timeout, and a partial-results strategy that
    /// proceeds at 80% generation completion with at least 10 finished
    /// evaluations.
    fn default() -> Self {
        let workers = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);
        Self {
            max_workers: workers,
            evaluation_timeout: Some(Duration::from_secs(300)),
            completion_timeout: Some(Duration::from_secs(60)),
            slow_evaluation_strategy: SlowEvaluationStrategy::UsePartial { min_fraction: 0.8 },
            min_evaluations: 10,
        }
    }
}
impl AsyncDifferentialEvolution {
/// Creates a new asynchronous DE optimizer for an objective with
/// `dimensions` decision variables.
///
/// `population_size` defaults to `max(4, 10 * dimensions)` and `config`
/// falls back to [`AsyncOptimizationConfig::default`]. A mutation factor
/// of 0.8, crossover probability of 0.7, 1000 generations and a 1e-6
/// tolerance are used until overridden via `with_parameters`.
pub fn new(
    dimensions: usize,
    population_size: Option<usize>,
    config: Option<AsyncOptimizationConfig>,
) -> Self {
    Self {
        config: config.unwrap_or_default(),
        population_size: population_size.unwrap_or_else(|| (dimensions * 10).max(4)),
        dimensions,
        bounds: None,
        mutation_factor: 0.8,
        crossover_probability: 0.7,
        max_generations: 1000,
        tolerance: 1e-6,
    }
}
pub fn with_bounds(
mut self,
lower: Array1<f64>,
upper: Array1<f64>,
) -> Result<Self, OptimizeError> {
if lower.len() != self.dimensions || upper.len() != self.dimensions {
return Err(OptimizeError::ValueError(
"Bounds dimensions must match problem dimensions".to_string(),
));
}
for (&l, &u) in lower.iter().zip(upper.iter()) {
if l >= u {
return Err(OptimizeError::ValueError(
"Lower bounds must be less than upper bounds".to_string(),
));
}
}
self.bounds = Some((lower, upper));
Ok(self)
}
pub fn with_parameters(
mut self,
mutation_factor: f64,
crossover_probability: f64,
max_generations: usize,
tolerance: f64,
) -> Self {
self.mutation_factor = mutation_factor;
self.crossover_probability = crossover_probability;
self.max_generations = max_generations;
self.tolerance = tolerance;
self
}
/// Runs asynchronous differential evolution against `objective_fn`,
/// returning the best solution found together with accumulated
/// evaluation statistics.
///
/// Each generation's population is submitted to a pool of worker tasks
/// over an unbounded channel; results are consumed as they arrive. A
/// generation advances once every individual is evaluated or, under the
/// configured strategy, once enough partial results are in.
///
/// # Errors
/// Returns an error if an evaluation request cannot be sent to the
/// workers (i.e. the request channel is closed).
pub async fn optimize<F, Fut>(
&self,
objective_fn: F,
) -> Result<(OptimizeResult<f64>, AsyncOptimizationStats), OptimizeError>
where
F: Fn(Array1<f64>) -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = f64> + Send + 'static,
{
let start_time = Instant::now();
let mut population = self.initialize_population();
// One fitness slot per population row; rows without a completed result
// keep the f64::INFINITY sentinel.
let mut fitness_values = vec![f64::INFINITY; self.population_size];
let (request_tx, request_rx) = mpsc::unbounded_channel::<EvaluationRequest>();
let (result_tx, mut result_rx) = mpsc::unbounded_channel::<EvaluationResult>();
let stats = Arc::new(RwLock::new(AsyncOptimizationStats {
total_submitted: 0,
total_completed: 0,
total_cancelled: 0,
avg_evaluation_time: Duration::from_millis(0),
// Huge sentinel so the first completed evaluation becomes the minimum.
min_evaluation_time: Duration::from_secs(u64::MAX),
max_evaluation_time: Duration::from_millis(0),
active_workers: 0,
total_time: Duration::from_millis(0),
}));
let worker_handles = self
.spawn_workers(objective_fn, request_rx, result_tx.clone(), stats.clone())
.await;
// Maps request id -> population row index so results, which may arrive
// in any order, are written back to the correct individual.
let mut pending_evaluations = std::collections::HashMap::new();
let mut request_id = 0;
// Submit the initial generation for evaluation.
for (i, individual) in population.outer_iter().enumerate() {
let request = EvaluationRequest {
id: request_id,
point: individual.to_owned(),
generation: 0,
submitted_at: Instant::now(),
};
pending_evaluations.insert(request_id, i);
request_tx.send(request)?;
request_id += 1;
}
let mut best_individual = Array1::zeros(self.dimensions);
let mut best_fitness = f64::INFINITY;
let mut generation = 0;
let mut completed_in_generation = 0;
while generation < self.max_generations {
let timeout_duration = self
.config
.completion_timeout
.unwrap_or(Duration::from_secs(60));
match tokio::time::timeout(timeout_duration, result_rx.recv()).await {
Ok(Some(result)) => {
// Only accept results for the current generation.
// NOTE(review): stragglers from older generations are ignored
// but never removed from `pending_evaluations`, so stale ids
// accumulate over a long run — confirm this is acceptable.
if result.generation == generation
&& pending_evaluations.contains_key(&result.id)
{
if let Some(individual_index) = pending_evaluations.remove(&result.id) {
fitness_values[individual_index] = result.value;
completed_in_generation += 1;
if result.value < best_fitness {
best_fitness = result.value;
best_individual = result.point.clone();
}
self.update_stats(&stats, &result).await;
}
}
// Advance when the generation is fully evaluated, or when the
// partial-results policy says enough results are in.
if completed_in_generation >= self.population_size
|| self
.should_proceed_with_partial_results(&stats, completed_in_generation)
.await
{
if completed_in_generation < self.population_size {
self.handle_incomplete_generation(
&mut fitness_values,
completed_in_generation,
);
}
if self.check_convergence(&fitness_values) {
break;
}
generation += 1;
completed_in_generation = 0;
let new_population =
self.generate_next_population(&population, &fitness_values);
population = new_population;
// Submit the next generation for evaluation.
for (i, individual) in population.outer_iter().enumerate() {
let request = EvaluationRequest {
id: request_id,
point: individual.to_owned(),
generation,
submitted_at: Instant::now(),
};
pending_evaluations.insert(request_id, i);
request_tx.send(request)?;
request_id += 1;
}
// Reset the sentinels for the new generation's results.
fitness_values.fill(f64::INFINITY);
}
}
Ok(None) => {
// All result senders dropped: no more results can ever arrive.
break;
}
Err(_) => {
// Timed out waiting for the next result; apply the slow-evaluation
// policy.
match self.config.slow_evaluation_strategy {
SlowEvaluationStrategy::WaitAll => continue,
SlowEvaluationStrategy::CancelSlow { .. }
| SlowEvaluationStrategy::UsePartial { .. } => {
// NOTE(review): this `break` ends the whole optimization,
// not just the current generation — confirm that is the
// intended timeout semantics.
if completed_in_generation >= self.config.min_evaluations {
self.handle_incomplete_generation(
&mut fitness_values,
completed_in_generation,
);
break;
}
}
}
}
}
}
// Closing the request channel makes idle workers see `None` and exit;
// then wait for an orderly shutdown.
drop(request_tx);
for handle in worker_handles {
let _ = handle.await;
}
let final_stats = {
let mut stats_guard = stats.write().await;
stats_guard.total_time = start_time.elapsed();
stats_guard.clone()
};
let result = OptimizeResult {
x: best_individual,
fun: best_fitness,
nit: generation,
func_evals: final_stats.total_completed,
nfev: final_stats.total_completed,
jacobian: None,
hessian: None,
success: best_fitness.is_finite(),
message: format!(
"Async differential evolution completed after {} generations",
generation
),
};
Ok((result, final_stats))
}
/// Samples the initial population uniformly at random: inside the box
/// constraints when bounds are set, otherwise in [-1, 1) per component.
fn initialize_population(&self) -> Array2<f64> {
    let mut rng = scirs2_core::random::rng();
    let mut pop = Array2::zeros((self.population_size, self.dimensions));
    match &self.bounds {
        Some((lo, hi)) => {
            for i in 0..self.population_size {
                for j in 0..self.dimensions {
                    // Uniform draw scaled into [lo[j], hi[j]).
                    pop[[i, j]] = lo[j] + rng.random::<f64>() * (hi[j] - lo[j]);
                }
            }
        }
        None => {
            for i in 0..self.population_size {
                for j in 0..self.dimensions {
                    // Uniform draw scaled into [-1, 1).
                    pop[[i, j]] = rng.random::<f64>() * 2.0 - 1.0;
                }
            }
        }
    }
    pop
}
/// Spawns `config.max_workers` Tokio tasks that pull evaluation requests
/// from a shared receiver, run the objective (optionally bounded by
/// `evaluation_timeout`), and push results back on `result_tx`.
///
/// Workers exit when the request channel is closed and drained, or when
/// the result receiver has been dropped. Returns the join handles so the
/// caller can await an orderly shutdown.
async fn spawn_workers<F, Fut>(
&self,
objective_fn: F,
request_rx: mpsc::UnboundedReceiver<EvaluationRequest>,
result_tx: mpsc::UnboundedSender<EvaluationResult>,
stats: Arc<RwLock<AsyncOptimizationStats>>,
) -> Vec<JoinHandle<()>>
where
F: Fn(Array1<f64>) -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = f64> + Send + 'static,
{
// mpsc has a single consumer, so the receiver is shared behind an async
// Mutex and workers compete for the next request.
let request_rx = Arc::new(Mutex::new(request_rx));
let mut handles = Vec::new();
for _worker_id in 0..self.config.max_workers {
let objective_fn = objective_fn.clone();
let request_rx = request_rx.clone();
let result_tx = result_tx.clone();
let stats = stats.clone();
let config = self.config.clone();
let handle = tokio::spawn(async move {
loop {
// Hold the receiver lock only long enough to take one request.
let request = {
let mut rx = request_rx.lock().await;
rx.recv().await
};
match request {
Some(req) => {
{
// `active_workers` counts workers currently evaluating;
// `total_submitted` counts requests actually picked up.
let mut stats_guard = stats.write().await;
stats_guard.active_workers += 1;
stats_guard.total_submitted += 1;
}
let start_time = Instant::now();
// Bound the evaluation by the configured timeout, if any.
let evaluation_result = if let Some(timeout) = config.evaluation_timeout
{
tokio::time::timeout(timeout, objective_fn(req.point.clone())).await
} else {
Ok(objective_fn(req.point.clone()).await)
};
let evaluation_time = start_time.elapsed();
match evaluation_result {
Ok(value) => {
let result = EvaluationResult {
id: req.id,
point: req.point,
value,
generation: req.generation,
evaluation_time,
completed_at: Instant::now(),
};
// Receiver dropped: optimization is over, stop working.
// NOTE(review): breaking here skips the `active_workers`
// decrement below, leaving the counter inflated by one
// per worker that exits this way — confirm intended.
if result_tx.send(result).is_err() {
break; }
}
Err(_) => {
// Evaluation exceeded the timeout; count it as cancelled.
let mut stats_guard = stats.write().await;
stats_guard.total_cancelled += 1;
}
}
{
let mut stats_guard = stats.write().await;
stats_guard.active_workers =
stats_guard.active_workers.saturating_sub(1);
}
}
// Request channel closed and drained: no more work.
None => break, }
}
});
handles.push(handle);
}
handles
}
async fn update_stats(
&self,
stats: &Arc<RwLock<AsyncOptimizationStats>>,
result: &EvaluationResult,
) {
let mut stats_guard = stats.write().await;
stats_guard.total_completed += 1;
let total_time = stats_guard.avg_evaluation_time * (stats_guard.total_completed - 1) as u32
+ result.evaluation_time;
stats_guard.avg_evaluation_time = if stats_guard.total_completed > 0 {
total_time / stats_guard.total_completed as u32
} else {
Duration::ZERO };
if result.evaluation_time < stats_guard.min_evaluation_time {
stats_guard.min_evaluation_time = result.evaluation_time;
}
if result.evaluation_time > stats_guard.max_evaluation_time {
stats_guard.max_evaluation_time = result.evaluation_time;
}
}
/// Decides whether the current generation may advance with only
/// `completed` of `population_size` evaluations finished.
///
/// Only the `UsePartial` strategy ever allows this, and only once both
/// the configured completion fraction and the absolute `min_evaluations`
/// floor are satisfied; every other strategy always answers `false`.
async fn should_proceed_with_partial_results(
    &self,
    _stats: &Arc<RwLock<AsyncOptimizationStats>>,
    completed: usize,
) -> bool {
    if let SlowEvaluationStrategy::UsePartial { min_fraction } =
        self.config.slow_evaluation_strategy
    {
        // Guard against a zero-sized population before dividing.
        let fraction = match self.population_size {
            0 => 0.0,
            size => completed as f64 / size as f64,
        };
        completed >= self.config.min_evaluations && fraction >= min_fraction
    } else {
        false
    }
}
/// Assigns a penalty fitness to every individual whose evaluation did
/// not complete this generation.
///
/// Fixes two defects in the previous version:
/// 1. Completed results are written at arbitrary row indices (the
///    request-id -> row mapping in `optimize` is unordered), so the old
///    positional slice `fitness_values[..completed]` examined — and the
///    tail penalty overwrote — the wrong rows. Unfinished rows are now
///    identified by the `f64::INFINITY` sentinel they were initialized
///    with instead of by position.
/// 2. The old penalty `worst * 2.0` was *smaller* than `worst` whenever
///    the worst completed fitness was negative, rewarding stragglers.
///    The penalty is now strictly worse than any finite completed value.
///
/// If no finite fitness exists at all, a large fallback constant is used.
fn handle_incomplete_generation(&self, fitness_values: &mut [f64], _completed: usize) {
    let worst = fitness_values
        .iter()
        .copied()
        .filter(|f| f.is_finite())
        .fold(f64::NEG_INFINITY, f64::max);
    let penalty = if worst.is_finite() {
        // Strictly worse than `worst` for both signs of `worst`.
        worst + worst.abs() + 1.0
    } else {
        1e6
    };
    for fitness in fitness_values.iter_mut() {
        if !fitness.is_finite() {
            *fitness = penalty;
        }
    }
}
/// Returns `true` when the population has converged: the standard
/// deviation of the finite fitness values falls below `self.tolerance`.
///
/// Requires at least two finite values; otherwise convergence cannot be
/// judged and `false` is returned. The previous version re-checked
/// `is_empty()` after the `len() < 2` early return and guarded
/// `variance >= 0.0` — all unreachable for a non-empty sum of squares —
/// so those dead branches have been removed (behavior unchanged).
fn check_convergence(&self, fitness_values: &[f64]) -> bool {
    let finite: Vec<f64> = fitness_values
        .iter()
        .copied()
        .filter(|f| f.is_finite())
        .collect();
    if finite.len() < 2 {
        return false;
    }
    let n = finite.len() as f64;
    let mean = finite.iter().sum::<f64>() / n;
    // Population variance; non-negative by construction, so sqrt is safe.
    let variance = finite.iter().map(|&f| (f - mean).powi(2)).sum::<f64>() / n;
    variance.sqrt() < self.tolerance
}
/// Builds the next generation with DE/rand/1/bin: for each target row,
/// three distinct random donor rows a, b, c produce the mutant
/// `a + F * (b - c)`, which is mixed into the target via binomial
/// crossover.
///
/// NOTE(review): `_fitness_values` is unused — there is no greedy
/// selection step, so every trial vector replaces its target regardless
/// of fitness. Classic DE keeps the parent when it is better; confirm
/// the unconditional replacement is intentional for this async design.
fn generate_next_population(
&self,
current_population: &Array2<f64>,
_fitness_values: &[f64],
) -> Array2<f64> {
let mut new_population = Array2::zeros((self.population_size, self.dimensions));
let mut rng = scirs2_core::random::rng();
for i in 0..self.population_size {
// Draw three mutually distinct donor indices, all different from i.
// NOTE(review): this loop cannot terminate if population_size < 4;
// `new` defaults to at least 4, but a caller-supplied smaller size
// would hang here — verify against callers.
let mut indices = Vec::new();
while indices.len() < 3 {
let idx = rng.random_range(0..self.population_size);
if idx != i && !indices.contains(&idx) {
indices.push(idx);
}
}
let mut mutant = Array1::zeros(self.dimensions);
for j in 0..self.dimensions {
mutant[j] = current_population[[indices[0], j]]
+ self.mutation_factor
* (current_population[[indices[1], j]]
- current_population[[indices[2], j]]);
}
// Clamp the mutant back into the feasible box, if bounds are set.
if let Some((ref lower, ref upper)) = self.bounds {
for (j, value) in mutant.iter_mut().enumerate() {
*value = value.max(lower[j]).min(upper[j]);
}
}
// Binomial crossover: index r is always taken from the mutant, so the
// trial differs from its target in at least one component.
let mut trial = current_population.row(i).to_owned();
let r = rng.random_range(0..self.dimensions);
for j in 0..self.dimensions {
if j == r || rng.random::<f64>() < self.crossover_probability {
trial[j] = mutant[j];
}
}
new_population.row_mut(i).assign(&trial);
}
new_population
}
}
impl From<mpsc::error::SendError<EvaluationRequest>> for OptimizeError {
fn from(_: mpsc::error::SendError<EvaluationRequest>) -> Self {
OptimizeError::ValueError("Failed to send evaluation request".to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
use approx::assert_abs_diff_eq;
use std::time::Duration;
use tokio::time::sleep;
// Smoke test: minimize the 2-D sphere function with a tiny artificial
// delay per evaluation; only checks that the run completes and improves.
#[tokio::test]
async fn test_async_differential_evolution_simple() {
let objective = |x: Array1<f64>| async move {
sleep(Duration::from_millis(1)).await;
x.iter().map(|&xi| xi.powi(2)).sum::<f64>()
};
let bounds_lower = Array1::from_vec(vec![-5.0, -5.0]);
let bounds_upper = Array1::from_vec(vec![5.0, 5.0]);
let config = AsyncOptimizationConfig {
max_workers: 2,
evaluation_timeout: Some(Duration::from_millis(100)),
completion_timeout: Some(Duration::from_millis(1000)),
slow_evaluation_strategy: SlowEvaluationStrategy::UsePartial { min_fraction: 0.6 },
min_evaluations: 3,
};
let optimizer = AsyncDifferentialEvolution::new(2, Some(12), Some(config))
.with_bounds(bounds_lower, bounds_upper)
.expect("Setting bounds should succeed for valid dimensions")
.with_parameters(0.8, 0.7, 10, 1e-3);
let (result, stats) = optimizer
.optimize(objective)
.await
.expect("Optimization should complete successfully");
// Loose threshold: the goal is plumbing, not optimization quality.
assert!(result.success);
assert!(result.fun < 20.0); assert!(stats.total_completed > 0);
}
// Exercises out-of-order completion: the Rosenbrock objective sleeps a
// random 10-100 ms per evaluation so results arrive shuffled.
#[tokio::test]
async fn test_async_optimization_with_varying_times() {
use scirs2_core::random::{Rng, RngExt};
let objective = |x: Array1<f64>| async move {
let delay = scirs2_core::random::rng().random_range(10..=100);
sleep(Duration::from_millis(delay)).await;
// Rosenbrock function: (1 - x)^2 + 100 (y - x^2)^2.
let a = 1.0 - x[0];
let b = x[1] - x[0].powi(2);
a.powi(2) + 100.0 * b.powi(2)
};
let bounds_lower = Array1::from_vec(vec![-2.0, -2.0]);
let bounds_upper = Array1::from_vec(vec![2.0, 2.0]);
let config = AsyncOptimizationConfig {
max_workers: 2,
evaluation_timeout: Some(Duration::from_millis(200)),
completion_timeout: Some(Duration::from_millis(1000)),
slow_evaluation_strategy: SlowEvaluationStrategy::UsePartial { min_fraction: 0.6 },
min_evaluations: 3,
};
let optimizer = AsyncDifferentialEvolution::new(2, Some(8), Some(config))
.with_bounds(bounds_lower, bounds_upper)
.expect("Setting bounds should succeed for valid dimensions")
.with_parameters(0.8, 0.7, 3, 1e-2);
let (result, stats) = optimizer
.optimize(objective)
.await
.expect("Optimization should complete successfully");
assert!(result.success);
assert!(result.fun < 1000.0); assert!(stats.total_completed > 0);
assert!(stats.avg_evaluation_time > Duration::from_millis(0));
println!("Async DE Results:");
println!(" Final solution: [{:.6}, {:.6}]", result.x[0], result.x[1]);
println!(" Final cost: {:.6}", result.fun);
println!(" Generations: {}", result.nit);
println!(" Total evaluations: {}", stats.total_completed);
println!(" Average eval time: {:?}", stats.avg_evaluation_time);
}
// Verifies the evaluation-timeout path: roughly half of the evaluations
// sleep past the 50 ms timeout and must be counted as cancelled.
#[tokio::test]
#[ignore = "slow / timing-sensitive"]
async fn test_timeout_handling() {
let objective = |x: Array1<f64>| async move {
if scirs2_core::random::rng().random::<f64>() < 0.5 {
sleep(Duration::from_secs(1)).await; } else {
sleep(Duration::from_millis(10)).await; }
x.iter().map(|&xi| xi.powi(2)).sum::<f64>()
};
let config = AsyncOptimizationConfig {
max_workers: 2,
evaluation_timeout: Some(Duration::from_millis(50)), completion_timeout: Some(Duration::from_millis(200)),
slow_evaluation_strategy: SlowEvaluationStrategy::CancelSlow {
timeout: Duration::from_millis(50),
},
min_evaluations: 2,
};
let bounds_lower = Array1::from_vec(vec![-1.0, -1.0]);
let bounds_upper = Array1::from_vec(vec![1.0, 1.0]);
let optimizer = AsyncDifferentialEvolution::new(2, Some(6), Some(config))
.with_bounds(bounds_lower, bounds_upper)
.expect("Setting bounds should succeed for valid dimensions")
.with_parameters(0.8, 0.7, 2, 1e-1);
let (result, stats) = optimizer
.optimize(objective)
.await
.expect("Optimization should complete successfully");
assert!(result.success);
assert!(stats.total_cancelled > 0); assert!(stats.total_completed > 0); }
}