import json
import logging
import os
import random
import socket
import stat
import yaml
def create_file_with_given_data(ctx, filename=None, data=None):
    """Write *data*, encoded as UTF-8, to *filename*.

    Any missing parent directories of *filename* are created first. An
    existing file is overwritten. *ctx* is accepted but not used here.
    """
    logging.debug(f"creating file {filename} with {data!r}")
    parent = os.path.dirname(filename)
    if not parent:
        # A bare file name has no directory part; use the current directory.
        parent = "."
    os.makedirs(parent, exist_ok=True)
    with open(filename, "wb") as out:
        encoded = data.encode("UTF-8")
        out.write(encoded)
def create_file_with_random_data(ctx, filename=None):
    """Create *filename* holding 128 randomly chosen characters.

    Each character is a code point drawn uniformly from 0..255; the
    resulting string is handed to create_file_with_given_data for writing.
    """
    length = 128
    chars = [chr(random.randint(0, 255)) for _ in range(length)]
    create_file_with_given_data(ctx, filename=filename, data="".join(chars))
def create_unix_socket(ctx, filename=None):
    """Create a Unix domain socket file at *filename*.

    A stream socket is bound to the path, which creates the socket file,
    and is then closed again. *ctx* is accepted but not used here.

    Raises:
        OSError: if the path cannot be bound (for example, it already exists).
    """
    # Close the socket object once it is bound so the descriptor is not
    # leaked; closing does not remove the file that bind() created.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.bind(filename)
def create_fifo(ctx, filename=None):
    """Create a named pipe (FIFO) at *filename*.

    *ctx* is accepted but not used here.
    """
    fifo_path = filename
    os.mkfifo(fifo_path)
def create_cachedir_tag_in(ctx, dirpath=None):
    """Write a CACHEDIR.TAG file into *dirpath*.

    The directory is created if it does not exist yet. The file contains
    a single "Signature: ..." line without a trailing newline.
    """
    tag_path = f"{dirpath}/CACHEDIR.TAG"
    logging.debug(f"creating {tag_path}")
    os.makedirs(dirpath, exist_ok=True)
    signature = "Signature: 8a477f597d28d172789f06886806bc55"
    with open(tag_path, "w") as tag:
        tag.write(signature)
def create_nonutf8_filename(ctx, dirname=None):
    """Create *dirname* and, inside it, an empty file whose name is not UTF-8.

    The file name is the single byte 0x88, which is not a valid UTF-8
    sequence. *ctx* is accepted but not used here.

    Raises:
        FileExistsError: if *dirname* already exists.
    """
    os.mkdir(dirname)
    # Build the path as bytes. The str "\x88" is the code point U+0088,
    # which a UTF-8 filesystem encoding would turn into the valid two-byte
    # sequence C2 88. The file must also live inside dirname rather than
    # in the current working directory.
    path = os.path.join(os.fsencode(dirname), b"\x88")
    with open(path, "wb"):
        pass
def chmod_file(ctx, filename=None, mode=None):
    """Set the permission bits of *filename*.

    *mode* is a string of octal digits (for example "644"). *ctx* is
    accepted but not used here.
    """
    permission_bits = int(mode, 8)
    os.chmod(filename, permission_bits)
def create_symlink(ctx, linkname=None, target=None):
    """Create a symbolic link named *linkname* that points at *target*.

    *ctx* is accepted but not used here.
    """
    points_to = target
    link_path = linkname
    os.symlink(points_to, link_path)
def create_manifest_of_live(ctx, dirname=None, manifest=None):
    """Write a manifest of the directory *dirname* to the file *manifest*.

    Thin wrapper that forwards its arguments unchanged to
    _create_manifest_of_directory.
    """
    forwarded = {"dirname": dirname, "manifest": manifest}
    _create_manifest_of_directory(ctx, **forwarded)
def create_manifest_of_restored(ctx, dirname=None, restored=None, manifest=None):
    """Write a manifest of *dirname* as found beneath *restored*.

    The directory that is scanned is *restored* joined with "./" + *dirname*;
    the "./" prefix keeps os.path.join from discarding *restored* when
    *dirname* is an absolute path.
    """
    restored_dir = os.path.join(restored, "./" + dirname)
    _create_manifest_of_directory(ctx, dirname=restored_dir, manifest=manifest)
