import time
import pytest
from osdp import *
from conftest import make_fifo_pair, cleanup_fifo_pair, assert_command_received, wait_for_notification_event, wait_for_non_notification_event
secure_pd_addr = 101
insecure_pd_addr = 102
f1_1, f1_2 = make_fifo_pair("secure_pd")
f2_1, f2_2 = make_fifo_pair("insecure_pd")
key = KeyStore.gen_key()
pd_cap = PDCapabilities([
(Capability.OutputControl, 1, 1),
(Capability.LEDControl, 1, 1),
(Capability.AudibleControl, 1, 1),
(Capability.TextOutput, 1, 1),
])
pd_info_list = [
PDInfo(secure_pd_addr, f1_1, scbk=key, flags=[ LibFlag.EnforceSecure, LibFlag.EnableNotification ]),
PDInfo(insecure_pd_addr, f2_1, flags=[ LibFlag.EnableNotification ])
]
secure_pd = PeripheralDevice(
PDInfo(secure_pd_addr, f1_2, scbk=key, flags=[ LibFlag.EnforceSecure ]),
pd_cap,
log_level=LogLevel.Debug
)
insecure_pd = PeripheralDevice(
PDInfo(insecure_pd_addr, f2_2),
pd_cap,
log_level=LogLevel.Debug
)
pd_list = [
secure_pd,
insecure_pd,
]
cp = ControlPanel(pd_info_list, log_level=LogLevel.Debug)
@pytest.fixture(scope='module', autouse=True)
def setup_test():
for pd in pd_list:
pd.start()
cp.start()
if not cp.online_wait_all(timeout=10):
teardown_test()
pytest.fail("Failed to bring all PDs online within timeout")
yield
teardown_test()
def teardown_test():
cp.teardown()
for pd in pd_list:
pd.teardown()
cleanup_fifo_pair("secure_pd")
cleanup_fifo_pair("insecure_pd")
def test_hotplug_enable_already_enabled():
assert cp.is_online(secure_pd_addr)
assert cp.is_pd_enabled(secure_pd_addr) == True
result = cp.enable_pd(secure_pd_addr)
assert result == False, "enable_pd should return False when PD is already enabled"
def test_hotplug_disable_functionality():
assert cp.is_online(secure_pd_addr)
assert cp.is_pd_enabled(secure_pd_addr) == True
result = cp.disable_pd(secure_pd_addr)
assert result == True, "disable_pd should return True on success"
time.sleep(0.5)
assert cp.is_pd_enabled(secure_pd_addr) == False, "PD should be disabled after disable_pd"
def test_hotplug_enable_after_disable():
cp.disable_pd(secure_pd_addr)
time.sleep(0.5)
assert cp.is_pd_enabled(secure_pd_addr) == False
result = cp.enable_pd(secure_pd_addr)
assert result == True, "enable_pd should return True on success"
time.sleep(1)
assert cp.is_pd_enabled(secure_pd_addr) == True, "PD should be enabled after enable_pd"
def test_hotplug_command_blocking():
assert cp.is_pd_enabled(secure_pd_addr) == True
cmd = {
'command': Command.Buzzer,
'reader': 0,
'control_code': 1,
'on_count': 1,
'off_count': 1,
'rep_count': 1
}
initial_result = cp.submit_command(secure_pd_addr, cmd)
assert initial_result == True, "Commands should be accepted for enabled PDs"
cp.disable_pd(secure_pd_addr)
time.sleep(0.5)
assert cp.is_pd_enabled(secure_pd_addr) == False
disabled_result = cp.submit_command(secure_pd_addr, cmd)
assert disabled_result == False, "Commands should be blocked for disabled PDs"