from __future__ import print_function
import os
import re
import sys
import stat
import errno
import fnmatch
from shutil import copy2, copystat, Error
# Names exported by ``from <module> import *``.
__all__ = ['mkdir_p', 'atomic_save', 'AtomicSaver', 'FilePerms',
           'iter_find_files', 'copytree']
# Permission masks, spelled in octal: all nine rwx bits, the six rw bits,
# and the three bits of a single user/group/other triplet.
FULL_PERMS = 0o777
RW_PERMS = 0o666
_SINGLE_FULL_PERM = 0o7

# When the interpreter has no ``basestring`` builtin, provide stand-ins for
# both ``basestring`` and ``unicode`` so the rest of the module can use them.
try:
    basestring
except NameError:
    basestring = (str, bytes)
    unicode = str
def mkdir_p(path):
    """Create directory *path*, including any missing parents.

    An already-existing directory at *path* is not an error. Any other
    :exc:`OSError` raised by :func:`os.makedirs` propagates, including
    the case where *path* exists but is not a directory.
    """
    try:
        os.makedirs(path)
    except OSError as exc:
        already_a_dir = exc.errno == errno.EEXIST and os.path.isdir(path)
        if not already_a_dir:
            raise
class FilePerms(object):
    """Represent file permissions as ``rwx`` strings and as an integer.

    The *user*, *group*, and *other* attributes each hold a string made
    of zero or more of the characters ``r``, ``w``, and ``x``. Assigned
    values are de-duplicated and normalized to ``rwx`` order. ``int()``
    of an instance yields the combined mode (for example ``0o644`` for
    ``user='rw', group='r', other='r'``), and it always reflects the
    current values of the three attributes.

    Args:
        user (str): permission characters for the owning user.
        group (str): permission characters for the owning group.
        other (str): permission characters for everyone else.

    Raises:
        ValueError: if a value contains characters other than ``rwx``.
    """

    class _FilePermProperty(object):
        """Descriptor that stores one ``rwx`` string on the owning
        instance and keeps that instance's ``_integer`` in sync."""
        _perm_chars = 'rwx'
        _perm_set = frozenset('rwx')
        _perm_val = {'r': 4, 'w': 2, 'x': 1}

        def __init__(self, attribute, offset):
            # *attribute* is the instance attribute holding the string;
            # *offset* selects the 3-bit triplet (0 = lowest bits).
            self.attribute = attribute
            self.offset = offset

        def __get__(self, fp_obj, type_=None):
            if fp_obj is None:
                return self
            return getattr(fp_obj, self.attribute)

        def __set__(self, fp_obj, value):
            cur = getattr(fp_obj, self.attribute)
            if cur == value:
                return
            try:
                invalid_chars = set(str(value)) - self._perm_set
            except TypeError:
                raise TypeError('expected string, not %r' % (value,))
            if invalid_chars:
                raise ValueError('got invalid chars %r in permission'
                                 ' specification %r, expected empty string'
                                 ' or one or more of %r'
                                 % (invalid_chars, value, self._perm_chars))

            # Normalize to descending bit value, i.e. "rwx" order.
            sort_key = (lambda c: self._perm_val[c])
            new_value = ''.join(sorted(set(value),
                                       key=sort_key, reverse=True))
            setattr(fp_obj, self.attribute, new_value)
            self._update_integer(fp_obj, new_value)

        def _update_integer(self, fp_obj, value):
            """Rewrite this property's three bits of ``fp_obj._integer``."""
            shift = self.offset * 3
            mode = 0
            for symbol in value:
                mode |= self._perm_val[symbol] << shift
            # Clear the triplet first so that removed permissions do not
            # linger in the integer form.
            cleared = fp_obj._integer & ~(7 << shift)
            fp_obj._integer = cleared | mode

    def __init__(self, user='', group='', other=''):
        self._user, self._group, self._other = '', '', ''
        self._integer = 0
        self.user = user
        self.group = group
        self.other = other

    @classmethod
    def from_int(cls, i):
        """Create a :class:`FilePerms` from an integer mode.

        Only the lowest nine bits (masked with ``FULL_PERMS``) are used.

        >>> FilePerms.from_int(0o644)
        FilePerms(user='rw', group='r', other='r')
        """
        i &= FULL_PERMS
        key = ('', 'x', 'w', 'xw', 'r', 'rx', 'rw', 'rwx')
        parts = []
        # Always decode exactly three triplets; stopping early when the
        # remaining bits are zero would shift group/other into user/group.
        for _ in range(3):
            parts.append(key[i & _SINGLE_FULL_PERM])
            i >>= 3
        parts.reverse()  # decoded other, group, user -> user, group, other
        return cls(*parts)

    @classmethod
    def from_path(cls, path):
        """Create a :class:`FilePerms` from the mode of the file at *path*.

        Uses :func:`os.stat`, so any error it raises propagates.
        """
        stat_res = os.stat(path)
        return cls.from_int(stat.S_IMODE(stat_res.st_mode))

    def __int__(self):
        return self._integer

    # NOTE: the offsets are 0-based and run from *other* (lowest bits)
    # up to *user* (highest bits).
    user = _FilePermProperty('_user', 2)
    "Stores the ``rwx``-formatted *user* permission."
    group = _FilePermProperty('_group', 1)
    "Stores the ``rwx``-formatted *group* permission."
    other = _FilePermProperty('_other', 0)
    "Stores the ``rwx``-formatted *other* permission."

    def __repr__(self):
        cn = self.__class__.__name__
        return ('%s(user=%r, group=%r, other=%r)'
                % (cn, self.user, self.group, self.other))
