import pytest
from s4_codec import (
CpuGzip,
CpuZstd,
S4Error,
S4SizeMismatchError,
gpu_available,
)
def test_cpu_zstd_roundtrip():
codec = CpuZstd(level=3)
data = b"hello squished s3 " * 1000
compressed, orig_size, crc = codec.compress(data)
assert orig_size == len(data)
assert isinstance(crc, int)
roundtrip = codec.decompress(compressed, orig_size, crc)
assert roundtrip == data
def test_cpu_zstd_default_level():
codec = CpuZstd() data = b"x" * 1024
compressed, orig_size, crc = codec.compress(data)
assert codec.decompress(compressed, orig_size, crc) == data
def test_cpu_gzip_produces_valid_gzip_magic():
codec = CpuGzip(level=6)
data = b"test data" * 100
compressed, _orig_size, _crc = codec.compress(data)
assert compressed[:2] == b"\x1f\x8b", "gzip output must start with 1f 8b"
def test_cpu_gzip_roundtrip_through_python_gzip_module():
import gzip
codec = CpuGzip(level=6)
data = b"compatibility check " * 100
compressed, _orig_size, _crc = codec.compress(data)
decompressed = gzip.decompress(compressed)
assert decompressed == data
def test_gpu_available_returns_bool():
result = gpu_available()
assert isinstance(result, bool)
def test_corrupted_decompress_raises_specific_error():
codec = CpuZstd()
data = b"hello world " * 100
compressed, orig_size, crc = codec.compress(data)
tampered = bytearray(compressed)
if len(tampered) > 10:
tampered[5] ^= 0xFF
with pytest.raises((S4Error, OSError, RuntimeError)):
codec.decompress(bytes(tampered), orig_size, crc)
def test_decompress_wrong_size_raises_size_mismatch():
codec = CpuZstd()
data = b"foo" * 100
compressed, orig_size, crc = codec.compress(data)
with pytest.raises(S4SizeMismatchError):
codec.decompress(compressed, orig_size + 1, crc)
def test_compress_releases_gil():
import threading
import time
codec = CpuZstd()
big_data = b"x" * (50 * 1024 * 1024)
progress = []
def background():
for _ in range(5):
time.sleep(0.01)
progress.append(time.time())
t = threading.Thread(target=background)
t.start()
codec.compress(big_data) t.join()
assert len(progress) == 5
def test_module_version_matches_workspace():
import s4_codec
v = s4_codec.__version__
parts = v.split(".")
major, minor = int(parts[0]), int(parts[1])
assert (major, minor) >= (0, 8), (
f"binding version {v} < 0.8 — workspace inheritance broken?"
)
def test_exception_class_hierarchy():
from s4_codec import (
S4BackendError,
S4CodecMismatchError,
S4CrcMismatchError,
S4IoError,
S4ManifestSizeExceedsLimitError,
S4ManifestSizeMismatchError,
S4UnregisteredCodecError,
)
assert issubclass(S4CrcMismatchError, S4Error)
assert issubclass(S4SizeMismatchError, S4Error)
assert issubclass(S4CodecMismatchError, S4Error)
assert issubclass(S4UnregisteredCodecError, S4Error)
assert issubclass(S4ManifestSizeExceedsLimitError, S4Error)
assert issubclass(S4ManifestSizeMismatchError, S4Error)
assert issubclass(S4Error, ValueError)
assert issubclass(S4BackendError, RuntimeError)
assert issubclass(S4IoError, OSError) assert not issubclass(S4BackendError, S4Error)
assert not issubclass(S4IoError, S4Error)