# Version string of this module.
version = "0.9.0"
# Author and contributor credits (non-ASCII letters are written as escapes).
__author__ = "Lars Gust\u00e4bel (lars@gustaebel.de)"
__credits__ = "Gustavo Niemeyer, Niels Gust\u00e4bel, Richard Townsend."
from builtins import open as bltn_open
import sys
import os
import io
import shutil
import stat
import time
import struct
import copy
import re
try:
import pwd
except ImportError:
pwd = None
try:
import grp
except ImportError:
grp = None
# Exception types bundled into one tuple so that a single ``except`` clause
# can catch all of them.
# NOTE(review): the name suggests this guards link creation during
# extraction; the code that uses it is outside this view -- confirm.
symlink_exception = (AttributeError, NotImplementedError, OSError)

# Public names exported by ``from <module> import *``.
__all__ = ["TarFile", "TarInfo", "is_tarfile", "TarError", "ReadError",
           "CompressionError", "StreamError", "ExtractError", "HeaderError",
           "ENCODING", "USTAR_FORMAT", "GNU_FORMAT", "PAX_FORMAT",
           "DEFAULT_FORMAT", "open","fully_trusted_filter", "data_filter",
           "tar_filter", "FilterError", "AbsoluteLinkError",
           "OutsideDestinationError", "SpecialFileError", "AbsolutePathError",
           "LinkOutsideDestinationError", "LinkFallbackError"]
# ---------------------------------------------------------
# tar constants
# ---------------------------------------------------------
NUL = b"\0"                     # padding / terminator byte
BLOCKSIZE = 512                 # length of one header block in bytes
RECORDSIZE = BLOCKSIZE * 20     # 20 blocks

# Both magic values must fill the 8-byte magic/version area of a header
# exactly (TarInfo.frombuf() reads linkname up to byte 257 and uname from
# byte 265).  Explicit escapes are used so the spaces cannot be lost.
GNU_MAGIC = b"ustar\x20\x20\0"  # "ustar", two spaces, NUL
POSIX_MAGIC = b"ustar\x0000"    # "ustar", NUL, "00"

LENGTH_NAME = 100               # maximum encoded length of the name field
LENGTH_LINK = 100               # maximum encoded length of the linkname field
LENGTH_PREFIX = 155             # maximum encoded length of the prefix field

# Values of the one-byte type field of a header.
REGTYPE = b"0"                  # regular file
AREGTYPE = b"\0"                # also treated as a regular file
LNKTYPE = b"1"                  # hard link
SYMTYPE = b"2"                  # symbolic link
CHRTYPE = b"3"                  # character device
BLKTYPE = b"4"                  # block device
DIRTYPE = b"5"                  # directory
FIFOTYPE = b"6"                 # FIFO
CONTTYPE = b"7"                 # treated as a regular file (see REGULAR_TYPES)

GNUTYPE_LONGNAME = b"L"         # GNU long name: payload holds the next member's name
GNUTYPE_LONGLINK = b"K"         # GNU long link: payload holds the next member's linkname
GNUTYPE_SPARSE = b"S"           # GNU sparse file

XHDTYPE = b"x"                  # pax extended header
XGLTYPE = b"g"                  # pax global header
SOLARIS_XHDTYPE = b"X"          # processed like a pax extended header

# Archive formats understood by TarInfo.tobuf().
USTAR_FORMAT = 0
GNU_FORMAT = 1
PAX_FORMAT = 2
DEFAULT_FORMAT = PAX_FORMAT

# ---------------------------------------------------------
# tarfile constants
# ---------------------------------------------------------
# Member types that are handled specifically; members of any other type
# have their data blocks skipped like a regular file's.
SUPPORTED_TYPES = (REGTYPE, AREGTYPE, LNKTYPE,
                   SYMTYPE, DIRTYPE, FIFOTYPE,
                   CONTTYPE, CHRTYPE, BLKTYPE,
                   GNUTYPE_LONGNAME, GNUTYPE_LONGLINK,
                   GNUTYPE_SPARSE)

# Member types for which TarInfo.isreg() returns True.
REGULAR_TYPES = (REGTYPE, AREGTYPE,
                 CONTTYPE, GNUTYPE_SPARSE)

# GNU-specific member types (the ustar prefix field is not applied to them).
GNU_TYPES = (GNUTYPE_LONGNAME, GNUTYPE_LONGLINK,
             GNUTYPE_SPARSE)

# pax keywords that are copied onto TarInfo attributes.
PAX_FIELDS = ("path", "linkpath", "size", "mtime",
              "uid", "gid", "uname", "gname")

# pax keywords whose values are decoded with the header's charset.
PAX_NAME_FIELDS = {"path", "linkpath", "uname", "gname"}

# pax keywords holding numbers, mapped to the converter for their value.
PAX_NUMBER_FIELDS = {
    "atime": float,
    "ctime": float,
    "mtime": float,
    "uid": int,
    "gid": int,
    "size": int
}
# Default encoding for member metadata: UTF-8 on Windows ("nt"), otherwise
# whatever the interpreter reports as the filesystem encoding.
ENCODING = "utf-8" if os.name == "nt" else sys.getfilesystemencoding()
def stn(s, length, encoding, errors):
    """Encode *s* into a bytes field of exactly *length* bytes.

    The encoded text is cut off after *length* bytes, or padded on the
    right with NUL bytes when it is shorter.

    Raises:
        ValueError: if *s* is None.
    """
    if s is None:
        raise ValueError("metadata cannot contain None")
    encoded = s.encode(encoding, errors)
    # A negative repeat count yields b"", so over-long input gets no padding.
    padding = NUL * (length - len(encoded))
    return encoded[:length] + padding
def nts(s, encoding, errors):
    """Decode a bytes field to str, ignoring everything from the first NUL on."""
    text, _, _ = s.partition(b"\0")
    return text.decode(encoding, errors)
def nti(s):
    """Convert a header number field to an int.

    Two encodings are recognised (they mirror what itn() produces):
      * first byte 0o200 or 0o377: the remaining bytes are a big-endian
        base-256 number; 0o377 marks a negative value;
      * otherwise: NUL-terminated ASCII octal digits; a blank field is 0.

    Raises:
        InvalidHeaderError: if the octal text cannot be decoded or parsed.
    """
    marker = s[0]
    if marker in (0o200, 0o377):
        value = int.from_bytes(s[1:], "big")
        if marker == 0o377:
            value = -(256 ** (len(s) - 1) - value)
        return value

    try:
        text = nts(s, "ascii", "strict")
        return int(text.strip() or "0", 8)
    except ValueError:
        raise InvalidHeaderError("invalid header")
def itn(n, digits=8, format=DEFAULT_FORMAT):
    """Convert a number to a header number field of *digits* bytes.

    Values that fit are written as (digits - 1) zero-padded octal digits
    followed by NUL.  For GNU_FORMAT only, other values in the range
    -256 ** (digits - 1) <= n < 256 ** (digits - 1) are written in base-256:
    a marker byte (0o200 for non-negative, 0o377 for negative values)
    followed by digits - 1 big-endian bytes.

    Returns:
        bytes for the octal form, bytearray for the base-256 form.

    Raises:
        ValueError: if the number fits neither representation.
    """
    n = int(n)
    if 0 <= n < 8 ** (digits - 1):
        return bytes("%0*o" % (digits - 1, n), "ascii") + NUL

    if format == GNU_FORMAT and -256 ** (digits - 1) <= n < 256 ** (digits - 1):
        if n >= 0:
            field = bytearray([0o200])
        else:
            field = bytearray([0o377])
            n = 256 ** digits + n

        # Insert the least significant byte first; each later byte goes in
        # front of it, giving big-endian order after the marker.
        for _ in range(digits - 1):
            field.insert(1, n & 0o377)
            n >>= 8
        return field

    raise ValueError("overflow in number field")
def calc_chksums(buf):
    """Return the (unsigned, signed) checksums of a 512-byte header block.

    The 8 bytes at offsets 148-155 (the chksum field) are skipped and 256 is
    added in their place, which equals eight 0x20 bytes.  The sum is computed
    twice, once reading the bytes as unsigned and once as signed values.
    """
    unsigned_bytes = struct.unpack_from("148B8x356B", buf)
    signed_bytes = struct.unpack_from("148b8x356b", buf)
    return 256 + sum(unsigned_bytes), 256 + sum(signed_bytes)
def copyfileobj(src, dst, length=None, exception=OSError, bufsize=None):
    """Copy *length* bytes from file object *src* to file object *dst*.

    With ``length=None`` everything up to EOF is copied (delegated to
    shutil.copyfileobj); ``length=0`` copies nothing.  Data is moved in
    chunks of *bufsize* bytes (16 KiB when not given).

    Raises:
        exception: "unexpected end of data" if *src* runs out before
            *length* bytes were read.
    """
    chunk_size = bufsize or 16 * 1024
    if length == 0:
        return
    if length is None:
        shutil.copyfileobj(src, dst, chunk_size)
        return

    def transfer(wanted):
        # Move exactly *wanted* bytes or fail.
        data = src.read(wanted)
        if len(data) < wanted:
            raise exception("unexpected end of data")
        dst.write(data)

    full_chunks, tail = divmod(length, chunk_size)
    for _ in range(full_chunks):
        transfer(chunk_size)
    if tail != 0:
        transfer(tail)
    return
def _safe_print(s):
    """Print *s* followed by a space instead of a newline.

    When sys.stdout reports an encoding, characters it cannot encode are
    replaced by backslash escapes first.
    """
    stdout_encoding = getattr(sys.stdout, 'encoding', None)
    if stdout_encoding is not None:
        encoded = s.encode(stdout_encoding, 'backslashreplace')
        s = encoded.decode(stdout_encoding)
    print(s, end=' ')
class TarError(Exception):
    """Base class of all exceptions defined in this module."""


class ExtractError(TarError):
    """Extraction-related error."""


class ReadError(TarError):
    """Raised when archive data cannot be read or is not valid."""


class CompressionError(TarError):
    """Raised for unavailable or unknown compression methods."""


class StreamError(TarError):
    """Raised for operations a stream does not support."""


class HeaderError(TarError):
    """Base class of the header-related exceptions."""


class EmptyHeaderError(HeaderError):
    """Raised when a header buffer is empty."""


class TruncatedHeaderError(HeaderError):
    """Raised when a header buffer is shorter than one block."""


class EOFHeaderError(HeaderError):
    """Raised when a header block consists only of NUL bytes."""


class InvalidHeaderError(HeaderError):
    """Raised when a header cannot be parsed."""


class SubsequentHeaderError(HeaderError):
    """Raised when the header following a special header is missing or bad."""
class _LowLevelFile:
    """Minimal file object built directly on os.open(), os.read(), os.write().

    Only the modes "r" and "w" are known; any other mode raises KeyError.
    """

    def __init__(self, name, mode):
        flags_by_mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }
        flags = flags_by_mode[mode]
        # O_BINARY is only defined on some platforms.
        flags |= getattr(os, "O_BINARY", 0)
        self.fd = os.open(name, flags, 0o666)

    def close(self):
        """Close the underlying file descriptor."""
        os.close(self.fd)

    def read(self, size):
        """Read at most *size* bytes from the descriptor."""
        return os.read(self.fd, size)

    def write(self, s):
        """Write the bytes *s* to the descriptor."""
        os.write(self.fd, s)
