import argparse
import datetime
import os
import re
import shutil
import subprocess
import sys
import tempfile
from argparse import Namespace
from itertools import zip_longest
from pathlib import Path
from subprocess import Popen, PIPE, STDOUT
from tempfile import NamedTemporaryFile
from typing import Any, Generator, Optional, TextIO, Union
from psutil import Process
# A text sink for generated output: a real text stream, or any object that
# ``print(..., file=...)`` accepts (e.g. a temporary-file wrapper).
type AnyIO = Union[TextIO, Any]
# A filesystem location given either as a ``Path`` or as a plain string.
type AnyPath = Union[Path, str]
def run_process(*command):
    """Run an external command and wait for it to finish.

    Every argument is passed through ``str`` first, so ``Path`` objects can
    be given directly.  The exit status is not inspected and nothing is
    returned.
    """
    argv = list(map(str, command))
    subprocess.run(argv)
def test_bash() -> bool:
    """Report whether this process or one of its ancestors is named 'bash'.

    Walks up the process tree from the current process until a process
    called ``bash`` is found or there is no further parent.
    """
    node = Process()
    # Climb towards the root; parent() yields a falsy value past the top.
    while node and node.name() != 'bash':
        node = node.parent()
    return bool(node)
class Directory:
    """Context manager that switches the working directory for a block.

    The directory to return to is captured when the object is constructed,
    not when the block is entered.
    """

    def __init__(self, newdir: AnyPath):
        self.newdir = newdir
        self.olddir = Path(os.getcwd())

    def __enter__(self):
        """Change into the new directory; nothing is bound by ``as``."""
        os.chdir(self.newdir)
        return None

    def __exit__(self, exc_type, exc_value, exc_trace):
        """Return to the remembered directory; exceptions still propagate."""
        os.chdir(self.olddir)
        return None
