#include "DeviceMonitor.hpp"
#include "protocol/Protocol.hpp"
#include "property/InternalProperty.hpp"
namespace libobsensor {
const uint16_t MAX_RECV_DATA_SIZE = 1024;
DeviceMonitor::DeviceMonitor(IDevice *owner, std::shared_ptr<ISourcePort> dataPort)
: DeviceComponentBase(owner),
cbIdCounter_(0),
heartbeatAndFetchStateThreadStarted_(false),
heartbeatEnabled_(false),
heartbeatPaused_(false),
hbRecvData_(MAX_RECV_DATA_SIZE),
hbSendData_(MAX_RECV_DATA_SIZE) {
vendorDataPort_ = std::dynamic_pointer_cast<IVendorDataPort>(dataPort);
if(!vendorDataPort_) {
throw std::runtime_error("DeviceMonitor: data port must be a vendor data port!");
}
auto activityRecorder = owner->getComponentT<IDeviceActivityRecorder>(OB_DEV_COMPONENT_DEVICE_ACTIVITY_RECORDER, false);
if(activityRecorder) {
activityRecorder_ = activityRecorder.get();
}
}
DeviceMonitor::~DeviceMonitor() noexcept {
if(heartbeatEnabled_ && !heartbeatPaused_) {
TRY_EXECUTE(disableHeartbeat());
}
if(heartbeatAndFetchStateThreadStarted_) {
TRY_EXECUTE(stop());
}
}
void DeviceMonitor::start() {
if(heartbeatAndFetchStateThreadStarted_) {
LOG_DEBUG("Heartbeat and fetch state thread already started!");
}
heartbeatAndFetchStateThreadStarted_ = true;
heartbeatAndFetchStateThread_ = std::thread([this]() {
const uint32_t HEARTBEAT_INTERVAL_MS = 3000;
while(heartbeatAndFetchStateThreadStarted_) {
std::unique_lock<std::mutex> lock(commMutex_);
heartbeatAndFetchStateThreadCv_.wait_for(lock, std::chrono::milliseconds(HEARTBEAT_INTERVAL_MS),
[this]() { return !heartbeatAndFetchStateThreadStarted_; });
if(!heartbeatAndFetchStateThreadStarted_) {
break;
}
heartbeatAndFetchState();
}
});
}
void DeviceMonitor::stop() {
if(!heartbeatAndFetchStateThreadStarted_) {
LOG_DEBUG("Heartbeat and fetch state thread not started!");
return;
}
heartbeatAndFetchStateThreadStarted_ = false;
heartbeatAndFetchStateThreadCv_.notify_one();
heartbeatAndFetchStateThread_.join();
LOG_DEBUG("Heartbeat and fetch state thread stopped!");
}
void DeviceMonitor::heartbeatAndFetchState() {
bool emitNextHeartBeatImmediately = false;
do {
auto req = protocol::initHeartbeatAndStateReq(hbSendData_.data());
uint16_t respDataSize = 1024; auto res = protocol::execute(vendorDataPort_, hbSendData_.data(), sizeof(req), hbRecvData_.data(), &respDataSize);
if(!protocol::checkStatus(res, false)) {
utils::sleepMs(50);
if(!heartbeatAndFetchStateThreadStarted_) {
break;
}
continue;
}
protocol::HeartbeatAndStateResp *resp;
BEGIN_TRY_EXECUTE({ resp = protocol::parseHeartbeatAndStateResp(hbRecvData_.data(), respDataSize); })
CATCH_EXCEPTION_AND_EXECUTE({ continue; })
emitNextHeartBeatImmediately = (bool)(resp->state >> 63);
devState_ = (OBDeviceState)(resp->state & 0x7FFFFFFFFFFFFFFF);
auto msgSize = resp->header.sizeInHalfWords * 2 - 10;
if(0 != devState_ || msgSize) {
auto msg = std::string(resp->message, msgSize);
LOG_INFO("Firmware State/Log ({0}):\n{1}", devState_, msg);
std::lock_guard<std::mutex> lock(stateChangedCallbacksMutex_);
for(auto &callback: stateChangedCallbacks_) {
callback.second(devState_, msg);
}
}
if(activityRecorder_) {
activityRecorder_->touch(DeviceActivity::Command);
}
} while(emitNextHeartBeatImmediately);
}
OBDeviceState DeviceMonitor::getCurrentDeviceState() const {
if(!heartbeatAndFetchStateThreadStarted_) {
LOG_WARN("Heartbeat and fetch state thread not started, the state may expired!");
}
return devState_;
}
int DeviceMonitor::registerStateChangedCallback(DeviceStateChangedCallback callback) {
std::lock_guard<std::mutex> lock(stateChangedCallbacksMutex_);
auto callbackId = cbIdCounter_++;
stateChangedCallbacks_[callbackId] = callback;
if(!heartbeatAndFetchStateThreadStarted_) {
start();
}
return callbackId;
}
void DeviceMonitor::unregisterStateChangedCallback(int callbackId) {
std::lock_guard<std::mutex> lock(stateChangedCallbacksMutex_);
stateChangedCallbacks_.erase(callbackId);
if(stateChangedCallbacks_.empty() && !heartbeatPaused_ && heartbeatAndFetchStateThreadStarted_) {
stop();
}
}
void DeviceMonitor::enableHeartbeat() {
if(heartbeatEnabled_) {
LOG_DEBUG("Heartbeat already enabled!");
return;
}
auto owner = getOwner();
auto propServer = owner->getComponentT<IPropertyServer>(OB_DEV_COMPONENT_PROPERTY_SERVER);
if(propServer->isPropertySupported(OB_PROP_DEVICE_LOG_SEVERITY_LEVEL_INT, PROP_OP_WRITE, PROP_ACCESS_INTERNAL)) {
propServer->setPropertyValueT<int32_t>(OB_PROP_DEVICE_LOG_SEVERITY_LEVEL_INT, OB_LOG_SEVERITY_DEBUG);
}
OBPropertyValue value;
value.intValue = 1;
auto propAccessor = owner->getComponentT<IBasicPropertyAccessor>(OB_DEV_COMPONENT_MAIN_PROPERTY_ACCESSOR);
propAccessor->setPropertyValue(OB_PROP_HEARTBEAT_BOOL, value);
heartbeatEnabled_ = true;
heartbeatPaused_ = false;
if(!heartbeatAndFetchStateThreadStarted_) {
start();
}
LOG_DEBUG("Heartbeat enabled!");
}
void DeviceMonitor::disableHeartbeat() {
if(!heartbeatEnabled_) {
LOG_DEBUG("Heartbeat already disabled!");
return;
}
auto owner = getOwner();
OBPropertyValue value;
value.intValue = 0;
auto propAccessor = owner->getComponentT<IBasicPropertyAccessor>(OB_DEV_COMPONENT_MAIN_PROPERTY_ACCESSOR);
propAccessor->setPropertyValue(OB_PROP_HEARTBEAT_BOOL, value);
heartbeatEnabled_ = false;
heartbeatPaused_ = false;
if(heartbeatAndFetchStateThreadStarted_) {
stop();
}
LOG_DEBUG("Heartbeat disabled!");
}
bool DeviceMonitor::isHeartbeatEnabled() const {
return heartbeatEnabled_;
}
void DeviceMonitor::pauseHeartbeat() {
if(heartbeatPaused_) {
LOG_DEBUG("Heartbeat already paused!");
return;
}
auto owner = getOwner();
OBPropertyValue value;
value.intValue = 0;
auto propAccessor = owner->getComponentT<IBasicPropertyAccessor>(OB_DEV_COMPONENT_MAIN_PROPERTY_ACCESSOR);
propAccessor->setPropertyValue(OB_PROP_HEARTBEAT_BOOL, value);
if(heartbeatAndFetchStateThreadStarted_) {
stop();
}
heartbeatPaused_ = true;
LOG_DEBUG("Heartbeat paused!");
}
void DeviceMonitor::resumeHeartbeat() {
if(!heartbeatPaused_) {
LOG_DEBUG("Heartbeat already resumed!");
return;
}
if(heartbeatEnabled_) {
auto owner = getOwner();
OBPropertyValue value;
value.intValue = 1;
auto propAccessor = owner->getComponentT<IBasicPropertyAccessor>(OB_DEV_COMPONENT_MAIN_PROPERTY_ACCESSOR);
propAccessor->setPropertyValue(OB_PROP_HEARTBEAT_BOOL, value);
}
if(heartbeatEnabled_ || !stateChangedCallbacks_.empty()) {
start();
}
heartbeatPaused_ = false;
LOG_DEBUG("Heartbeat resumed!");
}
void DeviceMonitor::sendAndReceiveData(const uint8_t *sendData, uint32_t sendDataSize, uint8_t *receiveData, uint32_t *receiveDataSize) {
std::lock_guard<std::mutex> lock(commMutex_);
auto recvLen = vendorDataPort_->sendAndReceive(sendData, sendDataSize, receiveData, *receiveDataSize);
*receiveDataSize = recvLen;
if(activityRecorder_) {
activityRecorder_->touch(DeviceActivity::Command);
}
}
}