class _Stream:
    """Sequential byte stream with optional transparent (de)compression.

    The stream sits on top of either a caller-supplied file object or a
    _LowLevelFile opened from *name*.  Data passes through a gzip, bz2, xz
    or zstd (de)compressor depending on *comptype*; "tar" means no
    compression.  Only forward seeking is possible.  ``tell()`` reports the
    position in the uncompressed data.
    """

    def __init__(self, name, mode, comptype, fileobj, bufsize,
                 compresslevel, preset):
        """Set up the stream.

        Parameters:
            name: path of the file; opened via _LowLevelFile when *fileobj*
                is None, and used for the gzip header when writing.
            mode: "r" for reading, anything else is treated as writing.
            comptype: "tar", "gz", "bz2", "xz", "zst", or "*" to detect the
                type from the first block of data.
            fileobj: file object to use, or None.
            bufsize: chunk size for reads from / writes to the file object.
            compresslevel: level handed to the gzip and bz2 compressors.
            preset: preset handed to the lzma compressor.

        Raises:
            CompressionError: the compression module is missing or
                *comptype* is unknown.
            ReadError: reading "gz" data that does not start with a gzip
                header.
        """
        self._extfileobj = True
        if fileobj is None:
            fileobj = _LowLevelFile(name, mode)
            self._extfileobj = False

        if comptype == '*':
            # Let a proxy peek at the first block to detect the compression.
            fileobj = _StreamProxy(fileobj)
            comptype = fileobj.getcomptype()

        self.name = os.fspath(name) if name is not None else ""
        self.mode = mode
        self.comptype = comptype
        self.fileobj = fileobj
        self.bufsize = bufsize
        self.buf = b""          # raw (compressed) bytes not yet consumed/written
        self.pos = 0            # position in the uncompressed data
        self.closed = False

        try:
            if comptype == "gz":
                try:
                    import zlib
                except ImportError:
                    raise CompressionError("zlib module is not available") from None
                self.zlib = zlib
                self.crc = zlib.crc32(b"")
                if mode == "r":
                    self.exception = zlib.error
                    self._init_read_gz()
                else:
                    self._init_write_gz(compresslevel)

            elif comptype == "bz2":
                try:
                    import bz2
                except ImportError:
                    raise CompressionError("bz2 module is not available") from None
                if mode == "r":
                    self.dbuf = b""
                    self.cmp = bz2.BZ2Decompressor()
                    self.exception = OSError
                else:
                    self.cmp = bz2.BZ2Compressor(compresslevel)

            elif comptype == "xz":
                try:
                    import lzma
                except ImportError:
                    raise CompressionError("lzma module is not available") from None
                if mode == "r":
                    self.dbuf = b""
                    self.cmp = lzma.LZMADecompressor()
                    self.exception = lzma.LZMAError
                else:
                    self.cmp = lzma.LZMACompressor(preset=preset)

            elif comptype == "zst":
                try:
                    from compression import zstd
                except ImportError:
                    raise CompressionError("compression.zstd module is not available") from None
                if mode == "r":
                    self.dbuf = b""
                    self.cmp = zstd.ZstdDecompressor()
                    self.exception = zstd.ZstdError
                else:
                    self.cmp = zstd.ZstdCompressor()

            elif comptype != "tar":
                raise CompressionError("unknown compression type %r" % comptype)

        except:
            # Whatever went wrong, do not leak a file we opened ourselves;
            # the exception is always re-raised.
            if not self._extfileobj:
                self.fileobj.close()
            self.closed = True
            raise

    def __del__(self):
        # "closed" is missing if __init__ failed before setting it.
        if hasattr(self, "closed") and not self.closed:
            self.close()

    def _init_write_gz(self, compresslevel):
        """Create the raw-deflate compressor and queue the gzip header."""
        self.cmp = self.zlib.compressobj(compresslevel,
                                         self.zlib.DEFLATED,
                                         -self.zlib.MAX_WBITS,
                                         self.zlib.DEF_MEM_LEVEL,
                                         0)
        timestamp = struct.pack("<L", int(time.time()))
        # magic, method 8 (deflate), flags 8 (file name present), mtime,
        # then the two bytes \002 and \377.
        self.__write(b"\037\213\010\010" + timestamp + b"\002\377")
        if self.name.endswith(".gz"):
            self.name = self.name[:-3]
        # Only the base name goes into the header.
        self.name = os.path.basename(self.name)
        self.__write(self.name.encode("iso-8859-1", "replace") + NUL)

    def write(self, s):
        """Write the bytes *s* to the stream, compressing them if needed."""
        if self.comptype == "gz":
            self.crc = self.zlib.crc32(s, self.crc)
        self.pos += len(s)
        if self.comptype != "tar":
            s = self.cmp.compress(s)
        self.__write(s)

    def __write(self, s):
        """Buffer raw bytes and flush them to the file in bufsize chunks."""
        self.buf += s
        while len(self.buf) > self.bufsize:
            self.fileobj.write(self.buf[:self.bufsize])
            self.buf = self.buf[self.bufsize:]

    def close(self):
        """Flush pending output and close the stream.

        The underlying file is closed only if this object opened it.
        Calling close() again is a no-op.
        """
        if self.closed:
            return

        self.closed = True
        try:
            if self.mode == "w" and self.comptype != "tar":
                self.buf += self.cmp.flush()

            if self.mode == "w" and self.buf:
                self.fileobj.write(self.buf)
                self.buf = b""
                if self.comptype == "gz":
                    # gzip trailer: CRC32 and uncompressed size modulo 2**32.
                    self.fileobj.write(struct.pack("<L", self.crc))
                    self.fileobj.write(struct.pack("<L", self.pos & 0xffffFFFF))
        finally:
            if not self._extfileobj:
                self.fileobj.close()

    def _init_read_gz(self):
        """Create the raw-deflate decompressor and skip the gzip header.

        Raises:
            ReadError: the data does not start with the gzip magic.
            CompressionError: the compression method is not deflate.
        """
        self.cmp = self.zlib.decompressobj(-self.zlib.MAX_WBITS)
        self.dbuf = b""

        # The whole header is uncompressed, so it must be consumed with the
        # raw reader __read(), never with read().
        if self.__read(2) != b"\037\213":
            raise ReadError("not a gzip file")
        if self.__read(1) != b"\010":
            raise CompressionError("unsupported compression method")

        flag = ord(self.__read(1))
        self.__read(6)          # mtime (4), extra flags (1), OS (1)

        if flag & 4:
            # FEXTRA: 2-byte little-endian length followed by that many raw
            # bytes.  Using read() here would push header bytes through the
            # decompressor and advance the uncompressed position.
            xlen = ord(self.__read(1)) + 256 * ord(self.__read(1))
            self.__read(xlen)
        if flag & 8:
            # FNAME: NUL-terminated original file name.
            while True:
                s = self.__read(1)
                if not s or s == NUL:
                    break
        if flag & 16:
            # FCOMMENT: NUL-terminated comment.
            while True:
                s = self.__read(1)
                if not s or s == NUL:
                    break
        if flag & 2:
            # FHCRC: 2-byte header checksum.
            self.__read(2)

    def tell(self):
        """Return the current position in the uncompressed data."""
        return self.pos

    def seek(self, pos=0):
        """Advance to position *pos* by reading and discarding data.

        Raises:
            StreamError: if *pos* lies before the current position.
        """
        if pos - self.pos >= 0:
            blocks, remainder = divmod(pos - self.pos, self.bufsize)
            for i in range(blocks):
                self.read(self.bufsize)
            self.read(remainder)
        else:
            raise StreamError("seeking backwards is not allowed")
        return self.pos

    def read(self, size):
        """Return up to *size* uncompressed bytes and advance the position."""
        assert size is not None
        buf = self._read(size)
        self.pos += len(buf)
        return buf

    def _read(self, size):
        """Return up to *size* bytes of uncompressed data.

        Raises:
            ReadError: if the decompressor rejects the data.
        """
        if self.comptype == "tar":
            return self.__read(size)

        c = len(self.dbuf)
        t = [self.dbuf]
        while c < size:
            # Use up raw bytes left over from header parsing before reading
            # more from the file.
            if self.buf:
                buf = self.buf
                self.buf = b""
            else:
                buf = self.fileobj.read(self.bufsize)
                if not buf:
                    break
            try:
                buf = self.cmp.decompress(buf)
            except self.exception as e:
                raise ReadError("invalid compressed data") from e
            t.append(buf)
            c += len(buf)
        t = b"".join(t)
        # Keep any surplus for the next call.
        self.dbuf = t[size:]
        return t[:size]

    def __read(self, size):
        """Return up to *size* raw bytes, reading the file in bufsize chunks."""
        c = len(self.buf)
        t = [self.buf]
        while c < size:
            buf = self.fileobj.read(self.bufsize)
            if not buf:
                break
            t.append(buf)
            c += len(buf)
        t = b"".join(t)
        self.buf = t[size:]
        return t[:size]
class _StreamProxy(object):
    """File-object wrapper that peeks at the first block of data.

    The first BLOCKSIZE bytes are read up front so getcomptype() can inspect
    them; the first read() call hands that block back.
    """

    def __init__(self, fileobj):
        self.fileobj = fileobj
        self.buf = self.fileobj.read(BLOCKSIZE)

    def read(self, size):
        """Return the buffered first block, whatever *size* is.

        The instance attribute installed here makes every later read() go
        straight to the wrapped file object.
        """
        self.read = self.fileobj.read
        return self.buf

    def getcomptype(self):
        """Guess the compression type from the magic bytes of the first block."""
        head = self.buf
        if head.startswith(b"\x1f\x8b\x08"):
            return "gz"
        if head[0:3] == b"BZh" and head[4:10] == b"1AY&SY":
            return "bz2"
        if head.startswith((b"\x5d\x00\x00\x80", b"\xfd7zXZ")):
            return "xz"
        if head.startswith(b"\x28\xb5\x2f\xfd"):
            return "zst"
        return "tar"

    def close(self):
        """Close the wrapped file object."""
        self.fileobj.close()
class _FileInFile(object):
    """Read-only, file-like view of a section of another file object.

    The section starts at *offset* in *fileobj* and appears to be *size*
    bytes long.  *blockinfo* is an optional list of (offset, size) pairs
    naming the parts of the view that are actually stored; everything in
    between reads back as NUL bytes.
    """

    def __init__(self, fileobj, offset, size, name, blockinfo=None):
        self.fileobj = fileobj
        self.offset = offset
        self.size = size
        self.position = 0
        self.name = name
        self.closed = False

        if blockinfo is None:
            blockinfo = [(0, size)]

        # Build a list of (is_data, start, stop, file_position) entries that
        # covers the view with data blocks and zero-filled gaps.
        self.map_index = 0
        self.map = []
        covered_until = 0
        stored_at = self.offset
        for block_start, block_len in blockinfo:
            if block_start > covered_until:
                self.map.append((False, covered_until, block_start, None))
            block_stop = block_start + block_len
            self.map.append((True, block_start, block_stop, stored_at))
            stored_at += block_len
            covered_until = block_stop
        if covered_until < self.size:
            self.map.append((False, covered_until, self.size, None))

    def flush(self):
        pass

    @property
    def mode(self):
        return 'rb'

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return self.fileobj.seekable()

    def tell(self):
        """Return the current position within the view."""
        return self.position

    def seek(self, position, whence=io.SEEK_SET):
        """Move to a new position inside the view and return it.

        Raises:
            ValueError: for an unknown *whence*.
        """
        if whence == io.SEEK_SET:
            new_position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            moved = self.position + position
            if position < 0:
                new_position = max(moved, 0)
            else:
                new_position = min(moved, self.size)
        elif whence == io.SEEK_END:
            new_position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        self.position = new_position
        return self.position

    def read(self, size=None):
        """Read up to *size* bytes (everything left when None).

        Raises:
            ReadError: if the underlying file returns fewer bytes than a
                data block should contain.
        """
        remaining = self.size - self.position
        size = remaining if size is None else min(size, remaining)

        pieces = []
        while size > 0:
            # Find the map entry containing the current position, scanning
            # forward from the last one used and wrapping around.
            while True:
                is_data, start, stop, file_pos = self.map[self.map_index]
                if start <= self.position < stop:
                    break
                self.map_index = (self.map_index + 1) % len(self.map)

            length = min(size, stop - self.position)
            if is_data:
                self.fileobj.seek(file_pos + (self.position - start))
                chunk = self.fileobj.read(length)
                if len(chunk) != length:
                    raise ReadError("unexpected end of data")
                pieces.append(chunk)
            else:
                pieces.append(NUL * length)
            size -= length
            self.position += length
        return b"".join(pieces)

    def readinto(self, b):
        """Read into the writable buffer *b*; return the number of bytes read."""
        data = self.read(len(b))
        count = len(data)
        b[:count] = data
        return count

    def close(self):
        self.closed = True
class ExFileObject(io.BufferedReader):
    """Buffered reader over the data of one archive member.

    The raw stream is a _FileInFile built from the archive's file object
    and the member's data offset, size, name and sparse information.
    """

    def __init__(self, tarfile, tarinfo):
        member_data = _FileInFile(
            tarfile.fileobj,
            tarinfo.offset_data,
            tarinfo.size,
            tarinfo.name,
            tarinfo.sparse,
        )
        super().__init__(member_data)
class FilterError(TarError):
    """Base class of the errors raised by extraction filters."""


class AbsolutePathError(FilterError):
    """The member's name is an absolute path."""

    def __init__(self, tarinfo):
        self.tarinfo = tarinfo
        message = f'member {tarinfo.name!r} has an absolute path'
        super().__init__(message)


class OutsideDestinationError(FilterError):
    """The member would be extracted outside the destination."""

    def __init__(self, tarinfo, path):
        self.tarinfo = tarinfo
        self._path = path
        message = (f'{tarinfo.name!r} would be extracted to {path!r}, '
                   + 'which is outside the destination')
        super().__init__(message)


