import os
import sys
import pathlib
import subprocess
from tqdm import tqdm
def run_openocd():
openocd_process = subprocess.Popen(
['openocd', '-f', 'interface/stlink.cfg', '-f', 'target/stm32f4x.cfg'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
for line in iter(openocd_process.stderr.readline, ''):
if "Listening on port 3333 for gdb connections" in line:
break
else:
print("Failed to detect OpenOCD connection.")
sys.exit(1)
return openocd_process
import subprocess
def run_gdb(category, subcategory, file_no_ext):
gdb_process = subprocess.Popen(
['arm-none-eabi-gdb', '-q',
f'target/thumbv7em-none-eabihf/release/examples/test-{category}-{subcategory}-{file_no_ext}',
'-ex', 'target extended-remote :3333',
'-ex', 'set print asm-demangle on',
'-ex', 'monitor arm semihosting enable',
'-ex', 'load',
'-ex', 'continue'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return gdb_process
def monitor_output(openocd_process, gdb_process):
output = []
while True:
line = openocd_process.stdout.readline() if not line: print("OpenOCD process ended or error occurred.")
break
if 'test complete!' in line:
openocd_process.terminate()
gdb_process.terminate()
break
output.append(line)
return ''.join(output)
def enumerate_tests():
repo_path = pathlib.Path(__file__).parent.resolve()
all_tests_path = os.path.join(repo_path, 'examples/tests')
tests = []
for category in os.listdir(all_tests_path):
category_path = os.path.join(all_tests_path, category)
for subcategory in os.listdir(category_path):
subcategory_path = os.path.join(category_path, subcategory)
test_files = os.listdir(subcategory_path)
rust_files = [file for file in test_files if file.endswith('.rs')]
answer_files = [file for file in test_files if file.endswith('.py') or file.endswith('.txt')]
for file_rs_ext in rust_files:
file_no_ext = os.path.basename(file_rs_ext)[:-3]
file_txt_ext = file_no_ext + '.txt'
file_py_ext = file_no_ext + '.py'
if file_txt_ext in answer_files:
tests.append((
(category, subcategory, file_no_ext),
os.path.join(subcategory_path, file_txt_ext)
))
elif file_py_ext in answer_files:
tests.append((
(category, subcategory, file_no_ext),
os.path.join(subcategory_path, file_py_ext)
))
else:
print(
f'Error: Test file {category}-{subcategory}-{file_rs_ext} does not have an answer file.',
file=sys.stderr
)
sys.exit(1)
return tests
def main():
if len(sys.argv) != 2:
print(f'Usage: python {sys.argv[0]} <features>')
sys.exit(1)
features = sys.argv[1]
tests = enumerate_tests()
for (category, subcategory, file_no_ext), answer in tqdm(tests):
build_result = subprocess.run([
'cargo', 'build',
'--features', features,
'--release',
'--example', f'test-{category}-{subcategory}-{file_no_ext}'
], capture_output=True)
if build_result.returncode != 0:
print(
f'Error: Test case {category}-{subcategory}-{file_no_ext} failed to build.',
file=sys.stderr
)
print('Output from stdout:', file=sys.stderr)
print(str(build_result.stdout, encoding='utf-8'))
print('Output from stderr:', file=sys.stderr)
print(str(build_result.stderr, encoding='utf-8'))
exit(1)
openocd_process = run_openocd()
gdb_process = run_gdb(category, subcategory, file_no_ext)
output = monitor_output(openocd_process, gdb_process)
if answer.endswith('.txt'):
with open(answer, 'rb') as f:
answer = f.read().decode('utf-8')
if answer != output:
print(
f'Error: Test case {category}-{subcategory}-{file_no_ext} failed to provide correct output.\nExpeted:\n{answer}\nGot:\n{output}',
file=sys.stderr
)
elif answer.endswith('.py'):
decision = subprocess.run(['python', answer], input=output.encode('utf-8'), capture_output=True).stdout
if decision != 'Test Passed\n'.encode('utf-8'):
print(
f'Error: Test case {category}-{subcategory}-{file_no_ext} failed.\nGot:\n{output}',
file=sys.stderr
)
if __name__ == '__main__':
main()