import os
import struct
import time
import pytest
SERIAL_SERVER_ID = os.environ.get("XOQ_SERIAL_SERVER_ID")
CAMERA_SERVER_ID = os.environ.get("XOQ_CAMERA_SERVER_ID")
CAN_SERVER_ID = os.environ.get("XOQ_CAN_SERVER_ID")
STS_HEADER = bytes([0xFF, 0xFF])
STS_INST_PING = 0x01
STS_INST_READ = 0x02
STS_INST_WRITE = 0x03
STS_TORQUE_ENABLE = 40 STS_PRESENT_POSITION = 56
def _sts_checksum(packet_body: bytes) -> int:
return (~sum(packet_body)) & 0xFF
def sts_ping(servo_id: int) -> bytes:
length = 2 body = bytes([servo_id, length, STS_INST_PING])
return STS_HEADER + body + bytes([_sts_checksum(body)])
def sts_read(servo_id: int, address: int, count: int) -> bytes:
length = 4 body = bytes([servo_id, length, STS_INST_READ, address, count])
return STS_HEADER + body + bytes([_sts_checksum(body)])
def sts_write_byte(servo_id: int, address: int, value: int) -> bytes:
length = 4 body = bytes([servo_id, length, STS_INST_WRITE, address, value])
return STS_HEADER + body + bytes([_sts_checksum(body)])
def parse_sts_response(data: bytes):
idx = data.find(STS_HEADER)
if idx < 0 or idx + 4 > len(data):
return None
servo_id = data[idx + 2]
length = data[idx + 3]
if idx + 3 + length > len(data):
return None
error = data[idx + 4]
params = data[idx + 5 : idx + 3 + length] return servo_id, error, params
DAMIAO_ENABLE = bytes([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFC])
DAMIAO_DISABLE = bytes([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFD])
def damiao_parse_state(data: bytes):
if len(data) < 8:
return None
q_raw = (data[1] << 8) | data[2]
dq_raw = (data[3] << 4) | (data[4] >> 4)
tau_raw = ((data[4] & 0x0F) << 8) | data[5]
t_mos = data[6]
t_rotor = data[7]
return q_raw, dq_raw, tau_raw, t_mos, t_rotor
@pytest.fixture(scope="module")
def serial_conn():
if not SERIAL_SERVER_ID:
pytest.skip("XOQ_SERIAL_SERVER_ID not set")
import xoq_serial
ser = xoq_serial.Serial(SERIAL_SERVER_ID, timeout=5.0)
yield ser
ser.close()
@pytest.mark.skipif(not SERIAL_SERVER_ID, reason="XOQ_SERIAL_SERVER_ID not set")
class TestSerialIntegration:
@pytest.mark.timeout(30)
def test_ping_servo(self, serial_conn):
serial_conn.reset_input_buffer()
serial_conn.write(sts_ping(1))
time.sleep(0.05)
resp = serial_conn.read(32)
assert len(resp) >= 6, f"Ping response too short: {resp.hex()}"
parsed = parse_sts_response(resp)
assert parsed is not None, f"Could not parse response: {resp.hex()}"
servo_id, error, _ = parsed
assert servo_id == 1, f"Unexpected servo ID: {servo_id}"
assert error == 0, f"Servo returned error: {error}"
@pytest.mark.timeout(30)
def test_read_present_position(self, serial_conn):
serial_conn.reset_input_buffer()
serial_conn.write(sts_read(1, STS_PRESENT_POSITION, 2))
time.sleep(0.05)
resp = serial_conn.read(32)
parsed = parse_sts_response(resp)
assert parsed is not None, f"Could not parse response: {resp.hex()}"
servo_id, error, params = parsed
assert servo_id == 1
assert error == 0, f"Servo returned error: {error}"
assert len(params) >= 2, f"Expected 2 position bytes, got {len(params)}"
position = params[0] | (params[1] << 8)
assert 0 <= position <= 4095, f"Position out of range: {position}"
@pytest.mark.timeout(30)
def test_disable_torque(self, serial_conn):
serial_conn.reset_input_buffer()
serial_conn.write(sts_write_byte(1, STS_TORQUE_ENABLE, 0))
time.sleep(0.05)
resp = serial_conn.read(32)
parsed = parse_sts_response(resp)
assert parsed is not None, f"Could not parse response: {resp.hex()}"
servo_id, error, _ = parsed
assert servo_id == 1
assert error == 0, f"Servo returned error on torque disable: {error}"
@pytest.mark.timeout(30)
def test_read_all_servo_positions(self, serial_conn):
for sid in range(1, 6):
serial_conn.reset_input_buffer()
serial_conn.write(sts_read(sid, STS_PRESENT_POSITION, 2))
time.sleep(0.05)
resp = serial_conn.read(32)
parsed = parse_sts_response(resp)
assert parsed is not None, f"Servo {sid}: no response"
servo_id, error, params = parsed
assert servo_id == sid, f"Expected servo {sid}, got {servo_id}"
assert error == 0, f"Servo {sid} error: {error}"
assert len(params) >= 2, f"Servo {sid}: short position data"
position = params[0] | (params[1] << 8)
assert 0 <= position <= 4095, f"Servo {sid} position out of range: {position}"
@pytest.fixture(scope="module")
def can_conn():
if not CAN_SERVER_ID:
pytest.skip("XOQ_CAN_SERVER_ID not set")
import xoq_can
bus = xoq_can.Bus(channel=CAN_SERVER_ID, fd=True, timeout=10.0)
yield bus, xoq_can
bus.shutdown()
@pytest.mark.skipif(not CAN_SERVER_ID, reason="XOQ_CAN_SERVER_ID not set")
class TestCanIntegration:
@pytest.mark.timeout(30)
def test_disable_motor(self, can_conn):
bus, xoq_can = can_conn
msg = xoq_can.Message(
arbitration_id=0x01,
data=list(DAMIAO_DISABLE),
is_fd=False,
)
bus.send(msg)
resp = bus.recv(timeout=5.0)
if resp is not None:
assert len(resp.data) == 8, f"Expected 8-byte state, got {len(resp.data)}"
state = damiao_parse_state(bytes(resp.data))
assert state is not None, "Could not parse motor state"
@pytest.mark.timeout(30)
def test_enable_disable_roundtrip(self, can_conn):
bus, xoq_can = can_conn
enable_msg = xoq_can.Message(
arbitration_id=0x01,
data=list(DAMIAO_ENABLE),
)
bus.send(enable_msg)
resp = bus.recv(timeout=5.0)
if resp is not None:
assert len(resp.data) == 8
time.sleep(0.1)
disable_msg = xoq_can.Message(
arbitration_id=0x01,
data=list(DAMIAO_DISABLE),
)
bus.send(disable_msg)
resp = bus.recv(timeout=5.0)
if resp is not None:
assert len(resp.data) == 8
@pytest.mark.timeout(30)
def test_read_motor_state(self, can_conn):
bus, xoq_can = can_conn
msg = xoq_can.Message(
arbitration_id=0x01,
data=list(DAMIAO_DISABLE),
)
bus.send(msg)
resp = bus.recv(timeout=5.0)
if resp is not None:
state = damiao_parse_state(bytes(resp.data))
assert state is not None
q_raw, dq_raw, tau_raw, t_mos, t_rotor = state
assert 0 <= q_raw <= 65535
assert 0 <= t_mos <= 100, f"MOS temp out of range: {t_mos}"
@pytest.fixture(scope="module")
def camera_conn():
if not CAMERA_SERVER_ID:
pytest.skip("XOQ_CAMERA_SERVER_ID not set")
import xoq_cv2
cap = xoq_cv2.VideoCapture(CAMERA_SERVER_ID, "iroh")
yield cap
cap.release()
@pytest.mark.skipif(not CAMERA_SERVER_ID, reason="XOQ_CAMERA_SERVER_ID not set")
class TestCameraIntegration:
@pytest.mark.timeout(30)
def test_read_multiple_frames(self, camera_conn):
for i in range(5):
ret, frame = camera_conn.read()
assert ret is True, f"Frame {i}: read failed"
assert frame is not None, f"Frame {i}: frame is None"
assert len(frame.shape) == 3
assert frame.shape[2] == 3
@pytest.mark.timeout(30)
def test_frame_dimensions_consistent(self, camera_conn):
ret, first = camera_conn.read()
assert ret
h, w = first.shape[:2]
for _ in range(4):
ret, frame = camera_conn.read()
assert ret
assert frame.shape[0] == h
assert frame.shape[1] == w
@pytest.mark.timeout(30)
def test_frame_not_black(self, camera_conn):
import numpy as np
ret, frame = camera_conn.read()
assert ret
assert np.any(frame > 0), "Frame is completely black"