class SpecialFileError(FilterError):
    """The member is a special file."""

    def __init__(self, tarinfo):
        self.tarinfo = tarinfo
        message = f'{tarinfo.name!r} is a special file'
        super().__init__(message)


class AbsoluteLinkError(FilterError):
    """The member is a link whose target is an absolute path."""

    def __init__(self, tarinfo):
        self.tarinfo = tarinfo
        message = f'{tarinfo.name!r} is a link to an absolute path'
        super().__init__(message)


class LinkOutsideDestinationError(FilterError):
    """The member is a link whose target lies outside the destination."""

    def __init__(self, tarinfo, path):
        self.tarinfo = tarinfo
        self._path = path
        message = (f'{tarinfo.name!r} would link to {path!r}, '
                   + 'which is outside the destination')
        super().__init__(message)


class LinkFallbackError(FilterError):
    """A link would be extracted as a copy of a file that was rejected."""

    def __init__(self, tarinfo, path):
        self.tarinfo = tarinfo
        self._path = path
        message = (f'link {tarinfo.name!r} would be extracted as a '
                   + f'copy of {path!r}, which was rejected')
        super().__init__(message)


# Exception types grouped for use in a single ``except`` clause.
_FILTER_ERRORS = (FilterError, OSError, ExtractError)
def _get_filtered_attrs(member, dest_path, for_data=True):
    """Work out which attributes of *member* a filter has to replace.

    Parameters:
        member: the archive member to check.
        dest_path: directory the archive is extracted into.
        for_data: apply the additional checks of the 'data' filter
            (file types, ownership, link targets) when true.

    Returns:
        dict mapping attribute names to their replacement values; empty
        when nothing needs to change.

    Raises:
        AbsolutePathError: the name is absolute even after stripping
            leading separators.
        OutsideDestinationError: the member would end up outside *dest_path*.
        SpecialFileError: *for_data* is set and the member is neither a
            regular file, link nor directory.
        AbsoluteLinkError, LinkOutsideDestinationError: *for_data* is set
            and the link target is absolute or outside *dest_path*.
    """
    overrides = {}
    name = member.name
    dest_path = os.path.realpath(dest_path, strict=os.path.ALLOW_MISSING)

    # Drop leading separators: '/' as well as the local os.sep.
    if name.startswith(('/', os.sep)):
        name = member.path.lstrip('/' + os.sep)
        overrides['name'] = name
    if os.path.isabs(name):
        # Still absolute after stripping.
        raise AbsolutePathError(member)

    # The resolved target has to stay inside the destination.
    joined = os.path.join(dest_path, name)
    target_path = os.path.realpath(joined, strict=os.path.ALLOW_MISSING)
    if os.path.commonpath([target_path, dest_path]) != dest_path:
        raise OutsideDestinationError(member, target_path)

    mode = member.mode
    if mode is not None:
        # Remove the high bits and group/other write permission.
        mode &= 0o755
        if for_data:
            if member.isreg() or member.islnk():
                if not mode & 0o100:
                    # Not executable by the owner: clear all execute bits.
                    mode &= ~0o111
                # The owner can always read and write.
                mode |= 0o600
            elif member.isdir() or member.issym():
                # No mode is applied to directories and symlinks.
                mode = None
            else:
                raise SpecialFileError(member)
        if mode != member.mode:
            overrides['mode'] = mode

    if not for_data:
        return overrides

    # Ownership information is dropped.
    for attr in ('uid', 'gid', 'uname', 'gname'):
        if getattr(member, attr) is not None:
            overrides[attr] = None

    if not (member.islnk() or member.issym()):
        return overrides

    # Validate where the link points to.
    linkname = member.linkname
    if os.path.isabs(linkname):
        raise AbsoluteLinkError(member)
    normalized = os.path.normpath(linkname)
    if normalized != linkname:
        overrides['linkname'] = normalized
    if member.issym():
        # Symlink targets are taken relative to the link's own directory.
        link_target = os.path.join(dest_path, os.path.dirname(name), linkname)
    else:
        # Hard link targets are taken relative to the destination.
        link_target = os.path.join(dest_path, linkname)
    link_target = os.path.realpath(link_target, strict=os.path.ALLOW_MISSING)
    if os.path.commonpath([link_target, dest_path]) != dest_path:
        raise LinkOutsideDestinationError(member, link_target)
    return overrides
def fully_trusted_filter(member, dest_path):
    """Extraction filter that performs no checks.

    Returns *member* itself, unchanged; *dest_path* is not used.
    """
    return member
def tar_filter(member, dest_path):
    """Extraction filter applying the checks of _get_filtered_attrs()
    without its data-specific part.

    Returns *member* itself when nothing has to change, otherwise a shallow
    copy with the replaced attributes.
    """
    changes = _get_filtered_attrs(member, dest_path, False)
    if not changes:
        return member
    return member.replace(**changes, deep=False)
def data_filter(member, dest_path):
    """Extraction filter applying all checks of _get_filtered_attrs(),
    including the data-specific ones.

    Returns *member* itself when nothing has to change, otherwise a shallow
    copy with the replaced attributes.
    """
    changes = _get_filtered_attrs(member, dest_path, True)
    if not changes:
        return member
    return member.replace(**changes, deep=False)
# Maps filter names to the corresponding filter functions.
_NAMED_FILTERS = {
    "fully_trusted": fully_trusted_filter,
    "tar": tar_filter,
    "data": data_filter,
}

# Sentinel default for TarInfo.replace(): "leave this attribute unchanged".
_KEEP = object()

