use core::{borrow::Borrow, marker::PhantomData, ops::RangeInclusive};
use num_traits::{float::FloatCore, AsPrimitive, PrimInt, WrappingAdd, WrappingSub};
use crate::{generic_static_asserts, wrapping_pow2, BitArray};
use super::{
DecoderModel, Distribution, EncoderModel, EntropyModel, Inverse, IterableEntropyModel,
};
/// Turns a continuous probability distribution into a fixed-point entropy model
/// over a finite range of integer symbols, guaranteeing a nonzero ("leaky")
/// probability for every symbol in the support (see [`LeakyQuantizer::new`]).
#[derive(Debug, Clone, Copy)]
pub struct LeakyQuantizer<F, Symbol, Probability, const PRECISION: usize> {
    /// Lower end of the supported symbol range (inclusive).
    min_symbol_inclusive: Symbol,
    /// Upper end of the supported symbol range (inclusive).
    max_symbol_inclusive: Symbol,
    /// Scale factor applied to the continuous CDF; computed in `new` as
    /// `(2^PRECISION - 1) - (support size - 1)`, converted to `F`.
    free_weight: F,
    /// Marks `Probability` as logically owned even though no field stores it.
    phantom: PhantomData<Probability>,
}
/// A [`LeakyQuantizer`] with 24 bits of fixed-point precision backed by `u32` probabilities.
pub type DefaultLeakyQuantizer<F, Symbol> = LeakyQuantizer<F, Symbol, u32, 24>;
/// A [`LeakyQuantizer`] with 12 bits of fixed-point precision backed by `u16` probabilities.
pub type SmallLeakyQuantizer<F, Symbol> = LeakyQuantizer<F, Symbol, u16, 12>;
impl<F, Symbol, Probability, const PRECISION: usize>
    LeakyQuantizer<F, Symbol, Probability, PRECISION>
where
    Probability: BitArray + Into<F>,
    Symbol: PrimInt + AsPrimitive<Probability> + WrappingSub + WrappingAdd,
    F: FloatCore,
{
    /// Constructs a quantizer for the given inclusive range of symbols.
    ///
    /// # Panics
    ///
    /// Panics if `support` does not contain at least two symbols
    /// (`end <= start`), or if it has more elements than can each be assigned a
    /// nonzero probability within `PRECISION` bits.
    pub fn new(support: RangeInclusive<Symbol>) -> Self {
        generic_static_asserts!(
            (Probability: BitArray; const PRECISION: usize);
            PROBABILITY_MUST_SUPPORT_PRECISION: PRECISION <= Probability::BITS;
            PRECISION_MUST_BE_NONZERO: PRECISION > 0;
        );
        assert!(support.end() > support.start());
        let min_symbol_inclusive = *support.start();
        let max_symbol_inclusive = *support.end();
        let support_size_minus_one: Probability = max_symbol_inclusive
            .wrapping_sub(&min_symbol_inclusive)
            .as_();
        // Largest representable fixed-point probability: `2^PRECISION - 1`.
        let max_probability = Probability::max_value() >> (Probability::BITS - PRECISION);
        // Weight left for scaling the continuous CDF after setting aside
        // `support_size_minus_one` units of probability mass for leakage.
        let free_weight: F = max_probability
            .checked_sub(&support_size_minus_one)
            .expect("The support is too large to assign a nonzero probability to each element.")
            .into();
        Self {
            min_symbol_inclusive,
            max_symbol_inclusive,
            free_weight,
            phantom: PhantomData,
        }
    }
    /// Combines this quantizer with a continuous `distribution` into an entropy model.
    #[inline]
    pub fn quantize<D: Distribution>(
        self,
        distribution: D,
    ) -> LeakilyQuantizedDistribution<F, Symbol, Probability, D, PRECISION> {
        LeakilyQuantizedDistribution {
            quantizer: self,
            inner: distribution,
        }
    }
    /// The inclusive range of symbols that receive nonzero probability.
    #[inline]
    pub fn support(&self) -> RangeInclusive<Symbol> {
        RangeInclusive::new(self.min_symbol_inclusive, self.max_symbol_inclusive)
    }
}
/// An entropy model obtained by quantizing a continuous distribution `D` with a
/// [`LeakyQuantizer`]; created via [`LeakyQuantizer::quantize`].
#[derive(Debug, Clone, Copy)]
pub struct LeakilyQuantizedDistribution<F, Symbol, Probability, D, const PRECISION: usize> {
    /// The underlying continuous probability distribution.
    inner: D,
    /// The quantizer holding the support bounds and the CDF scale factor.
    quantizer: LeakyQuantizer<F, Symbol, Probability, PRECISION>,
}
impl<F, Symbol, Probability, D, const PRECISION: usize>
    LeakilyQuantizedDistribution<F, Symbol, Probability, D, PRECISION>
