import io
import sys
import traceback
from . import util
from functools import wraps
# Marker looked up by TestResult._is_relevant_tb_level: any traceback frame
# whose module globals contain the name '__unittest' (as this module's do)
# is treated as a framework-internal level when tracebacks are trimmed.
__unittest = True
def failfast(method):
    """Decorate a result method so that it honours the ``failfast`` flag.

    The returned wrapper first checks the instance's ``failfast``
    attribute (a missing attribute counts as False); when it is truthy
    the instance's ``stop()`` is called. The wrapped *method* is then
    always invoked with the original arguments and its return value is
    passed back unchanged.
    """
    @wraps(method)
    def inner(self, *args, **kwargs):
        stop_on_first_problem = getattr(self, 'failfast', False)
        if stop_on_first_problem:
            self.stop()
        return method(self, *args, **kwargs)

    return inner
# %-format templates used by TestResult to label captured output: the single
# %s placeholder receives the buffered text of the corresponding stream, both
# when it is echoed to the original stream and when it is appended to a
# formatted traceback.
STDOUT_LINE = '\nStdout:\n%s'
STDERR_LINE = '\nStderr:\n%s'
class TestResult(object):
    """Accumulate the outcome of running tests.

    Instances record, as plain lists:

    * ``failures`` / ``errors`` / ``expectedFailures`` -- tuples of
      ``(test, formatted_traceback_string)``;
    * ``skipped`` -- tuples of ``(test, reason)``;
    * ``unexpectedSuccesses`` -- the tests themselves.

    ``testsRun`` counts calls to ``startTest``. When ``buffer`` is true,
    ``sys.stdout`` / ``sys.stderr`` are swapped for in-memory buffers
    between ``startTest`` and ``stopTest``; the captured text is echoed to
    the original streams only if an error or failure was recorded.
    """

    # Class-level defaults; nothing in this class reads or writes them.
    # NOTE(review): presumably bookkeeping for the suite/runner machinery
    # -- confirm against the callers.
    _previousTestClass = None
    _testRunEntered = False
    _moduleSetUpFailed = False

    def __init__(self, stream=None, descriptions=None, verbosity=None):
        """Initialise empty result lists and default flags.

        *stream*, *descriptions* and *verbosity* are accepted but not used
        by this implementation.
        """
        # Outcome bookkeeping.
        self.failfast = False
        self.failures = []
        self.errors = []
        self.testsRun = 0
        self.skipped = []
        self.expectedFailures = []
        self.unexpectedSuccesses = []
        self.shouldStop = False
        # Output-capturing configuration and state.
        self.buffer = self.tb_locals = False
        self._stdout_buffer = self._stderr_buffer = None
        # Remember the streams in effect at construction time so they can
        # be put back after buffering.
        self._original_stdout, self._original_stderr = sys.stdout, sys.stderr
        self._mirrorOutput = False

    def printErrors(self):
        """Called by TestRunner after test run; does nothing here."""

    def startTest(self, test):
        """Called when the given test is about to be run.

        Bumps ``testsRun``, clears the mirror-output flag and, if
        buffering is enabled, redirects the standard streams.
        """
        self.testsRun += 1
        self._mirrorOutput = False
        self._setupStdout()

    def _setupStdout(self):
        """Redirect sys.stdout/sys.stderr to buffers when ``buffer`` is set."""
        if not self.buffer:
            return
        # Buffers are created lazily, once, and reused afterwards.
        if self._stderr_buffer is None:
            self._stderr_buffer = io.StringIO()
            self._stdout_buffer = io.StringIO()
        sys.stdout, sys.stderr = self._stdout_buffer, self._stderr_buffer

    def startTestRun(self):
        """No-op by default.

        NOTE(review): by its name, a hook for the start of a whole run --
        the caller is not visible here.
        """

    def stopTest(self, test):
        """Called when the given test has been run.

        Restores the standard streams and clears the mirror-output flag.
        """
        self._restoreStdout()
        self._mirrorOutput = False

    def _restoreStdout(self):
        """Undo ``_setupStdout`` and empty the capture buffers.

        If an error or failure was recorded (``_mirrorOutput``), any
        non-empty captured text is first written, newline-terminated and
        labelled, to the corresponding original stream.
        """
        if not self.buffer:
            return

        if self._mirrorOutput:
            # Read both captures before writing anything.
            captured_out = sys.stdout.getvalue()
            captured_err = sys.stderr.getvalue()
            replay = (
                (captured_out, self._original_stdout, STDOUT_LINE),
                (captured_err, self._original_stderr, STDERR_LINE),
            )
            for text, target, template in replay:
                if not text:
                    continue
                terminated = text if text.endswith('\n') else text + '\n'
                target.write(template % terminated)

        sys.stdout = self._original_stdout
        sys.stderr = self._original_stderr
        # Reset the buffers so the next test starts with empty captures.
        for capture in (self._stdout_buffer, self._stderr_buffer):
            capture.seek(0)
            capture.truncate()

    def stopTestRun(self):
        """No-op by default.

        NOTE(review): by its name, a hook for the end of a whole run --
        the caller is not visible here.
        """

    @failfast
    def addError(self, test, err):
        """Record *err* (an ``(exctype, value, tb)`` triple) as an error.

        Appends ``(test, formatted_traceback)`` to ``errors`` and flags
        captured output for mirroring.
        """
        report = self._exc_info_to_string(err, test)
        self.errors.append((test, report))
        self._mirrorOutput = True

    @failfast
    def addFailure(self, test, err):
        """Record *err* (an ``(exctype, value, tb)`` triple) as a failure.

        Appends ``(test, formatted_traceback)`` to ``failures`` and flags
        captured output for mirroring.
        """
        report = self._exc_info_to_string(err, test)
        self.failures.append((test, report))
        self._mirrorOutput = True

    def addSubTest(self, test, subtest, err):
        """Record the outcome of a subtest.

        A *err* of None is ignored. Otherwise ``stop()`` is called when
        ``failfast`` is set, and ``(subtest, formatted_traceback)`` goes to
        ``failures`` if the exception type is a subclass of
        ``test.failureException``, else to ``errors``.
        """
        if err is None:
            return

        if getattr(self, 'failfast', False):
            self.stop()

        is_failure = issubclass(err[0], test.failureException)
        bucket = self.failures if is_failure else self.errors
        bucket.append((subtest, self._exc_info_to_string(err, test)))
        self._mirrorOutput = True

    def addSuccess(self, test):
        """Called when a test has completed successfully; records nothing."""

    def addSkip(self, test, reason):
        """Append ``(test, reason)`` to ``skipped``."""
        entry = (test, reason)
        self.skipped.append(entry)

    def addExpectedFailure(self, test, err):
        """Append ``(test, formatted_traceback)`` to ``expectedFailures``."""
        report = self._exc_info_to_string(err, test)
        self.expectedFailures.append((test, report))

    @failfast
    def addUnexpectedSuccess(self, test):
        """Append *test* to ``unexpectedSuccesses``."""
        self.unexpectedSuccesses.append(test)

    def wasSuccessful(self):
        """Return True when no failures, errors or unexpected successes exist.

        An instance lacking the ``unexpectedSuccesses`` attribute is
        treated as having none.
        """
        failure_count = len(self.failures)
        error_count = len(self.errors)
        if failure_count != error_count or error_count != 0:
            return False
        if not hasattr(self, 'unexpectedSuccesses'):
            return True
        return len(self.unexpectedSuccesses) == 0

    def stop(self):
        """Set ``shouldStop`` to True."""
        self.shouldStop = True

    def _exc_info_to_string(self, err, test):
        """Format an ``(exctype, value, tb)`` triple as a single string.

        Leading traceback levels whose frame globals contain
        ``'__unittest'`` are dropped. When the exception type is exactly
        ``test.failureException`` the traceback is further limited to the
        levels preceding the next such frame. With buffering enabled, any
        captured stdout/stderr text is appended, labelled and
        newline-terminated.
        """
        exctype, value, tb = err

        # Drop the leading framework-internal levels.
        while tb and self._is_relevant_tb_level(tb):
            tb = tb.tb_next

        # For plain assertion failures, cut the trailing framework levels.
        limit = (self._count_relevant_tb_levels(tb)
                 if exctype is test.failureException else None)

        formatter = traceback.TracebackException(
            exctype, value, tb, limit=limit, capture_locals=self.tb_locals)
        pieces = list(formatter.format())

        if self.buffer:
            captured_out = sys.stdout.getvalue()
            captured_err = sys.stderr.getvalue()
            for text, template in ((captured_out, STDOUT_LINE),
                                   (captured_err, STDERR_LINE)):
                if not text:
                    continue
                terminated = text if text.endswith('\n') else text + '\n'
                pieces.append(template % terminated)

        return ''.join(pieces)

    def _is_relevant_tb_level(self, tb):
        """Return True if *tb*'s frame globals define ``'__unittest'``."""
        frame_globals = tb.tb_frame.f_globals
        return '__unittest' in frame_globals

    def _count_relevant_tb_levels(self, tb):
        """Count levels from *tb* up to the first ``'__unittest'`` frame.

        Counting stops at the end of the chain or at the first level for
        which ``_is_relevant_tb_level`` is true (that level is excluded).
        """
        depth = 0
        level = tb
        while level and not self._is_relevant_tb_level(level):
            depth += 1
            level = level.tb_next
        return depth

    def __repr__(self):
        """Return ``<Class run=N errors=N failures=N>`` for this result."""
        class_label = util.strclass(self.__class__)
        counts = (class_label, self.testsRun,
                  len(self.errors), len(self.failures))
        return "<%s run=%i errors=%i failures=%i>" % counts