# Flags for os.open(): read/write, create, and fail if the path already
# exists. O_NOINHERIT and O_NOFOLLOW are OR-ed in only where the platform's
# ``os`` module defines them (a missing flag contributes 0).
_TEXT_OPENFLAGS = (os.O_RDWR | os.O_CREAT | os.O_EXCL
                   | getattr(os, 'O_NOINHERIT', 0)
                   | getattr(os, 'O_NOFOLLOW', 0))

# Binary mode uses the same flags, plus O_BINARY where it exists.
_BIN_OPENFLAGS = _TEXT_OPENFLAGS | getattr(os, 'O_BINARY', 0)
try:
    import fcntl
except ImportError:
    def set_cloexec(fd):
        "Dummy set_cloexec for platforms without fcntl support"
        pass
else:
    def set_cloexec(fd):
        """Set the ``FD_CLOEXEC`` flag on file descriptor *fd*.

        If the current descriptor flags cannot be read (``IOError``),
        the descriptor is left untouched and no error is raised.
        """
        try:
            current_flags = fcntl.fcntl(fd, fcntl.F_GETFD, 0)
        except IOError:
            return
        fcntl.fcntl(fd, fcntl.F_SETFD, current_flags | fcntl.FD_CLOEXEC)
def atomic_save(dest_path, **kwargs):
    """Return an :class:`AtomicSaver` for *dest_path*.

    All keyword arguments are forwarded unchanged to the
    :class:`AtomicSaver` constructor.
    """
    saver = AtomicSaver(dest_path, **kwargs)
    return saver
def path_to_unicode(path):
    """Return *path* as text.

    Text input is returned as-is. Anything else is decoded using the
    filesystem encoding, falling back to the interpreter's default
    encoding when no filesystem encoding is reported.
    """
    if not isinstance(path, unicode):
        fs_encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
        path = path.decode(fs_encoding)
    return path
if os.name == 'nt':
    import ctypes
    from ctypes import c_wchar_p
    from ctypes.wintypes import DWORD, LPVOID

    _ReplaceFile = ctypes.windll.kernel32.ReplaceFile
    _ReplaceFile.argtypes = [c_wchar_p, c_wchar_p, c_wchar_p,
                             DWORD, LPVOID, LPVOID]

    def replace(src, dst):
        # Plain rename is tried first; the ReplaceFile call is only
        # reached when the rename failed with EEXIST.
        try:
            os.rename(src, dst)
            return
        except WindowsError as we:
            if we.errno == errno.EEXIST:
                pass  # fall through to ReplaceFile below
            else:
                raise

        src = path_to_unicode(src)
        dst = path_to_unicode(dst)
        # The file being replaced (dst) is passed first, the replacement
        # (src) second.
        res = _ReplaceFile(c_wchar_p(dst), c_wchar_p(src),
                           None, 0, None, None)
        if not res:
            raise OSError('failed to replace %r with %r' % (dst, src))
        return

    def atomic_rename(src, dst, overwrite=False):
        "Rename *src* to *dst*, replacing *dst* if *overwrite* is True"
        if overwrite:
            replace(src, dst)
        else:
            os.rename(src, dst)
        return
else:
    def replace(src, dst):
        return os.rename(src, dst)

    def atomic_rename(src, dst, overwrite=False):
        "Rename *src* to *dst*, replacing *dst* if *overwrite* is True"
        if overwrite:
            os.rename(src, dst)
        else:
            # os.link() raises if *dst* already exists, which gives the
            # no-overwrite guarantee. Once the new name exists, drop the
            # old name (*src*) so only *dst* remains.
            os.link(src, dst)
            os.unlink(src)
        return


_atomic_rename = atomic_rename  # backwards compat