# Matches the decimal length prefix (1 to 20 digits followed by a space)
# that starts each record of a pax header; used by TarInfo._proc_pax().
_header_length_prefix_re = re.compile(br"([0-9]{1,20}) ")
class TarInfo(object):
__slots__ = dict(
name = 'Name of the archive member.',
mode = 'Permission bits.',
uid = 'User ID of the user who originally stored this member.',
gid = 'Group ID of the user who originally stored this member.',
size = 'Size in bytes.',
mtime = 'Time of last modification.',
chksum = 'Header checksum.',
type = ('File type. type is usually one of these constants: '
'REGTYPE, AREGTYPE, LNKTYPE, SYMTYPE, DIRTYPE, FIFOTYPE, '
'CONTTYPE, CHRTYPE, BLKTYPE, GNUTYPE_SPARSE.'),
linkname = ('Name of the target file name, which is only present '
'in TarInfo objects of type LNKTYPE and SYMTYPE.'),
uname = 'User name.',
gname = 'Group name.',
devmajor = 'Device major number.',
devminor = 'Device minor number.',
offset = 'The tar header starts here.',
offset_data = "The file's data starts here.",
pax_headers = ('A dictionary containing key-value pairs of an '
'associated pax extended header.'),
sparse = 'Sparse member information.',
_tarfile = None,
_sparse_structs = None,
_link_target = None,
)
def __init__(self, name=""):
self.name = name self.mode = 0o644 self.uid = 0 self.gid = 0 self.size = 0 self.mtime = 0 self.chksum = 0 self.type = REGTYPE self.linkname = "" self.uname = "" self.gname = "" self.devmajor = 0 self.devminor = 0
self.offset = 0 self.offset_data = 0
self.sparse = None self.pax_headers = {}
@property
def tarfile(self):
import warnings
warnings.warn(
'The undocumented "tarfile" attribute of TarInfo objects '
+ 'is deprecated and will be removed in Python 3.16',
DeprecationWarning, stacklevel=2)
return self._tarfile
@tarfile.setter
def tarfile(self, tarfile):
import warnings
warnings.warn(
'The undocumented "tarfile" attribute of TarInfo objects '
+ 'is deprecated and will be removed in Python 3.16',
DeprecationWarning, stacklevel=2)
self._tarfile = tarfile
@property
def path(self):
'In pax headers, "name" is called "path".'
return self.name
@path.setter
def path(self, name):
self.name = name
@property
def linkpath(self):
'In pax headers, "linkname" is called "linkpath".'
return self.linkname
@linkpath.setter
def linkpath(self, linkname):
self.linkname = linkname
def __repr__(self):
return "<%s %r at %#x>" % (self.__class__.__name__,self.name,id(self))
def replace(self, *,
name=_KEEP, mtime=_KEEP, mode=_KEEP, linkname=_KEEP,
uid=_KEEP, gid=_KEEP, uname=_KEEP, gname=_KEEP,
deep=True, _KEEP=_KEEP):
if deep:
result = copy.deepcopy(self)
else:
result = copy.copy(self)
if name is not _KEEP:
result.name = name
if mtime is not _KEEP:
result.mtime = mtime
if mode is not _KEEP:
result.mode = mode
if linkname is not _KEEP:
result.linkname = linkname
if uid is not _KEEP:
result.uid = uid
if gid is not _KEEP:
result.gid = gid
if uname is not _KEEP:
result.uname = uname
if gname is not _KEEP:
result.gname = gname
return result
def get_info(self):
if self.mode is None:
mode = None
else:
mode = self.mode & 0o7777
info = {
"name": self.name,
"mode": mode,
"uid": self.uid,
"gid": self.gid,
"size": self.size,
"mtime": self.mtime,
"chksum": self.chksum,
"type": self.type,
"linkname": self.linkname,
"uname": self.uname,
"gname": self.gname,
"devmajor": self.devmajor,
"devminor": self.devminor
}
if info["type"] == DIRTYPE and not info["name"].endswith("/"):
info["name"] += "/"
return info
def tobuf(self, format=DEFAULT_FORMAT, encoding=ENCODING, errors="surrogateescape"):
info = self.get_info()
for name, value in info.items():
if value is None:
raise ValueError("%s may not be None" % name)
if format == USTAR_FORMAT:
return self.create_ustar_header(info, encoding, errors)
elif format == GNU_FORMAT:
return self.create_gnu_header(info, encoding, errors)
elif format == PAX_FORMAT:
return self.create_pax_header(info, encoding)
else:
raise ValueError("invalid format")
def create_ustar_header(self, info, encoding, errors):
info["magic"] = POSIX_MAGIC
if len(info["linkname"].encode(encoding, errors)) > LENGTH_LINK:
raise ValueError("linkname is too long")
if len(info["name"].encode(encoding, errors)) > LENGTH_NAME:
info["prefix"], info["name"] = self._posix_split_name(info["name"], encoding, errors)
return self._create_header(info, USTAR_FORMAT, encoding, errors)
def create_gnu_header(self, info, encoding, errors):
info["magic"] = GNU_MAGIC
buf = b""
if len(info["linkname"].encode(encoding, errors)) > LENGTH_LINK:
buf += self._create_gnu_long_header(info["linkname"], GNUTYPE_LONGLINK, encoding, errors)
if len(info["name"].encode(encoding, errors)) > LENGTH_NAME:
buf += self._create_gnu_long_header(info["name"], GNUTYPE_LONGNAME, encoding, errors)
return buf + self._create_header(info, GNU_FORMAT, encoding, errors)
def create_pax_header(self, info, encoding):
info["magic"] = POSIX_MAGIC
pax_headers = self.pax_headers.copy()
for name, hname, length in (
("name", "path", LENGTH_NAME), ("linkname", "linkpath", LENGTH_LINK),
("uname", "uname", 32), ("gname", "gname", 32)):
if hname in pax_headers:
continue
try:
info[name].encode("ascii", "strict")
except UnicodeEncodeError:
pax_headers[hname] = info[name]
continue
if len(info[name]) > length:
pax_headers[hname] = info[name]
for name, digits in (("uid", 8), ("gid", 8), ("size", 12), ("mtime", 12)):
needs_pax = False
val = info[name]
val_is_float = isinstance(val, float)
val_int = round(val) if val_is_float else val
if not 0 <= val_int < 8 ** (digits - 1):
info[name] = 0
needs_pax = True
elif val_is_float:
info[name] = val_int
needs_pax = True
if needs_pax and name not in pax_headers:
pax_headers[name] = str(val)
if pax_headers:
buf = self._create_pax_generic_header(pax_headers, XHDTYPE, encoding)
else:
buf = b""
return buf + self._create_header(info, USTAR_FORMAT, "ascii", "replace")
@classmethod
def create_pax_global_header(cls, pax_headers):
return cls._create_pax_generic_header(pax_headers, XGLTYPE, "utf-8")
def _posix_split_name(self, name, encoding, errors):
components = name.split("/")
for i in range(1, len(components)):
prefix = "/".join(components[:i])
name = "/".join(components[i:])
if len(prefix.encode(encoding, errors)) <= LENGTH_PREFIX and \
len(name.encode(encoding, errors)) <= LENGTH_NAME:
break
else:
raise ValueError("name is too long")
return prefix, name
@staticmethod
def _create_header(info, format, encoding, errors):
has_device_fields = info.get("type") in (CHRTYPE, BLKTYPE)
if has_device_fields:
devmajor = itn(info.get("devmajor", 0), 8, format)
devminor = itn(info.get("devminor", 0), 8, format)
else:
devmajor = stn("", 8, encoding, errors)
devminor = stn("", 8, encoding, errors)
filetype = info.get("type", REGTYPE)
if filetype is None:
raise ValueError("TarInfo.type must not be None")
parts = [
stn(info.get("name", ""), 100, encoding, errors),
itn(info.get("mode", 0) & 0o7777, 8, format),
itn(info.get("uid", 0), 8, format),
itn(info.get("gid", 0), 8, format),
itn(info.get("size", 0), 12, format),
itn(info.get("mtime", 0), 12, format),
b" ", filetype,
stn(info.get("linkname", ""), 100, encoding, errors),
info.get("magic", POSIX_MAGIC),
stn(info.get("uname", ""), 32, encoding, errors),
stn(info.get("gname", ""), 32, encoding, errors),
devmajor,
devminor,
stn(info.get("prefix", ""), 155, encoding, errors)
]
buf = struct.pack("%ds" % BLOCKSIZE, b"".join(parts))
chksum = calc_chksums(buf[-BLOCKSIZE:])[0]
buf = buf[:-364] + bytes("%06o\0" % chksum, "ascii") + buf[-357:]
return buf
@staticmethod
def _create_payload(payload):
blocks, remainder = divmod(len(payload), BLOCKSIZE)
if remainder > 0:
payload += (BLOCKSIZE - remainder) * NUL
return payload
@classmethod
def _create_gnu_long_header(cls, name, type, encoding, errors):
name = name.encode(encoding, errors) + NUL
info = {}
info["name"] = "././@LongLink"
info["type"] = type
info["size"] = len(name)
info["magic"] = GNU_MAGIC
return cls._create_header(info, USTAR_FORMAT, encoding, errors) + \
cls._create_payload(name)
@classmethod
def _create_pax_generic_header(cls, pax_headers, type, encoding):
binary = False
for keyword, value in pax_headers.items():
try:
value.encode("utf-8", "strict")
except UnicodeEncodeError:
binary = True
break
records = b""
if binary:
records += b"21 hdrcharset=BINARY\n"
for keyword, value in pax_headers.items():
keyword = keyword.encode("utf-8")
if binary:
value = value.encode(encoding, "surrogateescape")
else:
value = value.encode("utf-8")
l = len(keyword) + len(value) + 3 n = p = 0
while True:
n = l + len(str(p))
if n == p:
break
p = n
records += bytes(str(p), "ascii") + b" " + keyword + b"=" + value + b"\n"
info = {}
info["name"] = "././@PaxHeader"
info["type"] = type
info["size"] = len(records)
info["magic"] = POSIX_MAGIC
return cls._create_header(info, USTAR_FORMAT, "ascii", "replace") + \
cls._create_payload(records)
@classmethod
def frombuf(cls, buf, encoding, errors):
if len(buf) == 0:
raise EmptyHeaderError("empty header")
if len(buf) != BLOCKSIZE:
raise TruncatedHeaderError("truncated header")
if buf.count(NUL) == BLOCKSIZE:
raise EOFHeaderError("end of file header")
chksum = nti(buf[148:156])
if chksum not in calc_chksums(buf):
raise InvalidHeaderError("bad checksum")
obj = cls()
obj.name = nts(buf[0:100], encoding, errors)
obj.mode = nti(buf[100:108])
obj.uid = nti(buf[108:116])
obj.gid = nti(buf[116:124])
obj.size = nti(buf[124:136])
obj.mtime = nti(buf[136:148])
obj.chksum = chksum
obj.type = buf[156:157]
obj.linkname = nts(buf[157:257], encoding, errors)
obj.uname = nts(buf[265:297], encoding, errors)
obj.gname = nts(buf[297:329], encoding, errors)
obj.devmajor = nti(buf[329:337])
obj.devminor = nti(buf[337:345])
prefix = nts(buf[345:500], encoding, errors)
if obj.type == AREGTYPE and obj.name.endswith("/"):
obj.type = DIRTYPE
if obj.type == GNUTYPE_SPARSE:
pos = 386
structs = []
for i in range(4):
try:
offset = nti(buf[pos:pos + 12])
numbytes = nti(buf[pos + 12:pos + 24])
except ValueError:
break
structs.append((offset, numbytes))
pos += 24
isextended = bool(buf[482])
origsize = nti(buf[483:495])
obj._sparse_structs = (structs, isextended, origsize)
if obj.isdir():
obj.name = obj.name.rstrip("/")
if prefix and obj.type not in GNU_TYPES:
obj.name = prefix + "/" + obj.name
return obj
@classmethod
def fromtarfile(cls, tarfile):
buf = tarfile.fileobj.read(BLOCKSIZE)
obj = cls.frombuf(buf, tarfile.encoding, tarfile.errors)
obj.offset = tarfile.fileobj.tell() - BLOCKSIZE
return obj._proc_member(tarfile)
def _proc_member(self, tarfile):
if self.type in (GNUTYPE_LONGNAME, GNUTYPE_LONGLINK):
return self._proc_gnulong(tarfile)
elif self.type == GNUTYPE_SPARSE:
return self._proc_sparse(tarfile)
elif self.type in (XHDTYPE, XGLTYPE, SOLARIS_XHDTYPE):
return self._proc_pax(tarfile)
else:
return self._proc_builtin(tarfile)
def _proc_builtin(self, tarfile):
self.offset_data = tarfile.fileobj.tell()
offset = self.offset_data
if self.isreg() or self.type not in SUPPORTED_TYPES:
offset += self._block(self.size)
tarfile.offset = offset
self._apply_pax_info(tarfile.pax_headers, tarfile.encoding, tarfile.errors)
if self.isdir():
self.name = self.name.rstrip("/")
return self
def _proc_gnulong(self, tarfile):
buf = tarfile.fileobj.read(self._block(self.size))
try:
next = self.fromtarfile(tarfile)
except HeaderError as e:
raise SubsequentHeaderError(str(e)) from None
next.offset = self.offset
if self.type == GNUTYPE_LONGNAME:
next.name = nts(buf, tarfile.encoding, tarfile.errors)
elif self.type == GNUTYPE_LONGLINK:
next.linkname = nts(buf, tarfile.encoding, tarfile.errors)
if next.isdir():
next.name = next.name.removesuffix("/")
return next
def _proc_sparse(self, tarfile):
structs, isextended, origsize = self._sparse_structs
del self._sparse_structs
while isextended:
buf = tarfile.fileobj.read(BLOCKSIZE)
pos = 0
for i in range(21):
try:
offset = nti(buf[pos:pos + 12])
numbytes = nti(buf[pos + 12:pos + 24])
except ValueError:
break
if offset and numbytes:
structs.append((offset, numbytes))
pos += 24
isextended = bool(buf[504])
self.sparse = structs
self.offset_data = tarfile.fileobj.tell()
tarfile.offset = self.offset_data + self._block(self.size)
self.size = origsize
return self
def _proc_pax(self, tarfile):
buf = tarfile.fileobj.read(self._block(self.size))
if self.type == XGLTYPE:
pax_headers = tarfile.pax_headers
else:
pax_headers = tarfile.pax_headers.copy()
pos = 0
encoding = None
raw_headers = []
while len(buf) > pos and buf[pos] != 0x00:
if not (match := _header_length_prefix_re.match(buf, pos)):
raise InvalidHeaderError("invalid header")
try:
length = int(match.group(1))
except ValueError:
raise InvalidHeaderError("invalid header")
if length < 5:
raise InvalidHeaderError("invalid header")
if pos + length > len(buf):
raise InvalidHeaderError("invalid header")
header_value_end_offset = match.start(1) + length - 1 keyword_and_value = buf[match.end(1) + 1:header_value_end_offset]
raw_keyword, equals, raw_value = keyword_and_value.partition(b"=")
if not raw_keyword or equals != b"=" or buf[header_value_end_offset] != 0x0A:
raise InvalidHeaderError("invalid header")
raw_headers.append((length, raw_keyword, raw_value))
if raw_keyword == b"hdrcharset" and encoding is None:
if raw_value == b"BINARY":
encoding = tarfile.encoding
else: encoding = "utf-8"
pos += length
if encoding is None:
encoding = "utf-8"
for length, raw_keyword, raw_value in raw_headers:
keyword = self._decode_pax_field(raw_keyword, "utf-8", "utf-8",
tarfile.errors)
if keyword in PAX_NAME_FIELDS:
value = self._decode_pax_field(raw_value, encoding, tarfile.encoding,
tarfile.errors)
else:
value = self._decode_pax_field(raw_value, "utf-8", "utf-8",
tarfile.errors)
pax_headers[keyword] = value
try:
next = self.fromtarfile(tarfile)
except HeaderError as e:
raise SubsequentHeaderError(str(e)) from None
if "GNU.sparse.map" in pax_headers:
self._proc_gnusparse_01(next, pax_headers)
elif "GNU.sparse.size" in pax_headers:
self._proc_gnusparse_00(next, raw_headers)
elif pax_headers.get("GNU.sparse.major") == "1" and pax_headers.get("GNU.sparse.minor") == "0":
self._proc_gnusparse_10(next, pax_headers, tarfile)
if self.type in (XHDTYPE, SOLARIS_XHDTYPE):
next._apply_pax_info(pax_headers, tarfile.encoding, tarfile.errors)
next.offset = self.offset
if "size" in pax_headers:
offset = next.offset_data
if next.isreg() or next.type not in SUPPORTED_TYPES:
offset += next._block(next.size)
tarfile.offset = offset
return next
def _proc_gnusparse_00(self, next, raw_headers):
offsets = []
numbytes = []
for _, keyword, value in raw_headers:
if keyword == b"GNU.sparse.offset":
try:
offsets.append(int(value.decode()))
except ValueError:
raise InvalidHeaderError("invalid header")
elif keyword == b"GNU.sparse.numbytes":
try:
numbytes.append(int(value.decode()))
except ValueError:
raise InvalidHeaderError("invalid header")
next.sparse = list(zip(offsets, numbytes))
def _proc_gnusparse_01(self, next, pax_headers):
sparse = [int(x) for x in pax_headers["GNU.sparse.map"].split(",")]
next.sparse = list(zip(sparse[::2], sparse[1::2]))
def _proc_gnusparse_10(self, next, pax_headers, tarfile):
fields = None
sparse = []
buf = tarfile.fileobj.read(BLOCKSIZE)
fields, buf = buf.split(b"\n", 1)
fields = int(fields)
while len(sparse) < fields * 2:
if b"\n" not in buf:
buf += tarfile.fileobj.read(BLOCKSIZE)
number, buf = buf.split(b"\n", 1)
sparse.append(int(number))
next.offset_data = tarfile.fileobj.tell()
next.sparse = list(zip(sparse[::2], sparse[1::2]))
def _apply_pax_info(self, pax_headers, encoding, errors):
for keyword, value in pax_headers.items():
if keyword == "GNU.sparse.name":
setattr(self, "path", value)
elif keyword == "GNU.sparse.size":
setattr(self, "size", int(value))
elif keyword == "GNU.sparse.realsize":
setattr(self, "size", int(value))
elif keyword in PAX_FIELDS:
if keyword in PAX_NUMBER_FIELDS:
try:
value = PAX_NUMBER_FIELDS[keyword](value)
except ValueError:
value = 0
if keyword == "path":
value = value.rstrip("/")
setattr(self, keyword, value)
self.pax_headers = pax_headers.copy()
def _decode_pax_field(self, value, encoding, fallback_encoding, fallback_errors):
try:
return value.decode(encoding, "strict")
except UnicodeDecodeError:
return value.decode(fallback_encoding, fallback_errors)
def _block(self, count):
if count < 0:
raise InvalidHeaderError("invalid offset")
blocks, remainder = divmod(count, BLOCKSIZE)
if remainder:
blocks += 1
return blocks * BLOCKSIZE
def isreg(self):
'Return True if the Tarinfo object is a regular file.'
return self.type in REGULAR_TYPES
def isfile(self):
'Return True if the Tarinfo object is a regular file.'
return self.isreg()
def isdir(self):
'Return True if it is a directory.'
return self.type == DIRTYPE
def issym(self):
'Return True if it is a symbolic link.'
return self.type == SYMTYPE
def islnk(self):
'Return True if it is a hard link.'
return self.type == LNKTYPE
def ischr(self):
'Return True if it is a character device.'
return self.type == CHRTYPE
def isblk(self):
    """Return True if the member's type is BLKTYPE (a block device)."""
    member_type = self.type
    return member_type == BLKTYPE
