# Names exported by ``from <module> import *``.
__all__ = [
    "NamedTemporaryFile",
    "TemporaryFile",
    "SpooledTemporaryFile",
    "TemporaryDirectory",
    "mkstemp",
    "mkdtemp",
    "mktemp",
    "TMP_MAX",
    "gettempprefix",
    "tempdir",
    "gettempdir",
    "gettempprefixb",
    "gettempdirb",
]
import functools as _functools
import warnings as _warnings
import io as _io
import os as _os
import shutil as _shutil
import errno as _errno
from random import Random as _Random
import sys as _sys
import types as _types
import weakref as _weakref
import _thread
# Module-level alias for the low-level lock constructor.
_allocate_lock = _thread.allocate_lock

# Flags used when opening temporary files: read/write, create, exclusive.
# O_NOFOLLOW is OR-ed in only on platforms whose os module defines it.
_text_openflags = _os.O_RDWR | _os.O_CREAT | _os.O_EXCL
_text_openflags |= getattr(_os, 'O_NOFOLLOW', 0)

# Binary-mode flags: the text flags plus O_BINARY where the platform has it.
_bin_openflags = _text_openflags | getattr(_os, 'O_BINARY', 0)

# Upper bound on the number of candidate names tried before giving up.
TMP_MAX = getattr(_os, 'TMP_MAX', 10000)

# Default prefix for generated names.
template = "tmp"

# Guards one-time initialisation of module-level state.
_once_lock = _allocate_lock()
def _exists(fn):
try:
_os.lstat(fn)
except OSError:
return False
else:
return True
def _infer_return_type(*args):
return_type = None
for arg in args:
if arg is None:
continue
if isinstance(arg, _os.PathLike):
arg = _os.fspath(arg)
if isinstance(arg, bytes):
if return_type is str:
raise TypeError("Can't mix bytes and non-bytes in "
"path components.")
return_type = bytes
else:
if return_type is bytes:
raise TypeError("Can't mix bytes and non-bytes in "
"path components.")
return_type = str
if return_type is None:
if tempdir is None or isinstance(tempdir, str):
return str else:
return bytes
return return_type
def _sanitize_params(prefix, suffix, dir):
    """Fill in defaults for *prefix*, *suffix* and *dir*.

    The output type (str or bytes) is inferred from the arguments, and any
    argument left as None is replaced by a default of that type: an empty
    suffix, the module ``template`` as prefix, and the default temporary
    directory as dir.

    Returns:
        A tuple ``(prefix, suffix, dir, output_type)``.
    """
    output_type = _infer_return_type(prefix, suffix, dir)
    wants_str = output_type is str

    if suffix is None:
        suffix = output_type()
    if prefix is None:
        prefix = template if wants_str else _os.fsencode(template)
    if dir is None:
        dir = gettempdir() if wants_str else gettempdirb()

    return prefix, suffix, dir, output_type
class _RandomNameSequence:
characters = "abcdefghijklmnopqrstuvwxyz0123456789_"
@property
def rng(self):
cur_pid = _os.getpid()
if cur_pid != getattr(self, '_rng_pid', None):
self._rng = _Random()
self._rng_pid = cur_pid
return self._rng
def __iter__(self):
return self
def __next__(self):
return ''.join(self.rng.choices(self.characters, k=8))
def _candidate_tempdir_list():
dirlist = []
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = _os.getenv(envname)
if dirname: dirlist.append(dirname)
if _os.name == 'nt':
dirlist.extend([ _os.path.expanduser(r'~\AppData\Local\Temp'),
_os.path.expandvars(r'%SYSTEMROOT%\Temp'),
r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
else:
dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])
try:
dirlist.append(_os.getcwd())
except (AttributeError, OSError):
dirlist.append(_os.curdir)
return dirlist
def _get_default_tempdir(dirlist=None):
    """Return the first directory in *dirlist* in which a file can be made.

    For each candidate (made absolute unless it is ``os.curdir``) up to 100
    random names are tried: a file is created, written to, closed and
    removed.  The first directory where this succeeds is returned.

    Args:
        dirlist: iterable of directory names; defaults to
            ``_candidate_tempdir_list()``.

    Raises:
        FileNotFoundError: if no candidate directory is usable.
    """
    namer = _RandomNameSequence()
    if dirlist is None:
        dirlist = _candidate_tempdir_list()

    for dir in dirlist:
        if dir != _os.curdir:
            dir = _os.path.abspath(dir)
        # Only a bounded number of names is tried per directory.
        for _ in range(100):
            name = next(namer)
            filename = _os.path.join(dir, name)
            try:
                fd = _os.open(filename, _bin_openflags, 0o600)
                try:
                    try:
                        _os.write(fd, b'blat')
                    finally:
                        _os.close(fd)
                finally:
                    _os.unlink(filename)
                return dir
            except FileExistsError:
                # Name collision: try another name in the same directory.
                pass
            except PermissionError:
                # On Windows, a writable existing directory gets another
                # attempt with a new name.
                if (_os.name == 'nt' and _os.path.isdir(dir) and
                        _os.access(dir, _os.W_OK)):
                    continue
                break   # give up on this directory
            except OSError:
                break   # give up on this directory

    # Wrap dirlist in a 1-tuple so a tuple argument is formatted as a whole
    # instead of being unpacked as several %-format arguments.
    raise FileNotFoundError(_errno.ENOENT,
                            "No usable temporary directory found in %s" %
                            (dirlist,))