def _create_manifest_of_directory(ctx, dirname=None, manifest=None):
    """Run summain over everything under *dirname* and save its output.

    The command ``find -exec summain {} +`` is run with *dirname* as the
    working directory via the runcmd helpers, which are looked up in the
    module globals at call time. The command must exit with status 0; its
    captured standard output is written to the file *manifest*.
    """
    helpers = globals()
    run = helpers["runcmd_run"]
    get_exit_code = helpers["runcmd_get_exit_code"]
    get_stdout = helpers["runcmd_get_stdout"]

    logging.info(f"creating manifest for {dirname} in {manifest}")
    find_argv = ["find", "-exec", "summain", "{}", "+"]
    run(ctx, find_argv, cwd=dirname)
    assert get_exit_code(ctx) == 0

    listing = get_stdout(ctx)
    with open(manifest, "w") as out:
        out.write(listing)
def file_is_restored(ctx, filename=None, restored=None):
    """Assert that *filename* exists beneath the directory *restored*.

    The checked path is *restored* joined with "./" + *filename*.

    Raises:
        AssertionError: if the path does not exist.
    """
    restored_path = os.path.join(restored, "./" + filename)
    found = os.path.exists(restored_path)
    logging.debug(f"restored? {restored_path} {found}")
    assert found
def file_is_not_restored(ctx, filename=None, restored=None):
    """Assert that *filename* does not exist beneath the directory *restored*.

    The checked path is *restored* joined with "./" + *filename*. A path
    that cannot be examined (for example because of missing permissions)
    counts as not existing.

    Raises:
        AssertionError: if the path exists.
    """
    filename = os.path.join(restored, "./" + filename)
    # This must be an f-string so the actual path is interpolated into the
    # log message.
    logging.info(f"verifying that {filename} does not exist")
    # os.path.exists() returns False instead of raising when the underlying
    # stat call fails, so no PermissionError handling is needed here.
    exists = os.path.exists(filename)
    logging.debug(f"restored? {filename} {exists}")
    assert not exists
def files_match(ctx, first=None, second=None):
    """Assert that the text files *first* and *second* have equal contents.

    Both files are read in text mode, logged at debug level, and compared
    with the assert_eq helper, which is looked up in the module globals at
    call time.
    """
    assert_eq = globals()["assert_eq"]
    # Read through context managers so the file handles are closed promptly.
    with open(first) as first_file:
        f = first_file.read()
    with open(second) as second_file:
        s = second_file.read()
    logging.debug(f"files_match: f:\n{f}")
    logging.debug(f"files_match: s:\n{s}")
    assert_eq(f, s)
def convert_yaml_to_json(ctx, yaml_name=None, json_name=None):
    """Load the YAML document in *yaml_name* and write it as JSON to *json_name*.

    The YAML is parsed with yaml.safe_load and serialized with json.dump
    using default settings. *ctx* is accepted but not used here.
    """
    with open(yaml_name) as source:
        document = yaml.safe_load(source)

    with open(json_name, "w") as sink:
        json.dump(document, sink)
def match_stdout_to_json_file_superset(ctx, filename=None):
    """Assert that the JSON on captured stdout is a superset of a JSON file.

    The captured stdout (via the runcmd_get_stdout helper) and the file
    *filename* are both parsed as JSON, then compared by the file's type:

    * dict: keys of stdout that are absent from the file are dropped before
      the two dicts are compared with assert_dict_eq;
    * list: both values are wrapped as {"key": value} and compared with
      assert_dict_eq;
    * anything else: compared directly with assert_eq.

    The helper functions are looked up in the module globals at call time.
    """
    runcmd_get_stdout = globals()["runcmd_get_stdout"]
    assert_eq = globals()["assert_eq"]
    assert_dict_eq = globals()["assert_dict_eq"]

    stdout = runcmd_get_stdout(ctx)
    stdout = json.loads(stdout.strip())
    # Use a context manager so the file handle is closed after parsing.
    with open(filename) as f:
        obj = json.load(f)
    logging.debug(f"match_stdout_to_json_file: stdout={stdout!r}")
    logging.debug(f"match_stdout_to_json_file: file={obj!r}")

    if isinstance(obj, dict):
        # Only the keys the file mentions take part in the comparison.
        stdout = {key: value for key, value in stdout.items() if key in obj}
        assert_dict_eq(obj, stdout)
    elif isinstance(obj, list):
        # assert_dict_eq is used for lists too, so wrap both sides in dicts.
        # One comparison is enough; the original repeated the same call.
        assert_dict_eq({"key": obj}, {"key": stdout})
    else:
        assert_eq(obj, stdout)
