import sys
import _contextvars
import _thread
# Names exported by a star-import of this module.
__all__ = ["warn", "warn_explicit", "showwarning",
           "formatwarning", "filterwarnings", "simplefilter",
           "resetwarnings", "catch_warnings", "deprecated"]
# Module object on which the functions below look up their mutable globals
# (always as ``_wm.<name>``).  Stays None until _set_module() installs one.
_wm = None
def _set_module(module):
    """Install *module* as ``_wm``, the target of every ``_wm.<name>`` lookup."""
    global _wm
    _wm = module
# Filter entries, each the 5-tuple built by _add_filter():
# (action, message regex or None, category, module regex or None, lineno).
filters = []
# Action used by warn_explicit() when no filter entry matches.
defaultaction = "default"
# (text, category) keys already reported under the "once" action.
onceregistry = {}
# Re-entrant lock held while filters and registries are read or changed.
_lock = _thread.RLock()
# Incremented by _filters_mutated_lock_held(); warn_explicit() clears any
# registry whose stored 'version' differs from this value.
_filters_version = 1
# When true, catch_warnings() swaps a context variable instead of replacing
# the module-level 'filters' list.
_use_context = sys.flags.context_aware_warnings
class _Context:
def __init__(self, filters):
self._filters = filters
self.log = None
def copy(self):
context = _Context(self._filters[:])
if self.log is not None:
context.log = self.log
return context
def _record_warning(self, msg):
self.log.append(msg)
class _GlobalContext(_Context):
    """Context whose filter list is always read from ``_wm.filters``."""

    def __init__(self):
        # _Context.__init__ is not called: '_filters' is a read-only
        # property on this class, so assigning to it would fail.
        self.log = None

    @property
    def _filters(self):
        """Return the current ``_wm.filters``, or a new empty list if absent."""
        return getattr(_wm, "filters", [])
# Context returned by _get_context() when context-aware warnings are disabled
# or when the context variable below has no value.
_global_context = _GlobalContext()
# Context variable that _set_context() stores the active _Context in.
_warnings_context = _contextvars.ContextVar('warnings_context')
def _get_context():
    """Return the warnings context that is currently in effect.

    This is the value of ``_wm._warnings_context`` when context-aware
    warnings are enabled and a value has been set, otherwise the shared
    global context.
    """
    if _use_context:
        try:
            return _wm._warnings_context.get()
        except LookupError:
            # Nothing stored in the context variable: use the global one.
            pass
    return _global_context
def _set_context(context):
    """Store *context* in ``_wm._warnings_context``.

    Asserts that context-aware warnings are enabled.
    """
    assert _use_context
    _wm._warnings_context.set(context)
def _new_context():
    """Activate a copy of the current context.

    Returns the pair ``(previous_context, new_context)`` so the caller can
    restore the previous one later.  Asserts that context-aware warnings are
    enabled.
    """
    assert _use_context
    previous = _wm._get_context()
    fresh = previous.copy()
    _wm._set_context(fresh)
    return previous, fresh
def _get_filters():
    """Return the filter list of the context that is currently in effect."""
    active_context = _wm._get_context()
    return active_context._filters
def _filters_mutated_lock_held():
    """Bump ``_wm._filters_version``.

    The function does not take ``_wm._lock`` itself; every caller in this
    file invokes it from inside a ``with _wm._lock:`` block.
    """
    _wm._filters_version += 1
def showwarning(message, category, filename, lineno, file=None, line=None):
    """Wrap the arguments in a WarningMessage and emit it.

    The actual output (or recording) is done by ``_wm._showwarnmsg_impl()``.
    """
    warning = _wm.WarningMessage(message, category, filename, lineno,
                                 file, line)
    _wm._showwarnmsg_impl(warning)
def formatwarning(message, category, filename, lineno, line=None):
    """Return the standard text for a warning described by the arguments."""
    warning = _wm.WarningMessage(message, category, filename, lineno,
                                 None, line)
    return _wm._formatwarnmsg_impl(warning)