# Shared name generator; created on first use by _get_candidate_names().
_name_sequence = None


def _get_candidate_names():
    """Return the shared _RandomNameSequence, creating it on first call.

    Creation happens while holding ``_once_lock`` and the global is
    re-checked under the lock before it is assigned.
    """
    global _name_sequence

    if _name_sequence is None:
        with _once_lock:
            # Re-test now that the lock is held.
            if _name_sequence is None:
                _name_sequence = _RandomNameSequence()
    return _name_sequence
def _mkstemp_inner(dir, pre, suf, flags, output_type):
    """Create and open a uniquely named file; shared by the public creators.

    Args:
        dir: directory in which to create the file (made absolute here).
        pre, suf: prefix and suffix placed around the random name part.
        flags: flags passed to os.open().
        output_type: ``str`` or ``bytes``; for ``bytes`` the candidate
            names are encoded with os.fsencode().

    Returns:
        A tuple ``(fd, path)`` of the open descriptor and the file's path.

    Raises:
        FileExistsError: if TMP_MAX candidate names were all unusable.
        PermissionError: re-raised, except on Windows when *dir* is an
            existing writable directory (then another name is tried).
    """
    dir = _os.path.abspath(dir)
    names = _get_candidate_names()
    if output_type is bytes:
        names = map(_os.fsencode, names)

    for _ in range(TMP_MAX):
        name = next(names)
        file = _os.path.join(dir, pre + name + suf)
        _sys.audit("tempfile.mkstemp", file)
        try:
            fd = _os.open(file, flags, 0o600)
        except FileExistsError:
            continue    # name already taken; try the next one
        except PermissionError:
            if (_os.name == 'nt' and _os.path.isdir(dir) and
                    _os.access(dir, _os.W_OK)):
                continue
            raise
        return fd, file

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary file name found")
def _dont_follow_symlinks(func, path, *args):
if func in _os.supports_follow_symlinks:
func(path, *args, follow_symlinks=False)
elif not _os.path.islink(path):
func(path, *args)
def _resetperms(path):
    """Clear file flags (where os.chflags exists) and chmod *path* to 0o700.

    Both operations go through _dont_follow_symlinks().
    """
    chflags = getattr(_os, 'chflags', None)
    if chflags is not None:
        _dont_follow_symlinks(chflags, path, 0)
    _dont_follow_symlinks(_os.chmod, path, 0o700)
def gettempprefix():
    """Return the module-level ``template`` prefix, decoded with os.fsdecode()."""
    prefix = _os.fsdecode(template)
    return prefix
def gettempprefixb():
    """Return the module-level ``template`` prefix, encoded with os.fsencode()."""
    prefix = _os.fsencode(template)
    return prefix
# Cached default temporary directory; None until first computed.
tempdir = None


def _gettempdir():
    """Return the global ``tempdir``, computing it on first use.

    The value comes from _get_default_tempdir(); the assignment is made
    while holding ``_once_lock`` after re-checking that it is still None.
    """
    global tempdir

    if tempdir is None:
        with _once_lock:
            # Re-test now that the lock is held.
            if tempdir is None:
                tempdir = _get_default_tempdir()
    return tempdir
def gettempdir():
    """Return the default temporary directory, decoded with os.fsdecode()."""
    location = _gettempdir()
    return _os.fsdecode(location)
def gettempdirb():
    """Return the default temporary directory, encoded with os.fsencode()."""
    location = _gettempdir()
    return _os.fsencode(location)
