1use crate::error::OptimizeError;
8use crate::unconstrained::OptimizeResult;
9use scirs2_core::ndarray::{Array1, Array2};
10use scirs2_core::random::Rng;
11use std::cmp::min;
12use std::future::Future;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::{mpsc, Mutex, RwLock};
16use tokio::task::JoinHandle;
17
18#[derive(Debug, Clone)]
20pub struct AsyncOptimizationConfig {
21 pub max_workers: usize,
23 pub evaluation_timeout: Option<Duration>,
25 pub completion_timeout: Option<Duration>,
27 pub slow_evaluation_strategy: SlowEvaluationStrategy,
29 pub min_evaluations: usize,
31}
32
/// Policy for generations in which some evaluations are slow or never finish.
#[derive(Debug, Clone)]
pub enum SlowEvaluationStrategy {
    /// Keep waiting until every individual of the generation has been evaluated.
    WaitAll,
    /// Stop once the completion timeout fires and at least `min_evaluations`
    /// results of the current generation are available.
    ///
    /// NOTE(review): the `timeout` field is never read by the code in this file;
    /// per-evaluation cancellation is driven by
    /// `AsyncOptimizationConfig::evaluation_timeout` instead.
    CancelSlow { timeout: Duration },
    /// Move on to the next generation as soon as at least `min_fraction` of the
    /// population (and at least `min_evaluations` individuals) has been evaluated.
    UsePartial { min_fraction: f64 },
}
43
44#[derive(Debug, Clone)]
46pub struct EvaluationRequest {
47 pub id: usize,
48 pub point: Array1<f64>,
49 pub generation: usize,
50 pub submitted_at: Instant,
51}
52
53#[derive(Debug, Clone)]
55pub struct EvaluationResult {
56 pub id: usize,
57 pub point: Array1<f64>,
58 pub value: f64,
59 pub generation: usize,
60 pub evaluation_time: Duration,
61 pub completed_at: Instant,
62}
63
/// Counters and timings collected while an optimization runs.
#[derive(Debug, Clone)]
pub struct AsyncOptimizationStats {
    /// Number of requests that workers have picked up for evaluation.
    pub total_submitted: usize,
    /// Number of results accepted by the optimizer for the generation in progress.
    pub total_completed: usize,
    /// Number of evaluations abandoned because they exceeded the evaluation timeout.
    pub total_cancelled: usize,
    /// Mean evaluation time over the accepted results.
    pub avg_evaluation_time: Duration,
    /// Shortest evaluation time among the accepted results.
    pub min_evaluation_time: Duration,
    /// Longest evaluation time among the accepted results.
    pub max_evaluation_time: Duration,
    /// Number of workers currently busy with an evaluation.
    pub active_workers: usize,
    /// Wall-clock duration of the whole optimization run.
    pub total_time: Duration,
}
84
85pub struct AsyncDifferentialEvolution {
87 config: AsyncOptimizationConfig,
88 population_size: usize,
89 dimensions: usize,
90 bounds: Option<(Array1<f64>, Array1<f64>)>,
91 mutation_factor: f64,
92 crossover_probability: f64,
93 max_generations: usize,
94 tolerance: f64,
95}
96
97impl Default for AsyncOptimizationConfig {
98 fn default() -> Self {
99 let max_workers = std::thread::available_parallelism()
101 .map(|p| p.get())
102 .unwrap_or(4);
103
104 Self {
105 max_workers,
106 evaluation_timeout: Some(Duration::from_secs(300)), completion_timeout: Some(Duration::from_secs(60)), slow_evaluation_strategy: SlowEvaluationStrategy::UsePartial { min_fraction: 0.8 },
109 min_evaluations: 10,
110 }
111 }
112}
113
114impl AsyncDifferentialEvolution {
115 pub fn new(
117 dimensions: usize,
118 population_size: Option<usize>,
119 config: Option<AsyncOptimizationConfig>,
120 ) -> Self {
121 let pop_size = population_size.unwrap_or(std::cmp::max(4, dimensions * 10));
122
123 Self {
124 config: config.unwrap_or_default(),
125 population_size: pop_size,
126 dimensions,
127 bounds: None,
128 mutation_factor: 0.8,
129 crossover_probability: 0.7,
130 max_generations: 1000,
131 tolerance: 1e-6,
132 }
133 }
134
135 pub fn with_bounds(
137 mut self,
138 lower: Array1<f64>,
139 upper: Array1<f64>,
140 ) -> Result<Self, OptimizeError> {
141 if lower.len() != self.dimensions || upper.len() != self.dimensions {
142 return Err(OptimizeError::ValueError(
143 "Bounds dimensions must match problem dimensions".to_string(),
144 ));
145 }
146
147 for (&l, &u) in lower.iter().zip(upper.iter()) {
148 if l >= u {
149 return Err(OptimizeError::ValueError(
150 "Lower bounds must be less than upper bounds".to_string(),
151 ));
152 }
153 }
154
155 self.bounds = Some((lower, upper));
156 Ok(self)
157 }
158
159 pub fn with_parameters(
161 mut self,
162 mutation_factor: f64,
163 crossover_probability: f64,
164 max_generations: usize,
165 tolerance: f64,
166 ) -> Self {
167 self.mutation_factor = mutation_factor;
168 self.crossover_probability = crossover_probability;
169 self.max_generations = max_generations;
170 self.tolerance = tolerance;
171 self
172 }
173
174 pub async fn optimize<F, Fut>(
176 &self,
177 objective_fn: F,
178 ) -> Result<(OptimizeResult<f64>, AsyncOptimizationStats), OptimizeError>
179 where
180 F: Fn(Array1<f64>) -> Fut + Send + Sync + Clone + 'static,
181 Fut: Future<Output = f64> + Send + 'static,
182 {
183 let start_time = Instant::now();
184
185 let mut population = self.initialize_population();
187 let mut fitness_values = vec![f64::INFINITY; self.population_size];
188
189 let (request_tx, request_rx) = mpsc::unbounded_channel::<EvaluationRequest>();
191 let (result_tx, mut result_rx) = mpsc::unbounded_channel::<EvaluationResult>();
192
193 let stats = Arc::new(RwLock::new(AsyncOptimizationStats {
195 total_submitted: 0,
196 total_completed: 0,
197 total_cancelled: 0,
198 avg_evaluation_time: Duration::from_millis(0),
199 min_evaluation_time: Duration::from_secs(u64::MAX),
200 max_evaluation_time: Duration::from_millis(0),
201 active_workers: 0,
202 total_time: Duration::from_millis(0),
203 }));
204
205 let worker_handles = self
207 .spawn_workers(objective_fn, request_rx, result_tx.clone(), stats.clone())
208 .await;
209
210 let mut pending_evaluations = std::collections::HashMap::new();
212
213 let mut request_id = 0;
215 for (i, individual) in population.outer_iter().enumerate() {
216 let request = EvaluationRequest {
217 id: request_id,
218 point: individual.to_owned(),
219 generation: 0,
220 submitted_at: Instant::now(),
221 };
222
223 pending_evaluations.insert(request_id, i);
224 request_tx.send(request)?;
225 request_id += 1;
226 }
227 let mut best_individual = Array1::zeros(self.dimensions);
228 let mut best_fitness = f64::INFINITY;
229 let mut generation = 0;
230 let mut completed_in_generation = 0;
231
232 while generation < self.max_generations {
234 let timeout_duration = self
236 .config
237 .completion_timeout
238 .unwrap_or(Duration::from_secs(60));
239
240 match tokio::time::timeout(timeout_duration, result_rx.recv()).await {
241 Ok(Some(result)) => {
242 if result.generation == generation
244 && pending_evaluations.contains_key(&result.id)
245 {
246 if let Some(individual_index) = pending_evaluations.remove(&result.id) {
247 fitness_values[individual_index] = result.value;
248 completed_in_generation += 1;
249
250 if result.value < best_fitness {
252 best_fitness = result.value;
253 best_individual = result.point.clone();
254 }
255
256 self.update_stats(&stats, &result).await;
258 }
259 }
260
261 if completed_in_generation >= self.population_size
263 || self
264 .should_proceed_with_partial_results(&stats, completed_in_generation)
265 .await
266 {
267 if completed_in_generation < self.population_size {
269 self.handle_incomplete_generation(
270 &mut fitness_values,
271 completed_in_generation,
272 );
273 }
274
275 if self.check_convergence(&fitness_values) {
277 break;
278 }
279
280 generation += 1;
282 completed_in_generation = 0;
283
284 let new_population =
285 self.generate_next_population(&population, &fitness_values);
286 population = new_population;
287
288 for (i, individual) in population.outer_iter().enumerate() {
290 let request = EvaluationRequest {
291 id: request_id,
292 point: individual.to_owned(),
293 generation,
294 submitted_at: Instant::now(),
295 };
296
297 pending_evaluations.insert(request_id, i);
298 request_tx.send(request)?;
299 request_id += 1;
300 }
301
302 fitness_values.fill(f64::INFINITY);
304 }
305 }
306 Ok(None) => {
307 break;
309 }
310 Err(_) => {
311 match self.config.slow_evaluation_strategy {
313 SlowEvaluationStrategy::WaitAll => continue,
314 SlowEvaluationStrategy::CancelSlow { .. }
315 | SlowEvaluationStrategy::UsePartial { .. } => {
316 if completed_in_generation >= self.config.min_evaluations {
317 self.handle_incomplete_generation(
318 &mut fitness_values,
319 completed_in_generation,
320 );
321 break;
322 }
323 }
324 }
325 }
326 }
327 }
328
329 drop(request_tx);
331 for handle in worker_handles {
332 let _ = handle.await;
333 }
334
335 let final_stats = {
337 let mut stats_guard = stats.write().await;
338 stats_guard.total_time = start_time.elapsed();
339 stats_guard.clone()
340 };
341
342 let result = OptimizeResult {
343 x: best_individual,
344 fun: best_fitness,
345 nit: generation,
346 func_evals: final_stats.total_completed,
347 nfev: final_stats.total_completed,
348 jacobian: None,
349 hessian: None,
350 success: best_fitness.is_finite(),
351 message: format!(
352 "Async differential evolution completed after {} generations",
353 generation
354 ),
355 };
356
357 Ok((result, final_stats))
358 }
359
360 fn initialize_population(&self) -> Array2<f64> {
362 let mut population = Array2::zeros((self.population_size, self.dimensions));
363 let mut rng = scirs2_core::random::rng();
364
365 if let Some((ref lower, ref upper)) = self.bounds {
366 for mut individual in population.outer_iter_mut() {
367 for (j, gene) in individual.iter_mut().enumerate() {
368 *gene = lower[j] + rng.random::<f64>() * (upper[j] - lower[j]);
369 }
370 }
371 } else {
372 for mut individual in population.outer_iter_mut() {
373 for gene in individual.iter_mut() {
374 *gene = rng.random::<f64>() * 2.0 - 1.0; }
376 }
377 }
378
379 population
380 }
381
382 async fn spawn_workers<F, Fut>(
384 &self,
385 objective_fn: F,
386 request_rx: mpsc::UnboundedReceiver<EvaluationRequest>,
387 result_tx: mpsc::UnboundedSender<EvaluationResult>,
388 stats: Arc<RwLock<AsyncOptimizationStats>>,
389 ) -> Vec<JoinHandle<()>>
390 where
391 F: Fn(Array1<f64>) -> Fut + Send + Sync + Clone + 'static,
392 Fut: Future<Output = f64> + Send + 'static,
393 {
394 let request_rx = Arc::new(Mutex::new(request_rx));
395 let mut handles = Vec::new();
396
397 for _worker_id in 0..self.config.max_workers {
398 let objective_fn = objective_fn.clone();
399 let request_rx = request_rx.clone();
400 let result_tx = result_tx.clone();
401 let stats = stats.clone();
402 let config = self.config.clone();
403
404 let handle = tokio::spawn(async move {
405 loop {
406 let request = {
408 let mut rx = request_rx.lock().await;
409 rx.recv().await
410 };
411
412 match request {
413 Some(req) => {
414 {
416 let mut stats_guard = stats.write().await;
417 stats_guard.active_workers += 1;
418 stats_guard.total_submitted += 1;
419 }
420
421 let start_time = Instant::now();
422
423 let evaluation_result = if let Some(timeout) = config.evaluation_timeout
425 {
426 tokio::time::timeout(timeout, objective_fn(req.point.clone())).await
427 } else {
428 Ok(objective_fn(req.point.clone()).await)
429 };
430
431 let evaluation_time = start_time.elapsed();
432
433 match evaluation_result {
434 Ok(value) => {
435 let result = EvaluationResult {
436 id: req.id,
437 point: req.point,
438 value,
439 generation: req.generation,
440 evaluation_time,
441 completed_at: Instant::now(),
442 };
443
444 if result_tx.send(result).is_err() {
445 break; }
447 }
448 Err(_) => {
449 let mut stats_guard = stats.write().await;
451 stats_guard.total_cancelled += 1;
452 }
453 }
454
455 {
457 let mut stats_guard = stats.write().await;
458 stats_guard.active_workers =
459 stats_guard.active_workers.saturating_sub(1);
460 }
461 }
462 None => break, }
464 }
465 });
466
467 handles.push(handle);
468 }
469
470 handles
471 }
472
473 async fn update_stats(
475 &self,
476 stats: &Arc<RwLock<AsyncOptimizationStats>>,
477 result: &EvaluationResult,
478 ) {
479 let mut stats_guard = stats.write().await;
480
481 stats_guard.total_completed += 1;
482
483 let total_time = stats_guard.avg_evaluation_time * (stats_guard.total_completed - 1) as u32
484 + result.evaluation_time;
485 stats_guard.avg_evaluation_time = if stats_guard.total_completed > 0 {
486 total_time / stats_guard.total_completed as u32
487 } else {
488 Duration::ZERO };
490
491 if result.evaluation_time < stats_guard.min_evaluation_time {
492 stats_guard.min_evaluation_time = result.evaluation_time;
493 }
494
495 if result.evaluation_time > stats_guard.max_evaluation_time {
496 stats_guard.max_evaluation_time = result.evaluation_time;
497 }
498 }
499
500 async fn should_proceed_with_partial_results(
502 &self,
503 _stats: &Arc<RwLock<AsyncOptimizationStats>>,
504 completed: usize,
505 ) -> bool {
506 match self.config.slow_evaluation_strategy {
507 SlowEvaluationStrategy::UsePartial { min_fraction } => {
508 let fraction = if self.population_size > 0 {
509 completed as f64 / self.population_size as f64
510 } else {
511 0.0 };
513 fraction >= min_fraction && completed >= self.config.min_evaluations
514 }
515 _ => false,
516 }
517 }
518
519 fn handle_incomplete_generation(&self, fitness_values: &mut [f64], completed: usize) {
521 let max_completed_fitness = fitness_values[..completed]
523 .iter()
524 .filter(|&&f| f.is_finite())
525 .fold(f64::NEG_INFINITY, |acc, &f| acc.max(f));
526
527 let penalty = if max_completed_fitness.is_finite() {
528 max_completed_fitness * 2.0
529 } else {
530 1e6
531 };
532
533 for fitness in fitness_values[completed..].iter_mut() {
534 *fitness = penalty;
535 }
536 }
537
538 fn check_convergence(&self, fitness_values: &[f64]) -> bool {
540 let finite_fitness: Vec<f64> = fitness_values
541 .iter()
542 .filter(|&&f| f.is_finite())
543 .cloned()
544 .collect();
545
546 if finite_fitness.len() < 2 {
547 return false;
548 }
549
550 let mean = if !finite_fitness.is_empty() {
551 finite_fitness.iter().sum::<f64>() / finite_fitness.len() as f64
552 } else {
553 return false; };
555 let variance = if !finite_fitness.is_empty() {
556 finite_fitness
557 .iter()
558 .map(|&f| (f - mean).powi(2))
559 .sum::<f64>()
560 / finite_fitness.len() as f64
561 } else {
562 0.0 };
564
565 let std_dev = if variance >= 0.0 {
567 variance.sqrt()
568 } else {
569 0.0 };
571 std_dev < self.tolerance
572 }
573
574 fn generate_next_population(
576 &self,
577 current_population: &Array2<f64>,
578 _fitness_values: &[f64],
579 ) -> Array2<f64> {
580 let mut new_population = Array2::zeros((self.population_size, self.dimensions));
581 let mut rng = scirs2_core::random::rng();
582
583 for i in 0..self.population_size {
584 let mut indices = Vec::new();
586 while indices.len() < 3 {
587 let idx = rng.random_range(0..self.population_size);
588 if idx != i && !indices.contains(&idx) {
589 indices.push(idx);
590 }
591 }
592
593 let mut mutant = Array1::zeros(self.dimensions);
595 for j in 0..self.dimensions {
596 mutant[j] = current_population[[indices[0], j]]
597 + self.mutation_factor
598 * (current_population[[indices[1], j]]
599 - current_population[[indices[2], j]]);
600 }
601
602 if let Some((ref lower, ref upper)) = self.bounds {
604 for (j, value) in mutant.iter_mut().enumerate() {
605 *value = value.max(lower[j]).min(upper[j]);
606 }
607 }
608
609 let mut trial = current_population.row(i).to_owned();
611 let r = rng.random_range(0..self.dimensions);
612
613 for j in 0..self.dimensions {
614 if j == r || rng.random::<f64>() < self.crossover_probability {
615 trial[j] = mutant[j];
616 }
617 }
618
619 new_population.row_mut(i).assign(&trial);
620 }
621
622 new_population
623 }
624}
625
626impl From<mpsc::error::SendError<EvaluationRequest>> for OptimizeError {
627 fn from(_: mpsc::error::SendError<EvaluationRequest>) -> Self {
628 OptimizeError::ValueError("Failed to send evaluation request".to_string())
629 }
630}
631
632#[cfg(test)]
633mod tests {
634 use super::*;
635 use approx::assert_abs_diff_eq;
636 use std::time::Duration;
637 use tokio::time::sleep;
638
639 #[tokio::test]
640 async fn test_async_differential_evolution_simple() {
641 let objective = |x: Array1<f64>| async move {
643 sleep(Duration::from_millis(1)).await;
645 x.iter().map(|&xi| xi.powi(2)).sum::<f64>()
646 };
647
648 let bounds_lower = Array1::from_vec(vec![-5.0, -5.0]);
649 let bounds_upper = Array1::from_vec(vec![5.0, 5.0]);
650
651 let config = AsyncOptimizationConfig {
652 max_workers: 2,
653 evaluation_timeout: Some(Duration::from_millis(100)),
654 completion_timeout: Some(Duration::from_millis(1000)),
655 slow_evaluation_strategy: SlowEvaluationStrategy::UsePartial { min_fraction: 0.6 },
656 min_evaluations: 3,
657 };
658
659 let optimizer = AsyncDifferentialEvolution::new(2, Some(12), Some(config))
660 .with_bounds(bounds_lower, bounds_upper)
661 .expect("Setting bounds should succeed for valid dimensions")
662 .with_parameters(0.8, 0.7, 10, 1e-3);
663
664 let (result, stats) = optimizer
665 .optimize(objective)
666 .await
667 .expect("Optimization should complete successfully");
668
669 assert!(result.success);
670 assert!(result.fun < 20.0); assert!(stats.total_completed > 0);
673 }
674
675 #[tokio::test]
676 async fn test_async_optimization_with_varying_times() {
677 use scirs2_core::random::Rng;
678
679 let objective = |x: Array1<f64>| async move {
681 let delay = scirs2_core::random::rng().random_range(10..=100);
683 sleep(Duration::from_millis(delay)).await;
684
685 let a = 1.0 - x[0];
687 let b = x[1] - x[0].powi(2);
688 a.powi(2) + 100.0 * b.powi(2)
689 };
690
691 let bounds_lower = Array1::from_vec(vec![-2.0, -2.0]);
692 let bounds_upper = Array1::from_vec(vec![2.0, 2.0]);
693
694 let config = AsyncOptimizationConfig {
695 max_workers: 2,
696 evaluation_timeout: Some(Duration::from_millis(200)),
697 completion_timeout: Some(Duration::from_millis(1000)),
698 slow_evaluation_strategy: SlowEvaluationStrategy::UsePartial { min_fraction: 0.6 },
699 min_evaluations: 3,
700 };
701
702 let optimizer = AsyncDifferentialEvolution::new(2, Some(8), Some(config))
703 .with_bounds(bounds_lower, bounds_upper)
704 .expect("Setting bounds should succeed for valid dimensions")
705 .with_parameters(0.8, 0.7, 3, 1e-2);
706
707 let (result, stats) = optimizer
708 .optimize(objective)
709 .await
710 .expect("Optimization should complete successfully");
711
712 assert!(result.success);
713 assert!(result.fun < 1000.0); assert!(stats.total_completed > 0);
715 assert!(stats.avg_evaluation_time > Duration::from_millis(0));
716
717 println!("Async DE Results:");
718 println!(" Final solution: [{:.6}, {:.6}]", result.x[0], result.x[1]);
719 println!(" Final cost: {:.6}", result.fun);
720 println!(" Generations: {}", result.nit);
721 println!(" Total evaluations: {}", stats.total_completed);
722 println!(" Average eval time: {:?}", stats.avg_evaluation_time);
723 }
724
725 #[tokio::test]
726 #[ignore = "slow / timing-sensitive"]
727 async fn test_timeout_handling() {
728 let objective = |x: Array1<f64>| async move {
730 if scirs2_core::random::rng().random::<f64>() < 0.5 {
732 sleep(Duration::from_secs(1)).await; } else {
734 sleep(Duration::from_millis(10)).await; }
736 x.iter().map(|&xi| xi.powi(2)).sum::<f64>()
737 };
738
739 let config = AsyncOptimizationConfig {
740 max_workers: 2,
741 evaluation_timeout: Some(Duration::from_millis(50)), completion_timeout: Some(Duration::from_millis(200)),
743 slow_evaluation_strategy: SlowEvaluationStrategy::CancelSlow {
744 timeout: Duration::from_millis(50),
745 },
746 min_evaluations: 2,
747 };
748
749 let bounds_lower = Array1::from_vec(vec![-1.0, -1.0]);
750 let bounds_upper = Array1::from_vec(vec![1.0, 1.0]);
751
752 let optimizer = AsyncDifferentialEvolution::new(2, Some(6), Some(config))
753 .with_bounds(bounds_lower, bounds_upper)
754 .expect("Setting bounds should succeed for valid dimensions")
755 .with_parameters(0.8, 0.7, 2, 1e-1);
756
757 let (result, stats) = optimizer
758 .optimize(objective)
759 .await
760 .expect("Optimization should complete successfully");
761
762 assert!(result.success);
764 assert!(stats.total_cancelled > 0); assert!(stats.total_completed > 0); }
767}