from abc import ABC, abstractmethod
from glob import _PathGlobber
from io import text_encoding
from pathlib._os import magic_open, ensure_distinct_paths, ensure_different_files, copyfileobj
from pathlib import PurePath, Path
from typing import Optional, Protocol, runtime_checkable
def _explode_path(path, split):
    """
    Split *path* into a 2-tuple ``(anchor, names)``.

    *split* is applied repeatedly, peeling off one trailing component per
    call, until it returns a head equal to its own input. That fixed point is
    returned as the anchor. *names* lists the peeled components in reverse
    order, i.e. the final component of *path* comes first.
    """
    peeled = []
    while True:
        head, tail = split(path)
        if path != head:
            peeled.append(tail)
            path = head
        else:
            # split() can make no further progress: this is the anchor.
            return path, peeled
@runtime_checkable
class _PathParser(Protocol):
    """Structural type for path parsers, which do low-level path
    manipulation on strings.

    The member names mirror a subset of the ``os.path`` API. Path classes in
    this module reach their parser through a ``parser`` attribute.
    """

    # Passed to the globber as the path separator.
    sep: str
    # Passed to the globber as the alternative separator; may be None.
    altsep: Optional[str]

    def split(self, path: str) -> tuple[str, str]:
        """Split *path* into a ``(head, tail)`` pair."""

    def splitext(self, path: str) -> tuple[str, str]:
        """Split *path* into a ``(root, extension)`` pair."""

    def normcase(self, path: str) -> str:
        """Return *path* with its case normalized."""
@runtime_checkable
class PathInfo(Protocol):
    """Structural type for path info objects, which answer questions about
    the existence and file type of a path.
    """

    def exists(self, *, follow_symlinks: bool = True) -> bool:
        """Return whether the path exists."""

    def is_dir(self, *, follow_symlinks: bool = True) -> bool:
        """Return whether the path is a directory."""

    def is_file(self, *, follow_symlinks: bool = True) -> bool:
        """Return whether the path is a file."""

    def is_symlink(self) -> bool:
        """Return whether the path is a symbolic link."""
