import os
import platform
import time
from contextlib import ExitStack
from pathlib import Path
from deimos import Controller, peripheral, socket, Termination, LoopMethod
# Feature flag: True only when the imported ``socket`` module exposes
# ``UnixSocket`` and ``peripheral.HootlTransport`` exposes ``unix_socket``.
# The second attribute is probed only if the first one is present.
HAS_UNIX_SOCKET = (
    hasattr(peripheral.HootlTransport, "unix_socket")
    if hasattr(socket, "UnixSocket")
    else False
)
def _loopback_udp_socket() -> socket.UdpSocket:
    """Build a UDP socket restricted to a single broadcast target.

    Returns:
        A ``socket.UdpSocket`` created via ``with_broadcast_targets`` using
        only the first entry reported by ``possible_broadcast_targets()``.

    Raises:
        RuntimeError: If no broadcast targets are reported.
    """
    candidates = socket.UdpSocket.possible_broadcast_targets()
    if candidates:
        first_target = candidates[0]
        return socket.UdpSocket.with_broadcast_targets([first_target])
    raise RuntimeError("No UDP broadcast targets available for loopback test")
def _should_retry_under_test() -> bool:
    """Report whether the demo should be retried on failure.

    Returns:
        True only when the ``DEIMOS_TESTING`` environment variable equals
        ``"true"`` (case-insensitive) and ``platform.system()`` reports
        ``"Darwin"``; False otherwise.
    """
    if platform.system() != "Darwin":
        return False
    testing_flag = os.environ.get("DEIMOS_TESTING", "")
    return testing_flag.lower() == "true"
def _build_controller(op_dir: Path, loop_method) -> "Controller":
    """Create the demo controller with its sockets and mock peripherals.

    Args:
        op_dir: Directory passed (as a string) to ``Controller`` as ``op_dir``.
        loop_method: Value assigned to the controller's ``loop_method``.

    Returns:
        The configured controller. Unix-socket entries are registered only
        when ``HAS_UNIX_SOCKET`` is true.

    Raises:
        RuntimeError: Propagated from ``_loopback_udp_socket()`` when no UDP
            broadcast target is available.
    """
    ctrl = Controller(op_name="mockup_demo", op_dir=str(op_dir), rate_hz=20.0)
    ctrl.termination_criteria = Termination.timeout_s(2.0)
    ctrl.loop_method = loop_method

    # Drop whatever sockets the controller starts with, then register only
    # the ones this demo exercises.
    ctrl.clear_sockets()
    ctrl.add_socket("mockup_chan", socket.ThreadChannelSocket("mockup_chan"))
    if HAS_UNIX_SOCKET:
        ctrl.add_socket("ctrl", socket.UnixSocket("ctrl"))
    ctrl.add_socket("udp", _loopback_udp_socket())

    # One mock DAQ per transport; the integer argument differs per unit.
    ctrl.add_peripheral("mock_thread", peripheral.DeimosDaqRev6(1))
    if HAS_UNIX_SOCKET:
        ctrl.add_peripheral("mock_unix", peripheral.DeimosDaqRev6(2))
    ctrl.add_peripheral("mock_udp", peripheral.DeimosDaqRev6(3))
    return ctrl


def _attach_hootl_drivers(ctrl, stack: ExitStack) -> None:
    """Enter one HOOTL driver context per mock peripheral on ``stack``.

    The drivers stay attached until ``stack`` is closed by the caller.
    """
    stack.enter_context(
        ctrl.attach_hootl_driver(
            "mock_thread",
            peripheral.HootlTransport.thread_channel("mockup_chan"),
        )
    )
    if HAS_UNIX_SOCKET:
        # NOTE(review): the controller-side Unix socket is registered as
        # "ctrl" while this transport is named "mockup_unix" -- confirm the
        # pairing is intended.
        stack.enter_context(
            ctrl.attach_hootl_driver(
                "mock_unix",
                peripheral.HootlTransport.unix_socket("mockup_unix"),
            )
        )
    stack.enter_context(
        ctrl.attach_hootl_driver(
            "mock_udp",
            peripheral.HootlTransport.udp(),
        )
    )


def _check_no_missed_packets(values) -> None:
    """Raise ``AssertionError`` if any loss-of-contact counter is not below 2.

    Args:
        values: Mapping of channel name to numeric value; only keys that
            contain ``"loss_of_contact_counter"`` are inspected.
    """
    for k, v in values.items():
        # Raised explicitly rather than via ``assert`` so the check is not
        # stripped under ``python -O``. ``not v < 2.0`` (instead of
        # ``v >= 2.0``) keeps NaN readings failing, as ``assert v < 2.0`` did.
        if "loss_of_contact_counter" in k and not v < 2.0:
            raise AssertionError(f"Missed packet: {k} = {v:.0f}")


def _run_once() -> None:
    """Run the mockup demo once for each loop method.

    For ``LoopMethod.performant()`` and then ``LoopMethod.efficient()``, a
    fresh controller is built, HOOTL drivers are attached, the controller is
    started with ``run_nonblocking()``, one manual write is issued after a
    short sleep, and the loss-of-contact counters from one read are checked.

    Raises:
        AssertionError: If a loss-of-contact counter is not below 2.
        RuntimeError: If no UDP broadcast target is available.
        Exception: Any other error raised while the run is active is
            re-raised after the run handle has been stopped and joined.
    """
    here = Path(__file__).parent.resolve()

    for loop_method in [LoopMethod.performant(), LoopMethod.efficient()]:
        print(f"Testing with loop method {loop_method}")
        ctrl = _build_controller(here / "op", loop_method)

        with ExitStack() as stack:
            _attach_hootl_drivers(ctrl, stack)

            manual_inputs = ctrl.available_inputs()
            print("Manual inputs available:")
            for name in manual_inputs[:3]:
                print(f" {name}")
            # Only mention a remainder when there is one; previously this
            # printed zero or a negative count for three or fewer inputs.
            remaining = len(manual_inputs) - 3
            if remaining > 0:
                print(f" ...and {remaining} more")

            start = time.perf_counter()
            handle = ctrl.run_nonblocking()
            try:
                # Give the run a moment before writing and reading.
                time.sleep(0.2)
                handle.write({"mock_thread.dac0": 0.0})
                _check_no_missed_packets(handle.read().values)
            except Exception:
                end = time.perf_counter()
                print(
                    "Sending termination signal to run handle"
                    f" from Python after {end - start:.2f}s."
                )
                handle.stop()
                raise
            finally:
                # Always wait for the run to finish, on success or failure.
                handle.join()
def main() -> None:
    """Run the demo, retrying on failure when ``_should_retry_under_test()`` is true.

    A single attempt is made by default. When retries are enabled, up to 10
    attempts are made with a 0.2 s pause after each failed attempt.

    Raises:
        Exception: Whatever the final attempt of ``_run_once()`` raised.
    """
    attempts = 10 if _should_retry_under_test() else 1

    for attempt in range(1, attempts + 1):
        if attempts > 1:
            print(f"macOS test run attempt {attempt}/{attempts}")
        try:
            _run_once()
        except Exception:
            # The last attempt propagates its own exception, so no separate
            # bookkeeping of the most recent error is needed after the loop.
            if attempt == attempts:
                raise
            print(
                f"macOS test run attempt {attempt}/{attempts} failed;"
                " retrying after cleanup."
            )
            time.sleep(0.2)
        else:
            return
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()