import pytest
from concurrent.futures import ThreadPoolExecutor
from mrrc import ProducerConsumerPipeline
class TestProducerConsumerPipelineBasic:
    """Benchmarks for plain record counting through ``ProducerConsumerPipeline``.

    Covers a single pass, repeated sequential passes, and thread-pooled
    parallel passes over the same 10k-record MARC fixture file.
    """

    # Shared fixture so every benchmark in this class reads identical input.
    FIXTURE_PATH = "tests/data/fixtures/10k_records.mrc"

    @classmethod
    def _count_records(cls, _worker_id=None):
        """Open a fresh pipeline over the fixture and count its records.

        Accepts (and ignores) an optional worker id so the helper can be
        handed directly to ``ThreadPoolExecutor.map``.
        """
        pipeline = ProducerConsumerPipeline.from_file(cls.FIXTURE_PATH)
        return sum(1 for _ in pipeline)

    @pytest.mark.benchmark
    def test_pipeline_sequential_1x_10k(self, benchmark):
        """One full pass over the 10k fixture on the calling thread."""
        result = benchmark(self._count_records)
        assert result == 10000

    @pytest.mark.benchmark
    def test_pipeline_sequential_4x_10k(self, benchmark):
        """Four back-to-back passes, each with a fresh pipeline."""
        def read_pipelines():
            return sum(self._count_records() for _ in range(4))

        result = benchmark(read_pipelines)
        assert result == 40000

    @pytest.mark.benchmark
    def test_pipeline_parallel_2x_10k_threaded(self, benchmark):
        """Two concurrent passes via a 2-worker thread pool."""
        def read_parallel():
            with ThreadPoolExecutor(max_workers=2) as executor:
                return sum(executor.map(self._count_records, range(2)))

        result = benchmark(read_parallel)
        assert result == 20000

    @pytest.mark.benchmark
    def test_pipeline_parallel_4x_10k_threaded(self, benchmark):
        """Four concurrent passes via a 4-worker thread pool."""
        def read_parallel():
            with ThreadPoolExecutor(max_workers=4) as executor:
                return sum(executor.map(self._count_records, range(4)))

        result = benchmark(read_parallel)
        assert result == 40000
class TestProducerConsumerPipelineWithExtraction:
    """Benchmarks that touch record fields while iterating the pipeline.

    Unlike the plain-counting benchmarks, each record's title and 100
    field are read, so field-extraction cost is included in the timing.
    """

    # Shared fixture so every benchmark in this class reads identical input.
    FIXTURE_PATH = "tests/data/fixtures/10k_records.mrc"

    @classmethod
    def _count_with_extraction(cls, _worker_id=None):
        """Count records in a fresh pipeline, extracting fields from each.

        Accepts (and ignores) an optional worker id so the helper can be
        handed directly to ``ThreadPoolExecutor.map``.
        """
        pipeline = ProducerConsumerPipeline.from_file(cls.FIXTURE_PATH)
        count = 0
        for record in pipeline:
            _ = record.title              # exercise title extraction
            _ = record.get_fields("100")  # exercise field lookup
            count += 1
        return count

    @pytest.mark.benchmark
    def test_pipeline_sequential_extraction_4x_10k(self, benchmark):
        """Four sequential passes with per-record field extraction."""
        def read_with_extraction():
            return sum(self._count_with_extraction() for _ in range(4))

        result = benchmark(read_with_extraction)
        assert result == 40000

    @pytest.mark.benchmark
    def test_pipeline_parallel_extraction_4x_10k_threaded(self, benchmark):
        """Four concurrent extraction passes via a 4-worker thread pool."""
        def read_with_extraction():
            with ThreadPoolExecutor(max_workers=4) as executor:
                return sum(executor.map(self._count_with_extraction, range(4)))

        result = benchmark(read_with_extraction)
        assert result == 40000
class TestProducerConsumerPipelineMultiFile:
    """Benchmarks simulating multi-file workloads.

    Every "file" is the same 10k-record fixture opened through a fresh
    pipeline, so sequential vs. pooled throughput is directly comparable.
    The 8-files/4-threads case deliberately oversubscribes the pool.
    """

    # Shared fixture standing in for each input file.
    FIXTURE_PATH = "tests/data/fixtures/10k_records.mrc"

    @classmethod
    def _process_file(cls, _file_id=None):
        """Open a fresh pipeline over the fixture and count its records.

        Accepts (and ignores) an optional file id so the helper can be
        handed directly to ``ThreadPoolExecutor.map``.
        """
        pipeline = ProducerConsumerPipeline.from_file(cls.FIXTURE_PATH)
        return sum(1 for _ in pipeline)

    @pytest.mark.benchmark
    def test_process_4_files_sequential(self, benchmark):
        """Four files processed one after another on the calling thread."""
        def process_sequential():
            return sum(self._process_file() for _ in range(4))

        result = benchmark(process_sequential)
        assert result == 40000

    @pytest.mark.benchmark
    def test_process_4_files_parallel_4_threads(self, benchmark):
        """Four files processed concurrently, one per pool worker."""
        def process_parallel():
            with ThreadPoolExecutor(max_workers=4) as executor:
                return sum(executor.map(self._process_file, range(4)))

        result = benchmark(process_parallel)
        assert result == 40000

    @pytest.mark.benchmark
    def test_process_8_files_parallel_4_threads(self, benchmark):
        """Eight files through a 4-worker pool (2x oversubscription)."""
        def process_parallel():
            with ThreadPoolExecutor(max_workers=4) as executor:
                return sum(executor.map(self._process_file, range(8)))

        result = benchmark(process_parallel)
        assert result == 80000