# Names exported by a star-import of this module.
__all__ = ['update_wrapper', 'wraps', 'WRAPPER_ASSIGNMENTS', 'WRAPPER_UPDATES',
           'total_ordering', 'cache', 'cmp_to_key', 'lru_cache', 'reduce',
           'partial', 'partialmethod', 'singledispatch', 'singledispatchmethod',
           'cached_property', 'Placeholder']
from abc import get_cache_token
from collections import namedtuple
from operator import itemgetter
from reprlib import recursive_repr
from types import GenericAlias, MethodType, MappingProxyType, UnionType
from _thread import RLock
# Default for update_wrapper()'s ``assigned`` parameter: attribute names that
# are copied from the wrapped callable onto the wrapper when present.
WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__qualname__', '__doc__',
                       '__annotate__', '__type_params__')
# Default for update_wrapper()'s ``updated`` parameter: attribute names whose
# value on the wrapper is update()d with the wrapped callable's value.
WRAPPER_UPDATES = ('__dict__',)
def update_wrapper(wrapper,
                   wrapped,
                   assigned = WRAPPER_ASSIGNMENTS,
                   updated = WRAPPER_UPDATES):
    """Make *wrapper* look like *wrapped* and return *wrapper*.

    wrapper  -- the object to be updated
    wrapped  -- the original object
    assigned -- names of attributes copied from *wrapped* to *wrapper*;
                names that *wrapped* does not have are skipped
    updated  -- names of attributes of *wrapper* that are ``update()``d with
                the attribute of the same name from *wrapped* (an empty dict
                is used when *wrapped* lacks it)

    ``wrapper.__wrapped__`` is set to *wrapped* afterwards.
    """
    for name in assigned:
        try:
            copied = getattr(wrapped, name)
        except AttributeError:
            # The wrapped object simply does not carry this attribute.
            continue
        setattr(wrapper, name, copied)
    for name in updated:
        target = getattr(wrapper, name)
        target.update(getattr(wrapped, name, {}))
    # Set last, so that a __wrapped__ entry merged from wrapped.__dict__ in
    # the loop above cannot overwrite it.
    wrapper.__wrapped__ = wrapped
    return wrapper
def wraps(wrapped,
          assigned = WRAPPER_ASSIGNMENTS,
          updated = WRAPPER_UPDATES):
    """Return a decorator that applies update_wrapper() to its argument.

    The returned object is a partial() of update_wrapper with *wrapped*,
    *assigned* and *updated* pre-bound as keyword arguments, so calling it
    with a wrapper function supplies the remaining ``wrapper`` argument.
    """
    options = {'wrapped': wrapped, 'assigned': assigned, 'updated': updated}
    return partial(update_wrapper, **options)