def mkstemp(suffix=None, prefix=None, dir=None, text=False):
    """Create and open a uniquely named temporary file.

    Args:
        suffix, prefix: placed after/before the random part of the name;
            None selects the defaults from _sanitize_params().
        dir: directory for the file; None selects the default directory.
        text: if true, open with the text-mode flags; otherwise with the
            binary-mode flags.

    Returns:
        A tuple ``(fd, path)`` as produced by _mkstemp_inner().
    """
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
    flags = _text_openflags if text else _bin_openflags
    return _mkstemp_inner(dir, prefix, suffix, flags, output_type)
def mkdtemp(suffix=None, prefix=None, dir=None):
    """Create a uniquely named temporary directory and return its path.

    Args:
        suffix, prefix: placed after/before the random part of the name;
            None selects the defaults from _sanitize_params().
        dir: parent directory; None selects the default directory.

    Returns:
        The absolute path of the new directory, created with mode 0o700.

    Raises:
        FileExistsError: if TMP_MAX candidate names were all unusable.
        PermissionError: re-raised, except on Windows when *dir* is an
            existing writable directory (then another name is tried).
    """
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    names = _get_candidate_names()
    if output_type is bytes:
        names = map(_os.fsencode, names)

    for _ in range(TMP_MAX):
        name = next(names)
        file = _os.path.join(dir, prefix + name + suffix)
        _sys.audit("tempfile.mkdtemp", file)
        try:
            _os.mkdir(file, 0o700)
        except FileExistsError:
            continue    # name already taken; try the next one
        except PermissionError:
            if (_os.name == 'nt' and _os.path.isdir(dir) and
                    _os.access(dir, _os.W_OK)):
                continue
            raise
        return _os.path.abspath(file)

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary directory name found")
def mktemp(suffix="", prefix=template, dir=None):
    """Return a path in *dir* that did not exist when it was checked.

    The file is NOT created: only the name is returned, so nothing here
    stops something else from creating it after the check.

    Args:
        suffix, prefix: placed after/before the random part of the name.
        dir: directory for the name; None selects gettempdir().

    Raises:
        FileExistsError: if TMP_MAX candidate names all already existed.
    """
    if dir is None:
        dir = gettempdir()

    names = _get_candidate_names()
    for _ in range(TMP_MAX):
        candidate = _os.path.join(dir, prefix + next(names) + suffix)
        if _exists(candidate):
            continue
        return candidate

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary filename found")
class _TemporaryFileCloser:
cleanup_called = False
close_called = False
def __init__(
self,
file,
name,
delete=True,
delete_on_close=True,
warn_message="Implicitly cleaning up unknown file",
):
self.file = file
self.name = name
self.delete = delete
self.delete_on_close = delete_on_close
self.warn_message = warn_message
def cleanup(self, windows=(_os.name == 'nt'), unlink=_os.unlink):
if not self.cleanup_called:
self.cleanup_called = True
try:
if not self.close_called:
self.close_called = True
self.file.close()
finally:
if self.delete and not (windows and self.delete_on_close):
try:
unlink(self.name)
except FileNotFoundError:
pass
def close(self):
if not self.close_called:
self.close_called = True
try:
self.file.close()
finally:
if self.delete and self.delete_on_close:
self.cleanup()
def __del__(self):
close_called = self.close_called
self.cleanup()
if not close_called:
_warnings.warn(self.warn_message, ResourceWarning)
class _TemporaryFileWrapper:
def __init__(self, file, name, delete=True, delete_on_close=True):
self.file = file
self.name = name
self._closer = _TemporaryFileCloser(
file,
name,
delete,
delete_on_close,
warn_message=f"Implicitly cleaning up {self!r}",
)
def __repr__(self):
file = self.__dict__['file']
return f"<{type(self).__name__} {file=}>"
def __getattr__(self, name):
file = self.__dict__['file']
a = getattr(file, name)
if hasattr(a, '__call__'):
func = a
@_functools.wraps(func)
def func_wrapper(*args, **kwargs):
return func(*args, **kwargs)
func_wrapper._closer = self._closer
a = func_wrapper
if not isinstance(a, int):
setattr(self, name, a)
return a
def __enter__(self):
self.file.__enter__()
return self
def __exit__(self, exc, value, tb):
result = self.file.__exit__(exc, value, tb)
self._closer.cleanup()
return result
def close(self):
self._closer.close()
def __iter__(self):
for line in self.file:
yield line
def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
                       newline=None, suffix=None, prefix=None,
                       dir=None, delete=True, *, errors=None,
                       delete_on_close=True):
    """Create a named temporary file and return it wrapped.

    Args:
        mode, buffering, encoding, newline, errors: passed to io.open().
        suffix, prefix, dir: name components, defaulted by
            _sanitize_params().
        delete: whether the file is removed by the wrapper.
        delete_on_close: whether removal happens on close.

    Returns:
        A _TemporaryFileWrapper around the open file.

    If anything fails after the file was created, the file object is
    closed (if it was opened) and the file is unlinked, unless the
    O_TEMPORARY flag was used for it.
    """
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    # True when O_TEMPORARY is used (Windows only); no explicit unlink is
    # attempted in that case.
    uses_o_temporary = _os.name == 'nt' and delete and delete_on_close

    flags = _bin_openflags
    if uses_o_temporary:
        flags |= _os.O_TEMPORARY

    if "b" not in mode:
        encoding = _io.text_encoding(encoding)

    name = None

    def opener(*args):
        # io.open() calls this to obtain the descriptor; the generated
        # path is recorded in the enclosing scope.
        nonlocal name
        fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
        return fd

    try:
        file = _io.open(dir, mode, buffering=buffering,
                        newline=newline, encoding=encoding, errors=errors,
                        opener=opener)
        try:
            # Walk down to the innermost object and give it the real name.
            raw = getattr(file, 'buffer', file)
            raw = getattr(raw, 'raw', raw)
            raw.name = name
            return _TemporaryFileWrapper(file, name, delete, delete_on_close)
        except:
            file.close()
            raise
    except:
        if name is not None and not uses_o_temporary:
            _os.unlink(name)
        raise