def isfifo(self):
    """Return True if the member's type is FIFOTYPE (a FIFO)."""
    member_type = self.type
    return member_type == FIFOTYPE
def issparse(self):
    """Return True if a sparse map is set, i.e. ``self.sparse`` is not None."""
    sparse_map = self.sparse
    return sparse_map is not None
def isdev(self):
    """Return True for a character device, a block device or a FIFO."""
    device_types = (CHRTYPE, BLKTYPE, FIFOTYPE)
    return self.type in device_types
# Reader/writer for tar archives.  The assignments below are class-level
# defaults; __init__ overrides most of them per instance when the matching
# argument is not None.
class TarFile(object):
# _dbg() prints a message to sys.stderr when its level is <= debug.
debug = 0
# When true, gettarinfo() uses os.stat() instead of os.lstat().
dereference = False
# When true, next() skips blocks with EOF/invalid headers and keeps reading.
ignore_zeros = False
# > 0: _handle_fatal_error() re-raises; > 1: _handle_nonfatal_error() too.
errorlevel = 1
# Format argument passed to TarInfo.tobuf() by addfile().
format = DEFAULT_FORMAT
# Encoding argument passed to TarInfo.tobuf() by addfile().
encoding = ENCODING
# Errors argument passed to TarInfo.tobuf(); __init__ always replaces it
# (its own default is "surrogateescape").
errors = None
# Factory used to create member objects and to parse headers.
tarinfo = TarInfo
# Factory used by extractfile() to wrap a member's data.
fileobject = ExFileObject
# Default consulted by _get_filter_function() when no filter is passed.
extraction_filter = None
def __init__(self, name=None, mode="r", fileobj=None, format=None,
        tarinfo=None, dereference=None, ignore_zeros=None, encoding=None,
        errors="surrogateescape", pax_headers=None, debug=None,
        errorlevel=None, copybufsize=None, stream=False):
    """Open an (uncompressed) tar archive.

    *mode* is one of 'r', 'a', 'w' or 'x'; anything else raises
    ValueError.  When *fileobj* is falsy the file *name* is opened with
    the builtin open() (append mode on a non-existent path silently
    becomes write mode); otherwise *fileobj* is used as-is and is not
    closed by this object.  Arguments left as None keep the class-level
    defaults.  *pax_headers* is only stored when the format is
    PAX_FORMAT.

    In 'r' mode the first member is read eagerly; in 'a' mode the
    existing members are read so that writing continues before the
    end-of-archive marker.  Header errors in 'a' mode are raised as
    ReadError.  On any failure the file is closed if it was opened here.
    """
    file_modes = {"r": "rb", "a": "r+b", "w": "wb", "x": "xb"}
    if mode not in file_modes:
        raise ValueError("mode must be 'r', 'a', 'w' or 'x'")
    self.mode = mode
    self._mode = file_modes[mode]

    if fileobj:
        # Caller-supplied file object: borrow its name and mode if present.
        if (name is None and hasattr(fileobj, "name") and
                isinstance(fileobj.name, (str, bytes))):
            name = fileobj.name
        if hasattr(fileobj, "mode"):
            self._mode = fileobj.mode
        self._extfileobj = True
    else:
        if self.mode == "a" and not os.path.exists(name):
            # Nothing to append to yet: create the file instead.
            self.mode = "w"
            self._mode = "wb"
        fileobj = bltn_open(name, self._mode)
        self._extfileobj = False
    self.name = os.path.abspath(name) if name else None
    self.fileobj = fileobj

    self.stream = stream

    # Per-instance overrides of the class-level defaults.
    overrides = (
        ("format", format),
        ("tarinfo", tarinfo),
        ("dereference", dereference),
        ("ignore_zeros", ignore_zeros),
        ("encoding", encoding),
    )
    for attr, given in overrides:
        if given is not None:
            setattr(self, attr, given)
    self.errors = errors

    if pax_headers is not None and self.format == PAX_FORMAT:
        self.pax_headers = pax_headers
    else:
        self.pax_headers = {}

    if debug is not None:
        self.debug = debug
    if errorlevel is not None:
        self.errorlevel = errorlevel

    # Bookkeeping state.
    self.copybufsize = copybufsize
    self.closed = False
    self.members = []                   # members read or added so far
    self._loaded = False                # True once all members are known
    self.offset = self.fileobj.tell()   # current position in the archive
    self.inodes = {}                    # (st_ino, st_dev) -> archived name
    self._unames = {}                   # uid -> user name cache
    self._gnames = {}                   # gid -> group name cache

    try:
        if self.mode == "r":
            self.firstmember = None
            self.firstmember = self.next()

        if self.mode == "a":
            # Walk the existing headers until the end-of-archive header.
            while True:
                self.fileobj.seek(self.offset)
                try:
                    member = self.tarinfo.fromtarfile(self)
                    self.members.append(member)
                except EOFHeaderError:
                    self.fileobj.seek(self.offset)
                    break
                except HeaderError as e:
                    raise ReadError(str(e)) from None

        if self.mode in ("a", "w", "x"):
            self._loaded = True

            if self.pax_headers:
                header = self.tarinfo.create_pax_global_header(self.pax_headers.copy())
                self.fileobj.write(header)
                self.offset += len(header)
    except BaseException:
        # Do not leak a file we opened ourselves.
        if not self._extfileobj:
            self.fileobj.close()
        self.closed = True
        raise
@classmethod
def open(cls, name=None, mode="r", fileobj=None, bufsize=RECORDSIZE, **kwargs):
    """Open a tar archive and return an instance of this class.

    *mode* selects how the archive is opened:

    'r' or 'r:*'     try every opener in ``cls.OPEN_METH`` for reading,
                     the ``taropen`` entry last; ReadError is raised with
                     a summary if all of them fail
    '<m>:<comp>'     use the opener registered for *comp*; an empty *m*
                     means 'r', an empty *comp* means 'tar'
    '<m>|<comp>'     wrap the file in a ``_Stream``; *m* must be 'r' or 'w'
    'a', 'w', 'x'    delegate to ``taropen``

    Raises ValueError when neither *name* nor *fileobj* is given, for
    invalid stream-mode options and for any other mode string, and
    CompressionError for an unknown compression type.
    """
    if not name and not fileobj:
        raise ValueError("nothing to open")

    if mode in ("r", "r:*"):
        def is_plain_tar(comptype):
            return cls.OPEN_METH[comptype] == 'taropen'

        failures = []
        # False sorts before True, so the plain-tar opener is tried last.
        for comptype in sorted(cls.OPEN_METH, key=is_plain_tar):
            opener = getattr(cls, cls.OPEN_METH[comptype])
            if fileobj is not None:
                saved_pos = fileobj.tell()
            try:
                return opener(name, "r", fileobj, **kwargs)
            except (ReadError, CompressionError) as e:
                failures.append(f'- method {comptype}: {e!r}')
                # Rewind so the next opener sees the same data.
                if fileobj is not None:
                    fileobj.seek(saved_pos)
        error_msgs_summary = '\n'.join(failures)
        raise ReadError(f"file could not be opened successfully:\n{error_msgs_summary}")

    if ":" in mode:
        filemode, comptype = mode.split(":", 1)
        filemode = filemode or "r"
        comptype = comptype or "tar"

        if comptype not in cls.OPEN_METH:
            raise CompressionError("unknown compression type %r" % comptype)
        opener = getattr(cls, cls.OPEN_METH[comptype])
        return opener(name, filemode, fileobj, **kwargs)

    if "|" in mode:
        filemode, comptype = mode.split("|", 1)
        filemode = filemode or "r"
        comptype = comptype or "tar"

        if filemode not in ("r", "w"):
            raise ValueError("mode must be 'r' or 'w'")
        if "compresslevel" in kwargs and comptype not in ("gz", "bz2"):
            raise ValueError(
                "compresslevel is only valid for w|gz and w|bz2 modes"
            )
        if "preset" in kwargs and comptype not in ("xz",):
            raise ValueError("preset is only valid for w|xz mode")

        compresslevel = kwargs.pop("compresslevel", 9)
        preset = kwargs.pop("preset", None)
        stream = _Stream(name, filemode, comptype, fileobj, bufsize,
                         compresslevel, preset)
        try:
            archive = cls(name, filemode, stream, **kwargs)
        except BaseException:
            stream.close()
            raise
        # The stream wrapper was created here, so the archive owns it.
        archive._extfileobj = False
        return archive

    if mode in ("a", "w", "x"):
        return cls.taropen(name, mode, fileobj, **kwargs)

    raise ValueError("undiscernible mode")
@classmethod
def taropen(cls, name, mode="r", fileobj=None, **kwargs):
    """Open an uncompressed tar archive by instantiating *cls*.

    Raises ValueError unless *mode* is 'r', 'a', 'w' or 'x'.
    """
    allowed_modes = ("r", "a", "w", "x")
    if mode not in allowed_modes:
        raise ValueError("mode must be 'r', 'a', 'w' or 'x'")
    return cls(name, mode, fileobj, **kwargs)
