#include <iostream>
#include <boost/asio/ip/multicast.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
#include <boost/algorithm/string/trim.hpp>
#include "api_config.h"
#include "cast.h"
#include "resolve_attempt_udp.h"
#include "resolver_impl.h"
#include "socket_utils.h"
using namespace lsl;
using namespace lslboost::asio;
/**
 * Prepare a single UDP resolve attempt.
 *
 * Opens the receive socket and the three send sockets (unicast, broadcast, multicast),
 * precomputes the query id and the query message, and optionally registers this object
 * with a cancellation registry. Nothing is sent or received until begin() is called.
 *
 * @param io           io_context used for all sockets and the cancel timer.
 * @param protocol     UDP protocol with which every socket is opened.
 * @param targets      Endpoints to which the query will be sent.
 * @param query        Query string; it is embedded in the message and hashed into the query id.
 * @param results      Container that receives the results (updated in handle_receive_outcome).
 * @param results_mut  Mutex that is locked while results is modified.
 * @param cancel_after Time after which the attempt cancels itself, unless it equals FOREVER.
 * @param registry     Optional registry to register at; may be null.
 */
resolve_attempt_udp::resolve_attempt_udp(io_context &io, const udp &protocol,
	const std::vector<udp::endpoint> &targets, const std::string &query, result_container &results,
	lslboost::mutex &results_mut, double cancel_after, cancellable_registry *registry)
	: io_(io), results_(results), results_mut_(results_mut), cancel_after_(cancel_after),
	  cancelled_(false), targets_(targets), query_(query), unicast_socket_(io),
	  broadcast_socket_(io), multicast_socket_(io), recv_socket_(io), cancel_timer_(io) {
	// Receive socket: a failure to bind within the configured range is reported, not fatal.
	recv_socket_.open(protocol);
	try {
		bind_port_in_range(recv_socket_, protocol);
	} catch (const std::exception &bind_error) {
		std::cerr << "Could not bind to a port in the configured port range; using a randomly assigned one: " << bind_error.what() << std::endl;
	}

	// Unicast socket: failures here are not caught and propagate to the caller.
	unicast_socket_.open(protocol);

	// Broadcast socket: best effort.
	try {
		broadcast_socket_.open(protocol);
		broadcast_socket_.set_option(socket_base::broadcast(true));
	} catch (const std::exception &broadcast_error) {
		std::cerr << "Cannot open UDP broadcast socket for resolves: " << broadcast_error.what() << std::endl;
	}

	// Multicast socket: best effort; the hop limit comes from the API configuration.
	try {
		multicast_socket_.open(protocol);
		multicast_socket_.set_option(ip::multicast::hops(api_config::get_instance()->multicast_ttl()));
	} catch (const std::exception &multicast_error) {
		std::cerr << "Cannot open UDP multicast socket for resolves: " << multicast_error.what() << std::endl;
	}

	// The query id is the hash of the query string, rendered as a string.
	query_id_ = to_string(lslboost::hash<std::string>()(query));

	// Query message layout: header line, query line, then "<receive port> <query id>".
	std::ostringstream message;
	message.precision(16);
	message << "LSL:shortinfo\r\n" << query_ << "\r\n";
	message << recv_socket_.local_endpoint().port() << " " << query_id_ << "\r\n";
	query_msg_ = message.str();

	if (registry) {
		register_at(registry);
	}
}
/// Destructor: drops every registration of this object (presumably those made through
/// register_at() in the constructor -- confirm against the declaration of unregister_from_all()).
resolve_attempt_udp::~resolve_attempt_udp() {
	this->unregister_from_all();
}
void resolve_attempt_udp::begin() {
receive_next_result();
send_next_query(targets_.begin());
if (cancel_after_ != FOREVER) {
cancel_timer_.expires_after(timeout_sec(cancel_after_));
cancel_timer_.async_wait(lslboost::bind(&resolve_attempt_udp::handle_timeout,shared_from_this(),placeholders::error));
}
}
/// Request cancellation. The actual work (do_cancel) is not performed inline but posted
/// to io_, so it runs wherever that io_context's handlers are executed.
void resolve_attempt_udp::cancel() {
	post(
		io_,
		lslboost::bind(&resolve_attempt_udp::do_cancel, shared_from_this()));
}
void resolve_attempt_udp::receive_next_result() {
recv_socket_.async_receive_from(buffer(resultbuf_),remote_endpoint_,
lslboost::bind(&resolve_attempt_udp::handle_receive_outcome,shared_from_this(),placeholders::error,placeholders::bytes_transferred));
}
/**
 * Completion handler for the receive armed by receive_next_result().
 *
 * If the attempt is still active, a successfully received datagram is parsed: its first
 * line must equal query_id_, the remainder is handed to stream_info_impl::from_shortinfo_message.
 * The parsed info is merged into results_ under results_mut_, and the next receive is armed.
 *
 * @param err Outcome of the receive operation.
 * @param len Number of bytes that were written into resultbuf_.
 */
