mrrc 0.7.6

A Rust library for reading, writing, and manipulating MARC bibliographic records in ISO 2709 binary format
Documentation
"""
Performance analysis for writer backends.

Compares sequential vs concurrent performance and measures actual speedup.
Validates efficient GIL release pattern effectiveness.
"""

import pytest
import io
import time
import tempfile
import os
from concurrent.futures import ThreadPoolExecutor
from mrrc import MARCReader, MARCWriter


class TestSequentialBaseline:
     """Baseline measurements for sequential write performance."""
     
     def test_sequential_bytesio_baseline(self, fixture_10k):
         """Baseline: Write 10k records to BytesIO (pure Python, in-memory)."""
         # Pre-load records
         reader = MARCReader(io.BytesIO(fixture_10k))
         records = list(reader)
         assert len(records) == 10000
         
         # Warm up (JIT)
         output = io.BytesIO()
         writer = MARCWriter(output)
         for record in records[:100]:
             writer.write_record(record)
         writer.close()
         
         # Actual baseline measurement
         times = []
         for _ in range(3):
             start = time.perf_counter()
             output = io.BytesIO()
             writer = MARCWriter(output)
             for record in records:
                 writer.write_record(record)
             writer.close()
             elapsed = time.perf_counter() - start
             times.append(elapsed)
         
         avg_time = sum(times) / len(times)
         print(f"\nBaseline (BytesIO, 10k records): {avg_time*1000:.2f}ms")
         assert avg_time > 0
     
     def test_sequential_rustfile_baseline(self, fixture_10k):
         """Baseline: Write 10k records to file (RustFile backend)."""
         # Pre-load records
         reader = MARCReader(io.BytesIO(fixture_10k))
         records = list(reader)
         assert len(records) == 10000
         
         # Warm up
         with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
             temp_path = tmp.name
         try:
             writer = MARCWriter(temp_path)
             for record in records[:100]:
                 writer.write_record(record)
             writer.close()
         finally:
             if os.path.exists(temp_path):
                 os.unlink(temp_path)
         
         # Actual measurement
         times = []
         for _ in range(3):
             with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
                 temp_path = tmp.name
             
             try:
                 start = time.perf_counter()
                 writer = MARCWriter(temp_path)
                 for record in records:
                     writer.write_record(record)
                 writer.close()
                 elapsed = time.perf_counter() - start
                 times.append(elapsed)
             finally:
                 if os.path.exists(temp_path):
                     os.unlink(temp_path)
         
         avg_time = sum(times) / len(times)
         print(f"RustFile (file I/O, 10k records): {avg_time*1000:.2f}ms")
         assert avg_time > 0
     
     def test_sequential_baseline_comparison(self, fixture_10k):
         """Compare sequential performance: BytesIO vs RustFile."""
         # Pre-load records
         reader = MARCReader(io.BytesIO(fixture_10k))
         records = list(reader)
         
         # BytesIO baseline
         times_bytesio = []
         for _ in range(3):
             start = time.perf_counter()
             output = io.BytesIO()
             writer = MARCWriter(output)
             for record in records:
                 writer.write_record(record)
             writer.close()
             times_bytesio.append(time.perf_counter() - start)
         
         # RustFile baseline
         times_rustfile = []
         for _ in range(3):
             with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
                 temp_path = tmp.name
             
             try:
                 start = time.perf_counter()
                 writer = MARCWriter(temp_path)
                 for record in records:
                     writer.write_record(record)
                 writer.close()
                 times_rustfile.append(time.perf_counter() - start)
             finally:
                 if os.path.exists(temp_path):
                     os.unlink(temp_path)
         
         avg_bytesio = sum(times_bytesio) / len(times_bytesio)
         avg_rustfile = sum(times_rustfile) / len(times_rustfile)
         ratio = avg_bytesio / avg_rustfile
         
         print("\nSequential Comparison (10k records):")
         print(f"  BytesIO:  {avg_bytesio*1000:.2f}ms")
         print(f"  RustFile: {avg_rustfile*1000:.2f}ms")
         print(f"  Ratio:    {ratio:.2f}x (1.0 = same speed)")
         
         # BytesIO is in-memory, RustFile does disk I/O
         # RustFile being slightly slower is acceptable


