import pytest
from edgefirst.schemas import (
from_schema,
is_supported,
list_schemas,
schema_name,
)
from edgefirst.schemas import (
edgefirst_msgs,
foxglove_msgs,
geometry_msgs,
sensor_msgs,
std_msgs,
)
class TestFromSchema:
    """Checks that from_schema() maps schema name strings to message classes."""

    def test_sensor_msgs_image(self):
        """'sensor_msgs/msg/Image' resolves to sensor_msgs.Image."""
        assert from_schema("sensor_msgs/msg/Image") is sensor_msgs.Image

    def test_geometry_msgs_pose(self):
        """'geometry_msgs/msg/Pose' resolves to geometry_msgs.Pose."""
        assert from_schema("geometry_msgs/msg/Pose") is geometry_msgs.Pose

    def test_edgefirst_msgs_box(self):
        """'edgefirst_msgs/msg/Box' resolves to edgefirst_msgs.Box."""
        assert from_schema("edgefirst_msgs/msg/Box") is edgefirst_msgs.Box

    def test_foxglove_msgs_compressed_video(self):
        """'foxglove_msgs/msg/CompressedVideo' resolves to the foxglove class."""
        resolved = from_schema("foxglove_msgs/msg/CompressedVideo")
        assert resolved is foxglove_msgs.CompressedVideo

    def test_std_msgs_header(self):
        """'std_msgs/msg/Header' resolves to std_msgs.Header."""
        assert from_schema("std_msgs/msg/Header") is std_msgs.Header

    def test_short_format(self):
        """The two-part form without the 'msg' segment is accepted as well."""
        assert from_schema("sensor_msgs/Image") is sensor_msgs.Image

    def test_invalid_format_raises(self):
        """A name with no '/' separator is rejected with ValueError."""
        with pytest.raises(ValueError, match="Invalid schema format"):
            from_schema("invalid")

    def test_wrong_msg_marker_raises(self):
        """A middle segment other than 'msg' is rejected with ValueError."""
        with pytest.raises(ValueError, match="expected 'msg'"):
            from_schema("sensor_msgs/srv/Image")

    def test_unknown_package_raises(self):
        """An unrecognised package name raises KeyError."""
        with pytest.raises(KeyError, match="Unknown package"):
            from_schema("unknown_msgs/msg/Foo")

    def test_unknown_type_raises(self):
        """A known package with an unrecognised type name raises KeyError."""
        with pytest.raises(KeyError, match="Unknown type"):
            from_schema("sensor_msgs/msg/NonExistent")

    def test_can_instantiate_class(self):
        """The returned class can be called with no arguments to build a message."""
        image_cls = from_schema("sensor_msgs/msg/Image")
        message = image_cls()
        assert isinstance(message, sensor_msgs.Image)
class TestSchemaName:
    """Checks that schema_name() reports the schema string of a message type."""

    def test_sensor_msgs_image_class(self):
        """The sensor_msgs.Image class maps to 'sensor_msgs/msg/Image'."""
        assert schema_name(sensor_msgs.Image) == "sensor_msgs/msg/Image"

    def test_geometry_msgs_pose_class(self):
        """The geometry_msgs.Pose class maps to 'geometry_msgs/msg/Pose'."""
        assert schema_name(geometry_msgs.Pose) == "geometry_msgs/msg/Pose"

    def test_edgefirst_msgs_box_class(self):
        """The edgefirst_msgs.Box class maps to 'edgefirst_msgs/msg/Box'."""
        assert schema_name(edgefirst_msgs.Box) == "edgefirst_msgs/msg/Box"

    def test_from_instance(self):
        """An instance is accepted in place of its class."""
        message = sensor_msgs.Image()
        assert schema_name(message) == "sensor_msgs/msg/Image"

    def test_invalid_type_raises(self):
        """A type that is not a schema message (here: str) raises ValueError."""
        with pytest.raises(ValueError, match="does not have a typename"):
            schema_name(str)

    def test_roundtrip(self):
        """from_schema() followed by schema_name() returns the starting string."""
        expected = "sensor_msgs/msg/CameraInfo"
        resolved = from_schema(expected)
        assert schema_name(resolved) == expected
