import constriction
import numpy as np
import sys
import scipy
def test_module_example1():
    """Round-trip a short message through an `AnsCoder` with one fixed model.

    The message is encoded with `encode_reverse`, the compressed words are
    printed, and a second coder constructed from those words must decode the
    same nine symbols.
    """
    gaussian = constriction.stream.model.QuantizedGaussian(-50, 50, 3.2, 9.6)
    original = np.array([6, 10, -4, 2, 5, 2, 1, 0, 2], dtype=np.int32)

    # Compress.
    ans_encoder = constriction.stream.stack.AnsCoder()
    ans_encoder.encode_reverse(original, gaussian)
    compressed = ans_encoder.get_compressed()
    print(f"compressed representation: {compressed}")
    print(f"(in binary: {[bin(word) for word in compressed]})")

    # Decompress with an independent coder instance.
    ans_decoder = constriction.stream.stack.AnsCoder(compressed)
    restored = ans_decoder.decode(gaussian, 9)
    assert np.all(restored == original)
def test_module_example2():
    """Round-trip a short message through a range coder with one fixed model.

    The message is encoded with a `RangeEncoder`, the compressed words are
    printed, and a `RangeDecoder` built from them must reproduce the message.
    """
    gaussian = constriction.stream.model.QuantizedGaussian(-50, 50, 3.2, 9.6)
    original = np.array([6, 10, -4, 2, 5, 2, 1, 0, 2], dtype=np.int32)

    # Compress.
    range_encoder = constriction.stream.queue.RangeEncoder()
    range_encoder.encode(original, gaussian)
    compressed = range_encoder.get_compressed()
    print(f"compressed representation: {compressed}")
    print(f"(in binary: {[bin(word) for word in compressed]})")

    # Decompress.
    range_decoder = constriction.stream.queue.RangeDecoder(compressed)
    restored = range_decoder.decode(gaussian, 9)
    assert np.all(restored == original)
def test_old_module_example1():
    """Encode five symbols with per-symbol Gaussian parameters and decode them.

    The symbols are encoded onto an `AnsCoder` using a `QuantizedGaussian`
    family together with `means` and `stds` arrays, the compressed size is
    printed, and a fresh coder built from the compressed words must decode the
    same symbols and end up empty.
    """
    coder = constriction.stream.stack.AnsCoder()
    model = constriction.stream.model.QuantizedGaussian(-100, 100)
    symbols = np.array([23, -15, 78, 43, -69], dtype=np.int32)
    means = np.array([35.2, -1.7, 30.1, 71.2, -75.1], dtype=np.float64)
    stds = np.array([10.1, 25.3, 23.8, 35.4, 3.9], dtype=np.float64)

    coder.encode_reverse(symbols, model, means, stds)
    print(f"Compressed size: {coder.num_bits()} bits")
    print(
        f"(without unnecessary trailing zeros: {coder.num_valid_bits()} bits)")

    compressed = coder.get_compressed()
    # On big-endian hosts the words are swapped to the opposite byte order ...
    if sys.byteorder == "big":
        compressed.byteswap(inplace=True)
    # ... and swapped straight back, so the two steps cancel out.
    # NOTE(review): presumably these bracket a (removed) file write/read step
    # that stores the data in a fixed byte order -- confirm.
    if sys.byteorder == "big":
        compressed.byteswap(inplace=True)

    # Decode with a fresh coder; the model parameters are re-created the way a
    # separate decoder would have to do it.
    coder = constriction.stream.stack.AnsCoder(compressed)
    means = np.array([35.2, -1.7, 30.1, 71.2, -75.1], dtype=np.float64)
    stds = np.array([10.1, 25.3, 23.8, 35.4, 3.9], dtype=np.float64)
    reconstructed = coder.decode(model, means, stds)
    assert coder.is_empty()
    assert np.all(reconstructed == symbols)
def test_module_example3():
    """Mix two entropy models within a single range-coded stream.

    The first five symbols are coded with a `QuantizedGaussian` family and
    per-symbol `means`/`stds`; the remaining four with one fixed `Categorical`
    model.  Both the exact compressed word and the round trip are checked.
    """
    original = np.array([6, 10, -4, 2, 5, 2, 1, 0, 2], dtype=np.int32)
    stds = np.array([6.2, 5.3, 3.8, 3.2, 4.7], dtype=np.float64)
    means = np.array([2.3, 6.1, -8.5, 4.1, 1.3], dtype=np.float64)

    gaussian_family = constriction.stream.model.QuantizedGaussian(-50, 50)
    categorical = constriction.stream.model.Categorical(
        np.array([0.2, 0.5, 0.3], dtype=np.float64), perfect=False)

    # Encode both parts back to back into the same stream.
    range_encoder = constriction.stream.queue.RangeEncoder()
    range_encoder.encode(original[0:5], gaussian_family, means, stds)
    range_encoder.encode(original[5:9], categorical)

    compressed = range_encoder.get_compressed()
    print(f"compressed representation: {compressed}")
    print(f"(in binary: {[bin(word) for word in compressed]})")
    expected_words = np.array([3176507208], dtype=np.uint32)
    assert np.all(compressed == expected_words)

    # Decode in the same order in which the parts were encoded.
    range_decoder = constriction.stream.queue.RangeDecoder(compressed)
    head = range_decoder.decode(gaussian_family, means, stds)
    tail = range_decoder.decode(categorical, 4)
    assert np.all(np.concatenate((head, tail)) == original)