def _gt_from_lt(self, other):
'Return a > b. Computed by @total_ordering from (not a < b) and (a != b).'
op_result = type(self).__lt__(self, other)
if op_result is NotImplemented:
return op_result
return not op_result and self != other
def _le_from_lt(self, other):
'Return a <= b. Computed by @total_ordering from (a < b) or (a == b).'
op_result = type(self).__lt__(self, other)
if op_result is NotImplemented:
return op_result
return op_result or self == other
def _ge_from_lt(self, other):
'Return a >= b. Computed by @total_ordering from (not a < b).'
op_result = type(self).__lt__(self, other)
if op_result is NotImplemented:
return op_result
return not op_result
def _ge_from_le(self, other):
'Return a >= b. Computed by @total_ordering from (not a <= b) or (a == b).'
op_result = type(self).__le__(self, other)
if op_result is NotImplemented:
return op_result
return not op_result or self == other
def _lt_from_le(self, other):
'Return a < b. Computed by @total_ordering from (a <= b) and (a != b).'
op_result = type(self).__le__(self, other)
if op_result is NotImplemented:
return op_result
return op_result and self != other
def _gt_from_le(self, other):
'Return a > b. Computed by @total_ordering from (not a <= b).'
op_result = type(self).__le__(self, other)
if op_result is NotImplemented:
return op_result
return not op_result
def _lt_from_gt(self, other):
'Return a < b. Computed by @total_ordering from (not a > b) and (a != b).'
op_result = type(self).__gt__(self, other)
if op_result is NotImplemented:
return op_result
return not op_result and self != other
def _ge_from_gt(self, other):
'Return a >= b. Computed by @total_ordering from (a > b) or (a == b).'
op_result = type(self).__gt__(self, other)
if op_result is NotImplemented:
return op_result
return op_result or self == other
def _le_from_gt(self, other):
'Return a <= b. Computed by @total_ordering from (not a > b).'
op_result = type(self).__gt__(self, other)
if op_result is NotImplemented:
return op_result
return not op_result
def _le_from_ge(self, other):
'Return a <= b. Computed by @total_ordering from (not a >= b) or (a == b).'
op_result = type(self).__ge__(self, other)
if op_result is NotImplemented:
return op_result
return not op_result or self == other
def _gt_from_ge(self, other):
'Return a > b. Computed by @total_ordering from (a >= b) and (a != b).'
op_result = type(self).__ge__(self, other)
if op_result is NotImplemented:
return op_result
return op_result and self != other
def _lt_from_ge(self, other):
'Return a < b. Computed by @total_ordering from (not a >= b).'
op_result = type(self).__ge__(self, other)
if op_result is NotImplemented:
return op_result
return not op_result
# Maps the name of a comparison method a class defines itself (the "root") to
# the (method name, implementation) pairs total_ordering() can derive from it.
_convert = {
    '__lt__': [('__gt__', _gt_from_lt),
               ('__le__', _le_from_lt),
               ('__ge__', _ge_from_lt)],
    '__le__': [('__ge__', _ge_from_le),
               ('__lt__', _lt_from_le),
               ('__gt__', _gt_from_le)],
    '__gt__': [('__lt__', _lt_from_gt),
               ('__ge__', _ge_from_gt),
               ('__le__', _le_from_gt)],
    '__ge__': [('__le__', _le_from_ge),
               ('__gt__', _gt_from_ge),
               ('__lt__', _lt_from_ge)]
}
def total_ordering(cls):
    """Class decorator that fills in missing ordering methods.

    *cls* must define at least one of ``__lt__``, ``__le__``, ``__gt__`` or
    ``__ge__`` differently from ``object``; otherwise ValueError is raised.
    The remaining comparison methods are installed on *cls* from the
    ``_convert`` table and *cls* itself is returned.
    """
    defined = set()
    for op in _convert:
        if getattr(cls, op, None) is not getattr(object, op, None):
            defined.add(op)
    if not defined:
        raise ValueError('must define at least one ordering operation: < > <= >=')
    # max() over the method names prefers __lt__, then __le__, __gt__, __ge__.
    root = max(defined)
    for opname, opfunc in _convert[root]:
        if opname in defined:
            # Never replace a method the class supplies itself.
            continue
        opfunc.__name__ = opname
        setattr(cls, opname, opfunc)
    return cls
def cmp_to_key(mycmp):
    """Convert the two-argument comparison function *mycmp* into a key class.

    Instances of the returned class wrap one object in ``obj`` and compare
    by calling ``mycmp`` on the wrapped objects and testing the result
    against zero.  Instances are unhashable.
    """
    def compare(lhs, rhs):
        # Single place where the user comparison is invoked.
        return mycmp(lhs.obj, rhs.obj)

    class K(object):
        __slots__ = ['obj']

        def __init__(self, obj):
            self.obj = obj

        def __lt__(self, other):
            return compare(self, other) < 0

        def __gt__(self, other):
            return compare(self, other) > 0

        def __eq__(self, other):
            return compare(self, other) == 0

        def __le__(self, other):
            return compare(self, other) <= 0

        def __ge__(self, other):
            return compare(self, other) >= 0

        __hash__ = None

    return K
# Rebind cmp_to_key to the version in the _functools module when that module
# can be imported; otherwise the definition above stays in effect.
try:
    from _functools import cmp_to_key
except ImportError:
    pass
# Unique marker meaning "no initial value was supplied"; None is a legal value.
_initial_missing = object()

def reduce(function, sequence, initial=_initial_missing):
    """Fold *sequence* into a single value with the two-argument *function*.

    The running value starts as *initial* when given, otherwise as the first
    item of *sequence*; each further item is combined into it with
    ``function(value, item)``.  Raises TypeError when *sequence* is empty
    and no *initial* was supplied.
    """
    items = iter(sequence)
    accumulator = initial
    if accumulator is _initial_missing:
        try:
            accumulator = next(items)
        except StopIteration:
            raise TypeError(
                "reduce() of empty iterable with no initial value") from None
    for item in items:
        accumulator = function(accumulator, item)
    return accumulator