where
    Probability: BitArray + Into<F>,
    Symbol: PrimInt + AsPrimitive<Probability> + WrappingSub + WrappingAdd,
    F: FloatCore,
{
    /// Returns a copy of the quantizer used to build this model.
    #[inline]
    pub fn quantizer(self) -> LeakyQuantizer<F, Symbol, Probability, PRECISION> {
        self.quantizer
    }
    /// Borrows the underlying continuous distribution.
    #[inline]
    pub fn inner(&self) -> &D {
        &self.inner
    }
    /// Mutably borrows the underlying continuous distribution.
    #[inline]
    pub fn inner_mut(&mut self) -> &mut D {
        &mut self.inner
    }
    /// Consumes `self` and returns the underlying continuous distribution.
    #[inline]
    pub fn into_inner(self) -> D {
        self.inner
    }
    /// The inclusive range of symbols supported by the quantizer.
    #[inline]
    pub fn support(&self) -> RangeInclusive<Symbol> {
        self.quantizer.support()
    }
}
/// Computes `symbol - min_symbol_inclusive` in `Probability` space, masked to the
/// bit width of `Symbol` so that wrap-around from the subtraction cannot spill
/// into higher bits when `Probability` is wider than `Symbol`.
#[inline(always)]
fn slack<Probability, Symbol>(symbol: Symbol, min_symbol_inclusive: Symbol) -> Probability
where
    Probability: BitArray,
    Symbol: AsPrimitive<Probability> + WrappingSub,
{
    // A mask with exactly `8 * size_of::<Symbol>()` low bits set (all ones when
    // `Probability` is no wider than `Symbol`, since `wrapping_pow2` wraps to 0).
    let symbol_width_bits = 8 * core::mem::size_of::<Symbol>();
    let mask = wrapping_pow2::<Probability>(symbol_width_bits).wrapping_sub(&Probability::one());
    let offset: Probability = symbol.wrapping_sub(&min_symbol_inclusive).as_();
    offset & mask
}
/// Every leakily quantized distribution is an entropy model over `Symbol`s with
/// fixed-point `Probability` weights, regardless of the float type `F` or the
/// underlying distribution `D`.
impl<F, Symbol, Probability, D, const PRECISION: usize> EntropyModel<PRECISION>
    for LeakilyQuantizedDistribution<F, Symbol, Probability, D, PRECISION>
where
    Probability: BitArray,
{
    type Probability = Probability;
    type Symbol = Symbol;
}
impl<Symbol, Probability, D, const PRECISION: usize> EncoderModel<PRECISION>
    for LeakilyQuantizedDistribution<f64, Symbol, Probability, D, PRECISION>
