#![allow(clippy::module_name_repetitions, clippy::similar_names, non_snake_case)]
//! Implementation of [star sets](https://link.springer.com/chapter/10.1007/978-3-030-30942-8_39)
//! for representing affine transformed sets
use crate::affine::{Affine, Affine2, Affine4};
use crate::bounds::Bounds1;
use crate::gaussian::GaussianDistribution;
use crate::lp::solve;
use crate::lp::LinearSolution;
use crate::polytope::Polytope;
use crate::tensorshape::TensorShape;
use crate::NNVFloat;
use ndarray::array;
use ndarray::ArrayView1;
use ndarray::Dimension;
use ndarray::Ix4;
use ndarray::{Array1, Array2};
use ndarray::{Array4, ArrayView2};
use ndarray::{Axis, Ix2};
use num::Float;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
pub type Star2 = Star<Ix2>;
pub type Star4 = Star<Ix4>;
/// Representation of a set acted on by a deep neural network (DNN)
///
/// Star sets are defined by a 1) constraint coefficient matrix, 2) upper
/// bound vector, 3) basis matrix, 4) center vector. (1) and (2) define a
/// polyhedron and (3) and (4) define an affine transformation of that
/// polyhedron.
///
/// Each Star set represents two sets implicitly: an input set and a
/// representation set. The input set is defined in the input space of the deep
/// neural network of interest. It's a polyhedron defined by the Star's
/// constraints (coefficient matrix and upper bound vector). The representation
/// set is defined in a latent or output space of the DNN. It is calculated by
/// applying the affine transformation defined by the Star's basis and center
/// to the input set polyhedron.
///
/// Based on: Tran, Hoang-Dung, et al. "Star-based reachability analysis of
/// deep neural networks." International Symposium on Formal Methods. Springer,
/// Cham, 2019.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Star<D: Dimension> {
/// `representation` is the concatenation of [basis center] (where
/// center is a column vector) and captures information about the
/// transformed set
representation: Affine<D>,
/// `constraints` is the concatenation of [coeffs upper_bounds]
/// and is a representation of the input polyhedron
constraints: Option<Polytope>,
}
impl<D: Dimension> Star<D> {
pub fn ndim(&self) -> usize {
self.representation.ndim()
}
pub fn input_space_polytope(&self) -> Option<&Polytope> {
self.constraints.as_ref()
}
pub fn center(&self) -> ArrayView1<NNVFloat> {
self.representation.shift()
}
pub fn get_representation(&self) -> &Affine<D> {
&self.representation
}
}
impl<D: Dimension> Star<D> {
pub fn num_constraints(&self) -> usize {
match &self.constraints {
Some(polytope) => polytope.num_constraints(),
None => 0,
}
}
/// Add constraints to restrict the input set. Each row represents a
/// constraint and the last column represents the upper bounds.
#[must_use]
pub fn add_constraint(mut self, coeffs: ArrayView1<NNVFloat>, rhs: NNVFloat) -> Self {
if let Some(ref mut constrs) = self.constraints {
constrs.add_eqn(coeffs, rhs);
} else {
self.constraints =
Polytope::nonempty_new(&coeffs.to_owned().insert_axis(Axis(0)), &array![rhs]);
}
self
}
#[must_use]
/// # Panics
pub fn remove_constraint(mut self, idx: usize) -> Self {
if let Some(ref mut constrs) = self.constraints {
constrs.remove_eqn(idx);
} else {
panic!();
}
self
}
}
impl Star2 {
pub fn get_constraint_coeffs(&self) -> Option<Array2<NNVFloat>> {
self.constraints.as_ref().map(|x| x.coeffs().to_owned())
}
#[must_use]
pub fn get_safe_subset(&self, safe_value: NNVFloat) -> Self {
let mut subset = self.clone();
let mut new_constr: Polytope = self.representation.clone().into();
let mut rhs = new_constr.rhs_mut();
rhs *= -1.;
rhs += safe_value;
subset.intersect_input(&new_constr);
subset
}
pub fn intersect_input(&mut self, other: &Polytope) {
self.constraints = self
.constraints
.as_ref()
.map_or(Some(other.clone()), |x| Some(x.intersect(other)));
}
pub fn get_input_trunc_gaussian(
&self,
mu: ArrayView1<NNVFloat>,
sigma: ArrayView2<NNVFloat>,
max_accept_reject_iters: usize,
stability_eps: NNVFloat,
bounds_opt: &Option<Bounds1>,
) -> Option<GaussianDistribution> {
self.constraints
.as_ref()
.and_then(|x| x.reduce_fixed_inputs(bounds_opt))
.map(|poly| {
poly.get_truncnorm_distribution(mu, sigma, max_accept_reject_iters, stability_eps)
})
}
/// Create a new Star with given dimension.
///
/// By default this Star covers the space because it has no constraints. To add constraints call `.add_constraints`.
///
/// # Panics
pub fn default(input_shape: &TensorShape) -> Self {
debug_assert_eq!(input_shape.rank(), 1);
debug_assert!(input_shape.is_fully_defined());
let dim = input_shape[0].unwrap();
Self {
representation: Affine2::new(Array2::eye(dim), Array1::zeros(dim)),
constraints: None,
}
}
/// Create a new Star with given basis vector and center.
///
/// By default this Star covers the space because it has no constraints. To add constraints call `.add_constraints`.
pub fn new(basis: Array2<NNVFloat>, center: Array1<NNVFloat>) -> Self {
Self {
representation: Affine2::new(basis, center),
constraints: None,
}
}
/// # Panics
#[must_use]
pub fn with_constraints(mut self, constraints: Polytope) -> Self {
debug_assert!(self.constraints.is_none(), "explicit panic");
self.constraints = Some(constraints);
self
}
/// Get the dimension of the input space
pub fn input_space_dim(&self) -> usize {
self.representation.input_dim()
}
/// Get the dimension of the representation space
pub fn representation_space_dim(&self) -> usize {
self.representation.output_dim()
}
}
impl Star2 {
/// Apply an affine transformation to the representation
#[must_use]
pub fn affine_map2(&self, affine: &Affine<Ix2>) -> Self {
Self {
representation: affine * &self.representation,
constraints: self.constraints.clone(),
}
}
pub fn step_relu2(
&self,
index: usize,
input_bounds_opt: &Option<Bounds1>,
) -> (Option<Self>, Option<Self>) {
let (coeffs, shift) = {
let aff = self.representation.get_eqn(index);
let neg_basis_part: Array2<NNVFloat> = &aff.basis() * -1.;
let shift = aff.shift();
(neg_basis_part.row(0).to_owned(), shift[[0]])
};
let upper_star = self.clone().add_constraint(coeffs.view(), shift);
let mut lower_star = self.clone().add_constraint((&coeffs * -1.).view(), -shift);
lower_star.representation.zero_eqn(index);
let lower_star_opt = if lower_star.is_empty(input_bounds_opt) {
None
} else {
Some(lower_star)
};
let upper_star_opt = if upper_star.is_empty(input_bounds_opt) {
None
} else {
Some(upper_star)
};
(lower_star_opt, upper_star_opt)
}
pub fn step_relu2_dropout(
&self,
index: usize,
input_bounds_opt: &Option<Bounds1>,
) -> (Option<Self>, Option<Self>, Option<Self>) {
let mut dropout_star = self.clone();
dropout_star.representation.zero_eqn(index);
let stars = self.step_relu2(index, input_bounds_opt);
let dropout_star_opt = if dropout_star.is_empty(input_bounds_opt) {
None
} else {
Some(dropout_star)
};
(dropout_star_opt, stars.0, stars.1)
}
/// Calculates the minimum value of the equation at index `idx`
/// given the constraints
///
/// This method assumes that the constraints bound each dimension,
/// both lower and upper.
///
/// # Panics
/// TODO: Change output type to Option<T>
///
/// TODO: `ResolutionError::Unbounded` can result whether or not the
/// constraints are infeasible if there are zeros in the
/// objective. This needs to be checked either here or in the
/// solve function. Currently this is way too hard to do, so we
/// panic instead. We have an assumption that we start with a
/// bounded box and therefore should never be unbounded.
pub fn get_output_min(&self, idx: usize, input_bounds: &Bounds1) -> NNVFloat {
let eqn = self.representation.get_eqn(idx);
let shift = eqn.shift()[0];
self.constraints.as_ref().map_or_else(
|| {
crate::util::signed_dot(&eqn.basis(), &input_bounds.lower(), &input_bounds.upper())
[[0]]
+ shift
},
|poly| {
let solved = solve(
poly.coeffs().rows(),
poly.rhs(),
eqn.basis().index_axis(Axis(0), 0),
Some(input_bounds),
);
if let LinearSolution::Solution(_, val) = solved {
shift + val
} else if let LinearSolution::Unbounded(_) = solved {
NNVFloat::neg_infinity()
} else {
panic!("Solution: {:?} on star {:?}", solved, self)
}
},
)
}
/// Calculates the maximum value of the equation at index `idx`
/// given the constraints
///
/// This method assumes that the constraints bound each dimension,
/// both lower and upper.
///
/// # Panics
/// TODO: Change output type to Option<T>
pub fn get_output_max(&self, idx: usize, input_bounds: &Bounds1) -> NNVFloat {
let eqn = self.representation.get_eqn(idx);
let shift = eqn.shift()[0];
self.constraints.as_ref().map_or_else(
|| {
crate::util::signed_dot(&eqn.basis(), &input_bounds.upper(), &input_bounds.lower())
[[0]]
+ shift
},
|poly| {
let solved = solve(
poly.coeffs().rows(),
poly.rhs(),
eqn.basis().index_axis(Axis(0), 0).mapv(|x| x * -1.).view(),
Some(input_bounds),
);
if let LinearSolution::Solution(_, val) = solved {
shift - val
} else if let LinearSolution::Unbounded(_) = solved {
NNVFloat::infinity()
} else {
panic!()
}
},
)
}
//pub fn nearest_euclidean_neighbor(&self, origin: Array1<NNVFloat>) -> Array1<NNVFloat> {}
/// # Panics
pub fn can_maximize_output_idx(&self, class_idx: usize) -> bool {
let class_eqn = self.representation.get_eqn(class_idx);
let (class_coeffs, class_shift): (ArrayView1<NNVFloat>, NNVFloat) = {
(
class_eqn.basis().remove_axis(Axis(0)),
class_eqn.shift()[[0]],
)
};
let nvars = class_coeffs.len();
if self.constraints.is_none() {
return true; // We should have more sophisticated handling here, but this is a stopgap
}
let poly = self.constraints.as_ref().unwrap();
let block_coeffs = poly.coeffs();
let (A, b) = (block_coeffs.rows(), poly.rhs());
// Add a constraint for each output class not equal to the one being maximized
let mut coeffs = Vec::new();
let mut shifts = Vec::new();
for idx in 0..self.representation.shift().ndim() {
if idx == class_idx {
continue;
}
{
let (diff_coeffs, diff_shift) = {
let other_class_eqn = self.representation.get_eqn(idx);
(
&other_class_eqn.basis().row(0) - &class_coeffs,
class_shift - other_class_eqn.shift()[[0]],
)
};
coeffs.push(diff_coeffs);
shifts.push(diff_shift);
}
}
let solve_a = A
.into_iter()
.chain(coeffs.iter().map(ndarray::ArrayBase::view));
let solved = solve(
solve_a,
b.iter().chain(shifts.iter()),
Array1::ones(nvars).view(),
Some(&Bounds1::trivial(nvars)),
);
matches!(
solved,
LinearSolution::Solution(..) | LinearSolution::Unbounded(..)
)
}
pub fn calculate_output_axis_aligned_bounding_box(
&self,
input_outer_bounds: &Bounds1,
) -> Bounds1 {
let lbs = Array1::from_iter(
(0..self.representation_space_dim())
.map(|x| self.get_output_min(x, input_outer_bounds)),
);
let ubs = Array1::from_iter(
(0..self.representation_space_dim())
.map(|x| self.get_output_max(x, input_outer_bounds)),
);
Bounds1::new(lbs.view(), ubs.view())
}
/// Check whether the Star set is empty.
pub fn is_empty(&self, input_bounds_opt: &Option<Bounds1>) -> bool {
self.constraints
.as_ref()
.map_or(false, |x| x.is_empty(input_bounds_opt))
}
}
impl Star4 {
/// Create a new Star with given dimension.
///
/// By default this Star covers the space because it has no constraints. To add constraints call `.add_constraints`.
///
/// # Panics
pub fn default(input_shape: &TensorShape) -> Self {
debug_assert_eq!(input_shape.rank(), 3);
debug_assert!(input_shape.is_fully_defined());
let shape_slice = input_shape.as_defined_slice().unwrap();
let slice_exact: [usize; 4] = [
shape_slice[0],
shape_slice[1],
shape_slice[2],
shape_slice[3],
];
Self {
representation: Affine4::new(Array4::ones(slice_exact), Array1::zeros(shape_slice[3])),
constraints: None,
}
}
}
#[cfg(test)]
mod test {
// use super::*;
// use crate::test_util::{array2, empty_star, non_empty_star};
// use ndarray::arr1;
// use proptest::prelude::*;
// use proptest::proptest;
// use std::panic;
/*
#[test]
fn test_gaussian_sample_manual() {
let mut rng = Pcg64::seed_from_u64(2);
let loc: Array1<f64> = arr1(&[-2.52]);
let scale_diag: Array1<f64> = arr1(&[0.00001]);
let lbs = loc.clone() - 3.5 * scale_diag.clone();
let ubs = loc.clone() + 3.5 * scale_diag.clone();
let input_bounds = Bounds1::new(lbs.view(), ubs.view());
let mut star: Star2<f64> = Star2::new(Array2::eye(1) * 8.016, Array1::zeros(1));
star = star.add_constraints(&Inequality::new(arr2(&[[-1.], [1.]]), arr1(&[2.52, 0.10])));
star.gaussian_sample(
&mut rng,
&loc,
&Array2::from_diag(&scale_diag).to_owned(),
10,
20,
&Some(input_bounds),
);
/*
Star { representation: Affine { basis: [[8.016197283509424]], shape=[1, 1], strides=[1, 1], layout=CFcf (0xf), const ndim=2, shift: [0.0], shape=[1], strides=[1], layout=CFcf (0xf), const ndim=1 }, constraints: Some(Polytope { halfspaces: Inequality { coeffs: [[-1.0],
[1.0],
[-1.0],
[1.0],
[-1.0]], shape=[5, 1], strides=[1, 1], layn
let lbs = loc.clone() - 3.5 * scale_diag.clone();
let ubs = loc.clone() + 3.5 * scale_diag.clone();
let input_bounds = Bounds1::new(lbs.view(), ubs.view());
star.gaussian_sample(&mut rng, &loc, &Array2::from_diag(&scale_diag).to_owned(), 10, 20, &Some(input_bounds));
}
*/
#[test]
fn test_get_output_min_feasible(star in non_empty_star(2,3)) {
prop_assert!(!star.input_space_polytope().unwrap().is_empty(), "Polytope is empty");
prop_assert!(!star.is_empty(), "Non empty star is empty");
let result = panic::catch_unwind(|| {
star.get_output_min(0);
});
prop_assert!(result.is_ok(), "Calculating min resulted in panic for feasible star");
}
#[test]
fn test_get_output_min_infeasible(star in empty_star(2,1)) {
prop_assert!(star.input_space_polytope().unwrap().is_empty(), "Polytope is not empty");
prop_assert!(star.is_empty(), "Empty star is not empty");
let result = panic::catch_unwind(|| {
star.get_output_min(0)
});
prop_assert!(result.is_err(), "Infeasible star did not panic for get_output_min {:#?}", result);
}
#[test]
fn test_get_output_max_feasible(star in non_empty_star(2,3)) {
prop_assert!(!star.input_space_polytope().unwrap().is_empty(), "Polytope is empty");
prop_assert!(!star.is_empty(), "Non empty star is empty");
let result = panic::catch_unwind(|| {
star.get_output_max(0);
});
prop_assert!(result.is_ok(), "Calculating min resulted in panic for feasible star");
}
#[test]
fn test_get_output_max_infeasible(star in empty_star(2,1)) {
prop_assert!(star.input_space_polytope().unwrap().is_empty(), "Polytope is empty");
prop_assert!(star.is_empty(), "Empty star is not empty");
let result = panic::catch_unwind(|| {
star.get_output_max(0)
});
prop_assert!(result.is_err(), "Infeasible star did not panic for get_output_min {:#?}", result);
}
#[test]
fn test_get_output_min_box_polytope(basis in array2(2, 2)) {
let num_dims = 2;
let box_bounds: Bounds1<f64> = Bounds1::new(Array1::from_elem(num_dims, -20.).view(), Array1::from_elem(num_dims, -20.).view());
let box_ineq = Inequality::new(Array2::zeros([1, num_dims]), Array1::zeros(1), box_bounds);
let poly = Polytope::from_halfspaces(box_ineq);
let center = arr1(&[0.0, 0.0]);
let star = Star2::new(basis, center).with_constraints(poly);
let result = panic::catch_unwind(|| {
star.get_output_min(0);
});
assert!(result.is_ok());
}
#[test]
fn test_get_output_max_box_polytope(basis in array2(2, 2)) {
let num_dims = 2;
let box_bounds: Bounds1<f64> = Bounds1::new(Array1::from_elem(num_dims, -20.).view(), Array1::from_elem(num_dims, -20.).view());
let box_ineq = Inequality::new(Array2::zeros([1, num_dims]), Array1::zeros(1), box_bounds);
let poly = Polytope::from_halfspaces(box_ineq);
let center = arr1(&[0.0, 0.0]);
let star = Star2::new(basis, center).with_constraints(poly);
let result = panic::catch_unwind(|| {
star.get_output_max(0);
});
assert!(result.is_ok());
}
}
*/
}