class _PlaceholderType:
    """Type of the ``Placeholder`` singleton.

    Only one instance is ever created, the class cannot be subclassed, and
    the instance both prints and reduces as ``'Placeholder'``.
    """

    __instance = None
    __slots__ = ()

    def __init_subclass__(cls, *args, **kwargs):
        raise TypeError(f"type '{cls.__name__}' is not an acceptable base type")

    def __new__(cls):
        existing = cls.__instance
        if existing is not None:
            return existing
        # First call: create the instance and remember it on the class.
        cls.__instance = object.__new__(cls)
        return cls.__instance

    def __repr__(self):
        return 'Placeholder'

    def __reduce__(self):
        return 'Placeholder'

Placeholder = _PlaceholderType()
def _partial_prepare_merger(args):
if not args:
return 0, None
nargs = len(args)
order = []
j = nargs
for i, a in enumerate(args):
if a is Placeholder:
order.append(j)
j += 1
else:
order.append(i)
phcount = j - nargs
merger = itemgetter(*order) if phcount else None
return phcount, merger
def _partial_new(cls, func, /, *args, **keywords):
    """Shared ``__new__`` for ``partial`` and ``partialmethod``.

    Validates *func*, rejects a trailing Placeholder and Placeholder keyword
    values (TypeError in each case), flattens *func* when it is itself an
    instance of the same base class, and returns a new instance of *cls*
    with ``func``, ``args``, ``keywords``, ``_phcount`` and ``_merger`` set.
    """
    if issubclass(cls, partial):
        base_cls = partial
        if not callable(func):
            raise TypeError("the first argument must be callable")
    else:
        base_cls = partialmethod
        # A non-callable object is still accepted here when it has __get__.
        if not callable(func) and not hasattr(func, "__get__"):
            raise TypeError(f"the first argument {func!r} must be a callable "
                            "or a descriptor")
    if args and args[-1] is Placeholder:
        raise TypeError("trailing Placeholders are not allowed")
    for value in keywords.values():
        if value is Placeholder:
            raise TypeError("Placeholder cannot be passed as a keyword argument")
    if isinstance(func, base_cls):
        # Wrapping an existing partial/partialmethod: merge into one level.
        pto_phcount = func._phcount
        tot_args = func.args
        if args:
            tot_args += args
            if pto_phcount:
                # The new args fill the Placeholders stored on ``func``.
                nargs = len(args)
                if nargs < pto_phcount:
                    # Pad so that unfilled slots remain Placeholders.
                    tot_args += (Placeholder,) * (pto_phcount - nargs)
                tot_args = func._merger(tot_args)
                if nargs > pto_phcount:
                    # Args beyond the Placeholder count are appended as-is.
                    tot_args += args[pto_phcount:]
            phcount, merger = _partial_prepare_merger(tot_args)
        else:
            # No new positional args: reuse the existing Placeholder data.
            phcount, merger = pto_phcount, func._merger
        # Keywords given now override those stored on ``func``.
        keywords = {**func.keywords, **keywords}
        func = func.func
    else:
        tot_args = args
        phcount, merger = _partial_prepare_merger(tot_args)
    self = object.__new__(cls)
    self.func = func
    self.args = tot_args
    self.keywords = keywords
    self._phcount = phcount
    self._merger = merger
    return self
