import pytest
import io
import tempfile
import shutil
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from mrrc import MARCReader
class TestPythonParallelBenchmarks:
    """Compare sequential vs. thread-pooled reading of in-memory MARC data.

    Each benchmark counts the records parsed from the raw fixture bytes
    (``fixture_1k`` / ``fixture_10k``) and asserts the expected total, so a
    truncated or failed read fails loudly instead of producing a
    fast-but-wrong timing.
    """

    @staticmethod
    def _count_records(data):
        """Parse *data* (raw MARC bytes) and return how many records it holds."""
        reader = MARCReader(io.BytesIO(data))
        count = 0
        while reader.read_record() is not None:
            count += 1
        return count

    @classmethod
    def _threaded_count(cls, data, workers):
        """Read *data* once per worker thread; return the combined record count."""
        with ThreadPoolExecutor(max_workers=workers) as executor:
            return sum(executor.map(cls._count_records, [data] * workers))

    @pytest.mark.benchmark
    def test_sequential_reading_1k(self, benchmark, fixture_1k):
        result = benchmark(lambda: self._count_records(fixture_1k))
        assert result == 1000

    @pytest.mark.benchmark
    def test_sequential_2x_reading_1k(self, benchmark, fixture_1k):
        def read_twice():
            return sum(self._count_records(fixture_1k) for _ in range(2))
        result = benchmark(read_twice)
        assert result == 2000

    @pytest.mark.benchmark
    def test_sequential_4x_reading_1k(self, benchmark, fixture_1k):
        def read_4x():
            return sum(self._count_records(fixture_1k) for _ in range(4))
        result = benchmark(read_4x)
        assert result == 4000

    @pytest.mark.benchmark
    def test_threaded_reading_1k(self, benchmark, fixture_1k):
        result = benchmark(lambda: self._threaded_count(fixture_1k, 2))
        assert result == 2000

    @pytest.mark.benchmark
    def test_threaded_reading_4x_1k(self, benchmark, fixture_1k):
        result = benchmark(lambda: self._threaded_count(fixture_1k, 4))
        assert result == 4000

    @pytest.mark.benchmark
    def test_sequential_10k(self, benchmark, fixture_10k):
        result = benchmark(lambda: self._count_records(fixture_10k))
        assert result == 10000

    @pytest.mark.benchmark
    def test_sequential_2x_reading_10k(self, benchmark, fixture_10k):
        def read_twice():
            return sum(self._count_records(fixture_10k) for _ in range(2))
        result = benchmark(read_twice)
        assert result == 20000

    @pytest.mark.benchmark
    def test_threaded_reading_2x_10k(self, benchmark, fixture_10k):
        result = benchmark(lambda: self._threaded_count(fixture_10k, 2))
        assert result == 20000

    @pytest.mark.benchmark
    def test_threaded_reading_4x_10k(self, benchmark, fixture_10k):
        result = benchmark(lambda: self._threaded_count(fixture_10k, 4))
        assert result == 40000
class TestParallelSummary:
    """Headline thread-scaling benchmarks over the 10k-record fixture."""

    @pytest.mark.benchmark
    def test_threading_speedup_2x_10k(self, benchmark, fixture_10k):
        def run_pool():
            # Worker: parse one in-memory copy of the fixture, count records.
            def count_records(raw):
                marc = MARCReader(io.BytesIO(raw))
                seen = 0
                while marc.read_record() is not None:
                    seen += 1
                return seen

            with ThreadPoolExecutor(max_workers=2) as pool:
                totals = list(pool.map(count_records, [fixture_10k] * 2))
            return sum(totals)

        assert benchmark(run_pool) == 20000

    @pytest.mark.benchmark
    def test_threading_speedup_4x_10k(self, benchmark, fixture_10k):
        def run_pool():
            # Worker: parse one in-memory copy of the fixture, count records.
            def count_records(raw):
                marc = MARCReader(io.BytesIO(raw))
                seen = 0
                while marc.read_record() is not None:
                    seen += 1
                return seen

            with ThreadPoolExecutor(max_workers=4) as pool:
                totals = list(pool.map(count_records, [fixture_10k] * 4))
            return sum(totals)

        assert benchmark(run_pool) == 40000
class TestParallelWithFieldAccess:
    """Benchmarks that access record fields (title) while reading, to measure
    parsing-plus-field-access cost both sequentially and under a thread pool."""

    @staticmethod
    def _count_titles(data):
        """Parse *data*, extract each record's title (defaulting to "Unknown"),
        and return the number of records processed."""
        reader = MARCReader(io.BytesIO(data))
        titles = []
        while (record := reader.read_record()) is not None:
            titles.append(record.title or "Unknown")
        return len(titles)

    @pytest.mark.benchmark
    def test_sequential_with_title_extraction_10k(self, benchmark, fixture_10k):
        result = benchmark(lambda: self._count_titles(fixture_10k))
        assert result == 10000

    @pytest.mark.benchmark
    def test_threaded_with_title_extraction_2x_10k(self, benchmark, fixture_10k):
        def read_with_extraction():
            with ThreadPoolExecutor(max_workers=2) as executor:
                return sum(executor.map(self._count_titles, [fixture_10k] * 2))
        result = benchmark(read_with_extraction)
        assert result == 20000

    @pytest.mark.benchmark
    def test_threaded_with_title_extraction_4x_10k(self, benchmark, fixture_10k):
        def read_with_extraction():
            with ThreadPoolExecutor(max_workers=4) as executor:
                return sum(executor.map(self._count_titles, [fixture_10k] * 4))
        result = benchmark(read_with_extraction)
        assert result == 40000
