import os
import socket
import pytest
from osdp import *
import os
import abc
import fcntl
import errno
class FIFOChannel(Channel):
    """``Channel`` implementation backed by two named pipes (FIFOs).

    One FIFO is used for reading and the other for writing. The read end is
    opened (non-blocking) in the constructor; the write end is opened lazily
    on first use because opening a FIFO for writing requires a reader on the
    other side.
    """

    def __init__(self, fifo_read_path: str, fifo_write_path: str) -> None:
        """Create the read FIFO if necessary and open it non-blocking.

        Args:
            fifo_read_path: Filesystem path of the FIFO this channel reads.
            fifo_write_path: Filesystem path of the FIFO this channel writes.
        """
        super().__init__()
        self.fifo_read_path = fifo_read_path
        self.fifo_write_path = fifo_write_path
        # -1 marks "not open"; every method checks for it before using an fd.
        self.fifo_read_fd = -1
        self.fifo_write_fd = -1
        self._ensure_fifo(self.fifo_read_path)
        self.fifo_read_fd = os.open(self.fifo_read_path, os.O_RDONLY | os.O_NONBLOCK)

    @staticmethod
    def _ensure_fifo(path: str) -> None:
        """Create a FIFO at *path* unless something already exists there."""
        # EAFP instead of exists()+mkfifo(): the peer channel shares the same
        # paths and may create the FIFO between the check and the call.
        try:
            os.mkfifo(path)
        except FileExistsError:
            pass

    @staticmethod
    def _remove_fifo(path: str) -> None:
        """Remove *path*, ignoring the case where it is already gone."""
        # The peer channel removes the same paths in its own close().
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    def _open_fifo(self) -> None:
        """Make sure both FIFOs exist and both file descriptors are open.

        Raises:
            TimeoutError: No reader appeared on the write FIFO after 100
                attempts spaced 0.1 s apart.
            OSError: Any other failure while opening or configuring an fd.
        """
        import time

        self._ensure_fifo(self.fifo_read_path)
        self._ensure_fifo(self.fifo_write_path)
        if self.fifo_read_fd < 0:
            self.fifo_read_fd = os.open(self.fifo_read_path, os.O_RDONLY | os.O_NONBLOCK)
        if self.fifo_write_fd >= 0:
            return
        for _ in range(100):
            try:
                # Non-blocking open fails with ENXIO while nobody has the FIFO
                # open for reading, which lets us poll instead of hanging.
                fd = os.open(self.fifo_write_path, os.O_WRONLY | os.O_NONBLOCK)
            except OSError as e:
                if e.errno == errno.ENXIO:
                    time.sleep(0.1)
                    continue
                raise
            try:
                # Switch the descriptor back to blocking mode for writes.
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
            except OSError:
                # Do not leak (or publish) a half-configured descriptor.
                os.close(fd)
                raise
            self.fifo_write_fd = fd
            return
        raise TimeoutError(f"Failed to open write FIFO {self.fifo_write_path} - no reader")

    def read(self, max_bytes: int) -> bytes:
        """Read up to *max_bytes* bytes; return ``b''`` if none are available."""
        if self.fifo_read_fd < 0:
            self._open_fifo()
        try:
            return os.read(self.fifo_read_fd, max_bytes)
        except OSError as e:
            # The read end is non-blocking: "would block" just means no data.
            if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                return b''
            raise

    def write(self, buf: bytes) -> int:
        """Write *buf* to the write FIFO and return the number of bytes written."""
        if self.fifo_write_fd < 0:
            self._open_fifo()
        return os.write(self.fifo_write_fd, buf)

    def flush(self) -> None:
        """Discard everything currently readable from the read FIFO."""
        if self.fifo_read_fd < 0:
            return
        try:
            # Drain in 1 KiB chunks until an empty read or "would block".
            while os.read(self.fifo_read_fd, 1024):
                pass
        except OSError as e:
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise

    def close(self) -> None:
        """Close both descriptors and remove both FIFO files."""
        if self.fifo_read_fd >= 0:
            os.close(self.fifo_read_fd)
            self.fifo_read_fd = -1
        if self.fifo_write_fd >= 0:
            os.close(self.fifo_write_fd)
            self.fifo_write_fd = -1
        self._remove_fifo(self.fifo_read_path)
        self._remove_fifo(self.fifo_write_path)
def assert_command_received(pd, expected_cmd, timeout=2):
    """Fetch the next command from *pd* and check it equals *expected_cmd*.

    Fails the test if ``pd.get_command`` returns ``None`` (nothing arrived
    within *timeout* seconds). Returns the received command on success.
    """
    received = pd.get_command(timeout=timeout)
    if received is not None:
        assert received == expected_cmd
        return received
    pytest.fail(f"Timeout waiting for command after {timeout}s")
def assert_event_received(cp, pd_address, expected_event, timeout=2):
    """Fetch the next event for *pd_address* from *cp* and check it.

    Fails the test if ``cp.get_event`` returns ``None`` (nothing arrived
    within *timeout* seconds). Returns the received event on success.
    """
    received = cp.get_event(pd_address, timeout=timeout)
    if received is not None:
        assert received == expected_event
        return received
    pytest.fail(f"Timeout waiting for event after {timeout}s")
