import collections.abc
import email.message
import email.parser
import importlib.metadata
import os
import pathlib
import sys
import tempfile
import unittest
from oxidized_importer import (
OxidizedDistribution,
OxidizedFinder,
OxidizedResourceCollector,
find_resources_in_path,
)
class TestImporterMetadata(unittest.TestCase):
def setUp(self):
self.raw_temp_dir = tempfile.TemporaryDirectory(
prefix="oxidized_importer-test-"
)
self.td = pathlib.Path(self.raw_temp_dir.name)
self.old_finders = list(sys.meta_path)
self.old_path = list(sys.path)
def tearDown(self):
self.raw_temp_dir.cleanup()
del self.raw_temp_dir
del self.td
sys.meta_path[:] = self.old_finders
sys.path[:] = self.old_path
def _write_metadata(self):
metadata_path = self.td / "my_package-1.0.dist-info" / "METADATA"
metadata_path.parent.mkdir()
with metadata_path.open("w", encoding="utf-8") as fh:
fh.write("Name: my_package\n")
fh.write("Version: 1.0\n")
def _finder_from_td(self):
collector = OxidizedResourceCollector(allowed_locations=["in-memory"])
for r in find_resources_in_path(self.td):
collector.add_in_memory(r)
f = OxidizedFinder()
f.add_resources(collector.oxidize()[0])
return f
def test_find_distributions_empty(self):
f = OxidizedFinder()
dists = f.find_distributions()
self.assertIsInstance(dists, collections.abc.Iterator)
dists = list(dists)
self.assertEqual(len(dists), 0)
def test_find_distributions_default_context(self):
self._write_metadata()
f = self._finder_from_td()
dists = f.find_distributions(importlib.metadata.DistributionFinder.Context())
self.assertIsInstance(dists, collections.abc.Iterator)
dists = list(dists)
self.assertEqual(len(dists), 1)
def test_find_distributions_context_unknown_name(self):
f = OxidizedFinder()
dists = list(
f.find_distributions(
importlib.metadata.DistributionFinder.Context(name="missing")
)
)
self.assertEqual(len(dists), 0)
def test_find_distributions_context_name(self):
self._write_metadata()
f = self._finder_from_td()
dists = list(
f.find_distributions(
importlib.metadata.DistributionFinder.Context(name="my_package")
)
)
self.assertEqual(len(dists), 1)
dist = dists[0]
self.assertIsInstance(dist, OxidizedDistribution)
self.assertEqual(dist.version, "1.0")
def test_find_distributions_case_sensitivity(self):
pkginfo_path = self.td / "OneTwo-1.0.egg-info" / "PKG-INFO"
pkginfo_path.parent.mkdir()
with pkginfo_path.open("w", encoding="utf-8") as fh:
fh.write("Name: OneTwo\n")
fh.write("Version: 1.0\n")
f = self._finder_from_td()
dists = list(
f.find_distributions(
importlib.metadata.DistributionFinder.Context(name="onetwo")
)
)
self.assertEqual(len(dists), 1)
dists = list(
f.find_distributions(
importlib.metadata.DistributionFinder.Context(name="OneTwo")
)
)
self.assertEqual(len(dists), 1)
def test_read_text(self):
self._write_metadata()
f = self._finder_from_td()
dists = list(f.find_distributions())
self.assertEqual(len(dists), 1)
d = dists[0]
self.assertIsInstance(d, OxidizedDistribution)
self.assertIsNone(d.read_text("does_not_exist"))
data = d.read_text("METADATA")
self.assertEqual(data, "Name: my_package\nVersion: 1.0\n")
def test_load_metadata(self):
self._write_metadata()
f = self._finder_from_td()
dists = list(f.find_distributions())
self.assertEqual(len(dists), 1)
metadata = dists[0].metadata
self.assertIsInstance(metadata, email.message.Message)
try:
from importlib.metadata._adapters import Message as Adapter
self.assertIsInstance(metadata, Adapter)
except ImportError:
pass
self.assertEqual(metadata["Name"], "my_package")
self.assertEqual(metadata["Version"], "1.0")
def test_load_pkg_info(self):
pkginfo_path = self.td / "my_package-1.0.egg-info" / "PKG-INFO"
pkginfo_path.parent.mkdir()
with pkginfo_path.open("w", encoding="utf-8") as fh:
fh.write("Name: my_package\n")
fh.write("Version: 1.0\n")
collector = OxidizedResourceCollector(allowed_locations=["in-memory"])
for r in find_resources_in_path(self.td):
collector.add_in_memory(r)
f = OxidizedFinder()
f.add_resources(collector.oxidize()[0])
dists = list(f.find_distributions())
self.assertEqual(len(dists), 1)
metadata = dists[0].metadata
self.assertIsInstance(metadata, email.message.Message)
self.assertEqual(metadata["Name"], "my_package")
self.assertEqual(metadata["Version"], "1.0")
def test_name(self):
self._write_metadata()
f = self._finder_from_td()
dists = list(f.find_distributions())
self.assertEqual(dists[0].name, "my_package")
def test_normalized_name(self):
metadata_path = self.td / "my_package-1.0.dist-info" / "METADATA"
metadata_path.parent.mkdir()
with metadata_path.open("w", encoding="utf-8") as fh:
fh.write("Name: my-package\n")
fh.write("Version: 1.0\n")
f = self._finder_from_td()
dists = list(f.find_distributions())
self.assertEqual(dists[0].name, "my-package")
self.assertEqual(dists[0]._normalized_name, "my_package")
def test_version(self):
self._write_metadata()
f = self._finder_from_td()
dists = list(f.find_distributions())
self.assertEqual(dists[0].version, "1.0")
def test_missing_entry_points(self):
self._write_metadata()
f = self._finder_from_td()
dists = list(f.find_distributions())
self.assertEqual(len(dists), 1)
eps = dists[0].entry_points
self.assertIsInstance(eps, list)
self.assertEqual(len(eps), 0)
def test_populated_entry_points(self):
self._write_metadata()
entry_points_path = self.td / "my_package-1.0.dist-info" / "entry_points.txt"
with entry_points_path.open("w", encoding="utf-8") as fh:
fh.write("[console_scripts]\n")
fh.write("script = my_package:module\n")
f = self._finder_from_td()
dists = list(f.find_distributions())
eps = dists[0].entry_points
self.assertIsInstance(eps, list)
self.assertEqual(len(eps), 1)
ep = eps[0]
self.assertIsInstance(ep, importlib.metadata.EntryPoint)
self.assertEqual(ep.name, "script")
self.assertEqual(ep.value, "my_package:module")
self.assertEqual(ep.group, "console_scripts")
def test_requires_missing(self):
self._write_metadata()
f = self._finder_from_td()
dists = list(f.find_distributions())
self.assertIsNone(dists[0].requires)
def test_requires_metadata(self):
self._write_metadata()
with (self.td / "my_package-1.0.dist-info" / "METADATA").open("ab") as fh:
fh.write(b"Requires-Dist: foo\n")
fh.write(b"Requires-Dist: bar; extra == 'all'\n")
f = self._finder_from_td()
dists = list(f.find_distributions())
requires = dists[0].requires
self.assertIsInstance(requires, list)
self.assertEqual(requires, ["foo", "bar; extra == 'all'"])
def test_requires_egg_info(self):
pkginfo_path = self.td / "my_package-1.0.egg-info" / "PKG-INFO"
pkginfo_path.parent.mkdir()
with pkginfo_path.open("w", encoding="utf-8") as fh:
fh.write("Name: my_package\n")
fh.write("Version: 1.0\n")
requires_path = self.td / "my_package-1.0.egg-info" / "requires.txt"
with requires_path.open("w", encoding="utf-8") as fh:
fh.write("foo\n")
f = self._finder_from_td()
dists = list(f.find_distributions())
self.assertEqual(len(dists), 1)
requires = dists[0].requires
self.assertIsInstance(requires, list)
self.assertEqual(requires, ["foo"])
def test_distribution_locate_file(self):
self._write_metadata()
f = self._finder_from_td()
dist = list(f.find_distributions())[0]
with self.assertRaises(AttributeError):
dist.locate_file("METADATA")
def test_distribution_from_name(self):
self._write_metadata()
f = self._finder_from_td()
sys.meta_path = [f]
sys.path = []
with self.assertRaises(importlib.metadata.PackageNotFoundError):
OxidizedDistribution.from_name("missing")
dist = OxidizedDistribution.from_name("my_package")
self.assertIsInstance(dist, OxidizedDistribution)
metadata = importlib.metadata.metadata("my_package")
self.assertIsInstance(metadata, email.message.Message)
self.assertEqual(metadata["Name"], "my_package")
self.assertEqual(metadata["Version"], "1.0")
def test_distribution_discover(self):
self._write_metadata()
f = self._finder_from_td()
sys.meta_path = [f]
sys.path = []
dists = OxidizedDistribution.discover()
self.assertIsInstance(dists, collections.abc.Iterator)
dists = list(dists)
self.assertEqual(len(dists), 1)
dist = dists[0]
self.assertEqual(dist.metadata["Name"], "my_package")
def test_distribution_discover_context_kwarg(self):
self._write_metadata()
f = self._finder_from_td()
sys.meta_path = [f]
sys.path = []
dists = list(
OxidizedDistribution.discover(
context=importlib.metadata.DistributionFinder.Context(name="missing")
)
)
self.assertEqual(len(dists), 0)
dists = list(
OxidizedDistribution.discover(
context=importlib.metadata.DistributionFinder.Context()
)
)
self.assertEqual(len(dists), 1)
dists = list(
OxidizedDistribution.discover(
context=importlib.metadata.DistributionFinder.Context(name="my_package")
)
)
self.assertEqual(len(dists), 1)
def test_distribution_discover_name_kwarg(self):
self._write_metadata()
f = self._finder_from_td()
sys.meta_path = [f]
sys.path = []
dists = list(OxidizedDistribution.discover(name="missing"))
self.assertEqual(len(dists), 0)
dists = list(OxidizedDistribution.discover(name="my_package"))
self.assertEqual(len(dists), 1)
def test_distribution_discover_conflicting_args(self):
with self.assertRaises(ValueError):
OxidizedDistribution.discover(context="ignored", name="ignored")
def test_distribution_at(self):
self._write_metadata()
f = self._finder_from_td()
sys.meta_path = [f]
sys.path = []
with self.assertRaises(AttributeError):
OxidizedDistribution.at(self.td)
if __name__ == "__main__":
unittest.main()