from __future__ import annotations
import asyncio
import pytest
import zerodds
from zerodds import aio as zaio
pytestmark = pytest.mark.skipif(
not getattr(zerodds, "_CORE_AVAILABLE", False),
reason="zerodds._core not compiled — maturin develop needed",
)
def _make_writer_reader(domain: int = 250):
p = zerodds.DomainParticipantFactory.instance().create_participant_offline(domain)
topic = p.create_bytes_topic("AsyncChatter")
writer = p.create_publisher().create_bytes_writer(topic)
reader = p.create_subscriber().create_bytes_reader(topic)
return writer, reader
def test_async_wrappers_are_constructible():
writer, reader = _make_writer_reader(251)
async_w = zaio.AsyncBytesWriter(writer)
async_r = zaio.AsyncBytesReader(reader)
assert async_w is not None
assert async_r is not None
def test_async_passthrough_status_getters():
writer, reader = _make_writer_reader(252)
async_w = zaio.AsyncBytesWriter(writer)
async_r = zaio.AsyncBytesReader(reader)
assert async_w.matched_subscription_count() == 0
assert async_r.matched_publication_count() == 0
assert async_w.publication_matched_status() == (0, 0, 0, 0, 0)
assert async_r.subscription_matched_status() == (0, 0, 0, 0, 0)
def test_async_take_returns_list_passthrough():
_w, reader = _make_writer_reader(253)
async_r = zaio.AsyncBytesReader(reader)
assert async_r.take() == []
def test_async_wait_for_data_uses_event_loop():
_w, reader = _make_writer_reader(254)
async_r = zaio.AsyncBytesReader(reader)
completed_in_parallel: list[bool] = []
async def main() -> None:
async def parallel_task() -> None:
await asyncio.sleep(0.05)
completed_in_parallel.append(True)
await asyncio.gather(
async_r.wait_for_data(0.3),
parallel_task(),
return_exceptions=True,
)
asyncio.run(main())
assert completed_in_parallel == [True], "parallel task did not run — event loop blocked?"
def test_async_waitset_wait_raises_timeout():
gc = zerodds.GuardCondition()
ws = zerodds.WaitSet()
ws.attach_guard_condition(gc)
async_ws = zaio.AsyncWaitSet(ws)
async def main() -> None:
await async_ws.wait(0.05)
with pytest.raises(TimeoutError):
asyncio.run(main())