import builtins
import io
import os
import struct
import sys
import time
import weakref
import zlib
from compression._common import _streams
# Names exported by a star-import of this module.
__all__ = ["BadGzipFile", "GzipFile", "open", "compress", "decompress"]
# Bit masks for the header flag byte; _read_gzip_header() tests the flag
# against FEXTRA, FNAME, FCOMMENT and FHCRC to decide which optional
# fields to skip.
FTEXT, FHCRC, FEXTRA, FNAME, FCOMMENT = 1, 2, 4, 8, 16
# Values stored in GzipFile.mode to record whether the object was opened
# for reading or for writing.
READ = 'rb'
WRITE = 'wb'
# Named compression levels.  _COMPRESS_LEVEL_BEST is the default level of
# open(), GzipFile() and compress(); main() starts from the trade-off level.
_COMPRESS_LEVEL_FAST = 1
_COMPRESS_LEVEL_TRADEOFF = 6
_COMPRESS_LEVEL_BEST = 9
# Number of bytes requested per read of compressed input in
# _GzipReader.read() and per chunk in main()'s copy loop.
READ_BUFFER_SIZE = 128 * 1024
# Buffer size given to the io.BufferedWriter that sits in front of
# GzipFile's compressor.
_WRITE_BUFFER_SIZE = 4 * io.DEFAULT_BUFFER_SIZE
def open(filename, mode="rb", compresslevel=_COMPRESS_LEVEL_BEST,
         encoding=None, errors=None, newline=None):
    """Open a gzip-compressed file in binary or text mode.

    *filename* is either a path (str, bytes or os.PathLike) or an existing
    file object, recognised by having a ``read`` or ``write`` attribute.

    A "t" in *mode* selects text mode: the GzipFile is wrapped in an
    io.TextIOWrapper built with *encoding*, *errors* and *newline*.
    Without "t" the GzipFile itself is returned and those three arguments
    must be left as None.  *compresslevel* is passed on to GzipFile.

    Raises ValueError for a mode containing both "t" and "b", or when a
    text-only argument is given in binary mode, and TypeError when
    *filename* is neither a path nor a file object.
    """
    text_mode = "t" in mode
    if text_mode:
        if "b" in mode:
            raise ValueError("Invalid mode: %r" % (mode,))
    else:
        # These arguments only make sense for the text wrapper.
        text_only = (
            (encoding, "Argument 'encoding' not supported in binary mode"),
            (errors, "Argument 'errors' not supported in binary mode"),
            (newline, "Argument 'newline' not supported in binary mode"),
        )
        for supplied, complaint in text_only:
            if supplied is not None:
                raise ValueError(complaint)

    # GzipFile itself is always binary, so drop the "t" before passing on.
    gz_mode = mode.replace("t", "")
    if isinstance(filename, (str, bytes, os.PathLike)):
        binary_file = GzipFile(filename, gz_mode, compresslevel)
    elif hasattr(filename, "read") or hasattr(filename, "write"):
        binary_file = GzipFile(None, gz_mode, compresslevel, filename)
    else:
        raise TypeError("filename must be a str or bytes object, or a file")

    if not text_mode:
        return binary_file
    encoding = io.text_encoding(encoding)
    return io.TextIOWrapper(binary_file, encoding, errors, newline)
def write32u(output, value):
    """Write *value* to *output* as a 4-byte little-endian unsigned integer."""
    packed = struct.pack("<L", value)
    output.write(packed)