class _JoinablePath(ABC):
    """Abstract base class for path objects that support lexical operations
    only: splitting, renaming components, joining and pattern matching.

    Subclasses provide ``parser``, ``with_segments()`` and ``__str__()``.
    Every other member here is derived from those three.
    """
    __slots__ = ()

    @property
    @abstractmethod
    def parser(self):
        """The parser object used for low-level path manipulation. This
        class uses its ``split``, ``splitext``, ``normcase``, ``sep`` and
        ``altsep`` members.
        """
        raise NotImplementedError

    @abstractmethod
    def with_segments(self, *pathsegments):
        """Construct a new path object from any number of path segments.
        All derived paths returned by this class are created through this
        method.
        """
        raise NotImplementedError

    @abstractmethod
    def __str__(self):
        """Return the string form of the path; it is the input to every
        parser call made by this class.
        """
        raise NotImplementedError

    @property
    def anchor(self):
        """The topmost ancestor of the path: the string left over once the
        parser's ``split()`` can remove no further component.
        """
        top, _ = _explode_path(str(self), self.parser.split)
        return top

    @property
    def name(self):
        """The final path component (the tail returned by ``split()``)."""
        _, tail = self.parser.split(str(self))
        return tail

    @property
    def suffix(self):
        """The last suffix of the final component, as returned by the
        parser's ``splitext()``; empty when there is none.
        """
        _, extension = self.parser.splitext(self.name)
        return extension

    @property
    def suffixes(self):
        """A list of all suffixes of the final component, in the order they
        appear in the name.
        """
        splitext = self.parser.splitext
        found = []
        remainder, extension = splitext(self.name)
        while extension:
            found.append(extension)
            remainder, extension = splitext(remainder)
        # Suffixes were collected right-to-left; flip to reading order.
        found.reverse()
        return found

    @property
    def stem(self):
        """The final path component without its last suffix."""
        root, _ = self.parser.splitext(self.name)
        return root

    def with_name(self, name):
        """Return a new path with the final component replaced by *name*.

        Raises ValueError if ``split(name)`` yields a non-empty head.
        """
        split = self.parser.split
        head, _ = split(name)
        if head:
            raise ValueError(f"Invalid name {name!r}")
        current = str(self)
        _, old_name = split(current)
        return self.with_segments(current.removesuffix(old_name) + name)

    def with_stem(self, stem):
        """Return a new path with the stem replaced by *stem*, keeping the
        current suffix.

        Raises ValueError if *stem* is empty while the path has a suffix.
        """
        suffix = self.suffix
        if not suffix:
            return self.with_name(stem)
        if not stem:
            # A suffix cannot be left without a stem in front of it.
            raise ValueError(f"{self!r} has a non-empty suffix")
        return self.with_name(stem + suffix)

    def with_suffix(self, suffix):
        """Return a new path with the last suffix replaced by *suffix*. An
        empty *suffix* removes the current one.

        Raises ValueError if the path has an empty stem, or if *suffix* is
        non-empty and does not start with a dot.
        """
        stem = self.stem
        if not stem:
            # A suffix cannot be attached to an empty stem.
            raise ValueError(f"{self!r} has an empty name")
        if suffix and not suffix.startswith('.'):
            raise ValueError(f"Invalid suffix {suffix!r}")
        return self.with_name(stem + suffix)

    @property
    def parts(self):
        """A tuple of the path's components, anchor first (when the anchor
        is non-empty).
        """
        top, names = _explode_path(str(self), self.parser.split)
        if top:
            names.append(top)
        # _explode_path() returns the names last-component-first.
        return tuple(names[::-1])

    def joinpath(self, *pathsegments):
        """Return a new path built from this path's string followed by the
        given segments.
        """
        return self.with_segments(str(self), *pathsegments)

    def __truediv__(self, key):
        """Implement ``self / key``; a TypeError raised while building the
        new path is turned into NotImplemented.
        """
        try:
            joined = self.with_segments(str(self), key)
        except TypeError:
            return NotImplemented
        return joined

    def __rtruediv__(self, key):
        """Implement ``key / self``; a TypeError raised while building the
        new path is turned into NotImplemented.
        """
        try:
            joined = self.with_segments(key, str(self))
        except TypeError:
            return NotImplemented
        return joined

    @property
    def parent(self):
        """The logical parent of the path, or the path itself when
        ``split()`` returns it unchanged.
        """
        text = str(self)
        head, _ = self.parser.split(text)
        return self.with_segments(head) if text != head else self

    @property
    def parents(self):
        """A tuple of the path's logical ancestors, nearest first."""
        split = self.parser.split
        current = str(self)
        head = split(current)[0]
        ancestors = []
        while current != head:
            ancestors.append(self.with_segments(head))
            current, head = head, split(head)[0]
        return tuple(ancestors)

    def full_match(self, pattern):
        """Return True if the whole path string matches the glob-style
        *pattern*.
        """
        parser = self.parser
        # A parser whose normcase() leaves 'Aa' untouched is case-sensitive.
        case_sensitive = parser.normcase('Aa') == 'Aa'
        globber = _PathGlobber(parser.sep, case_sensitive, recursive=True)
        matcher = globber.compile(pattern, altsep=parser.altsep)
        return matcher(str(self)) is not None
