import os
import pathlib
import posixpath
import stat
import time
import zipfile
# Size in bytes of the fixed-length part of a ZIP local file header, i.e. the
# portion that precedes the variable-length filename and extra fields.
_FIXED_ZIP_HEADER_LEN = 30
def _set_alignment(zip_obj, zip_info, alignment):
    """Sets a ZipInfo's extra field so that the entry's data will be aligned.

    The padding is derived from the current position of |zip_obj.fp|, so this
    must be called right before the entry is written.

    Args:
      zip_obj: The ZipFile object that is being written.
      zip_info: The ZipInfo object about to be written. Its |extra| field is
          overwritten with the padding.
      alignment: The amount of alignment in bytes (e.g. 4, or 4*1024).
    """
    # The header stores the *encoded* filename, so its length has to be
    # measured in bytes rather than characters. zipfile writes names as ASCII
    # when possible and as UTF-8 otherwise; the UTF-8 length covers both.
    name_len = len(zip_info.filename.encode('utf-8'))
    header_size = _FIXED_ZIP_HEADER_LEN + name_len
    pos = zip_obj.fp.tell() + header_size
    padding_needed = (alignment - (pos % alignment)) % alignment

    # Zero bytes in the extra field push the start of the data forward to the
    # next multiple of |alignment|.
    zip_info.extra = b'\0' * padding_needed
def _hermetic_date_time(timestamp=None):
if not timestamp:
return (2001, 1, 1, 0, 0, 0)
utc_time = time.gmtime(timestamp)
return (utc_time.tm_year, utc_time.tm_mon, utc_time.tm_mday, utc_time.tm_hour,
utc_time.tm_min, utc_time.tm_sec)
def add_to_zip_hermetic(zip_file,
                        zip_path,
                        *,
                        src_path=None,
                        data=None,
                        compress=None,
                        compress_level=1,
                        alignment=None,
                        timestamp=None):
    """Adds a file to the given ZipFile with a deterministic modified time.

    Args:
      zip_file: ZipFile instance to add the file to.
      zip_path: Destination path within the zip file, or a ZipInfo instance
          (in which case its filename is used as the destination path).
      src_path: Path of the source file. Mutually exclusive with |data|.
      data: File contents. Mutually exclusive with |src_path|.
      compress: Whether to deflate the entry. When None, the compression
          setting of |zip_file| is used.
      compress_level: Compression level passed through to writestr().
      alignment: If set, pad the entry so its data starts at a multiple of
          this many bytes.
      timestamp: Seconds since the epoch to record as the modified time. When
          unset, a fixed default date is used.

    Raises:
      AssertionError: If the arguments are inconsistent, |zip_path| is not a
          canonical relative POSIX path, or the entry already exists.
    """
    assert (src_path is None) != (data is None), (
        '|src_path| and |data| are mutually exclusive.')
    if isinstance(zip_path, zipfile.ZipInfo):
        zipinfo = zip_path
        zip_path = zipinfo.filename
    else:
        zipinfo = zipfile.ZipInfo(filename=zip_path)
        # Default to rw-r--r--; Unix mode bits live in the upper 16 bits.
        zipinfo.external_attr = 0o644 << 16

    zipinfo.date_time = _hermetic_date_time(timestamp)

    if alignment:
        _set_alignment(zip_file, zipinfo, alignment)

    # A backslash is far more likely to be a forgotten path conversion than a
    # deliberate part of the name.
    assert '\\' not in zip_path, 'zip_path should not contain \\: ' + zip_path
    assert not posixpath.isabs(zip_path), 'Absolute zip path: ' + zip_path
    assert not zip_path.startswith('..'), (
        'Should not start with ..: ' + zip_path)
    assert posixpath.normpath(zip_path) == zip_path, (
        f'Non-canonical zip_path: {zip_path} vs: {posixpath.normpath(zip_path)}')
    # Dictionary lookup instead of scanning namelist(), which would make
    # adding N entries quadratic.
    assert not _zip_has_entry(zip_file, zip_path), (
        'Tried to add a duplicate zip entry: ' + zip_path)

    if src_path is not None:
        if os.path.islink(src_path):
            # Store the link itself (its target as the payload), not the file
            # it points at.
            zipinfo.external_attr |= stat.S_IFLNK << 16
            zip_file.writestr(zipinfo, os.readlink(src_path))
            return

        # Carry over the executable bits of the source file.
        st = os.stat(src_path)
        for mode in (stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH):
            if st.st_mode & mode:
                zipinfo.external_attr |= mode << 16

        with open(src_path, 'rb') as f:
            data = f.read()

    # Deflate overhead tends to outweigh any savings on very small payloads,
    # so store those uncompressed.
    if len(data) < 16:
        compress = False

    # Start from the ZipFile's own default: an entry described by a ZipInfo
    # does not pick it up automatically.
    compress_type = zip_file.compression
    if compress is not None:
        compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    zip_file.writestr(zipinfo,
                      data,
                      compress_type,
                      compresslevel=compress_level)


def _zip_has_entry(zip_file, name):
    """Returns whether |zip_file| already contains an entry named |name|."""
    try:
        zip_file.getinfo(name)
    except KeyError:
        return False
    return True
