import pytest
import io
import time
import tempfile
import os
from concurrent.futures import ThreadPoolExecutor
from mrrc import MARCReader, MARCWriter
class TestSequentialBaseline:
    """Sequential write benchmarks: in-memory (BytesIO) vs. file-backed output."""

    def _time_bytesio_write(self, records):
        """Write every record to a fresh BytesIO; return elapsed seconds.

        The timer deliberately covers sink construction too (cheap for an
        in-memory buffer), matching the file-backed measurement style used
        elsewhere in this class as closely as possible.
        """
        t0 = time.perf_counter()
        sink = io.BytesIO()
        writer = MARCWriter(sink)
        for rec in records:
            writer.write_record(rec)
        writer.close()
        return time.perf_counter() - t0

    def _time_file_write(self, records):
        """Write every record to a fresh temp file; return elapsed seconds.

        Temp-file creation and removal are kept outside the timed window so
        only the writer's own work is measured.
        """
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
            path = tmp.name
        try:
            t0 = time.perf_counter()
            writer = MARCWriter(path)
            for rec in records:
                writer.write_record(rec)
            writer.close()
            return time.perf_counter() - t0
        finally:
            if os.path.exists(path):
                os.unlink(path)

    def test_sequential_bytesio_baseline(self, fixture_10k):
        """Time writing 10k records to an in-memory sink, averaged over 3 runs."""
        records = list(MARCReader(io.BytesIO(fixture_10k)))
        assert len(records) == 10000

        # Warm-up pass over a small subset so one-time costs don't pollute
        # the timed runs.
        warm_sink = io.BytesIO()
        warm_writer = MARCWriter(warm_sink)
        for rec in records[:100]:
            warm_writer.write_record(rec)
        warm_writer.close()

        times = [self._time_bytesio_write(records) for _ in range(3)]
        avg_time = sum(times) / len(times)
        print(f"\nBaseline (BytesIO, 10k records): {avg_time*1000:.2f}ms")
        assert avg_time > 0

    def test_sequential_rustfile_baseline(self, fixture_10k):
        """Time writing 10k records to a real file on disk, averaged over 3 runs."""
        records = list(MARCReader(io.BytesIO(fixture_10k)))
        assert len(records) == 10000

        # Warm-up pass against a throwaway file, removed afterwards.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
            warm_path = tmp.name
        try:
            warm_writer = MARCWriter(warm_path)
            for rec in records[:100]:
                warm_writer.write_record(rec)
            warm_writer.close()
        finally:
            if os.path.exists(warm_path):
                os.unlink(warm_path)

        times = [self._time_file_write(records) for _ in range(3)]
        avg_time = sum(times) / len(times)
        print(f"RustFile (file I/O, 10k records): {avg_time*1000:.2f}ms")
        assert avg_time > 0

    def test_sequential_baseline_comparison(self, fixture_10k):
        """Report the BytesIO-vs-file write-time ratio over three runs each."""
        records = list(MARCReader(io.BytesIO(fixture_10k)))

        times_bytesio = [self._time_bytesio_write(records) for _ in range(3)]
        times_rustfile = [self._time_file_write(records) for _ in range(3)]

        avg_bytesio = sum(times_bytesio) / len(times_bytesio)
        avg_rustfile = sum(times_rustfile) / len(times_rustfile)
        ratio = avg_bytesio / avg_rustfile
        print("\nSequential Comparison (10k records):")
        print(f" BytesIO: {avg_bytesio*1000:.2f}ms")
        print(f" RustFile: {avg_rustfile*1000:.2f}ms")
        print(f" Ratio: {ratio:.2f}x (1.0 = same speed)")