class FileSystem:
    """Builds the example directory tree below ``<system temp>/ex``.

    File modes are applied as soon as an entry is created.  Timestamps and
    root ownership are only recorded in ``attribs`` and are applied later,
    in reverse order of registration, by ``apply_attribs``.
    """

    def __init__(self):
        self.user = os.environ.get('USER')
        self.bash = test_bash()
        self.home = Path.home()
        self.temp = Path(tempfile.gettempdir()) / 'ex'
        # Maps a created path to its pending (date, root) attributes.
        self.attribs = {}

    def remove_temp(self):
        """Delete the example tree; a missing tree is silently ignored."""
        try:
            shutil.rmtree(self.temp)
        except FileNotFoundError:
            return
        print(f'Removed {self.temp}')

    def create_temp(self):
        """Create the (empty) root directory of the example tree."""
        os.mkdir(self.temp)

    def create_dir(self, mode: int, root: bool, size: int, date: str, path: AnyPath, link: Optional[str]):
        """Create a directory, or a symlink to a freshly created directory.

        When ``link`` is given, the real directory is created at ``link`` and
        ``path`` becomes a symlink to it.  Entries that already exist are
        left alone.  ``size`` is accepted but not used.
        """
        target = self.append_path(path)
        try:
            if link:
                source = self.append_path(link)
                self.create_dir(mode, root, size, date, source, None)
                os.symlink(source, target)
            else:
                os.mkdir(target)
                print(f'Created {target}')
        except FileExistsError:
            pass
        self.defer_attribs(mode, root, date, target)

    def change_dir(self, path: AnyPath):
        """Return a context manager that enters ``path`` inside the tree."""
        return Directory(self.append_path(path))

    def create_file(self, mode: int, root: bool, size: int, date: str, sig: str, path: AnyPath, link: Optional[str]):
        """Create a file, or a symlink to a freshly created file.

        A one-character ``sig`` is repeated ``size`` times.  Any other
        ``sig`` is written once and then padded with zero bytes up to
        ``size`` (no truncation when it is already longer).
        """
        target = self.append_path(path)
        if link:
            source = self.append_path(link)
            try:
                self.create_file(mode, root, size, date, sig, source, None)
                os.symlink(source, target)
            except FileExistsError:
                pass
        else:
            encoded = sig.encode()
            if len(sig) == 1:
                payload = encoded * size
            else:
                padding = bytes(max(size - len(encoded), 0))
                payload = encoded + padding
            with open(target, 'wb') as handle:
                handle.write(payload)
            print(f'Created {target}')
        self.defer_attribs(mode, root, date, target)

    def append_file(self, path: AnyPath):
        """Append a single ``.`` byte to an existing (or new) file."""
        with open(self.append_path(path), 'ab') as handle:
            handle.write(b'.')

    def rename_ref(self, old: AnyPath, new: AnyPath):
        """Move the pending attributes of ``old`` over to ``new``, if any."""
        old_key = self.append_path(old)
        new_key = self.append_path(new)
        pending = self.attribs.pop(old_key, None)
        if pending:
            self.attribs[new_key] = pending

    def create_zip(self, mode: int, root: bool, _size: int, date: str, path: AnyPath, zformat: str):
        """Archive the directory at ``path`` next to itself in ``zformat``."""
        base = str(self.append_path(path))
        archive = shutil.make_archive(base, zformat, base)
        print(f'Created {archive}')
        self.set_time(date, archive)
        self.defer_attribs(mode, root, None, archive)

    def create_7zip(self, mode: int, root: bool, _size: int, date: str, path: AnyPath):
        """Archive the directory at ``path`` into a sibling ``.7z`` file."""
        import py7zr
        source = self.append_path(path)
        archive = source.with_suffix('.7z')
        with py7zr.SevenZipFile(archive, 'w') as arcfile:
            for parent, subdirs, files in source.walk():
                # Directories first, then files, each stored relative to the root.
                for name in [*subdirs, *files]:
                    entry = parent / name
                    arcfile.write(entry, str(entry.relative_to(source)))
        print(f'Created {archive}')
        self.set_time(date, archive)
        self.defer_attribs(mode, root, None, archive)

    def defer_attribs(self, mode: int, root: bool, date: Optional[str], path: AnyPath):
        """Record the date and ownership for later and set the mode now."""
        self.attribs[path] = (date, root)
        os.chmod(path, mode)

    def apply_attribs(self):
        """Apply recorded dates and ownership, newest registration first.

        Ownership is only changed (through ``sudo``) when running under bash.
        """
        for path in reversed(self.attribs):
            date, root = self.attribs[path]
            if date:
                self.set_time(date, path)
            if not (root and self.bash):
                continue
            for tool in ('chown', 'chgrp'):
                run_process('sudo', tool, 'root', path)

    def append_path(self, path: str) -> Path:
        """Resolve ``path`` against the root of the example tree."""
        return self.temp / path

    @classmethod
    def set_time(cls, date: str, path: AnyPath):
        """Set access and modification time to local midnight of ``date``."""
        day = datetime.date.fromisoformat(date)
        midnight = datetime.datetime.combine(day, datetime.time.min)
        stamp = midnight.timestamp()
        os.utime(path, (stamp, stamp))
class PushbackIO:
    """Iterator over a list of lines that can step back by one line."""

    def __init__(self, lines: list[str]):
        self.lines = lines
        self.index = 0

    def __iter__(self):
        return self

    def __next__(self):
        """Return the line at the cursor and advance past it."""
        position = self.index
        if position >= len(self.lines):
            raise StopIteration
        self.index = position + 1
        return self.lines[position]

    def pushback(self):
        """Move the cursor back one line, never before the first line."""
        self.index = self.index - 1 if self.index > 0 else 0
