from __future__ import absolute_import, division, print_function
import codecs
import gc
import os
import platform
import re
import subprocess
import six
import sys
import time
import traceback
from fnmatch import fnmatch
from weakref import WeakKeyDictionary
from _pytest.capture import MultiCapture, SysCapture
from _pytest._code import Source
import py
import pytest
from _pytest.main import Session, EXIT_OK
from _pytest.assertion.rewrite import AssertionRewritingHook
PYTEST_FULLPATH = os.path.abspath(pytest.__file__.rstrip("oc")).replace(
"$py.class", ".py"
)
IGNORE_PAM = [ u"/var/lib/sss/mc/passwd"
]
def pytest_addoption(parser):
parser.addoption(
"--lsof",
action="store_true",
dest="lsof",
default=False,
help=("run FD checks if lsof is available"),
)
parser.addoption(
"--runpytest",
default="inprocess",
dest="runpytest",
choices=("inprocess", "subprocess"),
help=(
"run pytest sub runs in tests using an 'inprocess' "
"or 'subprocess' (python -m main) method"
),
)
def pytest_configure(config):
if config.getvalue("lsof"):
checker = LsofFdLeakChecker()
if checker.matching_platform():
config.pluginmanager.register(checker)
class LsofFdLeakChecker(object):
def get_open_files(self):
out = self._exec_lsof()
open_files = self._parse_lsof_output(out)
return open_files
def _exec_lsof(self):
pid = os.getpid()
return py.process.cmdexec("lsof -Ffn0 -p %d" % pid)
def _parse_lsof_output(self, out):
def isopen(line):
return line.startswith("f") and (
"deleted" not in line
and "mem" not in line
and "txt" not in line
and "cwd" not in line
)
open_files = []
for line in out.split("\n"):
if isopen(line):
fields = line.split("\0")
fd = fields[0][1:]
filename = fields[1][1:]
if filename in IGNORE_PAM:
continue
if filename.startswith("/"):
open_files.append((fd, filename))
return open_files
def matching_platform(self):
try:
py.process.cmdexec("lsof -v")
except (py.process.cmdexec.Error, UnicodeDecodeError):
return False
else:
return True
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_protocol(self, item):
lines1 = self.get_open_files()
yield
if hasattr(sys, "pypy_version_info"):
gc.collect()
lines2 = self.get_open_files()
new_fds = {t[0] for t in lines2} - {t[0] for t in lines1}
leaked_files = [t for t in lines2 if t[0] in new_fds]
if leaked_files:
error = []
error.append("***** %s FD leakage detected" % len(leaked_files))
error.extend([str(f) for f in leaked_files])
error.append("*** Before:")
error.extend([str(f) for f in lines1])
error.append("*** After:")
error.extend([str(f) for f in lines2])
error.append(error[0])
error.append("*** function %s:%s: %s " % item.location)
error.append("See issue #2366")
item.warn("", "\n".join(error))
winpymap = {
"python2.7": r"C:\Python27\python.exe",
"python3.4": r"C:\Python34\python.exe",
"python3.5": r"C:\Python35\python.exe",
"python3.6": r"C:\Python36\python.exe",
}
def getexecutable(name, cache={}):
try:
return cache[name]
except KeyError:
executable = py.path.local.sysfind(name)
if executable:
import subprocess
popen = subprocess.Popen(
[str(executable), "--version"],
universal_newlines=True,
stderr=subprocess.PIPE,
)
out, err = popen.communicate()
if name == "jython":
if not err or "2.5" not in err:
executable = None
if "2.5.2" in err:
executable = None elif popen.returncode != 0:
executable = None
cache[name] = executable
return executable
@pytest.fixture(params=["python2.7", "python3.4", "pypy", "pypy3"])
def anypython(request):
name = request.param
executable = getexecutable(name)
if executable is None:
if sys.platform == "win32":
executable = winpymap.get(name, None)
if executable:
executable = py.path.local(executable)
if executable.check():
return executable
pytest.skip("no suitable %s found" % (name,))
return executable
@pytest.fixture
def _pytest(request):
return PytestArg(request)
class PytestArg(object):
def __init__(self, request):
self.request = request
def gethookrecorder(self, hook):
hookrecorder = HookRecorder(hook._pm)
self.request.addfinalizer(hookrecorder.finish_recording)
return hookrecorder
def get_public_names(values):
return [x for x in values if x[0] != "_"]
class ParsedCall(object):
def __init__(self, name, kwargs):
self.__dict__.update(kwargs)
self._name = name
def __repr__(self):
d = self.__dict__.copy()
del d["_name"]
return "<ParsedCall %r(**%r)>" % (self._name, d)
class HookRecorder(object):
def __init__(self, pluginmanager):
self._pluginmanager = pluginmanager
self.calls = []
def before(hook_name, hook_impls, kwargs):
self.calls.append(ParsedCall(hook_name, kwargs))
def after(outcome, hook_name, hook_impls, kwargs):
pass
self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after)
def finish_recording(self):
self._undo_wrapping()
def getcalls(self, names):
if isinstance(names, str):
names = names.split()
return [call for call in self.calls if call._name in names]
def assert_contains(self, entries):
__tracebackhide__ = True
i = 0
entries = list(entries)
backlocals = sys._getframe(1).f_locals
while entries:
name, check = entries.pop(0)
for ind, call in enumerate(self.calls[i:]):
if call._name == name:
print("NAMEMATCH", name, call)
if eval(check, backlocals, call.__dict__):
print("CHECKERMATCH", repr(check), "->", call)
else:
print("NOCHECKERMATCH", repr(check), "-", call)
continue
i += ind + 1
break
print("NONAMEMATCH", name, "with", call)
else:
pytest.fail("could not find %r check %r" % (name, check))
def popcall(self, name):
__tracebackhide__ = True
for i, call in enumerate(self.calls):
if call._name == name:
del self.calls[i]
return call
lines = ["could not find call %r, in:" % (name,)]
lines.extend([" %s" % str(x) for x in self.calls])
pytest.fail("\n".join(lines))
def getcall(self, name):
values = self.getcalls(name)
assert len(values) == 1, (name, values)
return values[0]
def getreports(self, names="pytest_runtest_logreport pytest_collectreport"):
return [x.report for x in self.getcalls(names)]
def matchreport(
self,
inamepart="",
names="pytest_runtest_logreport pytest_collectreport",
when=None,
):
values = []
for rep in self.getreports(names=names):
try:
if not when and rep.when != "call" and rep.passed:
continue
except AttributeError:
pass
if when and getattr(rep, "when", None) != when:
continue
if not inamepart or inamepart in rep.nodeid.split("::"):
values.append(rep)
if not values:
raise ValueError(
"could not find test report matching %r: "
"no test reports at all!" % (inamepart,)
)
if len(values) > 1:
raise ValueError(
"found 2 or more testreports matching %r: %s" % (inamepart, values)
)
return values[0]
def getfailures(self, names="pytest_runtest_logreport pytest_collectreport"):
return [rep for rep in self.getreports(names) if rep.failed]
def getfailedcollections(self):
return self.getfailures("pytest_collectreport")
def listoutcomes(self):
passed = []
skipped = []
failed = []
for rep in self.getreports("pytest_collectreport pytest_runtest_logreport"):
if rep.passed:
if getattr(rep, "when", None) == "call":
passed.append(rep)
elif rep.skipped:
skipped.append(rep)
elif rep.failed:
failed.append(rep)
return passed, skipped, failed
def countoutcomes(self):
return [len(x) for x in self.listoutcomes()]
def assertoutcome(self, passed=0, skipped=0, failed=0):
realpassed, realskipped, realfailed = self.listoutcomes()
assert passed == len(realpassed)
assert skipped == len(realskipped)
assert failed == len(realfailed)
def clear(self):
self.calls[:] = []
@pytest.fixture
def linecomp(request):
return LineComp()
@pytest.fixture(name="LineMatcher")
def LineMatcher_fixture(request):
return LineMatcher
@pytest.fixture
def testdir(request, tmpdir_factory):
return Testdir(request, tmpdir_factory)
rex_outcome = re.compile(r"(\d+) ([\w-]+)")
class RunResult(object):
def __init__(self, ret, outlines, errlines, duration):
self.ret = ret
self.outlines = outlines
self.errlines = errlines
self.stdout = LineMatcher(outlines)
self.stderr = LineMatcher(errlines)
self.duration = duration
def parseoutcomes(self):
for line in reversed(self.outlines):
if "seconds" in line:
outcomes = rex_outcome.findall(line)
if outcomes:
d = {}
for num, cat in outcomes:
d[cat] = int(num)
return d
raise ValueError("Pytest terminal report not found")
def assert_outcomes(self, passed=0, skipped=0, failed=0, error=0):
d = self.parseoutcomes()
obtained = {
"passed": d.get("passed", 0),
"skipped": d.get("skipped", 0),
"failed": d.get("failed", 0),
"error": d.get("error", 0),
}
assert (
obtained == dict(passed=passed, skipped=skipped, failed=failed, error=error)
)
class CwdSnapshot(object):
def __init__(self):
self.__saved = os.getcwd()
def restore(self):
os.chdir(self.__saved)
class SysModulesSnapshot(object):
def __init__(self, preserve=None):
self.__preserve = preserve
self.__saved = dict(sys.modules)
def restore(self):
if self.__preserve:
self.__saved.update(
(k, m) for k, m in sys.modules.items() if self.__preserve(k)
)
sys.modules.clear()
sys.modules.update(self.__saved)
class SysPathsSnapshot(object):
def __init__(self):
self.__saved = list(sys.path), list(sys.meta_path)
def restore(self):
sys.path[:], sys.meta_path[:] = self.__saved
class Testdir(object):
def __init__(self, request, tmpdir_factory):
self.request = request
self._mod_collections = WeakKeyDictionary()
name = request.function.__name__
self.tmpdir = tmpdir_factory.mktemp(name, numbered=True)
self.plugins = []
self._cwd_snapshot = CwdSnapshot()
self._sys_path_snapshot = SysPathsSnapshot()
self._sys_modules_snapshot = self.__take_sys_modules_snapshot()
self.chdir()
self.request.addfinalizer(self.finalize)
method = self.request.config.getoption("--runpytest")
if method == "inprocess":
self._runpytest_method = self.runpytest_inprocess
elif method == "subprocess":
self._runpytest_method = self.runpytest_subprocess
def __repr__(self):
return "<Testdir %r>" % (self.tmpdir,)
def finalize(self):
self._sys_modules_snapshot.restore()
self._sys_path_snapshot.restore()
self._cwd_snapshot.restore()
def __take_sys_modules_snapshot(self):
def preserve_module(name):
return name.startswith("zope")
return SysModulesSnapshot(preserve=preserve_module)
def make_hook_recorder(self, pluginmanager):
assert not hasattr(pluginmanager, "reprec")
pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
self.request.addfinalizer(reprec.finish_recording)
return reprec
def chdir(self):
self.tmpdir.chdir()
def _makefile(self, ext, args, kwargs, encoding="utf-8"):
items = list(kwargs.items())
def to_text(s):
return s.decode(encoding) if isinstance(s, bytes) else six.text_type(s)
if args:
source = u"\n".join(to_text(x) for x in args)
basename = self.request.function.__name__
items.insert(0, (basename, source))
ret = None
for basename, value in items:
p = self.tmpdir.join(basename).new(ext=ext)
p.dirpath().ensure_dir()
source = Source(value)
source = u"\n".join(to_text(line) for line in source.lines)
p.write(source.strip().encode(encoding), "wb")
if ret is None:
ret = p
return ret
def makefile(self, ext, *args, **kwargs):
return self._makefile(ext, args, kwargs)
def makeconftest(self, source):
return self.makepyfile(conftest=source)
def makeini(self, source):
return self.makefile(".ini", tox=source)
def getinicfg(self, source):
p = self.makeini(source)
return py.iniconfig.IniConfig(p)["pytest"]
def makepyfile(self, *args, **kwargs):
return self._makefile(".py", args, kwargs)
def maketxtfile(self, *args, **kwargs):
return self._makefile(".txt", args, kwargs)
def syspathinsert(self, path=None):
if path is None:
path = self.tmpdir
sys.path.insert(0, str(path))
self._possibly_invalidate_import_caches()
def _possibly_invalidate_import_caches(self):
try:
import importlib
except ImportError:
pass
else:
if hasattr(importlib, "invalidate_caches"):
importlib.invalidate_caches()
def mkdir(self, name):
return self.tmpdir.mkdir(name)
def mkpydir(self, name):
p = self.mkdir(name)
p.ensure("__init__.py")
return p
Session = Session
def getnode(self, config, arg):
session = Session(config)
assert "::" not in str(arg)
p = py.path.local(arg)
config.hook.pytest_sessionstart(session=session)
res = session.perform_collect([str(p)], genitems=False)[0]
config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
return res
def getpathnode(self, path):
config = self.parseconfigure(path)
session = Session(config)
x = session.fspath.bestrelpath(path)
config.hook.pytest_sessionstart(session=session)
res = session.perform_collect([x], genitems=False)[0]
config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
return res
def genitems(self, colitems):
session = colitems[0].session
result = []
for colitem in colitems:
result.extend(session.genitems(colitem))
return result
def runitem(self, source):
item = self.getitem(source)
testclassinstance = self.request.instance
runner = testclassinstance.getrunner()
return runner(item)
def inline_runsource(self, source, *cmdlineargs):
p = self.makepyfile(source)
values = list(cmdlineargs) + [p]
return self.inline_run(*values)
def inline_genitems(self, *args):
rec = self.inline_run("--collect-only", *args)
items = [x.item for x in rec.getcalls("pytest_itemcollected")]
return items, rec
def inline_run(self, *args, **kwargs):
finalizers = []
try:
orig_warn = AssertionRewritingHook._warn_already_imported
def revert_warn_already_imported():
AssertionRewritingHook._warn_already_imported = orig_warn
finalizers.append(revert_warn_already_imported)
AssertionRewritingHook._warn_already_imported = lambda *a: None
finalizers.append(self.__take_sys_modules_snapshot().restore)
finalizers.append(SysPathsSnapshot().restore)
rec = []
class Collect(object):
def pytest_configure(x, config):
rec.append(self.make_hook_recorder(config.pluginmanager))
plugins = kwargs.get("plugins") or []
plugins.append(Collect())
ret = pytest.main(list(args), plugins=plugins)
if len(rec) == 1:
reprec = rec.pop()
else:
class reprec(object):
pass
reprec.ret = ret
if ret == 2 and not kwargs.get("no_reraise_ctrlc"):
calls = reprec.getcalls("pytest_keyboard_interrupt")
if calls and calls[-1].excinfo.type == KeyboardInterrupt:
raise KeyboardInterrupt()
return reprec
finally:
for finalizer in finalizers:
finalizer()
def runpytest_inprocess(self, *args, **kwargs):
if kwargs.get("syspathinsert"):
self.syspathinsert()
now = time.time()
capture = MultiCapture(Capture=SysCapture)
capture.start_capturing()
try:
try:
reprec = self.inline_run(*args, **kwargs)
except SystemExit as e:
class reprec(object):
ret = e.args[0]
except Exception:
traceback.print_exc()
class reprec(object):
ret = 3
finally:
out, err = capture.readouterr()
capture.stop_capturing()
sys.stdout.write(out)
sys.stderr.write(err)
res = RunResult(reprec.ret, out.split("\n"), err.split("\n"), time.time() - now)
res.reprec = reprec
return res
def runpytest(self, *args, **kwargs):
args = self._ensure_basetemp(args)
return self._runpytest_method(*args, **kwargs)
def _ensure_basetemp(self, args):
args = [str(x) for x in args]
for x in args:
if str(x).startswith("--basetemp"):
break
else:
args.append("--basetemp=%s" % self.tmpdir.dirpath("basetemp"))
return args
def parseconfig(self, *args):
args = self._ensure_basetemp(args)
import _pytest.config
config = _pytest.config._prepareconfig(args, self.plugins)
self.request.addfinalizer(config._ensure_unconfigure)
return config
def parseconfigure(self, *args):
config = self.parseconfig(*args)
config._do_configure()
self.request.addfinalizer(config._ensure_unconfigure)
return config
def getitem(self, source, funcname="test_func"):
items = self.getitems(source)
for item in items:
if item.name == funcname:
return item
assert 0, (
"%r item not found in module:\n%s\nitems: %s" % (funcname, source, items)
)
def getitems(self, source):
modcol = self.getmodulecol(source)
return self.genitems([modcol])
def getmodulecol(self, source, configargs=(), withinit=False):
kw = {self.request.function.__name__: Source(source).strip()}
path = self.makepyfile(**kw)
if withinit:
self.makepyfile(__init__="#")
self.config = config = self.parseconfigure(path, *configargs)
node = self.getnode(config, path)
return node
def collect_by_name(self, modcol, name):
if modcol not in self._mod_collections:
self._mod_collections[modcol] = list(modcol.collect())
for colitem in self._mod_collections[modcol]:
if colitem.name == name:
return colitem
def popen(self, cmdargs, stdout, stderr, **kw):
env = os.environ.copy()
env["PYTHONPATH"] = os.pathsep.join(
filter(None, [str(os.getcwd()), env.get("PYTHONPATH", "")])
)
kw["env"] = env
popen = subprocess.Popen(
cmdargs, stdin=subprocess.PIPE, stdout=stdout, stderr=stderr, **kw
)
popen.stdin.close()
return popen
def run(self, *cmdargs):
return self._run(*cmdargs)
def _run(self, *cmdargs):
cmdargs = [str(x) for x in cmdargs]
p1 = self.tmpdir.join("stdout")
p2 = self.tmpdir.join("stderr")
print("running:", " ".join(cmdargs))
print(" in:", str(py.path.local()))
f1 = codecs.open(str(p1), "w", encoding="utf8")
f2 = codecs.open(str(p2), "w", encoding="utf8")
try:
now = time.time()
popen = self.popen(
cmdargs, stdout=f1, stderr=f2, close_fds=(sys.platform != "win32")
)
ret = popen.wait()
finally:
f1.close()
f2.close()
f1 = codecs.open(str(p1), "r", encoding="utf8")
f2 = codecs.open(str(p2), "r", encoding="utf8")
try:
out = f1.read().splitlines()
err = f2.read().splitlines()
finally:
f1.close()
f2.close()
self._dump_lines(out, sys.stdout)
self._dump_lines(err, sys.stderr)
return RunResult(ret, out, err, time.time() - now)
def _dump_lines(self, lines, fp):
try:
for line in lines:
print(line, file=fp)
except UnicodeEncodeError:
print("couldn't print to %s because of encoding" % (fp,))
def _getpytestargs(self):
return (sys.executable, PYTEST_FULLPATH)
def runpython(self, script):
return self.run(sys.executable, script)
def runpython_c(self, command):
return self.run(sys.executable, "-c", command)
def runpytest_subprocess(self, *args, **kwargs):
p = py.path.local.make_numbered_dir(
prefix="runpytest-", keep=None, rootdir=self.tmpdir
)
args = ("--basetemp=%s" % p,) + args
plugins = [x for x in self.plugins if isinstance(x, str)]
if plugins:
args = ("-p", plugins[0]) + args
args = self._getpytestargs() + args
return self.run(*args)
def spawn_pytest(self, string, expect_timeout=10.0):
basetemp = self.tmpdir.mkdir("temp-pexpect")
invoke = " ".join(map(str, self._getpytestargs()))
cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string)
return self.spawn(cmd, expect_timeout=expect_timeout)
def spawn(self, cmd, expect_timeout=10.0):
pexpect = pytest.importorskip("pexpect", "3.0")
if hasattr(sys, "pypy_version_info") and "64" in platform.machine():
pytest.skip("pypy-64 bit not supported")
if sys.platform.startswith("freebsd"):
pytest.xfail("pexpect does not work reliably on freebsd")
logfile = self.tmpdir.join("spawn.out").open("wb")
child = pexpect.spawn(cmd, logfile=logfile)
self.request.addfinalizer(logfile.close)
child.timeout = expect_timeout
return child
def getdecoded(out):
try:
return out.decode("utf-8")
except UnicodeDecodeError:
return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
py.io.saferepr(out),
)
class LineComp(object):
def __init__(self):
self.stringio = py.io.TextIO()
def assert_contains_lines(self, lines2):
__tracebackhide__ = True
val = self.stringio.getvalue()
self.stringio.truncate(0)
self.stringio.seek(0)
lines1 = val.split("\n")
return LineMatcher(lines1).fnmatch_lines(lines2)
class LineMatcher(object):
def __init__(self, lines):
self.lines = lines
self._log_output = []
def str(self):
return "\n".join(self.lines)
def _getlines(self, lines2):
if isinstance(lines2, str):
lines2 = Source(lines2)
if isinstance(lines2, Source):
lines2 = lines2.strip().lines
return lines2
def fnmatch_lines_random(self, lines2):
self._match_lines_random(lines2, fnmatch)
def re_match_lines_random(self, lines2):
self._match_lines_random(lines2, lambda name, pat: re.match(pat, name))
def _match_lines_random(self, lines2, match_func):
lines2 = self._getlines(lines2)
for line in lines2:
for x in self.lines:
if line == x or match_func(x, line):
self._log("matched: ", repr(line))
break
else:
self._log("line %r not found in output" % line)
raise ValueError(self._log_text)
def get_lines_after(self, fnline):
for i, line in enumerate(self.lines):
if fnline == line or fnmatch(line, fnline):
return self.lines[i + 1:]
raise ValueError("line %r not found in output" % fnline)
def _log(self, *args):
self._log_output.append(" ".join((str(x) for x in args)))
@property
def _log_text(self):
return "\n".join(self._log_output)
def fnmatch_lines(self, lines2):
self._match_lines(lines2, fnmatch, "fnmatch")
def re_match_lines(self, lines2):
self._match_lines(lines2, lambda name, pat: re.match(pat, name), "re.match")
def _match_lines(self, lines2, match_func, match_nickname):
lines2 = self._getlines(lines2)
lines1 = self.lines[:]
nextline = None
extralines = []
__tracebackhide__ = True
for line in lines2:
nomatchprinted = False
while lines1:
nextline = lines1.pop(0)
if line == nextline:
self._log("exact match:", repr(line))
break
elif match_func(nextline, line):
self._log("%s:" % match_nickname, repr(line))
self._log(" with:", repr(nextline))
break
else:
if not nomatchprinted:
self._log("nomatch:", repr(line))
nomatchprinted = True
self._log(" and:", repr(nextline))
extralines.append(nextline)
else:
self._log("remains unmatched: %r" % (line,))
pytest.fail(self._log_text)