class TestIndividualOperationParallel:
    """4-way parallel read benchmarks, with and without per-record field access."""

    @staticmethod
    def _count_records(data):
        """Return the number of records in the raw MARC bytes *data*."""
        reader = MARCReader(io.BytesIO(data))
        count = 0
        while reader.read_record() is not None:
            count += 1
        return count

    @staticmethod
    def _count_with_field_access(data):
        """Count records in *data*, touching the title and 100 fields of each
        record so field-access cost is included in the measurement."""
        reader = MARCReader(io.BytesIO(data))
        count = 0
        while (record := reader.read_record()) is not None:
            _ = record.title
            _ = record.get_fields("100")
            count += 1
        return count

    @classmethod
    def _parallel_total(cls, worker, data, copies):
        """Run *worker* over *copies* copies of *data*, one thread per copy;
        return the summed counts."""
        with ThreadPoolExecutor(max_workers=copies) as executor:
            return sum(executor.map(worker, [data] * copies))

    @pytest.mark.benchmark
    def test_parallel_read_4x_1k(self, benchmark, fixture_1k):
        result = benchmark(
            lambda: self._parallel_total(self._count_records, fixture_1k, 4)
        )
        assert result == 4000

    @pytest.mark.benchmark
    def test_parallel_read_with_extract_4x_1k(self, benchmark, fixture_1k):
        result = benchmark(
            lambda: self._parallel_total(self._count_with_field_access, fixture_1k, 4)
        )
        assert result == 4000

    @pytest.mark.benchmark
    def test_parallel_read_4x_10k(self, benchmark, fixture_10k):
        result = benchmark(
            lambda: self._parallel_total(self._count_records, fixture_10k, 4)
        )
        assert result == 40000

    @pytest.mark.benchmark
    def test_parallel_read_with_extract_4x_10k(self, benchmark, fixture_10k):
        result = benchmark(
            lambda: self._parallel_total(self._count_with_field_access, fixture_10k, 4)
        )
        assert result == 40000
class TestFileBatchParallelBenchmarks:
    """Benchmarks that read MARC data from real files on disk, sequentially
    and via a thread pool, using temporary copies of the 10k fixture."""

    @pytest.fixture
    def temp_fixtures(self, fixture_10k):
        """Write four copies of the 10k fixture into a temp directory and
        yield their paths; the directory is removed on teardown."""
        tmpdir = tempfile.mkdtemp()
        try:
            file_paths = []
            for i in range(4):
                filepath = Path(tmpdir) / f"batch_{i}.mrc"
                filepath.write_bytes(fixture_10k)
                file_paths.append(str(filepath))
            yield file_paths
        finally:
            shutil.rmtree(tmpdir)

    @staticmethod
    def _count_file(filepath):
        """Open *filepath* with MARCReader and return its record count."""
        reader = MARCReader(filepath)
        count = 0
        while reader.read_record() is not None:
            count += 1
        return count

    @staticmethod
    def _count_file_with_extraction(filepath):
        """Count records in *filepath*, touching the title and 100 fields of
        each record so field-access cost is included in the measurement."""
        reader = MARCReader(filepath)
        count = 0
        while (record := reader.read_record()) is not None:
            _ = record.title
            _ = record.get_fields("100")
            count += 1
        return count

    @classmethod
    def _parallel_files(cls, worker, filepaths):
        """Process each path in *filepaths* in its own thread; sum the counts."""
        with ThreadPoolExecutor(max_workers=len(filepaths)) as executor:
            return sum(executor.map(worker, filepaths))

    @pytest.mark.benchmark
    def test_file_sequential_1x_10k(self, benchmark, temp_fixtures):
        filepath = temp_fixtures[0]
        result = benchmark(lambda: self._count_file(filepath))
        assert result == 10000

    @pytest.mark.benchmark
    def test_file_sequential_2x_10k(self, benchmark, temp_fixtures):
        filepaths = temp_fixtures[:2]
        result = benchmark(lambda: sum(self._count_file(p) for p in filepaths))
        assert result == 20000

    @pytest.mark.benchmark
    def test_file_sequential_4x_10k(self, benchmark, temp_fixtures):
        result = benchmark(lambda: sum(self._count_file(p) for p in temp_fixtures))
        assert result == 40000

    @pytest.mark.benchmark
    def test_file_parallel_2x_10k(self, benchmark, temp_fixtures):
        filepaths = temp_fixtures[:2]
        result = benchmark(lambda: self._parallel_files(self._count_file, filepaths))
        assert result == 20000

    @pytest.mark.benchmark
    def test_file_parallel_4x_10k(self, benchmark, temp_fixtures):
        result = benchmark(lambda: self._parallel_files(self._count_file, temp_fixtures))
        assert result == 40000

    @pytest.mark.benchmark
    def test_file_parallel_4x_10k_with_extraction(self, benchmark, temp_fixtures):
        result = benchmark(
            lambda: self._parallel_files(self._count_file_with_extraction, temp_fixtures)
        )
        assert result == 40000