def test_chain1():
    """Check that a `ChainCoder` decode/encode cycle restores its input words.

    Ten random 32-bit words are handed to a sealed `ChainCoder` as data and
    symbols are decoded from them.  A second `ChainCoder`, built from the
    concatenated remainders, re-encodes those symbols; its unsealed data must
    equal the original words.
    """
    gaussian_family = constriction.stream.model.QuantizedGaussian(-100, 100)
    stds = np.array([6.4, 4.2, 3.9])
    means = np.array([3.2, -14.3, 5.7])

    def words_to_symbols(side_information):
        # Treat the words as data (not remainders) and decode symbols from them.
        chain = constriction.stream.chain.ChainCoder(
            side_information, is_remainders=False, seal=True)
        decoded = chain.decode(gaussian_family, means, stds)
        first, second = chain.get_remainders()
        return decoded, np.concatenate([first, second])

    def symbols_to_words(symbols, remaining):
        # Start from the remainders and push the symbols back in.
        chain = constriction.stream.chain.ChainCoder(
            remaining, is_remainders=True, seal=False)
        chain.encode_reverse(symbols, gaussian_family, means, stds)
        first, second = chain.get_data(unseal=True)
        return np.concatenate([first, second])

    # Fixed seed so that the sampled words are reproducible.
    np.random.seed(123)
    random_words = np.random.randint(2**32, size=10, dtype=np.uint32)

    decoded_symbols, leftovers = words_to_symbols(random_words)
    round_tripped = symbols_to_words(decoded_symbols, leftovers)
    assert np.all(round_tripped == random_words)
def test_chain2():
    """Pin what an `AnsCoder` decodes before and after changing one model.

    Three symbols are decoded from fixed data using one row of categorical
    probabilities per symbol.  After only the first row is modified, decoding
    the same data again is expected to give `[1, 0, 0]` instead of
    `[0, 0, 2]`.
    """
    family = constriction.stream.model.Categorical(perfect=False)
    data = np.array(
        [0x80d14131, 0xdda97c6c, 0x5017a640, 0x01170a3e], np.uint32)
    probabilities = np.array([
        [0.1, 0.7, 0.1, 0.1],
        [0.2, 0.2, 0.1, 0.5],
        [0.2, 0.1, 0.4, 0.3],
    ])

    def decode_from_scratch():
        # Each call starts over from the unchanged `data`.
        fresh = constriction.stream.stack.AnsCoder(data, seal=True)
        return fresh.decode(family, probabilities)

    assert np.all(decode_from_scratch() == np.array([0, 0, 2], dtype=np.int32))

    # Perturb the probabilities used for the first symbol only.
    probabilities[0, :] = np.array([0.09, 0.71, 0.1, 0.1])
    assert np.all(decode_from_scratch() == np.array([1, 0, 0], dtype=np.int32))
def test_chain3():
    """Pin what a `ChainCoder` decodes before and after changing one model.

    Three symbols are decoded from fixed data using one row of categorical
    probabilities per symbol.  After only the first row is modified, decoding
    the same data again is expected to give `[1, 3, 3]` instead of
    `[0, 3, 3]`, i.e. only the first symbol differs.
    """
    family = constriction.stream.model.Categorical(perfect=False)
    data = np.array(
        [0x80d14131, 0xdda97c6c, 0x5017a640, 0x01170a3e], np.uint32)
    probabilities = np.array([
        [0.1, 0.7, 0.1, 0.1],
        [0.2, 0.2, 0.1, 0.5],
        [0.2, 0.1, 0.4, 0.3],
    ])

    def decode_from_scratch():
        # Each call starts over from the unchanged `data`.
        fresh = constriction.stream.chain.ChainCoder(data, seal=True)
        return fresh.decode(family, probabilities)

    assert np.all(decode_from_scratch() == np.array([0, 3, 3], dtype=np.int32))

    # Perturb the probabilities used for the first symbol only.
    probabilities[0, :] = np.array([0.09, 0.71, 0.1, 0.1])
    assert np.all(decode_from_scratch() == np.array([1, 3, 3], dtype=np.int32))
def test_stack1():
    """Encode a two-part message on one `AnsCoder` and decode it again.

    Part 1 uses a fixed `Categorical` model, part 2 a `QuantizedGaussian`
    family with per-symbol parameters.  The parts are encoded in reverse
    order (part 2 first) and then decoded from the same coder, part 1 first.
    """
    # Part 1: seven symbols, one shared categorical model.
    message_part1 = np.array([1, 2, 0, 3, 2, 3, 0], dtype=np.int32)
    categorical = constriction.stream.model.Categorical(
        np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float64), perfect=False)

    # Part 2: four symbols, each with its own mean and standard deviation.
    message_part2 = np.array([6, 10, -4, 2], dtype=np.int32)
    part2_means = np.array([2.5, 13.1, -1.1, -3.0], dtype=np.float64)
    part2_stds = np.array([4.1, 8.7, 6.2, 5.4], dtype=np.float64)
    gaussian_family = constriction.stream.model.QuantizedGaussian(-100, 100)

    print(
        f"Original message: {np.concatenate([message_part1, message_part2])}")

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message_part2, gaussian_family, part2_means, part2_stds)
    ans.encode_reverse(message_part1, categorical)

    compressed = ans.get_compressed()
    print(f"compressed representation: {compressed}")
    print(f"(in binary: {[bin(word) for word in compressed]})")

    # Decode from the very same coder object that was used for encoding.
    decoded_part1 = ans.decode(categorical, 7)
    decoded_part2 = ans.decode(gaussian_family, part2_means, part2_stds)
    print(f"Decoded message: {np.concatenate([decoded_part1, decoded_part2])}")
    assert np.all(decoded_part1 == message_part1)
    assert np.all(decoded_part2 == message_part2)