replace.__doc__ = """Similar to :func:`os.replace` in Python 3.3+,
this function will atomically create or replace the file at path
*dst* with the file at path *src*.
On Windows, this function uses the ReplaceFile API for maximum
possible atomicity on a range of filesystems.
"""
class AtomicSaver(object):
    """Context manager that writes to a "part" file and renames it onto
    the destination only if the ``with`` block finishes without error.

    Entering the context returns the open part file. On a clean exit the
    part file is closed and moved to *dest_path* with ``atomic_rename``;
    if the block raised, the destination is left untouched.

    Args:
        dest_path (str): path of the file to create or replace.
        overwrite (bool): allow replacing an existing destination.
            Defaults to ``True``.
        file_perms (int): mode for the part file; anything accepted by
            ``int()`` works. When ``None`` (the default) the mode of an
            existing destination is copied, otherwise ``RW_PERMS`` is
            passed to :func:`os.open`.
        overwrite_part (bool): remove a pre-existing part file before
            opening a new one. Defaults to ``False``.
        part_file (str): name of the part file, created in the same
            directory as the destination. Defaults to the destination
            path with ``.part`` appended.
        rm_part_on_exc (bool): remove the part file when the block or
            the final rename fails. Defaults to ``True``.
        text_mode (bool): open the part file in text mode (``'w+'``)
            instead of binary (``'w+b'``). Defaults to ``False``.
        buffering (int): passed to :func:`os.fdopen`. Defaults to ``-1``.

    Raises:
        TypeError: on unrecognized keyword arguments.
    """
    _default_file_perms = RW_PERMS

    def __init__(self, dest_path, **kwargs):
        self.dest_path = dest_path
        self.overwrite = kwargs.pop('overwrite', True)
        self.file_perms = kwargs.pop('file_perms', None)
        self.overwrite_part = kwargs.pop('overwrite_part', False)
        self.part_filename = kwargs.pop('part_file', None)
        self.rm_part_on_exc = kwargs.pop('rm_part_on_exc', True)
        self.text_mode = kwargs.pop('text_mode', False)
        self.buffering = kwargs.pop('buffering', -1)
        if kwargs:
            raise TypeError('unexpected kwargs: %r' % (kwargs.keys(),))

        self.dest_path = os.path.abspath(self.dest_path)
        self.dest_dir = os.path.dirname(self.dest_path)
        if not self.part_filename:
            # Derive the part path from the absolute destination so both
            # stay in the same directory even if the working directory
            # changes after construction.
            self.part_path = self.dest_path + '.part'
        else:
            self.part_path = os.path.join(self.dest_dir, self.part_filename)
        self.mode = 'w+' if self.text_mode else 'w+b'
        self.open_flags = _TEXT_OPENFLAGS if self.text_mode else _BIN_OPENFLAGS

        self.part_file = None

    def _open_part_file(self):
        """Create and open the part file, storing it on ``self.part_file``."""
        do_chmod = True
        file_perms = self.file_perms
        if file_perms is None:
            try:
                # Copy the mode of the file being replaced.
                stat_res = os.stat(self.dest_path)
                file_perms = stat.S_IMODE(stat_res.st_mode)
            except (OSError, IOError):
                # No readable destination: use the default and skip the
                # chmod so the process umask stays in effect.
                file_perms = self._default_file_perms
                do_chmod = False
        else:
            # Accept int-convertible permission objects as well as ints.
            file_perms = int(file_perms)

        fd = os.open(self.part_path, self.open_flags, file_perms)
        set_cloexec(fd)
        self.part_file = os.fdopen(fd, self.mode, self.buffering)

        # Explicit or copied permissions are applied with chmod, because
        # the mode given to os.open() is subject to the umask.
        if do_chmod:
            try:
                os.chmod(self.part_path, file_perms)
            except (OSError, IOError):
                self.part_file.close()
                raise
        return

    def setup(self):
        """Check the overwrite settings and open the part file.

        Called by ``__enter__``; call it explicitly before writing when
        not using the instance as a context manager.

        Raises:
            OSError: with ``errno.EEXIST`` when the destination exists
                and *overwrite* is false, or when the part file cannot
                be created.
        """
        if os.path.lexists(self.dest_path):
            if not self.overwrite:
                raise OSError(errno.EEXIST,
                              'Overwrite disabled and file already exists',
                              self.dest_path)
        if self.overwrite_part and os.path.lexists(self.part_path):
            os.unlink(self.part_path)
        self._open_part_file()
        return

    def __enter__(self):
        self.setup()
        return self.part_file

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.part_file.close()
        if exc_type:
            if self.rm_part_on_exc:
                try:
                    os.unlink(self.part_path)
                except Exception:
                    pass  # avoid masking the original error
            return
        try:
            atomic_rename(self.part_path, self.dest_path,
                          overwrite=self.overwrite)
        except OSError:
            if self.rm_part_on_exc:
                try:
                    os.unlink(self.part_path)
                except Exception:
                    pass  # avoid masking the original error
            raise  # could not save the destination file
        return
