import pytest
from mrrc import (
Record,
Field,
Leader,
FieldQuery,
TagRangeQuery,
SubfieldPatternQuery,
SubfieldValueQuery,
)
def create_field(tag, ind1=" ", ind2=" ", **subfields):
field = Field(tag, ind1, ind2)
for code, value in subfields.items():
field.add_subfield(code, value)
return field
def create_test_record():
leader = Leader()
record = Record(leader)
record.add_control_field("001", "12345")
record.add_control_field("008", "210101s2021 nyu 000 0 eng d")
record.add_field(create_field("020", a="978-0-12-345678-9"))
record.add_field(create_field("020", a="0-12-345678-X"))
record.add_field(create_field("020", a="979-10-12-345678-7"))
record.add_field(create_field("245", "1", "0", a="The Great Book", b="a subtitle", c="Author Name"))
record.add_field(create_field("100", "1", " ", a="Smith, John", d="1900-1980"))
record.add_field(create_field("700", "1", " ", a="Jones, Mary", d="1950-"))
record.add_field(create_field("650", " ", "0", a="History", x="20th century", v="Periodicals"))
record.add_field(create_field("650", " ", "0", a="Science", x="History"))
record.add_field(create_field("650", " ", "2", a="Medical Subject", x="therapy"))
record.add_field(create_field("650", " ", "4", a="Local Subject"))
record.add_field(create_field("651", " ", "0", a="United States", x="History"))
record.add_field(create_field("600", "1", "0", a="Lincoln, Abraham", d="1809-1865"))
record.add_field(create_field("500", a="General note."))
record.add_field(create_field("504", a="Includes bibliographical references."))
return record
class TestFieldQuery:
def test_empty_query_matches_all(self):
record = create_test_record()
query = FieldQuery()
results = record.fields_matching(query)
assert len(results) > 0
def test_query_by_tag(self):
record = create_test_record()
query = FieldQuery().tag("650")
results = record.fields_matching(query)
assert len(results) == 4 for field in results:
assert field.tag == "650"
def test_query_by_indicator1(self):
record = create_test_record()
query = FieldQuery().tag("100").indicator1("1")
results = record.fields_matching(query)
assert len(results) == 1
assert results[0].indicator1 == "1"
def test_query_by_indicator2(self):
record = create_test_record()
query = FieldQuery().tag("650").indicator2("0")
results = record.fields_matching(query)
assert len(results) == 2 for field in results:
assert field.indicator2 == "0"
def test_query_with_required_subfield(self):
record = create_test_record()
query = FieldQuery().tag("650").has_subfield("x")
results = record.fields_matching(query)
assert len(results) == 3 for field in results:
values = field.subfields_by_code("x")
assert len(values) > 0
def test_query_with_multiple_required_subfields(self):
record = create_test_record()
query = FieldQuery().tag("650").has_subfield("a").has_subfield("v")
results = record.fields_matching(query)
assert len(results) == 1
def test_query_with_has_subfields_list(self):
record = create_test_record()
query = FieldQuery().tag("650").has_subfields(["a", "x"])
results = record.fields_matching(query)
assert len(results) == 3
def test_combined_query(self):
record = create_test_record()
query = FieldQuery().tag("650").indicator2("0").has_subfield("x")
results = record.fields_matching(query)
assert len(results) == 2
def test_query_indicator_wildcard(self):
record = create_test_record()
query = FieldQuery().tag("650").indicator1(None).indicator2("0")
results = record.fields_matching(query)
assert len(results) == 2
def test_query_repr(self):
query = FieldQuery().tag("650").indicator2("0").has_subfield("a")
repr_str = repr(query)
assert "FieldQuery" in repr_str
assert "650" in repr_str
assert "0" in repr_str
class TestTagRangeQuery:
def test_subject_range(self):
record = create_test_record()
query = TagRangeQuery("600", "699")
results = record.fields_matching_range(query)
assert len(results) == 6
for field in results:
assert "600" <= field.tag <= "699"
def test_range_with_indicator_filter(self):
record = create_test_record()
query = TagRangeQuery("600", "699", indicator2="0")
results = record.fields_matching_range(query)
assert len(results) == 4
for field in results:
assert field.indicator2 == "0"
def test_range_with_required_subfields(self):
record = create_test_record()
query = TagRangeQuery("600", "699", required_subfields=["x"])
results = record.fields_matching_range(query)
assert len(results) == 4
def test_tag_in_range_method(self):
query = TagRangeQuery("600", "699")
assert query.tag_in_range("600") is True
assert query.tag_in_range("650") is True
assert query.tag_in_range("699") is True
assert query.tag_in_range("599") is False
assert query.tag_in_range("700") is False
def test_range_getters(self):
query = TagRangeQuery("600", "699", indicator1="1", indicator2="0")
assert query.start_tag == "600"
assert query.end_tag == "699"
assert query.indicator1 == "1"
assert query.indicator2 == "0"
def test_range_from_field_query(self):
query = FieldQuery().indicator2("0").has_subfield("a").tag_range("600", "699")
record = create_test_record()
results = record.fields_matching_range(query)
assert len(results) == 4
class TestSubfieldPatternQuery:
def test_isbn13_pattern(self):
record = create_test_record()
query = SubfieldPatternQuery("020", "a", r"^978-")
results = record.fields_matching_pattern(query)
assert len(results) == 1
assert "978-0-12-345678-9" in results[0].subfields_by_code("a")
def test_isbn13_both_prefixes(self):
record = create_test_record()
query = SubfieldPatternQuery("020", "a", r"^97[89]-")
results = record.fields_matching_pattern(query)
assert len(results) == 2
def test_date_range_pattern(self):
record = create_test_record()
query = SubfieldPatternQuery("100", "d", r"\d{4}-\d{4}")
results = record.fields_matching_pattern(query)
assert len(results) == 1
assert "1900-1980" in results[0].subfields_by_code("d")
def test_open_date_pattern(self):
record = create_test_record()
query = SubfieldPatternQuery("700", "d", r"^\d{4}-$")
results = record.fields_matching_pattern(query)
assert len(results) == 1
assert "1950-" in results[0].subfields_by_code("d")
def test_invalid_regex_raises_error(self):
with pytest.raises(ValueError) as excinfo:
SubfieldPatternQuery("020", "a", r"[invalid")
assert "Invalid regex pattern" in str(excinfo.value)
def test_empty_subfield_code_raises_error(self):
with pytest.raises(ValueError):
SubfieldPatternQuery("020", "", r"^978-")
def test_pattern_getters(self):
query = SubfieldPatternQuery("020", "a", r"^978-")
assert query.tag == "020"
assert query.subfield_code == "a"
def test_negated_pattern_excludes_match(self):
record = create_test_record()
query = SubfieldPatternQuery("020", "a", r"^978-")
results = record.fields_matching_pattern(query)
assert len(results) == 1
query_neg = SubfieldPatternQuery("020", "a", r"^978-", negate=True)
results_neg = record.fields_matching_pattern(query_neg)
assert len(results_neg) > 0
for f in results_neg:
assert not f['a'].startswith('978-')
def test_negated_pattern_includes_nonmatch(self):
record = create_test_record()
query = SubfieldPatternQuery("020", "a", r"^NONEXISTENT", negate=True)
results = record.fields_matching_pattern(query)
all_020 = record.get_fields("020")
assert len(results) == len(all_020)
def test_negate_property(self):
query = SubfieldPatternQuery("020", "a", r"^978-")
assert query.negate is False
query_neg = SubfieldPatternQuery("020", "a", r"^978-", negate=True)
assert query_neg.negate is True
def test_negate_repr(self):
query = SubfieldPatternQuery("020", "a", r"^978-", negate=True)
assert "negate=true" in repr(query)
query_pos = SubfieldPatternQuery("020", "a", r"^978-")
assert "negate" not in repr(query_pos)
class TestSubfieldValueQuery:
def test_exact_match(self):
record = create_test_record()
query = SubfieldValueQuery("650", "a", "History")
results = record.fields_matching_value(query)
assert len(results) == 1
assert "History" in results[0].subfields_by_code("a")
def test_exact_match_case_sensitive(self):
record = create_test_record()
query = SubfieldValueQuery("650", "a", "history") results = record.fields_matching_value(query)
assert len(results) == 0
def test_partial_match(self):
record = create_test_record()
query = SubfieldValueQuery("650", "a", "Subject", partial=True)
results = record.fields_matching_value(query)
assert len(results) == 2
def test_partial_match_in_subdivision(self):
record = create_test_record()
query = SubfieldValueQuery("650", "x", "History", partial=True)
results = record.fields_matching_value(query)
assert len(results) == 1
def test_empty_subfield_code_raises_error(self):
with pytest.raises(ValueError):
SubfieldValueQuery("650", "", "History")
def test_value_getters(self):
query = SubfieldValueQuery("650", "a", "History", partial=True)
assert query.tag == "650"
assert query.subfield_code == "a"
assert query.value == "History"
assert query.partial is True
def test_negated_exact_excludes_match(self):
record = create_test_record()
query = SubfieldValueQuery("650", "a", "History")
results = record.fields_matching_value(query)
assert len(results) == 1
query_neg = SubfieldValueQuery("650", "a", "History", negate=True)
results_neg = record.fields_matching_value(query_neg)
assert len(results_neg) > 0
for f in results_neg:
assert f['a'] != 'History'
def test_negated_partial_excludes_match(self):
record = create_test_record()
query = SubfieldValueQuery("650", "a", "History", partial=True, negate=True)
results = record.fields_matching_value(query)
for f in results:
assert 'History' not in f['a']
def test_negate_property(self):
query = SubfieldValueQuery("650", "a", "History")
assert query.negate is False
query_neg = SubfieldValueQuery("650", "a", "History", negate=True)
assert query_neg.negate is True
def test_negate_repr(self):
query = SubfieldValueQuery("650", "a", "History", negate=True)
assert "negate=true" in repr(query)
query_pos = SubfieldValueQuery("650", "a", "History")
assert "negate" not in repr(query_pos)
class TestConvenienceMethods:
def test_fields_by_indicator(self):
record = create_test_record()
results = record.fields_by_indicator("650", indicator2="0")
assert len(results) == 2
for field in results:
assert field.tag == "650"
assert field.indicator2 == "0"
def test_fields_by_indicator_both(self):
record = create_test_record()
results = record.fields_by_indicator("700", indicator1="1")
assert len(results) == 1
def test_fields_in_range(self):
record = create_test_record()
results = record.fields_in_range("500", "599")
assert len(results) == 2 for field in results:
assert "500" <= field.tag <= "599"
class TestRealWorldUseCases:
def test_find_lcsh_subjects_with_subdivisions(self):
record = create_test_record()
query = (
FieldQuery()
.tag("650")
.indicator2("0") .has_subfield("x") )
results = record.fields_matching(query)
assert len(results) == 2 for field in results:
subdivisions = field.subfields_by_code("x")
assert len(subdivisions) > 0
def test_find_all_subjects_for_export(self):
record = create_test_record()
results = record.fields_in_range("600", "699")
assert len(results) == 6
tags = {f.tag for f in results}
assert tags == {"600", "650", "651"}
def test_find_isbn13_for_linking(self):
record = create_test_record()
query = SubfieldPatternQuery("020", "a", r"^978-")
results = record.fields_matching_pattern(query)
assert len(results) == 1
def test_find_persons_with_death_dates(self):
record = create_test_record()
query = SubfieldPatternQuery("100", "d", r"^\d{4}-\d{4}$")
results = record.fields_matching_pattern(query)
assert len(results) == 1
def test_combine_queries_for_complex_analysis(self):
record = create_test_record()
all_subjects = record.fields_in_range("600", "699")
assert len(all_subjects) == 6
lcsh_query = TagRangeQuery("600", "699", indicator2="0")
lcsh_subjects = record.fields_matching_range(lcsh_query)
assert len(lcsh_subjects) == 4
with_subdivisions = [
f for f in lcsh_subjects if len(f.subfields_by_code("x")) > 0
]
assert len(with_subdivisions) == 3
class TestEdgeCases:
def test_query_empty_record(self):
leader = Leader()
record = Record(leader)
query = FieldQuery().tag("650")
results = record.fields_matching(query)
assert len(results) == 0
def test_query_nonexistent_tag(self):
record = create_test_record()
query = FieldQuery().tag("999")
results = record.fields_matching(query)
assert len(results) == 0
def test_range_no_matches(self):
record = create_test_record()
query = TagRangeQuery("800", "899")
results = record.fields_matching_range(query)
assert len(results) == 0
def test_pattern_no_matches(self):
record = create_test_record()
query = SubfieldPatternQuery("020", "a", r"^NOTFOUND")
results = record.fields_matching_pattern(query)
assert len(results) == 0
def test_value_no_matches(self):
record = create_test_record()
query = SubfieldValueQuery("650", "a", "NonexistentSubject")
results = record.fields_matching_value(query)
assert len(results) == 0