import os
import subprocess
import sys
import threading
import time
import pytest
import wingfoil as wf
pytestmark = pytest.mark.requires_iceoryx2
def _unique_service_name(prefix: str) -> str:
return f"wingfoil/python/test/{prefix}/{os.getpid()}/{time.time_ns()}"
@pytest.mark.parametrize("mode_name", ["spin", "threaded", "signaled"])
def test_iceoryx2_local_pubsub_bytes(mode_name):
mode = getattr(wf.Iceoryx2Mode, mode_name.capitalize())
service_name = _unique_service_name(f"local/{mode_name}")
sub = wf.iceoryx2_sub(
service_name,
variant=wf.Iceoryx2ServiceVariant.Local,
mode=mode,
history_size=5,
)
collected = sub.collect()
pub = (
wf.ticker(0.02)
.count()
.map(lambda _: b"hello")
.iceoryx2_pub(service_name, variant=wf.Iceoryx2ServiceVariant.Local)
)
graph = wf.Graph([pub, collected])
graph.run(realtime=True, duration=0.4)
ticks = collected.peek_value()
assert ticks, "expected to receive at least one tick"
values = [item for tick in ticks for item in tick]
assert values, "expected to receive at least one message"
assert all(v == b"hello" for v in values)
def test_iceoryx2_local_slice_large_payload():
service_name = _unique_service_name("local/large")
payload = b"x" * (64 * 1024)
sub = wf.iceoryx2_sub(
service_name,
variant=wf.Iceoryx2ServiceVariant.Local,
mode=wf.Iceoryx2Mode.Signaled,
history_size=5,
)
collected = sub.collect()
pub = (
wf.ticker(0.05)
.count()
.map(lambda _: payload)
.iceoryx2_pub(service_name, variant=wf.Iceoryx2ServiceVariant.Local)
)
graph = wf.Graph([pub, collected])
graph.run(realtime=True, duration=0.4)
ticks = collected.peek_value()
assert ticks, "expected to receive at least one tick"
values = [item for tick in ticks for item in tick]
assert values, "expected to receive at least one message"
assert payload in values
@pytest.mark.skipif(
os.getenv("WINGFOIL_ICEORYX2_IPC_TESTS") != "1",
reason="Set WINGFOIL_ICEORYX2_IPC_TESTS=1 to enable IPC (shared memory) tests.",
)
def test_iceoryx2_ipc_pubsub_subprocess():
if not os.path.isdir("/dev/shm") or not os.access("/dev/shm", os.W_OK):
pytest.skip("IPC tests require a writable /dev/shm (or equivalent shared memory setup).")
service_name = _unique_service_name("ipc/subprocess")
sub = wf.iceoryx2_sub(
service_name,
variant=wf.Iceoryx2ServiceVariant.Ipc,
mode=wf.Iceoryx2Mode.Spin,
history_size=10,
)
collected = sub.collect()
def run_subscriber():
wf.Graph([collected]).run(realtime=True, duration=1.0)
t = threading.Thread(target=run_subscriber, daemon=True)
t.start()
time.sleep(0.15)
code = f"""
import wingfoil as wf
service_name = {service_name!r}
pub = (
wf.ticker(0.02)
.count()
.map(lambda _: b"hello-ipc")
.iceoryx2_pub(service_name, variant=wf.Iceoryx2ServiceVariant.Ipc, history_size=10)
)
wf.Graph([pub]).run(realtime=True, duration=0.4)
"""
subprocess.run([sys.executable, "-c", code], check=True)
t.join(timeout=2.0)
ticks = collected.peek_value()
values = [item for tick in ticks for item in tick]
assert values, "expected to receive at least one IPC message"
assert b"hello-ipc" in values