@classmethod
def gzopen(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open a gzip-compressed tar archive.

    The file is wrapped in a ``gzip.GzipFile`` and handed to
    ``cls.taropen``.  Raises ValueError for modes other than 'r', 'w' and
    'x', CompressionError when the gzip module cannot be imported, and
    ReadError ("not a gzip file") when an OSError occurs while reading.
    The GzipFile is closed if opening the archive fails.
    """
    if mode not in ("r", "w", "x"):
        raise ValueError("mode must be 'r', 'w' or 'x'")

    try:
        from gzip import GzipFile
    except ImportError:
        raise CompressionError("gzip module is not available") from None

    try:
        fileobj = GzipFile(name, mode + "b", compresslevel, fileobj)
    except OSError as e:
        if fileobj is not None and mode == 'r':
            raise ReadError("not a gzip file") from e
        raise

    try:
        archive = cls.taropen(name, mode, fileobj, **kwargs)
    except OSError as e:
        fileobj.close()
        if mode == 'r':
            raise ReadError("not a gzip file") from e
        raise
    except BaseException:
        fileobj.close()
        raise
    # The GzipFile wrapper was created here, so the archive owns it.
    archive._extfileobj = False
    return archive
@classmethod
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open a bzip2-compressed tar archive.

    The file (``fileobj`` if truthy, else ``name``) is wrapped in a
    ``bz2.BZ2File`` and handed to ``cls.taropen``.  Raises ValueError for
    modes other than 'r', 'w' and 'x', CompressionError when the bz2
    module cannot be imported, and ReadError ("not a bzip2 file") when
    OSError or EOFError occurs while reading.  The BZ2File is closed if
    opening the archive fails.
    """
    if mode not in ("r", "w", "x"):
        raise ValueError("mode must be 'r', 'w' or 'x'")

    try:
        from bz2 import BZ2File
    except ImportError:
        raise CompressionError("bz2 module is not available") from None

    fileobj = BZ2File(fileobj or name, mode, compresslevel=compresslevel)

    try:
        archive = cls.taropen(name, mode, fileobj, **kwargs)
    except (OSError, EOFError) as e:
        fileobj.close()
        if mode == 'r':
            raise ReadError("not a bzip2 file") from e
        raise
    except BaseException:
        fileobj.close()
        raise
    # The BZ2File wrapper was created here, so the archive owns it.
    archive._extfileobj = False
    return archive
@classmethod
def xzopen(cls, name, mode="r", fileobj=None, preset=None, **kwargs):
    """Open an lzma-compressed tar archive.

    The file (``fileobj`` if truthy, else ``name``) is wrapped in an
    ``lzma.LZMAFile`` and handed to ``cls.taropen``.  Raises ValueError
    for modes other than 'r', 'w' and 'x', CompressionError when the lzma
    module cannot be imported, and ReadError ("not an lzma file") when
    LZMAError or EOFError occurs while reading.  The LZMAFile is closed
    if opening the archive fails.
    """
    if mode not in ("r", "w", "x"):
        raise ValueError("mode must be 'r', 'w' or 'x'")

    try:
        from lzma import LZMAFile, LZMAError
    except ImportError:
        raise CompressionError("lzma module is not available") from None

    fileobj = LZMAFile(fileobj or name, mode, preset=preset)

    try:
        archive = cls.taropen(name, mode, fileobj, **kwargs)
    except (LZMAError, EOFError) as e:
        fileobj.close()
        if mode == 'r':
            raise ReadError("not an lzma file") from e
        raise
    except BaseException:
        fileobj.close()
        raise
    # The LZMAFile wrapper was created here, so the archive owns it.
    archive._extfileobj = False
    return archive
@classmethod
def zstopen(cls, name, mode="r", fileobj=None, level=None, options=None,
            zstd_dict=None, **kwargs):
    """Open a zstd-compressed tar archive.

    The file (``fileobj`` if truthy, else ``name``) is wrapped in a
    ``compression.zstd.ZstdFile`` created with *level*, *options* and
    *zstd_dict*, then handed to ``cls.taropen``.  Raises ValueError for
    modes other than 'r', 'w' and 'x', CompressionError when
    ``compression.zstd`` cannot be imported, and ReadError ("not a zstd
    file") when ZstdError or EOFError occurs while reading.

    The ZstdFile is closed whenever opening the archive fails, including
    on KeyboardInterrupt/SystemExit, matching gzopen/bz2open/xzopen.
    """
    if mode not in ("r", "w", "x"):
        raise ValueError("mode must be 'r', 'w' or 'x'")

    try:
        from compression.zstd import ZstdFile, ZstdError
    except ImportError:
        raise CompressionError("compression.zstd module is not available") from None

    fileobj = ZstdFile(
        fileobj or name,
        mode,
        level=level,
        options=options,
        zstd_dict=zstd_dict
    )

    try:
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except (ZstdError, EOFError) as e:
        fileobj.close()
        if mode == 'r':
            raise ReadError("not a zstd file") from e
        raise
    except BaseException:
        # Catch everything (not just Exception) so the wrapper created
        # above is never leaked; the exception is re-raised unchanged.
        fileobj.close()
        raise
    # The ZstdFile wrapper was created here, so the archive owns it.
    t._extfileobj = False
    return t
# Maps a compression type (as used in mode strings such as "r:gz") to the
# name of the classmethod that opens it; open() looks openers up here.
OPEN_METH = {
"tar": "taropen", "gz": "gzopen", "bz2": "bz2open", "xz": "xzopen", "zst": "zstopen", }
def close(self):
    """Close the archive; calling it again is a no-op.

    In 'a', 'w' and 'x' mode two zero-filled blocks are appended and the
    archive is padded with zero bytes up to a multiple of RECORDSIZE.
    The underlying file is closed unless it was supplied by the caller.
    """
    if self.closed:
        return

    self.closed = True
    try:
        if self.mode in ("a", "w", "x"):
            trailer = BLOCKSIZE * 2
            self.fileobj.write(NUL * trailer)
            self.offset += trailer
            # Pad the final record with zero bytes.
            leftover = self.offset % RECORDSIZE
            if leftover > 0:
                self.fileobj.write(NUL * (RECORDSIZE - leftover))
    finally:
        if not self._extfileobj:
            self.fileobj.close()
def getmember(self, name):
    """Return the member called *name* (trailing slashes are ignored).

    Raises KeyError when ``_getmember()`` finds no such member.
    """
    found = self._getmember(name.rstrip('/'))
    if found is not None:
        return found
    raise KeyError("filename %r not found" % name)
def getmembers(self):
    """Return the list of members, loading the archive first if needed."""
    self._check()
    already_loaded = self._loaded
    if not already_loaded:
        # The whole archive has to be scanned to know every member.
        self._load()
    return self.members
def getnames(self):
    """Return the names of all members, in ``getmembers()`` order."""
    names = []
    for member in self.getmembers():
        names.append(member.name)
    return names
def gettarinfo(self, name=None, arcname=None, fileobj=None):
    """Build a member object describing an existing file.

    The file is examined with ``os.fstat()`` on *fileobj* when one is
    given (its ``name`` attribute then replaces *name*), otherwise with
    ``os.stat()`` or ``os.lstat()`` on *name* depending on
    ``self.dereference``.  *arcname* defaults to *name*; its drive part
    and leading slashes are removed and ``os.sep`` becomes "/".

    A regular file whose inode was already recorded under a different
    archive name is described as a hard link to that name.  Returns None
    for file types that are not handled here.  Requires mode 'a', 'w' or
    'x'.
    """
    self._check("awx")

    # An open file object supplies the real file name.
    if fileobj is not None:
        name = fileobj.name

    if arcname is None:
        arcname = name
    drv, arcname = os.path.splitdrive(arcname)
    arcname = arcname.replace(os.sep, "/").lstrip("/")

    tarinfo = self.tarinfo()
    tarinfo._tarfile = self

    if fileobj is not None:
        statres = os.fstat(fileobj.fileno())
    elif self.dereference:
        statres = os.stat(name)
    else:
        statres = os.lstat(name)
    linkname = ""

    st_mode = statres.st_mode
    if stat.S_ISREG(st_mode):
        inode = (statres.st_ino, statres.st_dev)
        if not self.dereference and statres.st_nlink > 1 and \
                inode in self.inodes and arcname != self.inodes[inode]:
            # Same inode already archived under another name: hard link.
            kind = LNKTYPE
            linkname = self.inodes[inode]
        else:
            kind = REGTYPE
            # Only remember inodes with a non-zero inode number.
            if inode[0]:
                self.inodes[inode] = arcname
    elif stat.S_ISDIR(st_mode):
        kind = DIRTYPE
    elif stat.S_ISFIFO(st_mode):
        kind = FIFOTYPE
    elif stat.S_ISLNK(st_mode):
        kind = SYMTYPE
        linkname = os.readlink(name)
    elif stat.S_ISCHR(st_mode):
        kind = CHRTYPE
    elif stat.S_ISBLK(st_mode):
        kind = BLKTYPE
    else:
        return None

    tarinfo.name = arcname
    tarinfo.mode = st_mode
    tarinfo.uid = statres.st_uid
    tarinfo.gid = statres.st_gid
    tarinfo.size = statres.st_size if kind == REGTYPE else 0
    tarinfo.mtime = statres.st_mtime
    tarinfo.type = kind
    tarinfo.linkname = linkname

    # Resolved user and group names are cached per archive object.
    if pwd:
        if tarinfo.uid not in self._unames:
            try:
                self._unames[tarinfo.uid] = pwd.getpwuid(tarinfo.uid)[0]
            except KeyError:
                self._unames[tarinfo.uid] = ''
        tarinfo.uname = self._unames[tarinfo.uid]
    if grp:
        if tarinfo.gid not in self._gnames:
            try:
                self._gnames[tarinfo.gid] = grp.getgrgid(tarinfo.gid)[0]
            except KeyError:
                self._gnames[tarinfo.gid] = ''
        tarinfo.gname = self._gnames[tarinfo.gid]

    if kind in (CHRTYPE, BLKTYPE):
        if hasattr(os, "major") and hasattr(os, "minor"):
            tarinfo.devmajor = os.major(statres.st_rdev)
            tarinfo.devminor = os.minor(statres.st_rdev)
    return tarinfo
def list(self, verbose=True, *, members=None):
    """Print a listing of the archive, one entry per member.

    When *verbose* is true the mode string, owner/group, size (or
    "major,minor" for character/block devices) and modification time are
    written before the name, and link targets after it.  Unknown mode or
    mtime values are shown as question marks.  *members* may be an
    iterable of members to list instead of the whole archive.
    """
    type2mode = {REGTYPE: stat.S_IFREG, SYMTYPE: stat.S_IFLNK,
                 FIFOTYPE: stat.S_IFIFO, CHRTYPE: stat.S_IFCHR,
                 DIRTYPE: stat.S_IFDIR, BLKTYPE: stat.S_IFBLK}
    self._check()

    entries = self if members is None else members
    for info in entries:
        if verbose:
            if info.mode is None:
                mode_text = "??????????"
            else:
                modetype = type2mode.get(info.type, 0)
                mode_text = stat.filemode(modetype | info.mode)
            _safe_print(mode_text)

            _safe_print("%s/%s" % (info.uname or info.uid,
                                   info.gname or info.gid))

            if info.ischr() or info.isblk():
                size_text = "%10s" % (
                    "%d,%d" % (info.devmajor, info.devminor))
            else:
                size_text = "%10d" % info.size
            _safe_print(size_text)

            if info.mtime is None:
                time_text = "????-??-?? ??:??:??"
            else:
                time_text = ("%d-%02d-%02d %02d:%02d:%02d"
                             % time.localtime(info.mtime)[:6])
            _safe_print(time_text)

        _safe_print(info.name + ("/" if info.isdir() else ""))

        if verbose:
            if info.issym():
                _safe_print("-> " + info.linkname)
            if info.islnk():
                _safe_print("link to " + info.linkname)
        print()
def add(self, name, arcname=None, recursive=True, *, filter=None):
    """Add the file *name* to the archive under *arcname*.

    *arcname* defaults to *name*.  The archive file itself is skipped, as
    are files for which ``gettarinfo()`` returns None.  *filter*, when
    given, is called with the member object and must return the object to
    store, or None to leave the file out.  Directories are added together
    with their sorted contents unless *recursive* is false.  Requires
    mode 'a', 'w' or 'x'.
    """
    self._check("awx")

    if arcname is None:
        arcname = name

    # Never archive the archive into itself.
    if self.name is not None and os.path.abspath(name) == self.name:
        self._dbg(2, "tarfile: Skipped %r" % name)
        return

    self._dbg(1, name)

    tarinfo = self.gettarinfo(name, arcname)
    if tarinfo is None:
        self._dbg(1, "tarfile: Unsupported type %r" % name)
        return

    if filter is not None:
        tarinfo = filter(tarinfo)
        if tarinfo is None:
            self._dbg(2, "tarfile: Excluded %r" % name)
            return

    if tarinfo.isreg():
        with bltn_open(name, "rb") as source:
            self.addfile(tarinfo, source)
        return

    if not tarinfo.isdir():
        self.addfile(tarinfo)
        return

    self.addfile(tarinfo)
    if recursive:
        for entry in sorted(os.listdir(name)):
            self.add(os.path.join(name, entry), os.path.join(arcname, entry),
                     recursive, filter=filter)
def addfile(self, tarinfo, fileobj=None):
    """Write the header for *tarinfo* and, optionally, its data.

    A copy of *tarinfo* is serialised with ``tobuf()`` and appended to
    ``self.members``.  When *fileobj* is given, ``tarinfo.size`` bytes are
    copied from it and the data is padded with zero bytes to a multiple
    of BLOCKSIZE.  Raises ValueError when a regular file of non-zero size
    is added without *fileobj*.  Requires mode 'a', 'w' or 'x'.
    """
    self._check("awx")

    if fileobj is None and tarinfo.isreg() and tarinfo.size != 0:
        raise ValueError("fileobj not provided for non zero-size regular file")

    tarinfo = copy.copy(tarinfo)

    header = tarinfo.tobuf(self.format, self.encoding, self.errors)
    self.fileobj.write(header)
    self.offset += len(header)
    bufsize = self.copybufsize

    if fileobj is not None:
        copyfileobj(fileobj, self.fileobj, tarinfo.size, bufsize=bufsize)
        blocks, remainder = divmod(tarinfo.size, BLOCKSIZE)
        if remainder > 0:
            # Fill the last, partial block with zero bytes.
            self.fileobj.write(NUL * (BLOCKSIZE - remainder))
            blocks += 1
        self.offset += blocks * BLOCKSIZE

    self.members.append(tarinfo)
def _get_filter_function(self, filter):
    """Return the callable to use as extraction filter.

    A callable *filter* is returned unchanged; any other non-None value
    is looked up in ``_NAMED_FILTERS`` (ValueError if missing).  With
    *filter* None, ``self.extraction_filter`` is used: ``data_filter`` is
    returned when that is None too, and TypeError is raised when it is a
    string.
    """
    if filter is not None:
        if callable(filter):
            return filter
        try:
            return _NAMED_FILTERS[filter]
        except KeyError:
            raise ValueError(f"filter {filter!r} not found") from None

    default = self.extraction_filter
    if default is None:
        return data_filter
    if isinstance(default, str):
        raise TypeError(
            'String names are not supported for '
            + 'TarFile.extraction_filter. Use a function such as '
            + 'tarfile.data_filter directly.')
    return default
def extractall(self, path=".", members=None, *, numeric_owner=False,
               filter=None):
    """Extract all members (or the given *members*) below *path*.

    Every member is passed through the extraction filter resolved from
    *filter*; members it rejects are skipped.  Directories are created
    without their attributes first; once everything is extracted, owner,
    modification time and mode are applied to them in reverse name order,
    after filtering each directory again.  ExtractError raised during
    that fix-up is passed to ``_handle_nonfatal_error()``.
    """
    filter_function = self._get_filter_function(filter)
    if members is None:
        members = self

    deferred_dirs = []
    for member in members:
        tarinfo, unfiltered = self._get_extract_tarinfo(
            member, filter_function, path)
        if tarinfo is None:
            continue
        is_directory = tarinfo.isdir()
        if is_directory:
            # Attributes of directories are applied in the second pass.
            deferred_dirs.append(unfiltered)
        self._extract_one(tarinfo, path, set_attrs=not is_directory,
                          numeric_owner=numeric_owner,
                          filter_function=filter_function)

    # Reverse name order, so nested directories come before their parents.
    deferred_dirs.sort(key=lambda entry: entry.name, reverse=True)

    for unfiltered in deferred_dirs:
        try:
            # Filter again: the filesystem has changed since the first pass.
            try:
                tarinfo = filter_function(unfiltered, path)
            except _FILTER_ERRORS as exc:
                self._log_no_directory_fixup(unfiltered, repr(exc))
                continue
            if tarinfo is None:
                self._log_no_directory_fixup(unfiltered,
                                             'excluded by filter')
                continue
            dirpath = os.path.join(path, tarinfo.name)
            try:
                dir_stat = os.lstat(dirpath)
            except FileNotFoundError:
                self._log_no_directory_fixup(tarinfo, 'missing')
                continue
            if not stat.S_ISDIR(dir_stat.st_mode):
                self._log_no_directory_fixup(tarinfo, 'not a directory')
                continue
            self.chown(tarinfo, dirpath, numeric_owner=numeric_owner)
            self.utime(tarinfo, dirpath)
            self.chmod(tarinfo, dirpath)
        except ExtractError as e:
            self._handle_nonfatal_error(e)
def _log_no_directory_fixup(self, member, reason):
    """Log, at debug level 2, that *member*'s attributes are not applied."""
    message = "tarfile: Not fixing up directory %r (%s)" % (
        member.name, reason)
    self._dbg(2, message)
def extract(self, member, path="", set_attrs=True, *, numeric_owner=False,
            filter=None):
    """Extract a single *member* below *path*.

    *member* is a member name or member object.  It is passed through the
    extraction filter resolved from *filter*; nothing is extracted when
    the filter rejects it.  *set_attrs* and *numeric_owner* are forwarded
    to ``_extract_one()``.

    The resolved filter is forwarded as well, so that when a link cannot
    be created and its target is extracted instead, the target is
    filtered too, exactly as ``extractall()`` does.
    """
    filter_function = self._get_filter_function(filter)
    tarinfo, unfiltered = self._get_extract_tarinfo(
        member, filter_function, path)
    if tarinfo is None:
        return
    self._extract_one(tarinfo, path, set_attrs, numeric_owner,
                      filter_function=filter_function)
def _get_extract_tarinfo(self, member, filter_function, path):
    """Return ``(filtered, unfiltered)`` member objects for *member*.

    A string *member* is resolved with ``getmember()``.  The member is
    then passed to *filter_function* together with *path*; errors raised
    by the filter go to the fatal or non-fatal error handler.  When no
    filtered member results, ``(None, None)`` is returned.  For hard
    links a copy is returned whose ``_link_target`` is the link name
    joined onto *path*.
    """
    unfiltered = self.getmember(member) if isinstance(member, str) else member

    filtered = None
    try:
        filtered = filter_function(unfiltered, path)
    except (OSError, UnicodeEncodeError, FilterError) as e:
        self._handle_fatal_error(e)
    except ExtractError as e:
        self._handle_nonfatal_error(e)

    if filtered is None:
        self._dbg(2, "tarfile: Excluded %r" % unfiltered.name)
        return None, None

    if filtered.islnk():
        # Work on a copy so the filter's result is left untouched.
        filtered = copy.copy(filtered)
        filtered._link_target = os.path.join(path, filtered.linkname)
    return filtered, unfiltered
def _extract_one(self, tarinfo, path, set_attrs, numeric_owner,
                 filter_function=None):
    """Extract the already filtered *tarinfo* below *path*.

    The destination is *path* joined with the member name, and *path* is
    also passed on as the extraction root.  OSError and
    UnicodeEncodeError go to ``_handle_fatal_error()``, ExtractError to
    ``_handle_nonfatal_error()``.  Requires mode 'r'.
    """
    self._check("r")

    destination = os.path.join(path, tarinfo.name)
    try:
        self._extract_member(tarinfo, destination,
                             set_attrs=set_attrs,
                             numeric_owner=numeric_owner,
                             filter_function=filter_function,
                             extraction_root=path)
    except (OSError, UnicodeEncodeError) as e:
        self._handle_fatal_error(e)
    except ExtractError as e:
        self._handle_nonfatal_error(e)
def _handle_nonfatal_error(self, e):
    """Re-raise the active exception if errorlevel > 1, else log *e*.

    Must be called from within an ``except`` block, because it uses a
    bare ``raise``.
    """
    if self.errorlevel > 1:
        raise
    self._dbg(1, "tarfile: %s" % e)
def _handle_fatal_error(self, e):
    """Re-raise the active exception if errorlevel > 0, else log *e*.

    OSError is logged with its ``strerror`` (plus the file name when one
    is set); other exceptions with their type name and message.  Must be
    called from within an ``except`` block, because it uses a bare
    ``raise``.
    """
    if self.errorlevel > 0:
        raise
    if not isinstance(e, OSError):
        self._dbg(1, "tarfile: %s %s" % (type(e).__name__, e))
    elif e.filename is None:
        self._dbg(1, "tarfile: %s" % e.strerror)
    else:
        self._dbg(1, "tarfile: %s %r" % (e.strerror, e.filename))
def extractfile(self, member):
    """Return a file object for reading *member*'s data.

    *member* is a member name or member object.  Regular files and
    members of unsupported types are wrapped with ``self.fileobject``.
    For links the file object of the link target is returned, except on a
    ``_Stream``, where StreamError is raised.  Every other member yields
    None.  Requires mode 'r'.
    """
    self._check("r")

    tarinfo = self.getmember(member) if isinstance(member, str) else member

    if tarinfo.isreg() or tarinfo.type not in SUPPORTED_TYPES:
        return self.fileobject(self, tarinfo)

    if tarinfo.islnk() or tarinfo.issym():
        if isinstance(self.fileobj, _Stream):
            raise StreamError("cannot extract (sym)link as file object")
        # A link is read through the member it points to.
        return self.extractfile(self._find_link_target(tarinfo))

    return None
def _extract_member(self, tarinfo, targetpath, set_attrs=True,
                    numeric_owner=False, *, filter_function=None,
                    extraction_root=None):
    """Create *tarinfo* on disk at *targetpath*.

    Trailing slashes are stripped from *targetpath* and "/" is replaced
    by ``os.sep``; missing parent directories are created.  The member is
    then created by the ``make*`` method matching its type (regular-file
    handling is the fallback).  When *set_attrs* is true the owner is
    applied and, except for symbolic links, the mode and modification
    time as well.  *filter_function* and *extraction_root* are only
    forwarded to ``makelink_with_filter()``.
    """
    targetpath = targetpath.rstrip("/").replace("/", os.sep)

    parent = os.path.dirname(targetpath)
    if parent and not os.path.exists(parent):
        os.makedirs(parent, exist_ok=True)

    if tarinfo.islnk() or tarinfo.issym():
        self._dbg(1, "%s -> %s" % (tarinfo.name, tarinfo.linkname))
    else:
        self._dbg(1, tarinfo.name)

    if tarinfo.isreg():
        self.makefile(tarinfo, targetpath)
    elif tarinfo.isdir():
        self.makedir(tarinfo, targetpath)
    elif tarinfo.isfifo():
        self.makefifo(tarinfo, targetpath)
    elif tarinfo.ischr() or tarinfo.isblk():
        self.makedev(tarinfo, targetpath)
    elif tarinfo.islnk() or tarinfo.issym():
        self.makelink_with_filter(
            tarinfo, targetpath,
            filter_function=filter_function,
            extraction_root=extraction_root)
    elif tarinfo.type not in SUPPORTED_TYPES:
        self.makeunknown(tarinfo, targetpath)
    else:
        self.makefile(tarinfo, targetpath)

    if not set_attrs:
        return
    self.chown(tarinfo, targetpath, numeric_owner)
    if not tarinfo.issym():
        self.chmod(tarinfo, targetpath)
        self.utime(tarinfo, targetpath)
def makedir(self, tarinfo, targetpath):
    """Create the directory *targetpath*.

    The directory is created with mode 0o700, or with ``os.mkdir()``'s
    default mode when ``tarinfo.mode`` is None.  An already existing
    directory is accepted; FileExistsError is re-raised when the existing
    path is not a directory.
    """
    use_default_mode = tarinfo.mode is None
    try:
        if use_default_mode:
            os.mkdir(targetpath)
        else:
            os.mkdir(targetpath, 0o700)
    except FileExistsError:
        if not os.path.isdir(targetpath):
            raise
def makefile(self, tarinfo, targetpath):
    """Write the data of *tarinfo* to the file *targetpath*.

    Data is read from ``self.fileobj`` starting at
    ``tarinfo.offset_data``.  For a member with a sparse map each
    ``(offset, size)`` entry is written at its offset and the file is
    then truncated to ``tarinfo.size``; otherwise ``tarinfo.size`` bytes
    are copied in one go.
    """
    source = self.fileobj
    source.seek(tarinfo.offset_data)
    bufsize = self.copybufsize
    with bltn_open(targetpath, "wb") as target:
        if tarinfo.sparse is None:
            copyfileobj(source, target, tarinfo.size, ReadError, bufsize)
            return
        for offset, size in tarinfo.sparse:
            target.seek(offset)
            copyfileobj(source, target, size, ReadError, bufsize)
        target.seek(tarinfo.size)
        target.truncate()
def makeunknown(self, tarinfo, targetpath):
    """Extract a member of unknown type as a regular file and log it."""
    self.makefile(tarinfo, targetpath)
    message = ("tarfile: Unknown file type %r, "
               "extracted as regular file." % tarinfo.type)
    self._dbg(1, message)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO at *targetpath*.

    Raises ExtractError when the ``os`` module has no ``mkfifo``.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)