def _showwarnmsg_impl(msg):
    """Record *msg* in the active context's log, or write it to a stream.

    The stream is ``msg.file`` or, when that is None, ``sys.stderr``.  If no
    stream is available, or writing raises OSError, the warning is dropped.
    """
    context = _wm._get_context()
    if context.log is not None:
        context._record_warning(msg)
        return

    stream = msg.file
    if stream is None:
        stream = sys.stderr
    if stream is None:
        # Neither an explicit file nor sys.stderr: nowhere to write.
        return

    text = _wm._formatwarnmsg(msg)
    try:
        stream.write(text)
    except OSError:
        # The stream cannot be written to; the warning is lost.
        pass
def _formatwarnmsg_impl(msg):
category = msg.category.__name__
s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"
if msg.line is None:
try:
import linecache
line = linecache.getline(msg.filename, msg.lineno)
except Exception:
line = None
linecache = None
else:
line = msg.line
if line:
line = line.strip()
s += " %s\n" % line
if msg.source is not None:
try:
import tracemalloc
except Exception:
suggest_tracemalloc = False
tb = None
else:
try:
suggest_tracemalloc = not tracemalloc.is_tracing()
tb = tracemalloc.get_object_traceback(msg.source)
except Exception:
suggest_tracemalloc = False
tb = None
if tb is not None:
s += 'Object allocated at (most recent call last):\n'
for frame in tb:
s += (' File "%s", lineno %s\n'
% (frame.filename, frame.lineno))
try:
if linecache is not None:
line = linecache.getline(frame.filename, frame.lineno)
else:
line = None
except Exception:
line = None
if line:
line = line.strip()
s += ' %s\n' % line
elif suggest_tracemalloc:
s += (f'{category}: Enable tracemalloc to get the object '
f'allocation traceback\n')
return s
# The stock showwarning(); _showwarnmsg() compares ``_wm.showwarning``
# against it to detect a replacement.
_showwarning_orig = showwarning
def _showwarnmsg(msg):
    """Emit *msg*, honouring a replaced ``_wm.showwarning``.

    If ``_wm.showwarning`` exists and is not the stock implementation it is
    called with the individual fields of *msg*; otherwise the message goes
    to ``_wm._showwarnmsg_impl()``.

    Raises TypeError if the replacement is not callable.
    """
    try:
        hook = _wm.showwarning
    except AttributeError:
        # Attribute removed: behave as if the stock function were in place.
        hook = _showwarning_orig

    if hook is not _showwarning_orig:
        if not callable(hook):
            raise TypeError("warnings.showwarning() must be set to a "
                            "function or method")
        hook(msg.message, msg.category, msg.filename, msg.lineno,
             msg.file, msg.line)
        return

    _wm._showwarnmsg_impl(msg)
# The stock formatwarning(); _formatwarnmsg() compares ``_wm.formatwarning``
# against it to detect a replacement.
_formatwarning_orig = formatwarning
def _formatwarnmsg(msg):
    """Return the text for *msg*, honouring a replaced ``_wm.formatwarning``.

    A replacement is called with the individual fields of *msg*; otherwise
    ``_wm._formatwarnmsg_impl()`` formats the message.
    """
    try:
        formatter = _wm.formatwarning
    except AttributeError:
        # Attribute removed: behave as if the stock function were in place.
        formatter = _formatwarning_orig

    if formatter is _formatwarning_orig:
        return _wm._formatwarnmsg_impl(msg)
    return formatter(msg.message, msg.category,
                     msg.filename, msg.lineno, msg.line)
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=False):
    """Add an entry to the warnings filter list.

    action   -- one of "error", "ignore", "always", "all", "default",
                "module" or "once"
    message  -- regex source matched (case-insensitively) against the
                warning text; empty means match everything
    category -- Warning subclass the warning must belong to
    module   -- regex source matched against the module name; empty means
                match everything
    lineno   -- line number to match, or 0 for any line
    append   -- if true, add the entry at the end of the list instead of
                the front

    Raises ValueError for an unknown action or a negative lineno, and
    TypeError for arguments of the wrong type.
    """
    valid_actions = {"error", "ignore", "always", "all", "default", "module", "once"}
    if action not in valid_actions:
        raise ValueError(f"invalid action: {action!r}")
    if not isinstance(message, str):
        raise TypeError("message must be a string")
    if not (isinstance(category, type) and issubclass(category, Warning)):
        raise TypeError("category must be a Warning subclass")
    if not isinstance(module, str):
        raise TypeError("module must be a string")
    if not isinstance(lineno, int):
        raise TypeError("lineno must be an int")
    if lineno < 0:
        raise ValueError("lineno must be an int >= 0")

    # 're' is imported only when a pattern actually has to be compiled.
    if message or module:
        import re

    message_re = re.compile(message, re.I) if message else None
    module_re = re.compile(module) if module else None

    _wm._add_filter(action, message_re, category, module_re, lineno,
                    append=append)
