import os
import tempfile
import pytest
import polars as pl
pyreadstat = pytest.importorskip("pyreadstat")
import ambers
# Module-wide tallies of check() outcomes. check() increments them and main()
# reports them, exiting non-zero when FAILED is greater than zero.
PASSED = 0
FAILED = 0
def check(name, condition, detail=""):
    """Record the outcome of one check in the module counters and print it.

    Args:
        name: Label printed for the check.
        condition: A truthy value counts as a pass, a falsy one as a failure.
        detail: Extra text appended to the FAIL line; not printed on a pass.
    """
    global PASSED, FAILED
    # Handle the failure case first so the pass path reads straight through.
    if not condition:
        FAILED += 1
        print(f" FAIL: {name} — {detail}")
        return
    PASSED += 1
    print(f" PASS: {name}")
def _test_single_long_string(length, label):
    """Round-trip one long-string column through a .sav file and check it.

    Writes a three-row ``LongStr`` column (a value of ``length`` characters,
    a value of ``length // 2`` characters, and ``"short"``) with pyreadstat,
    reads the file back with ambers, and records each comparison via
    ``check``.

    Args:
        length: Number of characters in the longest value written.
        label: Heading text printed before the checks run.
    """
    print(f"\n--- {label}: single column, {length} chars ---")
    value = "A" * length
    half_value = value[:length // 2]
    df_write = pl.DataFrame({"LongStr": [value, half_value, "short"]})

    # delete=False: the handle is closed when the with-block exits, before
    # pyreadstat writes to the path; the file is removed in the finally block.
    with tempfile.NamedTemporaryFile(suffix=".sav", delete=False) as f:
        path = f.name
    try:
        pyreadstat.write_sav(df_write, path)
        df_ambers = ambers.read_sav(path).data

        check(
            "ambers column count",
            df_ambers.width == 1,
            f"expected 1 column, got {df_ambers.width}: {df_ambers.columns}",
        )
        check("ambers row count", df_ambers.height == 3, f"got {df_ambers.height}")

        # The detail strings are built before check() is called, so a null
        # read-back must not reach len() or slicing: record it as a failed
        # check instead of raising TypeError.
        row0 = df_ambers["LongStr"][0]
        row0_len = None if row0 is None else len(row0)
        check(
            "ambers string length (row 0)",
            row0_len == length,
            f"expected {length}, got {row0_len}",
        )
        check(
            "ambers string content (row 0)",
            row0 == value,
            "got None" if row0 is None else f"first 50 chars: {row0[:50]}...",
        )

        row1 = df_ambers["LongStr"][1]
        row1_len = None if row1 is None else len(row1)
        check(
            "ambers string length (row 1)",
            row1_len == len(half_value),
            f"expected {len(half_value)}, got {row1_len}",
        )

        row2 = df_ambers["LongStr"][2]
        check(
            "ambers short string (row 2)",
            row2 == "short",
            f"got: '{row2}'",
        )
    finally:
        os.unlink(path)
def test_multiple_long_strings():
    """Round-trip a frame mixing numeric, short-string and long-string columns.

    Writes seven columns (three numeric, one short string, and strings of
    255, 500 and 1000 characters) with pyreadstat, reads them back with
    ambers, and records column count, column names, numeric values and the
    first-row string values via ``check``.

    NOTE(review): ``check`` only records failures and does not raise, so this
    function does not itself raise when a check fails — confirm that is the
    intended behaviour if this module is collected by pytest.
    """
    print("\n--- Multiple long string columns + numerics ---")
    df_write = pl.DataFrame({
        "id": [1.0, 2.0, 3.0],
        "score": [99.5, 88.3, 77.1],
        "short_str": ["hello", "world", "test"],
        "str_255": ["B" * 255] * 3,
        "str_500": ["C" * 500] * 3,
        "str_1000": ["D" * 1000] * 3,
        "another_num": [10.0, 20.0, 30.0],
    })

    # delete=False: the handle is closed when the with-block exits, before
    # pyreadstat writes to the path; the file is removed in the finally block.
    with tempfile.NamedTemporaryFile(suffix=".sav", delete=False) as f:
        path = f.name
    try:
        pyreadstat.write_sav(df_write, path)
        df_ambers = ambers.read_sav(path).data

        check(
            "ambers column count (mixed)",
            df_ambers.width == df_write.width,
            f"expected {df_write.width}, got {df_ambers.width}: {df_ambers.columns}",
        )

        # Compare names order-insensitively, derived from the frame that was
        # written so the expectation cannot drift from the input.
        expected_cols = sorted(df_write.columns)
        ambers_cols = sorted(df_ambers.columns)
        check(
            "column names match",
            ambers_cols == expected_cols,
            f"ambers={ambers_cols}, expected={expected_cols}",
        )

        check(
            "numeric id preserved",
            list(df_ambers["id"]) == [1.0, 2.0, 3.0],
            f"got {list(df_ambers['id'])}",
        )
        check(
            "numeric score preserved",
            list(df_ambers["score"]) == [99.5, 88.3, 77.1],
            f"got {list(df_ambers['score'])}",
        )

        first_short = df_ambers["short_str"][0]
        check(
            "short_str value",
            first_short == "hello",
            f"got: '{first_short}'",
        )

        for col, char, length in [
            ("str_255", "B", 255),
            ("str_500", "C", 500),
            ("str_1000", "D", 1000),
        ]:
            val = df_ambers[col][0]
            # The detail strings are built before check() is called, so a
            # null value must not reach len() or slicing.
            val_len = None if val is None else len(val)
            check(
                f"{col} length",
                val_len == length,
                f"expected {length}, got {val_len}",
            )
            check(
                f"{col} content",
                val == char * length,
                "got None" if val is None else f"first 30 chars: {val[:30]}",
            )
    finally:
        os.unlink(path)
def test_issue_119_reproduction():
    """Round-trip two long free-text columns and check they come back intact.

    Writes a one-row frame whose columns hold a lorem-ipsum paragraph and the
    same paragraph repeated twice (joined by a space) with pyreadstat, reads
    it back with ambers, and records via ``check`` that exactly two columns
    are returned and that each value equals what was written.
    """
    print("\n--- Issue #119 reproduction (reading only) ---")
    lorem = (
        "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do "
        "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
        "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
        "aliquip ex ea commodo consequat. Duis aute irure dolor in "
        "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
        "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
        "culpa qui officia deserunt mollit anim id est laborum."
    )
    # Built once so the written value and the expected value cannot diverge.
    doubled = lorem + " " + lorem
    df_write = pl.DataFrame({
        "LongString1": [lorem],
        "LongString2": [doubled],
    })

    # delete=False: the handle is closed when the with-block exits, before
    # pyreadstat writes to the path; the file is removed in the finally block.
    with tempfile.NamedTemporaryFile(suffix=".sav", delete=False) as f:
        path = f.name
    try:
        pyreadstat.write_sav(df_write, path)
        df_ambers = ambers.read_sav(path).data

        check(
            "ambers: 2 columns (not split)",
            df_ambers.width == 2,
            f"got {df_ambers.width}: {df_ambers.columns}",
        )

        for col, expected in (("LongString1", lorem), ("LongString2", doubled)):
            val = df_ambers[col][0]
            # The detail string is built before check() is called, so a null
            # value must not reach len().
            val_len = None if val is None else len(val)
            check(
                f"{col} matches",
                val == expected,
                f"len={val_len}, expected={len(expected)}",
            )
    finally:
        os.unlink(path)
def test_boundary_504_505():
    """Round-trip a 15-column frame holding 503-, 504- and 505-char values.

    All columns contain a single empty string except the three ``*opn``
    columns, which hold runs of ``"a"`` of length 503, 504 and 505. The frame
    is written with pyreadstat, read back with ambers, and the column count
    plus the length and content of the three long values are recorded via
    ``check``.
    """
    print("\n--- Boundary test: 504 vs 505 chars (issue #119 comment) ---")
    columns = [
        "so3_10_9_1",
        "so3_10_10_1",
        "so3_10_11_1",
        "so3_10_12_1",
        "so3_10_13_1",
        "so3_10_14_1",
        "so3_10_15_1",
        "so3_10_16_1",
        "so3_10_17_1",
        "so3_10_18_1",
        "so3_10_19_1",
        "so3_10_20_1",
        "so3_10_96opn",
        "so3_10_97opn",
        "so3_10_98opn",
    ]
    # Lengths of the non-empty values; every other column gets "" ("a" * 0).
    long_lengths = {
        "so3_10_96opn": 503,
        "so3_10_97opn": 504,
        "so3_10_98opn": 505,
    }
    data = {name: ["a" * long_lengths.get(name, 0)] for name in columns}
    df_write = pl.DataFrame(data)

    with tempfile.NamedTemporaryFile(suffix=".sav", delete=False) as tmp:
        path = tmp.name
    try:
        pyreadstat.write_sav(df_write, path)
        result = ambers.read_sav(path)
        df_ambers, _meta = result.data, result.meta

        check(
            "ambers: 15 columns",
            df_ambers.width == len(columns),
            f"got {df_ambers.width}: {df_ambers.columns}",
        )

        for col, expected_len in long_lengths.items():
            val = df_ambers[col][0]
            if val is not None:
                check(
                    f"{col} length={expected_len}",
                    len(val) == expected_len,
                    f"got {len(val)}",
                )
                check(
                    f"{col} content",
                    val == "a" * expected_len,
                    f"first 30: {val[:30]}",
                )
            else:
                # A null value is reported once; length/content are skipped.
                check(f"{col} not null", False, "got None")
    finally:
        os.unlink(path)
def main():
    """Run every long-string scenario, print a summary, and exit 1 on failure.

    Raises:
        SystemExit: With code 1 when at least one check failed.
    """
    print("=" * 60)
    print("LONG STRING VARIABLE PRESSURE TEST")
    print("(pyreadstat issue #119)")
    print("=" * 60)

    for length, label in [
        (255, "255 chars (max normal)"),
        (256, "256 chars (min VLS)"),
        (504, "504 chars (issue boundary)"),
        (505, "505 chars (issue boundary)"),
        (1000, "1000 chars (multi-segment)"),
    ]:
        _test_single_long_string(length, label)
    test_multiple_long_strings()
    test_issue_119_reproduction()
    test_boundary_504_505()

    # The counters are only read here, so no ``global`` declaration is needed.
    total = PASSED + FAILED
    print(f"\n{'=' * 60}")
    print(f"RESULTS: {PASSED}/{total} passed, {FAILED} failed")
    print(f"{'=' * 60}")
    if FAILED > 0:
        # The exit() builtin is injected by the site module for interactive
        # use and can be absent (e.g. ``python -S``); SystemExit always exists.
        raise SystemExit(1)
# Run the whole suite via main() only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()