from __future__ import annotations
import threading
import time
import pytest
import zerodds
# Module-wide marker: skip every test in this file when ``zerodds`` does not
# expose a truthy ``_CORE_AVAILABLE`` attribute (a missing attribute counts
# as unavailable).
pytestmark = pytest.mark.skipif(
    not getattr(zerodds, "_CORE_AVAILABLE", False),
    reason="zerodds._core not compiled — maturin develop needed",
)
def test_guard_condition_default_trigger_is_false():
    """A freshly constructed GuardCondition reports a trigger value of False."""
    guard = zerodds.GuardCondition()
    initial_value = guard.get_trigger_value()
    assert initial_value is False
def test_guard_condition_set_trigger_roundtrip():
    """Setting the trigger to True and then False is reflected by the getter."""
    guard = zerodds.GuardCondition()
    # Order matters: first raise the trigger, then lower it again.
    for wanted in (True, False):
        guard.set_trigger_value(wanted)
        assert guard.get_trigger_value() is wanted
def test_guard_condition_via_outer_namespace():
    """``GuardCondition`` is reachable directly on the ``zerodds`` package."""
    assert hasattr(zerodds, "GuardCondition")
    guard_cls = zerodds.GuardCondition
    assert guard_cls is not None

    # The exported name must also be constructible and start untriggered.
    guard = guard_cls()
    assert guard.get_trigger_value() is False
def test_waitset_attach_and_wait_raises_timeout():
    """Waiting on a WaitSet whose guard condition never fires raises TimeoutError."""
    guard = zerodds.GuardCondition()
    waitset = zerodds.WaitSet()
    waitset.attach_guard_condition(guard)

    # The guard is never triggered, so the short wait is expected to expire.
    short_timeout = 0.05
    with pytest.raises(TimeoutError):
        waitset.wait(short_timeout)
def test_waitset_wakes_when_guard_condition_fires():
    """A blocked ``WaitSet.wait`` returns once an attached guard condition fires.

    A helper thread sets the trigger shortly after the main thread starts
    waiting. ``wait`` must then return a value of at least one, and the guard
    condition must report ``True`` at the moment the wait returns.
    """
    gc = zerodds.GuardCondition()
    ws = zerodds.WaitSet()
    ws.attach_guard_condition(gc)

    def fire_after_delay() -> None:
        # Small delay so the main thread is most likely already inside wait().
        time.sleep(0.05)
        gc.set_trigger_value(True)

    # daemon=True keeps a stuck helper from blocking interpreter shutdown.
    firing_thread = threading.Thread(target=fire_after_delay, daemon=True)
    firing_thread.start()
    try:
        # Timeout is far larger than the helper's delay (presumably seconds —
        # confirm against the binding), so only a missed wake-up can expire it.
        triggered = ws.wait(2.0)
        # Sample the trigger before joining, so the assertion below reflects
        # the state observed right after wake-up rather than after the join.
        trigger_after_wake = gc.get_trigger_value()
    finally:
        # Reap the helper so it cannot outlive this test and leak into the
        # next one; bounded so a wedged helper cannot hang the suite.
        firing_thread.join(timeout=2.0)

    assert triggered >= 1
    assert trigger_after_wake is True
def test_waitset_via_outer_namespace():
    """``WaitSet`` is reachable directly on the ``zerodds`` package.

    Waiting on a WaitSet with nothing attached must raise either
    ``TimeoutError`` or ``RuntimeError``; the test accepts both.
    """
    assert hasattr(zerodds, "WaitSet")
    assert zerodds.WaitSet is not None

    empty_waitset = zerodds.WaitSet()
    accepted_errors = (TimeoutError, RuntimeError)
    with pytest.raises(accepted_errors):
        empty_waitset.wait(0.01)