where
    f64: AsPrimitive<Probability>,
    Symbol: PrimInt + AsPrimitive<Probability> + Into<f64> + WrappingSub,
    Probability: BitArray + Into<f64>,
    D: Distribution,
    D::Value: AsPrimitive<Symbol>,
{
    /// Returns the fixed-point interval `(left_sided_cumulative, probability)`
    /// assigned to `symbol`, or `None` for symbols outside the support.
    fn left_cumulative_and_probability(
        &self,
        symbol: impl Borrow<Symbol>,
    ) -> Option<(Probability, Probability::NonZero)> {
        let symbol = *symbol.borrow();
        let quantizer = &self.quantizer;
        // Symbols outside the support carry no probability mass.
        if symbol < quantizer.min_symbol_inclusive || symbol > quantizer.max_symbol_inclusive {
            return None;
        }
        // One unit of leaked probability mass per symbol below `symbol`.
        let leak = slack::<Probability, _>(symbol, quantizer.min_symbol_inclusive);
        let left_sided_cumulative = if symbol == quantizer.min_symbol_inclusive {
            // First symbol: absorb the entire left tail of the distribution.
            Probability::zero()
        } else {
            let non_leaky: Probability =
                (quantizer.free_weight * self.inner.distribution(symbol.into() - 0.5)).as_();
            non_leaky + leak
        };
        let right_sided_cumulative = if symbol == quantizer.max_symbol_inclusive {
            // Last symbol: absorb the right tail so the total is exactly
            // `2^PRECISION` (which wraps to zero in `Probability`).
            wrapping_pow2(PRECISION)
        } else {
            let non_leaky: Probability =
                (quantizer.free_weight * self.inner.distribution(symbol.into() + 0.5)).as_();
            non_leaky + leak + Probability::one()
        };
        let probability = right_sided_cumulative
            .wrapping_sub(&left_sided_cumulative)
            .into_nonzero()
            .expect("Invalid underlying continuous probability distribution.");
        Some((left_sided_cumulative, probability))
    }
}
impl<Symbol, Probability, D, const PRECISION: usize> DecoderModel<PRECISION>
    for LeakilyQuantizedDistribution<f64, Symbol, Probability, D, PRECISION>