if _os.name != 'posix' or _sys.platform == 'cygwin':
    # On these platforms TemporaryFile is simply NamedTemporaryFile.
    TemporaryFile = NamedTemporaryFile

else:
    # Cleared at run time if opening with O_TMPFILE raises
    # IsADirectoryError.
    _O_TMPFILE_WORKS = hasattr(_os, 'O_TMPFILE')

    def TemporaryFile(mode='w+b', buffering=-1, encoding=None,
                      newline=None, suffix=None, prefix=None,
                      dir=None, *, errors=None):
        """Create and return an open temporary file with no visible name.

        Args:
            mode, buffering, encoding, newline, errors: passed to
                io.open().
            suffix, prefix, dir: name components, defaulted by
                _sanitize_params().

        Returns:
            The file object from io.open(); the ``name`` of its innermost
            raw object is set to the file descriptor.

        O_TMPFILE is tried first when believed to work; otherwise a named
        file is created and unlinked immediately.
        """
        global _O_TMPFILE_WORKS

        if "b" not in mode:
            encoding = _io.text_encoding(encoding)

        prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
        flags = _bin_openflags

        def open_with(opener):
            return _io.open(dir, mode, buffering=buffering,
                            newline=newline, encoding=encoding,
                            errors=errors, opener=opener)

        def set_raw_name(file, value):
            # Walk down to the innermost object and set its name.
            raw = getattr(file, 'buffer', file)
            raw = getattr(raw, 'raw', raw)
            raw.name = value

        if _O_TMPFILE_WORKS:
            fd = None

            def opener(*args):
                nonlocal fd
                tmpfile_flags = (flags | _os.O_TMPFILE) & ~_os.O_CREAT
                fd = _os.open(dir, tmpfile_flags, 0o600)
                return fd

            try:
                file = open_with(opener)
                set_raw_name(file, fd)
                return file
            except IsADirectoryError:
                # Stop trying O_TMPFILE on later calls.
                _O_TMPFILE_WORKS = False
            except OSError:
                # Fall through to the named-file path for this call only.
                pass

        fd = None

        def opener(*args):
            nonlocal fd
            fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
            try:
                _os.unlink(name)
            except BaseException:
                _os.close(fd)
                raise
            return fd

        file = open_with(opener)
        set_raw_name(file, fd)
        return file