def simplefilter(action, category=Warning, lineno=0, append=False):
    """Add a filter entry that matches on category and line number only.

    The entry has no message or module pattern.  *action*, *lineno* and
    *append* have the same meaning as in filterwarnings().

    Raises ValueError for an unknown action or a negative lineno and
    TypeError if lineno is not an int.
    """
    valid_actions = {"error", "ignore", "always", "all", "default", "module", "once"}
    if action not in valid_actions:
        raise ValueError(f"invalid action: {action!r}")
    if not isinstance(lineno, int):
        raise TypeError("lineno must be an int")
    if lineno < 0:
        raise ValueError("lineno must be an int >= 0")

    no_pattern = None
    _wm._add_filter(action, no_pattern, category, no_pattern, lineno,
                    append=append)
def _filters_mutated():
    """Bump the filters version while holding ``_wm._lock``."""
    with _wm._lock:
        _wm._filters_mutated_lock_held()
def _add_filter(*item, append):
    """Insert the filter entry *item* into the active filter list.

    With append false the entry is moved (or added) to the front of the
    list.  With append true it is added at the end unless an equal entry is
    already present.  The filters version is bumped in both cases.
    """
    with _wm._lock:
        active = _wm._get_filters()
        if append:
            if item not in active:
                active.append(item)
        else:
            # Drop an existing equal entry so the new one ends up in front.
            try:
                active.remove(item)
            except ValueError:
                pass
            active.insert(0, item)
        _wm._filters_mutated_lock_held()
def resetwarnings():
    """Remove every entry from the active filter list (in place)."""
    with _wm._lock:
        active = _wm._get_filters()
        del active[:]
        _wm._filters_mutated_lock_held()
class _OptionError(Exception):
pass
def _processoptions(args):
    """Apply each option string in *args*.

    An option rejected with ``_wm._OptionError`` is reported on sys.stderr
    and skipped; processing continues with the next one.
    """
    for option in args:
        try:
            _wm._setoption(option)
        except _wm._OptionError as err:
            print("Invalid -W option ignored:", err, file=sys.stderr)
def _setoption(arg):
    """Parse one ``action:message:category:module:lineno`` option string.

    Missing trailing fields are treated as empty.  The message and module
    fields are taken literally (regex-escaped), and the module must match
    to its end.  The resulting filter is installed with
    ``_wm.filterwarnings()``.

    Raises ``_wm._OptionError`` for too many fields or an invalid lineno.
    """
    fields = arg.split(':')
    if len(fields) > 5:
        raise _wm._OptionError("too many fields (max 5): %r" % (arg,))
    # Pad to exactly five fields.
    fields.extend([''] * (5 - len(fields)))
    action, message, category, module, lineno = (
        field.strip() for field in fields)

    action = _wm._getaction(action)
    category = _wm._getcategory(category)

    if message or module:
        import re
        if message:
            message = re.escape(message)
        if module:
            module = re.escape(module) + r'\z'

    if not lineno:
        lineno = 0
    else:
        try:
            lineno = int(lineno)
            if lineno < 0:
                raise ValueError
        except (ValueError, OverflowError):
            raise _wm._OptionError("invalid lineno %r" % (lineno,)) from None

    _wm.filterwarnings(action, message, category, module, lineno)