def _partial_repr(self):
cls = type(self)
module = cls.__module__
qualname = cls.__qualname__
args = [repr(self.func)]
args.extend(map(repr, self.args))
args.extend(f"{k}={v!r}" for k, v in self.keywords.items())
return f"{module}.{qualname}({', '.join(args)})"
class partial:
    """Callable holding a function plus pre-supplied arguments.

    Calling the instance invokes ``func`` with the stored positional
    arguments (Placeholders filled from the call's leading arguments),
    then the remaining call arguments, and the stored keywords overridden
    by the call's keywords.
    """

    __slots__ = ("func", "args", "keywords", "_phcount", "_merger",
                 "__dict__", "__weakref__")

    __new__ = _partial_new
    __repr__ = recursive_repr()(_partial_repr)

    def __call__(self, /, *args, **keywords):
        expected = self._phcount
        if not expected:
            leading = self.args
        else:
            try:
                # The merger indexes past the stored args into the call args;
                # too few call args surface as IndexError.
                leading = self._merger(self.args + args)
            except IndexError:
                raise TypeError("missing positional arguments "
                                "in 'partial' call; expected "
                                f"at least {expected}, got {len(args)}")
            args = args[expected:]
        merged = {**self.keywords, **keywords}
        return self.func(*leading, *args, **merged)

    def __get__(self, obj, objtype=None):
        # Class access yields the partial itself; instance access binds it.
        return self if obj is None else MethodType(self, obj)

    def __reduce__(self):
        constructor = type(self)
        constructor_args = (self.func,)
        state = (self.func, self.args,
                 self.keywords or None, self.__dict__ or None)
        return constructor, constructor_args, state

    def __setstate__(self, state):
        if not isinstance(state, tuple):
            raise TypeError("argument to __setstate__ must be a tuple")
        if len(state) != 4:
            raise TypeError(f"expected 4 items in state, got {len(state)}")
        func, args, kwds, namespace = state
        well_formed = (
            callable(func)
            and isinstance(args, tuple)
            and (kwds is None or isinstance(kwds, dict))
            and (namespace is None or isinstance(namespace, dict))
        )
        if not well_formed:
            raise TypeError("invalid partial state")

        if args and args[-1] is Placeholder:
            raise TypeError("trailing Placeholders are not allowed")
        phcount, merger = _partial_prepare_merger(args)

        # Normalise tuple/dict subclasses to the exact built-in types.
        args = tuple(args)
        if kwds is None:
            kwds = {}
        elif type(kwds) is not dict:
            kwds = dict(kwds)
        namespace = {} if namespace is None else namespace

        self.__dict__ = namespace
        self.func = func
        self.args = args
        self.keywords = kwds
        self._phcount = phcount
        self._merger = merger

    __class_getitem__ = classmethod(GenericAlias)
# Rebind partial, Placeholder and _PlaceholderType to the versions in the
# _functools module when it can be imported; otherwise the definitions above
# stay in effect.
try:
    from _functools import partial, Placeholder, _PlaceholderType
except ImportError:
    pass
class partialmethod:
    """Descriptor form of partial() for use inside class bodies.

    On attribute access it either wraps whatever ``func.__get__`` produced
    in a ``partial`` carrying the stored arguments, or falls back to a plain
    function that passes the instance (or class) as first argument.
    """

    __new__ = _partial_new
    __repr__ = _partial_repr

    def _make_unbound_method(self):
        def _method(cls_or_self, /, *args, **keywords):
            expected = self._phcount
            if not expected:
                leading = self.args
            else:
                try:
                    # Too few call args make the merger raise IndexError.
                    leading = self._merger(self.args + args)
                except IndexError:
                    raise TypeError("missing positional arguments "
                                    "in 'partialmethod' call; expected "
                                    f"at least {expected}, got {len(args)}")
                args = args[expected:]
            merged = {**self.keywords, **keywords}
            return self.func(cls_or_self, *leading, *args, **merged)

        _method.__isabstractmethod__ = self.__isabstractmethod__
        # Back-reference from the generated function to this descriptor.
        _method.__partialmethod__ = self
        return _method

    def __get__(self, obj, cls=None):
        descriptor_get = getattr(self.func, "__get__", None)
        if descriptor_get is not None:
            bound = descriptor_get(obj, cls)
            if bound is not self.func:
                # The underlying descriptor produced a new object: wrap it.
                result = partial(bound, *self.args, **self.keywords)
                try:
                    result.__self__ = bound.__self__
                except AttributeError:
                    pass
                return result
        # The underlying object did not bind anything: behave like an
        # ordinary function defined in the class body.
        return self._make_unbound_method().__get__(obj, cls)

    @property
    def __isabstractmethod__(self):
        return getattr(self.func, "__isabstractmethod__", False)

    __class_getitem__ = classmethod(GenericAlias)
