from __future__ import annotations
import os
import shutil
import subprocess
import sys
import time
import pytest
import zerodds
DDS_PERF = shutil.which("ddsperf")
pytestmark = [
pytest.mark.skipif(
not getattr(zerodds, "_CORE_AVAILABLE", False),
reason="zerodds._core not compiled — maturin develop needed",
),
pytest.mark.skipif(
DDS_PERF is None,
reason="ddsperf not on PATH — install Cyclone DDS for the cross-vendor test",
),
pytest.mark.skipif(
sys.platform == "win32",
reason="cross-vendor test needs Linux/macOS loopback multicast",
),
]
def _shape_type_name() -> str:
factory = zerodds.DomainParticipantFactory.instance()
p = factory.create_participant_offline(90)
topic = p.create_shape_topic("CrossVendorProbe")
return topic.type_name
def test_shape_type_name_matches_shapesdemo_convention():
assert _shape_type_name() == "ShapeType"
def test_cross_vendor_participant_discovery_against_ddsperf():
domain = 91
factory = zerodds.DomainParticipantFactory.instance()
p = factory.create_participant_fast(domain)
env = {**os.environ, "CYCLONEDDS_URI": ""}
publisher = subprocess.Popen(
[DDS_PERF, "-D", "20", "-i", str(domain), "pub", "1Hz"],
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
try:
deadline = time.time() + 10.0
discovered = 0
while time.time() < deadline:
discovered = p.discovered_participants_count()
if discovered >= 1:
break
time.sleep(0.2)
assert discovered >= 1, (
f"Cyclone ddsperf participant on domain {domain} not discovered via "
f"SPDP (discovered_participants_count={discovered})"
)
finally:
publisher.terminate()
try:
publisher.wait(timeout=5)
except subprocess.TimeoutExpired:
publisher.kill()
time.sleep(0.1)