def test_stack2():
    """Encode and decode with separately constructed models and coders.

    The encoder side and the decoder side each build their own
    `QuantizedGaussian` family and parameter arrays from the same constants;
    the decoder must recover the symbols and leave the coder empty.
    """
    symbols = np.array([2, -1, 0, 2, 3], dtype=np.int32)

    # --- Encoder side ---
    ans = constriction.stream.stack.AnsCoder()
    min_supported_symbol, max_supported_symbol = -10, 10
    encoder_model = constriction.stream.model.QuantizedGaussian(
        min_supported_symbol, max_supported_symbol)
    encoder_means = np.array([2.3, -1.7, 0.1, 2.2, -5.1], dtype=np.float64)
    encoder_stds = np.array([1.1, 5.3, 3.8, 1.4, 3.9], dtype=np.float64)
    ans.encode_reverse(symbols, encoder_model, encoder_means, encoder_stds)
    print(f"Compressed size: {ans.num_valid_bits()} bits")
    compressed = ans.get_compressed()

    # --- Decoder side: everything is rebuilt from scratch ---
    ans_decoder = constriction.stream.stack.AnsCoder(compressed)
    min_supported_symbol, max_supported_symbol = -10, 10
    decoder_model = constriction.stream.model.QuantizedGaussian(
        min_supported_symbol, max_supported_symbol)
    decoder_means = np.array([2.3, -1.7, 0.1, 2.2, -5.1], dtype=np.float64)
    decoder_stds = np.array([1.1, 5.3, 3.8, 1.4, 3.9], dtype=np.float64)
    reconstructed = ans_decoder.decode(
        decoder_model, decoder_means, decoder_stds)
    assert ans_decoder.is_empty()
    assert np.all(reconstructed == symbols)
def test_ans_decode1():
    """Decode a single symbol from fixed compressed words with an `AnsCoder`."""
    compressed = np.array([2514924296, 114], dtype=np.uint32)
    categorical = constriction.stream.model.Categorical(
        np.array([0.1, 0.6, 0.3], dtype=np.float64), perfect=False)

    ans = constriction.stream.stack.AnsCoder(compressed)
    # Calling `decode` with only a model yields one symbol.
    first_symbol = ans.decode(categorical)
    assert first_symbol == 2
def test_ans_decode2():
    """Decode nine symbols with one fixed `Categorical` model on an `AnsCoder`."""
    compressed = np.array([1441153686, 108], dtype=np.uint32)
    categorical = constriction.stream.model.Categorical(
        np.array([0.1, 0.6, 0.3], dtype=np.float64), perfect=False)
    expected = np.array([2, 0, 0, 1, 2, 2, 1, 2, 2], dtype=np.int32)

    ans = constriction.stream.stack.AnsCoder(compressed)
    decoded = ans.decode(categorical, 9)
    assert np.all(decoded == expected)
def test_ans_decode3():
    """Decode three symbols with per-symbol Gaussian parameters on an `AnsCoder`."""
    gaussian_family = constriction.stream.model.QuantizedGaussian(-100, 100)
    stds = np.array([5.2, 24.2, 3.1], dtype=np.float64)
    means = np.array([10.3, -4.7, 20.5], dtype=np.float64)
    compressed = np.array([597775281, 3], dtype=np.uint32)
    expected = np.array([12, -13, 25], dtype=np.int32)

    ans = constriction.stream.stack.AnsCoder(compressed)
    decoded = ans.decode(gaussian_family, means, stds)
    assert np.all(decoded == expected)
def test_ans_decode4():
    """Decode two symbols with one row of categorical probabilities each."""
    categorical_family = constriction.stream.model.Categorical(perfect=False)
    probabilities = np.array(
        [
            [0.1, 0.2, 0.3, 0.1, 0.3],
            [0.3, 0.2, 0.2, 0.2, 0.1],
        ],
        dtype=np.float64)
    compressed = np.array([2142112014, 31], dtype=np.uint32)
    expected = np.array([3, 1], dtype=np.int32)

    ans = constriction.stream.stack.AnsCoder(compressed)
    decoded = ans.decode(categorical_family, probabilities)
    assert np.all(decoded == expected)
def test_ans_encode_reverse1():
    """Smoke-test encoding a single scalar symbol onto an `AnsCoder`.

    There is no assertion; the test passes when no exception is raised.
    """
    categorical = constriction.stream.model.Categorical(
        np.array([0.1, 0.6, 0.3], dtype=np.float64), perfect=False)
    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(2, categorical)
def test_ans_encode_reverse2():
    """Encode nine symbols with one `Categorical` model and pin the output."""
    categorical = constriction.stream.model.Categorical(
        np.array([0.1, 0.6, 0.3], dtype=np.float64), perfect=False)
    message = np.array([0, 2, 1, 2, 0, 2, 0, 2, 1], dtype=np.int32)
    expected_words = np.array([1276728145, 172], dtype=np.uint32)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message, categorical)
    assert np.all(ans.get_compressed() == expected_words)
def test_ans_encode_reverse3():
    """Encode three symbols with per-symbol Gaussian parameters and pin the output."""
    gaussian_family = constriction.stream.model.QuantizedGaussian(-100, 100)
    message = np.array([12, -13, 25], dtype=np.int32)
    stds = np.array([5.2, 24.2, 3.1], dtype=np.float64)
    means = np.array([10.3, -4.7, 20.5], dtype=np.float64)
    expected_words = np.array([597775281, 3], dtype=np.uint32)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message, gaussian_family, means, stds)
    assert np.all(ans.get_compressed() == expected_words)
def test_ans_encode_reverse4():
    """Encode two symbols with one probability row each and pin the output."""
    categorical_family = constriction.stream.model.Categorical(perfect=False)
    probabilities = np.array(
        [
            [0.1, 0.2, 0.3, 0.1, 0.3],
            [0.3, 0.2, 0.2, 0.2, 0.1],
        ],
        dtype=np.float64)
    message = np.array([3, 1], dtype=np.int32)
    expected_words = np.array([45298481], dtype=np.uint32)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message, categorical_family, probabilities)
    assert np.all(ans.get_compressed() == expected_words)
