import gzip
import os
from io import BytesIO
import tarfile
import bz2
import hashlib
import shutil
import time
import zipfile
from ...errors import (
BzrError,
DependencyNotPresent,
)
from ...transport import get_transport, FileExists
from .util import open_file, open_file_via_transport
class UnsupportedRepackFormat(BzrError):
_fmt = ('Either the file extension of "%(location)s" indicates that '
'it is a format unsupported for repacking or it is a '
'remote directory.')
def __init__(self, location):
BzrError.__init__(self, location=location)
class TgzRepacker:
def __init__(self, source_f):
self.source_f = source_f
def repack(self, target_f):
raise NotImplementedError(self.repack)
class CopyRepacker(TgzRepacker):
def repack(self, target_f):
shutil.copyfileobj(self.source_f, target_f)
class TarTgzRepacker(TgzRepacker):
def repack(self, target_f):
with gzip.GzipFile(mode='w', fileobj=target_f) as gz:
shutil.copyfileobj(self.source_f, gz)
class Tbz2TgzRepacker(TgzRepacker):
def repack(self, target_f):
content = bz2.decompress(self.source_f.read())
with gzip.GzipFile(mode='w', fileobj=target_f) as gz:
gz.write(content)
class TarLzma2TgzRepacker(TgzRepacker):
def repack(self, target_f):
try:
import lzma
except ImportError as e:
raise DependencyNotPresent('lzma', e) from e
content = lzma.decompress(self.source_f.read())
with gzip.GzipFile(mode='w', fileobj=target_f) as gz:
gz.write(content)
class ZipTgzRepacker(TgzRepacker):
def _repack_zip_to_tar(self, zip, tar):
for info in zip.infolist():
tarinfo = tarfile.TarInfo(info.filename)
tarinfo.size = info.file_size
tarinfo.mtime = time.mktime(info.date_time + (0, 1, -1))
if info.filename.endswith("/"):
tarinfo.mode = 0o755
tarinfo.type = tarfile.DIRTYPE
else:
tarinfo.mode = 0o644
tarinfo.type = tarfile.REGTYPE
contents = BytesIO(zip.read(info.filename))
tar.addfile(tarinfo, contents)
def repack(self, target_f):
with zipfile.ZipFile(self.source_f, "r") as zip:
with tarfile.open(mode="w:gz", fileobj=target_f) as tar:
self._repack_zip_to_tar(zip, tar)
def get_filetype(filename):
types = {
".tar.gz": "gz",
".tgz": "gz",
".tar.bz2": "bz2",
".tar.xz": "xz",
".tar.lzma": "lzma",
".tbz2": "bz2",
".tar": "tar",
".zip": "zip"
}
for filetype, name in types.items():
if filename.endswith(filetype):
return name
def get_repacker_class(source_format, target_format):
if source_format == target_format:
return CopyRepacker
known_formatters = {
("bz2", "gz"): Tbz2TgzRepacker,
("lzma", "gz"): TarLzma2TgzRepacker,
("xz", "gz"): TarLzma2TgzRepacker,
("tar", "gz"): TarTgzRepacker,
("zip", "gz"): ZipTgzRepacker,
}
return known_formatters.get((source_format, target_format))
def _error_if_exists(target_transport, new_name, source_name):
with open_file(source_name) as source_f:
source_sha = hashlib.sha1(source_f.read()).hexdigest()
with open_file_via_transport(new_name, target_transport) as target_f:
target_sha = hashlib.sha1(target_f.read()).hexdigest()
if source_sha != target_sha:
raise FileExists(new_name)
def _repack_directory(target_transport, new_name, source_name):
target_transport.ensure_base()
with target_transport.open_write_stream(new_name) as target_f:
with tarfile.open(mode='w:gz', fileobj=target_f) as tar:
tar.add(source_name, os.path.basename(source_name))
def _repack_other(target_transport, new_name, source_name):
source_filetype = get_filetype(source_name)
target_filetype = get_filetype(new_name)
repacker_cls = get_repacker_class(source_filetype, target_filetype)
if repacker_cls is None:
raise UnsupportedRepackFormat(source_name)
target_transport.ensure_base()
with target_transport.open_write_stream(new_name) as target_f:
with open_file(source_name) as source_f:
repacker = repacker_cls(source_f)
repacker.repack(target_f)
def repack_tarball(source_name, new_name, target_dir=None):
if target_dir is None:
target_dir = "."
extra, new_name = os.path.split(new_name)
target_transport = get_transport(os.path.join(target_dir, extra))
if target_transport.has(new_name):
source_format = get_filetype(source_name)
target_format = get_filetype(new_name)
if source_format != target_format:
raise FileExists(new_name)
_error_if_exists(target_transport, new_name, source_name)
return
if os.path.isdir(source_name):
_repack_directory(target_transport, new_name, source_name)
else:
_repack_other(target_transport, new_name, source_name)