class TestListSchemas:
    """Checks the listing returned by list_schemas(), with and without a filter."""

    def test_returns_list(self):
        """The unfiltered result is a list."""
        assert isinstance(list_schemas(), list)

    def test_contains_common_schemas(self):
        """The unfiltered listing includes a few well-known schema names."""
        listing = list_schemas()
        for expected in (
            "sensor_msgs/msg/Image",
            "geometry_msgs/msg/Pose",
            "edgefirst_msgs/msg/Box",
        ):
            assert expected in listing

    def test_filter_by_package(self):
        """Passing a package name restricts the listing to that package."""
        filtered = list_schemas("sensor_msgs")
        assert all(entry.startswith("sensor_msgs/") for entry in filtered)
        assert "sensor_msgs/msg/Image" in filtered

    def test_unknown_package_returns_empty(self):
        """An unrecognised package yields an empty list."""
        assert list_schemas("unknown_pkg") == []

    def test_sorted(self):
        """The unfiltered listing is already in sorted order."""
        listing = list_schemas()
        assert listing == sorted(listing)
class TestIsSupported:
    """Checks the boolean answer given by is_supported() for schema names."""

    def test_supported_schema(self):
        """Known schema names are reported as supported."""
        for known in (
            "sensor_msgs/msg/Image",
            "geometry_msgs/msg/Pose",
            "edgefirst_msgs/msg/Box",
        ):
            assert is_supported(known)

    def test_unsupported_schema(self):
        """An unknown package or an unknown type is reported as unsupported."""
        for unknown in (
            "unknown_msgs/msg/Foo",
            "sensor_msgs/msg/NonExistent",
        ):
            assert not is_supported(unknown)

    def test_invalid_format(self):
        """A malformed name gives a falsy result; the test expects no exception."""
        assert not is_supported("invalid")
class TestCrossLanguageCompatibility:
    """Runs support and round-trip checks over a fixed list of schema names.

    NOTE(review): the class name suggests COMMON_SCHEMAS mirrors the set
    exposed by other language bindings; this file does not show that, so
    confirm against those bindings when editing the list.
    """

    # Order matters: it determines the parametrized test IDs and run order.
    COMMON_SCHEMAS = [
        # sensor_msgs
        "sensor_msgs/msg/CameraInfo",
        "sensor_msgs/msg/CompressedImage",
        "sensor_msgs/msg/Image",
        "sensor_msgs/msg/Imu",
        "sensor_msgs/msg/NavSatFix",
        "sensor_msgs/msg/PointCloud2",
        # geometry_msgs
        "geometry_msgs/msg/Pose",
        "geometry_msgs/msg/Transform",
        "geometry_msgs/msg/TransformStamped",
        "geometry_msgs/msg/Twist",
        "geometry_msgs/msg/Vector3",
        "geometry_msgs/msg/Quaternion",
        # foxglove_msgs
        "foxglove_msgs/msg/CompressedVideo",
        # edgefirst_msgs
        "edgefirst_msgs/msg/Box",
        "edgefirst_msgs/msg/Detect",
        "edgefirst_msgs/msg/DmaBuffer",
        "edgefirst_msgs/msg/Mask",
        "edgefirst_msgs/msg/ModelInfo",
        "edgefirst_msgs/msg/RadarCube",
        "edgefirst_msgs/msg/RadarInfo",
        "edgefirst_msgs/msg/Track",
    ]

    @pytest.mark.parametrize("schema", COMMON_SCHEMAS)
    def test_schema_supported(self, schema: str):
        """Each listed schema name is reported as supported."""
        supported = is_supported(schema)
        assert supported, f"Schema {schema} should be supported"

    @pytest.mark.parametrize("schema", COMMON_SCHEMAS)
    def test_schema_roundtrip(self, schema: str):
        """Each listed name survives from_schema() followed by schema_name()."""
        name = schema_name(from_schema(schema))
        assert name == schema, f"Roundtrip failed for {schema}: got {name}"