def _unwrap_partial(func):
while isinstance(func, partial):
func = func.func
return func
def _unwrap_partialmethod(func):
prev = None
while func is not prev:
prev = func
while isinstance(getattr(func, "__partialmethod__", None), partialmethod):
func = func.__partialmethod__
while isinstance(func, partialmethod):
func = getattr(func, 'func')
func = _unwrap_partial(func)
return func
# Record type passed to _lru_cache_wrapper() by lru_cache(); the wrapper's
# cache_info() returns an instance of it.
_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
def _make_key(args, kwds, typed,
kwd_mark = (object(),),
fasttypes = {int, str},
tuple=tuple, type=type, len=len):
key = args
if kwds:
key += kwd_mark
for item in kwds.items():
key += item
if typed:
key += tuple(type(v) for v in args)
if kwds:
key += tuple(type(v) for v in kwds.values())
elif len(key) == 1 and type(key[0]) in fasttypes:
return key[0]
return key
def lru_cache(maxsize=128, typed=False):
    """Caching decorator, usable as ``@lru_cache(...)`` or bare ``@lru_cache``.

    *maxsize* may be an int (negative values are treated as 0) or None.
    When the first argument is a callable and *typed* is a bool, that
    callable is wrapped directly with ``maxsize=128``.  Any other first
    argument raises TypeError.  Wrapped functions gain a
    ``cache_parameters()`` attribute reporting ``maxsize`` and ``typed``.
    """
    if isinstance(maxsize, int):
        maxsize = 0 if maxsize < 0 else maxsize
    elif maxsize is None:
        pass
    elif callable(maxsize) and isinstance(typed, bool):
        # Bare decorator use: the "maxsize" slot actually holds the function.
        user_function, maxsize = maxsize, 128
        cached = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
        cached.cache_parameters = lambda : {'maxsize': maxsize, 'typed': typed}
        return update_wrapper(cached, user_function)
    else:
        raise TypeError(
            'Expected first argument to be an integer, a callable, or None')

    def decorating_function(user_function):
        cached = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
        cached.cache_parameters = lambda : {'maxsize': maxsize, 'typed': typed}
        return update_wrapper(cached, user_function)

    return decorating_function
def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo):
    """Build the caching wrapper around *user_function* for lru_cache().

    One of three wrappers is created depending on *maxsize*: ``0`` (nothing
    is stored, only misses are counted), ``None`` (results are kept in a
    dict and never evicted) or any other value (a dict plus a linked list
    from which the link following ``root`` is recycled once the cache is
    full).  The returned wrapper carries ``cache_info()`` and
    ``cache_clear()``.
    """
    sentinel = object()          # unique default meaning "key not in cache"
    make_key = _make_key         # builds the cache key from args/kwds
    PREV, NEXT, KEY, RESULT = 0, 1, 2, 3   # indexes of the fields of a link
    cache = {}
    hits = misses = 0
    full = False
    cache_get = cache.get        # bound method, avoids repeated lookups
    cache_len = cache.__len__    # bound method giving the current size
    lock = RLock()               # held around the list and counter updates
    root = []                    # root link of the circular doubly linked list
    root[:] = [root, root, None, None]     # PREV and NEXT both point to root
    if maxsize == 0:
        def wrapper(*args, **kwds):
            # No caching: every call is a miss and goes to user_function.
            nonlocal misses
            misses += 1
            result = user_function(*args, **kwds)
            return result
    elif maxsize is None:
        def wrapper(*args, **kwds):
            # Unbounded cache: plain dict lookup, nothing is ever evicted.
            nonlocal hits, misses
            key = make_key(args, kwds, typed)
            result = cache_get(key, sentinel)
            if result is not sentinel:
                hits += 1
                return result
            misses += 1
            result = user_function(*args, **kwds)
            cache[key] = result
            return result
    else:
        def wrapper(*args, **kwds):
            # Bounded cache: the dict maps keys to links of the list.
            nonlocal root, hits, misses, full
            key = make_key(args, kwds, typed)
            with lock:
                link = cache_get(key)
                if link is not None:
                    # Hit: unlink the entry and re-insert it just before
                    # root, the position where new entries are added.
                    link_prev, link_next, _key, result = link
                    link_prev[NEXT] = link_next
                    link_next[PREV] = link_prev
                    last = root[PREV]
                    last[NEXT] = root[PREV] = link
                    link[PREV] = last
                    link[NEXT] = root
                    hits += 1
                    return result
                misses += 1
            # The lock is not held while user_function runs.
            result = user_function(*args, **kwds)
            with lock:
                if key in cache:
                    # The key was stored while the lock was released above;
                    # the existing entry is left untouched.
                    pass
                elif full:
                    # Store the new key/result in the current root link ...
                    oldroot = root
                    oldroot[KEY] = key
                    oldroot[RESULT] = result
                    # ... and make the link after it the new, emptied root.
                    root = oldroot[NEXT]
                    oldkey = root[KEY]
                    # NOTE(review): oldresult is never read afterwards;
                    # presumably it keeps the evicted result alive until the
                    # wrapper returns -- confirm before removing.
                    oldresult = root[RESULT]
                    root[KEY] = root[RESULT] = None
                    # Update the dict only after the links are consistent.
                    del cache[oldkey]
                    cache[key] = oldroot
                else:
                    # Room left: append a fresh link just before root.
                    last = root[PREV]
                    link = [last, root, key, result]
                    last[NEXT] = root[PREV] = cache[key] = link
                    full = (cache_len() >= maxsize)
            return result
    def cache_info():
        # Snapshot of the counters and the current size, taken under the lock.
        with lock:
            return _CacheInfo(hits, misses, maxsize, cache_len())
    def cache_clear():
        # Empty the dict, reset the list to the bare root and zero the counters.
        nonlocal hits, misses, full
        with lock:
            cache.clear()
            root[:] = [root, root, None, None]
            hits = misses = 0
            full = False
    wrapper.cache_info = cache_info
    wrapper.cache_clear = cache_clear
    return wrapper
