import ctypes
import gc
from edgefirst.schemas.builtin_interfaces import Time
from edgefirst.schemas.sensor_msgs import Image, CompressedImage, Imu
from edgefirst.schemas.std_msgs import Header
from edgefirst.schemas.geometry_msgs import TwistStamped
def _make_image_cdr() -> bytes:
    """Build the shared image fixture and return its ``to_bytes()`` output.

    The fixture is a 2x2 ``mono8`` image with frame id ``"cam0"``, stamp
    ``Time(42, 100)`` and the four payload bytes ``01 02 03 04``.
    """
    header = Header(stamp=Time(42, 100), frame_id="cam0")
    pixels = b"\x01\x02\x03\x04"
    message = Image(
        header=header,
        height=2,
        width=2,
        encoding="mono8",
        is_bigendian=0,
        step=2,
        data=pixels,
    )
    return message.to_bytes()
def _make_imu_cdr() -> bytes:
    """Return ``to_bytes()`` of an ``Imu`` built from a header only.

    The header uses frame id ``"imu"`` and stamp ``Time(1, 0)``; no other
    ``Imu`` argument is supplied.
    """
    header = Header(stamp=Time(1, 0), frame_id="imu")
    message = Imu(header=header)
    return message.to_bytes()
def _make_twist_cdr() -> bytes:
    """Return ``to_bytes()`` of a ``TwistStamped`` built from a header only.

    The header uses frame id ``"base"`` and stamp ``Time(5, 500)``.
    """
    header = Header(stamp=Time(5, 500), frame_id="base")
    message = TwistStamped(header=header)
    return message.to_bytes()
class TestFromCdrBytes:
    """Exercise ``from_cdr`` when the input is an immutable ``bytes`` object."""

    def test_image_from_bytes(self):
        """Decoded image exposes the dimensions, encoding and payload."""
        payload = _make_image_cdr()
        decoded = Image.from_cdr(payload)

        assert decoded.height == 2
        assert decoded.width == 2
        assert decoded.encoding == "mono8"
        assert decoded.data.tobytes() == b"\x01\x02\x03\x04"

    def test_imu_from_bytes(self):
        """Decoded IMU message exposes its frame id and stamp."""
        payload = _make_imu_cdr()
        decoded = Imu.from_cdr(payload)

        assert decoded.frame_id == "imu"
        assert decoded.stamp == Time(1, 0)

    def test_twist_from_bytes(self):
        """Decoded twist message exposes its frame id."""
        payload = _make_twist_cdr()
        decoded = TwistStamped.from_cdr(payload)

        assert decoded.frame_id == "base"

    def test_zero_copy_shares_buffer(self):
        """In-place writes to the source ``bytes`` show up in the message."""
        payload = _make_image_cdr()
        decoded = Image.from_cdr(payload)
        assert decoded.height == 2

        # Ask CPython for a raw pointer to the bytes object's storage so the
        # buffer can be patched in place despite ``bytes`` being immutable.
        as_string = ctypes.pythonapi.PyBytes_AsString
        as_string.restype = ctypes.POINTER(ctypes.c_ubyte)
        as_string.argtypes = [ctypes.py_object]
        raw_ptr = as_string(payload)

        # The first little-endian 32-bit value equal to 2 is taken to be the
        # height field.
        needle = (2).to_bytes(4, "little")
        offset = payload.find(needle)
        assert offset >= 0, "Could not locate height field in CDR"

        # Overwrite those four bytes with little-endian 99.
        for index, value in enumerate((99).to_bytes(4, "little")):
            raw_ptr[offset + index] = value

        assert decoded.height == 99

    def test_message_valid_after_source_unreferenced(self):
        """Message stays readable once the test drops its source reference."""
        payload = _make_image_cdr()
        decoded = Image.from_cdr(payload)

        # Remove this test's only name for the source buffer, then force a
        # collection pass before touching the message.
        del payload
        gc.collect()

        assert decoded.height == 2
        assert decoded.width == 2
        assert decoded.encoding == "mono8"
        assert decoded.data.tobytes() == b"\x01\x02\x03\x04"
        assert decoded.stamp == Time(42, 100)

    def test_multiple_messages_from_same_bytes(self):
        """Two messages decoded from one buffer report identical content."""
        payload = _make_image_cdr()
        first = Image.from_cdr(payload)
        second = Image.from_cdr(payload)

        assert first.height == second.height
        assert first.data.tobytes() == second.data.tobytes()