class SpooledTemporaryFile(_io.IOBase):
_rolled = False
def __init__(self, max_size=0, mode='w+b', buffering=-1,
encoding=None, newline=None,
suffix=None, prefix=None, dir=None, *, errors=None):
if 'b' in mode:
self._file = _io.BytesIO()
else:
encoding = _io.text_encoding(encoding)
self._file = _io.TextIOWrapper(_io.BytesIO(),
encoding=encoding, errors=errors,
newline=newline)
self._max_size = max_size
self._rolled = False
self._TemporaryFileArgs = {'mode': mode, 'buffering': buffering,
'suffix': suffix, 'prefix': prefix,
'encoding': encoding, 'newline': newline,
'dir': dir, 'errors': errors}
__class_getitem__ = classmethod(_types.GenericAlias)
def _check(self, file):
if self._rolled: return
max_size = self._max_size
if max_size and file.tell() > max_size:
self.rollover()
def rollover(self):
if self._rolled: return
file = self._file
newfile = self._file = TemporaryFile(**self._TemporaryFileArgs)
del self._TemporaryFileArgs
pos = file.tell()
if hasattr(newfile, 'buffer'):
newfile.buffer.write(file.detach().getvalue())
else:
newfile.write(file.getvalue())
newfile.seek(pos, 0)
self._rolled = True
def __enter__(self):
if self._file.closed:
raise ValueError("Cannot enter context with closed file")
return self
def __exit__(self, exc, value, tb):
self._file.close()
def __iter__(self):
return self._file.__iter__()
def __del__(self):
if not self.closed:
_warnings.warn(
"Unclosed file {!r}".format(self),
ResourceWarning,
stacklevel=2,
source=self
)
self.close()
def close(self):
self._file.close()
@property
def closed(self):
return self._file.closed
@property
def encoding(self):
return self._file.encoding
@property
def errors(self):
return self._file.errors
def fileno(self):
self.rollover()
return self._file.fileno()
def flush(self):
self._file.flush()
def isatty(self):
return self._file.isatty()
@property
def mode(self):
try:
return self._file.mode
except AttributeError:
return self._TemporaryFileArgs['mode']
@property
def name(self):
try:
return self._file.name
except AttributeError:
return None
@property
def newlines(self):
return self._file.newlines
def readable(self):
return self._file.readable()
def read(self, *args):
return self._file.read(*args)
def read1(self, *args):
return self._file.read1(*args)
def readinto(self, b):
return self._file.readinto(b)
def readinto1(self, b):
return self._file.readinto1(b)
def readline(self, *args):
return self._file.readline(*args)
def readlines(self, *args):
return self._file.readlines(*args)
def seekable(self):
return self._file.seekable()
def seek(self, *args):
return self._file.seek(*args)
def tell(self):
return self._file.tell()
def truncate(self, size=None):
if size is None:
return self._file.truncate()
else:
if size > self._max_size:
self.rollover()
return self._file.truncate(size)
def writable(self):
return self._file.writable()
def write(self, s):
file = self._file
rv = file.write(s)
self._check(file)
return rv
def writelines(self, iterable):
if self._max_size == 0 or self._rolled:
return self._file.writelines(iterable)
it = iter(iterable)
for line in it:
self.write(line)
if self._rolled:
return self._file.writelines(it)
def detach(self):
return self._file.detach()
class TemporaryDirectory:
def __init__(self, suffix=None, prefix=None, dir=None,
ignore_cleanup_errors=False, *, delete=True):
self.name = mkdtemp(suffix, prefix, dir)
self._ignore_cleanup_errors = ignore_cleanup_errors
self._delete = delete
self._finalizer = _weakref.finalize(
self, self._cleanup, self.name,
warn_message="Implicitly cleaning up {!r}".format(self),
ignore_errors=self._ignore_cleanup_errors, delete=self._delete)
@classmethod
def _rmtree(cls, name, ignore_errors=False, repeated=False):
def onexc(func, path, exc):
if isinstance(exc, PermissionError):
if repeated and path == name:
if ignore_errors:
return
raise
try:
if path != name:
_resetperms(_os.path.dirname(path))
_resetperms(path)
try:
_os.unlink(path)
except IsADirectoryError:
cls._rmtree(path, ignore_errors=ignore_errors)
except PermissionError:
if not _os.path.isdir(path) or _os.path.isjunction(path):
if ignore_errors:
return
raise
cls._rmtree(path, ignore_errors=ignore_errors,
repeated=(path == name))
except FileNotFoundError:
pass
elif isinstance(exc, FileNotFoundError):
pass
else:
if not ignore_errors:
raise
_shutil.rmtree(name, onexc=onexc)
@classmethod
def _cleanup(cls, name, warn_message, ignore_errors=False, delete=True):
if delete:
cls._rmtree(name, ignore_errors=ignore_errors)
_warnings.warn(warn_message, ResourceWarning)
def __repr__(self):
return "<{} {!r}>".format(self.__class__.__name__, self.name)
def __enter__(self):
return self.name
def __exit__(self, exc, value, tb):
if self._delete:
self.cleanup()
def cleanup(self):
if self._finalizer.detach() or _os.path.exists(self.name):
self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors)
__class_getitem__ = classmethod(_types.GenericAlias)