class TestConcurrentPerformance:
     """Measure actual speedup from concurrent GIL release."""
     
     def test_concurrent_2thread_speedup(self, fixture_5k):
         """
         Measure speedup with 2 concurrent threads vs sequential.
         
         NOTE: With RustFile backend and disk I/O, concurrent writes to different
         files may be slower than sequential due to:
         - Disk I/O contention (drives have limited parallelism)
         - Kernel disk scheduling overhead
         - Thread creation/context switch overhead
         
         The main benefit of GIL release is enabling non-blocking I/O, which
         allows responsive systems (e.g., web servers) to handle multiple clients
         without freezing. This test validates the GIL is released, even if
         absolute performance shows disk I/O contention.
         """
         # Pre-load records
         reader = MARCReader(io.BytesIO(fixture_5k))
         records = list(reader)
         assert len(records) == 5000
         
         # Sequential baseline (single thread, write once)
         times_sequential = []
         for _ in range(3):
             start = time.perf_counter()
             with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
                 temp_path = tmp.name
             try:
                 writer = MARCWriter(temp_path)
                 for record in records:
                     writer.write_record(record)
                 writer.close()
             finally:
                 if os.path.exists(temp_path):
                     os.unlink(temp_path)
             times_sequential.append(time.perf_counter() - start)
         
         avg_sequential = sum(times_sequential) / len(times_sequential)
         
         # Concurrent baseline (2 threads, each write 5k records to different files)
         def write_records_to_file():
             with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
                 temp_path = tmp.name
             
             try:
                 start = time.perf_counter()
                 writer = MARCWriter(temp_path)
                 for record in records:
                     writer.write_record(record)
                 writer.close()
                 elapsed = time.perf_counter() - start
                 return elapsed, temp_path
             except:
                 if os.path.exists(temp_path):
                     os.unlink(temp_path)
                 raise
         
         times_concurrent = []
         for _ in range(3):
             with ThreadPoolExecutor(max_workers=2) as executor:
                 futures = [executor.submit(write_records_to_file) for _ in range(2)]
                 results = [f.result() for f in futures]
             
             # Wall clock time is max of the two threads
             max_time = max(r[0] for r in results)
             times_concurrent.append(max_time)
             
             # Clean up
             for _, temp_path in results:
                 if os.path.exists(temp_path):
                     os.unlink(temp_path)
         
         avg_concurrent = sum(times_concurrent) / len(times_concurrent)
         speedup = avg_sequential / avg_concurrent
         
         print("\nConcurrent Performance (2 threads × 5k records each):")
         print(f"  Sequential:  {avg_sequential*1000:.2f}ms")
         print(f"  Concurrent:  {avg_concurrent*1000:.2f}ms")
         print(f"  Ratio:       {speedup:.2f}x")
         print("  (Disk I/O contention is expected; GIL release enabled non-blocking execution)")
         
         # The main validation is that threads can run concurrently without deadlock
         # Disk I/O performance depends on storage characteristics
         # Just verify both threads completed successfully
         assert avg_concurrent > 0, "Concurrent execution failed"
     
     def test_concurrent_4thread_speedup(self, fixture_5k):
         """Measure concurrent execution with 4 threads (tests scalability of GIL release)."""
         # Pre-load records
         reader = MARCReader(io.BytesIO(fixture_5k))
         records = list(reader)
         
         # Sequential baseline
         start = time.perf_counter()
         with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
             temp_path = tmp.name
         try:
             writer = MARCWriter(temp_path)
             for record in records:
                 writer.write_record(record)
             writer.close()
         finally:
             if os.path.exists(temp_path):
                 os.unlink(temp_path)
         sequential_time = time.perf_counter() - start
         
         # Concurrent baseline (4 threads)
         def write_records_to_file():
             with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
                 temp_path = tmp.name
             
             try:
                 start = time.perf_counter()
                 writer = MARCWriter(temp_path)
                 for record in records:
                     writer.write_record(record)
                 writer.close()
                 elapsed = time.perf_counter() - start
                 return elapsed, temp_path
             except:
                 if os.path.exists(temp_path):
                     os.unlink(temp_path)
                 raise
         
         with ThreadPoolExecutor(max_workers=4) as executor:
             futures = [executor.submit(write_records_to_file) for _ in range(4)]
             results = [f.result() for f in futures]
         
         # Wall clock time is max of the four threads
         concurrent_time = max(r[0] for r in results)
         ratio = sequential_time / concurrent_time
         
         # Clean up
         for _, temp_path in results:
             if os.path.exists(temp_path):
                 os.unlink(temp_path)
         
         print("\nConcurrent Execution (4 threads × 5k records each):")
         print(f"  Sequential:  {sequential_time*1000:.2f}ms (1 file)")
         print(f"  Concurrent:  {concurrent_time*1000:.2f}ms (4 files in parallel)")
         print(f"  Ratio:       {ratio:.2f}x")
         print("  (GIL release validates non-blocking capability)")
         
         # Just verify all threads completed without deadlock
         assert concurrent_time > 0, "Concurrent execution failed"


