import os
import tempfile
import unittest
from scripts.scraper.rom_database import RomDatabase, RomDbKey, HardwareType, hardware_from_console_type_and_region
class TestRomDatabase(unittest.TestCase):
def setUp(self) -> None:
fd, self.db_path = tempfile.mkstemp(prefix="romdb_test_", suffix=".sqlite")
os.close(fd)
self.db = RomDatabase(self.db_path)
def tearDown(self) -> None:
try:
self.db.close()
except Exception:
pass
try:
os.unlink(self.db_path)
except Exception:
pass
def test_schema_created_and_reset(self):
cur = self.db._conn.execute("PRAGMA table_info(roms)")
cols = {r[1] for r in cur.fetchall()}
self.assertIn(RomDbKey.CRC.value, cols)
self.assertIn(RomDbKey.NAME.value, cols)
self.assertIn(RomDbKey.ROM_ID.value, cols)
self.assertIn(RomDbKey.COUNTRY.value, cols)
self.db.reset_schema()
cur = self.db._conn.execute("PRAGMA table_info(roms)")
cols_after = {r[1] for r in cur.fetchall()}
self.assertIn(RomDbKey.CONSOLE_CLASS.value, cols_after)
self.assertIn(RomDbKey.COUNTRY.value, cols_after)
def test_insert_and_get_by_crc(self):
data = {
RomDbKey.CRC.value: "DEADBEEF",
RomDbKey.HARDWARE.value: 0,
RomDbKey.NAMETABLE_LAYOUT.value: "horizontal",
}
self.db.insert_rom_by_crc(data)
fetched = self.db.get_rom_by_crc("DEADBEEF")
self.assertIsNotNone(fetched)
self.assertEqual(fetched.get(RomDbKey.CRC.value), "DEADBEEF")
self.assertEqual(str(fetched.get(RomDbKey.HARDWARE.value)), "0")
def test_upsert_and_get_rom(self):
rom_id = 42
payload = {
RomDbKey.NAME.value: "Test ROM",
RomDbKey.CRC.value: "ABCD1234",
RomDbKey.HARDWARE.value: 0,
RomDbKey.MAPPER.value: 1,
RomDbKey.PRG_ROM_SIZE.value: 2,
}
self.db.upsert_rom(rom_id, payload)
got = self.db.get_rom(rom_id)
self.assertIsNotNone(got)
self.assertEqual(got.get(RomDbKey.NAME.value), "Test ROM")
self.assertEqual(got.get(RomDbKey.CRC.value), "ABCD1234")
def test_update_rom_by_crc(self):
crc = "BEEFCAFE"
self.db.insert_rom_by_crc({RomDbKey.CRC.value: crc})
self.db.update_rom_by_crc(crc, {RomDbKey.NAMETABLE_LAYOUT.value: "vertical"})
got = self.db.get_rom_by_crc(crc)
self.assertIsNotNone(got)
self.assertEqual(got.get(RomDbKey.NAMETABLE_LAYOUT.value), "vertical")
def test_missing_columns_remain_null_on_insert(self):
crc = "NULLONINSERT"
self.db.insert_rom_by_crc({RomDbKey.CRC.value: crc})
got = self.db.get_rom_by_crc(crc)
self.assertIsNotNone(got)
self.assertIsNone(got.get(RomDbKey.PRG_RAM_SIZE.value))
self.assertIsNone(got.get(RomDbKey.CHR_RAM_SIZE.value))
def test_update_can_set_explicit_null_and_zero_stays_distinct(self):
crc = "NULLVSZERO"
self.db.insert_rom_by_crc({RomDbKey.CRC.value: crc, RomDbKey.PRG_RAM_SIZE.value: 0})
got = self.db.get_rom_by_crc(crc)
self.assertIsNotNone(got)
self.assertEqual(got.get(RomDbKey.PRG_RAM_SIZE.value), 0)
self.db.update_rom_by_crc(crc, {RomDbKey.PRG_RAM_SIZE.value: None})
got = self.db.get_rom_by_crc(crc)
self.assertIsNotNone(got)
self.assertIsNone(got.get(RomDbKey.PRG_RAM_SIZE.value))
def test_process_record_by_crc_outcomes(self):
crc = "FEEDFACE"
add_res = self.db.process_record_by_crc({RomDbKey.CRC.value: crc})
self.assertEqual(add_res, (1, 0, 0, 0))
upd_res = self.db.process_record_by_crc({RomDbKey.CRC.value: crc, RomDbKey.NAME.value: "Name1"})
self.assertEqual(upd_res, (0, 1, 0, 0))
skip_res = self.db.process_record_by_crc({RomDbKey.CRC.value: crc, RomDbKey.NAME.value: "Name1"})
self.assertEqual(skip_res, (0, 0, 1, 0))
conflict_res = self.db.process_record_by_crc({RomDbKey.CRC.value: crc, RomDbKey.NAME.value: "Other"})
self.assertEqual(conflict_res, (0, 0, 0, 1))
def test_process_record_by_crc_inserts_full_record(self):
crc = "AABBCCDD"
payload = {
RomDbKey.CRC.value: crc,
RomDbKey.MAPPER.value: 2,
RomDbKey.PRG_ROM_SIZE.value: 16384,
}
add_res = self.db.process_record_by_crc(payload)
self.assertEqual(add_res, (1, 0, 0, 0))
fetched = self.db.get_rom_by_crc(crc)
self.assertIsNotNone(fetched)
self.assertEqual(str(fetched.get(RomDbKey.MAPPER.value)), "2")
self.assertEqual(str(fetched.get(RomDbKey.PRG_ROM_SIZE.value)), "16384")
def test_process_record_by_crc_ram_sum_mismatch_conflict(self):
crc = "RAMSUMCONFLICT"
self.db.insert_rom_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.PRG_RAM_SIZE.value: 8,
RomDbKey.PRG_NVRAM_SIZE.value: 0,
RomDbKey.CHR_RAM_SIZE.value: 4,
RomDbKey.CHR_NVRAM_SIZE.value: 0,
})
conflict_res = self.db.process_record_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.PRG_RAM_SIZE.value: 16,
RomDbKey.PRG_NVRAM_SIZE.value: 0,
RomDbKey.CHR_RAM_SIZE.value: 4,
RomDbKey.CHR_NVRAM_SIZE.value: 0,
})
self.assertEqual(conflict_res, (0, 0, 0, 1))
def test_process_record_by_crc_ram_sum_match_no_conflict(self):
crc = "RAMSUMMATCH"
self.db.insert_rom_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.PRG_RAM_SIZE.value: 8,
RomDbKey.PRG_NVRAM_SIZE.value: 0,
RomDbKey.CHR_RAM_SIZE.value: 4,
RomDbKey.CHR_NVRAM_SIZE.value: 0,
})
skip_res = self.db.process_record_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.PRG_RAM_SIZE.value: 0,
RomDbKey.PRG_NVRAM_SIZE.value: 8,
RomDbKey.CHR_RAM_SIZE.value: 0,
RomDbKey.CHR_NVRAM_SIZE.value: 4,
})
self.assertEqual(skip_res, (0, 0, 1, 0))
def test_list_roms_ordering(self):
self.db.upsert_rom(2, {RomDbKey.NAME.value: "B", RomDbKey.CRC.value: "C2"})
self.db.upsert_rom(1, {RomDbKey.NAME.value: "A", RomDbKey.CRC.value: "C1"})
rows = self.db.list_roms()
self.assertGreaterEqual(len(rows), 2)
ids = [r[RomDbKey.ROM_ID.value] for r in rows if r.get(RomDbKey.ROM_ID.value) in (1, 2)]
self.assertIn(1, ids)
self.assertIn(2, ids)
def test_ensure_columns_adds_column(self):
self.db._ensure_columns({"new_test_col": "TEXT"})
cur = self.db._conn.execute("PRAGMA table_info(roms)")
cols = {r[1] for r in cur.fetchall()}
self.assertIn("new_test_col", cols)
def test_upsert_stores_all_fields(self):
rom_id = 7
col_types = self.db.list_columns_with_types()
col_names = [name for name in col_types.keys() if name != RomDbKey.ROM_ID.value]
payload = {}
int_value = 1
for name in col_names:
if col_types[name] == "INTEGER":
payload[name] = int_value
int_value += 1
else:
payload[name] = f"val_{name}"
self.assertEqual(len(payload), len(col_names))
self.db.upsert_rom(rom_id, payload)
fetched = self.db.get_rom(rom_id)
self.assertIsNotNone(fetched)
for key, value in payload.items():
self.assertEqual(str(fetched.get(key)), str(value))
def test_hardware_ntsc_does_not_overwrite_more_specific_xml_value(self) -> None:
crc = "MULTIREGCRC"
self.db.insert_rom_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.HARDWARE.value: HardwareType.NES_MULTI_REGION.value,
})
result = self.db.process_record_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.HARDWARE.value: HardwareType.NES_NTSC.value,
})
self.assertNotEqual(result, (0, 0, 0, 1), "Should not produce a conflict")
row = self.db.get_rom_by_crc(crc)
self.assertEqual(str(row[RomDbKey.HARDWARE.value]), str(HardwareType.NES_MULTI_REGION.value))
def test_hardware_ntsc_does_not_overwrite_pal(self) -> None:
crc = "PALOVERNTSC"
self.db.insert_rom_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.HARDWARE.value: HardwareType.NES_PAL.value,
})
result = self.db.process_record_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.HARDWARE.value: HardwareType.NES_NTSC.value,
})
self.assertNotEqual(result, (0, 0, 0, 1), "Should not produce a conflict")
row = self.db.get_rom_by_crc(crc)
self.assertEqual(str(row[RomDbKey.HARDWARE.value]), str(HardwareType.NES_PAL.value))
def test_hardware_conflict_between_two_specific_values(self) -> None:
crc = "HWCONFLICT"
self.db.insert_rom_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.HARDWARE.value: HardwareType.NES_PAL.value,
})
result = self.db.process_record_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.HARDWARE.value: HardwareType.NES_MULTI_REGION.value,
})
self.assertEqual(result, (0, 0, 0, 1), "Two differing specific values must conflict")
def test_hardware_multi_region_existing_ignores_incoming(self) -> None:
crc = "MULTIREG_LOCK"
self.db.insert_rom_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.HARDWARE.value: HardwareType.NES_MULTI_REGION.value,
})
result = self.db.process_record_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.HARDWARE.value: HardwareType.NES_PAL.value,
})
self.assertNotEqual(result, (0, 0, 0, 1), "NES_MULTI_REGION existing must not conflict")
row = self.db.get_rom_by_crc(crc)
self.assertEqual(
str(row[RomDbKey.HARDWARE.value]),
str(HardwareType.NES_MULTI_REGION.value),
"NES_MULTI_REGION must be preserved",
)
def test_hardware_famicom_upgrades_ntsc(self) -> None:
crc = "FAMICOM_UP"
self.db.insert_rom_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.HARDWARE.value: HardwareType.NES_NTSC.value,
})
result = self.db.process_record_by_crc({
RomDbKey.CRC.value: crc,
RomDbKey.HARDWARE.value: HardwareType.FAMICOM.value,
})
self.assertNotEqual(result, (0, 0, 0, 1), "FAMICOM upgrading NES_NTSC must not conflict")
row = self.db.get_rom_by_crc(crc)
self.assertEqual(
str(row[RomDbKey.HARDWARE.value]),
str(HardwareType.FAMICOM.value),
"FAMICOM must overwrite NES_NTSC",
)
class TestHardwareFromConsoleTypeAndRegion(unittest.TestCase):
def test_region_2_numeric_is_multi_region(self):
result = hardware_from_console_type_and_region("0", "2")
self.assertEqual(result, HardwareType.NES_MULTI_REGION.value)
def test_region_3_numeric_is_dendy(self):
result = hardware_from_console_type_and_region("0", "3")
self.assertEqual(result, HardwareType.DENDY.value)
def test_region_text_dendy_is_dendy(self):
result = hardware_from_console_type_and_region("0", "dendy")
self.assertEqual(result, HardwareType.DENDY.value)
def test_region_text_universal_is_multi_region(self):
result = hardware_from_console_type_and_region("0", "universal")
self.assertEqual(result, HardwareType.NES_MULTI_REGION.value)
def test_region_0_is_ntsc(self):
result = hardware_from_console_type_and_region("0", "0")
self.assertEqual(result, HardwareType.NES_NTSC.value)
def test_region_1_is_pal(self):
result = hardware_from_console_type_and_region("0", "1")
self.assertEqual(result, HardwareType.NES_PAL.value)
def test_japan_ntsc_is_famicom(self):
result = hardware_from_console_type_and_region("0", "0", country="Licensed Japan")
self.assertEqual(result, HardwareType.FAMICOM.value)
def test_japan_multi_region_is_famicom(self):
result = hardware_from_console_type_and_region("0", "2", country="Licensed Japan")
self.assertEqual(result, HardwareType.NES_MULTI_REGION.value)
def test_japan_pal_remains_pal(self):
result = hardware_from_console_type_and_region("0", "1", country="Japan")
self.assertEqual(result, HardwareType.NES_PAL.value)
def test_non_japan_ntsc_remains_nes_ntsc(self):
result = hardware_from_console_type_and_region("0", "0", country="USA")
self.assertEqual(result, HardwareType.NES_NTSC.value)
if __name__ == "__main__":
unittest.main()