#include <iostream>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread_only.hpp>
#include "api_config.h"
#include "cast.h"
#include "resolve_attempt_udp.h"
#include "resolver_impl.h"
#include "socket_utils.h"
using namespace lsl;
using namespace lslboost::asio;
resolver_impl::resolver_impl(): cfg_(api_config::get_instance()), cancelled_(false), expired_(false), forget_after_(FOREVER), fast_mode_(true),
io_(io_context_p(new io_context())), resolve_timeout_expired_(*io_), wave_timer_(*io_), unicast_timer_(*io_)
{
std::vector<std::string> mcast_addrs = cfg_->multicast_addresses();
uint16_t mcast_port = cfg_->multicast_port();
for (std::size_t k=0;k<mcast_addrs.size();k++) {
try {
mcast_endpoints_.push_back(udp::endpoint(ip::make_address(mcast_addrs[k]),(uint16_t)mcast_port));
}
catch(std::exception &) { }
}
std::vector<std::string> peers = cfg_->known_peers();
udp::resolver udp_resolver(*io_);
for (std::size_t k=0;k<peers.size();k++) {
try {
udp::resolver::results_type res = udp_resolver.resolve(peers[k], to_string(cfg_->base_port()));
for (udp::resolver::results_type::iterator i=res.begin(); i != res.end(); i++) {
for (int p=cfg_->base_port(); p<cfg_->base_port()+cfg_->port_range(); p++)
ucast_endpoints_.push_back(udp::endpoint(i->endpoint().address(),p));
}
} catch(std::exception &) { }
}
if (cfg_->allow_ipv6()) {
udp_protocols_.push_back(udp::v6());
tcp_protocols_.push_back(tcp::v6());
}
if (cfg_->allow_ipv4()) {
udp_protocols_.push_back(udp::v4());
tcp_protocols_.push_back(tcp::v4());
}
}
std::vector<stream_info_impl> resolver_impl::resolve_oneshot(const std::string &query, int minimum, double timeout, double minimum_time) {
io_->restart();
query_ = query;
minimum_ = minimum;
wait_until_ = lsl_clock() + minimum_time;
results_.clear();
forget_after_ = FOREVER;
fast_mode_ = true;
expired_ = false;
if (timeout != FOREVER) {
resolve_timeout_expired_.expires_after(timeout_sec(timeout));
resolve_timeout_expired_.async_wait(lslboost::bind(&resolver_impl::resolve_timeout_expired,this,placeholders::error));
}
next_resolve_wave();
if (!cancelled_) {
io_->run();
std::vector<stream_info_impl> output;
for(result_container::iterator i=results_.begin(); i!= results_.end();i++)
output.push_back(i->second.first);
return output;
} else
return std::vector<stream_info_impl>();
}
void resolver_impl::resolve_continuous(const std::string &query, double forget_after) {
io_->restart();
query_ = query;
minimum_ = 0;
wait_until_ = 0;
results_.clear();
forget_after_ = forget_after;
fast_mode_ = false;
expired_ = false;
next_resolve_wave();
background_io_.reset(new lslboost::thread(lslboost::bind(&io_context::run,io_)));
}
std::vector<stream_info_impl> resolver_impl::results() {
std::vector<stream_info_impl> output;
lslboost::lock_guard<lslboost::mutex> lock(results_mut_);
double expired_before = lsl_clock() - forget_after_;
for(result_container::iterator i=results_.begin(); i!=results_.end();) {
if (i->second.second < expired_before) {
result_container::iterator tmp = i++;
results_.erase(tmp);
} else {
output.push_back(i->second.first);
i++;
}
}
return output;
}
void resolver_impl::next_resolve_wave() {
std::size_t num_results = 0;
{
lslboost::lock_guard<lslboost::mutex> lock(results_mut_);
num_results = results_.size();
}
if (cancelled_ || expired_ || (minimum_ && (num_results >= (std::size_t)minimum_) && lsl_clock() >= wait_until_)) {
cancel_ongoing_resolve();
} else {
udp_multicast_burst();
if (!ucast_endpoints_.empty()) {
unicast_timer_.expires_after(timeout_sec(cfg_->multicast_min_rtt()));
unicast_timer_.async_wait(lslboost::bind(&resolver_impl::udp_unicast_burst,this,placeholders::error));
wave_timer_.expires_after(timeout_sec((fast_mode_?0:cfg_->continuous_resolve_interval())+(cfg_->multicast_min_rtt()+cfg_->unicast_min_rtt())));
wave_timer_.async_wait(lslboost::bind(&resolver_impl::wave_timeout_expired,this,placeholders::error));
} else {
wave_timer_.expires_after(timeout_sec((fast_mode_?0:cfg_->continuous_resolve_interval())+cfg_->multicast_min_rtt()));
wave_timer_.async_wait(lslboost::bind(&resolver_impl::wave_timeout_expired,this,placeholders::error));
}
}
}
void resolver_impl::udp_multicast_burst() {
for (std::size_t k=0,failures=0;k<udp_protocols_.size();k++) {
try {
resolve_attempt_udp_p attempt(new resolve_attempt_udp(*io_,udp_protocols_[k],mcast_endpoints_,query_,results_,results_mut_,cfg_->multicast_max_rtt(),this));
attempt->begin();
} catch(std::exception &e) {
if (++failures == udp_protocols_.size())
std::cerr << "Could not start a multicast resolve attempt for any of the allowed protocol stacks: " << e.what() << std::endl;
}
}
}
void resolver_impl::udp_unicast_burst(error_code err) {
if (err != error::operation_aborted) {
for (std::size_t k=0,failures=0;k<udp_protocols_.size();k++) {
try {
resolve_attempt_udp_p attempt(new resolve_attempt_udp(*io_,udp_protocols_[k],ucast_endpoints_,query_,results_,results_mut_,cfg_->unicast_max_rtt(),this));
attempt->begin();
} catch(std::exception &e) {
if (++failures == udp_protocols_.size())
std::cerr << "Could not start a unicast resolve attempt for any of the allowed protocol stacks: " << e.what() << std::endl;
}
}
}
}
void resolver_impl::resolve_timeout_expired(error_code err) {
if (err != error::operation_aborted)
cancel_ongoing_resolve();
}
void resolver_impl::wave_timeout_expired(error_code err) {
if (err != error::operation_aborted)
next_resolve_wave();
}
void resolver_impl::cancel() {
cancelled_ = true;
cancel_ongoing_resolve();
}
void resolver_impl::cancel_ongoing_resolve() {
expired_ = true;
post(*io_, lslboost::bind(&steady_timer::cancel, &wave_timer_));
post(*io_, lslboost::bind(&steady_timer::cancel, &unicast_timer_));
post(*io_, lslboost::bind(&steady_timer::cancel, &resolve_timeout_expired_));
cancel_all_registered();
}
resolver_impl::~resolver_impl() {
try {
if (background_io_) {
cancel();
background_io_->join();
}
}
catch(std::exception &e) {
std::cerr << "Error during destruction of a resolver_impl: " << e.what() << std::endl;
}
catch(...) {
std::cerr << "Severe error during destruction of a resolver_impl." << std::endl;
}
}