import constriction
import numpy as np
import sys
import scipy
def test_module_example3():
message = np.array(
[6, 10, -4, 2, 5, 2, 1, 0, 2], dtype=np.int32)
means = np.array([2.3, 6.1, -8.5, 4.1, 1.3], dtype=np.float64)
stds = np.array([6.2, 5.3, 3.8, 3.2, 4.7], dtype=np.float64)
entropy_model1 = constriction.stream.model.QuantizedGaussian(-50, 50)
entropy_model2 = constriction.stream.model.Categorical(
np.array([0.2, 0.5, 0.3], dtype=np.float32), lazy=True
)
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(message[0:5], entropy_model1, means, stds)
encoder.encode(message[5:9], entropy_model2)
compressed = encoder.get_compressed()
print(f"compressed representation: {compressed}")
print(f"(in binary: {[bin(word) for word in compressed]})")
assert np.all(compressed == np.array([3176507208], dtype=np.uint32))
decoder = constriction.stream.queue.RangeDecoder(compressed)
decoded_part1 = decoder.decode(entropy_model1, means, stds)
decoded_part2 = decoder.decode(entropy_model2, 4)
assert np.all(np.concatenate((decoded_part1, decoded_part2)) == message)
def test_chain2():
data = np.array(
[0x80d14131, 0xdda97c6c, 0x5017a640, 0x01170a3e], np.uint32)
probabilities = np.array(
[[0.1, 0.7, 0.1, 0.1], [0.2, 0.2, 0.1, 0.5], [0.2, 0.1, 0.4, 0.3]]) model_family = constriction.stream.model.Categorical(lazy=True)
ansCoder = constriction.stream.stack.AnsCoder(data, seal=True)
assert np.all(ansCoder.decode(model_family, probabilities)
== np.array([0, 0, 2], dtype=np.int32))
probabilities[0, :] = np.array([0.09, 0.71, 0.1, 0.1])
ansCoder = constriction.stream.stack.AnsCoder(data, seal=True)
assert np.all(ansCoder.decode(model_family, probabilities)
== np.array([1, 0, 0], dtype=np.int32))
def test_chain3():
data = np.array(
[0x80d14131, 0xdda97c6c, 0x5017a640, 0x01170a3e], np.uint32)
probabilities = np.array(
[[0.1, 0.7, 0.1, 0.1],
[0.2, 0.2, 0.1, 0.5],
[0.2, 0.1, 0.4, 0.3]])
model_family = constriction.stream.model.Categorical(lazy=True)
chainCoder = constriction.stream.chain.ChainCoder(data, seal=True)
assert np.all(chainCoder.decode(model_family, probabilities)
== np.array([0, 3, 3], dtype=np.int32))
probabilities[0, :] = np.array([0.09, 0.71, 0.1, 0.1])
chainCoder = constriction.stream.chain.ChainCoder(data, seal=True)
assert np.all(chainCoder.decode(model_family, probabilities)
== np.array([1, 3, 3], dtype=np.int32))
def test_stack1():
message_part1 = np.array([1, 2, 0, 3, 2, 3, 0], dtype=np.int32)
probabilities_part1 = np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float64)
model_part1 = constriction.stream.model.Categorical(probabilities_part1, lazy=True)
message_part2 = np.array([6, 10, -4, 2], dtype=np.int32)
means_part2 = np.array([2.5, 13.1, -1.1, -3.0], dtype=np.float64)
stds_part2 = np.array([4.1, 8.7, 6.2, 5.4], dtype=np.float64)
model_family_part2 = constriction.stream.model.QuantizedGaussian(-100, 100)
print(
f"Original message: {np.concatenate([message_part1, message_part2])}")
coder = constriction.stream.stack.AnsCoder()
coder.encode_reverse(
message_part2, model_family_part2, means_part2, stds_part2)
coder.encode_reverse(message_part1, model_part1)
compressed = coder.get_compressed()
print(f"compressed representation: {compressed}")
print(f"(in binary: {[bin(word) for word in compressed]})")
decoded_part1 = coder.decode(model_part1, 7) decoded_part2 = coder.decode(model_family_part2, means_part2, stds_part2)
print(f"Decoded message: {np.concatenate([decoded_part1, decoded_part2])}")
assert np.all(decoded_part1 == message_part1)
assert np.all(decoded_part2 == message_part2)
def test_ans_decode1():
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float64)
model = constriction.stream.model.Categorical(probabilities, lazy=True)
compressed = np.array([2514924296, 114], dtype=np.uint32)
coder = constriction.stream.stack.AnsCoder(compressed)
symbol = coder.decode(model)
assert symbol == 2
def test_ans_decode2():
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float64)
model = constriction.stream.model.Categorical(probabilities, lazy=True)
compressed = np.array([1441153686, 108], dtype=np.uint32)
coder = constriction.stream.stack.AnsCoder(compressed)
symbols = coder.decode(model, 9)
assert np.all(symbols == np.array(
[2, 0, 0, 1, 2, 2, 1, 2, 2], dtype=np.int32))
def test_ans_decode4():
probabilities = np.array(
[[0.1, 0.2, 0.3, 0.1, 0.3], [0.3, 0.2, 0.2, 0.2, 0.1]], dtype=np.float64)
model_family = constriction.stream.model.Categorical(lazy=True)
compressed = np.array([2142112014, 31], dtype=np.uint32)
coder = constriction.stream.stack.AnsCoder(compressed)
symbols = coder.decode(model_family, probabilities)
assert np.all(symbols == np.array([3, 1], dtype=np.int32))
def test_ans_encode_reverse1():
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float64)
model = constriction.stream.model.Categorical(probabilities, lazy=True)
coder = constriction.stream.stack.AnsCoder()
coder.encode_reverse(2, model)
def test_ans_encode_reverse2():
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float64)
model = constriction.stream.model.Categorical(probabilities, lazy=True)
symbols = np.array([0, 2, 1, 2, 0, 2, 0, 2, 1], dtype=np.int32)
coder = constriction.stream.stack.AnsCoder()
coder.encode_reverse(symbols, model)
assert np.all(coder.get_compressed() == np.array(
[1276728145, 172], dtype=np.uint32))
def test_ans_encode_reverse4():
probabilities = np.array(
[[0.1, 0.2, 0.3, 0.1, 0.3], [0.3, 0.2, 0.2, 0.2, 0.1]], dtype=np.float64)
model_family = constriction.stream.model.Categorical(lazy=True)
symbols = np.array([3, 1], dtype=np.int32)
coder = constriction.stream.stack.AnsCoder()
coder.encode_reverse(symbols, model_family, probabilities)
assert np.all(coder.get_compressed() == np.array(
[45298481], dtype=np.uint32))
def test_ans_seek():
probabilities = np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float64)
model = constriction.stream.model.Categorical(probabilities, lazy=True)
message_part1 = np.array([1, 2, 0, 3, 2, 3, 0], dtype=np.int32)
message_part2 = np.array([2, 2, 0, 1, 3], dtype=np.int32)
coder = constriction.stream.stack.AnsCoder()
coder.encode_reverse(message_part2, model)
(position, state) = coder.pos() coder.encode_reverse(message_part1, model)
assert coder.decode(model) == 1
coder.seek(position, state)
decoded_part2 = coder.decode(model, 5)
assert np.all(decoded_part2 == message_part2)
def test_range_coding_mod():
message_part1 = np.array([1, 2, 0, 3, 2, 3, 0], dtype=np.int32)
probabilities_part1 = np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float64)
model_part1 = constriction.stream.model.Categorical(probabilities_part1, lazy=True)
message_part2 = np.array([6, 10, -4, 2], dtype=np.int32)
means_part2 = np.array([2.5, 13.1, -1.1, -3.0], dtype=np.float64)
stds_part2 = np.array([4.1, 8.7, 6.2, 5.4], dtype=np.float64)
model_family_part2 = constriction.stream.model.QuantizedGaussian(-100, 100)
print(
f"Original message: {np.concatenate([message_part1, message_part2])}")
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(message_part1, model_part1)
encoder.encode(message_part2, model_family_part2, means_part2, stds_part2)
compressed = encoder.get_compressed()
print(f"compressed representation: {compressed}")
print(f"(in binary: {[bin(word) for word in compressed]})")
decoder = constriction.stream.queue.RangeDecoder(compressed)
decoded_part1 = decoder.decode(model_part1, 7) decoded_part2 = decoder.decode(model_family_part2, means_part2, stds_part2)
print(f"Decoded message: {np.concatenate([decoded_part1, decoded_part2])}")
assert np.all(decoded_part1 == message_part1)
assert np.all(decoded_part2 == message_part2)
def test_range_coder_encode1():
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float64)
model = constriction.stream.model.Categorical(probabilities, lazy=True)
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(2, model)
def test_range_coder_encode2():
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float64)
model = constriction.stream.model.Categorical(probabilities, lazy=True)
symbols = np.array([0, 2, 1, 2, 0, 2, 0, 2, 1], dtype=np.int32)
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(symbols, model)
assert np.all(encoder.get_compressed() ==
np.array([369323576], dtype=np.uint32))
def test_range_coder_encode4():
probabilities = np.array(
[[0.1, 0.2, 0.3, 0.1, 0.3], [0.3, 0.2, 0.2, 0.2, 0.1]], dtype=np.float64)
model_family = constriction.stream.model.Categorical(lazy=True)
symbols = np.array([3, 1], dtype=np.int32)
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(symbols, model_family, probabilities)
assert np.all(encoder.get_compressed() ==
np.array([2705829254], dtype=np.uint32))
def test_range_coding_decode1():
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float64)
model = constriction.stream.model.Categorical(probabilities, lazy=True)
compressed = np.array([3089773345, 1894195597], dtype=np.uint32)
decoder = constriction.stream.queue.RangeDecoder(compressed)
symbol = decoder.decode(model)
assert symbol == 2
def test_range_coding_decode2():
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float64)
model = constriction.stream.model.Categorical(probabilities, lazy=True)
compressed = np.array([369323576], dtype=np.uint32)
decoder = constriction.stream.queue.RangeDecoder(compressed)
symbols = decoder.decode(model, 9)
assert np.all(symbols == np.array(
[0, 2, 1, 2, 0, 2, 0, 2, 1], dtype=np.int32))
def test_range_coding_seek():
probabilities = np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float64)
model = constriction.stream.model.Categorical(probabilities, lazy=True)
message_part1 = np.array([1, 2, 0, 3, 2, 3, 0], dtype=np.int32)
message_part2 = np.array([2, 2, 0, 1, 3], dtype=np.int32)
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(message_part1, model)
(position, state) = encoder.pos() encoder.encode(message_part2, model)
compressed = encoder.get_compressed()
decoder = constriction.stream.queue.RangeDecoder(compressed)
assert decoder.decode(model) == 1
decoder.seek(position, state)
decoded_part2 = decoder.decode(model, 5)
assert np.all(decoded_part2 == message_part2)
def test_range_coding_decode4():
probabilities = np.array(
[[0.1, 0.2, 0.3, 0.1, 0.3], [0.3, 0.2, 0.2, 0.2, 0.1]], dtype=np.float64)
model_family = constriction.stream.model.Categorical(lazy=True)
compressed = np.array([2705829535], dtype=np.uint32)
decoder = constriction.stream.queue.RangeDecoder(compressed)
symbols = decoder.decode(model_family, probabilities)
assert np.all(symbols == np.array([3, 1], dtype=np.int32))
def test_categorical1():
probabilities = np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float64)
model = constriction.stream.model.Categorical(probabilities, lazy=True)
symbols = np.array([0, 3, 2, 3, 2, 0, 2, 1], dtype=np.int32)
coder = constriction.stream.stack.AnsCoder() coder.encode_reverse(symbols, model)
assert np.all(coder.get_compressed() == np.array(
[488222996, 175], dtype=np.uint32))
reconstructed = coder.decode(model, 8) assert np.all(reconstructed == symbols)
def test_categorical2():
model_family = constriction.stream.model.Categorical(lazy=True)
probabilities = np.array(
[[0.3, 0.1, 0.1, 0.3, 0.2], [0.1, 0.4, 0.2, 0.1, 0.2], [0.4, 0.2, 0.1, 0.2, 0.1]], dtype=np.float64)
symbols = np.array([0, 4, 1], dtype=np.int32)
coder = constriction.stream.stack.AnsCoder() coder.encode_reverse(symbols, model_family, probabilities)
assert np.all(coder.get_compressed() == np.array(
[104018741], dtype=np.uint32))
reconstructed = coder.decode(model_family, probabilities)
assert np.all(reconstructed == symbols)