void resolve_attempt_udp::handle_receive_outcome(error_code err, std::size_t len) {
	// The receive chain ends once we are cancelled or the socket has gone away.
	const bool chain_alive = !cancelled_ && err != error::operation_aborted &&
							 err != error::not_connected && err != error::not_socket;
	if (!chain_alive) return;

	// Any other error is ignored; only a clean receive is parsed.
	if (!err) {
		try {
			std::istringstream reply(std::string(resultbuf_, len));

			// the first line carries the id of the query that is being answered
			std::string reply_id;
			getline(reply, reply_id);
			lslboost::trim(reply_id);

			if (reply_id == query_id_) {
				// everything after the first line is the shortinfo message
				std::ostringstream payload;
				payload << reply.rdbuf();
				stream_info_impl info;
				info.from_shortinfo_message(payload.str());
				const std::string uid = info.uid();

				lslboost::lock_guard<lslboost::mutex> lock(results_mut_);
				if (results_.find(uid) != results_.end()) {
					// known stream: only refresh the receive time
					results_[uid].second = lsl_clock();
				} else {
					// new stream: store the info together with the receive time
					results_[uid] = std::make_pair(info, lsl_clock());
				}

				// Record the sender address for the matching IP family, but never
				// overwrite an address that is already present.
				if (remote_endpoint_.address().is_v4()) {
					if (results_[uid].first.v4address().empty()) {
						results_[uid].first.v4address(remote_endpoint_.address().to_string());
					}
				} else {
					if (results_[uid].first.v6address().empty()) {
						results_[uid].first.v6address(remote_endpoint_.address().to_string());
					}
				}
			}
		} catch (const std::exception &parse_error) {
			std::cerr << "resolve_attempt_udp: hiccup while processing the received data: " << parse_error.what() << std::endl;
		}
	}

	// keep listening for further replies
	receive_next_result();
}
/**
 * Send the query message to the next suitable target, starting at position i.
 *
 * Targets whose protocol differs from that of the receive socket are skipped. For the first
 * matching target one asynchronous send is started; handle_send_outcome then continues
 * with the element after it. Does nothing when the list is exhausted or the attempt is cancelled.
 *
 * @param i Position in targets_ at which to continue.
 */
void resolve_attempt_udp::send_next_query(endpoint_list::const_iterator i) {
	for (; i != targets_.end() && !cancelled_; ++i) {
		udp::endpoint target(*i);

		// skip endpoints that do not belong to the protocol we are receiving on
		if (!(target.protocol() == recv_socket_.local_endpoint().protocol())) continue;

		// choose the socket by address kind: broadcast, multicast, or plain unicast
		udp::socket *chosen = &unicast_socket_;
		if (target.address() == ip::address_v4::broadcast()) {
			chosen = &broadcast_socket_;
		} else if (target.address().is_multicast()) {
			chosen = &multicast_socket_;
		}

		// one send at a time; the handler resumes with the following target
		chosen->async_send_to(lslboost::asio::buffer(query_msg_), target,
			lslboost::bind(&resolve_attempt_udp::handle_send_outcome, shared_from_this(), ++i,
				placeholders::error));
		return;
	}
}
/// Completion handler for a send started by send_next_query(): continues with target i
/// unless the attempt was cancelled or the error indicates that the socket is gone.
/// Other send errors do not stop the chain.
void resolve_attempt_udp::handle_send_outcome(endpoint_list::const_iterator i, error_code err) {
	const bool chain_alive = !cancelled_ && err != error::operation_aborted &&
							 err != error::not_connected && err != error::not_socket;
	if (!chain_alive) return;
	send_next_query(i);
}
/// Completion handler of cancel_timer_: cancels the attempt only when the wait finished
/// without error (a wait that was itself cancelled reports an error and is ignored).
void resolve_attempt_udp::handle_timeout(error_code err) {
	if (err) return;
	do_cancel();
}
void resolve_attempt_udp::do_cancel() {
try {
cancelled_ = true;
if (unicast_socket_.is_open())
unicast_socket_.close();
if (multicast_socket_.is_open())
multicast_socket_.close();
if (broadcast_socket_.is_open())
broadcast_socket_.close();
if (recv_socket_.is_open())
recv_socket_.close();
cancel_timer_.cancel();
} catch(std::exception &e) {
std::cerr << "Unexpected error while trying to cancel operations of resolve_attempt_udp: " << e.what() << std::endl;
}
}