class TestThreePhasePatternOverhead:
     """Analyze overhead of the three-phase GIL pattern."""
     
     def test_gil_release_overhead(self, fixture_1k):
         """Measure overhead from three-phase pattern (extract, release, write)."""
         # Pre-load records
         reader = MARCReader(io.BytesIO(fixture_1k))
         records = list(reader)
         
         # Time just the write phase (serialization + output)
         times = []
         for _ in range(10):
             start = time.perf_counter()
             output = io.BytesIO()
             writer = MARCWriter(output)
             for record in records:
                 writer.write_record(record)
             writer.close()
             elapsed = time.perf_counter() - start
             times.append(elapsed)
         
         avg_time = sum(times) / len(times)
         print("\nGIL Release Pattern Overhead (1k records):")
         print(f"  Average write time: {avg_time*1000:.2f}ms")
         print(f"  Time per record: {(avg_time/len(records))*1000000:.2f}µs")
         
         # Track min/max to see variance
         min_time = min(times)
         max_time = max(times)
         variance = (max_time - min_time) / avg_time
         print(f"  Min: {min_time*1000:.2f}ms, Max: {max_time*1000:.2f}ms")
         print(f"  Variance: {variance*100:.1f}%")
         
         # Verify we can write 1k records in reasonable time
         assert avg_time < 1.0, f"Writing 1k records took too long: {avg_time*1000:.2f}ms"
     
     @pytest.mark.benchmark
     def test_bytesio_vs_file_isolation(self, fixture_1k):
         """
         Isolate I/O overhead from serialization.
         
         This helps identify if slowdown is from:
         - GIL release pattern overhead
         - Disk I/O vs memory I/O
         - Python object handling
         """
         # Pre-load records
         reader = MARCReader(io.BytesIO(fixture_1k))
         records = list(reader)
         
         # Test 1: BytesIO (memory-only, fast I/O)
         times_mem = []
         for _ in range(5):
             start = time.perf_counter()
             output = io.BytesIO()
             writer = MARCWriter(output)
             for record in records:
                 writer.write_record(record)
             writer.close()
             times_mem.append(time.perf_counter() - start)

         # Test 2: RustFile (disk I/O)
         times_disk = []
         for _ in range(5):
             with tempfile.NamedTemporaryFile(delete=False, suffix='.mrc') as tmp:
                 temp_path = tmp.name
             
             try:
                 start = time.perf_counter()
                 writer = MARCWriter(temp_path)
                 for record in records:
                     writer.write_record(record)
                 writer.close()
                 times_disk.append(time.perf_counter() - start)
             finally:
                 if os.path.exists(temp_path):
                     os.unlink(temp_path)
         
         # Use median (not mean) to resist CI runner I/O outliers
         med_mem = sorted(times_mem)[len(times_mem) // 2]
         med_disk = sorted(times_disk)[len(times_disk) // 2]

         disk_overhead_pct = (med_disk - med_mem) / med_mem * 100
         print("\nI/O Overhead Analysis (1k records):")
         print(f"  BytesIO (memory): {med_mem*1000:.2f}ms")
         print(f"  RustFile (disk):  {med_disk*1000:.2f}ms")
         print(f"  Disk overhead:    {(med_disk - med_mem)*1000:.2f}ms ({disk_overhead_pct:.1f}%)")