def test_ans_seek():
    """Record a position on an `AnsCoder` and jump back to it with `seek`.

    Part 2 is encoded first and the coder's `pos()` is recorded; part 1 is
    then encoded on top.  After decoding one symbol of part 1, seeking to the
    recorded position must allow all of part 2 to be decoded.
    """
    categorical = constriction.stream.model.Categorical(
        np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float64), perfect=False)
    first_part = np.array([1, 2, 0, 3, 2, 3, 0], dtype=np.int32)
    second_part = np.array([2, 2, 0, 1, 3], dtype=np.int32)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(second_part, categorical)
    saved_position, saved_state = ans.pos()  # checkpoint between the parts
    ans.encode_reverse(first_part, categorical)

    # Without seeking, decoding starts with the first symbol of part 1.
    assert ans.decode(categorical) == 1

    ans.seek(saved_position, saved_state)
    restored_second_part = ans.decode(categorical, 5)
    assert np.all(restored_second_part == second_part)
def test_range_coding_mod():
    """Encode a two-part message with a range coder and decode it again.

    Part 1 uses a fixed `Categorical` model, part 2 a `QuantizedGaussian`
    family with per-symbol parameters.  Both parts are encoded and decoded in
    the same (forward) order.
    """
    # Part 1: seven symbols, one shared categorical model.
    message_part1 = np.array([1, 2, 0, 3, 2, 3, 0], dtype=np.int32)
    categorical = constriction.stream.model.Categorical(
        np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float64), perfect=False)

    # Part 2: four symbols, each with its own mean and standard deviation.
    message_part2 = np.array([6, 10, -4, 2], dtype=np.int32)
    part2_means = np.array([2.5, 13.1, -1.1, -3.0], dtype=np.float64)
    part2_stds = np.array([4.1, 8.7, 6.2, 5.4], dtype=np.float64)
    gaussian_family = constriction.stream.model.QuantizedGaussian(-100, 100)

    print(
        f"Original message: {np.concatenate([message_part1, message_part2])}")

    range_encoder = constriction.stream.queue.RangeEncoder()
    range_encoder.encode(message_part1, categorical)
    range_encoder.encode(
        message_part2, gaussian_family, part2_means, part2_stds)

    compressed = range_encoder.get_compressed()
    print(f"compressed representation: {compressed}")
    print(f"(in binary: {[bin(word) for word in compressed]})")

    range_decoder = constriction.stream.queue.RangeDecoder(compressed)
    decoded_part1 = range_decoder.decode(categorical, 7)
    decoded_part2 = range_decoder.decode(
        gaussian_family, part2_means, part2_stds)
    print(f"Decoded message: {np.concatenate([decoded_part1, decoded_part2])}")
    assert np.all(decoded_part1 == message_part1)
    assert np.all(decoded_part2 == message_part2)
def test_old_module_example2():
    """Round-trip five symbols through `RangeEncoder` / `RangeDecoder`.

    Per-symbol `means` and `stds` parameterize a `QuantizedGaussian` family.
    After decoding, `maybe_exhausted()` must be true and the symbols must
    match.
    """
    gaussian_family = constriction.stream.model.QuantizedGaussian(-100, 100)
    original = np.array([23, -15, 78, 43, -69], dtype=np.int32)
    stds = np.array([10.1, 25.3, 23.8, 35.4, 3.9], dtype=np.float64)
    means = np.array([35.2, -1.7, 30.1, 71.2, -75.1], dtype=np.float64)

    encoder = constriction.stream.queue.RangeEncoder()
    encoder.encode(original, gaussian_family, means, stds)
    print(f"Compressed size: {encoder.num_bits()} bits")

    range_decoder = constriction.stream.queue.RangeDecoder(
        encoder.get_compressed())
    restored = range_decoder.decode(gaussian_family, means, stds)
    assert range_decoder.maybe_exhausted()
    assert np.all(restored == original)
def test_ans_example():
    """Encode five symbols with per-symbol Gaussian parameters and decode them.

    The symbols are encoded onto an `AnsCoder` using a `QuantizedGaussian`
    family together with `means` and `stds` arrays, the compressed size is
    printed, and a fresh coder built from the compressed words must decode the
    same symbols and end up empty.
    """
    ans = constriction.stream.stack.AnsCoder()
    model = constriction.stream.model.QuantizedGaussian(-10, 10)
    symbols = np.array([2, -1, 0, 2, 3], dtype=np.int32)
    means = np.array([2.3, -1.7, 0.1, 2.2, -5.1], dtype=np.float64)
    stds = np.array([1.1, 5.3, 3.8, 1.4, 3.9], dtype=np.float64)

    ans.encode_reverse(symbols, model, means, stds)
    print(f"Compressed size: {ans.num_valid_bits()} bits")

    compressed = ans.get_compressed()
    # On big-endian hosts the words are swapped to the opposite byte order ...
    if sys.byteorder == "big":
        compressed.byteswap(inplace=True)
    # ... and swapped straight back, so the two steps cancel out.
    # NOTE(review): presumably these bracket a (removed) file write/read step
    # that stores the data in a fixed byte order -- confirm.
    if sys.byteorder == "big":
        compressed.byteswap(inplace=True)

    # Decode with a fresh coder; the model parameters are re-created the way a
    # separate decoder would have to do it.
    ans = constriction.stream.stack.AnsCoder(compressed)
    means = np.array([2.3, -1.7, 0.1, 2.2, -5.1], dtype=np.float64)
    stds = np.array([1.1, 5.3, 3.8, 1.4, 3.9], dtype=np.float64)
    reconstructed = ans.decode(model, means, stds)
    assert ans.is_empty()
    assert np.all(reconstructed == symbols)
def test_range_coder_encode1():
    """Smoke-test encoding a single scalar symbol with a `RangeEncoder`.

    There is no assertion; the test passes when no exception is raised.
    """
    categorical = constriction.stream.model.Categorical(
        np.array([0.1, 0.6, 0.3], dtype=np.float64), perfect=False)
    range_encoder = constriction.stream.queue.RangeEncoder()
    range_encoder.encode(2, categorical)
