import gzip
import pytest
import numpy as np
import cramjam
import hashlib
def same_same(a, b):
    """Report whether ``a`` and ``b`` produce the same MD5 hex digest.

    Both arguments are handed straight to ``hashlib.md5``, so they must be
    objects that it accepts.
    """
    digest_a = hashlib.md5(a).hexdigest()
    digest_b = hashlib.md5(b).hexdigest()
    return digest_a == digest_b
def test_has_version():
    """The package must expose ``__version__`` and it must be a ``str``."""
    from cramjam import __version__ as version

    assert isinstance(version, str)
@pytest.mark.parametrize("is_bytearray", (True, False))
@pytest.mark.parametrize(
    "variant_str", ("snappy", "brotli", "lz4", "gzip", "deflate", "zstd")
)
def test_variants_simple(variant_str, is_bytearray):
    """Round-trip a repetitive payload through ``compress`` / ``decompress``.

    Runs once per codec module and once each for ``bytes`` and ``bytearray``
    input, checking that both calls hand back a ``cramjam.Buffer``.
    """
    codec = getattr(cramjam, variant_str)

    payload = b"some bytes to compress 123" * 100000
    payload = bytearray(payload) if is_bytearray else payload

    packed = codec.compress(payload)
    assert packed.read() != payload
    # Rewind after the read above before passing the buffer to decompress.
    packed.seek(0)
    assert isinstance(packed, cramjam.Buffer)

    unpacked = codec.decompress(packed, output_len=len(payload))
    assert unpacked.read() == payload
    assert isinstance(unpacked, cramjam.Buffer)
@pytest.mark.parametrize(
    "variant_str", ("snappy", "brotli", "lz4", "gzip", "deflate", "zstd")
)
def test_variants_raise_exception(variant_str):
    """``decompress`` on a short junk payload raises ``DecompressionError``."""
    codec = getattr(cramjam, variant_str)
    junk = b"sknow"

    with pytest.raises(cramjam.DecompressionError):
        codec.decompress(junk)
@pytest.mark.parametrize(
    "input_type", (bytes, bytearray, "numpy", cramjam.Buffer, cramjam.File)
)
@pytest.mark.parametrize(
    "output_type", (bytes, bytearray, "numpy", cramjam.Buffer, cramjam.File)
)
@pytest.mark.parametrize(
    "variant_str", ("snappy", "brotli", "gzip", "deflate", "zstd", "lz4")
)
def test_variants_compress_into(variant_str, input_type, output_type, tmpdir):
    """Check ``compress_into`` for every input/output container pairing.

    The payload is wrapped in ``input_type``, compressed into a destination of
    ``output_type``, and the bytes that land in the destination are compared
    (via ``same_same``) with what plain ``compress`` returns for the same
    payload. The returned byte count must equal the reference length.
    """
    variant = getattr(cramjam, variant_str)
    raw_data = b"oh what a beautiful morning, oh what a beautiful day!!" * 10000

    # Build the source container. Named ``source`` rather than ``input`` so the
    # builtin of that name is not shadowed.
    if input_type == "numpy":
        source = np.frombuffer(raw_data, dtype=np.uint8)
    elif input_type == cramjam.File:
        source = cramjam.File(str(tmpdir.join("input.txt")))
        source.write(raw_data)
        source.seek(0)
    elif input_type == cramjam.Buffer:
        source = cramjam.Buffer()
        source.write(raw_data)
        source.seek(0)
    else:
        source = input_type(raw_data)

    # Reference result; its length sizes the pre-allocated destinations.
    compressed = variant.compress(raw_data)
    compressed_len = len(compressed)

    # Build the destination container.
    if output_type == "numpy":
        sink = np.zeros(compressed_len, dtype=np.uint8)
    elif output_type == cramjam.File:
        sink = cramjam.File(str(tmpdir.join("output.txt")))
    elif output_type == cramjam.Buffer:
        sink = cramjam.Buffer()
    else:
        sink = output_type(b"0" * compressed_len)

    n_bytes = variant.compress_into(source, sink)
    assert n_bytes == compressed_len

    # Pull the written bytes back out, whatever kind of container was used.
    if hasattr(sink, "read"):
        sink.seek(0)
        written = sink.read()
    elif hasattr(sink, "tobytes"):
        written = sink.tobytes()
    else:
        written = bytes(sink)
    assert same_same(written, compressed)
@pytest.mark.parametrize(
    "input_type", (bytes, bytearray, "numpy", cramjam.Buffer, cramjam.File)
)
@pytest.mark.parametrize(
    "output_type", (bytes, bytearray, "numpy", cramjam.Buffer, cramjam.File)
)
@pytest.mark.parametrize(
    "variant_str", ("snappy", "brotli", "gzip", "deflate", "zstd", "lz4")
)
def test_variants_decompress_into(variant_str, input_type, output_type, tmpdir):
    """Check ``decompress_into`` for every input/output container pairing.

    The payload is compressed with ``compress``, the compressed data is
    wrapped in ``input_type``, decompressed into a destination of
    ``output_type``, and the bytes that land in the destination are compared
    (via ``same_same``) with the original payload. The returned byte count
    must equal the payload length.
    """
    variant = getattr(cramjam, variant_str)
    raw_data = b"oh what a beautiful morning, oh what a beautiful day!!" * 100
    compressed = variant.compress(raw_data)

    # Build the source container. Named ``source`` rather than ``input`` so the
    # builtin of that name is not shadowed.
    if input_type == "numpy":
        source = np.frombuffer(compressed, dtype=np.uint8)
    elif input_type == cramjam.File:
        source = cramjam.File(str(tmpdir.join("input.txt")))
        source.write(compressed)
        source.seek(0)
    elif input_type == cramjam.Buffer:
        source = cramjam.Buffer()
        source.write(compressed)
        source.seek(0)
    else:
        source = input_type(compressed)

    # Build the destination container; fixed-size ones get len(raw_data) slots.
    if output_type == "numpy":
        sink = np.zeros(len(raw_data), dtype=np.uint8)
    elif output_type == cramjam.File:
        sink = cramjam.File(str(tmpdir.join("output.txt")))
    elif output_type == cramjam.Buffer:
        sink = cramjam.Buffer()
    else:
        sink = output_type(b"0" * len(raw_data))

    n_bytes = variant.decompress_into(source, sink)
    assert n_bytes == len(raw_data)

    # Pull the written bytes back out, whatever kind of container was used.
    if hasattr(sink, "read"):
        sink.seek(0)
        restored = sink.read()
    elif hasattr(sink, "tobytes"):
        restored = sink.tobytes()
    else:
        restored = bytes(sink)
    assert same_same(restored, raw_data)
