#include <fcntl.h>
#include <osdp.h>
#include "test.h"
#define PD_STATE_DISABLED false
#define PD_STATE_ENABLED true
struct test_hotplug_ctx {
osdp_t *cp_ctx;
osdp_t *pd_ctx;
int cp_runner;
int pd_runner;
bool cmd_seen;
int last_cmd_id;
bool event_seen;
int last_event_type;
};
static struct test_hotplug_ctx g_test_ctx = {0};
int test_hotplug_event_callback(void *arg, int pd, struct osdp_event *ev)
{
ARG_UNUSED(pd);
struct test_hotplug_ctx *ctx = arg;
ctx->event_seen = true;
ctx->last_event_type = ev->type;
return 0;
}
int test_hotplug_command_callback(void *arg, struct osdp_cmd *cmd)
{
struct test_hotplug_ctx *ctx = arg;
ctx->cmd_seen = true;
ctx->last_cmd_id = cmd->id;
return 0;
}
static int setup_test_environment(struct test *t)
{
printf(SUB_1 "setting up OSDP devices\n");
if (test_setup_devices(t, &g_test_ctx.cp_ctx, &g_test_ctx.pd_ctx)) {
printf(SUB_1 "Failed to setup devices!\n");
return -1;
}
osdp_cp_set_event_callback(g_test_ctx.cp_ctx, test_hotplug_event_callback, &g_test_ctx);
osdp_pd_set_command_callback(g_test_ctx.pd_ctx, test_hotplug_command_callback, &g_test_ctx);
printf(SUB_1 "starting async runners\n");
g_test_ctx.cp_runner = async_runner_start(g_test_ctx.cp_ctx, osdp_cp_refresh);
g_test_ctx.pd_runner = async_runner_start(g_test_ctx.pd_ctx, osdp_pd_refresh);
if (g_test_ctx.cp_runner < 0 || g_test_ctx.pd_runner < 0) {
printf(SUB_1 "Failed to created CP/PD runners\n");
return -1;
}
int rc = 0;
uint8_t status = 0;
while (1) {
if (rc > 10) {
printf(SUB_1 "PD failed to come online\n");
return -1;
}
osdp_get_status_mask(g_test_ctx.cp_ctx, &status);
if (status & 1)
break;
usleep(1000 * 1000);
rc++;
}
return 0;
}
static void teardown_test_environment()
{
printf(SUB_1 "tearing down test environment\n");
async_runner_stop(g_test_ctx.cp_runner);
async_runner_stop(g_test_ctx.pd_runner);
osdp_cp_teardown(g_test_ctx.cp_ctx);
osdp_pd_teardown(g_test_ctx.pd_ctx);
memset(&g_test_ctx, 0, sizeof(g_test_ctx));
}
static void reset_test_state()
{
g_test_ctx.cmd_seen = false;
g_test_ctx.last_cmd_id = 0;
g_test_ctx.event_seen = false;
g_test_ctx.last_event_type = 0;
}
static bool wait_for_command(int expected_cmd_id, int timeout_sec)
{
int rc = 0;
while (rc < timeout_sec) {
if (g_test_ctx.cmd_seen && g_test_ctx.last_cmd_id == expected_cmd_id) {
return true;
}
usleep(1000 * 1000);
rc++;
}
return false;
}
static bool wait_for_pd_state(bool expected_state, int timeout_sec)
{
int rc = 0;
while (rc < timeout_sec) {
bool current_state = osdp_cp_is_pd_enabled(g_test_ctx.cp_ctx, 0);
if (current_state == expected_state) {
return true;
}
usleep(100 * 1000);
rc++;
}
return false;
}
static bool wait_for_pd_online(int timeout_sec)
{
int rc = 0;
while (rc < timeout_sec) {
bool enabled = osdp_cp_is_pd_enabled(g_test_ctx.cp_ctx, 0);
uint8_t status_mask = 0;
osdp_get_status_mask(g_test_ctx.cp_ctx, &status_mask);
bool online = (status_mask & 0x01) != 0;
if (enabled && online) {
return true;
}
usleep(100 * 1000);
rc++;
}
return false;
}
static bool test_pd_disable_enable_basic()
{
printf(SUB_2 "testing basic PD disable/enable\n");
reset_test_state();
bool enabled = osdp_cp_is_pd_enabled(g_test_ctx.cp_ctx, 0);
if (enabled != PD_STATE_ENABLED) {
printf(SUB_2 "PD should be enabled initially, got %s\n", enabled ? "true" : "false");
return false;
}
uint8_t status_mask = 0;
osdp_get_status_mask(g_test_ctx.cp_ctx, &status_mask);
printf(SUB_2 "Initial status mask: 0x%02X (PD may still be initializing)\n", status_mask);
if (osdp_cp_disable_pd(g_test_ctx.cp_ctx, 0) != 0) {
printf(SUB_2 "Failed to disable PD\n");
return false;
}
if (!wait_for_pd_state(PD_STATE_DISABLED, 3)) {
printf(SUB_2 "PD didn't reach disabled state within timeout\n");
return false;
}
osdp_get_status_mask(g_test_ctx.cp_ctx, &status_mask);
if (status_mask & 0x01) {
printf(SUB_2 "Disabled PD should appear offline in status mask\n");
return false;
}
if (osdp_cp_enable_pd(g_test_ctx.cp_ctx, 0) != 0) {
printf(SUB_2 "Failed to enable PD\n");
return false;
}
if (!wait_for_pd_state(PD_STATE_ENABLED, 3)) {
printf(SUB_2 "PD didn't reach enabled state within timeout\n");
return false;
}
return true;
}
static bool test_pd_command_blocking()
{
printf(SUB_2 "testing command blocking on disabled PD\n");
reset_test_state();
struct osdp_cmd cmd = {
.id = OSDP_CMD_BUZZER,
.buzzer = {
.control_code = 1,
.on_count = 10,
.off_count = 10,
.rep_count = 1,
},
};
int ret = osdp_cp_submit_command(g_test_ctx.cp_ctx, 0, &cmd);
if (ret == 0) {
printf(SUB_2 "Command submission succeeded on enabled PD\n");
if (wait_for_command(OSDP_CMD_BUZZER, 3)) {
printf(SUB_2 "Command received by PD\n");
} else {
printf(SUB_2 "Command not received (PD may not be fully online yet)\n");
}
} else {
printf(SUB_2 "Command submission failed on enabled PD (PD not online yet)\n");
}
if (osdp_cp_disable_pd(g_test_ctx.cp_ctx, 0) != 0) {
printf(SUB_2 "Failed to disable PD\n");
return false;
}
reset_test_state();
ret = osdp_cp_submit_command(g_test_ctx.cp_ctx, 0, &cmd);
if (ret == 0) {
printf(SUB_2 "Command should fail on disabled PD\n");
return false;
}
if (osdp_cp_enable_pd(g_test_ctx.cp_ctx, 0) != 0) {
printf(SUB_2 "Warning: enable returned error (might already be enabled)\n");
}
if (wait_for_pd_online(5)) {
ret = osdp_cp_submit_command(g_test_ctx.cp_ctx, 0, &cmd);
printf(SUB_2 "Command on re-enabled PD: %s\n",
ret == 0 ? "SUCCESS" : "FAILED");
} else {
printf(SUB_2 "PD didn't come online within timeout, skipping command test\n");
}
return true;
}
static bool test_pd_edge_cases()
{
printf(SUB_2 "testing edge cases\n");
osdp_cp_enable_pd(g_test_ctx.cp_ctx, 0);
if (!wait_for_pd_online(5)) {
printf(SUB_2 "Failed to get PD online for edge test, proceeding anyway\n");
}
if (osdp_cp_disable_pd(g_test_ctx.cp_ctx, 0) != 0) {
printf(SUB_2 "First disable failed\n");
return false;
}
if (!wait_for_pd_state(PD_STATE_DISABLED, 3)) {
printf(SUB_2 "PD didn't reach disabled state within timeout\n");
return false;
}
if (osdp_cp_disable_pd(g_test_ctx.cp_ctx, 0) == 0) {
printf(SUB_2 "Disabling already disabled PD should return error\n");
return false;
}
if (osdp_cp_enable_pd(g_test_ctx.cp_ctx, 0) != 0) {
printf(SUB_2 "Enable failed\n");
return false;
}
if (!wait_for_pd_state(PD_STATE_ENABLED, 3)) {
printf(SUB_2 "PD didn't reach enabled state within timeout\n");
return false;
}
if (osdp_cp_enable_pd(g_test_ctx.cp_ctx, 0) == 0) {
printf(SUB_2 "Enabling already enabled PD should return error\n");
return false;
}
if (osdp_cp_disable_pd(g_test_ctx.cp_ctx, 99) == 0) {
printf(SUB_2 "Invalid PD index should fail\n");
return false;
}
if (osdp_cp_enable_pd(g_test_ctx.cp_ctx, 99) == 0) {
printf(SUB_2 "Invalid PD index should fail\n");
return false;
}
bool enabled = osdp_cp_is_pd_enabled(g_test_ctx.cp_ctx, 99);
if (enabled != true) {
printf(SUB_2 "Invalid PD index returns -1 (converted to true), got %s\n", enabled ? "true" : "false");
return false;
}
return true;
}
static bool test_multiple_pd_hotplug()
{
printf(SUB_2 "testing multiple PD hot-plug simulation\n");
osdp_cp_enable_pd(g_test_ctx.cp_ctx, 0);
if (!wait_for_pd_online(5)) {
printf(SUB_2 "Failed to get PD online for hotplug test, proceeding anyway\n");
}
uint8_t status_mask = 0;
osdp_get_status_mask(g_test_ctx.cp_ctx, &status_mask);
printf(SUB_2 "Initial status mask: 0x%02X\n", status_mask);
printf(SUB_2 "Simulating PD 0 unplug...\n");
if (osdp_cp_disable_pd(g_test_ctx.cp_ctx, 0) != 0) {
printf(SUB_2 "Failed to disable PD 0\n");
return false;
}
osdp_get_status_mask(g_test_ctx.cp_ctx, &status_mask);
printf(SUB_2 "After PD 0 unplug: 0x%02X\n", status_mask);
usleep(500 * 1000);
printf(SUB_2 "Simulating PD 0 plug-in...\n");
if (osdp_cp_enable_pd(g_test_ctx.cp_ctx, 0) != 0) {
printf(SUB_2 "Failed to enable PD 0\n");
return false;
}
usleep(1000 * 1000);
osdp_get_status_mask(g_test_ctx.cp_ctx, &status_mask);
printf(SUB_2 "After PD 0 plug-in: 0x%02X (may still be initializing)\n", status_mask);
return true;
}
static bool test_dynamic_pd_management()
{
printf(SUB_2 "testing dynamic PD management scenarios\n");
osdp_cp_enable_pd(g_test_ctx.cp_ctx, 0);
if (!osdp_cp_is_pd_enabled(g_test_ctx.cp_ctx, 0)) {
printf(SUB_2 "PD should be enabled but isn't\n");
return false;
}
if (!wait_for_pd_online(5)) {
printf(SUB_2 "PD didn't come online, proceeding with test anyway\n");
}
printf(SUB_2 "PD management scenario: disable -> attempt commands -> enable\n");
if (!osdp_cp_is_pd_enabled(g_test_ctx.cp_ctx, 0)) {
printf(SUB_2 "PD was disabled from previous test, enabling first\n");
osdp_cp_enable_pd(g_test_ctx.cp_ctx, 0);
if (!wait_for_pd_state(PD_STATE_ENABLED, 3)) {
printf(SUB_2 "Failed to enable PD for management test\n");
return false;
}
}
if (osdp_cp_disable_pd(g_test_ctx.cp_ctx, 0) != 0) {
printf(SUB_2 "Failed to disable PD\n");
return false;
}
if (!wait_for_pd_state(PD_STATE_DISABLED, 3)) {
printf(SUB_2 "PD didn't reach disabled state within timeout\n");
return false;
}
struct osdp_cmd cmd1 = { .id = OSDP_CMD_BUZZER, .buzzer = { .control_code = 1 } };
struct osdp_cmd cmd2 = { .id = OSDP_CMD_LED, .led = { .led_number = 0 } };
int ret1 = osdp_cp_submit_command(g_test_ctx.cp_ctx, 0, &cmd1);
int ret2 = osdp_cp_submit_command(g_test_ctx.cp_ctx, 0, &cmd2);
printf(SUB_2 "Commands on disabled PD: buzzer=%s, led=%s (both should fail)\n",
ret1 == 0 ? "SUCCESS" : "FAILED", ret2 == 0 ? "SUCCESS" : "FAILED");
if (ret1 == 0 || ret2 == 0) {
printf(SUB_2 "Commands should fail on disabled PD\n");
return false;
}
if (osdp_cp_enable_pd(g_test_ctx.cp_ctx, 0) != 0) {
printf(SUB_2 "Failed to re-enable PD\n");
return false;
}
printf(SUB_2 "PD re-enabled - will restart initialization sequence\n");
usleep(2000 * 1000);
bool enabled = osdp_cp_is_pd_enabled(g_test_ctx.cp_ctx, 0);
uint8_t status_mask = 0;
osdp_get_status_mask(g_test_ctx.cp_ctx, &status_mask);
printf(SUB_2 "Final state: enabled=%s, status_mask=0x%02X\n",
enabled == PD_STATE_ENABLED ? "YES" : "NO", status_mask);
return enabled == PD_STATE_ENABLED;
}
void run_hotplug_tests(struct test *t)
{
bool overall_result = true;
printf("\nBegin Hot-Plug Tests\n");
if (setup_test_environment(t) != 0) {
printf(SUB_1 "Failed to setup test environment\n");
TEST_REPORT(t, false);
return;
}
printf(SUB_1 "running hot-plug tests\n");
overall_result &= test_pd_disable_enable_basic();
overall_result &= test_pd_command_blocking();
overall_result &= test_pd_edge_cases();
overall_result &= test_multiple_pd_hotplug();
overall_result &= test_dynamic_pd_management();
teardown_test_environment();
printf(SUB_1 "Hot-plug tests %s\n", overall_result ? "succeeded" : "failed");
TEST_REPORT(t, overall_result);
}