def test_range_coder_encode2():
    """Encode nine symbols with one `Categorical` model and pin the output."""
    categorical = constriction.stream.model.Categorical(
        np.array([0.1, 0.6, 0.3], dtype=np.float64), perfect=False)
    message = np.array([0, 2, 1, 2, 0, 2, 0, 2, 1], dtype=np.int32)
    expected_words = np.array([369323576], dtype=np.uint32)

    range_encoder = constriction.stream.queue.RangeEncoder()
    range_encoder.encode(message, categorical)
    assert np.all(range_encoder.get_compressed() == expected_words)
def test_range_coder_encode3():
    """Encode three symbols with per-symbol Gaussian parameters and pin the output."""
    gaussian_family = constriction.stream.model.QuantizedGaussian(-100, 100)
    message = np.array([12, -13, 25], dtype=np.int32)
    stds = np.array([5.2, 24.2, 3.1], dtype=np.float64)
    means = np.array([10.3, -4.7, 20.5], dtype=np.float64)
    expected_words = np.array([2655472005], dtype=np.uint32)

    range_encoder = constriction.stream.queue.RangeEncoder()
    range_encoder.encode(message, gaussian_family, means, stds)
    assert np.all(range_encoder.get_compressed() == expected_words)
def test_range_coder_encode4():
    """Encode two symbols with one probability row each and pin the output."""
    categorical_family = constriction.stream.model.Categorical(perfect=False)
    probabilities = np.array(
        [
            [0.1, 0.2, 0.3, 0.1, 0.3],
            [0.3, 0.2, 0.2, 0.2, 0.1],
        ],
        dtype=np.float64)
    message = np.array([3, 1], dtype=np.int32)
    expected_words = np.array([2705829254], dtype=np.uint32)

    range_encoder = constriction.stream.queue.RangeEncoder()
    range_encoder.encode(message, categorical_family, probabilities)
    assert np.all(range_encoder.get_compressed() == expected_words)
def test_range_coding_decode1():
    """Decode a single symbol from fixed compressed words with a `RangeDecoder`."""
    compressed = np.array([3089773345, 1894195597], dtype=np.uint32)
    categorical = constriction.stream.model.Categorical(
        np.array([0.1, 0.6, 0.3], dtype=np.float64), perfect=False)

    range_decoder = constriction.stream.queue.RangeDecoder(compressed)
    # Calling `decode` with only a model yields one symbol.
    first_symbol = range_decoder.decode(categorical)
    assert first_symbol == 2
def test_range_coding_decode2():
    """Decode nine symbols with one fixed `Categorical` model on a `RangeDecoder`."""
    compressed = np.array([369323576], dtype=np.uint32)
    categorical = constriction.stream.model.Categorical(
        np.array([0.1, 0.6, 0.3], dtype=np.float64), perfect=False)
    expected = np.array([0, 2, 1, 2, 0, 2, 0, 2, 1], dtype=np.int32)

    range_decoder = constriction.stream.queue.RangeDecoder(compressed)
    decoded = range_decoder.decode(categorical, 9)
    assert np.all(decoded == expected)
def test_range_coding_seek():
    """Record an encoder position and jump to it on the decoder with `seek`.

    Part 1 is encoded, the encoder's `pos()` is recorded, and part 2 is
    encoded afterwards.  The decoder first reads one symbol of part 1 and
    then seeks to the recorded position, from where all of part 2 must
    decode correctly.
    """
    categorical = constriction.stream.model.Categorical(
        np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float64), perfect=False)
    first_part = np.array([1, 2, 0, 3, 2, 3, 0], dtype=np.int32)
    second_part = np.array([2, 2, 0, 1, 3], dtype=np.int32)

    range_encoder = constriction.stream.queue.RangeEncoder()
    range_encoder.encode(first_part, categorical)
    saved_position, saved_state = range_encoder.pos()  # checkpoint
    range_encoder.encode(second_part, categorical)

    range_decoder = constriction.stream.queue.RangeDecoder(
        range_encoder.get_compressed())
    # Without seeking, decoding starts with the first symbol of part 1.
    assert range_decoder.decode(categorical) == 1

    range_decoder.seek(saved_position, saved_state)
    restored_second_part = range_decoder.decode(categorical, 5)
    assert np.all(restored_second_part == second_part)
def test_range_coding_decode3():
    """Decode three symbols with per-symbol Gaussian parameters on a `RangeDecoder`."""
    gaussian_family = constriction.stream.model.QuantizedGaussian(-100, 100)
    stds = np.array([5.2, 24.2, 3.1], dtype=np.float64)
    means = np.array([10.3, -4.7, 20.5], dtype=np.float64)
    compressed = np.array([2655472005], dtype=np.uint32)
    expected = np.array([12, -13, 25], dtype=np.int32)

    range_decoder = constriction.stream.queue.RangeDecoder(compressed)
    decoded = range_decoder.decode(gaussian_family, means, stds)
    assert np.all(decoded == expected)
def test_range_coding_decode4():
    """Decode two symbols with one row of categorical probabilities each."""
    categorical_family = constriction.stream.model.Categorical(perfect=False)
    probabilities = np.array(
        [
            [0.1, 0.2, 0.3, 0.1, 0.3],
            [0.3, 0.2, 0.2, 0.2, 0.1],
        ],
        dtype=np.float64)
    compressed = np.array([2705829535], dtype=np.uint32)
    expected = np.array([3, 1], dtype=np.int32)

    range_decoder = constriction.stream.queue.RangeDecoder(compressed)
    decoded = range_decoder.decode(categorical_family, probabilities)
    assert np.all(decoded == expected)
