import functools
import os
import os.path
import re
import subprocess
import sys
import tempfile
import threading
import time
from typing import Callable, List
home_path: str
data_path: str
bitbottle: str
unbottle: str
all_tests: List[Callable] = []
def exec(*args: str, **kw) -> subprocess.CompletedProcess:
print("+", " ".join(args))
if "delay" in kw:
delay = kw["delay"]
del kw["delay"]
r_stdin, w_stdin = os.pipe()
input: str = kw.get("input") or ""
del kw["input"]
kw["stdin"] = r_stdin
def other_thread():
time.sleep(delay)
os.write(w_stdin, input.encode("utf-8"))
threading.Thread(target = other_thread).start()
expected_status = kw.get("expected_status", 0)
kw.pop("expected_status", None)
rv = subprocess.run(args, **kw)
if rv.returncode != expected_status:
print("*** Fail.")
sys.exit(rv.returncode)
return rv
def test(f: Callable) -> Callable:
@functools.wraps(f)
def new_f():
with tempfile.TemporaryDirectory() as temp_path:
os.chdir(temp_path)
f()
os.chdir(home_path)
all_tests.append(new_f)
return new_f
@test
def test_basic():
"archive the source folder and ensure it expands identically"
count = 1
for compress in [ "--no-compress", "--snappy", "--lzma2" ]:
exec(bitbottle, compress, "-o", f"./test{count}.bb", "-C", home_path, "src")
exec(unbottle, "--check", f"./test{count}.bb")
exec(unbottle, "-d", f"./test{count}", f"./test{count}.bb")
exec("diff", "-r", f"{home_path}/src", f"./test{count}/src")
count += 1
assert(os.stat("./test1.bb").st_size > os.stat("./test2.bb").st_size)
assert(os.stat("./test2.bb").st_size > os.stat("./test3.bb").st_size)
@test
def test_hashes():
"other hash functions are fine"
count = 1
for hash in [ "--blake2", "--sha256" ]:
exec(bitbottle, hash, "-o", f"./test{count}.bb", "-C", home_path, "src")
exec(unbottle, "--check", f"./test{count}.bb")
exec(unbottle, "-d", f"./test{count}", f"./test{count}.bb")
exec("diff", "-r", f"{home_path}/src", f"./test{count}/src")
count += 1
@test
def test_encryption():
"encrypt the source folder and ensure it expands identically"
for encrypt in [ "--no-compress", "--aes" ]:
exec(bitbottle, encrypt, "--password", "--pipe-password", "-o", f"./test-pass.bb", "-C", home_path, "src", text = True, input="hello\n")
exec(unbottle, "--password", "--pipe-password", "-d", f"./test-pass", f"./test-pass.bb", text = True, input="hello\n")
exec("diff", "-r", f"{home_path}/src", f"./test-pass/src")
exec(bitbottle, encrypt, "-e", f"{data_path}/test-key.pub", "-o", f"./test-ssh.bb", "-C", home_path, "src")
exec(unbottle, "-s", f"{data_path}/test-key", "-d", f"./test-ssh", f"./test-ssh.bb")
exec("diff", "-r", f"{home_path}/src", f"./test-ssh/src")
@test
def test_key_commitment():
"require key commitment on encrypted archives"
status = exec(unbottle, "-i", f"{data_path}/test-no-commit.bb", "-s", f"{data_path}/test-key", expected_status=1).returncode
assert(status == 1)
exec(unbottle, "-i", f"{data_path}/test-no-commit.bb", "-s", f"{data_path}/test-key", "--old", capture_output=True, text=True)
@test
def test_ssh_password_encryption():
"encrypt using an SSH key protected by a password"
exec(bitbottle, "-e", f"{data_path}/test-key-pw.pub", "-o", f"./test-ssh.bb", "-C", home_path, "src")
exec(unbottle, "-s", f"{data_path}/test-key-pw", "--pipe-password", "-d", f"./test-ssh", f"./test-ssh.bb", text = True, input="password\n")
exec("diff", "-r", f"{home_path}/src", f"./test-ssh/src")
@test
def test_sign_and_verify():
"sign an archive and verify the signature"
exec(bitbottle, "--sign", f"{data_path}/test-key", "-o", f"./test-sign.bb", "-C", home_path, "src")
output: List[str] = exec(unbottle, "--verify", f"{data_path}/test-key.pub", "--info", f"./test-sign.bb", capture_output=True, text=True).stderr.split("\n")
assert(any(re.search(r"signed by robey@togusa", line) for line in output))
assert(any(re.search(r"Signature is VALID", line) for line in output))
@test
def test_verify_no_signature():
"refuse to verify an unsigned archive"
exec(bitbottle, "-o", f"./test-sign.bb", "-C", home_path, "src")
status = exec(unbottle, "--verify", f"{data_path}/test-key.pub", "--info", f"./test-sign.bb", expected_status=1).returncode
assert(status == 1)
@test
def test_verify_wrong_signature():
"refuse to verify an archive signed by the wrong key"
exec(bitbottle, "--sign", f"{data_path}/test-key", "-o", f"./test-sign.bb", "-C", home_path, "src")
status = exec(unbottle, "--verify", f"{data_path}/test-key-pw.pub", "--info", f"./test-sign.bb", expected_status=1).returncode
assert(status == 1)
@test
def test_relative_paths():
"ensure absolute paths are stored as relative"
exec(bitbottle, "-o", "test.bb", f"{home_path}/src")
output: List[str] = exec(unbottle, "--info", "test.bb", capture_output = True, text = True).stderr.split("\n")
assert(any(re.search(r" src/file_atlas.rs", line) for line in output))
exec(unbottle, "-d", "./test-rel", "test.bb")
exec("diff", "-r", f"{home_path}/src", f"./test-rel/src")
@test
def test_defang_unbottle():
"defang bad paths when unbottling"
exec(unbottle, "-d", "./test-defang", f"{data_path}/message.bb")
assert(os.path.exists(f"./test-defang/tmp/message.jpg"))
@test
def test_zero_length_file():
"don't write contents for a zero-length file"
open("zero.txt", "wb").write(b"")
open("one.txt", "wb").write(b"x")
exec(bitbottle, "-o", "test.bb", "zero.txt", "one.txt")
output: List[str] = exec(unbottle, "--dump", "test.bb", capture_output = True, text = True).stderr.split("\n")
assert(len([line for line in output if re.search(r"Bitbottle type 3:", line)]) == 1)
exec(unbottle, "-d", "test-out", "test.bb")
exec("test", "-f", "test-out/zero.txt")
@test
def test_bad_symlink():
"don't archive a bad symlink"
exec("mkdir", "-p", "inner")
open("inner/good.txt", "wb").write(b"")
exec("ln", "-s", "../../foo", "inner/bad")
exec(bitbottle, "-o", "test.bb", "inner")
output: str = exec(unbottle, "--info", "-v", "test.bb", capture_output = True, text = True).stderr
assert(re.search(r"inner/bad", output) is None)
exec(unbottle, "-d", "test-out", "test.bb")
assert(os.path.exists("test-out/inner/good.txt"))
assert(not os.path.exists("test-out/inner/bad"))
@test
def test_good_symlink():
"archive a good symlink"
exec("mkdir", "-p", "inner")
exec("mkdir", "-p", "inner/deep")
open("inner/good.txt", "wb").write(b"good")
exec("ln", "-s", "../good.txt", "inner/deep/ok.txt")
exec(bitbottle, "-o", "test.bb", "inner")
exec(unbottle, "-d", "test-out", "test.bb")
assert(os.path.exists("test-out/inner/good.txt"))
assert(os.path.exists("test-out/inner/deep/ok.txt"))
assert(open("test-out/inner/good.txt", "rb").read() == b"good")
assert(open("test-out/inner/deep/ok.txt", "rb").read() == b"good")
if __name__ == "__main__":
home_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
data_path = os.path.join(os.getcwd(), "tests/data")
bitbottle = os.path.join(home_path, "target/release/bitbottle")
unbottle = os.path.join(home_path, "target/release/unbottle")
for test in all_tests:
print()
print(f"\x1b[1m*** {test.__doc__ or '?'}\x1b[0m")
print()
test()
print()
print("\x1b[1m*** ALL TESTS PASS. :)\x1b[0m")
print()