#include <iostream>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/ip/multicast.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <boost/thread/thread_only.hpp>
#include "udp_server.h"
#include "socket_utils.h"
#include "stream_info_impl.h"
using namespace lsl;
using namespace lslboost::asio;
udp_server::udp_server(const stream_info_impl_p &info, io_context &io, udp protocol): info_(info), io_(io), socket_(new udp::socket(io)), time_services_enabled_(true) {
socket_->open(protocol);
uint16_t port = bind_port_in_range(*socket_,protocol);
if (protocol == udp::v4())
info_->v4service_port(port);
else
info_->v6service_port(port);
}
udp_server::udp_server(const stream_info_impl_p &info, io_context &io, const std::string &address, uint16_t port, int ttl, const std::string &listen_address): info_(info), io_(io), socket_(new udp::socket(io)), time_services_enabled_(false) {
ip::address addr = ip::make_address(address);
bool is_broadcast = addr == ip::address_v4::broadcast();
udp::endpoint listen_endpoint;
if (listen_address.empty()) {
if (addr.is_v4())
listen_endpoint = udp::endpoint(udp::v4(), port);
else
listen_endpoint = udp::endpoint(udp::v6(), port);
}
else {
ip::address listen_addr = ip::make_address(listen_address);
listen_endpoint = udp::endpoint(listen_addr, (uint16_t)port);
}
socket_->open(listen_endpoint.protocol());
socket_->set_option(udp::socket::reuse_address(true));
if (addr.is_multicast() && !is_broadcast)
socket_->set_option(ip::multicast::hops(ttl));
socket_->bind(listen_endpoint);
if (addr.is_multicast() && !is_broadcast) {
if (addr.is_v4())
socket_->set_option(ip::multicast::join_group(addr.to_v4(),listen_endpoint.address().to_v4()));
else
socket_->set_option(ip::multicast::join_group(addr));
}
}
void udp_server::begin_serving() {
shortinfo_msg_ = info_->to_shortinfo_message();
request_next_packet();
}
void close_if_open(udp_socket_p sock) {
try {
if (sock->is_open())
sock->close();
} catch(std::exception &e) {
std::cerr << "Error during close_if_open (thread id: " << lslboost::this_thread::get_id() << "): " << e.what() << std::endl;
}
}
void udp_server::end_serving() {
post(io_, lslboost::bind(&close_if_open, socket_));
}
void udp_server::request_next_packet() {
socket_->async_receive_from(lslboost::asio::buffer(buffer_), remote_endpoint_,
lslboost::bind(&udp_server::handle_receive_outcome, shared_from_this(), placeholders::error, placeholders::bytes_transferred));
}
void udp_server::handle_receive_outcome(error_code err, std::size_t len) {
if (err != error::operation_aborted && err != error::shut_down) {
try {
if (!err) {
double t1 = time_services_enabled_ ? lsl_clock() : 0.0;
std::istringstream request_stream(std::string(buffer_,buffer_+len));
std::string method; getline(request_stream,method); lslboost::trim(method);
if (method == "LSL:shortinfo") {
std::string query; getline(request_stream,query); lslboost::trim(query);
uint16_t return_port; request_stream >> return_port;
std::string query_id; request_stream >> query_id;
if (info_->matches_query(query)) {
udp::endpoint return_endpoint(remote_endpoint_.address(), return_port);
string_p replymsg(new std::string((query_id += "\r\n") += shortinfo_msg_));
socket_->async_send_to(lslboost::asio::buffer(*replymsg), return_endpoint,
lslboost::bind(&udp_server::handle_send_outcome,shared_from_this(),replymsg,placeholders::error));
return;
}
} else {
if (time_services_enabled_ && method == "LSL:timedata") {
int wave_id; request_stream >> wave_id;
double t0; request_stream >> t0;
std::ostringstream reply; reply.precision(16); reply << " " << wave_id << " " << t0 << " " << t1 << " " << lsl_clock();
string_p replymsg(new std::string(reply.str()));
socket_->async_send_to(lslboost::asio::buffer(*replymsg), remote_endpoint_,
lslboost::bind(&udp_server::handle_send_outcome,shared_from_this(),replymsg,placeholders::error));
return;
}
}
}
} catch(std::exception &e) {
std::cerr << "udp_server: hiccup during request processing: " << e.what() << std::endl;
}
request_next_packet();
}
}
void udp_server::handle_send_outcome(string_p , error_code err) {
if (err != error::operation_aborted && err != error::shut_down)
request_next_packet();
}