def match_stdout_to_json_file_exactly(ctx, filename=None):
    """Assert that the JSON on captured stdout equals the JSON in a file.

    The captured stdout (via the runcmd_get_stdout helper) and the file
    *filename* are both parsed as JSON, then compared by the file's type:

    * list: both values are wrapped as {"key": value} and compared with
      assert_dict_eq;
    * dict: compared with assert_dict_eq;
    * anything else: compared with assert_eq.

    The helper functions are looked up in the module globals at call time.
    """
    runcmd_get_stdout = globals()["runcmd_get_stdout"]
    assert_eq = globals()["assert_eq"]
    assert_dict_eq = globals()["assert_dict_eq"]

    stdout = runcmd_get_stdout(ctx)
    stdout = json.loads(stdout.strip())
    # Use a context manager so the file handle is closed after parsing.
    with open(filename) as f:
        obj = json.load(f)
    logging.debug(f"match_stdout_to_json_file: stdout={stdout!r}")
    logging.debug(f"match_stdout_to_json_file: file={obj!r}")

    if isinstance(obj, list):
        # assert_dict_eq is used for lists too, so wrap both sides in dicts.
        assert_dict_eq({"key": obj}, {"key": stdout})
    elif isinstance(obj, dict):
        assert_dict_eq(obj, stdout)
    else:
        assert_eq(obj, stdout)
def manifests_match(ctx, expected=None, actual=None):
    """Assert that two multi-document YAML manifests are equal.

    Both files are loaded with yaml.safe_load_all. Documents are compared
    pairwise, in order, with assert_dict_eq; afterwards neither manifest
    may have documents left over, so the two must contain the same number
    of documents. The assertion helpers are looked up in the module
    globals at call time.
    """
    assert_eq = globals()["assert_eq"]
    assert_dict_eq = globals()["assert_dict_eq"]

    logging.debug(f"comparing manifests {expected} and {actual}")
    # safe_load_all is lazy, so materialize the documents while the file
    # is still open; the context manager then closes the handle.
    with open(expected) as f:
        expected_objs = list(yaml.safe_load_all(f))
    with open(actual) as f:
        actual_objs = list(yaml.safe_load_all(f))
    logging.debug(f"there are {len(expected_objs)} and {len(actual_objs)} objects")

    for i, (e, a) in enumerate(zip(expected_objs, actual_objs)):
        logging.debug(f"comparing manifest objects at index {i}:")
        logging.debug(f" expected: {e}")
        logging.debug(f" actual : {a}")
        assert_dict_eq(e, a)

    # Whatever lies beyond the common prefix is unmatched in the other file.
    compared = min(len(expected_objs), len(actual_objs))
    expected_rest = expected_objs[compared:]
    actual_rest = actual_objs[compared:]
    logging.debug(f"remaining expected objects: {expected_rest}")
    logging.debug(f"remaining actual objects : {actual_rest}")
    assert_eq(expected_rest, [])
    assert_eq(actual_rest, [])
    logging.debug(f"manifests {expected} and {actual} match")
def file_is_readable_by_owner(ctx, filename=None):
    """Assert that the permission bits of *filename* are exactly 0o400.

    The file is examined with lstat, so a symbolic link is not followed.
    The comparison uses the assert_eq helper, which is looked up in the
    module globals at call time.
    """
    assert_eq = globals()["assert_eq"]
    permissions = stat.S_IMODE(os.lstat(filename).st_mode)
    logging.debug("file mode: %o", permissions)
    assert_eq(permissions, 0o400)
def file_does_not_contain(ctx, filename=None, pattern=None):
    """Assert that the text of *filename* does not contain *pattern*.

    *pattern* is matched as a plain substring, not as a regular
    expression. *ctx* is accepted but not used here.

    Raises:
        AssertionError: if *pattern* occurs in the file.
    """
    # Read through a context manager so the file handle is closed promptly.
    with open(filename) as f:
        data = f.read()
    assert pattern not in data
def files_are_different(ctx, filename1=None, filename2=None):
    """Assert that *filename1* and *filename2* differ in their raw bytes.

    The comparison uses the assert_ne helper, which is looked up in the
    module globals at call time.
    """
    assert_ne = globals()["assert_ne"]
    # Read through context managers so the file handles are closed promptly.
    with open(filename1, "rb") as f:
        data1 = f.read()
    with open(filename2, "rb") as f:
        data2 = f.read()
    assert_ne(data1, data2)
def files_are_identical(ctx, filename1=None, filename2=None):
    """Assert that *filename1* and *filename2* hold the same raw bytes.

    The comparison uses the assert_eq helper, which is looked up in the
    module globals at call time.
    """
    assert_eq = globals()["assert_eq"]
    # Read through context managers so the file handles are closed promptly.
    with open(filename1, "rb") as f:
        data1 = f.read()
    with open(filename2, "rb") as f:
        data2 = f.read()
    assert_eq(data1, data2)