import pytest
from mrrc import MARCReader, RecordBoundaryScanner
@pytest.fixture
def simple_book_bytes():
    """Provide the raw contents of ``tests/data/simple_book.mrc`` as bytes."""
    with open("tests/data/simple_book.mrc", mode="rb") as source:
        payload = source.read()
    return payload
@pytest.fixture
def multi_records_bytes():
    """Provide the raw contents of ``tests/data/multi_records.mrc`` as bytes."""
    with open("tests/data/multi_records.mrc", mode="rb") as source:
        payload = source.read()
    return payload
class TestBoundaryScannerBasics:
    """Exercise ``scan()`` on small hand-built buffers.

    Every buffer uses 0x1D as the record terminator, and ``scan()`` results
    are compared against ``(offset, length)`` pairs.
    """

    def test_scanner_creation(self):
        """A scanner can be constructed without arguments."""
        assert RecordBoundaryScanner() is not None

    def test_scan_single_record(self):
        """A buffer with one terminator yields one pair spanning the buffer."""
        payload = b"\x01\x02\x03\x1d"
        found = RecordBoundaryScanner().scan(payload)

        assert len(found) == 1
        assert found[0] == (0, 4)

    def test_scan_multiple_records(self):
        """Three terminators yield three back-to-back pairs."""
        payload = b"\x01\x02\x1d\x03\x04\x1d\x05\x1d"
        expected = [(0, 3), (3, 3), (6, 2)]

        found = RecordBoundaryScanner().scan(payload)

        assert len(found) == len(expected)
        for index, pair in enumerate(expected):
            assert found[index] == pair

    def test_scan_empty_buffer(self):
        """Scanning an empty buffer is expected to raise."""
        scanner = RecordBoundaryScanner()
        with pytest.raises(Exception):
            scanner.scan(b"")

    def test_scan_no_terminators(self):
        """Scanning a buffer with no 0x1D byte is expected to raise."""
        payload = b"\x01\x02\x03\x04"
        scanner = RecordBoundaryScanner()
        with pytest.raises(Exception):
            scanner.scan(payload)
class TestBoundaryScannerRealData:
    """Run ``scan()`` against the fixture-provided ``.mrc`` byte buffers."""

    def test_scan_simple_book(self, simple_book_bytes):
        """The first record starts at 0 and no record runs past the buffer."""
        found = RecordBoundaryScanner().scan(simple_book_bytes)

        assert len(found) >= 1, "Should find at least one record"
        assert found[0][0] == 0, "First record should start at offset 0"

        total_size = len(simple_book_bytes)
        for start, size in found:
            assert start + size <= total_size, "Record boundary exceeds file size"

    def test_scan_multi_records(self, multi_records_bytes):
        """More than one record is found and offsets come back in ascending order."""
        found = RecordBoundaryScanner().scan(multi_records_bytes)

        assert len(found) > 1

        starts = [start for start, _ in found]
        assert starts == sorted(starts)

    def test_boundary_reconstruction(self, simple_book_bytes):
        """Slicing the buffer by each pair gives a non-empty, terminated record."""
        found = RecordBoundaryScanner().scan(simple_book_bytes)
        total_size = len(simple_book_bytes)

        for start, size in found:
            stop = start + size
            chunk = simple_book_bytes[start:stop]

            assert len(chunk) > 0, "Record should have content"
            assert chunk[-1] == 0x1D, "Record should end with 0x1D record terminator"
            assert stop <= total_size, "Record should not exceed file boundaries"
class TestBoundaryScannerLimiting:
    """Exercise ``scan_limited()``, which takes a buffer and a record limit."""

    def test_scan_limited(self):
        """A limit of 2 on a three-record buffer returns the first two pairs."""
        payload = b"\x01\x1d\x02\x1d\x03\x1d"

        capped = RecordBoundaryScanner().scan_limited(payload, 2)

        assert len(capped) == 2
        assert capped[0] == (0, 2)
        assert capped[1] == (2, 2)

    def test_scan_limited_exceeds_available(self):
        """A limit above the record count returns only the records present."""
        payload = b"\x01\x1d\x02\x1d"

        capped = RecordBoundaryScanner().scan_limited(payload, 10)

        assert len(capped) == 2

    def test_scan_limited_batch_processing(self, multi_records_bytes):
        """Limiting to half of the records (rounded up) returns 1..half results."""
        scanner = RecordBoundaryScanner()

        total_records = len(scanner.scan(multi_records_bytes))
        assert total_records > 0, "Should find at least one record"

        # Round up so the requested limit is never zero.
        half = (total_records + 1) // 2
        limited_boundaries = scanner.scan_limited(multi_records_bytes, half)

        assert len(limited_boundaries) <= half, \
            f"Should return at most {half} records, got {len(limited_boundaries)}"
        assert len(limited_boundaries) > 0, "Should return at least one record"
class TestBoundaryScannerCounting:
    """Exercise ``count_records()`` and compare it with ``scan()``."""

    def test_count_records(self):
        """Two terminated records are counted as 2."""
        payload = b"\x01\x1d\x02\x1d"
        assert RecordBoundaryScanner().count_records(payload) == 2

    def test_count_records_empty(self):
        """An empty buffer is counted as 0 records."""
        assert RecordBoundaryScanner().count_records(b"") == 0

    def test_count_records_no_terminators(self):
        """A buffer without any 0x1D byte is counted as 0 records."""
        payload = b"\x01\x02\x03\x04"
        assert RecordBoundaryScanner().count_records(payload) == 0

    def test_count_matches_scan(self, multi_records_bytes):
        """``count_records()`` equals the number of pairs ``scan()`` returns."""
        scanner = RecordBoundaryScanner()

        counted = scanner.count_records(multi_records_bytes)
        scanned = scanner.scan(multi_records_bytes)

        assert counted == len(scanned)