class FileModifier:
    """Refreshes the generated sections of a Markdown document.

    Two kinds of section are regenerated: the table of contents following a
    ``Contents`` heading, and the output shown beneath an ``ex`` command line
    that opens a fenced block.  Command output is produced by running the
    debug build of ``ex`` inside the example tree owned by ``system``.
    """

    def __init__(self, system: FileSystem):
        script = Path(__file__)
        self.program = script.parent.parent / 'target' / 'debug' / 'ex'
        self.system = system

    def modify_file(self, path: Path, stdout: bool):
        """Regenerate one file, in place or onto standard output.

        Args:
            path: The Markdown file to process.
            stdout: When true, write the result to standard output and leave
                the file untouched.  Otherwise the file is replaced, but only
                if its content actually changed; the path is printed then.
        """
        if stdout:
            with open(path, 'r') as reader:
                self.modify_stream(reader, sys.stdout)
            return
        temp = None
        try:
            with open(path, 'r') as reader:
                # delete=False: the file must outlive the handle so that it
                # can be moved into place; cleanup is handled below.
                with NamedTemporaryFile('w', dir=path.parent, delete=False) as writer:
                    temp = writer.name
                    dirty = self.modify_stream(reader, writer)
            if dirty:
                print(path)
                # Temporary files are created private to the owner; carry
                # the original permission bits over before swapping.
                shutil.copymode(path, temp)
                # replace() overwrites an existing target on every platform.
                os.replace(temp, path)
                temp = None
        finally:
            # Unchanged content, or a failure part-way: drop the scratch file.
            if temp is not None:
                os.unlink(temp)

    @classmethod
    def read_stream(cls, reader: TextIO) -> tuple[list[str], list[tuple]]:
        """Read every line and collect the headings of level two and deeper.

        Returns:
            The lines as read, and a list of ``(indent, heading)`` pairs
            where ``indent`` is the heading level minus two.
        """
        lines = list()
        toc = list()
        for line in reader:
            lines.append(line)
            if match := re.search(r'^(##+) (.+)$', line):
                indent = len(match.group(1)) - 2
                heading = match.group(2)
                toc.append((indent, heading))
        return lines, toc

    def modify_stream(self, reader: TextIO, writer: AnyIO) -> bool:
        """Copy ``reader`` to ``writer``, regenerating the generated sections.

        Returns:
            True when any regenerated section differs from what was read.
        """
        dirty = False
        quoted = False
        lines, toc = self.read_stream(reader)
        reader = PushbackIO(lines)
        for line in reader:
            print(line, end='', file=writer)
            if quoted:
                # Only the line directly after a fence is examined.
                quoted = False
                if match := re.search(r'^(?:~/(\w+) \$|C:\\Users\\username\\(\w+)>) ex(?:\.exe)? ?(.*)$', line):
                    linux, windows, args = match.groups()
                    args = args.split()
                    if 'xargs' not in args:
                        # Exactly one prompt style matched; it names the
                        # directory in which the command is run.
                        olds = self.read_quoted(reader)
                        news = self.read_process(linux or windows, args, windows is not None)
                        dirty |= self.modify_chunk(writer, olds, news)
            else:
                if re.search(r'^```$', line):
                    quoted = True
                elif re.search(r'^#+ Contents$', line):
                    olds = self.read_contents(reader)
                    news = self.create_contents(toc)
                    dirty |= self.modify_chunk(writer, olds, news)
        return dirty

    @classmethod
    def modify_chunk(cls, writer: AnyIO, olds: Generator[str, Any, None], news: Generator[str, Any, None]):
        """Write the new lines and report whether they differ from the old.

        Both iterables are consumed completely; a difference in length
        counts as a change.
        """
        dirty = False
        for old, new in zip_longest(olds, news):
            if new is not None:
                print(new, end='', file=writer)
                dirty |= (new != old)
            elif old is not None:
                dirty = True
        return dirty

    @classmethod
    def read_quoted(cls, reader: PushbackIO):
        """Yield lines up to the closing fence, which is left unread."""
        for line in reader:
            if re.search(r'^```$', line):
                reader.pushback()
                break
            yield line

    def read_process(self, directory: str, args: list[str], windows: bool):
        """Run ``ex`` in an example directory and yield its rewritten output.

        Machine-specific details are replaced so the documentation is
        stable: the example root becomes a ``username`` home directory, the
        real user name becomes ``username``, and output is optionally
        converted to a Windows look.

        Args:
            directory: Directory below the example root to run in.
            args: The command-line arguments shown in the document.
            windows: True when the command was shown at a Windows prompt.
        """
        context = ProcessContext(args)
        command = [self.program, *context.args]
        local = str(self.system.temp / directory)
        replace = str(self.system.home.parent / 'username')
        # The path is literal text, not a pattern, so it must be escaped.
        temp_regex = re.compile(rf'{re.escape(str(self.system.temp))}\b')
        owner_regex = None
        if context.owner and self.system.user:
            # Without a known user name there is nothing reliable to match.
            name = re.escape(self.system.user)
            owner_regex = re.compile(rf'^(\S+\s+){name}(\s+){name}(.+)$')
        # The context manager closes the pipe and reaps the child process.
        with Popen(command, stdout=PIPE, stderr=STDOUT, cwd=local, text=True) as process:
            for line in process.stdout:
                # A callable replacement keeps backslashes in the path literal.
                line = temp_regex.sub(lambda _: replace, line)
                if windows or context.windows:
                    # Repeat the owner permissions for group and other.
                    line = re.sub(r'^([dl-])([r-][w-][x-]).{6}', r'\1\2\2\2', line)
                    line = re.sub(r'/', r'\\', line)
                    line = re.sub(r'\\home\b', r'C:\\Users', line)
                if context.version:
                    if match := re.search(r'^(\S+\s+\d+\s+\S+\s+\S+\s+\S+\s+)((\.\w+)\s+\S+)$', line):
                        prefix, suffix, ext = match.groups()
                        # NOTE(review): the three inserts differ in width;
                        # confirm against the expected column layout.
                        if ext == '.exe':
                            insert = '2.1.0.999 '
                        elif ext == '.dll':
                            insert = '2.1.0.1001 '
                        else:
                            insert = ' '
                        line = f'{prefix}{insert}{suffix}\n'
                if owner_regex is not None:
                    if match := owner_regex.search(line):
                        prefix, padding, suffix = match.groups()
                        line = f'{prefix}username{padding}username{suffix}\n'
                if context.decrypt:
                    if match := re.search(r'\b(\S+)$', line):
                        filename = match.group(1)
                        prompt = Path(replace) / directory / filename
                        prompt = f'Password for {prompt}?\n'
                        # The prompt is shown twice, before the first
                        # matching line only.
                        yield prompt
                        yield prompt
                        context.decrypt = False
                yield line

    @classmethod
    def read_contents(cls, reader: PushbackIO):
        """Yield lines up to the next line starting with ``#``, left unread."""
        for line in reader:
            if re.search(r'^#+', line):
                reader.pushback()
                break
            yield line

    @classmethod
    def create_contents(cls, toc: list[tuple]):
        """Yield a bulleted, linked table of contents framed by blank lines."""
        yield '\n'
        for indent, heading in toc:
            indent = ' ' * indent
            link = '-'.join(heading.split()).lower()
            yield f'{indent}* [{heading}](#{link})\n'
        yield '\n'
