from __future__ import (absolute_import, division, print_function,
unicode_literals)
import copy
from functools import partial
from itertools import chain, count
import os
from ._compat import InstallRequirement
from . import click
from .cache import DependencyCache
from .exceptions import UnsupportedConstraint
from .logging import log
from .utils import (format_requirement, format_specifier, full_groupby,
is_pinned_requirement, key_from_ireq, key_from_req, UNSAFE_PACKAGES)
green = partial(click.style, fg='green')
magenta = partial(click.style, fg='magenta')
class RequirementSummary(object):
def __init__(self, ireq):
self.req = ireq.req
self.key = key_from_req(ireq.req)
self.extras = str(sorted(ireq.extras))
self.specifier = str(ireq.specifier)
def __eq__(self, other):
return str(self) == str(other)
def __hash__(self):
return hash(str(self))
def __str__(self):
return repr([self.key, self.specifier, self.extras])
class Resolver(object):
def __init__(self, constraints, repository, cache=None, prereleases=False, clear_caches=False, allow_unsafe=False):
self.our_constraints = set(constraints)
self.their_constraints = set()
self.repository = repository
if cache is None:
cache = DependencyCache() self.dependency_cache = cache
self.prereleases = prereleases
self.clear_caches = clear_caches
self.allow_unsafe = allow_unsafe
self.unsafe_constraints = set()
@property
def constraints(self):
return set(self._group_constraints(chain(self.our_constraints,
self.their_constraints)))
def resolve_hashes(self, ireqs):
with self.repository.allow_all_wheels():
return {ireq: self.repository.get_hashes(ireq) for ireq in ireqs}
def resolve(self, max_rounds=10):
if self.clear_caches:
self.dependency_cache.clear()
self.repository.clear_caches()
self.check_constraints(chain(self.our_constraints,
self.their_constraints))
os.environ[str('PIP_EXISTS_ACTION')] = str('i') for current_round in count(start=1):
if current_round > max_rounds:
raise RuntimeError('No stable configuration of concrete packages '
'could be found for the given constraints after '
'%d rounds of resolving.\n'
'This is likely a bug.' % max_rounds)
log.debug('')
log.debug(magenta('{:^60}'.format('ROUND {}'.format(current_round))))
has_changed, best_matches = self._resolve_one_round()
log.debug('-' * 60)
log.debug('Result of round {}: {}'.format(current_round,
'not stable' if has_changed else 'stable, done'))
if not has_changed:
break
self.repository.freshen_build_caches()
del os.environ['PIP_EXISTS_ACTION']
return {req for req in best_matches if not req.constraint}
@staticmethod
def check_constraints(constraints):
for constraint in constraints:
if constraint.link is not None and not constraint.editable:
msg = ('pip-compile does not support URLs as packages, unless they are editable. '
'Perhaps add -e option?')
raise UnsupportedConstraint(msg, constraint)
def _group_constraints(self, constraints):
for _, ireqs in full_groupby(constraints, key=key_from_ireq):
ireqs = list(ireqs)
editable_ireq = next((ireq for ireq in ireqs if ireq.editable), None)
if editable_ireq:
yield editable_ireq continue
ireqs = iter(ireqs)
combined_ireq = copy.deepcopy(next(ireqs))
combined_ireq.comes_from = None
for ireq in ireqs:
combined_ireq.req.specifier &= ireq.req.specifier
combined_ireq.constraint &= ireq.constraint
combined_ireq.extras = tuple(sorted(set(tuple(combined_ireq.extras) + tuple(ireq.extras))))
yield combined_ireq
def _resolve_one_round(self):
constraints = sorted(self.constraints, key=key_from_ireq)
unsafe_constraints = []
original_constraints = copy.copy(constraints)
if not self.allow_unsafe:
for constraint in original_constraints:
if constraint.name in UNSAFE_PACKAGES:
constraints.remove(constraint)
constraint.req.specifier = None
unsafe_constraints.append(constraint)
log.debug('Current constraints:')
for constraint in constraints:
log.debug(' {}'.format(constraint))
log.debug('')
log.debug('Finding the best candidates:')
best_matches = {self.get_best_match(ireq) for ireq in constraints}
log.debug('')
log.debug('Finding secondary dependencies:')
safe_constraints = []
for best_match in best_matches:
for dep in self._iter_dependencies(best_match):
if self.allow_unsafe or dep.name not in UNSAFE_PACKAGES:
safe_constraints.append(dep)
theirs = set(self._group_constraints(safe_constraints))
diff = {RequirementSummary(t) for t in theirs} - {RequirementSummary(t) for t in self.their_constraints}
removed = ({RequirementSummary(t) for t in self.their_constraints} -
{RequirementSummary(t) for t in theirs})
unsafe = ({RequirementSummary(t) for t in unsafe_constraints} -
{RequirementSummary(t) for t in self.unsafe_constraints})
has_changed = len(diff) > 0 or len(removed) > 0 or len(unsafe) > 0
if has_changed:
log.debug('')
log.debug('New dependencies found in this round:')
for new_dependency in sorted(diff, key=lambda req: key_from_req(req.req)):
log.debug(' adding {}'.format(new_dependency))
log.debug('Removed dependencies in this round:')
for removed_dependency in sorted(removed, key=lambda req: key_from_req(req.req)):
log.debug(' removing {}'.format(removed_dependency))
log.debug('Unsafe dependencies in this round:')
for unsafe_dependency in sorted(unsafe, key=lambda req: key_from_req(req.req)):
log.debug(' remembering unsafe {}'.format(unsafe_dependency))
self.their_constraints = theirs
self.unsafe_constraints = unsafe_constraints
return has_changed, best_matches
def get_best_match(self, ireq):
if ireq.editable:
best_match = ireq
elif is_pinned_requirement(ireq):
best_match = ireq
else:
best_match = self.repository.find_best_match(ireq, prereleases=self.prereleases)
log.debug(' found candidate {} (constraint was {})'.format(format_requirement(best_match),
format_specifier(ireq)))
return best_match
def _iter_dependencies(self, ireq):
if ireq.editable:
for dependency in self.repository.get_dependencies(ireq):
yield dependency
return
elif not is_pinned_requirement(ireq):
raise TypeError('Expected pinned or editable requirement, got {}'.format(ireq))
if ireq not in self.dependency_cache:
log.debug(' {} not in cache, need to check index'.format(format_requirement(ireq)), fg='yellow')
dependencies = self.repository.get_dependencies(ireq)
self.dependency_cache[ireq] = sorted(str(ireq.req) for ireq in dependencies)
dependency_strings = self.dependency_cache[ireq]
log.debug(' {:25} requires {}'.format(format_requirement(ireq),
', '.join(sorted(dependency_strings, key=lambda s: s.lower())) or '-'))
for dependency_string in dependency_strings:
yield InstallRequirement.from_line(dependency_string, constraint=ireq.constraint)
def reverse_dependencies(self, ireqs):
non_editable = [ireq for ireq in ireqs if not ireq.editable]
return self.dependency_cache.reverse_dependencies(non_editable)