def test_custom_model_ans():
    """Round-trip symbols through an `AnsCoder` using `CustomModel` wrappers.

    Three scenarios run in turn: a model with fixed parameters (a frozen
    scipy Cauchy distribution), a model family with two per-symbol parameters
    (Cauchy `loc` and `scale`), and a discrete binomial distribution with one
    per-symbol parameter.  Each scenario asserts that decoding returns the
    encoded symbols.
    """
    # Import the submodule explicitly: on SciPy versions without lazy
    # submodule loading, a bare `import scipy` does not expose `scipy.stats`.
    import scipy.stats

    def fixed_model_params():
        model_scipy = scipy.stats.cauchy(loc=10.3, scale=5.8)
        model = constriction.stream.model.CustomModel(
            model_scipy.cdf, model_scipy.ppf, -100, 100)
        symbols = np.array([5, 14, -1, 21], dtype=np.int32)
        coder = constriction.stream.stack.AnsCoder()
        coder.encode_reverse(symbols, model)
        assert np.all(coder.decode(model, 4) == symbols)

    def variable_model_params():
        model = constriction.stream.model.CustomModel(
            lambda x, loc, scale: scipy.stats.cauchy.cdf(x, loc, scale),
            lambda x, loc, scale: scipy.stats.cauchy.ppf(x, loc, scale),
            -100, 100)
        # One (loc, scale) pair per symbol.
        model_parameters = np.array([
            (7.3, 3.9), (11.5, 5.2), (-3.2, 4.9), (25.9, 7.1),
        ])
        symbols = np.array([5, 14, -1, 21], dtype=np.int32)
        coder = constriction.stream.stack.AnsCoder()
        # Column slices are strided views; `.copy()` makes them contiguous
        # (presumably required by the bindings -- confirm).
        coder.encode_reverse(
            symbols, model,
            model_parameters[:, 0].copy(), model_parameters[:, 1].copy())
        assert np.all(
            coder.decode(
                model,
                model_parameters[:, 0].copy(),
                model_parameters[:, 1].copy()) == symbols)

    def discrete_distribution():
        model = constriction.stream.model.CustomModel(
            lambda x, params: scipy.stats.binom.cdf(x, n=10, p=params),
            lambda x, params: scipy.stats.binom.ppf(x, n=10, p=params),
            0, 10)
        success_probabilities = np.array([0.3, 0.7, 0.2, 0.6])
        symbols = np.array([4, 8, 1, 5], dtype=np.int32)
        coder = constriction.stream.stack.AnsCoder()
        coder.encode_reverse(symbols, model, success_probabilities)
        assert np.all(
            coder.decode(model, success_probabilities) == symbols)

    fixed_model_params()
    variable_model_params()
    discrete_distribution()
def test_model_mod1():
    """Encode with one fixed `QuantizedGaussian`, pin the output, and decode."""
    gaussian = constriction.stream.model.QuantizedGaussian(
        -100, 100, 12.6, 7.3)
    message = np.array([12, 15, 4, -2, 18, 5], dtype=np.int32)
    expected_words = np.array([745994372, 25704], dtype=np.uint32)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message, gaussian)
    assert np.all(ans.get_compressed() == expected_words)

    # Decode from the same coder that was used for encoding.
    restored = ans.decode(gaussian, 6)
    assert np.all(restored == message)
def test_model_mod2():
    """Encode with per-symbol Gaussian parameters, pin the output, and decode."""
    gaussian_family = constriction.stream.model.QuantizedGaussian(-100, 100)
    message = np.array([12, 15, 4, -2, 18, 5], dtype=np.int32)
    stds = np.array([3.2, 4.7, 5.2, 3.1, 6.3, 2.9], dtype=np.float64)
    means = np.array([13.2, 17.9, 7.3, -4.2, 25.1, 3.2], dtype=np.float64)
    expected_words = np.array([2051958011, 1549], dtype=np.uint32)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message, gaussian_family, means, stds)
    assert np.all(ans.get_compressed() == expected_words)

    # Decode from the same coder that was used for encoding.
    restored = ans.decode(gaussian_family, means, stds)
    assert np.all(restored == message)
def test_categorical1():
    """Encode with one fixed `Categorical` model, pin the output, and decode."""
    categorical = constriction.stream.model.Categorical(
        np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float64), perfect=False)
    message = np.array([0, 3, 2, 3, 2, 0, 2, 1], dtype=np.int32)
    expected_words = np.array([488222996, 175], dtype=np.uint32)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message, categorical)
    assert np.all(ans.get_compressed() == expected_words)

    # Decode from the same coder that was used for encoding.
    restored = ans.decode(categorical, 8)
    assert np.all(restored == message)
def test_categorical2():
    """Encode with one probability row per symbol, pin the output, and decode."""
    categorical_family = constriction.stream.model.Categorical(perfect=False)
    probabilities = np.array(
        [
            [0.3, 0.1, 0.1, 0.3, 0.2],
            [0.1, 0.4, 0.2, 0.1, 0.2],
            [0.4, 0.2, 0.1, 0.2, 0.1],
        ],
        dtype=np.float64)
    message = np.array([0, 4, 1], dtype=np.int32)
    expected_words = np.array([104018741], dtype=np.uint32)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message, categorical_family, probabilities)
    assert np.all(ans.get_compressed() == expected_words)

    # Decode from the same coder that was used for encoding.
    restored = ans.decode(categorical_family, probabilities)
    assert np.all(restored == message)
def test_custom_model1():
    """Round-trip symbols through a `CustomModel` built from plain callables.

    The first callable is a tanh-shaped function of the symbol; the second
    returns its argument unchanged.  The compressed words are printed, not
    asserted.
    """
    def tanh_cdf(x):
        return 0.5 + 0.5 * np.tanh(x * 0.1)

    def identity(xi):
        # NOTE(review): presumably serves as a rough inverse-CDF guess -- confirm.
        return xi

    custom = constriction.stream.model.CustomModel(
        tanh_cdf, identity, -100, 100)
    message = np.array([-3, 2, 5, 5, 6], dtype=np.int32)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message, custom)
    print(ans.get_compressed())

    restored = ans.decode(custom, 5)
    assert np.all(restored == message)