class ProcessContext:
    """Interprets the documented ``ex`` arguments for an actual run.

    Attributes:
        args: The arguments to pass to the real program.
        windows: A ``w`` short flag was present (and removed).
        version: A ``v`` short flag was present (and removed).
        owner: ``--owner`` was present.
        decrypt: A ``z`` short flag and ``--sig`` were present without
            ``--password``.
    """

    def __init__(self, args: list[str]):
        self.args = list()
        self.windows = False
        self.version = False
        self.owner = False
        self._decrypt1 = False  # a short flag containing ``z`` was seen
        self._decrypt2 = False  # ``--sig`` was seen
        self._decrypt3 = False  # ``--password`` was seen
        self.import_args(args)

    @property
    def decrypt(self) -> bool:
        """True when ``z`` and ``--sig`` were given without ``--password``."""
        return self._decrypt1 and self._decrypt2 and not self._decrypt3

    @decrypt.setter
    def decrypt(self, value: bool):
        self._decrypt1 = value
        self._decrypt2 = value
        # The password flag negates the property, so it is always cleared;
        # copying ``value`` into it made ``decrypt = True`` read back False.
        self._decrypt3 = False

    def import_args(self, args: list[str]):
        """Translate the documented arguments into ``self.args``.

        Everything from a ``|`` onwards is dropped.  The ``w`` and ``v``
        letters are stripped from short flags and remembered instead.
        ``--terminal`` is appended unless the command was piped, and a fixed
        ``--now`` timestamp is always appended.
        """
        piped = False
        for arg in args:
            if arg == '|':
                piped = True
                break
            if match := re.search(r'^-(\w*)w(\w*)$', arg):
                prefix, suffix = match.groups()
                arg = f'-{prefix}{suffix}'
                self.windows = True
            if match := re.search(r'^-(\w*)v(\w*)$', arg):
                prefix, suffix = match.groups()
                arg = f'-{prefix}{suffix}'
                self.version = True
            if arg == '--owner':
                self.owner = True
            if re.search(r'^-(\w*)z(\w*)$', arg):
                self._decrypt1 = True
            if arg == '--sig':
                self._decrypt2 = True
            if arg == '--password':
                self._decrypt3 = True
            # A flag group emptied by the stripping above is not passed on.
            if arg not in ('-', ''):
                self.args.append(arg)
        if not piped:
            self.args.append('--terminal')
        self.args.extend(['--now', '2025-01-01T00:00:00Z'])
