import time
import random
import pytest
from osdp import *
from conftest import make_fifo_pair, cleanup_fifo_pair
# Payload offered by the sending side: 4096 random byte values (0-255),
# generated once when this module is imported.
sender_data = [random.randrange(256) for _ in range(4096)]
def sender_open(file_id: int, file_size: int) -> int:
    """'open' callback of the sending side (see ``sender_fops``).

    Verifies that the request is for file id 13 and that the incoming
    ``file_size`` is 0, then returns 4096, which matches the length of
    ``sender_data`` (presumably the size of the file being offered --
    confirm against the library's file-ops contract).

    Raises:
        AssertionError: if ``file_id`` or ``file_size`` is unexpected.
    """
    assert file_id == 13
    assert file_size == 0
    return 4096
def sender_read(size: int, offset: int) -> bytes:
    """'read' callback of the sending side (see ``sender_fops``).

    Returns the bytes of ``sender_data`` starting at ``offset``; a request
    that would run past the 4096-byte end is truncated to what remains.

    Raises:
        AssertionError: if ``offset`` is not below 4096.
    """
    assert offset < 4096
    # Clamp the end of the window so the final chunk may be short.
    end = min(offset + size, 4096)
    return bytes(sender_data[offset:end])
def sender_write(data: bytes, offset: int) -> int:
    """'write' callback of the sending side; must never be invoked.

    The sender only serves reads in these tests, so any call here is a
    failure.

    Raises:
        AssertionError: always.
    """
    # Raise explicitly: a bare ``assert False`` is removed under
    # ``python -O`` and the callback would then silently return None.
    raise AssertionError("sender_write must not be called")
def sender_close(file_id: int):
    """'close' callback of the sending side (see ``sender_fops``).

    Only checks that the file being closed is the expected one.

    Raises:
        AssertionError: if ``file_id`` is not 13.
    """
    expected_file_id = 13
    assert file_id == expected_file_id
# File-operation callbacks for the sending side; the tests register this
# table with the control panel via cp.register_file_ops().
sender_fops = {
    'open': sender_open,
    'read': sender_read,
    'write': sender_write,
    'close': sender_close,
}
# Destination buffer filled in by receiver_write(): 4096 zeroed slots,
# compared against sender_data at the end of test_file_transfer.
receiver_data = [0] * 4096
def receiver_open(file_id: int, file_size: int) -> int:
    """'open' callback of the receiving side (see ``receiver_fops``).

    Checks that the incoming transfer is for file id 13 and announces a
    size of 4096, then returns 0.

    Raises:
        AssertionError: if ``file_id`` or ``file_size`` is unexpected.
    """
    expected_file_id, expected_size = 13, 4096
    assert file_id == expected_file_id
    assert file_size == expected_size
    return 0
def receiver_read(size: int, offset: int) -> bytes:
    """'read' callback of the receiving side; must never be invoked.

    The receiver only accepts writes in these tests, so any call here is
    a failure.

    Raises:
        AssertionError: always.
    """
    # Raise explicitly: a bare ``assert False`` is removed under
    # ``python -O`` and the callback would then silently return None.
    raise AssertionError("receiver_read must not be called")
def receiver_write(data: bytes, offset: int) -> int:
    """'write' callback of the receiving side (see ``receiver_fops``).

    Copies ``data`` into ``receiver_data`` at ``offset`` and returns the
    number of bytes stored.

    Raises:
        AssertionError: if the chunk would extend past 4096 bytes.
    """
    count = len(data)
    end = offset + count
    assert end <= 4096
    # Slice assignment mutates the module-level list in place, so the name
    # is never rebound and no ``global`` declaration is required.
    receiver_data[offset:end] = list(data)
    return count
def receiver_close(file_id: int):
    """'close' callback of the receiving side (see ``receiver_fops``).

    Only checks that the file being closed is the expected one.

    Raises:
        AssertionError: if ``file_id`` is not 13.
    """
    expected_file_id = 13
    assert file_id == expected_file_id
# File-operation callbacks for the receiving side; the tests register this
# table with the peripheral device via pd.register_file_ops().
receiver_fops = {
    'open': receiver_open,
    'read': receiver_read,
    'write': receiver_write,
    'close': receiver_close,
}
# Capability list handed to the PeripheralDevice below; left empty here.
pd_cap = PDCapabilities([])

