from __future__ import annotations
import argparse
import atexit
import json
import os
import platform
import random
import re
import signal
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from io import TextIOWrapper
from typing import Final
from typing import NoReturn
from typing import TypedDict
from typing import cast
from typing import final
from urllib.error import URLError
from urllib.request import Request
from urllib.request import urlopen
BRP_URL: Final = "http://127.0.0.1:15702/jsonrpc"
POLL_INTERVAL: Final = 0.05
MAX_POLLS: Final = 200
COMP_WINDOW: Final = "bevy_window::window::Window"
COMP_PRIMARY: Final = "bevy_window::window::PrimaryWindow"
COMP_MANAGED: Final = "bevy_window_manager::types::ManagedWindow"
COMP_CURRENT_MONITOR: Final = "bevy_window_manager::monitors::CurrentMonitor"
COMP_MONITORS: Final = "bevy_window_manager::monitors::Monitors"
COMP_MONITOR: Final = "bevy_window::monitor::Monitor"
COMP_PERSISTENCE: Final = "bevy_window_manager::types::ManagedWindowPersistence"
RES_RESTORED: Final = "restore_window::WindowRestoredReceived"
RES_MISMATCH: Final = "restore_window::WindowRestoreMismatchReceived"
RES_SETTLED_COUNT: Final = "restore_window::WindowsSettledCount"
type JsonDict = dict[str, object]
class TestRequirements(TypedDict, total=False):
min_monitors: int
different_scales: bool
class MutationConfig(TypedDict, total=False):
target_monitor: str
position_offset: list[int]
size: list[int]
class WorkaroundValidation(TypedDict, total=False):
feature_flag: str
build_without: str
build_with: str
without_behavior: str
with_behavior: str
class PersistenceValidation(TypedDict, total=False):
mode: str
expected_ron_keys: list[str]
unexpected_ron_keys: list[str]
class WindowConfig(TypedDict, total=False):
validate: list[str]
expected_mode: str
spawn_event: str
class _TestEntryRequired(TypedDict):
id: str
ron_file: str
class TestEntry(_TestEntryRequired, total=False):
description: str
automation: str
launch_monitor: int
requires: TestRequirements
mutation: MutationConfig
workaround_validation: WorkaroundValidation
workaround_keys: list[str]
windows: dict[str, WindowConfig]
click_fullscreen_button: bool
persistence_validation: PersistenceValidation
expected_log_warning: str
backend: str
instructions: list[str]
instructions_without_workaround: list[str]
instructions_with_workaround: list[str]
success_criteria: str | dict[str, str]
success_criteria_without: str
success_criteria_with: str
class PlatformConfig(TypedDict):
platform: str
example_ron_path: str
test_ron_dir: str
tests: list[TestEntry]
class RonWindowValues(TypedDict, total=False):
pos_x: str
pos_y: str
width: str
height: str
monitor: str
mode: str
mutation_physical_x: str
mutation_physical_y: str
mutation_physical_w: str
mutation_physical_h: str
class Args(argparse.Namespace):
prebuild: bool = False
discover: bool = False
human_setup: bool = False
config: str = ""
test_id: str = ""
feature_flags: str = ""
backend: str = "native"
env_file: str = ""
no_shutdown: bool = False
app_process: subprocess.Popen[bytes] | None = None
stderr_handle: TextIOWrapper | None = None
pass_count = 0
fail_count = 0
def pass_line(key: str, field: str, details: str) -> None:
global pass_count
print(f"PASS {key} {field} {details}")
pass_count += 1
def fail_line(key: str, field: str, details: str) -> None:
global fail_count
print(f"FAIL {key} {field} {details}")
fail_count += 1
def die(msg: str) -> NoReturn:
print(f"ERROR {msg}", file=sys.stderr)
sys.exit(2)
def json_get(obj: object, *keys: str) -> object:
val = obj
for k in keys:
if isinstance(val, dict):
val = cast(JsonDict, val).get(k)
else:
return None
return val
def json_str(val: object) -> str:
if val is None:
return ""
if isinstance(val, float):
int_val = int(val)
if val == float(int_val):
return str(int_val)
return str(val)
def json_list(val: object) -> list[object]:
if isinstance(val, list):
return cast(list[object], val)
return []
def json_dict(val: object) -> JsonDict:
if isinstance(val, dict):
return cast(JsonDict, val)
return {}
@final
class BrpClient:
def __init__(self, url: str = BRP_URL) -> None:
self.url = url
def call(self, method: str, params: object = None) -> JsonDict:
payload = {
"jsonrpc": "2.0",
"method": method,
"id": 1,
"params": params,
}
data = json.dumps(payload).encode()
req = Request(self.url, data=data, headers={"Content-Type": "application/json"})
resp = urlopen(req, timeout=10) raw: object = json.loads(resp.read()) resp.close() return json_dict(raw)
def wait_ready(self) -> None:
for _ in range(MAX_POLLS):
try:
_ = self.call("rpc.discover")
return
except (URLError, OSError, json.JSONDecodeError):
time.sleep(POLL_INTERVAL)
die("BRP did not become ready within timeout")
def query_primary(self) -> JsonDict:
return self.call("world.query", {
"data": {"components": [COMP_WINDOW, COMP_CURRENT_MONITOR]},
"filter": {"with": [COMP_PRIMARY]},
})
def query_managed(self) -> JsonDict:
return self.call("world.query", {
"data": {"components": [COMP_WINDOW, COMP_MANAGED, COMP_CURRENT_MONITOR]},
"filter": {"with": [COMP_MANAGED], "without": [COMP_PRIMARY]},
})
def shutdown(self) -> None:
try:
_ = self.call("brp_extras/shutdown")
except (URLError, OSError):
pass
brp = BrpClient()
def substitute_ron(template: str, env_vars: dict[str, str]) -> str:
def _replacer(match: re.Match[str]) -> str:
expr = match.group(1)
arith = re.match(r"^(\w+)([+-])(\d+)$", expr)
if arith:
var_name = arith.group(1)
op = arith.group(2)
offset = int(arith.group(3))
val = env_vars.get(var_name)
if val is not None:
result = int(val) + offset if op == "+" else int(val) - offset
return str(result)
return match.group(0)
val = env_vars.get(expr)
if val is not None:
return val
return match.group(0)
return re.sub(r"\$\{([^}]+)\}", _replacer, template)
def parse_ron_values(content: str) -> dict[str, RonWindowValues]:
result: dict[str, RonWindowValues] = {}
current_key = ""
re_managed = re.compile(r'key: Managed\("([^"]+)"\)')
re_position = re.compile(r"logical_position: Some\(\((-?\d+),\s*(-?\d+)\)\)")
re_position_none = re.compile(r"logical_position: None")
re_width = re.compile(r"logical_width: (\d+)")
re_height = re.compile(r"logical_height: (\d+)")
re_monitor = re.compile(r"monitor_index: (\d+)")
re_mode = re.compile(r"mode: (Windowed|BorderlessFullscreen|Fullscreen)")
for line in content.splitlines():
if "key: Primary" in line:
current_key = "primary"
if current_key not in result:
result[current_key] = RonWindowValues()
else:
m = re_managed.search(line)
if m:
current_key = m.group(1)
if current_key not in result:
result[current_key] = RonWindowValues()
continue
if not current_key:
continue
entry = result[current_key]
m = re_position.search(line)
if m:
entry["pos_x"] = m.group(1)
entry["pos_y"] = m.group(2)
continue
if re_position_none.search(line):
entry["pos_x"] = ""
entry["pos_y"] = ""
continue
m = re_width.search(line)
if m:
entry["width"] = m.group(1)
continue
m = re_height.search(line)
if m:
entry["height"] = m.group(1)
continue
m = re_monitor.search(line)
if m:
entry["monitor"] = m.group(1)
continue
m = re_mode.search(line)
if m:
entry["mode"] = m.group(1)
continue
return result
def load_env_file(path: str) -> dict[str, str]:
env: dict[str, str] = {}
if not os.path.isfile(path):
return env
with open(path) as f:
for line in f:
line = line.strip()
if line.startswith("#") or not line:
continue
if line.startswith("export "):
line = line[7:]
eq = line.find("=")
if eq > 0:
env[line[:eq]] = line[eq + 1:]
return env
def write_env_file(path: str, env: dict[str, str]) -> None:
with open(path, "w") as f:
for key, val in env.items():
_ = f.write(f"export {key}={val}\n")
def _stderr_has_panic(path: str) -> bool:
try:
with open(path) as f:
content = f.read()
return "panicked at" in content
except OSError:
return False
def kill_stale_apps() -> None:
if sys.platform == "win32":
_ = subprocess.run(["taskkill", "/F", "/IM", "restore_window.exe"], capture_output=True)
else:
_ = subprocess.run(["pkill", "-9", "-x", "restore_window"], capture_output=True)
time.sleep(0.5)
def cleanup() -> None:
global app_process
if app_process is not None:
try:
app_process.kill()
_ = app_process.wait(timeout=5)
except (ProcessLookupError, subprocess.TimeoutExpired):
pass
app_process = None
_ = atexit.register(cleanup)
def _handle_signal(_signum: int, _frame: object) -> None:
cleanup()
sys.exit(2)
_ = signal.signal(signal.SIGTERM, _handle_signal)
_ = signal.signal(signal.SIGINT, _handle_signal)
def wait_for_restore() -> str:
for _ in range(MAX_POLLS):
if app_process is not None and app_process.poll() is not None:
return "crashed"
try:
result = brp.call("world.get_resources", {"resource": RES_RESTORED})
value = json_get(result, "result", "value")
if value is not None:
return "restored"
except (URLError, OSError):
if app_process is not None and app_process.poll() is not None:
return "crashed"
try:
result = brp.call("world.get_resources", {"resource": RES_MISMATCH})
value = json_get(result, "result", "value")
if value is not None:
return "mismatch"
except (URLError, OSError):
if app_process is not None and app_process.poll() is not None:
return "crashed"
time.sleep(POLL_INTERVAL)
die("Neither WindowRestoredReceived nor WindowRestoreMismatchReceived found within timeout")
def launch_app(
feature_flags: str = "",
backend: str = "native",
capture_stderr: bool = False,
wait_restore: bool = True,
test_mode: bool = True,
) -> tuple[str | None, str]:
global app_process
global stderr_handle
kill_stale_apps()
cmd = ["cargo", "run", "--example", "restore_window"]
if feature_flags:
cmd.extend(feature_flags.split())
env = dict(os.environ)
if test_mode:
env["BWM_TEST_MODE"] = "1"
if backend == "x11":
env["WAYLAND_DISPLAY"] = ""
stderr_path: str | None = None
if capture_stderr:
default_tmpdir = "/private/tmp/claude-501" if sys.platform == "darwin" else "/tmp/claude"
tmpdir = os.environ.get("TMPDIR", default_tmpdir)
_ = os.makedirs(tmpdir, exist_ok=True)
fd, stderr_path = tempfile.mkstemp(prefix="brp_stderr_", dir=tmpdir)
stderr_handle = os.fdopen(fd, "w")
app_process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=stderr_handle,
env=env,
)
else:
default_tmpdir = "/private/tmp/claude-501" if sys.platform == "darwin" else "/tmp/claude"
tmpdir = os.environ.get("TMPDIR", default_tmpdir)
_ = os.makedirs(tmpdir, exist_ok=True)
debug_fd, debug_stderr_path = tempfile.mkstemp(prefix="brp_debug_", dir=tmpdir)
debug_handle = os.fdopen(debug_fd, "w")
app_process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=debug_handle,
env=env,
)
print(f"# App stderr: {debug_stderr_path}", flush=True)
stderr_path = debug_stderr_path
brp.wait_ready()
restore_result = ""
if wait_restore:
restore_result = wait_for_restore()
return stderr_path, restore_result
def shutdown_app() -> None:
global app_process
global stderr_handle
brp.shutdown()
if app_process is not None:
deadline = time.monotonic() + 10.0
while time.monotonic() < deadline:
if app_process.poll() is not None:
break
time.sleep(0.25)
if app_process.poll() is None:
app_process.kill()
time.sleep(0.5)
try:
_ = app_process.wait(timeout=5)
except subprocess.TimeoutExpired:
pass
app_process = None
if stderr_handle is not None:
try:
stderr_handle.close()
except OSError:
pass
stderr_handle = None
def extract_from_entity(entity: JsonDict, component: str, *path: str) -> object:
val = json_get(entity, "components", component)
for p in path:
if isinstance(val, dict):
val = cast(JsonDict, val).get(p)
else:
return None
return val
def normalize_mode(mode_raw: object) -> str:
if isinstance(mode_raw, str):
return mode_raw
if isinstance(mode_raw, dict):
d = cast(JsonDict, mode_raw)
if "BorderlessFullscreen" in d:
return "BorderlessFullscreen"
if "Fullscreen" in d:
return "Fullscreen"
return str(cast(object, mode_raw))
def extract_position_at(entity: JsonDict, component: str) -> tuple[str, str]:
pos = extract_from_entity(entity, component, "position")
if isinstance(pos, dict):
d = cast(JsonDict, pos)
at = json_list(d.get("At"))
if len(at) >= 2:
return (json_str(at[0]), json_str(at[1]))
return ("", "")
def _monitor_scale(entity: JsonDict, env_vars: dict[str, str]) -> float:
idx = json_str(extract_from_entity(entity, COMP_CURRENT_MONITOR, "monitor", "index"))
if idx and f"MONITOR_{idx}_SCALE" in env_vars:
return float(env_vars[f"MONITOR_{idx}_SCALE"])
return float(json_str(extract_from_entity(entity, COMP_WINDOW, "resolution", "scale_factor")))
def _rust_round(x: float) -> int:
if x >= 0:
return int(x + 0.5)
return -int(-x + 0.5)
def _logical_to_physical_pos(logical: int, scale: float) -> int:
return _rust_round(float(logical) * scale)
def _physical_to_logical_pos(physical: int, scale: float) -> int:
return _rust_round(float(physical) / scale)
def _logical_to_physical_size(logical: int, scale: float) -> int:
return int(float(logical) * scale)
def _physical_to_logical_size(physical: int, scale: float) -> int:
return int(float(physical) / scale)
def _check_field(
key: str,
field: str,
expected: str,
actual: str,
prefix: str = "",
) -> None:
if actual == expected:
pass_line(key, f"{prefix}{field}", f"expected={expected} actual={actual}")
else:
fail_line(key, f"{prefix}{field}", f"expected={expected} actual={actual}")
def _check_field_pair(
key: str,
field: str,
exp_a: str,
exp_b: str,
act_a: str,
act_b: str,
prefix: str = "",
) -> None:
if act_a == exp_a and act_b == exp_b:
pass_line(key, f"{prefix}{field}", f"expected=[{exp_a},{exp_b}] actual=[{act_a},{act_b}]")
else:
fail_line(key, f"{prefix}{field}", f"expected=[{exp_a},{exp_b}] actual=[{act_a},{act_b}]")
def _check_persistence_key(key: str, ron_content: str, expect_present: bool) -> None:
marker = "key: Primary" if key == "primary" else f'key: Managed("{key}")'
is_present = marker in ron_content
if expect_present:
if is_present:
pass_line("persistence", f"key={key}", "present")
else:
fail_line("persistence", f"key={key}", "missing")
else:
if is_present:
fail_line("persistence", f"key={key}", "should_be_absent")
else:
pass_line("persistence", f"key={key}", "absent")
def _check_position_saved(key: str, ron_content: str) -> None:
parsed = parse_ron_values(ron_content)
entry = parsed.get(key)
if entry is None:
fail_line("persistence", f"position_saved={key}", "window_key_not_found")
return
pos_x = entry.get("pos_x", "")
if pos_x:
pass_line("persistence", f"position_saved={key}", f"position=({pos_x},{entry.get('pos_y', '')})")
else:
fail_line("persistence", f"position_saved={key}", "position=None")
def validate_window(
key: str,
validate_fields: list[str],
entity: JsonDict,
ron_values: RonWindowValues,
prefix: str = "",
expected_mode_override: str = "",
backend: str = "native",
env_vars: dict[str, str] | None = None,
) -> None:
for field in validate_fields:
if field == "position":
if backend == "wayland":
continue
actual_x, actual_y = extract_position_at(entity, COMP_WINDOW)
scale = _monitor_scale(entity, env_vars or {})
logical_x = ron_values.get("pos_x", "")
logical_y = ron_values.get("pos_y", "")
exp_x = str(_logical_to_physical_pos(int(logical_x), scale)) if logical_x else ""
exp_y = str(_logical_to_physical_pos(int(logical_y), scale)) if logical_y else ""
_check_field_pair(key, "position", exp_x, exp_y, actual_x, actual_y, prefix)
elif field == "size":
actual_w = json_str(extract_from_entity(entity, COMP_WINDOW, "resolution", "physical_width"))
actual_h = json_str(extract_from_entity(entity, COMP_WINDOW, "resolution", "physical_height"))
scale = _monitor_scale(entity, env_vars or {})
logical_w = ron_values.get("width", "")
logical_h = ron_values.get("height", "")
exp_w = str(_logical_to_physical_size(int(logical_w), scale)) if logical_w else ""
exp_h = str(_logical_to_physical_size(int(logical_h), scale)) if logical_h else ""
_check_field_pair(key, "size", exp_w, exp_h, actual_w, actual_h, prefix)
elif field == "mode":
mode_raw = extract_from_entity(entity, COMP_WINDOW, "mode")
actual_mode = normalize_mode(mode_raw)
exp_mode = expected_mode_override if expected_mode_override else ron_values.get("mode", "")
_check_field(key, "mode", exp_mode, actual_mode, prefix)
elif field == "monitor_index":
actual_idx = json_str(extract_from_entity(entity, COMP_CURRENT_MONITOR, "monitor", "index"))
exp_idx = ron_values.get("monitor", "")
_check_field(key, "monitor_index", exp_idx, actual_idx, prefix)
elif field == "exit_code":
pass
def resolve_primary_entity() -> JsonDict | None:
result = brp.query_primary()
entities = json_list(result.get("result"))
if entities:
return json_dict(entities[0])
return None
def wait_for_all_windows_settled(expected_count: int) -> None:
for _ in range(MAX_POLLS):
try:
result = brp.call("world.get_resources", {"resource": RES_SETTLED_COUNT})
value = json_get(result, "result", "value")
if isinstance(value, dict):
count = cast(JsonDict, value).get("count", 0)
if isinstance(count, int) and count >= expected_count:
return
except (URLError, OSError):
pass
time.sleep(POLL_INTERVAL)
die(f"WindowsSettledCount did not reach {expected_count} within timeout")
def spawn_and_poll_managed(
spawn_event: str,
expected_total_windows: int,
) -> list[JsonDict]:
_ = brp.call("world.trigger_event", {"event": spawn_event, "value": None})
wait_for_all_windows_settled(expected_total_windows)
for _ in range(20):
time.sleep(0.2)
try:
result = brp.query_managed()
raw_entities = json_list(result.get("result"))
entities = [json_dict(e) for e in raw_entities]
if entities:
return entities
except (URLError, OSError):
pass
die("Managed window not found after settling")
def get_managed_by_name(entities: list[JsonDict], window_name: str) -> JsonDict | None:
for ent in entities:
managed = extract_from_entity(ent, COMP_MANAGED)
if isinstance(managed, dict):
d = cast(JsonDict, managed)
if d.get("name") == window_name:
return ent
return None
def validate_all_windows(
windows: dict[str, WindowConfig],
ron_values: dict[str, RonWindowValues],
prefix: str = "",
backend: str = "native",
env_vars: dict[str, str] | None = None,
) -> None:
triggered_events: set[str] = set()
managed_entities: list[JsonDict] = []
total_window_count = len(windows)
for wkey, wconfig in windows.items():
validate_fields = wconfig.get("validate", [])
expected_mode_override = wconfig.get("expected_mode", "")
spawn_event = wconfig.get("spawn_event", "")
entity: JsonDict | None = None
if wkey == "primary":
entity = resolve_primary_entity()
else:
if spawn_event and spawn_event not in triggered_events:
managed_entities = spawn_and_poll_managed(
spawn_event,
expected_total_windows=total_window_count,
)
triggered_events.add(spawn_event)
if managed_entities:
entity = get_managed_by_name(managed_entities, wkey)
if entity is None:
fail_line(wkey, f"{prefix}query", "window not found")
continue
wkey_values = ron_values.get(wkey, RonWindowValues())
validate_window(
wkey,
validate_fields,
entity,
wkey_values,
prefix=prefix,
expected_mode_override=expected_mode_override,
backend=backend,
env_vars=env_vars,
)
def apply_mutations(
test: TestEntry,
entity: JsonDict,
ron_values: dict[str, RonWindowValues],
env_vars: dict[str, str] | None = None,
) -> None:
entity_id = entity.get("entity")
mutation = test.get("mutation", {})
scale = _monitor_scale(entity, env_vars or {})
if "position_offset" in mutation:
offset = mutation["position_offset"]
primary_vals = ron_values.get("primary", RonWindowValues())
logical_x = int(primary_vals.get("pos_x", "0") or "0")
logical_y = int(primary_vals.get("pos_y", "0") or "0")
physical_x = _logical_to_physical_pos(logical_x, scale) + offset[0]
physical_y = _logical_to_physical_pos(logical_y, scale) + offset[1]
_ = brp.call("world.mutate_components", {
"entity": entity_id,
"component": COMP_WINDOW,
"path": ".position",
"value": {"At": [physical_x, physical_y]},
})
primary_vals["pos_x"] = str(_physical_to_logical_pos(physical_x, scale))
primary_vals["pos_y"] = str(_physical_to_logical_pos(physical_y, scale))
primary_vals["mutation_physical_x"] = str(physical_x)
primary_vals["mutation_physical_y"] = str(physical_y)
if "size" in mutation:
new_w = mutation["size"][0]
new_h = mutation["size"][1]
_ = brp.call("world.mutate_components", {
"entity": entity_id,
"component": COMP_WINDOW,
"path": ".resolution",
"value": {
"physical_width": new_w,
"physical_height": new_h,
"scale_factor": scale,
"scale_factor_override": None,
},
})
primary_vals = ron_values.get("primary", RonWindowValues())
primary_vals["width"] = str(_physical_to_logical_size(new_w, scale))
primary_vals["height"] = str(_physical_to_logical_size(new_h, scale))
primary_vals["mutation_physical_w"] = str(new_w)
primary_vals["mutation_physical_h"] = str(new_h)
def verify_mutations(
ron_values: dict[str, RonWindowValues],
backend: str = "native",
env_vars: dict[str, str] | None = None,
) -> None:
primary_entity = resolve_primary_entity()
if primary_entity is None:
fail_line("primary", "mutation_query", "primary window not found")
return
primary_vals = ron_values.get("primary", RonWindowValues())
actual_w = json_str(extract_from_entity(primary_entity, COMP_WINDOW, "resolution", "physical_width"))
actual_h = json_str(extract_from_entity(primary_entity, COMP_WINDOW, "resolution", "physical_height"))
scale = _monitor_scale(primary_entity, env_vars or {})
exp_w = primary_vals.get("mutation_physical_w", "")
exp_h = primary_vals.get("mutation_physical_h", "")
if not exp_w:
logical_w = primary_vals.get("width", "")
logical_h = primary_vals.get("height", "")
exp_w = str(_logical_to_physical_size(int(logical_w), scale)) if logical_w else ""
exp_h = str(_logical_to_physical_size(int(logical_h), scale)) if logical_h else ""
_check_field_pair("primary", "mutation_size", exp_w, exp_h, actual_w, actual_h)
logical_px = primary_vals.get("pos_x", "")
if logical_px:
actual_x, actual_y = extract_position_at(primary_entity, COMP_WINDOW)
exp_px = primary_vals.get("mutation_physical_x", "")
exp_py = primary_vals.get("mutation_physical_y", "")
if not exp_px:
logical_py = primary_vals.get("pos_y", "")
exp_px = str(_logical_to_physical_pos(int(logical_px), scale)) if logical_px else ""
exp_py = str(_logical_to_physical_pos(int(logical_py), scale)) if logical_py else ""
if backend == "x11":
pass_line("primary", "mutation_position",
f"x11_readback=[{actual_x},{actual_y}] (W6 offset expected)")
else:
_check_field_pair("primary", "mutation_position", exp_px, exp_py, actual_x, actual_y)
primary_vals["pos_x"] = str(_physical_to_logical_pos(int(actual_x), scale)) if actual_x else ""
primary_vals["pos_y"] = str(_physical_to_logical_pos(int(actual_y), scale)) if actual_y else ""
def validate_persistence(test: TestEntry, ron_path: str) -> None:
persistence = test.get("persistence_validation", {})
ron_content = Path(ron_path).read_text()
for key in persistence.get("expected_ron_keys", []):
_check_persistence_key(key, ron_content, expect_present=True)
for key in persistence.get("unexpected_ron_keys", []):
_check_persistence_key(key, ron_content, expect_present=False)
for key in cast(list[str], persistence.get("expect_position_saved", [])):
_check_position_saved(key, ron_content)
def click_fullscreen_button() -> None:
_ = subprocess.run(
[
"osascript", "-e",
'tell application "System Events" to tell process "restore_window" to click button 2 of window 1',
],
capture_output=True,
)
time.sleep(2)
def write_ron(
ron_file: str,
ron_dir: str,
ron_path: str,
env_vars: dict[str, str],
) -> None:
template_path = os.path.join(ron_dir, ron_file)
if not os.path.isfile(template_path):
die(f"RON template not found: {template_path}")
template = Path(template_path).read_text()
substituted = substitute_ron(template, env_vars)
os.makedirs(os.path.dirname(ron_path), exist_ok=True)
_ = Path(ron_path).write_text(substituted)
def find_test(config: PlatformConfig, test_id: str) -> TestEntry:
for t in config["tests"]:
if t["id"] == test_id:
return t
die(f"Test '{test_id}' not found in config")
def resolve_backend(backend: str, test: TestEntry) -> str:
test_backend = test.get("backend", "native")
if backend == "native" and test_backend and test_backend != "native":
return test_backend
return backend
def _swap_x11_env_vars(env_vars: dict[str, str]) -> None:
suffixes = ["_POS_X", "_POS_Y", "_LOGICAL_POS_X", "_LOGICAL_POS_Y", "_SCALE"]
i = 0
while f"MONITOR_{i}_SCALE" in env_vars:
for suffix in suffixes:
x11_key = f"MONITOR_{i}_X11{suffix}"
base_key = f"MONITOR_{i}{suffix}"
if x11_key in env_vars:
env_vars[base_key] = env_vars[x11_key]
i += 1
def setup_test(
config: PlatformConfig,
test_id: str,
ron_dir: str,
ron_path: str,
env_file: str,
backend: str,
) -> tuple[TestEntry, str, dict[str, str]]:
kill_stale_apps()
test = find_test(config, test_id)
resolved_backend = resolve_backend(backend, test)
env_vars = load_env_file(env_file) if env_file else {}
if resolved_backend == "x11":
_swap_x11_env_vars(env_vars)
write_ron(test["ron_file"], ron_dir, ron_path, env_vars)
return test, resolved_backend, env_vars
def _extract_video_modes(
entity: JsonDict,
prefix: str,
env_vars: dict[str, str],
) -> None:
modes_raw = extract_from_entity(entity, COMP_MONITOR, "video_modes")
video_modes = json_list(modes_raw)
if not video_modes:
return
selected = json_dict(random.choice(video_modes))
phys = json_list(selected.get("physical_size"))
phys_w = json_str(phys[0]) if len(phys) > 0 else "0"
phys_h = json_str(phys[1]) if len(phys) > 1 else "0"
depth = json_str(selected.get("bit_depth"))
refresh = json_str(selected.get("refresh_rate_millihertz"))
env_vars[f"{prefix}WIDTH"] = phys_w
env_vars[f"{prefix}HEIGHT"] = phys_h
env_vars[f"{prefix}DEPTH"] = depth
env_vars[f"{prefix}REFRESH"] = refresh
scale_key = prefix.replace("VIDEO_MODE_", "") + "SCALE"
scale_val = float(env_vars.get(scale_key, "1") or "1")
logical_w = str(int(float(phys_w) / scale_val))
logical_h = str(int(float(phys_h) / scale_val))
env_vars[f"{prefix}LOGICAL_WIDTH"] = logical_w
env_vars[f"{prefix}LOGICAL_HEIGHT"] = logical_h
print(f"export {prefix}WIDTH={phys_w}")
print(f"export {prefix}HEIGHT={phys_h}")
print(f"export {prefix}DEPTH={depth}")
print(f"export {prefix}REFRESH={refresh}")
print(f"export {prefix}LOGICAL_WIDTH={logical_w}")
print(f"export {prefix}LOGICAL_HEIGHT={logical_h}")
def run_discovery(
ron_dir: str,
ron_path: str,
env_file: str,
backend: str,
) -> None:
env_vars: dict[str, str] = {}
write_ron("discovery.ron", ron_dir, ron_path, env_vars)
_ = launch_app(feature_flags="", backend="native", wait_restore=False)
monitors_json: JsonDict = {}
for _ in range(MAX_POLLS):
try:
result = brp.call("world.get_resources", {"resource": COMP_MONITORS})
value = json_dict(json_get(result, "result", "value"))
monitor_list_raw = json_list(value.get("list"))
if monitor_list_raw:
monitors_json = value
break
except (URLError, OSError, KeyError):
pass
time.sleep(POLL_INTERVAL)
else:
die("Monitors resource not populated within timeout")
monitor_list = [json_dict(m) for m in json_list(monitors_json.get("list"))]
num_monitors = len(monitor_list)
env_vars["NUM_MONITORS"] = str(num_monitors)
print(f"export NUM_MONITORS={num_monitors}")
for i, mon in enumerate(monitor_list):
pos = json_list(mon.get("position"))
size = json_list(mon.get("size"))
scale = json_str(mon.get("scale"))
pos_x = json_str(pos[0]) if len(pos) > 0 else "0"
pos_y = json_str(pos[1]) if len(pos) > 1 else "0"
size_w = json_str(size[0]) if len(size) > 0 else "0"
size_h = json_str(size[1]) if len(size) > 1 else "0"
scale_f = float(scale) if scale else 1.0
logical_pos_x = str(round(float(pos_x) / scale_f)) if scale_f != 0 else pos_x
logical_pos_y = str(round(float(pos_y) / scale_f)) if scale_f != 0 else pos_y
env_vars[f"MONITOR_{i}_POS_X"] = pos_x
env_vars[f"MONITOR_{i}_POS_Y"] = pos_y
env_vars[f"MONITOR_{i}_LOGICAL_POS_X"] = logical_pos_x
env_vars[f"MONITOR_{i}_LOGICAL_POS_Y"] = logical_pos_y
env_vars[f"MONITOR_{i}_WIDTH"] = size_w
env_vars[f"MONITOR_{i}_HEIGHT"] = size_h
env_vars[f"MONITOR_{i}_SCALE"] = scale
print(f"export MONITOR_{i}_POS_X={pos_x}")
print(f"export MONITOR_{i}_POS_Y={pos_y}")
print(f"export MONITOR_{i}_LOGICAL_POS_X={logical_pos_x}")
print(f"export MONITOR_{i}_LOGICAL_POS_Y={logical_pos_y}")
print(f"export MONITOR_{i}_WIDTH={size_w}")
print(f"export MONITOR_{i}_HEIGHT={size_h}")
print(f"export MONITOR_{i}_SCALE={scale}")
try:
monitor_query = brp.call("world.query", {
"data": {"components": [COMP_MONITOR]},
"filter": {},
})
monitor_entities = [json_dict(e) for e in json_list(monitor_query.get("result"))]
for i in range(min(len(monitor_entities), num_monitors)):
_extract_video_modes(monitor_entities[i], f"MONITOR_{i}_VIDEO_MODE_", env_vars)
except (URLError, OSError):
pass
try:
restored_result = brp.call("world.get_resources", {"resource": RES_RESTORED})
restored_value = json_get(restored_result, "result", "value")
if restored_value is not None:
print("# WindowRestoredReceived validation: OK")
else:
mismatch_result = brp.call("world.get_resources", {"resource": RES_MISMATCH})
mismatch_value = json_get(mismatch_result, "result", "value")
if mismatch_value is not None:
print("# WindowRestoreMismatchReceived validation: OK (mismatch detected)")
else:
print("# WARNING: Neither WindowRestoredReceived nor WindowRestoreMismatchReceived found")
except (URLError, OSError):
print("# WARNING: WindowRestoredReceived resource query failed")
if num_monitors >= 2:
s0 = json_str(monitor_list[0].get("scale"))
s1 = json_str(monitor_list[1].get("scale"))
different = "true" if s0 != s1 else "false"
else:
different = "false"
env_vars["DIFFERENT_SCALES"] = different
print(f"export DIFFERENT_SCALES={different}")
shutdown_app()
if backend == "x11-also":
print("# X11 discovery...")
_ = launch_app(feature_flags="", backend="x11", wait_restore=False)
try:
x11_result = brp.call("world.get_resources", {"resource": COMP_MONITORS})
x11_value = json_dict(json_get(x11_result, "result", "value"))
x11_list = [json_dict(m) for m in json_list(x11_value.get("list"))]
for i in range(min(len(x11_list), num_monitors)):
x11_scale = json_str(x11_list[i].get("scale"))
env_vars[f"MONITOR_{i}_X11_SCALE"] = x11_scale
print(f"export MONITOR_{i}_X11_SCALE={x11_scale}")
x11_pos = json_list(x11_list[i].get("position"))
x11_pos_x = json_str(x11_pos[0]) if len(x11_pos) > 0 else "0"
x11_pos_y = json_str(x11_pos[1]) if len(x11_pos) > 1 else "0"
x11_scale_f = float(x11_scale) if x11_scale else 1.0
x11_lpos_x = str(round(float(x11_pos_x) / x11_scale_f)) if x11_scale_f != 0 else x11_pos_x
x11_lpos_y = str(round(float(x11_pos_y) / x11_scale_f)) if x11_scale_f != 0 else x11_pos_y
env_vars[f"MONITOR_{i}_X11_POS_X"] = x11_pos_x
env_vars[f"MONITOR_{i}_X11_POS_Y"] = x11_pos_y
env_vars[f"MONITOR_{i}_X11_LOGICAL_POS_X"] = x11_lpos_x
env_vars[f"MONITOR_{i}_X11_LOGICAL_POS_Y"] = x11_lpos_y
print(f"export MONITOR_{i}_X11_POS_X={x11_pos_x}")
print(f"export MONITOR_{i}_X11_POS_Y={x11_pos_y}")
print(f"export MONITOR_{i}_X11_LOGICAL_POS_X={x11_lpos_x}")
print(f"export MONITOR_{i}_X11_LOGICAL_POS_Y={x11_lpos_y}")
x11_query = brp.call("world.query", {
"data": {"components": [COMP_MONITOR]},
"filter": {},
})
x11_entities = [json_dict(e) for e in json_list(x11_query.get("result"))]
for i in range(min(len(x11_entities), num_monitors)):
_extract_video_modes(x11_entities[i], f"MONITOR_{i}_X11_VIDEO_MODE_", env_vars)
except (URLError, OSError):
pass
shutdown_app()
write_env_file(env_file, env_vars)
sys.exit(0)
def run_prebuild() -> None:
uname = platform.system()
if uname == "Darwin":
plat = "macos"
elif uname == "Linux":
plat = "linux"
elif uname in ("Windows", "MINGW", "MSYS", "CYGWIN") or uname.startswith("MINGW"):
plat = "windows"
else:
die(f"Unsupported platform: {uname}")
config_file = f"tests/config/{plat}.json"
print(f"PLATFORM={plat}")
print(f"CONFIG={config_file}")
if not os.path.isfile(config_file):
die(f"Config file not found: {config_file}")
default_tmpdir = "/private/tmp/claude-501" if sys.platform == "darwin" else "/tmp/claude"
tmpdir = os.environ.get("TMPDIR", default_tmpdir)
os.makedirs(tmpdir, exist_ok=True)
result = subprocess.run(
["cargo", "build", "--example", "restore_window"],
capture_output=False,
)
if result.returncode == 0:
print("BUILD_DEFAULT=ok")
else:
print("BUILD_DEFAULT=failed")
sys.exit(1)
with open(config_file) as f:
config: PlatformConfig = json.load(f)
flag_sets: set[str] = set()
for t in config["tests"]:
wv = t.get("workaround_validation")
if wv:
build_without = wv.get("build_without", "")
build_with = wv.get("build_with", "")
if build_without:
flag_sets.add(build_without)
if build_with:
flag_sets.add(build_with)
if flag_sets:
for i, flags in enumerate(sorted(flag_sets)):
cmd = ["cargo", "build", "--example", "restore_window"] + flags.split()
print(f"BUILD_VARIANT_{i}={flags}")
result = subprocess.run(cmd, capture_output=False)
if result.returncode == 0:
print(f"BUILD_VARIANT_{i}=ok")
else:
print(f"BUILD_VARIANT_{i}=failed")
sys.exit(1)
print(f"BUILD_VARIANTS={len(flag_sets)} ok")
else:
print("BUILD_VARIANTS=skipped")
def run_human_setup(
config: PlatformConfig,
test_id: str,
ron_dir: str,
ron_path: str,
env_file: str,
feature_flags: str,
backend: str,
) -> None:
test, resolved_backend, _ = setup_test(config, test_id, ron_dir, ron_path, env_file, backend)
_ = launch_app(feature_flags, resolved_backend, test_mode=False)
instructions = test.get("instructions", [])
if feature_flags and "--no-default-features" in feature_flags:
instructions = test.get("instructions_without_workaround", instructions)
elif test.get("workaround_validation"):
instructions = test.get("instructions_with_workaround", instructions)
print("HUMAN_TEST_READY")
print(f"TEST_ID={test_id}")
print(f"DESCRIPTION={test.get('description', '')}")
if instructions:
print("INSTRUCTIONS_START")
for line in instructions:
print(line)
print("INSTRUCTIONS_END")
criteria = test.get("success_criteria", "")
if feature_flags and "--no-default-features" in feature_flags:
criteria = test.get("success_criteria_without", criteria)
elif test.get("workaround_validation"):
criteria = test.get("success_criteria_with", criteria)
if isinstance(criteria, dict):
pass_msg = criteria.get("pass", "")
fail_msg = criteria.get("fail", "")
print(f"CRITERIA_PASS={pass_msg}")
print(f"CRITERIA_FAIL={fail_msg}")
elif criteria:
print(f"CRITERIA={criteria}")
global app_process
app_process = None
sys.exit(0)
def run_test(
config: PlatformConfig,
test_id: str,
ron_dir: str,
ron_path: str,
env_file: str,
feature_flags: str,
backend: str,
no_shutdown: bool = False,
) -> None:
global app_process
test, resolved_backend, env_vars = setup_test(config, test_id, ron_dir, ron_path, env_file, backend)
ron_content = Path(ron_path).read_text()
ron_values = parse_ron_values(ron_content)
has_click_fullscreen = test.get("click_fullscreen_button", False)
has_mutation = "mutation" in test
has_persistence = "persistence_validation" in test
expected_log = test.get("expected_log_warning", "")
windows = test.get("windows", {})
has_exit_code = any(
"exit_code" in wconfig.get("validate", [])
for wconfig in windows.values()
)
capture_stderr = bool(expected_log)
use_test_mode = not no_shutdown
stderr_path, restore_result = launch_app(feature_flags, resolved_backend, capture_stderr, test_mode=use_test_mode)
expect_mismatch = test.get("expect_mismatch", False)
if restore_result == "mismatch":
if expect_mismatch:
pass_line("restore", "event", "WindowRestoreMismatch fired (expected)")
else:
fail_line("restore", "event", "WindowRestoreMismatch fired (restore state did not match)")
if has_click_fullscreen:
click_fullscreen_button()
shutdown_app()
_ = launch_app(feature_flags, resolved_backend)
if has_exit_code:
time.sleep(2)
panicked = _stderr_has_panic(stderr_path) if stderr_path else False
if panicked:
fail_line("primary", "exit_code", "expected=no_panic actual=panic_detected")
else:
pass_line("primary", "exit_code", "expected=no_panic actual=no_panic")
if app_process is not None and app_process.poll() is not None:
app_process = None
elif no_shutdown:
app_process = None
else:
shutdown_app()
sys.exit(1 if fail_count > 0 else 0)
validate_all_windows(windows, ron_values, prefix="", backend=resolved_backend, env_vars=env_vars)
if has_persistence:
persistence = test.get("persistence_validation", {})
persist_mode = persistence.get("mode", "")
if persist_mode == "ActiveOnly":
_ = brp.call("world.insert_resources", {
"resource": COMP_PERSISTENCE,
"value": "ActiveOnly",
})
if has_mutation:
primary_entity = resolve_primary_entity()
if primary_entity is None:
fail_line("primary", "mutation_query", "primary window not found")
else:
apply_mutations(test, primary_entity, ron_values, env_vars=env_vars)
time.sleep(0.5)
verify_mutations(ron_values, backend=resolved_backend, env_vars=env_vars)
if no_shutdown and not has_mutation and not has_persistence:
app_process = None
sys.exit(1 if fail_count > 0 else 0)
shutdown_app()
if expected_log and stderr_path and os.path.isfile(stderr_path):
stderr_content = Path(stderr_path).read_text()
if expected_log in stderr_content:
pass_line("log", "expected_warning", f'found="{expected_log}"')
else:
fail_line("log", "expected_warning", f'not_found="{expected_log}"')
os.unlink(stderr_path)
if has_persistence:
validate_persistence(test, ron_path)
if has_mutation:
_, relaunch_result = launch_app(feature_flags, resolved_backend, test_mode=use_test_mode)
if relaunch_result == "mismatch":
fail_line("restore", "relaunch_event", "WindowRestoreMismatch fired on relaunch")
validate_all_windows(windows, ron_values, prefix="relaunch ", backend=resolved_backend, env_vars=env_vars)
if no_shutdown:
app_process = None
else:
shutdown_app()
sys.exit(1 if fail_count > 0 else 0)
def main() -> None:
parser = argparse.ArgumentParser(description="bevy_window_manager test runner v3")
_ = parser.add_argument("--prebuild", action="store_true", help="Build both binary variants")
_ = parser.add_argument("--discover", action="store_true", help="Discovery mode")
_ = parser.add_argument("--human-setup", action="store_true", help="Setup human test (write RON, launch app, print instructions)")
_ = parser.add_argument("--config", default="", help="Platform config JSON file")
_ = parser.add_argument("--test-id", default="", help="Test ID to run")
_ = parser.add_argument("--feature-flags", default="", help="Cargo feature flags override")
_ = parser.add_argument("--backend", default="native", help="Backend: native, x11, x11-also")
_ = parser.add_argument("--env-file", default="", help="Env file for MONITOR_* vars")
_ = parser.add_argument("--no-shutdown", action="store_true", help="Leave app running after test (skip shutdown)")
args = cast(Args, parser.parse_args())
if args.prebuild:
run_prebuild()
return
if not args.config:
die("Missing --config")
if not os.path.isfile(args.config):
die(f"Config file not found: {args.config}")
with open(args.config) as f:
config: PlatformConfig = json.load(f)
ron_dir = config["test_ron_dir"]
if not ron_dir:
die("Config missing test_ron_dir")
raw_ron_path = config["example_ron_path"]
if not raw_ron_path:
die("Config missing example_ron_path")
if raw_ron_path.startswith("~"):
ron_path = os.path.expanduser("~") + raw_ron_path[1:]
else:
ron_path = raw_ron_path
appdata = os.environ.get("APPDATA", "")
if appdata:
ron_path = ron_path.replace("%APPDATA%", appdata)
if args.discover:
if not args.env_file:
die("Missing --env-file (required for --discover)")
run_discovery(ron_dir, ron_path, args.env_file, args.backend)
elif args.human_setup:
if not args.test_id:
die("Missing --test-id (required for --human-setup)")
run_human_setup(config, args.test_id, ron_dir, ron_path, args.env_file, args.feature_flags, args.backend)
else:
if not args.test_id:
die("Missing --test-id (required unless --discover or --prebuild)")
run_test(config, args.test_id, ron_dir, ron_path, args.env_file, args.feature_flags, args.backend, args.no_shutdown)
if __name__ == "__main__":
main()