def parse_args() -> Namespace:
    """Parse the command line of this script.

    Returns:
        A namespace with ``paths`` (list of files, possibly empty) and the
        boolean switches ``stdout``, ``create`` and ``remove``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('paths', nargs='*', default=[], help='file to modify')
    # All remaining options are plain on/off switches.
    switches = (
        (('--stdout',), 'write to standard output'),
        (('-c', '--create'), 'create example directories'),
        (('-r', '--remove'), 'remove example directories'),
    )
    for flags, description in switches:
        parser.add_argument(*flags, action='store_true', help=description)
    return parser.parse_args()
def run_main() -> None:
    """Run the script in one of its two modes.

    With ``--create`` or ``--remove`` the example tree is removed, and with
    ``--create`` it is then rebuilt from scratch.  Otherwise the given files
    are regenerated, or standard input is filtered to standard output when
    no files are given.
    """
    settings = parse_args()
    system = FileSystem()
    if settings.create or settings.remove:
        # Both options start from a clean slate.
        system.remove_temp()
        if settings.create:
            system.create_temp()
            # bin: executable-style files without any content signature.
            system.create_dir(0o755, False, 0, '2024-12-01', 'bin', None)
            system.create_file(0o777, False, 123000, '2024-12-01', '', 'bin/binary.exe', None)
            system.create_file(0o666, False, 45000, '2024-12-01', '', 'bin/library.dll', None)
            system.create_file(0o666, False, 678, '2024-12-01', '', 'bin/README.txt', None)
            # code: a small source tree that is put under git below.
            system.create_dir(0o755, False, 0, '2024-12-01', 'code', None)
            system.create_file(0o664, False, 0, '2024-12-31', 'Cargo.lock\n', 'code/.gitignore', None)
            system.create_file(0o664, False, 455, '2024-12-31', 'A', 'code/build.rs', None)
            system.create_file(0o664, False, 1144, '2024-12-31', 'B', 'code/Cargo.toml', None)
            system.create_file(0o664, False, 27288, '2024-12-31', 'C', 'code/README.md', None)
            system.create_dir(0o755, False, 0, '2024-12-01', 'code/src', None)
            system.create_file(0o664, False, 69983, '2024-12-31', 'D', 'code/src/config.rs', None)
            system.create_file(0o664, False, 2399, '2024-12-31', 'E', 'code/src/error.rs', None)
            system.create_file(0o664, False, 48483, '2024-12-31', 'F', 'code/src/finder.rs', None)
            system.create_file(0o664, False, 132, '2024-12-31', 'G', 'code/src/lib.rs', None)
            system.create_file(0o664, False, 5808, '2024-12-31', 'H', 'code/src/main.rs', None)
            system.create_file(0o664, False, 203844, '2024-12-31', 'I', 'code/src/printer.rs', None)
            system.create_file(0o664, False, 38464, '2024-12-31', 'J', 'code/src/sorter2.rs', None)
            # Commit everything created so far, then rename one file via git.
            with system.change_dir('code'):
                run_process('git', 'init')
                run_process('git', 'add', '.gitignore', '*')
                run_process('git', 'commit', '-m', 'Initial commit')
                run_process('git', 'mv', 'src/sorter2.rs', 'src/sorter.rs')
            # Keep the pending timestamp attached to the renamed file.
            system.rename_ref('code/src/sorter2.rs', 'code/src/sorter.rs')
            # Files added and changed after the commit; only some are staged
            # (presumably to give the examples a mix of git states).
            system.create_file(0o664, False, 49254, '2024-12-31', 'K', 'code/Cargo.lock', None)
            system.create_file(0o664, False, 177, '2024-12-31', 'L', 'code/notes.txt', None)
            system.create_file(0o664, False, 48463, '2024-12-31', 'M', 'code/src/option.rs', None)
            system.append_file('code/src/printer.rs')
            system.append_file('code/src/sorter.rs')
            with system.change_dir('code'):
                run_process('git', 'add', 'src/option.rs', 'src/printer.rs', 'src/sorter.rs')
            # example: hidden files, nested directories and symlinks whose
            # targets live in the sibling ``numbers`` directory.
            system.create_dir(0o755, False, 0, '2024-12-01', 'example', None)
            system.create_dir(0o755, False, 0, '2024-12-01', 'numbers', None)
            system.create_file(0o744, True, 10, '2024-11-01', '#!/u', 'example/find.sh', None)
            system.create_dir(0o755, False, 0, '2024-12-31', 'example/.hidden', None)
            system.create_file(0o744, False, 15, '2024-12-31', '', 'example/.hidden/password.dat', None)
            system.create_file(0o744, False, 15, '2024-12-31', '', 'example/.hidden/secret.dat', None)
            system.create_dir(0o755, False, 0, '2024-12-31', 'example/files', None)
            system.create_dir(0o755, False, 0, '2024-12-31', 'example/files/colours', None)
            system.create_file(0o744, False, 20, '2024-10-01', '#!/u', 'example/files/colours/alpha.sh', None)
            system.create_file(0o644, False, 30, '2024-09-01', 'BLUE', 'example/files/colours/blue.txt', None)
            system.create_file(0o644, False, 40, '2024-08-01', 'GREE', 'example/files/colours/green.txt', None)
            system.create_file(0o644, False, 50, '2024-07-01', 'RED', 'example/files/colours/red.txt', None)
            system.create_dir(0o755, False, 0, '2024-12-31', 'example/files/numbers', None)
            system.create_file(0o744, False, 60, '2024-06-01', '#!/u', 'example/files/numbers/count.sh', 'numbers/count.sh')
            system.create_file(0o644, False, 999999, '2024-05-01', '', 'example/files/numbers/googolplex.gz', 'numbers/googolplex.gz')
            system.create_dir(0o644, False, 0, '2024-04-01', 'example/files/numbers/ordinals', 'numbers/ordinals')
            system.create_dir(0o755, False, 0, '2024-12-31', 'example/files/numbers/one two', None)
            system.create_file(0o644, False, 70, '2024-03-01', '34', 'example/files/numbers/one two/"three" \'four\'.txt', None)
            # ordered: empty files with numeric suffixes of varying width.
            system.create_dir(0o755, False, 0, '2024-12-01', 'ordered', None)
            system.create_file(0o664, False, 0, '2024-01-01', '', 'ordered/file8.txt', None)
            system.create_file(0o664, False, 0, '2024-01-01', '', 'ordered/file9.txt', None)
            system.create_file(0o664, False, 0, '2024-01-01', '', 'ordered/file10.txt', None)
            system.create_file(0o664, False, 0, '2024-01-01', '', 'ordered/file11.txt', None)
            system.create_file(0o664, False, 0, '2024-01-01', '', 'ordered/file98.txt', None)
            system.create_file(0o664, False, 0, '2024-01-01', '', 'ordered/file99.txt', None)
            system.create_file(0o664, False, 0, '2024-01-01', '', 'ordered/file100.txt', None)
            system.create_file(0o664, False, 0, '2024-01-01', '', 'ordered/file101.txt', None)
            # zipped: a small tree that is archived in several formats.
            system.create_dir(0o755, False, 0, '2024-12-01', 'zipped', None)
            system.create_dir(0o755, False, 0, '2024-12-31', 'zipped/backup', None)
            system.create_dir(0o755, False, 0, '2024-12-31', 'zipped/backup/.hidden', None)
            system.create_file(0o664, False, 100, '2024-01-01', 'ABCD', 'zipped/backup/file.txt', None)
            system.create_file(0o664, False, 100, '2024-01-01', 'EFGH', 'zipped/backup/.hidden/hidden.txt', None)
            # Dates and ownership are applied before the archives are built
            # (presumably so the archived members carry the example dates).
            system.apply_attribs()
            system.create_7zip(0o644, False, 0, '2024-01-01', 'zipped/backup')
            system.create_zip(0o644, False, 0, '2024-01-01', 'zipped/backup', 'tar')
            system.create_zip(0o644, False, 0, '2024-01-01', 'zipped/backup', 'gztar')
            system.create_zip(0o644, False, 0, '2024-01-01', 'zipped/backup', 'zip')
    else:
        modifier = FileModifier(system)
        if settings.paths:
            for path in settings.paths:
                path = Path(path)
                modifier.modify_file(path, settings.stdout)
        else:
            # No files given: act as a filter from stdin to stdout.
            modifier.modify_stream(sys.stdin, sys.stdout)
# Script entry point.  There is no ``__main__`` guard, so this also runs
# when the module is imported.
try:
    run_main()
except (OSError, KeyboardInterrupt) as error:
    # Report the failure on stderr instead of showing a traceback.
    # NOTE(review): the exit status stays 0 after a reported error —
    # confirm that this is intended.
    print(error, file=sys.stderr)