def iter_find_files(directory, patterns, ignored=None):
    """Yield paths of files under *directory* whose names match *patterns*.

    The tree is walked with :func:`os.walk`, and only each file's base
    name is tested.

    Args:
        directory (str): root of the tree to search.
        patterns (str or list): one glob pattern, or an iterable of them
            (:mod:`fnmatch` syntax); a file matching any of them is
            yielded.
        ignored (str or list): optional glob pattern(s); matching base
            names are skipped even if they match *patterns*.
    """
    if isinstance(patterns, basestring):
        patterns = [patterns]
    wanted_re = re.compile('|'.join([fnmatch.translate(pat)
                                     for pat in patterns]))

    if not ignored:
        ignored = []
    elif isinstance(ignored, basestring):
        ignored = [ignored]
    skipped_re = re.compile('|'.join([fnmatch.translate(pat)
                                      for pat in ignored]))

    for root, _dirs, files in os.walk(directory):
        for basename in files:
            if not wanted_re.match(basename):
                continue
            # With no ignore patterns the compiled regex is empty and
            # would match everything, so it is only consulted when set.
            if ignored and skipped_re.match(basename):
                continue
            yield os.path.join(root, basename)
def copy_tree(src, dst, symlinks=False, ignore=None):
    """Recursively copy the directory *src* into *dst*.

    *dst* is created with ``mkdir_p``, so it may already exist. Errors
    on individual entries are collected while copying continues, and
    are raised together at the end.

    Args:
        src (str): directory to copy from.
        dst (str): directory to copy into.
        symlinks (bool): when true, symbolic links in *src* are
            recreated as links; otherwise they are handled like the
            files or directories they point to. Defaults to ``False``.
        ignore (callable): optional ``ignore(src, names)`` returning the
            names in that directory to skip, e.g. the result of
            :func:`shutil.ignore_patterns`.

    Raises:
        shutil.Error: with a list of ``(src, dst, reason)`` tuples when
            any entry could not be copied.
    """
    names = os.listdir(src)
    if ignore is not None:
        ignored_names = ignore(src, names)
    else:
        ignored_names = set()

    mkdir_p(dst)
    errors = []
    for name in names:
        if name in ignored_names:
            continue
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if symlinks and os.path.islink(srcname):
                linkto = os.readlink(srcname)
                os.symlink(linkto, dstname)
            elif os.path.isdir(srcname):
                copytree(srcname, dstname, symlinks, ignore)
            else:
                copy2(srcname, dstname)
        except Error as e:
            # Raised by the recursive call: merge its error list so the
            # remaining entries are still processed.
            errors.extend(e.args[0])
        except EnvironmentError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        # Copying file access times may fail on Windows; those errors
        # carry a ``winerror`` code and are ignored. The attribute check
        # avoids referencing ``WindowsError``, which is not defined on
        # other platforms.
        if getattr(why, 'winerror', None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)


copytree = copy_tree
# Use the builtin ``file`` type as the base class where it exists;
# otherwise fall back to ``object``.
try:
    file
except NameError:
    file = object


class DummyFile(file):
    """File-like stand-in that performs no I/O.

    Reads return empty results and writes are discarded. Most methods
    raise ``ValueError`` once :meth:`close` has been called.

    Args:
        path (str): stored as the ``name`` attribute; nothing is opened.
        mode (str): stored as the ``mode`` attribute. Defaults to ``'r'``.
        buffering: accepted but unused.
    """
    # TODO: raise ValueErrors on closed for all methods?
    # TODO: enforce read/write
    def __init__(self, path, mode='r', buffering=None):
        self.name = path
        self.mode = mode
        self.closed = False
        self.errors = None
        # NOTE: a plain attribute here, not a method as on real files.
        self.isatty = False
        self.encoding = None
        self.newlines = None
        self.softspace = 0

    def close(self):
        self.closed = True

    def fileno(self):
        return -1

    def flush(self):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return

    def next(self):
        raise StopIteration()

    def read(self, size=0):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return ''

    def readline(self, size=0):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return ''

    def readlines(self, size=0):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return []

    def seek(self, offset=0, whence=0):
        # Accepts (and ignores) the arguments a real file's seek() takes,
        # so the object can be used where a file is expected.
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return

    def tell(self):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return 0

    def truncate(self, size=None):
        # *size* is accepted for signature compatibility and ignored.
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return

    def write(self, string):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return

    def writelines(self, list_of_strings):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return

    def __next__(self):
        raise StopIteration()

    def __enter__(self):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        # Return the object itself so ``with DummyFile(p) as f`` binds a
        # usable file-like object.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return