def _getaction(action):
if not action:
return "default"
for a in ('default', 'always', 'all', 'ignore', 'module', 'once', 'error'):
if a.startswith(action):
return a
raise _wm._OptionError("invalid action: %r" % (action,))
def _getcategory(category):
if not category:
return Warning
if '.' not in category:
import builtins as m
klass = category
else:
module, _, klass = category.rpartition('.')
try:
m = __import__(module, None, None, [klass])
except ImportError:
raise _wm._OptionError("invalid module name: %r" % (module,)) from None
try:
cat = getattr(m, klass)
except AttributeError:
raise _wm._OptionError("unknown warning category: %r" % (category,)) from None
if not issubclass(cat, Warning):
raise _wm._OptionError("invalid warning category: %r" % (category,))
return cat
def _is_internal_filename(filename):
return 'importlib' in filename and '_bootstrap' in filename
def _is_filename_to_skip(filename, skip_file_prefixes):
return any(filename.startswith(prefix) for prefix in skip_file_prefixes)
def _is_internal_frame(frame):
    """Return True if *frame* runs code from an internal (bootstrap) file."""
    code_filename = frame.f_code.co_filename
    return _is_internal_filename(code_filename)
def _next_external_frame(frame, skip_file_prefixes):
    """Return the nearest caller frame that is neither internal nor skipped.

    Walks outward from *frame*'s caller and returns None if the stack is
    exhausted first.
    """
    candidate = frame.f_back
    while candidate is not None:
        filename = candidate.f_code.co_filename
        if not (_is_internal_filename(filename) or
                _is_filename_to_skip(filename, skip_file_prefixes)):
            break
        candidate = candidate.f_back
    return candidate
def warn(message, category=None, stacklevel=1, source=None,
         *, skip_file_prefixes=()):
    """Issue a warning on behalf of a calling frame.

    The frame selected by *stacklevel* (after skipping internal frames and
    files starting with one of *skip_file_prefixes*) supplies the filename,
    line number, module name and ``__warningregistry__`` that are handed to
    ``_wm.warn_explicit()``, which decides what happens with the warning.

    If *message* is a Warning instance its class replaces *category*; a
    missing category defaults to UserWarning.

    Raises TypeError if *category* is not a Warning subclass or if
    *skip_file_prefixes* is not a tuple.
    """
    # A Warning instance carries its own category.
    if isinstance(message, Warning):
        category = message.__class__
    if category is None:
        category = UserWarning
    if not (isinstance(category, type) and issubclass(category, Warning)):
        raise TypeError("category must be a Warning subclass, "
                        "not '{:s}'".format(type(category).__name__))
    if not isinstance(skip_file_prefixes, tuple):
        raise TypeError('skip_file_prefixes must be a tuple of strs.')
    if skip_file_prefixes:
        # Prefixes are only consulted by the frame walk below, which runs
        # for stacklevel > 1.
        stacklevel = max(2, stacklevel)
    # Find the frame the warning is attributed to.
    try:
        if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
            # Nothing to skip, or the direct caller is itself internal:
            # count frames literally.
            frame = sys._getframe(stacklevel)
        else:
            frame = sys._getframe(1)
            # The line above already accounts for one level.
            for x in range(stacklevel-1):
                frame = _next_external_frame(frame, skip_file_prefixes)
                if frame is None:
                    raise ValueError
    except ValueError:
        # The stack is not deep enough: attribute the warning to 'sys'.
        globals = sys.__dict__
        filename = "<sys>"
        lineno = 0
    else:
        globals = frame.f_globals
        filename = frame.f_code.co_filename
        lineno = frame.f_lineno
    if '__name__' in globals:
        module = globals['__name__']
    else:
        module = "<string>"
    # Created in the selected frame's globals on first use.
    registry = globals.setdefault("__warningregistry__", {})
    _wm.warn_explicit(
        message,
        category,
        filename,
        lineno,
        module,
        registry,
        globals,
        source=source,
    )