def makedev(self, tarinfo, targetpath):
    """Create a character or block device node at *targetpath*.

    The node is created with ``os.mknod()`` using ``tarinfo.mode``
    (0o600 when that is None) combined with the block or character
    device type bit, and the member's major/minor device numbers.
    Raises ExtractError when ``os.mknod`` or ``os.makedev`` is missing.
    """
    if not (hasattr(os, "mknod") and hasattr(os, "makedev")):
        raise ExtractError("special devices not supported by system")

    mode = 0o600 if tarinfo.mode is None else tarinfo.mode
    mode |= stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR

    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, mode, device)
def makelink(self, tarinfo, targetpath):
    """Create a link at *targetpath*, without any extraction filter."""
    result = self.makelink_with_filter(tarinfo, targetpath, None, None)
    return result
def makelink_with_filter(self, tarinfo, targetpath,
                         filter_function, extraction_root):
    """Create a symbolic or hard link at *targetpath*.

    A symbolic link is created with ``os.symlink()``; a hard link with
    ``os.link()`` if ``tarinfo._link_target`` exists.  An existing
    *targetpath* is removed first.  If the link was not created (the hard
    link target is missing, or one of ``symlink_exception`` was raised),
    the member the link refers to is looked up in the archive and
    extracted to *targetpath* instead.  When *filter_function* is given,
    that member is filtered first, which requires *extraction_root*.

    Raises ExtractError when the link target cannot be resolved after a
    failed link attempt or when *extraction_root* is missing, KeyError
    when it cannot be resolved otherwise, and LinkFallbackError when the
    filter rejects the target with one of ``_FILTER_ERRORS``.
    """
    link_call_failed = False
    try:
        if tarinfo.issym():
            if os.path.lexists(targetpath):
                # Avoid FileExistsError from os.symlink().
                os.unlink(targetpath)
            os.symlink(tarinfo.linkname, targetpath)
            return
        if os.path.exists(tarinfo._link_target):
            if os.path.lexists(targetpath):
                # Avoid FileExistsError from os.link().
                os.unlink(targetpath)
            os.link(tarinfo._link_target, targetpath)
            return
    except symlink_exception:
        link_call_failed = True

    # Fallback: extract the member the link points to.
    try:
        unfiltered = self._find_link_target(tarinfo)
    except KeyError:
        if link_call_failed:
            raise ExtractError(
                "unable to resolve link inside archive") from None
        raise

    if filter_function is None:
        filtered = unfiltered
    else:
        if extraction_root is None:
            raise ExtractError(
                "makelink_with_filter: if filter_function is not None, "
                + "extraction_root must also not be None")
        try:
            filtered = filter_function(unfiltered, extraction_root)
        except _FILTER_ERRORS as cause:
            raise LinkFallbackError(tarinfo, unfiltered.name) from cause

    if filtered is not None:
        self._extract_member(filtered, targetpath,
                             filter_function=filter_function,
                             extraction_root=extraction_root)