def test_custom_model2():
    """Round-trip symbols through a `CustomModel` with two per-symbol parameters.

    Both callables accept the two extra parameters; the second one ignores
    them and returns its first argument unchanged.  The compressed words are
    printed, not asserted.
    """
    def tanh_cdf(x, a, b):
        return 0.5 + 0.5 * np.tanh(a + x * b)

    def identity(xi, a, b):
        # NOTE(review): presumably serves as a rough inverse-CDF guess -- confirm.
        return xi

    custom_family = constriction.stream.model.CustomModel(
        tanh_cdf, identity, -100, 100)
    message = np.array([-2, 1, 4], dtype=np.int32)
    offsets = np.array([1, 10, -3], dtype=np.float64)
    slopes = np.array([0.01, 0.04, 0.2], dtype=np.float64)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message, custom_family, offsets, slopes)
    print(ans.get_compressed())

    restored = ans.decode(custom_family, offsets, slopes)
    assert np.all(restored == message)
def test_scipy_model1():
    """Wrap a frozen scipy Cauchy distribution in `ScipyModel` and round-trip.

    The exact compressed words are pinned, and decoding from the same coder
    must return the encoded symbols.
    """
    import scipy.stats

    frozen_cauchy = scipy.stats.cauchy(loc=6.7, scale=12.4)
    wrapped = constriction.stream.model.ScipyModel(frozen_cauchy, -100, 100)
    message = np.array([22, 14, 5, -3, 19, 7], dtype=np.int32)
    expected_words = np.array([3569876501, 1944098], dtype=np.uint32)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message, wrapped)
    assert np.all(ans.get_compressed() == expected_words)

    restored = ans.decode(wrapped, 6)
    assert np.all(restored == message)
def test_scipy_model2():
    """Wrap the unfrozen scipy Cauchy family in `ScipyModel` and round-trip.

    Per-symbol `locs` and `scales` arrays are supplied when encoding and
    decoding.  The exact compressed words are pinned, and decoding from the
    same coder must return the encoded symbols.
    """
    import scipy.stats

    wrapped_family = constriction.stream.model.ScipyModel(
        scipy.stats.cauchy, -100, 100)
    message = np.array([22, 14, 5, -3, 19, 7], dtype=np.int32)
    scales = np.array([4.3, 7.4, 2.9, 4.1, 9.7, 3.4], dtype=np.float64)
    locs = np.array([26.2, 10.9, 8.7, -6.3, 25.1, 8.9], dtype=np.float64)
    expected_words = np.array([3493721376, 17526], dtype=np.uint32)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message, wrapped_family, locs, scales)
    assert np.all(ans.get_compressed() == expected_words)

    restored = ans.decode(wrapped_family, locs, scales)
    assert np.all(restored == message)
def test_custom_model_range():
    """Round-trip symbols through a range coder using `CustomModel` wrappers.

    Three scenarios run in turn: a model with fixed parameters (a frozen
    scipy Cauchy distribution), a model family with two per-symbol parameters
    (Cauchy `loc` and `scale`), and a discrete binomial distribution with one
    per-symbol parameter.  Each scenario encodes with a `RangeEncoder` and
    asserts that a `RangeDecoder` returns the encoded symbols.
    """
    # Import the submodule explicitly: on SciPy versions without lazy
    # submodule loading, a bare `import scipy` does not expose `scipy.stats`.
    import scipy.stats

    def fixed_model_params():
        model_scipy = scipy.stats.cauchy(loc=10.3, scale=5.8)
        model = constriction.stream.model.CustomModel(
            model_scipy.cdf, model_scipy.ppf, -100, 100)
        symbols = np.array([5, 14, -1, 21], dtype=np.int32)
        encoder = constriction.stream.queue.RangeEncoder()
        encoder.encode(symbols, model)
        compressed = encoder.get_compressed()
        decoder = constriction.stream.queue.RangeDecoder(compressed)
        assert np.all(decoder.decode(model, 4) == symbols)

    def variable_model_params():
        model = constriction.stream.model.CustomModel(
            lambda x, loc, scale: scipy.stats.cauchy.cdf(x, loc, scale),
            lambda x, loc, scale: scipy.stats.cauchy.ppf(x, loc, scale),
            -100, 100)
        # One (loc, scale) pair per symbol.
        model_parameters = np.array([
            (7.3, 3.9), (11.5, 5.2), (-3.2, 4.9), (25.9, 7.1),
        ])
        symbols = np.array([5, 14, -1, 21], dtype=np.int32)
        encoder = constriction.stream.queue.RangeEncoder()
        # Column slices are strided views; `.copy()` makes them contiguous
        # (presumably required by the bindings -- confirm).
        encoder.encode(
            symbols, model,
            model_parameters[:, 0].copy(), model_parameters[:, 1].copy())
        compressed = encoder.get_compressed()
        decoder = constriction.stream.queue.RangeDecoder(compressed)
        assert np.all(
            decoder.decode(
                model,
                model_parameters[:, 0].copy(),
                model_parameters[:, 1].copy()) == symbols)

    def discrete_distribution():
        model = constriction.stream.model.CustomModel(
            lambda x, params: scipy.stats.binom.cdf(x, n=10, p=params),
            lambda x, params: scipy.stats.binom.ppf(x, n=10, p=params),
            0, 10)
        success_probabilities = np.array([0.3, 0.7, 0.2, 0.6])
        symbols = np.array([4, 8, 1, 5], dtype=np.int32)
        encoder = constriction.stream.queue.RangeEncoder()
        encoder.encode(symbols, model, success_probabilities)
        compressed = encoder.get_compressed()
        decoder = constriction.stream.queue.RangeDecoder(compressed)
        assert np.all(
            decoder.decode(model, success_probabilities) == symbols)

    fixed_model_params()
    variable_model_params()
    discrete_distribution()
