import io
import pytest
from mrrc import MARCReader
class TestIteratorProtocol:
    """Checks that ``MARCReader`` behaves as a standard Python iterator."""

    def test_iter_returns_self(self, fixture_1k):
        """``iter(reader)`` must hand back the reader object itself."""
        stream = io.BytesIO(fixture_1k)
        reader = MARCReader(stream)
        assert iter(reader) is reader

    def test_next_raises_stop_iteration(self, fixture_small):
        """Draining via ``next()`` ends in ``StopIteration`` and stays there."""
        reader = MARCReader(io.BytesIO(fixture_small))
        drained = []
        while True:
            try:
                drained.append(next(reader))
            except StopIteration:
                break
        # The fixture is expected to yield at least one record.
        assert len(drained) > 0
        # A further call after exhaustion must raise again.
        with pytest.raises(StopIteration):
            next(reader)

    def test_iteration_protocol_standard_loop(self, fixture_1k):
        """A plain ``for`` loop over the reader yields 1000 records."""
        reader = MARCReader(io.BytesIO(fixture_1k))
        seen = []
        # Explicit loop on purpose: the ``for`` statement is what is tested.
        for item in reader:
            seen.append(item)
        assert len(seen) == 1000

    def test_iteration_protocol_list_conversion(self, fixture_1k):
        """``list(reader)`` materialises 1000 records."""
        assert len(list(MARCReader(io.BytesIO(fixture_1k)))) == 1000
class TestEofIdempotence:
    """Checks that an exhausted reader keeps reporting end-of-input."""

    def test_eof_idempotence_repeated_next_calls(self, fixture_small):
        """Ten consecutive ``next()`` calls after EOF all raise ``StopIteration``."""
        exhausted = MARCReader(io.BytesIO(fixture_small))
        list(exhausted)
        for _ in range(10):
            with pytest.raises(StopIteration):
                next(exhausted)

    def test_eof_idempotence_no_io_after_eof(self, fixture_small):
        """Post-EOF ``next()`` calls must not trigger further ``read()`` calls."""

        class CountingBytesIO(io.BytesIO):
            """``BytesIO`` that counts calls to ``read``.

            Only ``read`` is counted; other read-style methods are not tracked.
            """

            # Class-level default; the first increment creates the
            # per-instance counter.
            read_count = 0

            def read(self, n=-1):
                self.read_count += 1
                return super().read(n)

        stream = CountingBytesIO(fixture_small)
        reader = MARCReader(stream)
        list(reader)
        reads_at_eof = stream.read_count
        for _ in range(5):
            with pytest.raises(StopIteration):
                next(reader)
        assert stream.read_count == reads_at_eof

    def test_eof_idempotence_state_unchanged(self, fixture_small):
        """``repr(reader)`` is the same before and after extra post-EOF calls."""
        reader = MARCReader(io.BytesIO(fixture_small))
        list(reader)
        # repr() is used here as a stand-in for the reader's visible state.
        snapshot = repr(reader)
        for _ in range(3):
            with pytest.raises(StopIteration):
                next(reader)
        assert snapshot == repr(reader)
class TestPartialBatchAtEof:
    """Checks record counts and EOF signalling for 217- and 500-record inputs.

    NOTE(review): the test names suggest 217 leaves a partial final batch and
    500 is an exact multiple of the reader's batch size -- confirm against the
    reader implementation.
    """

    def test_partial_batch_at_eof_all_delivered(self, fixture_217):
        """Every one of the 217 records is delivered."""
        assert len(list(MARCReader(io.BytesIO(fixture_217)))) == 217

    def test_partial_batch_eof_flag_set(self, fixture_217):
        """After all 217 records, the next call raises ``StopIteration``."""
        reader = MARCReader(io.BytesIO(fixture_217))
        delivered = list(reader)
        assert len(delivered) == 217
        with pytest.raises(StopIteration):
            next(reader)

    def test_exact_batch_multiple_at_eof(self, fixture_500):
        """After all 500 records, the next call raises ``StopIteration``."""
        reader = MARCReader(io.BytesIO(fixture_500))
        delivered = list(reader)
        assert len(delivered) == 500
        with pytest.raises(StopIteration):
            next(reader)