class _ReadablePath(_JoinablePath):
    """Abstract base class for path objects whose contents can be read.

    On top of the lexical operations inherited from _JoinablePath,
    subclasses provide ``info``, ``__open_rb__()``, ``iterdir()`` and
    ``readlink()``. The reading, globbing, walking and copying helpers
    below are built on those.
    """
    __slots__ = ()

    @property
    @abstractmethod
    def info(self):
        """An object answering file-type queries about this path; walk()
        calls its ``is_dir()``. Presumably it conforms to the PathInfo
        protocol -- confirm against implementations.
        """
        raise NotImplementedError

    @abstractmethod
    def __open_rb__(self, buffering=-1):
        """Open the path for reading in binary mode. Presumably this is the
        hook magic_open() dispatches to for mode 'rb' -- confirm against
        pathlib._os.
        """
        raise NotImplementedError

    def read_bytes(self):
        """Open the path in binary mode, read everything and close it.
        Returns the data read.
        """
        with magic_open(self, mode='rb', buffering=0) as stream:
            payload = stream.read()
        return payload

    def read_text(self, encoding=None, errors=None, newline=None):
        """Open the path in text mode, read everything and close it.
        Returns the text read.
        """
        # Resolve the encoding with io.text_encoding() before opening.
        encoding = text_encoding(encoding)
        with magic_open(self, mode='r', encoding=encoding, errors=errors, newline=newline) as stream:
            text = stream.read()
        return text

    @abstractmethod
    def iterdir(self):
        """Yield path objects for the entries of this directory. walk()
        reads ``info`` and ``name`` from each entry.
        """
        raise NotImplementedError

    def glob(self, pattern, *, recurse_symlinks=True):
        """Return the paths beneath this one that match the relative
        *pattern*, as produced by the globber's selector.

        Raises NotImplementedError for an anchored pattern or when
        *recurse_symlinks* is false, and ValueError for a pattern without
        any components.
        """
        parser = self.parser
        anchor, parts = _explode_path(pattern, parser.split)
        if anchor:
            raise NotImplementedError("Non-relative patterns are unsupported")
        if not parts:
            raise ValueError(f"Unacceptable pattern: {pattern!r}")
        if not recurse_symlinks:
            raise NotImplementedError("recurse_symlinks=False is unsupported")
        # A parser whose normcase() leaves 'Aa' untouched is case-sensitive.
        case_sensitive = parser.normcase('Aa') == 'Aa'
        globber = _PathGlobber(parser.sep, case_sensitive, recursive=True)
        select = globber.selector(parts)
        # NOTE(review): the empty join presumably hands the selector a
        # directory-style starting path -- confirm against glob._PathGlobber.
        return select(self.joinpath(''))

    def walk(self, top_down=True, on_error=None, follow_symlinks=False):
        """Walk the directory tree rooted at this path, yielding
        ``(path, dirnames, filenames)`` triples.

        With *top_down* true a directory is yielded before its
        subdirectories, otherwise after them. An OSError raised while
        listing a directory is passed to *on_error* (when given) and that
        directory is skipped. *follow_symlinks* is forwarded to the
        ``is_dir()`` check that sorts entries into directories and files.
        """
        # The stack holds paths still to be listed and, in bottom-up mode,
        # finished result triples waiting to be yielded.
        pending = [self]
        while pending:
            entry = pending.pop()
            if isinstance(entry, tuple):
                yield entry
                continue
            dirnames = []
            filenames = []
            if not top_down:
                # Pushed below the children so it is yielded after them.
                pending.append((entry, dirnames, filenames))
            try:
                for child in entry.iterdir():
                    if child.info.is_dir(follow_symlinks=follow_symlinks):
                        if not top_down:
                            pending.append(child)
                        dirnames.append(child.name)
                    else:
                        filenames.append(child.name)
            except OSError as error:
                if on_error is not None:
                    on_error(error)
                if not top_down:
                    # Discard the children pushed so far, then the triple.
                    while not isinstance(pending.pop(), tuple):
                        pass
                continue
            if top_down:
                yield entry, dirnames, filenames
                # dirnames is read after the yield, so in-place edits made
                # by the consumer decide which subdirectories are visited.
                pending.extend([entry.joinpath(d) for d in reversed(dirnames)])

    @abstractmethod
    def readlink(self):
        """Return the path that this symbolic link points to."""
        raise NotImplementedError

    def copy(self, target, **kwargs):
        """Copy this file or directory tree to *target* and return a path
        for the copy. Keyword arguments are forwarded to
        ``target._copy_from()``.
        """
        ensure_distinct_paths(self, target)
        target._copy_from(self, **kwargs)
        # NOTE(review): the empty join returns a new path object rather than
        # *target* itself, presumably to drop stale cached state -- confirm.
        copied = target.joinpath()
        return copied

    def copy_into(self, target_dir, **kwargs):
        """Copy this file or directory tree into *target_dir*, under this
        path's own name.

        Raises ValueError if this path has an empty name.
        """
        own_name = self.name
        if own_name:
            return self.copy(target_dir / own_name, **kwargs)
        raise ValueError(f"{self!r} has an empty name")
