#include <iostream>
#include <boost/bind.hpp>
#include "cast.h"
#include "inlet_connection.h"
#include "api_config.h"
using namespace lsl;
using namespace lslboost::asio;
inlet_connection::inlet_connection(const stream_info_impl &info, bool recover):
type_info_(info), host_info_(info), tcp_protocol_(tcp::v4()), udp_protocol_(udp::v4()),
recovery_enabled_(recover), lost_(false), shutdown_(false),
last_receive_time_(lsl_clock()), active_transmissions_(0) {
if (!host_info_.v4address().empty() || !host_info_.v6address().empty()) {
if (type_info_.version()/100 > api_config::get_instance()->use_protocol_version()/100)
throw std::runtime_error((std::string("The received stream (")+=host_info_.name()) += ") uses a newer protocol version than this inlet. Please update.");
if (api_config::get_instance()->allow_ipv6()) {
if (host_info_.v4address().empty() || !host_info_.v4data_port() || !host_info_.v4service_port()) {
tcp_protocol_ = tcp::v6();
udp_protocol_ = udp::v6();
} else {
tcp_protocol_ = tcp::v4();
udp_protocol_ = udp::v4();
}
} else {
tcp_protocol_ = api_config::get_instance()->allow_ipv4() ? tcp::v4() : tcp::v6();
udp_protocol_ = api_config::get_instance()->allow_ipv4() ? udp::v4() : udp::v6();
}
if (recovery_enabled_ && type_info_.source_id().empty()) {
std::clog << "Note: The stream named '" << host_info_.name() << "' could not be recovered automatically if its provider crashed because it does not specify a unique data source ID." << std::endl;
recovery_enabled_ = false;
}
} else {
if (type_info_.name().empty() && type_info_.type().empty() && type_info_.source_id().empty())
throw std::invalid_argument("When creating an inlet with a constructed (instead of resolved) stream_info, you must assign at least the name, type or source_id of the desired stream.");
if (type_info_.channel_count() == 0)
throw std::invalid_argument("When creating an inlet with a constructed (instead of resolved) stream_info, you must assign a nonzero channel count.");
if (type_info_.channel_format() == cft_undefined)
throw std::invalid_argument("When creating an inlet with a constructed (instead of resolved) stream_info, you must assign a channel format.");
tcp_protocol_ = api_config::get_instance()->allow_ipv4() ? tcp::v4() : tcp::v6();
udp_protocol_ = api_config::get_instance()->allow_ipv4() ? udp::v4() : udp::v6();
host_info_.v4address("127.0.0.1");
host_info_.v6address("::1");
host_info_.v4data_port(49999);
host_info_.v4service_port(49999);
host_info_.v6data_port(49999);
host_info_.v6service_port(49999);
recovery_enabled_ = true;
}
}
void inlet_connection::engage() {
if (recovery_enabled_)
watchdog_thread_ = lslboost::thread(&inlet_connection::watchdog_thread,this);
}
void inlet_connection::disengage() {
{
lslboost::lock_guard<lslboost::mutex> lock(shutdown_mut_);
shutdown_ = true;
}
shutdown_cond_.notify_all();
resolver_.cancel();
cancel_and_shutdown();
if (recovery_enabled_)
watchdog_thread_.join();
}
tcp::endpoint inlet_connection::get_tcp_endpoint() {
lslboost::shared_lock<lslboost::shared_mutex> lock(host_info_mut_);
if(tcp_protocol_ == tcp::v4()) {
std::string address = host_info_.v4address();
uint16_t port = host_info_.v4data_port();
return tcp::endpoint(ip::make_address(address), port);
} else {
std::string address = host_info_.v6address();
std::string port = to_string(host_info_.v6data_port());
io_context io;
ip::tcp::resolver resolver(io);
ip::tcp::resolver::results_type res = resolver.resolve(address, port);
if(res.size() == 0) {
throw lost_error("Unable to resolve tcp stream at address: " + address + ", port: " + port);
}
return *res.begin();
}
}
udp::endpoint inlet_connection::get_udp_endpoint() {
lslboost::shared_lock<lslboost::shared_mutex> lock(host_info_mut_);
if(udp_protocol_ == udp::v4()) {
std::string address = host_info_.v4address();
uint16_t port = host_info_.v4service_port();
return udp::endpoint(ip::make_address(address), port);
} else {
std::string address = host_info_.v6address();
std::string port = to_string(host_info_.v6service_port());
io_context io;
ip::udp::resolver resolver(io);
ip::udp::resolver::results_type res = resolver.resolve(address, port);
if(res.size() == 0) {
throw lost_error("Unable to resolve udp stream at address: " + address + ", port: " + port);
}
return *res.begin();
}
}
std::string inlet_connection::get_hostname() {
lslboost::shared_lock<lslboost::shared_mutex> lock(host_info_mut_);
return host_info_.hostname();
}
std::string inlet_connection::current_uid() {
lslboost::shared_lock<lslboost::shared_mutex> lock(host_info_mut_);
return host_info_.uid();
}
double inlet_connection::current_srate() {
lslboost::shared_lock<lslboost::shared_mutex> lock(host_info_mut_);
return host_info_.nominal_srate();
}
void inlet_connection::try_recover() {
if (recovery_enabled_) {
try {
lslboost::lock_guard<lslboost::mutex> lock(recovery_mut_);
std::ostringstream query;
{
lslboost::shared_lock<lslboost::shared_mutex> lock(host_info_mut_);
const char *channel_format_strings[] = {"undefined","float32","double64","string","int32","int16","int8","int64"};
query << "channel_count='" << host_info_.channel_count() << "'";
if (!host_info_.name().empty())
query << " and name='" << host_info_.name() << "'";
if (!host_info_.type().empty())
query << " and type='" << host_info_.type() << "'";
if (!host_info_.source_id().empty())
query << " and source_id='" << host_info_.source_id() << "'";
query << " and channel_format='" << channel_format_strings[host_info_.channel_format()] << "'";
}
for (int attempt=0;;attempt++) {
std::vector<stream_info_impl> infos = resolver_.resolve_oneshot(query.str(),1,FOREVER,attempt==0 ? 1.0 : 5.0);
if (!infos.empty()) {
lslboost::unique_lock<lslboost::shared_mutex> lock(host_info_mut_);
for (std::size_t k=0;k<infos.size();k++)
if (infos[k].uid() == host_info_.uid())
return; if (infos.size() == 1) {
host_info_ = infos[0];
cancel_all_registered();
lslboost::lock_guard<lslboost::mutex> lock(onrecover_mut_);
for(std::map<void*,lslboost::function<void()> >::iterator i=onrecover_.begin(),e=onrecover_.end();i!=e;i++)
(i->second)();
} else {
std::clog << "Found multiple streams with name='" << host_info_.name() << "' and source_id='" << host_info_.source_id() << "'. Cannot recover unless all but one are closed." << std::endl;
continue;
}
} else {
}
break;
}
} catch(std::exception &e) {
std::cerr << "A recovery attempt encountered an unexpected error: " << e.what() << std::endl;
}
}
}
void inlet_connection::watchdog_thread() {
while(!lost_ && !shutdown_) {
try {
{
lslboost::unique_lock<lslboost::mutex> lock(client_status_mut_);
if ((active_transmissions_ > 0) && (lsl_clock() - last_receive_time_ > api_config::get_instance()->watchdog_time_threshold())) {
lock.unlock();
try_recover();
}
}
{
lslboost::unique_lock<lslboost::mutex> lock(shutdown_mut_);
shutdown_cond_.wait_for(lock,lslboost::chrono::duration<double>(api_config::get_instance()->watchdog_check_interval()), lslboost::bind(&inlet_connection::shutdown,this));
}
} catch(std::exception &e) {
std::cerr << "Unexpected hiccup in the watchdog thread: " << e.what() << std::endl;
}
}
}
void inlet_connection::try_recover_from_error() {
if (!shutdown_) {
if (!recovery_enabled_) {
lost_ = true;
try {
lslboost::lock_guard<lslboost::mutex> lock(client_status_mut_);
for(std::map<void*,lslboost::condition_variable*>::iterator i=onlost_.begin(),e=onlost_.end();i!=e;i++)
i->second->notify_all();
} catch(std::exception &e) {
std::cerr << "Unexpected problem while trying to issue a connection loss notification: " << e.what() << std::endl;
}
throw lost_error("The stream read by this inlet has been lost. To recover, you need to re-resolve the source and re-create the inlet.");
} else
try_recover();
}
}
void inlet_connection::acquire_watchdog() {
lslboost::lock_guard<lslboost::mutex> lock(client_status_mut_);
active_transmissions_++;
}
void inlet_connection::release_watchdog() {
lslboost::lock_guard<lslboost::mutex> lock(client_status_mut_);
active_transmissions_--;
}
void inlet_connection::update_receive_time(double t) {
lslboost::lock_guard<lslboost::mutex> lock(client_status_mut_);
last_receive_time_ = t;
}
void inlet_connection::register_onlost(void *id, lslboost::condition_variable *cond) {
lslboost::lock_guard<lslboost::mutex> lock(client_status_mut_);
onlost_[id] = cond;
}
void inlet_connection::unregister_onlost(void *id) {
lslboost::lock_guard<lslboost::mutex> lock(client_status_mut_);
onlost_.erase(id);
}
void inlet_connection::register_onrecover(void *id, const lslboost::function<void()> &func) {
lslboost::lock_guard<lslboost::mutex> lock(onrecover_mut_);
onrecover_[id] = func;
}
void inlet_connection::unregister_onrecover(void *id) {
lslboost::lock_guard<lslboost::mutex> lock(onrecover_mut_);
onrecover_.erase(id);
}