import pytest
import io
import tempfile
import os
import time
from pathlib import Path
from mrrc import MARCReader, MARCWriter
class TestWritingBenchmarks:
    """Benchmarks that serialize MARC records into an in-memory buffer."""

    @staticmethod
    def _drain(reader):
        """Return a list of every record ``reader`` yields.

        Reading stops at the first falsy value returned by
        ``reader.read_record()``.
        """
        collected = []
        while True:
            item = reader.read_record()
            if not item:
                break
            collected.append(item)
        return collected

    @staticmethod
    def _dump(records):
        """Write ``records`` through a MARCWriter into a fresh BytesIO.

        Returns the bytes accumulated in the buffer.
        """
        sink = io.BytesIO()
        marc_out = MARCWriter(sink)
        for item in records:
            marc_out.write_record(item)
        return sink.getvalue()

    @pytest.mark.benchmark
    def test_roundtrip_1k_records(self, benchmark, fixture_1k):
        """Time parsing the 1k fixture and writing every record back out."""

        def roundtrip():
            # Both the read and the write are part of the timed region.
            parsed = self._drain(MARCReader(io.BytesIO(fixture_1k)))
            return self._dump(parsed)

        serialized = benchmark(roundtrip)
        assert len(serialized) > 0
        # The re-serialized output must be at least 90% of the input size.
        assert len(serialized) > len(fixture_1k) * 0.9

    @pytest.mark.benchmark
    def test_write_only_1k_records(self, benchmark, fixture_1k):
        """Time only the write phase for the 1k fixture."""
        # Parsing happens once, outside the timed callable.
        parsed = self._drain(MARCReader(io.BytesIO(fixture_1k)))

        def write_all():
            return self._dump(parsed)

        serialized = benchmark(write_all)
        assert len(serialized) > 0

    @pytest.mark.benchmark
    def test_write_only_10k_records(self, benchmark, fixture_10k):
        """Time only the write phase for the 10k fixture."""
        # Parsing happens once, outside the timed callable.
        parsed = self._drain(MARCReader(io.BytesIO(fixture_10k)))

        def write_all():
            return self._dump(parsed)

        serialized = benchmark(write_all)
        assert len(serialized) > 0
class TestIncrementalWriting:
    """Benchmark that writes each record as soon as it has been read."""

    @pytest.mark.benchmark
    def test_stream_write_1k(self, benchmark, fixture_1k):
        """Time a read-one/write-one pass over the 1k fixture.

        No intermediate list of records is built; the timed callable
        returns the written bytes together with the number of records
        it copied.
        """

        def stream_and_write():
            source = MARCReader(io.BytesIO(fixture_1k))
            sink = io.BytesIO()
            marc_out = MARCWriter(sink)
            written = 0
            while True:
                item = source.read_record()
                if not item:
                    break
                marc_out.write_record(item)
                written += 1
            return sink.getvalue(), written

        payload, total = benchmark(stream_and_write)
        assert total == 1000
        assert len(payload) > 0