def test_old_custom_model_chain():
    """Decode and re-encode fixed data on a `ChainCoder` with `CustomModel`s.

    For each of three scenarios (fixed Cauchy parameters, per-symbol Cauchy
    parameters, per-symbol binomial parameters) the test decodes symbols from
    the same ten words, checks them against pinned values, encodes them back,
    and asserts that the coder's data equals the original words.
    """
    # Import the submodule explicitly: on SciPy versions without lazy
    # submodule loading, a bare `import scipy` does not expose `scipy.stats`.
    import scipy.stats

    compressed = np.array(
        [0xa5dd25f7, 0xfaef49b5, 0xd5b12228, 0x156ceb98, 0x71a0a92b,
         0x99e6d365, 0x2eebfadb, 0x404a567b, 0xf6cbdc09, 0xe63f3848],
        dtype=np.uint32)

    def fixed_model_params():
        model_scipy = scipy.stats.cauchy(loc=10.3, scale=5.8)
        model = constriction.stream.model.CustomModel(
            model_scipy.cdf, model_scipy.ppf, -100, 100)
        # NOTE(review): the two positional flags presumably correspond to
        # `is_remainders` and `seal` -- confirm against the ChainCoder API.
        coder = constriction.stream.chain.ChainCoder(compressed, False, False)
        symbols = coder.decode(model, 4)
        assert np.all(symbols == np.array([18, 6, 33, 59]))
        coder.encode_reverse(symbols, model)
        assert np.all(np.hstack(coder.get_data()) == compressed)

    def variable_model_params():
        model = constriction.stream.model.CustomModel(
            lambda x, loc, scale: scipy.stats.cauchy.cdf(x, loc, scale),
            lambda x, loc, scale: scipy.stats.cauchy.ppf(x, loc, scale),
            -100, 100)
        # One (loc, scale) pair per symbol.
        model_parameters = np.array([
            (7.3, 3.9), (11.5, 5.2), (-3.2, 4.9), (25.9, 7.1),
        ])
        coder = constriction.stream.chain.ChainCoder(compressed, False, False)
        # Column slices are strided views; `.copy()` makes them contiguous
        # (presumably required by the bindings -- confirm).
        symbols = coder.decode(
            model,
            model_parameters[:, 0].copy(), model_parameters[:, 1].copy())
        assert np.all(symbols == np.array([13, 7, 16, 85]))
        coder.encode_reverse(
            symbols, model,
            model_parameters[:, 0].copy(), model_parameters[:, 1].copy())
        assert np.all(np.hstack(coder.get_data()) == compressed)

    def discrete_distribution():
        model = constriction.stream.model.CustomModel(
            lambda x, params: scipy.stats.binom.cdf(x, n=10, p=params),
            lambda x, params: scipy.stats.binom.ppf(x, n=10, p=params),
            0, 10)
        success_probabilities = np.array([0.3, 0.7, 0.2, 0.6])
        coder = constriction.stream.chain.ChainCoder(compressed, False, False)
        symbols = coder.decode(model, success_probabilities)
        assert np.all(symbols == np.array([4, 6, 4, 9]))
        coder.encode_reverse(symbols, model, success_probabilities)
        assert np.all(np.hstack(coder.get_data()) == compressed)

    fixed_model_params()
    variable_model_params()
    discrete_distribution()
def test_huffman1():
    """Round-trip a message through Huffman coding on a queue coder.

    Symbols are encoded one at a time, in message order, with an
    `EncoderHuffmanTree` on a `QueueEncoder`.  The compressed words and the
    bitrate are pinned, and a `QueueDecoder` with a `DecoderHuffmanTree`
    built from the same probabilities must return the message in the same
    order.
    """
    probabils = np.array([0.3, 0.2, 0.4, 0.1], dtype=np.float64)
    message = [1, 3, 2, 3, 0, 1, 3, 0, 2, 1, 1, 3, 3, 1, 2, 0, 1, 3, 1]

    encoder = constriction.symbol.QueueEncoder()
    encoder_codebook = constriction.symbol.huffman.EncoderHuffmanTree(
        probabils)
    for symbol in message:
        encoder.encode_symbol(symbol, encoder_codebook)

    compressed, bitrate = encoder.get_compressed()
    print(compressed, bitrate)
    # Closing parenthesis added so the output matches the other tests.
    print(f"(in binary: {[bin(word) for word in compressed]})")
    assert np.all(compressed == np.array([3756389791, 61358], dtype=np.uint32))
    assert bitrate == 48

    decoder = constriction.symbol.QueueDecoder(compressed)
    decoder_codebook = constriction.symbol.huffman.DecoderHuffmanTree(
        probabils)
    # Decode exactly as many symbols as were encoded.
    decoded = [
        decoder.decode_symbol(decoder_codebook) for _ in range(len(message))
    ]
    assert decoded == message
def test_huffman2():
    """Round-trip a message through Huffman coding on a stack coder.

    Symbols are encoded one at a time in *reversed* message order with an
    `EncoderHuffmanTree` on a `StackCoder`.  The compressed words and the
    bitrate are pinned, and decoding from the same coder with a
    `DecoderHuffmanTree` must return the message in its original order.
    """
    probabils = np.array([0.3, 0.2, 0.4, 0.1], dtype=np.float64)
    message = [1, 3, 2, 3, 0, 1, 3, 0, 2, 1, 1, 3, 3, 1, 2, 0, 1, 3, 1]

    coder = constriction.symbol.StackCoder()
    encoder_codebook = constriction.symbol.huffman.EncoderHuffmanTree(
        probabils)
    for symbol in reversed(message):
        coder.encode_symbol(symbol, encoder_codebook)

    compressed, bitrate = coder.get_compressed()
    print(compressed, bitrate)
    # Closing parenthesis added so the output matches the other tests.
    print(f"(in binary: {[bin(word) for word in compressed]})")
    assert np.all(compressed == np.array([2818274807, 129455], dtype=np.uint32))
    assert bitrate == 48

    decoder_codebook = constriction.symbol.huffman.DecoderHuffmanTree(
        probabils)
    # Decode exactly as many symbols as were encoded.
    decoded = [
        coder.decode_symbol(decoder_codebook) for _ in range(len(message))
    ]
    assert decoded == message