where
    f64: AsPrimitive<Probability>,
    Symbol: PrimInt + AsPrimitive<Probability> + Into<f64> + WrappingSub + WrappingAdd,
    Probability: BitArray + Into<f64>,
    D: Inverse,
    D::Value: AsPrimitive<Symbol>,
{
    /// Finds the symbol whose fixed-point probability interval contains
    /// `quantile` and returns `(symbol, left_sided_cumulative, probability)`.
    ///
    /// Strategy: guess a symbol by inverting the continuous CDF, then refine the
    /// guess with a power-of-two step search until the interval around the
    /// current symbol actually brackets `quantile`.
    ///
    /// # Panics
    ///
    /// Panics if `quantile >= 2^PRECISION`, or if the underlying distribution
    /// yields an empty interval at the upper end of the support.
    fn quantile_function(
        &self,
        quantile: Probability,
    ) -> (Self::Symbol, Probability, Probability::NonZero) {
        // `max_probability` is `2^PRECISION - 1`, the largest valid quantile.
        let max_probability = Probability::max_value() >> (Probability::BITS - PRECISION);
        assert!(quantile <= max_probability);
        let inverse_denominator = 1.0 / (max_probability.into() + 1.0);
        let min_symbol_inclusive = self.quantizer.min_symbol_inclusive;
        let max_symbol_inclusive = self.quantizer.max_symbol_inclusive;
        let free_weight = self.quantizer.free_weight;
        // Initial guess: invert the continuous CDF at the midpoint of the
        // quantile's fixed-point bin.
        let mut symbol: Self::Symbol = self
            .inner
            .inverse((quantile.into() + 0.5) * inverse_denominator)
            .as_();
        // Clamp the guess to the support and compute its left-sided cumulative,
        // mirroring `left_cumulative_and_probability` above.
        let mut left_sided_cumulative = if symbol <= min_symbol_inclusive {
            symbol = min_symbol_inclusive;
            Probability::zero()
        } else {
            if symbol > max_symbol_inclusive {
                symbol = max_symbol_inclusive;
            }
            let non_leaky: Probability =
                (free_weight * self.inner.distribution(symbol.into() - 0.5)).as_();
            non_leaky + slack(symbol, min_symbol_inclusive)
        };
        // `step` is always a power of two: it grows while searching for a
        // bracket and shrinks while bisecting it.
        let mut step = Self::Symbol::one();
        let right_sided_cumulative = if left_sided_cumulative > quantile {
            // The initial guess was too high; search downward.
            symbol = symbol - step;
            let mut found_lower_bound = false;
            loop {
                let old_left_sided_cumulative = left_sided_cumulative;
                if symbol == min_symbol_inclusive {
                    left_sided_cumulative = Probability::zero();
                    if step <= Symbol::one() {
                        // Reached the bottom of the support with a unit step:
                        // the previous cumulative is the right-sided bound.
                        break old_left_sided_cumulative;
                    }
                } else {
                    let non_leaky: Probability =
                        (free_weight * self.inner.distribution(symbol.into() - 0.5)).as_();
                    left_sided_cumulative = non_leaky + slack(symbol, min_symbol_inclusive);
                }
                if left_sided_cumulative <= quantile {
                    found_lower_bound = true;
                    if step <= Symbol::one() {
                        // Bracket narrowed to a single symbol: compute its
                        // right-sided cumulative and finish.
                        let right_sided_cumulative = if symbol == max_symbol_inclusive {
                            wrapping_pow2(PRECISION)
                        } else {
                            let non_leaky: Probability =
                                (free_weight * self.inner.distribution(symbol.into() + 0.5)).as_();
                            (non_leaky + slack(symbol, min_symbol_inclusive))
                                .wrapping_add(&Probability::one())
                        };
                        break right_sided_cumulative;
                    } else {
                        // Overshot downward: bisect back up.
                        step = step >> 1;
                        symbol = symbol + step;
                    }
                } else if found_lower_bound {
                    // Still above `quantile` but a lower bound exists: bisect down.
                    if step > Symbol::one() {
                        step = step >> 1
                    }
                    symbol = symbol - step;
                } else {
                    // No lower bound found yet: double the step (unless it would
                    // overflow to zero) and keep moving down, clamping so the
                    // next symbol neither wraps nor leaves the support.
                    if step << 1 != Symbol::zero() {
                        step = step << 1;
                    }
                    symbol = loop {
                        let new_symbol = symbol.wrapping_sub(&step);
                        if new_symbol >= min_symbol_inclusive && new_symbol <= symbol {
                            break new_symbol;
                        }
                        step = step >> 1;
                    };
                }
            }
        } else {
            // The initial guess was not too high; search upward for the first
            // right-sided cumulative that exceeds `quantile`.
            let mut found_upper_bound = false;
            loop {
                let right_sided_cumulative = if symbol == max_symbol_inclusive {
                    // `wrapping_pow2(PRECISION)` may be zero (wrapped), so the
                    // comparison below treats zero as "exceeds any quantile".
                    let right_sided_cumulative = wrapping_pow2(PRECISION);
                    if step <= Symbol::one() {
                        let non_leaky: Probability =
                            (free_weight * self.inner.distribution(symbol.into() - 0.5)).as_();
                        left_sided_cumulative = non_leaky + slack(symbol, min_symbol_inclusive);
                        // Guard the `into_nonzero_unchecked` below: an empty
                        // interval here means the model is inconsistent.
                        if right_sided_cumulative == left_sided_cumulative {
                            panic!("Invalid underlying probability distribution.");
                        }
                        break right_sided_cumulative;
                    } else {
                        right_sided_cumulative
                    }
                } else {
                    let non_leaky: Probability =
                        (free_weight * self.inner.distribution(symbol.into() + 0.5)).as_();
                    (non_leaky + slack(symbol, min_symbol_inclusive))
                        .wrapping_add(&Probability::one())
                };
                if right_sided_cumulative > quantile
                    || right_sided_cumulative == Probability::zero()
                {
                    found_upper_bound = true;
                    if step <= Symbol::one() {
                        // Candidate found; verify its left edge is <= `quantile`.
                        left_sided_cumulative = if symbol == min_symbol_inclusive {
                            Probability::zero()
                        } else {
                            let non_leaky: Probability =
                                (free_weight * self.inner.distribution(symbol.into() - 0.5)).as_();
                            non_leaky + slack(symbol, min_symbol_inclusive)
                        };
                        if left_sided_cumulative <= quantile || symbol == min_symbol_inclusive {
                            break right_sided_cumulative;
                        }
                    } else {
                        step = step >> 1;
                    }
                    symbol = symbol - step;
                } else if found_upper_bound {
                    // Still below `quantile` but an upper bound exists: bisect up.
                    if step > Symbol::one() {
                        step = step >> 1
                    }
                    symbol = symbol + step;
                } else {
                    // No upper bound yet: double the step (unless it would
                    // overflow to zero) and keep moving up, clamping so the next
                    // symbol neither wraps nor leaves the support.
                    if step << 1 != Symbol::zero() {
                        step = step << 1;
                    }
                    symbol = loop {
                        let new_symbol = symbol.wrapping_add(&step);
                        if new_symbol <= max_symbol_inclusive && new_symbol >= symbol {
                            break new_symbol;
                        }
                        step = step >> 1;
                    };
                }
            }
        };
        let probability = unsafe {
            // SAFETY: every `break` path above either establishes
            // `left_sided_cumulative <= quantile < right_sided_cumulative`
            // (modulo the wrap at the last symbol) or panics explicitly when the
            // interval would be empty, so the difference is nonzero.
            right_sided_cumulative
                .wrapping_sub(&left_sided_cumulative)
                .into_nonzero_unchecked()
        };
        (symbol, left_sided_cumulative, probability)
    }
}
impl<'m, Symbol, Probability, D, const PRECISION: usize> IterableEntropyModel<'m, PRECISION>
    for LeakilyQuantizedDistribution<f64, Symbol, Probability, D, PRECISION>