# Rebind _lru_cache_wrapper to the version in the _functools module when it
# can be imported; otherwise the definition above stays in effect.
try:
    from _functools import _lru_cache_wrapper
except ImportError:
    pass
def cache(user_function, /):
    """Simple lightweight unbounded cache.  Sometimes called "memoize".

    Equivalent to applying ``lru_cache(maxsize=None)`` to *user_function*.
    """
    unbounded = lru_cache(maxsize=None)
    return unbounded(user_function)
def _c3_merge(sequences):
result = []
while True:
sequences = [s for s in sequences if s] if not sequences:
return result
for s1 in sequences: candidate = s1[0]
for s2 in sequences:
if candidate in s2[1:]:
candidate = None
break else:
break
if candidate is None:
raise RuntimeError("Inconsistent hierarchy")
result.append(candidate)
for seq in sequences:
if seq[0] == candidate:
del seq[0]
def _c3_mro(cls, abcs=None):
for i, base in enumerate(reversed(cls.__bases__)):
if hasattr(base, '__abstractmethods__'):
boundary = len(cls.__bases__) - i
break else:
boundary = 0
abcs = list(abcs) if abcs else []
explicit_bases = list(cls.__bases__[:boundary])
abstract_bases = []
other_bases = list(cls.__bases__[boundary:])
for base in abcs:
if issubclass(cls, base) and not any(
issubclass(b, base) for b in cls.__bases__
):
abstract_bases.append(base)
for base in abstract_bases:
abcs.remove(base)
explicit_c3_mros = [_c3_mro(base, abcs=abcs) for base in explicit_bases]
abstract_c3_mros = [_c3_mro(base, abcs=abcs) for base in abstract_bases]
other_c3_mros = [_c3_mro(base, abcs=abcs) for base in other_bases]
return _c3_merge(
[[cls]] +
explicit_c3_mros + abstract_c3_mros + other_c3_mros +
[explicit_bases] + [abstract_bases] + [other_bases]
)
def _compose_mro(cls, types):
bases = set(cls.__mro__)
def is_related(typ):
return (typ not in bases and hasattr(typ, '__mro__')
and not isinstance(typ, GenericAlias)
and issubclass(cls, typ))
types = [n for n in types if is_related(n)]
def is_strict_base(typ):
for other in types:
if typ != other and typ in other.__mro__:
return True
return False
types = [n for n in types if not is_strict_base(n)]
type_set = set(types)
mro = []
for typ in types:
found = []
for sub in typ.__subclasses__():
if sub not in bases and issubclass(cls, sub):
found.append([s for s in sub.__mro__ if s in type_set])
if not found:
mro.append(typ)
continue
found.sort(key=len, reverse=True)
for sub in found:
for subcls in sub:
if subcls not in mro:
mro.append(subcls)
return _c3_mro(cls, abcs=mro)
def _find_impl(cls, registry):
mro = _compose_mro(cls, registry.keys())
match = None
for t in mro:
if match is not None:
if (t in registry and t not in cls.__mro__
and match not in cls.__mro__
and not issubclass(match, t)):
raise RuntimeError("Ambiguous dispatch: {} or {}".format(
match, t))
break
if t in registry:
match = t
return registry.get(match)
def singledispatch(func):
    """Turn *func* into a generic function dispatching on its first argument.

    The returned wrapper calls the implementation registered for the class
    of its first positional argument; *func* itself is registered for
    ``object``.  Further implementations are added with
    ``wrapper.register``; ``wrapper.dispatch(cls)`` returns the
    implementation chosen for *cls* and ``wrapper.registry`` is a read-only
    view of the registrations.
    """
    # weakref is imported only when a generic function is created.
    import weakref
    registry = {}
    dispatch_cache = weakref.WeakKeyDictionary()
    cache_token = None
    def dispatch(cls):
        # Return the implementation for *cls*, consulting dispatch_cache
        # first, then an exact registry entry, then _find_impl().
        nonlocal cache_token
        if cache_token is not None:
            # A token was recorded by register(); when get_cache_token()
            # now returns a different value the cached lookups are dropped.
            current_token = get_cache_token()
            if cache_token != current_token:
                dispatch_cache.clear()
                cache_token = current_token
        try:
            impl = dispatch_cache[cls]
        except KeyError:
            try:
                impl = registry[cls]
            except KeyError:
                impl = _find_impl(cls, registry)
            dispatch_cache[cls] = impl
        return impl
    def _is_valid_dispatch_type(cls):
        # True for a class, or for a union whose arguments are all classes.
        if isinstance(cls, type):
            return True
        return (isinstance(cls, UnionType) and
                all(isinstance(arg, type) for arg in cls.__args__))
    def register(cls, func=None):
        # Register *func* for *cls*.  With only a valid class given, return
        # a decorator.  With only a function given, the dispatch type is
        # taken from the function's first type hint.
        nonlocal cache_token
        if _is_valid_dispatch_type(cls):
            if func is None:
                return lambda f: register(cls, f)
        else:
            if func is not None:
                raise TypeError(
                    f"Invalid first argument to `register()`. "
                    f"{cls!r} is not a class or union type."
                )
            ann = getattr(cls, '__annotate__', None)
            if ann is None:
                raise TypeError(
                    f"Invalid first argument to `register()`: {cls!r}. "
                    f"Use either `@register(some_class)` or plain `@register` "
                    f"on an annotated function."
                )
            # The single argument is the function being registered.
            func = cls
            # Imported here so they are loaded only on this code path.
            from typing import get_type_hints
            from annotationlib import Format, ForwardRef
            # Only the first entry of the type hints is used.
            argname, cls = next(iter(get_type_hints(func, format=Format.FORWARDREF).items()))
            if not _is_valid_dispatch_type(cls):
                if isinstance(cls, UnionType):
                    raise TypeError(
                        f"Invalid annotation for {argname!r}. "
                        f"{cls!r} not all arguments are classes."
                    )
                elif isinstance(cls, ForwardRef):
                    raise TypeError(
                        f"Invalid annotation for {argname!r}. "
                        f"{cls!r} is an unresolved forward reference."
                    )
                else:
                    raise TypeError(
                        f"Invalid annotation for {argname!r}. "
                        f"{cls!r} is not a class."
                    )
        if isinstance(cls, UnionType):
            # A union registers the function for each of its members.
            for arg in cls.__args__:
                registry[arg] = func
        else:
            registry[cls] = func
        if cache_token is None and hasattr(cls, '__abstractmethods__'):
            # First registration of a class with __abstractmethods__:
            # start tracking the token that dispatch() checks.
            cache_token = get_cache_token()
        dispatch_cache.clear()
        return func
    def wrapper(*args, **kw):
        # Dispatch on the class of the first positional argument.
        if not args:
            raise TypeError(f'{funcname} requires at least '
                            '1 positional argument')
        return dispatch(args[0].__class__)(*args, **kw)
    # Name used in wrapper()'s error message; read when wrapper is called.
    funcname = getattr(func, '__name__', 'singledispatch function')
    registry[object] = func
    wrapper.register = register
    wrapper.dispatch = dispatch
    wrapper.registry = MappingProxyType(registry)
    wrapper._clear_cache = dispatch_cache.clear
    update_wrapper(wrapper, func)
    return wrapper