class _PaddedFile:
    """Minimal read-only file wrapper that serves pushed-back bytes first.

    Bytes handed to the constructor or to prepend() are returned by read()
    before any data from the wrapped file.  Only read(), prepend(), seek()
    and seekable() are provided.
    """

    def __init__(self, f, prepend=b''):
        """Wrap file object *f*, serving *prepend* before the file's data."""
        self._buffer = prepend
        self._length = len(prepend)
        self.file = f
        # Cursor into _buffer; None means the buffer is spent and reads go
        # straight to the wrapped file.
        self._read = 0

    def read(self, size):
        """Return up to *size* bytes, draining the pushed-back buffer first."""
        if self._read is None:
            return self.file.read(size)
        start = self._read
        if start + size <= self._length:
            # The request is satisfied entirely from the buffer.
            self._read = start + size
            return self._buffer[start:self._read]
        # Hand out what is left of the buffer and top up from the file.
        self._read = None
        return self._buffer[start:] + \
            self.file.read(size - self._length + start)

    def prepend(self, prepend=b''):
        """Push *prepend* back so that the next read() returns it first."""
        if self._read is not None:
            # The buffer is still in use: assume the bytes being pushed
            # back are the ones most recently read from it and simply move
            # the cursor back.  The early return belongs to this branch
            # only; otherwise the reset below would never run.
            self._read -= len(prepend)
            return
        # The buffer was exhausted: start a fresh one.
        self._buffer = prepend
        self._length = len(self._buffer)
        self._read = 0

    def seek(self, off):
        """Discard any pushed-back bytes and seek the wrapped file to *off*."""
        self._read = None
        self._buffer = None
        return self.file.seek(off)

    def seekable(self):
        """Always report True, regardless of the wrapped file."""
        return True
class BadGzipFile(OSError):
    """Exception raised in some cases for invalid gzip files."""
class _WriteBufferStream(io.RawIOBase):
    """Write-only raw stream that forwards every write to a GzipFile.

    The owning object is held through a weak reference, and each write is
    passed to its ``_write_raw`` method.
    """

    def __init__(self, gzip_file):
        """Remember *gzip_file* through a weak reference."""
        self.gzip_file = weakref.ref(gzip_file)

    def write(self, data):
        """Pass *data* to the owner's _write_raw() and return its result.

        Raises RuntimeError if the owner has already been garbage collected.
        """
        owner = self.gzip_file()
        if owner is not None:
            return owner._write_raw(data)
        raise RuntimeError("lost gzip_file")

    def seekable(self):
        """This stream never supports seeking."""
        return False

    def writable(self):
        """This stream always accepts writes."""
        return True
