from __future__ import (absolute_import, division, print_function,
unicode_literals)
import os
import sys
from itertools import chain, groupby
from collections import OrderedDict
from contextlib import contextmanager
from ._compat import InstallRequirement
from .click import style
UNSAFE_PACKAGES = {'setuptools', 'distribute', 'pip'}
def key_from_ireq(ireq):
if ireq.req is None and ireq.link is not None:
return str(ireq.link)
else:
return key_from_req(ireq.req)
def key_from_req(req):
if hasattr(req, 'key'):
key = req.key
else:
key = req.name
key = key.replace('_', '-').lower()
return key
def comment(text):
return style(text, fg='green')
def make_install_requirement(name, version, extras, constraint=False):
extras_string = ""
if extras:
extras_string = "[{}]".format(",".join(sorted(extras)))
return InstallRequirement.from_line(
str('{}{}=={}'.format(name, extras_string, version)),
constraint=constraint)
def format_requirement(ireq, marker=None):
if ireq.editable:
line = '-e {}'.format(ireq.link)
else:
line = str(ireq.req).lower()
if marker:
line = '{} ; {}'.format(line, marker)
return line
def format_specifier(ireq):
specs = ireq.specifier._specs if ireq.req is not None else []
specs = sorted(specs, key=lambda x: x._spec[1])
return ','.join(str(s) for s in specs) or '<any>'
def is_pinned_requirement(ireq):
if ireq.editable:
return False
if len(ireq.specifier._specs) != 1:
return False
op, version = next(iter(ireq.specifier._specs))._spec
return (op == '==' or op == '===') and not version.endswith('.*')
def as_tuple(ireq):
if not is_pinned_requirement(ireq):
raise TypeError('Expected a pinned InstallRequirement, got {}'.format(ireq))
name = key_from_req(ireq.req)
version = next(iter(ireq.specifier._specs))._spec[1]
extras = tuple(sorted(ireq.extras))
return name, version, extras
def full_groupby(iterable, key=None):
return groupby(sorted(iterable, key=key), key=key)
def flat_map(fn, collection):
return chain.from_iterable(map(fn, collection))
def lookup_table(values, key=None, keyval=None, unique=False, use_lists=False):
if keyval is None:
if key is None:
keyval = (lambda v: v)
else:
keyval = (lambda v: (key(v), v))
if unique:
return dict(keyval(v) for v in values)
lut = {}
for value in values:
k, v = keyval(value)
try:
s = lut[k]
except KeyError:
if use_lists:
s = lut[k] = list()
else:
s = lut[k] = set()
if use_lists:
s.append(v)
else:
s.add(v)
return dict(lut)
def dedup(iterable):
return iter(OrderedDict.fromkeys(iterable))
def name_from_req(req):
if hasattr(req, 'project_name'):
return req.project_name
else:
return req.name
def fs_str(string):
if isinstance(string, str):
return string
assert not isinstance(string, bytes)
return string.encode(_fs_encoding)
_fs_encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
@contextmanager
def temp_environ():
environ = dict(os.environ)
try:
yield
finally:
os.environ.clear()
os.environ.update(environ)