class singledispatchmethod:
    """Descriptor wrapping a method in a single-dispatch generic function.

    *func* must be callable or have ``__get__``.  A singledispatch()
    dispatcher is built from it, and attribute access returns a
    ``_singledispatchmethod_get`` object for the accessed instance/class.
    """

    def __init__(self, func):
        if not (callable(func) or hasattr(func, "__get__")):
            raise TypeError(f"{func!r} is not callable or a descriptor")

        self.dispatcher = singledispatch(func)
        self.func = func

    def register(self, cls, method=None):
        """Register *method* for *cls* on the underlying dispatcher."""
        return self.dispatcher.register(cls, func=method)

    def __get__(self, obj, cls=None):
        return _singledispatchmethod_get(self, obj, cls)

    @property
    def __isabstractmethod__(self):
        return getattr(self.func, '__isabstractmethod__', False)

    def __repr__(self):
        # Prefer the qualified name, then the plain name, then '?'.
        name = '?'
        for attribute in ('__qualname__', '__name__'):
            try:
                name = getattr(self.func, attribute)
            except AttributeError:
                continue
            break
        return f'<single dispatch method descriptor {name}>'
class _singledispatchmethod_get:
def __init__(self, unbound, obj, cls):
self._unbound = unbound
self._dispatch = unbound.dispatcher.dispatch
self._obj = obj
self._cls = cls
func = unbound.func
try:
self.__module__ = func.__module__
except AttributeError:
pass
try:
self.__doc__ = func.__doc__
except AttributeError:
pass
def __repr__(self):
try:
name = self.__qualname__
except AttributeError:
try:
name = self.__name__
except AttributeError:
name = '?'
if self._obj is not None:
return f'<bound single dispatch method {name} of {self._obj!r}>'
else:
return f'<single dispatch method {name}>'
def __call__(self, /, *args, **kwargs):
if not args:
funcname = getattr(self._unbound.func, '__name__',
'singledispatchmethod method')
raise TypeError(f'{funcname} requires at least '
'1 positional argument')
return self._dispatch(args[0].__class__).__get__(self._obj, self._cls)(*args, **kwargs)
def __getattr__(self, name):
if name not in {'__name__', '__qualname__', '__isabstractmethod__',
'__annotations__', '__type_params__'}:
raise AttributeError
return getattr(self._unbound.func, name)
@property
def __wrapped__(self):
return self._unbound.func
@property
def register(self):
return self._unbound.register
# Unique marker for "no cached value yet"; None is a legitimate cached value.
_NOT_FOUND = object()