where
    f64: AsPrimitive<Probability>,
    Symbol: PrimInt + AsPrimitive<Probability> + AsPrimitive<usize> + Into<f64> + WrappingSub,
    Probability: BitArray + Into<f64>,
    D: Distribution + 'm,
    D::Value: AsPrimitive<Symbol>,
{
    /// Iterates over `(symbol, left_sided_cumulative, probability)` entries for
    /// every symbol of the support in ascending order.
    fn symbol_table(
        &'m self,
    ) -> impl Iterator<
        Item = (
            Self::Symbol,
            Self::Probability,
            <Self::Probability as BitArray>::NonZero,
        ),
    > {
        // Start at the lower end of the support with zero accumulated mass.
        let first_symbol = self.quantizer.min_symbol_inclusive;
        LeakilyQuantizedDistributionIter {
            symbol: Some(first_symbol),
            left_sided_cumulative: Probability::zero(),
            model: self,
        }
    }
}
/// Iteration state for the symbol table of a `LeakilyQuantizedDistribution`:
/// walks the support in ascending order, carrying the running cumulative.
#[derive(Debug)]
struct LeakilyQuantizedDistributionIter<Symbol, Probability, M, const PRECISION: usize> {
    /// The model being iterated (generic so the struct needs no lifetime parameter).
    model: M,
    /// Next symbol to yield; `None` once the last symbol has been produced.
    symbol: Option<Symbol>,
    /// Cumulative probability mass of all symbols yielded so far.
    left_sided_cumulative: Probability,
}
impl<Symbol, Probability, D, const PRECISION: usize> Iterator
    for LeakilyQuantizedDistributionIter<
        Symbol,
        Probability,
        &LeakilyQuantizedDistribution<f64, Symbol, Probability, D, PRECISION>,
        PRECISION,
    >