class TestBoundaryScannerPerformance:
    """Larger-input and reuse checks for ``RecordBoundaryScanner``."""

    def test_large_buffer_scan(self):
        """1000 two-byte records are all found at the expected offsets."""
        # Records alternate between payload bytes 0x01 and 0x02, each followed
        # by the 0x1D terminator: 500 repetitions of the pair = 1000 records.
        payload = b"\x01\x1d\x02\x1d" * 500

        found = RecordBoundaryScanner().scan(payload)

        assert len(found) == 1000
        assert found[0] == (0, 2)
        assert found[999] == (1998, 2)

    def test_scanner_reuse(self):
        """One scanner instance can scan two different buffers in a row."""
        scanner = RecordBoundaryScanner()

        first = scanner.scan(b"\x01\x1d\x02\x1d")
        assert len(first) == 2

        second = scanner.scan(b"\x01\x1d")
        assert len(second) == 1
        assert second[0] == (0, 2)

    def test_count_vs_scan_performance(self, multi_records_bytes):
        """``count_records()`` and ``scan()`` agree on the number of records."""
        scanner = RecordBoundaryScanner()

        counted = scanner.count_records(multi_records_bytes)
        scanned = scanner.scan(multi_records_bytes)

        assert counted == len(scanned)
class TestBoundaryScannerIntegration:
    """Check scanned boundaries against each other and against ``MARCReader``."""

    def test_boundaries_enable_parallel_parsing(self, multi_records_bytes):
        """No two scanned records may overlap.

        Records are ordered by ``(offset, length)`` and each is compared only
        with its successor. For non-negative lengths, non-overlapping
        neighbours imply that no pair overlaps, so the check is O(n log n)
        rather than a comparison of every pair.
        """
        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(multi_records_bytes)

        assert len(boundaries) > 0, "Should find at least one record"

        # Carry each record's position in ``boundaries`` through the sort so a
        # failure message names the records as the scanner reported them.
        ordered = sorted(
            (offset, length, index)
            for index, (offset, length) in enumerate(boundaries)
        )
        for (offset1, len1, i), (offset2, _len2, j) in zip(ordered, ordered[1:]):
            assert offset1 + len1 <= offset2, \
                f"Records {i} and {j} overlap"

    def test_sequential_vs_boundary_parsing(self, multi_records_bytes):
        """The scanner finds at least as many boundaries as ``MARCReader`` reads.

        ``MARCReader.read_record()`` is called until it returns ``None``; the
        number of records it produced is the lower bound for the scan.
        """
        reader = MARCReader(multi_records_bytes)
        sequential_records = []
        while True:
            record = reader.read_record()
            if record is None:
                break
            sequential_records.append(record)

        assert len(sequential_records) > 0, "Should find records via sequential parsing"

        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(multi_records_bytes)

        assert len(boundaries) > 0, "Boundary scan should find record boundaries"
        assert len(boundaries) >= len(sequential_records), \
            "Boundary scan should find >= complete records"
class TestBoundaryScannerAcceptanceCriteria:
    """Acceptance-level checks on ``scan()`` output for the fixture buffers."""

    def test_accepts_real_marc_data(self, simple_book_bytes, multi_records_bytes):
        """Both fixture buffers scan to at least one boundary each."""
        scanner = RecordBoundaryScanner()

        boundaries1 = scanner.scan(simple_book_bytes)
        assert len(boundaries1) > 0

        boundaries2 = scanner.scan(multi_records_bytes)
        assert len(boundaries2) > 0

    def test_produces_valid_boundaries(self, multi_records_bytes):
        """Every pair lies inside the buffer and ends on a 0x1D byte."""
        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(multi_records_bytes)
        total_size = len(multi_records_bytes)

        for offset, length in boundaries:
            assert 0 <= offset < total_size
            # A zero-length record would make the index below point at the
            # byte *before* the record (or wrap to the end of the buffer at
            # offset 0), so the terminator check could pass by accident.
            assert length > 0, "Record should have content"
            assert offset + length <= total_size
            assert multi_records_bytes[offset + length - 1] == 0x1D

    def test_enables_parallel_parsing_readiness(self, multi_records_bytes):
        """No byte belongs to two records and no record runs past the buffer."""
        scanner = RecordBoundaryScanner()
        boundaries = scanner.scan(multi_records_bytes)

        seen_ranges = set()
        for offset, length in boundaries:
            for i in range(offset, offset + length):
                assert i not in seen_ranges, "Overlapping boundaries"
                seen_ranges.add(i)

        # The number of distinct bytes covered must equal the sum of the
        # record lengths; any shortfall means some byte was claimed twice.
        expected_coverage = sum(length for _, length in boundaries)
        assert len(seen_ranges) == expected_coverage, "Should not have duplicates"

        for offset, length in boundaries:
            assert offset + length <= len(multi_records_bytes), \
                f"Boundary exceeds file: {offset} + {length} > {len(multi_records_bytes)}"