class TestConcurrentPerformance:
    """Concurrent write benchmarks: several threads writing to separate files.

    Every timed window here covers only MARCWriter construction, the record
    writes, and close() — temp-file creation/removal happens outside the
    timer, so the sequential and concurrent measurements are directly
    comparable.  (Previously the sequential baselines also timed temp-file
    create/unlink while the concurrent path did not, skewing the ratio.)
    """

    def _timed_write(self, records):
        """Write all records to a fresh temp file; return (elapsed, path).

        The caller owns the returned file and must unlink it.  If the write
        fails, the orphaned temp file is removed here and the exception is
        re-raised.  This consolidates the helper that was previously
        duplicated verbatim in both test methods.
        """
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
            path = tmp.name
        try:
            start = time.perf_counter()
            writer = MARCWriter(path)
            for rec in records:
                writer.write_record(rec)
            writer.close()
            return time.perf_counter() - start, path
        except BaseException:
            # Was a bare `except:`; BaseException keeps the identical
            # clean-up-then-re-raise-everything semantics explicitly.
            if os.path.exists(path):
                os.unlink(path)
            raise

    def test_concurrent_2thread_speedup(self, fixture_5k):
        """Compare one sequential write against 2 threads writing in parallel."""
        records = list(MARCReader(io.BytesIO(fixture_5k)))
        assert len(records) == 5000

        # Sequential baseline: 3 runs, timing only the write work.
        times_sequential = []
        for _ in range(3):
            elapsed, path = self._timed_write(records)
            os.unlink(path)
            times_sequential.append(elapsed)
        avg_sequential = sum(times_sequential) / len(times_sequential)

        # Concurrent: each thread writes the full record set to its own file;
        # the slowest thread defines the wall-clock cost of the batch.
        times_concurrent = []
        for _ in range(3):
            with ThreadPoolExecutor(max_workers=2) as executor:
                futures = [executor.submit(self._timed_write, records)
                           for _ in range(2)]
                results = [f.result() for f in futures]
            times_concurrent.append(max(elapsed for elapsed, _ in results))
            for _, path in results:
                if os.path.exists(path):
                    os.unlink(path)
        avg_concurrent = sum(times_concurrent) / len(times_concurrent)

        speedup = avg_sequential / avg_concurrent
        print("\nConcurrent Performance (2 threads × 5k records each):")
        print(f" Sequential: {avg_sequential*1000:.2f}ms")
        print(f" Concurrent: {avg_concurrent*1000:.2f}ms")
        print(f" Ratio: {speedup:.2f}x")
        print(" (Disk I/O contention is expected; GIL release enabled non-blocking execution)")
        assert avg_concurrent > 0, "Concurrent execution failed"

    def test_concurrent_4thread_speedup(self, fixture_5k):
        """Compare one sequential write against 4 threads writing in parallel."""
        records = list(MARCReader(io.BytesIO(fixture_5k)))

        # Single sequential run, timing only the write work (see class docstring).
        sequential_time, path = self._timed_write(records)
        os.unlink(path)

        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = [executor.submit(self._timed_write, records)
                       for _ in range(4)]
            results = [f.result() for f in futures]
        concurrent_time = max(elapsed for elapsed, _ in results)
        for _, path in results:
            if os.path.exists(path):
                os.unlink(path)

        ratio = sequential_time / concurrent_time
        print("\nConcurrent Execution (4 threads × 5k records each):")
        print(f" Sequential: {sequential_time*1000:.2f}ms (1 file)")
        print(f" Concurrent: {concurrent_time*1000:.2f}ms (4 files in parallel)")
        print(f" Ratio: {ratio:.2f}x")
        print(" (GIL release validates non-blocking capability)")
        assert concurrent_time > 0, "Concurrent execution failed"
class TestThreePhasePatternOverhead:
    """Overhead measurements for the three-phase (GIL-release) write pattern."""

    def test_gil_release_overhead(self, fixture_1k):
        """Measure per-record write cost and run-to-run variance in memory."""
        records = list(MARCReader(io.BytesIO(fixture_1k)))

        samples = []
        for _ in range(10):
            t0 = time.perf_counter()
            sink = io.BytesIO()
            writer = MARCWriter(sink)
            for rec in records:
                writer.write_record(rec)
            writer.close()
            samples.append(time.perf_counter() - t0)

        avg_time = sum(samples) / len(samples)
        print("\nGIL Release Pattern Overhead (1k records):")
        print(f" Average write time: {avg_time*1000:.2f}ms")
        print(f" Time per record: {(avg_time/len(records))*1000000:.2f}µs")

        fastest = min(samples)
        slowest = max(samples)
        spread = (slowest - fastest) / avg_time
        print(f" Min: {fastest*1000:.2f}ms, Max: {slowest*1000:.2f}ms")
        print(f" Variance: {spread*100:.1f}%")
        # Sanity bound, not a strict benchmark: 1k records should be well
        # under a second on any reasonable machine.
        assert avg_time < 1.0, f"Writing 1k records took too long: {avg_time*1000:.2f}ms"

    @pytest.mark.benchmark
    def test_bytesio_vs_file_isolation(self, fixture_1k):
        """Contrast median in-memory write time with median on-disk write time."""
        records = list(MARCReader(io.BytesIO(fixture_1k)))

        mem_samples = []
        for _ in range(5):
            t0 = time.perf_counter()
            sink = io.BytesIO()
            writer = MARCWriter(sink)
            for rec in records:
                writer.write_record(rec)
            writer.close()
            mem_samples.append(time.perf_counter() - t0)

        disk_samples = []
        for _ in range(5):
            with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
                path = tmp.name
            try:
                t0 = time.perf_counter()
                writer = MARCWriter(path)
                for rec in records:
                    writer.write_record(rec)
                writer.close()
                disk_samples.append(time.perf_counter() - t0)
            finally:
                if os.path.exists(path):
                    os.unlink(path)

        # Upper median (element len//2 of the sorted samples) — matches the
        # original selection for these odd-length sample lists.
        med_mem = sorted(mem_samples)[len(mem_samples) // 2]
        med_disk = sorted(disk_samples)[len(disk_samples) // 2]
        disk_overhead_pct = (med_disk - med_mem) / med_mem * 100
        print("\nI/O Overhead Analysis (1k records):")
        print(f" BytesIO (memory): {med_mem*1000:.2f}ms")
        print(f" RustFile (disk): {med_disk*1000:.2f}ms")
        print(f" Disk overhead: {(med_disk - med_mem)*1000:.2f}ms ({disk_overhead_pct:.1f}%)")