def test_variant_snappy_raw_into():
    """Round-trip raw snappy through the ``*_raw_into`` calls with numpy buffers."""
    snappy = cramjam.snappy
    data = b"oh what a beautiful morning, oh what a beautiful day!!" * 1000000

    # Reference output from the allocating API.
    reference = snappy.compress_raw(data)

    # The destination is sized from compress_raw_max_len; only the first
    # ``n_packed`` entries are used afterwards.
    max_len = snappy.compress_raw_max_len(data)
    packed = np.zeros(max_len, dtype=np.uint8)
    n_packed = snappy.compress_raw_into(data, packed)
    assert n_packed == len(reference)

    unpacked = np.zeros(len(data), dtype=np.uint8)
    n_unpacked = snappy.decompress_raw_into(packed[:n_packed].tobytes(), unpacked)
    assert n_unpacked == len(data)
    assert same_same(unpacked[:n_unpacked], data)
@pytest.mark.parametrize("Obj", (cramjam.File, cramjam.Buffer))
def test_dunders(Obj, tmpdir):
    """Exercise ``len()``, ``bool()`` and ``str()`` on File and Buffer objects."""
    if Obj == cramjam.File:
        # File is constructed from a path; Buffer takes no arguments here.
        path = str(tmpdir.join("tmp.txt"))
        obj = Obj(path)
    else:
        obj = Obj()

    # Freshly created: empty and falsy.
    assert len(obj) == 0
    assert bool(obj) is False

    obj.write(b"12345")

    # After writing five bytes: length 5 and truthy.
    assert len(obj) == 5
    assert bool(obj) is True

    assert "len=5" in str(obj)
    if isinstance(obj, cramjam.File):
        # ``path`` is only bound in the File branch above.
        assert f"path={path}" in str(obj)
@pytest.mark.parametrize(
    "compress_kwargs",
    (
        dict(mode="default", acceleration=1, compression=1, store_size=True),
        dict(mode="fast", acceleration=2, compression=2, store_size=False),
        dict(mode="high_compression", acceleration=3, compression=3, store_size=True),
        dict(mode="default", acceleration=5, compression=4, store_size=False),
    ),
)
def test_lz4_block(compress_kwargs):
    """Check lz4 block output bytes and a round trip under ``compress_kwargs``."""
    from cramjam import lz4

    data = b"howdy neighbor"

    # Default call: the expected bytes start with four extra bytes
    # (0x0e == len(data)) that are absent when store_size=False.
    with_prefix = b"\x0e\x00\x00\x00\xe0howdy neighbor"
    assert bytes(lz4.compress_block(data)) == with_prefix

    without_prefix = b"\xe0howdy neighbor"
    assert bytes(lz4.compress_block(data, store_size=False)) == without_prefix

    # Round trip: output_len is passed explicitly only when store_size is off.
    output_len = None if compress_kwargs["store_size"] else len(data)
    block = lz4.compress_block(data, **compress_kwargs)
    restored = lz4.decompress_block(block, output_len=output_len)
    assert bytes(restored) == data
def test_gzip_multiple_streams():
    """Two concatenated gzip outputs decompress to the concatenated payloads."""
    # Members produced by the standard library, checked against it first.
    stdlib_joined = gzip.compress(b"foo") + gzip.compress(b"bar")
    assert gzip.decompress(stdlib_joined) == b"foobar"
    assert bytes(cramjam.gzip.decompress(stdlib_joined)) == b"foobar"

    # Members produced by cramjam itself.
    first = bytes(cramjam.gzip.compress(b"foo"))
    second = bytes(cramjam.gzip.compress(b"bar"))
    assert bytes(cramjam.gzip.decompress(first + second)) == b"foobar"
@pytest.mark.parametrize(
    "mod",
    (
        cramjam.brotli,
        cramjam.deflate,
        cramjam.gzip,
        cramjam.lz4,
        cramjam.snappy,
        cramjam.zstd,
    ),
)
def test_streams_compressor(mod):
    """Drive ``mod.Compressor`` through compress / flush / finish."""
    stream = mod.Compressor()
    pieces = []

    stream.compress(b"foo")
    pieces.append(bytes(stream.flush()))

    stream.compress(b"bar")
    pieces.append(bytes(stream.flush()))

    pieces.append(bytes(stream.finish()))

    # Everything emitted so far must decode back to the two inputs joined.
    restored = mod.decompress(b"".join(pieces))
    assert bytes(restored) == b"foobar"

    # A second finish() is expected to yield nothing further.
    assert bytes(stream.finish()) == b""

    # Feeding more data after finish() is expected to be rejected.
    with pytest.raises(cramjam.CompressionError):
        stream.compress(b"data")