from __future__ import print_function
import json
import os
import subprocess
import sys
import re
import tempfile
import unittest
from collections import defaultdict, namedtuple
from shutil import which
Frame = namedtuple("Frame", ["file", "name", "line", "col"])
GIL = ["--gil"]
PYSPY = which("py-spy")
class TestPyspy(unittest.TestCase):
def _sample_process(self, script_name, options=None, include_profile_name=False):
if not PYSPY:
raise ValueError("Failed to find py-spy on the path")
with tempfile.NamedTemporaryFile() as profile_file:
filename = profile_file.name
if sys.platform.startswith("win"):
filename = "profile.json"
cmdline = [
PYSPY,
"record",
"-o",
filename,
"--format",
"speedscope",
"-d",
"2",
]
cmdline.extend(options or [])
cmdline.extend(["--", sys.executable, script_name])
env = dict(os.environ, RUST_LOG="info", RUST_BACKTRACE="1")
subprocess.check_output(cmdline, env=env)
with open(filename) as f:
profiles = json.load(f)
frames = profiles["shared"]["frames"]
samples = defaultdict(int)
for p in profiles["profiles"]:
for sample in p["samples"]:
if include_profile_name:
samples[
tuple(
[p["name"]] + [Frame(**frames[frame]) for frame in sample]
)
] += 1
else:
samples[tuple(Frame(**frames[frame]) for frame in sample)] += 1
return samples
def test_longsleep(self):
if GIL:
profile = self._sample_process(_get_script("longsleep.py"), GIL)
print(profile)
assert sum(profile.values()) <= 10
profile = self._sample_process(_get_script("longsleep.py"), ["--idle"])
sample, count = _most_frequent_sample(profile)
assert count >= 95
assert len(sample) == 2
assert sample[0].name == "<module>"
assert sample[0].line == 9
assert sample[1].name == "longsleep"
assert sample[1].line == 5
def test_busyloop(self):
profile = self._sample_process(_get_script("busyloop.py"), GIL)
assert sum(profile.values()) >= 95
def test_thread_names(self):
v = sys.version_info
if v.major < 3 or v.minor < 6:
return
for _ in range(3):
profile = self._sample_process(
_get_script("thread_names.py"),
["--threads", "--idle"],
include_profile_name=True,
)
expected_thread_names = set("CustomThreadName-" + str(i) for i in range(10))
expected_thread_names.add("MainThread")
name_re = re.compile(r"\"(.*)\"")
actual_thread_names = {name_re.search(p[0]).groups()[0] for p in profile}
if expected_thread_names == actual_thread_names:
break
if expected_thread_names != actual_thread_names:
print(
"failed to get thread names",
expected_thread_names,
actual_thread_names,
)
assert expected_thread_names == actual_thread_names
def test_shell_completions(self):
cmdline = [PYSPY, "completions", "bash"]
subprocess.check_output(cmdline)
def _get_script(name):
base_dir = os.path.dirname(__file__)
return os.path.join(base_dir, "scripts", name)
def _most_frequent_sample(samples):
frames, count = max(samples.items(), key=lambda x: x[1])
return frames, int(100 * count / sum(samples.values()))
if __name__ == "__main__":
print("Testing py-spy @", PYSPY)
unittest.main()