from contextlib import contextmanager, redirect_stderr
import importlib.util
from io import StringIO
import os
import pathlib
import sys
import tempfile
import unittest
from unittest.mock import patch
import warnings
from oxidized_importer import (
OxidizedFinder,
OxidizedResourceCollector,
PythonModuleBytecode,
find_resources_in_path,
)
@contextmanager
def assert_tempfile_cleaned_up(TemporaryDirectory=tempfile.TemporaryDirectory):
call_count = 0
class TrackingTempDir(TemporaryDirectory):
def cleanup(self):
nonlocal call_count
call_count += 1
super().cleanup()
patcher = patch("tempfile.TemporaryDirectory", TrackingTempDir)
msg = fr"""^Implicitly cleaning up <{TrackingTempDir.__name__} (['"]).*\1>$"""
with warnings.catch_warnings(record=True) as cm, patcher:
warnings.filterwarnings(
"error", category=ResourceWarning, module=r"^tempfile$", message=msg
)
yield TrackingTempDir, TrackingTempDir.cleanup
assert call_count == 1, f"tempfile.TemporaryDirectory.cleanup {call_count=}≠1"
class TestImporterResourceCollector(unittest.TestCase):
def setUp(self):
self.raw_temp_dir = tempfile.TemporaryDirectory(
prefix="oxidized_importer-test-"
)
self.td = pathlib.Path(self.raw_temp_dir.name)
def tearDown(self):
self.raw_temp_dir.cleanup()
del self.raw_temp_dir
del self.td
def test_construct(self):
with self.assertRaises(TypeError):
OxidizedResourceCollector()
c = OxidizedResourceCollector(allowed_locations=["in-memory"])
self.assertEqual(c.allowed_locations, ["in-memory"])
def test_tempdir_error(self):
class BadTempDir(tempfile.TemporaryDirectory):
def cleanup(self):
super().cleanup()
raise FileNotFoundError(self.name)
c = OxidizedResourceCollector(allowed_locations=["in-memory"])
stderr = StringIO()
assertion = assert_tempfile_cleaned_up(BadTempDir)
with assertion as (TrackingTempDir, cleanup), redirect_stderr(stderr):
c.oxidize()
self.assertRegex(
stderr.getvalue(),
fr"""Exception ignored in: <bound method {cleanup.__qualname__} """,
)
def test_source_module(self):
c = OxidizedResourceCollector(allowed_locations=["in-memory"])
source_path = self.td / "foo.py"
with source_path.open("wb") as fh:
fh.write(b"import io\n")
for resource in find_resources_in_path(self.td):
c.add_in_memory(resource)
f = OxidizedFinder()
with assert_tempfile_cleaned_up():
oxide = c.oxidize()
f.add_resources(oxide[0])
resources = [r for r in f.indexed_resources() if r.name == "foo"]
self.assertEqual(len(resources), 1)
r = resources[0]
self.assertEqual(r.in_memory_source, b"import io\n")
def test_add_sys_path(self):
c = OxidizedResourceCollector(
allowed_locations=["in-memory", "filesystem-relative"]
)
for path in sys.path:
if os.path.isdir(path):
for resource in find_resources_in_path(path):
c.add_in_memory(resource)
c.add_filesystem_relative("", resource)
with assert_tempfile_cleaned_up():
resources, file_installs = c.oxidize()
f = OxidizedFinder()
f.add_resources(resources)
with (self.td / "serialized").open("wb") as fh:
fh.write(f.serialize_indexed_resources())
f = OxidizedFinder()
f.index_file_memory_mapped(self.td / "serialized")
self.assertGreaterEqual(len(f.indexed_resources()), len(resources))
for r in f.indexed_resources():
r.in_memory_source
r.in_memory_bytecode
def test_urllib(self):
c = OxidizedResourceCollector(allowed_locations=["filesystem-relative"])
for path in sys.path:
if os.path.isdir(path):
for resource in find_resources_in_path(path):
if isinstance(resource, PythonModuleBytecode):
if resource.module.startswith("urllib"):
if resource.optimize_level == 0:
c.add_filesystem_relative("lib", resource)
with assert_tempfile_cleaned_up():
resources, file_installs = c.oxidize()
self.assertEqual(len(resources), len(file_installs))
idx = None
for i, resource in enumerate(resources):
if resource.name == "urllib.request":
idx = i
break
self.assertIsNotNone(idx)
(path, data, executable) = file_installs[idx]
self.assertEqual(
path,
pathlib.Path("lib")
/ "urllib"
/ "__pycache__"
/ ("request.%s.pyc" % sys.implementation.cache_tag),
)
self.assertTrue(data.startswith(importlib.util.MAGIC_NUMBER))
if __name__ == "__main__":
unittest.main()