class _WritablePath(_JoinablePath):
    """Abstract base class for path objects that can be written to.

    On top of the lexical operations inherited from _JoinablePath,
    subclasses provide ``symlink_to()``, ``mkdir()`` and ``__open_wb__()``.
    The writing and copying helpers below are built on those.
    """
    __slots__ = ()

    @abstractmethod
    def symlink_to(self, target, target_is_directory=False):
        """Make this path a symbolic link pointing to *target*."""
        raise NotImplementedError

    @abstractmethod
    def mkdir(self):
        """Create a new directory at this path."""
        raise NotImplementedError

    @abstractmethod
    def __open_wb__(self, buffering=-1):
        """Open the path for writing in binary mode. Presumably this is the
        hook magic_open() dispatches to for mode 'wb' -- confirm against
        pathlib._os.
        """
        raise NotImplementedError

    def write_bytes(self, data):
        """Open the path in binary mode, write *data* and close it.
        Returns the value returned by the file's ``write()``.
        """
        # memoryview() rejects objects without the buffer interface, and it
        # does so before the file is opened for writing.
        buffer = memoryview(data)
        with magic_open(self, mode='wb') as stream:
            written = stream.write(buffer)
        return written

    def write_text(self, data, encoding=None, errors=None, newline=None):
        """Open the path in text mode, write *data* and close it.
        Returns the value returned by the file's ``write()``.

        Raises TypeError if *data* is not a str.
        """
        # Resolve the encoding with io.text_encoding() before anything else.
        encoding = text_encoding(encoding)
        if not isinstance(data, str):
            raise TypeError('data must be str, not %s' %
                            data.__class__.__name__)
        with magic_open(self, mode='w', encoding=encoding, errors=errors, newline=newline) as stream:
            written = stream.write(data)
        return written

    def _copy_from(self, source, follow_symlinks=True):
        """Recursively copy *source* to this path.

        Symlinks are recreated as symlinks when *follow_symlinks* is false,
        directories are created and their entries queued, and anything else
        is copied as a file.
        """
        # Explicit stack of (source, destination) pairs instead of recursion.
        pending = [(source, self)]
        while pending:
            src, dst = pending.pop()
            if not follow_symlinks and src.info.is_symlink():
                dst.symlink_to(str(src.readlink()), src.info.is_dir())
                continue
            if src.info.is_dir():
                # NOTE(review): iterdir() is called before mkdir(),
                # presumably so a failure to list the source surfaces before
                # anything is created -- confirm.
                entries = src.iterdir()
                dst.mkdir()
                pending.extend(
                    (entry, dst.joinpath(entry.name)) for entry in entries)
                continue
            ensure_different_files(src, dst)
            with magic_open(src, 'rb') as reader, magic_open(dst, 'wb') as writer:
                copyfileobj(reader, writer)
# Register the concrete pathlib classes as virtual subclasses of the ABCs
# above, so that isinstance()/issubclass() checks against the ABCs accept
# them without PurePath or Path inheriting from these classes.
_JoinablePath.register(PurePath)
_ReadablePath.register(Path)
_WritablePath.register(Path)