def warn_explicit(message, category, filename, lineno,
                  module=None, registry=None, module_globals=None,
                  source=None):
    """Issue a warning with explicitly supplied location details.

    message        -- warning text, or a Warning instance (its class then
                      replaces *category*)
    category       -- Warning subclass used to wrap a text message
    filename       -- file the warning is attributed to
    lineno         -- line number; converted with int()
    module         -- module name matched by the filters; defaults to
                      *filename* without a trailing ".py", or "<unknown>"
    registry       -- dict remembering warnings that were already handled;
                      it is cleared whenever the filters version changed
    module_globals -- passed to ``linecache.getlines()`` before display
    source         -- stored on the WarningMessage as ``source``

    The first matching filter entry (or ``_wm.defaultaction``) selects the
    action: "ignore" returns silently, "error" raises the warning, and the
    other actions show it via ``_wm._showwarnmsg()`` subject to the
    suppression rules recorded in *registry* and ``_wm.onceregistry``.

    Raises RuntimeError for an unrecognized action.
    """
    lineno = int(lineno)
    if module is None:
        module = filename or "<unknown>"
        if module[-3:].lower() == ".py":
            module = module[:-3]
    # These two statements were fused onto one line, which did not parse:
    # the suffix strip above and this isinstance branch are independent.
    if isinstance(message, Warning):
        text = str(message)
        category = message.__class__
    else:
        text = message
        message = category(message)
    key = (text, category, lineno)
    with _wm._lock:
        if registry is None:
            registry = {}
        if registry.get('version', 0) != _wm._filters_version:
            # The filters changed since this registry was last used.
            registry.clear()
            registry['version'] = _wm._filters_version
        # Already handled at this exact location.
        if registry.get(key):
            return
        # The first matching filter entry decides the action.
        for item in _wm._get_filters():
            action, msg, cat, mod, ln = item
            if ((msg is None or msg.match(text)) and
                    issubclass(category, cat) and
                    (mod is None or mod.match(module)) and
                    (ln == 0 or lineno == ln)):
                break
        else:
            action = _wm.defaultaction
        if action == "ignore":
            return

        if action == "error":
            raise message

        if action == "once":
            registry[key] = 1
            oncekey = (text, category)
            if _wm.onceregistry.get(oncekey):
                return
            _wm.onceregistry[oncekey] = 1
        elif action in {"always", "all"}:
            pass
        elif action == "module":
            registry[key] = 1
            # Line number 0 stands for "anywhere in this registry".
            altkey = (text, category, 0)
            if registry.get(altkey):
                return
            registry[altkey] = 1
        elif action == "default":
            registry[key] = 1
        else:
            raise RuntimeError(
                "Unrecognized action (%r) in warnings.filters:\n %s" %
                (action, item))

    # Load the file's lines into linecache before the message is formatted.
    import linecache
    linecache.getlines(filename, module_globals)

    msg = _wm.WarningMessage(message, category, filename, lineno, source=source)
    _wm._showwarnmsg(msg)
class WarningMessage(object):
    """Plain record holding every detail of one warning."""

    _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
                        "line", "source")

    def __init__(self, message, category, filename, lineno, file=None,
                 line=None, source=None):
        self.message = message
        self.category = category
        self.filename = filename
        self.lineno = lineno
        self.file = file
        self.line = line
        self.source = source
        # Name of the category, kept separately for __str__.
        if category:
            self._category_name = category.__name__
        else:
            self._category_name = None

    def __str__(self):
        template = ("{message : %r, category : %r, filename : %r, lineno : %s, "
                    "line : %r}")
        values = (self.message, self._category_name,
                  self.filename, self.lineno, self.line)
        return template % values

    def __repr__(self):
        return '<%s %s>' % (type(self).__qualname__, self)
