import errno
import hashlib
import signal
import shutil
import subprocess
import tempfile
import os
import re
from typing import Optional, Tuple, Iterator
from debian import deb822
from debian.changelog import Changelog, ChangelogParseError, Version
from debian.copyright import Copyright, NotMachineReadableError
from debmutate.changelog import (
changes_by_author,
find_thanks,
find_extra_authors,
find_last_distribution,
strip_changelog_message,
)
from debmutate.versions import get_snapshot_revision
from ... import (
bugtracker,
errors,
osutils,
urlutils,
)
from ...transport import NoSuchFile
from ...export import export
from ...trace import (
mutter,
note,
warning,
)
from ...transport import (
do_catching_redirections,
get_transport,
)
from ...tree import Tree
from . import (
global_conf,
)
from .config import (
DebBuildConfig,
BUILD_TYPE_MERGE,
BUILD_TYPE_NATIVE,
BUILD_TYPE_NORMAL,
)
from .errors import (
BzrError,
)
BUILDDEB_DIR = '.bzr-builddeb'
NEW_LOCAL_CONF = 'debian/local.conf.local'
NEW_CONF = 'debian/bzr-builddeb.conf'
DEFAULT_CONF = os.path.join(BUILDDEB_DIR, 'default.conf')
LOCAL_CONF = os.path.join(BUILDDEB_DIR, 'local.conf')
class MissingChangelogError(BzrError):
_fmt = 'Could not find changelog at %(location)s in tree.'
def __init__(self, locations):
BzrError.__init__(self, location=locations)
_DEBIAN_RELEASES = None
_UBUNTU_RELEASES = None
def _get_release_names():
global _DEBIAN_RELEASES, _UBUNTU_RELEASES
try:
from distro_info import DebianDistroInfo, UbuntuDistroInfo
except ImportError:
warning(
"distro_info not available. Unable to retrieve current "
"list of releases.")
_DEBIAN_RELEASES = []
_UBUNTU_RELEASES = []
else:
_DEBIAN_RELEASES = DebianDistroInfo().all
_UBUNTU_RELEASES = UbuntuDistroInfo().all
_DEBIAN_RELEASES.extend(['stable', 'testing', 'unstable', 'frozen'])
def debian_releases():
if _DEBIAN_RELEASES is None:
_get_release_names()
return _DEBIAN_RELEASES
def ubuntu_releases():
if _UBUNTU_RELEASES is None:
_get_release_names()
return _UBUNTU_RELEASES
DEBIAN_POCKETS = ('', '-security', '-proposed-updates', '-backports')
UBUNTU_POCKETS = ('', '-proposed', '-updates', '-security', '-backports')
def recursive_copy(fromdir, todir):
mutter("Copying %s to %s", fromdir, todir)
for entry in os.listdir(fromdir):
path = os.path.join(fromdir, entry)
if os.path.isdir(path):
tosubdir = os.path.join(todir, entry)
if not os.path.exists(tosubdir):
os.mkdir(tosubdir)
recursive_copy(path, tosubdir)
else:
if os.path.islink(path):
os.symlink(os.readlink(path), os.path.join(todir, entry))
else:
shutil.copy(path, todir)
class AddChangelogError(BzrError):
_fmt = 'Please add "%(changelog)s" to the branch using bzr add.'
def __init__(self, changelog):
BzrError.__init__(self, changelog=changelog)
def find_changelog(t, subpath='', merge=False, max_blocks=1, strict=False):
top_level = False
with t.lock_read():
changelog_file = osutils.pathjoin(subpath, 'debian/changelog')
if not t.has_filename(changelog_file):
checked_files = [changelog_file]
if merge:
changelog_file = osutils.pathjoin(subpath, 'changelog')
top_level = True
if not t.has_filename(changelog_file):
checked_files.append(changelog_file)
changelog_file = None
else:
changelog_file = None
if changelog_file is None:
if getattr(t, "abspath", None):
checked_files = [t.abspath(f) for f in checked_files]
raise MissingChangelogError(" or ".join(checked_files))
elif merge and t.has_filename(osutils.pathjoin(subpath, 'changelog')):
debian_file = osutils.pathjoin(subpath, 'debian')
if (t.is_versioned(debian_file)
and t.kind(debian_file) == 'symlink'
and t.get_symlink_target(debian_file) == '.'):
changelog_file = 'changelog'
top_level = True
mutter("Using '%s' to get package information", changelog_file)
if not t.is_versioned(changelog_file):
raise AddChangelogError(changelog_file)
contents = t.get_file_text(changelog_file)
changelog = Changelog()
changelog.parse_changelog(
contents, max_blocks=max_blocks, allow_empty_author=True,
strict=strict)
return changelog, top_level
def tarball_name(package, version, component=None, format=None):
if format is None:
format = 'gz'
name = "{}_{}.orig".format(package, str(version))
if component is not None:
name += "-" + component
return "{}.tar.{}".format(name, format)
def suite_to_distribution(suite):
all_debian = [r + t for r in debian_releases() for t in DEBIAN_POCKETS]
all_ubuntu = [r + t for r in ubuntu_releases() for t in UBUNTU_POCKETS]
if suite in all_debian:
return "debian"
if suite in all_ubuntu:
return "ubuntu"
if suite == 'kali' or suite.startswith('kali-'):
return "kali"
return None
def lookup_distribution(distribution_or_suite):
if distribution_or_suite.lower() in ("debian", "ubuntu"):
return distribution_or_suite.lower()
return suite_to_distribution(distribution_or_suite)
def md5sum_filename(filename):
m = hashlib.md5()
with open(filename, 'rb') as f:
for line in f:
m.update(line)
return m.hexdigest()
def move_file_if_different(source, target, md5sum):
if os.path.exists(target):
if os.path.samefile(source, target):
return
t_md5sum = md5sum_filename(target)
if t_md5sum == md5sum:
return
shutil.move(source, target)
def write_if_different(contents, target):
md5sum = hashlib.md5()
md5sum.update(contents)
fd, temp_path = tempfile.mkstemp("builddeb-rename-")
fobj = os.fdopen(fd, "wb")
try:
try:
fobj.write(contents)
finally:
fobj.close()
move_file_if_different(temp_path, target, md5sum.hexdigest())
finally:
if os.path.exists(temp_path):
os.unlink(temp_path)
def _download_part(name, base_transport, target_dir, md5sum):
part_base_dir, part_path = urlutils.split(name)
f_t = base_transport
if part_base_dir != '':
f_t = base_transport.clone(part_base_dir)
with f_t.get(part_path) as f_f:
target_path = os.path.join(target_dir, part_path)
fd, temp_path = tempfile.mkstemp(prefix="builddeb-")
fobj = os.fdopen(fd, "wb")
try:
try:
shutil.copyfileobj(f_f, fobj)
finally:
fobj.close()
move_file_if_different(temp_path, target_path, md5sum)
finally:
if os.path.exists(temp_path):
os.unlink(temp_path)
def open_file(url):
filename, transport = open_transport(url)
return open_file_via_transport(filename, transport)
def open_transport(path):
base_dir, path = urlutils.split(path)
transport = get_transport(base_dir)
return (path, transport)
def open_file_via_transport(filename, transport):
def open_file(transport):
return transport.get(filename)
def follow_redirection(transport, e, redirection_notice):
mutter(redirection_notice)
_filename, redirected_transport = open_transport(e.target)
return redirected_transport
result = do_catching_redirections(open_file, transport, follow_redirection)
return result
def _dget(cls, dsc_location, target_dir):
if not os.path.isdir(target_dir):
raise errors.NotADirectory(target_dir)
path, dsc_t = open_transport(dsc_location)
with open_file_via_transport(path, dsc_t) as f:
dsc_contents = f.read()
dsc = cls(dsc_contents)
for file_details in dsc['files']:
name = file_details['name']
_download_part(name, dsc_t, target_dir, file_details['md5sum'])
target_file = os.path.join(target_dir, path)
write_if_different(dsc_contents, target_file)
return target_file
def dget(dsc_location, target_dir):
return _dget(deb822.Dsc, dsc_location, target_dir)
def dget_changes(changes_location, target_dir):
return _dget(deb822.Changes, changes_location, target_dir)
def get_parent_dir(target):
parent = os.path.dirname(target)
if os.path.basename(target) == '':
parent = os.path.dirname(parent)
return parent
def find_bugs_fixed(changes, branch, _lplib=None):
if _lplib is None:
from . import launchpad as _lplib
bugs = []
for new_author, linenos, lines in changes_by_author(changes):
for match in re.finditer(
"closes:\\s*(?:bug)?\\#?\\s?\\d+"
"(?:,\\s*(?:bug)?\\#?\\s?\\d+)*", ''.join(lines),
re.IGNORECASE):
closes_list = match.group(0)
for match in re.finditer("\\d+", closes_list):
bug_url = bugtracker.get_bug_url("deb", branch, match.group(0))
bugs.append(bug_url + " fixed")
lp_bugs = _lplib.ubuntu_bugs_for_debian_bug(match.group(0))
if len(lp_bugs) == 1:
bug_url = bugtracker.get_bug_url("lp", branch, lp_bugs[0])
bugs.append(bug_url + " fixed")
for match in re.finditer(
"lp:\\s+\\#\\d+(?:,\\s*\\#\\d+)*",
''.join(lines), re.IGNORECASE):
closes_list = match.group(0)
for match in re.finditer("\\d+", closes_list):
bug_url = bugtracker.get_bug_url("lp", branch, match.group(0))
bugs.append(bug_url + " fixed")
deb_bugs = _lplib.debian_bugs_for_ubuntu_bug(match.group(0))
if len(deb_bugs) == 1:
bug_url = bugtracker.get_bug_url(
"deb", branch, deb_bugs[0])
bugs.append(bug_url + " fixed")
return bugs
def get_commit_info_from_changelog(changelog, branch, _lplib=None):
message = None
authors = []
thanks = []
bugs = []
if changelog._blocks:
block = changelog._blocks[0]
authors = [block.author]
authors += find_extra_authors(block.changes())
changes = strip_changelog_message(block.changes())
bugs = find_bugs_fixed(block.changes(), branch, _lplib=_lplib)
thanks = find_thanks(block.changes())
message = "\n".join(changes).replace("\r", "")
return (message, authors, thanks, bugs)
def subprocess_setup():
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
def debuild_config(tree, subpath):
config_files = []
user_config = None
if subpath in ('.', None):
subpath = ''
new_local_conf = osutils.pathjoin(subpath, NEW_LOCAL_CONF)
local_conf = osutils.pathjoin(subpath, LOCAL_CONF)
new_conf = osutils.pathjoin(subpath, NEW_CONF)
default_conf = osutils.pathjoin(subpath, DEFAULT_CONF)
if tree.has_filename(new_local_conf):
if not tree.is_versioned(new_local_conf):
config_files.append(
(tree.get_file(new_local_conf), True, "local.conf"))
else:
warning('Not using configuration from %s as it is versioned.',
new_local_conf)
if tree.has_filename(local_conf):
if not tree.is_versioned(local_conf):
config_files.append(
(tree.get_file(local_conf), True, "local.conf"))
else:
warning('Not using configuration from %s as it is versioned.',
local_conf)
config_files.append((global_conf(), True))
user_config = global_conf()
if tree.is_versioned(new_conf):
config_files.append(
(tree.get_file(new_conf), False, "bzr-builddeb.conf"))
if tree.is_versioned(default_conf):
config_files.append(
(tree.get_file(default_conf), False, "default.conf"))
config = DebBuildConfig(config_files, tree=tree)
config.set_user_config(user_config)
return config
class UnableToFindPreviousUpload(BzrError):
_fmt = ("Unable to determine the previous upload for --package-merge.")
def find_previous_upload(tree, subpath, merge=False):
try:
cl, _top_level = find_changelog(tree, subpath, merge, max_blocks=None)
except ChangelogParseError as ex:
raise UnableToFindPreviousUpload() from ex
return changelog_find_previous_upload(cl)
class NoPreviousUpload(BzrError):
_fmt = ("There was no previous upload to %(distribution)s.")
def __init__(self, distribution):
BzrError.__init__(self, distribution=distribution)
def changelog_find_previous_upload(cl):
current_target = find_last_distribution(cl)
all_debian = [r + t for r in debian_releases() for t in DEBIAN_POCKETS]
all_ubuntu = [r + t for r in ubuntu_releases() for t in UBUNTU_POCKETS]
if current_target in all_debian:
match_targets = (current_target,)
elif current_target in all_ubuntu:
match_targets = ubuntu_releases()
if "-" in current_target:
match_targets += tuple(
[current_target.split("-", 1)[0] + t for t in UBUNTU_POCKETS])
else:
match_targets = (current_target,)
for block in cl._blocks[1:]:
if block.distributions.split(" ")[0] in match_targets:
return block.version
raise NoPreviousUpload(current_target)
def tree_contains_upstream_source(tree, subpath=''):
present_files = {
f[0] for f in tree.list_files(recursive=False, from_dir=subpath)
if f[1] == 'V'}
if len(present_files) == 0:
return None
packaging_files = frozenset([
"debian", ".bzr-builddeb", ".bzrignore", ".gitignore"])
return (len(present_files - packaging_files) > 0)
def tree_get_source_format(tree, subpath=''):
filename = osutils.pathjoin(subpath, "debian/source/format")
try:
text = tree.get_file_text(filename)
except OSError as ex:
if ex.errno == errno.ENOENT:
return FORMAT_1_0
raise
except NoSuchFile:
return FORMAT_1_0
return text.strip().decode('ascii')
FORMAT_1_0 = "1.0"
FORMAT_3_0_QUILT = "3.0 (quilt)"
FORMAT_3_0_NATIVE = "3.0 (native)"
NATIVE_SOURCE_FORMATS = [FORMAT_3_0_NATIVE]
NORMAL_SOURCE_FORMATS = [FORMAT_3_0_QUILT]
class InconsistentSourceFormatError(BzrError):
_fmt = ("Inconsistency between source format and version: "
"version %(version)s is %(version_bool)snative, "
"format '%(format)s' is %(format_bool)snative.")
def __init__(self, version_native, format_native, version_str, format_str):
if version_native:
version_bool = ""
else:
version_bool = "not "
if format_native:
format_bool = ""
else:
format_bool = "not "
BzrError.__init__(
self, version_bool=version_bool, format_bool=format_bool,
version=version_str, format=format_str)
def guess_build_type(tree, version, subpath='', contains_upstream_source=True):
source_format = tree_get_source_format(tree, subpath)
if source_format in NATIVE_SOURCE_FORMATS:
format_native = True
elif source_format in NORMAL_SOURCE_FORMATS:
format_native = False
else:
format_native = None
if version is not None:
version_native = (not version.debian_version)
else:
version_native = None
if type(version_native) is bool and type(format_native) is bool:
if version_native is True and format_native is False:
raise InconsistentSourceFormatError(
version_native, format_native, version, source_format)
if version_native is False and format_native is True:
warning(
'Version (%s) suggests non-native package, '
'but format (%s) is for native.',
version, source_format)
if version_native or format_native:
return BUILD_TYPE_NATIVE
if contains_upstream_source is False:
return BUILD_TYPE_MERGE
else:
return BUILD_TYPE_NORMAL
def component_from_orig_tarball(tarball_filename, package, version):
tarball_filename = os.path.basename(tarball_filename)
prefix = f"{package}_{version}.orig"
if not tarball_filename.startswith(prefix):
raise ValueError(
f"invalid orig tarball file {tarball_filename} "
f"does not have expected prefix {prefix}")
base = tarball_filename[len(prefix):]
for ext in (".tar.gz", ".tar.bz2", ".tar.lzma", ".tar.xz"):
if tarball_filename.endswith(ext):
base = base[:-len(ext)]
break
else:
raise ValueError(
f"orig tarball file {tarball_filename} has unknown extension")
if base == "":
return None
elif base[0] == "-":
return base[1:]
else:
raise ValueError(
f"Invalid extra characters in tarball filename {tarball_filename}")
class TarFailed(BzrError):
_fmt = ("There was an error executing tar to %(operation)s %(tarball)s: "
"%(error)s.")
def __init__(self, operation, tarball, error):
BzrError.__init__(
self, operation=operation, tarball=tarball, error=error)
def needs_strip_components(tf):
top_level_directories = set()
for name in tf.getnames():
top_level_directories.add(name.split('/')[0])
return len(top_level_directories) == 1
def extract_orig_tarball(tarball_filename, component, target,
strip_components: Optional[bool] = None) -> None:
from tarfile import TarFile
tar_args = ["tar"]
if tarball_filename.endswith(".tar.bz2"):
tar_args.append('xjf')
tf = TarFile.bz2open(tarball_filename)
elif (tarball_filename.endswith(".tar.lzma")
or tarball_filename.endswith(".tar.xz")):
tar_args.append('xJf')
tf = TarFile.xzopen(tarball_filename)
elif tarball_filename.endswith(".tar"):
tar_args.append('xf')
tf = TarFile.open(tarball_filename)
elif (tarball_filename.endswith(".tar.gz") or
tarball_filename.endswith(".tgz")):
tf = TarFile.gzopen(tarball_filename)
tar_args.append('xzf')
else:
note('Unable to figure out type of %s, '
'assuming .tar.gz', tarball_filename)
tf = TarFile.gzopen(tarball_filename)
tar_args.append('xzf')
try:
if strip_components is None:
if needs_strip_components(tf):
strip_components = 1
else:
strip_components = 0
finally:
tf.close()
if component is not None:
target_path = os.path.join(target, component)
os.mkdir(target_path)
else:
target_path = target
tar_args.extend([tarball_filename, "-C", target_path])
if strip_components is not None:
tar_args.extend(["--strip-components", str(strip_components)])
proc = subprocess.Popen(tar_args, preexec_fn=subprocess_setup,
stderr=subprocess.PIPE)
(stdout, stderr) = proc.communicate()
if proc.returncode != 0:
raise TarFailed("extract", tarball_filename, error=stderr)
def extract_orig_tarballs(tarballs, target, strip_components=None):
for tarball_filename, component in tarballs:
extract_orig_tarball(
tarball_filename, component, target,
strip_components=strip_components)
def dput_changes(path: str) -> None:
(bd, changes_file) = os.path.split(path)
subprocess.check_call(["dput", changes_file], cwd=bd)
def find_changes_files(
path: str, package: str, version: Version) -> Iterator[
Tuple[str, os.DirEntry]]:
non_epoch_version = version.upstream_version
if version.debian_version is not None:
non_epoch_version += "-%s" % version.debian_version
c = re.compile('{}_{}_(.*).changes'.format(
re.escape(package), re.escape(non_epoch_version)))
for entry in os.scandir(path):
m = c.match(entry.name)
if m:
yield m.group(1), entry
def get_files_excluded(tree, subpath='', top_level=False):
if top_level:
path = os.path.join(subpath, 'copyright')
else:
path = os.path.join(subpath, 'debian', 'copyright')
with tree.get_file(path) as f:
try:
copyright = Copyright(f, strict=False)
except NotMachineReadableError:
return []
try:
return copyright.header["Files-Excluded"].split()
except KeyError:
return []
def control_files_in_root(tree: Tree, subpath: str) -> bool:
debian_path = os.path.join(subpath, "debian")
if tree.has_filename(debian_path):
return False
control_path = os.path.join(subpath, "control")
if tree.has_filename(control_path):
return True
if tree.has_filename(control_path + ".in"):
return True
return False
def full_branch_url(branch):
if branch.name is None:
return branch.user_url
url, params = urlutils.split_segment_parameters(branch.user_url)
if branch.name != "":
params["branch"] = urlutils.quote(branch.name, "")
return urlutils.join_segment_parameters(url, params)
def detect_version_kind(upstream_version):
snapshot_info = get_snapshot_revision(upstream_version)
if snapshot_info is None:
return 'release'
return 'snapshot'
def export_with_nested(tree, dest, **kwargs):
with tree.lock_read():
try:
return export(tree, dest, recurse_nested=True, **kwargs)
except BaseException:
if os.path.isfile(dest):
os.unlink(dest)
raise
def debsign(path: str, keyid: Optional[str] = None) -> None:
(bd, changes_file) = os.path.split(path)
args = ["debsign"]
if keyid:
args.append("-k%s" % keyid)
args.append(changes_file)
subprocess.check_call(args, cwd=bd)