import gc
import sys
import numpy as np
import pytest
from edgefirst.schemas.builtin_interfaces import Time
from edgefirst.schemas.sensor_msgs import Image
from edgefirst.schemas.std_msgs import Header
def _make_image(payload: bytes, width: int = 64, height: int = 48) -> Image:
    """Build an ``rgb8`` ``Image`` whose ``data`` field is *payload*.

    The header always carries ``Time(1, 0)`` as its stamp and ``"t"`` as its
    frame id, and ``step`` is set to ``width * 3``. The payload length is not
    checked against ``width``/``height`` by this helper.
    """
    header = Header(stamp=Time(1, 0), frame_id="t")
    row_stride = width * 3
    return Image(
        header=header,
        height=height,
        width=width,
        encoding="rgb8",
        is_bigendian=0,
        step=row_stride,
        data=payload,
    )
class TestBorrowedBufBasics:
    """Basic length, copy-out, view and repr checks on ``Image.data``."""

    def test_len_matches_payload(self):
        """``len()`` of the data object equals the payload byte count."""
        n_bytes = 64 * 48 * 3
        image = _make_image(b"\x00" * n_bytes)
        assert len(image.data) == n_bytes

    def test_tobytes_round_trips(self):
        """``tobytes()`` returns exactly the bytes the image was built from."""
        generator = np.random.default_rng(seed=99)
        original = generator.bytes(64 * 48 * 3)
        image = _make_image(original)
        copied = image.data.tobytes()
        assert copied == original

    def test_view_returns_bytes_like(self):
        """``view()`` yields a ``memoryview`` or ``bytes`` of the same length."""
        image = _make_image(b"\x00" * 100)
        exposed = image.data.view()
        assert isinstance(exposed, (memoryview, bytes))
        assert len(exposed) == 100

    def test_repr_includes_length(self):
        """The ``repr`` of the data object mentions its length."""
        image = _make_image(b"\x00" * 100)
        text = repr(image.data)
        assert "100" in text or str(len(image.data)) in text
class TestZeroCopyContract:
    """Checks that views of ``Image.data`` share storage rather than copy."""

    def test_buffer_protocol_aliases_parent(self):
        """Two ``memoryview``s of ``img.data`` report the same base address.

        Skipped when ``memoryview(img.data)`` raises ``TypeError``.
        """
        # Build the image outside the ``try`` so that a TypeError raised while
        # constructing it fails the test instead of being reported as a skip.
        img = _make_image(b"\x00" * (640 * 480 * 3))
        try:
            mv1 = memoryview(img.data)
            mv2 = memoryview(img.data)
        except TypeError:
            pytest.skip("buffer protocol unavailable (abi3-py38 build)")
        a1 = np.frombuffer(mv1, dtype=np.uint8)
        a2 = np.frombuffer(mv2, dtype=np.uint8)
        # Equal data pointers mean both arrays wrap the same memory.
        assert a1.ctypes.data == a2.ctypes.data

    def test_view_aliases_payload_within_cdr(self):
        """The payload bytes appear unchanged inside ``img.to_bytes()``."""
        rng = np.random.default_rng(seed=1)
        payload = rng.bytes(16 * 16 * 3)
        img = _make_image(payload, width=16, height=16)
        # NOTE(review): presumably the CDR-serialized message — confirm.
        cdr = img.to_bytes()
        view_bytes = img.data.tobytes()
        assert view_bytes == payload
        # ``in`` on bytes is a contiguous-subsequence check.
        assert view_bytes in cdr
class TestParentLifetimeManagement:
    """Checks that data views stay usable after the image's name is gone."""

    def test_view_keeps_parent_alive_via_tobytes(self):
        """A returned ``img.data`` is still readable after its image's scope ends."""

        def make_view():
            # ``img`` is local; only its ``data`` object leaves this scope.
            img = _make_image(b"\x42" * 100)
            v = img.data
            assert v.tobytes()[:4] == b"\x42\x42\x42\x42"
            return v

        view = make_view()
        gc.collect()
        assert view.tobytes()[:4] == b"\x42\x42\x42\x42"
        assert len(view) == 100

    def test_buffer_protocol_anchors_parent_after_drop(self):
        """A ``memoryview`` of ``img.data`` is still readable after its image's scope ends.

        Skipped when ``memoryview(local_img.data)`` raises ``TypeError``.
        """

        def make_mv():
            local_img = _make_image(b"\x77" * 200)
            try:
                return memoryview(local_img.data)
            except TypeError:
                pytest.skip("buffer protocol unavailable (abi3-py38 build)")

        mv = make_mv()
        gc.collect()
        # The original had this call fused onto the same line as the assert
        # below, which is a SyntaxError; they are separate statements.
        gc.collect()
        assert bytes(mv[:4]) == b"\x77\x77\x77\x77"
        assert len(mv) == 200

    def test_parent_attribute_holds_image(self):
        """``view.parent`` is set and has a reference count of at least 2."""
        img = _make_image(b"\x00" * 30)
        view = img.data
        assert view.parent is not None
        # NOTE(review): sys.getrefcount also counts its own temporary argument
        # reference, so ``>= 2`` is a loose bound — confirm it is intended.
        assert sys.getrefcount(view.parent) >= 2