# Two endpoints created by the conftest helper under the name "file":
# f1 is given to the PD side and f2 to the CP side (presumably the two ends
# of one channel -- confirm against conftest.make_fifo_pair). The same name
# is passed to cleanup_fifo_pair() in teardown_test().
f1, f2 = make_fifo_pair("file")

# A single generated key, passed as `scbk` to both PDInfo objects so that
# both sides are configured with the same value.
key = KeyStore.gen_key()

# Peripheral-device side. 101 is the same number the tests pass to the
# cp.* calls (presumably the PD address -- confirm against PDInfo).
pd = PeripheralDevice(
    PDInfo(101, f1, scbk=key, flags=[ LibFlag.EnforceSecure ]),
    pd_cap,
    log_level=LogLevel.Debug
)

# Control-panel side, configured with one PDInfo that mirrors the PD's
# number, key and EnforceSecure flag but uses the other endpoint (f2).
cp = ControlPanel([
        PDInfo(101, f2, scbk=key, flags=[ LibFlag.EnforceSecure ]),
    ],
    log_level=LogLevel.Debug
)
@pytest.fixture(scope='module', autouse=True)
def setup_test():
    """Module-scoped, autouse fixture wrapping every test in this file.

    Starts the PD and then the CP, calls ``cp.sc_wait_all()`` (presumably
    blocks until the secure channel is up -- confirm), hands control to the
    tests, and finally runs ``teardown_test()``.
    """
    # Start order is preserved: peripheral device first, control panel second.
    for device in (pd, cp):
        device.start()
    cp.sc_wait_all()
    yield
    teardown_test()
def teardown_test():
    """Tear down both devices and remove the "file" FIFO pair.

    Called by the ``setup_test`` fixture after the module's tests finish.
    """
    # Teardown order is preserved: control panel first, peripheral device second.
    for device in (cp, pd):
        device.teardown()
    cleanup_fifo_pair("file")
def test_file_transfer(utils):
    """Transfer file id 13 from CP to PD and check the received bytes.

    Registers the sender callbacks on the CP and the receiver callbacks on
    the PD, submits a FileTransfer command, then polls the CP's transfer
    status (at most 10 polls, 0.5 s apart) until the reported offset equals
    the reported size. Finally compares ``receiver_data`` with
    ``sender_data``. The ``utils`` fixture is requested but not used in
    this body.
    """
    assert cp.register_file_ops(101, sender_fops)
    assert pd.register_file_ops(receiver_fops)

    transfer_cmd = {
        'command': Command.FileTransfer,
        'id': 13,
        'flags': 0
    }
    assert cp.submit_command(101, transfer_cmd)
    assert pd.get_command() == transfer_cmd

    completed = False
    for _ in range(10):
        time.sleep(0.5)
        status = cp.get_file_tx_status(101)
        # Stop polling as soon as the status is missing or malformed.
        if not status or 'size' not in status or 'offset' not in status:
            break
        # A non-positive size also ends the polling without success.
        if status['size'] <= 0:
            break
        # Offset reaching size is the only successful exit.
        if status['size'] == status['offset']:
            completed = True
            break

    assert completed
    assert sender_data == receiver_data
def test_file_tx_abort(utils):
    """Start a transfer of file id 13, cancel it, and check it is gone.

    Registers the file callbacks on both sides, submits a FileTransfer
    command, waits 0.5 s, then submits the same command with the
    ``CommandFileTxFlags.Cancel`` flag. After a further 0.2 s both the CP
    and the PD must report no transfer status. The ``utils`` fixture is
    requested but not used in this body.
    """
    assert cp.register_file_ops(101, sender_fops)
    assert pd.register_file_ops(receiver_fops)

    file_tx_cmd = {
        'command': Command.FileTransfer,
        'id': 13,
        'flags': 0
    }
    assert cp.submit_command(101, file_tx_cmd)
    assert pd.get_command() == file_tx_cmd

    # Let the transfer run briefly before cancelling it.
    time.sleep(0.5)

    file_tx_abort = {
        'command': Command.FileTransfer,
        'id': 13,
        'flags': CommandFileTxFlags.Cancel
    }
    assert cp.submit_command(101, file_tx_abort)

    # Give both sides a moment to process the cancel.
    time.sleep(0.2)

    # Compare against the None singleton by identity, not equality.
    assert cp.get_file_tx_status(101) is None
    assert pd.get_file_tx_status() is None