class GzipFile(_streams.BaseStream):
    """File-like object that reads or writes gzip-compressed data.

    In read mode the data is decompressed through a _GzipReader wrapped in
    an io.BufferedReader.  In write mode the data is buffered by an
    io.BufferedWriter, compressed with a raw-deflate zlib compressor and
    framed with a gzip header and a CRC-32/size trailer.

    Closing the object closes the underlying file only if this object
    opened it itself (see the ``myfileobj`` attribute).
    """

    # Set only when __init__ opened the underlying file itself, so that
    # _close() knows it is responsible for closing it.
    myfileobj = None

    def __init__(self, filename=None, mode=None,
                 compresslevel=_COMPRESS_LEVEL_BEST, fileobj=None, mtime=None):
        """Create a GzipFile on top of *fileobj* or a file named *filename*.

        If *fileobj* is None, *filename* is opened with builtins.open().
        If *filename* is None, the name is taken from ``fileobj.name`` when
        that is a str or bytes object, and is '' otherwise.

        *mode* must start with 'r', 'w', 'a' or 'x' and must not contain
        't' or 'U'; a missing 'b' is appended.  If *mode* is None it is
        taken from ``fileobj.mode`` and defaults to 'rb'; ending up in
        write mode that way emits a FutureWarning.

        *compresslevel* is passed to zlib.compressobj() and decides the
        extra-flags byte of the header.  *mtime* is the timestamp written
        to the header; None means the current time.

        Raises ValueError for an invalid mode.
        """
        # Assign these first so that _close() and __del__ work even if
        # construction fails part-way through.
        self.mode = None
        self.fileobj = None
        self._buffer = None

        if mode and ('t' in mode or 'U' in mode):
            raise ValueError("Invalid mode: {!r}".format(mode))
        if mode and 'b' not in mode:
            mode += 'b'

        try:
            if fileobj is None:
                fileobj = self.myfileobj = builtins.open(filename,
                                                         mode or 'rb')
            if filename is None:
                filename = getattr(fileobj, 'name', '')
                if not isinstance(filename, (str, bytes)):
                    filename = ''
            else:
                filename = os.fspath(filename)
            origmode = mode
            if mode is None:
                mode = getattr(fileobj, 'mode', 'rb')

            if mode.startswith('r'):
                self.mode = READ
                raw = _GzipReader(fileobj)
                self._buffer = io.BufferedReader(raw)
                self.name = filename
            elif mode.startswith(('w', 'a', 'x')):
                if origmode is None:
                    import warnings
                    warnings.warn(
                        "GzipFile was opened for writing, but this will "
                        "change in future Python releases. "
                        "Specify the mode argument for opening it for writing.",
                        FutureWarning, 2)
                self.mode = WRITE
                self._init_write(filename)
                # Negative wbits selects a raw deflate stream; the gzip
                # header and trailer are written by this class instead.
                self.compress = zlib.compressobj(compresslevel,
                                                 zlib.DEFLATED,
                                                 -zlib.MAX_WBITS,
                                                 zlib.DEF_MEM_LEVEL,
                                                 0)
                self._write_mtime = mtime
                self._buffer_size = _WRITE_BUFFER_SIZE
                self._buffer = io.BufferedWriter(_WriteBufferStream(self),
                                                 buffer_size=self._buffer_size)
            else:
                raise ValueError("Invalid mode: {!r}".format(mode))

            self.fileobj = fileobj

            if self.mode == WRITE:
                self._write_gzip_header(compresslevel)
        except:
            # Deliberately catches everything: release a file this object
            # opened itself, then re-raise the original exception.
            self._close()
            raise

    @property
    def mtime(self):
        """The ``_last_mtime`` value recorded by the underlying raw reader.

        Only usable in read mode: the raw stream used for writing defines
        no ``_last_mtime`` attribute.
        """
        return self._buffer.raw._last_mtime

    def __repr__(self):
        """Return '<gzip ...>' built from the repr of the underlying file."""
        s = repr(self.fileobj)
        return '<gzip ' + s[1:-1] + ' ' + hex(id(self)) + '>'

    def _init_write(self, filename):
        """Initialise the bookkeeping attributes used in write mode."""
        self.name = filename
        self.crc = zlib.crc32(b"")
        self.size = 0
        self.writebuf = []
        self.bufsize = 0
        self.offset = 0  # Number of uncompressed bytes written so far

    def tell(self):
        """Flush the internal buffer and return the position from super().tell()."""
        self._check_not_closed()
        self._buffer.flush()
        return super().tell()

    def _write_gzip_header(self, compresslevel):
        """Write the gzip member header to the underlying file.

        The stored name is the basename of ``self.name`` encoded as
        Latin-1 with a trailing '.gz' removed; it is left out if it is
        empty or cannot be encoded.
        """
        self.fileobj.write(b'\037\213')             # magic bytes
        self.fileobj.write(b'\010')                 # compression method
        try:
            fname = os.path.basename(self.name)
            if not isinstance(fname, bytes):
                fname = fname.encode('latin-1')
            if fname.endswith(b'.gz'):
                fname = fname[:-3]
        except UnicodeEncodeError:
            # Not representable in Latin-1: write the header without a name.
            fname = b''
        flags = 0
        if fname:
            flags = FNAME
        self.fileobj.write(chr(flags).encode('latin-1'))
        mtime = self._write_mtime
        if mtime is None:
            mtime = time.time()
        write32u(self.fileobj, int(mtime))
        # Extra-flags byte: depends only on whether the best or the
        # fastest level was requested.
        if compresslevel == _COMPRESS_LEVEL_BEST:
            xfl = b'\002'
        elif compresslevel == _COMPRESS_LEVEL_FAST:
            xfl = b'\004'
        else:
            xfl = b'\000'
        self.fileobj.write(xfl)
        self.fileobj.write(b'\377')                 # last fixed header byte
        if fname:
            self.fileobj.write(fname + b'\000')

    def write(self, data):
        """Queue *data* for compression and return the buffered writer's result.

        Raises OSError(EBADF) when the object is not in write mode and
        ValueError when the underlying file is already gone.
        """
        self._check_not_closed()
        if self.mode != WRITE:
            import errno
            raise OSError(errno.EBADF, "write() on read-only GzipFile object")

        if self.fileobj is None:
            raise ValueError("write() on closed GzipFile object")

        return self._buffer.write(data)

    def _write_raw(self, data):
        """Compress *data*, write it out and update CRC, size and offset.

        Called through _WriteBufferStream.  Returns the number of
        uncompressed bytes consumed.
        """
        if isinstance(data, (bytes, bytearray)):
            length = len(data)
        else:
            # Any other object is viewed through the buffer protocol.
            data = memoryview(data)
            length = data.nbytes

        if length > 0:
            self.fileobj.write(self.compress.compress(data))
            self.size += length
            self.crc = zlib.crc32(data, self.crc)
            self.offset += length

        return length

    def _check_read(self, caller):
        """Raise OSError(EBADF) naming *caller* unless in read mode."""
        if self.mode != READ:
            import errno
            msg = f"{caller}() on write-only GzipFile object"
            raise OSError(errno.EBADF, msg)

    def read(self, size=-1):
        """Return the result of reading *size* bytes from the buffered reader."""
        self._check_not_closed()
        self._check_read("read")
        return self._buffer.read(size)

    def read1(self, size=-1):
        """Delegate to the buffered reader's read1().

        A negative *size* is replaced by io.DEFAULT_BUFFER_SIZE.
        """
        self._check_not_closed()
        self._check_read("read1")

        if size < 0:
            size = io.DEFAULT_BUFFER_SIZE
        return self._buffer.read1(size)

    def readinto(self, b):
        """Delegate to the buffered reader's readinto() for buffer *b*."""
        self._check_not_closed()
        self._check_read("readinto")
        return self._buffer.readinto(b)

    def readinto1(self, b):
        """Delegate to the buffered reader's readinto1() for buffer *b*."""
        self._check_not_closed()
        self._check_read("readinto1")
        return self._buffer.readinto1(b)

    def peek(self, n):
        """Delegate to the buffered reader's peek() with argument *n*."""
        self._check_not_closed()
        self._check_read("peek")
        return self._buffer.peek(n)

    @property
    def closed(self):
        """True once the reference to the underlying file has been dropped."""
        return self.fileobj is None

    def close(self):
        """Finish the stream and release the underlying file.

        In write mode the buffer and compressor are flushed and the
        CRC-32/size trailer is written.  In read mode the buffered reader
        is closed.  Calling close() again is a no-op.
        """
        fileobj = self.fileobj
        if fileobj is None:
            return
        if self._buffer is None or self._buffer.closed:
            return
        try:
            if self.mode == WRITE:
                self._buffer.flush()
                fileobj.write(self.compress.flush())
                write32u(fileobj, self.crc)
                # Only the low 32 bits of the size fit in the trailer.
                write32u(fileobj, self.size & 0xffffffff)
            elif self.mode == READ:
                self._buffer.close()
        finally:
            self._close()

    def _close(self):
        """Drop the file reference; close the file if this object opened it."""
        self.fileobj = None
        myfileobj = self.myfileobj
        if myfileobj is not None:
            self.myfileobj = None
            myfileobj.close()

    def flush(self, zlib_mode=zlib.Z_SYNC_FLUSH):
        """In write mode, push buffered data through to the underlying file.

        *zlib_mode* is passed to the compressor's flush().  Nothing is
        done in read mode.
        """
        self._check_not_closed()
        if self.mode == WRITE:
            self._buffer.flush()
            self.fileobj.write(self.compress.flush(zlib_mode))
            self.fileobj.flush()

    def fileno(self):
        """Return the fileno() of the underlying file object."""
        return self.fileobj.fileno()

    def rewind(self):
        """Seek the reader back to the start; raises OSError in write mode."""
        if self.mode != READ:
            raise OSError("Can't rewind in write mode")
        self._buffer.seek(0)

    def readable(self):
        """Return True if the object was opened for reading."""
        return self.mode == READ

    def writable(self):
        """Return True if the object was opened for writing."""
        return self.mode == WRITE

    def seekable(self):
        """Always True; seek() itself restricts what is allowed."""
        return True

    def seek(self, offset, whence=io.SEEK_SET):
        """Move to uncompressed position *offset* and return the new position.

        In write mode only forward seeks are possible; the gap is filled
        by writing zero bytes, and seeking relative to the end raises
        ValueError.  In read mode the call is passed to the buffered reader.
        """
        if self.mode == WRITE:
            self._check_not_closed()
            # Flush first so that self.offset reflects everything written.
            self._buffer.flush()
            if whence != io.SEEK_SET:
                if whence == io.SEEK_CUR:
                    offset = self.offset + offset
                else:
                    raise ValueError('Seek from end not supported')
            if offset < self.offset:
                raise OSError('Negative seek in write mode')
            count = offset - self.offset
            chunk = b'\0' * self._buffer_size
            for _ in range(count // self._buffer_size):
                self.write(chunk)
            self.write(b'\0' * (count % self._buffer_size))
        elif self.mode == READ:
            self._check_not_closed()
            return self._buffer.seek(offset, whence)

        return self.offset

    def readline(self, size=-1):
        """Delegate to the internal buffer's readline() with *size*."""
        self._check_not_closed()
        return self._buffer.readline(size)

    def __del__(self):
        """Warn about a writer that was never closed, then defer to super()."""
        if self.mode == WRITE and not self.closed:
            import warnings
            warnings.warn("unclosed GzipFile",
                          ResourceWarning, source=self, stacklevel=2)

        super().__del__()
def _read_exact(fp, n):
    """Read at least *n* bytes from *fp*, repeating short reads as needed.

    Raises EOFError if *fp* runs dry before *n* bytes have been collected.
    """
    data = fp.read(n)
    missing = n - len(data)
    while missing > 0:
        more = fp.read(missing)
        if not more:
            raise EOFError("Compressed file ended before the "
                           "end-of-stream marker was reached")
        data += more
        missing = n - len(data)
    return data
def _read_gzip_header(fp):
    """Consume one gzip member header from *fp* and return its mtime field.

    Returns None if *fp* is already at end of file, otherwise the 32-bit
    mtime value from the header.  *fp* is left positioned just past the
    header, with any optional fields skipped.

    Raises BadGzipFile if the magic bytes or the compression method are
    wrong, and EOFError (from _read_exact) if a fixed-size part of the
    header is truncated.
    """
    magic = fp.read(2)
    if magic == b'':
        return None

    if magic != b'\037\213':
        raise BadGzipFile('Not a gzipped file (%r)' % magic)

    # Method byte, flag byte, 4-byte mtime; the last two bytes are skipped.
    (method, flag, last_mtime) = struct.unpack("<BBIxx", _read_exact(fp, 8))
    if method != 8:
        raise BadGzipFile('Unknown compression method')

    if flag & FEXTRA:
        # Skip the extra field, which carries its own 2-byte length.
        extra_len, = struct.unpack("<H", _read_exact(fp, 2))
        _read_exact(fp, extra_len)
    # File name and comment, in that order, are both NUL-terminated.
    for field in (FNAME, FCOMMENT):
        if flag & field:
            while True:
                s = fp.read(1)
                if not s or s == b'\000':
                    break
    if flag & FHCRC:
        _read_exact(fp, 2)     # Read and discard the 16-bit header CRC
    # Return the mtime for every valid header, not only for those that
    # carry a header CRC.
    return last_mtime
class _GzipReader(_streams.DecompressReader):
    """Raw reader that decompresses a stream of one or more gzip members.

    The source file is wrapped in a _PaddedFile so that bytes the
    decompressor read past the end of a member can be pushed back and
    parsed as the trailer and the next header.
    """

    def __init__(self, fp):
        """Set up raw-deflate decompression of the file object *fp*."""
        super().__init__(_PaddedFile(fp), zlib._ZlibDecompressor,
                         wbits=-zlib.MAX_WBITS)
        # True whenever the next read has to begin with a member header.
        self._new_member = True
        self._last_mtime = None

    def _init_read(self):
        """Reset the running CRC and size for a new member."""
        self._crc = zlib.crc32(b"")
        self._stream_size = 0  # Decompressed size of the current member

    def _read_gzip_header(self):
        """Parse the next member header and record its mtime.

        Returns False if the source is at end of file, True otherwise.
        """
        last_mtime = _read_gzip_header(self._fp)
        if last_mtime is None:
            return False
        self._last_mtime = last_mtime
        return True

    def read(self, size=-1):
        """Return up to *size* decompressed bytes; b"" at end of stream.

        A negative *size* is delegated to readall().  Raises EOFError if
        the compressed data ends in the middle of a member, and
        BadGzipFile (via _read_eof) if a member trailer does not match.
        """
        if size < 0:
            return self.readall()
        # Zero is answered here rather than passed on as a max_length of 0.
        if not size:
            return b""

        # One decompress() call may produce no output, so keep going until
        # there is some data or the input is exhausted.
        while True:
            if self._decompressor.eof:
                # End of a member: verify its trailer and prepare a fresh
                # decompressor for a possible next member.
                self._read_eof()
                self._new_member = True
                self._decompressor = self._decomp_factory(
                    **self._decomp_args)

            if self._new_member:
                self._init_read()
                if not self._read_gzip_header():
                    # No further member: record the total size.
                    self._size = self._pos
                    return b""
                self._new_member = False

            if self._decompressor.needs_input:
                buf = self._fp.read(READ_BUFFER_SIZE)
                uncompress = self._decompressor.decompress(buf, size)
                source_exhausted = buf == b""
            else:
                # The decompressor still holds input, so the source was
                # not consulted on this pass.  Tracking that explicitly
                # avoids referencing a chunk variable that has not been
                # bound yet in this call.
                uncompress = self._decompressor.decompress(b"", size)
                source_exhausted = False

            if self._decompressor.unused_data != b"":
                # Give back bytes read past the end of the member so that
                # _read_eof() and _read_gzip_header() can see them.
                self._fp.prepend(self._decompressor.unused_data)

            if uncompress != b"":
                break
            if source_exhausted:
                raise EOFError("Compressed file ended before the "
                               "end-of-stream marker was reached")

        self._crc = zlib.crc32(uncompress, self._crc)
        self._stream_size += len(uncompress)
        self._pos += len(uncompress)
        return uncompress

    def _read_eof(self):
        """Check the member trailer, then skip zero padding after it.

        Raises BadGzipFile if the stored CRC-32 or the stored size (modulo
        2**32) differs from the values computed while reading.
        """
        crc32, isize = struct.unpack("<II", _read_exact(self._fp, 8))
        if crc32 != self._crc:
            raise BadGzipFile("CRC check failed %s != %s" % (hex(crc32),
                                                             hex(self._crc)))
        elif isize != (self._stream_size & 0xffffffff):
            raise BadGzipFile("Incorrect length of data produced")

        # Consume zero bytes between members; the first non-zero byte is
        # pushed back so that it is seen as the start of the next header.
        c = b"\x00"
        while c == b"\x00":
            c = self._fp.read(1)
        if c:
            self._fp.prepend(c)

    def _rewind(self):
        """Rewind via the base class and expect a member header again."""
        super()._rewind()
        self._new_member = True
def compress(data, compresslevel=_COMPRESS_LEVEL_BEST, *, mtime=0):
    """Compress *data* in one shot and return the gzip-framed bytes.

    *compresslevel* is passed to zlib.compress() as the level.  *mtime* is
    the timestamp stored in the header: 0 by default, or the current time
    if None is given.
    """
    # zlib produces the complete gzip member, header and trailer included.
    member = zlib.compress(data, level=compresslevel, wbits=31)
    stamp = time.time() if mtime is None else mtime
    # Rebuild the 10-byte header: keep zlib's first four bytes and byte 8,
    # substitute the timestamp and force the final byte to 255.
    prefix = struct.pack("<4sLBB", member, int(stamp), member[8], 255)
    return prefix + member[10:]
def decompress(data):
    """Decompress gzip *data* made of one or more members.

    Returns the concatenated payload of all members.  Raises EOFError if a
    member is cut short, and BadGzipFile if a member's stored CRC-32 or
    length does not match its decompressed payload.
    """
    members = []
    remaining = data
    while True:
        stream = io.BytesIO(remaining)
        if _read_gzip_header(stream) is None:
            # Nothing left to parse: every member has been handled.
            return b"".join(members)
        # The header is consumed, so what follows is raw deflate data.
        inflater = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
        payload = inflater.decompress(remaining[stream.tell():])
        leftover = inflater.unused_data
        if not inflater.eof or len(leftover) < 8:
            raise EOFError("Compressed file ended before the end-of-stream "
                           "marker was reached")
        # The 8 bytes after the deflate stream hold the CRC-32 and size.
        stored_crc, stored_len = struct.unpack("<II", leftover[:8])
        if stored_crc != zlib.crc32(payload):
            raise BadGzipFile("CRC check failed")
        if stored_len != (len(payload) & 0xffffffff):
            raise BadGzipFile("Incorrect length of data produced")
        members.append(payload)
        # Drop the trailer and any zero padding before the next member.
        remaining = leftover[8:].lstrip(b"\x00")
def main():
    """Command-line entry point: act like gzip but keep the input file.

    Each positional argument is a file name, or "-" for standard input and
    output (the default when no argument is given).  Without -d the input
    is compressed to "<name>.gz"; with -d the name must end in ".gz" and
    is decompressed to the name without that suffix.  --fast and --best
    select the compression level for both named files and "-".
    """
    from argparse import ArgumentParser
    from contextlib import ExitStack

    parser = ArgumentParser(
        description="A simple command line interface for the gzip module: "
                    "act like gzip, but do not delete the input file.",
        color=True,
    )
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--fast', action='store_true', help='compress faster')
    group.add_argument('--best', action='store_true', help='compress better')
    group.add_argument("-d", "--decompress", action="store_true",
                       help="act like gunzip instead of gzip")

    parser.add_argument("args", nargs="*", default=["-"], metavar='file')
    args = parser.parse_args()

    compresslevel = _COMPRESS_LEVEL_TRADEOFF
    if args.fast:
        compresslevel = _COMPRESS_LEVEL_FAST
    elif args.best:
        compresslevel = _COMPRESS_LEVEL_BEST

    for arg in args.args:
        # Everything registered here is closed in reverse order (output
        # before input), including when opening or copying fails.  The
        # process's own stdin/stdout buffers are never registered, so they
        # stay open.
        with ExitStack() as cleanup:
            if args.decompress:
                if arg == "-":
                    f = cleanup.enter_context(
                        GzipFile(filename="", mode="rb",
                                 fileobj=sys.stdin.buffer))
                    g = sys.stdout.buffer
                else:
                    if not arg.endswith(".gz"):
                        sys.exit(f"filename doesn't end in .gz: {arg!r}")
                    f = cleanup.enter_context(open(arg, "rb"))
                    g = cleanup.enter_context(builtins.open(arg[:-3], "wb"))
            else:
                if arg == "-":
                    f = sys.stdin.buffer
                    g = cleanup.enter_context(
                        GzipFile(filename="", mode="wb",
                                 fileobj=sys.stdout.buffer,
                                 compresslevel=compresslevel))
                else:
                    f = cleanup.enter_context(builtins.open(arg, "rb"))
                    # Pass the selected level so that --fast/--best also
                    # apply to named files, not only to stdin.
                    g = cleanup.enter_context(
                        open(arg + ".gz", "wb", compresslevel=compresslevel))
            while chunk := f.read(READ_BUFFER_SIZE):
                g.write(chunk)
# Run the command-line interface when this file is executed as a script.
if __name__ == '__main__':
    main()