def add_files_to_zip(inputs,
                     output,
                     *,
                     base_dir=None,
                     path_transform=None,
                     compress=None,
                     compress_level=1,
                     zip_prefix_path=None,
                     timestamp=None):
    """Writes a set of files into a zip, ordered by their zip path.

    Args:
      inputs: Iterable whose items are either source paths (str) or
          (zip_path, fs_path) pairs.
      output: A ZipFile instance to append to, or anything the ZipFile
          constructor accepts, which is then opened for writing and closed
          before returning.
      base_dir: Directory that str inputs are made relative to in order to
          form their zip path. Defaults to '.'.
      path_transform: Optional callable applied to each zip path. Returns the
          path to use, or None to skip the file.
      compress: Forwarded to add_to_zip_hermetic().
      compress_level: Forwarded to add_to_zip_hermetic().
      zip_prefix_path: Optional path prepended to every zip path (before
          |path_transform| runs).
      timestamp: Forwarded to add_to_zip_hermetic().
    """
    root = '.' if base_dir is None else base_dir

    def _as_pair(entry):
        # Pairs are already in (zip_path, fs_path) form.
        if not isinstance(entry, str):
            return entry
        relative = os.path.relpath(entry, root)
        if os.path.sep != posixpath.sep:
            # Zip paths always use forward slashes.
            relative = str(pathlib.Path(relative).as_posix())
        return (relative, entry)

    # Sort by zip path so the archive layout does not depend on input order.
    pairs = sorted((_as_pair(entry) for entry in inputs),
                   key=lambda pair: pair[0])

    owns_zip = not isinstance(output, zipfile.ZipFile)
    out_zip = zipfile.ZipFile(output, 'w') if owns_zip else output
    try:
        for zip_path, fs_path in pairs:
            if zip_prefix_path:
                zip_path = posixpath.join(zip_prefix_path, zip_path)
            if path_transform:
                zip_path = path_transform(zip_path)
                if zip_path is None:
                    continue
            add_to_zip_hermetic(out_zip,
                                zip_path,
                                src_path=fs_path,
                                compress=compress,
                                compress_level=compress_level,
                                timestamp=timestamp)
    finally:
        # Only close what was opened here; a caller-supplied ZipFile stays
        # open.
        if owns_zip:
            out_zip.close()
def zip_directory(output, base_dir, **kwargs):
    """Zips every file found by walking |base_dir| into |output|.

    Args:
      output: Destination, passed through to add_files_to_zip().
      base_dir: Directory to walk; also passed as |base_dir| to
          add_files_to_zip().
      **kwargs: Extra keyword arguments forwarded to add_files_to_zip().
    """
    found = [
        os.path.join(parent, name)
        for parent, _, names in os.walk(base_dir)
        for name in names
    ]
    add_files_to_zip(found, output, base_dir=base_dir, **kwargs)
def merge_zips(output,
               input_zips,
               path_transform=None,
               compress=None,
               compress_level=1):
    """Combines all files from |input_zips| into |output|.

    Entries that resolve to the same destination name are de-duplicated when
    their CRCs match and rejected when they differ.

    Args:
      output: Path (str) of the zip to create, or a ZipFile instance to add
          files to. A ZipFile instance is left open on return.
      input_zips: Iterable of zip files to merge, each something the ZipFile
          constructor accepts. Must not be a single str.
      path_transform: Optional callable applied to each entry name. Returns
          the destination name, or None to skip the entry.
      compress: Overrides the compression setting of the origin entries.
          When None, an entry is deflated iff it was not stored in its input.
      compress_level: Compression level to use when deflating.

    Raises:
      Exception: If the same destination name appears with differing
          contents.
    """
    assert not isinstance(input_zips, str)  # Easy mistake to make.
    if isinstance(output, zipfile.ZipFile):
        out_zip = output
        out_filename = output.filename
    else:
        assert isinstance(output, str), 'Was: ' + repr(output)
        out_zip = zipfile.ZipFile(output, 'w')
        out_filename = output

    # Maps destination name -> (where it came from, CRC), seeded with whatever
    # the output already contains.
    crc_by_name = {
        i.filename: (out_filename, i.CRC)
        for i in out_zip.infolist()
    }

    try:
        for in_file in input_zips:
            with zipfile.ZipFile(in_file, 'r') as in_zip:
                for info in in_zip.infolist():
                    # Ignore directories. endswith() is also safe for an
                    # empty entry name, unlike indexing the last character.
                    if info.filename.endswith('/'):
                        continue
                    if path_transform:
                        dst_name = path_transform(info.filename)
                        if dst_name is None:
                            continue
                    else:
                        dst_name = info.filename

                    if dst_name in crc_by_name:
                        # The recorded CRC is what read() verifies the data
                        # against, so duplicates can be compared without
                        # decompressing them.
                        orig_filename, orig_crc = crc_by_name[dst_name]
                        if info.CRC == orig_crc:
                            continue
                        msg = f"""File appeared in multiple inputs with differing contents.
File: {dst_name}
Input1: {orig_filename}
Input2: {in_file}"""
                        raise Exception(msg)

                    if compress is not None:
                        compress_entry = compress
                    else:
                        compress_entry = (
                            info.compress_type != zipfile.ZIP_STORED)
                    add_to_zip_hermetic(out_zip,
                                        dst_name,
                                        data=in_zip.read(info),
                                        compress=compress_entry,
                                        compress_level=compress_level)
                    crc_by_name[dst_name] = (in_file,
                                             out_zip.getinfo(dst_name).CRC)
    finally:
        # Only close what was opened here; a caller-supplied ZipFile stays
        # open.
        if output is not out_zip:
            out_zip.close()