class TestQueueStateTransitions:
    """Drives the reader through its first 100/101 records and through EOF.

    NOTE(review): the counts used here presumably line up with an internal
    batch size of 100 -- confirm against the reader implementation.
    """

    def test_state_check_queue_non_empty_transition(self, fixture_1k):
        """The first 100 ``next()`` calls each return a non-``None`` record."""
        reader = MARCReader(io.BytesIO(fixture_1k))
        first = next(reader)
        assert first is not None
        for _ in range(99):
            assert next(reader) is not None

    def test_state_read_batch_transition(self, fixture_1k):
        """The 101st ``next()`` call still returns a non-``None`` record."""
        reader = MARCReader(io.BytesIO(fixture_1k))
        for _ in range(100):
            assert next(reader) is not None
        one_past_hundred = next(reader)
        assert one_past_hundred is not None

    def test_state_check_eof_idempotent(self, fixture_small):
        """Five ``next()`` calls after exhaustion all raise ``StopIteration``."""
        reader = MARCReader(io.BytesIO(fixture_small))
        list(reader)
        for _ in range(5):
            with pytest.raises(StopIteration):
                next(reader)
class TestResumeAfterPartialRead:
    """Checks that iteration can be paused and picked up again without loss."""

    def test_pause_resume_iteration(self, fixture_1k):
        """Two manual runs of 50, then a ``for`` loop, total 1000 records."""
        reader = MARCReader(io.BytesIO(fixture_1k))
        # List comprehensions (not generator expressions) so a premature
        # StopIteration from next() propagates unchanged.
        collected = [next(reader) for _ in range(50)]
        assert len(collected) == 50
        collected += [next(reader) for _ in range(50)]
        assert len(collected) == 100
        for leftover in reader:
            collected.append(leftover)
        assert len(collected) == 1000

    def test_pause_resume_across_batch_boundary(self, fixture_1k):
        """Runs of 75 and 50, then a ``for`` loop, total 1000 records.

        NOTE(review): 75 + 50 presumably straddles an internal batch boundary
        at 100 records -- confirm against the reader implementation.
        """
        reader = MARCReader(io.BytesIO(fixture_1k))
        collected = [next(reader) for _ in range(75)]
        collected += [next(reader) for _ in range(50)]
        assert len(collected) == 125
        for leftover in reader:
            collected.append(leftover)
        assert len(collected) == 1000
class TestConcurrentIterators:
    """Checks that two readers over the same bytes do not share state.

    Each reader wraps its own ``BytesIO``; the readers are interleaved in a
    single thread, not run in parallel.
    """

    def test_independent_iterators_independent_state(self, fixture_1k):
        """Advancing one reader does not stop the other from yielding."""
        reader1 = MARCReader(io.BytesIO(fixture_1k))
        reader2 = MARCReader(io.BytesIO(fixture_1k))
        # Move reader1 ten records ahead before touching reader2.
        for _ in range(10):
            next(reader1)
        first_from_reader2 = next(reader2)
        assert first_from_reader2 is not None
        # This is reader1's eleventh record.
        eleventh_from_reader1 = next(reader1)
        assert eleventh_from_reader1 is not None

    def test_two_readers_same_file_consumed_independently(self, fixture_small):
        """Exhausting one reader leaves the other fully readable."""
        reader1 = MARCReader(io.BytesIO(fixture_small))
        reader2 = MARCReader(io.BytesIO(fixture_small))
        list(reader1)
        # reader2 must still produce records although reader1 is drained.
        assert next(reader2) is not None
        for _ in reader2:
            pass
        # Both readers now report EOF.
        with pytest.raises(StopIteration):
            next(reader1)
        with pytest.raises(StopIteration):
            next(reader2)
class TestErrorRecovery:
    """Checks reader behaviour at end-of-input when the data contains an error."""

    def test_eof_after_malformed_record(self, fixture_with_error):
        """Iterate input containing a malformed record, then probe the reader.

        Two outcomes of the iteration are handled:

        * The loop finishes without raising: the reader has signalled
          exhaustion once, so a further ``next()`` must raise
          ``StopIteration`` again.
        * The loop raises: the reader's state after a failure is not pinned
          down by this test, so one more ``next()`` is attempted and any
          ordinary exception (``StopIteration`` included, as it subclasses
          ``Exception``) is tolerated.
        """
        reader = MARCReader(io.BytesIO(fixture_with_error))
        try:
            for _ in reader:
                pass
        except Exception:
            # Deliberately best-effort: only check that probing the reader
            # after a failure raises nothing beyond an ordinary exception.
            try:
                next(reader)
            except Exception:
                pass
        else:
            # Clean exhaustion: EOF must be reported consistently.
            with pytest.raises(StopIteration):
                next(reader)
class TestBatchSizeEdgeCases:
    """Placeholder tests for batch-size limits; neither asserts anything yet."""

    def test_batch_200_record_hard_limit(self):
        """Placeholder: not implemented, always passes.

        NOTE(review): the name suggests a 200-record cap per batch -- confirm
        against the reader implementation before writing the real test.
        """

    def test_batch_300kb_hard_limit(self):
        """Placeholder: not implemented, always passes.

        NOTE(review): the name suggests a 300 kB cap per batch -- confirm
        against the reader implementation before writing the real test.
        """