def wait_for_notification_event(cp, pd_address, expected_event, timeout=5):
    """Poll *cp* until a notification event equal to *expected_event* arrives.

    Events that are not notifications, or are notifications of a different
    ``'type'``, are consumed and skipped. The test is failed if no matching
    event has been seen once *timeout* seconds have elapsed.

    Args:
        cp: Object exposing ``get_event(pd_address, timeout=...)``.
        pd_address: Address passed through to ``cp.get_event``.
        expected_event: Event mapping to wait for; must contain ``'type'``.
        timeout: Overall time budget in seconds.

    Returns:
        The received event that equals *expected_event*.
    """
    import time

    # Track wall-clock time instead of counting polls: a poll that returns an
    # unrelated event immediately must not eat into the timeout budget.
    deadline = time.monotonic() + timeout
    while True:
        e = cp.get_event(pd_address, timeout=0.5)
        if e and e['event'] == Event.Notification and e['type'] == expected_event['type']:
            if e == expected_event:
                return e
        if time.monotonic() >= deadline:
            break
    pytest.fail(f"Timeout waiting for notification event after {timeout}s")
def wait_for_non_notification_event(cp, pd_address, expected_event, timeout=5):
    """Poll *cp* until a non-notification event equal to *expected_event* arrives.

    Notification events, and non-notification events that differ from
    *expected_event*, are consumed and skipped. The test is failed if no
    matching event has been seen once *timeout* seconds have elapsed.

    Args:
        cp: Object exposing ``get_event(pd_address, timeout=...)``.
        pd_address: Address passed through to ``cp.get_event``.
        expected_event: Event mapping to wait for.
        timeout: Overall time budget in seconds.

    Returns:
        The received event that equals *expected_event*.
    """
    import time

    # Track wall-clock time instead of counting polls: a poll that returns an
    # unrelated event immediately must not eat into the timeout budget.
    deadline = time.monotonic() + timeout
    while True:
        e = cp.get_event(pd_address, timeout=0.5)
        if e and e['event'] != Event.Notification:
            if e == expected_event:
                return e
        if time.monotonic() >= deadline:
            break
    pytest.fail(f"Timeout waiting for non-notification event after {timeout}s")
class TestUtils:
    """Factory helpers for building started ControlPanel / PeripheralDevice objects."""

    # Keep pytest from collecting this helper as a test class despite its name.
    __test__ = False

    def __init__(self):
        """Create the helper with its own ``KeyStore`` instance."""
        self.ks = KeyStore()

    def create_cp(self, pd_info_list, sc_wait=False, online_wait=False):
        """Build and start a ``ControlPanel`` for *pd_info_list*.

        When *sc_wait* is true, wait up to 10 s for ``sc_wait_all``; otherwise,
        when *online_wait* is true, wait up to 10 s for ``online_wait_all``.
        If the chosen wait reports failure the panel is torn down and the test
        is failed. Returns the started panel.
        """
        panel = ControlPanel(pd_info_list, log_level=LogLevel.Debug)
        panel.start()

        # sc_wait takes precedence; online_wait is only consulted without it.
        if sc_wait:
            wait_fn = panel.sc_wait_all
            failure = "Failed to establish secure channel within timeout"
        elif online_wait:
            wait_fn = panel.online_wait_all
            failure = "Failed to bring all PDs online within timeout"
        else:
            return panel

        if not wait_fn(timeout=10):
            panel.teardown()
            pytest.fail(failure)
        return panel

    def create_pd(self, pd_info, capabilities=None):
        """Build and start a ``PeripheralDevice``.

        A fresh ``PDCapabilities()`` is used when *capabilities* is ``None``.
        Returns the started device.
        """
        caps = PDCapabilities() if capabilities is None else capabilities
        device = PeripheralDevice(pd_info, caps, log_level=LogLevel.Debug)
        device.start()
        return device
# Maps a pair name to the (FIFOChannel, FIFOChannel) tuple created by
# make_fifo_pair(); cleanup_fifo_pair() pops entries from it.
_FIFO_REGISTRY = {}
def make_fifo_pair(name):
    """Create two cross-connected FIFO channels and register them under *name*.

    The first channel reads ``/tmp/fifo-<name>.one`` and writes
    ``/tmp/fifo-<name>.two``; the second channel does the opposite. Returns
    the ``(first, second)`` tuple, which is also stored in ``_FIFO_REGISTRY``.
    """
    path_one = f'/tmp/fifo-{name}.one'
    path_two = f'/tmp/fifo-{name}.two'
    first_end = FIFOChannel(path_one, path_two)
    second_end = FIFOChannel(path_two, path_one)
    channels = (first_end, second_end)
    _FIFO_REGISTRY[name] = channels
    return channels
def cleanup_fifo_pair(name):
    """Close the channels registered under *name* and delete their FIFO files.

    The registry entry (if any) is removed and each channel's ``close`` is
    called when it has one. The two ``/tmp/fifo-<name>.*`` paths are then
    removed if they still exist, whether or not a pair was registered.
    """
    channels = _FIFO_REGISTRY.pop(name, None)
    for channel in channels or ():
        closer = getattr(channel, "close", None)
        if callable(closer):
            closer()
    for path in (f'/tmp/fifo-{name}.one', f'/tmp/fifo-{name}.two'):
        if os.path.exists(path):
            os.remove(path)
@pytest.fixture(scope='session')
def utils():
    """Session-scoped fixture returning a ``TestUtils`` instance."""
    shared_utils = TestUtils()
    return shared_utils