class TestFromCdrBytearray:
    """Exercise ``from_cdr`` when the input is a mutable ``bytearray``."""

    def test_image_from_bytearray(self):
        """An image decodes from a ``bytearray`` source."""
        buffer = bytearray(_make_image_cdr())
        decoded = Image.from_cdr(buffer)

        assert decoded.height == 2
        assert decoded.encoding == "mono8"

    def test_mutation_after_from_cdr_does_not_affect_message(self):
        """Zeroing the source after decoding leaves the message unchanged."""
        buffer = bytearray(_make_image_cdr())
        decoded = Image.from_cdr(buffer)

        # Wipe the whole source in place, keeping its length.
        zeros = b"\x00" * len(buffer)
        buffer[:] = zeros

        assert decoded.height == 2
        assert decoded.width == 2
        assert decoded.encoding == "mono8"
        assert decoded.data.tobytes() == b"\x01\x02\x03\x04"

    def test_imu_from_bytearray_isolated(self):
        """Filling the source with 0xFF leaves the IMU frame id unchanged."""
        buffer = bytearray(_make_imu_cdr())
        decoded = Imu.from_cdr(buffer)

        filler = b"\xff" * len(buffer)
        buffer[:] = filler

        assert decoded.frame_id == "imu"
class TestFromCdrMemoryview:
    """Exercise ``from_cdr`` when the input is a ``memoryview``."""

    def test_image_from_memoryview(self):
        """An image decodes from a view wrapped directly around the bytes."""
        view = memoryview(_make_image_cdr())
        decoded = Image.from_cdr(view)

        assert decoded.height == 2
        assert decoded.encoding == "mono8"

    def test_readonly_memoryview_from_bytes(self):
        """An image decodes from a view over a separately held ``bytes``."""
        payload = _make_image_cdr()
        view = memoryview(payload)
        decoded = Image.from_cdr(view)

        assert decoded.height == 2

    def test_mutable_memoryview_isolated(self):
        """Zeroing the backing ``bytearray`` leaves the message unchanged."""
        backing = bytearray(_make_image_cdr())
        view = memoryview(backing)
        decoded = Image.from_cdr(view)

        # Same-length slice assignment, so the bytearray is not resized while
        # the view is still exported.
        zeros = b"\x00" * len(backing)
        backing[:] = zeros

        assert decoded.height == 2
        assert decoded.data.tobytes() == b"\x01\x02\x03\x04"
class TestCdrRoundTrip:
    """Check that ``from_cdr`` followed by ``to_bytes`` reproduces the input."""

    def test_bytes_round_trip(self):
        """Decoding ``bytes`` and re-serializing yields the same bytes."""
        original = _make_image_cdr()
        decoded = Image.from_cdr(original)

        assert decoded.to_bytes() == original

    def test_bytearray_round_trip(self):
        """Decoding a ``bytearray`` copy re-serializes to the original bytes."""
        original = _make_image_cdr()
        mutable_copy = bytearray(original)
        decoded = Image.from_cdr(mutable_copy)

        assert decoded.to_bytes() == original

    def test_compressed_image_round_trip(self):
        """A ``CompressedImage`` keeps format and payload across a round trip."""
        jpeg_prefix = b"\xff\xd8\xff\xe0"
        header = Header(stamp=Time(1, 0), frame_id="c")
        source = CompressedImage(
            header=header,
            format="jpeg",
            data=jpeg_prefix,
        )
        encoded = source.to_bytes()

        decoded = CompressedImage.from_cdr(encoded)

        assert decoded.format == "jpeg"
        assert decoded.data.tobytes() == b"\xff\xd8\xff\xe0"
        assert decoded.to_bytes() == encoded