def chown(self, tarinfo, targetpath, numeric_owner):
    """Set the owner of *targetpath* from *tarinfo*.

    Nothing is done unless ``os.geteuid()`` exists and returns 0.  The
    numeric ``uid``/``gid`` are used, except that, when *numeric_owner*
    is false, a ``uname``/``gname`` known to the pwd/grp databases takes
    precedence.  An id of None is passed as -1.  Symbolic links are
    changed with ``os.lchown()`` where available.  Raises ExtractError
    when the change fails with OSError or OverflowError.
    """
    if not (hasattr(os, "geteuid") and os.geteuid() == 0):
        return

    gid = tarinfo.gid
    uid = tarinfo.uid
    if not numeric_owner:
        try:
            if grp and tarinfo.gname:
                gid = grp.getgrnam(tarinfo.gname)[2]
        except KeyError:
            # Unknown group name: keep the numeric id.
            pass
        try:
            if pwd and tarinfo.uname:
                uid = pwd.getpwnam(tarinfo.uname)[2]
        except KeyError:
            # Unknown user name: keep the numeric id.
            pass
    if gid is None:
        gid = -1
    if uid is None:
        uid = -1

    try:
        if tarinfo.issym() and hasattr(os, "lchown"):
            os.lchown(targetpath, uid, gid)
        else:
            os.chown(targetpath, uid, gid)
    except (OSError, OverflowError) as e:
        raise ExtractError("could not change owner") from e
def chmod(self, tarinfo, targetpath):
    """Apply ``tarinfo.mode`` to *targetpath*; do nothing if it is None.

    Raises ExtractError when ``os.chmod()`` fails with OSError.
    """
    mode = tarinfo.mode
    if mode is None:
        return
    try:
        os.chmod(targetpath, mode)
    except OSError as e:
        raise ExtractError("could not change mode") from e
def utime(self, tarinfo, targetpath):
    """Set access and modification time of *targetpath* to the member's mtime.

    Nothing is done when ``tarinfo.mtime`` is None or the ``os`` module
    has no ``utime``.  Raises ExtractError when ``os.utime()`` fails with
    OSError.
    """
    mtime = tarinfo.mtime
    if mtime is None:
        return
    if not hasattr(os, 'utime'):
        return
    timestamps = (mtime, mtime)
    try:
        os.utime(targetpath, timestamps)
    except OSError as e:
        raise ExtractError("could not change modification time") from e
def next(self):
    """Return the next member of the archive, or None when there is none.

    A member read ahead by ``__init__`` (``self.firstmember``) is
    returned first.  Otherwise the file is positioned at ``self.offset``
    and the next header is parsed.  With ``ignore_zeros`` set, EOF and
    invalid headers are skipped block by block.  Header errors at offset
    0, SubsequentHeaderError and zlib errors are raised as ReadError;
    ReadError is also raised when the file ends before ``self.offset``.
    Members are appended to ``self.members`` unless ``self.stream`` is
    set; when no member is returned the archive is marked as loaded.
    Requires mode 'r' or 'a'.
    """
    self._check("ra")
    if self.firstmember is not None:
        first = self.firstmember
        self.firstmember = None
        return first

    # Move the file pointer to the expected position of the next header.
    if self.offset != self.fileobj.tell():
        if self.offset == 0:
            return None
        # Reading the byte just before the offset proves the data exists.
        self.fileobj.seek(self.offset - 1)
        if not self.fileobj.read(1):
            raise ReadError("unexpected end of data")

    tarinfo = None
    while True:
        try:
            tarinfo = self.tarinfo.fromtarfile(self)
        except EOFHeaderError as e:
            if self.ignore_zeros:
                self._dbg(2, "0x%X: %s" % (self.offset, e))
                self.offset += BLOCKSIZE
                continue
        except InvalidHeaderError as e:
            if self.ignore_zeros:
                self._dbg(2, "0x%X: %s" % (self.offset, e))
                self.offset += BLOCKSIZE
                continue
            elif self.offset == 0:
                raise ReadError(str(e)) from None
        except EmptyHeaderError:
            if self.offset == 0:
                raise ReadError("empty file") from None
        except TruncatedHeaderError as e:
            if self.offset == 0:
                raise ReadError(str(e)) from None
        except SubsequentHeaderError as e:
            raise ReadError(str(e)) from None
        except Exception as e:
            # zlib is imported lazily; only its errors are translated.
            try:
                import zlib
                if isinstance(e, zlib.error):
                    raise ReadError(f'zlib error: {e}') from None
                else:
                    raise e
            except ImportError:
                raise e
        break

    if tarinfo is None:
        self._loaded = True
    elif not self.stream:
        self.members.append(tarinfo)

    return tarinfo
def _getmember(self, name, tarinfo=None, normalize=False):
    """Return the last member called *name*, or None if there is none.

    When *tarinfo* is given, only members stored before it are searched.
    If *tarinfo* itself is not in the member list, members are skipped
    (searching backwards) until one with the same ``offset`` is found;
    ValueError is raised when no such member exists.  With *normalize*
    set, names are compared after ``os.path.normpath()``.
    """
    candidates = self.getmembers()

    skipping = False
    if tarinfo is not None:
        try:
            cutoff = candidates.index(tarinfo)
        except ValueError:
            # Not in the list itself; match it by offset further down.
            skipping = True
        else:
            candidates = candidates[:cutoff]

    if normalize:
        name = os.path.normpath(name)

    for candidate in reversed(candidates):
        if skipping:
            if tarinfo.offset == candidate.offset:
                skipping = False
            continue
        if normalize:
            candidate_name = os.path.normpath(candidate.name)
        else:
            candidate_name = candidate.name
        if name == candidate_name:
            return candidate

    if skipping:
        raise ValueError(tarinfo)
    return None
def _load(self):
    """Read the remaining members via next() and mark the archive loaded.

    Does nothing when ``self.stream`` is set.
    """
    if self.stream:
        return
    while self.next() is not None:
        pass
    self._loaded = True
def _check(self, mode=None):
    """Raise OSError if the archive is closed or opened in a wrong mode.

    *mode*, when given, is a container of allowed values for
    ``self.mode`` (for example the string "awx").
    """
    if self.closed:
        raise OSError("%s is closed" % self.__class__.__name__)
    mode_allowed = mode is None or self.mode in mode
    if not mode_allowed:
        raise OSError("bad operation for mode %r" % self.mode)
def _find_link_target(self, tarinfo):
    """Return the member that the link *tarinfo* points to.

    For a symbolic link the link name is joined with "/" onto the
    directory part of the link's own name (empty parts are dropped) and
    the whole archive is searched.  For any other member the link name
    is used as-is and only members before *tarinfo* are searched.  Names
    are compared in normalised form.  Raises KeyError when nothing
    matches.
    """
    if tarinfo.issym():
        parts = (os.path.dirname(tarinfo.name), tarinfo.linkname)
        linkname = "/".join(part for part in parts if part)
        limit = None
    else:
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is not None:
        return member
    raise KeyError("linkname %r not found" % linkname)
def __iter__(self):
    """Iterate over the members of the archive.

    A fully loaded archive yields straight from ``self.members``.
    Otherwise members already in ``self.members`` are yielded by index
    and further ones are fetched with ``next()``; when ``next()`` returns
    a false value the archive is marked as loaded and iteration stops.
    """
    if self._loaded:
        yield from self.members
        return

    position = 0
    # A member read ahead by __init__ is handed out through next().
    if self.firstmember is not None:
        tarinfo = self.next()
        position += 1
        yield tarinfo

    while True:
        if position < len(self.members):
            # Already read, possibly by another consumer in the meantime.
            tarinfo = self.members[position]
        elif self._loaded:
            return
        else:
            tarinfo = self.next()
            if not tarinfo:
                self._loaded = True
                return
        position += 1
        yield tarinfo
def _dbg(self, level, msg):
    """Print *msg* to sys.stderr if *level* does not exceed ``self.debug``."""
    if level > self.debug:
        return
    print(msg, file=sys.stderr)
def __enter__(self):
    """Enter the ``with`` block; ``_check()`` rejects a closed archive."""
    self._check()
    archive = self
    return archive
def __exit__(self, type, value, traceback):
    """Leave the ``with`` block.

    Without an exception the archive is closed normally.  After an
    exception ``close()`` is not called; only the underlying file is
    closed (unless it was supplied by the caller) and the archive is
    marked closed.
    """
    if type is None:
        self.close()
        return
    if not self._extfileobj:
        self.fileobj.close()
    self.closed = True
def is_tarfile(name):
    """Return True if *name* can be opened as a tar archive.

    *name* is either something accepted by the module-level ``open()``
    as a file name, or an object with a ``read`` attribute; for the
    latter the file position is put back after a successful open.
    Returns False when opening raises TarError.
    """
    try:
        if hasattr(name, "read"):
            start = name.tell()
            archive = open(fileobj=name)
            name.seek(start)
        else:
            archive = open(name)
        archive.close()
    except TarError:
        return False
    return True
# Module-level alias for TarFile.open; from here on the name `open` in this
# module refers to it, not to the builtin (imported above as bltn_open).
open = TarFile.open
def main():
    """Command-line interface: list, extract, create or test an archive.

    Exactly one of -l/--list, -e/--extract, -c/--create and -t/--test
    must be given.  --filter is only accepted together with --extract.
    The process exits with status 1 when the given file is not a tar
    archive or the arguments are invalid.
    """
    import argparse

    description = 'A simple command-line interface for tarfile module.'
    parser = argparse.ArgumentParser(description=description, color=True)
    parser.add_argument('-v', '--verbose', action='store_true', default=False,
                        help='Verbose output')
    parser.add_argument('--filter', metavar='<filtername>',
                        choices=_NAMED_FILTERS,
                        help='Filter for extraction')

    actions = parser.add_mutually_exclusive_group(required=True)
    actions.add_argument('-l', '--list', metavar='<tarfile>',
                         help='Show listing of a tarfile')
    actions.add_argument('-e', '--extract', nargs='+',
                         metavar=('<tarfile>', '<output_dir>'),
                         help='Extract tarfile into target dir')
    actions.add_argument('-c', '--create', nargs='+',
                         metavar=('<name>', '<file>'),
                         help='Create tarfile from sources')
    actions.add_argument('-t', '--test', metavar='<tarfile>',
                         help='Test if a tarfile is valid')

    args = parser.parse_args()

    def reject(path):
        # Report that *path* is not an archive and exit with status 1.
        parser.exit(1, '{!r} is not a tar archive.\n'.format(path))

    if args.filter and args.extract is None:
        parser.exit(1, '--filter is only valid for extraction\n')

    if args.test is not None:
        src = args.test
        if is_tarfile(src):
            with open(src, 'r') as tar:
                tar.getmembers()
                print(tar.getmembers(), file=sys.stderr)
            if args.verbose:
                print('{!r} is a tar archive.'.format(src))
        else:
            reject(src)

    elif args.list is not None:
        src = args.list
        if is_tarfile(src):
            with TarFile.open(src, 'r:*') as tf:
                tf.list(verbose=args.verbose)
        else:
            reject(src)

    elif args.extract is not None:
        given = len(args.extract)
        if given == 1:
            src = args.extract[0]
            curdir = os.curdir
        elif given == 2:
            src, curdir = args.extract
        else:
            parser.exit(1, parser.format_help())

        if is_tarfile(src):
            with TarFile.open(src, 'r:*') as tf:
                tf.extractall(path=curdir, filter=args.filter)
            if args.verbose:
                if curdir == '.':
                    msg = '{!r} file is extracted.'.format(src)
                else:
                    msg = ('{!r} file is extracted '
                           'into {!r} directory.').format(src, curdir)
                print(msg)
        else:
            reject(src)

    elif args.create is not None:
        tar_name = args.create.pop(0)
        _, ext = os.path.splitext(tar_name)
        # File extension -> compression type used in the open() mode.
        compressions = {
            '.gz': 'gz',
            '.tgz': 'gz',
            '.xz': 'xz',
            '.txz': 'xz',
            '.bz2': 'bz2',
            '.tbz': 'bz2',
            '.tbz2': 'bz2',
            '.tb2': 'bz2',
            '.zst': 'zst',
            '.tzst': 'zst',
        }
        if ext in compressions:
            tar_mode = 'w:' + compressions[ext]
        else:
            tar_mode = 'w'
        tar_files = args.create

        with TarFile.open(tar_name, tar_mode) as tf:
            for file_name in tar_files:
                tf.add(file_name)

        if args.verbose:
            print('{!r} file created.'.format(tar_name))
# Run the command-line interface when the module is executed as a script.
if __name__ == '__main__':
    main()