use rand::prelude::IndexedRandom;
use rand::{Rng, RngCore};
use serde::{Deserialize, Serialize};
#[cfg(feature = "python")]
use pyo3::prelude::*;
use crate::core::{Individual, OError, VariableType, VariableValue};
#[derive(Debug)]
pub struct CrossoverChildren {
pub child1: Individual,
pub child2: Individual,
}
pub trait Crossover {
fn generate_offsprings(
&self,
parent1: &Individual,
parent2: &Individual,
rng: &mut dyn RngCore,
) -> Result<CrossoverChildren, OError>;
}
#[cfg_attr(feature = "python", pyclass(get_all))]
#[derive(Serialize, Deserialize, Clone)]
pub struct SimulatedBinaryCrossoverArgs {
pub distribution_index: f64,
pub crossover_probability: f64,
pub variable_probability: f64,
}
impl Default for SimulatedBinaryCrossoverArgs {
fn default() -> Self {
Self {
distribution_index: 15.0,
crossover_probability: 1.0,
variable_probability: 0.5,
}
}
}
#[cfg(feature = "python")]
#[pymethods]
impl SimulatedBinaryCrossoverArgs {
#[new]
#[pyo3(signature = (distribution_index=None, crossover_probability=None, variable_probability=None))]
fn new(
distribution_index: Option<f64>,
crossover_probability: Option<f64>,
variable_probability: Option<f64>,
) -> Self {
let defaults = SimulatedBinaryCrossoverArgs::default();
SimulatedBinaryCrossoverArgs {
distribution_index: distribution_index.unwrap_or(defaults.distribution_index),
crossover_probability: crossover_probability.unwrap_or(defaults.crossover_probability),
variable_probability: variable_probability.unwrap_or(defaults.variable_probability),
}
}
pub fn __repr__(&self) -> PyResult<String> {
Ok(format!(
"SimulatedBinaryCrossoverArgs(distribution_index={}, crossover_probability={}, variable_probability={})",
self.distribution_index, self.crossover_probability, self.variable_probability
))
}
fn __str__(&self) -> String {
self.__repr__().unwrap()
}
}
pub struct SimulatedBinaryCrossover {
distribution_index: f64,
crossover_probability: f64,
variable_probability: f64,
}
impl SimulatedBinaryCrossover {
pub fn new(args: SimulatedBinaryCrossoverArgs) -> Result<Self, OError> {
if args.distribution_index < 0.0 {
return Err(OError::CrossoverOperator(
"SBX".to_string(),
format!(
"The distribution index {} must be a positive number",
args.distribution_index
),
));
}
if !(0.0..=1.0).contains(&args.crossover_probability) {
return Err(OError::CrossoverOperator(
"SBX".to_string(),
format!(
"The crossover probability {} must be a number between 0 and 1",
args.crossover_probability
),
));
}
if !(0.0..=1.0).contains(&args.variable_probability) {
return Err(OError::CrossoverOperator(
"SBX".to_string(),
format!(
"The variable probability {} must be a number between 0 and 1",
args.variable_probability
),
));
}
Ok(Self {
distribution_index: args.distribution_index,
variable_probability: args.variable_probability,
crossover_probability: args.crossover_probability,
})
}
fn crossover_variables(
&self,
v1: f64,
v2: f64,
y_lower: f64,
y_upper: f64,
rng: &mut dyn RngCore,
) -> Option<(f64, f64)> {
if f64::abs(v1 - v2) < f64::EPSILON {
return None;
}
let (y1, y2) = if v1 < v2 { (v1, v2) } else { (v2, v1) };
let delta_y = y2 - y1;
let prob = rng.random_range(0.0..=1.0);
let beta = 1.0 + (2.0 * (y1 - y_lower) / delta_y);
let alpha = 2.0 - f64::powf(beta, -(self.distribution_index + 1.0));
let mut new_v1 = 0.5 * ((y1 + y2) - self.betaq(prob, alpha) * delta_y);
new_v1 = f64::min(f64::max(new_v1, y_lower), y_upper);
let beta = 1.0 + (2.0 * (y_upper - y2) / delta_y);
let alpha = 2.0 - f64::powf(beta, -(self.distribution_index + 1.0));
let mut new_v2 = 0.5 * ((y1 + y2) + self.betaq(prob, alpha) * delta_y);
new_v2 = f64::min(f64::max(new_v2, y_lower), y_upper);
if matches!([0, 1].choose(rng).unwrap(), 0) {
(new_v1, new_v2) = (new_v2, new_v1);
}
Some((new_v1, new_v2))
}
fn betaq(&self, prob: f64, alpha: f64) -> f64 {
if prob <= (1.0 / alpha) {
f64::powf(prob * alpha, 1.0 / (self.distribution_index + 1.0))
} else {
f64::powf(
1.0 / (2.0 - prob * alpha),
1.0 / (self.distribution_index + 1.0),
)
}
}
}
impl Crossover for SimulatedBinaryCrossover {
fn generate_offsprings(
&self,
parent1: &Individual,
parent2: &Individual,
rng: &mut dyn RngCore,
) -> Result<CrossoverChildren, OError> {
let mut child1 = parent1.clone_variables();
let mut child2 = parent2.clone_variables();
let problem = parent1.problem();
if !problem
.variables()
.iter()
.all(|(_, v)| v.is_real() | v.is_integer())
{
return Err(OError::CrossoverOperator(
"SBX".to_string(),
"The SBX operator only works with real or integer variables".to_string(),
));
}
if rng.random_range(0.0..=1.0) <= self.crossover_probability {
for (var_name, var_type) in problem.variables() {
if rng.random_range(0.0..=1.0) > self.variable_probability {
continue;
}
let v1 = parent1.get_variable_value(&var_name)?;
let v2 = parent2.get_variable_value(&var_name)?;
if let (VariableValue::Real(v1), VariableValue::Real(v2), VariableType::Real(vt)) =
(v1, v2, &var_type)
{
let (y_lower, y_upper) = vt.bounds();
match self.crossover_variables(*v1, *v2, y_lower, y_upper, rng) {
None => continue,
Some((new_v1, new_v2)) => {
child1.update_variable(&var_name, VariableValue::Real(new_v1))?;
child2.update_variable(&var_name, VariableValue::Real(new_v2))?;
}
};
} else if let (
VariableValue::Integer(v1),
VariableValue::Integer(v2),
VariableType::Integer(vt),
) = (v1, v2, var_type)
{
let (y_lower, y_upper) = vt.bounds();
match self.crossover_variables(
*v1 as f64,
*v2 as f64,
y_lower as f64,
y_upper as f64,
rng,
) {
None => continue,
Some((new_v1, new_v2)) => {
let mut new_v1 = new_v1.trunc() as i64;
if rng.random_range(0.0..=1.0) < 0.5 {
new_v1 += 1;
new_v1 = new_v1.min(y_upper);
}
let mut new_v2 = new_v2.trunc() as i64;
if rng.random_range(0.0..=1.0) < 0.5 {
new_v2 += 1;
new_v2 = new_v2.min(y_upper);
}
child1.update_variable(&var_name, VariableValue::Integer(new_v1))?;
child2.update_variable(&var_name, VariableValue::Integer(new_v2))?;
}
};
}
}
}
Ok(CrossoverChildren { child1, child2 })
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use crate::core::utils::{dummy_evaluator, get_rng};
use crate::core::{
BoundedNumber, Individual, Objective, ObjectiveDirection, Problem, VariableType,
VariableValue,
};
use crate::operators::{Crossover, SimulatedBinaryCrossover, SimulatedBinaryCrossoverArgs};
#[test]
fn test_new_sbx_panic() {
assert!(SimulatedBinaryCrossover::new(SimulatedBinaryCrossoverArgs {
distribution_index: -2.0,
crossover_probability: 1.0,
variable_probability: 0.5,
})
.is_err());
assert!(SimulatedBinaryCrossover::new(SimulatedBinaryCrossoverArgs {
distribution_index: 1.0,
crossover_probability: 2.0,
variable_probability: 0.5,
})
.is_err());
assert!(SimulatedBinaryCrossover::new(SimulatedBinaryCrossoverArgs {
distribution_index: 1.0,
crossover_probability: 1.0,
variable_probability: -0.5,
})
.is_err());
}
#[test]
fn test_sbx_crossover() {
let objectives = vec![Objective::new("obj1", ObjectiveDirection::Minimise)];
let variables = vec![
VariableType::Real(BoundedNumber::new("var1", 0.0, 1000.0).unwrap()),
VariableType::Integer(BoundedNumber::new("var2", -10, 20).unwrap()),
];
let problem =
Arc::new(Problem::new(objectives, variables, None, dummy_evaluator()).unwrap());
let mut a = Individual::new(problem.clone());
a.update_variable("var1", VariableValue::Real(0.2)).unwrap();
a.update_variable("var2", VariableValue::Integer(0))
.unwrap();
let mut b = Individual::new(problem.clone());
b.update_variable("var1", VariableValue::Real(0.8)).unwrap();
b.update_variable("var2", VariableValue::Integer(3))
.unwrap();
let parameters = SimulatedBinaryCrossoverArgs {
distribution_index: 1.0,
crossover_probability: 1.0,
variable_probability: 1.0,
};
let sbx = SimulatedBinaryCrossover::new(parameters).unwrap();
let mut rng = get_rng(Some(20000));
let out = sbx.generate_offsprings(&a, &b, &mut rng).unwrap();
assert_ne!(
*out.child1.get_variable_value("var1").unwrap(),
VariableValue::Real(0.2)
);
assert_ne!(
*out.child1.get_variable_value("var2").unwrap(),
VariableValue::Integer(0)
);
assert_ne!(
*out.child2.get_variable_value("var1").unwrap(),
VariableValue::Real(0.8)
);
assert_ne!(
*out.child2.get_variable_value("var2").unwrap(),
VariableValue::Integer(3)
);
}
}