where
    f64: AsPrimitive<Probability>,
    Symbol: PrimInt + AsPrimitive<Probability> + AsPrimitive<usize> + Into<f64> + WrappingSub,
    Probability: BitArray + Into<f64>,
    D: Distribution,
    D::Value: AsPrimitive<Symbol>,
{
    type Item = (Symbol, Probability, Probability::NonZero);

    /// Yields `(symbol, left_sided_cumulative, probability)` for each symbol of
    /// the support in ascending order.
    fn next(&mut self) -> Option<Self::Item> {
        let symbol = self.symbol?;
        let right_sided_cumulative = if symbol == self.model.quantizer.max_symbol_inclusive {
            // Last symbol: the cumulative saturates at `2^PRECISION` (which
            // wraps to zero in `Probability`), matching
            // `left_cumulative_and_probability`.
            self.symbol = None;
            wrapping_pow2(PRECISION)
        } else {
            let next_symbol = symbol + Symbol::one();
            self.symbol = Some(next_symbol);
            // Evaluate the CDF half a step *above* `symbol` (`+ 0.5`, as in
            // `left_cumulative_and_probability`). The previous `- 0.5` computed
            // the *left*-sided cumulative instead, which made the yielded
            // probability zero and violated the `NonZero` invariant below.
            let non_leaky: Probability = (self.model.quantizer.free_weight
                * self.model.inner.distribution((symbol).into() + 0.5))
            .as_();
            // `slack(next_symbol, min)` adds one unit of leakage for every
            // symbol up to and including `symbol`, matching the encoder's
            // `non_leaky + slack + 1`.
            non_leaky + slack(next_symbol, self.model.quantizer.min_symbol_inclusive)
        };
        let probability = unsafe {
            // SAFETY: each symbol receives at least one unit of leaked mass, so
            // the cumulative strictly increases between consecutive symbols
            // (modulo the wrap at the last symbol) — assumes the underlying CDF
            // is monotone, as required of `Distribution` implementations.
            right_sided_cumulative
                .wrapping_sub(&self.left_sided_cumulative)
                .into_nonzero_unchecked()
        };
        let left_sided_cumulative = self.left_sided_cumulative;
        self.left_sided_cumulative = right_sided_cumulative;
        Some((symbol, left_sided_cumulative, probability))
    }

    /// The number of remaining symbols is known exactly:
    /// `max_symbol_inclusive - symbol + 1`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        if let Some(symbol) = self.symbol {
            // `slack(max, symbol)` is `max - symbol` masked to `Symbol`'s bit
            // width; the reversed argument order would compute `symbol - max`,
            // which wraps to a huge, incorrect lower bound.
            let len = slack::<usize, _>(self.model.quantizer.max_symbol_inclusive, symbol)
                .saturating_add(1);
            // The count is exact, so report it as the upper bound as well.
            (len, Some(len))
        } else {
            (0, Some(0))
        }
    }
}
#[cfg(test)]
mod tests {
    use probability::prelude::*;
    use super::*;
    /// A near-delta distribution centered at 2.5 (exactly between two grid
    /// points) must split its mass across symbols 2 and 3, with the other 19
    /// symbols of the 21-element support keeping only their leaked units.
    #[test]
    fn split_almost_delta_distribution() {
        fn inner(distribution: impl Distribution<Value = f64>) {
            let quantizer = DefaultLeakyQuantizer::new(-10..=10);
            let model = quantizer.quantize(distribution);
            let (left_cdf, left_prob) = model.left_cumulative_and_probability(2).unwrap();
            let (right_cdf, right_prob) = model.left_cumulative_and_probability(3).unwrap();
            assert_eq!(
                left_prob.get(),
                right_prob.get() - 1,
                "Peak not split evenly."
            );
            // The remaining mass outside the two peak symbols should be exactly
            // 19 units out of `2^24` — presumably one leaked unit per remaining
            // symbol.
            assert_eq!(
                (1u32 << 24) - left_prob.get() - right_prob.get(),
                19,
                "Peak has wrong probability mass."
            );
            // Consecutive intervals must tile the quantile range without gaps.
            assert_eq!(left_cdf + left_prob.get(), right_cdf);
        }
        inner(Gaussian::new(2.5, 1e-40));
        inner(Cauchy::new(2.5, 1e-40));
        inner(Laplace::new(2.5, 1e-40));
    }
    /// Round-trips quantized Gaussians over a grid of means and standard
    /// deviations, including means far outside the support.
    #[test]
    fn leakily_quantized_normal() {
        // Use a smaller configuration under miri to keep runtime reasonable.
        #[cfg(not(miri))]
        let (support, std_devs, means) = (
            -127..=127,
            [1e-40, 0.0001, 0.1, 3.5, 123.45, 1234.56],
            [
                -300.6, -127.5, -100.2, -4.5, 0.0, 50.3, 127.5, 180.2, 2000.0,
            ],
        );
        #[cfg(miri)]
        let (support, std_devs, means) = (
            -20..=20,
            [1e-40, 0.0001, 3.5, 1234.56],
            [-300.6, -20.5, -5.2, 8.5, 20.5, 2000.0],
        );
        let quantizer = LeakyQuantizer::<_, _, u32, 24>::new(support.clone());
        for &std_dev in &std_devs {
            for &mean in &means {
                let distribution = Gaussian::new(mean, std_dev);
                super::super::tests::test_entropy_model(
                    &quantizer.quantize(distribution),
                    *support.start()..*support.end() + 1,
                );
            }
        }
    }
    /// Same as `leakily_quantized_normal`, but with heavy-tailed Cauchy
    /// distributions.
    #[test]
    fn leakily_quantized_cauchy() {
        #[cfg(not(miri))]
        let (support, gammas, means) = (
            -127..=127,
            [1e-40, 0.0001, 0.1, 3.5, 123.45, 1234.56],
            [
                -300.6, -127.5, -100.2, -4.5, 0.0, 50.3, 127.5, 180.2, 2000.0,
            ],
        );
        #[cfg(miri)]
        let (support, gammas, means) = (
            -20..=20,
            [1e-40, 0.0001, 3.5, 1234.56],
            [-300.6, -20.5, -5.2, 8.5, 20.5, 2000.0],
        );
        let quantizer = LeakyQuantizer::<_, _, u32, 24>::new(support.clone());
        for &gamma in &gammas {
            for &mean in &means {
                let distribution = Cauchy::new(mean, gamma);
                super::super::tests::test_entropy_model(
                    &quantizer.quantize(distribution),
                    *support.start()..*support.end() + 1,
                );
            }
        }
    }
    /// Same as `leakily_quantized_normal`, but with Laplace distributions.
    #[test]
    fn leakily_quantized_laplace() {
        #[cfg(not(miri))]
        let (support, bs, means) = (
            -127..=127,
            [1e-40, 0.0001, 0.1, 3.5, 123.45, 1234.56],
            [
                -300.6, -127.5, -100.2, -4.5, 0.0, 50.3, 127.5, 180.2, 2000.0,
            ],
        );
        #[cfg(miri)]
        let (support, bs, means) = (
            -20..=20,
            [1e-40, 0.0001, 3.5, 1234.56],
            [-300.6, -20.5, -5.2, 8.5, 20.5, 2000.0],
        );
        let quantizer = LeakyQuantizer::<_, _, u32, 24>::new(support.clone());
        for &b in &bs {
            for &mean in &means {
                let distribution = Laplace::new(mean, b);
                super::super::tests::test_entropy_model(
                    &quantizer.quantize(distribution),
                    *support.start()..*support.end() + 1,
                );
            }
        }
    }
    /// Exercises a *discrete* underlying distribution (Binomial) with support
    /// sized to match `n`.
    #[test]
    fn leakily_quantized_binomial() {
        #[cfg(not(miri))]
        let (ns, ps) = (
            [1, 2, 10, 100, 1000, 10_000],
            [1e-30, 1e-20, 1e-10, 0.1, 0.4, 0.9],
        );
        #[cfg(miri)]
        let (ns, ps) = ([1, 2, 100], [1e-30, 0.1, 0.4]);
        for &n in &ns {
            for &p in &ps {
                // Skip the slowest large-n/tiny-p combinations.
                if n < 1000 || p >= 0.1 {
                    let quantizer = LeakyQuantizer::<_, _, u32, 24>::new(0..=n as u32);
                    let distribution = Binomial::new(n, p);
                    super::super::tests::test_entropy_model(
                        &quantizer.quantize(distribution),
                        0..(n as u32 + 1),
                    );
                }
            }
        }
    }
}