import time
import pytest
from osdp import *
from conftest import make_fifo_pair, cleanup_fifo_pair
# Capabilities advertised by the test PD; every entry is registered as a
# (capability, 1, 1) tuple.
pd_cap = PDCapabilities([
    (capability, 1, 1)
    for capability in (
        Capability.OutputControl,
        Capability.LEDControl,
        Capability.AudibleControl,
        Capability.TextOutput,
    )
])

# The same generated key is passed as `scbk` to both the CP and the PD.
key = KeyStore.gen_key()

# FIFO pair named "status": f1 is handed to the PD, f2 to the CP.
f1, f2 = make_fifo_pair("status")

pd_addr = 101

# CP-side description of the single PD under test.
pd_info_list = [
    PDInfo(
        pd_addr,
        f2,
        scbk=key,
        flags=[LibFlag.EnforceSecure, LibFlag.EnableNotification],
    ),
]

pd = PeripheralDevice(
    PDInfo(pd_addr, f1, scbk=key, flags=[LibFlag.EnforceSecure]),
    pd_cap,
    log_level=LogLevel.Debug,
)
cp = ControlPanel(pd_info_list, log_level=LogLevel.Debug)
@pytest.fixture(scope='module', autouse=True)
def setup_test():
    """Start the PD and CP once per module and wait for the secure channel.

    The fixture starts ``pd`` then ``cp`` and waits (``timeout=10``) for
    ``cp.sc_wait_all`` to report success. If that wait returns a falsy
    value the module is failed via ``pytest.fail``. After the tests have
    run, ``teardown_test`` releases everything.

    If start-up raises or times out, ``teardown_test`` is invoked before the
    error propagates: pytest does not execute the code after ``yield`` when
    a fixture fails before yielding, so without this the already-started
    devices and the FIFO pair would be left behind.
    """
    sc_ready = False
    try:
        pd.start()
        cp.start()
        sc_ready = cp.sc_wait_all(timeout=10)
    finally:
        # Covers both an exception during start-up and a timed-out wait.
        if not sc_ready:
            teardown_test()
    if not sc_ready:
        pytest.fail("Failed to establish secure channel within timeout")
    yield
    teardown_test()
def teardown_test():
    """Tear down the CP, then the PD, then remove the "status" FIFO pair.

    Each stage runs even if an earlier one raises an unexpected exception,
    so a failing CP teardown can no longer prevent the PD teardown or the
    FIFO cleanup. ``RuntimeError`` raised by a device teardown is ignored,
    as before; any other exception still propagates once all stages ran.
    """
    try:
        _teardown_if_running(cp)
    finally:
        try:
            _teardown_if_running(pd)
        finally:
            cleanup_fifo_pair("status")


def _teardown_if_running(device):
    """Call ``device.teardown()`` if ``device.thread`` is truthy, ignoring RuntimeError."""
    try:
        if device.thread:
            device.teardown()
    except RuntimeError:
        # Best-effort shutdown: a RuntimeError here is deliberately ignored.
        pass
def test_cp_online_status():
    """The CP reports the PD as online through each per-PD online query."""
    # Bit 0 of the mask is checked; presumably the slot of the only
    # configured PD -- confirm against the library's bitmask layout.
    pd_bit = 1 << 0

    assert cp.is_online(pd_addr), "PD should be online"

    already_online = cp.online_wait(pd_addr, timeout=1)
    assert already_online, "online_wait should return True for already online PD"

    assert cp.status() & pd_bit, "PD should be set in online bitmask"
def test_cp_sc_status():
    """The CP reports an active secure channel through each per-PD SC query."""
    # Bit 0 of the mask is checked; presumably the slot of the only
    # configured PD -- confirm against the library's bitmask layout.
    pd_bit = 1 << 0

    assert cp.is_sc_active(pd_addr), "Secure channel should be active"

    already_secure = cp.sc_wait(pd_addr, timeout=1)
    assert already_secure, "sc_wait should return True for already active SC"

    assert cp.sc_status() & pd_bit, "PD should be set in SC bitmask"
def test_pd_online_status():
    """The PD's own ``is_online()`` query reports it as online."""
    pd_online = pd.is_online()
    assert pd_online, "PD should be online"
def test_pd_sc_status():
    """The PD's own ``is_sc_active()`` query reports an active secure channel."""
    pd_secure = pd.is_sc_active()
    assert pd_secure, "Secure channel should be active"
def test_status_bitmasks():
    """Online and SC bitmasks have bit 0 set and the matching counts equal 1."""
    first_pd = 1 << 0  # the bit tested in both masks

    # Online state: bitmask first, then the aggregate count.
    assert cp.status() & first_pd, "PD should be set in online bitmask"
    assert cp.get_num_online() == 1, "Should have 1 PD online"

    # Secure-channel state: same two checks.
    assert cp.sc_status() & first_pd, "PD should be set in SC bitmask"
    assert cp.get_num_sc_active() == 1, "Should have 1 PD with active SC"
def test_status_wait_methods():
    """Every wait helper returns a truthy value when called with timeout=1."""
    # (wait method, positional args, failure message), checked in order;
    # the loop stops at the first failing assertion.
    wait_checks = (
        (cp.online_wait_all, (),
         "online_wait_all should return True for already online PDs"),
        (cp.sc_wait_all, (),
         "sc_wait_all should return True for already active SCs"),
        (cp.online_wait, (pd_addr,),
         "online_wait should return True for already online PD"),
        (cp.sc_wait, (pd_addr,),
         "sc_wait should return True for already active SC"),
    )
    for wait, positional, message in wait_checks:
        assert wait(*positional, timeout=1), message