import pytest
from mrrc import MARCReader, RecordBoundaryScanner
from mrrc.rayon_parser_pool import parse_batch_parallel
@pytest.fixture
def simple_book_bytes():
    """Raw bytes of the single-record MARC test file."""
    with open("tests/data/simple_book.mrc", "rb") as fh:
        data = fh.read()
    return data
@pytest.fixture
def multi_records_bytes():
    """Raw bytes of the multi-record MARC test file."""
    with open("tests/data/multi_records.mrc", "rb") as fh:
        data = fh.read()
    return data
class TestRayonParserPoolBasics:
    """Smoke tests for parse_batch_parallel over scanner-produced boundaries."""

    def test_parser_pool_single_record(self, simple_book_bytes):
        """A single boundary yields exactly one non-None parsed record."""
        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(simple_book_bytes)
        # Restrict to the first boundary so exactly one record is requested.
        boundaries = boundaries[:1]
        records = parse_batch_parallel(boundaries, simple_book_bytes)
        assert len(records) == 1
        assert records[0] is not None

    def test_parser_pool_multiple_records(self, multi_records_bytes):
        """Every scanned boundary produces a non-None record."""
        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(multi_records_bytes)
        assert len(boundaries) > 1, "Test data should have multiple records"
        records = parse_batch_parallel(boundaries, multi_records_bytes)
        assert len(records) == len(boundaries)
        for record in records:
            assert record is not None

    def test_parser_pool_empty_boundaries(self, multi_records_bytes):
        """An empty boundary list yields an empty result, not an error."""
        records = parse_batch_parallel([], multi_records_bytes)
        assert len(records) == 0

    def test_parser_pool_invalid_boundary(self):
        """A boundary extending past the end of the buffer raises."""
        buffer = b"test data"
        boundaries = [(0, 100)]  # length 100 exceeds the 9-byte buffer
        # Fixed: the with-statement body was crammed onto one line (PEP 8).
        # NOTE(review): Exception is broad; narrow if mrrc exposes a
        # specific error type — confirm against the extension's API.
        with pytest.raises(Exception):
            parse_batch_parallel(boundaries, buffer)
class TestRayonParserPoolParity:
    """Parallel parsing must match sequential MARCReader output exactly."""

    @staticmethod
    def _read_sequential(data):
        # Drain a MARCReader until it signals end-of-input with None.
        reader = MARCReader(data)
        collected = []
        record = reader.read_record()
        while record is not None:
            collected.append(record)
            record = reader.read_record()
        return collected

    def test_parity_simple_book(self, simple_book_bytes):
        """Single-record file: counts and MARC-JSON serializations agree."""
        sequential_records = self._read_sequential(simple_book_bytes)
        boundaries = RecordBoundaryScanner().scan(simple_book_bytes)
        parallel_records = parse_batch_parallel(boundaries, simple_book_bytes)
        assert len(parallel_records) == len(sequential_records), \
            f"Record count mismatch: {len(parallel_records)} vs {len(sequential_records)}"
        for i, (seq_rec, par_rec) in enumerate(zip(sequential_records, parallel_records)):
            seq_json = seq_rec.to_marcjson()
            par_json = par_rec.to_marcjson()
            assert seq_json == par_json, \
                f"Record {i} mismatch:\nSequential: {seq_json}\nParallel: {par_json}"

    def test_parity_multi_records(self, multi_records_bytes):
        """Multi-record file: every record serializes identically."""
        sequential_records = self._read_sequential(multi_records_bytes)
        boundaries = RecordBoundaryScanner().scan(multi_records_bytes)
        parallel_records = parse_batch_parallel(boundaries, multi_records_bytes)
        assert len(parallel_records) == len(sequential_records)
        for seq_rec, par_rec in zip(sequential_records, parallel_records):
            assert seq_rec.to_marcjson() == par_rec.to_marcjson()
class TestRayonParserPoolBatching:
    """Batch slicing and ordering behavior of parse_batch_parallel."""

    def test_parse_batch_limited(self, multi_records_bytes):
        """Passing a slice of the boundaries parses only that slice."""
        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(multi_records_bytes)
        # Fixed: was an `if` guard that silently passed on inadequate data;
        # skipping makes the unmet precondition visible in the test report.
        if len(boundaries) <= 1:
            pytest.skip("test data must contain multiple records")
        limited_boundaries = boundaries[:2]
        records = parse_batch_parallel(limited_boundaries, multi_records_bytes)
        assert len(records) == 2

    def test_parse_batch_order_preserved(self, multi_records_bytes):
        """All records in a full batch come back parsed (none dropped)."""
        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(multi_records_bytes)
        if len(boundaries) <= 1:
            pytest.skip("test data must contain multiple records")
        records = parse_batch_parallel(boundaries, multi_records_bytes)
        for i, record in enumerate(records):
            assert record is not None, f"Record {i} is None"