class catch_warnings(object):
    """Context manager that saves the warnings state and restores it on exit.

    With ``record=True`` the value returned by ``__enter__`` is a list that
    receives the messages which would otherwise be shown; otherwise it is
    None.  *module* is the module object whose state is managed (by default
    ``sys.modules['warnings']``).  When *action* is given,
    ``simplefilter(action, category, lineno, append)`` is applied on entry.
    """

    def __init__(self, *, record=False, module=None,
                 action=None, category=Warning, lineno=0, append=False):
        self._record = record
        self._module = sys.modules['warnings'] if module is None else module
        self._entered = False
        if action is None:
            self._filter = None
        else:
            # Positional arguments for simplefilter(), applied in __enter__.
            self._filter = (action, category, lineno, append)

    def __repr__(self):
        args = []
        if self._record:
            args.append("record=True")
        if self._module is not sys.modules['warnings']:
            args.append("module=%r" % self._module)
        name = type(self).__name__
        return "%s(%s)" % (name, ", ".join(args))

    def __enter__(self):
        """Save the current state and return the record list (or None).

        Raises RuntimeError if this instance was already entered.
        """
        if self._entered:
            raise RuntimeError("Cannot enter %r twice" % self)
        self._entered = True
        with _wm._lock:
            if _use_context:
                # Context mode: work on a copy of the active context.
                self._saved_context, context = self._module._new_context()
            else:
                # Global mode: remember the module attributes that may be
                # replaced and install a copy of the filter list.
                context = None
                self._filters = self._module.filters
                self._module.filters = self._filters[:]
                self._showwarning = self._module.showwarning
                self._showwarnmsg_impl = self._module._showwarnmsg_impl
            self._module._filters_mutated_lock_held()
            if self._record:
                if _use_context:
                    context.log = log = []
                else:
                    log = []
                    self._module._showwarnmsg_impl = log.append
                    # Put the stock showwarning() back so that
                    # _showwarnmsg() ends up calling _showwarnmsg_impl.
                    self._module.showwarning = self._module._showwarning_orig
            else:
                log = None
        if self._filter is not None:
            self._module.simplefilter(*self._filter)
        return log

    def __exit__(self, *exc_info):
        """Restore the state saved by ``__enter__``.

        Raises RuntimeError if the instance was never entered.
        """
        if not self._entered:
            raise RuntimeError("Cannot exit %r without entering first" % self)
        with _wm._lock:
            if _use_context:
                self._module._warnings_context.set(self._saved_context)
            else:
                self._module.filters = self._filters
                self._module.showwarning = self._showwarning
                self._module._showwarnmsg_impl = self._showwarnmsg_impl
            self._module._filters_mutated_lock_held()
class deprecated:
    """Decorator that marks a class or callable as deprecated.

    The decorated object gets a ``__deprecated__`` attribute holding the
    message.  Unless *category* is None, a warning of that category is also
    issued through ``_wm.warn()`` when a decorated class is instantiated
    directly or subclassed, or when a decorated callable is called.
    *stacklevel* is forwarded to ``_wm.warn()`` increased by one to account
    for the wrapper frame.
    """

    def __init__(
        self,
        message: str,
        /,
        *,
        category: type[Warning] | None = DeprecationWarning,
        stacklevel: int = 1,
    ) -> None:
        if not isinstance(message, str):
            raise TypeError(
                f"Expected an object of type str for 'message', not {type(message).__name__!r}"
            )
        self.message = message
        self.category = category
        self.stacklevel = stacklevel

    def __call__(self, arg, /):
        """Apply the deprecation to *arg* and return the decorated object.

        A class is modified in place and returned; any other callable is
        wrapped and the wrapper is returned.

        Raises TypeError if *category* is not None and *arg* is neither a
        class nor callable.
        """
        # The closures below use these locals rather than 'self'.
        msg = self.message
        category = self.category
        stacklevel = self.stacklevel
        if category is None:
            # No runtime warning requested: only tag the object.
            arg.__deprecated__ = msg
            return arg
        elif isinstance(arg, type):
            import functools
            from types import MethodType

            original_new = arg.__new__

            @functools.wraps(original_new)
            def __new__(cls, /, *args, **kwargs):
                # Warn only for the decorated class itself, not subclasses.
                if cls is arg:
                    _wm.warn(msg, category=category, stacklevel=stacklevel + 1)
                if original_new is not object.__new__:
                    return original_new(cls, *args, **kwargs)
                # object.__new__ must not be handed the extra arguments.
                elif cls.__init__ is object.__init__ and (args or kwargs):
                    raise TypeError(f"{cls.__name__}() takes no arguments")
                else:
                    return original_new(cls)

            arg.__new__ = staticmethod(__new__)

            if "__init_subclass__" in arg.__dict__:
                # The class defines __init_subclass__ itself: wrap it.
                original_init_subclass = arg.__init_subclass__
                # Unwrap a bound method so the wrapper can pass 'cls' along
                # explicitly.
                if isinstance(original_init_subclass, MethodType):
                    original_init_subclass = original_init_subclass.__func__

                @functools.wraps(original_init_subclass)
                def __init_subclass__(*args, **kwargs):
                    _wm.warn(msg, category=category, stacklevel=stacklevel + 1)
                    return original_init_subclass(*args, **kwargs)
            else:
                def __init_subclass__(cls, *args, **kwargs):
                    _wm.warn(msg, category=category, stacklevel=stacklevel + 1)
                    return super(arg, cls).__init_subclass__(*args, **kwargs)

            arg.__init_subclass__ = classmethod(__init_subclass__)

            arg.__deprecated__ = __new__.__deprecated__ = msg
            __init_subclass__.__deprecated__ = msg
            return arg
        elif callable(arg):
            import functools
            import inspect

            @functools.wraps(arg)
            def wrapper(*args, **kwargs):
                _wm.warn(msg, category=category, stacklevel=stacklevel + 1)
                return arg(*args, **kwargs)

            # Keep the wrapper recognisable as a coroutine function.
            if inspect.iscoroutinefunction(arg):
                wrapper = inspect.markcoroutinefunction(wrapper)

            arg.__deprecated__ = wrapper.__deprecated__ = msg
            return wrapper
        else:
            raise TypeError(
                "@deprecated decorator with non-None category must be applied to "
                f"a class or callable, not {arg!r}"
            )