class TestRustFileBackendBenchmarks:
    """Benchmarks that hand MARCWriter a filesystem path instead of a stream.

    Each timed callable creates a fresh ``.mrc`` temp file, writes every
    record to it, measures the resulting file size, and removes the file.
    """

    @staticmethod
    def _load_records(raw):
        """Parse all records out of ``raw`` MARC bytes (untimed setup)."""
        reader = MARCReader(io.BytesIO(raw))
        records = []
        while record := reader.read_record():
            records.append(record)
        return records

    @staticmethod
    def _new_temp_path():
        """Create an empty ``.mrc`` temp file and return its path as ``str``.

        The handle is closed before returning so the writer can reopen the
        path; ``delete=False`` leaves removal to the caller.
        """
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
            return tmp.name

    @staticmethod
    def _write_records(target, records):
        """Write ``records`` to ``target`` and always close the writer.

        Closing in ``finally`` ensures a failing ``write_record`` does not
        leave the writer open while the caller deletes the temp file.
        """
        writer = MARCWriter(target)
        try:
            for record in records:
                writer.write_record(record)
        finally:
            writer.close()

    @pytest.mark.benchmark
    def test_write_only_1k_rustfile(self, benchmark, fixture_1k):
        """Time writing the 1k fixture's records to a ``str`` path."""
        records = self._load_records(fixture_1k)

        def write_all():
            temp_path = self._new_temp_path()
            try:
                self._write_records(temp_path, records)
                return os.path.getsize(temp_path)
            finally:
                if os.path.exists(temp_path):
                    os.unlink(temp_path)

        result = benchmark(write_all)
        assert result > 0

    @pytest.mark.benchmark
    def test_write_only_10k_rustfile(self, benchmark, fixture_10k):
        """Time writing the 10k fixture's records to a ``str`` path."""
        records = self._load_records(fixture_10k)

        def write_all():
            temp_path = self._new_temp_path()
            try:
                self._write_records(temp_path, records)
                return os.path.getsize(temp_path)
            finally:
                if os.path.exists(temp_path):
                    os.unlink(temp_path)

        result = benchmark(write_all)
        assert result > 0

    @pytest.mark.benchmark
    def test_write_pathlib_1k_rustfile(self, benchmark, fixture_1k):
        """Time writing the 1k fixture's records to a ``pathlib.Path``."""
        records = self._load_records(fixture_1k)

        def write_all():
            # The writer receives a Path object here rather than a str.
            temp_path = Path(self._new_temp_path())
            try:
                self._write_records(temp_path, records)
                return temp_path.stat().st_size
            finally:
                if temp_path.exists():
                    temp_path.unlink()

        result = benchmark(write_all)
        assert result > 0
class TestBackendComparison:
    """Manual side-by-side timing of the stream and path write targets."""

    @staticmethod
    def _median(samples):
        """Return the middle element of ``samples`` after sorting.

        For an even number of samples this is the upper of the two middle
        values; the test below uses an odd sample count.
        """
        return sorted(samples)[len(samples) // 2]

    @pytest.mark.benchmark
    def test_backend_comparison_1k(self, fixture_1k):
        """Print median write times for a BytesIO target and a temp file.

        Timing uses ``time.perf_counter`` directly rather than the
        ``benchmark`` fixture. The test makes no assertions; it only
        prints the two medians and their ratio.
        """
        reader = MARCReader(io.BytesIO(fixture_1k))
        records = []
        while record := reader.read_record():
            records.append(record)

        rounds = 5  # samples taken per target

        pythonfile_times = []
        for _ in range(rounds):
            start = time.perf_counter()
            output = io.BytesIO()
            writer = MARCWriter(output)
            for record in records:
                writer.write_record(record)
            writer.close()
            pythonfile_times.append(time.perf_counter() - start)

        rustfile_times = []
        for _ in range(rounds):
            # Create the file untimed and close the handle before writing.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
                temp_path = tmp.name
            try:
                start = time.perf_counter()
                writer = MARCWriter(temp_path)
                try:
                    for record in records:
                        writer.write_record(record)
                finally:
                    # Close even on failure so the unlink below does not
                    # run against a file the writer still holds open.
                    writer.close()
                rustfile_times.append(time.perf_counter() - start)
            finally:
                if os.path.exists(temp_path):
                    os.unlink(temp_path)

        median_pythonfile = self._median(pythonfile_times)
        median_rustfile = self._median(rustfile_times)
        speedup = median_pythonfile / median_rustfile

        print("\n1k records benchmark:")
        print(f" PythonFile (BytesIO): {median_pythonfile*1000:.2f}ms")
        print(f" RustFile (temp file): {median_rustfile*1000:.2f}ms")
        print(f" Speedup ratio: {speedup:.2f}x")