class TestRayonParserPoolThreadSafety:
    """parse_batch_parallel must be safe to call from multiple Python threads."""

    def test_concurrent_parallel_parsing(self, multi_records_bytes):
        """Three threads parsing the same batch all see the same record count."""
        import threading

        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(multi_records_bytes)
        results = []
        errors = []

        def parse_worker():
            # Collect either the record count or the raised exception;
            # exceptions in threads would otherwise vanish silently.
            try:
                parsed = parse_batch_parallel(boundaries, multi_records_bytes)
            except Exception as exc:
                errors.append(exc)
            else:
                results.append(len(parsed))

        workers = [threading.Thread(target=parse_worker) for _ in range(3)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        assert len(errors) == 0, f"Errors occurred: {errors}"
        assert len(results) == 3
        assert len(set(results)) == 1, "Inconsistent record counts"

    def test_parse_while_reading_sequential(self, multi_records_bytes):
        """Sequential reading and parallel parsing can run concurrently."""
        import threading

        sequential_records = []
        parallel_records = []

        def sequential_reader():
            reader = MARCReader(multi_records_bytes)
            record = reader.read_record()
            while record is not None:
                sequential_records.append(record)
                record = reader.read_record()

        def parallel_parser():
            boundaries = RecordBoundaryScanner().scan(multi_records_bytes)
            parallel_records.extend(
                parse_batch_parallel(boundaries, multi_records_bytes)
            )

        workers = [
            threading.Thread(target=sequential_reader),
            threading.Thread(target=parallel_parser),
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        assert len(sequential_records) > 0
        assert len(parallel_records) > 0
        assert len(sequential_records) == len(parallel_records)
class TestRayonParserPoolErrorHandling:
    """Errors raised inside the parallel parsing context must propagate."""

    def test_error_in_parallel_task(self):
        """Garbage bytes that fit their boundary still fail to parse."""
        payload = b"bad data"
        with pytest.raises(Exception):
            parse_batch_parallel([(0, 8)], payload)

    def test_mixed_valid_invalid_records(self):
        """One unparseable record in a batch fails the whole call."""
        payload = b"\x00" * 24 + b"\x1E" + b"invalid"
        with pytest.raises(Exception):
            parse_batch_parallel([(0, 25), (25, 7)], payload)
class TestRayonParserPoolPerformance:
    """Large-batch behavior and parser reuse across calls."""

    def test_large_batch_parsing(self):
        """A 100-record synthetic batch neither hangs nor crashes the pool.

        The synthetic records are not valid MARC, so the parser may
        legitimately reject them with a normal Python exception; what
        this test requires is that a successful parse returns one record
        per boundary.  Fixed: the old try/except-pass asserted nothing
        even when parsing succeeded.
        """
        buffer = bytearray()
        boundaries = []
        for _ in range(100):
            offset = len(buffer)
            buffer.extend(b"\x00" * 24)  # zeroed 24-byte leader placeholder
            buffer.append(0x1E)          # MARC field-terminator byte
            boundaries.append((offset, 25))
        try:
            records = parse_batch_parallel(boundaries, bytes(buffer))
        except Exception:
            # Rejection of the invalid synthetic data is acceptable.
            pass
        else:
            assert len(records) == 100

    def test_parser_reuse_across_calls(self, multi_records_bytes):
        """Two calls over the same boundaries give identical results."""
        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(multi_records_bytes)
        records1 = parse_batch_parallel(boundaries, multi_records_bytes)
        records2 = parse_batch_parallel(boundaries, multi_records_bytes)
        assert len(records1) == len(records2)
        for r1, r2 in zip(records1, records2):
            assert r1.to_marcjson() == r2.to_marcjson()
class TestRayonParserPoolAcceptanceCriteria:
    """End-to-end acceptance criteria for the parallel parser pool."""

    def test_criterion_parallel_produces_identical_output(self, multi_records_bytes):
        """Criterion: parallel MARC-JSON output equals sequential output."""
        reader = MARCReader(multi_records_bytes)
        sequential = []
        while True:
            record = reader.read_record()
            if record is None:
                break
            sequential.append(record.to_marcjson())
        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(multi_records_bytes)
        parallel = parse_batch_parallel(boundaries, multi_records_bytes)
        parallel_json = [r.to_marcjson() for r in parallel]
        assert sequential == parallel_json, "Parallel output doesn't match sequential"

    def test_criterion_error_within_parallel_context(self):
        """Criterion: boundary errors surface from inside the parallel context."""
        buffer = b"x" * 100
        boundaries = [(0, 200)]  # length exceeds the 100-byte buffer
        with pytest.raises(Exception) as excinfo:
            parse_batch_parallel(boundaries, buffer)
        # The error message should mention the out-of-bounds condition.
        assert "bound" in str(excinfo.value).lower() or "exceed" in str(excinfo.value).lower()

    def test_criterion_all_records_parsed_identically(self, simple_book_bytes, multi_records_bytes):
        """Criterion: parallel record count matches sequential for all fixtures."""
        for test_data in [simple_book_bytes, multi_records_bytes]:
            reader = MARCReader(test_data)
            sequential_count = 0
            # Fixed: `while reader.read_record():` stopped on any falsy
            # record; compare against the None sentinel explicitly, as the
            # rest of the file does.
            while reader.read_record() is not None:
                sequential_count += 1
            scanner = RecordBoundaryScanner()
            boundaries = scanner.scan(test_data)
            parallel_records = parse_batch_parallel(boundaries, test_data)
            assert len(parallel_records) == sequential_count, \
                f"Record count mismatch for {test_data[:20]!r}"
class TestRayonParserPoolIntegration:
    """Scanner-to-parallel-parser pipeline integration."""

    def test_boundary_scanner_to_parser_pipeline(self, multi_records_bytes):
        """Scanner output feeds directly into the parallel parser."""
        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(multi_records_bytes)
        records = parse_batch_parallel(boundaries, multi_records_bytes)
        assert len(records) > 0
        assert len(records) == len(boundaries)

    def test_parser_with_limited_boundaries(self, multi_records_bytes):
        """Parsing a prefix of the boundaries yields exactly that many records."""
        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(multi_records_bytes)
        # Fixed: was an `if` guard that silently passed on inadequate data;
        # skipping makes the unmet precondition visible in the test report.
        if len(boundaries) <= 2:
            pytest.skip("test data must contain more than two records")
        half = len(boundaries) // 2
        limited = boundaries[:half]
        records = parse_batch_parallel(limited, multi_records_bytes)
        assert len(records) == half