_DEPRECATED_MSG = "{name!r} is deprecated and slated for removal in Python {remove}"
def _deprecated(name, message=_DEPRECATED_MSG, *, remove, _version=sys.version_info):
remove_formatted = f"{remove[0]}.{remove[1]}"
if (_version[:2] > remove) or (_version[:2] == remove and _version[3] != "alpha"):
msg = f"{name!r} was slated for removal after Python {remove_formatted} alpha"
raise RuntimeError(msg)
else:
msg = message.format(name=name, remove=remove_formatted)
_wm.warn(msg, DeprecationWarning, stacklevel=3)
def _warn_unawaited_coroutine(coro):
    """Issue a RuntimeWarning saying that *coro* was never awaited.

    When ``coro.cr_origin`` is available the message also lists where the
    coroutine was created.  *coro* is passed to ``_wm.warn()`` as ``source``.
    """
    parts = [
        f"coroutine '{coro.__qualname__}' was never awaited\n"
    ]
    origin = coro.cr_origin
    if origin is not None:
        import linecache, traceback
        # cr_origin is walked in reverse to match the "most recent call
        # last" heading below.
        entries = [
            (filename, lineno, funcname, linecache.getline(filename, lineno))
            for filename, lineno, funcname in reversed(origin)
        ]
        parts.append("Coroutine created at (most recent call last)\n")
        parts += traceback.format_list(entries)
    msg = "".join(parts).rstrip("\n")
    _wm.warn(
        msg, category=RuntimeWarning, stacklevel=2, source=coro
    )
def _setup_defaults():
    """Append the default filter entries.

    Nothing is installed when ``sys.gettotalrefcount`` exists.  Otherwise
    DeprecationWarning is shown for ``__main__`` and then DeprecationWarning,
    PendingDeprecationWarning, ImportWarning and ResourceWarning are ignored.
    """
    if hasattr(sys, 'gettotalrefcount'):
        return
    _wm.filterwarnings("default", category=DeprecationWarning, module="__main__", append=1)
    ignored_by_default = (
        DeprecationWarning,
        PendingDeprecationWarning,
        ImportWarning,
        ResourceWarning,
    )
    for ignored in ignored_by_default:
        _wm.simplefilter("ignore", category=ignored, append=1)