class cached_property:
    """Descriptor that computes a value once and stores it on the instance.

    The value returned by ``func(instance)`` is saved in the instance's
    ``__dict__`` under the attribute name the descriptor was assigned to,
    and that stored value is returned by later ``__get__`` calls.
    """

    def __init__(self, func):
        self.func = func
        self.attrname = None
        self.__doc__ = func.__doc__
        self.__module__ = func.__module__

    def __set_name__(self, owner, name):
        if self.attrname is None:
            self.attrname = name
            return
        if name != self.attrname:
            raise TypeError(
                "Cannot assign the same cached_property to two different names "
                f"({self.attrname!r} and {name!r})."
            )

    def __get__(self, instance, owner=None):
        if instance is None:
            # Accessed on the class: expose the descriptor itself.
            return self
        if self.attrname is None:
            raise TypeError(
                "Cannot use cached_property instance without calling __set_name__ on it.")
        try:
            store = instance.__dict__
        except AttributeError:
            msg = (
                f"No '__dict__' attribute on {type(instance).__name__!r} "
                f"instance to cache {self.attrname!r} property."
            )
            raise TypeError(msg) from None
        cached = store.get(self.attrname, _NOT_FOUND)
        if cached is not _NOT_FOUND:
            return cached
        computed = self.func(instance)
        try:
            store[self.attrname] = computed
        except TypeError:
            msg = (
                f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                f"does not support item assignment for caching {self.attrname!r} property."
            )
            raise TypeError(msg) from None
        return computed

    __class_getitem__ = classmethod(GenericAlias)
def _warn_python_reduce_kwargs(py_reduce):
    """Wrap *py_reduce* so keyword use of 'function'/'sequence' warns.

    The wrapper emits a DeprecationWarning when either name is passed as a
    keyword argument and then forwards all arguments unchanged.
    """
    @wraps(py_reduce)
    def wrapper(*args, **kwargs):
        if kwargs.keys() & {'function', 'sequence'}:
            # Imported lazily: only needed on the deprecated path.
            import os
            import warnings
            message = (
                'Calling functools.reduce with keyword arguments '
                '"function" or "sequence" '
                'is deprecated in Python 3.14 and will be '
                'forbidden in Python 3.16.'
            )
            warnings.warn(
                message,
                DeprecationWarning,
                skip_file_prefixes=(os.path.dirname(__file__),))
        return py_reduce(*args, **kwargs)
    return wrapper

# Replace the module-level reduce with the warning wrapper, then drop the
# helper, which is not needed afterwards.
reduce = _warn_python_reduce_kwargs(reduce)
del _warn_python_reduce_kwargs
# Rebind reduce to the version in the _functools module when it can be
# imported; otherwise the wrapped definition above stays in effect.
try:
    from _functools import reduce
except ImportError:
    pass